org/sonews/daemon/ChannelLineBuffers.java
author cli
Thu, 20 Aug 2009 22:18:45 +0200
changeset 17 4ae6ada7ea23
parent 3 2fdc9cc89502
child 25 dd05c3f2fa24
permissions -rw-r--r--
#544 fixed.
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.daemon;
chris@1
    20
chris@1
    21
import java.nio.ByteBuffer;
chris@1
    22
import java.nio.channels.ClosedChannelException;
chris@1
    23
import java.util.ArrayList;
chris@1
    24
import java.util.List;
chris@1
    25
chris@1
    26
/**
chris@1
    27
 * Class holding ByteBuffers for SocketChannels/NNTPConnection.
chris@1
    28
 * Due to the complex nature of AIO/NIO we must properly handle the line 
chris@1
    29
 * buffers for the input and output of the SocketChannels.
chris@1
    30
 * @author Christian Lins
chris@1
    31
 * @since sonews/0.5.0
chris@1
    32
 */
chris@1
    33
public class ChannelLineBuffers 
chris@1
    34
{
chris@1
    35
  
chris@1
    36
  /**
chris@1
    37
   * Size of one small buffer; 
chris@1
    38
   * per default this is 512 bytes to fit one standard line.
chris@1
    39
   */
chris@1
    40
  public static final int BUFFER_SIZE = 512;
chris@1
    41
  
chris@1
    42
  private static int maxCachedBuffers = 2048; // Cached buffers maximum
chris@1
    43
  
chris@1
    44
  private static final List<ByteBuffer> freeSmallBuffers
chris@1
    45
    = new ArrayList<ByteBuffer>(maxCachedBuffers);
chris@1
    46
  
chris@1
    47
  /**
chris@1
    48
   * Allocates a predefined number of direct ByteBuffers (allocated via
chris@1
    49
   * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only
chris@1
    50
   * called at startup.
chris@1
    51
   */
chris@1
    52
  public static void allocateDirect()
chris@1
    53
  {
chris@1
    54
    synchronized(freeSmallBuffers)
chris@1
    55
    {
chris@1
    56
      for(int n = 0; n < maxCachedBuffers; n++)
chris@1
    57
      {
chris@1
    58
        ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
chris@1
    59
        freeSmallBuffers.add(buffer);
chris@1
    60
      }
chris@1
    61
    }
chris@1
    62
  }
chris@1
    63
  
chris@1
    64
  private ByteBuffer       inputBuffer   = newLineBuffer();
chris@1
    65
  private List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();
chris@1
    66
  
chris@1
    67
  /**
chris@1
    68
   * Add the given ByteBuffer to the list of buffers to be send to the client.
chris@1
    69
   * This method is Thread-safe.
chris@1
    70
   * @param buffer
chris@1
    71
   * @throws java.nio.channels.ClosedChannelException If the client channel was
chris@1
    72
   * already closed.
chris@1
    73
   */
chris@1
    74
  public void addOutputBuffer(ByteBuffer buffer)
chris@1
    75
    throws ClosedChannelException
chris@1
    76
  {
chris@1
    77
    if(outputBuffers == null)
chris@1
    78
    {
chris@1
    79
      throw new ClosedChannelException();
chris@1
    80
    }
chris@1
    81
    
chris@1
    82
    synchronized(outputBuffers)
chris@1
    83
    {
chris@1
    84
      outputBuffers.add(buffer);
chris@1
    85
    }
chris@1
    86
  }
chris@1
    87
  
chris@1
    88
  /**
chris@1
    89
   * Currently a channel has only one input buffer. This *may* be a bottleneck
chris@1
    90
   * and should investigated in the future.
chris@1
    91
   * @param channel
chris@1
    92
   * @return The input buffer associated with given channel.
chris@1
    93
   */
chris@1
    94
  public ByteBuffer getInputBuffer()
chris@1
    95
  {
chris@1
    96
    return inputBuffer;
chris@1
    97
  }
chris@1
    98
  
chris@1
    99
  /**
chris@1
   100
   * Returns the current output buffer for writing(!) to SocketChannel.
chris@1
   101
   * @param channel
chris@1
   102
   * @return The next input buffer that contains unprocessed data or null
chris@1
   103
   * if the connection was closed or there are no more unprocessed buffers.
chris@1
   104
   */
chris@1
   105
  public ByteBuffer getOutputBuffer()
chris@1
   106
  {
chris@1
   107
    synchronized(outputBuffers)
chris@1
   108
    {
chris@1
   109
      if(outputBuffers == null || outputBuffers.isEmpty())
chris@1
   110
      {
chris@1
   111
        return null;
chris@1
   112
      }
chris@1
   113
      else
chris@1
   114
      {
chris@1
   115
        ByteBuffer buffer = outputBuffers.get(0);
chris@1
   116
        if(buffer.remaining() == 0)
chris@1
   117
        {
chris@1
   118
          outputBuffers.remove(0);
chris@1
   119
          // Add old buffers to the list of free buffers
chris@1
   120
          recycleBuffer(buffer);
chris@1
   121
          buffer = getOutputBuffer();
chris@1
   122
        }
chris@1
   123
        return buffer;
chris@1
   124
      }
chris@1
   125
    }
chris@1
   126
  }
chris@1
   127
  
chris@1
   128
  /**
chris@1
   129
   * Goes through the input buffer of the given channel and searches
chris@1
   130
   * for next line terminator. If a '\n' is found, the bytes up to the
chris@1
   131
   * line terminator are returned as array of bytes (the line terminator
chris@1
   132
   * is omitted). If none is found the method returns null.
chris@1
   133
   * @param channel
chris@1
   134
   * @return A ByteBuffer wrapping the line.
chris@1
   135
   */
chris@1
   136
  ByteBuffer nextInputLine()
chris@1
   137
  {
chris@1
   138
    if(inputBuffer == null)
chris@1
   139
    {
chris@1
   140
      return null;
chris@1
   141
    }
chris@1
   142
    
chris@1
   143
    synchronized(inputBuffer)
chris@1
   144
    {
chris@1
   145
      ByteBuffer buffer = inputBuffer;
chris@1
   146
chris@1
   147
      // Mark the current write position
chris@1
   148
      int mark = buffer.position();
chris@1
   149
chris@1
   150
      // Set position to 0 and limit to current position
chris@1
   151
      buffer.flip();
chris@1
   152
chris@1
   153
      ByteBuffer lineBuffer = newLineBuffer();
chris@1
   154
chris@1
   155
      while (buffer.position() < buffer.limit())
chris@1
   156
      {
chris@1
   157
        byte b = buffer.get();
chris@1
   158
        if (b == 10) // '\n'
chris@1
   159
        {
chris@1
   160
          // The bytes between the buffer's current position and its limit, 
chris@1
   161
          // if any, are copied to the beginning of the buffer. That is, the 
chris@1
   162
          // byte at index p = position() is copied to index zero, the byte at 
chris@1
   163
          // index p + 1 is copied to index one, and so forth until the byte 
chris@1
   164
          // at index limit() - 1 is copied to index n = limit() - 1 - p. 
chris@1
   165
          // The buffer's position is then set to n+1 and its limit is set to 
chris@1
   166
          // its capacity.
chris@1
   167
          buffer.compact();
chris@1
   168
chris@1
   169
          lineBuffer.flip(); // limit to position, position to 0
chris@1
   170
          return lineBuffer;
chris@1
   171
        }
chris@1
   172
        else
chris@1
   173
        {
chris@1
   174
          lineBuffer.put(b);
chris@1
   175
        }
chris@1
   176
      }
chris@1
   177
chris@1
   178
      buffer.limit(BUFFER_SIZE);
chris@1
   179
      buffer.position(mark);
chris@1
   180
chris@1
   181
      if(buffer.hasRemaining())
chris@1
   182
      {
chris@1
   183
        return null;
chris@1
   184
      }
chris@1
   185
      else
chris@1
   186
      {
chris@1
   187
        // In the first 512 was no newline found, so the input is not standard
chris@1
   188
        // compliant. We return the current buffer as new line and add a space
chris@1
   189
        // to the beginning of the next line which corrects some overlong header
chris@1
   190
        // lines.
chris@1
   191
        inputBuffer = newLineBuffer();
chris@1
   192
        inputBuffer.put((byte)' ');
chris@1
   193
        buffer.flip();
chris@1
   194
        return buffer;
chris@1
   195
      }
chris@1
   196
    }
chris@1
   197
  }
chris@1
   198
  
chris@1
   199
  /**
chris@1
   200
   * Returns a at least 512 bytes long ByteBuffer ready for usage.
chris@1
   201
   * The method first try to reuse an already allocated (cached) buffer but
chris@1
   202
   * if that fails returns a newly allocated direct buffer.
chris@1
   203
   * Use recycleBuffer() method when you do not longer use the allocated buffer.
chris@1
   204
   */
chris@1
   205
  static ByteBuffer newLineBuffer()
chris@1
   206
  {
chris@1
   207
    ByteBuffer buf = null;
chris@1
   208
    synchronized(freeSmallBuffers)
chris@1
   209
    {
chris@1
   210
      if(!freeSmallBuffers.isEmpty())
chris@1
   211
      {
chris@1
   212
        buf = freeSmallBuffers.remove(0);
chris@1
   213
      }
chris@1
   214
    }
chris@1
   215
      
chris@1
   216
    if(buf == null)
chris@1
   217
    {
chris@1
   218
      // Allocate a non-direct buffer
chris@1
   219
      buf = ByteBuffer.allocate(BUFFER_SIZE);
chris@1
   220
    }
chris@1
   221
    
chris@1
   222
    assert buf.position() == 0;
chris@1
   223
    assert buf.limit() >= BUFFER_SIZE;
chris@1
   224
    
chris@1
   225
    return buf;
chris@1
   226
  }
chris@1
   227
  
chris@1
   228
  /**
chris@1
   229
   * Adds the given buffer to the list of free buffers if it is a valuable
chris@1
   230
   * direct allocated buffer.
chris@1
   231
   * @param buffer
chris@1
   232
   */
chris@1
   233
  public static void recycleBuffer(ByteBuffer buffer)
chris@1
   234
  {
chris@1
   235
    assert buffer != null;
chris@1
   236
chris@1
   237
    if(buffer.isDirect())
chris@1
   238
    {
chris@3
   239
      assert buffer.capacity() >= BUFFER_SIZE;
chris@3
   240
      
chris@1
   241
      // Add old buffers to the list of free buffers
chris@1
   242
      synchronized(freeSmallBuffers)
chris@1
   243
      {
chris@1
   244
        buffer.clear(); // Set position to 0 and limit to capacity
chris@1
   245
        freeSmallBuffers.add(buffer);
chris@1
   246
      }
chris@1
   247
    } // if(buffer.isDirect())
chris@1
   248
  }
chris@1
   249
  
chris@1
   250
  /**
chris@1
   251
   * Recycles all buffers of this ChannelLineBuffers object.
chris@1
   252
   */
chris@1
   253
  public void recycleBuffers()
chris@1
   254
  {
chris@1
   255
    synchronized(inputBuffer)
chris@1
   256
    {
chris@1
   257
      recycleBuffer(inputBuffer);
chris@1
   258
      this.inputBuffer = null;
chris@1
   259
    }
chris@1
   260
    
chris@1
   261
    synchronized(outputBuffers)
chris@1
   262
    {
chris@1
   263
      for(ByteBuffer buf : outputBuffers)
chris@1
   264
      {
chris@1
   265
        recycleBuffer(buf);
chris@1
   266
      }
chris@1
   267
      outputBuffers = null;
chris@1
   268
    }
chris@1
   269
  }
chris@1
   270
  
chris@1
   271
}