forked from I2P_Developers/i2p.i2p
cleanup, sync, more logging
This commit is contained in:
@@ -1,7 +1,9 @@
|
|||||||
package net.i2p.client.streaming;
|
package net.i2p.client.streaming;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import net.i2p.I2PAppContext;
|
import net.i2p.I2PAppContext;
|
||||||
import net.i2p.data.Destination;
|
import net.i2p.data.Destination;
|
||||||
@@ -21,14 +23,14 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
|||||||
private final Connection _connection;
|
private final Connection _connection;
|
||||||
private final Destination _to;
|
private final Destination _to;
|
||||||
private SessionKey _keyUsed;
|
private SessionKey _keyUsed;
|
||||||
private Set _tagsSent;
|
|
||||||
private final long _createdOn;
|
private final long _createdOn;
|
||||||
private volatile int _numSends;
|
private volatile int _numSends;
|
||||||
private long _lastSend;
|
private volatile long _lastSend;
|
||||||
private long _acceptedOn;
|
private long _acceptedOn;
|
||||||
private long _ackOn;
|
/** LOCKING: this */
|
||||||
|
private long _ackOn;
|
||||||
private long _cancelledOn;
|
private long _cancelledOn;
|
||||||
private volatile int _nackCount;
|
private final AtomicInteger _nackCount = new AtomicInteger(0);
|
||||||
private volatile boolean _retransmitted;
|
private volatile boolean _retransmitted;
|
||||||
private SimpleTimer2.TimedEvent _resendEvent;
|
private SimpleTimer2.TimedEvent _resendEvent;
|
||||||
|
|
||||||
@@ -66,7 +68,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
|||||||
/**
|
/**
|
||||||
* @deprecated should always return null or an empty set
|
* @deprecated should always return null or an empty set
|
||||||
*/
|
*/
|
||||||
public Set getTagsSent() { return _tagsSent; }
|
public Set getTagsSent() { return Collections.EMPTY_SET; }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated I2PSession throws out the tags
|
* @deprecated I2PSession throws out the tags
|
||||||
@@ -111,9 +113,10 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
|||||||
_lastSend = _context.clock().now();
|
_lastSend = _context.clock().now();
|
||||||
}
|
}
|
||||||
public void ackReceived() {
|
public void ackReceived() {
|
||||||
|
final long now = _context.clock().now();
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (_ackOn <= 0)
|
if (_ackOn <= 0)
|
||||||
_ackOn = _context.clock().now();
|
_ackOn = now;
|
||||||
releasePayload();
|
releasePayload();
|
||||||
notifyAll();
|
notifyAll();
|
||||||
}
|
}
|
||||||
@@ -134,7 +137,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
|||||||
/** how long after packet creation was it acked?
|
/** how long after packet creation was it acked?
|
||||||
* @return how long after packet creation the packet was ACKed in ms
|
* @return how long after packet creation the packet was ACKed in ms
|
||||||
*/
|
*/
|
||||||
public int getAckTime() {
|
public synchronized int getAckTime() {
|
||||||
if (_ackOn <= 0)
|
if (_ackOn <= 0)
|
||||||
return -1;
|
return -1;
|
||||||
else
|
else
|
||||||
@@ -151,15 +154,28 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
|||||||
* but only if it's the lowest unacked (see Connection.ResendPacketEvent)
|
* but only if it's the lowest unacked (see Connection.ResendPacketEvent)
|
||||||
*/
|
*/
|
||||||
public void incrementNACKs() {
|
public void incrementNACKs() {
|
||||||
int cnt = ++_nackCount;
|
final int cnt = _nackCount.incrementAndGet();
|
||||||
SimpleTimer2.TimedEvent evt = _resendEvent;
|
SimpleTimer2.TimedEvent evt = _resendEvent;
|
||||||
if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
|
if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
|
||||||
(_numSends == 1 || _lastSend < _context.clock().now() + 4*1000)) { // Don't fast retx if we recently resent it
|
(_numSends == 1 || _lastSend < _context.clock().now() + 4*1000)) { // Don't fast retx if we recently resent it
|
||||||
_retransmitted = true;
|
_retransmitted = true;
|
||||||
evt.reschedule(0);
|
evt.reschedule(0);
|
||||||
|
// shouldn't ^^^ be clock.now() - 4000 ??? --zab
|
||||||
|
|
||||||
|
if (_log.shouldLog(Log.DEBUG)) {
|
||||||
|
final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, retransmitted=%b,"+
|
||||||
|
" numSends=%d, lastSend=%d, now=%d",
|
||||||
|
toString(), cnt, _retransmitted, _numSends, _lastSend, _context.clock().now());
|
||||||
|
_log.debug(log);
|
||||||
|
}
|
||||||
|
} else if (_log.shouldLog(Log.DEBUG)) {
|
||||||
|
final String log = String.format("%s nack but no retransmit. Criteria: nacks=%d, retransmitted=%b,"+
|
||||||
|
" numSends=%d, lastSend=%d, now=%d",
|
||||||
|
toString(), cnt, _retransmitted, _numSends, _lastSend, _context.clock().now());
|
||||||
|
_log.debug(log);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
public int getNACKs() { return _nackCount; }
|
public int getNACKs() { return _nackCount.get(); }
|
||||||
|
|
||||||
public void setResendPacketEvent(SimpleTimer2.TimedEvent evt) { _resendEvent = evt; }
|
public void setResendPacketEvent(SimpleTimer2.TimedEvent evt) { _resendEvent = evt; }
|
||||||
|
|
||||||
@@ -173,12 +189,14 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
|||||||
|
|
||||||
//if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) )
|
//if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) )
|
||||||
// buf.append(" with tags");
|
// buf.append(" with tags");
|
||||||
|
final int nackCount = _nackCount.get();
|
||||||
|
if (nackCount > 0)
|
||||||
|
buf.append(" nacked ").append(nackCount).append(" times");
|
||||||
|
|
||||||
if (_nackCount > 0)
|
synchronized(this) {
|
||||||
buf.append(" nacked ").append(_nackCount).append(" times");
|
if (_ackOn > 0)
|
||||||
|
buf.append(" ack after ").append(getAckTime());
|
||||||
if (_ackOn > 0)
|
}
|
||||||
buf.append(" ack after ").append(getAckTime());
|
|
||||||
|
|
||||||
if (_numSends > 1)
|
if (_numSends > 1)
|
||||||
buf.append(" sent ").append(_numSends).append(" times");
|
buf.append(" sent ").append(_numSends).append(" times");
|
||||||
@@ -256,9 +274,9 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
|||||||
releasePayload();
|
releasePayload();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; }
|
public synchronized boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; }
|
||||||
public boolean writeFailed() { return _cancelledOn > 0; }
|
public synchronized boolean writeFailed() { return _cancelledOn > 0; }
|
||||||
public boolean writeSuccessful() { return _ackOn > 0 && _cancelledOn <= 0; }
|
public synchronized boolean writeSuccessful() { return _ackOn > 0 && _cancelledOn <= 0; }
|
||||||
|
|
||||||
/** Generate a pcap/tcpdump-compatible format,
|
/** Generate a pcap/tcpdump-compatible format,
|
||||||
* so we can use standard debugging tools.
|
* so we can use standard debugging tools.
|
||||||
|
Reference in New Issue
Block a user