Switch indent style to Original K&R / Linux / Kernel.
3 * see AUTHORS for the list of contributors
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 package org.sonews.daemon;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.ClosedChannelException;
23 import java.util.ArrayList;
24 import java.util.List;
/**
 * Class holding ByteBuffers for SocketChannels/NNTPConnection.
 * Due to the complex nature of AIO/NIO we must properly handle the line
 * buffers for the input and output of the SocketChannels.
 * <p>
 * Every instance owns one input buffer and a queue of output buffers. After
 * {@link #recycleBuffers()} was called the instance is considered closed:
 * the getters return null and {@link #addOutputBuffer(ByteBuffer)} throws a
 * ClosedChannelException.
 * @author Christian Lins
 */
public class ChannelLineBuffers
{

	/**
	 * Size of one small buffer;
	 * per default this is 512 bytes to fit one standard line.
	 */
	public static final int BUFFER_SIZE = 512;

	/** Number of direct buffers that allocateDirect() puts into the cache. */
	private static final int MAX_CACHED_BUFFERS = 2048;

	/** Cache of cleared, reusable buffers; guarded by its own monitor. */
	private static final List<ByteBuffer> freeSmallBuffers = new ArrayList<ByteBuffer>(MAX_CACHED_BUFFERS);

	/**
	 * Allocates a predefined number of direct ByteBuffers (allocated via
	 * ByteBuffer.allocateDirect()) and adds them to the buffer cache.
	 * This method is Thread-safe, but is intended to be called at startup
	 * only, as every call adds another full set of buffers to the cache.
	 */
	public static void allocateDirect()
	{
		synchronized (freeSmallBuffers) {
			for (int n = 0; n < MAX_CACHED_BUFFERS; n++) {
				freeSmallBuffers.add(ByteBuffer.allocateDirect(BUFFER_SIZE));
			}
		}
	}

	/*
	 * Both fields are set to null by recycleBuffers(). They are volatile so
	 * that the unsynchronized reads (getInputBuffer() and the snapshot reads
	 * at the top of each method) observe the most recent value.
	 */
	private volatile ByteBuffer inputBuffer = newLineBuffer();
	private volatile List<ByteBuffer> outputBuffers = new ArrayList<ByteBuffer>();

	/**
	 * Add the given ByteBuffer to the list of buffers to be send to the client.
	 * This method is Thread-safe.
	 * @param buffer The buffer to enqueue.
	 * @throws java.nio.channels.ClosedChannelException If the client channel was
	 * already closed, i.e. recycleBuffers() was called before.
	 */
	public void addOutputBuffer(ByteBuffer buffer)
		throws ClosedChannelException
	{
		// Work on a snapshot: the field may be nulled by recycleBuffers() at
		// any time and synchronizing on a null reference would throw a NPE.
		final List<ByteBuffer> pending = outputBuffers;
		if (pending == null) {
			throw new ClosedChannelException();
		}

		synchronized (pending) {
			// Closed while we were waiting for the monitor?
			if (outputBuffers == null) {
				throw new ClosedChannelException();
			}
			pending.add(buffer);
		}
	}

	/**
	 * Currently a channel has only one input buffer. This *may* be a bottleneck
	 * and should be investigated in the future.
	 * @return The input buffer of this object or null if the buffers were
	 * already recycled.
	 */
	public ByteBuffer getInputBuffer()
	{
		return inputBuffer;
	}

	/**
	 * Returns the current output buffer for writing(!) to SocketChannel.
	 * Completely written buffers at the head of the queue are removed and
	 * handed to recycleBuffer().
	 * @return The next output buffer that contains unprocessed data or null
	 * if the connection was closed or there are no more unprocessed buffers.
	 */
	public ByteBuffer getOutputBuffer()
	{
		final List<ByteBuffer> pending = outputBuffers;
		if (pending == null) {
			return null; // already closed
		}

		synchronized (pending) {
			if (outputBuffers == null) {
				return null; // closed while waiting for the monitor
			}

			while (!pending.isEmpty()) {
				ByteBuffer head = pending.get(0);
				if (head.hasRemaining()) {
					return head;
				}

				// Head was written completely: drop it and
				// add it to the list of free buffers
				pending.remove(0);
				recycleBuffer(head);
			}
			return null;
		}
	}

	/**
	 * @return false if there are output buffers pending to be written to the
	 * client, true otherwise (also if the buffers were already recycled).
	 */
	boolean isOutputBufferEmpty()
	{
		final List<ByteBuffer> pending = outputBuffers;
		if (pending == null) {
			return true; // closed, nothing can be pending
		}

		synchronized (pending) {
			return pending.isEmpty();
		}
	}

	/**
	 * Goes through the input buffer of this object and searches
	 * for next line terminator. If a '\n' is found, the bytes up to the
	 * line terminator are returned (the line terminator is omitted, a
	 * preceding '\r' is NOT removed). If none is found the method returns
	 * null. If the input buffer is completely filled without containing a
	 * line terminator, the whole buffer content is returned as one line.
	 * @return A ByteBuffer wrapping the line (ready for reading) or null if
	 * there is no complete line or the buffers were already recycled.
	 */
	ByteBuffer nextInputLine()
	{
		final ByteBuffer buffer = inputBuffer;
		if (buffer == null) {
			return null;
		}

		// The monitor of the buffer object itself is used (and not a private
		// lock) because the buffer is handed out by getInputBuffer() and code
		// outside this class may synchronize on it while filling it.
		synchronized (buffer) {
			if (inputBuffer != buffer) {
				// Buffer was recycled or replaced while we waited
				return null;
			}

			// Mark the current write position
			int mark = buffer.position();

			// Set position to 0 and limit to current position
			buffer.flip();

			ByteBuffer lineBuffer = newLineBuffer();

			while (buffer.hasRemaining()) {
				byte b = buffer.get();
				if (b == '\n') {
					// The bytes between the buffer's current position and
					// its limit, if any, are copied to the beginning of the
					// buffer, so the buffer is ready for further writing
					// right behind the not yet processed bytes.
					buffer.compact();

					lineBuffer.flip(); // limit to position, position to 0
					return lineBuffer;
				}
				lineBuffer.put(b);
			}

			// No line terminator found: restore the write state of the buffer
			buffer.limit(BUFFER_SIZE);
			buffer.position(mark);

			if (buffer.hasRemaining()) {
				// Line is simply not complete yet
				recycleBuffer(lineBuffer);
				return null;
			}

			// In the first 512 was no newline found, so the input is not standard
			// compliant. We return the current buffer as new line and add a space
			// to the beginning of the next line which corrects some overlong header
			// lines.
			ByteBuffer next = newLineBuffer();
			next.put((byte) ' ');
			inputBuffer = next;

			recycleBuffer(lineBuffer);
			buffer.flip();
			return buffer;
		}
	}

	/**
	 * Returns a at least 512 bytes long ByteBuffer ready for usage.
	 * The method first tries to reuse an already allocated (cached) buffer but
	 * if that fails returns a newly allocated non-direct buffer.
	 * Use recycleBuffer() method when you do not longer use the allocated buffer.
	 * @return An empty buffer (position 0) with at least BUFFER_SIZE capacity.
	 */
	static ByteBuffer newLineBuffer()
	{
		ByteBuffer buf = null;
		synchronized (freeSmallBuffers) {
			int cached = freeSmallBuffers.size();
			if (cached > 0) {
				// All cached buffers are equivalent; taking the last one
				// avoids shifting the whole ArrayList.
				buf = freeSmallBuffers.remove(cached - 1);
			}
		}

		if (buf == null) {
			// Allocate a non-direct buffer
			buf = ByteBuffer.allocate(BUFFER_SIZE);
		}

		assert buf.position() == 0;
		assert buf.limit() >= BUFFER_SIZE;

		return buf;
	}

	/**
	 * Adds the given buffer to the list of free buffers if it is a valuable
	 * direct allocated buffer. Non-direct buffers are ignored and left to the
	 * garbage collector.
	 * @param buffer The buffer that is no longer used; must not be null.
	 */
	public static void recycleBuffer(ByteBuffer buffer)
	{
		assert buffer != null;

		if (!buffer.isDirect()) {
			return;
		}

		assert buffer.capacity() >= BUFFER_SIZE;

		// Add old buffers to the list of free buffers
		synchronized (freeSmallBuffers) {
			buffer.clear(); // Set position to 0 and limit to capacity
			freeSmallBuffers.add(buffer);
		}
	}

	/**
	 * Recycles all buffers of this ChannelLineBuffers object and marks it
	 * as closed. Calling this method more than once has no further effect.
	 */
	public void recycleBuffers()
	{
		final ByteBuffer input = inputBuffer;
		if (input != null) {
			synchronized (input) {
				// Re-read the field: nextInputLine() may have replaced the
				// buffer while we were waiting for the monitor.
				ByteBuffer current = inputBuffer;
				if (current != null) {
					recycleBuffer(current);
					inputBuffer = null;
				}
			}
		}

		final List<ByteBuffer> pending = outputBuffers;
		if (pending != null) {
			synchronized (pending) {
				if (outputBuffers != null) {
					for (ByteBuffer buf : pending) {
						recycleBuffer(buf);
					}
					pending.clear();
					outputBuffers = null;
				}
			}
		}
	}
}