1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/src/org/sonews/daemon/NNTPConnection.java Sun Aug 29 17:28:58 2010 +0200
1.3 @@ -0,0 +1,428 @@
1.4 +/*
1.5 + * SONEWS News Server
1.6 + * see AUTHORS for the list of contributors
1.7 + *
1.8 + * This program is free software: you can redistribute it and/or modify
1.9 + * it under the terms of the GNU General Public License as published by
1.10 + * the Free Software Foundation, either version 3 of the License, or
1.11 + * (at your option) any later version.
1.12 + *
1.13 + * This program is distributed in the hope that it will be useful,
1.14 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
1.15 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1.16 + * GNU General Public License for more details.
1.17 + *
1.18 + * You should have received a copy of the GNU General Public License
1.19 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
1.20 + */
1.21 +
1.22 +package org.sonews.daemon;
1.23 +
1.24 +import java.io.IOException;
1.25 +import java.net.InetSocketAddress;
1.26 +import java.net.SocketException;
1.27 +import java.nio.ByteBuffer;
1.28 +import java.nio.CharBuffer;
1.29 +import java.nio.channels.ClosedChannelException;
1.30 +import java.nio.channels.SelectionKey;
1.31 +import java.nio.channels.SocketChannel;
1.32 +import java.nio.charset.Charset;
1.33 +import java.util.Arrays;
1.34 +import java.util.Timer;
1.35 +import java.util.TimerTask;
1.36 +import org.sonews.daemon.command.Command;
1.37 +import org.sonews.storage.Article;
1.38 +import org.sonews.storage.Channel;
1.39 +import org.sonews.storage.StorageBackendException;
1.40 +import org.sonews.util.Log;
1.41 +import org.sonews.util.Stats;
1.42 +
1.43 +/**
1.44 + * For every SocketChannel (so TCP/IP connection) there is an instance of
1.45 + * this class.
1.46 + * @author Christian Lins
1.47 + * @since sonews/0.5.0
1.48 + */
1.49 +public final class NNTPConnection
1.50 +{
1.51 +
1.52 + public static final String NEWLINE = "\r\n"; // RFC defines this as newline
1.53 + public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
1.54 +
1.55 + private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
1.56 +
1.57 + /** SocketChannel is generally thread-safe */
1.58 + private SocketChannel channel = null;
1.59 + private Charset charset = Charset.forName("UTF-8");
1.60 + private Command command = null;
1.61 + private Article currentArticle = null;
1.62 + private Channel currentGroup = null;
1.63 + private volatile long lastActivity = System.currentTimeMillis();
1.64 + private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
1.65 + private int readLock = 0;
1.66 + private final Object readLockGate = new Object();
1.67 + private SelectionKey writeSelKey = null;
1.68 +
1.69 + public NNTPConnection(final SocketChannel channel)
1.70 + throws IOException
1.71 + {
1.72 + if(channel == null)
1.73 + {
1.74 + throw new IllegalArgumentException("channel is null");
1.75 + }
1.76 +
1.77 + this.channel = channel;
1.78 + Stats.getInstance().clientConnect();
1.79 + }
1.80 +
1.81 + /**
1.82 + * Tries to get the read lock for this NNTPConnection. This method is Thread-
1.83 + * safe and returns true of the read lock was successfully set. If the lock
1.84 + * is still hold by another Thread the method returns false.
1.85 + */
1.86 + boolean tryReadLock()
1.87 + {
1.88 + // As synchronizing simple types may cause deadlocks,
1.89 + // we use a gate object.
1.90 + synchronized(readLockGate)
1.91 + {
1.92 + if(readLock != 0)
1.93 + {
1.94 + return false;
1.95 + }
1.96 + else
1.97 + {
1.98 + readLock = Thread.currentThread().hashCode();
1.99 + return true;
1.100 + }
1.101 + }
1.102 + }
1.103 +
1.104 + /**
1.105 + * Releases the read lock in a Thread-safe way.
1.106 + * @throws IllegalMonitorStateException if a Thread not holding the lock
1.107 + * tries to release it.
1.108 + */
1.109 + void unlockReadLock()
1.110 + {
1.111 + synchronized(readLockGate)
1.112 + {
1.113 + if(readLock == Thread.currentThread().hashCode())
1.114 + {
1.115 + readLock = 0;
1.116 + }
1.117 + else
1.118 + {
1.119 + throw new IllegalMonitorStateException();
1.120 + }
1.121 + }
1.122 + }
1.123 +
1.124 + /**
1.125 + * @return Current input buffer of this NNTPConnection instance.
1.126 + */
1.127 + public ByteBuffer getInputBuffer()
1.128 + {
1.129 + return this.lineBuffers.getInputBuffer();
1.130 + }
1.131 +
1.132 + /**
1.133 + * @return Output buffer of this NNTPConnection which has at least one byte
1.134 + * free storage.
1.135 + */
1.136 + public ByteBuffer getOutputBuffer()
1.137 + {
1.138 + return this.lineBuffers.getOutputBuffer();
1.139 + }
1.140 +
1.141 + /**
1.142 + * @return ChannelLineBuffers instance associated with this NNTPConnection.
1.143 + */
1.144 + public ChannelLineBuffers getBuffers()
1.145 + {
1.146 + return this.lineBuffers;
1.147 + }
1.148 +
1.149 + /**
1.150 + * @return true if this connection comes from a local remote address.
1.151 + */
1.152 + public boolean isLocalConnection()
1.153 + {
1.154 + return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
1.155 + .getHostName().equalsIgnoreCase("localhost");
1.156 + }
1.157 +
1.158 + void setWriteSelectionKey(SelectionKey selKey)
1.159 + {
1.160 + this.writeSelKey = selKey;
1.161 + }
1.162 +
1.163 + public void shutdownInput()
1.164 + {
1.165 + try
1.166 + {
1.167 + // Closes the input line of the channel's socket, so no new data
1.168 + // will be received and a timeout can be triggered.
1.169 + this.channel.socket().shutdownInput();
1.170 + }
1.171 + catch(IOException ex)
1.172 + {
1.173 + Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
1.174 + }
1.175 + }
1.176 +
1.177 + public void shutdownOutput()
1.178 + {
1.179 + cancelTimer.schedule(new TimerTask()
1.180 + {
1.181 + @Override
1.182 + public void run()
1.183 + {
1.184 + try
1.185 + {
1.186 + // Closes the output line of the channel's socket.
1.187 + channel.socket().shutdownOutput();
1.188 + channel.close();
1.189 + }
1.190 + catch(SocketException ex)
1.191 + {
1.192 + // Socket was already disconnected
1.193 + Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
1.194 + }
1.195 + catch(Exception ex)
1.196 + {
1.197 + Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
1.198 + }
1.199 + }
1.200 + }, 3000);
1.201 + }
1.202 +
1.203 + public SocketChannel getSocketChannel()
1.204 + {
1.205 + return this.channel;
1.206 + }
1.207 +
1.208 + public Article getCurrentArticle()
1.209 + {
1.210 + return this.currentArticle;
1.211 + }
1.212 +
1.213 + public Charset getCurrentCharset()
1.214 + {
1.215 + return this.charset;
1.216 + }
1.217 +
1.218 + /**
1.219 + * @return The currently selected communication channel (not SocketChannel)
1.220 + */
1.221 + public Channel getCurrentChannel()
1.222 + {
1.223 + return this.currentGroup;
1.224 + }
1.225 +
1.226 + public void setCurrentArticle(final Article article)
1.227 + {
1.228 + this.currentArticle = article;
1.229 + }
1.230 +
1.231 + public void setCurrentGroup(final Channel group)
1.232 + {
1.233 + this.currentGroup = group;
1.234 + }
1.235 +
1.236 + public long getLastActivity()
1.237 + {
1.238 + return this.lastActivity;
1.239 + }
1.240 +
1.241 + /**
1.242 + * Due to the readLockGate there is no need to synchronize this method.
1.243 + * @param raw
1.244 + * @throws IllegalArgumentException if raw is null.
1.245 + * @throws IllegalStateException if calling thread does not own the readLock.
1.246 + */
1.247 + void lineReceived(byte[] raw)
1.248 + {
1.249 + if(raw == null)
1.250 + {
1.251 + throw new IllegalArgumentException("raw is null");
1.252 + }
1.253 +
1.254 + if(readLock == 0 || readLock != Thread.currentThread().hashCode())
1.255 + {
1.256 + throw new IllegalStateException("readLock not properly set");
1.257 + }
1.258 +
1.259 + this.lastActivity = System.currentTimeMillis();
1.260 +
1.261 + String line = new String(raw, this.charset);
1.262 +
1.263 + // There might be a trailing \r, but trim() is a bad idea
1.264 + // as it removes also leading spaces from long header lines.
1.265 + if(line.endsWith("\r"))
1.266 + {
1.267 + line = line.substring(0, line.length() - 1);
1.268 + raw = Arrays.copyOf(raw, raw.length - 1);
1.269 + }
1.270 +
1.271 + Log.get().fine("<< " + line);
1.272 +
1.273 + if(command == null)
1.274 + {
1.275 + command = parseCommandLine(line);
1.276 + assert command != null;
1.277 + }
1.278 +
1.279 + try
1.280 + {
1.281 + // The command object will process the line we just received
1.282 + try
1.283 + {
1.284 + command.processLine(this, line, raw);
1.285 + }
1.286 + catch(StorageBackendException ex)
1.287 + {
1.288 + Log.get().info("Retry command processing after StorageBackendException");
1.289 +
1.290 + // Try it a second time, so that the backend has time to recover
1.291 + command.processLine(this, line, raw);
1.292 + }
1.293 + }
1.294 + catch(ClosedChannelException ex0)
1.295 + {
1.296 + try
1.297 + {
1.298 + Log.get().info("Connection to " + channel.socket().getRemoteSocketAddress()
1.299 + + " closed: " + ex0);
1.300 + }
1.301 + catch(Exception ex0a)
1.302 + {
1.303 + ex0a.printStackTrace();
1.304 + }
1.305 + }
1.306 + catch(Exception ex1) // This will catch a second StorageBackendException
1.307 + {
1.308 + try
1.309 + {
1.310 + command = null;
1.311 + ex1.printStackTrace();
1.312 + println("500 Internal server error");
1.313 + }
1.314 + catch(Exception ex2)
1.315 + {
1.316 + ex2.printStackTrace();
1.317 + }
1.318 + }
1.319 +
1.320 + if(command == null || command.hasFinished())
1.321 + {
1.322 + command = null;
1.323 + charset = Charset.forName("UTF-8"); // Reset to default
1.324 + }
1.325 + }
1.326 +
1.327 + /**
1.328 + * This method determines the fitting command processing class.
1.329 + * @param line
1.330 + * @return
1.331 + */
1.332 + private Command parseCommandLine(String line)
1.333 + {
1.334 + String cmdStr = line.split(" ")[0];
1.335 + return CommandSelector.getInstance().get(cmdStr);
1.336 + }
1.337 +
1.338 + /**
1.339 + * Puts the given line into the output buffer, adds a newline character
1.340 + * and returns. The method returns immediately and does not block until
1.341 + * the line was sent. If line is longer than 510 octets it is split up in
1.342 + * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
1.343 + * @param line
1.344 + */
1.345 + public void println(final CharSequence line, final Charset charset)
1.346 + throws IOException
1.347 + {
1.348 + writeToChannel(CharBuffer.wrap(line), charset, line);
1.349 + writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
1.350 + }
1.351 +
1.352 + /**
1.353 + * Writes the given raw lines to the output buffers and finishes with
1.354 + * a newline character (\r\n).
1.355 + * @param rawLines
1.356 + */
1.357 + public void println(final byte[] rawLines)
1.358 + throws IOException
1.359 + {
1.360 + this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
1.361 + writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
1.362 + }
1.363 +
1.364 + /**
1.365 + * Encodes the given CharBuffer using the given Charset to a bunch of
1.366 + * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
1.367 + * connected SocketChannel.
1.368 + * @throws java.io.IOException
1.369 + */
1.370 + private void writeToChannel(CharBuffer characters, final Charset charset,
1.371 + CharSequence debugLine)
1.372 + throws IOException
1.373 + {
1.374 + if(!charset.canEncode())
1.375 + {
1.376 + Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
1.377 + return;
1.378 + }
1.379 +
1.380 + // Write characters to output buffers
1.381 + LineEncoder lenc = new LineEncoder(characters, charset);
1.382 + lenc.encode(lineBuffers);
1.383 +
1.384 + enableWriteEvents(debugLine);
1.385 + }
1.386 +
1.387 + private void enableWriteEvents(CharSequence debugLine)
1.388 + {
1.389 + // Enable OP_WRITE events so that the buffers are processed
1.390 + try
1.391 + {
1.392 + this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
1.393 + ChannelWriter.getInstance().getSelector().wakeup();
1.394 + }
1.395 + catch(Exception ex) // CancelledKeyException and ChannelCloseException
1.396 + {
1.397 + Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
1.398 + return;
1.399 + }
1.400 +
1.401 + // Update last activity timestamp
1.402 + this.lastActivity = System.currentTimeMillis();
1.403 + if(debugLine != null)
1.404 + {
1.405 + Log.get().fine(">> " + debugLine);
1.406 + }
1.407 + }
1.408 +
1.409 + public void println(final CharSequence line)
1.410 + throws IOException
1.411 + {
1.412 + println(line, charset);
1.413 + }
1.414 +
1.415 + public void print(final String line)
1.416 + throws IOException
1.417 + {
1.418 + writeToChannel(CharBuffer.wrap(line), charset, line);
1.419 + }
1.420 +
1.421 + public void setCurrentCharset(final Charset charset)
1.422 + {
1.423 + this.charset = charset;
1.424 + }
1.425 +
1.426 + void setLastActivity(long timestamp)
1.427 + {
1.428 + this.lastActivity = timestamp;
1.429 + }
1.430 +
1.431 +}