org/sonews/feed/PullFeeder.java
author cli
Fri, 21 Aug 2009 17:33:15 +0200
changeset 18 7e527fdf0fa8
parent 15 f2293e8566f5
child 22 2541bdb54cb2
permissions -rw-r--r--
Fix for #549.
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.feed;
chris@1
    20
chris@1
    21
import java.io.BufferedReader;
chris@1
    22
import java.io.IOException;
chris@1
    23
import java.io.InputStreamReader;
chris@1
    24
import java.io.PrintWriter;
chris@1
    25
import java.net.Socket;
chris@1
    26
import java.net.SocketException;
chris@1
    27
import java.net.UnknownHostException;
chris@1
    28
import java.util.ArrayList;
chris@1
    29
import java.util.HashMap;
chris@1
    30
import java.util.List;
chris@1
    31
import java.util.Map;
chris@3
    32
import org.sonews.config.Config;
chris@1
    33
import org.sonews.util.Log;
chris@3
    34
import org.sonews.storage.StorageBackendException;
chris@3
    35
import org.sonews.storage.StorageManager;
chris@1
    36
import org.sonews.util.Stats;
chris@1
    37
import org.sonews.util.io.ArticleReader;
chris@1
    38
import org.sonews.util.io.ArticleWriter;
chris@1
    39
chris@1
    40
/**
chris@1
    41
 * The PullFeeder class regularily checks another Newsserver for new
chris@1
    42
 * messages.
chris@1
    43
 * @author Christian Lins
chris@1
    44
 * @since sonews/0.5.0
chris@1
    45
 */
chris@1
    46
class PullFeeder extends AbstractFeeder
chris@1
    47
{
chris@1
    48
  
chris@1
    49
  private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
chris@1
    50
  private BufferedReader             in;
chris@1
    51
  private PrintWriter                out;
chris@1
    52
  
chris@1
    53
  @Override
chris@1
    54
  public void addSubscription(final Subscription sub)
chris@1
    55
  {
chris@1
    56
    super.addSubscription(sub);
chris@1
    57
    
chris@1
    58
    // Set a initial highMark
chris@1
    59
    this.highMarks.put(sub, 0);
chris@1
    60
  }
chris@1
    61
  
chris@1
    62
  /**
chris@1
    63
   * Changes to the given group and returns its high mark.
chris@1
    64
   * @param groupName
chris@1
    65
   * @return
chris@1
    66
   */
chris@1
    67
  private int changeGroup(String groupName)
chris@1
    68
    throws IOException
chris@1
    69
  {
chris@1
    70
    this.out.print("GROUP " + groupName + "\r\n");
chris@1
    71
    this.out.flush();
chris@1
    72
    
chris@1
    73
    String line = this.in.readLine();
chris@1
    74
    if(line.startsWith("211 "))
chris@1
    75
    {
chris@1
    76
      int highmark = Integer.parseInt(line.split(" ")[3]);
chris@1
    77
      return highmark;
chris@1
    78
    }
chris@1
    79
    else
chris@1
    80
    {
chris@1
    81
      throw new IOException("GROUP " + groupName + " returned: " + line);
chris@1
    82
    }
chris@1
    83
  }
chris@1
    84
  
chris@1
    85
  private void connectTo(final String host, final int port)
chris@1
    86
    throws IOException, UnknownHostException
chris@1
    87
  {
chris@1
    88
    Socket socket = new Socket(host, port);
chris@1
    89
    this.out = new PrintWriter(socket.getOutputStream());
chris@1
    90
    this.in  = new BufferedReader(new InputStreamReader(socket.getInputStream()));
chris@1
    91
chris@1
    92
    String line = in.readLine();
chris@1
    93
    if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
chris@1
    94
    {
chris@1
    95
      throw new IOException(line);
chris@1
    96
    }
cli@7
    97
cli@7
    98
    // Send MODE READER to peer, some newsservers are friendlier then
cli@7
    99
    this.out.println("MODE READER\r\n");
cli@7
   100
    this.out.flush();
cli@7
   101
    line = this.in.readLine();
chris@1
   102
  }
chris@1
   103
  
chris@1
   104
  private void disconnect()
chris@1
   105
    throws IOException
chris@1
   106
  {
chris@1
   107
    this.out.print("QUIT\r\n");
chris@1
   108
    this.out.flush();
chris@1
   109
    this.out.close();
chris@1
   110
    this.in.close();
chris@1
   111
    
chris@1
   112
    this.out = null;
chris@1
   113
    this.in  = null;
chris@1
   114
  }
chris@1
   115
  
chris@1
   116
  /**
chris@1
   117
   * Uses the OVER or XOVER command to get a list of message overviews that
chris@1
   118
   * may be unknown to this feeder and are about to be peered.
chris@1
   119
   * @param start
chris@1
   120
   * @param end
chris@1
   121
   * @return A list of message ids with potentially interesting messages.
chris@1
   122
   */
chris@1
   123
  private List<String> over(int start, int end)
chris@1
   124
    throws IOException
chris@1
   125
  {
chris@1
   126
    this.out.print("OVER " + start + "-" + end + "\r\n");
chris@1
   127
    this.out.flush();
chris@1
   128
    
chris@1
   129
    String line = this.in.readLine();
chris@1
   130
    if(line.startsWith("500 ")) // OVER not supported
chris@1
   131
    {
chris@1
   132
      this.out.print("XOVER " + start + "-" + end + "\r\n");
chris@1
   133
      this.out.flush();
chris@1
   134
      
chris@1
   135
      line = this.in.readLine();
chris@1
   136
    }
chris@1
   137
    
chris@1
   138
    if(line.startsWith("224 "))
chris@1
   139
    {
chris@1
   140
      List<String> messages = new ArrayList<String>();
chris@1
   141
      line = this.in.readLine();
chris@1
   142
      while(!".".equals(line))
chris@1
   143
      {
chris@1
   144
        String mid = line.split("\t")[4]; // 5th should be the Message-ID
chris@1
   145
        messages.add(mid);
chris@1
   146
        line = this.in.readLine();
chris@1
   147
      }
chris@1
   148
      return messages;
chris@1
   149
    }
chris@1
   150
    else
chris@1
   151
    {
chris@1
   152
      throw new IOException("Server return for OVER/XOVER: " + line);
chris@1
   153
    }
chris@1
   154
  }
chris@1
   155
  
chris@1
   156
  @Override
chris@1
   157
  public void run()
chris@1
   158
  {
chris@1
   159
    while(isRunning())
chris@1
   160
    {
chris@1
   161
      int pullInterval = 1000 * 
chris@3
   162
        Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
chris@1
   163
      String host = "localhost";
chris@1
   164
      int    port = 119;
chris@1
   165
      
cli@15
   166
      Log.get().info("Start PullFeeder run...");
chris@1
   167
chris@1
   168
      try
chris@1
   169
      {
chris@1
   170
        for(Subscription sub : this.subscriptions)
chris@1
   171
        {
chris@1
   172
          host = sub.getHost();
chris@1
   173
          port = sub.getPort();
chris@1
   174
chris@1
   175
          try
chris@1
   176
          {
cli@15
   177
            Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
chris@1
   178
            try
chris@1
   179
            {
chris@1
   180
              connectTo(host, port);
chris@1
   181
            }
chris@1
   182
            catch(SocketException ex)
chris@1
   183
            {
cli@15
   184
              Log.get().info("Skipping " + sub.getHost() + ": " + ex);
chris@1
   185
              continue;
chris@1
   186
            }
chris@1
   187
            
chris@1
   188
            int oldMark = this.highMarks.get(sub);
chris@1
   189
            int newMark = changeGroup(sub.getGroup());
chris@1
   190
            
chris@1
   191
            if(oldMark != newMark)
chris@1
   192
            {
chris@1
   193
              List<String> messageIDs = over(oldMark, newMark);
chris@1
   194
chris@1
   195
              for(String messageID : messageIDs)
chris@1
   196
              {
chris@3
   197
                if(!StorageManager.current().isArticleExisting(messageID))
chris@1
   198
                {
chris@3
   199
                  try
chris@1
   200
                  {
chris@3
   201
                    // Post the message via common socket connection
chris@3
   202
                    ArticleReader aread =
chris@3
   203
                      new ArticleReader(sub.getHost(), sub.getPort(), messageID);
chris@3
   204
                    byte[] abuf = aread.getArticleData();
cli@15
   205
                    if(abuf == null)
chris@3
   206
                    {
cli@15
   207
                      Log.get().warning("Could not feed " + messageID
cli@15
   208
                        + " from " + sub.getHost());
chris@3
   209
                    }
chris@3
   210
                    else
chris@3
   211
                    {
cli@15
   212
                      Log.get().info("Feeding " + messageID);
chris@3
   213
                      ArticleWriter awrite = new ArticleWriter(
chris@3
   214
                        "localhost", Config.inst().get(Config.PORT, 119));
chris@3
   215
                      awrite.writeArticle(abuf);
chris@3
   216
                      awrite.close();
chris@3
   217
                    }
chris@3
   218
                    Stats.getInstance().mailFeeded(sub.getGroup());
chris@1
   219
                  }
chris@3
   220
                  catch(IOException ex)
chris@1
   221
                  {
chris@3
   222
                    // There may be a temporary network failure
chris@3
   223
                    ex.printStackTrace();
cli@15
   224
                    Log.get().warning("Skipping mail " + messageID + " due to exception.");
chris@1
   225
                  }
chris@1
   226
                }
chris@1
   227
              } // for(;;)
chris@1
   228
              this.highMarks.put(sub, newMark);
chris@1
   229
            }
chris@1
   230
            
chris@1
   231
            disconnect();
chris@1
   232
          }
chris@3
   233
          catch(StorageBackendException ex)
chris@1
   234
          {
chris@1
   235
            ex.printStackTrace();
chris@1
   236
          }
chris@1
   237
          catch(IOException ex)
chris@1
   238
          {
chris@1
   239
            ex.printStackTrace();
cli@15
   240
            Log.get().severe("PullFeeder run stopped due to exception.");
chris@1
   241
          }
chris@1
   242
        } // for(Subscription sub : subscriptions)
chris@1
   243
        
cli@15
   244
        Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
chris@1
   245
        Thread.sleep(pullInterval);
chris@1
   246
      }
chris@1
   247
      catch(InterruptedException ex)
chris@1
   248
      {
cli@15
   249
        Log.get().warning(ex.getMessage());
chris@1
   250
      }
chris@1
   251
    }
chris@1
   252
  }
chris@1
   253
  
chris@1
   254
}