org/sonews/daemon/ChannelReader.java
author chris <chris@marvin>
Fri, 26 Jun 2009 16:48:50 +0200
changeset 1 6fceb66e1ad7
child 3 2fdc9cc89502
permissions -rw-r--r--
Hooray... sonews/0.5.0 final

HG: Enter commit message. Lines beginning with 'HG:' are removed.
HG: Remove all lines to abort the collapse operation.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import org.sonews.util.Log;
    22 import java.io.IOException;
    23 import java.nio.ByteBuffer;
    24 import java.nio.channels.CancelledKeyException;
    25 import java.nio.channels.SelectionKey;
    26 import java.nio.channels.Selector;
    27 import java.nio.channels.SocketChannel;
    28 import java.util.Iterator;
    29 import java.util.Set;
    30 
    31 /**
    32  * A Thread task listening for OP_READ events from SocketChannels.
    33  * @author Christian Lins
    34  * @since sonews/0.5.0
    35  */
    36 class ChannelReader extends AbstractDaemon
    37 {
    38 
    39   private static ChannelReader instance = new ChannelReader();
    40 
    41   /**
    42    * @return Active ChannelReader instance.
    43    */
    44   public static ChannelReader getInstance()
    45   {
    46     return instance;
    47   }
    48   
    49   private Selector selector = null;
    50   
    51   protected ChannelReader()
    52   {
    53   }
    54   
    55   /**
    56    * Sets the selector which is used by this reader to determine the channel
    57    * to read from.
    58    * @param selector
    59    */
    60   public void setSelector(final Selector selector)
    61   {
    62     this.selector = selector;
    63   }
    64   
    65   /**
    66    * Run loop. Blocks until some data is available in a channel.
    67    */
    68   @Override
    69   public void run()
    70   {
    71     assert selector != null;
    72 
    73     while(isRunning())
    74     {
    75       try
    76       {
    77         // select() blocks until some SelectableChannels are ready for
    78         // processing. There is no need to lock the selector as we have only
    79         // one thread per selector.
    80         selector.select();
    81 
    82         // Get list of selection keys with pending events.
    83         // Note: the selected key set is not thread-safe
    84         SocketChannel channel = null;
    85         NNTPConnection conn = null;
    86         final Set<SelectionKey> selKeys = selector.selectedKeys();
    87         SelectionKey selKey = null;
    88 
    89         synchronized (selKeys)
    90         {
    91           Iterator it = selKeys.iterator();
    92 
    93           // Process the first pending event
    94           while (it.hasNext())
    95           {
    96             selKey = (SelectionKey) it.next();
    97             channel = (SocketChannel) selKey.channel();
    98             conn = Connections.getInstance().get(channel);
    99 
   100             // Because we cannot lock the selKey as that would cause a deadlock
   101             // we lock the connection. To preserve the order of the received
   102             // byte blocks a selection key for a connection that has pending
   103             // read events is skipped.
   104             if (conn == null || conn.tryReadLock())
   105             {
   106               // Remove from set to indicate that it's being processed
   107               it.remove();
   108               if (conn != null)
   109               {
   110                 break; // End while loop
   111               }
   112             }
   113             else
   114             {
   115               selKey = null;
   116               channel = null;
   117               conn = null;
   118             }
   119           }
   120         }
   121 
   122         // Do not lock the selKeys while processing because this causes
   123         // a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect()
   124         if (selKey != null && channel != null && conn != null)
   125         {
   126           processSelectionKey(conn, channel, selKey);
   127           conn.unlockReadLock();
   128         }
   129 
   130       }
   131       catch(CancelledKeyException ex)
   132       {
   133         Log.msg("ChannelReader.run(): " + ex, false);
   134         if(Log.isDebug())
   135         {
   136           ex.printStackTrace();
   137         }
   138       }
   139       catch(Exception ex)
   140       {
   141         ex.printStackTrace();
   142       }
   143       
   144       // Eventually wait for a register operation
   145       synchronized (NNTPDaemon.RegisterGate)
   146       {
   147       // Do nothing; FindBugs may warn about an empty synchronized 
   148       // statement, but we cannot use a wait()/notify() mechanism here.
   149       // If we used something like RegisterGate.wait() we block here
   150       // until the NNTPDaemon calls notify(). But the daemon only
   151       // calls notify() if itself is NOT blocked in the listening socket.
   152       }
   153     } // while(isRunning())
   154   }
   155   
   156   private void processSelectionKey(final NNTPConnection connection,
   157     final SocketChannel socketChannel, final SelectionKey selKey)
   158     throws InterruptedException, IOException
   159   {
   160     assert selKey != null;
   161     assert selKey.isReadable();
   162     
   163     // Some bytes are available for reading
   164     if(selKey.isValid())
   165     {      
   166       // Lock the channel
   167       //synchronized(socketChannel)
   168       {
   169         // Read the data into the appropriate buffer
   170         ByteBuffer buf = connection.getInputBuffer();
   171         int read = -1;
   172         try 
   173         {
   174           read = socketChannel.read(buf);
   175         } 
   176         catch(Exception ex) 
   177         {
   178           Log.msg("ChannelReader.processSelectionKey(): " + ex, false);
   179           if(Log.isDebug())
   180           {
   181             ex.printStackTrace();
   182           }
   183         }
   184         
   185         if(read == -1) // End of stream
   186         {
   187           selKey.cancel();
   188         }
   189         else if(read > 0) // If some data was read
   190         {
   191           ConnectionWorker.addChannel(socketChannel);
   192         }
   193       }
   194     }
   195     else
   196     {
   197       // Should not happen
   198       Log.msg(selKey, false);
   199     }
   200   }
   201   
   202 }