Drupal: bez dědičnosti, implementujeme rovnou rozhraní (nelze dědit).
3 * see AUTHORS for the list of contributors
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 package org.sonews.feed;
21 import java.io.BufferedReader;
22 import java.io.IOException;
23 import java.io.InputStreamReader;
24 import java.io.PrintWriter;
25 import java.net.Socket;
26 import java.net.SocketException;
27 import java.net.UnknownHostException;
28 import java.util.ArrayList;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
34 import java.util.logging.Level;
35 import org.sonews.config.Config;
36 import org.sonews.daemon.AbstractDaemon;
37 import org.sonews.util.Log;
38 import org.sonews.storage.StorageBackendException;
39 import org.sonews.storage.StorageManager;
40 import org.sonews.util.Stats;
41 import org.sonews.util.io.ArticleReader;
42 import org.sonews.util.io.ArticleWriter;
45 * The PullFeeder class regularly checks another Newsserver for new messages.
47 * @author Christian Lins
// Daemon that connects to the hosts of its pull subscriptions and feeds
// messages that are not yet in local storage to the local server.
50 class PullFeeder extends AbstractDaemon
// Last high mark recorded for each subscription; a subscription starts at 0
// and is updated after its group has been processed.
53 private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
// Reader for reply lines from the currently connected remote server.
54 private BufferedReader in;
// Writer for commands sent to the currently connected remote server.
55 private PrintWriter out;
// Subscriptions processed in the current run; cleared and reloaded from
// storage at the start of every run.
56 private Set<Subscription> subscriptions = new HashSet<Subscription>();
58 private void addSubscription(final Subscription sub)
60 subscriptions.add(sub);
62 if (!highMarks.containsKey(sub)) {
63 // Set a initial highMark
64 this.highMarks.put(sub, 0);
69 * Changes to the given group and returns its high mark.
73 private int changeGroup(String groupName)
76 this.out.print("GROUP " + groupName + "\r\n");
79 String line = this.in.readLine();
80 if (line.startsWith("211 ")) {
81 int highmark = Integer.parseInt(line.split(" ")[3]);
84 throw new IOException("GROUP " + groupName + " returned: " + line);
88 private void connectTo(final String host, final int port)
89 throws IOException, UnknownHostException
91 Socket socket = new Socket(host, port);
92 this.out = new PrintWriter(socket.getOutputStream());
93 this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
95 String line = in.readLine();
96 if (!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
98 throw new IOException(line);
101 // Send MODE READER to peer, some newsservers are friendlier then
102 this.out.println("MODE READER\r\n");
104 line = this.in.readLine();
// Ends the session with the remote server by sending the NNTP QUIT command.
// NOTE(review): only the QUIT write is visible in this view; flushing and
// closing of the streams presumably follow -- confirm against the full file.
107 private void disconnect()
110 this.out.print("QUIT\r\n");
120 * Uses the OVER or XOVER command to get a list of message overviews that
121 * may be unknown to this feeder and are about to be peered.
124 * @return A list of message ids with potentially interesting messages.
126 private List<String> over(int start, int end)
129 this.out.print("OVER " + start + "-" + end + "\r\n");
132 String line = this.in.readLine();
133 if (line.startsWith("500 ")) // OVER not supported
135 this.out.print("XOVER " + start + "-" + end + "\r\n");
138 line = this.in.readLine();
141 if (line.startsWith("224 ")) {
142 List<String> messages = new ArrayList<String>();
143 line = this.in.readLine();
144 while (!".".equals(line)) {
145 String mid = line.split("\t")[4]; // 5th should be the Message-ID
147 line = this.in.readLine();
151 throw new IOException("Server return for OVER/XOVER: " + line);
// Main loop of the feeder: one pass over all pull subscriptions per
// iteration, followed by a sleep, for as long as the daemon is running.
158 while (isRunning()) {
// Sleep time between passes in milliseconds: the configured pull interval
// (default 3600) multiplied by 1000.
159 int pullInterval = 1000
160 * Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
161 String host = "localhost";
164 Log.get().info("Start PullFeeder run...");
// Reload the pull subscriptions from storage on every pass.
167 this.subscriptions.clear();
168 List<Subscription> subsPull = StorageManager.current().getSubscriptions(FeedManager.TYPE_PULL);
169 for (Subscription sub : subsPull) {
170 addSubscription(sub);
172 } catch (StorageBackendException ex) {
// NOTE(review): the log message passed here is the value of host, which is
// still "localhost" at this point -- confirm that this is intended.
173 Log.get().log(Level.SEVERE, host, ex);
// Process each subscription against its own remote server.
177 for (Subscription sub : this.subscriptions) {
178 host = sub.getHost();
179 port = sub.getPort();
182 Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost());
184 connectTo(host, port);
185 } catch (SocketException ex) {
186 Log.get().info("Skipping " + sub.getHost() + ": " + ex);
// Compare the recorded high mark with the one the remote server reports now.
190 int oldMark = this.highMarks.get(sub);
191 int newMark = changeGroup(sub.getGroup());
// Only ask for overviews when the high mark has changed.
193 if (oldMark != newMark) {
194 List<String> messageIDs = over(oldMark, newMark);
196 for (String messageID : messageIDs) {
// Messages already present in local storage are not fetched again.
197 if (!StorageManager.current().isArticleExisting(messageID)) {
199 // Post the message via common socket connection
// The article is fetched through an ArticleReader built from the
// subscription's host and port.
200 ArticleReader aread =
201 new ArticleReader(sub.getHost(), sub.getPort(), messageID);
202 byte[] abuf = aread.getArticleData();
// NOTE(review): the condition guarding this warning is not visible in this
// view; presumably it checks for missing article data -- confirm.
204 Log.get().warning("Could not feed " + messageID
205 + " from " + sub.getHost());
207 Log.get().info("Feeding " + messageID);
// Hand the article to the server on localhost at the configured port
// (default 119).
208 ArticleWriter awrite = new ArticleWriter(
209 "localhost", Config.inst().get(Config.PORT, 119));
210 awrite.writeArticle(abuf);
213 Stats.getInstance().mailFeeded(sub.getGroup());
214 } catch (IOException ex) {
215 // There may be a temporary network failure
216 ex.printStackTrace();
217 Log.get().warning("Skipping mail " + messageID + " due to exception.");
// Record the new high mark for this subscription.
221 this.highMarks.put(sub, newMark);
225 } catch (StorageBackendException ex) {
226 ex.printStackTrace();
227 } catch (IOException ex) {
228 ex.printStackTrace();
229 Log.get().severe("PullFeeder run stopped due to exception.");
231 } // for(Subscription sub : subscriptions)
// Wait for the pull interval before the next pass.
233 Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s");
234 Thread.sleep(pullInterval);
235 } catch (InterruptedException ex) {
// An interrupt during the sleep is only logged as a warning here.
236 Log.get().warning(ex.getMessage());