* ack packets with a payload, even if they have ID=0 (duh)

* properly implement the connection timeout
* make sure we clear the outbound packets on close
* don't b0rk on repeated close() calls
This commit is contained in:
jrandom
2004-11-13 09:49:31 +00:00
committed by zzz
parent 5f7982540f
commit 258244fed8
8 changed files with 77 additions and 28 deletions

View File

@@ -68,7 +68,7 @@ public class Connection {
/** wait up to 5 minutes after disconnection so we can ack/close packets */
public static int DISCONNECT_TIMEOUT = 5*60*1000;
/** lets be sane.. no more than 32 packets in the air in each dir */
/** lets be sane- no more than 32 packets in the air in each dir */
public static final int MAX_WINDOW_SIZE = 32;
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) {
@@ -350,6 +350,15 @@ public class Connection {
+ toString());
_connectionManager.removeConnection(this);
}
synchronized (_outboundPackets) {
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
PacketLocal pl = (PacketLocal)iter.next();
pl.cancelled();
}
_outboundPackets.clear();
_outboundPackets.notifyAll();
}
}
private class DisconnectEvent implements SimpleTimer.TimedEvent {

View File

@@ -10,6 +10,7 @@ import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.data.SessionKey;
import net.i2p.util.SimpleTimer;
@@ -63,6 +64,20 @@ public class ConnectionManager {
return (Connection)_connectionByInboundId.get(new ByteArray(id));
}
}
/**
* not guaranteed to be unique, but in case we receive more than one packet
* on an inbound connection that we havent ack'ed yet...
*/
Connection getConnectionByOutboundId(byte[] id) {
synchronized (_connectionLock) {
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
if (DataHelper.eq(con.getSendStreamId(), id))
return con;
}
}
return null;
}
public void setAllowIncomingConnections(boolean allow) {
_connectionHandler.setActive(allow);
@@ -138,21 +153,20 @@ public class ConnectionManager {
public Connection connect(Destination peer, ConnectionOptions opts) {
Connection con = null;
byte receiveId[] = new byte[4];
_context.random().nextBytes(receiveId);
long expiration = _context.clock().now() + opts.getConnectTimeout();
if (opts.getConnectTimeout() <= 0)
expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
_numWaiting++;
while (true) {
if (expiration < _context.clock().now()) {
long remaining = expiration - _context.clock().now();
if (remaining <= 0) {
if (_log.shouldLog(Log.WARN))
_log.warn("Refusing to connect since we have exceeded our max of "
+ _maxConcurrentStreams + " connections");
_numWaiting--;
return null;
}
con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
con.setRemotePeer(peer);
_context.random().nextBytes(receiveId);
boolean reject = false;
synchronized (_connectionLock) {
if (locked_tooManyStreams()) {
@@ -165,18 +179,21 @@ public class ConnectionManager {
_numWaiting--;
return null;
}
reject = true;
// no remaining streams, lets wait a bit
try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
} else {
con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
con.setRemotePeer(peer);
ByteArray ba = new ByteArray(receiveId);
while (_connectionByInboundId.containsKey(ba)) {
_context.random().nextBytes(receiveId);
}
_connectionByInboundId.put(ba, con);
break; // stop looping as a psuedo-wait
}
}
if (!reject)
break;
}
// ok we're in...
@@ -223,12 +240,14 @@ public class ConnectionManager {
con.disconnect(false, false);
}
_connectionByInboundId.clear();
_connectionLock.notifyAll();
}
}
public void removeConnection(Connection con) {
synchronized (_connectionLock) {
_connectionByInboundId.remove(new ByteArray(con.getReceiveStreamId()));
_connectionLock.notifyAll();
}
}

View File

@@ -56,9 +56,9 @@ public class ConnectionPacketHandler {
_log.debug("Scheduling ack in " + delay + "ms for received packet " + packet);
}
} else {
if (packet.getSequenceNum() > 0) {
if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ) {
// take note of congestion
con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2);
//con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2);
//con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2);
if (_log.shouldLog(Log.WARN))
_log.warn("congestion.. dup " + packet);

View File

@@ -23,7 +23,7 @@ public class I2PSocketFull implements I2PSocket {
_connection.getOutputStream().close();
_connection.disconnect(true);
} else {
throw new IOException("Not connected");
//throw new IOException("Not connected");
}
}

View File

@@ -5,6 +5,8 @@ import java.io.InterruptedIOException;
import java.io.OutputStream;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
/**
@@ -21,15 +23,17 @@ public class MessageOutputStream extends OutputStream {
private boolean _closed;
private long _written;
private int _writeTimeout;
private ByteCache _dataCache;
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE);
}
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize) {
super();
_dataCache = ByteCache.getInstance(128, bufSize);
_context = ctx;
_log = ctx.logManager().getLog(MessageOutputStream.class);
_buf = new byte[bufSize];
_buf = _dataCache.acquire().getData(); // new byte[bufSize];
_dataReceiver = receiver;
_dataLock = new Object();
_written = 0;
@@ -130,6 +134,10 @@ public class MessageOutputStream extends OutputStream {
_closed = true;
flush();
_log.debug("Output stream closed after writing " + _written);
if (_buf != null) {
_dataCache.release(new ByteArray(_buf));
_buf = null;
}
}
public void closeInternal() {
_closed = true;
@@ -143,6 +151,10 @@ public class MessageOutputStream extends OutputStream {
}
_dataLock.notifyAll();
}
if (_buf != null) {
_dataCache.release(new ByteArray(_buf));
_buf = null;
}
}
public boolean getClosed() { return _closed; }

View File

@@ -206,6 +206,9 @@ public class Packet {
if ( (payload != null) && (payload.length > MAX_PAYLOAD_SIZE) )
throw new IllegalArgumentException("Too large payload: " + payload.length);
}
public int getPayloadSize() {
return (_payload == null ? 0 : _payload.length);
}
/** is a particular flag set on this packet? */
public boolean isFlagSet(int flag) { return 0 != (_flags & flag); }

View File

@@ -175,20 +175,18 @@ public class PacketHandler {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received on an unknown stream (and not an ECHO): " + packet);
if (sendId == null) {
for (Iterator iter = _manager.listConnections().iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
if (DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) {
if (con.getAckedPackets() <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received additional packets before the syn on " + con + ": " + packet);
receiveKnownCon(con, packet);
return;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("hrmph, received while ack of syn was in flight on " + con + ": " + packet + " acked: " + con.getAckedPackets());
receiveKnownCon(con, packet);
return;
}
Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId());
if (con != null) {
if (con.getAckedPackets() <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received additional packets before the syn on " + con + ": " + packet);
receiveKnownCon(con, packet);
return;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("hrmph, received while ack of syn was in flight on " + con + ": " + packet + " acked: " + con.getAckedPackets());
receiveKnownCon(con, packet);
return;
}
}
}

View File

@@ -7,7 +7,9 @@ import java.util.HashSet;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.data.ByteArray;
import net.i2p.data.SessionKey;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
/**
@@ -18,14 +20,20 @@ class PacketQueue {
private Log _log;
private I2PSession _session;
private byte _buf[];
private ByteCache _cache = ByteCache.getInstance(64, 36*1024);
public PacketQueue(I2PAppContext context, I2PSession session) {
_context = context;
_session = session;
_buf = new byte[36*1024];
_buf = _cache.acquire().getData(); // new byte[36*1024];
_log = context.logManager().getLog(PacketQueue.class);
}
protected void finalize() throws Throwable {
_cache.release(new ByteArray(_buf));
super.finalize();
}
/**
* Add a new packet to be sent out ASAP
*/