org/sonews/daemon/NNTPConnection.java
author cli
Fri, 25 Dec 2009 15:42:46 +0100
changeset 25 dd05c3f2fa24
parent 15 f2293e8566f5
child 30 146b3275b792
permissions -rw-r--r--
Fix for too early disconnects on slow client connections. (#563)
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.daemon;
    20 
    21 import java.io.IOException;
    22 import java.net.InetSocketAddress;
    23 import java.net.SocketException;
    24 import java.nio.ByteBuffer;
    25 import java.nio.CharBuffer;
    26 import java.nio.channels.ClosedChannelException;
    27 import java.nio.channels.SelectionKey;
    28 import java.nio.channels.SocketChannel;
    29 import java.nio.charset.Charset;
    30 import java.util.Arrays;
    31 import java.util.Timer;
    32 import java.util.TimerTask;
    33 import org.sonews.daemon.command.Command;
    34 import org.sonews.storage.Article;
    35 import org.sonews.storage.Channel;
    36 import org.sonews.util.Log;
    37 import org.sonews.util.Stats;
    38 
    39 /**
    40  * For every SocketChannel (so TCP/IP connection) there is an instance of
    41  * this class.
    42  * @author Christian Lins
    43  * @since sonews/0.5.0
    44  */
    45 public final class NNTPConnection
    46 {
    47 
    48   public static final String NEWLINE            = "\r\n";    // RFC defines this as newline
    49   public static final String MESSAGE_ID_PATTERN = "<[^>]+>";
    50   
    51   private static final Timer cancelTimer = new Timer(true); // Thread-safe? True for run as daemon
    52   
    53   /** SocketChannel is generally thread-safe */
    54   private SocketChannel   channel        = null;
    55   private Charset         charset        = Charset.forName("UTF-8");
    56   private Command         command        = null;
    57   private Article         currentArticle = null;
    58   private Channel         currentGroup   = null;
    59   private volatile long   lastActivity   = System.currentTimeMillis();
    60   private ChannelLineBuffers lineBuffers = new ChannelLineBuffers();
    61   private int             readLock       = 0;
    62   private final Object    readLockGate   = new Object();
    63   private SelectionKey    writeSelKey    = null;
    64   
    65   public NNTPConnection(final SocketChannel channel)
    66     throws IOException
    67   {
    68     if(channel == null)
    69     {
    70       throw new IllegalArgumentException("channel is null");
    71     }
    72 
    73     this.channel = channel;
    74     Stats.getInstance().clientConnect();
    75   }
    76   
    77   /**
    78    * Tries to get the read lock for this NNTPConnection. This method is Thread-
    79    * safe and returns true of the read lock was successfully set. If the lock
    80    * is still hold by another Thread the method returns false.
    81    */
    82   boolean tryReadLock()
    83   {
    84     // As synchronizing simple types may cause deadlocks,
    85     // we use a gate object.
    86     synchronized(readLockGate)
    87     {
    88       if(readLock != 0)
    89       {
    90         return false;
    91       }
    92       else
    93       {
    94         readLock = Thread.currentThread().hashCode();
    95         return true;
    96       }
    97     }
    98   }
    99   
   100   /**
   101    * Releases the read lock in a Thread-safe way.
   102    * @throws IllegalMonitorStateException if a Thread not holding the lock
   103    * tries to release it.
   104    */
   105   void unlockReadLock()
   106   {
   107     synchronized(readLockGate)
   108     {
   109       if(readLock == Thread.currentThread().hashCode())
   110       {
   111         readLock = 0;
   112       }
   113       else
   114       {
   115         throw new IllegalMonitorStateException();
   116       }
   117     }
   118   }
   119   
   120   /**
   121    * @return Current input buffer of this NNTPConnection instance.
   122    */
   123   public ByteBuffer getInputBuffer()
   124   {
   125     return this.lineBuffers.getInputBuffer();
   126   }
   127   
   128   /**
   129    * @return Output buffer of this NNTPConnection which has at least one byte
   130    * free storage.
   131    */
   132   public ByteBuffer getOutputBuffer()
   133   {
   134     return this.lineBuffers.getOutputBuffer();
   135   }
   136   
   137   /**
   138    * @return ChannelLineBuffers instance associated with this NNTPConnection.
   139    */
   140   public ChannelLineBuffers getBuffers()
   141   {
   142     return this.lineBuffers;
   143   }
   144   
   145   /**
   146    * @return true if this connection comes from a local remote address.
   147    */
   148   public boolean isLocalConnection()
   149   {
   150     return ((InetSocketAddress)this.channel.socket().getRemoteSocketAddress())
   151       .getHostName().equalsIgnoreCase("localhost");
   152   }
   153 
   154   void setWriteSelectionKey(SelectionKey selKey)
   155   {
   156     this.writeSelKey = selKey;
   157   }
   158 
   159   public void shutdownInput()
   160   {
   161     try
   162     {
   163       // Closes the input line of the channel's socket, so no new data
   164       // will be received and a timeout can be triggered.
   165       this.channel.socket().shutdownInput();
   166     }
   167     catch(IOException ex)
   168     {
   169       Log.get().warning("Exception in NNTPConnection.shutdownInput(): " + ex);
   170     }
   171   }
   172   
   173   public void shutdownOutput()
   174   {
   175     cancelTimer.schedule(new TimerTask() 
   176     {
   177       @Override
   178       public void run()
   179       {
   180         try
   181         {
   182           // Closes the output line of the channel's socket.
   183           channel.socket().shutdownOutput();
   184           channel.close();
   185         }
   186         catch(SocketException ex)
   187         {
   188           // Socket was already disconnected
   189           Log.get().info("NNTPConnection.shutdownOutput(): " + ex);
   190         }
   191         catch(Exception ex)
   192         {
   193           Log.get().warning("NNTPConnection.shutdownOutput(): " + ex);
   194         }
   195       }
   196     }, 3000);
   197   }
   198   
   199   public SocketChannel getSocketChannel()
   200   {
   201     return this.channel;
   202   }
   203   
   204   public Article getCurrentArticle()
   205   {
   206     return this.currentArticle;
   207   }
   208   
   209   public Charset getCurrentCharset()
   210   {
   211     return this.charset;
   212   }
   213 
   214   /**
   215    * @return The currently selected communication channel (not SocketChannel)
   216    */
   217   public Channel getCurrentChannel()
   218   {
   219     return this.currentGroup;
   220   }
   221   
   222   public void setCurrentArticle(final Article article)
   223   {
   224     this.currentArticle = article;
   225   }
   226   
   227   public void setCurrentGroup(final Channel group)
   228   {
   229     this.currentGroup = group;
   230   }
   231   
   232   public long getLastActivity()
   233   {
   234     return this.lastActivity;
   235   }
   236   
   237   /**
   238    * Due to the readLockGate there is no need to synchronize this method.
   239    * @param raw
   240    * @throws IllegalArgumentException if raw is null.
   241    * @throws IllegalStateException if calling thread does not own the readLock.
   242    */
   243   void lineReceived(byte[] raw)
   244   {
   245     if(raw == null)
   246     {
   247       throw new IllegalArgumentException("raw is null");
   248     }
   249     
   250     if(readLock == 0 || readLock != Thread.currentThread().hashCode())
   251     {
   252       throw new IllegalStateException("readLock not properly set");
   253     }
   254 
   255     this.lastActivity = System.currentTimeMillis();
   256     
   257     String line = new String(raw, this.charset);
   258     
   259     // There might be a trailing \r, but trim() is a bad idea
   260     // as it removes also leading spaces from long header lines.
   261     if(line.endsWith("\r"))
   262     {
   263       line = line.substring(0, line.length() - 1);
   264       raw  = Arrays.copyOf(raw, raw.length - 1);
   265     }
   266     
   267     Log.get().fine("<< " + line);
   268     
   269     if(command == null)
   270     {
   271       command = parseCommandLine(line);
   272       assert command != null;
   273     }
   274 
   275     try
   276     {
   277       // The command object will process the line we just received
   278       command.processLine(this, line, raw);
   279     }
   280     catch(ClosedChannelException ex0)
   281     {
   282       try
   283       {
   284         Log.get().info("Connection to " + channel.socket().getRemoteSocketAddress()
   285             + " closed: " + ex0);
   286       }
   287       catch(Exception ex0a)
   288       {
   289         ex0a.printStackTrace();
   290       }
   291     }
   292     catch(Exception ex1)
   293     {
   294       try
   295       {
   296         command = null;
   297         ex1.printStackTrace();
   298         println("500 Internal server error");
   299       }
   300       catch(Exception ex2)
   301       {
   302         ex2.printStackTrace();
   303       }
   304     }
   305 
   306     if(command == null || command.hasFinished())
   307     {
   308       command = null;
   309       charset = Charset.forName("UTF-8"); // Reset to default
   310     }
   311   }
   312   
   313   /**
   314    * This method determines the fitting command processing class.
   315    * @param line
   316    * @return
   317    */
   318   private Command parseCommandLine(String line)
   319   {
   320     String cmdStr = line.split(" ")[0];
   321     return CommandSelector.getInstance().get(cmdStr);
   322   }
   323   
   324   /**
   325    * Puts the given line into the output buffer, adds a newline character
   326    * and returns. The method returns immediately and does not block until
   327    * the line was sent. If line is longer than 510 octets it is split up in
   328    * several lines. Each line is terminated by \r\n (NNTPConnection.NEWLINE).
   329    * @param line
   330    */
   331   public void println(final CharSequence line, final Charset charset)
   332     throws IOException
   333   {    
   334     writeToChannel(CharBuffer.wrap(line), charset, line);
   335     writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   336   }
   337 
   338   /**
   339    * Writes the given raw lines to the output buffers and finishes with
   340    * a newline character (\r\n).
   341    * @param rawLines
   342    */
   343   public void println(final byte[] rawLines)
   344     throws IOException
   345   {
   346     this.lineBuffers.addOutputBuffer(ByteBuffer.wrap(rawLines));
   347     writeToChannel(CharBuffer.wrap(NEWLINE), charset, null);
   348   }
   349   
   350   /**
   351    * Encodes the given CharBuffer using the given Charset to a bunch of
   352    * ByteBuffers (each 512 bytes large) and enqueues them for writing at the
   353    * connected SocketChannel.
   354    * @throws java.io.IOException
   355    */
   356   private void writeToChannel(CharBuffer characters, final Charset charset,
   357     CharSequence debugLine)
   358     throws IOException
   359   {
   360     if(!charset.canEncode())
   361     {
   362       Log.get().severe("FATAL: Charset " + charset + " cannot encode!");
   363       return;
   364     }
   365     
   366     // Write characters to output buffers
   367     LineEncoder lenc = new LineEncoder(characters, charset);
   368     lenc.encode(lineBuffers);
   369     
   370     enableWriteEvents(debugLine);
   371   }
   372 
   373   private void enableWriteEvents(CharSequence debugLine)
   374   {
   375     // Enable OP_WRITE events so that the buffers are processed
   376     try
   377     {
   378       this.writeSelKey.interestOps(SelectionKey.OP_WRITE);
   379       ChannelWriter.getInstance().getSelector().wakeup();
   380     }
   381     catch(Exception ex) // CancelledKeyException and ChannelCloseException
   382     {
   383       Log.get().warning("NNTPConnection.writeToChannel(): " + ex);
   384       return;
   385     }
   386 
   387     // Update last activity timestamp
   388     this.lastActivity = System.currentTimeMillis();
   389     if(debugLine != null)
   390     {
   391       Log.get().fine(">> " + debugLine);
   392     }
   393   }
   394   
   395   public void println(final CharSequence line)
   396     throws IOException
   397   {
   398     println(line, charset);
   399   }
   400   
   401   public void print(final String line)
   402     throws IOException
   403   {
   404     writeToChannel(CharBuffer.wrap(line), charset, line);
   405   }
   406   
   407   public void setCurrentCharset(final Charset charset)
   408   {
   409     this.charset = charset;
   410   }
   411 
   412   void setLastActivity(long timestamp)
   413   {
   414     this.lastActivity = timestamp;
   415   }
   416   
   417 }