src/org/sonews/daemon/NNTPConnection.java
author František Kučera <franta-hg@frantovo.cz>
Thu, 05 Jul 2012 13:19:19 +0200
changeset 119 f5b57e221e38
parent 113 a059aecd1794
permissions -rwxr-xr-x
mezery, tabulátory
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
package org.sonews.daemon;
chris@1
    19
franta-hg@108
    20
import java.io.ByteArrayOutputStream;
chris@1
    21
import java.io.IOException;
chris@1
    22
import java.net.InetSocketAddress;
chris@3
    23
import java.net.SocketException;
chris@1
    24
import java.nio.ByteBuffer;
chris@1
    25
import java.nio.CharBuffer;
chris@1
    26
import java.nio.channels.ClosedChannelException;
chris@1
    27
import java.nio.channels.SelectionKey;
chris@1
    28
import java.nio.channels.SocketChannel;
chris@1
    29
import java.nio.charset.Charset;
chris@3
    30
import java.util.Arrays;
chris@1
    31
import java.util.Timer;
chris@1
    32
import java.util.TimerTask;
cli@49
    33
import java.util.logging.Level;
franta-hg@113
    34
import org.sonews.acl.User;
chris@3
    35
import org.sonews.daemon.command.Command;
chris@3
    36
import org.sonews.storage.Article;
cli@48
    37
import org.sonews.storage.Group;
cli@30
    38
import org.sonews.storage.StorageBackendException;
cli@25
    39
import org.sonews.util.Log;
chris@1
    40
import org.sonews.util.Stats;
franta-hg@108
    41
import org.sonews.util.io.CRLFOutputStream;
franta-hg@108
    42
import org.sonews.util.io.SMTPOutputStream;
chris@1
    43
chris@1
    44
/**
chris@1
    45
 * For every SocketChannel (so TCP/IP connection) there is an instance of
chris@1
    46
 * this class.
chris@1
    47
 * @author Christian Lins
chris@1
    48
 * @since sonews/0.5.0
chris@1
    49
 */
cli@49
    50
public final class NNTPConnection {
chris@1
    51
cli@37
    52
	public static final String NEWLINE = "\r\n";    // RFC defines this as newline
cli@37
    53
	public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
cli@37
    54
	private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
cli@37
    55
	/** SocketChannel is generally thread-safe */
cli@37
    56
	private SocketChannel channel = null;
cli@37
    57
	private Charset charset = Charset.forName("UTF-8");
cli@37
    58
	private Command command = null;
cli@37
    59
	private Article currentArticle = null;
cli@48
    60
	private Group currentGroup = null;
cli@37
    61
	private volatile long lastActivity = System.currentTimeMillis();
cli@37
    62
	private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
cli@37
    63
	private int readLock = 0;
cli@37
    64
	private final Object readLockGate = new Object();
cli@37
    65
	private SelectionKey writeSelKey = null;
franta-hg@113
    66
	private User user;
chris@1
    67
cli@37
    68
	/**
	 * Creates the connection object for the given channel and calls
	 * Stats#clientConnect() to register the new client.
	 *
	 * @param channel socket channel of the client; must not be null.
	 * @throws IllegalArgumentException if channel is null.
	 * @throws IOException declared for callers.
	 */
	public NNTPConnection(final SocketChannel channel)
			throws IOException {
		if (null == channel) {
			throw new IllegalArgumentException("channel is null");
		}

		this.channel = channel;
		Stats.getInstance().clientConnect();
	}
chris@1
    77
cli@37
    78
	/**
cli@37
    79
	 * Tries to get the read lock for this NNTPConnection. This method is Thread-
cli@37
    80
	 * safe and returns true of the read lock was successfully set. If the lock
cli@37
    81
	 * is still hold by another Thread the method returns false.
cli@37
    82
	 */
cli@49
    83
	boolean tryReadLock() {
cli@37
    84
		// As synchronizing simple types may cause deadlocks,
cli@37
    85
		// we use a gate object.
cli@37
    86
		synchronized (readLockGate) {
cli@37
    87
			if (readLock != 0) {
cli@37
    88
				return false;
cli@37
    89
			} else {
cli@37
    90
				readLock = Thread.currentThread().hashCode();
cli@37
    91
				return true;
cli@37
    92
			}
cli@37
    93
		}
cli@37
    94
	}
chris@3
    95
cli@37
    96
	/**
cli@37
    97
	 * Releases the read lock in a Thread-safe way.
cli@37
    98
	 * @throws IllegalMonitorStateException if a Thread not holding the lock
cli@37
    99
	 * tries to release it.
cli@37
   100
	 */
cli@49
   101
	void unlockReadLock() {
cli@37
   102
		synchronized (readLockGate) {
cli@37
   103
			if (readLock == Thread.currentThread().hashCode()) {
cli@37
   104
				readLock = 0;
cli@37
   105
			} else {
cli@37
   106
				throw new IllegalMonitorStateException();
cli@37
   107
			}
cli@37
   108
		}
cli@37
   109
	}
chris@1
   110
cli@37
   111
	/**
cli@37
   112
	 * @return Current input buffer of this NNTPConnection instance.
cli@37
   113
	 */
cli@49
   114
	public ByteBuffer getInputBuffer() {
cli@37
   115
		return this.lineBuffers.getInputBuffer();
cli@37
   116
	}
chris@1
   117
cli@37
   118
	/**
cli@37
   119
	 * @return Output buffer of this NNTPConnection which has at least one byte
cli@37
   120
	 * free storage.
cli@37
   121
	 */
cli@49
   122
	public ByteBuffer getOutputBuffer() {
cli@37
   123
		return this.lineBuffers.getOutputBuffer();
cli@37
   124
	}
cli@30
   125
cli@37
   126
	/**
cli@37
   127
	 * @return ChannelLineBuffers instance associated with this NNTPConnection.
cli@37
   128
	 */
cli@49
   129
	public ChannelLineBuffers getBuffers() {
cli@37
   130
		return this.lineBuffers;
cli@37
   131
	}
chris@1
   132
cli@37
   133
	/**
cli@37
   134
	 * @return true if this connection comes from a local remote address.
cli@37
   135
	 */
cli@49
   136
	public boolean isLocalConnection() {
cli@37
   137
		return ((InetSocketAddress) this.channel.socket().getRemoteSocketAddress()).getHostName().equalsIgnoreCase("localhost");
cli@37
   138
	}
chris@3
   139
cli@49
   140
	void setWriteSelectionKey(SelectionKey selKey) {
cli@37
   141
		this.writeSelKey = selKey;
cli@37
   142
	}
chris@3
   143
cli@49
   144
	public void shutdownInput() {
cli@37
   145
		try {
cli@37
   146
			// Closes the input line of the channel's socket, so no new data
cli@37
   147
			// will be received and a timeout can be triggered.
cli@37
   148
			this.channel.socket().shutdownInput();
cli@37
   149
		} catch (IOException ex) {
franta-hg@113
   150
			Log.get().log(Level.WARNING, "Exception in NNTPConnection.shutdownInput(): {0}", ex);
cli@37
   151
		}
cli@37
   152
	}
chris@1
   153
cli@49
   154
	public void shutdownOutput() {
cli@49
   155
		cancelTimer.schedule(new TimerTask() {
cli@37
   156
			@Override
cli@49
   157
			public void run() {
cli@37
   158
				try {
cli@37
   159
					// Closes the output line of the channel's socket.
cli@37
   160
					channel.socket().shutdownOutput();
cli@37
   161
					channel.close();
cli@37
   162
				} catch (SocketException ex) {
cli@37
   163
					// Socket was already disconnected
franta-hg@113
   164
					Log.get().log(Level.INFO, "NNTPConnection.shutdownOutput(): {0}", ex);
cli@37
   165
				} catch (Exception ex) {
franta-hg@113
   166
					Log.get().log(Level.WARNING, "NNTPConnection.shutdownOutput(): {0}", ex);
cli@37
   167
				}
cli@37
   168
			}
cli@37
   169
		}, 3000);
cli@37
   170
	}
cli@37
   171
cli@49
   172
	public SocketChannel getSocketChannel() {
cli@37
   173
		return this.channel;
cli@37
   174
	}
cli@37
   175
cli@49
   176
	public Article getCurrentArticle() {
cli@37
   177
		return this.currentArticle;
cli@37
   178
	}
cli@37
   179
cli@49
   180
	public Charset getCurrentCharset() {
cli@37
   181
		return this.charset;
cli@37
   182
	}
cli@37
   183
cli@37
   184
	/**
cli@37
   185
	 * @return The currently selected communication channel (not SocketChannel)
cli@37
   186
	 */
cli@49
   187
	public Group getCurrentChannel() {
cli@37
   188
		return this.currentGroup;
cli@37
   189
	}
cli@37
   190
cli@49
   191
	public void setCurrentArticle(final Article article) {
cli@37
   192
		this.currentArticle = article;
cli@37
   193
	}
cli@37
   194
cli@49
   195
	public void setCurrentGroup(final Group group) {
cli@37
   196
		this.currentGroup = group;
cli@37
   197
	}
cli@37
   198
cli@49
   199
	public long getLastActivity() {
cli@37
   200
		return this.lastActivity;
cli@37
   201
	}
cli@37
   202
cli@37
   203
	/**
cli@37
   204
	 * Due to the readLockGate there is no need to synchronize this method.
cli@37
   205
	 * @param raw
cli@37
   206
	 * @throws IllegalArgumentException if raw is null.
cli@37
   207
	 * @throws IllegalStateException if calling thread does not own the readLock.
cli@37
   208
	 */
cli@49
   209
	void lineReceived(byte[] raw) {
cli@37
   210
		if (raw == null) {
cli@37
   211
			throw new IllegalArgumentException("raw is null");
cli@37
   212
		}
cli@37
   213
cli@37
   214
		if (readLock == 0 || readLock != Thread.currentThread().hashCode()) {
cli@37
   215
			throw new IllegalStateException("readLock not properly set");
cli@37
   216
		}
cli@37
   217
cli@37
   218
		this.lastActivity = System.currentTimeMillis();
cli@37
   219
cli@37
   220
		String line = new String(raw, this.charset);
cli@37
   221
cli@37
   222
		// There might be a trailing \r, but trim() is a bad idea
cli@37
   223
		// as it removes also leading spaces from long header lines.
cli@37
   224
		if (line.endsWith("\r")) {
cli@37
   225
			line = line.substring(0, line.length() - 1);
cli@37
   226
			raw = Arrays.copyOf(raw, raw.length - 1);
cli@37
   227
		}
cli@37
   228
franta-hg@113
   229
		Log.get().log(Level.FINE, "<< {0}", line);
cli@37
   230
cli@37
   231
		if (command == null) {
cli@37
   232
			command = parseCommandLine(line);
cli@37
   233
			assert command != null;
cli@37
   234
		}
cli@37
   235
cli@37
   236
		try {
cli@37
   237
			// The command object will process the line we just received
cli@37
   238
			try {
cli@37
   239
				command.processLine(this, line, raw);
cli@37
   240
			} catch (StorageBackendException ex) {
cli@37
   241
				Log.get().info("Retry command processing after StorageBackendException");
cli@37
   242
cli@37
   243
				// Try it a second time, so that the backend has time to recover
cli@37
   244
				command.processLine(this, line, raw);
cli@37
   245
			}
cli@37
   246
		} catch (ClosedChannelException ex0) {
cli@37
   247
			try {
cli@49
   248
				StringBuilder strBuf = new StringBuilder();
cli@49
   249
				strBuf.append("Connection to ");
cli@49
   250
				strBuf.append(channel.socket().getRemoteSocketAddress());
cli@49
   251
				strBuf.append(" closed: ");
cli@49
   252
				strBuf.append(ex0);
cli@49
   253
				Log.get().info(strBuf.toString());
cli@37
   254
			} catch (Exception ex0a) {
cli@37
   255
				ex0a.printStackTrace();
cli@37
   256
			}
cli@49
   257
		} catch (Exception ex1) { // This will catch a second StorageBackendException
cli@37
   258
			try {
cli@37
   259
				command = null;
cli@49
   260
				Log.get().log(Level.WARNING, ex1.getLocalizedMessage(), ex1);
cli@49
   261
				println("403 Internal server error");
cli@49
   262
cli@49
   263
				// Should we end the connection here?
cli@49
   264
				// RFC says we MUST return 400 before closing the connection
cli@49
   265
				shutdownInput();
cli@49
   266
				shutdownOutput();
cli@37
   267
			} catch (Exception ex2) {
cli@37
   268
				ex2.printStackTrace();
cli@37
   269
			}
cli@37
   270
		}
cli@37
   271
cli@37
   272
		if (command == null || command.hasFinished()) {
cli@37
   273
			command = null;
cli@37
   274
			charset = Charset.forName("UTF-8"); // Reset to default
cli@37
   275
		}
cli@37
   276
	}
cli@37
   277
cli@37
   278
	/**
cli@37
   279
	 * This method determines the fitting command processing class.
cli@37
   280
	 * @param line
cli@37
   281
	 * @return
cli@37
   282
	 */
cli@49
   283
	private Command parseCommandLine(String line) {
cli@37
   284
		String cmdStr = line.split(" ")[0];
cli@37
   285
		return CommandSelector.getInstance().get(cmdStr);
cli@37
   286
	}
cli@37
   287
cli@37
   288
	/**
cli@37
   289
	 * Puts the given line into the output buffer, adds a newline character
cli@37
   290
	 * and returns. The method returns immediately and does not block until
cli@37
   291
	 * the line was sent. If line is longer than 510 octets it is split up in
cli@37
   292
	 * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
cli@37
   293
	 * @param line
cli@37
   294
	 */
cli@37
   295
	public void println(final CharSequence line, final Charset charset)
cli@49
   296
			throws IOException {
cli@37
   297
		writeToChannel(CharBuffer.wrap(line), charset, line);
cli@37
   298
		writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
cli@37
   299
	}
cli@37
   300
cli@37
   301
	/**
cli@37
   302
	 * Writes the given raw lines to the output buffers and finishes with
cli@37
   303
	 * a newline character (\r\n).
cli@37
   304
	 * @param rawLines
cli@37
   305
	 */
cli@37
   306
	public void println(final byte[] rawLines)
cli@49
   307
			throws IOException {
cli@37
   308
		this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
cli@37
   309
		writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
cli@37
   310
	}
franta-hg@108
   311
	
franta-hg@108
   312
	/**
franta-hg@108
   313
	 * Same as {@link #println(byte[]) } but escapes lines containing single dot,
franta-hg@108
   314
	 * which has special meaning in protocol (end of message).
franta-hg@108
   315
	 * 
franta-hg@108
   316
	 * This method is safe to be used for writing messages – if message contains 
franta-hg@108
   317
	 * a line with single dot, it will be doubled and thus not interpreted 
franta-hg@108
   318
	 * by NNTP client as end of message
franta-hg@108
   319
	 * 
franta-hg@108
   320
	 * @param rawLines
franta-hg@108
   321
	 * @throws IOException 
franta-hg@108
   322
	 */
franta-hg@108
   323
	public void printlnEscapeDots(final byte[] rawLines) throws IOException {
franta-hg@108
   324
		// TODO: optimalizace
franta-hg@108
   325
		
franta-hg@108
   326
		ByteArrayOutputStream baos = new ByteArrayOutputStream(rawLines.length + 10);
franta-hg@108
   327
		CRLFOutputStream crlfStream = new CRLFOutputStream(baos);
franta-hg@108
   328
		SMTPOutputStream smtpStream = new SMTPOutputStream(crlfStream);
franta-hg@108
   329
		smtpStream.write(rawLines);
franta-hg@108
   330
		
franta-hg@108
   331
		println(baos.toByteArray());
franta-hg@108
   332
		
franta-hg@108
   333
		smtpStream.close();
franta-hg@108
   334
	}
cli@37
   335
cli@37
   336
	/**
cli@37
   337
	 * Encodes the given CharBuffer using the given Charset to a bunch of
cli@37
   338
	 * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
cli@37
   339
	 * connected SocketChannel.
cli@37
   340
	 * @throws java.io.IOException
cli@37
   341
	 */
cli@37
   342
	private void writeToChannel(CharBuffer characters, final Charset charset,
cli@49
   343
			CharSequence debugLine)
cli@49
   344
			throws IOException {
cli@37
   345
		if (!charset.canEncode()) {
franta-hg@113
   346
			Log.get().log(Level.SEVERE, "FATAL: Charset {0} cannot encode!", charset);
cli@37
   347
			return;
cli@37
   348
		}
cli@37
   349
cli@37
   350
		// Write characters to output buffers
cli@37
   351
		LineEncoder lenc = new LineEncoder(characters, charset);
cli@37
   352
		lenc.encode(lineBuffers);
cli@37
   353
cli@37
   354
		enableWriteEvents(debugLine);
cli@37
   355
	}
cli@37
   356
cli@49
   357
	private void enableWriteEvents(CharSequence debugLine) {
cli@37
   358
		// Enable OP_WRITE events so that the buffers are processed
cli@37
   359
		try {
cli@37
   360
			this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
cli@37
   361
			ChannelWriter.getInstance().getSelector().wakeup();
cli@37
   362
		} catch (Exception ex) // CancelledKeyException and ChannelCloseException
cli@37
   363
		{
franta-hg@113
   364
			Log.get().log(Level.WARNING, "NNTPConnection.writeToChannel(): {0}", ex);
cli@37
   365
			return;
cli@37
   366
		}
cli@37
   367
cli@37
   368
		// Update last activity timestamp
cli@37
   369
		this.lastActivity = System.currentTimeMillis();
cli@37
   370
		if (debugLine != null) {
franta-hg@113
   371
			Log.get().log(Level.FINE, ">> {0}", debugLine);
cli@37
   372
		}
cli@37
   373
	}
cli@37
   374
cli@37
   375
	public void println(final CharSequence line)
cli@49
   376
			throws IOException {
cli@37
   377
		println(line, charset);
cli@37
   378
	}
cli@37
   379
cli@37
   380
	public void print(final String line)
cli@49
   381
			throws IOException {
cli@37
   382
		writeToChannel(CharBuffer.wrap(line), charset, line);
cli@37
   383
	}
cli@37
   384
cli@49
   385
	public void setCurrentCharset(final Charset charset) {
cli@37
   386
		this.charset = charset;
cli@37
   387
	}
cli@37
   388
cli@49
   389
	void setLastActivity(long timestamp) {
cli@37
   390
		this.lastActivity = timestamp;
cli@37
   391
	}
franta-hg@101
   392
franta-hg@101
   393
	/**
franta-hg@113
   394
	 * @return Currently logged user (but you should check {@link User#isAuthenticated()}, if user is athenticated, or we just trust him)
franta-hg@101
   395
	 */
franta-hg@113
   396
	public User getUser() {
franta-hg@113
   397
		return user;
franta-hg@101
   398
	}
franta-hg@101
   399
franta-hg@101
   400
	/**
franta-hg@101
   401
	 * This method is to be called from AUTHINFO USER Command implementation.
franta-hg@101
   402
	 * @param username username from AUTHINFO USER username.
franta-hg@101
   403
	 */
franta-hg@113
   404
	public void setUser(User user) {
franta-hg@113
   405
		this.user = user;
franta-hg@101
   406
	}
chris@1
   407
}