diff -r c404a87db5b7 -r 74139325d305 src/org/sonews/daemon/NNTPConnection.java --- a/src/org/sonews/daemon/NNTPConnection.java Sun Aug 29 17:43:58 2010 +0200 +++ b/src/org/sonews/daemon/NNTPConnection.java Sun Aug 29 18:17:37 2010 +0200 @@ -46,383 +46,341 @@ public final class NNTPConnection { - public static final String NEWLINE = "\r\n"; // RFC defines this as newline - public static final String MESSAGE_ID_PATTERN = "<[^>]+>"; - - private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon - - /** SocketChannel is generally thread-safe */ - private SocketChannel channel = null; - private Charset charset = Charset.forName("UTF-8"); - private Command command = null; - private Article currentArticle = null; - private Channel currentGroup = null; - private volatile long lastActivity = System.currentTimeMillis(); - private ChannelLineBuffers lineBuffers = new ChannelLineBuffers(); - private int readLock = 0; - private final Object readLockGate = new Object(); - private SelectionKey writeSelKey = null; - - public NNTPConnection(final SocketChannel channel) - throws IOException - { - if(channel == null) - { - throw new IllegalArgumentException("channel is null"); - } + public static final String NEWLINE = "\r\n"; // RFC defines this as newline + public static final String MESSAGE_ID_PATTERN = "<[^>]+>"; + private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon + /** SocketChannel is generally thread-safe */ + private SocketChannel channel = null; + private Charset charset = Charset.forName("UTF-8"); + private Command command = null; + private Article currentArticle = null; + private Channel currentGroup = null; + private volatile long lastActivity = System.currentTimeMillis(); + private ChannelLineBuffers lineBuffers = new ChannelLineBuffers(); + private int readLock = 0; + private final Object readLockGate = new Object(); + private SelectionKey writeSelKey = null; - this.channel = channel; - Stats.getInstance().clientConnect(); - } - - /** - * Tries to get the read lock for this NNTPConnection. This method is Thread- - * safe and returns true of the read lock was successfully set. If the lock - * is still hold by another Thread the method returns false. - */ - boolean tryReadLock() - { - // As synchronizing simple types may cause deadlocks, - // we use a gate object. - synchronized(readLockGate) - { - if(readLock != 0) - { - return false; - } - else - { - readLock = Thread.currentThread().hashCode(); - return true; - } - } - } - - /** - * Releases the read lock in a Thread-safe way. - * @throws IllegalMonitorStateException if a Thread not holding the lock - * tries to release it. - */ - void unlockReadLock() - { - synchronized(readLockGate) - { - if(readLock == Thread.currentThread().hashCode()) - { - readLock = 0; - } - else - { - throw new IllegalMonitorStateException(); - } - } - } - - /** - * @return Current input buffer of this NNTPConnection instance. - */ - public ByteBuffer getInputBuffer() - { - return this.lineBuffers.getInputBuffer(); - } - - /** - * @return Output buffer of this NNTPConnection which has at least one byte - * free storage. - */ - public ByteBuffer getOutputBuffer() - { - return this.lineBuffers.getOutputBuffer(); - } - - /** - * @return ChannelLineBuffers instance associated with this NNTPConnection. - */ - public ChannelLineBuffers getBuffers() - { - return this.lineBuffers; - } - - /** - * @return true if this connection comes from a local remote address. - */ - public boolean isLocalConnection() - { - return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress()) - .getHostName().equalsIgnoreCase("localhost"); - } + public NNTPConnection(final SocketChannel channel) + throws IOException + { + if (channel == null) { + throw new IllegalArgumentException("channel is null"); + } - void setWriteSelectionKey(SelectionKey selKey) - { - this.writeSelKey = selKey; - } + this.channel = channel; + Stats.getInstance().clientConnect(); + } - public void shutdownInput() - { - try - { - // Closes the input line of the channel's socket, so no new data - // will be received and a timeout can be triggered. - this.channel.socket().shutdownInput(); - } - catch(IOException ex) - { - Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex); - } - } - - public void shutdownOutput() - { - cancelTimer.schedule(new TimerTask() - { - @Override - public void run() - { - try - { - // Closes the output line of the channel's socket. - channel.socket().shutdownOutput(); - channel.close(); - } - catch(SocketException ex) - { - // Socket was already disconnected - Log.get().info("NNTPConnection.shutdownOutput(): " + ex); - } - catch(Exception ex) - { - Log.get().warning("NNTPConnection.shutdownOutput(): " + ex); - } - } - }, 3000); - } - - public SocketChannel getSocketChannel() - { - return this.channel; - } - - public Article getCurrentArticle() - { - return this.currentArticle; - } - - public Charset getCurrentCharset() - { - return this.charset; - } + /** + * Tries to get the read lock for this NNTPConnection. This method is Thread- + * safe and returns true of the read lock was successfully set. If the lock + * is still hold by another Thread the method returns false. + */ + boolean tryReadLock() + { + // As synchronizing simple types may cause deadlocks, + // we use a gate object. + synchronized (readLockGate) { + if (readLock != 0) { + return false; + } else { + readLock = Thread.currentThread().hashCode(); + return true; + } + } + } - /** - * @return The currently selected communication channel (not SocketChannel) - */ - public Channel getCurrentChannel() - { - return this.currentGroup; - } - - public void setCurrentArticle(final Article article) - { - this.currentArticle = article; - } - - public void setCurrentGroup(final Channel group) - { - this.currentGroup = group; - } - - public long getLastActivity() - { - return this.lastActivity; - } - - /** - * Due to the readLockGate there is no need to synchronize this method. - * @param raw - * @throws IllegalArgumentException if raw is null. - * @throws IllegalStateException if calling thread does not own the readLock. - */ - void lineReceived(byte[] raw) - { - if(raw == null) - { - throw new IllegalArgumentException("raw is null"); - } - - if(readLock == 0 || readLock != Thread.currentThread().hashCode()) - { - throw new IllegalStateException("readLock not properly set"); - } + /** + * Releases the read lock in a Thread-safe way. + * @throws IllegalMonitorStateException if a Thread not holding the lock + * tries to release it. + */ + void unlockReadLock() + { + synchronized (readLockGate) { + if (readLock == Thread.currentThread().hashCode()) { + readLock = 0; + } else { + throw new IllegalMonitorStateException(); + } + } + } - this.lastActivity = System.currentTimeMillis(); - - String line = new String(raw, this.charset); - - // There might be a trailing \r, but trim() is a bad idea - // as it removes also leading spaces from long header lines. - if(line.endsWith("\r")) - { - line = line.substring(0, line.length() - 1); - raw = Arrays.copyOf(raw, raw.length - 1); - } - - Log.get().fine("<< " + line); - - if(command == null) - { - command = parseCommandLine(line); - assert command != null; - } + /** + * @return Current input buffer of this NNTPConnection instance. + */ + public ByteBuffer getInputBuffer() + { + return this.lineBuffers.getInputBuffer(); + } - try - { - // The command object will process the line we just received - try - { - command.processLine(this, line, raw); - } - catch(StorageBackendException ex) - { - Log.get().info("Retry command processing after StorageBackendException"); + /** + * @return Output buffer of this NNTPConnection which has at least one byte + * free storage. + */ + public ByteBuffer getOutputBuffer() + { + return this.lineBuffers.getOutputBuffer(); + } - // Try it a second time, so that the backend has time to recover - command.processLine(this, line, raw); - } - } - catch(ClosedChannelException ex0) - { - try - { - Log.get().info("Connection to " + channel.socket().getRemoteSocketAddress() - + " closed: " + ex0); - } - catch(Exception ex0a) - { - ex0a.printStackTrace(); - } - } - catch(Exception ex1) // This will catch a second StorageBackendException - { - try - { - command = null; - ex1.printStackTrace(); - println("500 Internal server error"); - } - catch(Exception ex2) - { - ex2.printStackTrace(); - } - } + /** + * @return ChannelLineBuffers instance associated with this NNTPConnection. + */ + public ChannelLineBuffers getBuffers() + { + return this.lineBuffers; + } - if(command == null || command.hasFinished()) - { - command = null; - charset = Charset.forName("UTF-8"); // Reset to default - } - } - - /** - * This method determines the fitting command processing class. - * @param line - * @return - */ - private Command parseCommandLine(String line) - { - String cmdStr = line.split(" ")[0]; - return CommandSelector.getInstance().get(cmdStr); - } - - /** - * Puts the given line into the output buffer, adds a newline character - * and returns. The method returns immediately and does not block until - * the line was sent. If line is longer than 510 octets it is split up in - * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE). - * @param line - */ - public void println(final CharSequence line, final Charset charset) - throws IOException - { - writeToChannel(CharBuffer.wrap(line), charset, line); - writeToChannel(CharBuffer.wrap(NEWLINE), charset, null); - } + /** + * @return true if this connection comes from a local remote address. + */ + public boolean isLocalConnection() + { + return ((InetSocketAddress) this.channel.socket().getRemoteSocketAddress()).getHostName().equalsIgnoreCase("localhost"); + } - /** - * Writes the given raw lines to the output buffers and finishes with - * a newline character (\r\n). - * @param rawLines - */ - public void println(final byte[] rawLines) - throws IOException - { - this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines)); - writeToChannel(CharBuffer.wrap(NEWLINE), charset, null); - } - - /** - * Encodes the given CharBuffer using the given Charset to a bunch of - * ByteBuffers (each 512 bytes large) and enqueues them for writing at the - * connected SocketChannel. - * @throws java.io.IOException - */ - private void writeToChannel(CharBuffer characters, final Charset charset, - CharSequence debugLine) - throws IOException - { - if(!charset.canEncode()) - { - Log.get().severe("FATAL: Charset " + charset + " cannot encode!"); - return; - } - - // Write characters to output buffers - LineEncoder lenc = new LineEncoder(characters, charset); - lenc.encode(lineBuffers); - - enableWriteEvents(debugLine); - } + void setWriteSelectionKey(SelectionKey selKey) + { + this.writeSelKey = selKey; + } - private void enableWriteEvents(CharSequence debugLine) - { - // Enable OP_WRITE events so that the buffers are processed - try - { - this.writeSelKey.interestOps(SelectionKey.OP_WRITE); - ChannelWriter.getInstance().getSelector().wakeup(); - } - catch(Exception ex) // CancelledKeyException and ChannelCloseException - { - Log.get().warning("NNTPConnection.writeToChannel(): " + ex); - return; - } + public void shutdownInput() + { + try { + // Closes the input line of the channel's socket, so no new data + // will be received and a timeout can be triggered. + this.channel.socket().shutdownInput(); + } catch (IOException ex) { + Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex); + } + } - // Update last activity timestamp - this.lastActivity = System.currentTimeMillis(); - if(debugLine != null) - { - Log.get().fine(">> " + debugLine); - } - } - - public void println(final CharSequence line) - throws IOException - { - println(line, charset); - } - - public void print(final String line) - throws IOException - { - writeToChannel(CharBuffer.wrap(line), charset, line); - } - - public void setCurrentCharset(final Charset charset) - { - this.charset = charset; - } + public void shutdownOutput() + { + cancelTimer.schedule(new TimerTask() + { - void setLastActivity(long timestamp) - { - this.lastActivity = timestamp; - } - + @Override + public void run() + { + try { + // Closes the output line of the channel's socket. + channel.socket().shutdownOutput(); + channel.close(); + } catch (SocketException ex) { + // Socket was already disconnected + Log.get().info("NNTPConnection.shutdownOutput(): " + ex); + } catch (Exception ex) { + Log.get().warning("NNTPConnection.shutdownOutput(): " + ex); + } + } + }, 3000); + } + + public SocketChannel getSocketChannel() + { + return this.channel; + } + + public Article getCurrentArticle() + { + return this.currentArticle; + } + + public Charset getCurrentCharset() + { + return this.charset; + } + + /** + * @return The currently selected communication channel (not SocketChannel) + */ + public Channel getCurrentChannel() + { + return this.currentGroup; + } + + public void setCurrentArticle(final Article article) + { + this.currentArticle = article; + } + + public void setCurrentGroup(final Channel group) + { + this.currentGroup = group; + } + + public long getLastActivity() + { + return this.lastActivity; + } + + /** + * Due to the readLockGate there is no need to synchronize this method. + * @param raw + * @throws IllegalArgumentException if raw is null. + * @throws IllegalStateException if calling thread does not own the readLock. + */ + void lineReceived(byte[] raw) + { + if (raw == null) { + throw new IllegalArgumentException("raw is null"); + } + + if (readLock == 0 || readLock != Thread.currentThread().hashCode()) { + throw new IllegalStateException("readLock not properly set"); + } + + this.lastActivity = System.currentTimeMillis(); + + String line = new String(raw, this.charset); + + // There might be a trailing \r, but trim() is a bad idea + // as it removes also leading spaces from long header lines. + if (line.endsWith("\r")) { + line = line.substring(0, line.length() - 1); + raw = Arrays.copyOf(raw, raw.length - 1); + } + + Log.get().fine("<< " + line); + + if (command == null) { + command = parseCommandLine(line); + assert command != null; + } + + try { + // The command object will process the line we just received + try { + command.processLine(this, line, raw); + } catch (StorageBackendException ex) { + Log.get().info("Retry command processing after StorageBackendException"); + + // Try it a second time, so that the backend has time to recover + command.processLine(this, line, raw); + } + } catch (ClosedChannelException ex0) { + try { + Log.get().info("Connection to " + channel.socket().getRemoteSocketAddress() + + " closed: " + ex0); + } catch (Exception ex0a) { + ex0a.printStackTrace(); + } + } catch (Exception ex1) // This will catch a second StorageBackendException + { + try { + command = null; + ex1.printStackTrace(); + println("500 Internal server error"); + } catch (Exception ex2) { + ex2.printStackTrace(); + } + } + + if (command == null || command.hasFinished()) { + command = null; + charset = Charset.forName("UTF-8"); // Reset to default + } + } + + /** + * This method determines the fitting command processing class. + * @param line + * @return + */ + private Command parseCommandLine(String line) + { + String cmdStr = line.split(" ")[0]; + return CommandSelector.getInstance().get(cmdStr); + } + + /** + * Puts the given line into the output buffer, adds a newline character + * and returns. The method returns immediately and does not block until + * the line was sent. If line is longer than 510 octets it is split up in + * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE). + * @param line + */ + public void println(final CharSequence line, final Charset charset) + throws IOException + { + writeToChannel(CharBuffer.wrap(line), charset, line); + writeToChannel(CharBuffer.wrap(NEWLINE), charset, null); + } + + /** + * Writes the given raw lines to the output buffers and finishes with + * a newline character (\r\n). + * @param rawLines + */ + public void println(final byte[] rawLines) + throws IOException + { + this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines)); + writeToChannel(CharBuffer.wrap(NEWLINE), charset, null); + } + + /** + * Encodes the given CharBuffer using the given Charset to a bunch of + * ByteBuffers (each 512 bytes large) and enqueues them for writing at the + * connected SocketChannel. + * @throws java.io.IOException + */ + private void writeToChannel(CharBuffer characters, final Charset charset, + CharSequence debugLine) + throws IOException + { + if (!charset.canEncode()) { + Log.get().severe("FATAL: Charset " + charset + " cannot encode!"); + return; + } + + // Write characters to output buffers + LineEncoder lenc = new LineEncoder(characters, charset); + lenc.encode(lineBuffers); + + enableWriteEvents(debugLine); + } + + private void enableWriteEvents(CharSequence debugLine) + { + // Enable OP_WRITE events so that the buffers are processed + try { + this.writeSelKey.interestOps(SelectionKey.OP_WRITE); + ChannelWriter.getInstance().getSelector().wakeup(); + } catch (Exception ex) // CancelledKeyException and ChannelCloseException + { + Log.get().warning("NNTPConnection.writeToChannel(): " + ex); + return; + } + + // Update last activity timestamp + this.lastActivity = System.currentTimeMillis(); + if (debugLine != null) { + Log.get().fine(">> " + debugLine); + } + } + + public void println(final CharSequence line) + throws IOException + { + println(line, charset); + } + + public void print(final String line) + throws IOException + { + writeToChannel(CharBuffer.wrap(line), charset, line); + } + + public void setCurrentCharset(final Charset charset) + { + this.charset = charset; + } + + void setLastActivity(long timestamp) + { + this.lastActivity = timestamp; + } }