- More optimizations in recvEncrypted()
    - More efficient XOR
    - Reduce bandwidth stat update frequency
    - Check for repeated zero-length reads
This commit is contained in:
zzz
2011-11-23 23:36:37 +00:00
parent f69f06b038
commit f630d2dd27
5 changed files with 175 additions and 48 deletions

View File

@ -1,3 +1,13 @@
2011-11-23 zzz
* CryptixAESEngine: Fix bogus bounds checks
* NTCP:
- More optimizations in recvEncrypted()
- More efficient XOR
- Reduce bandwidth stat update frequency
- Check for repeated zero-length reads
* RandomSource: Add new method getBytes(buf, offset, length)
* Tunnel encryption: More efficient XOR
2011-11-21 zzz
* NTCP Pumper:
- Ensure failsafe pumper code gets run on schedule

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 6;
public final static long BUILD = 7;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -101,6 +101,8 @@ class EventPumper implements Runnable {
_context.statManager().createRateStat("ntcp.pumperKeySetSize", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.pumperKeysPerLoop", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.pumperLoopsPerSecond", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.zeroRead", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.zeroReadDrop", "", "ntcp", new long[] {10*60*1000} );
}
public synchronized void startPumping() {
@ -561,7 +563,21 @@ class EventPumper implements Runnable {
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_READ);
releaseBuf(buf);
// workaround for channel stuck returning 0 all the time, causing 100% CPU
int consec = con.gotZeroRead();
if (consec >= 5) {
_context.statManager().addRateData("ntcp.zeroReadDrop", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Fail safe zero read close " + con);
con.close();
} else {
_context.statManager().addRateData("ntcp.zeroRead", consec);
if (_log.shouldLog(Log.INFO))
_log.info("nothing to read for " + con + ", but stay interested");
}
} else if (read > 0) {
// clear counter for workaround above
con.clearZeroRead();
// ZERO COPY. The buffer will be returned in Reader.processRead()
buf.flip();
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);

View File

@ -106,15 +106,26 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private long _messagesWritten;
private long _lastSendTime;
private long _lastReceiveTime;
private long _lastRateUpdated;
private final long _created;
private long _nextMetaTime;
private int _consecutiveZeroReads;
private static final int BLOCK_SIZE = 16;
private static final int META_SIZE = BLOCK_SIZE;
/** unencrypted outbound metadata buffer */
private final byte _meta[] = new byte[16];
private final byte _meta[] = new byte[META_SIZE];
private boolean _sendingMeta;
/** how many consecutive sends were failed due to (estimated) send queue time */
private int _consecutiveBacklog;
private long _nextInfoTime;
/*
* Update frequency for send/recv rates in console peers page
*/
private static final long STAT_UPDATE_TIME_MS = 30*1000;
private static final int META_FREQUENCY = 10*60*1000;
/** how often we send our routerinfo unsolicited */
private static final int INFO_FREQUENCY = 90*60*1000;
@ -144,7 +155,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
// TODO possible switch to CLQ but beware non-constant size() - see below
_outbound = new LinkedBlockingQueue();
_isInbound = true;
_decryptBlockBuf = new byte[16];
_decryptBlockBuf = new byte[BLOCK_SIZE];
_curReadState = new ReadState();
_establishState = new EstablishState(ctx, transport, this);
_conKey = key;
@ -169,7 +180,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
// TODO possible switch to CLQ but beware non-constant size() - see below
_outbound = new LinkedBlockingQueue();
_isInbound = false;
_decryptBlockBuf = new byte[16];
_decryptBlockBuf = new byte[BLOCK_SIZE];
_curReadState = new ReadState();
initialize();
}
@ -177,8 +188,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private void initialize() {
_lastSendTime = _created;
_lastReceiveTime = _created;
_curReadBlock = new byte[16];
_prevReadBlock = new byte[16];
_lastRateUpdated = _created;
_curReadBlock = new byte[BLOCK_SIZE];
_prevReadBlock = new byte[BLOCK_SIZE];
_transport.establishing(this);
}
@ -200,9 +212,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_sessionKey = key;
_clockSkew = clockSkew;
_prevWriteEnd = prevWriteEnd;
System.arraycopy(prevReadEnd, prevReadEnd.length-16, _prevReadBlock, 0, _prevReadBlock.length);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
_established = true;
_establishedOn = System.currentTimeMillis();
_transport.inboundEstablished(this);
@ -241,7 +253,24 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
public long getTimeSinceCreated() { return System.currentTimeMillis()-_created; }
public int getConsecutiveBacklog() { return _consecutiveBacklog; }
/**
* workaround for EventPumper
* @since 0.8.12
*/
public void clearZeroRead() {
_consecutiveZeroReads = 0;
}
/**
* workaround for EventPumper
* @return value after incrementing
* @since 0.8.12
*/
public int gotZeroRead() {
return ++_consecutiveZeroReads;
}
public boolean isClosed() { return _closed; }
public void close() { close(false); }
public void close(boolean allowRequeue) {
@ -441,7 +470,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_sessionKey = key;
_clockSkew = clockSkew;
_prevWriteEnd = prevWriteEnd;
System.arraycopy(prevReadEnd, prevReadEnd.length-16, _prevReadBlock, 0, _prevReadBlock.length);
System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
@ -595,8 +624,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
*
*/
synchronized void prepareNextWriteFast() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established);
if (!_isInbound && !_established) {
if (_establishState == null) {
_establishState = new EstablishState(_context, _transport, this);
@ -715,13 +744,12 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (rem > 0)
padding = 16 - rem;
buf.padLength = padding;
buf.unencryptedLength = min+padding;
DataHelper.toLong(buf.unencrypted, 0, 2, sz);
System.arraycopy(buf.base, 0, buf.unencrypted, 2, buf.baseLength);
if (padding > 0)
_context.random().nextBytes(buf.pad); // maybe more than necessary, but its only the prng
System.arraycopy(buf.pad, 0, buf.unencrypted, 2+sz, buf.padLength);
if (padding > 0) {
_context.random().nextBytes(buf.unencrypted, 2+sz, padding);
}
//long serialized = System.currentTimeMillis();
buf.crc.update(buf.unencrypted, 0, buf.unencryptedLength-4);
@ -765,22 +793,18 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
int unencryptedLength;
final byte base[];
int baseLength;
final byte pad[];
int padLength;
final Adler32 crc;
byte encrypted[];
PrepBuffer() {
unencrypted = new byte[BUFFER_SIZE];
base = new byte[BUFFER_SIZE];
pad = new byte[16];
crc = new Adler32();
}
private void init() {
unencryptedLength = 0;
baseLength = 0;
padLength = 0;
encrypted = null;
crc.reset();
}
@ -948,20 +972,24 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private long _lastBytesReceived;
/** _bytesSent when we last updated the rate */
private long _lastBytesSent;
private long _lastRateUpdated;
private float _sendBps;
private float _recvBps;
private float _sendBps15s;
private float _recvBps15s;
//private float _sendBps15s;
//private float _recvBps15s;
public float getSendRate() { return _sendBps15s; }
public float getRecvRate() { return _recvBps15s; }
public float getSendRate() { return _sendBps; }
public float getRecvRate() { return _recvBps; }
/**
* Stats only for console
*/
private void updateStats() {
long now = System.currentTimeMillis();
long time = now - _lastRateUpdated;
// If at least one second has passed
if (time >= 1000) {
// If enough time has passed...
// Perhaps should synchronize, but if so do the time check before synching...
// only for console so don't bother....
if (time >= STAT_UPDATE_TIME_MS) {
long totS = _bytesSent;
long totR = _bytesReceived;
long sent = totS - _lastBytesSent; // How much we sent meanwhile
@ -976,14 +1004,14 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
// Maintain an approximate average with a 15-second halflife
// Weights (0.955 and 0.045) are tuned so that transition between two values (e.g. 0..10)
// would reach their midpoint (e.g. 5) in 15s
_sendBps15s = (0.955f)*_sendBps15s + (0.045f)*((float)sent*1000f)/(float)time;
_recvBps15s = (0.955f)*_recvBps15s + (0.045f)*((float)recv*1000)/(float)time;
//_sendBps15s = (0.955f)*_sendBps15s + (0.045f)*((float)sent*1000f)/(float)time;
//_recvBps15s = (0.955f)*_recvBps15s + (0.045f)*((float)recv*1000)/(float)time;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Rates updated to "
+ _sendBps + "/" + _recvBps + "Bps in/out ("
+ _sendBps15s + "/" + _recvBps15s + "Bps in/out 15s) after "
+ sent + "/" + recv + " in " + time);
+ _sendBps + '/' + _recvBps + "Bps in/out "
//+ _sendBps15s + "/" + _recvBps15s + "Bps in/out 15s after "
+ sent + '/' + recv + " in " + DataHelper.formatDuration(time));
}
}
@ -1003,17 +1031,30 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
synchronized void recvEncryptedI2NP(ByteBuffer buf) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("receive encrypted i2np: " + buf.remaining());
// hasArray() is false for direct buffers, at least on my system...
if (_curReadBlockIndex == 0 && buf.hasArray()) {
// fast way
int tot = buf.remaining();
if (tot >= 32 && tot % 16 == 0) {
recvEncryptedFast(buf);
return;
}
}
while (buf.hasRemaining() && !_closed) {
int want = Math.min(buf.remaining(), _curReadBlock.length-_curReadBlockIndex);
int want = Math.min(buf.remaining(), BLOCK_SIZE - _curReadBlockIndex);
if (want > 0) {
buf.get(_curReadBlock, _curReadBlockIndex, want);
_curReadBlockIndex += want;
}
//_curReadBlock[_curReadBlockIndex++] = buf.get();
if (_curReadBlockIndex >= _curReadBlock.length) {
if (_curReadBlockIndex >= BLOCK_SIZE) {
// cbc
_context.aes().decryptBlock(_curReadBlock, 0, _sessionKey, _decryptBlockBuf, 0);
DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, _decryptBlockBuf.length);
//DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, BLOCK_SIZE);
for (int i = 0; i < BLOCK_SIZE; i++) {
_decryptBlockBuf[i] ^= _prevReadBlock[i];
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("parse decrypted i2np block (remaining: " + buf.remaining() + ")");
boolean ok = recvUnencryptedI2NP();
@ -1029,6 +1070,51 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
}
}
/**
* Decrypt directly out of the ByteBuffer instead of copying the bytes
* 16 at a time to the _curReadBlock / _prevReadBlock flip buffers.
*
* More efficient but can only be used if buf.hasArray == true AND
* _curReadBlockIndex must be 0 and buf.getRemaining() % 16 must be 0
* and buf.getRemaining() must be >= 16.
* All this is true for most buffers.
* In theory this could be fixed up to handle the other cases too but that's hard.
* Caller must synchronize!
* @since 0.8.12
*/
private void recvEncryptedFast(ByteBuffer buf) {
byte[] array = buf.array();
int pos = buf.arrayOffset();
int end = pos + buf.remaining();
boolean first = true;
for ( ; pos < end && !_closed; pos += BLOCK_SIZE) {
_context.aes().decryptBlock(array, pos, _sessionKey, _decryptBlockBuf, 0);
if (first) {
// XOR with _prevReadBlock the first time...
//DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, BLOCK_SIZE);
for (int i = 0; i < BLOCK_SIZE; i++) {
_decryptBlockBuf[i] ^= _prevReadBlock[i];
}
first = false;
} else {
//DataHelper.xor(_decryptBlockBuf, 0, array, pos - BLOCK_SIZE, _decryptBlockBuf, 0, BLOCK_SIZE);
int start = pos - BLOCK_SIZE;
for (int i = 0; i < BLOCK_SIZE; i++) {
_decryptBlockBuf[i] ^= array[start + i];
}
}
boolean ok = recvUnencryptedI2NP();
if (!ok) {
_log.error("Read buffer " + System.identityHashCode(buf) + " contained corrupt data");
_context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1);
return;
}
}
// ...and copy to _prevReadBlock the last time
System.arraycopy(array, end - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
}
/**
* Append the next 16 bytes of cleartext to the read state.
@ -1038,6 +1124,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
*/
private boolean recvUnencryptedI2NP() {
_curReadState.receiveBlock(_decryptBlockBuf);
// FIXME move check to ReadState; must we close? possible attack vector?
if (_curReadState.getSize() > BUFFER_SIZE) {
_log.error("I2NP message too big - size: " + _curReadState.getSize() + " Dropping " + toString());
_context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _curReadState.getSize());
@ -1087,12 +1174,23 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
}
/**
* One special case is a metadata message where the sizeof(data) is 0. In
* that case, the unencrypted message is encoded as:
*<pre>
* +-------+-------+-------+-------+-------+-------+-------+-------+
* | 0 | timestamp in seconds | uninterpreted
* +-------+-------+-------+-------+-------+-------+-------+-------+
* uninterpreted | adler checksum of sz+data+pad |
* +-------+-------+-------+-------+-------+-------+-------+-------+
*</pre>
*/
private void sendMeta() {
byte encrypted[] = new byte[_meta.length];
synchronized (_meta) {
_context.random().nextBytes(_meta); // randomize the uninterpreted, then overwrite w/ data
DataHelper.toLong(_meta, 0, 2, 0);
DataHelper.toLong(_meta, 2, 4, (_context.clock().now() + 500) / 1000);
_context.random().nextBytes(_meta, 6, 6);
Adler32 crc = new Adler32();
crc.update(_meta, 0, _meta.length-4);
DataHelper.toLong(_meta, _meta.length-4, 4, crc.getValue());
@ -1234,13 +1332,12 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
/** @param buf 16 bytes */
private void receiveInitial(byte buf[]) {
_stateBegin = System.currentTimeMillis();
_size = (int)DataHelper.fromLong(buf, 0, 2);
if (_size == 0) {
readMeta(buf);
init();
return;
} else {
_stateBegin = System.currentTimeMillis();
_dataBuf = acquireReadBuf();
System.arraycopy(buf, 2, _dataBuf.data, 0, buf.length-2);
_nextWrite += buf.length-2;
@ -1262,6 +1359,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
remaining -= blockUsed;
}
if ( (remaining <= 0) && (buf.length-blockUsed < 4) ) {
// we've received all the data but not the 4-byte checksum
if (_log.shouldLog(Log.DEBUG))
_log.debug("crc wraparound required on block " + _blocks + " in message " + _messagesRead);
_crc.update(buf);
@ -1284,8 +1382,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_expectedCrc = DataHelper.fromLong(buf, buf.length-4, 4);
_crc.update(buf, 0, buf.length-4);
long val = _crc.getValue();
if (_log.shouldLog(Log.DEBUG))
_log.debug("CRC value computed: " + val + " expected: " + _expectedCrc + " size: " + _size);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("CRC value computed: " + val + " expected: " + _expectedCrc + " size: " + _size);
if (val == _expectedCrc) {
try {
I2NPMessageHandler h = acquireHandler(_context);
@ -1329,6 +1427,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (_log.shouldLog(Log.WARN))
_log.warn("Error parsing I2NP message", ime);
_context.statManager().addRateData("ntcp.corruptI2NPIME", 1);
// FIXME don't close the con, possible attack vector?
close();
// handler and databuf are lost
return;
@ -1337,7 +1436,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (_log.shouldLog(Log.WARN))
_log.warn("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " blocks " + _blocks);
_context.statManager().addRateData("ntcp.corruptI2NPCRC", 1);
// FIXME should we try to read in the message and keep going?
// FIXME don't close the con, possible attack vector?
close();
// databuf is lost
return;

View File

@ -756,16 +756,18 @@ public class NTCPTransport extends TransportImpl {
buf.append(DataHelper.formatDuration2(con.getTimeSinceReceive()));
buf.append(THINSP).append(DataHelper.formatDuration2(con.getTimeSinceSend()));
buf.append("</td><td class=\"cells\" align=\"right\">");
if (con.getTimeSinceReceive() < 10*1000) {
buf.append(formatRate(con.getRecvRate()/1024));
bpsRecv += con.getRecvRate();
if (con.getTimeSinceReceive() < 2*60*1000) {
float r = con.getRecvRate();
buf.append(formatRate(r / 1024));
bpsRecv += r;
} else {
buf.append(formatRate(0));
}
buf.append(THINSP);
if (con.getTimeSinceSend() < 10*1000) {
buf.append(formatRate(con.getSendRate()/1024));
bpsSend += con.getSendRate();
if (con.getTimeSinceSend() < 2*60*1000) {
float r = con.getSendRate();
buf.append(formatRate(r / 1024));
bpsSend += r;
} else {
buf.append(formatRate(0));
}