1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/org/sonews/feed/PullFeeder.java Fri Jun 26 16:48:50 2009 +0200
1.3 @@ -0,0 +1,250 @@
1.4 +/*
1.5 + * SONEWS News Server
1.6 + * see AUTHORS for the list of contributors
1.7 + *
1.8 + * This program is free software: you can redistribute it and/or modify
1.9 + * it under the terms of the GNU General Public License as published by
1.10 + * the Free Software Foundation, either version 3 of the License, or
1.11 + * (at your option) any later version.
1.12 + *
1.13 + * This program is distributed in the hope that it will be useful,
1.14 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
1.15 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1.16 + * GNU General Public License for more details.
1.17 + *
1.18 + * You should have received a copy of the GNU General Public License
1.19 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
1.20 + */
1.21 +
1.22 +package org.sonews.feed;
1.23 +
1.24 +import java.io.BufferedReader;
1.25 +import java.io.IOException;
1.26 +import java.io.InputStreamReader;
1.27 +import java.io.PrintWriter;
1.28 +import java.net.Socket;
1.29 +import java.net.SocketException;
1.30 +import java.net.UnknownHostException;
1.31 +import java.sql.SQLException;
1.32 +import java.util.ArrayList;
1.33 +import java.util.HashMap;
1.34 +import java.util.List;
1.35 +import java.util.Map;
1.36 +import org.sonews.daemon.Config;
1.37 +import org.sonews.util.Log;
1.38 +import org.sonews.daemon.storage.Database;
1.39 +import org.sonews.util.Stats;
1.40 +import org.sonews.util.io.ArticleReader;
1.41 +import org.sonews.util.io.ArticleWriter;
1.42 +
1.43 +/**
1.44 + * The PullFeeder class regularily checks another Newsserver for new
1.45 + * messages.
1.46 + * @author Christian Lins
1.47 + * @since sonews/0.5.0
1.48 + */
1.49 +class PullFeeder extends AbstractFeeder
1.50 +{
1.51 +
1.52 + private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
1.53 + private BufferedReader in;
1.54 + private PrintWriter out;
1.55 +
1.56 + @Override
1.57 + public void addSubscription(final Subscription sub)
1.58 + {
1.59 + super.addSubscription(sub);
1.60 +
1.61 + // Set a initial highMark
1.62 + this.highMarks.put(sub, 0);
1.63 + }
1.64 +
1.65 + /**
1.66 + * Changes to the given group and returns its high mark.
1.67 + * @param groupName
1.68 + * @return
1.69 + */
1.70 + private int changeGroup(String groupName)
1.71 + throws IOException
1.72 + {
1.73 + this.out.print("GROUP " + groupName + "\r\n");
1.74 + this.out.flush();
1.75 +
1.76 + String line = this.in.readLine();
1.77 + if(line.startsWith("211 "))
1.78 + {
1.79 + int highmark = Integer.parseInt(line.split(" ")[3]);
1.80 + return highmark;
1.81 + }
1.82 + else
1.83 + {
1.84 + throw new IOException("GROUP " + groupName + " returned: " + line);
1.85 + }
1.86 + }
1.87 +
1.88 + private void connectTo(final String host, final int port)
1.89 + throws IOException, UnknownHostException
1.90 + {
1.91 + Socket socket = new Socket(host, port);
1.92 + this.out = new PrintWriter(socket.getOutputStream());
1.93 + this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
1.94 +
1.95 + String line = in.readLine();
1.96 + if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
1.97 + {
1.98 + throw new IOException(line);
1.99 + }
1.100 + }
1.101 +
1.102 + private void disconnect()
1.103 + throws IOException
1.104 + {
1.105 + this.out.print("QUIT\r\n");
1.106 + this.out.flush();
1.107 + this.out.close();
1.108 + this.in.close();
1.109 +
1.110 + this.out = null;
1.111 + this.in = null;
1.112 + }
1.113 +
1.114 + /**
1.115 + * Uses the OVER or XOVER command to get a list of message overviews that
1.116 + * may be unknown to this feeder and are about to be peered.
1.117 + * @param start
1.118 + * @param end
1.119 + * @return A list of message ids with potentially interesting messages.
1.120 + */
1.121 + private List<String> over(int start, int end)
1.122 + throws IOException
1.123 + {
1.124 + this.out.print("OVER " + start + "-" + end + "\r\n");
1.125 + this.out.flush();
1.126 +
1.127 + String line = this.in.readLine();
1.128 + if(line.startsWith("500 ")) // OVER not supported
1.129 + {
1.130 + this.out.print("XOVER " + start + "-" + end + "\r\n");
1.131 + this.out.flush();
1.132 +
1.133 + line = this.in.readLine();
1.134 + }
1.135 +
1.136 + if(line.startsWith("224 "))
1.137 + {
1.138 + List<String> messages = new ArrayList<String>();
1.139 + line = this.in.readLine();
1.140 + while(!".".equals(line))
1.141 + {
1.142 + String mid = line.split("\t")[4]; // 5th should be the Message-ID
1.143 + messages.add(mid);
1.144 + line = this.in.readLine();
1.145 + }
1.146 + return messages;
1.147 + }
1.148 + else
1.149 + {
1.150 + throw new IOException("Server return for OVER/XOVER: " + line);
1.151 + }
1.152 + }
1.153 +
1.154 + @Override
1.155 + public void run()
1.156 + {
1.157 + while(isRunning())
1.158 + {
1.159 + int pullInterval = 1000 *
1.160 + Config.getInstance().get(Config.FEED_PULLINTERVAL, 3600);
1.161 + String host = "localhost";
1.162 + int port = 119;
1.163 +
1.164 + Log.msg("Start PullFeeder run...", true);
1.165 +
1.166 + try
1.167 + {
1.168 + for(Subscription sub : this.subscriptions)
1.169 + {
1.170 + host = sub.getHost();
1.171 + port = sub.getPort();
1.172 +
1.173 + try
1.174 + {
1.175 + Log.msg("Feeding " + sub.getGroup() + " from " + sub.getHost(), true);
1.176 + try
1.177 + {
1.178 + connectTo(host, port);
1.179 + }
1.180 + catch(SocketException ex)
1.181 + {
1.182 + Log.msg("Skipping " + sub.getHost() + ": " + ex, false);
1.183 + continue;
1.184 + }
1.185 +
1.186 + int oldMark = this.highMarks.get(sub);
1.187 + int newMark = changeGroup(sub.getGroup());
1.188 +
1.189 + if(oldMark != newMark)
1.190 + {
1.191 + List<String> messageIDs = over(oldMark, newMark);
1.192 +
1.193 + for(String messageID : messageIDs)
1.194 + {
1.195 + if(Database.getInstance().isArticleExisting(messageID))
1.196 + {
1.197 + continue;
1.198 + }
1.199 +
1.200 + try
1.201 + {
1.202 + // Post the message via common socket connection
1.203 + ArticleReader aread =
1.204 + new ArticleReader(sub.getHost(), sub.getPort(), messageID);
1.205 + byte[] abuf = aread.getArticleData();
1.206 + if (abuf == null)
1.207 + {
1.208 + Log.msg("Could not feed " + messageID + " from " + sub.getHost(), true);
1.209 + }
1.210 + else
1.211 + {
1.212 + Log.msg("Feeding " + messageID, true);
1.213 + ArticleWriter awrite = new ArticleWriter(
1.214 + "localhost", Config.getInstance().get(Config.PORT, 119));
1.215 + awrite.writeArticle(abuf);
1.216 + awrite.close();
1.217 + }
1.218 + Stats.getInstance().mailFeeded(sub.getGroup());
1.219 + }
1.220 + catch(IOException ex)
1.221 + {
1.222 + // There may be a temporary network failure
1.223 + ex.printStackTrace();
1.224 + Log.msg("Skipping mail " + messageID + " due to exception.", false);
1.225 + }
1.226 + } // for(;;)
1.227 + this.highMarks.put(sub, newMark);
1.228 + }
1.229 +
1.230 + disconnect();
1.231 + }
1.232 + catch(SQLException ex)
1.233 + {
1.234 + ex.printStackTrace();
1.235 + }
1.236 + catch(IOException ex)
1.237 + {
1.238 + ex.printStackTrace();
1.239 + Log.msg("PullFeeder run stopped due to exception.", false);
1.240 + }
1.241 + } // for(Subscription sub : subscriptions)
1.242 +
1.243 + Log.msg("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s", true);
1.244 + Thread.sleep(pullInterval);
1.245 + }
1.246 + catch(InterruptedException ex)
1.247 + {
1.248 + Log.msg(ex.getMessage(), false);
1.249 + }
1.250 + }
1.251 + }
1.252 +
1.253 +}