chris@1: /*
chris@1: * SONEWS News Server
chris@1: * see AUTHORS for the list of contributors
chris@1: *
chris@1: * This program is free software: you can redistribute it and/or modify
chris@1: * it under the terms of the GNU General Public License as published by
chris@1: * the Free Software Foundation, either version 3 of the License, or
chris@1: * (at your option) any later version.
chris@1: *
chris@1: * This program is distributed in the hope that it will be useful,
chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
chris@1: * GNU General Public License for more details.
chris@1: *
chris@1: * You should have received a copy of the GNU General Public License
chris@1: * along with this program. If not, see <http://www.gnu.org/licenses/>.
chris@1: */
chris@1:
chris@1: package org.sonews.feed;
chris@1:
chris@1: import java.io.IOException;
cli@22: import java.util.List;
chris@1: import java.util.concurrent.ConcurrentLinkedQueue;
cli@22: import org.sonews.daemon.AbstractDaemon;
chris@3: import org.sonews.storage.Article;
chris@3: import org.sonews.storage.Headers;
cli@22: import org.sonews.storage.StorageBackendException;
cli@22: import org.sonews.storage.StorageManager;
chris@1: import org.sonews.util.Log;
chris@1: import org.sonews.util.io.ArticleWriter;
chris@1:
chris@1: /**
chris@1: * Pushes new articles to remote newsservers. This feeder sleeps until a new
chris@1: * message is posted to the sonews instance.
chris@1: * @author Christian Lins
chris@1: * @since sonews/0.5.0
chris@1: */
cli@22: class PushFeeder extends AbstractDaemon
chris@1: {
cli@22:
cli@37: private ConcurrentLinkedQueue articleQueue =
cli@37: new ConcurrentLinkedQueue();
chris@1:
cli@37: @Override
cli@37: public void run()
cli@37: {
cli@37: while (isRunning()) {
cli@37: try {
cli@37: synchronized (this) {
cli@37: this.wait();
cli@37: }
cli@37:
cli@37: List subscriptions = StorageManager.current().getSubscriptions(FeedManager.TYPE_PUSH);
cli@37:
cli@37: Article article = this.articleQueue.poll();
cli@37: String[] groups = article.getHeader(Headers.NEWSGROUPS)[0].split(",");
cli@37: Log.get().info("PushFeed: " + article.getMessageID());
cli@37: for (Subscription sub : subscriptions) {
cli@37: // Circle check
cli@37: if (article.getHeader(Headers.PATH)[0].contains(sub.getHost())) {
cli@37: Log.get().info(article.getMessageID() + " skipped for host "
cli@37: + sub.getHost());
cli@37: continue;
cli@37: }
cli@37:
cli@37: try {
cli@37: for (String group : groups) {
cli@37: if (sub.getGroup().equals(group)) {
cli@37: // Delete headers that may cause problems
cli@37: article.removeHeader(Headers.NNTP_POSTING_DATE);
cli@37: article.removeHeader(Headers.NNTP_POSTING_HOST);
cli@37: article.removeHeader(Headers.X_COMPLAINTS_TO);
cli@37: article.removeHeader(Headers.X_TRACE);
cli@37: article.removeHeader(Headers.XREF);
cli@37:
cli@37: // POST the message to remote server
cli@37: ArticleWriter awriter = new ArticleWriter(sub.getHost(), sub.getPort());
cli@37: awriter.writeArticle(article);
cli@37: break;
cli@37: }
cli@37: }
cli@37: } catch (IOException ex) {
cli@37: Log.get().warning(ex.toString());
cli@37: }
cli@37: }
cli@37: } catch (StorageBackendException ex) {
cli@37: Log.get().severe(ex.toString());
cli@37: } catch (InterruptedException ex) {
cli@37: Log.get().warning("PushFeeder interrupted: " + ex);
cli@37: }
cli@37: }
cli@37: }
cli@37:
cli@37: public void queueForPush(Article article)
cli@37: {
cli@37: this.articleQueue.add(article);
cli@37: synchronized (this) {
cli@37: this.notifyAll();
cli@37: }
cli@37: }
chris@1: }