* UDP Inbound:

- Hopefully fix race NPE, thx dream
    - Catch some more fragment errors
    - Exception and log tweaks
    - Cleanups and javadocs
This commit is contained in:
zzz
2011-09-03 17:44:23 +00:00
parent 7b974e7e0b
commit 6364f142ff
9 changed files with 109 additions and 26 deletions

View File

@ -1,3 +1,11 @@
2011-09-03 zzz
* i2psnark: Fix "eject" button in certain browsers (ticket #511)
* UDP Inbound:
- Hopefully fix race NPE, thx dream
- Catch some more fragment errors
- Exception and log tweaks
- Cleanups and javadocs
2011-09-02 zzz
* Console: Cache user-agent processing
* NetDB: Hopefully fix ISJ deadlock, thx dream

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 9;
public final static long BUILD = 10;
/** for example "-test" */
public final static String EXTRA = "";

View File

@ -27,8 +27,8 @@ import net.i2p.util.Log;
*
*/
class EventPumper implements Runnable {
private RouterContext _context;
private Log _log;
private final RouterContext _context;
private final Log _log;
private volatile boolean _alive;
private Selector _selector;
private final LinkedBlockingQueue<ByteBuffer> _bufCache;
@ -36,7 +36,7 @@ class EventPumper implements Runnable {
private final LinkedBlockingQueue<NTCPConnection> _wantsWrite = new LinkedBlockingQueue<NTCPConnection>();
private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue<ServerSocketChannel>();
private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue<NTCPConnection>();
private NTCPTransport _transport;
private final NTCPTransport _transport;
private long _expireIdleWriteTime;
private static final int BUF_SIZE = 8*1024;

View File

@ -963,8 +963,8 @@ class EstablishmentManager {
private void processExpired(OutboundEstablishState outboundState) {
if (outboundState.getState() != OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
if (_log.shouldLog(Log.WARN))
_log.warn("Lifetime of expired outbound establish: " + outboundState.getLifetime());
if (_log.shouldLog(Log.INFO))
_log.info("Lifetime of expired outbound establish: " + outboundState.getLifetime());
while (true) {
OutNetMessage msg = outboundState.getNextQueuedMessage();
if (msg == null)

View File

@ -122,29 +122,33 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
boolean partialACK = false;
synchronized (messages) {
boolean isNew = false;
state = messages.get(messageId);
if (state == null) {
state = new InboundMessageState(_context, mid, fromPeer);
messages.put(messageId, state);
isNew = true;
// we will add to messages shortly if it isn't complete
}
fragmentOK = state.receiveFragment(data, i);
if (state.isComplete()) {
messageComplete = true;
messages.remove(messageId);
if (!isNew)
messages.remove(messageId);
} else if (state.isExpired()) {
messageExpired = true;
messages.remove(messageId);
if (!isNew)
messages.remove(messageId);
} else {
partialACK = true;
if (isNew)
messages.put(messageId, state);
}
}
if (messageComplete) {
_recentlyCompletedMessages.add(mid);
_messageReceiver.receiveMessage(state);
from.messageFullyReceived(messageId, state.getCompleteSize());
_ackSender.ackPeer(from);
@ -154,11 +158,15 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
_context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
if (state.getFragmentCount() > 0)
_context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime());
// this calls state.releaseResources(), all state access must be before this
_messageReceiver.receiveMessage(state);
} else if (messageExpired) {
state.releaseResources();
if (_log.shouldLog(Log.WARN))
_log.warn("Message expired while only being partially read: " + state);
_context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString());
// all state access must be before this
state.releaseResources();
} else if (partialACK) {
// not expired but not yet complete... lets queue up a partial ACK
if (_log.shouldLog(Log.DEBUG))
@ -167,6 +175,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
_ackSender.ackPeer(from);
}
// TODO: Why give up on other fragments if one is bad?
if (!fragmentOK)
break;
}

View File

@ -8,8 +8,10 @@ import net.i2p.util.ByteCache;
import net.i2p.util.Log;
/**
* Hold the raw data fragments of an inbound message
* Hold the raw data fragments of an inbound message.
*
* Warning - there is no synchronization in this class, take care in
* InboundMessageFragments to avoid use-after-release, etc.
*/
class InboundMessageState {
private final RouterContext _context;
@ -21,8 +23,10 @@ class InboundMessageState {
* received fragments are null.
*/
private final ByteArray _fragments[];
/**
* what is the last fragment in the message (or -1 if not yet known)
* Fragment count is _lastFragment + 1
*/
private int _lastFragment;
private final long _receiveBegin;
@ -49,13 +53,14 @@ class InboundMessageState {
/**
* Read in the data from the fragment.
* Caller should synchronize.
*
* @return true if the data was ok, false if it was corrupt
*/
public boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) {
int fragmentNum = data.readMessageFragmentNum(dataFragment);
if ( (fragmentNum < 0) || (fragmentNum > _fragments.length)) {
_log.warn("Invalid fragment " + fragmentNum + "/" + _fragments.length);
if ( (fragmentNum < 0) || (fragmentNum >= MAX_FRAGMENTS)) {
_log.warn("Invalid fragment " + fragmentNum + '/' + MAX_FRAGMENTS);
return false;
}
if (_fragments[fragmentNum] == null) {
@ -67,8 +72,21 @@ class InboundMessageState {
message.setValid(size);
_fragments[fragmentNum] = message;
boolean isLast = data.readMessageIsLast(dataFragment);
if (isLast)
if (isLast) {
// don't allow _lastFragment to be set twice
if (_lastFragment >= 0) {
if (_log.shouldLog(Log.ERROR))
_log.error("Multiple last fragments for message " + _messageId + " from " + _from);
return false;
}
// TODO - check for non-last fragments after this one?
_lastFragment = fragmentNum;
} else if (_lastFragment >= 0 && fragmentNum >= _lastFragment) {
// don't allow non-last after last
if (_log.shouldLog(Log.ERROR))
_log.error("Non-last fragment " + fragmentNum + " when last is " + _lastFragment + " for message " + _messageId + " from " + _from);
return false;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("New fragment " + fragmentNum + " for message " + _messageId
+ ", size=" + size
@ -87,6 +105,9 @@ class InboundMessageState {
return true;
}
/**
* May not be valid after released
*/
public boolean isComplete() {
if (_lastFragment < 0) return false;
for (int i = 0; i <= _lastFragment; i++)
@ -94,23 +115,40 @@ class InboundMessageState {
return false;
return true;
}
public boolean isExpired() {
return _context.clock().now() > _receiveBegin + MAX_RECEIVE_TIME;
}
public long getLifetime() {
return _context.clock().now() - _receiveBegin;
}
public Hash getFrom() { return _from; }
public long getMessageId() { return _messageId; }
/**
* @throws IllegalStateException if released or not isComplete()
*/
public int getCompleteSize() {
if (_completeSize < 0) {
if (_lastFragment < 0)
throw new IllegalStateException("last fragment not set");
if (_released)
throw new IllegalStateException("SSU IMS 2 Use after free");
int size = 0;
for (int i = 0; i <= _lastFragment; i++)
size += _fragments[i].getValid();
for (int i = 0; i <= _lastFragment; i++) {
ByteArray frag = _fragments[i];
if (frag == null)
throw new IllegalStateException("null fragment " + i + '/' + _lastFragment);
size += frag.getValid();
}
_completeSize = size;
}
return _completeSize;
}
public ACKBitfield createACKBitfield() {
return new PartialBitfield(_messageId, _fragments);
}
@ -154,34 +192,44 @@ class InboundMessageState {
}
public void releaseResources() {
for (int i = 0; i < _fragments.length; i++) {
_released = true;
for (int i = 0; i < MAX_FRAGMENTS; i++) {
if (_fragments[i] != null) {
_fragmentCache.release(_fragments[i]);
_fragments[i] = null;
}
}
_released = true;
}
/**
* @throws IllegalStateException if released
*/
public ByteArray[] getFragments() {
if (_released) {
RuntimeException e = new RuntimeException("Use after free: " + toString());
RuntimeException e = new IllegalStateException("Use after free: " + _messageId);
_log.error("SSU IMS", e);
throw e;
}
return _fragments;
}
public int getFragmentCount() { return _lastFragment+1; }
/**
* May not be valid if released, or may NPE on race with release, use with care in exception text
*/
@Override
public String toString() {
StringBuilder buf = new StringBuilder(256);
buf.append("IB Message: ").append(_messageId);
buf.append(" from ").append(_from.toString());
if (isComplete()) {
buf.append(" completely received with ");
buf.append(getCompleteSize()).append(" bytes");
//buf.append(getCompleteSize()).append(" bytes");
// may display -1 but avoid cascaded exceptions after release
buf.append(_completeSize).append(" bytes");
} else {
for (int i = 0; i < _lastFragment; i++) {
for (int i = 0; i <= _lastFragment; i++) {
buf.append(" fragment ").append(i);
if (_fragments[i] != null)
buf.append(": known at size ").append(_fragments[i].getValid());

View File

@ -89,6 +89,10 @@ class MessageReceiver {
_completeMessages.clear();
}
/**
* This queues the message for processing.
* Processing will call state.releaseResources(), do not access state after calling this.
*/
public void receiveMessage(InboundMessageState state) {
//int total = 0;
//long lag = -1;
@ -120,6 +124,7 @@ class MessageReceiver {
}
if ( (message != null) && (message.isExpired()) ) {
expiredLifetime += message.getLifetime();
// message.releaseResources() ??
message = null;
expired++;
}
@ -160,6 +165,11 @@ class MessageReceiver {
//_cache.release(buf, false);
}
/**
* Assemble all the fragments into an I2NP message.
* This calls state.releaseResources(), do not access state after calling this.
* @return null on error
*/
private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) {
try {
//byte buf[] = new byte[state.getCompleteSize()];

View File

@ -425,4 +425,10 @@ class OutboundEstablishState {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Got a packet, nextSend == now");
}
/** @since 0.8.9 */
@Override
public String toString() {
return "OES " + _remoteHostId;
}
}

View File

@ -649,15 +649,16 @@ class PeerState {
int rv = 0;
synchronized (_inboundMessages) {
int remaining = _inboundMessages.size();
for (Iterator iter = _inboundMessages.values().iterator(); remaining > 0; remaining--) {
InboundMessageState state = (InboundMessageState)iter.next();
for (Iterator<InboundMessageState> iter = _inboundMessages.values().iterator(); iter.hasNext(); ) {
InboundMessageState state = iter.next();
if (state.isExpired() || _dead) {
iter.remove();
// state.releaseResources() ??
} else {
if (state.isComplete()) {
_log.error("inbound message is complete, but wasn't handled inline? " + state + " with " + this);
iter.remove();
// state.releaseResources() ??
} else {
rv++;
}
@ -841,6 +842,7 @@ class PeerState {
//if (_context instanceof RouterContext)
// ((RouterContext)_context).messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired partially received: " + state.toString());
iter.remove();
// state.releaseResources() ??
} else {
if (!state.isComplete()) {
if (states == null)