src/org/sonews/feed/PullFeeder.java
author cli
Tue, 07 Jun 2011 09:23:34 +0200
changeset 43 7d0e65712a95
parent 37 74139325d305
permissions -rwxr-xr-x
Adapt config sample to use hsqldb
chris@1
     1
/*
chris@1
     2
 *   SONEWS News Server
chris@1
     3
 *   see AUTHORS for the list of contributors
chris@1
     4
 *
chris@1
     5
 *   This program is free software: you can redistribute it and/or modify
chris@1
     6
 *   it under the terms of the GNU General Public License as published by
chris@1
     7
 *   the Free Software Foundation, either version 3 of the License, or
chris@1
     8
 *   (at your option) any later version.
chris@1
     9
 *
chris@1
    10
 *   This program is distributed in the hope that it will be useful,
chris@1
    11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1
    12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
chris@1
    13
 *   GNU General Public License for more details.
chris@1
    14
 *
chris@1
    15
 *   You should have received a copy of the GNU General Public License
chris@1
    16
 *   along with this program.  If not, see <http://www.gnu.org/licenses/>.
chris@1
    17
 */
chris@1
    18
chris@1
    19
package org.sonews.feed;
chris@1
    20
chris@1
    21
import java.io.BufferedReader;
chris@1
    22
import java.io.IOException;
chris@1
    23
import java.io.InputStreamReader;
chris@1
    24
import java.io.PrintWriter;
chris@1
    25
import java.net.Socket;
chris@1
    26
import java.net.SocketException;
chris@1
    27
import java.net.UnknownHostException;
chris@1
    28
import java.util.ArrayList;
chris@1
    29
import java.util.HashMap;
cli@22
    30
import java.util.HashSet;
chris@1
    31
import java.util.List;
chris@1
    32
import java.util.Map;
cli@22
    33
import java.util.Set;
cli@22
    34
import java.util.logging.Level;
chris@3
    35
import org.sonews.config.Config;
cli@22
    36
import org.sonews.daemon.AbstractDaemon;
chris@1
    37
import org.sonews.util.Log;
chris@3
    38
import org.sonews.storage.StorageBackendException;
chris@3
    39
import org.sonews.storage.StorageManager;
chris@1
    40
import org.sonews.util.Stats;
chris@1
    41
import org.sonews.util.io.ArticleReader;
chris@1
    42
import org.sonews.util.io.ArticleWriter;
chris@1
    43
chris@1
    44
/**
chris@1
    45
 * The PullFeeder class regularily checks another Newsserver for new
chris@1
    46
 * messages.
chris@1
    47
 * @author Christian Lins
chris@1
    48
 * @since sonews/0.5.0
chris@1
    49
 */
cli@22
    50
class PullFeeder extends AbstractDaemon
chris@1
    51
{
cli@22
    52
cli@37
    53
	private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
cli@37
    54
	private BufferedReader in;
cli@37
    55
	private PrintWriter out;
cli@37
    56
	private Set<Subscription> subscriptions = new HashSet<Subscription>();
chris@1
    57
cli@37
    58
	private void addSubscription(final Subscription sub)
cli@37
    59
	{
cli@37
    60
		subscriptions.add(sub);
cli@7
    61
cli@37
    62
		if (!highMarks.containsKey(sub)) {
cli@37
    63
			// Set a initial highMark
cli@37
    64
			this.highMarks.put(sub, 0);
cli@37
    65
		}
cli@37
    66
	}
chris@1
    67
cli@37
    68
	/**
cli@37
    69
	 * Changes to the given group and returns its high mark.
cli@37
    70
	 * @param groupName
cli@37
    71
	 * @return
cli@37
    72
	 */
cli@37
    73
	private int changeGroup(String groupName)
cli@37
    74
		throws IOException
cli@37
    75
	{
cli@37
    76
		this.out.print("GROUP " + groupName + "\r\n");
cli@37
    77
		this.out.flush();
cli@22
    78
cli@37
    79
		String line = this.in.readLine();
cli@37
    80
		if (line.startsWith("211 ")) {
cli@37
    81
			int highmark = Integer.parseInt(line.split(" ")[3]);
cli@37
    82
			return highmark;
cli@37
    83
		} else {
cli@37
    84
			throw new IOException("GROUP " + groupName + " returned: " + line);
cli@37
    85
		}
cli@37
    86
	}
chris@1
    87
cli@37
    88
	private void connectTo(final String host, final int port)
cli@37
    89
		throws IOException, UnknownHostException
cli@37
    90
	{
cli@37
    91
		Socket socket = new Socket(host, port);
cli@37
    92
		this.out = new PrintWriter(socket.getOutputStream());
cli@37
    93
		this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
chris@1
    94
cli@37
    95
		String line = in.readLine();
cli@37
    96
		if (!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
cli@37
    97
		{
cli@37
    98
			throw new IOException(line);
cli@37
    99
		}
cli@37
   100
cli@37
   101
		// Send MODE READER to peer, some newsservers are friendlier then
cli@37
   102
		this.out.println("MODE READER\r\n");
cli@37
   103
		this.out.flush();
cli@37
   104
		line = this.in.readLine();
cli@37
   105
	}
cli@37
   106
cli@37
   107
	private void disconnect()
cli@37
   108
		throws IOException
cli@37
   109
	{
cli@37
   110
		this.out.print("QUIT\r\n");
cli@37
   111
		this.out.flush();
cli@37
   112
		this.out.close();
cli@37
   113
		this.in.close();
cli@37
   114
cli@37
   115
		this.out = null;
cli@37
   116
		this.in = null;
cli@37
   117
	}
cli@37
   118
cli@37
   119
	/**
cli@37
   120
	 * Uses the OVER or XOVER command to get a list of message overviews that
cli@37
   121
	 * may be unknown to this feeder and are about to be peered.
cli@37
   122
	 * @param start
cli@37
   123
	 * @param end
cli@37
   124
	 * @return A list of message ids with potentially interesting messages.
cli@37
   125
	 */
cli@37
   126
	private List<String> over(int start, int end)
cli@37
   127
		throws IOException
cli@37
   128
	{
cli@37
   129
		this.out.print("OVER " + start + "-" + end + "\r\n");
cli@37
   130
		this.out.flush();
cli@37
   131
cli@37
   132
		String line = this.in.readLine();
cli@37
   133
		if (line.startsWith("500 ")) // OVER not supported
cli@37
   134
		{
cli@37
   135
			this.out.print("XOVER " + start + "-" + end + "\r\n");
cli@37
   136
			this.out.flush();
cli@37
   137
cli@37
   138
			line = this.in.readLine();
cli@37
   139
		}
cli@37
   140
cli@37
   141
		if (line.startsWith("224 ")) {
cli@37
   142
			List<String> messages = new ArrayList<String>();
cli@37
   143
			line = this.in.readLine();
cli@37
   144
			while (!".".equals(line)) {
cli@37
   145
				String mid = line.split("\t")[4]; // 5th should be the Message-ID
cli@37
   146
				messages.add(mid);
cli@37
   147
				line = this.in.readLine();
cli@37
   148
			}
cli@37
   149
			return messages;
cli@37
   150
		} else {
cli@37
   151
			throw new IOException("Server return for OVER/XOVER: " + line);
cli@37
   152
		}
cli@37
   153
	}
cli@37
   154
cli@37
   155
	@Override
cli@37
   156
	public void run()
cli@37
   157
	{
cli@37
   158
		while (isRunning()) {
cli@37
   159
			int pullInterval = 1000
cli@37
   160
				* Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
cli@37
   161
			String host = "localhost";
cli@37
   162
			int port = 119;
cli@37
   163
cli@37
   164
			Log.get().info("Start PullFeeder run...");
cli@37
   165
cli@37
   166
			try {
cli@37
   167
				this.subscriptions.clear();
cli@37
   168
				List<Subscription> subsPull = StorageManager.current().getSubscriptions(FeedManager.TYPE_PULL);
cli@37
   169
				for (Subscription sub : subsPull) {
cli@37
   170
					addSubscription(sub);
cli@37
   171
				}
cli@37
   172
			} catch (StorageBackendException ex) {
cli@37
   173
				Log.get().log(Level.SEVERE, host, ex);
cli@37
   174
			}
cli@37
   175
cli@37
   176
			try {
cli@37
   177
				for (Subscription sub : this.subscriptions) {
cli@37
   178
					host = sub.getHost();
cli@37
   179
					port = sub.getPort();
cli@37
   180
cli@37
   181
					try {
cli@37
   182
						Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
cli@37
   183
						try {
cli@37
   184
							connectTo(host, port);
cli@37
   185
						} catch (SocketException ex) {
cli@37
   186
							Log.get().info("Skipping " + sub.getHost() + ": " + ex);
cli@37
   187
							continue;
cli@37
   188
						}
cli@37
   189
cli@37
   190
						int oldMark = this.highMarks.get(sub);
cli@37
   191
						int newMark = changeGroup(sub.getGroup());
cli@37
   192
cli@37
   193
						if (oldMark != newMark) {
cli@37
   194
							List<String> messageIDs = over(oldMark, newMark);
cli@37
   195
cli@37
   196
							for (String messageID : messageIDs) {
cli@37
   197
								if (!StorageManager.current().isArticleExisting(messageID)) {
cli@37
   198
									try {
cli@37
   199
										// Post the message via common socket connection
cli@37
   200
										ArticleReader aread =
cli@37
   201
											new ArticleReader(sub.getHost(), sub.getPort(), messageID);
cli@37
   202
										byte[] abuf = aread.getArticleData();
cli@37
   203
										if (abuf == null) {
cli@37
   204
											Log.get().warning("Could not feed " + messageID
cli@37
   205
												+ " from " + sub.getHost());
cli@37
   206
										} else {
cli@37
   207
											Log.get().info("Feeding " + messageID);
cli@37
   208
											ArticleWriter awrite = new ArticleWriter(
cli@37
   209
												"localhost", Config.inst().get(Config.PORT, 119));
cli@37
   210
											awrite.writeArticle(abuf);
cli@37
   211
											awrite.close();
cli@37
   212
										}
cli@37
   213
										Stats.getInstance().mailFeeded(sub.getGroup());
cli@37
   214
									} catch (IOException ex) {
cli@37
   215
										// There may be a temporary network failure
cli@37
   216
										ex.printStackTrace();
cli@37
   217
										Log.get().warning("Skipping mail " + messageID + " due to exception.");
cli@37
   218
									}
cli@37
   219
								}
cli@37
   220
							} // for(;;)
cli@37
   221
							this.highMarks.put(sub, newMark);
cli@37
   222
						}
cli@37
   223
cli@37
   224
						disconnect();
cli@37
   225
					} catch (StorageBackendException ex) {
cli@37
   226
						ex.printStackTrace();
cli@37
   227
					} catch (IOException ex) {
cli@37
   228
						ex.printStackTrace();
cli@37
   229
						Log.get().severe("PullFeeder run stopped due to exception.");
cli@37
   230
					}
cli@37
   231
				} // for(Subscription sub : subscriptions)
cli@37
   232
cli@37
   233
				Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
cli@37
   234
				Thread.sleep(pullInterval);
cli@37
   235
			} catch (InterruptedException ex) {
cli@37
   236
				Log.get().warning(ex.getMessage());
cli@37
   237
			}
cli@37
   238
		}
cli@37
   239
	}
chris@1
   240
}