org/sonews/feed/PullFeeder.java
author chris <chris@marvin>
Thu, 06 Aug 2009 18:34:10 +0200
changeset 5 ad210aa137a4
parent 3 2fdc9cc89502
child 7 0b76e099eb96
permissions -rw-r--r--
Removing tags as they point to invalid changesets after history refactoring.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.feed;
    20 
    21 import java.io.BufferedReader;
    22 import java.io.IOException;
    23 import java.io.InputStreamReader;
    24 import java.io.PrintWriter;
    25 import java.net.Socket;
    26 import java.net.SocketException;
    27 import java.net.UnknownHostException;
    28 import java.util.ArrayList;
    29 import java.util.HashMap;
    30 import java.util.List;
    31 import java.util.Map;
    32 import org.sonews.config.Config;
    33 import org.sonews.util.Log;
    34 import org.sonews.storage.StorageBackendException;
    35 import org.sonews.storage.StorageManager;
    36 import org.sonews.util.Stats;
    37 import org.sonews.util.io.ArticleReader;
    38 import org.sonews.util.io.ArticleWriter;
    39 
    40 /**
    41  * The PullFeeder class regularily checks another Newsserver for new
    42  * messages.
    43  * @author Christian Lins
    44  * @since sonews/0.5.0
    45  */
    46 class PullFeeder extends AbstractFeeder
    47 {
    48   
    49   private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
    50   private BufferedReader             in;
    51   private PrintWriter                out;
    52   
    53   @Override
    54   public void addSubscription(final Subscription sub)
    55   {
    56     super.addSubscription(sub);
    57     
    58     // Set a initial highMark
    59     this.highMarks.put(sub, 0);
    60   }
    61   
    62   /**
    63    * Changes to the given group and returns its high mark.
    64    * @param groupName
    65    * @return
    66    */
    67   private int changeGroup(String groupName)
    68     throws IOException
    69   {
    70     this.out.print("GROUP " + groupName + "\r\n");
    71     this.out.flush();
    72     
    73     String line = this.in.readLine();
    74     if(line.startsWith("211 "))
    75     {
    76       int highmark = Integer.parseInt(line.split(" ")[3]);
    77       return highmark;
    78     }
    79     else
    80     {
    81       throw new IOException("GROUP " + groupName + " returned: " + line);
    82     }
    83   }
    84   
    85   private void connectTo(final String host, final int port)
    86     throws IOException, UnknownHostException
    87   {
    88     Socket socket = new Socket(host, port);
    89     this.out = new PrintWriter(socket.getOutputStream());
    90     this.in  = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    91 
    92     String line = in.readLine();
    93     if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
    94     {
    95       throw new IOException(line);
    96     }
    97   }
    98   
    99   private void disconnect()
   100     throws IOException
   101   {
   102     this.out.print("QUIT\r\n");
   103     this.out.flush();
   104     this.out.close();
   105     this.in.close();
   106     
   107     this.out = null;
   108     this.in  = null;
   109   }
   110   
   111   /**
   112    * Uses the OVER or XOVER command to get a list of message overviews that
   113    * may be unknown to this feeder and are about to be peered.
   114    * @param start
   115    * @param end
   116    * @return A list of message ids with potentially interesting messages.
   117    */
   118   private List<String> over(int start, int end)
   119     throws IOException
   120   {
   121     this.out.print("OVER " + start + "-" + end + "\r\n");
   122     this.out.flush();
   123     
   124     String line = this.in.readLine();
   125     if(line.startsWith("500 ")) // OVER not supported
   126     {
   127       this.out.print("XOVER " + start + "-" + end + "\r\n");
   128       this.out.flush();
   129       
   130       line = this.in.readLine();
   131     }
   132     
   133     if(line.startsWith("224 "))
   134     {
   135       List<String> messages = new ArrayList<String>();
   136       line = this.in.readLine();
   137       while(!".".equals(line))
   138       {
   139         String mid = line.split("\t")[4]; // 5th should be the Message-ID
   140         messages.add(mid);
   141         line = this.in.readLine();
   142       }
   143       return messages;
   144     }
   145     else
   146     {
   147       throw new IOException("Server return for OVER/XOVER: " + line);
   148     }
   149   }
   150   
   151   @Override
   152   public void run()
   153   {
   154     while(isRunning())
   155     {
   156       int pullInterval = 1000 * 
   157         Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
   158       String host = "localhost";
   159       int    port = 119;
   160       
   161       Log.msg("Start PullFeeder run...", true);
   162 
   163       try
   164       {
   165         for(Subscription sub : this.subscriptions)
   166         {
   167           host = sub.getHost();
   168           port = sub.getPort();
   169 
   170           try
   171           {
   172             Log.msg("Feeding " + sub.getGroup() + " from " + sub.getHost(), true);
   173             try
   174             {
   175               connectTo(host, port);
   176             }
   177             catch(SocketException ex)
   178             {
   179               Log.msg("Skipping " + sub.getHost() + ": " + ex, false);
   180               continue;
   181             }
   182             
   183             int oldMark = this.highMarks.get(sub);
   184             int newMark = changeGroup(sub.getGroup());
   185             
   186             if(oldMark != newMark)
   187             {
   188               List<String> messageIDs = over(oldMark, newMark);
   189 
   190               for(String messageID : messageIDs)
   191               {
   192                 if(!StorageManager.current().isArticleExisting(messageID))
   193                 {
   194                   try
   195                   {
   196                     // Post the message via common socket connection
   197                     ArticleReader aread =
   198                       new ArticleReader(sub.getHost(), sub.getPort(), messageID);
   199                     byte[] abuf = aread.getArticleData();
   200                     if (abuf == null)
   201                     {
   202                       Log.msg("Could not feed " + messageID + " from " + sub.getHost(), true);
   203                     }
   204                     else
   205                     {
   206                       Log.msg("Feeding " + messageID, true);
   207                       ArticleWriter awrite = new ArticleWriter(
   208                         "localhost", Config.inst().get(Config.PORT, 119));
   209                       awrite.writeArticle(abuf);
   210                       awrite.close();
   211                     }
   212                     Stats.getInstance().mailFeeded(sub.getGroup());
   213                   }
   214                   catch(IOException ex)
   215                   {
   216                     // There may be a temporary network failure
   217                     ex.printStackTrace();
   218                     Log.msg("Skipping mail " + messageID + " due to exception.", false);
   219                   }
   220                 }
   221               } // for(;;)
   222               this.highMarks.put(sub, newMark);
   223             }
   224             
   225             disconnect();
   226           }
   227           catch(StorageBackendException ex)
   228           {
   229             ex.printStackTrace();
   230           }
   231           catch(IOException ex)
   232           {
   233             ex.printStackTrace();
   234             Log.msg("PullFeeder run stopped due to exception.", false);
   235           }
   236         } // for(Subscription sub : subscriptions)
   237         
   238         Log.msg("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s", true);
   239         Thread.sleep(pullInterval);
   240       }
   241       catch(InterruptedException ex)
   242       {
   243         Log.msg(ex.getMessage(), false);
   244       }
   245     }
   246   }
   247   
   248 }