Notes on STREAM STATUS messages when SILENT=true

Fix one message for STREAM CONNECT that wasn't honoring SILENT setting
PING failure sends a SESSION STATUS message
Implement ping/pong in client
Delay at end of client send so data gets through in v3 mode
log tweaks
Exception catch tweaks
This commit is contained in:
zzz
2015-11-26 16:40:45 +00:00
parent e5f186f61a
commit 9367aca50a
8 changed files with 72 additions and 23 deletions

View File

@@ -158,6 +158,8 @@ abstract class SAMHandler implements Runnable, Handler {
* unregister with the bridge. * unregister with the bridge.
*/ */
public void stopHandling() { public void stopHandling() {
if (_log.shouldInfo())
_log.info("Stopping: " + this, new Exception("I did it"));
synchronized (stopLock) { synchronized (stopLock) {
stopHandler = true; stopHandler = true;
} }

View File

@@ -56,7 +56,7 @@ class SAMHandlerFactory {
throw new SAMException("Timeout waiting for HELLO VERSION", e); throw new SAMException("Timeout waiting for HELLO VERSION", e);
} catch (IOException e) { } catch (IOException e) {
throw new SAMException("Error reading from socket", e); throw new SAMException("Error reading from socket", e);
} catch (Exception e) { } catch (RuntimeException e) {
throw new SAMException("Unexpected error", e); throw new SAMException("Unexpected error", e);
} }

View File

@@ -187,7 +187,9 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
} catch (IOException e) { } catch (IOException e) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Caught IOException for message [" + msg + "]", e); _log.debug("Caught IOException for message [" + msg + "]", e);
} catch (Exception e) { } catch (SAMException e) {
_log.error("Unexpected exception for message [" + msg + "]", e);
} catch (RuntimeException e) {
_log.error("Unexpected exception for message [" + msg + "]", e); _log.error("Unexpected exception for message [" + msg + "]", e);
} finally { } finally {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))

View File

@@ -294,7 +294,7 @@ class SAMv3Handler extends SAMv1Handler
if (now - _lastPing >= READ_TIMEOUT) { if (now - _lastPing >= READ_TIMEOUT) {
if (_log.shouldWarn()) if (_log.shouldWarn())
_log.warn("Failed to respond to PING"); _log.warn("Failed to respond to PING");
writeString("PING STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n"); writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
break; break;
} }
} else { } else {
@@ -309,7 +309,7 @@ class SAMv3Handler extends SAMv1Handler
if (now - _lastPing >= 2*READ_TIMEOUT) { if (now - _lastPing >= 2*READ_TIMEOUT) {
if (_log.shouldWarn()) if (_log.shouldWarn())
_log.warn("Failed to respond to PING"); _log.warn("Failed to respond to PING");
writeString("PING STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n"); writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
break; break;
} }
} else if (_lastPing < 0) { } else if (_lastPing < 0) {
@@ -420,9 +420,11 @@ class SAMv3Handler extends SAMv1Handler
} // while } // while
} catch (IOException e) { } catch (IOException e) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Caught IOException for message [" + msg + "]", e); _log.debug("Caught IOException in handler", e);
} catch (Exception e) { } catch (SAMException e) {
_log.error("Unexpected exception for message [" + msg + "]", e); _log.error("Unexpected exception for message [" + msg + ']', e);
} catch (RuntimeException e) {
_log.error("Unexpected exception for message [" + msg + ']', e);
} finally { } finally {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Stopping handler"); _log.debug("Stopping handler");
@@ -464,6 +466,8 @@ class SAMv3Handler extends SAMv1Handler
*/ */
@Override @Override
public void stopHandling() { public void stopHandling() {
if (_log.shouldInfo())
_log.info("Stopping (stolen? " + stolenSocket + "): " + this, new Exception("I did it"));
synchronized (stopLock) { synchronized (stopLock) {
stopHandler = true; stopHandler = true;
} }
@@ -728,14 +732,16 @@ class SAMv3Handler extends SAMv1Handler
@Override @Override
protected boolean execStreamConnect( Properties props) { protected boolean execStreamConnect( Properties props) {
// Messages are NOT sent if SILENT=true,
// The specs said that they were.
boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT"));
try { try {
if (props.isEmpty()) { if (props.isEmpty()) {
notifyStreamResult(true,"I2P_ERROR","No parameters specified in STREAM CONNECT message"); notifyStreamResult(verbose, "I2P_ERROR","No parameters specified in STREAM CONNECT message");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("No parameters specified in STREAM CONNECT message"); _log.debug("No parameters specified in STREAM CONNECT message");
return false; return false;
} }
boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT"));
String dest = props.getProperty("DESTINATION"); String dest = props.getProperty("DESTINATION");
if (dest == null) { if (dest == null) {
@@ -776,6 +782,9 @@ class SAMv3Handler extends SAMv1Handler
} }
private boolean execStreamForwardIncoming( Properties props ) { private boolean execStreamForwardIncoming( Properties props ) {
// Messages ARE sent if SILENT=true,
// which is different from CONNECT and ACCEPT.
// But this matched the specs.
try { try {
try { try {
streamForwardingSocket = true ; streamForwardingSocket = true ;
@@ -794,6 +803,8 @@ class SAMv3Handler extends SAMv1Handler
private boolean execStreamAccept( Properties props ) private boolean execStreamAccept( Properties props )
{ {
// Messages are NOT sent if SILENT=true,
// The specs said that they were.
boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT")); boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT"));
try { try {
try { try {

View File

@@ -14,5 +14,7 @@ public class SAMClientEventListenerImpl implements SAMReader.SAMClientEventListe
public void streamConnectedReceived(String remoteDestination, String id) {} public void streamConnectedReceived(String remoteDestination, String id) {}
public void streamDataReceived(String id, byte[] data, int offset, int length) {} public void streamDataReceived(String id, byte[] data, int offset, int length) {}
public void streamStatusReceived(String result, String id, String message) {} public void streamStatusReceived(String result, String id, String message) {}
public void pingReceived(String data) {}
public void pongReceived(String data) {}
public void unknownMessageReceived(String major, String minor, Properties params) {} public void unknownMessageReceived(String major, String minor, Properties params) {}
} }

View File

@@ -80,6 +80,8 @@ public class SAMReader {
public void streamDataReceived(String id, byte data[], int offset, int length); public void streamDataReceived(String id, byte data[], int offset, int length);
public void namingReplyReceived(String name, String result, String value, String message); public void namingReplyReceived(String name, String result, String value, String message);
public void destReplyReceived(String publicKey, String privateKey); public void destReplyReceived(String publicKey, String privateKey);
public void pingReceived(String data);
public void pongReceived(String data);
public void unknownMessageReceived(String major, String minor, Properties params); public void unknownMessageReceived(String major, String minor, Properties params);
} }
@@ -118,13 +120,13 @@ public class SAMReader {
StringTokenizer tok = new StringTokenizer(line); StringTokenizer tok = new StringTokenizer(line);
if (tok.countTokens() < 2) { if (tok.countTokens() <= 0) {
_log.error("Invalid SAM line: [" + line + "]"); _log.error("Invalid SAM line: [" + line + "]");
break; break;
} }
String major = tok.nextToken(); String major = tok.nextToken();
String minor = tok.nextToken(); String minor = tok.hasMoreTokens() ? tok.nextToken() : "";
params.clear(); params.clear();
while (tok.hasMoreTokens()) { while (tok.hasMoreTokens()) {
@@ -247,6 +249,12 @@ public class SAMReader {
} else { } else {
_listener.unknownMessageReceived(major, minor, params); _listener.unknownMessageReceived(major, minor, params);
} }
} else if ("PING".equals(major)) {
// this omits anything after a space
_listener.pingReceived(minor);
} else if ("PONG".equals(major)) {
// this omits anything after a space
_listener.pongReceived(minor);
} else { } else {
_listener.unknownMessageReceived(major, minor, params); _listener.unknownMessageReceived(major, minor, params);
} }

View File

@@ -174,7 +174,7 @@ public class SAMStreamSend {
_log.info("We are " + destination); _log.info("We are " + destination);
} }
return destination; return destination;
} catch (Exception e) { } catch (IOException e) {
_log.error("Error handshaking", e); _log.error("Error handshaking", e);
return null; return null;
} }
@@ -327,6 +327,12 @@ public class SAMStreamSend {
_reader2.stopReading(); _reader2.stopReading();
// stop the reader, since we're only doing this once for testing // stop the reader, since we're only doing this once for testing
// you wouldn't do this in a real application // you wouldn't do this in a real application
if (_isV3) {
// closing the master socket too fast will kill the data socket flushing through
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {}
}
_reader.stopReading(); _reader.stopReading();
} }
} }

View File

@@ -66,12 +66,12 @@ public class SAMStreamSink {
_log.debug("Starting up"); _log.debug("Starting up");
try { try {
Socket sock = connect(); Socket sock = connect();
SAMEventHandler eventHandler = new SinkEventHandler(_context); OutputStream out = sock.getOutputStream();
SAMEventHandler eventHandler = new SinkEventHandler(_context, out);
_reader = new SAMReader(_context, sock.getInputStream(), eventHandler); _reader = new SAMReader(_context, sock.getInputStream(), eventHandler);
_reader.startReading(); _reader.startReading();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader created"); _log.debug("Reader created");
OutputStream out = sock.getOutputStream();
String ourDest = handshake(out, version, true, eventHandler); String ourDest = handshake(out, version, true, eventHandler);
if (ourDest == null) if (ourDest == null)
throw new IOException("handshake failed"); throw new IOException("handshake failed");
@@ -79,12 +79,12 @@ public class SAMStreamSink {
_log.debug("Handshake complete. we are " + ourDest); _log.debug("Handshake complete. we are " + ourDest);
if (_isV3) { if (_isV3) {
Socket sock2 = connect(); Socket sock2 = connect();
eventHandler = new SinkEventHandler2(_context, sock2.getInputStream()); out = sock2.getOutputStream();
eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out);
_reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler); _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
_reader2.startReading(); _reader2.startReading();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader2 created"); _log.debug("Reader2 created");
out = sock2.getOutputStream();
String ok = handshake(out, version, false, eventHandler); String ok = handshake(out, version, false, eventHandler);
if (ok == null) if (ok == null)
throw new IOException("2nd handshake failed"); throw new IOException("2nd handshake failed");
@@ -99,7 +99,12 @@ public class SAMStreamSink {
private class SinkEventHandler extends SAMEventHandler { private class SinkEventHandler extends SAMEventHandler {
public SinkEventHandler(I2PAppContext ctx) { super(ctx); } protected final OutputStream _out;
public SinkEventHandler(I2PAppContext ctx, OutputStream out) {
super(ctx);
_out = out;
}
@Override @Override
public void streamClosedReceived(String result, String id, String message) { public void streamClosedReceived(String result, String id, String message) {
@@ -143,14 +148,28 @@ public class SAMStreamSink {
_log.error("Error creating a new sink", ioe); _log.error("Error creating a new sink", ioe);
} }
} }
@Override
public void pingReceived(String data) {
if (_log.shouldInfo())
_log.info("Got PING " + data + ", sending PONG " + data);
synchronized (_out) {
try {
_out.write(("PONG " + data + '\n').getBytes());
_out.flush();
} catch (IOException ioe) {
_log.error("PONG fail", ioe);
}
}
}
} }
private class SinkEventHandler2 extends SinkEventHandler { private class SinkEventHandler2 extends SinkEventHandler {
private final InputStream _in; private final InputStream _in;
public SinkEventHandler2(I2PAppContext ctx, InputStream in) { public SinkEventHandler2(I2PAppContext ctx, InputStream in, OutputStream out) {
super(ctx); super(ctx, out);
_in = in; _in = in;
} }
@@ -159,10 +178,9 @@ public class SAMStreamSink {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("got STREAM STATUS, result=" + result); _log.debug("got STREAM STATUS, result=" + result);
super.streamStatusReceived(result, id, message); super.streamStatusReceived(result, id, message);
// with SILENT=true, there's nothing else coming, so fire up the Sink
Sink sink = null; Sink sink = null;
try { try {
String dest = "TODO if not silent"; String dest = "TODO_if_not_silent";
sink = new Sink(_v3ID, dest); sink = new Sink(_v3ID, dest);
synchronized (_remotePeers) { synchronized (_remotePeers) {
_remotePeers.put(_v3ID, sink); _remotePeers.put(_v3ID, sink);
@@ -315,7 +333,7 @@ public class SAMStreamSink {
_log.info(_destFile + " is located at " + destination); _log.info(_destFile + " is located at " + destination);
} }
return destination; return destination;
} catch (Exception e) { } catch (IOException e) {
_log.error("Error handshaking", e); _log.error("Error handshaking", e);
return null; return null;
} }
@@ -337,7 +355,7 @@ public class SAMStreamSink {
fos.write(dest.getBytes()); fos.write(dest.getBytes());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("My destination written to " + _destFile); _log.debug("My destination written to " + _destFile);
} catch (Exception e) { } catch (IOException e) {
_log.error("Error writing to " + _destFile, e); _log.error("Error writing to " + _destFile, e);
return false; return false;
} finally { } finally {