org/sonews/daemon/ChannelWriter.java
author cli
Tue, 27 Apr 2010 22:11:30 +0200
changeset 27 d879bab39600
parent 25 dd05c3f2fa24
permissions -rw-r--r--
Fix for #567 "mailinglist gateway does not recover after database outage".
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import org.sonews.util.Log;
    22 import java.io.IOException;
    23 import java.nio.ByteBuffer;
    24 import java.nio.channels.CancelledKeyException;
    25 import java.nio.channels.SelectionKey;
    26 import java.nio.channels.Selector;
    27 import java.nio.channels.SocketChannel;
    28 import java.util.Iterator;
    29 
    30 /**
    31  * A Thread task that processes OP_WRITE events for SocketChannels.
    32  * @author Christian Lins
    33  * @since sonews/0.5.0
    34  */
    35 class ChannelWriter extends AbstractDaemon
    36 {
    37 
    38   private static ChannelWriter instance = new ChannelWriter();
    39 
    40   /**
    41    * @return Returns the active ChannelWriter instance.
    42    */
    43   public static ChannelWriter getInstance()
    44   {
    45     return instance;
    46   }
    47   
    48   private Selector selector = null;
    49   
    50   protected ChannelWriter()
    51   {
    52   }
    53   
    54   /**
    55    * @return Selector associated with this instance.
    56    */
    57   public Selector getSelector()
    58   {
    59     return this.selector;
    60   }
    61   
    62   /**
    63    * Sets the selector that is used by this ChannelWriter.
    64    * @param selector
    65    */
    66   public void setSelector(final Selector selector)
    67   {
    68     this.selector = selector;
    69   }
    70   
    71   /**
    72    * Run loop.
    73    */
    74   @Override
    75   public void run()
    76   {
    77     assert selector != null;
    78 
    79     while(isRunning())
    80     {
    81       try
    82       {
    83         SelectionKey   selKey        = null;
    84         SocketChannel  socketChannel = null;
    85         NNTPConnection connection    = null;
    86 
    87         // select() blocks until some SelectableChannels are ready for
    88         // processing. There is no need to synchronize the selector as we
    89         // have only one thread per selector.
    90         selector.select(); // The return value of select can be ignored
    91 
    92         // Get list of selection keys with pending OP_WRITE events.
    93         // The keySET is not thread-safe whereas the keys itself are.
    94         Iterator it = selector.selectedKeys().iterator();
    95 
    96         while (it.hasNext())
    97         {
    98           // We remove the first event from the set and store it for
    99           // later processing.
   100           selKey = (SelectionKey) it.next();
   101           socketChannel = (SocketChannel) selKey.channel();
   102           connection = Connections.getInstance().get(socketChannel);
   103 
   104           it.remove();
   105           if (connection != null)
   106           {
   107             break;
   108           }
   109           else
   110           {
   111             selKey = null;
   112           }
   113         }
   114         
   115         if (selKey != null)
   116         {
   117           try
   118           {
   119             // Process the selected key.
   120             // As there is only one OP_WRITE key for a given channel, we need
   121             // not to synchronize this processing to retain the order.
   122             processSelectionKey(connection, socketChannel, selKey);
   123           }
   124           catch (IOException ex)
   125           {
   126             Log.get().warning("Error writing to channel: " + ex);
   127 
   128             // Cancel write events for this channel
   129             selKey.cancel();
   130             connection.shutdownInput();
   131             connection.shutdownOutput();
   132           }
   133         }
   134         
   135         // Eventually wait for a register operation
   136         synchronized(NNTPDaemon.RegisterGate) { /* do nothing */ }
   137       }
   138       catch(CancelledKeyException ex)
   139       {
   140         Log.get().info("ChannelWriter.run(): " + ex);
   141       }
   142       catch(Exception ex)
   143       {
   144         ex.printStackTrace();
   145       }
   146     } // while(isRunning())
   147   }
   148   
   149   private void processSelectionKey(final NNTPConnection connection,
   150     final SocketChannel socketChannel, final SelectionKey selKey)
   151     throws InterruptedException, IOException
   152   {
   153     assert connection != null;
   154     assert socketChannel != null;
   155     assert selKey != null;
   156     assert selKey.isWritable();
   157 
   158     // SocketChannel is ready for writing
   159     if(selKey.isValid())
   160     {
   161       // Lock the socket channel
   162       synchronized(socketChannel)
   163       {
   164         // Get next output buffer
   165         ByteBuffer buf = connection.getOutputBuffer();
   166         if(buf == null)
   167         {
   168           // Currently we have nothing to write, so we stop the writeable
   169           // events until we have something to write to the socket channel
   170           //selKey.cancel();
   171           selKey.interestOps(0);
   172           // Update activity timestamp to prevent too early disconnects
   173           // on slow client connections
   174           connection.setLastActivity(System.currentTimeMillis());
   175           return;
   176         }
   177  
   178         while(buf != null) // There is data to be send
   179         {
   180           // Write buffer to socket channel; this method does not block
   181           if(socketChannel.write(buf) <= 0)
   182           {
   183             // Perhaps there is data to be written, but the SocketChannel's
   184             // buffer is full, so we stop writing to until the next event.
   185             break;
   186           }
   187           else
   188           {
   189             // Retrieve next buffer if available; method may return the same
   190             // buffer instance if it still have some bytes remaining
   191             buf = connection.getOutputBuffer();
   192           }
   193         }
   194       }
   195     }
   196     else
   197     {
   198       Log.get().warning("Invalid OP_WRITE key: " + selKey);
   199 
   200       if(socketChannel.socket().isClosed())
   201       {
   202         connection.shutdownInput();
   203         connection.shutdownOutput();
   204         socketChannel.close();
   205         Log.get().info("Connection closed.");
   206       }
   207     }
   208   }
   209   
   210 }