org/sonews/feed/PullFeeder.java
author cli
Fri, 21 Aug 2009 17:33:15 +0200
changeset 18 7e527fdf0fa8
parent 15 f2293e8566f5
child 22 2541bdb54cb2
permissions -rw-r--r--
Fix for #549.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.feed;
    20 
    21 import java.io.BufferedReader;
    22 import java.io.IOException;
    23 import java.io.InputStreamReader;
    24 import java.io.PrintWriter;
    25 import java.net.Socket;
    26 import java.net.SocketException;
    27 import java.net.UnknownHostException;
    28 import java.util.ArrayList;
    29 import java.util.HashMap;
    30 import java.util.List;
    31 import java.util.Map;
    32 import org.sonews.config.Config;
    33 import org.sonews.util.Log;
    34 import org.sonews.storage.StorageBackendException;
    35 import org.sonews.storage.StorageManager;
    36 import org.sonews.util.Stats;
    37 import org.sonews.util.io.ArticleReader;
    38 import org.sonews.util.io.ArticleWriter;
    39 
    40 /**
    41  * The PullFeeder class regularily checks another Newsserver for new
    42  * messages.
    43  * @author Christian Lins
    44  * @since sonews/0.5.0
    45  */
    46 class PullFeeder extends AbstractFeeder
    47 {
    48   
    49   private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
    50   private BufferedReader             in;
    51   private PrintWriter                out;
    52   
    53   @Override
    54   public void addSubscription(final Subscription sub)
    55   {
    56     super.addSubscription(sub);
    57     
    58     // Set a initial highMark
    59     this.highMarks.put(sub, 0);
    60   }
    61   
    62   /**
    63    * Changes to the given group and returns its high mark.
    64    * @param groupName
    65    * @return
    66    */
    67   private int changeGroup(String groupName)
    68     throws IOException
    69   {
    70     this.out.print("GROUP " + groupName + "\r\n");
    71     this.out.flush();
    72     
    73     String line = this.in.readLine();
    74     if(line.startsWith("211 "))
    75     {
    76       int highmark = Integer.parseInt(line.split(" ")[3]);
    77       return highmark;
    78     }
    79     else
    80     {
    81       throw new IOException("GROUP " + groupName + " returned: " + line);
    82     }
    83   }
    84   
    85   private void connectTo(final String host, final int port)
    86     throws IOException, UnknownHostException
    87   {
    88     Socket socket = new Socket(host, port);
    89     this.out = new PrintWriter(socket.getOutputStream());
    90     this.in  = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    91 
    92     String line = in.readLine();
    93     if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
    94     {
    95       throw new IOException(line);
    96     }
    97 
    98     // Send MODE READER to peer, some newsservers are friendlier then
    99     this.out.println("MODE READER\r\n");
   100     this.out.flush();
   101     line = this.in.readLine();
   102   }
   103   
   104   private void disconnect()
   105     throws IOException
   106   {
   107     this.out.print("QUIT\r\n");
   108     this.out.flush();
   109     this.out.close();
   110     this.in.close();
   111     
   112     this.out = null;
   113     this.in  = null;
   114   }
   115   
   116   /**
   117    * Uses the OVER or XOVER command to get a list of message overviews that
   118    * may be unknown to this feeder and are about to be peered.
   119    * @param start
   120    * @param end
   121    * @return A list of message ids with potentially interesting messages.
   122    */
   123   private List<String> over(int start, int end)
   124     throws IOException
   125   {
   126     this.out.print("OVER " + start + "-" + end + "\r\n");
   127     this.out.flush();
   128     
   129     String line = this.in.readLine();
   130     if(line.startsWith("500 ")) // OVER not supported
   131     {
   132       this.out.print("XOVER " + start + "-" + end + "\r\n");
   133       this.out.flush();
   134       
   135       line = this.in.readLine();
   136     }
   137     
   138     if(line.startsWith("224 "))
   139     {
   140       List<String> messages = new ArrayList<String>();
   141       line = this.in.readLine();
   142       while(!".".equals(line))
   143       {
   144         String mid = line.split("\t")[4]; // 5th should be the Message-ID
   145         messages.add(mid);
   146         line = this.in.readLine();
   147       }
   148       return messages;
   149     }
   150     else
   151     {
   152       throw new IOException("Server return for OVER/XOVER: " + line);
   153     }
   154   }
   155   
   156   @Override
   157   public void run()
   158   {
   159     while(isRunning())
   160     {
   161       int pullInterval = 1000 * 
   162         Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
   163       String host = "localhost";
   164       int    port = 119;
   165       
   166       Log.get().info("Start PullFeeder run...");
   167 
   168       try
   169       {
   170         for(Subscription sub : this.subscriptions)
   171         {
   172           host = sub.getHost();
   173           port = sub.getPort();
   174 
   175           try
   176           {
   177             Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
   178             try
   179             {
   180               connectTo(host, port);
   181             }
   182             catch(SocketException ex)
   183             {
   184               Log.get().info("Skipping " + sub.getHost() + ": " + ex);
   185               continue;
   186             }
   187             
   188             int oldMark = this.highMarks.get(sub);
   189             int newMark = changeGroup(sub.getGroup());
   190             
   191             if(oldMark != newMark)
   192             {
   193               List<String> messageIDs = over(oldMark, newMark);
   194 
   195               for(String messageID : messageIDs)
   196               {
   197                 if(!StorageManager.current().isArticleExisting(messageID))
   198                 {
   199                   try
   200                   {
   201                     // Post the message via common socket connection
   202                     ArticleReader aread =
   203                       new ArticleReader(sub.getHost(), sub.getPort(), messageID);
   204                     byte[] abuf = aread.getArticleData();
   205                     if(abuf == null)
   206                     {
   207                       Log.get().warning("Could not feed " + messageID
   208                         + " from " + sub.getHost());
   209                     }
   210                     else
   211                     {
   212                       Log.get().info("Feeding " + messageID);
   213                       ArticleWriter awrite = new ArticleWriter(
   214                         "localhost", Config.inst().get(Config.PORT, 119));
   215                       awrite.writeArticle(abuf);
   216                       awrite.close();
   217                     }
   218                     Stats.getInstance().mailFeeded(sub.getGroup());
   219                   }
   220                   catch(IOException ex)
   221                   {
   222                     // There may be a temporary network failure
   223                     ex.printStackTrace();
   224                     Log.get().warning("Skipping mail " + messageID + " due to exception.");
   225                   }
   226                 }
   227               } // for(;;)
   228               this.highMarks.put(sub, newMark);
   229             }
   230             
   231             disconnect();
   232           }
   233           catch(StorageBackendException ex)
   234           {
   235             ex.printStackTrace();
   236           }
   237           catch(IOException ex)
   238           {
   239             ex.printStackTrace();
   240             Log.get().severe("PullFeeder run stopped due to exception.");
   241           }
   242         } // for(Subscription sub : subscriptions)
   243         
   244         Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
   245         Thread.sleep(pullInterval);
   246       }
   247       catch(InterruptedException ex)
   248       {
   249         Log.get().warning(ex.getMessage());
   250       }
   251     }
   252   }
   253   
   254 }