diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/DHTNodes.java b/apps/i2psnark/java/src/org/klomp/snark/dht/DHTNodes.java index f752efb17..1b2ae428e 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/DHTNodes.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/DHTNodes.java @@ -44,6 +44,8 @@ class DHTNodes { private static final long MIN_EXPIRE_TIME = 10*60*1000; private static final long DELTA_EXPIRE_TIME = 3*60*1000; private static final int MAX_PEERS = 799; + /** Buckets older than this are refreshed - BEP 5 says 15 minutes */ + private static final long MAX_BUCKET_AGE = 15*60*1000; public DHTNodes(I2PAppContext ctx, NID me) { _context = ctx; @@ -121,7 +123,7 @@ class DHTNodes { * DHT - get random keys to explore */ public List getExploreKeys() { - return _kad.getExploreKeys(15*60*1000); + return _kad.getExploreKeys(MAX_BUCKET_AGE); } /** */ diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java index b3fd95da2..fa769a31e 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java @@ -133,6 +133,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { private static final int REPLY_PONG = 1; private static final int REPLY_PEERS = 2; private static final int REPLY_NODES = 3; + private static final int REPLY_NETWORK_FAIL = 4; public static final boolean SECURE_NID = true; @@ -272,6 +273,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT { if (! (ni.equals(_myNodeInfo) || (toTry.contains(ni) && tried.contains(ni)))) toTry.add(ni); } + } else if (replyType == REPLY_NETWORK_FAIL) { + break; } else { if (_log.shouldLog(Log.INFO)) _log.info("Got unexpected reply " + replyType + ": " + waiter.getReplyObject()); @@ -370,6 +373,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT { if (! (ni.equals(_myNodeInfo) || tried.contains(ni) || toTry.contains(ni))) toTry.add(ni); } + } else if (replyType == REPLY_NETWORK_FAIL) { + break; } else { if (_log.shouldLog(Log.INFO)) _log.info("Got unexpected reply " + replyType + ": " + waiter.getReplyObject()); @@ -564,7 +569,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT { _tracker.stop(); PersistDHT.saveDHT(_knownNodes, _dhtFile); _knownNodes.stop(); - _sentQueries.clear(); + for (Iterator iter = _sentQueries.values().iterator(); iter.hasNext(); ) { + ReplyWaiter waiter = iter.next(); + iter.remove(); + waiter.networkFail(); + } _outgoingTokens.clear(); _incomingTokens.clear(); } @@ -1317,7 +1326,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { private final NodeInfo sentTo; private final Runnable onReply; private final Runnable onTimeout; - private int replyCode; + private volatile int replyCode; private Object sentObject; private Object replyObject; @@ -1400,6 +1409,18 @@ public class KRPC implements I2PSessionMuxedListener, DHT { this.notifyAll(); } } + + /** + * Will notify this but not run onReply or onTimeout, + * or remove from _sentQueries, or call heardFrom(). + */ + public void networkFail() { + cancel(); + replyCode = REPLY_NETWORK_FAIL; + synchronized(this) { + this.notifyAll(); + } + } } // I2PSessionMuxedListener interface ---------------- @@ -1532,22 +1553,24 @@ public class KRPC implements I2PSessionMuxedListener, DHT { return; if (!_hasBootstrapped) { if (_log.shouldLog(Log.INFO)) - _log.info("Bootstrap start size: " + _knownNodes.size()); + _log.info("Bootstrap start, size: " + _knownNodes.size()); explore(_myNID, 8, 60*1000, 1); if (_log.shouldLog(Log.INFO)) - _log.info("Bootstrap done size: " + _knownNodes.size()); + _log.info("Bootstrap done, size: " + _knownNodes.size()); _hasBootstrapped = true; } if (!_isRunning) return; if (_log.shouldLog(Log.INFO)) - _log.info("Explore start size: " + _knownNodes.size()); + _log.info("Explore start. size: " + _knownNodes.size()); List keys = _knownNodes.getExploreKeys(); for (NID nid : keys) { explore(nid, 8, 60*1000, 1); + if (!_isRunning) + return; } if (_log.shouldLog(Log.INFO)) - _log.info("Explore done size: " + _knownNodes.size()); + _log.info("Explore of " + keys.size() + " buckets done, new size: " + _knownNodes.size()); new Explorer(EXPLORE_TIME); } }