org/sonews/daemon/ConnectionWorker.java
author cli
Fri, 21 Aug 2009 17:33:15 +0200
changeset 18 7e527fdf0fa8
parent 15 f2293e8566f5
permissions -rw-r--r--
Fix for #549.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import org.sonews.util.Log;
    22 import java.nio.ByteBuffer;
    23 import java.nio.channels.SocketChannel;
    24 import java.util.concurrent.ArrayBlockingQueue;
    25 
    26 /**
    27  * Does most of the work: parsing input, talking to client and Database.
    28  * @author Christian Lins
    29  * @since sonews/0.5.0
    30  */
    31 class ConnectionWorker extends AbstractDaemon
    32 {
    33 
    34   // 256 pending events should be enough
    35   private static ArrayBlockingQueue<SocketChannel> pendingChannels
    36     = new ArrayBlockingQueue<SocketChannel>(256, true);
    37   
    38   /**
    39    * Registers the given channel for further event processing.
    40    * @param channel
    41    */
    42   public static void addChannel(SocketChannel channel)
    43     throws InterruptedException
    44   {
    45     pendingChannels.put(channel);
    46   }
    47   
    48   /**
    49    * Processing loop.
    50    */
    51   @Override
    52   public void run()
    53   {
    54     while(isRunning())
    55     {
    56       try
    57       {
    58         // Retrieve and remove if available, otherwise wait.
    59         SocketChannel channel = pendingChannels.take();
    60 
    61         if(channel != null)
    62         {
    63           // Connections.getInstance().get() MAY return null
    64           NNTPConnection conn = Connections.getInstance().get(channel);
    65           
    66           // Try to lock the connection object
    67           if(conn != null && conn.tryReadLock())
    68           {
    69             ByteBuffer buf = conn.getBuffers().nextInputLine();
    70             while(buf != null) // Complete line was received
    71             {
    72               final byte[] line = new byte[buf.limit()];
    73               buf.get(line);
    74               ChannelLineBuffers.recycleBuffer(buf);
    75               
    76               // Here is the actual work done
    77               conn.lineReceived(line);
    78 
    79               // Read next line as we could have already received the next line
    80               buf = conn.getBuffers().nextInputLine();
    81             }
    82             conn.unlockReadLock();
    83           }
    84           else
    85           {
    86             addChannel(channel);
    87           }
    88         }
    89       }
    90       catch(InterruptedException ex)
    91       {
    92         Log.get().info("ConnectionWorker interrupted: " + ex);
    93       }
    94       catch(Exception ex)
    95       {
    96         Log.get().severe("Exception in ConnectionWorker: " + ex);
    97         ex.printStackTrace();
    98       }
    99     } // end while(isRunning())
   100   }
   101   
   102 }