src/org/sonews/daemon/ChannelReader.java
author cli
Mon, 12 Sep 2011 20:20:00 +0200
changeset 52 d97b223eab4e
parent 37 74139325d305
permissions -rwxr-xr-x
Change binary package name sonews-server to server
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.daemon;
chris@1
    20
chris@1
    21
import java.io.IOException;
chris@1
    22
import java.nio.ByteBuffer;
chris@1
    23
import java.nio.channels.CancelledKeyException;
chris@1
    24
import java.nio.channels.SelectionKey;
chris@1
    25
import java.nio.channels.Selector;
chris@1
    26
import java.nio.channels.SocketChannel;
chris@1
    27
import java.util.Iterator;
chris@1
    28
import java.util.Set;
cli@15
    29
import java.util.logging.Level;
chris@3
    30
import org.sonews.util.Log;
chris@1
    31
chris@1
    32
/**
chris@1
    33
 * A Thread task listening for OP_READ events from SocketChannels.
chris@1
    34
 * @author Christian Lins
chris@1
    35
 * @since sonews/0.5.0
chris@1
    36
 */
chris@1
    37
class ChannelReader extends AbstractDaemon
chris@1
    38
{
chris@1
    39
cli@37
    40
	private static ChannelReader instance = new ChannelReader();
chris@1
    41
cli@37
    42
	/**
cli@37
    43
	 * @return Active ChannelReader instance.
cli@37
    44
	 */
cli@37
    45
	public static ChannelReader getInstance()
cli@37
    46
	{
cli@37
    47
		return instance;
cli@37
    48
	}
cli@37
    49
	private Selector selector = null;
chris@1
    50
cli@37
    51
	protected ChannelReader()
cli@37
    52
	{
cli@37
    53
	}
chris@1
    54
cli@37
    55
	/**
cli@37
    56
	 * Sets the selector which is used by this reader to determine the channel
cli@37
    57
	 * to read from.
cli@37
    58
	 * @param selector
cli@37
    59
	 */
cli@37
    60
	public void setSelector(final Selector selector)
cli@37
    61
	{
cli@37
    62
		this.selector = selector;
cli@37
    63
	}
chris@1
    64
cli@37
    65
	/**
cli@37
    66
	 * Run loop. Blocks until some data is available in a channel.
cli@37
    67
	 */
cli@37
    68
	@Override
cli@37
    69
	public void run()
cli@37
    70
	{
cli@37
    71
		assert selector != null;
chris@1
    72
cli@37
    73
		while (isRunning()) {
cli@37
    74
			try {
cli@37
    75
				// select() blocks until some SelectableChannels are ready for
cli@37
    76
				// processing. There is no need to lock the selector as we have only
cli@37
    77
				// one thread per selector.
cli@37
    78
				selector.select();
chris@1
    79
cli@37
    80
				// Get list of selection keys with pending events.
cli@37
    81
				// Note: the selected key set is not thread-safe
cli@37
    82
				SocketChannel channel = null;
cli@37
    83
				NNTPConnection conn = null;
cli@37
    84
				final Set<SelectionKey> selKeys = selector.selectedKeys();
cli@37
    85
				SelectionKey selKey = null;
chris@1
    86
cli@37
    87
				synchronized (selKeys) {
cli@37
    88
					Iterator it = selKeys.iterator();
chris@1
    89
cli@37
    90
					// Process the first pending event
cli@37
    91
					while (it.hasNext()) {
cli@37
    92
						selKey = (SelectionKey) it.next();
cli@37
    93
						channel = (SocketChannel) selKey.channel();
cli@37
    94
						conn = Connections.getInstance().get(channel);
cli@37
    95
cli@37
    96
						// Because we cannot lock the selKey as that would cause a deadlock
cli@37
    97
						// we lock the connection. To preserve the order of the received
cli@37
    98
						// byte blocks a selection key for a connection that has pending
cli@37
    99
						// read events is skipped.
cli@37
   100
						if (conn == null || conn.tryReadLock()) {
cli@37
   101
							// Remove from set to indicate that it's being processed
cli@37
   102
							it.remove();
cli@37
   103
							if (conn != null) {
cli@37
   104
								break; // End while loop
cli@37
   105
							}
cli@37
   106
						} else {
cli@37
   107
							selKey = null;
cli@37
   108
							channel = null;
cli@37
   109
							conn = null;
cli@37
   110
						}
cli@37
   111
					}
cli@37
   112
				}
cli@37
   113
cli@37
   114
				// Do not lock the selKeys while processing because this causes
cli@37
   115
				// a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect()
cli@37
   116
				if (selKey != null && channel != null && conn != null) {
cli@37
   117
					processSelectionKey(conn, channel, selKey);
cli@37
   118
					conn.unlockReadLock();
cli@37
   119
				}
cli@37
   120
cli@37
   121
			} catch (CancelledKeyException ex) {
cli@37
   122
				Log.get().warning("ChannelReader.run(): " + ex);
cli@37
   123
				Log.get().log(Level.INFO, "", ex);
cli@37
   124
			} catch (Exception ex) {
cli@37
   125
				ex.printStackTrace();
cli@37
   126
			}
cli@37
   127
cli@37
   128
			// Eventually wait for a register operation
cli@37
   129
			synchronized (NNTPDaemon.RegisterGate) {
cli@37
   130
				// Do nothing; FindBugs may warn about an empty synchronized
cli@37
   131
				// statement, but we cannot use a wait()/notify() mechanism here.
cli@37
   132
				// If we used something like RegisterGate.wait() we block here
cli@37
   133
				// until the NNTPDaemon calls notify(). But the daemon only
cli@37
   134
				// calls notify() if itself is NOT blocked in the listening socket.
cli@37
   135
			}
cli@37
   136
		} // while(isRunning())
cli@37
   137
	}
cli@37
   138
cli@37
   139
	private void processSelectionKey(final NNTPConnection connection,
cli@37
   140
		final SocketChannel socketChannel, final SelectionKey selKey)
cli@37
   141
		throws InterruptedException, IOException
cli@37
   142
	{
cli@37
   143
		assert selKey != null;
cli@37
   144
		assert selKey.isReadable();
cli@37
   145
cli@37
   146
		// Some bytes are available for reading
cli@37
   147
		if (selKey.isValid()) {
cli@37
   148
			// Lock the channel
cli@37
   149
			//synchronized(socketChannel)
cli@37
   150
			{
cli@37
   151
				// Read the data into the appropriate buffer
cli@37
   152
				ByteBuffer buf = connection.getInputBuffer();
cli@37
   153
				int read = -1;
cli@37
   154
				try {
cli@37
   155
					read = socketChannel.read(buf);
cli@37
   156
				} catch (IOException ex) {
cli@37
   157
					// The connection was probably closed by the remote host
cli@37
   158
					// in a non-clean fashion
cli@37
   159
					Log.get().info("ChannelReader.processSelectionKey(): " + ex);
cli@37
   160
				} catch (Exception ex) {
cli@37
   161
					Log.get().warning("ChannelReader.processSelectionKey(): " + ex);
cli@37
   162
				}
cli@37
   163
cli@37
   164
				if (read == -1) // End of stream
cli@37
   165
				{
cli@37
   166
					selKey.cancel();
cli@37
   167
				} else if (read > 0) // If some data was read
cli@37
   168
				{
cli@37
   169
					ConnectionWorker.addChannel(socketChannel);
cli@37
   170
				}
cli@37
   171
			}
cli@37
   172
		} else {
cli@37
   173
			// Should not happen
cli@37
   174
			Log.get().severe("Should not happen: " + selKey.toString());
cli@37
   175
		}
cli@37
   176
	}
chris@1
   177
}