org/sonews/feed/PullFeeder.java
author chris <chris@marvin>
Thu, 06 Aug 2009 18:34:10 +0200
changeset 5 ad210aa137a4
parent 3 2fdc9cc89502
child 7 0b76e099eb96
permissions -rw-r--r--
Removing tags as they point to invalid changesets after history refactoring.
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.feed;
chris@1
    20
chris@1
    21
import java.io.BufferedReader;
chris@1
    22
import java.io.IOException;
chris@1
    23
import java.io.InputStreamReader;
chris@1
    24
import java.io.PrintWriter;
chris@1
    25
import java.net.Socket;
chris@1
    26
import java.net.SocketException;
chris@1
    27
import java.net.UnknownHostException;
chris@1
    28
import java.util.ArrayList;
chris@1
    29
import java.util.HashMap;
chris@1
    30
import java.util.List;
chris@1
    31
import java.util.Map;
chris@3
    32
import org.sonews.config.Config;
chris@1
    33
import org.sonews.util.Log;
chris@3
    34
import org.sonews.storage.StorageBackendException;
chris@3
    35
import org.sonews.storage.StorageManager;
chris@1
    36
import org.sonews.util.Stats;
chris@1
    37
import org.sonews.util.io.ArticleReader;
chris@1
    38
import org.sonews.util.io.ArticleWriter;
chris@1
    39
chris@1
    40
/**
chris@1
    41
 * The PullFeeder class regularily checks another Newsserver for new
chris@1
    42
 * messages.
chris@1
    43
 * @author Christian Lins
chris@1
    44
 * @since sonews/0.5.0
chris@1
    45
 */
chris@1
    46
class PullFeeder extends AbstractFeeder
chris@1
    47
{
chris@1
    48
  
chris@1
    49
  private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
chris@1
    50
  private BufferedReader             in;
chris@1
    51
  private PrintWriter                out;
chris@1
    52
  
chris@1
    53
  @Override
chris@1
    54
  public void addSubscription(final Subscription sub)
chris@1
    55
  {
chris@1
    56
    super.addSubscription(sub);
chris@1
    57
    
chris@1
    58
    // Set a initial highMark
chris@1
    59
    this.highMarks.put(sub, 0);
chris@1
    60
  }
chris@1
    61
  
chris@1
    62
  /**
chris@1
    63
   * Changes to the given group and returns its high mark.
chris@1
    64
   * @param groupName
chris@1
    65
   * @return
chris@1
    66
   */
chris@1
    67
  private int changeGroup(String groupName)
chris@1
    68
    throws IOException
chris@1
    69
  {
chris@1
    70
    this.out.print("GROUP " + groupName + "\r\n");
chris@1
    71
    this.out.flush();
chris@1
    72
    
chris@1
    73
    String line = this.in.readLine();
chris@1
    74
    if(line.startsWith("211 "))
chris@1
    75
    {
chris@1
    76
      int highmark = Integer.parseInt(line.split(" ")[3]);
chris@1
    77
      return highmark;
chris@1
    78
    }
chris@1
    79
    else
chris@1
    80
    {
chris@1
    81
      throw new IOException("GROUP " + groupName + " returned: " + line);
chris@1
    82
    }
chris@1
    83
  }
chris@1
    84
  
chris@1
    85
  private void connectTo(final String host, final int port)
chris@1
    86
    throws IOException, UnknownHostException
chris@1
    87
  {
chris@1
    88
    Socket socket = new Socket(host, port);
chris@1
    89
    this.out = new PrintWriter(socket.getOutputStream());
chris@1
    90
    this.in  = new BufferedReader(new InputStreamReader(socket.getInputStream()));
chris@1
    91
chris@1
    92
    String line = in.readLine();
chris@1
    93
    if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
chris@1
    94
    {
chris@1
    95
      throw new IOException(line);
chris@1
    96
    }
chris@1
    97
  }
chris@1
    98
  
chris@1
    99
  private void disconnect()
chris@1
   100
    throws IOException
chris@1
   101
  {
chris@1
   102
    this.out.print("QUIT\r\n");
chris@1
   103
    this.out.flush();
chris@1
   104
    this.out.close();
chris@1
   105
    this.in.close();
chris@1
   106
    
chris@1
   107
    this.out = null;
chris@1
   108
    this.in  = null;
chris@1
   109
  }
chris@1
   110
  
chris@1
   111
  /**
chris@1
   112
   * Uses the OVER or XOVER command to get a list of message overviews that
chris@1
   113
   * may be unknown to this feeder and are about to be peered.
chris@1
   114
   * @param start
chris@1
   115
   * @param end
chris@1
   116
   * @return A list of message ids with potentially interesting messages.
chris@1
   117
   */
chris@1
   118
  private List<String> over(int start, int end)
chris@1
   119
    throws IOException
chris@1
   120
  {
chris@1
   121
    this.out.print("OVER " + start + "-" + end + "\r\n");
chris@1
   122
    this.out.flush();
chris@1
   123
    
chris@1
   124
    String line = this.in.readLine();
chris@1
   125
    if(line.startsWith("500 ")) // OVER not supported
chris@1
   126
    {
chris@1
   127
      this.out.print("XOVER " + start + "-" + end + "\r\n");
chris@1
   128
      this.out.flush();
chris@1
   129
      
chris@1
   130
      line = this.in.readLine();
chris@1
   131
    }
chris@1
   132
    
chris@1
   133
    if(line.startsWith("224 "))
chris@1
   134
    {
chris@1
   135
      List<String> messages = new ArrayList<String>();
chris@1
   136
      line = this.in.readLine();
chris@1
   137
      while(!".".equals(line))
chris@1
   138
      {
chris@1
   139
        String mid = line.split("\t")[4]; // 5th should be the Message-ID
chris@1
   140
        messages.add(mid);
chris@1
   141
        line = this.in.readLine();
chris@1
   142
      }
chris@1
   143
      return messages;
chris@1
   144
    }
chris@1
   145
    else
chris@1
   146
    {
chris@1
   147
      throw new IOException("Server return for OVER/XOVER: " + line);
chris@1
   148
    }
chris@1
   149
  }
chris@1
   150
  
chris@1
   151
  @Override
chris@1
   152
  public void run()
chris@1
   153
  {
chris@1
   154
    while(isRunning())
chris@1
   155
    {
chris@1
   156
      int pullInterval = 1000 * 
chris@3
   157
        Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
chris@1
   158
      String host = "localhost";
chris@1
   159
      int    port = 119;
chris@1
   160
      
chris@1
   161
      Log.msg("Start PullFeeder run...", true);
chris@1
   162
chris@1
   163
      try
chris@1
   164
      {
chris@1
   165
        for(Subscription sub : this.subscriptions)
chris@1
   166
        {
chris@1
   167
          host = sub.getHost();
chris@1
   168
          port = sub.getPort();
chris@1
   169
chris@1
   170
          try
chris@1
   171
          {
chris@1
   172
            Log.msg("Feeding " + sub.getGroup() + " from " + sub.getHost(), true);
chris@1
   173
            try
chris@1
   174
            {
chris@1
   175
              connectTo(host, port);
chris@1
   176
            }
chris@1
   177
            catch(SocketException ex)
chris@1
   178
            {
chris@1
   179
              Log.msg("Skipping " + sub.getHost() + ": " + ex, false);
chris@1
   180
              continue;
chris@1
   181
            }
chris@1
   182
            
chris@1
   183
            int oldMark = this.highMarks.get(sub);
chris@1
   184
            int newMark = changeGroup(sub.getGroup());
chris@1
   185
            
chris@1
   186
            if(oldMark != newMark)
chris@1
   187
            {
chris@1
   188
              List<String> messageIDs = over(oldMark, newMark);
chris@1
   189
chris@1
   190
              for(String messageID : messageIDs)
chris@1
   191
              {
chris@3
   192
                if(!StorageManager.current().isArticleExisting(messageID))
chris@1
   193
                {
chris@3
   194
                  try
chris@1
   195
                  {
chris@3
   196
                    // Post the message via common socket connection
chris@3
   197
                    ArticleReader aread =
chris@3
   198
                      new ArticleReader(sub.getHost(), sub.getPort(), messageID);
chris@3
   199
                    byte[] abuf = aread.getArticleData();
chris@3
   200
                    if (abuf == null)
chris@3
   201
                    {
chris@3
   202
                      Log.msg("Could not feed " + messageID + " from " + sub.getHost(), true);
chris@3
   203
                    }
chris@3
   204
                    else
chris@3
   205
                    {
chris@3
   206
                      Log.msg("Feeding " + messageID, true);
chris@3
   207
                      ArticleWriter awrite = new ArticleWriter(
chris@3
   208
                        "localhost", Config.inst().get(Config.PORT, 119));
chris@3
   209
                      awrite.writeArticle(abuf);
chris@3
   210
                      awrite.close();
chris@3
   211
                    }
chris@3
   212
                    Stats.getInstance().mailFeeded(sub.getGroup());
chris@1
   213
                  }
chris@3
   214
                  catch(IOException ex)
chris@1
   215
                  {
chris@3
   216
                    // There may be a temporary network failure
chris@3
   217
                    ex.printStackTrace();
chris@3
   218
                    Log.msg("Skipping mail " + messageID + " due to exception.", false);
chris@1
   219
                  }
chris@1
   220
                }
chris@1
   221
              } // for(;;)
chris@1
   222
              this.highMarks.put(sub, newMark);
chris@1
   223
            }
chris@1
   224
            
chris@1
   225
            disconnect();
chris@1
   226
          }
chris@3
   227
          catch(StorageBackendException ex)
chris@1
   228
          {
chris@1
   229
            ex.printStackTrace();
chris@1
   230
          }
chris@1
   231
          catch(IOException ex)
chris@1
   232
          {
chris@1
   233
            ex.printStackTrace();
chris@1
   234
            Log.msg("PullFeeder run stopped due to exception.", false);
chris@1
   235
          }
chris@1
   236
        } // for(Subscription sub : subscriptions)
chris@1
   237
        
chris@1
   238
        Log.msg("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s", true);
chris@1
   239
        Thread.sleep(pullInterval);
chris@1
   240
      }
chris@1
   241
      catch(InterruptedException ex)
chris@1
   242
      {
chris@1
   243
        Log.msg(ex.getMessage(), false);
chris@1
   244
      }
chris@1
   245
    }
chris@1
   246
  }
chris@1
   247
  
chris@1
   248
}