forked from I2P_Developers/i2p.i2p
Compare commits
12 Commits
i2p_0_6_1_
...
i2p_0_6_1_
Author | SHA1 | Date | |
---|---|---|---|
034803add7 | |||
b25bb053bb | |||
9bd0c79441 | |||
06b8670410 | |||
6577ae499f | |||
54bc5485ec | |||
84b741ac98 | |||
c48c419d74 | |||
fb2e795add | |||
ec215777ec | |||
d4e0f27c56 | |||
e1c686baa6 |
@ -109,7 +109,10 @@ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnabl
|
||||
asyncBuffers.notifyAll();
|
||||
}
|
||||
Thread.yield();
|
||||
try { Thread.sleep((after-before)*5); } catch (InterruptedException ie) {}
|
||||
long waitTime = (after-before)*5;
|
||||
if (waitTime <= 0) // somehow postman saw waitTime show up as negative
|
||||
waitTime = 50;
|
||||
try { Thread.sleep(waitTime); } catch (InterruptedException ie) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -14,8 +14,8 @@ package net.i2p;
|
||||
*
|
||||
*/
|
||||
public class CoreVersion {
|
||||
public final static String ID = "$Revision: 1.64 $ $Date: 2006-06-13 21:17:43 $";
|
||||
public final static String VERSION = "0.6.1.22";
|
||||
public final static String ID = "$Revision: 1.66 $ $Date: 2006-07-27 22:35:02 $";
|
||||
public final static String VERSION = "0.6.1.24";
|
||||
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Core version: " + VERSION);
|
||||
|
@ -182,6 +182,12 @@ public class RouterInfo extends DataStructureImpl {
|
||||
return (Properties) _options.clone();
|
||||
}
|
||||
}
|
||||
public String getOption(String opt) {
|
||||
if (_options == null) return null;
|
||||
synchronized (_options) {
|
||||
return _options.getProperty(opt);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure a set of options or statistics that the router can expose
|
||||
@ -347,7 +353,7 @@ public class RouterInfo extends DataStructureImpl {
|
||||
String capabilities = getCapabilities();
|
||||
// Iterate through capabilities, searching for known bandwidth tier
|
||||
for (int i = 0; i < capabilities.length(); i++) {
|
||||
if (bwTiers.indexOf(String.valueOf(capabilities.charAt(i))) >= 0) {
|
||||
if (bwTiers.indexOf(String.valueOf(capabilities.charAt(i))) != -1) {
|
||||
bwTier = String.valueOf(capabilities.charAt(i));
|
||||
break;
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ public class StatManager {
|
||||
_log = context.logManager().getLog(StatManager.class);
|
||||
_context = context;
|
||||
_frequencyStats = Collections.synchronizedMap(new HashMap(128));
|
||||
_rateStats = Collections.synchronizedMap(new HashMap(128));
|
||||
_rateStats = new HashMap(128); // synchronized only on add //Collections.synchronizedMap(new HashMap(128));
|
||||
_statLog = new BufferedStatLog(context);
|
||||
}
|
||||
|
||||
@ -80,10 +80,12 @@ public class StatManager {
|
||||
* @param periods array of period lengths (in milliseconds)
|
||||
*/
|
||||
public void createRateStat(String name, String description, String group, long periods[]) {
|
||||
if (_rateStats.containsKey(name)) return;
|
||||
RateStat rs = new RateStat(name, description, group, periods);
|
||||
if (_statLog != null) rs.setStatLog(_statLog);
|
||||
_rateStats.put(name, rs);
|
||||
synchronized (_rateStats) {
|
||||
if (_rateStats.containsKey(name)) return;
|
||||
RateStat rs = new RateStat(name, description, group, periods);
|
||||
if (_statLog != null) rs.setStatLog(_statLog);
|
||||
_rateStats.put(name, rs);
|
||||
}
|
||||
}
|
||||
|
||||
/** update the given frequency statistic, taking note that an event occurred (and recalculating all frequencies) */
|
||||
@ -94,7 +96,7 @@ public class StatManager {
|
||||
|
||||
/** update the given rate statistic, taking note that the given data point was received (and recalculating all rates) */
|
||||
public void addRateData(String name, long data, long eventDuration) {
|
||||
RateStat stat = (RateStat) _rateStats.get(name);
|
||||
RateStat stat = (RateStat) _rateStats.get(name); // unsynchronized
|
||||
if (stat != null) stat.addData(data, eventDuration);
|
||||
}
|
||||
|
||||
|
51
core/java/src/net/i2p/util/Executor.java
Normal file
51
core/java/src/net/i2p/util/Executor.java
Normal file
@ -0,0 +1,51 @@
|
||||
package net.i2p.util;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
|
||||
class Executor implements Runnable {
|
||||
private I2PAppContext _context;
|
||||
private Log _log;
|
||||
private List _readyEvents;
|
||||
public Executor(I2PAppContext ctx, Log log, List events) {
|
||||
_context = ctx;
|
||||
_readyEvents = events;
|
||||
}
|
||||
public void run() {
|
||||
while (true) {
|
||||
SimpleTimer.TimedEvent evt = null;
|
||||
synchronized (_readyEvents) {
|
||||
if (_readyEvents.size() <= 0)
|
||||
try { _readyEvents.wait(); } catch (InterruptedException ie) {}
|
||||
if (_readyEvents.size() > 0)
|
||||
evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
|
||||
}
|
||||
|
||||
if (evt != null) {
|
||||
long before = _context.clock().now();
|
||||
try {
|
||||
evt.timeReached();
|
||||
} catch (Throwable t) {
|
||||
log("wtf, event borked: " + evt, t);
|
||||
}
|
||||
long time = _context.clock().now() - before;
|
||||
if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
|
||||
_log.warn("wtf, event execution took " + time + ": " + evt);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void log(String msg, Throwable t) {
|
||||
synchronized (this) {
|
||||
if (_log == null)
|
||||
_log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
|
||||
}
|
||||
_log.log(Log.CRIT, msg, t);
|
||||
}
|
||||
}
|
@ -222,43 +222,3 @@ public class SimpleTimer {
|
||||
}
|
||||
}
|
||||
|
||||
class Executor implements Runnable {
|
||||
private I2PAppContext _context;
|
||||
private Log _log;
|
||||
private List _readyEvents;
|
||||
public Executor(I2PAppContext ctx, Log log, List events) {
|
||||
_context = ctx;
|
||||
_readyEvents = events;
|
||||
}
|
||||
public void run() {
|
||||
while (true) {
|
||||
SimpleTimer.TimedEvent evt = null;
|
||||
synchronized (_readyEvents) {
|
||||
if (_readyEvents.size() <= 0)
|
||||
try { _readyEvents.wait(); } catch (InterruptedException ie) {}
|
||||
if (_readyEvents.size() > 0)
|
||||
evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
|
||||
}
|
||||
|
||||
if (evt != null) {
|
||||
long before = _context.clock().now();
|
||||
try {
|
||||
evt.timeReached();
|
||||
} catch (Throwable t) {
|
||||
log("wtf, event borked: " + evt, t);
|
||||
}
|
||||
long time = _context.clock().now() - before;
|
||||
if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
|
||||
_log.warn("wtf, event execution took " + time + ": " + evt);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void log(String msg, Throwable t) {
|
||||
synchronized (this) {
|
||||
if (_log == null)
|
||||
_log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
|
||||
}
|
||||
_log.log(Log.CRIT, msg, t);
|
||||
}
|
||||
}
|
||||
|
56
history.txt
56
history.txt
@ -1,4 +1,58 @@
|
||||
$Id: history.txt,v 1.494 2006-07-16 12:20:46 complication Exp $
|
||||
$Id: history.txt,v 1.502 2006-07-28 23:43:04 jrandom Exp $
|
||||
|
||||
* 2006-07-28 0.6.1.24 released
|
||||
|
||||
2006-07-28 jrandom
|
||||
* Don't try to reverify too many netDb entries at once (thanks
|
||||
cervantes and Complication!)
|
||||
|
||||
2006-07-28 jrandom
|
||||
* Actually fix the threading deadlock issue in the netDb (removing
|
||||
the synchronized access to individual kbuckets while validating
|
||||
individual entries) (thanks cervantes, postman, frosk, et al!)
|
||||
|
||||
* 2006-07-27 0.6.1.23 released
|
||||
|
||||
2006-07-27 jrandom
|
||||
* Cut down NTCP connection establishments once we know the peer is skewed
|
||||
(rather than wait for full establishment before verifying)
|
||||
* Removed a lock on the stats framework when accessing rates, which
|
||||
shouldn't be a problem, assuming rates are created (pretty much) all at
|
||||
once and merely updated during the lifetime of the jvm.
|
||||
|
||||
2006-07-27 jrandom
|
||||
* Further NTCP write status cleanup
|
||||
* Handle more oddly-timed NTCP disconnections (thanks bar!)
|
||||
|
||||
2006-07-26 jrandom
|
||||
* When dropping a netDb router reference, only accept newer
|
||||
references as part of the update check
|
||||
* If we have been up for a while, don't accept really old
|
||||
router references (published 2 or more days ago)
|
||||
* Drop router references once they are no longer valid, even if
|
||||
they were allowed in due to the lax restrictions on startup
|
||||
|
||||
2006-07-26 jrandom
|
||||
* Every time we create a new router identity, add an entry to the
|
||||
new "identlog.txt" text file in the I2P install directory. For
|
||||
debugging purposes, publish the count of how many identities the
|
||||
router has cycled through, though not the identities itself.
|
||||
* Cleaned up the way the multitransport shitlisting worked, and
|
||||
added per-transport shitlists
|
||||
* When dropping a router reference locally, first fire a netDb
|
||||
lookup for the entry
|
||||
* Take the peer selection filters into account when organizing the
|
||||
profiles (thanks Complication!)
|
||||
* Avoid some obvious configuration errors for the NTCP transport
|
||||
(invalid ports, "null" ip, etc)
|
||||
* Deal with some small NTCP bugs found in the wild (unresolveable
|
||||
hosts, strange network discons, etc)
|
||||
* Send our netDb info to peers we have direct NTCP connections to
|
||||
after each 6-12 hours of connection uptime
|
||||
* Clean up the NTCP reading and writing queue logic to avoid some
|
||||
potential delays
|
||||
* Allow people to specify the IP that the SSU transport binds on
|
||||
locally, via the advanced config "i2np.udp.bindInterface=1.2.3.4"
|
||||
|
||||
* 2006-07-18 0.6.1.22 released
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
<i2p.news date="$Date: 2006-06-13 21:17:40 $">
|
||||
<i2p.release version="0.6.1.22" date="2006/06/13" minVersion="0.6"
|
||||
<i2p.news date="$Date: 2006-07-18 15:08:00 $">
|
||||
<i2p.release version="0.6.1.24" date="2006/07/29" minVersion="0.6"
|
||||
anonurl="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/i2p/i2pupdate.sud"
|
||||
publicurl="http://dev.i2p.net/i2p/i2pupdate.sud"
|
||||
anonannouncement="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/pipermail/i2p/2005-September/000878.html"
|
||||
|
@ -4,7 +4,7 @@
|
||||
|
||||
<info>
|
||||
<appname>i2p</appname>
|
||||
<appversion>0.6.1.22</appversion>
|
||||
<appversion>0.6.1.24</appversion>
|
||||
<authors>
|
||||
<author name="I2P" email="support@i2p.net"/>
|
||||
</authors>
|
||||
|
6
news.xml
6
news.xml
@ -1,5 +1,5 @@
|
||||
<i2p.news date="$Date: 2006-06-15 17:15:09 $">
|
||||
<i2p.release version="0.6.1.22" date="2006/06/13" minVersion="0.6"
|
||||
<i2p.news date="$Date: 2006-07-27 22:34:59 $">
|
||||
<i2p.release version="0.6.1.24" date="2006/07/29" minVersion="0.6"
|
||||
anonurl="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/i2p/i2pupdate.sud"
|
||||
publicurl="http://dev.i2p.net/i2p/i2pupdate.sud"
|
||||
anonannouncement="http://i2p/NF2RLVUxVulR3IqK0sGJR0dHQcGXAzwa6rEO4WAWYXOHw-DoZhKnlbf1nzHXwMEJoex5nFTyiNMqxJMWlY54cvU~UenZdkyQQeUSBZXyuSweflUXFqKN-y8xIoK2w9Ylq1k8IcrAFDsITyOzjUKoOPfVq34rKNDo7fYyis4kT5bAHy~2N1EVMs34pi2RFabATIOBk38Qhab57Umpa6yEoE~rbyR~suDRvD7gjBvBiIKFqhFueXsR2uSrPB-yzwAGofTXuklofK3DdKspciclTVzqbDjsk5UXfu2nTrC1agkhLyqlOfjhyqC~t1IXm-Vs2o7911k7KKLGjB4lmH508YJ7G9fLAUyjuB-wwwhejoWqvg7oWvqo4oIok8LG6ECR71C3dzCvIjY2QcrhoaazA9G4zcGMm6NKND-H4XY6tUWhpB~5GefB3YczOqMbHq4wi0O9MzBFrOJEOs3X4hwboKWANf7DT5PZKJZ5KorQPsYRSq0E3wSOsFCSsdVCKUGsAAAA/pipermail/i2p/2005-September/000878.html"
|
||||
@ -10,7 +10,7 @@
|
||||
anonlogs="http://i2p/Nf3ab-ZFkmI-LyMt7GjgT-jfvZ3zKDl0L96pmGQXF1B82W2Bfjf0n7~288vafocjFLnQnVcmZd~-p0-Oolfo9aW2Rm-AhyqxnxyLlPBqGxsJBXjPhm1JBT4Ia8FB-VXt0BuY0fMKdAfWwN61-tj4zIcQWRxv3DFquwEf035K~Ra4SWOqiuJgTRJu7~o~DzHVljVgWIzwf8Z84cz0X33pv-mdG~~y0Bsc2qJVnYwjjR178YMcRSmNE0FVMcs6f17c6zqhMw-11qjKpY~EJfHYCx4lBWF37CD0obbWqTNUIbL~78vxqZRT3dgAgnLixog9nqTO-0Rh~NpVUZnoUi7fNR~awW5U3Cf7rU7nNEKKobLue78hjvRcWn7upHUF45QqTDuaM3yZa7OsjbcH-I909DOub2Q0Dno6vIwuA7yrysccN1sbnkwZbKlf4T6~iDdhaSLJd97QCyPOlbyUfYy9QLNExlRqKgNVJcMJRrIual~Lb1CLbnzt0uvobM57UpqSAAAA/meeting141"
|
||||
publiclogs="http://www.i2p.net/meeting141" />
|
||||
•
|
||||
2006-07-14: 0.6.1.22 <a href="http://dev.i2p/pipermail/i2p/2006-June/001294.html">released</a>
|
||||
2006-07-18: 0.6.1.22 <a href="http://dev.i2p/pipermail/i2p/2006-July/001300.html">released</a>
|
||||
<br />
|
||||
•
|
||||
2006-06-13:
|
||||
|
@ -95,8 +95,18 @@ public class I2NPMessageHandler {
|
||||
cur++;
|
||||
_lastReadBegin = System.currentTimeMillis();
|
||||
I2NPMessage msg = I2NPMessageImpl.createMessage(_context, type);
|
||||
if (msg == null)
|
||||
throw new I2NPMessageException("The type "+ type + " is an unknown I2NP message");
|
||||
if (msg == null) {
|
||||
int sz = data.length-offset;
|
||||
boolean allZero = false;
|
||||
for (int i = offset; i < data.length; i++) {
|
||||
if (data[i] != 0) {
|
||||
allZero = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
throw new I2NPMessageException("The type "+ type + " is an unknown I2NP message (remaining sz="
|
||||
+ sz + " all zeros? " + allZero + ")");
|
||||
}
|
||||
try {
|
||||
_lastSize = msg.readBytes(data, type, cur);
|
||||
cur += _lastSize;
|
||||
|
@ -323,6 +323,7 @@ public class Router {
|
||||
ri.setPublished(_context.clock().now());
|
||||
Properties stats = _context.statPublisher().publishStatistics();
|
||||
stats.setProperty(RouterInfo.PROP_NETWORK_ID, NETWORK_ID+"");
|
||||
|
||||
ri.setOptions(stats);
|
||||
ri.setAddresses(_context.commSystem().createAddresses());
|
||||
|
||||
@ -444,15 +445,32 @@ public class Router {
|
||||
"keyBackup/publicSigning.key",
|
||||
"sessionKeys.dat" };
|
||||
|
||||
static final String IDENTLOG = "identlog.txt";
|
||||
public static void killKeys() {
|
||||
new Exception("Clearing identity files").printStackTrace();
|
||||
int remCount = 0;
|
||||
for (int i = 0; i < _rebuildFiles.length; i++) {
|
||||
File f = new File(_rebuildFiles[i]);
|
||||
if (f.exists()) {
|
||||
boolean removed = f.delete();
|
||||
if (removed)
|
||||
if (removed) {
|
||||
System.out.println("INFO: Removing old identity file: " + _rebuildFiles[i]);
|
||||
else
|
||||
remCount++;
|
||||
} else {
|
||||
System.out.println("ERROR: Could not remove old identity file: " + _rebuildFiles[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (remCount > 0) {
|
||||
FileOutputStream log = null;
|
||||
try {
|
||||
log = new FileOutputStream(IDENTLOG, true);
|
||||
log.write((new Date() + ": Old router identity keys cleared\n").getBytes());
|
||||
} catch (IOException ioe) {
|
||||
// ignore
|
||||
} finally {
|
||||
if (log != null)
|
||||
try { log.close(); } catch (IOException ioe) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,8 +15,8 @@ import net.i2p.CoreVersion;
|
||||
*
|
||||
*/
|
||||
public class RouterVersion {
|
||||
public final static String ID = "$Revision: 1.433 $ $Date: 2006-07-16 12:20:47 $";
|
||||
public final static String VERSION = "0.6.1.22";
|
||||
public final static String ID = "$Revision: 1.441 $ $Date: 2006-07-28 23:41:16 $";
|
||||
public final static String VERSION = "0.6.1.24";
|
||||
public final static long BUILD = 0;
|
||||
public static void main(String args[]) {
|
||||
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
|
||||
|
@ -18,10 +18,10 @@ import net.i2p.router.peermanager.PeerProfile;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* Manage in memory the routers we are oh so fond of.
|
||||
* This needs to get a little bit more sophisticated... currently there is no
|
||||
* way out of the shitlist
|
||||
*
|
||||
* Routers are shitlisted only if none of our transports can talk to them
|
||||
* or their signed router info is completely screwy. Individual transports
|
||||
* manage their own unreachable lists and do not generally add to the overall
|
||||
* shitlist.
|
||||
*/
|
||||
public class Shitlist {
|
||||
private Log _log;
|
||||
@ -43,6 +43,39 @@ public class Shitlist {
|
||||
_context = context;
|
||||
_log = context.logManager().getLog(Shitlist.class);
|
||||
_entries = new HashMap(32);
|
||||
_context.jobQueue().addJob(new Cleanup(_context));
|
||||
}
|
||||
|
||||
private class Cleanup extends JobImpl {
|
||||
private List _toUnshitlist;
|
||||
public Cleanup(RouterContext ctx) {
|
||||
super(ctx);
|
||||
_toUnshitlist = new ArrayList(4);
|
||||
}
|
||||
public String getName() { return "Cleanup shitlist"; }
|
||||
public void runJob() {
|
||||
_toUnshitlist.clear();
|
||||
long now = getContext().clock().now();
|
||||
synchronized (_entries) {
|
||||
for (Iterator iter = _entries.keySet().iterator(); iter.hasNext(); ) {
|
||||
Hash peer = (Hash)iter.next();
|
||||
Entry entry = (Entry)_entries.get(peer);
|
||||
if (entry.expireOn <= now) {
|
||||
iter.remove();
|
||||
_toUnshitlist.add(peer);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < _toUnshitlist.size(); i++) {
|
||||
Hash peer = (Hash)_toUnshitlist.get(i);
|
||||
PeerProfile prof = _context.profileOrganizer().getProfile(peer);
|
||||
if (prof != null)
|
||||
prof.unshitlist();
|
||||
_context.messageHistory().unshitlist(peer);
|
||||
}
|
||||
|
||||
requeue(30*1000);
|
||||
}
|
||||
}
|
||||
|
||||
public int getRouterCount() {
|
||||
@ -130,7 +163,10 @@ public class Shitlist {
|
||||
fully = true;
|
||||
} else {
|
||||
e.transports.remove(transport);
|
||||
_entries.put(peer, e);
|
||||
if (e.transports.size() <= 0)
|
||||
fully = true;
|
||||
else
|
||||
_entries.put(peer, e);
|
||||
}
|
||||
}
|
||||
if (fully) {
|
||||
@ -188,9 +224,14 @@ public class Shitlist {
|
||||
}
|
||||
buf.append("<ul>");
|
||||
|
||||
int partial = 0;
|
||||
for (Iterator iter = entries.keySet().iterator(); iter.hasNext(); ) {
|
||||
Hash key = (Hash)iter.next();
|
||||
Entry entry = (Entry)entries.get(key);
|
||||
if ( (entry.transports != null) && (entry.transports.size() > 0) ) {
|
||||
partial++;
|
||||
continue;
|
||||
}
|
||||
buf.append("<li><b>").append(key.toBase64()).append("</b>");
|
||||
buf.append(" <a href=\"netdb.jsp#").append(key.toBase64().substring(0, 6)).append("\">(?)</a>");
|
||||
buf.append(" expiring in ");
|
||||
@ -205,6 +246,9 @@ public class Shitlist {
|
||||
buf.append("</li>\n");
|
||||
}
|
||||
buf.append("</ul>\n");
|
||||
buf.append("<i>Partial shitlisted peers (only blocked on some transports): ");
|
||||
buf.append(partial);
|
||||
buf.append("</i>\n");
|
||||
out.write(buf.toString());
|
||||
out.flush();
|
||||
}
|
||||
|
@ -8,6 +8,8 @@ package net.i2p.router;
|
||||
*
|
||||
*/
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.Writer;
|
||||
import java.text.DecimalFormat;
|
||||
import java.text.DecimalFormatSymbols;
|
||||
@ -100,7 +102,28 @@ public class StatisticsManager implements Service {
|
||||
// No longer expose, to make build tracking more expensive
|
||||
// stats.setProperty("router.id", RouterVersion.ID);
|
||||
// stats.setProperty("core.id", CoreVersion.ID);
|
||||
|
||||
|
||||
int newlines = 0;
|
||||
FileInputStream in = null;
|
||||
try {
|
||||
in = new FileInputStream(Router.IDENTLOG);
|
||||
int c = -1;
|
||||
// perhaps later filter this to only include ident changes this
|
||||
// day/week/month
|
||||
while ( (c = in.read()) != -1) {
|
||||
if (c == '\n')
|
||||
newlines++;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
// ignore
|
||||
} finally {
|
||||
if (in != null)
|
||||
try { in.close(); } catch (IOException ioe) {}
|
||||
}
|
||||
if (newlines > 0)
|
||||
stats.setProperty("stat_identities", newlines+"");
|
||||
|
||||
|
||||
if (_includePeerRankings) {
|
||||
if (false)
|
||||
stats.putAll(_context.profileManager().summarizePeers(_publishedStats));
|
||||
|
@ -320,7 +320,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
|
||||
}
|
||||
boolean wantACK = true;
|
||||
int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey());
|
||||
if (existingTags > 30)
|
||||
if ( (existingTags > 30) && (getContext().random().nextInt(100) >= 5) )
|
||||
wantACK = false;
|
||||
|
||||
long token = (wantACK ? getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE) : -1);
|
||||
|
@ -231,6 +231,55 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
|
||||
FloodfillPeerSelector sel = (FloodfillPeerSelector)getPeerSelector();
|
||||
return sel.selectFloodfillParticipants(getKBuckets());
|
||||
}
|
||||
|
||||
protected void lookupBeforeDropping(Hash peer, RouterInfo info) {
|
||||
if (_context.jobQueue().getMaxLag() > 500) {
|
||||
// don't try to overload ourselves (e.g. failing 3000 router refs at
|
||||
// once, and then firing off 3000 netDb lookup tasks)
|
||||
super.lookupBeforeDropping(peer, info);
|
||||
return;
|
||||
}
|
||||
// this sends out the search to the floodfill peers even if we already have the
|
||||
// entry locally, firing no job if it gets a reply with an updated value (meaning
|
||||
// we shouldn't drop them but instead use the new data), or if they all time out,
|
||||
// firing the dropLookupFailedJob, which actually removes out local reference
|
||||
search(peer, new DropLookupFoundJob(_context, peer, info), new DropLookupFailedJob(_context, peer, info), 10*1000, false);
|
||||
}
|
||||
|
||||
private class DropLookupFailedJob extends JobImpl {
|
||||
private Hash _peer;
|
||||
private RouterInfo _info;
|
||||
|
||||
public DropLookupFailedJob(RouterContext ctx, Hash peer, RouterInfo info) {
|
||||
super(ctx);
|
||||
_peer = peer;
|
||||
_info = info;
|
||||
}
|
||||
public String getName() { return "Lookup on failure of netDb peer timed out"; }
|
||||
public void runJob() {
|
||||
dropAfterLookupFailed(_peer, _info);
|
||||
}
|
||||
}
|
||||
private class DropLookupFoundJob extends JobImpl {
|
||||
private Hash _peer;
|
||||
private RouterInfo _info;
|
||||
|
||||
public DropLookupFoundJob(RouterContext ctx, Hash peer, RouterInfo info) {
|
||||
super(ctx);
|
||||
_peer = peer;
|
||||
_info = info;
|
||||
}
|
||||
public String getName() { return "Lookup on failure of netDb peer matched"; }
|
||||
public void runJob() {
|
||||
RouterInfo updated = lookupRouterInfoLocally(_peer);
|
||||
if ( (updated != null) && (updated.getPublished() > _info.getPublished()) ) {
|
||||
// great, a legitimate update
|
||||
} else {
|
||||
// they just sent us what we already had. kill 'em both
|
||||
dropAfterLookupFailed(_peer, _info);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -28,6 +28,7 @@ class KBucketSet {
|
||||
private I2PAppContext _context;
|
||||
private Hash _us;
|
||||
private KBucket _buckets[];
|
||||
private volatile int _size;
|
||||
|
||||
public final static int BASE = 8; // must go into KEYSIZE_BITS evenly
|
||||
public final static int KEYSIZE_BITS = Hash.HASH_LENGTH * 8;
|
||||
@ -51,6 +52,8 @@ class KBucketSet {
|
||||
if (bucket >= 0) {
|
||||
int oldSize = _buckets[bucket].getKeyCount();
|
||||
int numInBucket = _buckets[bucket].add(peer);
|
||||
if (numInBucket != oldSize)
|
||||
_size++;
|
||||
if (numInBucket > BUCKET_SIZE) {
|
||||
// perhaps queue up coalesce job? naaahh.. lets let 'er grow for now
|
||||
}
|
||||
@ -62,17 +65,26 @@ class KBucketSet {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Not an exact count (due to concurrency issues) but generally correct
|
||||
*
|
||||
*/
|
||||
public int size() {
|
||||
return _size;
|
||||
/*
|
||||
int size = 0;
|
||||
for (int i = 0; i < _buckets.length; i++)
|
||||
size += _buckets[i].getKeyCount();
|
||||
return size;
|
||||
*/
|
||||
}
|
||||
|
||||
public boolean remove(Hash entry) {
|
||||
int bucket = pickBucket(entry);
|
||||
KBucket kbucket = getBucket(bucket);
|
||||
boolean removed = kbucket.remove(entry);
|
||||
if (removed)
|
||||
_size--;
|
||||
return removed;
|
||||
}
|
||||
|
||||
|
@ -462,9 +462,22 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
|
||||
if (!_initialized) return null;
|
||||
DataStructure ds = _ds.get(key);
|
||||
if (ds != null) {
|
||||
if (ds instanceof RouterInfo)
|
||||
if (ds instanceof RouterInfo) {
|
||||
// more aggressive than perhaps is necessary, but makes sure we
|
||||
// drop old references that we had accepted on startup (since
|
||||
// startup allows some lax rules).
|
||||
boolean valid = true;
|
||||
try {
|
||||
valid = (null == validate(key, (RouterInfo)ds));
|
||||
} catch (IllegalArgumentException iae) {
|
||||
valid = false;
|
||||
}
|
||||
if (!valid) {
|
||||
fail(key);
|
||||
return null;
|
||||
}
|
||||
return (RouterInfo)ds;
|
||||
else {
|
||||
} else {
|
||||
//_log.debug("Looking for a router [" + key + "] but it ISN'T a RouterInfo! " + ds, new Exception("Who thought that lease was a router?"));
|
||||
return null;
|
||||
}
|
||||
@ -677,6 +690,9 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
|
||||
String rv = "Peer " + key.toBase64() + " is from another network, not accepting it (id="
|
||||
+ routerInfo.getNetworkId() + ", want " + Router.NETWORK_ID + ")";
|
||||
return rv;
|
||||
} else if ( (_context.router().getUptime() > 60*60*1000) && (routerInfo.getPublished() < now - 2*24*60*60*1000l) ) {
|
||||
long age = _context.clock().now() - routerInfo.getPublished();
|
||||
return "Peer " + key.toBase64() + " published " + DataHelper.formatDuration(age) + " ago";
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -742,12 +758,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
|
||||
}
|
||||
}
|
||||
|
||||
_context.peerManager().removeCapabilities(dbEntry);
|
||||
boolean removed = _kb.remove(dbEntry);
|
||||
if (removed) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Removed kbucket entry for " + dbEntry);
|
||||
}
|
||||
lookupBeforeDropping(dbEntry, (RouterInfo)o);
|
||||
return;
|
||||
} else {
|
||||
// we always drop leaseSets that are failed [timed out],
|
||||
// regardless of how many routers we have. this is called on a lease if
|
||||
@ -775,6 +787,30 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
|
||||
}
|
||||
}
|
||||
|
||||
protected void lookupBeforeDropping(Hash peer, RouterInfo info) {
|
||||
//bah, humbug.
|
||||
dropAfterLookupFailed(peer, info);
|
||||
}
|
||||
protected void dropAfterLookupFailed(Hash peer, RouterInfo info) {
|
||||
_context.peerManager().removeCapabilities(peer);
|
||||
boolean removed = _kb.remove(peer);
|
||||
if (removed) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Removed kbucket entry for " + peer);
|
||||
}
|
||||
|
||||
_ds.remove(peer);
|
||||
synchronized (_lastSent) {
|
||||
_lastSent.remove(peer);
|
||||
}
|
||||
synchronized (_explicitSendKeys) {
|
||||
_explicitSendKeys.remove(peer);
|
||||
}
|
||||
synchronized (_passiveSendKeys) {
|
||||
_passiveSendKeys.remove(peer);
|
||||
}
|
||||
}
|
||||
|
||||
public void unpublish(LeaseSet localLeaseSet) {
|
||||
if (!_initialized) return;
|
||||
Hash h = localLeaseSet.getDestination().calculateHash();
|
||||
@ -935,8 +971,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
|
||||
renderRouterInfo(buf, ri, false);
|
||||
out.write(buf.toString());
|
||||
buf.setLength(0);
|
||||
String coreVersion = ri.getOptions().getProperty("coreVersion");
|
||||
String routerVersion = ri.getOptions().getProperty("router.version");
|
||||
String coreVersion = ri.getOption("coreVersion");
|
||||
String routerVersion = ri.getOption("router.version");
|
||||
if ( (coreVersion != null) && (routerVersion != null) ) {
|
||||
Map routerVersions = (Map)versions.get(coreVersion);
|
||||
if (routerVersions == null) {
|
||||
@ -1001,7 +1037,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
|
||||
buf.append("Stats: <br /><i><code>\n");
|
||||
for (Iterator iter = info.getOptions().keySet().iterator(); iter.hasNext(); ) {
|
||||
String key = (String)iter.next();
|
||||
String val = info.getOptions().getProperty(key);
|
||||
String val = info.getOption(key);
|
||||
buf.append(DataHelper.stripHTML(key)).append(" = ").append(DataHelper.stripHTML(val)).append("<br />\n");
|
||||
}
|
||||
buf.append("</code></i><hr />\n");
|
||||
|
@ -363,16 +363,16 @@ class PersistentDataStore extends TransientDataStore {
|
||||
}
|
||||
}
|
||||
} catch (DataFormatException dfe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error reading the routerInfo from " + _routerFile.getName(), dfe);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Error reading the routerInfo from " + _routerFile.getName(), dfe);
|
||||
corrupt = true;
|
||||
} finally {
|
||||
if (fis != null) try { fis.close(); } catch (IOException ioe) {}
|
||||
}
|
||||
if (corrupt) _routerFile.delete();
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Unable to read the router reference in " + _routerFile.getName(), ioe);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Unable to read the router reference in " + _routerFile.getName(), ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -841,15 +841,17 @@ class SearchReplyJob extends JobImpl {
|
||||
if (!sendsBadInfo) {
|
||||
// we don't need to search for everthing we're given here - only ones that
|
||||
// are next in our search path...
|
||||
if (getContext().shitlist().isShitlisted(peer)) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Not looking for a shitlisted peer...");
|
||||
getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
|
||||
} else {
|
||||
// note: no need to think about shitlisted targets in the netdb search, given
|
||||
// the floodfill's behavior
|
||||
//if (getContext().shitlist().isShitlisted(peer)) {
|
||||
// if (_log.shouldLog(Log.INFO))
|
||||
// _log.info("Not looking for a shitlisted peer...");
|
||||
// getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0);
|
||||
//} else {
|
||||
//getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs);
|
||||
//_repliesPendingVerification++;
|
||||
shouldAdd = true;
|
||||
}
|
||||
//}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64());
|
||||
|
@ -21,6 +21,7 @@ import net.i2p.data.Hash;
|
||||
import net.i2p.data.RouterInfo;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.NetworkDatabaseFacade;
|
||||
import net.i2p.router.tunnel.pool.TunnelPeerSelector;
|
||||
import net.i2p.stat.Rate;
|
||||
import net.i2p.stat.RateStat;
|
||||
import net.i2p.util.Log;
|
||||
@ -813,11 +814,16 @@ public class ProfileOrganizer {
|
||||
_log.warn("Peer " + peer.toBase64() + " is marked as hidden, disallowing its use");
|
||||
return false;
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Peer " + peer.toBase64() + " is locally known, allowing its use");
|
||||
// perhaps check to see if they are active?
|
||||
|
||||
return true;
|
||||
boolean exclude = TunnelPeerSelector.shouldExclude(_context, info);
|
||||
if (exclude) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Peer " + peer.toBase64() + " has capabilities or other stats suggesting we avoid it");
|
||||
return false;
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Peer " + peer.toBase64() + " is locally known, allowing its use");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
|
@ -170,8 +170,15 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
|
||||
isNew = true;
|
||||
}
|
||||
*/
|
||||
if ( (name == null) || (port == null) )
|
||||
if ( (name == null) || (port == null) || (name.trim().length() <= 0) || ("null".equals(name)) )
|
||||
return null;
|
||||
try {
|
||||
int p = Integer.parseInt(port);
|
||||
if ( (p <= 0) || (p > 64*1024) )
|
||||
return null;
|
||||
} catch (NumberFormatException nfe) {
|
||||
return null;
|
||||
}
|
||||
props.setProperty(NTCPAddress.PROP_HOST, name);
|
||||
props.setProperty(NTCPAddress.PROP_PORT, port);
|
||||
addr.setOptions(props);
|
||||
|
@ -48,6 +48,7 @@ public class GetBidsJob extends JobImpl {
|
||||
if (log.shouldLog(Log.WARN))
|
||||
log.warn("Attempt to send a message to a shitlisted peer - " + to);
|
||||
//context.messageRegistry().peerFailed(to);
|
||||
context.statManager().addRateData("transport.bidFailShitlisted", msg.getLifetime(), 0);
|
||||
fail(context, msg);
|
||||
return;
|
||||
}
|
||||
@ -56,6 +57,7 @@ public class GetBidsJob extends JobImpl {
|
||||
if (to.equals(us)) {
|
||||
if (log.shouldLog(Log.ERROR))
|
||||
log.error("wtf, send a message to ourselves? nuh uh. msg = " + msg);
|
||||
context.statManager().addRateData("transport.bidFailSelf", msg.getLifetime(), 0);
|
||||
fail(context, msg);
|
||||
return;
|
||||
}
|
||||
@ -64,8 +66,10 @@ public class GetBidsJob extends JobImpl {
|
||||
if (bid == null) {
|
||||
int failedCount = msg.getFailedTransports().size();
|
||||
if (failedCount == 0) {
|
||||
context.statManager().addRateData("transport.bidFailNoTransports", msg.getLifetime(), 0);
|
||||
context.shitlist().shitlistRouter(to, "We share no common transports with them");
|
||||
} else if (failedCount >= facade.getTransportCount()) {
|
||||
context.statManager().addRateData("transport.bidFailAllTransports", msg.getLifetime(), 0);
|
||||
// fail after all transports were unsuccessful
|
||||
context.netDb().fail(to);
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import java.io.IOException;
|
||||
import java.io.Writer;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import net.i2p.data.Hash;
|
||||
|
||||
import net.i2p.data.RouterAddress;
|
||||
import net.i2p.data.RouterInfo;
|
||||
@ -44,4 +45,6 @@ public interface Transport {
|
||||
public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException;
|
||||
public short getReachabilityStatus();
|
||||
public void recheckReachability();
|
||||
|
||||
public boolean isUnreachable(Hash peer);
|
||||
}
|
||||
|
@ -10,13 +10,7 @@ package net.i2p.router.transport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Writer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
import net.i2p.data.Hash;
|
||||
import net.i2p.data.RouterAddress;
|
||||
@ -24,6 +18,7 @@ import net.i2p.data.RouterIdentity;
|
||||
import net.i2p.data.i2np.I2NPMessage;
|
||||
import net.i2p.router.CommSystemFacade;
|
||||
import net.i2p.router.Job;
|
||||
import net.i2p.router.JobImpl;
|
||||
import net.i2p.router.MessageSelector;
|
||||
import net.i2p.router.OutNetMessage;
|
||||
import net.i2p.router.RouterContext;
|
||||
@ -39,6 +34,8 @@ public abstract class TransportImpl implements Transport {
|
||||
private RouterAddress _currentAddress;
|
||||
private List _sendPool;
|
||||
protected RouterContext _context;
|
||||
/** map from routerIdentHash to timestamp (Long) that the peer was last unreachable */
|
||||
private Map _unreachableEntries;
|
||||
|
||||
/**
|
||||
* Initialize the new transport
|
||||
@ -56,6 +53,7 @@ public abstract class TransportImpl implements Transport {
|
||||
_context.statManager().createRateStat("transport.sendProcessingTime", "How long does it take from noticing that we want to send the message to having it completely sent (successfully or failed)?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
|
||||
_context.statManager().createRateStat("transport.expiredOnQueueLifetime", "How long a message that expires on our outbound queue is processed", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } );
|
||||
_sendPool = new ArrayList(16);
|
||||
_unreachableEntries = new HashMap(16);
|
||||
_currentAddress = null;
|
||||
}
|
||||
|
||||
@ -374,6 +372,54 @@ public abstract class TransportImpl implements Transport {
|
||||
public RouterContext getContext() { return _context; }
|
||||
public short getReachabilityStatus() { return CommSystemFacade.STATUS_UNKNOWN; }
|
||||
public void recheckReachability() {}
|
||||
|
||||
private static final long UNREACHABLE_PERIOD = 5*60*1000;
|
||||
public boolean isUnreachable(Hash peer) {
|
||||
long now = _context.clock().now();
|
||||
synchronized (_unreachableEntries) {
|
||||
Long when = (Long)_unreachableEntries.get(peer);
|
||||
if (when == null) return false;
|
||||
if (when.longValue() + UNREACHABLE_PERIOD < now) {
|
||||
_unreachableEntries.remove(peer);
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
/** called when we can't reach a peer */
|
||||
public void markUnreachable(Hash peer) {
|
||||
long now = _context.clock().now();
|
||||
synchronized (_unreachableEntries) {
|
||||
_unreachableEntries.put(peer, new Long(now));
|
||||
}
|
||||
}
|
||||
/** called when we establish a peer connection (outbound or inbound) */
|
||||
public void markReachable(Hash peer) {
|
||||
// if *some* transport can reach them, then we shouldn't shitlist 'em
|
||||
_context.shitlist().unshitlistRouter(peer);
|
||||
synchronized (_unreachableEntries) {
|
||||
_unreachableEntries.remove(peer);
|
||||
}
|
||||
}
|
||||
private class CleanupUnreachable extends JobImpl {
|
||||
public CleanupUnreachable(RouterContext ctx) {
|
||||
super(ctx);
|
||||
}
|
||||
public String getName() { return "Cleanup " + getStyle() + " unreachable list"; }
|
||||
public void runJob() {
|
||||
long now = getContext().clock().now();
|
||||
synchronized (_unreachableEntries) {
|
||||
for (Iterator iter = _unreachableEntries.keySet().iterator(); iter.hasNext(); ) {
|
||||
Hash peer = (Hash)iter.next();
|
||||
Long when = (Long)_unreachableEntries.get(peer);
|
||||
if (when.longValue() + UNREACHABLE_PERIOD < now)
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
requeue(60*1000);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isPubliclyRoutable(byte addr[]) {
|
||||
if (addr.length == 4) {
|
||||
|
@ -44,6 +44,12 @@ public class TransportManager implements TransportEventListener {
|
||||
public TransportManager(RouterContext context) {
|
||||
_context = context;
|
||||
_log = _context.logManager().getLog(TransportManager.class);
|
||||
_context.statManager().createRateStat("transport.shitlistOnUnreachable", "Add a peer to the shitlist since none of the transports can reach them", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("transport.noBidsYetNotAllUnreachable", "Add a peer to the shitlist since none of the transports can reach them", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("transport.bidFailShitlisted", "Could not attempt to bid on message, as they were shitlisted", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("transport.bidFailSelf", "Could not attempt to bid on message, as it targeted ourselves", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("transport.bidFailNoTransports", "Could not attempt to bid on message, as none of the transports could attempt it", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_context.statManager().createRateStat("transport.bidFailAllTransports", "Could not attempt to bid on message, as all of the transports had failed", "Transport", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
|
||||
_transports = new ArrayList();
|
||||
}
|
||||
|
||||
@ -211,10 +217,16 @@ public class TransportManager implements TransportEventListener {
|
||||
}
|
||||
|
||||
public TransportBid getNextBid(OutNetMessage msg) {
|
||||
int unreachableTransports = 0;
|
||||
Hash peer = msg.getTarget().getIdentity().calculateHash();
|
||||
Set failedTransports = msg.getFailedTransports();
|
||||
TransportBid rv = null;
|
||||
for (int i = 0; i < _transports.size(); i++) {
|
||||
Transport t = (Transport)_transports.get(i);
|
||||
if (t.isUnreachable(peer)) {
|
||||
unreachableTransports++;
|
||||
continue;
|
||||
}
|
||||
if (failedTransports.contains(t.getStyle())) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Skipping transport " + t.getStyle() + " as it already failed");
|
||||
@ -233,8 +245,16 @@ public class TransportManager implements TransportEventListener {
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Transport " + t.getStyle() + " did not produce a bid");
|
||||
if (t.isUnreachable(peer))
|
||||
unreachableTransports++;
|
||||
}
|
||||
}
|
||||
if (unreachableTransports >= _transports.size()) {
|
||||
_context.statManager().addRateData("transport.shitlistOnUnreachable", msg.getLifetime(), msg.getLifetime());
|
||||
_context.shitlist().shitlistRouter(peer, "Unreachable on any transport");
|
||||
} else if (rv == null) {
|
||||
_context.statManager().addRateData("transport.noBidsYetNotAllUnreachable", unreachableTransports, msg.getLifetime());
|
||||
}
|
||||
return rv;
|
||||
}
|
||||
|
||||
|
@ -75,6 +75,7 @@ public class EstablishState {
|
||||
private Exception _e;
|
||||
private boolean _verified;
|
||||
private boolean _confirmWritten;
|
||||
private boolean _failedBySkew;
|
||||
|
||||
public EstablishState(RouterContext ctx, NTCPTransport transport, NTCPConnection con) {
|
||||
_context = ctx;
|
||||
@ -131,6 +132,8 @@ public class EstablishState {
|
||||
*/
|
||||
public boolean confirmWritten() { return _confirmWritten; }
|
||||
|
||||
public boolean getFailedBySkew() { return _failedBySkew; }
|
||||
|
||||
/** we are Bob, so receive these bytes as part of an inbound connection */
|
||||
private void receiveInbound(ByteBuffer src) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@ -141,6 +144,7 @@ public class EstablishState {
|
||||
//if (_log.shouldLog(Log.DEBUG)) _log.debug("recv x" + (int)c + " received=" + _received);
|
||||
if (_received >= _X.length) {
|
||||
if (isCheckInfo(_context, _context.routerHash(), _X)) {
|
||||
_context.statManager().addRateData("ntcp.inboundCheckConnection", 1, 0);
|
||||
fail("Incoming connection was a check connection");
|
||||
return;
|
||||
}
|
||||
@ -170,6 +174,7 @@ public class EstablishState {
|
||||
_log.debug(prefix()+"xor=" + Base64.encode(realXor));
|
||||
}
|
||||
if (!DataHelper.eq(realXor, _hX_xor_bobIdentHash)) {
|
||||
_context.statManager().addRateData("ntcp.invalidHXxorBIH", 1, 0);
|
||||
fail("Invalid hX_xor");
|
||||
return;
|
||||
}
|
||||
@ -217,6 +222,7 @@ public class EstablishState {
|
||||
_transport.getPumper().wantsWrite(_con, write);
|
||||
if (!src.hasRemaining()) return;
|
||||
} catch (DHSessionKeyBuilder.InvalidPublicParameterException e) {
|
||||
_context.statManager().addRateData("ntcp.invalidDH", 1, 0);
|
||||
fail("Invalid X", e);
|
||||
return;
|
||||
}
|
||||
@ -306,6 +312,7 @@ public class EstablishState {
|
||||
_log.debug(prefix()+"DH session key calculated (" + _dh.getSessionKey().toBase64() + ")");
|
||||
_e_hXY_tsB = new byte[Hash.HASH_LENGTH+4+12];
|
||||
} catch (DHSessionKeyBuilder.InvalidPublicParameterException e) {
|
||||
_context.statManager().addRateData("ntcp.invalidDH", 1, 0);
|
||||
fail("Invalid X", e);
|
||||
return;
|
||||
}
|
||||
@ -328,6 +335,7 @@ public class EstablishState {
|
||||
Hash h = _context.sha().calculateHash(XY);
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "h(XY)=" + h.toBase64());
|
||||
if (!DataHelper.eq(h.getData(), 0, hXY_tsB, 0, Hash.HASH_LENGTH)) {
|
||||
_context.statManager().addRateData("ntcp.invalidHXY", 1, 0);
|
||||
fail("Invalid H(X+Y) - mitm attack attempted?");
|
||||
return;
|
||||
}
|
||||
@ -335,6 +343,19 @@ public class EstablishState {
|
||||
_tsA = _context.clock().now()/1000;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(prefix()+"h(X+Y) is correct, tsA-tsB=" + (_tsA-_tsB));
|
||||
|
||||
// the skew is not authenticated yet, but it is certainly fatal to
|
||||
// the establishment, so fail hard if appropriate
|
||||
long diff = 1000*Math.abs(_tsA-_tsB);
|
||||
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
|
||||
_context.statManager().addRateData("ntcp.invalidOutboundSkew", diff, 0);
|
||||
_transport.markReachable(_con.getRemotePeer().calculateHash());
|
||||
_context.shitlist().shitlistRouter(_con.getRemotePeer().calculateHash(), "Outbound clock skew of " + diff);
|
||||
fail("Clocks too skewed (" + diff + ")", null, true);
|
||||
return;
|
||||
} else if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug(prefix()+"Clock skew: " + diff);
|
||||
}
|
||||
|
||||
// now prepare and send our response
|
||||
// send E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB), sk, hX_xor_Bob.identHash[16:31])
|
||||
@ -421,6 +442,7 @@ public class EstablishState {
|
||||
|
||||
_verified = _context.dsa().verifySignature(sig, toVerify, _con.getRemotePeer().getSigningPublicKey());
|
||||
if (!_verified) {
|
||||
_context.statManager().addRateData("ntcp.invalidSignature", 1, 0);
|
||||
fail("Signature was invalid - attempt to spoof " + _con.getRemotePeer().calculateHash().toBase64() + "?");
|
||||
return;
|
||||
} else {
|
||||
@ -478,6 +500,7 @@ public class EstablishState {
|
||||
RouterIdentity alice = new RouterIdentity();
|
||||
int sz = (int)DataHelper.fromLong(b, 0, 2);
|
||||
if ( (sz <= 0) || (sz > b.length-2-4-Signature.SIGNATURE_BYTES) ) {
|
||||
_context.statManager().addRateData("ntcp.invalidInboundSize", sz, 0);
|
||||
fail("size is invalid", new Exception("size is " + sz));
|
||||
return;
|
||||
}
|
||||
@ -486,14 +509,6 @@ public class EstablishState {
|
||||
alice.fromByteArray(aliceData);
|
||||
long tsA = DataHelper.fromLong(b, 2+sz, 4);
|
||||
|
||||
long diff = 1000*Math.abs(tsA-_tsB);
|
||||
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
|
||||
fail("Clocks too skewed (" + diff + ")");
|
||||
return;
|
||||
} else if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug(prefix()+"Clock skew: " + diff);
|
||||
}
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(768);
|
||||
baos.write(_X);
|
||||
baos.write(_Y);
|
||||
@ -515,6 +530,18 @@ public class EstablishState {
|
||||
if (_verified) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(prefix() + "verification successful for " + _con);
|
||||
|
||||
long diff = 1000*Math.abs(tsA-_tsB);
|
||||
if (diff >= Router.CLOCK_FUDGE_FACTOR) {
|
||||
_context.statManager().addRateData("ntcp.invalidInboundSkew", diff, 0);
|
||||
_transport.markReachable(alice.calculateHash());
|
||||
_context.shitlist().shitlistRouter(alice.calculateHash(), "Clock skew of " + diff);
|
||||
fail("Clocks too skewed (" + diff + ")", null, true);
|
||||
return;
|
||||
} else if (_log.shouldLog(Log.DEBUG)) {
|
||||
_log.debug(prefix()+"Clock skew: " + diff);
|
||||
}
|
||||
|
||||
sendInboundConfirm(alice, tsA);
|
||||
_con.setRemotePeer(alice);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
@ -525,11 +552,14 @@ public class EstablishState {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(prefix()+"Verified remote peer as " + alice.calculateHash().toBase64());
|
||||
} else {
|
||||
_context.statManager().addRateData("ntcp.invalidInboundSignature", 1, 0);
|
||||
fail("Peer verification failed - spoof of " + alice.calculateHash().toBase64() + "?");
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
_context.statManager().addRateData("ntcp.invalidInboundIOE", 1, 0);
|
||||
fail("Error verifying peer", ioe);
|
||||
} catch (DataFormatException dfe) {
|
||||
_context.statManager().addRateData("ntcp.invalidInboundDFE", 1, 0);
|
||||
fail("Error verifying peer", dfe);
|
||||
}
|
||||
}
|
||||
@ -578,8 +608,10 @@ public class EstablishState {
|
||||
public byte[] getExtraBytes() { return _extra; }
|
||||
|
||||
private void fail(String reason) { fail(reason, null); }
|
||||
private void fail(String reason, Exception e) {
|
||||
private void fail(String reason, Exception e) { fail(reason, e, false); }
|
||||
private void fail(String reason, Exception e, boolean bySkew) {
|
||||
_corrupt = true;
|
||||
_failedBySkew = bySkew;
|
||||
_err = reason;
|
||||
_e = e;
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
|
@ -33,10 +33,13 @@ public class EventPumper implements Runnable {
|
||||
private static final int BUF_SIZE = 8*1024;
|
||||
private static final int MAX_CACHE_SIZE = 64;
|
||||
/**
|
||||
* every 30s or so, iterate across all ntcp connections just to make sure
|
||||
* we have their interestOps set properly (and to expire any looong idle cons)
|
||||
* every few seconds, iterate across all ntcp connections just to make sure
|
||||
* we have their interestOps set properly (and to expire any looong idle cons).
|
||||
* as the number of connections grows, we should try to make this happen
|
||||
* less frequently (or not at all), but while the connection count is small,
|
||||
* the time to iterate across them to check a few flags shouldn't be a problem.
|
||||
*/
|
||||
private static final long FAILSAFE_ITERATION_FREQ = 60*1000l;
|
||||
private static final long FAILSAFE_ITERATION_FREQ = 2*1000l;
|
||||
|
||||
public EventPumper(RouterContext ctx, NTCPTransport transport) {
|
||||
_context = ctx;
|
||||
@ -75,6 +78,7 @@ public class EventPumper implements Runnable {
|
||||
}
|
||||
public void registerConnect(NTCPConnection con) {
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering outbound connection");
|
||||
_context.statManager().addRateData("ntcp.registerConnect", 1, 0);
|
||||
synchronized (_wantsConRegister) { _wantsConRegister.add(con); }
|
||||
_selector.wakeup();
|
||||
}
|
||||
@ -212,8 +216,10 @@ public class EventPumper implements Runnable {
|
||||
+ "/" + ((key.interestOps()&SelectionKey.OP_READ)!= 0)
|
||||
+ " write? " + write
|
||||
+ "/" + ((key.interestOps()&SelectionKey.OP_WRITE)!= 0)
|
||||
+ " on " + key.attachment()
|
||||
);
|
||||
if (accept) {
|
||||
_context.statManager().addRateData("ntcp.accept", 1, 0);
|
||||
processAccept(key);
|
||||
}
|
||||
if (connect) {
|
||||
@ -221,10 +227,12 @@ public class EventPumper implements Runnable {
|
||||
processConnect(key);
|
||||
}
|
||||
if (read) {
|
||||
_context.statManager().addRateData("ntcp.read", 1, 0);
|
||||
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
|
||||
processRead(key);
|
||||
}
|
||||
if (write) {
|
||||
_context.statManager().addRateData("ntcp.write", 1, 0);
|
||||
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
|
||||
processWrite(key);
|
||||
}
|
||||
@ -241,6 +249,7 @@ public class EventPumper implements Runnable {
|
||||
if (req.getPendingOutboundRequested() > 0) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("queued write on " + con + " for " + data.length);
|
||||
_context.statManager().addRateData("ntcp.wantsQueuedWrite", 1, 0);
|
||||
con.queuedWrite(buf, req);
|
||||
} else {
|
||||
// fully allocated
|
||||
@ -290,7 +299,8 @@ public class EventPumper implements Runnable {
|
||||
rv = ByteBuffer.allocate(BUF_SIZE);
|
||||
NUM_BUFS = ++__liveBufs;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
|
||||
_log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
|
||||
_context.statManager().addRateData("ntcp.liveReadBufs", NUM_BUFS, 0);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("acquiring existing read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
|
||||
@ -351,11 +361,15 @@ public class EventPumper implements Runnable {
|
||||
if (connected) {
|
||||
con.setKey(key);
|
||||
con.outboundConnected();
|
||||
_context.statManager().addRateData("ntcp.connectSuccessful", 1, 0);
|
||||
} else {
|
||||
con.close();
|
||||
_context.statManager().addRateData("ntcp.connectFailedTimeout", 1, 0);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug("Error processing connection", ioe);
|
||||
con.close();
|
||||
_context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1, 0);
|
||||
} catch (NoConnectionPendingException ncpe) {
|
||||
// ignore
|
||||
}
|
||||
@ -368,9 +382,12 @@ public class EventPumper implements Runnable {
|
||||
int read = con.getChannel().read(buf);
|
||||
if (read == -1) {
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con);
|
||||
_context.statManager().addRateData("ntcp.readEOF", 1, 0);
|
||||
con.close();
|
||||
releaseBuf(buf);
|
||||
} else if (read == 0) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("nothing to read for " + con + ", but stay interested");
|
||||
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
|
||||
releaseBuf(buf);
|
||||
} else if (read > 0) {
|
||||
@ -382,16 +399,27 @@ public class EventPumper implements Runnable {
|
||||
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read", null, null); //con, buf);
|
||||
if (req.getPendingInboundRequested() > 0) {
|
||||
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("bw throttled reading for " + con + ", so we don't want to read anymore");
|
||||
_context.statManager().addRateData("ntcp.queuedRecv", read, 0);
|
||||
con.queuedRecv(rbuf, req);
|
||||
} else {
|
||||
// fully allocated
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("not bw throttled reading for " + con);
|
||||
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
|
||||
con.recv(rbuf);
|
||||
}
|
||||
}
|
||||
} catch (CancelledKeyException cke) {
|
||||
if (_log.shouldLog(Log.WARN)) _log.warn("error reading", cke);
|
||||
con.close();
|
||||
_context.statManager().addRateData("ntcp.readError", 1, 0);
|
||||
if (buf != null) releaseBuf(buf);
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.WARN)) _log.warn("error reading", ioe);
|
||||
con.close();
|
||||
_context.statManager().addRateData("ntcp.readError", 1, 0);
|
||||
if (buf != null) releaseBuf(buf);
|
||||
} catch (NotYetConnectedException nyce) {
|
||||
// ???
|
||||
@ -406,13 +434,23 @@ public class EventPumper implements Runnable {
|
||||
try {
|
||||
while (true) {
|
||||
ByteBuffer buf = con.getNextWriteBuf();
|
||||
if ( (buf != null) && (buf.remaining() > 0) ) {
|
||||
if (buf != null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("writing " + buf.remaining()+"...");
|
||||
if (buf.remaining() <= 0) {
|
||||
long beforeRem = System.currentTimeMillis();
|
||||
con.removeWriteBuf(buf);
|
||||
long afterRem = System.currentTimeMillis();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("buffer was already fully written and removed after " + (afterRem-beforeRem) + "...");
|
||||
buf = null;
|
||||
buffers++;
|
||||
continue;
|
||||
}
|
||||
int written = con.getChannel().write(buf);
|
||||
totalWritten += written;
|
||||
if (written == 0) {
|
||||
if ( (buf.remaining() > 0) || (con.getWriteBufCount() > 1) ) {
|
||||
if ( (buf.remaining() > 0) || (con.getWriteBufCount() >= 1) ) {
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains...");
|
||||
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
|
||||
} else {
|
||||
@ -439,8 +477,13 @@ public class EventPumper implements Runnable {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (CancelledKeyException cke) {
|
||||
if (_log.shouldLog(Log.WARN)) _log.warn("error writing", cke);
|
||||
_context.statManager().addRateData("ntcp.writeError", 1, 0);
|
||||
con.close();
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.WARN)) _log.warn("error writing", ioe);
|
||||
_context.statManager().addRateData("ntcp.writeError", 1, 0);
|
||||
con.close();
|
||||
}
|
||||
long after = System.currentTimeMillis();
|
||||
@ -459,7 +502,11 @@ public class EventPumper implements Runnable {
|
||||
while (buf.size() > 0) {
|
||||
NTCPConnection con = (NTCPConnection)buf.remove(0);
|
||||
SelectionKey key = con.getKey();
|
||||
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
|
||||
try {
|
||||
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
|
||||
} catch (CancelledKeyException cke) {
|
||||
// ignore, we remove/etc elsewhere
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (_wantsWrite) {
|
||||
@ -512,18 +559,34 @@ public class EventPumper implements Runnable {
|
||||
InetSocketAddress saddr = new InetSocketAddress(naddr.getHost(), naddr.getPort());
|
||||
boolean connected = con.getChannel().connect(saddr);
|
||||
if (connected) {
|
||||
_context.statManager().addRateData("ntcp.connectImmediate", 1, 0);
|
||||
key.interestOps(SelectionKey.OP_READ);
|
||||
processConnect(key);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.WARN)) _log.warn("error connecting", ioe);
|
||||
if (ntcpOnly(con)) {
|
||||
_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage());
|
||||
con.close(false);
|
||||
} else {
|
||||
_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage(), NTCPTransport.STYLE);
|
||||
_context.statManager().addRateData("ntcp.connectFailedIOE", 1, 0);
|
||||
_transport.markUnreachable(con.getRemotePeer().calculateHash());
|
||||
//if (ntcpOnly(con)) {
|
||||
// _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage());
|
||||
// con.close(false);
|
||||
//} else {
|
||||
// _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage(), NTCPTransport.STYLE);
|
||||
con.close(true);
|
||||
}
|
||||
//}
|
||||
} catch (UnresolvedAddressException uae) {
|
||||
if (_log.shouldLog(Log.WARN)) _log.warn("unresolved address connecting", uae);
|
||||
_context.statManager().addRateData("ntcp.connectFailedUnresolved", 1, 0);
|
||||
_transport.markUnreachable(con.getRemotePeer().calculateHash());
|
||||
//if (ntcpOnly(con)) {
|
||||
// _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage());
|
||||
// con.close(false);
|
||||
//} else {
|
||||
// _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage(), NTCPTransport.STYLE);
|
||||
con.close(true);
|
||||
//}
|
||||
} catch (CancelledKeyException cke) {
|
||||
con.close(false);
|
||||
}
|
||||
} catch (ClosedChannelException cce) {
|
||||
if (_log.shouldLog(Log.WARN)) _log.warn("Error registering", cce);
|
||||
|
@ -96,8 +96,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
private boolean _sendingMeta;
|
||||
/** how many consecutive sends were failed due to (estimated) send queue time */
|
||||
private int _consecutiveBacklog;
|
||||
private long _nextInfoTime;
|
||||
|
||||
private static final int META_FREQUENCY = 10*60*1000;
|
||||
private static final int INFO_FREQUENCY = 6*60*60*1000;
|
||||
|
||||
/**
|
||||
* Create an inbound connected (though not established) NTCP connection
|
||||
@ -181,6 +183,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
_transport.inboundEstablished(this);
|
||||
_establishState = null;
|
||||
_nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY);
|
||||
_nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY);
|
||||
}
|
||||
public long getClockSkew() { return _clockSkew; }
|
||||
public long getUptime() {
|
||||
@ -199,8 +202,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
return queued;
|
||||
}
|
||||
}
|
||||
public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; }
|
||||
public long getTimeSinceReceive() { return System.currentTimeMillis()-_lastReceiveTime; }
|
||||
public long getTimeSinceSend() { return _lastSendTime <= 0 ? 0 : System.currentTimeMillis()-_lastSendTime; }
|
||||
public long getTimeSinceReceive() { return _lastReceiveTime <= 0 ? 0 : System.currentTimeMillis()-_lastReceiveTime; }
|
||||
public long getTimeSinceCreated() { return System.currentTimeMillis()-_created; }
|
||||
public int getConsecutiveBacklog() { return _consecutiveBacklog; }
|
||||
|
||||
@ -233,6 +236,16 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
}
|
||||
for (int i = 0; i < msgs.size(); i++) {
|
||||
OutNetMessage msg = (OutNetMessage)msgs.get(i);
|
||||
Object buf = msg.releasePreparationBuffer();
|
||||
if (buf != null)
|
||||
releaseBuf((PrepBuffer)buf);
|
||||
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
|
||||
}
|
||||
OutNetMessage msg = _currentOutbound;
|
||||
if (msg != null) {
|
||||
Object buf = msg.releasePreparationBuffer();
|
||||
if (buf != null)
|
||||
releaseBuf((PrepBuffer)buf);
|
||||
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
|
||||
}
|
||||
}
|
||||
@ -253,22 +266,26 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
synchronized (_writeBufs) { blocks = _writeBufs.size(); }
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash().toBase64());
|
||||
_context.statManager().addRateData("ntcp.closeOnBacklog", getUptime(), getUptime());
|
||||
close();
|
||||
}
|
||||
_context.statManager().addRateData("ntcp.dontSendOnBacklog", _consecutiveBacklog, msg.getLifetime());
|
||||
return;
|
||||
}
|
||||
_consecutiveBacklog = 0;
|
||||
int enqueued = 0;
|
||||
if (FAST_LARGE)
|
||||
bufferedPrepare(msg);
|
||||
boolean noOutbound = false;
|
||||
synchronized (_outbound) {
|
||||
_outbound.add(msg);
|
||||
enqueued = _outbound.size();
|
||||
msg.setQueueSize(enqueued);
|
||||
noOutbound = (_currentOutbound == null);
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
|
||||
if (_established && _currentOutbound == null)
|
||||
_transport.getWriter().wantsWrite(this);
|
||||
if (_established && noOutbound)
|
||||
_transport.getWriter().wantsWrite(this, "enqueued");
|
||||
}
|
||||
|
||||
private long queueTime() {
|
||||
@ -289,8 +306,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
long queueTime = queueTime();
|
||||
if (queueTime <= 0) return false;
|
||||
int size = 0;
|
||||
boolean currentOutboundSet = false;
|
||||
synchronized (_outbound) {
|
||||
size = _outbound.size();
|
||||
currentOutboundSet = (_currentOutbound != null);
|
||||
}
|
||||
|
||||
// perhaps we could take into account the size of the queued messages too, our
|
||||
@ -299,8 +318,13 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
if (getUptime() < 10*1000) // allow some slack just after establishment
|
||||
return false;
|
||||
if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime...
|
||||
int writeBufs = 0;
|
||||
synchronized (_writeBufs) { writeBufs = _writeBufs.size(); }
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size);
|
||||
_log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size
|
||||
+ ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
|
||||
+ ", currentOut set? " + currentOutboundSet
|
||||
+ ", writeBufs: " + writeBufs + " on " + toString());
|
||||
_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime, size);
|
||||
return true;
|
||||
//} else if (size > 32) { // another arbitrary limit.
|
||||
@ -324,6 +348,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
if (target != null) {
|
||||
infoMsg.setTarget(target);
|
||||
infoMsg.beginSend();
|
||||
_context.statManager().addRateData("ntcp.infoMessageEnqueued", 1, 0);
|
||||
send(infoMsg);
|
||||
} else {
|
||||
if (_isInbound) {
|
||||
@ -351,16 +376,20 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
_established = true;
|
||||
_establishedOn = System.currentTimeMillis();
|
||||
_establishState = null;
|
||||
_context.shitlist().unshitlistRouter(getRemotePeer().calculateHash());
|
||||
_transport.markReachable(getRemotePeer().calculateHash());
|
||||
//_context.shitlist().unshitlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE);
|
||||
boolean msgs = false;
|
||||
synchronized (_outbound) {
|
||||
msgs = (_outbound.size() > 0);
|
||||
}
|
||||
_nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY);
|
||||
_nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY);
|
||||
if (msgs)
|
||||
_transport.getWriter().wantsWrite(this);
|
||||
_transport.getWriter().wantsWrite(this, "outbound established");
|
||||
}
|
||||
|
||||
public boolean getIsInbound() { return _isInbound; }
|
||||
|
||||
// Time vs space tradeoff:
|
||||
// on slow GCing jvms, the mallocs in the following preparation can cause the
|
||||
// write to get congested, taking up a substantial portion of the Writer's
|
||||
@ -474,6 +503,14 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
// + Base64.encode(encrypted, 0, 16) + "...\ndecrypted: "
|
||||
// + Base64.encode(unencrypted, 0, 16) + "..." + "\nIV=" + Base64.encode(_prevWriteEnd, 0, 16));
|
||||
_transport.getPumper().wantsWrite(this, encrypted);
|
||||
|
||||
// for every 6-12 hours that we are connected to a peer, send them
|
||||
// our updated netDb info (they may not accept it and instead query
|
||||
// the floodfill netDb servers, but they may...)
|
||||
if (_nextInfoTime <= System.currentTimeMillis()) {
|
||||
enqueueInfoMessage();
|
||||
_nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -537,6 +574,15 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
+ " encrypted=" + (encryptedTime-begin)
|
||||
+ " wantsWrite=" + (wantsTime-encryptedTime)
|
||||
+ " releaseBuf=" + (releaseTime-wantsTime));
|
||||
|
||||
// for every 6-12 hours that we are connected to a peer, send them
|
||||
// our updated netDb info (they may not accept it and instead query
|
||||
// the floodfill netDb servers, but they may...)
|
||||
if (_nextInfoTime <= System.currentTimeMillis()) {
|
||||
// perhaps this should check to see if we are bw throttled, etc?
|
||||
enqueueInfoMessage();
|
||||
_nextInfoTime = System.currentTimeMillis() + INFO_FREQUENCY + _context.random().nextInt(INFO_FREQUENCY);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -608,6 +654,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
NUM_PREP_BUFS = ++__liveBufs;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("creating a new prep buffer with " + __liveBufs + " live");
|
||||
_context.statManager().addRateData("ntcp.prepBufCache", NUM_PREP_BUFS, 0);
|
||||
b.acquired();
|
||||
return b;
|
||||
}
|
||||
@ -675,16 +722,24 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
_conKey.interestOps(SelectionKey.OP_READ);
|
||||
// schedule up the beginning of our handshaking by calling prepareNextWrite on the
|
||||
// writer thread pool
|
||||
_transport.getWriter().wantsWrite(this);
|
||||
_transport.getWriter().wantsWrite(this, "outbound connected");
|
||||
}
|
||||
|
||||
public void complete(FIFOBandwidthLimiter.Request req) {
|
||||
removeRequest(req);
|
||||
ByteBuffer buf = (ByteBuffer)req.attachment();
|
||||
if (req.getTotalInboundRequested() > 0)
|
||||
if (req.getTotalInboundRequested() > 0) {
|
||||
_context.statManager().addRateData("ntcp.throttledReadComplete", (System.currentTimeMillis()-req.getRequestTime()), 0);
|
||||
recv(buf);
|
||||
else if (req.getTotalOutboundRequested() > 0)
|
||||
// our reads used to be bw throttled (during which time we were no
|
||||
// longer interested in reading from the network), but we aren't
|
||||
// throttled anymore, so we should resume being interested in reading
|
||||
_transport.getPumper().wantsRead(this);
|
||||
//_transport.getReader().wantsRead(this);
|
||||
} else if (req.getTotalOutboundRequested() > 0) {
|
||||
_context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime()), 0);
|
||||
write(buf);
|
||||
}
|
||||
}
|
||||
private void removeRequest(FIFOBandwidthLimiter.Request req) {
|
||||
synchronized (_bwRequests) { _bwRequests.remove(req); }
|
||||
@ -739,6 +794,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
synchronized (_writeBufs) {
|
||||
_writeBufs.add(buf);
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)");
|
||||
_transport.getPumper().wantsWrite(this);
|
||||
}
|
||||
|
||||
@ -760,7 +816,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
public ByteBuffer getNextWriteBuf() {
|
||||
synchronized (_writeBufs) {
|
||||
if (_writeBufs.size() > 0)
|
||||
return (ByteBuffer)_writeBufs.remove(0);
|
||||
return (ByteBuffer)_writeBufs.get(0); // not remove! we removeWriteBuf afterwards
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -768,30 +824,38 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
public void removeWriteBuf(ByteBuffer buf) {
|
||||
_bytesSent += buf.capacity();
|
||||
OutNetMessage msg = null;
|
||||
boolean bufsRemain = false;
|
||||
boolean clearMessage = false;
|
||||
synchronized (_writeBufs) {
|
||||
if (_sendingMeta && (buf.capacity() == _meta.length)) {
|
||||
_sendingMeta = false;
|
||||
} else {
|
||||
if (_currentOutbound != null)
|
||||
clearMessage = true;
|
||||
}
|
||||
_writeBufs.remove(buf);
|
||||
bufsRemain = _writeBufs.size() > 0;
|
||||
}
|
||||
if (clearMessage) {
|
||||
synchronized (_outbound) {
|
||||
if (_currentOutbound != null)
|
||||
msg = _currentOutbound;
|
||||
_currentOutbound = null;
|
||||
}
|
||||
_writeBufs.remove(buf);
|
||||
}
|
||||
if (msg != null) {
|
||||
_lastSendTime = System.currentTimeMillis();
|
||||
_context.statManager().addRateData("ntcp.sendTime", msg.getSendTime(), msg.getSendTime());
|
||||
_context.statManager().addRateData("ntcp.transmitTime", msg.getTransmissionTime(), msg.getTransmissionTime());
|
||||
_context.statManager().addRateData("ntcp.sendQueueSize", msg.getQueueSize(), msg.getLifetime());
|
||||
if (_log.shouldLog(Log.INFO)) {
|
||||
_log.info("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after "
|
||||
+ msg.getSendTime() + "/" + msg.getTransmissionTime() + "/"
|
||||
+ msg.getPreparationTime() + "/" + msg.getLifetime()
|
||||
+ " queued after " + msg.getQueueSize()
|
||||
+ " with " + buf.capacity() + " bytes (uid=" + System.identityHashCode(msg)+")");
|
||||
if (msg != null) {
|
||||
_lastSendTime = System.currentTimeMillis();
|
||||
_context.statManager().addRateData("ntcp.sendTime", msg.getSendTime(), msg.getSendTime());
|
||||
_context.statManager().addRateData("ntcp.transmitTime", msg.getTransmissionTime(), msg.getTransmissionTime());
|
||||
_context.statManager().addRateData("ntcp.sendQueueSize", msg.getQueueSize(), msg.getLifetime());
|
||||
if (_log.shouldLog(Log.INFO)) {
|
||||
_log.info("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after "
|
||||
+ msg.getSendTime() + "/" + msg.getTransmissionTime() + "/"
|
||||
+ msg.getPreparationTime() + "/" + msg.getLifetime()
|
||||
+ " queued after " + msg.getQueueSize()
|
||||
+ " with " + buf.capacity() + " bytes (uid=" + System.identityHashCode(msg)+" on " + toString() + ")");
|
||||
}
|
||||
_messagesWritten++;
|
||||
_transport.sendComplete(msg);
|
||||
}
|
||||
_messagesWritten++;
|
||||
_transport.sendComplete(msg);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("I2NP meta message sent completely");
|
||||
@ -801,8 +865,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
synchronized (_outbound) {
|
||||
msgs = ((_outbound.size() > 0) || (_currentOutbound != null));
|
||||
}
|
||||
if (msgs)
|
||||
_transport.getWriter().wantsWrite(this);
|
||||
if (msgs) // push through the bw limiter to reach _writeBufs
|
||||
_transport.getWriter().wantsWrite(this, "write completed");
|
||||
if (bufsRemain) // send asap
|
||||
_transport.getPumper().wantsWrite(this);
|
||||
updateStats();
|
||||
}
|
||||
|
||||
@ -879,6 +945,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
if (!ok) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Read buffer " + System.identityHashCode(buf) + " contained corrupt data");
|
||||
_context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1, getUptime());
|
||||
return;
|
||||
}
|
||||
byte swap[] = _prevReadBlock;
|
||||
@ -895,6 +962,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
if (_curReadState.getSize() > 16*1024) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("i2np message more than 16KB? nuh uh: " + _curReadState.getSize());
|
||||
_context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _curReadState.getSize(), getUptime());
|
||||
close();
|
||||
return false;
|
||||
} else {
|
||||
@ -922,6 +990,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
if (read != expected) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("I2NP metadata message had a bad CRC value");
|
||||
_context.statManager().addRateData("ntcp.corruptMetaCRC", 1, getUptime());
|
||||
close();
|
||||
return;
|
||||
} else {
|
||||
@ -929,9 +998,11 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
if ( (newSkew > Router.CLOCK_FUDGE_FACTOR) || (newSkew < 0-Router.CLOCK_FUDGE_FACTOR) ) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Peer's skew jumped too far (from " + _clockSkew + " to " + newSkew + "): " + toString());
|
||||
_context.statManager().addRateData("ntcp.corruptSkew", newSkew, getUptime());
|
||||
close();
|
||||
return;
|
||||
}
|
||||
_context.statManager().addRateData("ntcp.receiveMeta", newSkew, getUptime());
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Received NTCP metadata, old skew of " + _clockSkew + ", new skew of " + newSkew);
|
||||
_clockSkew = newSkew;
|
||||
@ -955,6 +1026,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
_log.debug("Sending NTCP metadata");
|
||||
_sendingMeta = true;
|
||||
_transport.getPumper().wantsWrite(this, encrypted);
|
||||
// enqueueInfoMessage(); // this often?
|
||||
}
|
||||
|
||||
public int hashCode() { return System.identityHashCode(this); }
|
||||
@ -1118,7 +1190,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
releaseHandler(h);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("I2NP message " + _messagesRead + "/" + (read != null ? read.getUniqueId() : 0)
|
||||
+ " received after " + timeToRecv + " with " + _size +"/"+ (_blocks*16) + " bytes");
|
||||
+ " received after " + timeToRecv + " with " + _size +"/"+ (_blocks*16) + " bytes on " + toString());
|
||||
_context.statManager().addRateData("ntcp.receiveTime", timeToRecv, timeToRecv);
|
||||
_context.statManager().addRateData("ntcp.receiveSize", _size, timeToRecv);
|
||||
if (read != null) {
|
||||
@ -1133,17 +1205,20 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Error parsing I2NP message", ioe);
|
||||
_context.statManager().addRateData("ntcp.corruptI2NPIOE", 1, getUptime());
|
||||
close();
|
||||
return;
|
||||
} catch (I2NPMessageException ime) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Error parsing I2NP message", ime);
|
||||
_context.statManager().addRateData("ntcp.corruptI2NPIME", 1, getUptime());
|
||||
close();
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " blocks " + _blocks);
|
||||
_context.statManager().addRateData("ntcp.corruptI2NPCRC", 1, getUptime());
|
||||
close();
|
||||
return;
|
||||
}
|
||||
|
@ -62,7 +62,57 @@ public class NTCPTransport extends TransportImpl {
|
||||
_context.statManager().createRateStat("ntcp.sendBacklogTime", "How long the head of the send queue has been waiting when we fail to add a new one to the queue (period is the number of messages queued)", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.failsafeWrites", "How many times do we need to proactively add in an extra nio write to a peer at any given failsafe pass?", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.failsafeCloses", "How many times do we need to proactively close an idle connection to a peer at any given failsafe pass?", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
|
||||
_context.statManager().createRateStat("ntcp.accept", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.attemptShitlistedPeer", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.attemptUnreachablePeer", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.closeOnBacklog", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.connectFailedIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.connectFailedInvalidPort", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.bidRejectedNoNTCPAddress", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.connectFailedTimeout", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.connectFailedTimeoutIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.connectFailedUnresolved", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.connectImmediate", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.connectSuccessful", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.corruptDecryptedI2NP", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.corruptI2NPCRC", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.corruptI2NPIME", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.corruptI2NPIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.corruptMetaCRC", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.corruptSkew", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.corruptTooLargeI2NP", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.dontSendOnBacklog", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.inboundCheckConnection", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.inboundEstablished", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.inboundEstablishedDuplicate", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.infoMessageEnqueued", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.invalidDH", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.invalidHXY", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.invalidHXxorBIH", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.invalidInboundDFE", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.invalidInboundIOE", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.invalidInboundSignature", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.invalidInboundSize", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.invalidInboundSkew", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.invalidSignature", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.liveReadBufs", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.multipleCloseOnRemove", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.outboundEstablishFailed", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.outboundFailedIOEImmediate", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.invalidOutboundSkew", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.prepBufCache", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.queuedRecv", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.read", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.readError", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.receiveCorruptEstablishment", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.receiveMeta", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.registerConnect", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.throttledReadComplete", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.throttledWriteComplete", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.wantsQueuedWrite", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.write", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_context.statManager().createRateStat("ntcp.writeError", "", "ntcp", new long[] { 60*1000, 10*60*1000 });
|
||||
_establishing = new ArrayList(4);
|
||||
_conLock = new Object();
|
||||
_conByIdent = new HashMap(64);
|
||||
@ -79,7 +129,9 @@ public class NTCPTransport extends TransportImpl {
|
||||
}
|
||||
|
||||
void inboundEstablished(NTCPConnection con) {
|
||||
_context.shitlist().unshitlistRouter(con.getRemotePeer().calculateHash());
|
||||
_context.statManager().addRateData("ntcp.inboundEstablished", 1, 0);
|
||||
markReachable(con.getRemotePeer().calculateHash());
|
||||
//_context.shitlist().unshitlistRouter(con.getRemotePeer().calculateHash());
|
||||
NTCPConnection old = null;
|
||||
synchronized (_conLock) {
|
||||
old = (NTCPConnection)_conByIdent.put(con.getRemotePeer().calculateHash(), con);
|
||||
@ -87,6 +139,7 @@ public class NTCPTransport extends TransportImpl {
|
||||
if (old != null) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Old connection closed: " + old + " replaced by " + con);
|
||||
_context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", old.getUptime(), 0);
|
||||
old.close();
|
||||
}
|
||||
}
|
||||
@ -127,6 +180,7 @@ public class NTCPTransport extends TransportImpl {
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error opening a channel", ioe);
|
||||
_context.statManager().addRateData("ntcp.outboundFailedIOEImmediate", 1, 0);
|
||||
con.close();
|
||||
}
|
||||
} else {
|
||||
@ -152,8 +206,8 @@ public class NTCPTransport extends TransportImpl {
|
||||
old = (NTCPConnection)_conByIdent.put(ih, con);
|
||||
}
|
||||
if (old != null) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Multiple connections on out ready, closing " + old + " and keeping " + con);
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Multiple connections on out ready, closing " + old + " and keeping " + con);
|
||||
old.close();
|
||||
}
|
||||
con.enqueueInfoMessage(); // enqueues a netDb store of our own info
|
||||
@ -181,23 +235,49 @@ public class NTCPTransport extends TransportImpl {
|
||||
super.afterSend(msg, sendSuccessful, allowRequeue, msToSend);
|
||||
}
|
||||
public TransportBid bid(RouterInfo toAddress, long dataSize) {
|
||||
if (_context.shitlist().isShitlisted(toAddress.getIdentity().calculateHash(), STYLE)) {
|
||||
Hash peer = toAddress.getIdentity().calculateHash();
|
||||
if (_context.shitlist().isShitlisted(peer, STYLE)) {
|
||||
// we aren't shitlisted in general (since we are trying to get a bid), but we have
|
||||
// recently shitlisted the peer on the NTCP transport, so don't try it
|
||||
_context.statManager().addRateData("ntcp.attemptShitlistedPeer", 1, 0);
|
||||
return null;
|
||||
} else if (isUnreachable(peer)) {
|
||||
_context.statManager().addRateData("ntcp.attemptUnreachablePeer", 1, 0);
|
||||
return null;
|
||||
}
|
||||
|
||||
boolean established = isEstablished(toAddress.getIdentity());
|
||||
if (established) { // should we check the queue size? nah, if its valid, use it
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("fast bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64() + " as its already established");
|
||||
return _fastBid;
|
||||
}
|
||||
RouterAddress addr = toAddress.getTargetAddress(STYLE);
|
||||
|
||||
if (addr == null) {
|
||||
markUnreachable(peer);
|
||||
_context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1, 0);
|
||||
//_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "No NTCP address", STYLE);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("no bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64() + " as they don't have an ntcp address");
|
||||
return null;
|
||||
}
|
||||
NTCPAddress naddr = new NTCPAddress(addr);
|
||||
if ( (naddr.getPort() <= 0) || (naddr.getHost() == null) ) {
|
||||
_context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1, 0);
|
||||
markUnreachable(peer);
|
||||
//_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "Invalid NTCP address", STYLE);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("no bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64() + " as they don't have a valid ntcp address");
|
||||
return null;
|
||||
}
|
||||
|
||||
//if ( (_myAddress != null) && (_myAddress.equals(addr)) )
|
||||
// return null; // dont talk to yourself
|
||||
|
||||
boolean established = isEstablished(toAddress.getIdentity());
|
||||
if (established) // should we check the queue size? nah, if its valid, use it
|
||||
return _fastBid;
|
||||
else if (addr != null)
|
||||
return _slowBid;
|
||||
else
|
||||
return null;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("slow bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64());
|
||||
return _slowBid;
|
||||
}
|
||||
|
||||
void sendComplete(OutNetMessage msg) { _finisher.add(msg); }
|
||||
@ -237,8 +317,9 @@ public class NTCPTransport extends TransportImpl {
|
||||
removed = (NTCPConnection)_conByIdent.remove(ident.calculateHash());
|
||||
}
|
||||
if ( (removed != null) && (removed != con) ) {// multiple cons, close 'em both
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Multiple connections on remove, closing " + removed + " (already closed " + con + ")");
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Multiple connections on remove, closing " + removed + " (already closed " + con + ")");
|
||||
_context.statManager().addRateData("ntcp.multipleCloseOnRemove", removed.getUptime(), 0);
|
||||
removed.close();
|
||||
}
|
||||
}
|
||||
@ -256,7 +337,7 @@ public class NTCPTransport extends TransportImpl {
|
||||
synchronized (_conLock) {
|
||||
for (Iterator iter = _conByIdent.values().iterator(); iter.hasNext(); ) {
|
||||
NTCPConnection con = (NTCPConnection)iter.next();
|
||||
if (con.getTimeSinceSend() <= 60*1000)
|
||||
if ( (con.getTimeSinceSend() <= 60*1000) || (con.getTimeSinceReceive() <= 60*1000) )
|
||||
active++;
|
||||
}
|
||||
}
|
||||
@ -349,6 +430,8 @@ public class NTCPTransport extends TransportImpl {
|
||||
}
|
||||
for (int i = 0; expired != null && i < expired.size(); i++)
|
||||
((NTCPConnection)expired.get(i)).close();
|
||||
if ( (expired != null) && (expired.size() > 0) )
|
||||
_context.statManager().addRateData("ntcp.outboundEstablishFailed", expired.size(), 0);
|
||||
}
|
||||
|
||||
//private boolean bindAllInterfaces() { return true; }
|
||||
@ -360,10 +443,17 @@ public class NTCPTransport extends TransportImpl {
|
||||
} else {
|
||||
RouterAddress ra = CommSystemFacadeImpl.createNTCPAddress(ctx);
|
||||
if (ra != null) {
|
||||
_myAddress = new NTCPAddress(ra);
|
||||
replaceAddress(ra);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("NTCP address configured: " + _myAddress);
|
||||
NTCPAddress addr = new NTCPAddress(ra);
|
||||
if (addr.getPort() <= 0) {
|
||||
_myAddress = null;
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("NTCP address is outbound only, since the NTCP configuration is invalid");
|
||||
} else {
|
||||
_myAddress = addr;
|
||||
replaceAddress(ra);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("NTCP address configured: " + _myAddress);
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("NTCP address is outbound only");
|
||||
@ -404,11 +494,17 @@ public class NTCPTransport extends TransportImpl {
|
||||
int numPeers = 0;
|
||||
int readingPeers = 0;
|
||||
int writingPeers = 0;
|
||||
float bpsSend = 0;
|
||||
float bpsRecv = 0;
|
||||
long totalUptime = 0;
|
||||
long totalSend = 0;
|
||||
long totalRecv = 0;
|
||||
|
||||
StringBuffer buf = new StringBuffer(512);
|
||||
buf.append("<b id=\"ntcpcon\">NTCP connections: ").append(peers.size()).append("</b><br />\n");
|
||||
buf.append("<table border=\"1\">\n");
|
||||
buf.append(" <tr><td><b>peer</b></td>");
|
||||
buf.append(" <td><b>dir</b></td>");
|
||||
buf.append(" <td><b>uptime</b></td>");
|
||||
buf.append(" <td><b>idle</b></td>");
|
||||
buf.append(" <td><b>sent</b></td>");
|
||||
@ -424,13 +520,34 @@ public class NTCPTransport extends TransportImpl {
|
||||
for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
|
||||
NTCPConnection con = (NTCPConnection)iter.next();
|
||||
buf.append("<tr><td>").append(con.getRemotePeer().calculateHash().toBase64().substring(0,8));
|
||||
buf.append("</td><td>");
|
||||
if (con.getIsInbound())
|
||||
buf.append("in");
|
||||
else
|
||||
buf.append("out");
|
||||
buf.append("</td><td>").append(DataHelper.formatDuration(con.getUptime()));
|
||||
buf.append("</td><td>").append(DataHelper.formatDuration(con.getTimeSinceSend()));
|
||||
buf.append("/").append(DataHelper.formatDuration(con.getTimeSinceReceive()));
|
||||
buf.append("</td><td>").append(con.getMessagesSent());
|
||||
totalUptime += con.getUptime();
|
||||
buf.append("</td><td>").append(con.getTimeSinceSend()/1000);
|
||||
buf.append("s/").append(con.getTimeSinceReceive()/1000);
|
||||
buf.append("s</td><td>").append(con.getMessagesSent());
|
||||
totalSend += con.getMessagesSent();
|
||||
buf.append("</td><td>").append(con.getMessagesReceived());
|
||||
buf.append("</td><td>").append(formatRate(con.getSendRate()/1024));
|
||||
buf.append("/").append(formatRate(con.getRecvRate()/1024)).append("KBps");
|
||||
totalRecv += con.getMessagesReceived();
|
||||
buf.append("</td><td>");
|
||||
if (con.getTimeSinceSend() < 10*1000) {
|
||||
buf.append(formatRate(con.getSendRate()/1024));
|
||||
bpsSend += con.getSendRate();
|
||||
} else {
|
||||
buf.append(formatRate(0));
|
||||
}
|
||||
buf.append("/");
|
||||
if (con.getTimeSinceReceive() < 10*1000) {
|
||||
buf.append(formatRate(con.getRecvRate()/1024));
|
||||
bpsRecv += con.getRecvRate();
|
||||
} else {
|
||||
buf.append(formatRate(0));
|
||||
}
|
||||
buf.append("KBps");
|
||||
long outQueue = con.getOutboundQueueSize();
|
||||
if (outQueue <= 0) {
|
||||
buf.append("</td><td>No messages");
|
||||
@ -454,6 +571,15 @@ public class NTCPTransport extends TransportImpl {
|
||||
buf.setLength(0);
|
||||
}
|
||||
|
||||
if (peers.size() > 0) {
|
||||
buf.append("<tr><td colspan=\"11\"><hr /></td></tr>\n");
|
||||
buf.append("<tr><td>").append(peers.size()).append(" peers</td><td> </td><td>").append(DataHelper.formatDuration(totalUptime/peers.size()));
|
||||
buf.append("</td><td> </td><td>").append(totalSend).append("</td><td>").append(totalRecv);
|
||||
buf.append("</td><td>").append(formatRate(bpsSend/1024)).append("/").append(formatRate(bpsRecv/1024)).append("KBps");
|
||||
buf.append("</td><td> </td><td> </td><td> </td><td> </td>");
|
||||
buf.append("</tr>\n");
|
||||
}
|
||||
|
||||
buf.append("</table>\n");
|
||||
buf.append("Peers currently reading I2NP messages: ").append(readingPeers).append("<br />\n");
|
||||
buf.append("Peers currently writing I2NP messages: ").append(writingPeers).append("<br />\n");
|
||||
|
@ -148,6 +148,8 @@ class Reader {
|
||||
if (est.isCorrupt()) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("closing connection on establishment because: " +est.getError(), est.getException());
|
||||
if (!est.getFailedBySkew())
|
||||
_context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1, 0);
|
||||
con.close();
|
||||
return;
|
||||
} else if (buf.remaining() <= 0) {
|
||||
|
@ -48,10 +48,11 @@ class Writer {
|
||||
}
|
||||
}
|
||||
|
||||
public void wantsWrite(NTCPConnection con) {
|
||||
public void wantsWrite(NTCPConnection con, String source) {
|
||||
//if (con.getCurrentOutbound() != null)
|
||||
// throw new RuntimeException("Current outbound message already in play on " + con);
|
||||
boolean already = false;
|
||||
boolean pending = false;
|
||||
synchronized (_pendingConnections) {
|
||||
if (_liveWrites.contains(con)) {
|
||||
if (!_writeAfterLive.contains(con)) {
|
||||
@ -60,11 +61,12 @@ class Writer {
|
||||
already = true;
|
||||
} else if (!_pendingConnections.contains(con)) {
|
||||
_pendingConnections.add(con);
|
||||
pending = true;
|
||||
}
|
||||
_pendingConnections.notifyAll();
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("wantsWrite: " + con + " already live? " + already);
|
||||
_log.debug("wantsWrite: " + con + " already live? " + already + " added to pending? " + pending + ": " + source);
|
||||
}
|
||||
public void connectionClosed(NTCPConnection con) {
|
||||
synchronized (_pendingConnections) {
|
||||
@ -87,20 +89,28 @@ class Writer {
|
||||
boolean keepWriting = (con != null) && _writeAfterLive.remove(con);
|
||||
if (keepWriting) {
|
||||
// keep on writing the same one
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Keep writing on the same connection: " + con);
|
||||
} else {
|
||||
_liveWrites.remove(con);
|
||||
con = null;
|
||||
if (_pendingConnections.size() <= 0) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Done writing, but nothing pending, so wait");
|
||||
_pendingConnections.wait();
|
||||
} else {
|
||||
con = (NTCPConnection)_pendingConnections.remove(0);
|
||||
_liveWrites.add(con);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Switch to writing on: " + con);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ie) {}
|
||||
if (!_stop && (con != null)) {
|
||||
try {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Prepare next write on: " + con);
|
||||
con.prepareNextWrite();
|
||||
} catch (RuntimeException re) {
|
||||
_log.log(Log.CRIT, "Error in the ntcp writer", re);
|
||||
|
@ -134,6 +134,7 @@ public class EstablishmentManager {
|
||||
}
|
||||
if (msg.getTarget().getNetworkId() != Router.NETWORK_ID) {
|
||||
_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash());
|
||||
_transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
|
||||
_transport.failed(msg, "Remote peer is on the wrong network, cannot establish");
|
||||
return;
|
||||
}
|
||||
@ -146,7 +147,8 @@ public class EstablishmentManager {
|
||||
|
||||
if (!_transport.isValid(to.getIP())) {
|
||||
_transport.failed(msg, "Remote peer's IP isn't valid");
|
||||
_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE);
|
||||
_transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
|
||||
//_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -465,7 +467,7 @@ public class EstablishmentManager {
|
||||
public PublishToNewInbound(PeerState peer) { _peer = peer; }
|
||||
public void timeReached() {
|
||||
Hash peer = _peer.getRemotePeer();
|
||||
if ((peer != null) && (!_context.shitlist().isShitlisted(peer))) {
|
||||
if ((peer != null) && (!_context.shitlist().isShitlisted(peer)) && (!_transport.isUnreachable(peer))) {
|
||||
// ok, we are fine with them, send them our latest info
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Publishing to the peer after confirm plus delay (without shitlist): " + peer.toBase64());
|
||||
@ -942,7 +944,8 @@ public class EstablishmentManager {
|
||||
}
|
||||
|
||||
Hash peer = outboundState.getRemoteIdentity().calculateHash();
|
||||
_context.shitlist().shitlistRouter(peer, err, UDPTransport.STYLE);
|
||||
//_context.shitlist().shitlistRouter(peer, err, UDPTransport.STYLE);
|
||||
_transport.markUnreachable(peer);
|
||||
_transport.dropPeer(peer, false, err);
|
||||
//_context.profileManager().commErrorOccurred(peer);
|
||||
} else {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.i2p.router.transport.udp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetAddress;
|
||||
import java.net.SocketException;
|
||||
@ -18,12 +19,14 @@ public class UDPEndpoint {
|
||||
private UDPTransport _transport;
|
||||
private UDPSender _sender;
|
||||
private UDPReceiver _receiver;
|
||||
private DatagramSocket _socket;
|
||||
private InetAddress _bindAddress;
|
||||
|
||||
public UDPEndpoint(RouterContext ctx, UDPTransport transport, int listenPort) throws SocketException {
|
||||
public UDPEndpoint(RouterContext ctx, UDPTransport transport, int listenPort, InetAddress bindAddress) throws SocketException {
|
||||
_context = ctx;
|
||||
_log = ctx.logManager().getLog(UDPEndpoint.class);
|
||||
_transport = transport;
|
||||
|
||||
_bindAddress = bindAddress;
|
||||
_listenPort = listenPort;
|
||||
}
|
||||
|
||||
@ -32,9 +35,12 @@ public class UDPEndpoint {
|
||||
_log.debug("Starting up the UDP endpoint");
|
||||
shutdown();
|
||||
try {
|
||||
DatagramSocket socket = new DatagramSocket(_listenPort);
|
||||
_sender = new UDPSender(_context, socket, "UDPSend on " + _listenPort);
|
||||
_receiver = new UDPReceiver(_context, _transport, socket, "UDPReceive on " + _listenPort);
|
||||
if (_bindAddress == null)
|
||||
_socket = new DatagramSocket(_listenPort);
|
||||
else
|
||||
_socket = new DatagramSocket(_listenPort, _bindAddress);
|
||||
_sender = new UDPSender(_context, _socket, "UDPSend on " + _listenPort);
|
||||
_receiver = new UDPReceiver(_context, _transport, _socket, "UDPReceive on " + _listenPort);
|
||||
_sender.startup();
|
||||
_receiver.startup();
|
||||
} catch (SocketException se) {
|
||||
@ -48,16 +54,22 @@ public class UDPEndpoint {
|
||||
_sender.shutdown();
|
||||
_receiver.shutdown();
|
||||
}
|
||||
if (_socket != null) {
|
||||
_socket.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void setListenPort(int newPort) { _listenPort = newPort; }
|
||||
public void updateListenPort(int newPort) {
|
||||
if (newPort == _listenPort) return;
|
||||
try {
|
||||
DatagramSocket socket = new DatagramSocket(newPort);
|
||||
_sender.updateListeningPort(socket, newPort);
|
||||
if (_bindAddress == null)
|
||||
_socket = new DatagramSocket(_listenPort);
|
||||
else
|
||||
_socket = new DatagramSocket(_listenPort, _bindAddress);
|
||||
_sender.updateListeningPort(_socket, newPort);
|
||||
// note: this closes the old socket, so call this after the sender!
|
||||
_receiver.updateListeningPort(socket, newPort);
|
||||
_receiver.updateListeningPort(_socket, newPort);
|
||||
_listenPort = newPort;
|
||||
} catch (SocketException se) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
|
@ -36,7 +36,7 @@ public class UDPEndpointTest {
|
||||
int base = 2000 + _context.random().nextInt(10000);
|
||||
for (int i = 0; i < numPeers; i++) {
|
||||
_log.debug("Building " + i);
|
||||
UDPEndpoint endpoint = new UDPEndpoint(_context, null, base + i);
|
||||
UDPEndpoint endpoint = new UDPEndpoint(_context, null, base + i, null);
|
||||
_endpoints[i] = endpoint;
|
||||
endpoint.startup();
|
||||
I2PThread read = new I2PThread(new TestRead(endpoint), "Test read " + i);
|
||||
|
@ -108,6 +108,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
public static final String PROP_FORCE_INTRODUCERS = "i2np.udp.forceIntroducers";
|
||||
/** do we allow direct SSU connections, sans introducers? */
|
||||
public static final String PROP_ALLOW_DIRECT = "i2np.udp.allowDirect";
|
||||
public static final String PROP_BIND_INTERFACE = "i2np.udp.bindInterface";
|
||||
|
||||
/** how many relays offered to us will we use at a time? */
|
||||
public static final int PUBLIC_RELAY_COUNT = 3;
|
||||
@ -220,8 +221,19 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
_log.info("Binding to the explicitly specified external port: " + port);
|
||||
}
|
||||
if (_endpoint == null) {
|
||||
String bindTo = _context.getProperty(PROP_BIND_INTERFACE);
|
||||
InetAddress bindToAddr = null;
|
||||
if (bindTo != null) {
|
||||
try {
|
||||
bindToAddr = InetAddress.getByName(bindTo);
|
||||
} catch (UnknownHostException uhe) {
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Invalid SSU bind interface specified [" + bindTo + "]", uhe);
|
||||
bindToAddr = null;
|
||||
}
|
||||
}
|
||||
try {
|
||||
_endpoint = new UDPEndpoint(_context, this, port);
|
||||
_endpoint = new UDPEndpoint(_context, this, port, bindToAddr);
|
||||
} catch (SocketException se) {
|
||||
if (_log.shouldLog(Log.CRIT))
|
||||
_log.log(Log.CRIT, "Unable to listen on the UDP port (" + port + ")", se);
|
||||
@ -327,7 +339,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("The router " + from.toBase64() + " told us we have an invalid IP - "
|
||||
+ RemoteHostId.toString(ourIP) + ". Lets throw tomatoes at them");
|
||||
_context.shitlist().shitlistRouter(from, "They said we had an invalid IP", STYLE);
|
||||
markUnreachable(from);
|
||||
//_context.shitlist().shitlistRouter(from, "They said we had an invalid IP", STYLE);
|
||||
return;
|
||||
} else if (inboundRecent && _externalListenPort > 0 && _externalListenHost != null) {
|
||||
// use OS clock since its an ordering thing, not a time thing
|
||||
@ -533,7 +546,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
_log.warn("Peer already connected: old=" + oldPeer + " new=" + peer, new Exception("dup"));
|
||||
|
||||
_activeThrottle.unchoke(peer.getRemotePeer());
|
||||
_context.shitlist().unshitlistRouter(peer.getRemotePeer(), STYLE);
|
||||
markReachable(peer.getRemotePeer());
|
||||
//_context.shitlist().unshitlistRouter(peer.getRemotePeer(), STYLE);
|
||||
|
||||
if (SHOULD_FLOOD_PEERS)
|
||||
_flooder.addPeer(peer);
|
||||
@ -601,7 +615,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
SimpleTimer.getInstance().addEvent(new RemoveDropList(remote), DROPLIST_PERIOD);
|
||||
}
|
||||
}
|
||||
_context.shitlist().shitlistRouter(peerHash, "Part of the wrong network", STYLE);
|
||||
markUnreachable(peerHash);
|
||||
//_context.shitlist().shitlistRouter(peerHash, "Part of the wrong network", STYLE);
|
||||
dropPeer(peerHash, false, "wrong network");
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Dropping the peer " + peerHash.toBase64() + " because they are in the wrong net");
|
||||
@ -701,8 +716,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
if (peer.getRemotePeer() != null) {
|
||||
dropPeerCapacities(peer);
|
||||
|
||||
if (shouldShitlist)
|
||||
_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries", STYLE);
|
||||
if (shouldShitlist) {
|
||||
markUnreachable(peer.getRemotePeer());
|
||||
//_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries", STYLE);
|
||||
}
|
||||
long now = _context.clock().now();
|
||||
_context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime());
|
||||
synchronized (_peersByIdent) {
|
||||
@ -1630,7 +1647,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
buf.append(" [").append(peer.getConsecutiveFailedSends()).append(" failures]");
|
||||
appended = true;
|
||||
}
|
||||
if (_context.shitlist().isShitlisted(peer.getRemotePeer())) {
|
||||
if (_context.shitlist().isShitlisted(peer.getRemotePeer(), STYLE)) {
|
||||
if (!appended) buf.append("<br />");
|
||||
buf.append(" [shitlisted]");
|
||||
appended = true;
|
||||
|
@ -363,6 +363,9 @@ public class FragmentHandler {
|
||||
|
||||
private void receiveComplete(FragmentedMessage msg) {
|
||||
_completed++;
|
||||
String stringified = null;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
stringified = msg.toString();
|
||||
try {
|
||||
int fragmentCount = msg.getFragmentCount();
|
||||
// toByteArray destroys the contents of the message completely
|
||||
@ -377,11 +380,13 @@ public class FragmentHandler {
|
||||
noteCompletion(m.getUniqueId());
|
||||
_receiver.receiveComplete(m, msg.getTargetRouter(), msg.getTargetTunnel());
|
||||
} catch (IOException ioe) {
|
||||
if (stringified == null) stringified = msg.toString();
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error receiving fragmented message (corrupt?): " + msg, ioe);
|
||||
_log.error("Error receiving fragmented message (corrupt?): " + stringified, ioe);
|
||||
} catch (I2NPMessageException ime) {
|
||||
if (stringified == null) stringified = msg.toString();
|
||||
if (_log.shouldLog(Log.ERROR))
|
||||
_log.error("Error receiving fragmented message (corrupt?): " + msg, ime);
|
||||
_log.error("Error receiving fragmented message (corrupt?): " + stringified, ime);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,13 +7,14 @@ import net.i2p.router.Router;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.TunnelPoolSettings;
|
||||
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
|
||||
import net.i2p.router.peermanager.PeerProfile;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* Coordinate the selection of peers to go into a tunnel for one particular
|
||||
* pool.
|
||||
*/
|
||||
abstract class TunnelPeerSelector {
|
||||
public abstract class TunnelPeerSelector {
|
||||
/**
|
||||
* Which peers should go into the next tunnel for the given settings?
|
||||
*
|
||||
@ -155,16 +156,20 @@ abstract class TunnelPeerSelector {
|
||||
return rv;
|
||||
} else if (filterSlow(ctx, isInbound, isExploratory)) {
|
||||
Log log = ctx.logManager().getLog(TunnelPeerSelector.class);
|
||||
String excludeCaps = ctx.getProperty("router.excludePeerCaps",
|
||||
String.valueOf(Router.CAPABILITY_BW16));
|
||||
Set peers = new HashSet();
|
||||
if (excludeCaps != null) {
|
||||
char excl[] = excludeCaps.toCharArray();
|
||||
char excl[] = getExcludeCaps(ctx);
|
||||
Set peers = new HashSet(1);
|
||||
if (excl != null) {
|
||||
FloodfillNetworkDatabaseFacade fac = (FloodfillNetworkDatabaseFacade)ctx.netDb();
|
||||
List known = fac.getKnownRouterData();
|
||||
if (known != null) {
|
||||
for (int i = 0; i < known.size(); i++) {
|
||||
RouterInfo peer = (RouterInfo)known.get(i);
|
||||
boolean shouldExclude = shouldExclude(ctx, log, peer, excl);
|
||||
if (shouldExclude) {
|
||||
peers.add(peer.getIdentity().calculateHash());
|
||||
continue;
|
||||
}
|
||||
/*
|
||||
String cap = peer.getCapabilities();
|
||||
if (cap == null) {
|
||||
peers.add(peer.getIdentity().calculateHash());
|
||||
@ -247,6 +252,7 @@ abstract class TunnelPeerSelector {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
/*
|
||||
@ -264,6 +270,102 @@ abstract class TunnelPeerSelector {
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean shouldExclude(RouterContext ctx, RouterInfo peer) {
|
||||
Log log = ctx.logManager().getLog(TunnelPeerSelector.class);
|
||||
return shouldExclude(ctx, log, peer, getExcludeCaps(ctx));
|
||||
}
|
||||
|
||||
private static char[] getExcludeCaps(RouterContext ctx) {
|
||||
String excludeCaps = ctx.getProperty("router.excludePeerCaps",
|
||||
String.valueOf(Router.CAPABILITY_BW16));
|
||||
Set peers = new HashSet();
|
||||
if (excludeCaps != null) {
|
||||
char excl[] = excludeCaps.toCharArray();
|
||||
return excl;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static final long DONT_EXCLUDE_PERIOD = 15*60*1000;
|
||||
private static boolean shouldExclude(RouterContext ctx, Log log, RouterInfo peer, char excl[]) {
|
||||
String cap = peer.getCapabilities();
|
||||
if (cap == null) {
|
||||
return true;
|
||||
}
|
||||
for (int j = 0; j < excl.length; j++) {
|
||||
if (cap.indexOf(excl[j]) >= 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
int maxLen = 0;
|
||||
if (cap.indexOf(FloodfillNetworkDatabaseFacade.CAPACITY_FLOODFILL) >= 0)
|
||||
maxLen++;
|
||||
if (cap.indexOf(Router.CAPABILITY_REACHABLE) >= 0)
|
||||
maxLen++;
|
||||
if (cap.indexOf(Router.CAPABILITY_UNREACHABLE) >= 0)
|
||||
maxLen++;
|
||||
if (cap.length() <= maxLen)
|
||||
return true;
|
||||
// otherwise, it contains flags we aren't trying to focus on,
|
||||
// so don't exclude it based on published capacity
|
||||
|
||||
String val = peer.getOption("stat_uptime");
|
||||
if (val != null) {
|
||||
long uptimeMs = 0;
|
||||
if (val != null) {
|
||||
long factor = 1;
|
||||
if (val.endsWith("ms")) {
|
||||
factor = 1;
|
||||
val = val.substring(0, val.length()-2);
|
||||
} else if (val.endsWith("s")) {
|
||||
factor = 1000l;
|
||||
val = val.substring(0, val.length()-1);
|
||||
} else if (val.endsWith("m")) {
|
||||
factor = 60*1000l;
|
||||
val = val.substring(0, val.length()-1);
|
||||
} else if (val.endsWith("h")) {
|
||||
factor = 60*60*1000l;
|
||||
val = val.substring(0, val.length()-1);
|
||||
} else if (val.endsWith("d")) {
|
||||
factor = 24*60*60*1000l;
|
||||
val = val.substring(0, val.length()-1);
|
||||
}
|
||||
try { uptimeMs = Long.parseLong(val); } catch (NumberFormatException nfe) {}
|
||||
uptimeMs *= factor;
|
||||
} else {
|
||||
// not publishing an uptime, so exclude it
|
||||
return true;
|
||||
}
|
||||
|
||||
long infoAge = ctx.clock().now() - peer.getPublished();
|
||||
if (infoAge < 0) {
|
||||
return false;
|
||||
} else if (infoAge > 5*24*60*60*1000) {
|
||||
// Only exclude long-unseen peers if we haven't just started up
|
||||
if (ctx.router().getUptime() < DONT_EXCLUDE_PERIOD) {
|
||||
if (log.shouldLog(Log.DEBUG))
|
||||
log.debug("Not excluding a long-unseen peer, since we just started up.");
|
||||
return false;
|
||||
} else {
|
||||
if (log.shouldLog(Log.DEBUG))
|
||||
log.debug("Excluding a long-unseen peer.");
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if ( (infoAge + uptimeMs < 2*60*60*1000) && (ctx.router().getUptime() > DONT_EXCLUDE_PERIOD) ) {
|
||||
// up for less than 2 hours, so exclude it
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// not publishing stats, so exclude it
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private static final String PROP_OUTBOUND_EXPLORATORY_EXCLUDE_UNREACHABLE = "router.outboundExploratoryExcludeUnreachable";
|
||||
private static final String PROP_OUTBOUND_CLIENT_EXCLUDE_UNREACHABLE = "router.outboundClientExcludeUnreachable";
|
||||
private static final String PROP_INBOUND_EXPLORATORY_EXCLUDE_UNREACHABLE = "router.inboundExploratoryExcludeUnreachable";
|
||||
|
Reference in New Issue
Block a user