* SAM: Volatiles and finals

This commit is contained in:
zzz
2012-08-08 17:01:59 +00:00
parent 9cee0ee504
commit d8dd76c6e0
13 changed files with 88 additions and 123 deletions

View File

@@ -35,20 +35,20 @@ import net.i2p.util.Log;
*/ */
public class SAMBridge implements Runnable { public class SAMBridge implements Runnable {
private final static Log _log = new Log(SAMBridge.class); private final static Log _log = new Log(SAMBridge.class);
private ServerSocketChannel serverSocket; private final ServerSocketChannel serverSocket;
private Properties i2cpProps; private final Properties i2cpProps;
/** /**
* filename in which the name to private key mapping should * filename in which the name to private key mapping should
* be stored (and loaded from) * be stored (and loaded from)
*/ */
private String persistFilename; private final String persistFilename;
/** /**
* app designated destination name to the base64 of the I2P formatted * app designated destination name to the base64 of the I2P formatted
* destination keys (Destination+PrivateKey+SigningPrivateKey) * destination keys (Destination+PrivateKey+SigningPrivateKey)
*/ */
private Map<String,String> nameToPrivKeys; private final Map<String,String> nameToPrivKeys;
private boolean acceptConnections = true; private volatile boolean acceptConnections = true;
private static final int SAM_LISTENPORT = 7656; private static final int SAM_LISTENPORT = 7656;
@@ -64,8 +64,6 @@ public class SAMBridge implements Runnable {
protected static final String DEFAULT_DATAGRAM_PORT = "7655"; protected static final String DEFAULT_DATAGRAM_PORT = "7655";
private SAMBridge() {}
/** /**
* Build a new SAM bridge. * Build a new SAM bridge.
* *
@@ -73,6 +71,7 @@ public class SAMBridge implements Runnable {
* @param listenPort port number to listen for SAM connections on * @param listenPort port number to listen for SAM connections on
* @param i2cpProps set of I2CP properties for finding and communicating with the router * @param i2cpProps set of I2CP properties for finding and communicating with the router
* @param persistFile location to store/load named keys to/from * @param persistFile location to store/load named keys to/from
* @throws RuntimeException if a server socket can't be opened
*/ */
public SAMBridge(String listenHost, int listenPort, Properties i2cpProps, String persistFile) { public SAMBridge(String listenHost, int listenPort, Properties i2cpProps, String persistFile) {
persistFilename = persistFile; persistFilename = persistFile;
@@ -96,6 +95,7 @@ public class SAMBridge implements Runnable {
_log.error("Error starting SAM bridge on " _log.error("Error starting SAM bridge on "
+ (listenHost == null ? "0.0.0.0" : listenHost) + (listenHost == null ? "0.0.0.0" : listenHost)
+ ":" + listenPort, e); + ":" + listenPort, e);
throw new RuntimeException(e);
} }
this.i2cpProps = i2cpProps; this.i2cpProps = i2cpProps;

View File

@@ -30,10 +30,11 @@ public class SAMDatagramSession extends SAMMessageSession {
private final static Log _log = new Log(SAMDatagramSession.class); private final static Log _log = new Log(SAMDatagramSession.class);
public static int DGRAM_SIZE_MAX = 31*1024; public static int DGRAM_SIZE_MAX = 31*1024;
protected SAMDatagramReceiver recv = null; // FIXME make final after fixing SAMv3DatagramSession override
protected SAMDatagramReceiver recv;
private I2PDatagramMaker dgramMaker; private final I2PDatagramMaker dgramMaker;
private I2PDatagramDissector dgramDissector = new I2PDatagramDissector(); private final I2PDatagramDissector dgramDissector = new I2PDatagramDissector();
/** /**
* Create a new SAM DATAGRAM session. * Create a new SAM DATAGRAM session.
* *

View File

@@ -30,17 +30,17 @@ public abstract class SAMHandler implements Runnable {
protected I2PAppThread thread = null; protected I2PAppThread thread = null;
protected SAMBridge bridge = null; protected SAMBridge bridge = null;
private Object socketWLock = new Object(); // Guards writings on socket private final Object socketWLock = new Object(); // Guards writings on socket
protected SocketChannel socket = null; protected final SocketChannel socket;
protected int verMajor = 0; protected final int verMajor;
protected int verMinor = 0; protected final int verMinor;
/** I2CP options configuring the I2CP connection (port, host, numHops, etc) */ /** I2CP options configuring the I2CP connection (port, host, numHops, etc) */
protected Properties i2cpProps = null; protected final Properties i2cpProps;
private Object stopLock = new Object(); private final Object stopLock = new Object();
private boolean stopHandler = false; private volatile boolean stopHandler;
/** /**
* SAMHandler constructor (to be called by subclasses) * SAMHandler constructor (to be called by subclasses)
@@ -148,9 +148,7 @@ public abstract class SAMHandler implements Runnable {
* @throws IOException * @throws IOException
*/ */
protected final void closeClientSocket() throws IOException { protected final void closeClientSocket() throws IOException {
if (socket != null)
socket.close(); socket.close();
socket = null;
} }
/** /**

View File

@@ -159,8 +159,8 @@ public abstract class SAMMessageSession {
*/ */
public class SAMMessageSessionHandler implements Runnable, I2PSessionListener { public class SAMMessageSessionHandler implements Runnable, I2PSessionListener {
private Object runningLock = new Object(); private final Object runningLock = new Object();
private boolean stillRunning = true; private volatile boolean stillRunning = true;
/** /**
* Create a new SAM message-based session handler * Create a new SAM message-based session handler

View File

@@ -26,7 +26,9 @@ public class SAMRawSession extends SAMMessageSession {
private final static Log _log = new Log(SAMRawSession.class); private final static Log _log = new Log(SAMRawSession.class);
public static final int RAW_SIZE_MAX = 32*1024; public static final int RAW_SIZE_MAX = 32*1024;
protected SAMRawReceiver recv = null; // FIXME make final after fixing SAMv3DatagramSession override
protected SAMRawReceiver recv;
/** /**
* Create a new SAM RAW session. * Create a new SAM RAW session.
* *

View File

@@ -51,19 +51,19 @@ public class SAMStreamSession {
protected final static int SOCKET_HANDLER_BUF_SIZE = 32768; protected final static int SOCKET_HANDLER_BUF_SIZE = 32768;
protected SAMStreamReceiver recv = null; protected final SAMStreamReceiver recv;
protected SAMStreamSessionServer server = null; protected final SAMStreamSessionServer server;
protected I2PSocketManager socketMgr = null; protected final I2PSocketManager socketMgr;
private Object handlersMapLock = new Object(); private final Object handlersMapLock = new Object();
/** stream id (Long) to SAMStreamSessionSocketReader */ /** stream id (Long) to SAMStreamSessionSocketReader */
private HashMap<Integer,SAMStreamSessionSocketReader> handlersMap = new HashMap<Integer,SAMStreamSessionSocketReader>(); private final HashMap<Integer,SAMStreamSessionSocketReader> handlersMap = new HashMap<Integer,SAMStreamSessionSocketReader>();
/** stream id (Long) to StreamSender */ /** stream id (Long) to StreamSender */
private HashMap<Integer,StreamSender> sendersMap = new HashMap<Integer,StreamSender>(); private final HashMap<Integer,StreamSender> sendersMap = new HashMap<Integer,StreamSender>();
private Object idLock = new Object(); private final Object idLock = new Object();
private int lastNegativeId = 0; private int lastNegativeId = 0;
// Can we create outgoing connections? // Can we create outgoing connections?
@@ -73,15 +73,11 @@ public class SAMStreamSession {
* should we flush every time we get a STREAM SEND, or leave that up to * should we flush every time we get a STREAM SEND, or leave that up to
* the streaming lib to decide? * the streaming lib to decide?
*/ */
protected boolean forceFlush = false; protected final boolean forceFlush;
public static String PROP_FORCE_FLUSH = "sam.forceFlush"; public static String PROP_FORCE_FLUSH = "sam.forceFlush";
public static String DEFAULT_FORCE_FLUSH = "false"; public static String DEFAULT_FORCE_FLUSH = "false";
public SAMStreamSession() {
}
/** /**
* Create a new SAM STREAM session. * Create a new SAM STREAM session.
* *
@@ -95,11 +91,7 @@ public class SAMStreamSession {
*/ */
public SAMStreamSession(String dest, String dir, Properties props, public SAMStreamSession(String dest, String dir, Properties props,
SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
ByteArrayInputStream bais; this(new ByteArrayInputStream(Base64.decode(dest)), dir, props, recv);
bais = new ByteArrayInputStream(Base64.decode(dest));
initSAMStreamSession(bais, dir, props, recv);
} }
/** /**
@@ -115,11 +107,6 @@ public class SAMStreamSession {
*/ */
public SAMStreamSession(InputStream destStream, String dir, public SAMStreamSession(InputStream destStream, String dir,
Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
initSAMStreamSession(destStream, dir, props, recv);
}
private void initSAMStreamSession(InputStream destStream, String dir,
Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException{
this.recv = recv; this.recv = recv;
_log.debug("SAM STREAM session instantiated"); _log.debug("SAM STREAM session instantiated");
@@ -168,6 +155,8 @@ public class SAMStreamSession {
Thread t = new I2PAppThread(server, "SAMStreamSessionServer"); Thread t = new I2PAppThread(server, "SAMStreamSessionServer");
t.start(); t.start();
} else {
server = null;
} }
} }
@@ -417,10 +406,10 @@ public class SAMStreamSession {
*/ */
public class SAMStreamSessionServer implements Runnable { public class SAMStreamSessionServer implements Runnable {
private Object runningLock = new Object(); private final Object runningLock = new Object();
private boolean stillRunning = true; private volatile boolean stillRunning = true;
private I2PServerSocket serverSocket = null; private final I2PServerSocket serverSocket;
/** /**
* Create a new SAM STREAM session server * Create a new SAM STREAM session server
@@ -511,9 +500,9 @@ public class SAMStreamSession {
protected I2PSocket i2pSocket = null; protected I2PSocket i2pSocket = null;
protected Object runningLock = new Object(); protected final Object runningLock = new Object();
protected boolean stillRunning = true; protected volatile boolean stillRunning = true;
protected int id; protected int id;
@@ -660,8 +649,8 @@ public class SAMStreamSession {
private int _id; private int _id;
private ByteCache _cache; private ByteCache _cache;
private OutputStream _out = null; private OutputStream _out = null;
private boolean _stillRunning, _shuttingDownGracefully; private volatile boolean _stillRunning, _shuttingDownGracefully;
private Object runningLock = new Object(); private final Object runningLock = new Object();
private I2PSocket i2pSocket = null; private I2PSocket i2pSocket = null;
public v1StreamSender ( I2PSocket s, int id ) throws IOException { public v1StreamSender ( I2PSocket s, int id ) throws IOException {

View File

@@ -206,6 +206,7 @@ public class SAMUtils {
return msg; return msg;
} }
/****
public static void main(String args[]) { public static void main(String args[]) {
try { try {
test("a=b c=d e=\"f g h\""); test("a=b c=d e=\"f g h\"");
@@ -220,4 +221,5 @@ public class SAMUtils {
Properties p = parseParams(tok); Properties p = parseParams(tok);
System.out.println(p); System.out.println(p);
} }
****/
} }

View File

@@ -46,7 +46,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
protected SAMDatagramSession getDatagramSession() {return datagramSession ;} protected SAMDatagramSession getDatagramSession() {return datagramSession ;}
protected SAMStreamSession getStreamSession() {return streamSession ;} protected SAMStreamSession getStreamSession() {return streamSession ;}
protected long _id; protected final long _id;
protected static volatile long __id = 0; protected static volatile long __id = 0;
/** /**

View File

@@ -244,14 +244,14 @@ public class SAMv2StreamSession extends SAMStreamSession
protected class v2StreamSender extends StreamSender protected class v2StreamSender extends StreamSender
{ {
private List<ByteArray> _data; private final List<ByteArray> _data;
private int _dataSize; private int _dataSize;
private int _id; private final int _id;
private ByteCache _cache; private final ByteCache _cache;
private OutputStream _out = null; private final OutputStream _out;
private boolean _stillRunning, _shuttingDownGracefully; private volatile boolean _stillRunning, _shuttingDownGracefully;
private Object runningLock = new Object(); private final Object runningLock = new Object();
private I2PSocket i2pSocket = null; private final I2PSocket i2pSocket;
public v2StreamSender ( I2PSocket s, int id ) throws IOException public v2StreamSender ( I2PSocket s, int id ) throws IOException
{ {

View File

@@ -21,10 +21,10 @@ public class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Han
private final static Log _log = new Log ( SAMv3DatagramSession.class ); private final static Log _log = new Log ( SAMv3DatagramSession.class );
SAMv3Handler handler = null ; final SAMv3Handler handler;
SAMv3Handler.DatagramServer server = null ; final SAMv3Handler.DatagramServer server;
String nick = null ; final String nick;
SocketAddress clientAddress = null ; final SocketAddress clientAddress;
public String getNick() { return nick; } public String getNick() { return nick; }
@@ -41,10 +41,10 @@ public class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Han
super(SAMv3Handler.sSessionsHash.get(nick).getDest(), super(SAMv3Handler.sSessionsHash.get(nick).getDest(),
SAMv3Handler.sSessionsHash.get(nick).getProps(), SAMv3Handler.sSessionsHash.get(nick).getProps(),
null null // to be replaced by this
); );
this.nick = nick ; this.nick = nick ;
this.recv = this ; this.recv = this ; // replacement
this.server = SAMv3Handler.DatagramServer.getInstance() ; this.server = SAMv3Handler.DatagramServer.getInstance() ;
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
@@ -56,6 +56,7 @@ public class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Han
String portStr = props.getProperty("PORT") ; String portStr = props.getProperty("PORT") ;
if ( portStr==null ) { if ( portStr==null ) {
_log.debug("receiver port not specified. Current socket will be used."); _log.debug("receiver port not specified. Current socket will be used.");
this.clientAddress = null;
} }
else { else {
int port = Integer.parseInt(portStr); int port = Integer.parseInt(portStr);

View File

@@ -142,7 +142,7 @@ public class SAMv3Handler extends SAMv1Handler
class Listener implements Runnable { class Listener implements Runnable {
DatagramChannel server = null; final DatagramChannel server;
public Listener(DatagramChannel server) public Listener(DatagramChannel server)
{ {
@@ -172,7 +172,7 @@ public class SAMv3Handler extends SAMv1Handler
public static class MessageDispatcher implements Runnable public static class MessageDispatcher implements Runnable
{ {
ByteArrayInputStream is = null ; final ByteArrayInputStream is;
public MessageDispatcher(byte[] buf) public MessageDispatcher(byte[] buf)
{ {
@@ -210,10 +210,10 @@ public class SAMv3Handler extends SAMv1Handler
public class SessionRecord public class SessionRecord
{ {
protected String m_dest ; protected final String m_dest ;
protected Properties m_props ; protected final Properties m_props ;
protected ThreadGroup m_threadgroup ; protected ThreadGroup m_threadgroup ;
protected SAMv3Handler m_handler ; protected final SAMv3Handler m_handler ;
public SessionRecord( String dest, Properties props, SAMv3Handler handler ) public SessionRecord( String dest, Properties props, SAMv3Handler handler )
{ {
@@ -268,7 +268,7 @@ public class SAMv3Handler extends SAMv1Handler
static final long serialVersionUID = 0x1 ; static final long serialVersionUID = 0x1 ;
} }
HashMap<String, SessionRecord> map ; final HashMap<String, SessionRecord> map;
public SessionsDB() { public SessionsDB() {
map = new HashMap<String, SessionRecord>() ; map = new HashMap<String, SessionRecord>() ;
@@ -578,6 +578,9 @@ public class SAMv3Handler extends SAMv1Handler
} }
} }
/**
* @throws NPE if login nickname is not registered
*/
SAMv3StreamSession newSAMStreamSession(String login ) SAMv3StreamSession newSAMStreamSession(String login )
throws IOException, DataFormatException, SAMException throws IOException, DataFormatException, SAMException
{ {

View File

@@ -20,11 +20,11 @@ import net.i2p.util.Log;
*/ */
public class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Session, SAMRawReceiver { public class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Session, SAMRawReceiver {
String nick = null ; final String nick;
SAMv3Handler handler = null ; final SAMv3Handler handler;
SAMv3Handler.DatagramServer server ; final SAMv3Handler.DatagramServer server;
private final static Log _log = new Log ( SAMv3DatagramSession.class ); private final static Log _log = new Log ( SAMv3DatagramSession.class );
SocketAddress clientAddress = null ; final SocketAddress clientAddress;
public String getNick() { return nick; } public String getNick() { return nick; }
@@ -42,10 +42,10 @@ public class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Sess
super(SAMv3Handler.sSessionsHash.get(nick).getDest(), super(SAMv3Handler.sSessionsHash.get(nick).getDest(),
SAMv3Handler.sSessionsHash.get(nick).getProps(), SAMv3Handler.sSessionsHash.get(nick).getProps(),
SAMv3Handler.sSessionsHash.get(nick).getHandler() SAMv3Handler.sSessionsHash.get(nick).getHandler() // to be replaced by this
); );
this.nick = nick ; this.nick = nick ;
this.recv = this ; this.recv = this ; // replacement
this.server = SAMv3Handler.DatagramServer.getInstance() ; this.server = SAMv3Handler.DatagramServer.getInstance() ;
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
@@ -59,6 +59,7 @@ public class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Sess
String portStr = props.getProperty("PORT") ; String portStr = props.getProperty("PORT") ;
if ( portStr==null ) { if ( portStr==null ) {
_log.debug("receiver port not specified. Current socket will be used."); _log.debug("receiver port not specified. Current socket will be used.");
this.clientAddress = null;
} }
else { else {
int port = Integer.parseInt(portStr); int port = Integer.parseInt(portStr);

View File

@@ -45,10 +45,10 @@ public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handle
protected final int BUFFER_SIZE = 1024 ; protected final int BUFFER_SIZE = 1024 ;
protected Object socketServerLock = new Object(); protected final Object socketServerLock = new Object();
protected I2PServerSocket socketServer = null; protected I2PServerSocket socketServer = null;
protected String nick ; protected final String nick ;
public String getNick() { public String getNick() {
return nick ; return nick ;
@@ -62,11 +62,15 @@ public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handle
* @throws IOException * @throws IOException
* @throws DataFormatException * @throws DataFormatException
* @throws SAMException * @throws SAMException
* @throws NPE if login nickname is not registered
*/ */
public SAMv3StreamSession(String login) public SAMv3StreamSession(String login)
throws IOException, DataFormatException, SAMException throws IOException, DataFormatException, SAMException
{ {
initSAMStreamSession(login); super(getDB().get(login).getDest(), "CREATE",
getDB().get(login).getProps(),
getDB().get(login).getHandler());
this.nick = login ;
} }
public static SAMv3Handler.SessionsDB getDB() public static SAMv3Handler.SessionsDB getDB()
@@ -74,42 +78,6 @@ public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handle
return SAMv3Handler.sSessionsHash ; return SAMv3Handler.sSessionsHash ;
} }
private void initSAMStreamSession(String login)
throws IOException, DataFormatException, SAMException {
SAMv3Handler.SessionRecord rec = getDB().get(login);
String dest = rec.getDest() ;
ByteArrayInputStream ba_dest = new ByteArrayInputStream(Base64.decode(dest));
this.recv = rec.getHandler();
_log.debug("SAM STREAM session instantiated");
Properties allprops = (Properties) System.getProperties().clone();
allprops.putAll(rec.getProps());
String i2cpHost = allprops.getProperty(I2PClient.PROP_TCP_HOST, "127.0.0.1");
int i2cpPort ;
String port = allprops.getProperty(I2PClient.PROP_TCP_PORT, "7654");
try {
i2cpPort = Integer.parseInt(port);
} catch (NumberFormatException nfe) {
throw new SAMException("Invalid I2CP port specified [" + port + "]");
}
_log.debug("Creating I2PSocketManager...");
socketMgr = I2PSocketManagerFactory.createManager(ba_dest,
i2cpHost,
i2cpPort,
allprops);
if (socketMgr == null) {
throw new SAMException("Error creating I2PSocketManager towards "+i2cpHost+":"+i2cpPort);
}
socketMgr.addDisconnectListener(new DisconnectListener());
this.nick = login ;
}
/** /**
* Connect the SAM STREAM session to the specified Destination * Connect the SAM STREAM session to the specified Destination
* *
@@ -248,10 +216,10 @@ public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handle
public class SocketForwarder extends Thread public class SocketForwarder extends Thread
{ {
String host = null ; final String host;
int port = 0 ; final int port;
SAMv3StreamSession session; final SAMv3StreamSession session;
boolean verbose; final boolean verbose;
SocketForwarder(String host, int port, SAMv3StreamSession session, boolean verbose) { SocketForwarder(String host, int port, SAMv3StreamSession session, boolean verbose) {
this.host = host ; this.host = host ;
@@ -317,9 +285,9 @@ public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handle
} }
public class Pipe extends Thread public class Pipe extends Thread
{ {
ReadableByteChannel in ; final ReadableByteChannel in ;
WritableByteChannel out ; final WritableByteChannel out ;
ByteBuffer buf ; final ByteBuffer buf ;
public Pipe(ReadableByteChannel in, WritableByteChannel out, String name) public Pipe(ReadableByteChannel in, WritableByteChannel out, String name)
{ {