* Streaming: Major rework of connection disconnect process. Tickets 1040-1042.

- Prevent multiple calls or reentrancy in disconnect() (ticket #1041)
   - Implement processing of close to skip TIME-WAIT, and
     wait for all packets to be acked (not just the CLOSE) before
     doing so, if possible (ticket #1042)
   - Don't call disconnect() or disconnectComplete() from I2PSocketFull.destroy()
     so retransmissions and acks can still happen (removes some close loops)
   - Don't call disconnect() until we have both sent and received a CLOSE (ticket #1040)
   - Don't reset the connection from CPH just because we sent a CLOSE
     and it was acked (ticket #1040)
   - Ack packets even if we sent a CLOSE  (ticket #1040)
   - Retransmit CLOSE if not acked (ticket #1040)
   - Send received packets to the MessageInputStream even if we haven't received a SYN
   - Don't call MessageInputStream.messageReceived() for ack-only packets, that was pointless
   - Don't send a RESET after timeout of an outbound connection
   - Work around bugs on other end by limiting retransmission of CLOSE packets
   - Make _isInbound final
   - More cleanups, javadocs, log tweaks
This commit is contained in:
zzz
2013-09-28 11:47:47 +00:00
parent 30a666c833
commit 2884df873e
5 changed files with 266 additions and 142 deletions

View File

@@ -35,7 +35,8 @@ class Connection {
private final AtomicLong _lastSendId;
private final AtomicBoolean _resetReceived = new AtomicBoolean();
private final AtomicLong _resetSentOn = new AtomicLong();
private volatile boolean _connected;
private final AtomicBoolean _connected = new AtomicBoolean(true);
private final AtomicBoolean _finalDisconnect = new AtomicBoolean();
private boolean _hardDisconnected;
private final MessageInputStream _inputStream;
private final MessageOutputStream _outputStream;
@@ -48,7 +49,7 @@ class Connection {
private int _unackedPacketsReceived;
private long _congestionWindowEnd;
private volatile long _highestAckedThrough;
private boolean _isInbound;
private final boolean _isInbound;
private boolean _updatedShareOpts;
/** Packet ID (Long) to PacketLocal for sent but unacked packets */
private final Map<Long, PacketLocal> _outboundPackets;
@@ -87,7 +88,10 @@ class Connection {
public static final long MAX_RESEND_DELAY = 45*1000;
public static final long MIN_RESEND_DELAY = 2*1000;
/** wait up to 5 minutes after disconnection so we can ack/close packets */
/**
* Wait up to 5 minutes after disconnection so we can ack/close packets.
* Roughly equal to the TIME-WAIT time in RFC 793, where the recommendation is 4 minutes (2 * MSL)
*/
public static final int DISCONNECT_TIMEOUT = 5*60*1000;
public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000;
@@ -107,12 +111,14 @@ class Connection {
*/
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
SimpleTimer2 timer,
PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) {
PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts,
boolean isInbound) {
_context = ctx;
_connectionManager = manager;
_chooser = chooser;
_outboundQueue = queue;
_handler = handler;
_isInbound = isInbound;
_log = _context.logManager().getLog(Connection.class);
_receiver = new ConnectionDataReceiver(_context, this);
_inputStream = new MessageInputStream(_context);
@@ -135,7 +141,6 @@ class Connection {
_lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
_lastCongestionTime = -1;
_lastCongestionHighestUnacked = -1;
_connected = true;
_lastReceivedOn = -1;
_activityTimer = new ActivityTimer();
_ackSinceCongestion = true;
@@ -151,24 +156,6 @@ class Connection {
return _lastSendId.incrementAndGet();
}
/**
* Notify that a close was received
*/
public void closeReceived() {
if (_closeReceivedOn.compareAndSet(0, _context.clock().now())) {
_inputStream.closeReceived();
synchronized (_connectLock) { _connectLock.notifyAll(); }
}
}
/**
* Notify that a close that we sent was acked
* @since 0.9.9
*/
public void ourCloseAcked() {
// todo
}
/**
* This doesn't "send a choke". Rather, it blocks if the outbound window is full,
* thus choking the sender that calls this.
@@ -196,7 +183,7 @@ class Connection {
// no need to wait until the other side has ACKed us before sending the first few wsize
// packets through
// Incorrect assumption, the constructor defaults _connected to true --Sponge
if (!_connected)
if (!_connected.get())
return false;
started = true;
// Try to keep things moving even during NACKs and retransmissions...
@@ -234,6 +221,7 @@ class Connection {
}
}
}
void windowAdjusted() {
synchronized (_outboundPackets) {
_outboundPackets.notifyAll();
@@ -290,8 +278,7 @@ class Connection {
* Got a packet we shouldn't have, send 'em a reset.
* More than one reset may be sent.
*/
public void sendReset() {
scheduleDisconnectEvent();
private void sendReset() {
long now = _context.clock().now();
if (_resetSentOn.get() + 10*1000 > now) return; // don't send resets too fast
if (_resetReceived.get()) return;
@@ -544,22 +531,74 @@ class Connection {
if ( (elapsed > 250) && (_log.shouldLog(Log.WARN)) )
_log.warn("Took " + elapsed + "ms to pump through " + sched + " on " + toString());
}
/**
* Notify that a close was sent.
* Called by CPH.
* May be called multiple times... but shouldn't be.
*/
public void notifyCloseSent() {
if (!_closeSentOn.compareAndSet(0, _context.clock().now())) {
// TODO ackImmediately() after sending CLOSE causes this. Bad?
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sent more than one CLOSE: " + toString());
}
// that's it, wait for notifyLastPacketAcked() or closeReceived()
}
/** notify that a reset was received */
/**
* Notify that a close was received.
* Called by CPH.
* May be called multiple times.
*/
public void closeReceived() {
if (_closeReceivedOn.compareAndSet(0, _context.clock().now())) {
_inputStream.closeReceived();
// TODO if outbound && no SYN received, treat like a reset? Could this happen?
if (_closeSentOn.get() > 0) {
// received after sent
disconnect(true);
} else {
synchronized (_connectLock) { _connectLock.notifyAll(); }
}
}
}
/**
* Notify that a close that we sent, and all previous packets, were acked.
* Called by CPH. Only call this once.
* @since 0.9.9
*/
public void notifyLastPacketAcked() {
long cso = _closeSentOn.get();
if (cso <= 0)
throw new IllegalStateException();
// we only create one CLOSE packet so we will only get called once,
// no need to check
long cro = _closeReceivedOn.get();
if (cro > 0 && cro < cso)
// received before sent
disconnect(true);
}
/**
* Notify that a reset was received.
* May be called multiple times.
*/
public void resetReceived() {
if (!_resetReceived.compareAndSet(false, true))
return;
scheduleDisconnectEvent();
IOException ioe = new IOException("Reset received");
_outputStream.streamErrorOccurred(ioe);
_inputStream.streamErrorOccurred(ioe);
_connectionError = "Connection reset";
synchronized (_connectLock) { _connectLock.notifyAll(); }
// RFC 793 end of setion 3.4: We are completely done.
disconnectComplete();
}
public boolean getResetReceived() { return _resetReceived.get(); }
public void setInbound() { _isInbound = true; }
public boolean isInbound() { return _isInbound; }
/**
@@ -567,10 +606,16 @@ class Connection {
* outbound connection. Only set to false on disconnect.
* For outbound, use getHighestAckedThrough() >= 0 also,
* to determine if the connection is up.
*
* In general, this is true until either:
* - CLOSE received and CLOSE sent and our CLOSE is acked
* - RESET received or sent
* - closed on the socket side
*/
public boolean getIsConnected() { return _connected; }
public boolean getIsConnected() { return _connected.get(); }
public boolean getHardDisconnected() { return _hardDisconnected; }
public boolean getResetSent() { return _resetSentOn.get() > 0; }
/** @return 0 if not sent */
@@ -579,38 +624,103 @@ class Connection {
/** @return 0 if not scheduled */
public long getDisconnectScheduledOn() { return _disconnectScheduledOn.get(); }
void disconnect(boolean cleanDisconnect) {
/**
* Must be called when we are done with this connection.
* Enters TIME-WAIT if necessary, and removes from connection manager.
* May be called multiple times.
* This closes the socket side.
* In normal operation, this is called when a CLOSE has been received,
* AND a CLOSE has been sent, AND EITHER:
* received close before sent close AND our CLOSE has been acked
* OR
* received close after sent close.
*
* @param cleanDisconnect if true, normal close; if false, send a RESET
*/
public void disconnect(boolean cleanDisconnect) {
disconnect(cleanDisconnect, true);
}
void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) {
/**
* Must be called when we are done with this connection.
* May be called multiple times.
* This closes the socket side.
* In normal operation, this is called when a CLOSE has been received,
* AND a CLOSE has been sent, AND EITHER:
* received close before sent close AND our CLOSE has been acked
* OR
* received close after sent close.
*
* @param cleanDisconnect if true, normal close; if false, send a RESET
* @param removeFromConMgr if true, enters TIME-WAIT if necessary.
* if false, MUST call disconnectComplete() later.
* Should always be true unless called from ConnectionManager.
*/
public void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) {
if (!_connected.compareAndSet(true, false)) {
return;
}
synchronized (_connectLock) { _connectLock.notifyAll(); }
if (_log.shouldLog(Log.DEBUG))
_log.debug("Disconnecting " + toString(), new Exception("discon"));
if (!cleanDisconnect) {
_hardDisconnected = true;
if (_log.shouldLog(Log.WARN))
_log.warn("Hard disconnecting and sending a reset on " + toString(), new Exception("cause"));
sendReset();
if (_closeReceivedOn.get() <= 0) {
// should have already been called from closeReceived() above
_inputStream.closeReceived();
}
if (cleanDisconnect && _connected) {
// send close packets and schedule stuff...
if (cleanDisconnect) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Clean disconnecting, remove? " + removeFromConMgr +
": " + toString(), new Exception("discon"));
_outputStream.closeInternal();
_inputStream.close();
} else {
if (_connected)
doClose();
killOutstandingPackets();
_hardDisconnected = true;
if (_inputStream.getHighestBlockId() >= 0 && !getResetReceived()) {
// only send a RESET if we ever got something (and he didn't RESET us),
// otherwise don't waste the crypto and tags
if (_log.shouldLog(Log.WARN))
_log.warn("Hard disconnecting and sending reset, remove? " + removeFromConMgr +
" on " + toString(), new Exception("cause"));
sendReset();
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Hard disconnecting, remove? " + removeFromConMgr +
" on " + toString(), new Exception("cause"));
}
_outputStream.streamErrorOccurred(new IOException("Hard disconnect"));
}
if (removeFromConMgr) {
scheduleDisconnectEvent();
if (!cleanDisconnect) {
disconnectComplete();
} else {
long cro = _closeReceivedOn.get();
long cso = _closeSentOn.get();
if (cro > 0 && cro < cso && getUnackedPacketsSent() <= 0) {
if (_log.shouldLog(Log.INFO))
_log.info("Rcv close -> send close -> last acked, skip TIME-WAIT for " + toString());
// They sent the first CLOSE.
// We do not need to enter TIME-WAIT, we are done.
// clean disconnect, don't schedule TIME-WAIT
// remove conn
disconnectComplete();
} else {
scheduleDisconnectEvent();
}
}
}
_connected = false;
}
void disconnectComplete() {
_connected = false;
private static final IOException DISCON_IOE = new IOException("disconnected!");
/**
* Must be called when we are done with this connection.
* Final disconnect. Remove from conn manager.
* May be called multiple times.
*/
public void disconnectComplete() {
if (!_finalDisconnect.compareAndSet(false, true))
return;
_connected.set(false);
I2PSocketFull s = _socket;
if (s != null) {
s.destroy2();
@@ -619,38 +729,35 @@ class Connection {
_outputStream.destroy();
_receiver.destroy();
_activityTimer.cancel();
_inputStream.streamErrorOccurred(new IOException("disconnected!"));
_inputStream.streamErrorOccurred(DISCON_IOE);
if (_disconnectScheduledOn.compareAndSet(0, _context.clock().now())) {
if (_log.shouldLog(Log.INFO))
_log.info("Connection disconnect complete from dead, drop the con "
if (_log.shouldLog(Log.INFO))
_log.info("Connection disconnect complete: "
+ toString());
_connectionManager.removeConnection(this);
}
_connectionManager.removeConnection(this);
killOutstandingPackets();
}
/** ignore tag issues */
/**
* Cancel and remove all packets awaiting ack
*/
private void killOutstandingPackets() {
//boolean tagsCancelled = false;
synchronized (_outboundPackets) {
for (Iterator<PacketLocal> iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
PacketLocal pl = iter.next();
//if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) )
// tagsCancelled = true;
if (_outboundPackets.isEmpty())
return; // short circuit iterator
for (PacketLocal pl : _outboundPackets.values()) {
pl.cancelled();
}
_outboundPackets.clear();
_outboundPackets.notifyAll();
}
//if (tagsCancelled)
// _context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
}
/**
* Schedule the end of the TIME-WAIT state,
* but only if not previously scheduled.
* Must call either this or disconnectComplete()
*
* @return true if a new event was scheduled; false if already scheduled
* @since 0.9.9
*/
@@ -665,23 +772,13 @@ class Connection {
public DisconnectEvent() {
if (_log.shouldLog(Log.INFO))
_log.info("Connection disconnect timer initiated: 5 minutes to drop "
+ Connection.this.toString());
+ Connection.this.toString(), new Exception());
}
public void timeReached() {
killOutstandingPackets();
if (_log.shouldLog(Log.INFO))
_log.info("Connection disconnect timer complete, drop the con "
+ Connection.this.toString());
_connectionManager.removeConnection(Connection.this);
disconnectComplete();
}
}
private void doClose() {
_outputStream.streamErrorOccurred(new IOException("Hard disconnect"));
_inputStream.closeReceived();
synchronized (_connectLock) { _connectLock.notifyAll(); }
}
private boolean _remotePeerSet = false;
/** who are we talking with
* @return peer Destination
@@ -832,12 +929,6 @@ class Connection {
/** @return 0 if not sent */
public long getCloseSentOn() { return _closeSentOn.get(); }
/** notify that a close was sent */
public void setCloseSentOn(long when) {
if (_closeSentOn.compareAndSet(0, when))
scheduleDisconnectEvent();
}
/** @return 0 if not received */
public long getCloseReceivedOn() { return _closeReceivedOn.get(); }
@@ -849,6 +940,7 @@ class Connection {
}
public void incrementUnackedPacketsReceived() { _unackedPacketsReceived++; }
public int getUnackedPacketsReceived() { return _unackedPacketsReceived; }
/** how many packets have we sent but not yet received an ACK for?
* @return Count of packets in-flight.
*/
@@ -894,7 +986,7 @@ class Connection {
void waitForConnect() {
long expiration = _context.clock().now() + _options.getConnectTimeout();
while (true) {
if (_connected && (_receiveStreamId > 0) && (_sendStreamId > 0) ) {
if (_connected.get() && (_receiveStreamId > 0) && (_sendStreamId > 0) ) {
// w00t
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitForConnect(): Connected and we have stream IDs");
@@ -905,7 +997,7 @@ class Connection {
_log.debug("waitForConnect(): connection error found: " + _connectionError);
return;
}
if (!_connected) {
if (!_connected.get()) {
_connectionError = "Connection failed";
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitForConnect(): not connected");
@@ -964,7 +1056,7 @@ class Connection {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Fire inactivity timer on " + Connection.this.toString());
// uh, nothing more to do...
if (!_connected) {
if (!_connected.get()) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but we are already closed");
return;
}
@@ -1101,7 +1193,7 @@ class Connection {
if (getResetSent())
buf.append(" reset sent ").append(DataHelper.formatDuration(_context.clock().now() - getResetSentOn())).append(" ago");
if (getResetReceived())
buf.append(" reset received ").append(DataHelper.formatDuration(_context.clock().now() - getDisconnectScheduledOn())).append(" ago");
buf.append(" reset rcvd ").append(DataHelper.formatDuration(_context.clock().now() - getDisconnectScheduledOn())).append(" ago");
if (getCloseSentOn() > 0) {
buf.append(" close sent ");
long timeSinceClose = _context.clock().now() - getCloseSentOn();
@@ -1109,7 +1201,7 @@ class Connection {
buf.append(" ago");
}
if (getCloseReceivedOn() > 0)
buf.append(" close received ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago");
buf.append(" close rcvd ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago");
buf.append(" sent: ").append(1 + _lastSendId.get());
buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
buf.append(" ackThru ").append(_highestAckedThrough);
@@ -1180,9 +1272,7 @@ class Connection {
if (_packet.getAckTime() > 0)
return false;
if (_resetSentOn.get() > 0 || _resetReceived.get() || !_connected) {
if(_log.shouldLog(Log.WARN) && (_resetSentOn.get() <= 0) && (!_resetReceived.get()))
_log.warn("??? no resets but not connected: " + _packet); // don't think this is possible
if (_resetSentOn.get() > 0 || _resetReceived.get() || _finalDisconnect.get()) {
_packet.cancelled();
return false;
}
@@ -1277,9 +1367,22 @@ class Connection {
if (numSends - 1 > _options.getMaxResends()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Too many resends");
_log.debug("Disconnecting, too many resends of " + _packet);
_packet.cancelled();
disconnect(false);
} else if (numSends >= 3 &&
_packet.isFlagSet(Packet.FLAG_CLOSE) &&
_packet.getPayloadSize() <= 0 &&
_outboundPackets.size() <= 1 &&
getCloseReceivedOn() > 0) {
// Bug workaround to prevent 5 minutes of retransmission
// Routers before 0.9.9 have bugs, they won't ack anything after
// they sent a close. Only send 3 CLOSE packets total, then
// shut down normally.
if (_log.shouldLog(Log.INFO))
_log.info("Too many CLOSE resends, disconnecting: " + Connection.this.toString());
_packet.cancelled();
disconnect(true);
} else {
//long timeout = _options.getResendDelay() << numSends;
long rto = _options.getRTO();

View File

@@ -57,6 +57,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
* may generate a packet with a plain ACK/NACK or CLOSE, or nothing whatsoever
* if there's nothing new to send.
*
* This is called from MessageOutputStream, i.e. data from the client.
*
* @param buf data to be sent - may be null
* @param off offset into the buffer to start writing from
* @param size how many bytes of the buffer to write (may be 0)
@@ -108,6 +110,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
* Send some data through the connection, attaching any appropriate flags
* onto the packet.
*
* Called externally from Connection with args (null, 0, 0) to send an ack
*
* @param buf data to be sent - may be null
* @param off offset into the buffer to start writing from
* @param size how many bytes of the buffer to write (may be 0)
@@ -118,6 +122,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
}
/**
* Called externally from Connection with args (null, 0, 0, true) to send an empty data packet
*
* @param buf data to be sent - may be null
* @param off offset into the buffer to start writing from
* @param size how many bytes of the buffer to write (may be 0)
@@ -210,13 +216,17 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
// packets sent, otherwise the other side could receive the CLOSE prematurely,
// since this ACK could arrive before the unacked payload message.
// TODO if the only unacked packet is the CLOSE packet and it didn't have any data...
//
// FIXME Implement better half-close by sending CLOSE whenever. Needs 0.9.9 bug fixes
// throughout network?
//
if (con.getOutputStream().getClosed() &&
( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) {
packet.setFlag(Packet.FLAG_CLOSE);
con.setCloseSentOn(_context.clock().now());
con.notifyCloseSent();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("New outbound packet on " + _connection + ": " + packet);
_log.debug("New OB pkt (acks not yet filled in): " + packet + " on " + _connection);
return packet;
}

View File

@@ -192,9 +192,8 @@ class ConnectionManager {
ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
opts.setPort(synPacket.getRemotePort());
opts.setLocalPort(synPacket.getLocalPort());
Connection con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts);
Connection con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, true);
_tcbShare.updateOptsFromShare(con);
con.setInbound();
long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
boolean reject = false;
int active = 0;
@@ -326,7 +325,7 @@ class ConnectionManager {
// try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {}
} else {
con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts);
con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, false);
con.setRemotePeer(peer);
while (_connectionByInboundId.containsKey(Long.valueOf(receiveId))) {

View File

@@ -52,12 +52,13 @@ class ConnectionPacketHandler {
return;
}
final long seqNum = packet.getSequenceNum();
if (con.getHardDisconnected()) {
if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ||
if ( (seqNum > 0) || (packet.getPayloadSize() > 0) ||
(packet.isFlagSet(Packet.FLAG_SYNCHRONIZE | Packet.FLAG_CLOSE)) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received a data packet after hard disconnect: " + packet + " on " + con);
con.sendReset();
// the following will send a RESET
con.disconnect(false);
} else {
if (_log.shouldLog(Log.WARN))
@@ -68,14 +69,12 @@ class ConnectionPacketHandler {
}
if ( (con.getCloseSentOn() > 0) && (con.getUnackedPacketsSent() <= 0) &&
(packet.getSequenceNum() > 0) && (packet.getPayloadSize() > 0)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received new data when we've sent them data and all of our data is acked: "
(seqNum > 0) && (packet.getPayloadSize() > 0)) {
if (_log.shouldLog(Log.INFO))
_log.info("Received new data when we've sent them data and all of our data is acked: "
+ packet + " on " + con + "");
con.sendReset();
con.disconnect(false);
packet.releasePayload();
return;
// this is fine, half-close
// Major bug before 0.9.9, packets were dropped here and a reset sent
}
if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) {
@@ -111,7 +110,7 @@ class ConnectionPacketHandler {
long ready = con.getInputStream().getHighestReadyBockId();
int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize();
int allowedBlocks = available/con.getOptions().getMaxMessageSize();
if (packet.getSequenceNum() > ready + allowedBlocks) {
if (seqNum > ready + allowedBlocks) {
if (_log.shouldLog(Log.WARN))
_log.warn("Inbound buffer exceeded on connection " + con + " ("
+ ready + "/"+ (ready+allowedBlocks) + "/" + available
@@ -127,23 +126,28 @@ class ConnectionPacketHandler {
_context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize(), 0);
boolean isNew = false;
boolean allowAck = true;
final boolean isSYN = packet.isFlagSet(Packet.FLAG_SYNCHRONIZE);
// We allow the SendStreamID to be 0 so that the originator can send
// multiple packets before he gets the first ACK back.
// If we want to limit the number of packets we receive without a
// SendStreamID, do it in PacketHandler.receiveUnknownCon().
if ( (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) &&
if ( (!isSYN) &&
(packet.getReceiveStreamId() <= 0) )
allowAck = false;
if (allowAck) {
isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
} else {
con.getInputStream().notifyActivity();
// Receive the message.
// Note that this is called even for empty packets, including CLOSE packets, so the
// MessageInputStream will know the last sequence number.
// But not ack-only packets!
boolean isNew;
if (seqNum > 0 || isSYN)
isNew = con.getInputStream().messageReceived(seqNum, packet.getPayload());
else
isNew = false;
if (!allowAck)
isNew = false;
}
//if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
// if (_log.shouldLog(Log.DEBUG))
@@ -151,13 +155,19 @@ class ConnectionPacketHandler {
// + " packet: " + packet + " con: " + con);
//}
if (_log.shouldLog(Log.DEBUG))
_log.debug((isNew ? "New" : "Dup or ack-only") + " inbound packet on " + con + ": " + packet);
if (_log.shouldLog(Log.DEBUG)) {
String type;
if (!allowAck)
type = "Non-SYN before SYN";
else if (isNew)
type = "New";
else if (packet.getPayloadSize() <= 0)
type = "Ack-only";
else
type = "Dup";
_log.debug(type + " IB pkt: " + packet + " on " + con);
}
// close *after* receiving the data, as well as after verifying the signatures / etc
if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED))
con.closeReceived();
boolean fastAck = false;
boolean ackOnly = false;
@@ -180,8 +190,7 @@ class ConnectionPacketHandler {
_log.debug("Scheduling ack in " + delay + "ms for received packet " + packet);
}
} else {
if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ||
(packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) {
if ( (seqNum > 0) || (packet.getPayloadSize() > 0) || isSYN) {
_context.statManager().addRateData("stream.con.receiveDuplicateSize", packet.getPayloadSize(), 0);
con.incrementDupMessagesReceived(1);
@@ -209,7 +218,7 @@ class ConnectionPacketHandler {
}
} else {
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
if (isSYN) {
//con.incrementUnackedPacketsReceived();
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
} else {
@@ -220,7 +229,7 @@ class ConnectionPacketHandler {
}
}
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) && (packet.getSendStreamId() <= 0) ) {
if (isSYN && (packet.getSendStreamId() <= 0) ) {
// don't honor the ACK 0 in SYN packets received when the other side
// has obviously not seen our messages
} else {
@@ -249,10 +258,14 @@ class ConnectionPacketHandler {
// non-ack message payloads are queued in the MessageInputStream
packet.releasePayload();
}
// close *after* receiving the data, as well as after verifying the signatures / etc
// update the TCB Cache now that we've processed the acks and updated our rtt etc.
if (isNew && packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED))
con.updateShareOpts();
if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED)) {
con.closeReceived();
if (isNew)
con.updateShareOpts();
}
//if (choke)
// con.fastRetransmit();
@@ -285,6 +298,7 @@ class ConnectionPacketHandler {
else
return false;
boolean lastPacketAcked = false;
if ( (acked != null) && (!acked.isEmpty()) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(acked.size() + " of our packets acked with " + packet);
@@ -305,9 +319,6 @@ class ConnectionPacketHandler {
_context.statManager().addRateData("stream.sendsBeforeAck", numSends, ackTime);
if (p.isFlagSet(Packet.FLAG_CLOSE))
con.ourCloseAcked();
// ACK the tags we delivered so we can use them
//if ( (p.getKeyUsed() != null) && (p.getTagsSent() != null)
// && (p.getTagsSent().size() > 0) ) {
@@ -339,9 +350,14 @@ class ConnectionPacketHandler {
}
}
_context.statManager().addRateData("stream.con.packetsAckedPerMessageReceived", acked.size(), highestRTT);
if (con.getCloseSentOn() > 0 && con.getUnackedPacketsSent() <= 0)
lastPacketAcked = true;
}
return adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0), choke);
boolean rv = adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0), choke);
if (lastPacketAcked)
con.notifyLastPacketAcked();
return rv;
}
/** @return are we congested? */

View File

@@ -43,15 +43,14 @@ class I2PSocketFull implements I2PSocket {
if (c == null) return;
if (c.getIsConnected()) {
OutputStream out = c.getOutputStream();
if (out != null) {
try {
out.close();
} catch (IOException ioe) {
// ignore any write error, as we want to keep on and kill the
// con (thanks Complication!)
}
try {
out.close();
} catch (IOException ioe) {
// ignore any write error, as we want to keep on and kill the
// con (thanks Complication!)
}
c.disconnect(true);
MessageInputStream in = c.getInputStream();
in.close();
} else {
//throw new IOException("Not connected");
}
@@ -143,10 +142,7 @@ class I2PSocketFull implements I2PSocket {
}
void destroy() {
Connection c = _connection;
destroy2();
if (c != null)
c.disconnectComplete();
}
/**