diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index e77d33b0b..c8038fc0a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -54,11 +54,11 @@ public class ConnectionPacketHandler { //con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2); if (_log.shouldLog(Log.WARN)) _log.warn("congestion.. dup " + packet); - con.incrementUnackedPacketsReceived(); + //con.incrementUnackedPacketsReceived(); con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); } else { if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { - con.incrementUnackedPacketsReceived(); + //con.incrementUnackedPacketsReceived(); con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); } else { if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 483baf817..9cbd27b92 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -273,12 +273,12 @@ public class Packet { */ private int writePacket(byte buffer[], int offset, boolean includeSig) throws IllegalStateException { int cur = offset; - if (_sendStreamId != null) + if ( (_sendStreamId != null) && (_sendStreamId.length == 4) ) System.arraycopy(_sendStreamId, 0, buffer, cur, _sendStreamId.length); else System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length); cur += 4; - if (_receiveStreamId != null) + if ( (_receiveStreamId != null) && (_receiveStreamId.length == 4) ) System.arraycopy(_receiveStreamId, 0, buffer, cur, _receiveStreamId.length); else System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length); @@ -354,8 +354,8 @@ public class Packet { */ public int writtenSize() throws IllegalStateException { int size = 0; - size += _sendStreamId.length; - size += _receiveStreamId.length; + size += 4; // _sendStreamId.length; + size += 4; // _receiveStreamId.length; size += 4; // sequenceNum size += 4; // ackThrough if (_nacks != null) { @@ -426,12 +426,13 @@ public class Packet { int optionSize = (int)DataHelper.fromLong(buffer, cur, 2); cur += 2; int payloadBegin = cur + optionSize; + int payloadSize = length - payloadBegin; + if ( (payloadSize < 0) || (payloadSize > MAX_PAYLOAD_SIZE) ) + throw new IllegalArgumentException("length: " + length + " offset: " + offset + " begin: " + payloadBegin); // skip ahead to the payload - _payload = new byte[offset + length - payloadBegin]; - if (_payload.length > MAX_PAYLOAD_SIZE) - throw new IllegalArgumentException("length: " + length + " offset: " + offset + " begin: " + payloadBegin); - System.arraycopy(buffer, payloadBegin, _payload, 0, _payload.length); + _payload = new byte[payloadSize]; + System.arraycopy(buffer, payloadBegin, _payload, 0, payloadSize); // ok now lets go back and deal with the options if (isFlagSet(FLAG_DELAY_REQUESTED)) { @@ -480,7 +481,7 @@ public class Packet { } boolean ok = ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey()); if (!ok) { - ctx.logManager().getLog(Packet.class).error("Signature failed with sig " + Base64.encode(_optionSignature.getData()), new Exception("moo")); + ctx.logManager().getLog(Packet.class).error("Signature failed on " + toString(), new Exception("moo")); } return ok; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index abf3ca30f..fba030270 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -173,7 +173,7 @@ public class PacketHandler { } } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet received on an unknown stream (and not a SYN): " + packet); + _log.debug("Packet received on an unknown stream (and not an ECHO): " + packet); if (sendId == null) { for (Iterator iter = _manager.listConnections().iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index cf861441f..ea02dc64f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -56,7 +56,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat public boolean shouldSign() { return isFlagSet(FLAG_SIGNATURE_INCLUDED) || isFlagSet(FLAG_SYNCHRONIZE) || - isFlagSet(FLAG_CLOSE); + isFlagSet(FLAG_CLOSE) || + isFlagSet(FLAG_ECHO); } /** last minute update of ack fields, just before write/sign */ diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 3bc9c4370..f01da61d3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -1,5 +1,6 @@ package net.i2p.client.streaming; +import java.util.Arrays; import java.util.Set; import java.util.HashSet; @@ -30,11 +31,6 @@ class PacketQueue { */ public void enqueue(PacketLocal packet) { packet.prepare(); - int size = 0; - if (packet.shouldSign()) - size = packet.writeSignedPacket(_buf, 0, _context, _session.getPrivateKey()); - else - size = packet.writePacket(_buf, 0); SessionKey keyUsed = packet.getKeyUsed(); if (keyUsed == null) @@ -42,42 +38,57 @@ class PacketQueue { Set tagsSent = packet.getTagsSent(); if (tagsSent == null) tagsSent = new HashSet(); + + // cache this from before sendMessage + String conStr = (packet.getConnection() != null ? packet.getConnection().toString() : ""); + if (packet.getAckTime() > 0) { + _log.debug("Not resending " + packet); + return; + } else { + _log.debug("Sending... " + packet); + } + + long begin = 0; + long end = 0; + boolean sent = false; try { - // cache this from before sendMessage - String conStr = (packet.getConnection() != null ? packet.getConnection().toString() : ""); - if (packet.getAckTime() > 0) { - _log.debug("Not resending " + packet); - return; - } else { - _log.debug("Sending... " + packet); - } - // this should not block! - long begin = _context.clock().now(); - boolean sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent); - long end = _context.clock().now(); - if (!sent) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Send failed for " + packet); - packet.getConnection().disconnect(false); - } else { - packet.setKeyUsed(keyUsed); - packet.setTagsSent(tagsSent); - packet.incrementSends(); - if (_log.shouldLog(Log.DEBUG)) { - String msg = "SEND " + packet + (tagsSent.size() > 0 - ? " with " + tagsSent.size() + " tags" - : "") - + " send # " + packet.getNumSends() - + " sendTime: " + (end-begin) - + " con: " + conStr; - _log.debug(msg); - } - PacketHandler.displayPacket(packet, "SEND"); + synchronized (this) { + Arrays.fill(_buf, (byte)0x0); + int size = 0; + if (packet.shouldSign()) + size = packet.writeSignedPacket(_buf, 0, _context, _session.getPrivateKey()); + else + size = packet.writePacket(_buf, 0); + + // this should not block! + begin = _context.clock().now(); + sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent); + end = _context.clock().now(); } } catch (I2PSessionException ise) { if (_log.shouldLog(Log.WARN)) _log.warn("Unable to send the packet " + packet, ise); } + + if (!sent) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Send failed for " + packet); + packet.getConnection().disconnect(false); + } else { + packet.setKeyUsed(keyUsed); + packet.setTagsSent(tagsSent); + packet.incrementSends(); + if (_log.shouldLog(Log.DEBUG)) { + String msg = "SEND " + packet + (tagsSent.size() > 0 + ? " with " + tagsSent.size() + " tags" + : "") + + " send # " + packet.getNumSends() + + " sendTime: " + (end-begin) + + " con: " + conStr; + _log.debug(msg); + } + PacketHandler.displayPacket(packet, "SEND"); + } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java index f77325c50..0c17f7b61 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java @@ -46,8 +46,10 @@ class SchedulerClosing extends SchedulerImpl { con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); long remaining = con.getNextSendTime() - _context.clock().now(); if (remaining <= 0) { - con.sendAvailable(); - con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); + if (con.getCloseSentOn() <= 0) { + con.sendAvailable(); + con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); + } } else { //if (remaining < 5*1000) // remaining = 5*1000; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnectedBulk.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnectedBulk.java index e5229c709..a7616da5f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnectedBulk.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnectedBulk.java @@ -43,9 +43,9 @@ class SchedulerConnectedBulk extends SchedulerImpl { (!con.getResetReceived()) && ( (con.getCloseSentOn() <= 0) || (con.getCloseReceivedOn() <= 0) ); if (!ok) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("con: " + con + " closeSentOn: " + con.getCloseSentOn() - + " closeReceivedOn: " + con.getCloseReceivedOn()); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("con: " + con + " closeSentOn: " + con.getCloseSentOn() + // + " closeReceivedOn: " + con.getCloseReceivedOn()); } return ok; }