diff --git a/core/java/src/net/i2p/stat/StatManager.java b/core/java/src/net/i2p/stat/StatManager.java index fc1b18fdf0..edf865d9a1 100644 --- a/core/java/src/net/i2p/stat/StatManager.java +++ b/core/java/src/net/i2p/stat/StatManager.java @@ -53,7 +53,7 @@ public class StatManager { "tunnel.buildRatio.*,tunnel.buildFailure,tunnel.buildSuccess,tunnel.corruptMessage," + "tunnel.decryptRequestTime,tunnel.fragmentedDropped,tunnel.participatingMessageCount,"+ "tunnel.participatingTunnels,tunnel.testFailedTime,tunnel.testSuccessTime," + - "udp.sendPacketSize,udp.packetsRetransmitted" ; + "tunnel.participatingBandwidth,udp.sendPacketSize,udp.packetsRetransmitted" ; /** * The stat manager should only be constructed and accessed through the diff --git a/history.txt b/history.txt index 5599a0613d..bfb6b6d677 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,17 @@ +2008-10-10 zzz + * Profiles: Reduce reject penalty in + capacity calculation to avoid a congestion collapse + * Throttle: Change reject to BANDWIDTH from CRIT on shutdown + for improved anonymity + * Tunnels: Implement random discard to enforce share limit + * Tunnel Tests: Add time for outbound delay, to avoid + congestion collapse + * UDPPacketReader: Adjust logging + * build files: Change to source=1.5, target=1.5 + * configpeer.jsp: Table cleanup + * i2psnark: Change default tunnel length from 1+1 to 2+0 + * peers.jsp: Change <,> to in,out for UDP + 2008-10-09 sponge * Update version to -3 * BOB database threadlocking fixes diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 88ab2c4ad9..19b0794b59 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -17,7 +17,7 @@ import net.i2p.CoreVersion; public class RouterVersion { public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $"; public final static String VERSION = "0.6.4"; - public final static long BUILD = 3; + public final static long BUILD = 4; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/tunnel/HopConfig.java b/router/java/src/net/i2p/router/tunnel/HopConfig.java index ef7ef5535d..740cf9025c 100644 --- a/router/java/src/net/i2p/router/tunnel/HopConfig.java +++ b/router/java/src/net/i2p/router/tunnel/HopConfig.java @@ -28,6 +28,8 @@ public class HopConfig { private Map _options; private long _messagesProcessed; private long _oldMessagesProcessed; + private long _messagesSent; + private long _oldMessagesSent; /** IV length for {@link #getReplyIV} */ public static final int REPLY_IV_LENGTH = 16; @@ -44,6 +46,8 @@ public class HopConfig { _options = null; _messagesProcessed = 0; _oldMessagesProcessed = 0; + _messagesSent = 0; + _oldMessagesSent = 0; } /** what tunnel ID are we receiving on? */ @@ -115,6 +119,7 @@ public class HopConfig { public void setOptions(Map options) { _options = options; } /** take note of a message being pumped through this tunnel */ + /** "processed" is for incoming and "sent" is for outgoing (could be dropped in between) */ public void incrementProcessedMessages() { _messagesProcessed++; } public long getProcessedMessagesCount() { return _messagesProcessed; } public long getRecentMessagesCount() { @@ -122,6 +127,13 @@ public class HopConfig { _oldMessagesProcessed = _messagesProcessed; return rv; } + public void incrementSentMessages() { _messagesSent++; } + public long getSentMessagesCount() { return _messagesSent; } + public long getRecentSentMessagesCount() { + long rv = _messagesSent - _oldMessagesSent; + _oldMessagesSent = _messagesSent; + return rv; + } public String toString() { StringBuffer buf = new StringBuffer(64); diff --git a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java index 1d198a0b48..822c1f6370 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java +++ b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java @@ -22,6 +22,8 @@ public class InboundGatewayReceiver implements TunnelGateway.Receiver { return receiveEncrypted(encrypted, false); } public long receiveEncrypted(byte[] encrypted, boolean alreadySearched) { + if (!alreadySearched) + _config.incrementProcessedMessages(); if (_target == null) { _target = _context.netDb().lookupRouterInfoLocally(_config.getSendTo()); if (_target == null) { @@ -33,7 +35,9 @@ public class InboundGatewayReceiver implements TunnelGateway.Receiver { } } - _config.incrementProcessedMessages(); + if (_context.tunnelDispatcher().shouldDropParticipatingMessage()) + return -1; + _config.incrementSentMessages(); TunnelDataMessage msg = new TunnelDataMessage(_context); msg.setData(encrypted); msg.setTunnelId(_config.getSendTunnel()); diff --git a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java index 18c2caefb4..9aa9e667b1 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java @@ -41,6 +41,11 @@ public class OutboundTunnelEndpoint { + " to be forwarded on to " + (toRouter != null ? toRouter.toBase64().substring(0,4) : "") + (toTunnel != null ? toTunnel.getTunnelId() + "" : "")); + // don't drop it if we are the target + if ((!_context.routerHash().equals(toRouter)) && + _context.tunnelDispatcher().shouldDropParticipatingMessage()) + return; + _config.incrementSentMessages(); _outDistributor.distribute(msg, toRouter, toTunnel); } } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 2fd3d160a2..18bb6273c0 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -18,6 +18,8 @@ import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.router.Service; import net.i2p.router.peermanager.PeerProfile; +import net.i2p.stat.Rate; +import net.i2p.stat.RateStat; import net.i2p.util.Log; /** @@ -110,6 +112,12 @@ public class TunnelDispatcher implements Service { ctx.statManager().createRateStat("tunnel.participatingBandwidth", "Participating traffic", "Tunnels", new long[] { 60*1000l, 60*10*1000l }); + ctx.statManager().createRateStat("tunnel.participatingBandwidthOut", + "Participating traffic", "Tunnels", + new long[] { 60*1000l, 60*10*1000l }); + ctx.statManager().createRateStat("tunnel.participatingMessageDropped", + "Dropped for exceeding share limit", "Tunnels", + new long[] { 60*1000l, 60*10*1000l }); ctx.statManager().createRateStat("tunnel.participatingMessageCount", "How many messages are sent through a participating tunnel?", "Tunnels", new long[] { 60*1000l, 60*10*1000l, 60*60*1000l }); @@ -550,6 +558,7 @@ public class TunnelDispatcher implements Service { int size = participating.size(); long count = 0; long bw = 0; + long bwOut = 0; long tcount = 0; long tooYoung = _context.clock().now() - 60*1000; long tooOld = tooYoung - 9*60*1000; @@ -557,6 +566,7 @@ public class TunnelDispatcher implements Service { HopConfig cfg = (HopConfig)participating.get(i); long c = cfg.getRecentMessagesCount(); bw += c; + bwOut += cfg.getRecentSentMessagesCount(); long created = cfg.getCreation(); if (created > tooYoung || created < tooOld) continue; @@ -567,9 +577,64 @@ public class TunnelDispatcher implements Service { count = count * 30 / tcount; _context.statManager().addRateData("tunnel.participatingMessageCount", count, 20*1000); _context.statManager().addRateData("tunnel.participatingBandwidth", bw*1024/20, 20*1000); + _context.statManager().addRateData("tunnel.participatingBandwidthOut", bwOut*1024/20, 20*1000); _context.statManager().addRateData("tunnel.participatingTunnels", size, 0); } + /** + * Implement random early discard (RED) to enforce the share bandwidth limit. + * For now, this does not enforce the available bandwidth, + * we leave that to Throttle. + * This is similar to the code in ../RouterThrottleImpl.java + * We drop in proportion to how far over the limit we are. + * Perhaps an exponential function would be better? + */ + public boolean shouldDropParticipatingMessage() { + RateStat rs = _context.statManager().getRate("tunnel.participatingBandwidth"); + if (rs == null) + return false; + Rate r = rs.getRate(60*1000); + if (r == null) + return false; + // weight current period higher + long count = r.getLastEventCount() + (3 * r.getCurrentEventCount()); + int bw = 0; + if (count > 0) + bw = (int) ((r.getLastTotalValue() + (3 * r.getCurrentTotalValue())) / count); + else + bw = (int) r.getLifetimeAverageValue(); + + int usedIn = Math.min(_context.router().get1sRateIn(), _context.router().get15sRateIn()); + usedIn = Math.min(usedIn, bw); + if (usedIn <= 0) + return false; + int usedOut = Math.min(_context.router().get1sRate(true), _context.router().get15sRate(true)); + usedOut = Math.min(usedOut, bw); + if (usedOut <= 0) + return false; + int used = Math.min(usedIn, usedOut); + int maxKBps = Math.min(_context.bandwidthLimiter().getInboundKBytesPerSecond(), + _context.bandwidthLimiter().getOutboundKBytesPerSecond()); + float share = (float) _context.router().getSharePercentage(); + + // start dropping at 95% of the limit + float maxBps = maxKBps * share * 1024f * 0.95f; + float pctDrop = (used - maxBps) / used; + if (pctDrop <= 0) + return false; + float rand = _context.random().nextFloat(); + boolean reject = rand <= pctDrop; + if (reject) { + if (_log.shouldLog(Log.WARN)) { + int availBps = (int) (((maxKBps*1024)*share) - used); + _log.warn("Drop part. msg. avail/max/used " + availBps + "/" + (int) maxBps + "/" + + used + " %Drop = " + pctDrop); + } + _context.statManager().addRateData("tunnel.participatingMessageDropped", 1, 0); + } + return reject; + } + private static final int DROP_BASE_INTERVAL = 40 * 1000; private static final int DROP_RANDOM_BOOST = 10 * 1000; @@ -685,9 +750,9 @@ public class TunnelDispatcher implements Service { // already scheduled for the future, and before this expiration } } - if (_log.shouldLog(Log.INFO)) { + if (_log.shouldLog(Log.DEBUG)) { long now = getContext().clock().now(); - _log.info("Scheduling leave in " + DataHelper.formatDuration(dropTime.longValue()-now) +": " + cfg); + _log.debug("Scheduling leave in " + DataHelper.formatDuration(dropTime.longValue()-now) +": " + cfg); } } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java index af334ea705..5fc8708572 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java @@ -75,6 +75,7 @@ public class TunnelParticipant { } if ( (_config != null) && (_config.getSendTo() != null) ) { + _config.incrementProcessedMessages(); RouterInfo ri = _nextHopCache; if (ri == null) ri = _context.netDb().lookupRouterInfoLocally(_config.getSendTo()); @@ -82,10 +83,10 @@ public class TunnelParticipant { if (_log.shouldLog(Log.DEBUG)) _log.debug("Send off to nextHop directly (" + _config.getSendTo().toBase64().substring(0,4) + " for " + msg); - _config.incrementProcessedMessages(); send(_config, msg, ri); - if (_config != null) - incrementThroughput(_config.getReceiveFrom()); + // see comments below + //if (_config != null) + // incrementThroughput(_config.getReceiveFrom()); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Lookup the nextHop (" + _config.getSendTo().toBase64().substring(0,4) @@ -109,6 +110,7 @@ public class TunnelParticipant { * influence who we spend our time profiling is dangerous, so this will be disabled for * now. */ +/**** private void incrementThroughput(Hash prev) { if (true) return; long now = System.currentTimeMillis(); @@ -123,6 +125,7 @@ public class TunnelParticipant { _periodMessagesTransferred++; } } +****/ public int getCompleteCount() { if (_handler != null) @@ -147,6 +150,9 @@ public class TunnelParticipant { } private void send(HopConfig config, TunnelDataMessage msg, RouterInfo ri) { + if (_context.tunnelDispatcher().shouldDropParticipatingMessage()) + return; + _config.incrementSentMessages(); long oldId = msg.getUniqueId(); long newId = _context.random().nextLong(I2NPMessage.MAX_ID_VALUE); _context.messageHistory().wrap("TunnelDataMessage", oldId, "TunnelDataMessage", newId); @@ -221,4 +227,4 @@ public class TunnelParticipant { return "inbound endpoint"; } } -} \ No newline at end of file +}