- Reject some packet types if they came in via fallback introKey
   - Increase retransmission timeout for SessionRequest, SessionConfirm,
     and RelayRequest; implement backoff
   - Move UDPFlooder to test
   - More volatiles, finals, cleanups, stat removals, log tweaks
This commit is contained in:
zzz
2012-08-12 11:24:15 +00:00
parent e67dd15308
commit cfcafd2ba3
15 changed files with 253 additions and 145 deletions

View File

@ -1,3 +1,13 @@
2012-08-12 zzz
* Jetty: Don't use direct byte buffers that may be leaking (ticket #679)
* PeerManager: Fix NPE on Android (ticket #687)
* SSU:
- Reject some packet types if they came in via fallback introKey
- Increase retransmission timeout for SessionRequest, SessionConfirm,
and RelayRequest; implement backoff
- Move UDPFlooder to test
- More volatiles, finals, cleanups, stat removals, log tweaks
2012-08-11 zzz
* DataHelper: toString(byte[]) cleanup
* i2psnark:

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 6;
public final static long BUILD = 7;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -24,7 +24,7 @@ class ACKSender implements Runnable {
private final PacketBuilder _builder;
/** list of peers (PeerState) who we have received data from but not yet ACKed to */
private final BlockingQueue<PeerState> _peersToACK;
private boolean _alive;
private volatile boolean _alive;
private static final long POISON_PS = -9999999999l;
/** how frequently do we want to send ACKs to a peer? */

View File

@ -20,6 +20,7 @@ import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
import net.i2p.util.Addresses;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
@ -44,7 +45,7 @@ class EstablishmentManager {
private final ConcurrentHashMap<RemoteHostId, List<OutNetMessage>> _queuedOutbound;
/** map of nonce (Long) to OutboundEstablishState */
private final ConcurrentHashMap<Long, OutboundEstablishState> _liveIntroductions;
private boolean _alive;
private volatile boolean _alive;
private final Object _activityLock;
private int _activity;
@ -76,6 +77,7 @@ class EstablishmentManager {
_context.statManager().createRateStat("udp.sendIntroRelayRequest", "How often we send a relay request to reach a peer", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendIntroRelayTimeout", "How often a relay request times out before getting a response (due to the target or intro peer being offline)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveIntroRelayResponse", "How long it took to receive a relay response", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.establishDropped", "Dropped an inbound establish message", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.establishRejected", "How many pending outbound connections are there when we refuse to add any more?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.establishOverflow", "How many messages were queued up on a pending connection when it was too much?", "udp", UDPTransport.RATES);
// following are for PeerState
@ -234,6 +236,8 @@ class EstablishmentManager {
}
if (rejected) {
if (_log.shouldLog(Log.WARN))
_log.warn("Rejecting outbound establish");
_transport.failed(msg, "Too many pending outbound connections");
_context.statManager().addRateData("udp.establishRejected", deferred, 0);
return;
@ -252,12 +256,14 @@ class EstablishmentManager {
}
private class Expire implements SimpleTimer.TimedEvent {
private RemoteHostId _to;
private OutboundEstablishState _state;
private final RemoteHostId _to;
private final OutboundEstablishState _state;
public Expire(RemoteHostId to, OutboundEstablishState state) {
_to = to;
_state = state;
}
public void timeReached() {
// remove only if value == state
boolean removed = _outboundStates.remove(_to, _state);
@ -289,8 +295,12 @@ class EstablishmentManager {
boolean isNew = false;
if (_inboundStates.size() >= maxInbound)
if (_inboundStates.size() >= maxInbound) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping inbound establish, increase " + PROP_MAX_CONCURRENT_ESTABLISH);
_context.statManager().addRateData("udp.establishDropped", 1);
return; // drop the packet
}
InboundEstablishState state = _inboundStates.get(from);
if (state == null) {
@ -386,18 +396,20 @@ class EstablishmentManager {
}
/**
* Got a SessionDestroy - maybe after an inbound establish
* Got a SessionDestroy - maybe after an inbound establish?
* As this packet was essentially unauthenticated (i.e. intro key, not session key)
* we just log it as it could be spoofed.
* @since 0.8.1
*/
void receiveSessionDestroy(RemoteHostId from) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive session destroy (IB) from: " + from);
InboundEstablishState state = _inboundStates.remove(from);
if (state != null) {
Hash peer = state.getConfirmedIdentity().calculateHash();
if (peer != null)
_transport.dropPeer(peer, false, "received destroy message");
}
if (_log.shouldLog(Log.WARN))
_log.warn("Receive session destroy (IB) from: " + from);
//InboundEstablishState state = _inboundStates.remove(from);
//if (state != null) {
// Hash peer = state.getConfirmedIdentity().calculateHash();
// if (peer != null)
// _transport.dropPeer(peer, false, "received destroy message");
//}
}
/**
@ -507,7 +519,7 @@ class EstablishmentManager {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle completely established (inbound): " + state.getRemoteHostId().toString()
+ " - " + peer.getRemotePeer().toBase64());
+ " - " + peer.getRemotePeer());
//if (true) // for now, only support direct
// peer.setRemoteRequiresIntroduction(false);
@ -546,22 +558,24 @@ class EstablishmentManager {
_transport.send(dsm, peer);
_context.simpleScheduler().addEvent(new PublishToNewInbound(peer), 0);
}
private class PublishToNewInbound implements SimpleTimer.TimedEvent {
private PeerState _peer;
private final PeerState _peer;
public PublishToNewInbound(PeerState peer) { _peer = peer; }
public void timeReached() {
Hash peer = _peer.getRemotePeer();
if ((peer != null) && (!_context.shitlist().isShitlisted(peer)) && (!_transport.isUnreachable(peer))) {
// ok, we are fine with them, send them our latest info
if (_log.shouldLog(Log.INFO))
_log.info("Publishing to the peer after confirm plus delay (without shitlist): " + peer.toBase64());
_log.info("Publishing to the peer after confirm plus delay (without shitlist): " + peer);
sendOurInfo(_peer, true);
} else {
// nuh uh. fuck 'em.
// nuh uh.
if (_log.shouldLog(Log.WARN))
_log.warn("NOT publishing to the peer after confirm plus delay (WITH shitlist): " + (peer != null ? peer.toBase64() : "unknown"));
_log.warn("NOT publishing to the peer after confirm plus delay (WITH shitlist): " + (peer != null ? peer.toString() : "unknown"));
}
_peer = null;
}
}
@ -589,7 +603,7 @@ class EstablishmentManager {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle completely established (outbound): " + state.getRemoteHostId().toString()
+ " - " + peer.getRemotePeer().toBase64());
+ " - " + peer.getRemotePeer());
_transport.addRemotePeerState(peer);
@ -671,7 +685,7 @@ class EstablishmentManager {
private void sendRequest(OutboundEstablishState state) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send request to: " + state.getRemoteHostId().toString());
_log.debug("Send SessionRequest to: " + state.getRemoteHostId());
UDPPacket packet = _builder.buildSessionRequestPacket(state);
if (packet != null) {
_transport.send(packet);
@ -705,16 +719,19 @@ class EstablishmentManager {
_transport.send(requests[i]);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send intro for " + state.getRemoteHostId().toString() + " with our intro key as " + _transport.getIntroKey().toBase64());
_log.debug("Send intro for " + state.getRemoteHostId().toString() + " with our intro key as " + _transport.getIntroKey());
state.introSent();
}
private class FailIntroduction implements SimpleTimer.TimedEvent {
private long _nonce;
private OutboundEstablishState _state;
private final long _nonce;
private final OutboundEstablishState _state;
public FailIntroduction(OutboundEstablishState state, long nonce) {
_nonce = nonce;
_state = state;
}
public void timeReached() {
// remove only if value equal to state
boolean removed = _liveIntroductions.remove(Long.valueOf(_nonce), _state);
@ -741,7 +758,7 @@ class EstablishmentManager {
addr = InetAddress.getByAddress(ip);
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Introducer for " + state + " (" + bob + ") sent us an invalid IP for our targer: " + Base64.encode(ip), uhe);
_log.warn("Introducer for " + state + " (" + bob + ") sent us an invalid IP for our target: " + Addresses.toString(ip), uhe);
// these two cause this peer to requeue for a new intro peer
state.introductionFailed();
notifyActivity();
@ -750,7 +767,7 @@ class EstablishmentManager {
_context.statManager().addRateData("udp.receiveIntroRelayResponse", state.getLifetime(), 0);
int port = reader.getRelayResponseReader().readCharliePort();
if (_log.shouldLog(Log.INFO))
_log.info("Received relay intro for " + state.getRemoteIdentity().calculateHash().toBase64() + " - they are on "
_log.info("Received relay intro for " + state.getRemoteIdentity().calculateHash() + " - they are on "
+ addr.toString() + ":" + port + " (according to " + bob + ")");
RemoteHostId oldId = state.getRemoteHostId();
state.introduced(addr, ip, port);
@ -798,8 +815,9 @@ class EstablishmentManager {
long nextSendTime = -1;
InboundEstablishState inboundState = null;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("# inbound states: " + _inboundStates.size());
//int active = _inboundStates.size();
//if (active > 0 && _log.shouldLog(Log.DEBUG))
// _log.debug("# inbound states: " + active);
for (Iterator<InboundEstablishState> iter = _inboundStates.values().iterator(); iter.hasNext(); ) {
InboundEstablishState cur = iter.next();
if (cur.getState() == InboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
@ -810,7 +828,7 @@ class EstablishmentManager {
_log.debug("Removing completely confirmed inbound state");
break;
} else if (cur.getLifetime() > MAX_ESTABLISH_TIME) {
// took too long, fuck 'em
// took too long
iter.remove();
_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime());
if (_log.shouldLog(Log.DEBUG))
@ -859,7 +877,7 @@ class EstablishmentManager {
if (remote != null) {
if (_context.shitlist().isShitlistedForever(remote.calculateHash())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping inbound connection from permanently shitlisted peer: " + remote.calculateHash().toBase64());
_log.warn("Dropping inbound connection from permanently shitlisted peer: " + remote.calculateHash());
// So next time we will not accept the con, rather than doing the whole handshake
_context.blocklist().add(inboundState.getSentIP());
inboundState.fail();
@ -903,12 +921,11 @@ class EstablishmentManager {
//int remaining = 0;
//int active = 0;
//active = _outboundStates.size();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("# outbound states: " + _outboundStates.size());
//int active = _outboundStates.size();
//if (active > 0 && _log.shouldLog(Log.DEBUG))
// _log.debug("# outbound states: " + active);
for (Iterator<OutboundEstablishState> iter = _outboundStates.values().iterator(); iter.hasNext(); ) {
OutboundEstablishState cur = iter.next();
if (cur == null) continue;
if (cur.getState() == OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
// completely received
iter.remove();
@ -917,7 +934,7 @@ class EstablishmentManager {
_log.debug("Removing confirmed outbound: " + cur);
break;
} else if (cur.getLifetime() > MAX_ESTABLISH_TIME) {
// took too long, fuck 'em
// took too long
iter.remove();
outboundState = cur;
_context.statManager().addRateData("udp.outboundEstablishFailedState", cur.getState(), cur.getLifetime());
@ -956,6 +973,8 @@ class EstablishmentManager {
// _log.log(Log.CRIT, "Admitted " + admitted + " in push with " + remaining + " remaining queued and " + active + " active");
if (outboundState != null) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Processing for outbound: " + outboundState);
if (outboundState.getLifetime() > MAX_ESTABLISH_TIME) {
processExpired(outboundState);
} else {
@ -1051,8 +1070,6 @@ class EstablishmentManager {
while (_alive) {
try {
doPass();
} catch (OutOfMemoryError oom) {
throw oom;
} catch (RuntimeException re) {
_log.log(Log.CRIT, "Error in the establisher", re);
}

View File

@ -26,7 +26,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
private final UDPTransport _transport;
private final ACKSender _ackSender;
private final MessageReceiver _messageReceiver;
private boolean _alive;
private volatile boolean _alive;
/** decay the recently completed every 20 seconds */
private static final int DECAY_PERIOD = 10*1000;

View File

@ -59,7 +59,7 @@ abstract class MTU {
/**
* @return min of PeerState.MIN_MTU, max of PeerState.LARGE_MTU,
* rectifyed so rv % 16 == 12
* rectified so rv % 16 == 12
*/
public static int rectify(int mtu) {
int rv = mtu;

View File

@ -26,7 +26,7 @@ class MessageReceiver {
private final UDPTransport _transport;
/** list of messages (InboundMessageState) fully received but not interpreted yet */
private final BlockingQueue<InboundMessageState> _completeMessages;
private boolean _alive;
private volatile boolean _alive;
//private ByteCache _cache;
private static final int MIN_THREADS = 2; // unless < 32MB
@ -78,7 +78,7 @@ class MessageReceiver {
}
private class Runner implements Runnable {
private I2NPMessageHandler _handler;
private final I2NPMessageHandler _handler;
public Runner() { _handler = new I2NPMessageHandler(_context); }
public void run() { loop(_handler); }
}

View File

@ -58,6 +58,10 @@ class OutboundEstablishState {
// intro
private final UDPAddress _remoteAddress;
private boolean _complete;
// counts for backoff
private int _confirmedSentCount;
private int _requestSentCount;
private int _introSentCount;
/** nothin sent yet */
public static final int STATE_UNKNOWN = 0;
@ -72,6 +76,12 @@ class OutboundEstablishState {
/** we need to have someone introduce us to the peer, but haven't received a RelayResponse yet */
public static final int STATE_PENDING_INTRO = 5;
/** basic delay before backoff */
private static final long RETRANSMIT_DELAY = 1500;
/** max delay including backoff */
private static final long MAX_DELAY = 15*1000;
public OutboundEstablishState(RouterContext ctx, InetAddress remoteHost, int remotePort,
RouterIdentity remotePeer, SessionKey introKey, UDPAddress addr,
DHSessionKeyBuilder dh) {
@ -98,7 +108,7 @@ class OutboundEstablishState {
prepareSessionRequest();
if ( (addr != null) && (addr.getIntroducerCount() > 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("new outbound establish to " + remotePeer.calculateHash().toBase64() + ", with address: " + addr);
_log.debug("new outbound establish to " + remotePeer.calculateHash() + ", with address: " + addr);
_currentState = STATE_PENDING_INTRO;
}
}
@ -355,29 +365,36 @@ class OutboundEstablishState {
/** note that we just sent the SessionConfirmed packet */
public synchronized void confirmedPacketsSent() {
_lastSend = _context.clock().now();
_nextSend = _lastSend + 1000;
long delay = Math.min(RETRANSMIT_DELAY << (_confirmedSentCount++), MAX_DELAY);
_nextSend = _lastSend + delay;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send confirm packets, nextSend = 1s");
_log.debug("Send confirm packets, nextSend in " + delay);
if ( (_currentState == STATE_UNKNOWN) ||
(_currentState == STATE_REQUEST_SENT) ||
(_currentState == STATE_CREATED_RECEIVED) )
_currentState = STATE_CONFIRMED_PARTIALLY;
}
/** note that we just sent the SessionRequest packet */
public synchronized void requestSent() {
_lastSend = _context.clock().now();
_nextSend = _lastSend + 1000;
long delay = Math.min(RETRANSMIT_DELAY << (_requestSentCount++), MAX_DELAY);
_nextSend = _lastSend + delay;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send a request packet, nextSend = 1s");
_log.debug("Send a request packet, nextSend in " + delay);
if (_currentState == STATE_UNKNOWN)
_currentState = STATE_REQUEST_SENT;
}
/** note that we just sent the RelayRequest packet */
public synchronized void introSent() {
_lastSend = _context.clock().now();
_nextSend = _lastSend + 1000;
long delay = Math.min(RETRANSMIT_DELAY << (_introSentCount++), MAX_DELAY);
_nextSend = _lastSend + delay;
if (_currentState == STATE_UNKNOWN)
_currentState = STATE_PENDING_INTRO;
}
public synchronized void introductionFailed() {
_nextSend = _context.clock().now();
// keep the state as STATE_PENDING_INTRO, so next time the EstablishmentManager asks us

View File

@ -104,7 +104,7 @@ class OutboundMessageFragments {
void dropPeer(PeerState peer) {
if (_log.shouldLog(Log.INFO))
_log.info("Dropping peer " + peer.getRemotePeer().toBase64());
_log.info("Dropping peer " + peer.getRemotePeer());
peer.dropOutbound();
_activePeers.remove(peer);
}
@ -203,10 +203,10 @@ class OutboundMessageFragments {
boolean added = _activePeers.add(peer);
if (added) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
_log.debug("Add a new message to a new peer " + peer.getRemotePeer());
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
_log.debug("Add a new message to an existing peer " + peer.getRemotePeer());
}
_context.statManager().addRateData("udp.outboundActivePeers", _activePeers.size(), 0);
@ -280,7 +280,7 @@ class OutboundMessageFragments {
// race with add()
_iterator.remove();
if (_log.shouldLog(Log.DEBUG))
_log.debug("No more pending messages for " + peer.getRemotePeer().toBase64());
_log.debug("No more pending messages for " + peer.getRemotePeer());
continue;
}
peersProcessed++;
@ -303,7 +303,7 @@ class OutboundMessageFragments {
if (peer != null && _log.shouldLog(Log.DEBUG))
_log.debug("Done looping, next peer we are sending for: " +
peer.getRemotePeer().toBase64());
peer.getRemotePeer());
// if we've gone all the way through the loop, wait
// ... unless nextSendDelay says we have more ready now

View File

@ -999,7 +999,7 @@ class PacketBuilder {
long tag = addr.getIntroducerTag(i);
if ( (ikey == null) || (iport <= 0) || (iaddr == null) || (tag <= 0) ) {
if (_log.shouldLog(_log.WARN))
_log.warn("Cannot build a relay request to " + state.getRemoteIdentity().calculateHash().toBase64()
_log.warn("Cannot build a relay request to " + state.getRemoteIdentity().calculateHash()
+ ", as their UDP address is invalid: addr=" + addr + " index=" + i);
continue;
}
@ -1126,7 +1126,7 @@ class PacketBuilder {
int off = HEADER_SIZE;
if (_log.shouldLog(Log.INFO))
_log.info("Sending relay response to " + alice + " for " + charlie + " with alice's intro key " + aliceIntroKey.toBase64());
_log.info("Sending relay response to " + alice + " for " + charlie + " with alice's intro key " + aliceIntroKey);
// now for the body
byte charlieIP[] = charlie.getRemoteIP();
@ -1277,7 +1277,7 @@ class PacketBuilder {
_log.debug("Authenticating " + packet.getPacket().getLength() +
"\nIV: " + Base64.encode(iv.getData()) +
"\nraw mac: " + Base64.encode(ba.getData()) +
"\nMAC key: " + macKey.toBase64());
"\nMAC key: " + macKey);
// ok, now lets put it back where it belongs...
System.arraycopy(data, hmacOff, data, encryptOffset, encryptSize);
//System.arraycopy(hmac.getData(), 0, data, hmacOff, UDPPacket.MAC_SIZE);

View File

@ -78,17 +78,17 @@ class PacketHandler {
//_context.statManager().createRateStat("udp.packetVerifyTimeSlow", "How long it takes the PacketHandler to verify a data packet after dequeueing when its slow (period is dequeue time)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.packetValidateMultipleCount", "How many times we validate a packet, if done more than once (period = afterValidate-enqueue)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.packetNoValidationLifetime", "How long packets that are never validated are around for", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.sessionRequest", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.sessionConfirmed", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.sessionCreated", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.sessionRequest", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.sessionConfirmed", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.sessionCreated", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.dataKnown", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.dataKnownAck", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.dataUnknown", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.dataUnknownAck", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.test", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.relayRequest", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.relayIntro", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.relayResponse", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.test", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.relayRequest", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.relayIntro", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.relayResponse", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
}
public void startup() {
@ -144,7 +144,7 @@ class PacketHandler {
packet.received();
if (_log.shouldLog(Log.INFO))
_log.info("Received the packet " + packet);
_log.info("Received: " + packet);
_state = 4;
long queueTime = packet.getLifetime();
long handleStart = _context.clock().now();
@ -163,8 +163,8 @@ class PacketHandler {
_context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime());
_state = 8;
if (_log.shouldLog(Log.INFO))
_log.info("Done receiving the packet " + packet);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Done receiving: " + packet);
/********
if (handleTime > 1000) {
@ -219,8 +219,8 @@ class PacketHandler {
RemoteHostId rem = packet.getRemoteHost();
PeerState state = _transport.getPeerState(rem);
if (state == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received is not for a connected peer");
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Packet received is not for a connected peer");
_state = 11;
InboundEstablishState est = _establisher.getInboundState(rem);
if (est != null) {
@ -229,8 +229,8 @@ class PacketHandler {
_state = 12;
receivePacket(reader, packet, est);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received is not for an inbound establishment");
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Packet received is not for an inbound establishment");
_state = 13;
OutboundEstablishState oest = _establisher.getOutboundState(rem);
if (oest != null) {
@ -242,6 +242,7 @@ class PacketHandler {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received is not for an inbound or outbound establishment");
// ok, not already known establishment, try as a new one
// Last chance for success, using our intro key
_state = 15;
receivePacket(reader, packet, NEW_PEER);
}
@ -260,6 +261,7 @@ class PacketHandler {
*/
private void receivePacket(UDPPacketReader reader, UDPPacket packet, PeerState state) {
_state = 17;
boolean isStray = false;
boolean isValid = packet.validate(state.getCurrentMACKey());
if (!isValid) {
_state = 18;
@ -277,9 +279,11 @@ class PacketHandler {
// process, so try our intro key
// (after an outbound establishment process, there wouldn't
// be any stray packets)
// These are generally PeerTest packets
if (_log.shouldLog(Log.DEBUG))
_log.debug("Validation with existing con failed, but validation as reestablish/stray passed");
packet.decrypt(_transport.getIntroKey());
isStray = true;
} else {
_state = 21;
InboundEstablishState est = _establisher.getInboundState(packet.getRemoteHost());
@ -305,13 +309,15 @@ class PacketHandler {
}
_state = 25;
handlePacket(reader, packet, state, null, null);
handlePacket(reader, packet, state, null, null, !isStray);
_state = 26;
}
/**
* New conn or failed validation
* Decrypt and validate the packet then call handlePacket()
* New conn or failed validation - we have no Session Key.
* Here we attempt to validate the packet with our intro key,
* then decrypt the packet with our intro key,
* then call handlePacket().
*
* @param peerType OUTBOUND_FALLBACK, INBOUND_FALLBACK, or NEW_PEER
*/
@ -319,6 +325,8 @@ class PacketHandler {
_state = 27;
boolean isValid = packet.validate(_transport.getIntroKey());
if (!isValid) {
// Note that the vast majority of these are NOT corrupted packets, but
// packets for which we don't have the PeerState (i.e. SessionKey)
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid introduction packet received: " + packet, new Exception("path"));
_context.statManager().addRateData("udp.droppedInvalidEstablish", packet.getLifetime(), packet.getExpiration());
@ -340,9 +348,14 @@ class PacketHandler {
_log.debug("Valid introduction packet received: " + packet);
}
// Packets that get here are probably one of:
// 304 byte Session Request
// 96 byte Relay Request
// 60 byte Relay Response
// 80 byte Peer Test
_state = 29;
packet.decrypt(_transport.getIntroKey());
handlePacket(reader, packet, null, null, null);
handlePacket(reader, packet, null, null, null, false);
_state = 30;
}
@ -379,7 +392,7 @@ class PacketHandler {
_state = 32;
packet.decrypt(state.getCipherKey());
handlePacket(reader, packet, null, null, null);
handlePacket(reader, packet, null, null, null, true);
return;
} else {
if (_log.shouldLog(Log.WARN))
@ -423,7 +436,7 @@ class PacketHandler {
_log.info("Valid introduction packet received for outbound established con: " + packet);
_state = 37;
packet.decrypt(state.getCipherKey());
handlePacket(reader, packet, null, state, null);
handlePacket(reader, packet, null, state, null, true);
_state = 38;
return;
}
@ -436,7 +449,7 @@ class PacketHandler {
_log.info("Valid introduction packet received for outbound established con with old intro key: " + packet);
_state = 39;
packet.decrypt(state.getIntroKey());
handlePacket(reader, packet, null, state, null);
handlePacket(reader, packet, null, state, null, true);
_state = 40;
return;
} else {
@ -457,8 +470,11 @@ class PacketHandler {
* @param state non-null if fully established
* @param outState non-null if outbound establishing in process
* @param inState unused always null
* @param isAuthenticated true if a state key was used, false if our own intro key was used
*/
private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) {
private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state,
OutboundEstablishState outState, InboundEstablishState inState,
boolean isAuthenticated) {
_state = 43;
reader.initialize(packet);
_state = 44;
@ -470,7 +486,8 @@ class PacketHandler {
if (state != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received packet from " + state.getRemoteHostId().toString() + " with skew " + skew);
state.adjustClockSkew(skew);
if (isAuthenticated)
state.adjustClockSkew(skew);
}
_context.statManager().addRateData("udp.receivePacketSkew", skew, packet.getLifetime());
@ -502,24 +519,40 @@ class PacketHandler {
RemoteHostId from = packet.getRemoteHost();
_state = 46;
switch (reader.readPayloadType()) {
int type = reader.readPayloadType();
switch (type) {
case UDPPacket.PAYLOAD_TYPE_SESSION_REQUEST:
_state = 47;
_establisher.receiveSessionRequest(from, reader);
_context.statManager().addRateData("udp.receivePacketSize.sessionRequest", packet.getPacket().getLength(), packet.getLifetime());
//_context.statManager().addRateData("udp.receivePacketSize.sessionRequest", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_SESSION_CONFIRMED:
_state = 48;
if (!isAuthenticated) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping unauthenticated type " + type + ": " + packet);
break;
}
_establisher.receiveSessionConfirmed(from, reader);
_context.statManager().addRateData("udp.receivePacketSize.sessionConfirmed", packet.getPacket().getLength(), packet.getLifetime());
//_context.statManager().addRateData("udp.receivePacketSize.sessionConfirmed", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_SESSION_CREATED:
_state = 49;
if (!isAuthenticated) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping unauthenticated type " + type + ": " + packet);
break;
}
_establisher.receiveSessionCreated(from, reader);
_context.statManager().addRateData("udp.receivePacketSize.sessionCreated", packet.getPacket().getLength(), packet.getLifetime());
//_context.statManager().addRateData("udp.receivePacketSize.sessionCreated", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_DATA:
_state = 50;
if (!isAuthenticated) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping unauthenticated type " + type + ": " + packet);
break;
}
if (outState != null)
state = _establisher.receiveData(outState);
if (_log.shouldLog(Log.DEBUG))
@ -545,6 +578,7 @@ class PacketHandler {
if (dr.readFragmentCount() <= 0)
_context.statManager().addRateData("udp.receivePacketSize.dataKnownAck", packet.getPacket().getLength(), packet.getLifetime());
} else {
// doesn't happen
_context.statManager().addRateData("udp.receivePacketSize.dataUnknown", packet.getPacket().getLength(), packet.getLifetime());
UDPPacketReader.DataReader dr = reader.getDataReader();
if (dr.readFragmentCount() <= 0)
@ -556,39 +590,46 @@ class PacketHandler {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received test packet: " + reader + " from " + from);
_testManager.receiveTest(from, reader);
_context.statManager().addRateData("udp.receivePacketSize.test", packet.getPacket().getLength(), packet.getLifetime());
//_context.statManager().addRateData("udp.receivePacketSize.test", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_RELAY_REQUEST:
if (_log.shouldLog(Log.INFO))
_log.info("Received relay request packet: " + reader + " from " + from);
_introManager.receiveRelayRequest(from, reader);
_context.statManager().addRateData("udp.receivePacketSize.relayRequest", packet.getPacket().getLength(), packet.getLifetime());
//_context.statManager().addRateData("udp.receivePacketSize.relayRequest", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_RELAY_INTRO:
if (!isAuthenticated) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping unauthenticated type " + type + ": " + packet);
break;
}
if (_log.shouldLog(Log.INFO))
_log.info("Received relay intro packet: " + reader + " from " + from);
_introManager.receiveRelayIntro(from, reader);
_context.statManager().addRateData("udp.receivePacketSize.relayIntro", packet.getPacket().getLength(), packet.getLifetime());
//_context.statManager().addRateData("udp.receivePacketSize.relayIntro", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_RELAY_RESPONSE:
if (_log.shouldLog(Log.INFO))
_log.info("Received relay response packet: " + reader + " from " + from);
_establisher.receiveRelayResponse(from, reader);
_context.statManager().addRateData("udp.receivePacketSize.relayResponse", packet.getPacket().getLength(), packet.getLifetime());
//_context.statManager().addRateData("udp.receivePacketSize.relayResponse", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_SESSION_DESTROY:
_state = 53;
if (outState != null)
if (!isAuthenticated)
_establisher.receiveSessionDestroy(from); // drops
else if (outState != null)
_establisher.receiveSessionDestroy(from, outState);
else if (state != null)
_establisher.receiveSessionDestroy(from, state);
else
_establisher.receiveSessionDestroy(from);
_establisher.receiveSessionDestroy(from); // drops
break;
default:
_state = 52;
if (_log.shouldLog(Log.WARN))
_log.warn("Unknown payload type: " + reader.readPayloadType());
_log.warn("Unknown payload type: " + type);
_context.statManager().addRateData("udp.droppedInvalidUnknown", packet.getLifetime(), packet.getExpiration());
return;
}

View File

@ -1167,7 +1167,7 @@ class PeerState {
}
state.setPeer(this);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
_log.debug("Adding to " + _remotePeer + ": " + state.getMessageId());
int rv = 0;
boolean fail = false;
synchronized (_outboundMessages) {
@ -1338,7 +1338,7 @@ class PeerState {
for (OutboundMessageState state : _outboundMessages) {
if (locked_shouldSend(state)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId());
_log.debug("Allocate sending to " + _remotePeer + ": " + state.getMessageId());
/*
while (iter.hasNext()) {
OutboundMessageState later = (OutboundMessageState)iter.next();
@ -1356,7 +1356,7 @@ class PeerState {
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send to " + _remotePeer.toBase64() + ", with " + _outboundMessages.size() + " remaining");
_log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() + " remaining");
return null;
}
@ -1529,7 +1529,7 @@ class PeerState {
//}
if (_log.shouldLog(Log.INFO))
_log.info("Received ack of " + messageId + " by " + _remotePeer.toBase64()
_log.info("Received ack of " + messageId + " by " + _remotePeer
+ " after " + state.getLifetime() + " and " + numSends + " sends");
_context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime());
if (state.getFragmentCount() > 1)
@ -1597,7 +1597,7 @@ class PeerState {
_context.statManager().addRateData("udp.partialACKReceived", numACKed, state.getLifetime());
if (_log.shouldLog(Log.INFO))
_log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer.toBase64()
_log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
+ " after " + state.getLifetime() + " and " + numSends + " sends: " + bitfield + ": completely removed? "
+ isComplete + ": " + state);
@ -1714,7 +1714,7 @@ class PeerState {
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
StringBuilder buf = new StringBuilder(256);
buf.append(_remoteHostId.toString());
if (_remotePeer != null)
buf.append(" ").append(_remotePeer.toBase64().substring(0,6));

View File

@ -280,13 +280,13 @@ class UDPPacket {
verifyNotReleased();
StringBuilder buf = new StringBuilder(256);
buf.append(_packet.getLength());
buf.append(" byte packet with ");
buf.append(" byte pkt with ");
buf.append(_packet.getAddress().getHostAddress()).append(":");
buf.append(_packet.getPort());
//buf.append(" id=").append(System.identityHashCode(this));
buf.append(" msg type=").append(_messageType);
buf.append(" mark type=").append(_markedType);
buf.append(" frag count=").append(_fragmentCount);
buf.append(" msgType=").append(_messageType);
buf.append(" markType=").append(_markedType);
buf.append(" fragCount=").append(_fragmentCount);
buf.append(" sinceEnqueued=").append((_enqueueTime > 0 ? _context.clock().now()-_enqueueTime : -1));
buf.append(" sinceReceived=").append((_receivedTime > 0 ? _context.clock().now()-_receivedTime : -1));

View File

@ -60,7 +60,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private OutboundRefiller _refiller;
private PacketPusher _pusher;
private final InboundMessageFragments _inboundFragments;
private UDPFlooder _flooder;
//private UDPFlooder _flooder;
private PeerTestManager _testManager;
private final IntroductionManager _introManager;
private final ExpirePeerEvent _expireEvent;
@ -156,7 +156,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 };
/** should we flood all UDP peers with the configured rate? This is for testing only! */
private static final boolean SHOULD_FLOOD_PEERS = false;
//private static final boolean SHOULD_FLOOD_PEERS = false;
private static final int MAX_CONSECUTIVE_FAILED = 5;
@ -207,8 +207,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_destroyBuilder = new PacketBuilder(_context, this);
_fragments = new OutboundMessageFragments(_context, this, _activeThrottle);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this);
if (SHOULD_FLOOD_PEERS)
_flooder = new UDPFlooder(_context, this);
//if (SHOULD_FLOOD_PEERS)
// _flooder = new UDPFlooder(_context, this);
_expireTimeout = EXPIRE_TIMEOUT;
_expireEvent = new ExpirePeerEvent();
_testEvent = new PeerTestEvent();
@ -248,8 +248,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_refiller != null)
_refiller.shutdown();
_inboundFragments.shutdown();
if (_flooder != null)
_flooder.shutdown();
//if (_flooder != null)
// _flooder.shutdown();
_introManager.reset();
UDPPacket.clearCache();
@ -326,8 +326,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (USE_PRIORITY && _refiller == null)
_refiller = new OutboundRefiller(_context, _fragments, _outboundMessages);
if (SHOULD_FLOOD_PEERS && _flooder == null)
_flooder = new UDPFlooder(_context, this);
//if (SHOULD_FLOOD_PEERS && _flooder == null)
// _flooder = new UDPFlooder(_context, this);
// Startup the endpoint with the requested port, check the actual port, and
// take action if it failed or was different than requested or it needs to be saved
@ -343,8 +343,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// attempt to use it as our external port - this will be overridden by
// externalAddressReceived(...)
Map<String, String> changes = new HashMap();
changes.put(PROP_INTERNAL_PORT, newPort+"");
changes.put(PROP_EXTERNAL_PORT, newPort+"");
changes.put(PROP_INTERNAL_PORT, Integer.toString(newPort));
changes.put(PROP_EXTERNAL_PORT, Integer.toString(newPort));
_context.router().saveConfig(changes, null);
}
@ -356,8 +356,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_pusher.startup();
if (USE_PRIORITY)
_refiller.startup();
if (SHOULD_FLOOD_PEERS)
_flooder.startup();
//if (SHOULD_FLOOD_PEERS)
// _flooder.startup();
_expireEvent.setIsAlive(true);
_testEvent.setIsAlive(true); // this queues it for 3-6 minutes in the future...
SimpleTimer.getInstance().addEvent(_testEvent, 10*1000); // lets requeue it for Real Soon
@ -367,8 +367,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
destroyAll();
if (_endpoint != null)
_endpoint.shutdown();
if (_flooder != null)
_flooder.shutdown();
//if (_flooder != null)
// _flooder.shutdown();
if (_refiller != null)
_refiller.shutdown();
if (_handler != null)
@ -392,7 +392,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* Introduction key that people should use to contact us
*
*/
public SessionKey getIntroKey() { return _introKey; }
SessionKey getIntroKey() { return _introKey; }
public int getLocalPort() { return _externalListenPort; }
public InetAddress getLocalAddress() { return _externalListenHost; }
public int getExternalPort() { return _externalListenPort; }
@ -433,7 +434,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* @return limited to range PeerState.MIN_MTU to PeerState.LARGE_MTU.
* @since 0.9.2
*/
public int getMTU() {
int getMTU() {
return _mtu;
}
@ -512,7 +513,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
boolean inboundRecent = _lastInboundReceivedOn + ALLOW_IP_CHANGE_INTERVAL > System.currentTimeMillis();
if (_log.shouldLog(Log.INFO))
_log.info("External address received: " + Addresses.toString(ourIP, ourPort) + " from "
+ from.toBase64() + ", isValid? " + isValid + ", explicitSpecified? " + explicitSpecified
+ from + ", isValid? " + isValid + ", explicitSpecified? " + explicitSpecified
+ ", receivedInboundRecent? " + inboundRecent + " status " + _reachabilityStatus);
if (explicitSpecified)
@ -524,7 +525,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (!isValid) {
// ignore them
if (_log.shouldLog(Log.ERROR))
_log.error("The router " + from.toBase64() + " told us we have an invalid IP - "
_log.error("The router " + from + " told us we have an invalid IP - "
+ Addresses.toString(ourIP, ourPort) + ". Lets throw tomatoes at them");
markUnreachable(from);
//_context.shitlist().shitlistRouter(from, "They said we had an invalid IP", STYLE);
@ -541,11 +542,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_lastOurIP = ourIP;
_lastOurPort = ourPort;
if (_log.shouldLog(Log.INFO))
_log.info("The router " + from.toBase64() + " told us we have a new IP - "
_log.info("The router " + from + " told us we have a new IP - "
+ Addresses.toString(ourIP, ourPort) + ". Wait until somebody else tells us the same thing.");
} else {
if (_log.shouldLog(Log.INFO))
_log.info(from.toBase64() + " and " + _lastFrom.toBase64() + " agree we have a new IP - "
_log.info(from + " and " + _lastFrom + " agree we have a new IP - "
+ Addresses.toString(ourIP, ourPort) + ". Changing address.");
_lastFrom = from;
_lastOurIP = ourIP;
@ -617,7 +618,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_context.statManager().addRateData("udp.addressUpdated", 1, 0);
Map<String, String> changes = new HashMap();
if (!fixedPort)
changes.put(PROP_EXTERNAL_PORT, ourPort+"");
changes.put(PROP_EXTERNAL_PORT, Integer.toString(ourPort));
// queue a country code lookup of the new IP
_context.commSystem().queueLookup(ourIP);
// store these for laptop-mode (change ident on restart... or every time... when IP changes)
@ -633,7 +634,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
changes.put(PROP_IP, _externalListenHost.getHostAddress());
changes.put(PROP_IP_CHANGE, "" + now);
changes.put(PROP_IP_CHANGE, Long.toString(now));
_context.router().saveConfig(changes, null);
// laptop mode
@ -678,6 +679,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private boolean getIsPortFixed() {
return DEFAULT_FIXED_PORT.equals(_context.getProperty(PROP_FIXED_PORT, DEFAULT_FIXED_PORT));
}
/**
* get the state for the peer at the given remote host/port, or null
* if no state exists
@ -690,7 +692,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* get the state for the peer with the given ident, or null
* if no state exists
*/
public PeerState getPeerState(Hash remotePeer) {
PeerState getPeerState(Hash remotePeer) {
return _peersByIdent.get(remotePeer);
}
@ -698,7 +700,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* get the state for the peer being introduced, or null if we aren't
* offering to introduce anyone with that tag.
*/
public PeerState getPeerState(long relayTag) {
PeerState getPeerState(long relayTag) {
return _introManager.get(relayTag);
}
@ -785,7 +787,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
RemoteHostId remoteId = peer.getRemoteHostId();
if (remoteId == null) return false;
if (remoteId.getIP() == null && _log.shouldLog(Log.WARN))
_log.warn("Add indirect: " + peer);
oldPeer = _peersByRemoteHost.put(remoteId, peer);
if ( (oldPeer != null) && (oldPeer != peer) ) {
// transfer over the old state/inbound message fragments/etc
@ -806,8 +810,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
markReachable(peer.getRemotePeer(), peer.isInbound());
//_context.shitlist().unshitlistRouter(peer.getRemotePeer(), STYLE);
if (SHOULD_FLOOD_PEERS)
_flooder.addPeer(peer);
//if (SHOULD_FLOOD_PEERS)
// _flooder.addPeer(peer);
_expireEvent.add(peer);
@ -873,7 +877,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
//_context.shitlist().shitlistRouter(peerHash, "Part of the wrong network", STYLE);
dropPeer(peerHash, false, "wrong network");
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping the peer " + peerHash.toBase64() + " because they are in the wrong net: " + entry);
_log.warn("Dropping the peer " + peerHash + " because they are in the wrong net: " + entry);
return;
} else {
if (entry.getType() == DatabaseEntry.KEY_TYPE_ROUTERINFO) {
@ -986,8 +990,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// unchoke 'em, but just because we'll never talk again...
_activeThrottle.unchoke(peer.getRemotePeer());
if (SHOULD_FLOOD_PEERS)
_flooder.removePeer(peer);
//if (SHOULD_FLOOD_PEERS)
// _flooder.removePeer(peer);
_expireEvent.remove(peer);
if (needsRebuild())
@ -1045,7 +1049,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_log.info("Our direct SSU info is initialized, but not used in our address yet");
rv = true;
} else {
_log.info("Our direct SSU info is initialized");
//_log.info("Our direct SSU info is initialized");
}
}
}
@ -1173,7 +1177,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return _cachedBid[TRANSIENT_FAIL_BID];
if (_log.shouldLog(Log.DEBUG))
_log.debug("bidding on a message to an unestablished peer: " + to.toBase64());
_log.debug("bidding on a message to an unestablished peer: " + to);
// Try to maintain at least 3 peers so we can determine our IP address and
// we have a selection to run peer tests with.
@ -1235,7 +1239,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
Hash to = msg.getTarget().getIdentity().calculateHash();
PeerState peer = getPeerState(to);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending to " + (to != null ? to.toBase64() : ""));
_log.debug("Sending to " + (to != null ? to.toString() : ""));
if (peer != null) {
long lastSend = peer.getLastSendFullyTime();
long lastRecv = peer.getLastReceiveTime();
@ -1250,7 +1254,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
dropPeer(peer, false, "proactive reconnection");
msg.timestamp("peer is really idle, dropping con and reestablishing");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Proactive reestablish to " + to.toBase64());
_log.debug("Proactive reestablish to " + to);
_establisher.establish(msg);
_context.statManager().addRateData("udp.proactiveReestablish", now-lastSend, now-peer.getKeyEstablishedTime());
return;
@ -1258,7 +1262,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
msg.timestamp("enqueueing for an already established peer");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add to fragments for " + to.toBase64());
_log.debug("Add to fragments for " + to);
// See comments in DummyThrottle.java
if (USE_PRIORITY)
@ -1267,7 +1271,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_fragments.add(msg);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Establish new connection to " + to.toBase64());
_log.debug("Establish new connection to " + to);
msg.timestamp("establishing a new connection");
_establisher.establish(msg);
}
@ -1494,6 +1498,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
public void failed(OutboundMessageState msg) { failed(msg, true); }
void failed(OutboundMessageState msg, boolean allowPeerFailure) {
if (msg == null) return;
int consecutive = 0;
@ -1677,8 +1682,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private static final int FLAG_RESEND = 14;
private static final int FLAG_DUP = 15;
private static final int FLAG_UPTIME = 16;
private static final int FLAG_DEBUG = 99;
private Comparator getComparator(int sortFlags) {
private static Comparator getComparator(int sortFlags) {
Comparator rv = null;
switch (Math.abs(sortFlags)) {
case FLAG_IDLE_IN:
@ -1738,6 +1744,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
rv = new InverseComparator(rv);
return rv;
}
private static class AlphaComparator extends PeerComparator {
private static final AlphaComparator _instance = new AlphaComparator();
public static final AlphaComparator instance() { return _instance; }
@ -1942,7 +1949,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
private static class InverseComparator implements Comparator {
private Comparator _comp;
private final Comparator _comp;
public InverseComparator(Comparator comp) { _comp = comp; }
public int compare(Object lhs, Object rhs) {
return -1 * _comp.compare(lhs, rhs);
@ -1968,8 +1975,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
//public void renderStatusHTML(Writer out) throws IOException { renderStatusHTML(out, 0); }
public void renderStatusHTML(Writer out, int sortFlags) throws IOException {}
@Override
public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException {
TreeSet<PeerState> peers = new TreeSet(getComparator(sortFlags));
@ -2227,8 +2232,23 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append("</b></td><td align=\"center\"><b>").append(_mtu).append("</b></td><td align=\"center\"><b>");
buf.append(sendTotal).append("</b></td><td align=\"center\"><b>").append(recvTotal).append("</b></td>\n" +
"<td align=\"center\"><b>").append(resentTotal);
buf.append("</b></td><td align=\"center\"><b>").append(dupRecvTotal).append("</b></td>\n" +
"</tr></table>\n");
buf.append("</b></td><td align=\"center\"><b>").append(dupRecvTotal).append("</b></td></tr>\n");
if (sortFlags == FLAG_DEBUG) {
buf.append("<tr><td colspan=\"16\">");
buf.append("peersByIdent: ").append(_peersByIdent.size());
buf.append(" peersByRemoteHost: ").append(_peersByRemoteHost.size());
int dir = 0;
int indir = 0;
for (RemoteHostId rhi : _peersByRemoteHost.keySet()) {
if (rhi.getIP() != null)
dir++;
else
indir++;
}
buf.append(" pBRH direct: ").append(dir).append(" indirect: ").append(indir);
buf.append("</td></tr>");
}
buf.append("</table>\n");
/*****
long bytesTransmitted = _context.bandwidthLimiter().getTotalAllocatedOutboundBytes();
@ -2283,11 +2303,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private class ExpirePeerEvent implements SimpleTimer.TimedEvent {
private final Set<PeerState> _expirePeers;
private final List<PeerState> _expireBuffer;
private boolean _alive;
private volatile boolean _alive;
public ExpirePeerEvent() {
_expirePeers = new ConcurrentHashSet(128);
_expireBuffer = new ArrayList();
}
public void timeReached() {
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
if (haveCapacity())
@ -2385,6 +2407,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
private static final String PROP_REACHABILITY_STATUS_OVERRIDE = "i2np.udp.status";
@Override
public short getReachabilityStatus() {
String override = _context.getProperty(PROP_REACHABILITY_STATUS_OVERRIDE);
@ -2428,7 +2451,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
private class PeerTestEvent implements SimpleTimer.TimedEvent {
private boolean _alive;
private volatile boolean _alive;
/** when did we last test our reachability */
private long _lastTested;
private boolean _forceRun;