src/org/sonews/daemon/ChannelLineBuffers.java
author cli
Sun, 29 Aug 2010 17:43:58 +0200
changeset 36 c404a87db5b7
parent 35 ed84c8bdd87b
child 37 74139325d305
permissions -rw-r--r--
Add some checks to prevent #13 happen.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import java.nio.ByteBuffer;
    22 import java.nio.channels.ClosedChannelException;
    23 import java.util.ArrayList;
    24 import java.util.List;
    25 
    26 /**
    27  * Class holding ByteBuffers for SocketChannels/NNTPConnection.
    28  * Due to the complex nature of AIO/NIO we must properly handle the line 
    29  * buffers for the input and output of the SocketChannels.
    30  * @author Christian Lins
    31  * @since sonews/0.5.0
    32  */
    33 public class ChannelLineBuffers 
    34 {
    35   
    36   /**
    37    * Size of one small buffer; 
    38    * per default this is 512 bytes to fit one standard line.
    39    */
    40   public static final int BUFFER_SIZE = 512;
    41   
    42   private static int maxCachedBuffers = 2048; // Cached buffers maximum
    43   
    44   private static final List<ByteBuffer> freeSmallBuffers
    45     = new ArrayList<ByteBuffer>(maxCachedBuffers);
    46   
    47   /**
    48    * Allocates a predefined number of direct ByteBuffers (allocated via
    49    * ByteBuffer.allocateDirect()). This method is Thread-safe, but should only
    50    * called at startup.
    51    */
    52   public static void allocateDirect()
    53   {
    54     synchronized(freeSmallBuffers)
    55     {
    56       for(int n = 0; n < maxCachedBuffers; n++)
    57       {
    58         ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
    59         freeSmallBuffers.add(buffer);
    60       }
    61     }
    62   }
    63   
    64   private ByteBuffer       inputBuffer   = newLineBuffer();
    65   private List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();
    66   
    67   /**
    68    * Add the given ByteBuffer to the list of buffers to be send to the client.
    69    * This method is Thread-safe.
    70    * @param buffer
    71    * @throws java.nio.channels.ClosedChannelException If the client channel was
    72    * already closed.
    73    */
    74   public void addOutputBuffer(ByteBuffer buffer)
    75     throws ClosedChannelException
    76   {
    77     if(outputBuffers == null)
    78     {
    79       throw new ClosedChannelException();
    80     }
    81     
    82     synchronized(outputBuffers)
    83     {
    84       outputBuffers.add(buffer);
    85     }
    86   }
    87   
    88   /**
    89    * Currently a channel has only one input buffer. This *may* be a bottleneck
    90    * and should investigated in the future.
    91    * @param channel
    92    * @return The input buffer associated with given channel.
    93    */
    94   public ByteBuffer getInputBuffer()
    95   {
    96     return inputBuffer;
    97   }
    98   
    99   /**
   100    * Returns the current output buffer for writing(!) to SocketChannel.
   101    * @param channel
   102    * @return The next input buffer that contains unprocessed data or null
   103    * if the connection was closed or there are no more unprocessed buffers.
   104    */
   105   public ByteBuffer getOutputBuffer()
   106   {
   107     synchronized(outputBuffers)
   108     {
   109       if(outputBuffers == null || outputBuffers.isEmpty())
   110       {
   111         return null;
   112       }
   113       else
   114       {
   115         ByteBuffer buffer = outputBuffers.get(0);
   116         if(buffer.remaining() == 0)
   117         {
   118           outputBuffers.remove(0);
   119           // Add old buffers to the list of free buffers
   120           recycleBuffer(buffer);
   121           buffer = getOutputBuffer();
   122         }
   123         return buffer;
   124       }
   125     }
   126   }
   127 
   128   /**
   129    * @return false if there are output buffers pending to be written to the
   130    * client.
   131    */
   132   boolean isOutputBufferEmpty()
   133   {
   134     synchronized(outputBuffers)
   135     {
   136       return outputBuffers.isEmpty();
   137     }
   138   }
   139   
   140   /**
   141    * Goes through the input buffer of the given channel and searches
   142    * for next line terminator. If a '\n' is found, the bytes up to the
   143    * line terminator are returned as array of bytes (the line terminator
   144    * is omitted). If none is found the method returns null.
   145    * @param channel
   146    * @return A ByteBuffer wrapping the line.
   147    */
   148   ByteBuffer nextInputLine()
   149   {
   150     if(inputBuffer == null)
   151     {
   152       return null;
   153     }
   154     
   155     synchronized(inputBuffer)
   156     {
   157       ByteBuffer buffer = inputBuffer;
   158 
   159       // Mark the current write position
   160       int mark = buffer.position();
   161 
   162       // Set position to 0 and limit to current position
   163       buffer.flip();
   164 
   165       ByteBuffer lineBuffer = newLineBuffer();
   166 
   167       while (buffer.position() < buffer.limit())
   168       {
   169         byte b = buffer.get();
   170         if (b == 10) // '\n'
   171         {
   172           // The bytes between the buffer's current position and its limit, 
   173           // if any, are copied to the beginning of the buffer. That is, the 
   174           // byte at index p = position() is copied to index zero, the byte at 
   175           // index p + 1 is copied to index one, and so forth until the byte 
   176           // at index limit() - 1 is copied to index n = limit() - 1 - p. 
   177           // The buffer's position is then set to n+1 and its limit is set to 
   178           // its capacity.
   179           buffer.compact();
   180 
   181           lineBuffer.flip(); // limit to position, position to 0
   182           return lineBuffer;
   183         }
   184         else
   185         {
   186           lineBuffer.put(b);
   187         }
   188       }
   189 
   190       buffer.limit(BUFFER_SIZE);
   191       buffer.position(mark);
   192 
   193       if(buffer.hasRemaining())
   194       {
   195         return null;
   196       }
   197       else
   198       {
   199         // In the first 512 was no newline found, so the input is not standard
   200         // compliant. We return the current buffer as new line and add a space
   201         // to the beginning of the next line which corrects some overlong header
   202         // lines.
   203         inputBuffer = newLineBuffer();
   204         inputBuffer.put((byte)' ');
   205         buffer.flip();
   206         return buffer;
   207       }
   208     }
   209   }
   210   
   211   /**
   212    * Returns a at least 512 bytes long ByteBuffer ready for usage.
   213    * The method first try to reuse an already allocated (cached) buffer but
   214    * if that fails returns a newly allocated direct buffer.
   215    * Use recycleBuffer() method when you do not longer use the allocated buffer.
   216    */
   217   static ByteBuffer newLineBuffer()
   218   {
   219     ByteBuffer buf = null;
   220     synchronized(freeSmallBuffers)
   221     {
   222       if(!freeSmallBuffers.isEmpty())
   223       {
   224         buf = freeSmallBuffers.remove(0);
   225       }
   226     }
   227       
   228     if(buf == null)
   229     {
   230       // Allocate a non-direct buffer
   231       buf = ByteBuffer.allocate(BUFFER_SIZE);
   232     }
   233     
   234     assert buf.position() == 0;
   235     assert buf.limit() >= BUFFER_SIZE;
   236     
   237     return buf;
   238   }
   239   
   240   /**
   241    * Adds the given buffer to the list of free buffers if it is a valuable
   242    * direct allocated buffer.
   243    * @param buffer
   244    */
   245   public static void recycleBuffer(ByteBuffer buffer)
   246   {
   247     assert buffer != null;
   248 
   249     if(buffer.isDirect())
   250     {
   251       assert buffer.capacity() >= BUFFER_SIZE;
   252       
   253       // Add old buffers to the list of free buffers
   254       synchronized(freeSmallBuffers)
   255       {
   256         buffer.clear(); // Set position to 0 and limit to capacity
   257         freeSmallBuffers.add(buffer);
   258       }
   259     } // if(buffer.isDirect())
   260   }
   261   
   262   /**
   263    * Recycles all buffers of this ChannelLineBuffers object.
   264    */
   265   public void recycleBuffers()
   266   {
   267     synchronized(inputBuffer)
   268     {
   269       recycleBuffer(inputBuffer);
   270       this.inputBuffer = null;
   271     }
   272     
   273     synchronized(outputBuffers)
   274     {
   275       for(ByteBuffer buf : outputBuffers)
   276       {
   277         recycleBuffer(buf);
   278       }
   279       outputBuffers = null;
   280     }
   281   }
   282   
   283 }