src/org/sonews/feed/PullFeeder.java
author cli
Wed, 04 May 2011 18:06:24 +0200
changeset 40 f8894df2d079
parent 37 74139325d305
permissions -rwxr-xr-x
Refactoring.
     1 /*
     2  *   SONEWS News Server
     3  *   see AUTHORS for the list of contributors
     4  *
     5  *   This program is free software: you can redistribute it and/or modify
     6  *   it under the terms of the GNU General Public License as published by
     7  *   the Free Software Foundation, either version 3 of the License, or
     8  *   (at your option) any later version.
     9  *
    10  *   This program is distributed in the hope that it will be useful,
    11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
    12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13  *   GNU General Public License for more details.
    14  *
    15  *   You should have received a copy of the GNU General Public License
    16  *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
    17  */
    18 
    19 package org.sonews.feed;
    20 
    21 import java.io.BufferedReader;
    22 import java.io.IOException;
    23 import java.io.InputStreamReader;
    24 import java.io.PrintWriter;
    25 import java.net.Socket;
    26 import java.net.SocketException;
    27 import java.net.UnknownHostException;
    28 import java.util.ArrayList;
    29 import java.util.HashMap;
    30 import java.util.HashSet;
    31 import java.util.List;
    32 import java.util.Map;
    33 import java.util.Set;
    34 import java.util.logging.Level;
    35 import org.sonews.config.Config;
    36 import org.sonews.daemon.AbstractDaemon;
    37 import org.sonews.util.Log;
    38 import org.sonews.storage.StorageBackendException;
    39 import org.sonews.storage.StorageManager;
    40 import org.sonews.util.Stats;
    41 import org.sonews.util.io.ArticleReader;
    42 import org.sonews.util.io.ArticleWriter;
    43 
    44 /**
    45  * The PullFeeder class regularily checks another Newsserver for new
    46  * messages.
    47  * @author Christian Lins
    48  * @since sonews/0.5.0
    49  */
    50 class PullFeeder extends AbstractDaemon
    51 {
    52 
    53 	private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
    54 	private BufferedReader in;
    55 	private PrintWriter out;
    56 	private Set<Subscription> subscriptions = new HashSet<Subscription>();
    57 
    58 	private void addSubscription(final Subscription sub)
    59 	{
    60 		subscriptions.add(sub);
    61 
    62 		if (!highMarks.containsKey(sub)) {
    63 			// Set a initial highMark
    64 			this.highMarks.put(sub, 0);
    65 		}
    66 	}
    67 
    68 	/**
    69 	 * Changes to the given group and returns its high mark.
    70 	 * @param groupName
    71 	 * @return
    72 	 */
    73 	private int changeGroup(String groupName)
    74 		throws IOException
    75 	{
    76 		this.out.print("GROUP " + groupName + "\r\n");
    77 		this.out.flush();
    78 
    79 		String line = this.in.readLine();
    80 		if (line.startsWith("211 ")) {
    81 			int highmark = Integer.parseInt(line.split(" ")[3]);
    82 			return highmark;
    83 		} else {
    84 			throw new IOException("GROUP " + groupName + " returned: " + line);
    85 		}
    86 	}
    87 
    88 	private void connectTo(final String host, final int port)
    89 		throws IOException, UnknownHostException
    90 	{
    91 		Socket socket = new Socket(host, port);
    92 		this.out = new PrintWriter(socket.getOutputStream());
    93 		this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    94 
    95 		String line = in.readLine();
    96 		if (!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
    97 		{
    98 			throw new IOException(line);
    99 		}
   100 
   101 		// Send MODE READER to peer, some newsservers are friendlier then
   102 		this.out.println("MODE READER\r\n");
   103 		this.out.flush();
   104 		line = this.in.readLine();
   105 	}
   106 
   107 	private void disconnect()
   108 		throws IOException
   109 	{
   110 		this.out.print("QUIT\r\n");
   111 		this.out.flush();
   112 		this.out.close();
   113 		this.in.close();
   114 
   115 		this.out = null;
   116 		this.in = null;
   117 	}
   118 
   119 	/**
   120 	 * Uses the OVER or XOVER command to get a list of message overviews that
   121 	 * may be unknown to this feeder and are about to be peered.
   122 	 * @param start
   123 	 * @param end
   124 	 * @return A list of message ids with potentially interesting messages.
   125 	 */
   126 	private List<String> over(int start, int end)
   127 		throws IOException
   128 	{
   129 		this.out.print("OVER " + start + "-" + end + "\r\n");
   130 		this.out.flush();
   131 
   132 		String line = this.in.readLine();
   133 		if (line.startsWith("500 ")) // OVER not supported
   134 		{
   135 			this.out.print("XOVER " + start + "-" + end + "\r\n");
   136 			this.out.flush();
   137 
   138 			line = this.in.readLine();
   139 		}
   140 
   141 		if (line.startsWith("224 ")) {
   142 			List<String> messages = new ArrayList<String>();
   143 			line = this.in.readLine();
   144 			while (!".".equals(line)) {
   145 				String mid = line.split("\t")[4]; // 5th should be the Message-ID
   146 				messages.add(mid);
   147 				line = this.in.readLine();
   148 			}
   149 			return messages;
   150 		} else {
   151 			throw new IOException("Server return for OVER/XOVER: " + line);
   152 		}
   153 	}
   154 
   155 	@Override
   156 	public void run()
   157 	{
   158 		while (isRunning()) {
   159 			int pullInterval = 1000
   160 				* Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
   161 			String host = "localhost";
   162 			int port = 119;
   163 
   164 			Log.get().info("Start PullFeeder run...");
   165 
   166 			try {
   167 				this.subscriptions.clear();
   168 				List<Subscription> subsPull = StorageManager.current().getSubscriptions(FeedManager.TYPE_PULL);
   169 				for (Subscription sub : subsPull) {
   170 					addSubscription(sub);
   171 				}
   172 			} catch (StorageBackendException ex) {
   173 				Log.get().log(Level.SEVERE, host, ex);
   174 			}
   175 
   176 			try {
   177 				for (Subscription sub : this.subscriptions) {
   178 					host = sub.getHost();
   179 					port = sub.getPort();
   180 
   181 					try {
   182 						Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
   183 						try {
   184 							connectTo(host, port);
   185 						} catch (SocketException ex) {
   186 							Log.get().info("Skipping " + sub.getHost() + ": " + ex);
   187 							continue;
   188 						}
   189 
   190 						int oldMark = this.highMarks.get(sub);
   191 						int newMark = changeGroup(sub.getGroup());
   192 
   193 						if (oldMark != newMark) {
   194 							List<String> messageIDs = over(oldMark, newMark);
   195 
   196 							for (String messageID : messageIDs) {
   197 								if (!StorageManager.current().isArticleExisting(messageID)) {
   198 									try {
   199 										// Post the message via common socket connection
   200 										ArticleReader aread =
   201 											new ArticleReader(sub.getHost(), sub.getPort(), messageID);
   202 										byte[] abuf = aread.getArticleData();
   203 										if (abuf == null) {
   204 											Log.get().warning("Could not feed " + messageID
   205 												+ " from " + sub.getHost());
   206 										} else {
   207 											Log.get().info("Feeding " + messageID);
   208 											ArticleWriter awrite = new ArticleWriter(
   209 												"localhost", Config.inst().get(Config.PORT, 119));
   210 											awrite.writeArticle(abuf);
   211 											awrite.close();
   212 										}
   213 										Stats.getInstance().mailFeeded(sub.getGroup());
   214 									} catch (IOException ex) {
   215 										// There may be a temporary network failure
   216 										ex.printStackTrace();
   217 										Log.get().warning("Skipping mail " + messageID + " due to exception.");
   218 									}
   219 								}
   220 							} // for(;;)
   221 							this.highMarks.put(sub, newMark);
   222 						}
   223 
   224 						disconnect();
   225 					} catch (StorageBackendException ex) {
   226 						ex.printStackTrace();
   227 					} catch (IOException ex) {
   228 						ex.printStackTrace();
   229 						Log.get().severe("PullFeeder run stopped due to exception.");
   230 					}
   231 				} // for(Subscription sub : subscriptions)
   232 
   233 				Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
   234 				Thread.sleep(pullInterval);
   235 			} catch (InterruptedException ex) {
   236 				Log.get().warning(ex.getMessage());
   237 			}
   238 		}
   239 	}
   240 }