* Transports: Don't send a duplicate store of our RI at

start of a connection (ticket #1187)
 * NTCP:
   - Lower send priority of the RI at exchange
   - Bob will now send his RI even if he doesn't have Alice's
   - Send RI again sooner on long-lived connections
This commit is contained in:
zzz
2014-03-05 16:32:04 +00:00
parent 17c80c29e6
commit 597662d0dc
8 changed files with 103 additions and 31 deletions

View File

@ -1,3 +1,11 @@
2014-03-05 zzz
* Transports: Don't send a duplicate store of our RI at
start of a connection (ticket #1187)
* NTCP:
- Lower send priority of the RI at exchange
- Bob will now send his RI even if he doesn't have Alice's
- Send RI again sooner on long-lived connections
2014-03-05 str4d
* Console: Updated website URLs in readme.html files
* susimail: Removed remaining Jetty dependencies in susimail

View File

@ -69,13 +69,13 @@ public class OutNetMessage implements CDPQEntry {
public static final int PRIORITY_EXPLORATORY = 455;
/** may be adjusted +/- 25 for outbound traffic */
public static final int PRIORITY_MY_DATA = 425;
public static final int PRIORITY_MY_NETDB_STORE_LOW = 300;
public static final int PRIORITY_HIS_BUILD_REQUEST = 300;
public static final int PRIORITY_BUILD_REPLY = 300;
public static final int PRIORITY_NETDB_REPLY = 300;
public static final int PRIORITY_HIS_NETDB_STORE = 200;
public static final int PRIORITY_NETDB_FLOOD = 200;
public static final int PRIORITY_PARTICIPATING = 200;
public static final int PRIORITY_MY_NETDB_STORE_LOW = 150;
public static final int PRIORITY_NETDB_EXPLORE = 100;
public static final int PRIORITY_NETDB_HARVEST = 100;
public static final int PRIORITY_LOWEST = 100;

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 12;
public final static long BUILD = 13;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -233,6 +233,11 @@ public abstract class TransportImpl implements Transport {
* @param allowRequeue true if we should try other transports if available
*/
protected void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) {
if (msg.getTarget() == null) {
// Probably injected by the transport.
// Bail out now as it will NPE in a dozen places below.
return;
}
boolean log = false;
if (sendSuccessful)
msg.timestamp("afterSend(successful)");
@ -244,7 +249,7 @@ public abstract class TransportImpl implements Transport {
if (msToSend > 1500) {
if (_log.shouldLog(Log.INFO))
_log.warn(getStyle() + " afterSend slow: " + (sendSuccessful ? "success " : "FAIL ")
_log.info(getStyle() + " afterSend slow: " + (sendSuccessful ? "success " : "FAIL ")
+ msg.getMessageSize() + " byte "
+ msg.getMessageType() + ' ' + msg.getMessageId() + " to "
+ msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend + " ms");

View File

@ -36,6 +36,7 @@ import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.HexDump;
import net.i2p.util.Log;
import net.i2p.util.SystemVersion;
import net.i2p.util.VersionComparator;
/**
* Coordinate the connection to a single peer.
@ -143,7 +144,8 @@ class NTCPConnection {
private static final int META_FREQUENCY = 45*60*1000;
/** how often we send our routerinfo unsolicited */
private static final int INFO_FREQUENCY = 90*60*1000;
private static final int INFO_FREQUENCY = 50*60*1000;
/**
* Why this is 16K, and where it is documented, good question?
* We claim we can do 32K datagrams so this is a problem.
@ -156,7 +158,8 @@ class NTCPConnection {
/** 2 bytes for length and 4 for CRC */
public static final int MAX_MSG_SIZE = BUFFER_SIZE - (2 + 4);
private static final int PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW;
private static final int INFO_PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW;
private static final String FIXED_RI_VERSION = "0.9.12";
/**
* Create an inbound connected (though not established) NTCP connection
@ -271,6 +274,7 @@ class NTCPConnection {
_context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", toClose.getUptime());
toClose.close();
}
enqueueInfoMessage();
}
private synchronized NTCPConnection locked_finishInboundEstablishment(
@ -301,7 +305,9 @@ class NTCPConnection {
}
public long getMessagesSent() { return _messagesWritten; }
public long getMessagesReceived() { return _messagesRead; }
public long getOutboundQueueSize() {
int queued;
synchronized(_outbound) {
@ -487,25 +493,36 @@ class NTCPConnection {
}
}
/**
* Inject a DatabaseStoreMessage with our RouterInfo
*/
public void enqueueInfoMessage() {
RouterInfo target = _context.netDb().lookupRouterInfoLocally(_remotePeer.calculateHash());
if (target != null) {
DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context);
dsm.setEntry(_context.router().getRouterInfo());
OutNetMessage infoMsg = new OutNetMessage(_context, dsm, _context.clock().now()+10*1000, PRIORITY, target);
infoMsg.beginSend();
_context.statManager().addRateData("ntcp.infoMessageEnqueued", 1);
send(infoMsg);
// See comment below
//enqueueFloodfillMessage(target);
} else {
if (_isInbound) {
// ok, we shouldn't have enqueued it yet, as we havent received their info
} else {
// how did we make an outbound connection to someone we don't know about?
}
}
int priority = INFO_PRIORITY;
//if (!_isInbound) {
// Workaround for bug at Bob's end.
// This probably isn't helpful because Bob puts the store on the job queue.
// Prior to 0.9.12, Bob would only send his RI if he had our RI after
// the first received message, so make sure it is first in our queue.
// As of 0.9.12 this is fixed and Bob will always send his RI.
// RouterInfo target = _context.netDb().lookupRouterInfoLocally(_remotePeer.calculateHash());
// if (target != null) {
// String v = target.getOption("router.version");
// if (v == null || VersionComparator.comp(v, FIXED_RI_VERSION) < 0) {
// priority = OutNetMessage.PRIORITY_HIGHEST;
// }
// } else {
// priority = OutNetMessage.PRIORITY_HIGHEST;
// }
//}
if (_log.shouldLog(Log.INFO))
_log.info("SENDING INFO message pri. " + priority + ": " + toString());
DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context);
dsm.setEntry(_context.router().getRouterInfo());
// We are injecting directly, so we can use a null target.
OutNetMessage infoMsg = new OutNetMessage(_context, dsm, _context.clock().now()+10*1000, priority, null);
infoMsg.beginSend();
//_context.statManager().addRateData("ntcp.infoMessageEnqueued", 1);
send(infoMsg);
}
//private static final int PEERS_TO_FLOOD = 3;
@ -1474,8 +1491,6 @@ class NTCPConnection {
if (read != null) {
_transport.messageReceived(read, _remotePeer, null, timeToRecv, _size);
if (_messagesRead <= 0)
enqueueInfoMessage();
_lastReceiveTime = System.currentTimeMillis();
_messagesRead++;
}

View File

@ -27,6 +27,8 @@ import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.CommSystemFacade;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
@ -142,8 +144,8 @@ public class NTCPTransport extends TransportImpl {
//_context.statManager().createRateStat("ntcp.inboundCheckConnection", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.inboundEstablished", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.inboundEstablishedDuplicate", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.infoMessageEnqueued", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.floodInfoMessageEnqueued", "", "ntcp", RATES);
//_context.statManager().createRateStat("ntcp.infoMessageEnqueued", "", "ntcp", RATES);
//_context.statManager().createRateStat("ntcp.floodInfoMessageEnqueued", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.invalidDH", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.invalidHXY", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.invalidHXxorBIH", "", "ntcp", RATES);
@ -241,8 +243,26 @@ public class NTCPTransport extends TransportImpl {
return;
}
if (isNew) {
con.enqueueInfoMessage(); // enqueues a netDb store of our own info
con.send(msg); // doesn't do anything yet, just enqueues it
// As of 0.9.12, don't send our info if the first message is
// doing the same (common when connecting to a floodfill).
// Also, put the info message after whatever we are trying to send
// (it's a priority queue anyway and the info is low priority)
// Prior to 0.9.12, Bob would not send his RI unless he had ours,
// but that's fixed in 0.9.12.
boolean shouldSkipInfo = false;
I2NPMessage m = msg.getMessage();
if (m.getType() == DatabaseStoreMessage.MESSAGE_TYPE) {
DatabaseStoreMessage dsm = (DatabaseStoreMessage) m;
if (dsm.getKey().equals(_context.routerHash())) {
shouldSkipInfo = true;
}
}
if (!shouldSkipInfo) {
con.enqueueInfoMessage();
} else if (_log.shouldLog(Log.INFO)) {
_log.info("SKIPPING INFO message: " + con);
}
try {
SocketChannel channel = SocketChannel.open();

View File

@ -792,7 +792,11 @@ class EstablishmentManager {
_transport.setIP(remote.calculateHash(), state.getSentIP());
_context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0);
sendOurInfo(peer, false);
if (!state.isFirstMessageOurDSM()) {
sendOurInfo(peer, false);
} else if (_log.shouldLog(Log.INFO)) {
_log.info("Skipping publish: " + state);
}
OutNetMessage msg;
while ((msg = state.getNextQueuedMessage()) != null) {

View File

@ -9,6 +9,8 @@ import net.i2p.data.DataHelper;
import net.i2p.data.RouterIdentity;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
@ -56,6 +58,7 @@ class OutboundEstablishState {
private final Queue<OutNetMessage> _queuedMessages;
private OutboundState _currentState;
private long _introductionNonce;
private boolean _isFirstMessageOurDSM;
// intro
private final UDPAddress _remoteAddress;
private boolean _complete;
@ -151,12 +154,29 @@ class OutboundEstablishState {
* Queue a message to be sent after the session is established.
*/
public void addMessage(OutNetMessage msg) {
if (_queuedMessages.isEmpty()) {
I2NPMessage m = msg.getMessage();
if (m.getType() == DatabaseStoreMessage.MESSAGE_TYPE) {
DatabaseStoreMessage dsm = (DatabaseStoreMessage) m;
if (dsm.getKey().equals(_context.routerHash())) {
_isFirstMessageOurDSM = true;
}
}
}
// chance of a duplicate here in a race, that's ok
if (!_queuedMessages.contains(msg))
_queuedMessages.offer(msg);
else if (_log.shouldLog(Log.WARN))
_log.warn("attempt to add duplicate msg to queue: " + msg);
}
/**
* Is the first message queued our own DatabaseStoreMessage?
* @since 0.9.12
*/
public boolean isFirstMessageOurDSM() {
return _isFirstMessageOurDSM;
}
/** @return null if none */
public OutNetMessage getNextQueuedMessage() {
@ -260,8 +280,8 @@ class OutboundEstablishState {
return false;
}
if (_receivedSignature != null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Session created already validated");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Session created already validated");
return true;
}