chat client and server

This commit is contained in:
Zlatin Balevsky
2019-11-11 10:43:52 +00:00
parent 915deb1dee
commit 6080c8b308
6 changed files with 192 additions and 1 deletions

View File

@ -32,6 +32,7 @@ class MuWireSettings {
boolean searchComments
boolean browseFiles
boolean enableChat
int maxChatConnections
Set<String> watchedDirectories
float downloadSequentialRatio
int hostClearInterval, hostHopelessInterval, hostRejectInterval
@ -81,6 +82,7 @@ class MuWireSettings {
totalUploadSlots = Integer.valueOf(props.getProperty("totalUploadSlots","-1"))
uploadSlotsPerUser = Integer.valueOf(props.getProperty("uploadSlotsPerUser","-1"))
enableChat = Boolean.valueOf(props.getProperty("enableChat","false"))
maxChatConnections = Integer.valueOf(props.get("maxChatConnections", "-1"))
watchedDirectories = DataUtil.readEncodedSet(props, "watchedDirectories")
watchedKeywords = DataUtil.readEncodedSet(props, "watchedKeywords")
@ -130,6 +132,7 @@ class MuWireSettings {
props.setProperty("totalUploadSlots", String.valueOf(totalUploadSlots))
props.setProperty("uploadSlotsPerUser", String.valueOf(uploadSlotsPerUser))
props.setProperty("enableChat", String.valueOf(enableChat))
props.setProperty("maxChatConnectios", String.valueOf(maxChatConnections))
DataUtil.writeEncodedSet(watchedDirectories, "watchedDirectories", props)
DataUtil.writeEncodedSet(watchedKeywords, "watchedKeywords", props)

View File

@ -0,0 +1,108 @@
package com.muwire.core.chat
import java.nio.charset.StandardCharsets
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import com.muwire.core.Constants
import com.muwire.core.EventBus
import com.muwire.core.MuWireSettings
import com.muwire.core.Persona
import com.muwire.core.connection.Endpoint
import com.muwire.core.connection.I2PConnector
import com.muwire.core.trust.TrustService
import com.muwire.core.util.DataUtil
import groovy.util.logging.Log
@Log
class ChatClient implements Closeable {
private static final long REJECTION_BACKOFF = 60 * 1000
private static final Executor CONNECTOR = Executors.newCachedThreadPool()
private final I2PConnector connector
private final EventBus eventBus
private final Persona host, me
private final TrustService trustService
private final MuWireSettings settings
private volatile ChatConnection connection
private volatile boolean connectInProgress
private volatile long lastRejectionTime
private volatile Thread connectThread
ChatClient(I2PConnector connector, EventBus eventBus, Persona host, Persona me, TrustService trustService,
MuWireSettings settings) {
this.connector = connector
this.eventBus = eventBus
this.host = host
this.me = me
this.trustService = trustService
this.settings = settings
}
void connectIfNeeded() {
if (connection != null || connectInProgress || (System.currentTimeMillis() - lastRejectionTime < REJECTION_BACKOFF))
return
CONNECTOR.execute({connect()})
}
private void connect() {
connectInProgress = true
connectThread = Thread.currentThread()
Endpoint endpoint = null
try {
eventBus.publish(new ChatConnectionEvent(status : ChatConnectionAttemptStatus.CONNECTING, persona : host))
endpoint = connector.connect(host.destination)
DataOutputStream dos = new DataOutputStream(endpoint.getOutputStream())
DataInputStream dis = new DataInputStream(endpoint.getInputStream())
dos.with {
write("IRC\r\n".getBytes(StandardCharsets.US_ASCII))
write("Version:${Constants.CHAT_VERSION}\r\n".getBytes(StandardCharsets.US_ASCII))
write("Persona:${me.toBase64()}\r\n".getBytes(StandardCharsets.US_ASCII))
write("\r\n".getBytes(StandardCharsets.US_ASCII))
flush()
}
String codeString = DataUtil.readTillRN(dis)
int code = Integer.parseInt(codeString.split(" ")[0])
if (code == 429) {
eventBus.publish(new ChatConnectionEvent(status : ChatConnectionAttemptStatus.REJECTED, persona : host))
endpoint.close()
lastRejectionTime = System.currentTimeMillis()
return
}
if (code != 200)
throw new Exception("unknown code $code")
Map<String,String> headers = DataUtil.readAllHeaders(dis)
if (!headers.containsKey('Version'))
throw new Exception("Version header missing")
int version = Integer.parseInt(headers['Version'])
if (version != Constants.CHAT_VERSION)
throw new Exception("Unknown chat version $version")
connection = new ChatConnection(eventBus, endpoint, host, false, trustService, settings)
eventBus.publish(new ChatConnectionEvent(status : ChatConnectionAttemptStatus.SUCCESSFUL, persona : host,
connection : connection))
} catch (Exception e) {
eventBus.publish(new ChatConnectionEvent(status : ChatConnectionAttemptStatus.FAILED, persona : host))
endpoint?.close()
} finally {
connectInProgress = false
connectThread = null
}
}
@Override
public void close() {
connectThread?.interrupt()
connection?.close()
}
}

View File

@ -1,5 +1,5 @@
package com.muwire.core.chat;
public enum ChatConnectionAttemptStatus {
SUCCESSFUL, REJECTED, FAILED
CONNECTING, SUCCESSFUL, REJECTED, FAILED
}

View File

@ -6,4 +6,5 @@ import com.muwire.core.Persona
class ChatConnectionEvent extends Event {
ChatConnectionAttemptStatus status
Persona persona
ChatConnection connection
}

View File

@ -0,0 +1,77 @@
package com.muwire.core.chat
import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentHashMap
import com.muwire.core.Constants
import com.muwire.core.EventBus
import com.muwire.core.MuWireSettings
import com.muwire.core.Persona
import com.muwire.core.connection.Endpoint
import com.muwire.core.trust.TrustService
import com.muwire.core.util.DataUtil
import net.i2p.data.Base64
import net.i2p.data.Destination
import net.i2p.util.ConcurrentHashSet
class ChatServer {
private final EventBus eventBus
private final MuWireSettings settings
private final TrustService trustService
private final Map<Destination, ChatConnection> connections = new ConcurrentHashMap()
ChatServer(EventBus eventBus, MuWireSettings settings, TrustService trustService) {
this.eventBus = eventBus
this.settings = settings
this.trustService = trustService
}
public void handle(Endpoint endpoint) {
InputStream is = endpoint.getInputStream()
OutputStream os = endpoint.getOutputStream()
Map<String, String> headers = DataUtil.readAllHeaders(is)
if (!headers.containsKey("Version"))
throw new Exception("Version header missing")
int version = Integer.parseInt(headers['Version'])
if (version != Constants.CHAT_VERSION)
throw new Exception("Unknown chat version $version")
if (!headers.containsKey('Persona'))
throw new Exception("Persona header missing")
Persona client = new Persona(new ByteArrayInputStream(Base64.decode(headers['Persona'])))
if (client.destination != endpoint.destination)
throw new Exception("Client destination mismatch")
if (!settings.enableChat) {
os.write("400 Chat Not Enabled\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
os.close()
endpoint.close()
return
}
if (connections.containsKey(client.destination) || connections.size() == settings.maxChatConnections) {
os.write("429 Rejected\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
os.close()
endpoint.close()
return
}
os.with {
write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII))
write("Version:${Constants.CHAT_VERSION}\r\n".getBytes(StandardCharsets.US_ASCII))
write("\r\n".getBytes(StandardCharsets.US_ASCII))
flush()
}
ChatConnection connection = new ChatConnection(eventBus, endpoint, client, true, trustService, settings)
connections.put(endpoint.destination, connection)
connection.start()
eventBus.publish(new ChatConnectionEvent(connection : connection, status : ChatConnectionAttemptStatus.SUCCESSFUL, persona : client))
}
}

View File

@ -5,6 +5,8 @@ import net.i2p.crypto.SigType;
public class Constants {
public static final byte PERSONA_VERSION = (byte)1;
public static final byte FILE_CERT_VERSION = (byte)2;
public static final int CHAT_VERSION = 1;
public static final SigType SIG_TYPE = SigType.EdDSA_SHA512_Ed25519;
public static final int MAX_HEADER_SIZE = 0x1 << 14;