org/sonews/daemon/NNTPConnection.java
author cli
Sun, 29 Aug 2010 17:04:25 +0200
changeset 34 9f0b95aafaa3
parent 30 146b3275b792
permissions -rw-r--r--
Merge heads.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import java.io.IOException;
    22 import java.net.InetSocketAddress;
    23 import java.net.SocketException;
    24 import java.nio.ByteBuffer;
    25 import java.nio.CharBuffer;
    26 import java.nio.channels.ClosedChannelException;
    27 import java.nio.channels.SelectionKey;
    28 import java.nio.channels.SocketChannel;
    29 import java.nio.charset.Charset;
    30 import java.util.Arrays;
    31 import java.util.Timer;
    32 import java.util.TimerTask;
    33 import org.sonews.daemon.command.Command;
    34 import org.sonews.storage.Article;
    35 import org.sonews.storage.Channel;
    36 import org.sonews.storage.StorageBackendException;
    37 import org.sonews.util.Log;
    38 import org.sonews.util.Stats;
    39 
    40 /**
    41  * For every SocketChannel (so TCP/IP connection) there is an instance of
    42  * this class.
    43  * @author Christian Lins
    44  * @since sonews/0.5.0
    45  */
    46 public final class NNTPConnection
    47 {
    48 
    49   public static final String NEWLINE            = "\r\n";    // RFC defines this as newline
    50   public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
    51   
    52   private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
    53   
    54   /** SocketChannel is generally thread-safe */
    55   private SocketChannel   channel        = null;
    56   private Charset         charset        = Charset.forName("UTF-8");
    57   private Command         command        = null;
    58   private Article         currentArticle = null;
    59   private Channel         currentGroup   = null;
    60   private volatile long   lastActivity   = System.currentTimeMillis();
    61   private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
    62   private int             readLock       = 0;
    63   private final Object    readLockGate   = new Object();
    64   private SelectionKey    writeSelKey    = null;
    65   
    66   public NNTPConnection(final SocketChannel channel)
    67     throws IOException
    68   {
    69     if(channel == null)
    70     {
    71       throw new IllegalArgumentException("channel is null");
    72     }
    73 
    74     this.channel = channel;
    75     Stats.getInstance().clientConnect();
    76   }
    77   
    78   /**
    79    * Tries to get the read lock for this NNTPConnection. This method is Thread-
    80    * safe and returns true of the read lock was successfully set. If the lock
    81    * is still hold by another Thread the method returns false.
    82    */
    83   boolean tryReadLock()
    84   {
    85     // As synchronizing simple types may cause deadlocks,
    86     // we use a gate object.
    87     synchronized(readLockGate)
    88     {
    89       if(readLock != 0)
    90       {
    91         return false;
    92       }
    93       else
    94       {
    95         readLock = Thread.currentThread().hashCode();
    96         return true;
    97       }
    98     }
    99   }
   100   
   101   /**
   102    * Releases the read lock in a Thread-safe way.
   103    * @throws IllegalMonitorStateException if a Thread not holding the lock
   104    * tries to release it.
   105    */
   106   void unlockReadLock()
   107   {
   108     synchronized(readLockGate)
   109     {
   110       if(readLock == Thread.currentThread().hashCode())
   111       {
   112         readLock = 0;
   113       }
   114       else
   115       {
   116         throw new IllegalMonitorStateException();
   117       }
   118     }
   119   }
   120   
   121   /**
   122    * @return Current input buffer of this NNTPConnection instance.
   123    */
   124   public ByteBuffer getInputBuffer()
   125   {
   126     return this.lineBuffers.getInputBuffer();
   127   }
   128   
   129   /**
   130    * @return Output buffer of this NNTPConnection which has at least one byte
   131    * free storage.
   132    */
   133   public ByteBuffer getOutputBuffer()
   134   {
   135     return this.lineBuffers.getOutputBuffer();
   136   }
   137   
   138   /**
   139    * @return ChannelLineBuffers instance associated with this NNTPConnection.
   140    */
   141   public ChannelLineBuffers getBuffers()
   142   {
   143     return this.lineBuffers;
   144   }
   145   
   146   /**
   147    * @return true if this connection comes from a local remote address.
   148    */
   149   public boolean isLocalConnection()
   150   {
   151     return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
   152       .getHostName().equalsIgnoreCase("localhost");
   153   }
   154 
   155   void setWriteSelectionKey(SelectionKey selKey)
   156   {
   157     this.writeSelKey = selKey;
   158   }
   159 
   160   public void shutdownInput()
   161   {
   162     try
   163     {
   164       // Closes the input line of the channel's socket, so no new data
   165       // will be received and a timeout can be triggered.
   166       this.channel.socket().shutdownInput();
   167     }
   168     catch(IOException ex)
   169     {
   170       Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
   171     }
   172   }
   173   
   174   public void shutdownOutput()
   175   {
   176     cancelTimer.schedule(new TimerTask() 
   177     {
   178       @Override
   179       public void run()
   180       {
   181         try
   182         {
   183           // Closes the output line of the channel's socket.
   184           channel.socket().shutdownOutput();
   185           channel.close();
   186         }
   187         catch(SocketException ex)
   188         {
   189           // Socket was already disconnected
   190           Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
   191         }
   192         catch(Exception ex)
   193         {
   194           Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
   195         }
   196       }
   197     }, 3000);
   198   }
   199   
   200   public SocketChannel getSocketChannel()
   201   {
   202     return this.channel;
   203   }
   204   
   205   public Article getCurrentArticle()
   206   {
   207     return this.currentArticle;
   208   }
   209   
   210   public Charset getCurrentCharset()
   211   {
   212     return this.charset;
   213   }
   214 
   215   /**
   216    * @return The currently selected communication channel (not SocketChannel)
   217    */
   218   public Channel getCurrentChannel()
   219   {
   220     return this.currentGroup;
   221   }
   222   
   223   public void setCurrentArticle(final Article article)
   224   {
   225     this.currentArticle = article;
   226   }
   227   
   228   public void setCurrentGroup(final Channel group)
   229   {
   230     this.currentGroup = group;
   231   }
   232   
   233   public long getLastActivity()
   234   {
   235     return this.lastActivity;
   236   }
   237   
   238   /**
   239    * Due to the readLockGate there is no need to synchronize this method.
   240    * @param raw
   241    * @throws IllegalArgumentException if raw is null.
   242    * @throws IllegalStateException if calling thread does not own the readLock.
   243    */
   244   void lineReceived(byte[] raw)
   245   {
   246     if(raw == null)
   247     {
   248       throw new IllegalArgumentException("raw is null");
   249     }
   250     
   251     if(readLock == 0 || readLock != Thread.currentThread().hashCode())
   252     {
   253       throw new IllegalStateException("readLock not properly set");
   254     }
   255 
   256     this.lastActivity = System.currentTimeMillis();
   257     
   258     String line = new String(raw, this.charset);
   259     
   260     // There might be a trailing \r, but trim() is a bad idea
   261     // as it removes also leading spaces from long header lines.
   262     if(line.endsWith("\r"))
   263     {
   264       line = line.substring(0, line.length() - 1);
   265       raw  = Arrays.copyOf(raw, raw.length - 1);
   266     }
   267     
   268     Log.get().fine("<< " + line);
   269     
   270     if(command == null)
   271     {
   272       command = parseCommandLine(line);
   273       assert command != null;
   274     }
   275 
   276     try
   277     {
   278       // The command object will process the line we just received
   279       try
   280       {
   281         command.processLine(this, line, raw);
   282       }
   283       catch(StorageBackendException ex)
   284       {
   285         Log.get().info("Retry command processing after StorageBackendException");
   286 
   287         // Try it a second time, so that the backend has time to recover
   288         command.processLine(this, line, raw);
   289       }
   290     }
   291     catch(ClosedChannelException ex0)
   292     {
   293       try
   294       {
   295         Log.get().info("Connection to " + channel.socket().getRemoteSocketAddress()
   296             + " closed: " + ex0);
   297       }
   298       catch(Exception ex0a)
   299       {
   300         ex0a.printStackTrace();
   301       }
   302     }
   303     catch(Exception ex1) // This will catch a second StorageBackendException
   304     {
   305       try
   306       {
   307         command = null;
   308         ex1.printStackTrace();
   309         println("500 Internal server error");
   310       }
   311       catch(Exception ex2)
   312       {
   313         ex2.printStackTrace();
   314       }
   315     }
   316 
   317     if(command == null || command.hasFinished())
   318     {
   319       command = null;
   320       charset = Charset.forName("UTF-8"); // Reset to default
   321     }
   322   }
   323   
   324   /**
   325    * This method determines the fitting command processing class.
   326    * @param line
   327    * @return
   328    */
   329   private Command parseCommandLine(String line)
   330   {
   331     String cmdStr = line.split(" ")[0];
   332     return CommandSelector.getInstance().get(cmdStr);
   333   }
   334   
   335   /**
   336    * Puts the given line into the output buffer, adds a newline character
   337    * and returns. The method returns immediately and does not block until
   338    * the line was sent. If line is longer than 510 octets it is split up in
   339    * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
   340    * @param line
   341    */
   342   public void println(final CharSequence line, final Charset charset)
   343     throws IOException
   344   {    
   345     writeToChannel(CharBuffer.wrap(line), charset, line);
   346     writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   347   }
   348 
   349   /**
   350    * Writes the given raw lines to the output buffers and finishes with
   351    * a newline character (\r\n).
   352    * @param rawLines
   353    */
   354   public void println(final byte[] rawLines)
   355     throws IOException
   356   {
   357     this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
   358     writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   359   }
   360   
   361   /**
   362    * Encodes the given CharBuffer using the given Charset to a bunch of
   363    * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
   364    * connected SocketChannel.
   365    * @throws java.io.IOException
   366    */
   367   private void writeToChannel(CharBuffer characters, final Charset charset,
   368     CharSequence debugLine)
   369     throws IOException
   370   {
   371     if(!charset.canEncode())
   372     {
   373       Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
   374       return;
   375     }
   376     
   377     // Write characters to output buffers
   378     LineEncoder lenc = new LineEncoder(characters, charset);
   379     lenc.encode(lineBuffers);
   380     
   381     enableWriteEvents(debugLine);
   382   }
   383 
   384   private void enableWriteEvents(CharSequence debugLine)
   385   {
   386     // Enable OP_WRITE events so that the buffers are processed
   387     try
   388     {
   389       this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
   390       ChannelWriter.getInstance().getSelector().wakeup();
   391     }
   392     catch(Exception ex) // CancelledKeyException and ChannelCloseException
   393     {
   394       Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
   395       return;
   396     }
   397 
   398     // Update last activity timestamp
   399     this.lastActivity = System.currentTimeMillis();
   400     if(debugLine != null)
   401     {
   402       Log.get().fine(">> " + debugLine);
   403     }
   404   }
   405   
   406   public void println(final CharSequence line)
   407     throws IOException
   408   {
   409     println(line, charset);
   410   }
   411   
   412   public void print(final String line)
   413     throws IOException
   414   {
   415     writeToChannel(CharBuffer.wrap(line), charset, line);
   416   }
   417   
   418   public void setCurrentCharset(final Charset charset)
   419   {
   420     this.charset = charset;
   421   }
   422 
   423   void setLastActivity(long timestamp)
   424   {
   425     this.lastActivity = timestamp;
   426   }
   427   
   428 }