src/org/sonews/daemon/ChannelReader.java
author cli
Sun, 29 Aug 2010 17:28:58 +0200
changeset 35 ed84c8bdd87b
parent 15 org/sonews/daemon/ChannelReader.java@f2293e8566f5
child 37 74139325d305
permissions -rw-r--r--
Moving source files into src/-subdir.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import java.io.IOException;
    22 import java.nio.ByteBuffer;
    23 import java.nio.channels.CancelledKeyException;
    24 import java.nio.channels.SelectionKey;
    25 import java.nio.channels.Selector;
    26 import java.nio.channels.SocketChannel;
    27 import java.util.Iterator;
    28 import java.util.Set;
    29 import java.util.logging.Level;
    30 import org.sonews.util.Log;
    31 
    32 /**
    33  * A Thread task listening for OP_READ events from SocketChannels.
    34  * @author Christian Lins
    35  * @since sonews/0.5.0
    36  */
    37 class ChannelReader extends AbstractDaemon
    38 {
    39 
    40   private static ChannelReader instance = new ChannelReader();
    41 
    42   /**
    43    * @return Active ChannelReader instance.
    44    */
    45   public static ChannelReader getInstance()
    46   {
    47     return instance;
    48   }
    49   
    50   private Selector selector = null;
    51   
    52   protected ChannelReader()
    53   {
    54   }
    55   
    56   /**
    57    * Sets the selector which is used by this reader to determine the channel
    58    * to read from.
    59    * @param selector
    60    */
    61   public void setSelector(final Selector selector)
    62   {
    63     this.selector = selector;
    64   }
    65   
    66   /**
    67    * Run loop. Blocks until some data is available in a channel.
    68    */
    69   @Override
    70   public void run()
    71   {
    72     assert selector != null;
    73 
    74     while(isRunning())
    75     {
    76       try
    77       {
    78         // select() blocks until some SelectableChannels are ready for
    79         // processing. There is no need to lock the selector as we have only
    80         // one thread per selector.
    81         selector.select();
    82 
    83         // Get list of selection keys with pending events.
    84         // Note: the selected key set is not thread-safe
    85         SocketChannel channel = null;
    86         NNTPConnection conn = null;
    87         final Set<SelectionKey> selKeys = selector.selectedKeys();
    88         SelectionKey selKey = null;
    89 
    90         synchronized (selKeys)
    91         {
    92           Iterator it = selKeys.iterator();
    93 
    94           // Process the first pending event
    95           while (it.hasNext())
    96           {
    97             selKey = (SelectionKey) it.next();
    98             channel = (SocketChannel) selKey.channel();
    99             conn = Connections.getInstance().get(channel);
   100 
   101             // Because we cannot lock the selKey as that would cause a deadlock
   102             // we lock the connection. To preserve the order of the received
   103             // byte blocks a selection key for a connection that has pending
   104             // read events is skipped.
   105             if (conn == null || conn.tryReadLock())
   106             {
   107               // Remove from set to indicate that it's being processed
   108               it.remove();
   109               if (conn != null)
   110               {
   111                 break; // End while loop
   112               }
   113             }
   114             else
   115             {
   116               selKey = null;
   117               channel = null;
   118               conn = null;
   119             }
   120           }
   121         }
   122 
   123         // Do not lock the selKeys while processing because this causes
   124         // a deadlock in sun.nio.ch.SelectorImpl.lockAndDoSelect()
   125         if (selKey != null && channel != null && conn != null)
   126         {
   127           processSelectionKey(conn, channel, selKey);
   128           conn.unlockReadLock();
   129         }
   130 
   131       }
   132       catch(CancelledKeyException ex)
   133       {
   134         Log.get().warning("ChannelReader.run(): " + ex);
   135         Log.get().log(Level.INFO, "", ex);
   136       }
   137       catch(Exception ex)
   138       {
   139         ex.printStackTrace();
   140       }
   141       
   142       // Eventually wait for a register operation
   143       synchronized (NNTPDaemon.RegisterGate)
   144       {
   145       // Do nothing; FindBugs may warn about an empty synchronized 
   146       // statement, but we cannot use a wait()/notify() mechanism here.
   147       // If we used something like RegisterGate.wait() we block here
   148       // until the NNTPDaemon calls notify(). But the daemon only
   149       // calls notify() if itself is NOT blocked in the listening socket.
   150       }
   151     } // while(isRunning())
   152   }
   153   
   154   private void processSelectionKey(final NNTPConnection connection,
   155     final SocketChannel socketChannel, final SelectionKey selKey)
   156     throws InterruptedException, IOException
   157   {
   158     assert selKey != null;
   159     assert selKey.isReadable();
   160     
   161     // Some bytes are available for reading
   162     if(selKey.isValid())
   163     {   
   164       // Lock the channel
   165       //synchronized(socketChannel)
   166       {
   167         // Read the data into the appropriate buffer
   168         ByteBuffer buf = connection.getInputBuffer();
   169         int read = -1;
   170         try 
   171         {
   172           read = socketChannel.read(buf);
   173         }
   174         catch(IOException ex)
   175         {
   176           // The connection was probably closed by the remote host
   177           // in a non-clean fashion
   178           Log.get().info("ChannelReader.processSelectionKey(): " + ex);
   179         }
   180         catch(Exception ex) 
   181         {
   182           Log.get().warning("ChannelReader.processSelectionKey(): " + ex);
   183         }
   184         
   185         if(read == -1) // End of stream
   186         {
   187           selKey.cancel();
   188         }
   189         else if(read > 0) // If some data was read
   190         {
   191           ConnectionWorker.addChannel(socketChannel);
   192         }
   193       }
   194     }
   195     else
   196     {
   197       // Should not happen
   198       Log.get().severe("Should not happen: " + selKey.toString());
   199     }
   200   }
   201   
   202 }