org/sonews/daemon/ChannelWriter.java
author cli
Wed, 12 Aug 2009 13:03:23 +0200
changeset 7 0b76e099eb96
parent 1 6fceb66e1ad7
child 15 f2293e8566f5
permissions -rw-r--r--
PullFeeder sends an additional "MODE READER" to peers.
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.daemon;
chris@1
    20
chris@1
    21
import org.sonews.util.Log;
chris@1
    22
import java.io.IOException;
chris@1
    23
import java.nio.ByteBuffer;
chris@1
    24
import java.nio.channels.CancelledKeyException;
chris@1
    25
import java.nio.channels.SelectionKey;
chris@1
    26
import java.nio.channels.Selector;
chris@1
    27
import java.nio.channels.SocketChannel;
chris@1
    28
import java.util.Iterator;
chris@1
    29
chris@1
    30
/**
chris@1
    31
 * A Thread task that processes OP_WRITE events for SocketChannels.
chris@1
    32
 * @author Christian Lins
chris@1
    33
 * @since sonews/0.5.0
chris@1
    34
 */
chris@1
    35
class ChannelWriter extends AbstractDaemon
chris@1
    36
{
chris@1
    37
chris@1
    38
  private static ChannelWriter instance = new ChannelWriter();
chris@1
    39
chris@1
    40
  /**
chris@1
    41
   * @return Returns the active ChannelWriter instance.
chris@1
    42
   */
chris@1
    43
  public static ChannelWriter getInstance()
chris@1
    44
  {
chris@1
    45
    return instance;
chris@1
    46
  }
chris@1
    47
  
chris@1
    48
  private Selector selector = null;
chris@1
    49
  
chris@1
    50
  protected ChannelWriter()
chris@1
    51
  {
chris@1
    52
  }
chris@1
    53
  
chris@1
    54
  /**
chris@1
    55
   * @return Selector associated with this instance.
chris@1
    56
   */
chris@1
    57
  public Selector getSelector()
chris@1
    58
  {
chris@1
    59
    return this.selector;
chris@1
    60
  }
chris@1
    61
  
chris@1
    62
  /**
chris@1
    63
   * Sets the selector that is used by this ChannelWriter.
chris@1
    64
   * @param selector
chris@1
    65
   */
chris@1
    66
  public void setSelector(final Selector selector)
chris@1
    67
  {
chris@1
    68
    this.selector = selector;
chris@1
    69
  }
chris@1
    70
  
chris@1
    71
  /**
chris@1
    72
   * Run loop.
chris@1
    73
   */
chris@1
    74
  @Override
chris@1
    75
  public void run()
chris@1
    76
  {
chris@1
    77
    assert selector != null;
chris@1
    78
chris@1
    79
    while(isRunning())
chris@1
    80
    {
chris@1
    81
      try
chris@1
    82
      {
chris@1
    83
        SelectionKey   selKey        = null;
chris@1
    84
        SocketChannel  socketChannel = null;
chris@1
    85
        NNTPConnection connection    = null;
chris@1
    86
chris@1
    87
        // select() blocks until some SelectableChannels are ready for
chris@1
    88
        // processing. There is no need to synchronize the selector as we
chris@1
    89
        // have only one thread per selector.
chris@1
    90
        selector.select(); // The return value of select can be ignored
chris@1
    91
chris@1
    92
        // Get list of selection keys with pending OP_WRITE events.
chris@1
    93
        // The keySET is not thread-safe whereas the keys itself are.
chris@1
    94
        Iterator it = selector.selectedKeys().iterator();
chris@1
    95
chris@1
    96
        while (it.hasNext())
chris@1
    97
        {
chris@1
    98
          // We remove the first event from the set and store it for
chris@1
    99
          // later processing.
chris@1
   100
          selKey = (SelectionKey) it.next();
chris@1
   101
          socketChannel = (SocketChannel) selKey.channel();
chris@1
   102
          connection = Connections.getInstance().get(socketChannel);
chris@1
   103
chris@1
   104
          it.remove();
chris@1
   105
          if (connection != null)
chris@1
   106
          {
chris@1
   107
            break;
chris@1
   108
          }
chris@1
   109
          else
chris@1
   110
          {
chris@1
   111
            selKey = null;
chris@1
   112
          }
chris@1
   113
        }
chris@1
   114
        
chris@1
   115
        if (selKey != null)
chris@1
   116
        {
chris@1
   117
          try
chris@1
   118
          {
chris@1
   119
            // Process the selected key.
chris@1
   120
            // As there is only one OP_WRITE key for a given channel, we need
chris@1
   121
            // not to synchronize this processing to retain the order.
chris@1
   122
            processSelectionKey(connection, socketChannel, selKey);
chris@1
   123
          }
chris@1
   124
          catch (IOException ex)
chris@1
   125
          {
chris@1
   126
            Log.msg("Error writing to channel: " + ex, false);
chris@1
   127
chris@1
   128
            // Cancel write events for this channel
chris@1
   129
            selKey.cancel();
chris@1
   130
            connection.shutdownInput();
chris@1
   131
            connection.shutdownOutput();
chris@1
   132
          }
chris@1
   133
        }
chris@1
   134
        
chris@1
   135
        // Eventually wait for a register operation
chris@1
   136
        synchronized(NNTPDaemon.RegisterGate) { /* do nothing */ }
chris@1
   137
      }
chris@1
   138
      catch(CancelledKeyException ex)
chris@1
   139
      {
chris@1
   140
        Log.msg("ChannelWriter.run(): " + ex, true);
chris@1
   141
      }
chris@1
   142
      catch(Exception ex)
chris@1
   143
      {
chris@1
   144
        ex.printStackTrace();
chris@1
   145
      }
chris@1
   146
    } // while(isRunning())
chris@1
   147
  }
chris@1
   148
  
chris@1
   149
  private void processSelectionKey(final NNTPConnection connection,
chris@1
   150
    final SocketChannel socketChannel, final SelectionKey selKey)
chris@1
   151
    throws InterruptedException, IOException
chris@1
   152
  {
chris@1
   153
    assert connection != null;
chris@1
   154
    assert socketChannel != null;
chris@1
   155
    assert selKey != null;
chris@1
   156
    assert selKey.isWritable();
chris@1
   157
chris@1
   158
    // SocketChannel is ready for writing
chris@1
   159
    if(selKey.isValid())
chris@1
   160
    {
chris@1
   161
      // Lock the socket channel
chris@1
   162
      synchronized(socketChannel)
chris@1
   163
      {
chris@1
   164
        // Get next output buffer
chris@1
   165
        ByteBuffer buf = connection.getOutputBuffer();
chris@1
   166
        if(buf == null)
chris@1
   167
        {
chris@1
   168
          // Currently we have nothing to write, so we stop the writeable
chris@1
   169
          // events until we have something to write to the socket channel
chris@1
   170
          //selKey.cancel();
chris@1
   171
          selKey.interestOps(0);
chris@1
   172
          return;
chris@1
   173
        }
chris@1
   174
 
chris@1
   175
        while(buf != null) // There is data to be send
chris@1
   176
        {
chris@1
   177
          // Write buffer to socket channel; this method does not block
chris@1
   178
          if(socketChannel.write(buf) <= 0)
chris@1
   179
          {
chris@1
   180
            // Perhaps there is data to be written, but the SocketChannel's
chris@1
   181
            // buffer is full, so we stop writing to until the next event.
chris@1
   182
            break;
chris@1
   183
          }
chris@1
   184
          else
chris@1
   185
          {
chris@1
   186
            // Retrieve next buffer if available; method may return the same
chris@1
   187
            // buffer instance if it still have some bytes remaining
chris@1
   188
            buf = connection.getOutputBuffer();
chris@1
   189
          }
chris@1
   190
        }
chris@1
   191
      }
chris@1
   192
    }
chris@1
   193
    else
chris@1
   194
    {
chris@1
   195
      Log.msg("Invalid OP_WRITE key: " + selKey, false);
chris@1
   196
chris@1
   197
      if (socketChannel.socket().isClosed())
chris@1
   198
      {
chris@1
   199
        connection.shutdownInput();
chris@1
   200
        connection.shutdownOutput();
chris@1
   201
        socketChannel.close();
chris@1
   202
        Log.msg("Connection closed.", true);
chris@1
   203
      }
chris@1
   204
    }
chris@1
   205
  }
chris@1
   206
  
chris@1
   207
}