1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/src/org/sonews/feed/PullFeeder.java Sun Aug 29 17:28:58 2010 +0200
1.3 @@ -0,0 +1,276 @@
1.4 +/*
1.5 + * SONEWS News Server
1.6 + * see AUTHORS for the list of contributors
1.7 + *
1.8 + * This program is free software: you can redistribute it and/or modify
1.9 + * it under the terms of the GNU General Public License as published by
1.10 + * the Free Software Foundation, either version 3 of the License, or
1.11 + * (at your option) any later version.
1.12 + *
1.13 + * This program is distributed in the hope that it will be useful,
1.14 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
1.15 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1.16 + * GNU General Public License for more details.
1.17 + *
1.18 + * You should have received a copy of the GNU General Public License
1.19 + * along with this program. If not, see <http://www.gnu.org/licenses/>.
1.20 + */
1.21 +
1.22 +package org.sonews.feed;
1.23 +
1.24 +import java.io.BufferedReader;
1.25 +import java.io.IOException;
1.26 +import java.io.InputStreamReader;
1.27 +import java.io.PrintWriter;
1.28 +import java.net.Socket;
1.29 +import java.net.SocketException;
1.30 +import java.net.UnknownHostException;
1.31 +import java.util.ArrayList;
1.32 +import java.util.HashMap;
1.33 +import java.util.HashSet;
1.34 +import java.util.List;
1.35 +import java.util.Map;
1.36 +import java.util.Set;
1.37 +import java.util.logging.Level;
1.38 +import org.sonews.config.Config;
1.39 +import org.sonews.daemon.AbstractDaemon;
1.40 +import org.sonews.util.Log;
1.41 +import org.sonews.storage.StorageBackendException;
1.42 +import org.sonews.storage.StorageManager;
1.43 +import org.sonews.util.Stats;
1.44 +import org.sonews.util.io.ArticleReader;
1.45 +import org.sonews.util.io.ArticleWriter;
1.46 +
1.47 +/**
1.48 + * The PullFeeder class regularily checks another Newsserver for new
1.49 + * messages.
1.50 + * @author Christian Lins
1.51 + * @since sonews/0.5.0
1.52 + */
1.53 +class PullFeeder extends AbstractDaemon
1.54 +{
1.55 +
1.56 + private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
1.57 + private BufferedReader in;
1.58 + private PrintWriter out;
1.59 + private Set<Subscription> subscriptions = new HashSet<Subscription>();
1.60 +
1.61 + private void addSubscription(final Subscription sub)
1.62 + {
1.63 + subscriptions.add(sub);
1.64 +
1.65 + if(!highMarks.containsKey(sub))
1.66 + {
1.67 + // Set a initial highMark
1.68 + this.highMarks.put(sub, 0);
1.69 + }
1.70 + }
1.71 +
1.72 + /**
1.73 + * Changes to the given group and returns its high mark.
1.74 + * @param groupName
1.75 + * @return
1.76 + */
1.77 + private int changeGroup(String groupName)
1.78 + throws IOException
1.79 + {
1.80 + this.out.print("GROUP " + groupName + "\r\n");
1.81 + this.out.flush();
1.82 +
1.83 + String line = this.in.readLine();
1.84 + if(line.startsWith("211 "))
1.85 + {
1.86 + int highmark = Integer.parseInt(line.split(" ")[3]);
1.87 + return highmark;
1.88 + }
1.89 + else
1.90 + {
1.91 + throw new IOException("GROUP " + groupName + " returned: " + line);
1.92 + }
1.93 + }
1.94 +
1.95 + private void connectTo(final String host, final int port)
1.96 + throws IOException, UnknownHostException
1.97 + {
1.98 + Socket socket = new Socket(host, port);
1.99 + this.out = new PrintWriter(socket.getOutputStream());
1.100 + this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
1.101 +
1.102 + String line = in.readLine();
1.103 + if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
1.104 + {
1.105 + throw new IOException(line);
1.106 + }
1.107 +
1.108 + // Send MODE READER to peer, some newsservers are friendlier then
1.109 + this.out.println("MODE READER\r\n");
1.110 + this.out.flush();
1.111 + line = this.in.readLine();
1.112 + }
1.113 +
1.114 + private void disconnect()
1.115 + throws IOException
1.116 + {
1.117 + this.out.print("QUIT\r\n");
1.118 + this.out.flush();
1.119 + this.out.close();
1.120 + this.in.close();
1.121 +
1.122 + this.out = null;
1.123 + this.in = null;
1.124 + }
1.125 +
1.126 + /**
1.127 + * Uses the OVER or XOVER command to get a list of message overviews that
1.128 + * may be unknown to this feeder and are about to be peered.
1.129 + * @param start
1.130 + * @param end
1.131 + * @return A list of message ids with potentially interesting messages.
1.132 + */
1.133 + private List<String> over(int start, int end)
1.134 + throws IOException
1.135 + {
1.136 + this.out.print("OVER " + start + "-" + end + "\r\n");
1.137 + this.out.flush();
1.138 +
1.139 + String line = this.in.readLine();
1.140 + if(line.startsWith("500 ")) // OVER not supported
1.141 + {
1.142 + this.out.print("XOVER " + start + "-" + end + "\r\n");
1.143 + this.out.flush();
1.144 +
1.145 + line = this.in.readLine();
1.146 + }
1.147 +
1.148 + if(line.startsWith("224 "))
1.149 + {
1.150 + List<String> messages = new ArrayList<String>();
1.151 + line = this.in.readLine();
1.152 + while(!".".equals(line))
1.153 + {
1.154 + String mid = line.split("\t")[4]; // 5th should be the Message-ID
1.155 + messages.add(mid);
1.156 + line = this.in.readLine();
1.157 + }
1.158 + return messages;
1.159 + }
1.160 + else
1.161 + {
1.162 + throw new IOException("Server return for OVER/XOVER: " + line);
1.163 + }
1.164 + }
1.165 +
1.166 + @Override
1.167 + public void run()
1.168 + {
1.169 + while(isRunning())
1.170 + {
1.171 + int pullInterval = 1000 *
1.172 + Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
1.173 + String host = "localhost";
1.174 + int port = 119;
1.175 +
1.176 + Log.get().info("Start PullFeeder run...");
1.177 +
1.178 + try
1.179 + {
1.180 + this.subscriptions.clear();
1.181 + List<Subscription> subsPull = StorageManager.current()
1.182 + .getSubscriptions(FeedManager.TYPE_PULL);
1.183 + for(Subscription sub : subsPull)
1.184 + {
1.185 + addSubscription(sub);
1.186 + }
1.187 + }
1.188 + catch(StorageBackendException ex)
1.189 + {
1.190 + Log.get().log(Level.SEVERE, host, ex);
1.191 + }
1.192 +
1.193 + try
1.194 + {
1.195 + for(Subscription sub : this.subscriptions)
1.196 + {
1.197 + host = sub.getHost();
1.198 + port = sub.getPort();
1.199 +
1.200 + try
1.201 + {
1.202 + Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
1.203 + try
1.204 + {
1.205 + connectTo(host, port);
1.206 + }
1.207 + catch(SocketException ex)
1.208 + {
1.209 + Log.get().info("Skipping " + sub.getHost() + ": " + ex);
1.210 + continue;
1.211 + }
1.212 +
1.213 + int oldMark = this.highMarks.get(sub);
1.214 + int newMark = changeGroup(sub.getGroup());
1.215 +
1.216 + if(oldMark != newMark)
1.217 + {
1.218 + List<String> messageIDs = over(oldMark, newMark);
1.219 +
1.220 + for(String messageID : messageIDs)
1.221 + {
1.222 + if(!StorageManager.current().isArticleExisting(messageID))
1.223 + {
1.224 + try
1.225 + {
1.226 + // Post the message via common socket connection
1.227 + ArticleReader aread =
1.228 + new ArticleReader(sub.getHost(), sub.getPort(), messageID);
1.229 + byte[] abuf = aread.getArticleData();
1.230 + if(abuf == null)
1.231 + {
1.232 + Log.get().warning("Could not feed " + messageID
1.233 + + " from " + sub.getHost());
1.234 + }
1.235 + else
1.236 + {
1.237 + Log.get().info("Feeding " + messageID);
1.238 + ArticleWriter awrite = new ArticleWriter(
1.239 + "localhost", Config.inst().get(Config.PORT, 119));
1.240 + awrite.writeArticle(abuf);
1.241 + awrite.close();
1.242 + }
1.243 + Stats.getInstance().mailFeeded(sub.getGroup());
1.244 + }
1.245 + catch(IOException ex)
1.246 + {
1.247 + // There may be a temporary network failure
1.248 + ex.printStackTrace();
1.249 + Log.get().warning("Skipping mail " + messageID + " due to exception.");
1.250 + }
1.251 + }
1.252 + } // for(;;)
1.253 + this.highMarks.put(sub, newMark);
1.254 + }
1.255 +
1.256 + disconnect();
1.257 + }
1.258 + catch(StorageBackendException ex)
1.259 + {
1.260 + ex.printStackTrace();
1.261 + }
1.262 + catch(IOException ex)
1.263 + {
1.264 + ex.printStackTrace();
1.265 + Log.get().severe("PullFeeder run stopped due to exception.");
1.266 + }
1.267 + } // for(Subscription sub : subscriptions)
1.268 +
1.269 + Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
1.270 + Thread.sleep(pullInterval);
1.271 + }
1.272 + catch(InterruptedException ex)
1.273 + {
1.274 + Log.get().warning(ex.getMessage());
1.275 + }
1.276 + }
1.277 + }
1.278 +
1.279 +}