* BuildHandler: Move inbound request handling to its own thread(s)

(ticket #542, see also http://zzz.i2p/topics/996)
This commit is contained in:
zzz
2011-10-28 01:43:33 +00:00
parent d7a5e3ef53
commit e8fe115ffe
3 changed files with 79 additions and 48 deletions

View File

@ -26,20 +26,21 @@ import net.i2p.util.Log;
* changed, such as a tunnel failed, new client started up, or tunnel creation was aborted).
*
* Note that 10 minute tunnel expiration is hardcoded in here.
*
* As of 0.8.11, inbound request handling is done in a separate thread.
*/
class BuildExecutor implements Runnable {
private final ArrayList<Long> _recentBuildIds = new ArrayList(100);
private final RouterContext _context;
private final Log _log;
private final TunnelPoolManager _manager;
/** list of TunnelCreatorConfig elements of tunnels currently being built */
/** Notify lock */
private final Object _currentlyBuilding;
/** indexed by ptcc.getReplyMessageId() */
private final ConcurrentHashMap<Long, PooledTunnelCreatorConfig> _currentlyBuildingMap;
/** indexed by ptcc.getReplyMessageId() */
private final ConcurrentHashMap<Long, PooledTunnelCreatorConfig> _recentlyBuildingMap;
private boolean _isRunning;
private final BuildHandler _handler;
private boolean _repoll;
private static final int MAX_CONCURRENT_BUILDS = 10;
/** accept replies up to a minute after we gave up on them */
@ -63,7 +64,7 @@ class BuildExecutor implements Runnable {
_context.statManager().createRequiredRateStat("tunnel.buildRequestTime", "Time to build a tunnel request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.buildConfigTime", "Time to build a tunnel request (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.buildRequestZeroHopTime", "How long it takes to build a zero hop tunnel", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.pendingRemaining", "How many inbound requests are pending after a pass (period is how long the pass takes)?", "Tunnels", new long[] { 60*1000, 10*60*1000 });
//_context.statManager().createRateStat("tunnel.pendingRemaining", "How many inbound requests are pending after a pass (period is how long the pass takes)?", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.buildFailFirstHop", "How often we fail to build a OB tunnel because we can't contact the first hop", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.buildReplySlow", "Build reply late, but not too late", "Tunnels", new long[] { 10*60*1000 });
@ -81,8 +82,6 @@ class BuildExecutor implements Runnable {
statMgr.createRateStat("tunnel.tierAgreeUnknown", "Agreed joins from unknown", "Tunnels", new long[] { 60*1000, 10*60*1000 });
statMgr.createRateStat("tunnel.tierRejectUnknown", "Rejected joins from unknown", "Tunnels", new long[] { 60*1000, 10*60*1000 });
statMgr.createRateStat("tunnel.tierExpireUnknown", "Expired joins from unknown", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_handler = new BuildHandler(ctx, this);
}
private int allowed() {
@ -266,8 +265,6 @@ class BuildExecutor implements Runnable {
List<TunnelPool> wanted = new ArrayList(MAX_CONCURRENT_BUILDS);
List<TunnelPool> pools = new ArrayList(8);
int pendingRemaining = 0;
//long loopBegin = 0;
//long afterBuildZeroHop = 0;
long afterBuildReal = 0;
@ -276,7 +273,7 @@ class BuildExecutor implements Runnable {
while (!_manager.isShutdown()){
//loopBegin = System.currentTimeMillis();
try {
_repoll = pendingRemaining > 0; // resets repoll to false unless there are inbound requeusts pending
_repoll = false; // resets repoll to false unless there are inbound requeusts pending
_manager.listPools(pools);
for (int i = 0; i < pools.size(); i++) {
TunnelPool pool = pools.get(i);
@ -308,7 +305,7 @@ class BuildExecutor implements Runnable {
synchronized (_currentlyBuilding) {
if (!_repoll) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No tunnel to build with (allowed=" + allowed + ", wanted=" + wanted.size() + ", pending=" + pendingRemaining + "), wait for a while");
_log.debug("No tunnel to build with (allowed=" + allowed + ", wanted=" + wanted.size() + "), wait for a while");
try {
_currentlyBuilding.wait(1*1000+_context.random().nextInt(1*1000));
} catch (InterruptedException ie) {}
@ -369,12 +366,6 @@ class BuildExecutor implements Runnable {
afterBuildReal = System.currentTimeMillis();
pendingRemaining = _handler.handleInboundRequests();
afterHandleInbound = System.currentTimeMillis();
if (pendingRemaining > 0)
_context.statManager().addRateData("tunnel.pendingRemaining", pendingRemaining, afterHandleInbound-afterBuildReal);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("build loop complete, tot=" + (afterHandleInbound-loopBegin) +
// " inReply=" + (afterHandleInboundReplies-beforeHandleInboundReplies) +
@ -387,7 +378,6 @@ class BuildExecutor implements Runnable {
wanted.clear();
pools.clear();
} catch (RuntimeException e) {
if (_log.shouldLog(Log.CRIT))
_log.log(Log.CRIT, "B0rked in the tunnel builder", e);
}
}
@ -568,6 +558,4 @@ class BuildExecutor implements Runnable {
}
return rv;
}
public int getInboundBuildQueueSize() { return _handler.getInboundBuildQueueSize(); }
}

View File

@ -39,26 +39,31 @@ import net.i2p.util.Log;
* and updating stats.
*
* Replies are handled immediately on reception; requests are queued.
* As of 0.8.11 the request queue is handled in a separate thread,
* it used to be called from the BuildExecutor thread loop.
*
* Note that 10 minute tunnel expiration is hardcoded in here.
*/
class BuildHandler {
class BuildHandler implements Runnable {
private final RouterContext _context;
private final Log _log;
private final TunnelPoolManager _manager;
private final BuildExecutor _exec;
private final Job _buildMessageHandlerJob;
private final Job _buildReplyMessageHandlerJob;
private final LinkedBlockingQueue<BuildMessageState> _inboundBuildMessages;
private final BuildMessageProcessor _processor;
private final ParticipatingThrottler _throttler;
private boolean _isRunning;
/** TODO these may be too high, review and adjust */
private static final int MIN_QUEUE = 12;
private static final int MAX_QUEUE = 96;
private static final int MIN_QUEUE = 18;
private static final int MAX_QUEUE = 192;
public BuildHandler(RouterContext ctx, BuildExecutor exec) {
public BuildHandler(RouterContext ctx, TunnelPoolManager manager, BuildExecutor exec) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_manager = manager;
_exec = exec;
// Queue size = 12 * share BW / 48K
int sz = Math.min(MAX_QUEUE, Math.max(MIN_QUEUE, TunnelDispatcher.getShareBandwidth(ctx) * MIN_QUEUE / 48));
@ -82,7 +87,7 @@ class BuildHandler {
_context.statManager().createRequiredRateStat("tunnel.dropLoadBacklog", "Pending request count when dropped", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRequiredRateStat("tunnel.dropLoadProactive", "Delay estimate when dropped (ms)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRequiredRateStat("tunnel.dropLoadProactiveAbort", "Allowed requests during load", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.handleRemaining", "How many pending inbound requests were left on the queue after one pass?", "Tunnels", new long[] { 60*1000, 10*60*1000 });
//_context.statManager().createRateStat("tunnel.handleRemaining", "How many pending inbound requests were left on the queue after one pass?", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.buildReplyTooSlow", "How often a tunnel build reply came back after we had given up waiting for it?", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.receiveRejectionProbabalistic", "How often we are rejected probabalistically?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
@ -105,18 +110,37 @@ class BuildHandler {
ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildReplyMessage.MESSAGE_TYPE, tbrmhjb);
}
private static final int MAX_HANDLE_AT_ONCE = 2;
private static final int NEXT_HOP_LOOKUP_TIMEOUT = 15*1000;
/**
* Blocking call to handle a few of the pending inbound requests, returning how many
* requests remain after this pass. This is called by BuildExecutor.
* Thread to handle inbound requests
* @since 0.8.11
*/
int handleInboundRequests() {
for (int i = 0; i < MAX_HANDLE_AT_ONCE; ) {
BuildMessageState state = _inboundBuildMessages.poll();
if (state == null)
return 0;
public void run() {
_isRunning = true;
while (!_manager.isShutdown()) {
try {
handleInboundRequest();
} catch (Exception e) {
_log.log(Log.CRIT, "B0rked in the tunnel handler", e);
}
}
if (_log.shouldLog(Log.WARN))
_log.warn("Done handling");
_isRunning = false;
}
/**
* Blocking call to handle a single inbound request
*/
private void handleInboundRequest() {
BuildMessageState state = null;
try {
state = _inboundBuildMessages.take();
} catch (InterruptedException ie) {
return;
}
long dropBefore = System.currentTimeMillis() - (BuildRequestor.REQUEST_TIMEOUT/4);
if (state.recvTime <= dropBefore) {
if (_log.shouldLog(Log.WARN))
@ -124,23 +148,19 @@ class BuildHandler {
+ ", since we received it a long time ago: " + (System.currentTimeMillis() - state.recvTime));
_context.statManager().addRateData("tunnel.dropLoadDelay", System.currentTimeMillis() - state.recvTime, 0);
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow"));
continue;
return;
}
handleRequest(state);
i++;
long beforeHandle = System.currentTimeMillis();
long actualTime = handleRequest(state);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle took " + (System.currentTimeMillis()-beforeHandle) + "/" + actualTime +
" (" + i + " with " + _inboundBuildMessages.size() + " remaining)");
}
int remaining = _inboundBuildMessages.size();
if (remaining > 0)
_context.statManager().addRateData("tunnel.handleRemaining", remaining, 0);
return remaining;
//int remaining = _inboundBuildMessages.size();
//if (remaining > 0)
// _context.statManager().addRateData("tunnel.handleRemaining", remaining, 0);
//return remaining;
}
/**
* Blocking call to handle a single inbound reply
*/
private void handleReply(BuildReplyMessageState state) {
// search through the tunnels for a reply
long replyMessageId = state.msg.getUniqueId();
@ -157,6 +177,9 @@ class BuildHandler {
}
}
/**
* Blocking call to handle a single inbound reply
*/
private void handleReply(TunnelBuildReplyMessage msg, PooledTunnelCreatorConfig cfg, long delay) {
long requestedOn = cfg.getExpiration() - 10*60*1000;
long rtt = _context.clock().now() - requestedOn;
@ -797,6 +820,7 @@ class BuildHandler {
recvTime = System.currentTimeMillis();
}
}
/** replies for outbound tunnels that we have created */
private static class BuildReplyMessageState {
final TunnelBuildReplyMessage msg;
@ -806,6 +830,7 @@ class BuildHandler {
recvTime = System.currentTimeMillis();
}
}
/** replies for inbound tunnels we have created */
private static class BuildEndMessageState {
final TunnelBuildMessage msg;

View File

@ -26,6 +26,7 @@ import net.i2p.router.TunnelInfo;
import net.i2p.router.TunnelManagerFacade;
import net.i2p.router.TunnelPoolSettings;
import net.i2p.router.tunnel.HopConfig;
import net.i2p.router.tunnel.TunnelDispatcher;
import net.i2p.stat.RateStat;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@ -34,7 +35,8 @@ import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
*
* Manage all the exploratory and client tunnel pools.
* Run the tunnel builder and handler threads.
*/
public class TunnelPoolManager implements TunnelManagerFacade {
private final RouterContext _context;
@ -46,8 +48,12 @@ public class TunnelPoolManager implements TunnelManagerFacade {
private TunnelPool _inboundExploratory;
private TunnelPool _outboundExploratory;
private final BuildExecutor _executor;
private final BuildHandler _handler;
private boolean _isShutdown;
private static final long[] RATES = { 60*1000, 10*60*1000l, 60*60*1000l };
private static final int MIN_KBPS_TWO_HANDLERS = 512;
private static final int MIN_KBPS_THREE_HANDLERS = 1024;
public TunnelPoolManager(RouterContext ctx) {
_context = ctx;
@ -63,9 +69,21 @@ public class TunnelPoolManager implements TunnelManagerFacade {
_clientOutboundPools = new ConcurrentHashMap(4);
_executor = new BuildExecutor(ctx, this);
I2PThread execThread = new I2PThread(_executor, "BuildExecutor");
execThread.setDaemon(true);
I2PThread execThread = new I2PThread(_executor, "BuildExecutor", true);
execThread.start();
_handler = new BuildHandler(ctx, this, _executor);
int numHandlerThreads;
int share = TunnelDispatcher.getShareBandwidth(ctx);
if (share >= MIN_KBPS_THREE_HANDLERS)
numHandlerThreads = 3;
else if (share >= MIN_KBPS_TWO_HANDLERS)
numHandlerThreads = 2;
else
numHandlerThreads = 1;
for (int i = 1; i <= numHandlerThreads; i++) {
I2PThread hThread = new I2PThread(_handler, "BuildHandler " + i + '/' + numHandlerThreads, true);
hThread.start();
}
// The following are for TestJob
ctx.statManager().createRequiredRateStat("tunnel.testFailedTime", "Time for tunnel test failure (ms)", "Tunnels",
@ -550,7 +568,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
BuildExecutor getExecutor() { return _executor; }
boolean isShutdown() { return _isShutdown; }
public int getInboundBuildQueueSize() { return _executor.getInboundBuildQueueSize(); }
public int getInboundBuildQueueSize() { return _handler.getInboundBuildQueueSize(); }
/** @deprecated moved to routerconsole */
public void renderStatusHTML(Writer out) throws IOException {