diff -r c404a87db5b7 -r 74139325d305 src/org/sonews/feed/PullFeeder.java --- a/src/org/sonews/feed/PullFeeder.java Sun Aug 29 17:43:58 2010 +0200 +++ b/src/org/sonews/feed/PullFeeder.java Sun Aug 29 18:17:37 2010 +0200 @@ -49,228 +49,192 @@ */ class PullFeeder extends AbstractDaemon { - - private Map highMarks = new HashMap(); - private BufferedReader in; - private PrintWriter out; - private Set subscriptions = new HashSet(); - - private void addSubscription(final Subscription sub) - { - subscriptions.add(sub); - if(!highMarks.containsKey(sub)) - { - // Set a initial highMark - this.highMarks.put(sub, 0); - } - } - - /** - * Changes to the given group and returns its high mark. - * @param groupName - * @return - */ - private int changeGroup(String groupName) - throws IOException - { - this.out.print("GROUP " + groupName + "\r\n"); - this.out.flush(); - - String line = this.in.readLine(); - if(line.startsWith("211 ")) - { - int highmark = Integer.parseInt(line.split(" ")[3]); - return highmark; - } - else - { - throw new IOException("GROUP " + groupName + " returned: " + line); - } - } - - private void connectTo(final String host, final int port) - throws IOException, UnknownHostException - { - Socket socket = new Socket(host, port); - this.out = new PrintWriter(socket.getOutputStream()); - this.in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + private Map highMarks = new HashMap(); + private BufferedReader in; + private PrintWriter out; + private Set subscriptions = new HashSet(); - String line = in.readLine(); - if(!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed - { - throw new IOException(line); - } + private void addSubscription(final Subscription sub) + { + subscriptions.add(sub); - // Send MODE READER to peer, some newsservers are friendlier then - this.out.println("MODE READER\r\n"); - this.out.flush(); - line = this.in.readLine(); - } - - private void disconnect() - throws IOException - { - this.out.print("QUIT\r\n"); - this.out.flush(); - this.out.close(); - this.in.close(); - - this.out = null; - this.in = null; - } - - /** - * Uses the OVER or XOVER command to get a list of message overviews that - * may be unknown to this feeder and are about to be peered. - * @param start - * @param end - * @return A list of message ids with potentially interesting messages. - */ - private List over(int start, int end) - throws IOException - { - this.out.print("OVER " + start + "-" + end + "\r\n"); - this.out.flush(); - - String line = this.in.readLine(); - if(line.startsWith("500 ")) // OVER not supported - { - this.out.print("XOVER " + start + "-" + end + "\r\n"); - this.out.flush(); - - line = this.in.readLine(); - } - - if(line.startsWith("224 ")) - { - List messages = new ArrayList(); - line = this.in.readLine(); - while(!".".equals(line)) - { - String mid = line.split("\t")[4]; // 5th should be the Message-ID - messages.add(mid); - line = this.in.readLine(); - } - return messages; - } - else - { - throw new IOException("Server return for OVER/XOVER: " + line); - } - } - - @Override - public void run() - { - while(isRunning()) - { - int pullInterval = 1000 * - Config.inst().get(Config.FEED_PULLINTERVAL, 3600); - String host = "localhost"; - int port = 119; - - Log.get().info("Start PullFeeder run..."); + if (!highMarks.containsKey(sub)) { + // Set a initial highMark + this.highMarks.put(sub, 0); + } + } - try - { - this.subscriptions.clear(); - List subsPull = StorageManager.current() - .getSubscriptions(FeedManager.TYPE_PULL); - for(Subscription sub : subsPull) - { - addSubscription(sub); - } - } - catch(StorageBackendException ex) - { - Log.get().log(Level.SEVERE, host, ex); - } + /** + * Changes to the given group and returns its high mark. + * @param groupName + * @return + */ + private int changeGroup(String groupName) + throws IOException + { + this.out.print("GROUP " + groupName + "\r\n"); + this.out.flush(); - try - { - for(Subscription sub : this.subscriptions) - { - host = sub.getHost(); - port = sub.getPort(); + String line = this.in.readLine(); + if (line.startsWith("211 ")) { + int highmark = Integer.parseInt(line.split(" ")[3]); + return highmark; + } else { + throw new IOException("GROUP " + groupName + " returned: " + line); + } + } - try - { - Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost()); - try - { - connectTo(host, port); - } - catch(SocketException ex) - { - Log.get().info("Skipping " + sub.getHost() + ": " + ex); - continue; - } - - int oldMark = this.highMarks.get(sub); - int newMark = changeGroup(sub.getGroup()); - - if(oldMark != newMark) - { - List messageIDs = over(oldMark, newMark); + private void connectTo(final String host, final int port) + throws IOException, UnknownHostException + { + Socket socket = new Socket(host, port); + this.out = new PrintWriter(socket.getOutputStream()); + this.in = new BufferedReader(new InputStreamReader(socket.getInputStream())); - for(String messageID : messageIDs) - { - if(!StorageManager.current().isArticleExisting(messageID)) - { - try - { - // Post the message via common socket connection - ArticleReader aread = - new ArticleReader(sub.getHost(), sub.getPort(), messageID); - byte[] abuf = aread.getArticleData(); - if(abuf == null) - { - Log.get().warning("Could not feed " + messageID - + " from " + sub.getHost()); - } - else - { - Log.get().info("Feeding " + messageID); - ArticleWriter awrite = new ArticleWriter( - "localhost", Config.inst().get(Config.PORT, 119)); - awrite.writeArticle(abuf); - awrite.close(); - } - Stats.getInstance().mailFeeded(sub.getGroup()); - } - catch(IOException ex) - { - // There may be a temporary network failure - ex.printStackTrace(); - Log.get().warning("Skipping mail " + messageID + " due to exception."); - } - } - } // for(;;) - this.highMarks.put(sub, newMark); - } - - disconnect(); - } - catch(StorageBackendException ex) - { - ex.printStackTrace(); - } - catch(IOException ex) - { - ex.printStackTrace(); - Log.get().severe("PullFeeder run stopped due to exception."); - } - } // for(Subscription sub : subscriptions) - - Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s"); - Thread.sleep(pullInterval); - } - catch(InterruptedException ex) - { - Log.get().warning(ex.getMessage()); - } - } - } - + String line = in.readLine(); + if (!(line.charAt(0) == '2')) // Could be 200 or 2xx if posting is not allowed + { + throw new IOException(line); + } + + // Send MODE READER to peer, some newsservers are friendlier then + this.out.println("MODE READER\r\n"); + this.out.flush(); + line = this.in.readLine(); + } + + private void disconnect() + throws IOException + { + this.out.print("QUIT\r\n"); + this.out.flush(); + this.out.close(); + this.in.close(); + + this.out = null; + this.in = null; + } + + /** + * Uses the OVER or XOVER command to get a list of message overviews that + * may be unknown to this feeder and are about to be peered. + * @param start + * @param end + * @return A list of message ids with potentially interesting messages. + */ + private List over(int start, int end) + throws IOException + { + this.out.print("OVER " + start + "-" + end + "\r\n"); + this.out.flush(); + + String line = this.in.readLine(); + if (line.startsWith("500 ")) // OVER not supported + { + this.out.print("XOVER " + start + "-" + end + "\r\n"); + this.out.flush(); + + line = this.in.readLine(); + } + + if (line.startsWith("224 ")) { + List messages = new ArrayList(); + line = this.in.readLine(); + while (!".".equals(line)) { + String mid = line.split("\t")[4]; // 5th should be the Message-ID + messages.add(mid); + line = this.in.readLine(); + } + return messages; + } else { + throw new IOException("Server return for OVER/XOVER: " + line); + } + } + + @Override + public void run() + { + while (isRunning()) { + int pullInterval = 1000 + * Config.inst().get(Config.FEED_PULLINTERVAL, 3600); + String host = "localhost"; + int port = 119; + + Log.get().info("Start PullFeeder run..."); + + try { + this.subscriptions.clear(); + List subsPull = StorageManager.current().getSubscriptions(FeedManager.TYPE_PULL); + for (Subscription sub : subsPull) { + addSubscription(sub); + } + } catch (StorageBackendException ex) { + Log.get().log(Level.SEVERE, host, ex); + } + + try { + for (Subscription sub : this.subscriptions) { + host = sub.getHost(); + port = sub.getPort(); + + try { + Log.get().info("Feeding " + sub.getGroup() + " from " + sub.getHost()); + try { + connectTo(host, port); + } catch (SocketException ex) { + Log.get().info("Skipping " + sub.getHost() + ": " + ex); + continue; + } + + int oldMark = this.highMarks.get(sub); + int newMark = changeGroup(sub.getGroup()); + + if (oldMark != newMark) { + List messageIDs = over(oldMark, newMark); + + for (String messageID : messageIDs) { + if (!StorageManager.current().isArticleExisting(messageID)) { + try { + // Post the message via common socket connection + ArticleReader aread = + new ArticleReader(sub.getHost(), sub.getPort(), messageID); + byte[] abuf = aread.getArticleData(); + if (abuf == null) { + Log.get().warning("Could not feed " + messageID + + " from " + sub.getHost()); + } else { + Log.get().info("Feeding " + messageID); + ArticleWriter awrite = new ArticleWriter( + "localhost", Config.inst().get(Config.PORT, 119)); + awrite.writeArticle(abuf); + awrite.close(); + } + Stats.getInstance().mailFeeded(sub.getGroup()); + } catch (IOException ex) { + // There may be a temporary network failure + ex.printStackTrace(); + Log.get().warning("Skipping mail " + messageID + " due to exception."); + } + } + } // for(;;) + this.highMarks.put(sub, newMark); + } + + disconnect(); + } catch (StorageBackendException ex) { + ex.printStackTrace(); + } catch (IOException ex) { + ex.printStackTrace(); + Log.get().severe("PullFeeder run stopped due to exception."); + } + } // for(Subscription sub : subscriptions) + + Log.get().info("PullFeeder run ended. Waiting " + pullInterval / 1000 + "s"); + Thread.sleep(pullInterval); + } catch (InterruptedException ex) { + Log.get().warning(ex.getMessage()); + } + } + } }