chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see . chris@1: */ chris@1: chris@1: package org.sonews.feed; chris@1: chris@1: import java.io.IOException; cli@22: import java.util.List; chris@1: import java.util.concurrent.ConcurrentLinkedQueue; cli@22: import org.sonews.daemon.AbstractDaemon; chris@3: import org.sonews.storage.Article; chris@3: import org.sonews.storage.Headers; cli@22: import org.sonews.storage.StorageBackendException; cli@22: import org.sonews.storage.StorageManager; chris@1: import org.sonews.util.Log; chris@1: import org.sonews.util.io.ArticleWriter; chris@1: chris@1: /** chris@1: * Pushes new articles to remote newsservers. This feeder sleeps until a new chris@1: * message is posted to the sonews instance. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ cli@22: class PushFeeder extends AbstractDaemon chris@1: { chris@1: chris@1: private ConcurrentLinkedQueue
articleQueue = chris@1: new ConcurrentLinkedQueue
(); chris@1: chris@1: @Override chris@1: public void run() chris@1: { chris@1: while(isRunning()) chris@1: { chris@1: try chris@1: { chris@1: synchronized(this) chris@1: { chris@1: this.wait(); chris@1: } chris@1: cli@22: List subscriptions = StorageManager.current() cli@22: .getSubscriptions(FeedManager.TYPE_PUSH); cli@22: chris@1: Article article = this.articleQueue.poll(); chris@1: String[] groups = article.getHeader(Headers.NEWSGROUPS)[0].split(","); cli@15: Log.get().info("PushFeed: " + article.getMessageID()); cli@22: for(Subscription sub : subscriptions) chris@1: { chris@1: // Circle check chris@1: if(article.getHeader(Headers.PATH)[0].contains(sub.getHost())) chris@1: { cli@15: Log.get().info(article.getMessageID() + " skipped for host " cli@15: + sub.getHost()); chris@1: continue; chris@1: } chris@1: chris@1: try chris@1: { chris@1: for(String group : groups) chris@1: { chris@1: if(sub.getGroup().equals(group)) chris@1: { chris@1: // Delete headers that may cause problems chris@1: article.removeHeader(Headers.NNTP_POSTING_DATE); chris@1: article.removeHeader(Headers.NNTP_POSTING_HOST); chris@1: article.removeHeader(Headers.X_COMPLAINTS_TO); chris@1: article.removeHeader(Headers.X_TRACE); chris@1: article.removeHeader(Headers.XREF); chris@1: chris@1: // POST the message to remote server chris@1: ArticleWriter awriter = new ArticleWriter(sub.getHost(), sub.getPort()); chris@1: awriter.writeArticle(article); chris@1: break; chris@1: } chris@1: } chris@1: } chris@1: catch(IOException ex) chris@1: { cli@15: Log.get().warning(ex.toString()); chris@1: } chris@1: } chris@1: } cli@22: catch(StorageBackendException ex) cli@22: { cli@22: Log.get().severe(ex.toString()); cli@22: } chris@1: catch(InterruptedException ex) chris@1: { cli@15: Log.get().warning("PushFeeder interrupted: " + ex); chris@1: } chris@1: } chris@1: } chris@1: chris@1: public void queueForPush(Article article) chris@1: { chris@1: this.articleQueue.add(article); chris@1: synchronized(this) chris@1: { chris@1: this.notifyAll(); chris@1: } chris@1: } chris@1: chris@1: }