chris@1: /*
chris@1: * SONEWS News Server
chris@1: * see AUTHORS for the list of contributors
chris@1: *
chris@1: * This program is free software: you can redistribute it and/or modify
chris@1: * it under the terms of the GNU General Public License as published by
chris@1: * the Free Software Foundation, either version 3 of the License, or
chris@1: * (at your option) any later version.
chris@1: *
chris@1: * This program is distributed in the hope that it will be useful,
chris@1: * but WITHOUT ANY WARRANTY; without even the implied warranty of
chris@1: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
chris@1: * GNU General Public License for more details.
chris@1: *
chris@1: * You should have received a copy of the GNU General Public License
chris@1: * along with this program. If not, see .
chris@1: */
chris@1:
chris@1: package org.sonews.feed;
chris@1:
chris@1: import java.io.BufferedReader;
chris@1: import java.io.IOException;
chris@1: import java.io.InputStreamReader;
chris@1: import java.io.PrintWriter;
chris@1: import java.net.Socket;
chris@1: import java.net.SocketException;
chris@1: import java.net.UnknownHostException;
chris@1: import java.util.ArrayList;
chris@1: import java.util.HashMap;
cli@22: import java.util.HashSet;
chris@1: import java.util.List;
chris@1: import java.util.Map;
cli@22: import java.util.Set;
cli@22: import java.util.logging.Level;
chris@3: import org.sonews.config.Config;
cli@22: import org.sonews.daemon.AbstractDaemon;
chris@1: import org.sonews.util.Log;
chris@3: import org.sonews.storage.StorageBackendException;
chris@3: import org.sonews.storage.StorageManager;
chris@1: import org.sonews.util.Stats;
chris@1: import org.sonews.util.io.ArticleReader;
chris@1: import org.sonews.util.io.ArticleWriter;
chris@1:
chris@1: /**
chris@1: * The PullFeeder class regularily checks another Newsserver for new
chris@1: * messages.
chris@1: * @author Christian Lins
chris@1: * @since sonews/0.5.0
chris@1: */
cli@22: class PullFeeder extends AbstractDaemon
chris@1: {
chris@1:
chris@1: private Map highMarks = new HashMap();
chris@1: private BufferedReader in;
chris@1: private PrintWriter out;
cli@22: private Set subscriptions = new HashSet();
chris@1:
cli@22: private void addSubscription(final Subscription sub)
chris@1: {
cli@22: subscriptions.add(sub);
cli@22:
cli@22: if(!highMarks.containsKey(sub))
cli@22: {
cli@22: // Set a initial highMark
cli@22: this.highMarks.put(sub, 0);
cli@22: }
chris@1: }
chris@1:
chris@1: /**
chris@1: * Changes to the given group and returns its high mark.
chris@1: * @param groupName
chris@1: * @return
chris@1: */
chris@1: private int changeGroup(String groupName)
chris@1: throws IOException
chris@1: {
chris@1: this.out.print("GROUP " + groupName + "\r\n");
chris@1: this.out.flush();
chris@1:
chris@1: String line = this.in.readLine();
chris@1: if(line.startsWith("211 "))
chris@1: {
chris@1: int highmark = Integer.parseInt(line.split(" ")[3]);
chris@1: return highmark;
chris@1: }
chris@1: else
chris@1: {
chris@1: throw new IOException("GROUP " + groupName + " returned: " + line);
chris@1: }
chris@1: }
chris@1:
chris@1: private void connectTo(final String host, final int port)
chris@1: throws IOException, UnknownHostException
chris@1: {
chris@1: Socket socket = new Socket(host, port);
chris@1: this.out = new PrintWriter(socket.getOutputStream());
chris@1: this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
chris@1:
chris@1: String line = in.readLine();
chris@1: if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
chris@1: {
chris@1: throw new IOException(line);
chris@1: }
cli@7:
cli@7: // Send MODE READER to peer, some newsservers are friendlier then
cli@7: this.out.println("MODE READER\r\n");
cli@7: this.out.flush();
cli@7: line = this.in.readLine();
chris@1: }
chris@1:
chris@1: private void disconnect()
chris@1: throws IOException
chris@1: {
chris@1: this.out.print("QUIT\r\n");
chris@1: this.out.flush();
chris@1: this.out.close();
chris@1: this.in.close();
chris@1:
chris@1: this.out = null;
chris@1: this.in = null;
chris@1: }
chris@1:
chris@1: /**
chris@1: * Uses the OVER or XOVER command to get a list of message overviews that
chris@1: * may be unknown to this feeder and are about to be peered.
chris@1: * @param start
chris@1: * @param end
chris@1: * @return A list of message ids with potentially interesting messages.
chris@1: */
chris@1: private List over(int start, int end)
chris@1: throws IOException
chris@1: {
chris@1: this.out.print("OVER " + start + "-" + end + "\r\n");
chris@1: this.out.flush();
chris@1:
chris@1: String line = this.in.readLine();
chris@1: if(line.startsWith("500 ")) // OVER not supported
chris@1: {
chris@1: this.out.print("XOVER " + start + "-" + end + "\r\n");
chris@1: this.out.flush();
chris@1:
chris@1: line = this.in.readLine();
chris@1: }
chris@1:
chris@1: if(line.startsWith("224 "))
chris@1: {
chris@1: List messages = new ArrayList();
chris@1: line = this.in.readLine();
chris@1: while(!".".equals(line))
chris@1: {
chris@1: String mid = line.split("\t")[4]; // 5th should be the Message-ID
chris@1: messages.add(mid);
chris@1: line = this.in.readLine();
chris@1: }
chris@1: return messages;
chris@1: }
chris@1: else
chris@1: {
chris@1: throw new IOException("Server return for OVER/XOVER: " + line);
chris@1: }
chris@1: }
chris@1:
chris@1: @Override
chris@1: public void run()
chris@1: {
chris@1: while(isRunning())
chris@1: {
chris@1: int pullInterval = 1000 *
chris@3: Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
chris@1: String host = "localhost";
chris@1: int port = 119;
chris@1:
cli@15: Log.get().info("Start PullFeeder run...");
chris@1:
chris@1: try
chris@1: {
cli@22: this.subscriptions.clear();
cli@22: List subsPull = StorageManager.current()
cli@22: .getSubscriptions(FeedManager.TYPE_PULL);
cli@22: for(Subscription sub : subsPull)
cli@22: {
cli@22: addSubscription(sub);
cli@22: }
cli@22: }
cli@22: catch(StorageBackendException ex)
cli@22: {
cli@22: Log.get().log(Level.SEVERE, host, ex);
cli@22: }
cli@22:
cli@22: try
cli@22: {
chris@1: for(Subscription sub : this.subscriptions)
chris@1: {
chris@1: host = sub.getHost();
chris@1: port = sub.getPort();
chris@1:
chris@1: try
chris@1: {
cli@15: Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
chris@1: try
chris@1: {
chris@1: connectTo(host, port);
chris@1: }
chris@1: catch(SocketException ex)
chris@1: {
cli@15: Log.get().info("Skipping " + sub.getHost() + ": " + ex);
chris@1: continue;
chris@1: }
chris@1:
chris@1: int oldMark = this.highMarks.get(sub);
chris@1: int newMark = changeGroup(sub.getGroup());
chris@1:
chris@1: if(oldMark != newMark)
chris@1: {
chris@1: List messageIDs = over(oldMark, newMark);
chris@1:
chris@1: for(String messageID : messageIDs)
chris@1: {
chris@3: if(!StorageManager.current().isArticleExisting(messageID))
chris@1: {
chris@3: try
chris@1: {
chris@3: // Post the message via common socket connection
chris@3: ArticleReader aread =
chris@3: new ArticleReader(sub.getHost(), sub.getPort(), messageID);
chris@3: byte[] abuf = aread.getArticleData();
cli@15: if(abuf == null)
chris@3: {
cli@15: Log.get().warning("Could not feed " + messageID
cli@15: + " from " + sub.getHost());
chris@3: }
chris@3: else
chris@3: {
cli@15: Log.get().info("Feeding " + messageID);
chris@3: ArticleWriter awrite = new ArticleWriter(
chris@3: "localhost", Config.inst().get(Config.PORT, 119));
chris@3: awrite.writeArticle(abuf);
chris@3: awrite.close();
chris@3: }
chris@3: Stats.getInstance().mailFeeded(sub.getGroup());
chris@1: }
chris@3: catch(IOException ex)
chris@1: {
chris@3: // There may be a temporary network failure
chris@3: ex.printStackTrace();
cli@15: Log.get().warning("Skipping mail " + messageID + " due to exception.");
chris@1: }
chris@1: }
chris@1: } // for(;;)
chris@1: this.highMarks.put(sub, newMark);
chris@1: }
chris@1:
chris@1: disconnect();
chris@1: }
chris@3: catch(StorageBackendException ex)
chris@1: {
chris@1: ex.printStackTrace();
chris@1: }
chris@1: catch(IOException ex)
chris@1: {
chris@1: ex.printStackTrace();
cli@15: Log.get().severe("PullFeeder run stopped due to exception.");
chris@1: }
chris@1: } // for(Subscription sub : subscriptions)
chris@1:
cli@15: Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
chris@1: Thread.sleep(pullInterval);
chris@1: }
chris@1: catch(InterruptedException ex)
chris@1: {
cli@15: Log.get().warning(ex.getMessage());
chris@1: }
chris@1: }
chris@1: }
chris@1:
chris@1: }