3 * see AUTHORS for the list of contributors
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 package org.sonews.feed;
21 import java.io.BufferedReader;
22 import java.io.IOException;
23 import java.io.InputStreamReader;
24 import java.io.PrintWriter;
25 import java.net.Socket;
26 import java.net.SocketException;
27 import java.net.UnknownHostException;
28 import java.util.ArrayList;
29 import java.util.HashMap;
30 import java.util.List;
32 import org.sonews.config.Config;
33 import org.sonews.util.Log;
34 import org.sonews.storage.StorageBackendException;
35 import org.sonews.storage.StorageManager;
36 import org.sonews.util.Stats;
37 import org.sonews.util.io.ArticleReader;
38 import org.sonews.util.io.ArticleWriter;
41 * The PullFeeder class regularily checks another Newsserver for new
43 * @author Christian Lins
46 class PullFeeder extends AbstractFeeder
49 private Map<Subscription, Integer> highMarks = new HashMap<Subscription, Integer>();
50 private BufferedReader in;
51 private PrintWriter out;
54 public void addSubscription(final Subscription sub)
56 super.addSubscription(sub);
58 // Set a initial highMark
59 this.highMarks.put(sub, 0);
63 * Changes to the given group and returns its high mark.
67 private int changeGroup(String groupName)
70 this.out.print("GROUP " + groupName + "\r\n");
73 String line = this.in.readLine();
74 if(line.startsWith("211 "))
76 int highmark = Integer.parseInt(line.split(" ")[3]);
81 throw new IOException("GROUP " + groupName + " returned: " + line);
85 private void connectTo(final String host, final int port)
86 throws IOException, UnknownHostException
88 Socket socket = new Socket(host, port);
89 this.out = new PrintWriter(socket.getOutputStream());
90 this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
92 String line = in.readLine();
93 if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed
95 throw new IOException(line);
99 private void disconnect()
102 this.out.print("QUIT\r\n");
112 * Uses the OVER or XOVER command to get a list of message overviews that
113 * may be unknown to this feeder and are about to be peered.
116 * @return A list of message ids with potentially interesting messages.
118 private List<String> over(int start, int end)
121 this.out.print("OVER " + start + "-" + end + "\r\n");
124 String line = this.in.readLine();
125 if(line.startsWith("500 ")) // OVER not supported
127 this.out.print("XOVER " + start + "-" + end + "\r\n");
130 line = this.in.readLine();
133 if(line.startsWith("224 "))
135 List<String> messages = new ArrayList<String>();
136 line = this.in.readLine();
137 while(!".".equals(line))
139 String mid = line.split("\t")[4]; // 5th should be the Message-ID
141 line = this.in.readLine();
147 throw new IOException("Server return for OVER/XOVER: " + line);
156 int pullInterval = 1000 *
157 Config.inst().get(Config.FEED_PULLINTERVAL, 3600);
158 String host = "localhost";
161 Log.msg("Start PullFeeder run...", true);
165 for(Subscription sub : this.subscriptions)
167 host = sub.getHost();
168 port = sub.getPort();
172 Log.msg("Feeding " + sub.getGroup() + " from " + sub.getHost(), true);
175 connectTo(host, port);
177 catch(SocketException ex)
179 Log.msg("Skipping " + sub.getHost() + ": " + ex, false);
183 int oldMark = this.highMarks.get(sub);
184 int newMark = changeGroup(sub.getGroup());
186 if(oldMark != newMark)
188 List<String> messageIDs = over(oldMark, newMark);
190 for(String messageID : messageIDs)
192 if(!StorageManager.current().isArticleExisting(messageID))
196 // Post the message via common socket connection
197 ArticleReader aread =
198 new ArticleReader(sub.getHost(), sub.getPort(), messageID);
199 byte[] abuf = aread.getArticleData();
202 Log.msg("Could not feed " + messageID + " from " + sub.getHost(), true);
206 Log.msg("Feeding " + messageID, true);
207 ArticleWriter awrite = new ArticleWriter(
208 "localhost", Config.inst().get(Config.PORT, 119));
209 awrite.writeArticle(abuf);
212 Stats.getInstance().mailFeeded(sub.getGroup());
214 catch(IOException ex)
216 // There may be a temporary network failure
217 ex.printStackTrace();
218 Log.msg("Skipping mail " + messageID + " due to exception.", false);
222 this.highMarks.put(sub, newMark);
227 catch(StorageBackendException ex)
229 ex.printStackTrace();
231 catch(IOException ex)
233 ex.printStackTrace();
234 Log.msg("PullFeeder run stopped due to exception.", false);
236 } // for(Subscription sub : subscriptions)
238 Log.msg("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s", true);
239 Thread.sleep(pullInterval);
241 catch(InterruptedException ex)
243 Log.msg(ex.getMessage(), false);