forked from I2P_Developers/i2p.i2p
* Netdb Search:
  - Follow all DSRM's, not just the last one, by moving the code
    from the match job to the selector
  - Update peer profile after SingleSearchJob
  - Cleanups, javadocs, log tweaks, final
@@ -1,3 +1,11 @@
+2011-07-29 zzz
+ * Netdb Search:
+   - Follow all DSRM's, not just the last one, by moving the code
+     from the match job to the selector
+   - Update peer profile after SingleSearchJob
+   - Cleanups, javadocs, log tweaks, final
+ * ProfileOrganizer: Tweak fast tier size
+
 2011-07-28 zzz
  * Context: Split up big lock to avoid deadlocks
  * Streaming: Avoid a rare exception on race
@@ -18,7 +18,7 @@ public class RouterVersion {
     /** deprecated */
     public final static String ID = "Monotone";
     public final static String VERSION = CoreVersion.VERSION;
-    public final static long BUILD = 16;
+    public final static long BUILD = 17;
 
     /** for example "-test" */
     public final static String EXTRA = "";
@@ -14,7 +14,6 @@ import net.i2p.util.Log;
 class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
     private final Log _log;
     private final FloodOnlySearchJob _search;
-    private DatabaseSearchReplyMessage _dsrm;
 
     public FloodOnlyLookupMatchJob(RouterContext ctx, FloodOnlySearchJob job) {
         super(ctx);
@@ -28,19 +27,8 @@ class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
             _log.info(_search.getJobId() + ": search match and found locally");
             _search.success();
         } else {
-            int remaining = _search.getLookupsRemaining();
-            if (_log.shouldLog(Log.INFO))
-                _log.info(_search.getJobId() + ": got a DatabaseSearchReply when we were looking for "
-                          + _search.getKey().toBase64() + ", with " + remaining + " outstanding searches");
-            // netDb reply pointing us at other people
-            // Only process if we don't know enough floodfills
-            // This only works if both reply, otherwise we aren't called - should be fixed
-            if (_search.shouldProcessDSRM() && _dsrm != null) {
-                if (_log.shouldLog(Log.INFO))
-                    _log.info(_search.getJobId() + ": Processing DatabaseSearchReply");
-                // Chase the hashes from the reply
-                getContext().jobQueue().addJob(new SingleLookupJob(getContext(), _dsrm));
-            }
+            // In practice, we always have zero remaining when this is called,
+            // because the selector only returns true when there is zero remaining
             _search.failed();
         }
     }
@@ -49,11 +37,9 @@ class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
 
     public void setMessage(I2NPMessage message) {
         if (message instanceof DatabaseSearchReplyMessage) {
-            // a dsrm is only passed in when there are no more lookups remaining
-            // If more than one peer sent one, we only process the last one
-            // And sadly if the first peer sends a DRSM and the second one times out,
-            // this won't get called...
-            _dsrm = (DatabaseSearchReplyMessage) message;
+            // DSRM processing now in FloodOnlyLookupSelector instead of here,
+            // so that all DSRM's are processed, not just the last one.
+            _search.failed();
             return;
         }
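Why the old match job could only ever see the final DSRM: a pending message's ReplyJob receives a message only when its selector reports a match, and this search's selector matched only once zero lookups remained. A minimal sketch of that delivery logic, with hypothetical registry internals and illustrative names, not the actual MessageRegistry code:

    // Hypothetical sketch of the registry's delivery loop: isMatch() sees every
    // reply, but the ReplyJob fires only when the selector reports a match.
    for (OutNetMessage pending : registry.pendingMessages()) {
        MessageSelector sel = pending.getReplySelector();
        if (sel.isMatch(incoming)) {
            ReplyJob onReply = pending.getOnReplyJob();
            onReply.setMessage(incoming);   // only the matching (last) DSRM lands here
            jobQueue.addJob(onReply);
        }
    }

So with two concurrent lookups, a DSRM from the first peer never reached setMessage() if the second peer later timed out; moving the processing into the selector (below) fixes that.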
@@ -38,14 +38,32 @@ class FloodOnlyLookupSelector implements MessageSelector {
         } else if (message instanceof DatabaseSearchReplyMessage) {
             DatabaseSearchReplyMessage dsrm = (DatabaseSearchReplyMessage)message;
             if (_search.getKey().equals(dsrm.getSearchKey())) {
-                _search.decrementRemaining(dsrm.getFromHash());
-                // assume 0 old, all new, 0 invalid, 0 dup
+
+                // TODO - dsrm.getFromHash() can't be trusted - check against the list of
+                // those we sent the search to in _search ?
+
+                // assume 0 new, all old, 0 invalid, 0 dup
                 _context.profileManager().dbLookupReply(dsrm.getFromHash(), 0, dsrm.getNumReplies(), 0, 0,
                                                         System.currentTimeMillis()-_search.getCreated());
-                if (_search.getLookupsRemaining() <= 0)
-                    return true; // ok, no more left, so time to fail
-                else
-                    return false;
+
+                // Moved from FloodOnlyLookupMatchJob so it is called for all replies
+                // rather than just the last one
+                // Got a netDb reply pointing us at other floodfills...
+                // Only process if we don't know enough floodfills or are starting up
+                if (_search.shouldProcessDSRM()) {
+                    if (_log.shouldLog(Log.INFO))
+                        _log.info(_search.getJobId() + ": Processing DSRM via SingleLookupJob, apparently from " + dsrm.getFromHash());
+                    // Chase the hashes from the reply
+                    _context.jobQueue().addJob(new SingleLookupJob(_context, dsrm));
+                } else if (_log.shouldLog(Log.INFO)) {
+                    int remaining = _search.getLookupsRemaining();
+                    _log.info(_search.getJobId() + ": got a DSRM apparently from " + dsrm.getFromHash() + " when we were looking for "
+                              + _search.getKey() + ", with " + remaining + " outstanding searches");
+                }
+
+                // if no more left, time to fail
+                int remaining = _search.decrementRemaining(dsrm.getFromHash());
+                return remaining <= 0;
             }
         }
         return false;
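Moving the side effects into the selector works because isMatch() is invoked for every reply that could belong to this search, not just the terminal one. As best I recall, the router's MessageSelector contract looks roughly like this; a sketch, not guaranteed to match the exact interface:

    // Sketch of the selector contract this change relies on: the registry calls
    // isMatch() on each inbound I2NP message until the selector expires or stops
    // matching, so per-reply bookkeeping (profile update, DSRM chasing) belongs here.
    public interface MessageSelector {
        boolean isMatch(I2NPMessage message); // called for EVERY candidate reply
        boolean continueMatching();           // keep matching after a match?
        long getExpiration();                 // when to give up and time out
    }

Returning true only when the remaining count hits zero preserves the old termination behavior while still processing each DSRM as it arrives.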
@@ -6,17 +6,16 @@ import net.i2p.util.Log;
 
 class FloodOnlyLookupTimeoutJob extends JobImpl {
     private final FloodSearchJob _search;
-    private final Log _log;
 
     public FloodOnlyLookupTimeoutJob(RouterContext ctx, FloodOnlySearchJob job) {
         super(ctx);
         _search = job;
-        _log = ctx.logManager().getLog(getClass());
     }
 
     public void runJob() {
-        if (_log.shouldLog(Log.INFO))
-            _log.info(_search.getJobId() + ": search timed out");
+        Log log = getContext().logManager().getLog(getClass());
+        if (log.shouldLog(Log.INFO))
+            log.info(_search.getJobId() + ": search timed out");
         _search.failed();
     }
@@ -38,24 +38,24 @@ import net.i2p.util.Log;
  */
 class FloodOnlySearchJob extends FloodSearchJob {
     private volatile boolean _dead;
-    private final long _created;
+    protected final long _created;
     private boolean _shouldProcessDSRM;
     private final HashSet<Hash> _unheardFrom;
 
-    private final List<OutNetMessage> _out;
+    /** this is a marker to register with the MessageRegistry, it is never sent */
+    private OutNetMessage _out;
     protected final MessageSelector _replySelector;
     protected final ReplyJob _onReply;
     protected final Job _onTimeout;
 
+    private static final int MIN_FOR_NO_DSRM = 4;
+
     public FloodOnlySearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
         super(ctx, facade, key, onFind, onFailed, timeoutMs, isLease);
         // these override the settings in super
-        _log = ctx.logManager().getLog(FloodOnlySearchJob.class);
         _timeoutMs = Math.min(timeoutMs, SearchJob.PER_FLOODFILL_PEER_TIMEOUT);
         _expiration = _timeoutMs + ctx.clock().now();
         _origExpiration = _timeoutMs + ctx.clock().now();
-        // do we need a synchronizedList, since we synch on _out everywhere below...
-        _out = Collections.synchronizedList(new ArrayList(2));
         _unheardFrom = new HashSet(CONCURRENT_SEARCHES);
         _replySelector = new FloodOnlyLookupSelector(getContext(), this);
         _onReply = new FloodOnlyLookupMatchJob(getContext(), this);

@@ -63,10 +63,10 @@ class FloodOnlySearchJob extends FloodSearchJob {
         _created = System.currentTimeMillis();
     }
 
+    /** System time, NOT context time */
     public long getCreated() { return _created; }
+
     public boolean shouldProcessDSRM() { return _shouldProcessDSRM; }
-    private static final int CONCURRENT_SEARCHES = 2;
-    private static final int MIN_FOR_NO_DSRM = 4;
 
     @Override
     public void runJob() {

@@ -105,8 +105,10 @@ class FloodOnlySearchJob extends FloodSearchJob {
             }
             Collections.shuffle(floodfillPeers, getContext().random());
         }
-        OutNetMessage out = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs);
-        synchronized (_out) { _out.add(out); }
+
+        // This OutNetMessage is never used or sent (setMessage() is never called), it's only
+        // so we can register a reply selector.
+        _out = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs);
 
         /********
         // We need to randomize our ff selection, else we stay with the same ones since

@@ -194,11 +196,15 @@ class FloodOnlySearchJob extends FloodSearchJob {
     @Override
     public String getName() { return "NetDb flood search (phase 1)"; }
 
-    /** Note that we heard from the peer */
-    void decrementRemaining(Hash peer) {
-        decrementRemaining();
+    /**
+     * Note that we heard from the peer
+     *
+     * @return number remaining after decrementing
+     */
+    int decrementRemaining(Hash peer) {
         synchronized(_unheardFrom) {
             _unheardFrom.remove(peer);
+            return decrementRemaining();
         }
     }

@@ -208,12 +214,7 @@ class FloodOnlySearchJob extends FloodSearchJob {
             if (_dead) return;
             _dead = true;
         }
-        List outBuf = null;
-        synchronized (_out) { outBuf = new ArrayList(_out); }
-        for (int i = 0; i < outBuf.size(); i++) {
-            OutNetMessage out = (OutNetMessage)outBuf.get(i);
-            getContext().messageRegistry().unregisterPending(out);
-        }
+        getContext().messageRegistry().unregisterPending(_out);
         int timeRemaining = (int)(_origExpiration - getContext().clock().now());
         if (_log.shouldLog(Log.INFO))
             _log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " failed with " + timeRemaining + " remaining after " + (System.currentTimeMillis()-_created));
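Since the job registers exactly one never-sent marker with the registry (the selector, not the message, does the matching, so one registration covers replies from all CONCURRENT_SEARCHES peers), the old synchronized List bought nothing. The pattern reduced to its essentials, a sketch with illustrative variable names:

    // Register once: a single marker OutNetMessage attaches the reply selector
    // and handlers; it is never actually sent.
    OutNetMessage marker = ctx.messageRegistry().registerPending(selector, onReply, onTimeout, timeoutMs);
    // ... later, on success or failure, one unregister cleans up:
    ctx.messageRegistry().unregisterPending(marker);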
@@ -29,7 +29,7 @@ import net.i2p.util.Log;
  * Note that this does NOT extend SearchJob.
  */
 public class FloodSearchJob extends JobImpl {
-    protected Log _log;
+    protected final Log _log;
     protected final FloodfillNetworkDatabaseFacade _facade;
     protected final Hash _key;
     protected final List<Job> _onFind;

@@ -43,24 +43,22 @@ public class FloodSearchJob extends JobImpl {
 
     public FloodSearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
         super(ctx);
-        _log = ctx.logManager().getLog(FloodSearchJob.class);
+        _log = ctx.logManager().getLog(getClass());
         _facade = facade;
         _key = key;
-        _onFind = new ArrayList();
+        _onFind = new ArrayList(4);
         _onFind.add(onFind);
-        _onFailed = new ArrayList();
+        _onFailed = new ArrayList(4);
         _onFailed.add(onFailed);
-        int timeout = -1;
-        timeout = timeoutMs / FLOOD_SEARCH_TIME_FACTOR;
+        int timeout = timeoutMs / FLOOD_SEARCH_TIME_FACTOR;
         if (timeout < timeoutMs)
             timeout = timeoutMs;
         _timeoutMs = timeout;
         _expiration = timeout + ctx.clock().now();
         _origExpiration = timeoutMs + ctx.clock().now();
         _isLease = isLease;
-        _lookupsRemaining = 0;
-        _dead = false;
     }
 
     void addDeferred(Job onFind, Job onFailed, long timeoutMs, boolean isLease) {
         if (_dead) {
             getContext().jobQueue().addJob(onFailed);

@@ -69,10 +67,14 @@ public class FloodSearchJob extends JobImpl {
             if (onFailed != null) synchronized (_onFailed) { _onFailed.add(onFailed); }
         }
     }
 
+    /** using context clock */
     public long getExpiration() { return _expiration; }
-    private static final int CONCURRENT_SEARCHES = 2;
+
+    protected static final int CONCURRENT_SEARCHES = 2;
     private static final int FLOOD_SEARCH_TIME_FACTOR = 2;
     private static final int FLOOD_SEARCH_TIME_MIN = 30*1000;
 
     public void runJob() {
         // pick some floodfill peers and send out the searches
         List floodfillPeers = _facade.getFloodfillPeers();

@@ -120,10 +122,21 @@ public class FloodSearchJob extends JobImpl {
             _facade.searchFull(_key, _onFind, _onFailed, _timeoutMs*FLOOD_SEARCH_TIME_FACTOR, _isLease);
         }
     }
 
     public String getName() { return "NetDb search (phase 1)"; }
 
     protected Hash getKey() { return _key; }
-    protected void decrementRemaining() { if (_lookupsRemaining > 0) _lookupsRemaining--; }
+
+    /**
+     * TODO AtomicInteger?
+     * @return number remaining after decrementing
+     */
+    protected int decrementRemaining() {
+        if (_lookupsRemaining > 0)
+            return (--_lookupsRemaining);
+        return 0;
+    }
+
     protected int getLookupsRemaining() { return _lookupsRemaining; }
 
     void failed() {

@@ -144,6 +157,7 @@ public class FloodSearchJob extends JobImpl {
                 getContext().jobQueue().addJob(removed.remove(0));
         }
     }
+
     void success() {
         if (_dead) return;
         if (_log.shouldLog(Log.INFO))

@@ -166,8 +180,8 @@ public class FloodSearchJob extends JobImpl {
             _search = job;
         }
         public void runJob() {
-            _search.decrementRemaining();
-            if (_search.getLookupsRemaining() <= 0)
+            int remaining = _search.decrementRemaining();
+            if (remaining <= 0)
                 _search.failed();
         }
         public String getName() { return "NetDb search (phase 1) timeout"; }
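The TODO in decrementRemaining() points at AtomicInteger, which would make the decrement race-free without relying on external synchronization. A self-contained sketch of what that could look like, illustrative rather than the project's code:

    import java.util.concurrent.atomic.AtomicInteger;

    class LookupCounter {
        private final AtomicInteger remaining = new AtomicInteger();

        void addLookup() { remaining.incrementAndGet(); }

        /** @return number remaining after decrementing, clamped at zero */
        int decrementRemaining() {
            for (;;) {
                int n = remaining.get();
                if (n <= 0)
                    return 0;               // never go below zero, even under contention
                if (remaining.compareAndSet(n, n - 1))
                    return n - 1;
            }
        }
    }

The compare-and-set loop preserves the current code's "don't go below zero" guard, which a bare decrementAndGet() would not.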
@@ -33,7 +33,7 @@ import net.i2p.util.Log;
  */
 public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacade {
     public static final char CAPABILITY_FLOODFILL = 'f';
-    private final Map _activeFloodQueries;
+    private final Map<Hash, FloodSearchJob> _activeFloodQueries;
     private boolean _floodfillEnabled;
     /** for testing, see isFloodfill() below */
     private static String _alwaysQuery;

@@ -250,7 +250,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
         boolean isNew = false;
         FloodSearchJob searchJob = null;
         synchronized (_activeFloodQueries) {
-            searchJob = (FloodSearchJob)_activeFloodQueries.get(key);
+            searchJob = _activeFloodQueries.get(key);
             if (searchJob == null) {
                 //if (SearchJob.onlyQueryFloodfillPeers(_context)) {
                     searchJob = new FloodOnlySearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);

@@ -325,6 +325,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
             }
         }
     }
+
     void complete(Hash key) {
         synchronized (_activeFloodQueries) { _activeFloodQueries.remove(key); }
     }
@@ -43,6 +43,7 @@ class FloodfillPeerSelector extends PeerSelector {
      * Puts the floodfill peers that are directly connected first in the list.
      * List will not include our own hash.
      *
+     * @param key the ROUTING key (NOT the original key)
      * @param peersToIgnore can be null
      * @return List of Hash for the peers selected
      */

@@ -57,6 +58,7 @@ class FloodfillPeerSelector extends PeerSelector {
      * Does not prefer the floodfill peers that are directly connected.
      * List will not include our own hash.
      *
+     * @param key the ROUTING key (NOT the original key)
      * @param peersToIgnore can be null
      * @return List of Hash for the peers selected
      */

@@ -70,6 +72,7 @@ class FloodfillPeerSelector extends PeerSelector {
      * after they're complete, sort via kademlia.
      * List will not include our own hash.
      *
+     * @param key the ROUTING key (NOT the original key)
      * @param peersToIgnore can be null
      * @return List of Hash for the peers selected
      */

@@ -133,7 +136,7 @@ class FloodfillPeerSelector extends PeerSelector {
      * List will not include our own hash.
      *
      * @return floodfills closest to the key that are not shitlisted forever
-     * @param key the routing key
+     * @param key the ROUTING key (NOT the original key)
      * @param maxNumRouters max to return
      * Sorted by closest to the key if > maxNumRouters, otherwise not
      * The list is in 3 groups - sorted by routing key within each group.

@@ -159,6 +162,7 @@ class FloodfillPeerSelector extends PeerSelector {
     /**
      * See above for description
      * List will not include our own hash
+     * @param key the ROUTING key (NOT the original key)
      * @param toIgnore can be null
      */
     List<Hash> selectFloodfillParticipants(Hash key, int howMany, Set<Hash> toIgnore, KBucketSet kbuckets) {

@@ -175,6 +179,7 @@ class FloodfillPeerSelector extends PeerSelector {
     /**
      * See above for description
      * List MAY CONTAIN our own hash unless included in toIgnore
+     * @param key the ROUTING key (NOT the original key)
     * @param toIgnore can be null
     */
    private List<Hash> selectFloodfillParticipantsIncludingUs(Hash key, int howMany, Set<Hash> toIgnore, KBucketSet kbuckets) {

@@ -271,6 +276,7 @@ class FloodfillPeerSelector extends PeerSelector {
 
     /**
      * Warning - may return our router hash - add to toIgnore if necessary
+     * @param key the ROUTING key (NOT the original key)
      * @param toIgnore can be null
      */
     public FloodfillSelectionCollector(Hash key, Set<Hash> toIgnore, int wanted) {
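The repeated "@param key the ROUTING key (NOT the original key)" warning matters because netDb distance is computed against a rotated key, not the raw hash; to my understanding, I2P derives the routing key from the SHA-256 of the original key concatenated with the current UTC date. A hedged sketch of that assumed construction, illustrative only and not the exact i2p code:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    class RoutingKeySketch {
        /** e.g. routingKey(hash, "20110729") - assumed daily-rotation scheme */
        static byte[] routingKey(byte[] originalKey, String yyyyMMdd) throws Exception {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            sha.update(originalKey);                               // the 32-byte original hash
            sha.update(yyyyMMdd.getBytes(StandardCharsets.UTF_8)); // daily rotation component
            return sha.digest();                                   // key used for kademlia distance
        }
    }

Passing the original key to these selectors by mistake would pick peers close to the wrong point in the keyspace, hence the emphatic javadocs.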
@@ -5,7 +5,7 @@ import net.i2p.data.RouterInfo;
 import net.i2p.data.i2np.DatabaseSearchReplyMessage;
 import net.i2p.router.JobImpl;
 import net.i2p.router.RouterContext;
-import net.i2p.util.Log;
+//import net.i2p.util.Log;
 
 /**
  * Ask the peer who sent us the DSRM for the RouterInfos.

@@ -20,12 +20,12 @@ import net.i2p.util.Log;
  *
  */
 class SingleLookupJob extends JobImpl {
-    private final Log _log;
+    //private final Log _log;
     private final DatabaseSearchReplyMessage _dsrm;
 
     public SingleLookupJob(RouterContext ctx, DatabaseSearchReplyMessage dsrm) {
         super(ctx);
-        _log = ctx.logManager().getLog(getClass());
+        //_log = ctx.logManager().getLog(getClass());
         _dsrm = dsrm;
     }
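For context, "chasing the hashes" in SingleLookupJob amounts to asking the DSRM's sender for each advertised RouterInfo we don't already have locally. Roughly like the following sketch; the DSRM and netDb method names are my best recollection, not verified against this revision:

    public void runJob() {
        Hash from = _dsrm.getFromHash();
        for (int i = 0; i < _dsrm.getNumReplies(); i++) {
            Hash peer = _dsrm.getReply(i);
            // skip routers we already know; otherwise ask the DSRM sender for it
            if (getContext().netDb().lookupRouterInfoLocally(peer) == null)
                getContext().jobQueue().addJob(new SingleSearchJob(getContext(), peer, from));
        }
    }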
@@ -56,8 +56,11 @@ class SingleSearchJob extends FloodOnlySearchJob {
     @Override
     void failed() {
         getContext().messageRegistry().unregisterPending(_onm);
+        getContext().profileManager().dbLookupFailed(_to);
     }
 
     @Override
-    void success() {}
+    void success() {
+        getContext().profileManager().dbLookupSuccessful(_to, System.currentTimeMillis()-_created);
+    }
 }
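These profile calls are what let the ProfileOrganizer rank floodfills by lookup behavior rather than treating SingleSearchJob results as invisible. A toy model of the bookkeeping involved, a hypothetical class and not i2p's actual PeerProfile:

    class PeerDbStatsSketch {
        private long successes, failures, totalResponseMs;

        void dbLookupFailed() { failures++; }

        void dbLookupSuccessful(long responseTimeMs) {
            successes++;
            totalResponseMs += responseTimeMs;  // latency measured from _created, per the diff
        }

        /** crude quality signal: fast, reliable responders score best */
        double avgResponseMs() {
            return successes == 0 ? Double.MAX_VALUE : (double) totalResponseMs / successes;
        }
    }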