diff -r c404a87db5b7 -r 74139325d305 src/org/sonews/daemon/ChannelWriter.java --- a/src/org/sonews/daemon/ChannelWriter.java Sun Aug 29 17:43:58 2010 +0200 +++ b/src/org/sonews/daemon/ChannelWriter.java Sun Aug 29 18:17:37 2010 +0200 @@ -35,176 +35,151 @@ class ChannelWriter extends AbstractDaemon { - private static ChannelWriter instance = new ChannelWriter(); + private static ChannelWriter instance = new ChannelWriter(); - /** - * @return Returns the active ChannelWriter instance. - */ - public static ChannelWriter getInstance() - { - return instance; - } - - private Selector selector = null; - - protected ChannelWriter() - { - } - - /** - * @return Selector associated with this instance. - */ - public Selector getSelector() - { - return this.selector; - } - - /** - * Sets the selector that is used by this ChannelWriter. - * @param selector - */ - public void setSelector(final Selector selector) - { - this.selector = selector; - } - - /** - * Run loop. - */ - @Override - public void run() - { - assert selector != null; + /** + * @return Returns the active ChannelWriter instance. + */ + public static ChannelWriter getInstance() + { + return instance; + } + private Selector selector = null; - while(isRunning()) - { - try - { - SelectionKey selKey = null; - SocketChannel socketChannel = null; - NNTPConnection connection = null; + protected ChannelWriter() + { + } - // select() blocks until some SelectableChannels are ready for - // processing. There is no need to synchronize the selector as we - // have only one thread per selector. - selector.select(); // The return value of select can be ignored + /** + * @return Selector associated with this instance. + */ + public Selector getSelector() + { + return this.selector; + } - // Get list of selection keys with pending OP_WRITE events. - // The keySET is not thread-safe whereas the keys itself are. - Iterator it = selector.selectedKeys().iterator(); + /** + * Sets the selector that is used by this ChannelWriter. + * @param selector + */ + public void setSelector(final Selector selector) + { + this.selector = selector; + } - while (it.hasNext()) - { - // We remove the first event from the set and store it for - // later processing. - selKey = (SelectionKey) it.next(); - socketChannel = (SocketChannel) selKey.channel(); - connection = Connections.getInstance().get(socketChannel); + /** + * Run loop. + */ + @Override + public void run() + { + assert selector != null; - it.remove(); - if (connection != null) - { - break; - } - else - { - selKey = null; - } - } - - if (selKey != null) - { - try - { - // Process the selected key. - // As there is only one OP_WRITE key for a given channel, we need - // not to synchronize this processing to retain the order. - processSelectionKey(connection, socketChannel, selKey); - } - catch (IOException ex) - { - Log.get().warning("Error writing to channel: " + ex); + while (isRunning()) { + try { + SelectionKey selKey = null; + SocketChannel socketChannel = null; + NNTPConnection connection = null; - // Cancel write events for this channel - selKey.cancel(); - connection.shutdownInput(); - connection.shutdownOutput(); - } - } - - // Eventually wait for a register operation - synchronized(NNTPDaemon.RegisterGate) { /* do nothing */ } - } - catch(CancelledKeyException ex) - { - Log.get().info("ChannelWriter.run(): " + ex); - } - catch(Exception ex) - { - ex.printStackTrace(); - } - } // while(isRunning()) - } - - private void processSelectionKey(final NNTPConnection connection, - final SocketChannel socketChannel, final SelectionKey selKey) - throws InterruptedException, IOException - { - assert connection != null; - assert socketChannel != null; - assert selKey != null; - assert selKey.isWritable(); + // select() blocks until some SelectableChannels are ready for + // processing. There is no need to synchronize the selector as we + // have only one thread per selector. + selector.select(); // The return value of select can be ignored - // SocketChannel is ready for writing - if(selKey.isValid()) - { - // Lock the socket channel - synchronized(socketChannel) - { - // Get next output buffer - ByteBuffer buf = connection.getOutputBuffer(); - if(buf == null) - { - // Currently we have nothing to write, so we stop the writeable - // events until we have something to write to the socket channel - //selKey.cancel(); - selKey.interestOps(0); - // Update activity timestamp to prevent too early disconnects - // on slow client connections - connection.setLastActivity(System.currentTimeMillis()); - return; - } - - while(buf != null) // There is data to be send - { - // Write buffer to socket channel; this method does not block - if(socketChannel.write(buf) <= 0) - { - // Perhaps there is data to be written, but the SocketChannel's - // buffer is full, so we stop writing to until the next event. - break; - } - else - { - // Retrieve next buffer if available; method may return the same - // buffer instance if it still have some bytes remaining - buf = connection.getOutputBuffer(); - } - } - } - } - else - { - Log.get().warning("Invalid OP_WRITE key: " + selKey); + // Get list of selection keys with pending OP_WRITE events. + // The keySET is not thread-safe whereas the keys itself are. + Iterator it = selector.selectedKeys().iterator(); - if(socketChannel.socket().isClosed()) - { - connection.shutdownInput(); - connection.shutdownOutput(); - socketChannel.close(); - Log.get().info("Connection closed."); - } - } - } - + while (it.hasNext()) { + // We remove the first event from the set and store it for + // later processing. + selKey = (SelectionKey) it.next(); + socketChannel = (SocketChannel) selKey.channel(); + connection = Connections.getInstance().get(socketChannel); + + it.remove(); + if (connection != null) { + break; + } else { + selKey = null; + } + } + + if (selKey != null) { + try { + // Process the selected key. + // As there is only one OP_WRITE key for a given channel, we need + // not to synchronize this processing to retain the order. + processSelectionKey(connection, socketChannel, selKey); + } catch (IOException ex) { + Log.get().warning("Error writing to channel: " + ex); + + // Cancel write events for this channel + selKey.cancel(); + connection.shutdownInput(); + connection.shutdownOutput(); + } + } + + // Eventually wait for a register operation + synchronized (NNTPDaemon.RegisterGate) { /* do nothing */ } + } catch (CancelledKeyException ex) { + Log.get().info("ChannelWriter.run(): " + ex); + } catch (Exception ex) { + ex.printStackTrace(); + } + } // while(isRunning()) + } + + private void processSelectionKey(final NNTPConnection connection, + final SocketChannel socketChannel, final SelectionKey selKey) + throws InterruptedException, IOException + { + assert connection != null; + assert socketChannel != null; + assert selKey != null; + assert selKey.isWritable(); + + // SocketChannel is ready for writing + if (selKey.isValid()) { + // Lock the socket channel + synchronized (socketChannel) { + // Get next output buffer + ByteBuffer buf = connection.getOutputBuffer(); + if (buf == null) { + // Currently we have nothing to write, so we stop the writeable + // events until we have something to write to the socket channel + //selKey.cancel(); + selKey.interestOps(0); + // Update activity timestamp to prevent too early disconnects + // on slow client connections + connection.setLastActivity(System.currentTimeMillis()); + return; + } + + while (buf != null) // There is data to be send + { + // Write buffer to socket channel; this method does not block + if (socketChannel.write(buf) <= 0) { + // Perhaps there is data to be written, but the SocketChannel's + // buffer is full, so we stop writing to until the next event. + break; + } else { + // Retrieve next buffer if available; method may return the same + // buffer instance if it still have some bytes remaining + buf = connection.getOutputBuffer(); + } + } + } + } else { + Log.get().warning("Invalid OP_WRITE key: " + selKey); + + if (socketChannel.socket().isClosed()) { + connection.shutdownInput(); + connection.shutdownOutput(); + socketChannel.close(); + Log.get().info("Connection closed."); + } + } + } }