org/sonews/daemon/ChannelLineBuffers.java
author cli
Fri, 25 Dec 2009 15:42:46 +0100
changeset 25 dd05c3f2fa24
parent 3 2fdc9cc89502
permissions -rw-r--r--
Fix for too early disconnects on slow client connections. (#563)
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.daemon;
chris@1
    20
chris@1
    21
import java.nio.ByteBuffer;
chris@1
    22
import java.nio.channels.ClosedChannelException;
chris@1
    23
import java.util.ArrayList;
chris@1
    24
import java.util.List;
chris@1
    25
chris@1
    26
/**
chris@1
    27
 * Class holding ByteBuffers for SocketChannels/NNTPConnection.
chris@1
    28
 * Due to the complex nature of AIO/NIO we must properly handle the line 
chris@1
    29
 * buffers for the input and output of the SocketChannels.
chris@1
    30
 * @author Christian Lins
chris@1
    31
 * @since sonews/0.5.0
chris@1
    32
 */
chris@1
    33
public class ChannelLineBuffers 
chris@1
    34
{
chris@1
    35
  
chris@1
    36
  /**
chris@1
    37
   * Size of one small buffer; 
chris@1
    38
   * per default this is 512 bytes to fit one standard line.
chris@1
    39
   */
chris@1
    40
  public static final int BUFFER_SIZE = 512;
chris@1
    41
  
chris@1
    42
  private static int maxCachedBuffers = 2048; // Cached buffers maximum
chris@1
    43
  
chris@1
    44
  private static final List<ByteBuffer> freeSmallBuffers
chris@1
    45
    = new ArrayList<ByteBuffer>(maxCachedBuffers);
chris@1
    46
  
chris@1
    47
  /**
chris@1
    48
   * Allocates a predefined number of direct ByteBuffers (allocated via
chris@1
    49
   * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only
chris@1
    50
   * called at startup.
chris@1
    51
   */
chris@1
    52
  public static void allocateDirect()
chris@1
    53
  {
chris@1
    54
    synchronized(freeSmallBuffers)
chris@1
    55
    {
chris@1
    56
      for(int n = 0; n < maxCachedBuffers; n++)
chris@1
    57
      {
chris@1
    58
        ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
chris@1
    59
        freeSmallBuffers.add(buffer);
chris@1
    60
      }
chris@1
    61
    }
chris@1
    62
  }
chris@1
    63
  
chris@1
    64
  private ByteBuffer       inputBuffer   = newLineBuffer();
chris@1
    65
  private List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();
chris@1
    66
  
chris@1
    67
  /**
chris@1
    68
   * Add the given ByteBuffer to the list of buffers to be send to the client.
chris@1
    69
   * This method is Thread-safe.
chris@1
    70
   * @param buffer
chris@1
    71
   * @throws java.nio.channels.ClosedChannelException If the client channel was
chris@1
    72
   * already closed.
chris@1
    73
   */
chris@1
    74
  public void addOutputBuffer(ByteBuffer buffer)
chris@1
    75
    throws ClosedChannelException
chris@1
    76
  {
chris@1
    77
    if(outputBuffers == null)
chris@1
    78
    {
chris@1
    79
      throw new ClosedChannelException();
chris@1
    80
    }
chris@1
    81
    
chris@1
    82
    synchronized(outputBuffers)
chris@1
    83
    {
chris@1
    84
      outputBuffers.add(buffer);
chris@1
    85
    }
chris@1
    86
  }
chris@1
    87
  
chris@1
    88
  /**
chris@1
    89
   * Currently a channel has only one input buffer. This *may* be a bottleneck
chris@1
    90
   * and should investigated in the future.
chris@1
    91
   * @param channel
chris@1
    92
   * @return The input buffer associated with given channel.
chris@1
    93
   */
chris@1
    94
  public ByteBuffer getInputBuffer()
chris@1
    95
  {
chris@1
    96
    return inputBuffer;
chris@1
    97
  }
chris@1
    98
  
chris@1
    99
  /**
chris@1
   100
   * Returns the current output buffer for writing(!) to SocketChannel.
chris@1
   101
   * @param channel
chris@1
   102
   * @return The next input buffer that contains unprocessed data or null
chris@1
   103
   * if the connection was closed or there are no more unprocessed buffers.
chris@1
   104
   */
chris@1
   105
  public ByteBuffer getOutputBuffer()
chris@1
   106
  {
chris@1
   107
    synchronized(outputBuffers)
chris@1
   108
    {
chris@1
   109
      if(outputBuffers == null || outputBuffers.isEmpty())
chris@1
   110
      {
chris@1
   111
        return null;
chris@1
   112
      }
chris@1
   113
      else
chris@1
   114
      {
chris@1
   115
        ByteBuffer buffer = outputBuffers.get(0);
chris@1
   116
        if(buffer.remaining() == 0)
chris@1
   117
        {
chris@1
   118
          outputBuffers.remove(0);
chris@1
   119
          // Add old buffers to the list of free buffers
chris@1
   120
          recycleBuffer(buffer);
chris@1
   121
          buffer = getOutputBuffer();
chris@1
   122
        }
chris@1
   123
        return buffer;
chris@1
   124
      }
chris@1
   125
    }
chris@1
   126
  }
cli@25
   127
cli@25
   128
  /**
cli@25
   129
   * @return false if there are output buffers pending to be written to the
cli@25
   130
   * client.
cli@25
   131
   */
cli@25
   132
  boolean isOutputBufferEmpty()
cli@25
   133
  {
cli@25
   134
    synchronized(outputBuffers)
cli@25
   135
    {
cli@25
   136
      return outputBuffers.isEmpty();
cli@25
   137
    }
cli@25
   138
  }
chris@1
   139
  
chris@1
   140
  /**
chris@1
   141
   * Goes through the input buffer of the given channel and searches
chris@1
   142
   * for next line terminator. If a '\n' is found, the bytes up to the
chris@1
   143
   * line terminator are returned as array of bytes (the line terminator
chris@1
   144
   * is omitted). If none is found the method returns null.
chris@1
   145
   * @param channel
chris@1
   146
   * @return A ByteBuffer wrapping the line.
chris@1
   147
   */
chris@1
   148
  ByteBuffer nextInputLine()
chris@1
   149
  {
chris@1
   150
    if(inputBuffer == null)
chris@1
   151
    {
chris@1
   152
      return null;
chris@1
   153
    }
chris@1
   154
    
chris@1
   155
    synchronized(inputBuffer)
chris@1
   156
    {
chris@1
   157
      ByteBuffer buffer = inputBuffer;
chris@1
   158
chris@1
   159
      // Mark the current write position
chris@1
   160
      int mark = buffer.position();
chris@1
   161
chris@1
   162
      // Set position to 0 and limit to current position
chris@1
   163
      buffer.flip();
chris@1
   164
chris@1
   165
      ByteBuffer lineBuffer = newLineBuffer();
chris@1
   166
chris@1
   167
      while (buffer.position() < buffer.limit())
chris@1
   168
      {
chris@1
   169
        byte b = buffer.get();
chris@1
   170
        if (b == 10) // '\n'
chris@1
   171
        {
chris@1
   172
          // The bytes between the buffer's current position and its limit, 
chris@1
   173
          // if any, are copied to the beginning of the buffer. That is, the 
chris@1
   174
          // byte at index p = position() is copied to index zero, the byte at 
chris@1
   175
          // index p + 1 is copied to index one, and so forth until the byte 
chris@1
   176
          // at index limit() - 1 is copied to index n = limit() - 1 - p. 
chris@1
   177
          // The buffer's position is then set to n+1 and its limit is set to 
chris@1
   178
          // its capacity.
chris@1
   179
          buffer.compact();
chris@1
   180
chris@1
   181
          lineBuffer.flip(); // limit to position, position to 0
chris@1
   182
          return lineBuffer;
chris@1
   183
        }
chris@1
   184
        else
chris@1
   185
        {
chris@1
   186
          lineBuffer.put(b);
chris@1
   187
        }
chris@1
   188
      }
chris@1
   189
chris@1
   190
      buffer.limit(BUFFER_SIZE);
chris@1
   191
      buffer.position(mark);
chris@1
   192
chris@1
   193
      if(buffer.hasRemaining())
chris@1
   194
      {
chris@1
   195
        return null;
chris@1
   196
      }
chris@1
   197
      else
chris@1
   198
      {
chris@1
   199
        // In the first 512 was no newline found, so the input is not standard
chris@1
   200
        // compliant. We return the current buffer as new line and add a space
chris@1
   201
        // to the beginning of the next line which corrects some overlong header
chris@1
   202
        // lines.
chris@1
   203
        inputBuffer = newLineBuffer();
chris@1
   204
        inputBuffer.put((byte)' ');
chris@1
   205
        buffer.flip();
chris@1
   206
        return buffer;
chris@1
   207
      }
chris@1
   208
    }
chris@1
   209
  }
chris@1
   210
  
chris@1
   211
  /**
chris@1
   212
   * Returns a at least 512 bytes long ByteBuffer ready for usage.
chris@1
   213
   * The method first try to reuse an already allocated (cached) buffer but
chris@1
   214
   * if that fails returns a newly allocated direct buffer.
chris@1
   215
   * Use recycleBuffer() method when you do not longer use the allocated buffer.
chris@1
   216
   */
chris@1
   217
  static ByteBuffer newLineBuffer()
chris@1
   218
  {
chris@1
   219
    ByteBuffer buf = null;
chris@1
   220
    synchronized(freeSmallBuffers)
chris@1
   221
    {
chris@1
   222
      if(!freeSmallBuffers.isEmpty())
chris@1
   223
      {
chris@1
   224
        buf = freeSmallBuffers.remove(0);
chris@1
   225
      }
chris@1
   226
    }
chris@1
   227
      
chris@1
   228
    if(buf == null)
chris@1
   229
    {
chris@1
   230
      // Allocate a non-direct buffer
chris@1
   231
      buf = ByteBuffer.allocate(BUFFER_SIZE);
chris@1
   232
    }
chris@1
   233
    
chris@1
   234
    assert buf.position() == 0;
chris@1
   235
    assert buf.limit() >= BUFFER_SIZE;
chris@1
   236
    
chris@1
   237
    return buf;
chris@1
   238
  }
chris@1
   239
  
chris@1
   240
  /**
chris@1
   241
   * Adds the given buffer to the list of free buffers if it is a valuable
chris@1
   242
   * direct allocated buffer.
chris@1
   243
   * @param buffer
chris@1
   244
   */
chris@1
   245
  public static void recycleBuffer(ByteBuffer buffer)
chris@1
   246
  {
chris@1
   247
    assert buffer != null;
chris@1
   248
chris@1
   249
    if(buffer.isDirect())
chris@1
   250
    {
chris@3
   251
      assert buffer.capacity() >= BUFFER_SIZE;
chris@3
   252
      
chris@1
   253
      // Add old buffers to the list of free buffers
chris@1
   254
      synchronized(freeSmallBuffers)
chris@1
   255
      {
chris@1
   256
        buffer.clear(); // Set position to 0 and limit to capacity
chris@1
   257
        freeSmallBuffers.add(buffer);
chris@1
   258
      }
chris@1
   259
    } // if(buffer.isDirect())
chris@1
   260
  }
chris@1
   261
  
chris@1
   262
  /**
chris@1
   263
   * Recycles all buffers of this ChannelLineBuffers object.
chris@1
   264
   */
chris@1
   265
  public void recycleBuffers()
chris@1
   266
  {
chris@1
   267
    synchronized(inputBuffer)
chris@1
   268
    {
chris@1
   269
      recycleBuffer(inputBuffer);
chris@1
   270
      this.inputBuffer = null;
chris@1
   271
    }
chris@1
   272
    
chris@1
   273
    synchronized(outputBuffers)
chris@1
   274
    {
chris@1
   275
      for(ByteBuffer buf : outputBuffers)
chris@1
   276
      {
chris@1
   277
        recycleBuffer(buf);
chris@1
   278
      }
chris@1
   279
      outputBuffers = null;
chris@1
   280
    }
chris@1
   281
  }
chris@1
   282
  
chris@1
   283
}