* Streaming: Add support for connection throttling

This commit is contained in:
zzz
2010-05-10 14:15:31 +00:00
parent c2b73d9fb5
commit d843646b4f
3 changed files with 154 additions and 4 deletions

View File

@ -0,0 +1,59 @@
package net.i2p.client.streaming;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.Hash;
import net.i2p.util.ObjectCounter;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Count how often we have received an incoming connection
* This offers basic DOS protection but is not a complete solution.
*
* @since 0.7.14
*/
class ConnThrottler {
private final ObjectCounter<Hash> counter;
private final int _max;
private final int _totalMax;
private final AtomicInteger _currentTotal;
/*
* @param max per-peer, 0 for unlimited
* @param totalMax for all peers, 0 for unlimited
* @param period ms
*/
ConnThrottler(int max, int totalMax, long period) {
_max = max;
_totalMax = totalMax;
if (max > 0) {
SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), period);
this.counter = new ObjectCounter();
} else {
this.counter = null;
}
if (totalMax > 0)
_currentTotal = new AtomicInteger();
else
_currentTotal = null;
}
/** increments before checking */
boolean shouldThrottle(Hash h) {
if (_totalMax > 0 && _currentTotal.incrementAndGet() > _totalMax)
return true;
if (_max > 0)
return this.counter.increment(h) > _max;
return false;
}
private class Cleaner implements SimpleTimer.TimedEvent {
public void timeReached() {
if (_totalMax > 0)
_currentTotal.set(0);
if (_max > 0)
ConnThrottler.this.counter.clear();
}
}
}

View File

@ -10,6 +10,7 @@ import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
@ -35,10 +36,14 @@ public class ConnectionManager {
/** Ping ID (Long) to PingRequest */
private final Map<Long, PingRequest> _pendingPings;
private boolean _allowIncoming;
private boolean _throttlersInitialized;
private int _maxConcurrentStreams;
private ConnectionOptions _defaultOptions;
private volatile int _numWaiting;
private long SoTimeout;
private ConnThrottler _minuteThrottler;
private ConnThrottler _hourThrottler;
private ConnThrottler _dayThrottler;
public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
_context = context;
@ -106,7 +111,23 @@ public class ConnectionManager {
public void setAllowIncomingConnections(boolean allow) {
_connectionHandler.setActive(allow);
if (allow && !_throttlersInitialized) {
_throttlersInitialized = true;
if (_defaultOptions.getMaxConnsPerMinute() > 0 || _defaultOptions.getMaxTotalConnsPerMinute() > 0) {
_context.statManager().createRateStat("stream.con.throttledMinute", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
_minuteThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerMinute(), _defaultOptions.getMaxTotalConnsPerMinute(), 60*1000);
}
if (_defaultOptions.getMaxConnsPerHour() > 0 || _defaultOptions.getMaxTotalConnsPerHour() > 0) {
_context.statManager().createRateStat("stream.con.throttledHour", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
_hourThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerHour(), _defaultOptions.getMaxTotalConnsPerHour(), 60*60*1000);
}
if (_defaultOptions.getMaxConnsPerDay() > 0 || _defaultOptions.getMaxTotalConnsPerDay() > 0) {
_context.statManager().createRateStat("stream.con.throttledDay", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
_dayThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerDay(), _defaultOptions.getMaxTotalConnsPerDay(), 24*60*60*1000);
}
}
}
/** @return if we should accept connections */
public boolean getAllowIncomingConnections() {
return _connectionHandler.getActive();
@ -140,8 +161,15 @@ public class ConnectionManager {
+ _maxConcurrentStreams + " connections");
reject = true;
} else if (shouldRejectConnection(synPacket)) {
_log.error("Refusing connection since peer is " +
(_defaultOptions.isAccessListEnabled() ? "not whitelisted: " : "blacklisted: ") +
// this may not be right if more than one is enabled
String why;
if (_defaultOptions.isAccessListEnabled())
why = "not whitelisted: ";
else if (_defaultOptions.isBlacklistEnabled())
why = "blacklisted: ";
else
why = "throttled: ";
_log.error("Refusing connection since peer is " + why +
(synPacket.getOptionalFrom() == null ? "null from" : synPacket.getOptionalFrom().calculateHash().toBase64()));
reject = true;
} else {
@ -281,11 +309,28 @@ public class ConnectionManager {
Destination from = syn.getOptionalFrom();
if (from == null)
return true;
Hash h = from.calculateHash();
boolean throttled = false;
// always call all 3 to increment all counters
if (_minuteThrottler != null && _minuteThrottler.shouldThrottle(h)) {
_context.statManager().addRateData("stream.con.throttledMinute", 1, 0);
throttled = true;
}
if (_hourThrottler != null && _hourThrottler.shouldThrottle(h)) {
_context.statManager().addRateData("stream.con.throttledHour", 1, 0);
throttled = true;
}
if (_dayThrottler != null && _dayThrottler.shouldThrottle(h)) {
_context.statManager().addRateData("stream.con.throttledDay", 1, 0);
throttled = true;
}
if (throttled)
return true;
// if the sig is absent or bad it will be caught later (in CPH)
if (_defaultOptions.isAccessListEnabled())
return !_defaultOptions.getAccessList().contains(from.calculateHash());
return !_defaultOptions.getAccessList().contains(h);
if (_defaultOptions.isBlacklistEnabled())
return _defaultOptions.getBlacklist().contains(from.calculateHash());
return _defaultOptions.getBlacklist().contains(h);
return false;
}

View File

@ -40,6 +40,12 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
private boolean _blackListEnabled;
private Set<Hash> _accessList;
private Set<Hash> _blackList;
private int _maxConnsPerMinute;
private int _maxConnsPerHour;
private int _maxConnsPerDay;
private int _maxTotalConnsPerMinute;
private int _maxTotalConnsPerHour;
private int _maxTotalConnsPerDay;
public static final int PROFILE_BULK = 1;
public static final int PROFILE_INTERACTIVE = 2;
@ -67,9 +73,17 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
public static final String PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR = "i2p.streaming.congestionAvoidanceGrowthRateFactor";
public static final String PROP_SLOW_START_GROWTH_RATE_FACTOR = "i2p.streaming.slowStartGrowthRateFactor";
public static final String PROP_ANSWER_PINGS = "i2p.streaming.answerPings";
/** all of these are @since 0.7.13 */
public static final String PROP_ENABLE_ACCESS_LIST = "i2cp.enableAccessList";
public static final String PROP_ENABLE_BLACKLIST = "i2cp.enableBlackList";
public static final String PROP_ACCESS_LIST = "i2cp.accessList";
/** all of these are @since 0.7.14 */
public static final String PROP_MAX_CONNS_MIN = "i2p.streaming.maxConnsPerMinute";
public static final String PROP_MAX_CONNS_HOUR = "i2p.streaming.maxConnsPerHour";
public static final String PROP_MAX_CONNS_DAY = "i2p.streaming.maxConnsPerDay";
public static final String PROP_MAX_TOTAL_CONNS_MIN = "i2p.streaming.maxTotalConnsPerMinute";
public static final String PROP_MAX_TOTAL_CONNS_HOUR = "i2p.streaming.maxTotalConnsPerHour";
public static final String PROP_MAX_TOTAL_CONNS_DAY = "i2p.streaming.maxTotalConnsPerDay";
private static final int TREND_COUNT = 3;
static final int INITIAL_WINDOW_SIZE = 6;
@ -222,6 +236,12 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setReadTimeout(opts.getReadTimeout());
setAnswerPings(opts.getAnswerPings());
initLists(opts);
_maxConnsPerMinute = opts.getMaxConnsPerMinute();
_maxConnsPerHour = opts.getMaxConnsPerHour();
_maxConnsPerDay = opts.getMaxConnsPerDay();
_maxTotalConnsPerMinute = opts.getMaxTotalConnsPerMinute();
_maxTotalConnsPerHour = opts.getMaxTotalConnsPerHour();
_maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay();
}
}
@ -248,6 +268,12 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS));
initLists(opts);
_maxConnsPerMinute = getInt(opts, PROP_MAX_CONNS_MIN, 0);
_maxConnsPerHour = getInt(opts, PROP_MAX_CONNS_HOUR, 0);
_maxConnsPerDay = getInt(opts, PROP_MAX_CONNS_DAY, 0);
_maxTotalConnsPerMinute = getInt(opts, PROP_MAX_TOTAL_CONNS_MIN, 0);
_maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0);
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
}
@Override
@ -291,6 +317,18 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
if (opts.containsKey(PROP_ANSWER_PINGS))
setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS));
initLists(opts);
if (opts.containsKey(PROP_MAX_CONNS_MIN))
_maxConnsPerMinute = getInt(opts, PROP_MAX_CONNS_MIN, 0);
if (opts.containsKey(PROP_MAX_CONNS_HOUR))
_maxConnsPerHour = getInt(opts, PROP_MAX_CONNS_HOUR, 0);
if (opts.containsKey(PROP_MAX_CONNS_DAY))
_maxConnsPerDay = getInt(opts, PROP_MAX_CONNS_DAY, 0);
if (opts.containsKey(PROP_MAX_TOTAL_CONNS_MIN))
_maxTotalConnsPerMinute = getInt(opts, PROP_MAX_TOTAL_CONNS_MIN, 0);
if (opts.containsKey(PROP_MAX_TOTAL_CONNS_HOUR))
_maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0);
if (opts.containsKey(PROP_MAX_TOTAL_CONNS_DAY))
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
}
/**
@ -523,6 +561,14 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
public int getSlowStartGrowthRateFactor() { return _slowStartGrowthRateFactor; }
public void setSlowStartGrowthRateFactor(int factor) { _slowStartGrowthRateFactor = factor; }
/** all of these are @since 0.7.14; no public setters */
public int getMaxConnsPerMinute() { return _maxConnsPerMinute; }
public int getMaxConnsPerHour() { return _maxConnsPerHour; }
public int getMaxConnsPerDay() { return _maxConnsPerDay; }
public int getMaxTotalConnsPerMinute() { return _maxConnsPerMinute; }
public int getMaxTotalConnsPerHour() { return _maxTotalConnsPerHour; }
public int getMaxTotalConnsPerDay() { return _maxTotalConnsPerDay; }
public boolean isAccessListEnabled() { return _accessListEnabled; }
public boolean isBlacklistEnabled() { return _blackListEnabled; }
public Set<Hash> getAccessList() { return _accessList; }