org/sonews/daemon/NNTPConnection.java
author cli
Wed, 12 May 2010 11:18:02 +0200
changeset 31 087ef6fe6a1a
parent 30 146b3275b792
permissions -rw-r--r--
Group.getByName() removed. Channel.getByName() does no longer catch StorageBackendExceptions but throw them further.
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.daemon;
chris@1
    20
chris@1
    21
import java.io.IOException;
chris@1
    22
import java.net.InetSocketAddress;
chris@3
    23
import java.net.SocketException;
chris@1
    24
import java.nio.ByteBuffer;
chris@1
    25
import java.nio.CharBuffer;
chris@1
    26
import java.nio.channels.ClosedChannelException;
chris@1
    27
import java.nio.channels.SelectionKey;
chris@1
    28
import java.nio.channels.SocketChannel;
chris@1
    29
import java.nio.charset.Charset;
chris@3
    30
import java.util.Arrays;
chris@1
    31
import java.util.Timer;
chris@1
    32
import java.util.TimerTask;
chris@3
    33
import org.sonews.daemon.command.Command;
chris@3
    34
import org.sonews.storage.Article;
chris@3
    35
import org.sonews.storage.Channel;
cli@30
    36
import org.sonews.storage.StorageBackendException;
cli@25
    37
import org.sonews.util.Log;
chris@1
    38
import org.sonews.util.Stats;
chris@1
    39
chris@1
    40
/**
chris@1
    41
 * For every SocketChannel (so TCP/IP connection) there is an instance of
chris@1
    42
 * this class.
chris@1
    43
 * @author Christian Lins
chris@1
    44
 * @since sonews/0.5.0
chris@1
    45
 */
chris@1
    46
public final class NNTPConnection
chris@1
    47
{
chris@1
    48
chris@1
    49
  public static final String NEWLINE            = "\r\n";    // RFC defines this as newline
chris@1
    50
  public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
chris@1
    51
  
chris@1
    52
  private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
chris@1
    53
  
chris@1
    54
  /** SocketChannel is generally thread-safe */
chris@1
    55
  private SocketChannel   channel        = null;
chris@1
    56
  private Charset         charset        = Charset.forName("UTF-8");
chris@3
    57
  private Command         command        = null;
chris@1
    58
  private Article         currentArticle = null;
chris@3
    59
  private Channel         currentGroup   = null;
chris@1
    60
  private volatile long   lastActivity   = System.currentTimeMillis();
chris@1
    61
  private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
chris@1
    62
  private int             readLock       = 0;
chris@1
    63
  private final Object    readLockGate   = new Object();
chris@1
    64
  private SelectionKey    writeSelKey    = null;
chris@1
    65
  
chris@1
    66
  public NNTPConnection(final SocketChannel channel)
chris@1
    67
    throws IOException
chris@1
    68
  {
chris@1
    69
    if(channel == null)
chris@1
    70
    {
chris@1
    71
      throw new IllegalArgumentException("channel is null");
chris@1
    72
    }
chris@1
    73
chris@1
    74
    this.channel = channel;
chris@1
    75
    Stats.getInstance().clientConnect();
chris@1
    76
  }
chris@1
    77
  
chris@1
    78
  /**
chris@1
    79
   * Tries to get the read lock for this NNTPConnection. This method is Thread-
chris@1
    80
   * safe and returns true of the read lock was successfully set. If the lock
chris@1
    81
   * is still hold by another Thread the method returns false.
chris@1
    82
   */
chris@1
    83
  boolean tryReadLock()
chris@1
    84
  {
chris@1
    85
    // As synchronizing simple types may cause deadlocks,
chris@1
    86
    // we use a gate object.
chris@1
    87
    synchronized(readLockGate)
chris@1
    88
    {
chris@1
    89
      if(readLock != 0)
chris@1
    90
      {
chris@1
    91
        return false;
chris@1
    92
      }
chris@1
    93
      else
chris@1
    94
      {
chris@1
    95
        readLock = Thread.currentThread().hashCode();
chris@1
    96
        return true;
chris@1
    97
      }
chris@1
    98
    }
chris@1
    99
  }
chris@1
   100
  
chris@1
   101
  /**
chris@1
   102
   * Releases the read lock in a Thread-safe way.
chris@1
   103
   * @throws IllegalMonitorStateException if a Thread not holding the lock
chris@1
   104
   * tries to release it.
chris@1
   105
   */
chris@1
   106
  void unlockReadLock()
chris@1
   107
  {
chris@1
   108
    synchronized(readLockGate)
chris@1
   109
    {
chris@1
   110
      if(readLock == Thread.currentThread().hashCode())
chris@1
   111
      {
chris@1
   112
        readLock = 0;
chris@1
   113
      }
chris@1
   114
      else
chris@1
   115
      {
chris@1
   116
        throw new IllegalMonitorStateException();
chris@1
   117
      }
chris@1
   118
    }
chris@1
   119
  }
chris@1
   120
  
chris@1
   121
  /**
chris@1
   122
   * @return Current input buffer of this NNTPConnection instance.
chris@1
   123
   */
chris@1
   124
  public ByteBuffer getInputBuffer()
chris@1
   125
  {
chris@1
   126
    return this.lineBuffers.getInputBuffer();
chris@1
   127
  }
chris@1
   128
  
chris@1
   129
  /**
chris@1
   130
   * @return Output buffer of this NNTPConnection which has at least one byte
chris@1
   131
   * free storage.
chris@1
   132
   */
chris@1
   133
  public ByteBuffer getOutputBuffer()
chris@1
   134
  {
chris@1
   135
    return this.lineBuffers.getOutputBuffer();
chris@1
   136
  }
chris@1
   137
  
chris@1
   138
  /**
chris@1
   139
   * @return ChannelLineBuffers instance associated with this NNTPConnection.
chris@1
   140
   */
chris@1
   141
  public ChannelLineBuffers getBuffers()
chris@1
   142
  {
chris@1
   143
    return this.lineBuffers;
chris@1
   144
  }
chris@1
   145
  
chris@1
   146
  /**
chris@1
   147
   * @return true if this connection comes from a local remote address.
chris@1
   148
   */
chris@1
   149
  public boolean isLocalConnection()
chris@1
   150
  {
chris@1
   151
    return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
chris@1
   152
      .getHostName().equalsIgnoreCase("localhost");
chris@1
   153
  }
chris@1
   154
chris@1
   155
  void setWriteSelectionKey(SelectionKey selKey)
chris@1
   156
  {
chris@1
   157
    this.writeSelKey = selKey;
chris@1
   158
  }
chris@1
   159
chris@1
   160
  public void shutdownInput()
chris@1
   161
  {
chris@1
   162
    try
chris@1
   163
    {
chris@1
   164
      // Closes the input line of the channel's socket, so no new data
chris@1
   165
      // will be received and a timeout can be triggered.
chris@1
   166
      this.channel.socket().shutdownInput();
chris@1
   167
    }
chris@1
   168
    catch(IOException ex)
chris@1
   169
    {
cli@15
   170
      Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
chris@1
   171
    }
chris@1
   172
  }
chris@1
   173
  
chris@1
   174
  public void shutdownOutput()
chris@1
   175
  {
chris@1
   176
    cancelTimer.schedule(new TimerTask() 
chris@1
   177
    {
chris@1
   178
      @Override
chris@1
   179
      public void run()
chris@1
   180
      {
chris@1
   181
        try
chris@1
   182
        {
chris@1
   183
          // Closes the output line of the channel's socket.
chris@1
   184
          channel.socket().shutdownOutput();
chris@1
   185
          channel.close();
chris@1
   186
        }
chris@3
   187
        catch(SocketException ex)
chris@3
   188
        {
chris@3
   189
          // Socket was already disconnected
cli@15
   190
          Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
chris@3
   191
        }
chris@1
   192
        catch(Exception ex)
chris@1
   193
        {
cli@15
   194
          Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
chris@1
   195
        }
chris@1
   196
      }
chris@1
   197
    }, 3000);
chris@1
   198
  }
chris@1
   199
  
chris@3
   200
  public SocketChannel getSocketChannel()
chris@1
   201
  {
chris@1
   202
    return this.channel;
chris@1
   203
  }
chris@1
   204
  
chris@1
   205
  public Article getCurrentArticle()
chris@1
   206
  {
chris@1
   207
    return this.currentArticle;
chris@1
   208
  }
chris@1
   209
  
chris@1
   210
  public Charset getCurrentCharset()
chris@1
   211
  {
chris@1
   212
    return this.charset;
chris@1
   213
  }
chris@3
   214
chris@3
   215
  /**
chris@3
   216
   * @return The currently selected communication channel (not SocketChannel)
chris@3
   217
   */
chris@3
   218
  public Channel getCurrentChannel()
chris@1
   219
  {
chris@1
   220
    return this.currentGroup;
chris@1
   221
  }
chris@1
   222
  
chris@1
   223
  public void setCurrentArticle(final Article article)
chris@1
   224
  {
chris@1
   225
    this.currentArticle = article;
chris@1
   226
  }
chris@1
   227
  
chris@3
   228
  public void setCurrentGroup(final Channel group)
chris@1
   229
  {
chris@1
   230
    this.currentGroup = group;
chris@1
   231
  }
chris@1
   232
  
chris@1
   233
  public long getLastActivity()
chris@1
   234
  {
chris@1
   235
    return this.lastActivity;
chris@1
   236
  }
chris@1
   237
  
chris@1
   238
  /**
chris@1
   239
   * Due to the readLockGate there is no need to synchronize this method.
chris@1
   240
   * @param raw
chris@1
   241
   * @throws IllegalArgumentException if raw is null.
chris@1
   242
   * @throws IllegalStateException if calling thread does not own the readLock.
chris@1
   243
   */
chris@1
   244
  void lineReceived(byte[] raw)
chris@1
   245
  {
chris@1
   246
    if(raw == null)
chris@1
   247
    {
chris@1
   248
      throw new IllegalArgumentException("raw is null");
chris@1
   249
    }
chris@1
   250
    
chris@1
   251
    if(readLock == 0 || readLock != Thread.currentThread().hashCode())
chris@1
   252
    {
chris@1
   253
      throw new IllegalStateException("readLock not properly set");
chris@1
   254
    }
chris@1
   255
chris@1
   256
    this.lastActivity = System.currentTimeMillis();
chris@1
   257
    
chris@1
   258
    String line = new String(raw, this.charset);
chris@1
   259
    
chris@1
   260
    // There might be a trailing \r, but trim() is a bad idea
chris@1
   261
    // as it removes also leading spaces from long header lines.
chris@1
   262
    if(line.endsWith("\r"))
chris@1
   263
    {
chris@1
   264
      line = line.substring(0, line.length() - 1);
chris@3
   265
      raw  = Arrays.copyOf(raw, raw.length - 1);
chris@1
   266
    }
chris@1
   267
    
cli@15
   268
    Log.get().fine("<< " + line);
chris@1
   269
    
chris@1
   270
    if(command == null)
chris@1
   271
    {
chris@1
   272
      command = parseCommandLine(line);
chris@1
   273
      assert command != null;
chris@1
   274
    }
chris@1
   275
chris@1
   276
    try
chris@1
   277
    {
chris@1
   278
      // The command object will process the line we just received
cli@30
   279
      try
cli@30
   280
      {
cli@30
   281
        command.processLine(this, line, raw);
cli@30
   282
      }
cli@30
   283
      catch(StorageBackendException ex)
cli@30
   284
      {
cli@30
   285
        Log.get().info("Retry command processing after StorageBackendException");
cli@30
   286
cli@30
   287
        // Try it a second time, so that the backend has time to recover
cli@30
   288
        command.processLine(this, line, raw);
cli@30
   289
      }
chris@1
   290
    }
chris@1
   291
    catch(ClosedChannelException ex0)
chris@1
   292
    {
chris@1
   293
      try
chris@1
   294
      {
cli@15
   295
        Log.get().info("Connection to " + channel.socket().getRemoteSocketAddress()
cli@15
   296
            + " closed: " + ex0);
chris@1
   297
      }
chris@1
   298
      catch(Exception ex0a)
chris@1
   299
      {
chris@1
   300
        ex0a.printStackTrace();
chris@1
   301
      }
chris@1
   302
    }
cli@30
   303
    catch(Exception ex1) // This will catch a second StorageBackendException
chris@1
   304
    {
chris@1
   305
      try
chris@1
   306
      {
chris@1
   307
        command = null;
chris@1
   308
        ex1.printStackTrace();
chris@1
   309
        println("500 Internal server error");
chris@1
   310
      }
chris@1
   311
      catch(Exception ex2)
chris@1
   312
      {
chris@1
   313
        ex2.printStackTrace();
chris@1
   314
      }
chris@1
   315
    }
chris@1
   316
chris@1
   317
    if(command == null || command.hasFinished())
chris@1
   318
    {
chris@1
   319
      command = null;
chris@1
   320
      charset = Charset.forName("UTF-8"); // Reset to default
chris@1
   321
    }
chris@1
   322
  }
chris@1
   323
  
chris@1
   324
  /**
chris@3
   325
   * This method determines the fitting command processing class.
chris@1
   326
   * @param line
chris@1
   327
   * @return
chris@1
   328
   */
chris@3
   329
  private Command parseCommandLine(String line)
chris@1
   330
  {
chris@3
   331
    String cmdStr = line.split(" ")[0];
chris@3
   332
    return CommandSelector.getInstance().get(cmdStr);
chris@1
   333
  }
chris@1
   334
  
chris@1
   335
  /**
chris@1
   336
   * Puts the given line into the output buffer, adds a newline character
chris@1
   337
   * and returns. The method returns immediately and does not block until
chris@1
   338
   * the line was sent. If line is longer than 510 octets it is split up in
chris@1
   339
   * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
chris@1
   340
   * @param line
chris@1
   341
   */
chris@1
   342
  public void println(final CharSequence line, final Charset charset)
chris@1
   343
    throws IOException
chris@1
   344
  {    
chris@1
   345
    writeToChannel(CharBuffer.wrap(line), charset, line);
chris@1
   346
    writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
chris@1
   347
  }
chris@3
   348
chris@3
   349
  /**
chris@3
   350
   * Writes the given raw lines to the output buffers and finishes with
chris@3
   351
   * a newline character (\r\n).
chris@3
   352
   * @param rawLines
chris@3
   353
   */
chris@3
   354
  public void println(final byte[] rawLines)
chris@3
   355
    throws IOException
chris@3
   356
  {
chris@3
   357
    this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
chris@3
   358
    writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
chris@3
   359
  }
chris@1
   360
  
chris@1
   361
  /**
chris@1
   362
   * Encodes the given CharBuffer using the given Charset to a bunch of
chris@1
   363
   * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
chris@1
   364
   * connected SocketChannel.
chris@1
   365
   * @throws java.io.IOException
chris@1
   366
   */
chris@1
   367
  private void writeToChannel(CharBuffer characters, final Charset charset,
chris@1
   368
    CharSequence debugLine)
chris@1
   369
    throws IOException
chris@1
   370
  {
chris@1
   371
    if(!charset.canEncode())
chris@1
   372
    {
cli@15
   373
      Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
chris@1
   374
      return;
chris@1
   375
    }
chris@1
   376
    
chris@1
   377
    // Write characters to output buffers
chris@1
   378
    LineEncoder lenc = new LineEncoder(characters, charset);
chris@1
   379
    lenc.encode(lineBuffers);
chris@1
   380
    
chris@3
   381
    enableWriteEvents(debugLine);
chris@3
   382
  }
chris@3
   383
chris@3
   384
  private void enableWriteEvents(CharSequence debugLine)
chris@3
   385
  {
chris@1
   386
    // Enable OP_WRITE events so that the buffers are processed
chris@1
   387
    try
chris@1
   388
    {
chris@1
   389
      this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
chris@1
   390
      ChannelWriter.getInstance().getSelector().wakeup();
chris@1
   391
    }
cli@15
   392
    catch(Exception ex) // CancelledKeyException and ChannelCloseException
chris@1
   393
    {
cli@15
   394
      Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
chris@1
   395
      return;
chris@1
   396
    }
chris@1
   397
chris@1
   398
    // Update last activity timestamp
chris@1
   399
    this.lastActivity = System.currentTimeMillis();
chris@1
   400
    if(debugLine != null)
chris@1
   401
    {
cli@15
   402
      Log.get().fine(">> " + debugLine);
chris@1
   403
    }
chris@1
   404
  }
chris@1
   405
  
chris@1
   406
  public void println(final CharSequence line)
chris@1
   407
    throws IOException
chris@1
   408
  {
chris@1
   409
    println(line, charset);
chris@1
   410
  }
chris@1
   411
  
chris@1
   412
  public void print(final String line)
chris@1
   413
    throws IOException
chris@1
   414
  {
chris@1
   415
    writeToChannel(CharBuffer.wrap(line), charset, line);
chris@1
   416
  }
chris@1
   417
  
chris@1
   418
  public void setCurrentCharset(final Charset charset)
chris@1
   419
  {
chris@1
   420
    this.charset = charset;
chris@1
   421
  }
cli@25
   422
cli@25
   423
  void setLastActivity(long timestamp)
cli@25
   424
  {
cli@25
   425
    this.lastActivity = timestamp;
cli@25
   426
  }
chris@1
   427
  
chris@1
   428
}