1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/org/sonews/daemon/NNTPConnection.java Fri Jun 26 16:48:50 2009 +0200
1.3 @@ -0,0 +1,480 @@
1.4 +/*
1.5 + * SONEWS News Server
1.6 + * see AUTHORS for the list of contributors
1.7 + *
1.8 + * This program is free software: you can redistribute it and/or modify
1.9 + * it under the terms of the GNU General Public License as published by
1.10 + * the Free Software Foundation, either version 3 of the License, or
1.11 + * (at your option) any later version.
1.12 + *
1.13 + * This program is distributed in the hope that it will be useful,
1.14 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
1.15 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1.16 + * GNU General Public License for more details.
1.17 + *
1.18 + * You should have received a copy of the GNU General Public License
1.19 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
1.20 + */
1.21 +
1.22 +package org.sonews.daemon;
1.23 +
1.24 +import org.sonews.util.Log;
1.25 +import java.io.IOException;
1.26 +import java.net.InetSocketAddress;
1.27 +import java.nio.ByteBuffer;
1.28 +import java.nio.CharBuffer;
1.29 +import java.nio.channels.ClosedChannelException;
1.30 +import java.nio.channels.SelectionKey;
1.31 +import java.nio.channels.SocketChannel;
1.32 +import java.nio.charset.Charset;
1.33 +import java.util.Timer;
1.34 +import java.util.TimerTask;
1.35 +import org.sonews.daemon.command.ArticleCommand;
1.36 +import org.sonews.daemon.command.CapabilitiesCommand;
1.37 +import org.sonews.daemon.command.AbstractCommand;
1.38 +import org.sonews.daemon.command.GroupCommand;
1.39 +import org.sonews.daemon.command.HelpCommand;
1.40 +import org.sonews.daemon.command.ListCommand;
1.41 +import org.sonews.daemon.command.ListGroupCommand;
1.42 +import org.sonews.daemon.command.ModeReaderCommand;
1.43 +import org.sonews.daemon.command.NewGroupsCommand;
1.44 +import org.sonews.daemon.command.NextPrevCommand;
1.45 +import org.sonews.daemon.command.OverCommand;
1.46 +import org.sonews.daemon.command.PostCommand;
1.47 +import org.sonews.daemon.command.QuitCommand;
1.48 +import org.sonews.daemon.command.StatCommand;
1.49 +import org.sonews.daemon.command.UnsupportedCommand;
1.50 +import org.sonews.daemon.command.XDaemonCommand;
1.51 +import org.sonews.daemon.command.XPatCommand;
1.52 +import org.sonews.daemon.storage.Article;
1.53 +import org.sonews.daemon.storage.Group;
1.54 +import org.sonews.util.Stats;
1.55 +
1.56 +/**
1.57 + * For every SocketChannel (so TCP/IP connection) there is an instance of
1.58 + * this class.
1.59 + * @author Christian Lins
1.60 + * @since sonews/0.5.0
1.61 + */
1.62 +public final class NNTPConnection
1.63 +{
1.64 +
1.65 + public static final String NEWLINE = "\r\n"; // RFC defines this as newline
1.66 + public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
1.67 +
1.68 + private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
1.69 +
1.70 + /** SocketChannel is generally thread-safe */
1.71 + private SocketChannel channel = null;
1.72 + private Charset charset = Charset.forName("UTF-8");
1.73 + private AbstractCommand command = null;
1.74 + private Article currentArticle = null;
1.75 + private Group currentGroup = null;
1.76 + private volatile long lastActivity = System.currentTimeMillis();
1.77 + private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
1.78 + private int readLock = 0;
1.79 + private final Object readLockGate = new Object();
1.80 + private SelectionKey writeSelKey = null;
1.81 +
1.82 + public NNTPConnection(final SocketChannel channel)
1.83 + throws IOException
1.84 + {
1.85 + if(channel == null)
1.86 + {
1.87 + throw new IllegalArgumentException("channel is null");
1.88 + }
1.89 +
1.90 + this.channel = channel;
1.91 + Stats.getInstance().clientConnect();
1.92 + }
1.93 +
1.94 + /**
1.95 + * Tries to get the read lock for this NNTPConnection. This method is Thread-
1.96 + * safe and returns true of the read lock was successfully set. If the lock
1.97 + * is still hold by another Thread the method returns false.
1.98 + */
1.99 + boolean tryReadLock()
1.100 + {
1.101 + // As synchronizing simple types may cause deadlocks,
1.102 + // we use a gate object.
1.103 + synchronized(readLockGate)
1.104 + {
1.105 + if(readLock != 0)
1.106 + {
1.107 + return false;
1.108 + }
1.109 + else
1.110 + {
1.111 + readLock = Thread.currentThread().hashCode();
1.112 + return true;
1.113 + }
1.114 + }
1.115 + }
1.116 +
1.117 + /**
1.118 + * Releases the read lock in a Thread-safe way.
1.119 + * @throws IllegalMonitorStateException if a Thread not holding the lock
1.120 + * tries to release it.
1.121 + */
1.122 + void unlockReadLock()
1.123 + {
1.124 + synchronized(readLockGate)
1.125 + {
1.126 + if(readLock == Thread.currentThread().hashCode())
1.127 + {
1.128 + readLock = 0;
1.129 + }
1.130 + else
1.131 + {
1.132 + throw new IllegalMonitorStateException();
1.133 + }
1.134 + }
1.135 + }
1.136 +
1.137 + /**
1.138 + * @return Current input buffer of this NNTPConnection instance.
1.139 + */
1.140 + public ByteBuffer getInputBuffer()
1.141 + {
1.142 + return this.lineBuffers.getInputBuffer();
1.143 + }
1.144 +
1.145 + /**
1.146 + * @return Output buffer of this NNTPConnection which has at least one byte
1.147 + * free storage.
1.148 + */
1.149 + public ByteBuffer getOutputBuffer()
1.150 + {
1.151 + return this.lineBuffers.getOutputBuffer();
1.152 + }
1.153 +
1.154 + /**
1.155 + * @return ChannelLineBuffers instance associated with this NNTPConnection.
1.156 + */
1.157 + public ChannelLineBuffers getBuffers()
1.158 + {
1.159 + return this.lineBuffers;
1.160 + }
1.161 +
1.162 + /**
1.163 + * @return true if this connection comes from a local remote address.
1.164 + */
1.165 + public boolean isLocalConnection()
1.166 + {
1.167 + return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
1.168 + .getHostName().equalsIgnoreCase("localhost");
1.169 + }
1.170 +
1.171 + void setWriteSelectionKey(SelectionKey selKey)
1.172 + {
1.173 + this.writeSelKey = selKey;
1.174 + }
1.175 +
1.176 + public void shutdownInput()
1.177 + {
1.178 + try
1.179 + {
1.180 + // Closes the input line of the channel's socket, so no new data
1.181 + // will be received and a timeout can be triggered.
1.182 + this.channel.socket().shutdownInput();
1.183 + }
1.184 + catch(IOException ex)
1.185 + {
1.186 + Log.msg("Exception in NNTPConnection.shutdownInput(): " + ex, false);
1.187 + if(Log.isDebug())
1.188 + {
1.189 + ex.printStackTrace();
1.190 + }
1.191 + }
1.192 + }
1.193 +
1.194 + public void shutdownOutput()
1.195 + {
1.196 + cancelTimer.schedule(new TimerTask()
1.197 + {
1.198 + @Override
1.199 + public void run()
1.200 + {
1.201 + try
1.202 + {
1.203 + // Closes the output line of the channel's socket.
1.204 + channel.socket().shutdownOutput();
1.205 + channel.close();
1.206 + }
1.207 + catch(Exception ex)
1.208 + {
1.209 + Log.msg("NNTPConnection.shutdownOutput(): " + ex, false);
1.210 + if(Log.isDebug())
1.211 + {
1.212 + ex.printStackTrace();
1.213 + }
1.214 + }
1.215 + }
1.216 + }, 3000);
1.217 + }
1.218 +
1.219 + public SocketChannel getChannel()
1.220 + {
1.221 + return this.channel;
1.222 + }
1.223 +
1.224 + public Article getCurrentArticle()
1.225 + {
1.226 + return this.currentArticle;
1.227 + }
1.228 +
1.229 + public Charset getCurrentCharset()
1.230 + {
1.231 + return this.charset;
1.232 + }
1.233 +
1.234 + public Group getCurrentGroup()
1.235 + {
1.236 + return this.currentGroup;
1.237 + }
1.238 +
1.239 + public void setCurrentArticle(final Article article)
1.240 + {
1.241 + this.currentArticle = article;
1.242 + }
1.243 +
1.244 + public void setCurrentGroup(final Group group)
1.245 + {
1.246 + this.currentGroup = group;
1.247 + }
1.248 +
1.249 + public long getLastActivity()
1.250 + {
1.251 + return this.lastActivity;
1.252 + }
1.253 +
1.254 + /**
1.255 + * Due to the readLockGate there is no need to synchronize this method.
1.256 + * @param raw
1.257 + * @throws IllegalArgumentException if raw is null.
1.258 + * @throws IllegalStateException if calling thread does not own the readLock.
1.259 + */
1.260 + void lineReceived(byte[] raw)
1.261 + {
1.262 + if(raw == null)
1.263 + {
1.264 + throw new IllegalArgumentException("raw is null");
1.265 + }
1.266 +
1.267 + if(readLock == 0 || readLock != Thread.currentThread().hashCode())
1.268 + {
1.269 + throw new IllegalStateException("readLock not properly set");
1.270 + }
1.271 +
1.272 + this.lastActivity = System.currentTimeMillis();
1.273 +
1.274 + String line = new String(raw, this.charset);
1.275 +
1.276 + // There might be a trailing \r, but trim() is a bad idea
1.277 + // as it removes also leading spaces from long header lines.
1.278 + if(line.endsWith("\r"))
1.279 + {
1.280 + line = line.substring(0, line.length() - 1);
1.281 + }
1.282 +
1.283 + Log.msg("<< " + line, true);
1.284 +
1.285 + if(command == null)
1.286 + {
1.287 + command = parseCommandLine(line);
1.288 + assert command != null;
1.289 + }
1.290 +
1.291 + try
1.292 + {
1.293 + // The command object will process the line we just received
1.294 + command.processLine(line);
1.295 + }
1.296 + catch(ClosedChannelException ex0)
1.297 + {
1.298 + try
1.299 + {
1.300 + Log.msg("Connection to " + channel.socket().getRemoteSocketAddress()
1.301 + + " closed: " + ex0, true);
1.302 + }
1.303 + catch(Exception ex0a)
1.304 + {
1.305 + ex0a.printStackTrace();
1.306 + }
1.307 + }
1.308 + catch(Exception ex1)
1.309 + {
1.310 + try
1.311 + {
1.312 + command = null;
1.313 + ex1.printStackTrace();
1.314 + println("500 Internal server error");
1.315 + }
1.316 + catch(Exception ex2)
1.317 + {
1.318 + ex2.printStackTrace();
1.319 + }
1.320 + }
1.321 +
1.322 + if(command == null || command.hasFinished())
1.323 + {
1.324 + command = null;
1.325 + charset = Charset.forName("UTF-8"); // Reset to default
1.326 + }
1.327 + }
1.328 +
1.329 + /**
1.330 + * This method performes several if/elseif constructs to determine the
1.331 + * fitting command object.
1.332 + * TODO: This string comparisons are probably slow!
1.333 + * @param line
1.334 + * @return
1.335 + */
1.336 + private AbstractCommand parseCommandLine(String line)
1.337 + {
1.338 + AbstractCommand cmd = new UnsupportedCommand(this);
1.339 + String cmdStr = line.split(" ")[0];
1.340 +
1.341 + if(cmdStr.equalsIgnoreCase("ARTICLE") ||
1.342 + cmdStr.equalsIgnoreCase("BODY"))
1.343 + {
1.344 + cmd = new ArticleCommand(this);
1.345 + }
1.346 + else if(cmdStr.equalsIgnoreCase("CAPABILITIES"))
1.347 + {
1.348 + cmd = new CapabilitiesCommand(this);
1.349 + }
1.350 + else if(cmdStr.equalsIgnoreCase("GROUP"))
1.351 + {
1.352 + cmd = new GroupCommand(this);
1.353 + }
1.354 + else if(cmdStr.equalsIgnoreCase("HEAD"))
1.355 + {
1.356 + cmd = new ArticleCommand(this);
1.357 + }
1.358 + else if(cmdStr.equalsIgnoreCase("HELP"))
1.359 + {
1.360 + cmd = new HelpCommand(this);
1.361 + }
1.362 + else if(cmdStr.equalsIgnoreCase("LIST"))
1.363 + {
1.364 + cmd = new ListCommand(this);
1.365 + }
1.366 + else if(cmdStr.equalsIgnoreCase("LISTGROUP"))
1.367 + {
1.368 + cmd = new ListGroupCommand(this);
1.369 + }
1.370 + else if(cmdStr.equalsIgnoreCase("MODE"))
1.371 + {
1.372 + cmd = new ModeReaderCommand(this);
1.373 + }
1.374 + else if(cmdStr.equalsIgnoreCase("NEWGROUPS"))
1.375 + {
1.376 + cmd = new NewGroupsCommand(this);
1.377 + }
1.378 + else if(cmdStr.equalsIgnoreCase("NEXT") ||
1.379 + cmdStr.equalsIgnoreCase("PREV"))
1.380 + {
1.381 + cmd = new NextPrevCommand(this);
1.382 + }
1.383 + else if(cmdStr.equalsIgnoreCase("OVER") ||
1.384 + cmdStr.equalsIgnoreCase("XOVER")) // for compatibility with older RFCs
1.385 + {
1.386 + cmd = new OverCommand(this);
1.387 + }
1.388 + else if(cmdStr.equalsIgnoreCase("POST"))
1.389 + {
1.390 + cmd = new PostCommand(this);
1.391 + }
1.392 + else if(cmdStr.equalsIgnoreCase("QUIT"))
1.393 + {
1.394 + cmd = new QuitCommand(this);
1.395 + }
1.396 + else if(cmdStr.equalsIgnoreCase("STAT"))
1.397 + {
1.398 + cmd = new StatCommand(this);
1.399 + }
1.400 + else if(cmdStr.equalsIgnoreCase("XDAEMON"))
1.401 + {
1.402 + cmd = new XDaemonCommand(this);
1.403 + }
1.404 + else if(cmdStr.equalsIgnoreCase("XPAT"))
1.405 + {
1.406 + cmd = new XPatCommand(this);
1.407 + }
1.408 +
1.409 + return cmd;
1.410 + }
1.411 +
1.412 + /**
1.413 + * Puts the given line into the output buffer, adds a newline character
1.414 + * and returns. The method returns immediately and does not block until
1.415 + * the line was sent. If line is longer than 510 octets it is split up in
1.416 + * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
1.417 + * @param line
1.418 + */
1.419 + public void println(final CharSequence line, final Charset charset)
1.420 + throws IOException
1.421 + {
1.422 + writeToChannel(CharBuffer.wrap(line), charset, line);
1.423 + writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
1.424 + }
1.425 +
1.426 + /**
1.427 + * Encodes the given CharBuffer using the given Charset to a bunch of
1.428 + * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
1.429 + * connected SocketChannel.
1.430 + * @throws java.io.IOException
1.431 + */
1.432 + private void writeToChannel(CharBuffer characters, final Charset charset,
1.433 + CharSequence debugLine)
1.434 + throws IOException
1.435 + {
1.436 + if(!charset.canEncode())
1.437 + {
1.438 + Log.msg("FATAL: Charset " + charset + " cannot encode!", false);
1.439 + return;
1.440 + }
1.441 +
1.442 + // Write characters to output buffers
1.443 + LineEncoder lenc = new LineEncoder(characters, charset);
1.444 + lenc.encode(lineBuffers);
1.445 +
1.446 + // Enable OP_WRITE events so that the buffers are processed
1.447 + try
1.448 + {
1.449 + this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
1.450 + ChannelWriter.getInstance().getSelector().wakeup();
1.451 + }
1.452 + catch (Exception ex) // CancelledKeyException and ChannelCloseException
1.453 + {
1.454 + Log.msg("NNTPConnection.writeToChannel(): " + ex, false);
1.455 + return;
1.456 + }
1.457 +
1.458 + // Update last activity timestamp
1.459 + this.lastActivity = System.currentTimeMillis();
1.460 + if(debugLine != null)
1.461 + {
1.462 + Log.msg(">> " + debugLine, true);
1.463 + }
1.464 + }
1.465 +
1.466 + public void println(final CharSequence line)
1.467 + throws IOException
1.468 + {
1.469 + println(line, charset);
1.470 + }
1.471 +
1.472 + public void print(final String line)
1.473 + throws IOException
1.474 + {
1.475 + writeToChannel(CharBuffer.wrap(line), charset, line);
1.476 + }
1.477 +
1.478 + public void setCurrentCharset(final Charset charset)
1.479 + {
1.480 + this.charset = charset;
1.481 + }
1.482 +
1.483 +}