src/org/sonews/daemon/NNTPDaemon.java
changeset 37 74139325d305
parent 35 ed84c8bdd87b
     1.1 --- a/src/org/sonews/daemon/NNTPDaemon.java	Sun Aug 29 17:43:58 2010 +0200
     1.2 +++ b/src/org/sonews/daemon/NNTPDaemon.java	Sun Aug 29 18:17:37 2010 +0200
     1.3 @@ -40,158 +40,130 @@
     1.4  public final class NNTPDaemon extends AbstractDaemon
     1.5  {
     1.6  
     1.7 -  public static final Object RegisterGate = new Object();
     1.8 -  
     1.9 -  private static NNTPDaemon instance = null;
    1.10 -  
    1.11 -  public static synchronized NNTPDaemon createInstance(int port)
    1.12 -  {
    1.13 -    if(instance == null)
    1.14 -    {
    1.15 -      instance = new NNTPDaemon(port);
    1.16 -      return instance;
    1.17 -    }
    1.18 -    else
    1.19 -    {
    1.20 -      throw new RuntimeException("NNTPDaemon.createInstance() called twice");
    1.21 -    }
    1.22 -  }
    1.23 -  
    1.24 -  private int port;
    1.25 -  
    1.26 -  private NNTPDaemon(final int port)
    1.27 -  {
    1.28 -    Log.get().info("Server listening on port " + port);
    1.29 -    this.port = port;
    1.30 -  }
    1.31 +	public static final Object RegisterGate = new Object();
    1.32 +	private static NNTPDaemon instance = null;
    1.33  
    1.34 -  @Override
    1.35 -  public void run()
    1.36 -  {
    1.37 -    try
    1.38 -    {
    1.39 -      // Create a Selector that handles the SocketChannel multiplexing
    1.40 -      final Selector readSelector  = Selector.open();
    1.41 -      final Selector writeSelector = Selector.open();
    1.42 -      
    1.43 -      // Start working threads
    1.44 -      final int workerThreads = Runtime.getRuntime().availableProcessors() * 4;
    1.45 -      ConnectionWorker[] cworkers = new ConnectionWorker[workerThreads];
    1.46 -      for(int n = 0; n < workerThreads; n++)
    1.47 -      {
    1.48 -        cworkers[n] = new ConnectionWorker();
    1.49 -        cworkers[n].start();
    1.50 -      }
    1.51 -      
    1.52 -      ChannelWriter.getInstance().setSelector(writeSelector);
    1.53 -      ChannelReader.getInstance().setSelector(readSelector);
    1.54 -      ChannelWriter.getInstance().start();
    1.55 -      ChannelReader.getInstance().start();
    1.56 -      
    1.57 -      final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    1.58 -      serverSocketChannel.configureBlocking(true);  // Set to blocking mode
    1.59 -      
    1.60 -      // Configure ServerSocket; bind to socket...
    1.61 -      final ServerSocket serverSocket = serverSocketChannel.socket();
    1.62 -      serverSocket.bind(new InetSocketAddress(this.port));
    1.63 -      
    1.64 -      while(isRunning())
    1.65 -      {
    1.66 -        SocketChannel socketChannel;
    1.67 -        
    1.68 -        try
    1.69 -        {
    1.70 -          // As we set the server socket channel to blocking mode the accept()
    1.71 -          // method will block.
    1.72 -          socketChannel = serverSocketChannel.accept();
    1.73 -          socketChannel.configureBlocking(false);
    1.74 -          assert socketChannel.isConnected();
    1.75 -          assert socketChannel.finishConnect();
    1.76 -        }
    1.77 -        catch(IOException ex)
    1.78 -        {
    1.79 -          // Under heavy load an IOException "Too many open files may
    1.80 -          // be thrown. It most cases we should slow down the connection
    1.81 -          // accepting, to give the worker threads some time to process work.
    1.82 -          Log.get().severe("IOException while accepting connection: " + ex.getMessage());
    1.83 -          Log.get().info("Connection accepting sleeping for seconds...");
    1.84 -          Thread.sleep(5000); // 5 seconds
    1.85 -          continue;
    1.86 -        }
    1.87 -        
    1.88 -        final NNTPConnection conn;
    1.89 -        try
    1.90 -        {
    1.91 -          conn = new NNTPConnection(socketChannel);
    1.92 -          Connections.getInstance().add(conn);
    1.93 -        }
    1.94 -        catch(IOException ex)
    1.95 -        {
    1.96 -          Log.get().warning(ex.toString());
    1.97 -          socketChannel.close();
    1.98 -          continue;
    1.99 -        }
   1.100 -        
   1.101 -        try
   1.102 -        {
   1.103 -          SelectionKey selKeyWrite =
   1.104 -            registerSelector(writeSelector, socketChannel, SelectionKey.OP_WRITE);
   1.105 -          registerSelector(readSelector, socketChannel, SelectionKey.OP_READ);
   1.106 -          
   1.107 -          Log.get().info("Connected: " + socketChannel.socket().getRemoteSocketAddress());
   1.108 +	public static synchronized NNTPDaemon createInstance(int port)
   1.109 +	{
   1.110 +		if (instance == null) {
   1.111 +			instance = new NNTPDaemon(port);
   1.112 +			return instance;
   1.113 +		} else {
   1.114 +			throw new RuntimeException("NNTPDaemon.createInstance() called twice");
   1.115 +		}
   1.116 +	}
   1.117 +	private int port;
   1.118  
   1.119 -          // Set write selection key and send hello to client
   1.120 -          conn.setWriteSelectionKey(selKeyWrite);
   1.121 -          conn.println("200 " + Config.inst().get(Config.HOSTNAME, "localhost")
   1.122 -              + " " + Main.VERSION + " news server ready - (posting ok).");
   1.123 -        }
   1.124 -        catch(CancelledKeyException cke)
   1.125 -        {
   1.126 -          Log.get().warning("CancelledKeyException " + cke.getMessage() + " was thrown: "
   1.127 -            + socketChannel.socket());
   1.128 -        }
   1.129 -        catch(ClosedChannelException cce)
   1.130 -        {
   1.131 -          Log.get().warning("ClosedChannelException " + cce.getMessage() + " was thrown: "
   1.132 -            + socketChannel.socket());
   1.133 -        }
   1.134 -      }
   1.135 -    }
   1.136 -    catch(BindException ex)
   1.137 -    {
   1.138 -      // Could not bind to socket; this is a fatal problem; so perform shutdown
   1.139 -      ex.printStackTrace();
   1.140 -      System.exit(1);
   1.141 -    }
   1.142 -    catch(IOException ex)
   1.143 -    {
   1.144 -      ex.printStackTrace();
   1.145 -    }
   1.146 -    catch(Exception ex)
   1.147 -    {
   1.148 -      ex.printStackTrace();
   1.149 -    }
   1.150 -  }
   1.151 -  
   1.152 -  public static SelectionKey registerSelector(final Selector selector,
   1.153 -    final SocketChannel channel, final int op)
   1.154 -    throws CancelledKeyException, ClosedChannelException
   1.155 -  {
   1.156 -    // Register the selector at the channel, so that it will be notified
   1.157 -    // on the socket's events
   1.158 -    synchronized(RegisterGate)
   1.159 -    {
   1.160 -      // Wakeup the currently blocking reader/writer thread; we have locked
   1.161 -      // the RegisterGate to prevent the awakened thread to block again
   1.162 -      selector.wakeup();
   1.163 -      
   1.164 -      // Lock the selector to prevent the waiting worker threads going into
   1.165 -      // selector.select() which would block the selector.
   1.166 -      synchronized (selector)
   1.167 -      {
   1.168 -        return channel.register(selector, op, null);
   1.169 -      }
   1.170 -    }
   1.171 -  }
   1.172 -  
   1.173 +	private NNTPDaemon(final int port)
   1.174 +	{
   1.175 +		Log.get().info("Server listening on port " + port);
   1.176 +		this.port = port;
   1.177 +	}
   1.178 +
   1.179 +	@Override
   1.180 +	public void run()
   1.181 +	{
   1.182 +		try {
   1.183 +			// Create a Selector that handles the SocketChannel multiplexing
   1.184 +			final Selector readSelector = Selector.open();
   1.185 +			final Selector writeSelector = Selector.open();
   1.186 +
   1.187 +			// Start working threads
   1.188 +			final int workerThreads = Runtime.getRuntime().availableProcessors() * 4;
   1.189 +			ConnectionWorker[] cworkers = new ConnectionWorker[workerThreads];
   1.190 +			for (int n = 0; n < workerThreads; n++) {
   1.191 +				cworkers[n] = new ConnectionWorker();
   1.192 +				cworkers[n].start();
   1.193 +			}
   1.194 +
   1.195 +			ChannelWriter.getInstance().setSelector(writeSelector);
   1.196 +			ChannelReader.getInstance().setSelector(readSelector);
   1.197 +			ChannelWriter.getInstance().start();
   1.198 +			ChannelReader.getInstance().start();
   1.199 +
   1.200 +			final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
   1.201 +			serverSocketChannel.configureBlocking(true);  // Set to blocking mode
   1.202 +
   1.203 +			// Configure ServerSocket; bind to socket...
   1.204 +			final ServerSocket serverSocket = serverSocketChannel.socket();
   1.205 +			serverSocket.bind(new InetSocketAddress(this.port));
   1.206 +
   1.207 +			while (isRunning()) {
   1.208 +				SocketChannel socketChannel;
   1.209 +
   1.210 +				try {
   1.211 +					// As we set the server socket channel to blocking mode the accept()
   1.212 +					// method will block.
   1.213 +					socketChannel = serverSocketChannel.accept();
   1.214 +					socketChannel.configureBlocking(false);
   1.215 +					assert socketChannel.isConnected();
   1.216 +					assert socketChannel.finishConnect();
   1.217 +				} catch (IOException ex) {
   1.218 +					// Under heavy load an IOException "Too many open files may
   1.219 +					// be thrown. It most cases we should slow down the connection
   1.220 +					// accepting, to give the worker threads some time to process work.
   1.221 +					Log.get().severe("IOException while accepting connection: " + ex.getMessage());
   1.222 +					Log.get().info("Connection accepting sleeping for seconds...");
   1.223 +					Thread.sleep(5000); // 5 seconds
   1.224 +					continue;
   1.225 +				}
   1.226 +
   1.227 +				final NNTPConnection conn;
   1.228 +				try {
   1.229 +					conn = new NNTPConnection(socketChannel);
   1.230 +					Connections.getInstance().add(conn);
   1.231 +				} catch (IOException ex) {
   1.232 +					Log.get().warning(ex.toString());
   1.233 +					socketChannel.close();
   1.234 +					continue;
   1.235 +				}
   1.236 +
   1.237 +				try {
   1.238 +					SelectionKey selKeyWrite =
   1.239 +						registerSelector(writeSelector, socketChannel, SelectionKey.OP_WRITE);
   1.240 +					registerSelector(readSelector, socketChannel, SelectionKey.OP_READ);
   1.241 +
   1.242 +					Log.get().info("Connected: " + socketChannel.socket().getRemoteSocketAddress());
   1.243 +
   1.244 +					// Set write selection key and send hello to client
   1.245 +					conn.setWriteSelectionKey(selKeyWrite);
   1.246 +					conn.println("200 " + Config.inst().get(Config.HOSTNAME, "localhost")
   1.247 +						+ " " + Main.VERSION + " news server ready - (posting ok).");
   1.248 +				} catch (CancelledKeyException cke) {
   1.249 +					Log.get().warning("CancelledKeyException " + cke.getMessage() + " was thrown: "
   1.250 +						+ socketChannel.socket());
   1.251 +				} catch (ClosedChannelException cce) {
   1.252 +					Log.get().warning("ClosedChannelException " + cce.getMessage() + " was thrown: "
   1.253 +						+ socketChannel.socket());
   1.254 +				}
   1.255 +			}
   1.256 +		} catch (BindException ex) {
   1.257 +			// Could not bind to socket; this is a fatal problem; so perform shutdown
   1.258 +			ex.printStackTrace();
   1.259 +			System.exit(1);
   1.260 +		} catch (IOException ex) {
   1.261 +			ex.printStackTrace();
   1.262 +		} catch (Exception ex) {
   1.263 +			ex.printStackTrace();
   1.264 +		}
   1.265 +	}
   1.266 +
   1.267 +	public static SelectionKey registerSelector(final Selector selector,
   1.268 +		final SocketChannel channel, final int op)
   1.269 +		throws CancelledKeyException, ClosedChannelException
   1.270 +	{
   1.271 +		// Register the selector at the channel, so that it will be notified
   1.272 +		// on the socket's events
   1.273 +		synchronized (RegisterGate) {
   1.274 +			// Wakeup the currently blocking reader/writer thread; we have locked
   1.275 +			// the RegisterGate to prevent the awakened thread to block again
   1.276 +			selector.wakeup();
   1.277 +
   1.278 +			// Lock the selector to prevent the waiting worker threads going into
   1.279 +			// selector.select() which would block the selector.
   1.280 +			synchronized (selector) {
   1.281 +				return channel.register(selector, op, null);
   1.282 +			}
   1.283 +		}
   1.284 +	}
   1.285  }