chris@1: /* chris@1: * SONEWS News Server chris@1: * see AUTHORS for the list of contributors chris@1: * chris@1: * This program is free software: you can redistribute it and/or modify chris@1: * it under the terms of the GNU General Public License as published by chris@1: * the Free Software Foundation, either version 3 of the License, or chris@1: * (at your option) any later version. chris@1: * chris@1: * This program is distributed in the hope that it will be useful, chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the chris@1: * GNU General Public License for more details. chris@1: * chris@1: * You should have received a copy of the GNU General Public License chris@1: * along with this program. If not, see . chris@1: */ chris@1: chris@1: package org.sonews.feed; chris@1: chris@1: import java.io.IOException; chris@1: import java.util.concurrent.ConcurrentLinkedQueue; chris@1: import org.sonews.daemon.storage.Article; chris@1: import org.sonews.daemon.storage.Headers; chris@1: import org.sonews.util.Log; chris@1: import org.sonews.util.io.ArticleWriter; chris@1: chris@1: /** chris@1: * Pushes new articles to remote newsservers. This feeder sleeps until a new chris@1: * message is posted to the sonews instance. chris@1: * @author Christian Lins chris@1: * @since sonews/0.5.0 chris@1: */ chris@1: class PushFeeder extends AbstractFeeder chris@1: { chris@1: chris@1: private ConcurrentLinkedQueue
articleQueue = chris@1: new ConcurrentLinkedQueue
(); chris@1: chris@1: @Override chris@1: public void run() chris@1: { chris@1: while(isRunning()) chris@1: { chris@1: try chris@1: { chris@1: synchronized(this) chris@1: { chris@1: this.wait(); chris@1: } chris@1: chris@1: Article article = this.articleQueue.poll(); chris@1: String[] groups = article.getHeader(Headers.NEWSGROUPS)[0].split(","); chris@1: Log.msg("PushFeed: " + article.getMessageID(), true); chris@1: for(Subscription sub : this.subscriptions) chris@1: { chris@1: // Circle check chris@1: if(article.getHeader(Headers.PATH)[0].contains(sub.getHost())) chris@1: { chris@1: Log.msg(article.getMessageID() + " skipped for host " chris@1: + sub.getHost(), true); chris@1: continue; chris@1: } chris@1: chris@1: try chris@1: { chris@1: for(String group : groups) chris@1: { chris@1: if(sub.getGroup().equals(group)) chris@1: { chris@1: // Delete headers that may cause problems chris@1: article.removeHeader(Headers.NNTP_POSTING_DATE); chris@1: article.removeHeader(Headers.NNTP_POSTING_HOST); chris@1: article.removeHeader(Headers.X_COMPLAINTS_TO); chris@1: article.removeHeader(Headers.X_TRACE); chris@1: article.removeHeader(Headers.XREF); chris@1: chris@1: // POST the message to remote server chris@1: ArticleWriter awriter = new ArticleWriter(sub.getHost(), sub.getPort()); chris@1: awriter.writeArticle(article); chris@1: break; chris@1: } chris@1: } chris@1: } chris@1: catch(IOException ex) chris@1: { chris@1: Log.msg(ex, false); chris@1: } chris@1: } chris@1: } chris@1: catch(InterruptedException ex) chris@1: { chris@1: Log.msg("PushFeeder interrupted.", true); chris@1: } chris@1: } chris@1: } chris@1: chris@1: public void queueForPush(Article article) chris@1: { chris@1: this.articleQueue.add(article); chris@1: synchronized(this) chris@1: { chris@1: this.notifyAll(); chris@1: } chris@1: } chris@1: chris@1: }