diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index 697206325..32f8213c6 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -25,12 +25,14 @@ import javax.net.ssl.SSLServerSocketFactory; import net.i2p.I2PAppContext; import net.i2p.I2PException; +import net.i2p.client.I2PClient; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManagerFactory; import net.i2p.client.streaming.I2PSocketOptions; +import net.i2p.crypto.SigType; import net.i2p.data.Destination; import net.i2p.util.EventDispatcher; import net.i2p.util.I2PAppThread; @@ -287,6 +289,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna _socketManagerState = SocketManagerState.INIT; // We could be here a LONG time, holding the lock socketManager = buildSocketManager(tunnel, pkf); + // FIXME may not be the right place for this + I2PSession sub = addSubsession(tunnel); + if (sub != null && _log.shouldLog(Log.WARN)) + _log.warn("Added subsession " + sub); } else { if (_log.shouldLog(Log.INFO)) _log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Not building a new socket manager since the old one is open [s=" + s + "]"); @@ -299,10 +305,41 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna if (_log.shouldLog(Log.INFO)) _log.info(tunnel.getClientOptions().getProperty("inbound.nickname") + ": Building a new socket manager since there is no other one"); socketManager = buildSocketManager(tunnel, pkf); + I2PSession sub = addSubsession(tunnel); + if (sub != null && _log.shouldLog(Log.WARN)) + _log.warn("Added subsession " + sub); } return socketManager; } + /** + * Add a subsession to a shared client if necessary. + * + * @since 0.9.20 + */ + protected static synchronized I2PSession addSubsession(I2PTunnel tunnel) { + I2PSession sess = socketManager.getSession(); + if (sess.getMyDestination().getSigType() == SigType.DSA_SHA1) + return null; + Properties props = new Properties(); + props.putAll(tunnel.getClientOptions()); + String name = props.getProperty("inbound.nickname"); + if (name != null) + props.setProperty("inbound.nickname", name + " (DSA)"); + name = props.getProperty("outbound.nickname"); + if (name != null) + props.setProperty("outbound.nickname", name + " (DSA)"); + props.setProperty(I2PClient.PROP_SIGTYPE, "DSA_SHA1"); + try { + return socketManager.addSubsession(null, props); + } catch (I2PSessionException ise) { + Log log = tunnel.getContext().logManager().getLog(I2PTunnelClientBase.class); + if (log.shouldLog(Log.WARN)) + log.warn("Failed to add subssession", ise); + return null; + } + } + /** * Kill the shared client, so that on restart in android * we won't latch onto the old one diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index 5d1819b70..3c2a37325 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -5,17 +5,20 @@ package net.i2p.client.streaming; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.NoRouteToHostException; import java.net.ServerSocket; import java.net.Socket; +import java.util.List; import java.util.Properties; import java.util.Set; import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; @@ -34,6 +37,26 @@ public interface I2PSocketManager { */ public I2PSession getSession(); + /** + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException; + + /** + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session); + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions(); + /** * How long should we wait for the client to .accept() a socket before * sending back a NACK/Close? diff --git a/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java b/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java index 86c9daaea..f0ef33f9e 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java @@ -452,10 +452,10 @@ public class SummaryHelper extends HelperBase { buf.append("client.png\" alt=\"Client\" title=\"").append(_("Client")).append("\">"); buf.append(""); - if (name.length() < 18) + if (name.length() <= 20) buf.append(DataHelper.escapeHTML(name)); else - buf.append(DataHelper.escapeHTML(name.substring(0,15))).append("…"); + buf.append(DataHelper.escapeHTML(name.substring(0,18))).append("…"); buf.append("\n"); LeaseSet ls = _context.netDb().lookupLeaseSetLocally(h); if (ls != null && _context.tunnelManager().getOutboundClientTunnelCount(h) > 0) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index c27bc9dea..3a0a605ef 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -874,6 +874,9 @@ class Connection { */ public void setOptions(ConnectionOptions opts) { _options = opts; } + /** @since 0.9.21 */ + public ConnectionManager getConnectionManager() { return _connectionManager; } + public I2PSession getSession() { return _connectionManager.getSession(); } public I2PSocketFull getSocket() { return _socket; } public void setSocket(I2PSocketFull socket) { _socket = socket; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java index 950670efc..55d9522bf 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java @@ -88,7 +88,7 @@ class ConnectionManager { // As of 0.9.1, listen on configured port (default 0 = all) int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY; _session.addMuxedSessionListener(_messageHandler, protocol, defaultOptions.getLocalPort()); - _outboundQueue = new PacketQueue(_context, _session, this); + _outboundQueue = new PacketQueue(_context, _session); _recentlyClosed = new LHMCache(32); /** Socket timeout for accept() */ _soTimeout = -1; @@ -429,7 +429,8 @@ class ConnectionManager { // try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {} } else { - con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, false); + con = new Connection(_context, this, _schedulerChooser, _timer, + _outboundQueue, _conPacketHandler, opts, false); con.setRemotePeer(peer); assignReceiveStreamId(con); break; // stop looping as a psuedo-wait @@ -890,4 +891,12 @@ class ConnectionManager { if (req != null) req.pong(payload); } + + /** + * @since 0.9.20 + */ + @Override + public String toString() { + return "ConnectionManager for " + _session; + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java index 630b1f0b6..7ea8c90bd 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java @@ -1,26 +1,40 @@ package net.i2p.client.streaming.impl; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.net.ConnectException; import java.net.NoRouteToHostException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import net.i2p.I2PAppContext; import net.i2p.I2PException; +import net.i2p.client.I2PClient; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketOptions; +import net.i2p.crypto.SigType; +import net.i2p.data.Certificate; import net.i2p.data.Destination; +import net.i2p.data.Hash; +import net.i2p.data.PrivateKey; +import net.i2p.data.PublicKey; +import net.i2p.data.SimpleDataStructure; +import net.i2p.util.ConvertToHash; import net.i2p.util.Log; /** @@ -37,6 +51,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { private final I2PAppContext _context; private final Log _log; private final I2PSession _session; + private final ConcurrentHashMap _subsessions; private final I2PServerSocketFull _serverSocket; private StandardServerSocket _realServerSocket; private final ConnectionOptions _defaultOptions; @@ -45,7 +60,52 @@ public class I2PSocketManagerFull implements I2PSocketManager { private static final AtomicInteger __managerId = new AtomicInteger(); private final ConnectionManager _connectionManager; private final AtomicBoolean _isDestroyed = new AtomicBoolean(); + + /** @since 0.9.20 */ + private static final Set _dsaOnly = new HashSet(16); + private static final String[] DSA_ONLY_HASHES = { + // list from http://zzz.i2p/topics/1682?page=1#p8414 + // bzr.welterde.i2p + "Cvs1gCZTTkgD2Z2byh2J9atPmh5~I8~L7BNQnQl0hUE=", + // docs.i2p2.i2p + "WCXV87RdrF6j-mnn6qt7kVSBifHTlPL0PmVMFWwaolo=", + // flibusta.i2p + "yy2hYtqqfl84N9skwdRkeM7baFMXHKyDWU3XRShlEo8=", + // forum.i2p + "3t5Ar2NCTIOId70uzX2bZyJljR0aBogxMEzNyHirB7A=", + // i2jump.i2p + "9vaoGZbOaeqdRK2qEunlwRM9mUSW-I9R4OON35TDKK4=", + // irc.welterde.i2p + "5rjezx4McFk3bNhoJV-NTLlQW1AR~jiUcN6DOWMCCVc=", + // lists.i2p2.i2p + "qwtgoFoMSK0TOtbT4ovBX1jHUzCoZCPzrJVxjKD7RCg=", + // mtn.i2p2.i2p + "X5VDzYaoX9-P6bAWnrVSR5seGLkOeORP2l3Mh4drXPo=", + // nntp.welterde.i2p + "VXwmNIwMy1BcUVmut0oZ72jbWoqFzvxJukmS-G8kAAE=", + // paste.i2p2.i2p + "DoyMyUUgOSTddvRpqYfKHFPPjkkX~iQmResyfjjBYWs=", + // syndie.wetlerde.i2p + "xMxC54BFgyp-~zzrQI3F8m2CK--9XMcNmSAep6RH4Kk=", + // ugha.i2p + "zsu3WF~QLBxZXH-gHq9MuZE6y8ROZmMF7dA2MbMMKkY=", + // tracker.welterde.i2p + "EVkFgKkrDKyGfI7TIuDmlHoAmvHC~FbnY946DfujR0A=", + // www.i2p2.i2p + "im9gytzKT15mT1sB5LC9bHXCcwytQ4EPcrGQhoam-4w=" + }; + static { + for (int i = 0; i < DSA_ONLY_HASHES.length; i++) { + String s = DSA_ONLY_HASHES[i]; + Hash h = ConvertToHash.getHash(s); + if (h != null) + _dsaOnly.add(h); + else + System.out.println("Bad hash " + s); + } + } + /** * How long to wait for the client app to accept() before sending back CLOSE? * This includes the time waiting in the queue. Currently set to 5 seconds. @@ -80,6 +140,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { _context = context; _session = session; + _subsessions = new ConcurrentHashMap(4); _log = _context.logManager().getLog(I2PSocketManagerFull.class); _name = name + " " + (__managerId.incrementAndGet()); @@ -120,6 +181,100 @@ public class I2PSocketManagerFull implements I2PSocketManager { return _session; } + /** + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException { + if (privateKeyStream == null) { + // We don't actually need the same pubkey in the dest, just in the LS. + // The dest one is unused. But this is how we find the LS keys + // to reuse in RequestLeaseSetMessageHandler. + ByteArrayOutputStream keyStream = new ByteArrayOutputStream(1024); + try { + SigType type = getSigType(opts); + if (type != SigType.DSA_SHA1) { + // hassle, have to set up the padding and cert, see I2PClientImpl + throw new I2PSessionException("type " + type + " unsupported"); + } + PublicKey pub = _session.getMyDestination().getPublicKey(); + PrivateKey priv = _session.getDecryptionKey(); + SimpleDataStructure[] keys = _context.keyGenerator().generateSigningKeys(type); + pub.writeBytes(keyStream); + keys[0].writeBytes(keyStream); // signing pub + Certificate.NULL_CERT.writeBytes(keyStream); + priv.writeBytes(keyStream); + keys[1].writeBytes(keyStream); // signing priv + } catch (Exception e) { + throw new I2PSessionException("Error creating keys", e); + } + privateKeyStream = new ByteArrayInputStream(keyStream.toByteArray()); + } + I2PSession rv = _session.addSubsession(privateKeyStream, opts); + ConnectionOptions defaultOptions = new ConnectionOptions(opts); + ConnectionManager connectionManager = new ConnectionManager(_context, rv, defaultOptions); + ConnectionManager old = _subsessions.putIfAbsent(rv, connectionManager); + if (old != null) { + // shouldn't happen + _session.removeSubsession(rv); + connectionManager.shutdown(); + throw new I2PSessionException("dup"); + } + if (_log.shouldLog(Log.WARN)) + _log.warn("Added subsession " + rv); + return rv; + } + + /** + * @param opts may be null + * @since 0.9.20 copied from I2PSocketManagerFactory + */ + private SigType getSigType(Properties opts) { + if (opts != null) { + String st = opts.getProperty(I2PClient.PROP_SIGTYPE); + if (st != null) { + SigType rv = SigType.parseSigType(st); + if (rv != null && rv.isAvailable()) + return rv; + if (rv != null) + st = rv.toString(); + _log.logAlways(Log.WARN, "Unsupported sig type " + st + + ", reverting to " + I2PClient.DEFAULT_SIGTYPE); + // TODO throw instead? + } + } + return I2PClient.DEFAULT_SIGTYPE; + } + + /** + * Remove the subsession + * + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session) { + _session.removeSubsession(session); + ConnectionManager cm = _subsessions.remove(session); + if (cm != null) { + cm.shutdown(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Removeed subsession " + session); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Subsession not found to remove " + session); + } + } + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions() { + return _session.getSubsessions(); + } + public ConnectionManager getConnectionManager() { return _connectionManager; } @@ -262,11 +417,16 @@ public class I2PSocketManagerFull implements I2PSocketManager { } private void verifySession() throws I2PException { + verifySession(_connectionManager); + } + + /** @since 0.9.20 */ + private void verifySession(ConnectionManager cm) throws I2PException { if (_isDestroyed.get()) throw new I2PException("Session was closed"); - if (!_connectionManager.getSession().isClosed()) + if (!cm.getSession().isClosed()) return; - _connectionManager.getSession().connect(); + cm.getSession().connect(); } /** @@ -285,7 +445,6 @@ public class I2PSocketManagerFull implements I2PSocketManager { */ public I2PSocket connect(Destination peer, I2PSocketOptions options) throws I2PException, NoRouteToHostException { - verifySession(); if (options == null) options = _defaultOptions; ConnectionOptions opts = null; @@ -297,8 +456,23 @@ public class I2PSocketManagerFull implements I2PSocketManager { if (_log.shouldLog(Log.INFO)) _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6) + " with options: " + opts); + // pick the subsession here + ConnectionManager cm = _connectionManager; + if (!_subsessions.isEmpty()) { + Hash h = peer.calculateHash(); + if (_dsaOnly.contains(h)) { + // FIXME just taking the first one for now + for (Map.Entry e : _subsessions.entrySet()) { + if (e.getKey().getMyDestination().getSigType() == SigType.DSA_SHA1) { + cm = e.getValue(); + break; + } + } + } + } + verifySession(cm); // the following blocks unless connect delay > 0 - Connection con = _connectionManager.connect(peer, opts); + Connection con = cm.connect(peer, opts); if (con == null) throw new TooManyStreamsException("Too many streams, max " + _defaultOptions.getMaxConns()); I2PSocketFull socket = new I2PSocketFull(con,_context); @@ -381,6 +555,12 @@ public class I2PSocketManagerFull implements I2PSocketManager { } _connectionManager.setAllowIncomingConnections(false); _connectionManager.shutdown(); + if (!_subsessions.isEmpty()) { + for (I2PSession sess : _subsessions.keySet()) { + removeSubsession(sess); + } + } + // should we destroy the _session too? // yes, since the old lib did (and SAM wants it to, and i dont know why not) if ( (_session != null) && (!_session.isClosed()) ) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java index 9da5568c4..9577e0e5b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java @@ -50,7 +50,7 @@ class MessageHandler implements I2PSessionMuxedListener { * @param size size of the message */ public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) { - byte data[] = null; + byte data[]; try { data = session.receiveMessage(msgId); } catch (I2PSessionException ise) { @@ -59,7 +59,17 @@ class MessageHandler implements I2PSessionMuxedListener { _log.warn("Error receiving the message", ise); return; } - if (data == null) return; + if (data == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received null data on " + session + " proto: " + proto + + " fromPort: " + fromPort + " toPort: " + toPort); + return; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received " + data.length + " bytes on " + session + + " (" + _manager + ')' + + " proto: " + proto + + " fromPort: " + fromPort + " toPort: " + toPort); Packet packet = new Packet(); try { packet.readPacket(data, 0, data.length); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java index 3897b2809..a77d282fe 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java @@ -568,6 +568,27 @@ class MessageInputStream extends InputStream { @Override public void close() { synchronized (_dataLock) { + if (_log.shouldLog(Log.DEBUG)) { + StringBuilder buf = new StringBuilder(128); + buf.append("close(), ready bytes: "); + long available = 0; + for (int i = 0; i < _readyDataBlocks.size(); i++) + available += _readyDataBlocks.get(i).getValid(); + available -= _readyDataBlockIndex; + buf.append(available); + buf.append(" blocks: ").append(_readyDataBlocks.size()); + buf.append(" not ready blocks: "); + long notAvailable = 0; + for (Long id : _notYetReadyBlocks.keySet()) { + ByteArray ba = _notYetReadyBlocks.get(id); + buf.append(id).append(" "); + if (ba != null) + notAvailable += ba.getValid(); + } + buf.append("not ready bytes: ").append(notAvailable); + buf.append(" highest ready block: ").append(_highestReadyBlockId); + _log.debug(buf.toString()); + } //while (_readyDataBlocks.size() > 0) // _cache.release((ByteArray)_readyDataBlocks.remove(0)); _readyDataBlocks.clear(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Packet.java index 77de11061..0bf00e4ca 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Packet.java @@ -766,7 +766,7 @@ class Packet { if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS ").append(_optionMaxSize); if (isFlagSet(FLAG_PROFILE_INTERACTIVE)) buf.append(" INTERACTIVE"); if (isFlagSet(FLAG_RESET)) buf.append(" RESET"); - if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG"); + if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG ").append(_optionSignature.length()); if (isFlagSet(FLAG_SIGNATURE_REQUESTED)) buf.append(" SIGREQ"); if (isFlagSet(FLAG_SYNCHRONIZE)) buf.append(" SYN"); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java index 1eb10c7fb..10b448bea 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java @@ -29,7 +29,6 @@ class PacketQueue implements SendMessageStatusListener { private final I2PAppContext _context; private final Log _log; private final I2PSession _session; - private final ConnectionManager _connectionManager; private final ByteCache _cache = ByteCache.getInstance(64, 36*1024); private final Map _messageStatusMap; private volatile boolean _dead; @@ -46,10 +45,9 @@ class PacketQueue implements SendMessageStatusListener { private static final long REMOVE_EXPIRED_TIME = 67*1000; private static final boolean ENABLE_STATUS_LISTEN = true; - public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) { + public PacketQueue(I2PAppContext context, I2PSession session) { _context = context; _session = session; - _connectionManager = mgr; _log = context.logManager().getLog(PacketQueue.class); _messageStatusMap = new ConcurrentHashMap(16); new RemoveExpired(); @@ -199,9 +197,10 @@ class PacketQueue implements SendMessageStatusListener { //packet.setTagsSent(tagsSent); packet.incrementSends(); Connection c = packet.getConnection(); - String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO() : null); - if (_log.shouldDebug()) - _connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix); + if (c != null) { + String suffix = "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO(); + c.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix); + } if (I2PSocketManagerFull.pcapWriter != null && _context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP)) packet.logTCPDump(); diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index a6ce4122e..dc79a1f4f 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -9,6 +9,8 @@ package net.i2p.client; * */ +import java.io.InputStream; +import java.util.List; import java.util.Properties; import java.util.Set; @@ -21,7 +23,7 @@ import net.i2p.data.SigningPrivateKey; /** *

Define the standard means of sending and receiving messages on the * I2P network by using the I2CP (the client protocol). This is done over a - * bidirectional TCP socket and never sends any private keys. + * bidirectional TCP socket. * * End to end encryption in I2PSession was disabled in release 0.6. * @@ -247,6 +249,27 @@ public interface I2PSession { * */ public void destroySession() throws I2PSessionException; + + /** + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException; + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session); + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions(); /** * Actually connect the session and start receiving/sending messages diff --git a/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java b/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java index 1f07a32fc..ea1ef2437 100644 --- a/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java +++ b/core/java/src/net/i2p/client/I2PSessionDemultiplexer.java @@ -74,7 +74,9 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener { * (Streaming lib) */ public void addListener(I2PSessionListener l, int proto, int port) { - _listeners.put(key(proto, port), new NoPortsListener(l)); + I2PSessionListener old = _listeners.put(key(proto, port), new NoPortsListener(l)); + if (old != null && _log.shouldLog(Log.WARN)) + _log.warn("Listener " + l + " replaces " + old + " for proto: " + proto + " port: " + port); } /** @@ -82,7 +84,9 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener { * UDP perhaps */ public void addMuxedListener(I2PSessionMuxedListener l, int proto, int port) { - _listeners.put(key(proto, port), l); + I2PSessionListener old = _listeners.put(key(proto, port), l); + if (old != null && _log.shouldLog(Log.WARN)) + _log.warn("Listener " + l + " replaces " + old + " for proto: " + proto + " port: " + port); } public void removeListener(int proto, int port) { diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index d3a11d74e..035536c6c 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -23,6 +23,7 @@ import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -43,6 +44,7 @@ import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageReader; import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.SessionId; +import net.i2p.data.i2cp.SessionStatusMessage; import net.i2p.internal.I2CPMessageQueue; import net.i2p.internal.InternalClientManager; import net.i2p.internal.QueuedI2CPMessageReader; @@ -81,6 +83,15 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 /** currently granted lease set, or null */ private volatile LeaseSet _leaseSet; + // subsession stuff + // registered subsessions + private final List _subsessions; + // established subsessions + private final ConcurrentHashMap _subsessionMap; + private final Object _subsessionLock = new Object(); + private static final String MIN_SUBSESSION_VERSION = "0.9.19"; + private volatile boolean _routerSupportsSubsessions; + /** hostname of router - will be null if in RouterContext */ protected final String _hostname; /** port num to router - will be 0 if in RouterContext */ @@ -186,6 +197,9 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 TEST_LOOKUP || (routerVersion != null && routerVersion.length() > 0 && VersionComparator.comp(routerVersion, MIN_HOST_LOOKUP_VERSION) >= 0); + _routerSupportsSubsessions = _context.isRouterContext() || + (routerVersion != null && routerVersion.length() > 0 && + VersionComparator.comp(routerVersion, MIN_SUBSESSION_VERSION) >= 0); synchronized (_stateLock) { if (_state == State.OPENING) { _state = State.GOTDATE; @@ -203,18 +217,42 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 */ protected I2PSessionImpl(I2PAppContext context, Properties options, I2PClientMessageHandlerMap handlerMap) { - this(context, options, handlerMap, false); + this(context, options, handlerMap, null, false); } - + + /* + * For extension by SubSession via I2PSessionMuxedImpl and I2PSessionImpl2 + * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param options set of options to configure the router with, if null will use System properties + * @since 0.9.19 + */ + protected I2PSessionImpl(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + this(primary.getContext(), options, primary.getHandlerMap(), primary.getProducer(), true); + _availabilityNotifier = new AvailabilityNotifier(); + try { + readDestination(destKeyStream); + } catch (DataFormatException dfe) { + throw new I2PSessionException("Error reading the destination key stream", dfe); + } catch (IOException ioe) { + throw new I2PSessionException("Error reading the destination key stream", ioe); + } + } + /** * Basic setup of finals * @since 0.9.7 */ private I2PSessionImpl(I2PAppContext context, Properties options, - I2PClientMessageHandlerMap handlerMap, boolean hasDest) { + I2PClientMessageHandlerMap handlerMap, + I2CPMessageProducer producer, + boolean hasDest) { _context = context; _handlerMap = handlerMap; _log = context.logManager().getLog(getClass()); + _subsessions = new CopyOnWriteArrayList(); + _subsessionMap = new ConcurrentHashMap(4); if (options == null) options = (Properties) System.getProperties().clone(); _options = loadConfig(options); @@ -222,7 +260,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 _portNum = getPort(); _fastReceive = Boolean.parseBoolean(_options.getProperty(I2PClient.PROP_FAST_RECEIVE)); if (hasDest) { - _producer = new I2CPMessageProducer(context); + _producer = producer; _availableMessages = new ConcurrentHashMap(); _myDestination = new Destination(); _privateKey = new PrivateKey(); @@ -236,6 +274,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 } _routerSupportsFastReceive = _context.isRouterContext(); _routerSupportsHostLookup = _context.isRouterContext(); + _routerSupportsSubsessions = _context.isRouterContext(); } /** @@ -247,10 +286,10 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 * @param destKeyStream stream containing the private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties - * @throws I2PSessionException if there is a problem loading the private keys or + * @throws I2PSessionException if there is a problem loading the private keys */ public I2PSessionImpl(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException { - this(context, options, new I2PClientMessageHandlerMap(context), true); + this(context, options, new I2PClientMessageHandlerMap(context), new I2CPMessageProducer(context), true); _availabilityNotifier = new AvailabilityNotifier(); try { readDestination(destKeyStream); @@ -260,6 +299,69 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 throw new I2PSessionException("Error reading the destination key stream", ioe); } } + + /** + * Router must be connected or was connected... for now. + * + * @return a new subsession, non-null + * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session + * and different signing keys + * @param opts subsession options if any, may be null + * @since 0.9.19 + */ + public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException { + if (!_routerSupportsSubsessions) + throw new I2PSessionException("Router does not support subsessions"); + SubSession sub; + synchronized(_subsessionLock) { + if (_subsessions.size() > _subsessionMap.size()) + throw new I2PSessionException("Subsession request already pending"); + sub = new SubSession(this, privateKeyStream, opts); + for (SubSession ss : _subsessions) { + if (ss.getDecryptionKey().equals(sub.getDecryptionKey()) && + ss.getPrivateKey().equals(sub.getPrivateKey())) { + throw new I2PSessionException("Dup subsession"); + } + } + _subsessions.add(sub); + } + + synchronized (_stateLock) { + if (_state == State.OPEN) { + _producer.connect(sub); + } // else will be called in connect() + } + return sub; + } + + /** + * @since 0.9.19 + */ + public void removeSubsession(I2PSession session) { + if (!(session instanceof SubSession)) + return; + synchronized(_subsessionLock) { + _subsessions.remove(session); + SessionId id = ((SubSession) session).getSessionId(); + if (id != null) + _subsessionMap.remove(id); + /// tell the subsession + try { + // doesn't really throw + session.destroySession(); + } catch (I2PSessionException ise) {} + } + } + + /** + * @return a list of subsessions, non-null, does not include the primary session + * @since 0.9.19 + */ + public List getSubsessions() { + synchronized(_subsessionLock) { + return new ArrayList(_subsessions); + } + } /** * Parse the config for anything we know about. @@ -553,6 +655,16 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 startIdleMonitor(); startVerifyUsage(); success = true; + + // now send CreateSessionMessages for all subsessions, one at a time, must wait for each response + synchronized(_subsessionLock) { + for (SubSession ss : _subsessions) { + if (_log.shouldLog(Log.INFO)) + _log.info(getPrefix() + "Connecting subsession " + ss); + _producer.connect(ss); + } + } + } catch (InterruptedException ie) { throw new I2PSessionException("Interrupted", ie); } catch (UnknownHostException uhe) { @@ -763,19 +875,80 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 /** * The I2CPMessageEventListener callback. * Recieve notification of some I2CP message and handle it if possible. + * + * We route the message based on message type AND session ID. + * + * The following types never contain a session ID and are not routable to + * a subsession: + * BandwidthLimitsMessage, DestReplyMessage + * + * The following types may not ontain a valid session ID + * even when intended for a subsession, so we must take special care: + * SessionStatusMessage + * * @param reader unused */ public void messageReceived(I2CPMessageReader reader, I2CPMessage message) { - I2CPMessageHandler handler = _handlerMap.getHandler(message.getType()); - if (handler == null) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getPrefix() + "Unknown message or unhandleable message received: type = " - + message.getType()); + int type = message.getType(); + SessionId id = message.sessionId(); + if (id == null || id.equals(_sessionId) || + (_sessionId == null && id != null && type == SessionStatusMessage.MESSAGE_TYPE)) { + // it's for us + I2CPMessageHandler handler = _handlerMap.getHandler(type); + if (handler != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Message received of type " + type + + " to be handled by " + handler.getClass().getSimpleName()); + handler.handleMessage(message, this); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "Unknown message or unhandleable message received: type = " + + type); + } } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Message received of type " + message.getType() - + " to be handled by " + handler.getClass().getSimpleName()); - handler.handleMessage(message, this); + SubSession sub = _subsessionMap.get(id); + if (sub != null) { + // it's for a subsession + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Message received of type " + type + + " to be handled by " + sub); + sub.messageReceived(reader, message); + } else if (id != null && type == SessionStatusMessage.MESSAGE_TYPE) { + // look for a subsession without a session + synchronized (_subsessionLock) { + for (SubSession sess : _subsessions) { + if (sess.getSessionId() == null) { + sess.messageReceived(reader, message); + id = sess.getSessionId(); + if (id != null) { + if (id.equals(_sessionId)) { + // shouldnt happen + sess.setSessionId(null); + if (_log.shouldLog(Log.WARN)) + _log.warn("Dup or our session id " + id); + } else { + SubSession old = _subsessionMap.putIfAbsent(id, sess); + if (old != null) { + // shouldnt happen + sess.setSessionId(null); + if (_log.shouldLog(Log.WARN)) + _log.warn("Dup session id " + id); + } + } + } + return; + } + if (_log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "No session " + id + " to handle message: type = " + + type); + } + } + } else { + // it's for nobody + if (_log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "No session " + id + " to handle message: type = " + + type); + } } } @@ -810,6 +983,18 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 */ I2CPMessageProducer getProducer() { return _producer; } + /** + * For Subsessions + * @since 0.9.19 + */ + I2PClientMessageHandlerMap getHandlerMap() { return _handlerMap; } + + /** + * For Subsessions + * @since 0.9.19 + */ + I2PAppContext getContext() { return _context; } + /** * Retrieve the configuration options, filtered. * All defaults passed in via constructor have been promoted to the primary map. @@ -923,6 +1108,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 if (_availabilityNotifier != null) _availabilityNotifier.stopNotifying(); closeSocket(); + _subsessionMap.clear(); if (_sessionListener != null) _sessionListener.disconnected(this); } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index d58f5b5f1..4002bab80 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -50,9 +50,9 @@ class I2PSessionImpl2 extends I2PSessionImpl { private static final long REMOVE_EXPIRED_TIME = 63*1000; - /** - * for extension by SimpleSession (no dest) - */ + /** + * for extension by SimpleSession (no dest) + */ protected I2PSessionImpl2(I2PAppContext context, Properties options, I2PClientMessageHandlerMap handlerMap) { super(context, options, handlerMap); @@ -61,15 +61,17 @@ class I2PSessionImpl2 extends I2PSessionImpl { } /** + * for extension by I2PSessionMuxedImpl + * * Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey * from the destKeyStream, and using the specified options to connect to the router * * @param destKeyStream stream containing the private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param options set of options to configure the router with, if null will use System properties - * @throws I2PSessionException if there is a problem loading the private keys or + * @throws I2PSessionException if there is a problem loading the private keys */ - public I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException { + protected I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException { super(ctx, destKeyStream, options); _sendingStates = new ConcurrentHashMap(32); _sendMessageNonce = new AtomicLong(); @@ -94,6 +96,26 @@ class I2PSessionImpl2 extends I2PSessionImpl { _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 }); } + /* + * For extension by SubSession via I2PSessionMuxedImpl + * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param options set of options to configure the router with, if null will use System properties + * @since 0.9.19 + */ + protected I2PSessionImpl2(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + super(primary, destKeyStream, options); + _sendingStates = new ConcurrentHashMap(32); + _sendMessageNonce = new AtomicLong(); + _noEffort = "none".equals(getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US)); + _context.statManager().createRateStat("i2cp.receiveStatusTime.1", "How long it took to get status=1 back", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 30*60*1000 }); + _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 }); + } + /** * Fire up a periodic task to check for unclaimed messages * @since 0.9.14 diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index 94eb6ae45..f4ae207e1 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -82,6 +82,24 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { // discards the one in super(), sorry about that... (no it wasn't started yet) _availabilityNotifier = new MuxedAvailabilityNotifier(); } + + /* + * For extension by SubSession + * + * @param destKeyStream stream containing the private key data, + * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * @param options set of options to configure the router with, if null will use System properties + * @since 0.9.19 + */ + protected I2PSessionMuxedImpl(I2PSessionImpl primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + super(primary, destKeyStream, options); + // also stored in _sessionListener but we keep it in _demultipexer + // as well so we don't have to keep casting + _demultiplexer = new I2PSessionDemultiplexer(primary.getContext()); + super.setSessionListener(_demultiplexer); + // discards the one in super(), sorry about that... (no it wasn't started yet) + _availabilityNotifier = new MuxedAvailabilityNotifier(); + } /** listen on all protocols and ports */ @Override @@ -315,9 +333,9 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { protected class MuxedAvailabilityNotifier extends AvailabilityNotifier { private final LinkedBlockingQueue _msgs; - private volatile boolean _alive = false; + private volatile boolean _alive; private static final int POISON_SIZE = -99999; - private final AtomicBoolean stopping = new AtomicBoolean(false); + private final AtomicBoolean stopping = new AtomicBoolean(); public MuxedAvailabilityNotifier() { _msgs = new LinkedBlockingQueue(); @@ -325,12 +343,12 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { @Override public void stopNotifying() { - boolean again = true; synchronized (stopping) { if( !stopping.getAndSet(true)) { - if (_alive == true) { + _msgs.clear(); + if (_alive) { // System.out.println("I2PSessionMuxedImpl.stopNotifying()"); - _msgs.clear(); + boolean again = true; while(again) { try { _msgs.put(new MsgData(0, POISON_SIZE, 0, 0, 0)); @@ -340,8 +358,8 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { continue; } } + _alive = false; } - _alive = false; stopping.set(false); } // stopping.notifyAll(); @@ -355,17 +373,24 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { try { _msgs.put(new MsgData((int)(msgId & 0xffffffff), size, proto, fromPort, toPort)); } catch (InterruptedException ie) {} + if (!_alive && _log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "message available but notifier not running"); } @Override public void run() { - MsgData msg; + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "starting muxed availability notifier"); + _msgs.clear(); _alive=true; while (_alive) { + MsgData msg; try { msg = _msgs.take(); } catch (InterruptedException ie) { - _log.debug("I2PSessionMuxedImpl.run() InterruptedException " + String.valueOf(_msgs.size()) + " Messages, Alive " + _alive); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("I2PSessionMuxedImpl.run() InterruptedException " + + String.valueOf(_msgs.size()) + " Messages, Alive " + _alive); continue; } if (msg.size == POISON_SIZE) { diff --git a/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java b/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java index 49cc2c45b..39dc99403 100644 --- a/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java +++ b/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java @@ -33,7 +33,7 @@ class MessagePayloadMessageHandler extends HandlerImpl { public void handleMessage(I2CPMessage message, I2PSessionImpl session) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handle message " + message); + _log.debug("Handle message " + message + " for session " + session); try { MessagePayloadMessage msg = (MessagePayloadMessage) message; long id = msg.getMessageId(); diff --git a/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java b/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java index e6e9965a8..e6729a2e4 100644 --- a/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java +++ b/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java @@ -88,9 +88,8 @@ class RequestLeaseSetMessageHandler extends HandlerImpl { String sspk = session.getOptions().getProperty("i2cp.leaseSetSigningPrivateKey"); PrivateKey privKey = null; SigningPrivateKey signingPrivKey = null; - boolean useOldKeys; if (spk != null && sspk != null) { - useOldKeys = true; + boolean useOldKeys = true; int colon = sspk.indexOf(':'); SigType type = dest.getSigType(); if (colon > 0) { @@ -111,6 +110,7 @@ class RequestLeaseSetMessageHandler extends HandlerImpl { signingPrivKey.fromBase64(sspk); } catch (DataFormatException iae) { useOldKeys = false; + signingPrivKey = null; } } if (useOldKeys) { @@ -118,20 +118,36 @@ class RequestLeaseSetMessageHandler extends HandlerImpl { privKey = new PrivateKey(); privKey.fromBase64(spk); } catch (DataFormatException iae) { - useOldKeys = false; + privKey = null; } } - } else { - useOldKeys = false; } - if (useOldKeys) - li = new LeaseInfo(privKey, signingPrivKey); - else + if (privKey == null && !_existingLeaseSets.isEmpty()) { + // look for keypair from another dest using same pubkey + PublicKey pk = dest.getPublicKey(); + for (Map.Entry e : _existingLeaseSets.entrySet()) { + if (pk.equals(e.getKey().getPublicKey())) { + privKey = e.getValue().getPrivateKey(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Creating new leaseInfo keys for " + dest + " with private key from " + e.getKey()); + break; + } + } + } + if (privKey != null) { + if (signingPrivKey != null) { + li = new LeaseInfo(privKey, signingPrivKey); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Creating new leaseInfo keys for " + dest + " WITH configured private keys"); + } else { + li = new LeaseInfo(privKey, dest); + } + } else { li = new LeaseInfo(dest); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Creating new leaseInfo keys for " + dest + " without configured private keys"); + } _existingLeaseSets.put(dest, li); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Creating new leaseInfo keys for " - + dest + " using configured private keys? " + useOldKeys); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Caching the old leaseInfo keys for " @@ -178,6 +194,9 @@ class RequestLeaseSetMessageHandler extends HandlerImpl { private final SigningPublicKey _signingPubKey; private final SigningPrivateKey _signingPrivKey; + /** + * New keys + */ public LeaseInfo(Destination dest) { SimpleDataStructure encKeys[] = KeyGenerator.getInstance().generatePKIKeys(); // must be same type as the Destination's signing key @@ -194,6 +213,7 @@ class RequestLeaseSetMessageHandler extends HandlerImpl { } /** + * Existing keys * @since 0.9.18 */ public LeaseInfo(PrivateKey privKey, SigningPrivateKey signingPrivKey) { @@ -203,6 +223,23 @@ class RequestLeaseSetMessageHandler extends HandlerImpl { _signingPrivKey = signingPrivKey; } + /** + * Existing crypto key, new signing key + * @since 0.9.20 + */ + public LeaseInfo(PrivateKey privKey, Destination dest) { + SimpleDataStructure signKeys[]; + try { + signKeys = KeyGenerator.getInstance().generateSigningKeys(dest.getSigningPublicKey().getType()); + } catch (GeneralSecurityException gse) { + throw new IllegalStateException(gse); + } + _pubKey = KeyGenerator.getPublicKey(privKey); + _privKey = privKey; + _signingPubKey = (SigningPublicKey) signKeys[0]; + _signingPrivKey = (SigningPrivateKey) signKeys[1]; + } + public PublicKey getPublicKey() { return _pubKey; } diff --git a/core/java/src/net/i2p/client/SubSession.java b/core/java/src/net/i2p/client/SubSession.java new file mode 100644 index 000000000..2a2882335 --- /dev/null +++ b/core/java/src/net/i2p/client/SubSession.java @@ -0,0 +1,321 @@ +package net.i2p.client; + +/* + * free (adj.): unencumbered; not under the control of others + * Written by jrandom in 2003 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import net.i2p.I2PAppContext; +import net.i2p.data.Destination; +import net.i2p.data.Hash; +import net.i2p.data.PrivateKey; +import net.i2p.data.SigningPrivateKey; +import net.i2p.data.i2cp.CreateLeaseSetMessage; +import net.i2p.data.i2cp.CreateSessionMessage; +import net.i2p.data.i2cp.I2CPMessage; +import net.i2p.data.i2cp.SessionId; +import net.i2p.util.I2PAppThread; + +/** + * An additional session using another session's connection. + * + * A subsession uses the same connection to the router as the primary session, + * but has a different Destination. It uses the same tunnels as the primary + * but has its own leaseset. It must use the same encryption keys as the primary + * so that garlic encryption/decryption works. + * + * The message handler map and message producer are reused from primary. + * + * Does NOT reuse the session listener ???? + * + * While the I2CP protocol, in theory, allows for fully independent sessions + * over the same I2CP connection, this is not currently supported by the router. + * + * @since 0.9.19 + */ +class SubSession extends I2PSessionMuxedImpl { + private final I2PSessionMuxedImpl _primary; + + /** + * @param primary must be a I2PSessionMuxedImpl + */ + public SubSession(I2PSession primary, InputStream destKeyStream, Properties options) throws I2PSessionException { + super((I2PSessionMuxedImpl)primary, destKeyStream, options); + _primary = (I2PSessionMuxedImpl) primary; + if (!getDecryptionKey().equals(_primary.getDecryptionKey())) + throw new I2PSessionException("encryption key mismatch"); + if (getPrivateKey().equals(_primary.getPrivateKey())) + throw new I2PSessionException("signing key must differ"); + // state management + } + + /** + * Unsupported in a subsession. + * @throws UnsupportedOperationException always + * @since 0.9.19 + */ + @Override + public I2PSession addSubsession(InputStream destKeyStream, Properties opts) throws I2PSessionException { + throw new UnsupportedOperationException(); + } + + /** + * Unsupported in a subsession. + * Does nothing. + * @since 0.9.19 + */ + @Override + public void removeSubsession(I2PSession session) {} + + /** + * Unsupported in a subsession. + * @return empty list always + * @since 0.9.19 + */ + @Override + public List getSubsessions() { + return Collections.emptyList(); + } + + /** + * Does nothing for now + */ + @Override + public void updateOptions(Properties options) {} + + /** + * Connect to the router and establish a session. This call blocks until + * a session is granted. + * + * Should be threadsafe, other threads will block until complete. + * Disconnect / destroy from another thread may be called simultaneously and + * will (should?) interrupt the connect. + * + * @throws I2PSessionException if there is a configuration error or the router is + * not reachable + */ + @Override + public void connect() throws I2PSessionException { + synchronized(_stateLock) { + if (_state != State.OPEN) { + _state = State.OPENING; + } + } + _primary.connect(); + synchronized(_stateLock) { + if (_state != State.OPEN) { + Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true); + notifier.start(); + _state = State.OPEN; + } + } + } + + /** + * Has the session been closed (or not yet connected)? + * False when open and during transitions. + */ + @Override + public boolean isClosed() { + // FIXME + return super.isClosed() || _primary.isClosed(); + } + + /** + * Deliver an I2CP message to the router + * May block for several seconds if the write queue to the router is full + * + * @throws I2PSessionException if the message is malformed or there is an error writing it out + */ + @Override + void sendMessage(I2CPMessage message) throws I2PSessionException { + // workaround for now, as primary will send out our CreateSession + // from his connect, while we are still closed. + // If we did it in connect() we wouldn't need this + if (isClosed() && + message.getType() != CreateSessionMessage.MESSAGE_TYPE && + message.getType() != CreateLeaseSetMessage.MESSAGE_TYPE) + throw new I2PSessionException("Already closed"); + _primary.sendMessage(message); + } + + /** + * Pass off the error to the listener + * Misspelled, oh well. + * @param error non-null + */ + @Override + void propogateError(String msg, Throwable error) { + _primary.propogateError(msg, error); + if (_sessionListener != null) _sessionListener.errorOccurred(this, msg, error); + } + + /** + * Tear down the session, and do NOT reconnect. + * + * Blocks if session has not been fully started. + */ + @Override + public void destroySession() { + _primary.destroySession(); + if (_availabilityNotifier != null) + _availabilityNotifier.stopNotifying(); + if (_sessionListener != null) _sessionListener.disconnected(this); + changeState(State.CLOSED); + } + + /** + * Will interrupt a connect in progress. + */ + @Override + protected void disconnect() { + _primary.disconnect(); + } + + @Override + protected boolean reconnect() { + return _primary.reconnect(); + } + + /** + * Called by the message handler + * on reception of DestReplyMessage + * + * This will never happen, as the dest reply message does not contain a session ID. + */ + @Override + void destReceived(Destination d) { + _primary.destReceived(d); + } + + /** + * Called by the message handler + * on reception of DestReplyMessage + * + * This will never happen, as the dest reply message does not contain a session ID. + * + * @param h non-null + */ + @Override + void destLookupFailed(Hash h) { + _primary.destLookupFailed(h); + } + + /** + * Called by the message handler + * on reception of HostReplyMessage + * @param d non-null + */ + void destReceived(long nonce, Destination d) { + _primary.destReceived(nonce, d); + } + + /** + * Called by the message handler + * on reception of HostReplyMessage + */ + @Override + void destLookupFailed(long nonce) { + _primary.destLookupFailed(nonce); + } + + /** + * Called by the message handler. + * This will never happen, as the bw limits message does not contain a session ID. + */ + @Override + void bwReceived(int[] i) { + _primary.bwReceived(i); + } + + /** + * Blocking. Waits a max of 10 seconds by default. + * See lookupDest with maxWait parameter to change. + * Implemented in 0.8.3 in I2PSessionImpl; + * previously was available only in I2PSimpleSession. + * Multiple outstanding lookups are now allowed. + * @return null on failure + */ + @Override + public Destination lookupDest(Hash h) throws I2PSessionException { + return _primary.lookupDest(h); + } + + /** + * Blocking. + * @param maxWait ms + * @return null on failure + */ + @Override + public Destination lookupDest(Hash h, long maxWait) throws I2PSessionException { + return _primary.lookupDest(h, maxWait); + } + + /** + * Ask the router to lookup a Destination by host name. + * Blocking. Waits a max of 10 seconds by default. + * + * This only makes sense for a b32 hostname, OR outside router context. + * Inside router context, just query the naming service. + * Outside router context, this does NOT query the context naming service. + * Do that first if you expect a local addressbook. + * + * This will log a warning for non-b32 in router context. + * + * See interface for suggested implementation. + * + * Requires router side to be 0.9.11 or higher. If the router is older, + * this will return null immediately. + */ + @Override + public Destination lookupDest(String name) throws I2PSessionException { + return _primary.lookupDest(name); + } + + /** + * Ask the router to lookup a Destination by host name. + * Blocking. See above for details. + * @param maxWait ms + * @return null on failure + */ + @Override + public Destination lookupDest(String name, long maxWait) throws I2PSessionException { + return _primary.lookupDest(name, maxWait); + } + + /** + * This may not work???????????, as the reply does not contain a session ID, so + * it won't be routed back to us? + */ + @Override + public int[] bandwidthLimits() throws I2PSessionException { + return _primary.bandwidthLimits(); + } + + @Override + protected void updateActivity() { + _primary.updateActivity(); + } + + @Override + public long lastActivity() { + return _primary.lastActivity(); + } + + @Override + public void setReduced() { + _primary.setReduced(); + } +} diff --git a/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java b/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java index 6477768c5..0cbb854ff 100644 --- a/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java +++ b/core/java/src/net/i2p/data/i2cp/CreateLeaseSetMessage.java @@ -38,6 +38,11 @@ public class CreateLeaseSetMessage extends I2CPMessageImpl { return _sessionId; } + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java b/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java index b67f5fa6f..4275863e1 100644 --- a/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java +++ b/core/java/src/net/i2p/data/i2cp/DestroySessionMessage.java @@ -32,6 +32,16 @@ public class DestroySessionMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java b/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java index 1a8fcfe86..c4405f90a 100644 --- a/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java +++ b/core/java/src/net/i2p/data/i2cp/HostLookupMessage.java @@ -76,6 +76,16 @@ public class HostLookupMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + /** * @return 0 to 2**32 - 1 */ diff --git a/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java b/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java index 37faaa276..b350f225a 100644 --- a/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java +++ b/core/java/src/net/i2p/data/i2cp/HostReplyMessage.java @@ -73,6 +73,16 @@ public class HostReplyMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + /** * @return 0 to 2**32 - 1 */ diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessage.java b/core/java/src/net/i2p/data/i2cp/I2CPMessage.java index 92c4f6163..58d312013 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessage.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessage.java @@ -60,9 +60,20 @@ public interface I2CPMessage extends DataStructure { public void writeMessage(OutputStream out) throws I2CPMessageException, IOException; /** - * Return the unique identifier for this type of APIMessage, as specified in the + * Return the unique identifier for this type of message, as specified in the * network specification document under #ClientAccessLayerMessages - * @return unique identifier for this type of APIMessage + * @return unique identifier for this type of message */ public int getType(); -} \ No newline at end of file + + /** + * Return the SessionId for this type of message. + * Most but not all message types include a SessionId. + * The ones that do already define getSessionId(), but some return a SessionId and + * some return a long, so we define a new method here. + * + * @return SessionId or null if this message type does not include a SessionId + * @since 0.9.19 + */ + public SessionId sessionId(); +} diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java index 63d13b040..b2e38c21e 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageException.java @@ -12,7 +12,7 @@ package net.i2p.data.i2cp; import net.i2p.I2PException; /** - * Represent an error serializing or deserializing an APIMessage + * Represent an error serializing or deserializing a message * * @author jrandom */ diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java index 3ace5df9d..19b8d1cc3 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java @@ -127,4 +127,15 @@ public abstract class I2CPMessageImpl extends DataStructureImpl implements I2CPM throw new DataFormatException("Error writing the message", ime); } } + + /** + * Return the SessionId for this type of message. + * Most but not all message types include a SessionId. + * The ones that do already define getSessionId(), but some return a SessionId and + * some return a long, so we define a new method here. + * + * @return null always. Extending classes with a SessionId must override. + * @since 0.9.19 + */ + public SessionId sessionId() { return null; } } diff --git a/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java b/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java index 6d0c4daac..e5969a0a8 100644 --- a/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java +++ b/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java @@ -37,6 +37,16 @@ public class MessagePayloadMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; diff --git a/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java b/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java index dd4f43054..182caca9e 100644 --- a/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java +++ b/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java @@ -193,6 +193,16 @@ public class MessageStatusMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; @@ -275,6 +285,12 @@ public class MessageStatusMessage extends I2CPMessageImpl { return "GUARANTEED SUCCESS "; case STATUS_SEND_SUCCESS_LOCAL: return "LOCAL SUCCESS "; + case STATUS_SEND_BEST_EFFORT_FAILURE: + return "PROBABLE FAILURE "; + case STATUS_SEND_FAILURE_NO_TUNNELS: + return "NO LOCAL TUNNELS "; + case STATUS_SEND_FAILURE_NO_LEASESET: + return "LEASESET NOT FOUND "; default: return "SEND FAILURE CODE: " + status; } diff --git a/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java b/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java index cc685cbab..744dfd356 100644 --- a/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java @@ -36,6 +36,16 @@ public class ReceiveMessageBeginMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; diff --git a/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java b/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java index c405b93a3..5be2bbfeb 100644 --- a/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java @@ -35,6 +35,16 @@ public class ReceiveMessageEndMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId >= 0 ? new SessionId(_sessionId) : null; + } + /** @param id 0-65535 */ public void setSessionId(long id) { _sessionId = (int) id; diff --git a/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java b/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java index 01130670a..8f80f531b 100644 --- a/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReconfigureSessionMessage.java @@ -33,6 +33,16 @@ public class ReconfigureSessionMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java b/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java index 76311ae6e..cfdbd6cfe 100644 --- a/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReportAbuseMessage.java @@ -35,6 +35,16 @@ public class ReportAbuseMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java b/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java index 1e9b2dcf6..15d9b70ec 100644 --- a/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java +++ b/core/java/src/net/i2p/data/i2cp/RequestLeaseSetMessage.java @@ -45,6 +45,16 @@ public class RequestLeaseSetMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java b/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java index 160e193dc..3edd06171 100644 --- a/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java +++ b/core/java/src/net/i2p/data/i2cp/RequestVariableLeaseSetMessage.java @@ -55,6 +55,16 @@ public class RequestVariableLeaseSetMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java b/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java index 18f6c6df3..4515f3872 100644 --- a/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java +++ b/core/java/src/net/i2p/data/i2cp/SendMessageMessage.java @@ -38,6 +38,16 @@ public class SendMessageMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java b/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java index 5af2a649b..122cf5d1e 100644 --- a/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java +++ b/core/java/src/net/i2p/data/i2cp/SessionStatusMessage.java @@ -42,6 +42,16 @@ public class SessionStatusMessage extends I2CPMessageImpl { return _sessionId; } + /** + * Return the SessionId for this message. + * + * @since 0.9.19 + */ + @Override + public SessionId sessionId() { + return _sessionId; + } + public void setSessionId(SessionId id) { _sessionId = id; } diff --git a/router/java/src/net/i2p/router/TunnelManagerFacade.java b/router/java/src/net/i2p/router/TunnelManagerFacade.java index 2a35d0c97..a95fbd1f1 100644 --- a/router/java/src/net/i2p/router/TunnelManagerFacade.java +++ b/router/java/src/net/i2p/router/TunnelManagerFacade.java @@ -146,6 +146,21 @@ public interface TunnelManagerFacade extends Service { * */ public void buildTunnels(Destination client, ClientTunnelSettings settings); + + /** + * Add another destination to the same tunnels. + * Must have same encryption key an a different signing key. + * @throws IllegalArgumentException if not + * @return success + * @since 0.9.19 + */ + public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient); + + /** + * Remove another destination to the same tunnels. + * @since 0.9.19 + */ + public void removeAlias(Destination dest); public TunnelPoolSettings getInboundSettings(); public TunnelPoolSettings getOutboundSettings(); diff --git a/router/java/src/net/i2p/router/TunnelPoolSettings.java b/router/java/src/net/i2p/router/TunnelPoolSettings.java index 329cf5eae..315aeefb8 100644 --- a/router/java/src/net/i2p/router/TunnelPoolSettings.java +++ b/router/java/src/net/i2p/router/TunnelPoolSettings.java @@ -1,11 +1,13 @@ package net.i2p.router; +import java.util.Set; import java.util.Locale; import java.util.Map; import java.util.Properties; import net.i2p.data.Base64; import net.i2p.data.Hash; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.NativeBigInteger; import net.i2p.util.RandomSource; import net.i2p.util.SystemVersion; @@ -31,6 +33,8 @@ public class TunnelPoolSettings { private final Properties _unknownOptions; private Hash _randomKey; private int _priority; + private final Set _aliases; + private Hash _aliasOf; /** prefix used to override the router's defaults for clients */ // unimplemented @@ -119,6 +123,10 @@ public class TunnelPoolSettings { _randomKey = generateRandomKey(); if (_isExploratory && !_isInbound) _priority = EXPLORATORY_PRIORITY; + if (!_isExploratory) + _aliases = new ConcurrentHashSet(4); + else + _aliases = null; } /** how many tunnels should be available at all times */ @@ -206,6 +214,34 @@ public class TunnelPoolSettings { /** what destination is this a client tunnel for (or null if exploratory) */ public Hash getDestination() { return _destination; } + + /** + * Other destinations that use the same tunnel (or null if exploratory) + * Modifiable, concurrent, not a copy + * @since 0.9.19 + */ + public Set getAliases() { + return _aliases; + } + + /** + * Other destination that this is an alias of (or null). + * If non-null, don't build tunnels. + * @since 0.9.19 + */ + public Hash getAliasOf() { + return _aliasOf; + } + + + /** + * Set other destination that this is an alias of (or null). + * If non-null, don't build tunnels. + * @since 0.9.19 + */ + public void setAliasOf(Hash h) { + _aliasOf = h; + } /** * random key used for peer ordering @@ -235,7 +271,7 @@ public class TunnelPoolSettings { public int getPriority() { return _priority; } public Properties getUnknownOptions() { return _unknownOptions; } - + /** * Defaults in props are NOT honored. * In-JVM client side must promote defaults to the primary map. diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 10a480c78..9cfa4da19 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -16,6 +16,7 @@ import java.io.OutputStream; import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -39,6 +40,7 @@ import net.i2p.data.i2cp.SendMessageMessage; import net.i2p.data.i2cp.SendMessageExpiresMessage; import net.i2p.data.i2cp.SessionConfig; import net.i2p.data.i2cp.SessionId; +import net.i2p.data.i2cp.SessionStatusMessage; import net.i2p.router.Job; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; @@ -51,6 +53,9 @@ import net.i2p.util.SimpleTimer; /** * Bridge the router and the client - managing state for a client. * + * As of release 0.9.19, multiple sessions are supported on a single + * I2CP connection. These sessions share tunnels and some configuration. + * * @author jrandom */ class ClientConnectionRunner { @@ -61,21 +66,16 @@ class ClientConnectionRunner { private final Socket _socket; /** output stream of the socket that I2CP messages bound to the client should be written to */ private OutputStream _out; - /** session ID of the current client */ - private SessionId _sessionId; - /** user's config */ - private SessionConfig _config; + + private final ConcurrentHashMap _sessions; + private String _clientVersion; /** * Mapping of MessageId to Payload, storing messages for retrieval. * Unused for i2cp.fastReceive = "true" (_dontSendMSMOnRecive = true) */ private final Map _messages; - /** lease set request state, or null if there is no request pending on at the moment */ - private LeaseRequestState _leaseRequest; private int _consecutiveLeaseRequestFails; - /** currently allocated leaseSet, or null if none is allocated */ - private LeaseSet _currentLeaseSet; /** * Set of messageIds created but not yet ACCEPTED. * Unused for i2cp.messageReliability = "none" (_dontSendMSM = true) @@ -83,7 +83,7 @@ class ClientConnectionRunner { private final Set _acceptedPending; /** thingy that does stuff */ protected I2CPMessageReader _reader; - /** just for this destination */ + /** Used for all sessions, which must all have the same crypto keys */ private SessionKeyManager _sessionKeyManager; /** * This contains the last 10 MessageIds that have had their (non-ack) status @@ -91,7 +91,6 @@ class ClientConnectionRunner { */ private final List _alreadyProcessed; private ClientWriterRunner _writer; - private Hash _destHashCache; /** are we, uh, dead */ private volatile boolean _dead; /** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */ @@ -108,11 +107,30 @@ class ClientConnectionRunner { private static final int MAX_LEASE_FAILS = 5; private static final int BUF_SIZE = 32*1024; + private static final int MAX_SESSIONS = 4; /** @since 0.9.2 */ private static final String PROP_TAGS = "crypto.tagsToSend"; private static final String PROP_THRESH = "crypto.lowTagThreshold"; + /** + * For multisession + * @since 0.9.19 + */ + private static class SessionParams { + final Destination dest; + final boolean isPrimary; + SessionId sessionId; + SessionConfig config; + LeaseRequestState leaseRequest; + LeaseSet currentLeaseSet; + + SessionParams(Destination d, boolean isPrimary) { + dest = d; + this.isPrimary = isPrimary; + } + } + /** * Create a new runner against the given socket * @@ -124,6 +142,7 @@ class ClientConnectionRunner { _socket = socket; // unused for fastReceive _messages = new ConcurrentHashMap(); + _sessions = new ConcurrentHashMap(4); _alreadyProcessed = new ArrayList(); _acceptedPending = new ConcurrentHashSet(); _messageId = new AtomicInteger(_context.random().nextInt()); @@ -168,8 +187,7 @@ class ClientConnectionRunner { // router may be null in unit tests if ((_context.router() == null || _context.router().isAlive()) && _log.shouldWarn()) - _log.warn("Stop the I2CP connection! current leaseSet: " - + _currentLeaseSet, new Exception("Stop client connection")); + _log.warn("Stop the I2CP connection!", new Exception("Stop client connection")); _dead = true; // we need these keys to unpublish the leaseSet if (_reader != null) _reader.stopReading(); @@ -180,21 +198,56 @@ class ClientConnectionRunner { if (_sessionKeyManager != null) _sessionKeyManager.shutdown(); _manager.unregisterConnection(this); - if (_currentLeaseSet != null) - _context.netDb().unpublish(_currentLeaseSet); - _leaseRequest = null; + for (SessionParams sp : _sessions.values()) { + LeaseSet ls = sp.currentLeaseSet; + if (ls != null) + _context.netDb().unpublish(ls); + } synchronized (_alreadyProcessed) { _alreadyProcessed.clear(); } - //_config = null; - //_manager = null; + _sessions.clear(); } /** * Current client's config, - * will be null before session is established + * will be null if session not found + * IS subsession aware. + * @since 0.9.19 added hash param */ - public SessionConfig getConfig() { return _config; } + public SessionConfig getConfig(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.config; + } + + /** + * Current client's config, + * will be null if session not found + * IS subsession aware. + * @since 0.9.19 added id param + */ + public SessionConfig getConfig(SessionId id) { + for (SessionParams sp : _sessions.values()) { + if (id.equals(sp.sessionId)) + return sp.config; + } + return null; + } + + /** + * Primary client's config, + * will be null if session not set up + * @since 0.9.19 + */ + public SessionConfig getPrimaryConfig() { + for (SessionParams sp : _sessions.values()) { + if (sp.isPrimary) + return sp.config; + } + return null; + } /** * The client version. @@ -216,41 +269,186 @@ class ClientConnectionRunner { /** current client's sessionkeymanager */ public SessionKeyManager getSessionKeyManager() { return _sessionKeyManager; } - /** currently allocated leaseSet */ - public LeaseSet getLeaseSet() { return _currentLeaseSet; } - void setLeaseSet(LeaseSet ls) { _currentLeaseSet = ls; } + /** + * Currently allocated leaseSet. + * IS subsession aware. Returns primary leaseset only. + * @return leaseSet or null if not yet set or unknown hash + * @since 0.9.19 added hash parameter + */ + public LeaseSet getLeaseSet(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.currentLeaseSet; + } + + /** + * Currently allocated leaseSet. + * IS subsession aware. + */ + void setLeaseSet(LeaseSet ls) { + Hash h = ls.getDestination().calculateHash(); + SessionParams sp = _sessions.get(h); + if (sp == null) + return; + sp.currentLeaseSet = ls; + } /** * Equivalent to getConfig().getDestination().calculateHash(); * will be null before session is established + * Not subsession aware. Returns random hash from the sessions. + * Don't use if you can help it. + * + * @return primary hash or null if not yet set */ - public Hash getDestHash() { return _destHashCache; } + public Hash getDestHash() { + for (Hash h : _sessions.keySet()) { + return h; + } + return null; + } + + /** + * Return the hash for the given ID + * @return hash or null if unknown + * @since 0.9.19 + */ + public Hash getDestHash(SessionId id) { + for (Map.Entry e : _sessions.entrySet()) { + if (id.equals(e.getValue().sessionId)) + return e.getKey(); + } + return null; + } + + /** + * Return the dest for the given ID + * @return dest or null if unknown + * @since 0.9.19 + */ + public Destination getDestination(SessionId id) { + for (SessionParams sp : _sessions.values()) { + if (id.equals(sp.sessionId)) + return sp.dest; + } + return null; + } /** - * @return current client's sessionId or null if not yet set + * Subsession aware. + * + * @param h the local target + * @return current client's sessionId or null if not yet set or not a valid hash + * @since 0.9.19 */ - SessionId getSessionId() { return _sessionId; } + SessionId getSessionId(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.sessionId; + } + + /** + * Subsession aware. + * + * @return all current client's sessionIds, non-null + * @since 0.9.19 + */ + List getSessionIds() { + List rv = new ArrayList(_sessions.size()); + for (SessionParams sp : _sessions.values()) { + SessionId id = sp.sessionId; + if (id != null) + rv.add(id); + } + return rv; + } + + /** + * Subsession aware. + * + * @return all current client's destinations, non-null + * @since 0.9.19 + */ + List getDestinations() { + List rv = new ArrayList(_sessions.size()); + for (SessionParams sp : _sessions.values()) { + rv.add(sp.dest); + } + return rv; + } /** * To be called only by ClientManager. * + * @param hash for the session * @throws IllegalStateException if already set + * @since 0.9.19 added hash param */ - void setSessionId(SessionId id) { - if (_sessionId != null) + void setSessionId(Hash hash, SessionId id) { + if (hash == null) throw new IllegalStateException(); - _sessionId = id; + SessionParams sp = _sessions.get(hash); + if (sp == null || sp.sessionId != null) + throw new IllegalStateException(); + sp.sessionId = id; + } + + /** + * Kill the session. Caller must kill runner if none left. + * + * @since 0.9.19 + */ + void removeSession(SessionId id) { + boolean isPrimary = false; + for (Iterator iter = _sessions.values().iterator(); iter.hasNext(); ) { + SessionParams sp = iter.next(); + if (id.equals(sp.sessionId)) { + if (_log.shouldLog(Log.INFO)) + _log.info("Destroying client session " + id); + iter.remove(); + // Tell client manger + _manager.unregisterSession(id, sp.dest); + LeaseSet ls = sp.currentLeaseSet; + if (ls != null) + _context.netDb().unpublish(ls); + isPrimary = sp.isPrimary; + } + } + if (isPrimary) { + // kill all the others also + for (SessionParams sp : _sessions.values()) { + _manager.unregisterSession(id, sp.dest); + LeaseSet ls = sp.currentLeaseSet; + if (ls != null) + _context.netDb().unpublish(ls); + } + } } - /** data for the current leaseRequest, or null if there is no active leaseSet request */ - LeaseRequestState getLeaseRequest() { return _leaseRequest; } + /** + * Data for the current leaseRequest, or null if there is no active leaseSet request. + * Not subsession aware. Returns primary ID only. + * @since 0.9.19 added hash param + */ + LeaseRequestState getLeaseRequest(Hash h) { + SessionParams sp = _sessions.get(h); + if (sp == null) + return null; + return sp.leaseRequest; + } /** @param req non-null */ public void failLeaseRequest(LeaseRequestState req) { boolean disconnect = false; + Hash h = req.getRequested().getDestination().calculateHash(); + SessionParams sp = _sessions.get(h); + if (sp == null) + return; synchronized (this) { - if (_leaseRequest == req) { - _leaseRequest = null; + if (sp.leaseRequest == req) { + sp.leaseRequest = null; disconnect = ++_consecutiveLeaseRequestFails > MAX_LEASE_FAILS; } } @@ -291,19 +489,34 @@ class ClientConnectionRunner { * @return SessionStatusMessage return code, 1 for success, != 1 for failure */ public int sessionEstablished(SessionConfig config) { - _destHashCache = config.getDestination().calculateHash(); + Destination dest = config.getDestination(); + Hash destHash = dest.calculateHash(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("SessionEstablished called for destination " + _destHashCache.toBase64()); - _config = config; + _log.debug("SessionEstablished called for destination " + destHash); + if (_sessions.size() > MAX_SESSIONS) + return SessionStatusMessage.STATUS_REFUSED; + boolean isPrimary = _sessions.isEmpty(); + if (!isPrimary) { + // all encryption keys must be the same + for (SessionParams sp : _sessions.values()) { + if (!dest.getPublicKey().equals(sp.dest.getPublicKey())) + return SessionStatusMessage.STATUS_INVALID; + } + } + SessionParams sp = new SessionParams(dest, isPrimary); + sp.config = config; + SessionParams old = _sessions.putIfAbsent(destHash, sp); + if (old != null) + return SessionStatusMessage.STATUS_INVALID; // We process a few options here, but most are handled by the tunnel manager. // The ones here can't be changed later. Properties opts = config.getOptions(); - if (opts != null) { + if (isPrimary && opts != null) { _dontSendMSM = "none".equals(opts.getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US)); _dontSendMSMOnReceive = Boolean.parseBoolean(opts.getProperty(I2PClient.PROP_FAST_RECEIVE)); } // per-destination session key manager to prevent rather easy correlation - if (_sessionKeyManager == null) { + if (isPrimary && _sessionKeyManager == null) { int tags = TransientSessionKeyManager.DEFAULT_TAGS; int thresh = TransientSessionKeyManager.LOW_THRESHOLD; if (opts != null) { @@ -317,10 +530,8 @@ class ClientConnectionRunner { } } _sessionKeyManager = new TransientSessionKeyManager(_context, tags, thresh); - } else { - _log.error("SessionEstablished called for twice for destination " + _destHashCache.toBase64().substring(0,4)); } - return _manager.destinationEstablished(this); + return _manager.destinationEstablished(this, dest); } /** @@ -331,14 +542,21 @@ class ClientConnectionRunner { * * Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that. * + * @param dest the client * @param id the router's ID for this message * @param messageNonce the client's ID for this message * @param status see I2CP MessageStatusMessage for success/failure codes */ - void updateMessageDeliveryStatus(MessageId id, long messageNonce, int status) { + void updateMessageDeliveryStatus(Destination dest, MessageId id, long messageNonce, int status) { if (_dead || messageNonce <= 0) return; - _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, messageNonce, status)); + SessionParams sp = _sessions.get(dest.calculateHash()); + if (sp == null) + return; + SessionId sid = sp.sessionId; + if (sid == null) + return; // sid = new SessionId(foo) ??? + _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(sid, id, messageNonce, status)); } /** @@ -346,19 +564,23 @@ class ClientConnectionRunner { * updated. This takes care of all the LeaseRequestState stuff (including firing any jobs) */ void leaseSetCreated(LeaseSet ls) { - LeaseRequestState state = null; + Hash h = ls.getDestination().calculateHash(); + SessionParams sp = _sessions.get(h); + if (sp == null) + return; + LeaseRequestState state; synchronized (this) { - state = _leaseRequest; + state = sp.leaseRequest; if (state == null) { if (_log.shouldLog(Log.WARN)) _log.warn("LeaseRequest is null and we've received a new lease?! perhaps this is odd... " + ls); return; } else { state.setIsSuccessful(true); - _currentLeaseSet = ls; + setLeaseSet(ls); if (_log.shouldLog(Log.DEBUG)) _log.debug("LeaseSet created fully: " + state + " / " + ls); - _leaseRequest = null; + sp.leaseRequest = null; _consecutiveLeaseRequestFails = 0; } } @@ -427,12 +649,12 @@ class ClientConnectionRunner { if (_log.shouldLog(Log.DEBUG)) _log.debug("** Receiving message " + id.getMessageId() + " with payload of size " - + payload.getSize() + " for session " + _sessionId.getSessionId()); + + payload.getSize() + " for session " + message.getSessionId()); //long beforeDistribute = _context.clock().now(); // the following blocks as described above - SessionConfig cfg = _config; - if (cfg != null) - _manager.distributeMessage(cfg.getDestination(), dest, payload, + Destination fromDest = getDestination(message.getSessionId()); + if (fromDest != null) + _manager.distributeMessage(fromDest, dest, payload, id, message.getNonce(), expiration, flags); // else log error? //long timeToDistribute = _context.clock().now() - beforeDistribute; @@ -452,11 +674,9 @@ class ClientConnectionRunner { * @param id OUR id for the message * @param nonce HIS id for the message */ - void ackSendMessage(MessageId id, long nonce) { + void ackSendMessage(SessionId sid, MessageId id, long nonce) { if (_dontSendMSM || nonce == 0) return; - SessionId sid = _sessionId; - if (sid == null) return; if (_log.shouldLog(Log.DEBUG)) _log.debug("Acking message send [accepted]" + id + " / " + nonce + " for sessionId " + sid); @@ -480,6 +700,9 @@ class ClientConnectionRunner { * * Note that no failure indication is available. * Fails silently on e.g. queue overflow to client, client dead, etc. + * + * @param toDest non-null + * @param fromDest generally null when from remote, non-null if from local */ void receiveMessage(Destination toDest, Destination fromDest, Payload payload) { if (_dead) return; @@ -489,13 +712,33 @@ class ClientConnectionRunner { j.runJob(); } + /** + * Asynchronously deliver the message to the current runner + * + * Note that no failure indication is available. + * Fails silently on e.g. queue overflow to client, client dead, etc. + * + * @param toHash non-null + * @param fromDest generally null when from remote, non-null if from local + * @since 0.9.20 + */ + void receiveMessage(Hash toHash, Destination fromDest, Payload payload) { + SessionParams sp = _sessions.get(toHash); + if (sp == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No session found for receiveMessage()"); + return; + } + receiveMessage(sp.dest, fromDest, payload); + } + /** * Send async abuse message to the client * */ - public void reportAbuse(String reason, int severity) { + public void reportAbuse(Destination dest, String reason, int severity) { if (_dead) return; - _context.jobQueue().addJob(new ReportAbuseJob(_context, this, reason, severity)); + _context.jobQueue().addJob(new ReportAbuseJob(_context, this, dest, reason, severity)); } /** @@ -504,13 +747,15 @@ class ClientConnectionRunner { * within the timeout specified, queue up the onFailedJob. This call does not * block. * + * @param h the Destination's hash * @param set LeaseSet with requested leases - this object must be updated to contain the * signed version (as well as any changed/added/removed Leases) + * The LeaseSet contains Leases and destination only, it is unsigned. * @param expirationTime ms to wait before failing * @param onCreateJob Job to run after the LeaseSet is authorized, null OK * @param onFailedJob Job to run after the timeout passes without receiving authorization, null OK */ - void requestLeaseSet(LeaseSet set, long expirationTime, Job onCreateJob, Job onFailedJob) { + void requestLeaseSet(Hash h, LeaseSet set, long expirationTime, Job onCreateJob, Job onFailedJob) { if (_dead) { if (_log.shouldLog(Log.WARN)) _log.warn("Requesting leaseSet from a dead client: " + set); @@ -518,6 +763,12 @@ class ClientConnectionRunner { _context.jobQueue().addJob(onFailedJob); return; } + SessionParams sp = _sessions.get(h); + if (sp == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Requesting leaseSet for an unknown sesssion"); + return; + } // We can't use LeaseSet.equals() here because the dest, keys, and sig on // the new LeaseSet are null. So we compare leases one by one. // In addition, the client rewrites the expiration time of all the leases to @@ -528,12 +779,15 @@ class ClientConnectionRunner { // so the comparison will always work. int leases = set.getLeaseCount(); // synch so _currentLeaseSet isn't changed out from under us + LeaseSet current = null; + Destination dest = sp.dest; synchronized (this) { - if (_currentLeaseSet != null && _currentLeaseSet.getLeaseCount() == leases) { + current = sp.currentLeaseSet; + if (current != null && current.getLeaseCount() == leases) { for (int i = 0; i < leases; i++) { - if (! _currentLeaseSet.getLease(i).getTunnelId().equals(set.getLease(i).getTunnelId())) + if (! current.getLease(i).getTunnelId().equals(set.getLease(i).getTunnelId())) break; - if (! _currentLeaseSet.getLease(i).getGateway().equals(set.getLease(i).getGateway())) + if (! current.getLease(i).getGateway().equals(set.getLease(i).getGateway())) break; if (i == leases - 1) { if (_log.shouldLog(Log.INFO)) @@ -546,10 +800,10 @@ class ClientConnectionRunner { } } if (_log.shouldLog(Log.INFO)) - _log.info("Current leaseSet " + _currentLeaseSet + "\nNew leaseSet " + set); - LeaseRequestState state = null; + _log.info("Current leaseSet " + current + "\nNew leaseSet " + set); + LeaseRequestState state; synchronized (this) { - state = _leaseRequest; + state = sp.leaseRequest; if (state != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Already requesting " + state); @@ -561,12 +815,15 @@ class ClientConnectionRunner { // theirs is newer } else { // ours is newer, so wait a few secs and retry + set.setDestination(dest); _context.simpleTimer2().addEvent(new Rerequest(set, expirationTime, onCreateJob, onFailedJob), 3*1000); } // fire onCreated? return; // already requesting } else { - _leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob, _context.clock().now() + expirationTime, set); + set.setDestination(dest); + sp.leaseRequest = state = new LeaseRequestState(onCreateJob, onFailedJob, + _context.clock().now() + expirationTime, set); if (_log.shouldLog(Log.DEBUG)) _log.debug("New request: " + state); } @@ -580,6 +837,7 @@ class ClientConnectionRunner { private final Job _onCreate; private final Job _onFailed; + /** @param ls dest must be set */ public Rerequest(LeaseSet ls, long expirationTime, Job onCreate, Job onFailed) { _ls = ls; _expirationTime = expirationTime; @@ -588,7 +846,7 @@ class ClientConnectionRunner { } public void timeReached() { - requestLeaseSet(_ls, _expirationTime, _onCreate, _onFailed); + requestLeaseSet(_ls.getDestination().calculateHash(), _ls, _expirationTime, _onCreate, _onFailed); } } @@ -697,6 +955,7 @@ class ClientConnectionRunner { private static final int MAX_REQUEUE = 60; // 30 sec. private class MessageDeliveryStatusUpdate extends JobImpl { + private final SessionId _sessId; private final MessageId _messageId; private final long _messageNonce; private final int _status; @@ -710,8 +969,9 @@ class ClientConnectionRunner { * @param messageNonce the client's ID for this message * @param status see I2CP MessageStatusMessage for success/failure codes */ - public MessageDeliveryStatusUpdate(MessageId id, long messageNonce, int status) { + public MessageDeliveryStatusUpdate(SessionId sid, MessageId id, long messageNonce, int status) { super(ClientConnectionRunner.this._context); + _sessId = sid; _messageId = id; _messageNonce = messageNonce; _status = status; @@ -727,7 +987,7 @@ class ClientConnectionRunner { MessageStatusMessage msg = new MessageStatusMessage(); msg.setMessageId(_messageId.getMessageId()); - msg.setSessionId(_sessionId.getSessionId()); + msg.setSessionId(_sessId.getSessionId()); // has to be >= 0, it is initialized to -1 msg.setNonce(_messageNonce); msg.setSize(0); @@ -738,12 +998,12 @@ class ClientConnectionRunner { // bug requeueing forever? failsafe _log.error("Abandon update for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId()); + + " for " + _sessId); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Almost send an update for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId() + + " for " + _sessId + " before they knew the messageId! delaying .5s"); _lastTried = _context.clock().now(); requeue(REQUEUE_DELAY); @@ -778,14 +1038,14 @@ class ClientConnectionRunner { if (_log.shouldLog(Log.DEBUG)) _log.info("Updating message status for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId() + + " for " + _sessId + " (with nonce=2), retrying after " + (_context.clock().now() - _lastTried)); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Updating message status for message " + _messageId + " to " + MessageStatusMessage.getStatusString(msg.getStatus()) - + " for session " + _sessionId.getSessionId() + " (with nonce=2)"); + + " for " + _sessId + " (with nonce=2)"); } try { diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index e94f202d9..0041fc78c 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -55,9 +55,11 @@ class ClientManager { protected final List _listeners; // Destination --> ClientConnectionRunner // Locked for adds/removes but not lookups + // If a runner has multiple sessions it will be in here multiple times, one for each dest private final Map _runners; // Same as what's in _runners, but for fast lookup by Hash // Locked for adds/removes but not lookups + // If a runner has multiple sessions it will be in here multiple times, one for each dest private final Map _runnersByHash; // ClientConnectionRunner for clients w/out a Dest yet private final Set _pendingRunners; @@ -214,24 +216,44 @@ class ClientManager { } } + /** + * Remove all sessions for this runner. + */ public void unregisterConnection(ClientConnectionRunner runner) { - _log.warn("Unregistering (dropping) a client connection"); + if (_log.shouldLog(Log.WARN)) + _log.warn("Unregistering (dropping) a client connection"); synchronized (_pendingRunners) { _pendingRunners.remove(runner); } - if ( (runner.getConfig() != null) && (runner.getConfig().getDestination() != null) ) { - // after connection establishment - Destination dest = runner.getConfig().getDestination(); - synchronized (_runners) { - SessionId id = runner.getSessionId(); - if (id != null) - _runnerSessionIds.remove(id); + + List ids = runner.getSessionIds(); + List dests = runner.getDestinations(); + synchronized (_runners) { + for (SessionId id : ids) { + _runnerSessionIds.remove(id); + } + for (Destination dest : dests) { _runners.remove(dest); _runnersByHash.remove(dest.calculateHash()); } } } + /** + * Remove only the following session. Does not remove the runner if it has more. + * + * @since 0.9.19 + */ + public void unregisterSession(SessionId id, Destination dest) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Unregistering client session " + id); + synchronized (_runners) { + _runnerSessionIds.remove(id); + _runners.remove(dest); + _runnersByHash.remove(dest.calculateHash()); + } + } + /** * Add to the clients list. Check for a dup destination. * Side effect: Sets the session ID of the runner. @@ -239,8 +261,7 @@ class ClientManager { * * @return SessionStatusMessage return code, 1 for success, != 1 for failure */ - public int destinationEstablished(ClientConnectionRunner runner) { - Destination dest = runner.getConfig().getDestination(); + public int destinationEstablished(ClientConnectionRunner runner, Destination dest) { if (_log.shouldLog(Log.DEBUG)) _log.debug("DestinationEstablished called for destination " + dest.calculateHash().toBase64()); @@ -255,9 +276,10 @@ class ClientManager { } else { SessionId id = locked_getNextSessionId(); if (id != null) { - runner.setSessionId(id); + Hash h = dest.calculateHash(); + runner.setSessionId(h, id); _runners.put(dest, runner); - _runnersByHash.put(dest.calculateHash(), runner); + _runnersByHash.put(h, runner); rv = SessionStatusMessage.STATUS_CREATED; } else { rv = SessionStatusMessage.STATUS_REFUSED; @@ -323,8 +345,11 @@ class ClientManager { // sender went away return; } - ClientMessage msg = new ClientMessage(toDest, payload, runner.getConfig(), - runner.getConfig().getDestination(), msgId, + SessionConfig config = runner.getConfig(fromDest.calculateHash()); + if (config == null) + return; + ClientMessage msg = new ClientMessage(toDest, payload, config, + fromDest, msgId, messageNonce, expiration, flags); _ctx.clientMessagePool().add(msg, true); } @@ -362,7 +387,7 @@ class ClientManager { // note that receiveMessage() does not indicate a failure, // so a queue overflow is not recognized. we always return success. if (_from != null) { - _from.updateMessageDeliveryStatus(_msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); + _from.updateMessageDeliveryStatus(_fromDest, _msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); } } } @@ -378,7 +403,8 @@ class ClientManager { * * @param dest Destination from which the LeaseSet's authorization should be requested * @param set LeaseSet with requested leases - this object must be updated to contain the - * signed version (as well as any changed/added/removed Leases) + * signed version (as well as any changed/added/removed Leases). + * The LeaseSet contains Leases only; it is unsigned and does not have the destination set. * @param timeout ms to wait before failing * @param onCreateJob Job to run after the LeaseSet is authorized * @param onFailedJob Job to run after the timeout passes without receiving authorization @@ -386,20 +412,33 @@ class ClientManager { public void requestLeaseSet(Destination dest, LeaseSet set, long timeout, Job onCreateJob, Job onFailedJob) { ClientConnectionRunner runner = getRunner(dest); if (runner == null) { - if (_log.shouldLog(Log.ERROR)) + if (_log.shouldLog(Log.WARN)) _log.warn("Cannot request the lease set, as we can't find a client runner for " + dest.calculateHash().toBase64() + ". disconnected?"); _ctx.jobQueue().addJob(onFailedJob); } else { - runner.requestLeaseSet(set, timeout, onCreateJob, onFailedJob); + runner.requestLeaseSet(dest.calculateHash(), set, timeout, onCreateJob, onFailedJob); } } + /** + * Request that a particular client authorize the Leases contained in the + * LeaseSet. + * + * @param dest Destination from which the LeaseSet's authorization should be requested + * @param ls LeaseSet with requested leases - this object must be updated to contain the + * signed version (as well as any changed/added/removed Leases). + * The LeaseSet contains Leases only; it is unsigned and does not have the destination set. + */ public void requestLeaseSet(Hash dest, LeaseSet ls) { ClientConnectionRunner runner = getRunner(dest); if (runner != null) { // no need to fire off any jobs... - runner.requestLeaseSet(ls, REQUEST_LEASESET_TIMEOUT, null, null); + runner.requestLeaseSet(dest, ls, REQUEST_LEASESET_TIMEOUT, null, null); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Cannot request the lease set, as we can't find a client runner for " + + dest + ". disconnected?"); } } @@ -425,7 +464,9 @@ class ClientManager { if (destHash == null) return true; ClientConnectionRunner runner = getRunner(destHash); if (runner == null) return true; - return !Boolean.parseBoolean(runner.getConfig().getOptions().getProperty(ClientManagerFacade.PROP_CLIENT_ONLY)); + SessionConfig config = runner.getConfig(destHash); + if (config == null) return true; + return !Boolean.parseBoolean(config.getOptions().getProperty(ClientManagerFacade.PROP_CLIENT_ONLY)); } /** @@ -452,7 +493,7 @@ class ClientManager { public SessionConfig getClientSessionConfig(Destination dest) { ClientConnectionRunner runner = getRunner(dest); if (runner != null) - return runner.getConfig(); + return runner.getConfig(dest.calculateHash()); else return null; } @@ -490,7 +531,7 @@ class ClientManager { if (_log.shouldLog(Log.DEBUG)) _log.debug("Delivering status " + status + " to " + fromDest.calculateHash() + " for message " + id); - runner.updateMessageDeliveryStatus(id, messageNonce, status); + runner.updateMessageDeliveryStatus(fromDest, id, messageNonce, status); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Cannot deliver status " + status + " to " @@ -514,7 +555,7 @@ class ClientManager { if (dest != null) { ClientConnectionRunner runner = getRunner(dest); if (runner != null) { - runner.reportAbuse(reason, severity); + runner.reportAbuse(dest, reason, severity); } } else { for (Destination d : _runners.keySet()) { @@ -592,21 +633,25 @@ class ClientManager { public void runJob() { ClientConnectionRunner runner; - if (_msg.getDestination() != null) - runner = getRunner(_msg.getDestination()); + Destination dest = _msg.getDestination(); + if (dest != null) + runner = getRunner(dest); else runner = getRunner(_msg.getDestinationHash()); if (runner != null) { //_ctx.statManager().addRateData("client.receiveMessageSize", // _msg.getPayload().getSize(), 0); - runner.receiveMessage(_msg.getDestination(), null, _msg.getPayload()); + if (dest != null) + runner.receiveMessage(dest, null, _msg.getPayload()); + else + runner.receiveMessage(_msg.getDestinationHash(), null, _msg.getPayload()); } else { // no client connection... // we should pool these somewhere... if (_log.shouldLog(Log.WARN)) _log.warn("Message received but we don't have a connection to " - + _msg.getDestination() + "/" + _msg.getDestinationHash() + + dest + "/" + _msg.getDestinationHash() + " currently. DROPPED"); } } diff --git a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java index 6168ab794..cc82592d5 100644 --- a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java +++ b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java @@ -90,7 +90,7 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte for (Destination dest : _manager.getRunnerDestinations()) { ClientConnectionRunner runner = _manager.getRunner(dest); if ( (runner == null) || (runner.getIsDead())) continue; - LeaseSet ls = runner.getLeaseSet(); + LeaseSet ls = runner.getLeaseSet(dest.calculateHash()); if (ls == null) continue; // still building long howLongAgo = _context.clock().now() - ls.getEarliestLeaseDate(); @@ -115,6 +115,7 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte * @param dest Destination from which the LeaseSet's authorization should be requested * @param set LeaseSet with requested leases - this object must be updated to contain the * signed version (as well as any changed/added/removed Leases) + * The LeaseSet contains Leases only; it is unsigned and does not have the destination set. * @param timeout ms to wait before failing * @param onCreateJob Job to run after the LeaseSet is authorized * @param onFailedJob Job to run after the timeout passes without receiving authorization @@ -126,6 +127,15 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade implements Inte _log.error("Null manager on requestLeaseSet!"); } + /** + * Request that a particular client authorize the Leases contained in the + * LeaseSet. + * + * @param dest Destination from which the LeaseSet's authorization should be requested + * @param ls LeaseSet with requested leases - this object must be updated to contain the + * signed version (as well as any changed/added/removed Leases). + * The LeaseSet contains Leases only; it is unsigned and does not have the destination set. + */ public void requestLeaseSet(Hash dest, LeaseSet set) { if (_manager != null) _manager.requestLeaseSet(dest, set); diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java index e20039b6e..628d8b0a5 100644 --- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java +++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java @@ -204,12 +204,13 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi */ private void handleCreateSession(CreateSessionMessage message) { SessionConfig in = message.getSessionConfig(); + Destination dest = in.getDestination(); if (in.verifySignature()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Signature verified correctly on create session message"); } else { // For now, we do NOT send a SessionStatusMessage - see javadoc above - int itype = in.getDestination().getCertificate().getCertificateType(); + int itype = dest.getCertificate().getCertificateType(); SigType stype = SigType.getByCode(itype); if (stype == null || !stype.isAvailable()) { _log.error("Client requested unsupported signature type " + itype); @@ -235,7 +236,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi if (!checkAuth(inProps)) return; - SessionId id = _runner.getSessionId(); + SessionId id = _runner.getSessionId(dest.calculateHash()); if (id != null) { _runner.disconnectClient("Already have session " + id); return; @@ -244,11 +245,22 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi // Copy over the whole config structure so we don't later corrupt it on // the client side if we change settings or later get a // ReconfigureSessionMessage - SessionConfig cfg = new SessionConfig(in.getDestination()); + SessionConfig cfg = new SessionConfig(dest); cfg.setSignature(in.getSignature()); Properties props = new Properties(); - props.putAll(in.getOptions()); + boolean isPrimary = _runner.getSessionIds().isEmpty(); + if (!isPrimary) { + // all the primary options, then the overrides from the alias + SessionConfig pcfg = _runner.getPrimaryConfig(); + if (pcfg != null) { + props.putAll(pcfg.getOptions()); + } else { + _log.error("no primary config?"); + } + } + props.putAll(inProps); cfg.setOptions(props); + // this sets the session id int status = _runner.sessionEstablished(cfg); if (status != SessionStatusMessage.STATUS_CREATED) { // For now, we do NOT send a SessionStatusMessage - see javadoc above @@ -264,11 +276,33 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _runner.disconnectClient(msg); return; } - sendStatusMessage(status); + // get the new session ID + id = _runner.getSessionId(dest.calculateHash()); if (_log.shouldLog(Log.INFO)) - _log.info("Session " + _runner.getSessionId() + " established for " + _runner.getDestHash()); - startCreateSessionJob(); + _log.info("Session " + id + " established for " + dest.calculateHash()); + if (isPrimary) { + sendStatusMessage(id, status); + startCreateSessionJob(cfg); + } else { + SessionConfig pcfg = _runner.getPrimaryConfig(); + if (pcfg != null) { + ClientTunnelSettings settings = new ClientTunnelSettings(dest.calculateHash()); + settings.readFromProperties(props); + // addAlias() sends the create lease set msg, so we have to send the SMS first + sendStatusMessage(id, status); + boolean ok = _context.tunnelManager().addAlias(dest, settings, pcfg.getDestination()); + if (!ok) { + _log.error("Add alias failed"); + // FIXME cleanup + } + } else { + _log.error("no primary config?"); + status = SessionStatusMessage.STATUS_INVALID; + sendStatusMessage(id, status); + // FIXME cleanup + } + } } /** @@ -314,8 +348,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * @since 0.9.8 * */ - protected void startCreateSessionJob() { - _context.jobQueue().addJob(new CreateSessionJob(_context, _runner)); + protected void startCreateSessionJob(SessionConfig config) { + _context.jobQueue().addJob(new CreateSessionJob(_context, config)); } /** @@ -324,7 +358,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * */ private void handleSendMessage(SendMessageMessage message) { - SessionConfig cfg = _runner.getConfig(); + SessionId sid = message.getSessionId(); + SessionConfig cfg = _runner.getConfig(sid); if (cfg == null) { if (_log.shouldLog(Log.ERROR)) _log.error("SendMessage w/o session"); @@ -336,7 +371,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi long beforeDistribute = _context.clock().now(); MessageId id = _runner.distributeMessage(message); long timeToDistribute = _context.clock().now() - beforeDistribute; - _runner.ackSendMessage(id, message.getNonce()); + // TODO validate session id + _runner.ackSendMessage(message.getSessionId(), id, message.getNonce()); _context.statManager().addRateData("client.distributeTime", timeToDistribute); if ( (timeToDistribute > 50) && (_log.shouldLog(Log.INFO)) ) _log.info("Took too long to distribute the message (which holds up the ack): " + timeToDistribute); @@ -353,7 +389,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _log.debug("Handling recieve begin: id = " + message.getMessageId()); MessagePayloadMessage msg = new MessagePayloadMessage(); msg.setMessageId(message.getMessageId()); - msg.setSessionId(_runner.getSessionId().getSessionId()); + // TODO validate session id + msg.setSessionId(message.getSessionId()); Payload payload = _runner.getPayload(new MessageId(message.getMessageId())); if (payload == null) { if (_log.shouldLog(Log.WARN)) @@ -382,9 +419,18 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi } private void handleDestroySession(DestroySessionMessage message) { - if (_log.shouldLog(Log.INFO)) - _log.info("Destroying client session " + _runner.getSessionId()); - _runner.stopRunning(); + SessionId id = message.getSessionId(); + SessionConfig cfg = _runner.getConfig(id); + _runner.removeSession(id); + int left = _runner.getSessionIds().size(); + if (left <= 0) { + _runner.stopRunning(); + } else { + if (cfg != null) + _context.tunnelManager().removeAlias(cfg.getDestination()); + if (_log.shouldLog(Log.INFO)) + _log.info("Still " + left + " sessions left"); + } } /** override for testing */ @@ -395,7 +441,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _runner.disconnectClient("Invalid CreateLeaseSetMessage"); return; } - SessionConfig cfg = _runner.getConfig(); + SessionId id = message.getSessionId(); + SessionConfig cfg = _runner.getConfig(id); if (cfg == null) { if (_log.shouldLog(Log.ERROR)) _log.error("CreateLeaseSet w/o session"); @@ -446,8 +493,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi return; } if (_log.shouldLog(Log.INFO)) - _log.info("New lease set granted for destination " - + _runner.getDestHash()); + _log.info("New lease set granted for destination " + dest); // leaseSetCreated takes care of all the LeaseRequestState stuff (including firing any jobs) _runner.leaseSetCreated(message.getLeaseSet()); @@ -455,6 +501,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi /** override for testing */ protected void handleDestLookup(DestLookupMessage message) { + // no session id in DLM _context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getHash(), _runner.getDestHash())); } @@ -464,10 +511,12 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * @since 0.9.11 */ protected void handleHostLookup(HostLookupMessage message) { + Hash h = _runner.getDestHash(message.getSessionId()); + if (h == null) + return; // ok? _context.jobQueue().addJob(new LookupDestJob(_context, _runner, message.getReqID(), message.getTimeout(), message.getSessionId(), - message.getHash(), message.getHostname(), - _runner.getDestHash())); + message.getHash(), message.getHostname(), h)); } /** @@ -482,10 +531,12 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * In-JVM client side must promote defaults to the primary map. */ private void handleReconfigureSession(ReconfigureSessionMessage message) { - SessionConfig cfg = _runner.getConfig(); + SessionId id = message.getSessionId(); + SessionConfig cfg = _runner.getConfig(id); if (cfg == null) { if (_log.shouldLog(Log.ERROR)) _log.error("ReconfigureSession w/o session"); + //sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID); _runner.disconnectClient("ReconfigureSession w/o session"); return; } @@ -493,12 +544,12 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _log.info("Updating options - old: " + cfg + " new: " + message.getSessionConfig()); if (!message.getSessionConfig().getDestination().equals(cfg.getDestination())) { _log.error("Dest mismatch"); - sendStatusMessage(SessionStatusMessage.STATUS_INVALID); + sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID); _runner.stopRunning(); return; } + Hash dest = cfg.getDestination().calculateHash(); cfg.getOptions().putAll(message.getSessionConfig().getOptions()); - Hash dest = _runner.getDestHash(); ClientTunnelSettings settings = new ClientTunnelSettings(dest); Properties props = new Properties(); props.putAll(cfg.getOptions()); @@ -507,14 +558,11 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi settings.getInboundSettings()); _context.tunnelManager().setOutboundSettings(dest, settings.getOutboundSettings()); - sendStatusMessage(SessionStatusMessage.STATUS_UPDATED); + sendStatusMessage(id, SessionStatusMessage.STATUS_UPDATED); } - private void sendStatusMessage(int status) { + private void sendStatusMessage(SessionId id, int status) { SessionStatusMessage msg = new SessionStatusMessage(); - SessionId id = _runner.getSessionId(); - if (id == null) - id = ClientManager.UNKNOWN_SESSION_ID; msg.setSessionId(id); msg.setStatus(status); try { diff --git a/router/java/src/net/i2p/router/client/CreateSessionJob.java b/router/java/src/net/i2p/router/client/CreateSessionJob.java index af01a496c..558dba8e7 100644 --- a/router/java/src/net/i2p/router/client/CreateSessionJob.java +++ b/router/java/src/net/i2p/router/client/CreateSessionJob.java @@ -26,25 +26,20 @@ import net.i2p.util.Log; */ class CreateSessionJob extends JobImpl { private final Log _log; - private final ClientConnectionRunner _runner; + private final SessionConfig _config; - public CreateSessionJob(RouterContext context, ClientConnectionRunner runner) { + public CreateSessionJob(RouterContext context, SessionConfig config) { super(context); _log = context.logManager().getLog(CreateSessionJob.class); - _runner = runner; + _config = config; if (_log.shouldLog(Log.DEBUG)) - _log.debug("CreateSessionJob for runner " + _runner + " / config: " + _runner.getConfig()); + _log.debug("CreateSessionJob for config: " + config); } public String getName() { return "Request tunnels for a new client"; } + public void runJob() { - SessionConfig cfg = _runner.getConfig(); - if ( (cfg == null) || (cfg.getDestination() == null) ) { - if (_log.shouldLog(Log.ERROR)) - _log.error("No session config on runner " + _runner); - return; - } - Hash dest = cfg.getDestination().calculateHash(); + Hash dest = _config.getDestination().calculateHash(); if (_log.shouldLog(Log.INFO)) _log.info("Requesting lease set for destination " + dest); ClientTunnelSettings settings = new ClientTunnelSettings(dest); @@ -61,10 +56,10 @@ class CreateSessionJob extends JobImpl { // XXX props.putAll(Router.getInstance().getConfigMap()); // override them by the client's settings - props.putAll(cfg.getOptions()); + props.putAll(_config.getOptions()); // and load 'em up (using anything not yet set as the software defaults) settings.readFromProperties(props); - getContext().tunnelManager().buildTunnels(cfg.getDestination(), settings); + getContext().tunnelManager().buildTunnels(_config.getDestination(), settings); } } diff --git a/router/java/src/net/i2p/router/client/LeaseRequestState.java b/router/java/src/net/i2p/router/client/LeaseRequestState.java index 03bc2e4fa..2fb6b9760 100644 --- a/router/java/src/net/i2p/router/client/LeaseRequestState.java +++ b/router/java/src/net/i2p/router/client/LeaseRequestState.java @@ -30,6 +30,9 @@ class LeaseRequestState { /** * @param expiration absolute time, when the request expires (not when the LS expires) + * @param requested LeaseSet with requested leases - this object must be updated to contain the + * signed version (as well as any changed/added/removed Leases) + * The LeaseSet contains Leases and destination only, it is unsigned. */ public LeaseRequestState(Job onGranted, Job onFailed, long expiration, LeaseSet requested) { _onGranted = onGranted; @@ -40,6 +43,7 @@ class LeaseRequestState { /** created lease set from client - FIXME always null */ public LeaseSet getGranted() { return _grantedLeaseSet; } + /** FIXME unused - why? */ public void setGranted(LeaseSet ls) { _grantedLeaseSet = ls; } diff --git a/router/java/src/net/i2p/router/client/MessageReceivedJob.java b/router/java/src/net/i2p/router/client/MessageReceivedJob.java index 0ee148282..3614d1693 100644 --- a/router/java/src/net/i2p/router/client/MessageReceivedJob.java +++ b/router/java/src/net/i2p/router/client/MessageReceivedJob.java @@ -14,6 +14,7 @@ import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.MessageStatusMessage; +import net.i2p.data.i2cp.SessionId; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -26,14 +27,20 @@ import net.i2p.util.Log; class MessageReceivedJob extends JobImpl { private final Log _log; private final ClientConnectionRunner _runner; + private final Destination _toDest; private final Payload _payload; private final boolean _sendDirect; + /** + * @param toDest non-null, required to pick session + * @param fromDest ignored, generally null + */ public MessageReceivedJob(RouterContext ctx, ClientConnectionRunner runner, Destination toDest, Destination fromDest, Payload payload, boolean sendDirect) { super(ctx); _log = ctx.logManager().getLog(MessageReceivedJob.class); _runner = runner; + _toDest = toDest; _payload = payload; _sendDirect = sendDirect; } @@ -43,8 +50,8 @@ class MessageReceivedJob extends JobImpl { public void runJob() { if (_runner.isDead()) return; MessageId id = null; - long nextID = _runner.getNextMessageId(); try { + long nextID = _runner.getNextMessageId(); if (_sendDirect) { sendMessage(nextID); } else { @@ -55,7 +62,7 @@ class MessageReceivedJob extends JobImpl { } catch (I2CPMessageException ime) { if (_log.shouldLog(Log.WARN)) _log.warn("Error writing out the message", ime); - if (!_sendDirect) + if (id != null && !_sendDirect) _runner.removePayload(id); } } @@ -69,7 +76,13 @@ class MessageReceivedJob extends JobImpl { // + " (with nonce=1)", new Exception("available")); MessageStatusMessage msg = new MessageStatusMessage(); msg.setMessageId(id.getMessageId()); - msg.setSessionId(_runner.getSessionId().getSessionId()); + SessionId sid = _runner.getSessionId(_toDest.calculateHash()); + if (sid == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No session for " + _toDest.calculateHash()); + return; + } + msg.setSessionId(sid.getSessionId()); msg.setSize(size); // has to be >= 0, it is initialized to -1 msg.setNonce(1); @@ -84,7 +97,13 @@ class MessageReceivedJob extends JobImpl { private void sendMessage(long id) throws I2CPMessageException { MessagePayloadMessage msg = new MessagePayloadMessage(); msg.setMessageId(id); - msg.setSessionId(_runner.getSessionId().getSessionId()); + SessionId sid = _runner.getSessionId(_toDest.calculateHash()); + if (sid == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No session for " + _toDest.calculateHash()); + return; + } + msg.setSessionId(sid.getSessionId()); msg.setPayload(_payload); _runner.doSend(msg); } diff --git a/router/java/src/net/i2p/router/client/ReportAbuseJob.java b/router/java/src/net/i2p/router/client/ReportAbuseJob.java index 8dd36ac3c..c24e5e9ba 100644 --- a/router/java/src/net/i2p/router/client/ReportAbuseJob.java +++ b/router/java/src/net/i2p/router/client/ReportAbuseJob.java @@ -8,10 +8,12 @@ package net.i2p.router.client; * */ +import net.i2p.data.Destination; import net.i2p.data.i2cp.AbuseReason; import net.i2p.data.i2cp.AbuseSeverity; import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.ReportAbuseMessage; +import net.i2p.data.i2cp.SessionId; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -23,17 +25,22 @@ import net.i2p.util.Log; class ReportAbuseJob extends JobImpl { private final Log _log; private final ClientConnectionRunner _runner; + private final Destination _dest; private final String _reason; private final int _severity; - public ReportAbuseJob(RouterContext context, ClientConnectionRunner runner, String reason, int severity) { + + public ReportAbuseJob(RouterContext context, ClientConnectionRunner runner, + Destination dest, String reason, int severity) { super(context); _log = context.logManager().getLog(ReportAbuseJob.class); _runner = runner; + _dest = dest; _reason = reason; _severity = severity; } public String getName() { return "Report Abuse"; } + public void runJob() { if (_runner.isDead()) return; AbuseReason res = new AbuseReason(); @@ -41,9 +48,11 @@ class ReportAbuseJob extends JobImpl { AbuseSeverity sev = new AbuseSeverity(); sev.setSeverity(_severity); ReportAbuseMessage msg = new ReportAbuseMessage(); - msg.setMessageId(null); msg.setReason(res); - msg.setSessionId(_runner.getSessionId()); + SessionId id = _runner.getSessionId(_dest.calculateHash()); + if (id == null) + return; + msg.setSessionId(id); msg.setSeverity(sev); try { _runner.doSend(msg); diff --git a/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java b/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java index 9b07978f6..f5ec343a0 100644 --- a/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java +++ b/router/java/src/net/i2p/router/client/RequestLeaseSetJob.java @@ -16,6 +16,7 @@ import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.RequestLeaseSetMessage; import net.i2p.data.i2cp.RequestVariableLeaseSetMessage; +import net.i2p.data.i2cp.SessionId; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -63,13 +64,16 @@ class RequestLeaseSetJob extends JobImpl { // _log.debug("Adding fudge " + fudge); endTime += fudge; + SessionId id = _runner.getSessionId(_requestState.getRequested().getDestination().calculateHash()); + if (id == null) + return; I2CPMessage msg; if (getContext().getProperty(PROP_VARIABLE, DFLT_VARIABLE) && (_runner instanceof QueuedClientConnectionRunner || RequestVariableLeaseSetMessage.isSupported(_runner.getClientVersion()))) { // new style - leases will have individual expirations RequestVariableLeaseSetMessage rmsg = new RequestVariableLeaseSetMessage(); - rmsg.setSessionId(_runner.getSessionId()); + rmsg.setSessionId(id); for (int i = 0; i < requested.getLeaseCount(); i++) { Lease lease = requested.getLease(i); if (lease.getEndDate().getTime() < endTime) { @@ -90,7 +94,7 @@ class RequestLeaseSetJob extends JobImpl { RequestLeaseSetMessage rmsg = new RequestLeaseSetMessage(); Date end = new Date(endTime); rmsg.setEndDate(end); - rmsg.setSessionId(_runner.getSessionId()); + rmsg.setSessionId(id); for (int i = 0; i < requested.getLeaseCount(); i++) { Lease lease = requested.getLease(i); rmsg.addEndpoint(lease.getGateway(), @@ -144,8 +148,7 @@ class RequestLeaseSetJob extends JobImpl { CheckLeaseRequestStatus.this.getContext().statManager().addRateData("client.requestLeaseSetTimeout", 1); if (_log.shouldLog(Log.ERROR)) { long waited = System.currentTimeMillis() - _start; - _log.error("Failed to receive a leaseSet in the time allotted (" + waited + "): " + _requestState + " for " - + _runner.getConfig().getDestination().calculateHash().toBase64()); + _log.error("Failed to receive a leaseSet in the time allotted (" + waited + "): " + _requestState); } if (_requestState.getOnFailed() != null) RequestLeaseSetJob.this.getContext().jobQueue().addJob(_requestState.getOnFailed()); diff --git a/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java b/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java index 5de3db1a3..96dceea18 100644 --- a/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java +++ b/router/java/src/net/i2p/router/dummy/DummyTunnelManagerFacade.java @@ -50,6 +50,8 @@ public class DummyTunnelManagerFacade implements TunnelManagerFacade { public int getOutboundClientTunnelCount(Hash destination) { return 0; } public long getLastParticipatingExpiration() { return -1; } public void buildTunnels(Destination client, ClientTunnelSettings settings) {} + public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient) { return false; } + public void removeAlias(Destination dest) {} public TunnelPoolSettings getInboundSettings() { return null; } public TunnelPoolSettings getOutboundSettings() { return null; } public TunnelPoolSettings getInboundSettings(Hash client) { return null; } diff --git a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java index 62c1e6ed6..d5a90ee68 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java +++ b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java @@ -18,6 +18,7 @@ import net.i2p.data.i2np.VariableTunnelBuildReplyMessage; import net.i2p.router.ClientMessage; import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; +import net.i2p.router.TunnelPoolSettings; import net.i2p.router.message.GarlicMessageReceiver; import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade; import net.i2p.util.Log; @@ -204,11 +205,11 @@ class InboundMessageDistributor implements GarlicMessageReceiver.CloveReceiver { * */ public void handleClove(DeliveryInstructions instructions, I2NPMessage data) { + int type = data.getType(); switch (instructions.getDeliveryMode()) { case DeliveryInstructions.DELIVERY_MODE_LOCAL: if (_log.shouldLog(Log.DEBUG)) _log.debug("local delivery instructions for clove: " + data.getClass().getSimpleName()); - int type = data.getType(); if (type == GarlicMessage.MESSAGE_TYPE) { _receiver.receive((GarlicMessage)data); } else if (type == DatabaseStoreMessage.MESSAGE_TYPE) { @@ -296,28 +297,45 @@ class InboundMessageDistributor implements GarlicMessageReceiver.CloveReceiver { _context.inNetMessagePool().add(data, null, null); } return; + case DeliveryInstructions.DELIVERY_MODE_DESTINATION: + Hash to = instructions.getDestination(); // Can we route UnknownI2NPMessages to a destination too? - if (!(data instanceof DataMessage)) { + if (type != DataMessage.MESSAGE_TYPE) { if (_log.shouldLog(Log.ERROR)) _log.error("cant send a " + data.getClass().getSimpleName() + " to a destination"); - } else if ( (_client != null) && (_client.equals(instructions.getDestination())) ) { + } else if (_client != null && _client.equals(to)) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("data message came down a tunnel for " - + _client); + _log.debug("data message came down a tunnel for " + _client); DataMessage dm = (DataMessage)data; Payload payload = new Payload(); payload.setEncryptedData(dm.getData()); ClientMessage m = new ClientMessage(_client, payload); _context.clientManager().messageReceived(m); + } else if (_client != null) { + // Shared tunnel? + TunnelPoolSettings tgt = _context.tunnelManager().getInboundSettings(to); + if (tgt != null && _client.equals(tgt.getAliasOf())) { + // same as above, just different log + if (_log.shouldLog(Log.DEBUG)) + _log.debug("data message came down a tunnel for " + + _client + " targeting shared " + to); + DataMessage dm = (DataMessage)data; + Payload payload = new Payload(); + payload.setEncryptedData(dm.getData()); + ClientMessage m = new ClientMessage(to, payload); + _context.clientManager().messageReceived(m); + } else { + if (_log.shouldLog(Log.ERROR)) + _log.error("Data message came down a tunnel for " + + _client + " but targetted " + to); + } } else { if (_log.shouldLog(Log.ERROR)) - _log.error("this data message came down a tunnel for " - + (_client == null ? "no one" : _client) - + " but targetted " - + instructions.getDestination()); + _log.error("Data message came down an exploratory tunnel targeting " + to); } return; + case DeliveryInstructions.DELIVERY_MODE_ROUTER: // fall through case DeliveryInstructions.DELIVERY_MODE_TUNNEL: if (_log.shouldLog(Log.INFO)) @@ -325,6 +343,7 @@ class InboundMessageDistributor implements GarlicMessageReceiver.CloveReceiver { + ", treat recursively to prevent leakage"); distribute(data, instructions.getRouter(), instructions.getTunnelId()); return; + default: if (_log.shouldLog(Log.ERROR)) _log.error("Unknown instruction " + instructions.getDeliveryMode() + ": " + instructions); diff --git a/router/java/src/net/i2p/router/tunnel/pool/AliasedTunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/AliasedTunnelPool.java new file mode 100644 index 000000000..03b8f0563 --- /dev/null +++ b/router/java/src/net/i2p/router/tunnel/pool/AliasedTunnelPool.java @@ -0,0 +1,156 @@ +package net.i2p.router.tunnel.pool; + +import java.util.List; + +import net.i2p.data.Hash; +import net.i2p.data.Lease; +import net.i2p.data.LeaseSet; +import net.i2p.data.TunnelId; +import net.i2p.router.RouterContext; +import net.i2p.router.TunnelInfo; +import net.i2p.router.TunnelPoolSettings; +import net.i2p.util.Log; + +/** + * A tunnel pool with its own settings and Destination, + * but uses another pool for its tunnels. + * + * @since 0.9.20 + */ +public class AliasedTunnelPool extends TunnelPool { + + private final TunnelPool _aliasOf; + + AliasedTunnelPool(RouterContext ctx, TunnelPoolManager mgr, TunnelPoolSettings settings, TunnelPool aliasOf) { + super(ctx, mgr, settings, null); + if (settings.isExploratory()) + throw new IllegalArgumentException(); + if (settings.getAliasOf() == null) + throw new IllegalArgumentException(); + _aliasOf = aliasOf; + } + + @Override + synchronized void startup() { + if (_log.shouldLog(Log.INFO)) + _log.info(toString() + ": Startup() called, was already alive? " + _alive, new Exception()); + _alive = true; + super.refreshLeaseSet(); + } + + @Override + synchronized void shutdown() { + if (_log.shouldLog(Log.WARN)) + _log.warn(toString() + ": Shutdown called"); + _alive = false; + } + + @Override + TunnelInfo selectTunnel() { + return _aliasOf.selectTunnel(); + } + + @Override + TunnelInfo selectTunnel(Hash closestTo) { + return _aliasOf.selectTunnel(closestTo); + } + + @Override + public TunnelInfo getTunnel(TunnelId gatewayId) { + return _aliasOf.getTunnel(gatewayId); + } + + @Override + public List listTunnels() { + return _aliasOf.listTunnels(); + } + + @Override + boolean needFallback() { + return false; + } + + @Override + public List listPending() { + return _aliasOf.listPending(); + } + + @Override + public boolean isAlive() { + return _alive && _aliasOf.isAlive(); + } + + @Override + public int size() { + return _aliasOf.size(); + } + + @Override + void addTunnel(TunnelInfo info) { + _aliasOf.addTunnel(info); + } + + @Override + void removeTunnel(TunnelInfo info) { + _aliasOf.removeTunnel(info); + } + + @Override + void tunnelFailed(TunnelInfo cfg) { + _aliasOf.tunnelFailed(cfg); + } + + @Override + void tunnelFailed(TunnelInfo cfg, Hash blamePeer) { + _aliasOf.tunnelFailed(cfg, blamePeer); + } + + @Override + void refreshLeaseSet() {} + + @Override + boolean buildFallback() { + return _aliasOf.buildFallback(); + } + + @Override + protected LeaseSet locked_buildNewLeaseSet() { + LeaseSet ls = _context.netDb().lookupLeaseSetLocally(_aliasOf.getSettings().getDestination()); + if (ls == null) + return null; + // copy everything so it isn't corrupted + LeaseSet rv = new LeaseSet(); + for (int i = 0; i < ls.getLeaseCount(); i++) { + Lease old = ls.getLease(i); + Lease lease = new Lease(); + lease.setEndDate(old.getEndDate()); + lease.setTunnelId(old.getTunnelId()); + lease.setGateway(old.getGateway()); + rv.addLease(lease); + } + return rv; + } + + @Override + public long getLifetimeProcessed() { + return _aliasOf.getLifetimeProcessed(); + } + + @Override + int countHowManyToBuild() { + return 0; + } + + @Override + PooledTunnelCreatorConfig configureNewTunnel() { + return null; + } + + @Override + void buildComplete(PooledTunnelCreatorConfig cfg) {} + + @Override + public String toString() { + return "Aliased " + super.toString(); + } +} diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index 521ff740a..ef6403a5b 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -8,6 +8,7 @@ import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Properties; +import java.util.Set; import java.util.TreeSet; import net.i2p.data.Hash; @@ -30,13 +31,13 @@ import net.i2p.util.Log; */ public class TunnelPool { private final List _inProgress = new ArrayList(); - private final RouterContext _context; - private final Log _log; + protected final RouterContext _context; + protected final Log _log; private TunnelPoolSettings _settings; private final List _tunnels; private final TunnelPeerSelector _peerSelector; private final TunnelPoolManager _manager; - private volatile boolean _alive; + protected volatile boolean _alive; private long _lifetimeProcessed; private TunnelInfo _lastSelected; private long _lastSelectionPeriod; @@ -118,19 +119,15 @@ public class TunnelPool { } } - void refreshSettings() { - if (!_settings.isExploratory()) { + private void refreshSettings() { + if (!_settings.isExploratory()) return; // don't override client specified settings - } else { - if (_settings.isExploratory()) { - Properties props = new Properties(); - props.putAll(_context.router().getConfigMap()); - if (_settings.isInbound()) - _settings.readFromProperties(TunnelPoolSettings.PREFIX_INBOUND_EXPLORATORY, props); - else - _settings.readFromProperties(TunnelPoolSettings.PREFIX_OUTBOUND_EXPLORATORY, props); - } - } + Properties props = new Properties(); + props.putAll(_context.router().getConfigMap()); + if (_settings.isInbound()) + _settings.readFromProperties(TunnelPoolSettings.PREFIX_INBOUND_EXPLORATORY, props); + else + _settings.readFromProperties(TunnelPoolSettings.PREFIX_OUTBOUND_EXPLORATORY, props); } /** @@ -412,11 +409,15 @@ public class TunnelPool { public List listPending() { synchronized (_inProgress) { return new ArrayList(_inProgress); } } /** duplicate of size(), let's pick one */ - int getTunnelCount() { synchronized (_tunnels) { return _tunnels.size(); } } + int getTunnelCount() { return size(); } public TunnelPoolSettings getSettings() { return _settings; } void setSettings(TunnelPoolSettings settings) { + if (settings != null && _settings != null) { + settings.getAliases().addAll(_settings.getAliases()); + settings.setAliasOf(_settings.getAliasOf()); + } _settings = settings; if (_settings != null) { if (_log.shouldLog(Log.INFO)) @@ -606,12 +607,18 @@ public class TunnelPool { if (_settings.isInbound() && !_settings.isExploratory()) { if (_log.shouldLog(Log.DEBUG)) _log.debug(toString() + ": refreshing leaseSet on tunnel expiration (but prior to grace timeout)"); - LeaseSet ls = null; + LeaseSet ls; synchronized (_tunnels) { ls = locked_buildNewLeaseSet(); } if (ls != null) { _context.clientManager().requestLeaseSet(_settings.getDestination(), ls); + Set aliases = _settings.getAliases(); + if (aliases != null && !aliases.isEmpty()) { + for (Hash h : aliases) { + _context.clientManager().requestLeaseSet(h, ls); + } + } } } } @@ -710,7 +717,7 @@ public class TunnelPool { * * @return null on failure */ - private LeaseSet locked_buildNewLeaseSet() { + protected LeaseSet locked_buildNewLeaseSet() { if (!_alive) return null; diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index 4cf54cb69..f318b479d 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -97,7 +97,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { ctx.statManager().createRateStat("tunnel.testAborted", "Tunnel test could not occur, since there weren't any tunnels to test with", "Tunnels", RATES); } - + /** * Pick a random inbound exploratory tunnel. * Warning - selectInboundExploratoryTunnel(Hash) is preferred. @@ -113,7 +113,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick a random inbound tunnel from the given destination's pool. * Warning - selectOutboundTunnel(Hash, Hash) is preferred. @@ -132,7 +132,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { " but there isn't a pool?"); return null; } - + /** * Pick a random outbound exploratory tunnel. * Warning - selectOutboundExploratoryTunnel(Hash) is preferred. @@ -148,7 +148,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick a random outbound tunnel from the given destination's pool. * Warning - selectOutboundTunnel(Hash, Hash) is preferred. @@ -164,7 +164,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return null; } - + /** * Pick the inbound exploratory tunnel with the gateway closest to the given hash. * By using this instead of the random selectTunnel(), @@ -184,7 +184,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick the inbound tunnel with the gateway closest to the given hash * from the given destination's pool. @@ -208,7 +208,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { " but there isn't a pool?"); return null; } - + /** * Pick the outbound exploratory tunnel with the endpoint closest to the given hash. * By using this instead of the random selectTunnel(), @@ -228,7 +228,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return info; } - + /** * Pick the outbound tunnel with the endpoint closest to the given hash * from the given destination's pool. @@ -249,7 +249,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { } return null; } - + /** * Expensive (iterates through all tunnels of all pools) and unnecessary. * @deprecated unused @@ -267,7 +267,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { if (info != null) return info; return null; } - + /** @return number of inbound exploratory tunnels */ public int getFreeTunnelCount() { return _inboundExploratory.size(); @@ -304,10 +304,11 @@ public class TunnelPoolManager implements TunnelManagerFacade { return pool.getTunnelCount(); return 0; } - + public int getParticipatingCount() { return _context.tunnelDispatcher().getParticipatingCount(); } + public long getLastParticipatingExpiration() { return _context.tunnelDispatcher().getLastParticipatingExpiration(); } - + /** * @return (number of part. tunnels) / (estimated total number of hops in our expl.+client tunnels) * 100 max. @@ -330,7 +331,6 @@ public class TunnelPoolManager implements TunnelManagerFacade { return Math.min(part / (double) count, 100d); } - public boolean isValidTunnel(Hash client, TunnelInfo tunnel) { if (tunnel.getExpiration() < _context.clock().now()) return false; @@ -386,17 +386,18 @@ public class TunnelPoolManager implements TunnelManagerFacade { pool.setSettings(settings); } } - + public synchronized void restart() { _handler.restart(); _executor.restart(); shutdownExploratory(); startup(); } - + /** * Used only at session startup. * Do not use to change settings. + * Do not use for aliased destinations; use addAlias(). */ public void buildTunnels(Destination client, ClientTunnelSettings settings) { Hash dest = client.calculateHash(); @@ -434,8 +435,89 @@ public class TunnelPoolManager implements TunnelManagerFacade { else outbound.startup(); } - - + + /** + * Add another destination to the same tunnels. + * Must have same encryption key an a different signing key. + * @throws IllegalArgumentException if not + * @return success + * @since 0.9.19 + */ + public boolean addAlias(Destination dest, ClientTunnelSettings settings, Destination existingClient) { + if (dest.getSigningPublicKey().equals(existingClient.getSigningPublicKey())) + throw new IllegalArgumentException("signing key must differ"); + if (!dest.getPublicKey().equals(existingClient.getPublicKey())) + throw new IllegalArgumentException("encryption key mismatch"); + Hash h = dest.calculateHash(); + Hash e = existingClient.calculateHash(); + synchronized(this) { + TunnelPool inbound = _clientInboundPools.get(h); + TunnelPool outbound = _clientOutboundPools.get(h); + if (inbound != null || outbound != null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("already have alias " + dest); + return false; + } + TunnelPool eInbound = _clientInboundPools.get(e); + TunnelPool eOutbound = _clientOutboundPools.get(e); + if (eInbound == null || eOutbound == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("primary not found " + existingClient); + return false; + } + eInbound.getSettings().getAliases().add(h); + eOutbound.getSettings().getAliases().add(h); + TunnelPoolSettings newIn = settings.getInboundSettings(); + TunnelPoolSettings newOut = settings.getOutboundSettings(); + newIn.setAliasOf(e); + newOut.setAliasOf(e); + inbound = new AliasedTunnelPool(_context, this, newIn, eInbound); + outbound = new AliasedTunnelPool(_context, this, newOut, eOutbound); + _clientInboundPools.put(h, inbound); + _clientOutboundPools.put(h, outbound); + inbound.startup(); + outbound.startup(); + } + if (_log.shouldLog(Log.WARN)) + _log.warn("Added " + h + " as alias for " + e + " with settings " + settings); + return true; + } + + /** + * Remove a destination for the same tunnels as another. + * @since 0.9.19 + */ + public void removeAlias(Destination dest) { + Hash h = dest.calculateHash(); + synchronized(this) { + TunnelPool inbound = _clientInboundPools.remove(h); + if (inbound != null) { + Hash p = inbound.getSettings().getAliasOf(); + if (p != null) { + TunnelPool pri = _clientInboundPools.get(p); + if (pri != null) { + Set aliases = pri.getSettings().getAliases(); + if (aliases != null) + aliases.remove(h); + } + } + } + TunnelPool outbound = _clientOutboundPools.remove(h); + if (outbound != null) { + Hash p = outbound.getSettings().getAliasOf(); + if (p != null) { + TunnelPool pri = _clientOutboundPools.get(p); + if (pri != null) { + Set aliases = pri.getSettings().getAliases(); + if (aliases != null) + aliases.remove(h); + } + } + } + // TODO if primary already vanished... + } + } + private static class DelayedStartup implements SimpleTimer.TimedEvent { private final TunnelPool pool; @@ -469,7 +551,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { if (outbound != null) outbound.shutdown(); } - + /** queue a recurring test job if appropriate */ void buildComplete(PooledTunnelCreatorConfig cfg) { if (cfg.getLength() > 1 && @@ -518,7 +600,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _context.jobQueue().addJob(new BootstrapPool(_context, _inboundExploratory)); _context.jobQueue().addJob(new BootstrapPool(_context, _outboundExploratory)); } - + private static class BootstrapPool extends JobImpl { private TunnelPool _pool; public BootstrapPool(RouterContext ctx, TunnelPool pool) { @@ -531,7 +613,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _pool.buildFallback(); } } - + /** * Cannot be restarted */ @@ -546,7 +628,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _inboundExploratory.shutdown(); _outboundExploratory.shutdown(); } - + /** list of TunnelPool instances currently in play */ public void listPools(List out) { out.addAll(_clientInboundPools.values());