src/org/sonews/daemon/ConnectionWorker.java
author cli
Wed, 14 Sep 2011 12:54:40 +0200
changeset 60 39f1fadf50a0
parent 37 74139325d305
permissions -rwxr-xr-x
Add libcommons-codec-java to Debian dependencies.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import org.sonews.util.Log;
    22 import java.nio.ByteBuffer;
    23 import java.nio.channels.SocketChannel;
    24 import java.util.concurrent.ArrayBlockingQueue;
    25 
    26 /**
    27  * Does most of the work: parsing input, talking to client and Database.
    28  * @author Christian Lins
    29  * @since sonews/0.5.0
    30  */
    31 class ConnectionWorker extends AbstractDaemon
    32 {
    33 
    34 	// 256 pending events should be enough
    35 	private static ArrayBlockingQueue<SocketChannel> pendingChannels = new ArrayBlockingQueue<SocketChannel>(256, true);
    36 
    37 	/**
    38 	 * Registers the given channel for further event processing.
    39 	 * @param channel
    40 	 */
    41 	public static void addChannel(SocketChannel channel)
    42 		throws InterruptedException
    43 	{
    44 		pendingChannels.put(channel);
    45 	}
    46 
    47 	/**
    48 	 * Processing loop.
    49 	 */
    50 	@Override
    51 	public void run()
    52 	{
    53 		while (isRunning()) {
    54 			try {
    55 				// Retrieve and remove if available, otherwise wait.
    56 				SocketChannel channel = pendingChannels.take();
    57 
    58 				if (channel != null) {
    59 					// Connections.getInstance().get() MAY return null
    60 					NNTPConnection conn = Connections.getInstance().get(channel);
    61 
    62 					// Try to lock the connection object
    63 					if (conn != null && conn.tryReadLock()) {
    64 						ByteBuffer buf = conn.getBuffers().nextInputLine();
    65 						while (buf != null) // Complete line was received
    66 						{
    67 							final byte[] line = new byte[buf.limit()];
    68 							buf.get(line);
    69 							ChannelLineBuffers.recycleBuffer(buf);
    70 
    71 							// Here is the actual work done
    72 							conn.lineReceived(line);
    73 
    74 							// Read next line as we could have already received the next line
    75 							buf = conn.getBuffers().nextInputLine();
    76 						}
    77 						conn.unlockReadLock();
    78 					} else {
    79 						addChannel(channel);
    80 					}
    81 				}
    82 			} catch (InterruptedException ex) {
    83 				Log.get().info("ConnectionWorker interrupted: " + ex);
    84 			} catch (Exception ex) {
    85 				Log.get().severe("Exception in ConnectionWorker: " + ex);
    86 				ex.printStackTrace();
    87 			}
    88 		} // end while(isRunning())
    89 	}
    90 }