diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 4c4277985..543406b7f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -54,45 +54,40 @@ class ConnectionHandler { if (_log.shouldLog(Log.DEBUG)) _log.debug("Accept("+ timeoutMs+") called"); - long expiration = timeoutMs; - if (expiration > 0) - expiration += _context.clock().now(); - Packet syn = null; - synchronized (_synQueue) { - while ( _active && (_synQueue.size() <= 0) ) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: " + _synQueue.size()); - if (timeoutMs <= 0) { - try { _synQueue.wait(); } catch (InterruptedException ie) {} - } else { - long remaining = expiration - _context.clock().now(); - if (remaining < 0) - break; - try { _synQueue.wait(remaining); } catch (InterruptedException ie) {} + long expiration = timeoutMs + _context.clock().now(); + while (true) { + if ( (timeoutMs > 0) && (expiration < _context.clock().now()) ) + return null; + if (!_active) + return null; + + Packet syn = null; + synchronized (_synQueue) { + while ( _active && (_synQueue.size() <= 0) ) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: " + + _synQueue.size()); + if (timeoutMs <= 0) { + try { _synQueue.wait(); } catch (InterruptedException ie) {} + } else { + long remaining = expiration - _context.clock().now(); + if (remaining < 0) + break; + try { _synQueue.wait(remaining); } catch (InterruptedException ie) {} + } + } + if (_active && _synQueue.size() > 0) { + syn = (Packet)_synQueue.remove(0); } } - if (_active && _synQueue.size() > 0) { - syn = (Packet)_synQueue.remove(0); + + if (syn != null) { + // deal with forged / invalid syn packets + Connection con = _manager.receiveConnection(syn); + if (con != null) + return con; } - } - - if (syn != null) { - // deal with forged / invalid syn packets - Connection con = _manager.receiveConnection(syn); - if (con != null) { - return con; - } else if (timeoutMs > 0) { - long remaining = expiration - _context.clock().now(); - if (remaining <= 0) { - return null; - } else { - return accept(remaining); - } - } else { - return accept(timeoutMs); - } - } else { - return null; + // keep looping... } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index c7b39c299..8f7588666 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -35,9 +35,11 @@ public class ConnectionManager { /** Ping ID (ByteArray) to PingRequest */ private Map _pendingPings; private boolean _allowIncoming; + private int _maxConcurrentStreams; + private volatile int _numWaiting; private Object _connectionLock; - public ConnectionManager(I2PAppContext context, I2PSession session) { + public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent) { _context = context; _log = context.logManager().getLog(ConnectionManager.class); _connectionByInboundId = new HashMap(32); @@ -52,6 +54,8 @@ public class ConnectionManager { session.setSessionListener(_messageHandler); _outboundQueue = new PacketQueue(context, session); _allowIncoming = false; + _maxConcurrentStreams = maxConcurrent; + _numWaiting = 0; } Connection getConnectionByInboundId(byte[] id) { @@ -77,19 +81,40 @@ public class ConnectionManager { Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler); byte receiveId[] = new byte[4]; _context.random().nextBytes(receiveId); + boolean reject = false; synchronized (_connectionLock) { - while (true) { - Connection oldCon = (Connection)_connectionByInboundId.put(new ByteArray(receiveId), con); - if (oldCon == null) { - break; - } else { - _connectionByInboundId.put(new ByteArray(receiveId), oldCon); - // receiveId already taken, try another - _context.random().nextBytes(receiveId); + if (locked_tooManyStreams()) { + reject = true; + } else { + while (true) { + Connection oldCon = (Connection)_connectionByInboundId.put(new ByteArray(receiveId), con); + if (oldCon == null) { + break; + } else { + _connectionByInboundId.put(new ByteArray(receiveId), oldCon); + // receiveId already taken, try another + _context.random().nextBytes(receiveId); + } } } } + if (reject) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Refusing connection since we have exceeded our max of " + + _maxConcurrentStreams + " connections"); + PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom()); + reply.setFlag(Packet.FLAG_RESET); + reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); + reply.setAckThrough(synPacket.getSequenceNum()); + reply.setSendStreamId(synPacket.getReceiveStreamId()); + reply.setReceiveStreamId(null); + reply.setOptionalFrom(_session.getMyDestination()); + // this just sends the packet - no retries or whatnot + _outboundQueue.enqueue(reply); + return null; + } + con.setReceiveStreamId(receiveId); try { con.getPacketHandler().receivePacket(synPacket, con); @@ -102,24 +127,59 @@ public class ConnectionManager { return con; } + private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000; + /** * Build a new connection to the given peer. This blocks if there is no * connection delay, otherwise it returns immediately. * + * @return new connection, or null if we have exceeded our limit */ public Connection connect(Destination peer, ConnectionOptions opts) { - Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); - con.setRemotePeer(peer); + Connection con = null; byte receiveId[] = new byte[4]; - _context.random().nextBytes(receiveId); - synchronized (_connectionLock) { - ByteArray ba = new ByteArray(receiveId); - while (_connectionByInboundId.containsKey(ba)) { - _context.random().nextBytes(receiveId); + long expiration = _context.clock().now() + opts.getConnectTimeout(); + if (opts.getConnectTimeout() <= 0) + expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX; + _numWaiting++; + while (true) { + if (expiration < _context.clock().now()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Refusing to connect since we have exceeded our max of " + + _maxConcurrentStreams + " connections"); + _numWaiting--; + return null; } - _connectionByInboundId.put(ba, con); + con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); + con.setRemotePeer(peer); + _context.random().nextBytes(receiveId); + boolean reject = false; + synchronized (_connectionLock) { + if (locked_tooManyStreams()) { + // allow a full buffer of pending/waiting streams + if (_numWaiting > _maxConcurrentStreams) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Refusing connection since we have exceeded our max of " + + _maxConcurrentStreams + " and there are " + _numWaiting + + " waiting already"); + _numWaiting--; + return null; + } + + reject = true; + } else { + ByteArray ba = new ByteArray(receiveId); + while (_connectionByInboundId.containsKey(ba)) { + _context.random().nextBytes(receiveId); + } + _connectionByInboundId.put(ba, con); + } + } + if (!reject) + break; } - + + // ok we're in... con.setReceiveStreamId(receiveId); con.eventOccurred(); @@ -127,9 +187,24 @@ public class ConnectionManager { if (opts.getConnectDelay() <= 0) { con.waitForConnect(); } + if (_numWaiting > 0) + _numWaiting--; return con; } + private boolean locked_tooManyStreams() { + if (_maxConcurrentStreams <= 0) return false; + if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; + + int active = 0; + for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { + Connection con = (Connection)iter.next(); + if (con.getIsConnected()) + active++; + } + return (active >= _maxConcurrentStreams); + } + public MessageHandler getMessageHandler() { return _messageHandler; } public PacketHandler getPacketHandler() { return _packetHandler; } public ConnectionHandler getConnectionHandler() { return _connectionHandler; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index 422d49451..ef2330b8e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -36,6 +36,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { private ConnectionOptions _defaultOptions; private long _acceptTimeout; private String _name; + private int _maxStreams; private static int __managerId = 0; private ConnectionManager _connectionManager; @@ -54,6 +55,9 @@ public class I2PSocketManagerFull implements I2PSocketManager { init(context, session, opts, name); } + /** how many streams will we allow at once? */ + public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams"; + /** * */ @@ -61,7 +65,17 @@ public class I2PSocketManagerFull implements I2PSocketManager { _context = context; _session = session; _log = _context.logManager().getLog(I2PSocketManagerFull.class); - _connectionManager = new ConnectionManager(_context, _session); + + _maxStreams = -1; + try { + String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1"); + _maxStreams = Integer.parseInt(num); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe); + _maxStreams = -1; + } + _connectionManager = new ConnectionManager(_context, _session, _maxStreams); _name = name + " " + (++__managerId); _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; _defaultOptions = new ConnectionOptions(opts); @@ -140,6 +154,8 @@ public class I2PSocketManagerFull implements I2PSocketManager { else opts = new ConnectionOptions(options); Connection con = _connectionManager.connect(peer, opts); + if (con == null) + throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")"); I2PSocketFull socket = new I2PSocketFull(con); con.setSocket(socket); if (con.getConnectionError() != null) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TooManyStreamsException.java b/apps/streaming/java/src/net/i2p/client/streaming/TooManyStreamsException.java new file mode 100644 index 000000000..d53ef08a9 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/TooManyStreamsException.java @@ -0,0 +1,21 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PException; + +/** + * We attempted to have more open streams than we are willing to put up with + * + */ +public class TooManyStreamsException extends I2PException { + public TooManyStreamsException(String message, Throwable parent) { + super(message, parent); + } + + public TooManyStreamsException(String message) { + super(message); + } + + public TooManyStreamsException() { + super(); + } +}