chris@1: /*
chris@1: * SONEWS News Server
chris@1: * see AUTHORS for the list of contributors
chris@1: *
chris@1: * This program is free software: you can redistribute it and/or modify
chris@1: * it under the terms of the GNU General Public License as published by
chris@1: * the Free Software Foundation, either version 3 of the License, or
chris@1: * (at your option) any later version.
chris@1: *
chris@1: * This program is distributed in the hope that it will be useful,
chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
chris@1: * GNU General Public License for more details.
chris@1: *
chris@1: * You should have received a copy of the GNU General Public License
chris@1: * along with this program. If not, see <http://www.gnu.org/licenses/>.
chris@1: */
chris@1:
chris@1: package org.sonews.feed;
chris@1:
chris@1: import java.io.IOException;
chris@1: import java.util.concurrent.ConcurrentLinkedQueue;
chris@3: import org.sonews.storage.Article;
chris@3: import org.sonews.storage.Headers;
chris@1: import org.sonews.util.Log;
chris@1: import org.sonews.util.io.ArticleWriter;
chris@1:
chris@1: /**
chris@1: * Pushes new articles to remote newsservers. This feeder sleeps until a new
chris@1: * message is posted to the sonews instance.
chris@1: * @author Christian Lins
chris@1: * @since sonews/0.5.0
chris@1: */
chris@1: class PushFeeder extends AbstractFeeder
chris@1: {
chris@1:
chris@1: private ConcurrentLinkedQueue articleQueue =
chris@1: new ConcurrentLinkedQueue();
chris@1:
chris@1: @Override
chris@1: public void run()
chris@1: {
chris@1: while(isRunning())
chris@1: {
chris@1: try
chris@1: {
chris@1: synchronized(this)
chris@1: {
chris@1: this.wait();
chris@1: }
chris@1:
chris@1: Article article = this.articleQueue.poll();
chris@1: String[] groups = article.getHeader(Headers.NEWSGROUPS)[0].split(",");
cli@15: Log.get().info("PushFeed: " + article.getMessageID());
chris@1: for(Subscription sub : this.subscriptions)
chris@1: {
chris@1: // Circle check
chris@1: if(article.getHeader(Headers.PATH)[0].contains(sub.getHost()))
chris@1: {
cli@15: Log.get().info(article.getMessageID() + " skipped for host "
cli@15: + sub.getHost());
chris@1: continue;
chris@1: }
chris@1:
chris@1: try
chris@1: {
chris@1: for(String group : groups)
chris@1: {
chris@1: if(sub.getGroup().equals(group))
chris@1: {
chris@1: // Delete headers that may cause problems
chris@1: article.removeHeader(Headers.NNTP_POSTING_DATE);
chris@1: article.removeHeader(Headers.NNTP_POSTING_HOST);
chris@1: article.removeHeader(Headers.X_COMPLAINTS_TO);
chris@1: article.removeHeader(Headers.X_TRACE);
chris@1: article.removeHeader(Headers.XREF);
chris@1:
chris@1: // POST the message to remote server
chris@1: ArticleWriter awriter = new ArticleWriter(sub.getHost(), sub.getPort());
chris@1: awriter.writeArticle(article);
chris@1: break;
chris@1: }
chris@1: }
chris@1: }
chris@1: catch(IOException ex)
chris@1: {
cli@15: Log.get().warning(ex.toString());
chris@1: }
chris@1: }
chris@1: }
chris@1: catch(InterruptedException ex)
chris@1: {
cli@15: Log.get().warning("PushFeeder interrupted: " + ex);
chris@1: }
chris@1: }
chris@1: }
chris@1:
chris@1: public void queueForPush(Article article)
chris@1: {
chris@1: this.articleQueue.add(article);
chris@1: synchronized(this)
chris@1: {
chris@1: this.notifyAll();
chris@1: }
chris@1: }
chris@1:
chris@1: }