1.1 --- a/src/org/sonews/daemon/NNTPDaemon.java Sun Aug 29 17:43:58 2010 +0200
1.2 +++ b/src/org/sonews/daemon/NNTPDaemon.java Sun Aug 29 18:17:37 2010 +0200
1.3 @@ -40,158 +40,130 @@
1.4 public final class NNTPDaemon extends AbstractDaemon
1.5 {
1.6
1.7 - public static final Object RegisterGate = new Object();
1.8 -
1.9 - private static NNTPDaemon instance = null;
1.10 -
1.11 - public static synchronized NNTPDaemon createInstance(int port)
1.12 - {
1.13 - if(instance == null)
1.14 - {
1.15 - instance = new NNTPDaemon(port);
1.16 - return instance;
1.17 - }
1.18 - else
1.19 - {
1.20 - throw new RuntimeException("NNTPDaemon.createInstance() called twice");
1.21 - }
1.22 - }
1.23 -
1.24 - private int port;
1.25 -
1.26 - private NNTPDaemon(final int port)
1.27 - {
1.28 - Log.get().info("Server listening on port " + port);
1.29 - this.port = port;
1.30 - }
1.31 + public static final Object RegisterGate = new Object();
1.32 + private static NNTPDaemon instance = null;
1.33
1.34 - @Override
1.35 - public void run()
1.36 - {
1.37 - try
1.38 - {
1.39 - // Create a Selector that handles the SocketChannel multiplexing
1.40 - final Selector readSelector = Selector.open();
1.41 - final Selector writeSelector = Selector.open();
1.42 -
1.43 - // Start working threads
1.44 - final int workerThreads = Runtime.getRuntime().availableProcessors() * 4;
1.45 - ConnectionWorker[] cworkers = new ConnectionWorker[workerThreads];
1.46 - for(int n = 0; n < workerThreads; n++)
1.47 - {
1.48 - cworkers[n] = new ConnectionWorker();
1.49 - cworkers[n].start();
1.50 - }
1.51 -
1.52 - ChannelWriter.getInstance().setSelector(writeSelector);
1.53 - ChannelReader.getInstance().setSelector(readSelector);
1.54 - ChannelWriter.getInstance().start();
1.55 - ChannelReader.getInstance().start();
1.56 -
1.57 - final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
1.58 - serverSocketChannel.configureBlocking(true); // Set to blocking mode
1.59 -
1.60 - // Configure ServerSocket; bind to socket...
1.61 - final ServerSocket serverSocket = serverSocketChannel.socket();
1.62 - serverSocket.bind(new InetSocketAddress(this.port));
1.63 -
1.64 - while(isRunning())
1.65 - {
1.66 - SocketChannel socketChannel;
1.67 -
1.68 - try
1.69 - {
1.70 - // As we set the server socket channel to blocking mode the accept()
1.71 - // method will block.
1.72 - socketChannel = serverSocketChannel.accept();
1.73 - socketChannel.configureBlocking(false);
1.74 - assert socketChannel.isConnected();
1.75 - assert socketChannel.finishConnect();
1.76 - }
1.77 - catch(IOException ex)
1.78 - {
1.79 - // Under heavy load an IOException "Too many open files may
1.80 - // be thrown. It most cases we should slow down the connection
1.81 - // accepting, to give the worker threads some time to process work.
1.82 - Log.get().severe("IOException while accepting connection: " + ex.getMessage());
1.83 - Log.get().info("Connection accepting sleeping for seconds...");
1.84 - Thread.sleep(5000); // 5 seconds
1.85 - continue;
1.86 - }
1.87 -
1.88 - final NNTPConnection conn;
1.89 - try
1.90 - {
1.91 - conn = new NNTPConnection(socketChannel);
1.92 - Connections.getInstance().add(conn);
1.93 - }
1.94 - catch(IOException ex)
1.95 - {
1.96 - Log.get().warning(ex.toString());
1.97 - socketChannel.close();
1.98 - continue;
1.99 - }
1.100 -
1.101 - try
1.102 - {
1.103 - SelectionKey selKeyWrite =
1.104 - registerSelector(writeSelector, socketChannel, SelectionKey.OP_WRITE);
1.105 - registerSelector(readSelector, socketChannel, SelectionKey.OP_READ);
1.106 -
1.107 - Log.get().info("Connected: " + socketChannel.socket().getRemoteSocketAddress());
1.108 + public static synchronized NNTPDaemon createInstance(int port)
1.109 + {
1.110 + if (instance == null) {
1.111 + instance = new NNTPDaemon(port);
1.112 + return instance;
1.113 + } else {
1.114 + throw new RuntimeException("NNTPDaemon.createInstance() called twice");
1.115 + }
1.116 + }
1.117 + private int port;
1.118
1.119 - // Set write selection key and send hello to client
1.120 - conn.setWriteSelectionKey(selKeyWrite);
1.121 - conn.println("200 " + Config.inst().get(Config.HOSTNAME, "localhost")
1.122 - + " " + Main.VERSION + " news server ready - (posting ok).");
1.123 - }
1.124 - catch(CancelledKeyException cke)
1.125 - {
1.126 - Log.get().warning("CancelledKeyException " + cke.getMessage() + " was thrown: "
1.127 - + socketChannel.socket());
1.128 - }
1.129 - catch(ClosedChannelException cce)
1.130 - {
1.131 - Log.get().warning("ClosedChannelException " + cce.getMessage() + " was thrown: "
1.132 - + socketChannel.socket());
1.133 - }
1.134 - }
1.135 - }
1.136 - catch(BindException ex)
1.137 - {
1.138 - // Could not bind to socket; this is a fatal problem; so perform shutdown
1.139 - ex.printStackTrace();
1.140 - System.exit(1);
1.141 - }
1.142 - catch(IOException ex)
1.143 - {
1.144 - ex.printStackTrace();
1.145 - }
1.146 - catch(Exception ex)
1.147 - {
1.148 - ex.printStackTrace();
1.149 - }
1.150 - }
1.151 -
1.152 - public static SelectionKey registerSelector(final Selector selector,
1.153 - final SocketChannel channel, final int op)
1.154 - throws CancelledKeyException, ClosedChannelException
1.155 - {
1.156 - // Register the selector at the channel, so that it will be notified
1.157 - // on the socket's events
1.158 - synchronized(RegisterGate)
1.159 - {
1.160 - // Wakeup the currently blocking reader/writer thread; we have locked
1.161 - // the RegisterGate to prevent the awakened thread to block again
1.162 - selector.wakeup();
1.163 -
1.164 - // Lock the selector to prevent the waiting worker threads going into
1.165 - // selector.select() which would block the selector.
1.166 - synchronized (selector)
1.167 - {
1.168 - return channel.register(selector, op, null);
1.169 - }
1.170 - }
1.171 - }
1.172 -
1.173 + private NNTPDaemon(final int port)
1.174 + {
1.175 + Log.get().info("Server listening on port " + port);
1.176 + this.port = port;
1.177 + }
1.178 +
1.179 + @Override
1.180 + public void run()
1.181 + {
1.182 + try {
1.183 + // Create a Selector that handles the SocketChannel multiplexing
1.184 + final Selector readSelector = Selector.open();
1.185 + final Selector writeSelector = Selector.open();
1.186 +
1.187 + // Start working threads
1.188 + final int workerThreads = Runtime.getRuntime().availableProcessors() * 4;
1.189 + ConnectionWorker[] cworkers = new ConnectionWorker[workerThreads];
1.190 + for (int n = 0; n < workerThreads; n++) {
1.191 + cworkers[n] = new ConnectionWorker();
1.192 + cworkers[n].start();
1.193 + }
1.194 +
1.195 + ChannelWriter.getInstance().setSelector(writeSelector);
1.196 + ChannelReader.getInstance().setSelector(readSelector);
1.197 + ChannelWriter.getInstance().start();
1.198 + ChannelReader.getInstance().start();
1.199 +
1.200 + final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
1.201 + serverSocketChannel.configureBlocking(true); // Set to blocking mode
1.202 +
1.203 + // Configure ServerSocket; bind to socket...
1.204 + final ServerSocket serverSocket = serverSocketChannel.socket();
1.205 + serverSocket.bind(new InetSocketAddress(this.port));
1.206 +
1.207 + while (isRunning()) {
1.208 + SocketChannel socketChannel;
1.209 +
1.210 + try {
1.211 + // As we set the server socket channel to blocking mode the accept()
1.212 + // method will block.
1.213 + socketChannel = serverSocketChannel.accept();
1.214 + socketChannel.configureBlocking(false);
1.215 + assert socketChannel.isConnected();
1.216 + assert socketChannel.finishConnect();
1.217 + } catch (IOException ex) {
1.218 + // Under heavy load an IOException "Too many open files may
1.219 + // be thrown. It most cases we should slow down the connection
1.220 + // accepting, to give the worker threads some time to process work.
1.221 + Log.get().severe("IOException while accepting connection: " + ex.getMessage());
1.222 + Log.get().info("Connection accepting sleeping for seconds...");
1.223 + Thread.sleep(5000); // 5 seconds
1.224 + continue;
1.225 + }
1.226 +
1.227 + final NNTPConnection conn;
1.228 + try {
1.229 + conn = new NNTPConnection(socketChannel);
1.230 + Connections.getInstance().add(conn);
1.231 + } catch (IOException ex) {
1.232 + Log.get().warning(ex.toString());
1.233 + socketChannel.close();
1.234 + continue;
1.235 + }
1.236 +
1.237 + try {
1.238 + SelectionKey selKeyWrite =
1.239 + registerSelector(writeSelector, socketChannel, SelectionKey.OP_WRITE);
1.240 + registerSelector(readSelector, socketChannel, SelectionKey.OP_READ);
1.241 +
1.242 + Log.get().info("Connected: " + socketChannel.socket().getRemoteSocketAddress());
1.243 +
1.244 + // Set write selection key and send hello to client
1.245 + conn.setWriteSelectionKey(selKeyWrite);
1.246 + conn.println("200 " + Config.inst().get(Config.HOSTNAME, "localhost")
1.247 + + " " + Main.VERSION + " news server ready - (posting ok).");
1.248 + } catch (CancelledKeyException cke) {
1.249 + Log.get().warning("CancelledKeyException " + cke.getMessage() + " was thrown: "
1.250 + + socketChannel.socket());
1.251 + } catch (ClosedChannelException cce) {
1.252 + Log.get().warning("ClosedChannelException " + cce.getMessage() + " was thrown: "
1.253 + + socketChannel.socket());
1.254 + }
1.255 + }
1.256 + } catch (BindException ex) {
1.257 + // Could not bind to socket; this is a fatal problem; so perform shutdown
1.258 + ex.printStackTrace();
1.259 + System.exit(1);
1.260 + } catch (IOException ex) {
1.261 + ex.printStackTrace();
1.262 + } catch (Exception ex) {
1.263 + ex.printStackTrace();
1.264 + }
1.265 + }
1.266 +
1.267 + public static SelectionKey registerSelector(final Selector selector,
1.268 + final SocketChannel channel, final int op)
1.269 + throws CancelledKeyException, ClosedChannelException
1.270 + {
1.271 + // Register the selector at the channel, so that it will be notified
1.272 + // on the socket's events
1.273 + synchronized (RegisterGate) {
1.274 + // Wakeup the currently blocking reader/writer thread; we have locked
1.275 + // the RegisterGate to prevent the awakened thread to block again
1.276 + selector.wakeup();
1.277 +
1.278 + // Lock the selector to prevent the waiting worker threads going into
1.279 + // selector.select() which would block the selector.
1.280 + synchronized (selector) {
1.281 + return channel.register(selector, op, null);
1.282 + }
1.283 + }
1.284 + }
1.285 }