* BuildHandler:

- Add early throttler based on previous hop
   - Limit concurrent next-hop lookups
This commit is contained in:
zzz
2013-01-27 16:24:29 +00:00
parent bd900d8d55
commit 9247dc898c
4 changed files with 115 additions and 7 deletions

View File

@ -1,3 +1,10 @@
2013-01-27 zzz
* BuildHandler:
- Add early throttler based on previous hop
- Limit concurrent next-hop lookups
* NetDB: Increase floodfills again
* RandomSource: Seed from SecureRandom too
2013-01-23 str4d
* i2ptunnel: Added IP -> I2P URL mapping to SOCKS client tunnel
(via custom option ipmapping.IP=URL)

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 5;
public final static long BUILD = 6;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -3,6 +3,7 @@ package net.i2p.router.tunnel.pool;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
@ -56,7 +57,9 @@ class BuildHandler implements Runnable {
private final Job _buildReplyMessageHandlerJob;
private final BlockingQueue<BuildMessageState> _inboundBuildMessages;
private final BuildMessageProcessor _processor;
private final RequestThrottler _requestThrottler;
private final ParticipatingThrottler _throttler;
private final AtomicInteger _currentLookups = new AtomicInteger();
private volatile boolean _isRunning;
/** TODO these may be too high, review and adjust */
@ -65,6 +68,12 @@ class BuildHandler implements Runnable {
private static final int NEXT_HOP_LOOKUP_TIMEOUT = 15*1000;
private static final int PRIORITY = OutNetMessage.PRIORITY_BUILD_REPLY;
/** limits on concurrent next-hop RI lookup */
private static final int MIN_LOOKUP_LIMIT = 10;
private static final int MAX_LOOKUP_LIMIT = 100;
/** limit lookups to this % of current participating tunnels */
private static final int PERCENT_LOOKUP_LIMIT = 3;
/**
* This must be high, as if we timeout the send we remove the tunnel from
@ -96,6 +105,9 @@ class BuildHandler implements Runnable {
_context.statManager().createRequiredRateStat("tunnel.rejectDupID", "Part. tunnel dup ID", "Tunnels", new long[] { 24*60*60*1000 });
_context.statManager().createRequiredRateStat("tunnel.ownDupID", "Our tunnel dup. ID", "Tunnels", new long[] { 24*60*60*1000 });
_context.statManager().createRequiredRateStat("tunnel.rejectHostile", "Reject malicious tunnel", "Tunnels", new long[] { 24*60*60*1000 });
_context.statManager().createRequiredRateStat("tunnel.rejectHopThrottle", "Reject per-hop limit", "Tunnels", new long[] { 60*60*1000 });
_context.statManager().createRequiredRateStat("tunnel.dropReqThrottle", "Drop per-hop limit", "Tunnels", new long[] { 60*60*1000 });
_context.statManager().createRequiredRateStat("tunnel.dropLookupThrottle", "Drop next hop lookup", "Tunnels", new long[] { 60*60*1000 });
_context.statManager().createRequiredRateStat("tunnel.rejectOverloaded", "Delay to process rejected request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRequiredRateStat("tunnel.acceptLoad", "Delay to process accepted request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
@ -118,6 +130,9 @@ class BuildHandler implements Runnable {
ctx.statManager().createRateStat("tunnel.buildLookupSuccess", "Was a deferred lookup successful?", "Tunnels", new long[] { 60*60*1000 });
_processor = new BuildMessageProcessor(ctx);
// used for previous hop, for all requests
_requestThrottler = new RequestThrottler(ctx);
// used for previous and next hops, for successful builds only
_throttler = new ParticipatingThrottler(ctx);
_buildMessageHandlerJob = new TunnelBuildMessageHandlerJob(ctx);
_buildReplyMessageHandlerJob = new TunnelBuildReplyMessageHandlerJob(ctx);
@ -338,7 +353,12 @@ class BuildHandler implements Runnable {
}
}
/** @return handle time or -1 if it wasn't completely handled */
/**
* Decrypt the request, lookup the RI locally,
* and call handleReq() if found or queue a lookup job.
*
* @return handle time or -1 if it wasn't completely handled
*/
private long handleRequest(BuildMessageState state) {
long timeSinceReceived = _context.clock().now()-state.recvTime;
if (_log.shouldLog(Log.DEBUG))
@ -377,11 +397,23 @@ class BuildHandler implements Runnable {
if (lookupTime > 500 && _log.shouldLog(Log.WARN))
_log.warn("Took too long to lookup the request: " + lookupTime + "/" + readPeerTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago");
if (nextPeerInfo == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Request " + state.msg.getUniqueId() + "/" + req.readReceiveTunnelId() + "/" + req.readNextTunnelId()
+ " handled, looking for the next peer " + nextPeer);
_context.netDb().lookupRouterInfo(nextPeer, new HandleReq(_context, state, req, nextPeer),
// limit concurrent next-hop lookups to prevent job queue overload attacks
int numTunnels = _context.tunnelManager().getParticipatingCount();
int limit = Math.max(MIN_LOOKUP_LIMIT, Math.min(MAX_LOOKUP_LIMIT, numTunnels * PERCENT_LOOKUP_LIMIT / 100));
int current = _currentLookups.incrementAndGet();
if (current <= limit) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Request " + state.msg.getUniqueId() + '/' + req.readReceiveTunnelId() + '/' + req.readNextTunnelId()
+ " handled, lookup next peer " + nextPeer
+ " lookups: " + current + '/' + limit);
_context.netDb().lookupRouterInfo(nextPeer, new HandleReq(_context, state, req, nextPeer),
new TimeoutReq(_context, state, req, nextPeer), NEXT_HOP_LOOKUP_TIMEOUT);
} else {
_currentLookups.decrementAndGet();
if (_log.shouldLog(Log.WARN))
_log.warn("Drop next hop lookup, limit " + limit);
_context.statManager().addRateData("tunnel.dropLookupThrottle", 1);
}
return -1;
} else {
long beforeHandle = System.currentTimeMillis();
@ -415,14 +447,19 @@ class BuildHandler implements Runnable {
private final BuildMessageState _state;
private final BuildRequestRecord _req;
private final Hash _nextPeer;
HandleReq(RouterContext ctx, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) {
super(ctx);
_state = state;
_req = req;
_nextPeer = nextPeer;
}
public String getName() { return "Deferred tunnel join processing"; }
public void runJob() {
// decrement in-progress counter
_currentLookups.decrementAndGet();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Request " + _state.msg.getUniqueId() + " handled with a successful deferred lookup for the next peer " + _nextPeer);
@ -438,18 +475,23 @@ class BuildHandler implements Runnable {
}
}
private static class TimeoutReq extends JobImpl {
private class TimeoutReq extends JobImpl {
private final BuildMessageState _state;
private final BuildRequestRecord _req;
private final Hash _nextPeer;
TimeoutReq(RouterContext ctx, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) {
super(ctx);
_state = state;
_req = req;
_nextPeer = nextPeer;
}
public String getName() { return "Timeout looking for next peer for tunnel join"; }
public void runJob() {
// decrement in-progress counter
_currentLookups.decrementAndGet();
getContext().statManager().addRateData("tunnel.rejectTimeout", 1);
getContext().statManager().addRateData("tunnel.buildLookupSuccess", 0);
// logging commented out so class can be static
@ -614,6 +656,7 @@ class BuildHandler implements Runnable {
if (_log.shouldLog(Log.WARN))
_log.warn("Rejecting tunnel (hop throttle), previous hop: " + from);
// no setTunnelStatus() indication
_context.statManager().addRateData("tunnel.rejectHopThrottle", 1);
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
}
}
@ -621,6 +664,7 @@ class BuildHandler implements Runnable {
_throttler.shouldThrottle(nextPeer)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Rejecting tunnel (hop throttle), next hop: " + nextPeer);
_context.statManager().addRateData("tunnel.rejectHopThrottle", 1);
// no setTunnelStatus() indication
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
}
@ -819,6 +863,18 @@ class BuildHandler implements Runnable {
accept = false;
}
}
if (accept) {
// early request throttle check, before queueing and decryption
Hash fh = fromHash;
if (fh == null)
fh = from.calculateHash();
if (fh != null && _requestThrottler.shouldThrottle(fh)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping tunnel request (from throttle), previous hop: " + from);
_context.statManager().addRateData("tunnel.dropReqThrottle", 1);
accept = false;
}
}
if (accept) {
// This is expensive and rarely seen, use CoDel instead
//int queueTime = estimateQueueTime(sz);

View File

@ -0,0 +1,45 @@
package net.i2p.router.tunnel.pool;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.util.ObjectCounter;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Like ParticipatingThrottler, but checked much earlier,
* cleaned more frequently, and with more than double the min and max limits.
* This is called before the request is queued or decrypted.
*
* @since 0.9.5
*/
class RequestThrottler {
private final RouterContext context;
private final ObjectCounter<Hash> counter;
/** portion of the tunnel lifetime */
private static final int LIFETIME_PORTION = 6;
private static final int MIN_LIMIT = 45 / LIFETIME_PORTION;
private static final int MAX_LIMIT = 165 / LIFETIME_PORTION;
private static final int PERCENT_LIMIT = 12 / LIFETIME_PORTION;
private static final long CLEAN_TIME = 11*60*1000 / LIFETIME_PORTION;
RequestThrottler(RouterContext ctx) {
this.context = ctx;
this.counter = new ObjectCounter();
ctx.simpleScheduler().addPeriodicEvent(new Cleaner(), CLEAN_TIME);
}
/** increments before checking */
boolean shouldThrottle(Hash h) {
int numTunnels = this.context.tunnelManager().getParticipatingCount();
int limit = Math.max(MIN_LIMIT, Math.min(MAX_LIMIT, numTunnels * PERCENT_LIMIT / 100));
return this.counter.increment(h) > limit;
}
private class Cleaner implements SimpleTimer.TimedEvent {
public void timeReached() {
RequestThrottler.this.counter.clear();
}
}
}