org/sonews/feed/PushFeeder.java
author chris <chris@marvin>
Fri, 26 Jun 2009 16:48:50 +0200
changeset 1 6fceb66e1ad7
child 3 2fdc9cc89502
permissions -rw-r--r--
Hooray... sonews/0.5.0 final

HG: Enter commit message. Lines beginning with 'HG:' are removed.
HG: Remove all lines to abort the collapse operation.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.feed;
    20 
    21 import java.io.IOException;
    22 import java.util.concurrent.ConcurrentLinkedQueue;
    23 import org.sonews.daemon.storage.Article;
    24 import org.sonews.daemon.storage.Headers;
    25 import org.sonews.util.Log;
    26 import org.sonews.util.io.ArticleWriter;
    27 
    28 /**
    29  * Pushes new articles to remote newsservers. This feeder sleeps until a new
    30  * message is posted to the sonews instance.
    31  * @author Christian Lins
    32  * @since sonews/0.5.0
    33  */
    34 class PushFeeder extends AbstractFeeder
    35 {
    36   
    37   private ConcurrentLinkedQueue<Article> articleQueue = 
    38     new ConcurrentLinkedQueue<Article>();
    39   
    40   @Override
    41   public void run()
    42   {
    43     while(isRunning())
    44     {
    45       try
    46       {
    47         synchronized(this)
    48         {
    49           this.wait();
    50         }
    51         
    52         Article  article = this.articleQueue.poll();
    53         String[] groups  = article.getHeader(Headers.NEWSGROUPS)[0].split(",");
    54         Log.msg("PushFeed: " + article.getMessageID(), true);
    55         for(Subscription sub : this.subscriptions)
    56         {
    57           // Circle check
    58           if(article.getHeader(Headers.PATH)[0].contains(sub.getHost()))
    59           {
    60             Log.msg(article.getMessageID() + " skipped for host " 
    61               + sub.getHost(), true);
    62             continue;
    63           }
    64 
    65           try
    66           {
    67             for(String group : groups)
    68             {
    69               if(sub.getGroup().equals(group))
    70               {
    71                 // Delete headers that may cause problems
    72                 article.removeHeader(Headers.NNTP_POSTING_DATE);
    73                 article.removeHeader(Headers.NNTP_POSTING_HOST);
    74                 article.removeHeader(Headers.X_COMPLAINTS_TO);
    75                 article.removeHeader(Headers.X_TRACE);
    76                 article.removeHeader(Headers.XREF);
    77                 
    78                 // POST the message to remote server
    79                 ArticleWriter awriter = new ArticleWriter(sub.getHost(), sub.getPort());
    80                 awriter.writeArticle(article);
    81                 break;
    82               }
    83             }
    84           }
    85           catch(IOException ex)
    86           {
    87             Log.msg(ex, false);
    88           }
    89         }
    90       }
    91       catch(InterruptedException ex)
    92       {
    93         Log.msg("PushFeeder interrupted.", true);
    94       }
    95     }
    96   }
    97   
    98   public void queueForPush(Article article)
    99   {
   100     this.articleQueue.add(article);
   101     synchronized(this)
   102     {
   103       this.notifyAll();
   104     }
   105   }
   106   
   107 }