org/sonews/feed/PushFeeder.java
author cli
Tue, 27 Apr 2010 22:11:30 +0200
changeset 27 d879bab39600
parent 22 2541bdb54cb2
permissions -rw-r--r--
Fix for #567 "mailinglist gateway does not recover after database outage".
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.feed;
    20 
    21 import java.io.IOException;
    22 import java.util.List;
    23 import java.util.concurrent.ConcurrentLinkedQueue;
    24 import org.sonews.daemon.AbstractDaemon;
    25 import org.sonews.storage.Article;
    26 import org.sonews.storage.Headers;
    27 import org.sonews.storage.StorageBackendException;
    28 import org.sonews.storage.StorageManager;
    29 import org.sonews.util.Log;
    30 import org.sonews.util.io.ArticleWriter;
    31 
    32 /**
    33  * Pushes new articles to remote newsservers. This feeder sleeps until a new
    34  * message is posted to the sonews instance.
    35  * @author Christian Lins
    36  * @since sonews/0.5.0
    37  */
    38 class PushFeeder extends AbstractDaemon
    39 {
    40   
    41   private ConcurrentLinkedQueue<Article> articleQueue = 
    42     new ConcurrentLinkedQueue<Article>();
    43   
    44   @Override
    45   public void run()
    46   {
    47     while(isRunning())
    48     {
    49       try
    50       {
    51         synchronized(this)
    52         {
    53           this.wait();
    54         }
    55         
    56         List<Subscription> subscriptions = StorageManager.current()
    57           .getSubscriptions(FeedManager.TYPE_PUSH);
    58 
    59         Article  article = this.articleQueue.poll();
    60         String[] groups  = article.getHeader(Headers.NEWSGROUPS)[0].split(",");
    61         Log.get().info("PushFeed: " + article.getMessageID());
    62         for(Subscription sub : subscriptions)
    63         {
    64           // Circle check
    65           if(article.getHeader(Headers.PATH)[0].contains(sub.getHost()))
    66           {
    67             Log.get().info(article.getMessageID() + " skipped for host "
    68               + sub.getHost());
    69             continue;
    70           }
    71 
    72           try
    73           {
    74             for(String group : groups)
    75             {
    76               if(sub.getGroup().equals(group))
    77               {
    78                 // Delete headers that may cause problems
    79                 article.removeHeader(Headers.NNTP_POSTING_DATE);
    80                 article.removeHeader(Headers.NNTP_POSTING_HOST);
    81                 article.removeHeader(Headers.X_COMPLAINTS_TO);
    82                 article.removeHeader(Headers.X_TRACE);
    83                 article.removeHeader(Headers.XREF);
    84                 
    85                 // POST the message to remote server
    86                 ArticleWriter awriter = new ArticleWriter(sub.getHost(), sub.getPort());
    87                 awriter.writeArticle(article);
    88                 break;
    89               }
    90             }
    91           }
    92           catch(IOException ex)
    93           {
    94             Log.get().warning(ex.toString());
    95           }
    96         }
    97       }
    98       catch(StorageBackendException ex)
    99       {
   100         Log.get().severe(ex.toString());
   101       }
   102       catch(InterruptedException ex)
   103       {
   104         Log.get().warning("PushFeeder interrupted: " + ex);
   105       }
   106     }
   107   }
   108   
   109   public void queueForPush(Article article)
   110   {
   111     this.articleQueue.add(article);
   112     synchronized(this)
   113     {
   114       this.notifyAll();
   115     }
   116   }
   117   
   118 }