core side of certificate exchange

This commit is contained in:
Zlatin Balevsky
2019-11-04 17:17:57 +00:00
parent 5d51b1c580
commit 36c1a1a288
12 changed files with 254 additions and 31 deletions

View File

@ -270,7 +270,7 @@ public class Core {
I2PConnector i2pConnector = new I2PConnector(socketManager)
log.info "initializing results sender"
ResultsSender resultsSender = new ResultsSender(eventBus, i2pConnector, me, props)
ResultsSender resultsSender = new ResultsSender(eventBus, i2pConnector, me, props, certificateManager)
log.info "initializing search manager"
SearchManager searchManager = new SearchManager(eventBus, me, resultsSender)
@ -296,7 +296,8 @@ public class Core {
log.info("initializing acceptor")
I2PAcceptor i2pAcceptor = new I2PAcceptor(socketManager)
connectionAcceptor = new ConnectionAcceptor(eventBus, connectionManager, props,
i2pAcceptor, hostCache, trustService, searchManager, uploadManager, fileManager, connectionEstablisher)
i2pAcceptor, hostCache, trustService, searchManager, uploadManager, fileManager, connectionEstablisher,
certificateManager)
log.info("initializing directory watcher")
directoryWatcher = new DirectoryWatcher(eventBus, fileManager, home, props)

View File

@ -1,6 +1,7 @@
package com.muwire.core.connection
import java.nio.charset.StandardCharsets
import java.nio.file.attribute.DosFileAttributes
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.logging.Level
@ -11,8 +12,11 @@ import java.util.zip.InflaterInputStream
import com.muwire.core.Constants
import com.muwire.core.EventBus
import com.muwire.core.InfoHash
import com.muwire.core.MuWireSettings
import com.muwire.core.Persona
import com.muwire.core.filecert.Certificate
import com.muwire.core.filecert.CertificateManager
import com.muwire.core.files.FileManager
import com.muwire.core.hostcache.HostCache
import com.muwire.core.trust.TrustLevel
@ -45,6 +49,7 @@ class ConnectionAcceptor {
final UploadManager uploadManager
final FileManager fileManager
final ConnectionEstablisher establisher
final CertificateManager certificateManager
final ExecutorService acceptorThread
final ExecutorService handshakerThreads
@ -56,7 +61,7 @@ class ConnectionAcceptor {
ConnectionAcceptor(EventBus eventBus, UltrapeerConnectionManager manager,
MuWireSettings settings, I2PAcceptor acceptor, HostCache hostCache,
TrustService trustService, SearchManager searchManager, UploadManager uploadManager,
FileManager fileManager, ConnectionEstablisher establisher) {
FileManager fileManager, ConnectionEstablisher establisher, CertificateManager certificateManager) {
this.eventBus = eventBus
this.manager = manager
this.settings = settings
@ -67,6 +72,7 @@ class ConnectionAcceptor {
this.fileManager = fileManager
this.uploadManager = uploadManager
this.establisher = establisher
this.certificateManager = certificateManager
acceptorThread = Executors.newSingleThreadExecutor { r ->
def rv = new Thread(r)
@ -145,6 +151,9 @@ class ConnectionAcceptor {
case (byte)'B':
processBROWSE(e)
break
case (byte)'C':
processCERTIFICATES(e)
break
default:
throw new Exception("Invalid read $read")
}
@ -353,7 +362,8 @@ class ConnectionAcceptor {
JsonOutput jsonOutput = new JsonOutput()
sharedFiles.each {
it.hit()
def obj = ResultsSender.sharedFileToObj(it, false)
int certificates = certificateManager.getByInfoHash(it.getInfoHash()).size()
def obj = ResultsSender.sharedFileToObj(it, false, certificates)
def json = jsonOutput.toJson(obj)
dos.writeShort((short)json.length())
dos.write(json.getBytes(StandardCharsets.US_ASCII))
@ -406,5 +416,55 @@ class ConnectionAcceptor {
e.close()
}
}
private void processCERTIFICATES(Endpoint e) {
try {
byte [] ERTIFICATES = new byte[12]
DataInputStream dis = new DataInputStream(e.getInputStream())
dis.readFully(ERTIFICATES)
if (ERTIFICATES != "ERTIFICATES ".getBytes(StandardCharsets.US_ASCII))
throw new IOException("Invalid CERTIFICATES connection")
byte [] infoHashStringBytes = new byte[44]
dis.readFully(infoHashStringBytes)
String infoHashString = new String(infoHashStringBytes, StandardCharsets.US_ASCII)
byte[] rn = new byte[2]
dis.readFully(rn)
if (rn != "\r\n".getBytes(StandardCharsets.US_ASCII))
throw new IOException("Malformed CERTIFICATES request")
String header
while ((header = DataUtil.readTillRN(dis)) != ""); // ignore headers for now
log.info("responding to certificates request for $infoHashString")
byte [] root = Base64.decode(infoHashString)
Set<Certificate> certs = certificateManager.getByInfoHash(new InfoHash(root))
if (certs.isEmpty()) {
log.info("certs not found")
e.getOutputStream().write("404 Certs Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
e.getOutputStream().flush()
return
}
OutputStream os = e.getOutputStream()
os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Count: ${certs.size()}\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("\r\n".getBytes(StandardCharsets.US_ASCII))
DataOutputStream dos = new DataOutputStream(os)
certs.each {
ByteArrayOutputStream baos = new ByteArrayOutputStream()
it.write(baos)
byte [] payload = baos.toByteArray()
dos.writeShort(payload.length)
dos.write(payload)
}
dos.close()
} finally {
e.close()
}
}
}

View File

@ -0,0 +1,88 @@
package com.muwire.core.filecert
import java.nio.charset.StandardCharsets
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.logging.Level
import net.i2p.data.Base64
import com.muwire.core.Constants
import com.muwire.core.EventBus
import com.muwire.core.InvalidSignatureException
import com.muwire.core.connection.Endpoint
import com.muwire.core.connection.I2PConnector
import com.muwire.core.util.DataUtil
import groovy.util.logging.Log
@Log
class CertificateClient {
private final EventBus eventBus
private final I2PConnector connector
private final ExecutorService fetcherThread = Executors.newSingleThreadExecutor()
CertificateClient(EventBus eventBus, I2PConnector connector) {
this.eventBus = eventBus
this.connector = connector
}
void onUIFetchCertificatesEvent(UIFetchCertificatesEvent e) {
fetcherThread.execute({
Endpoint endpoint = null
try {
eventBus.publish(new CertificateFetchEvent(status : CertificateFetchStatus.CONNECTING))
endpoint = connector.connect(e.host.destination)
String infoHashString = Base64.encode(e.infoHash.getRoot())
OutputStream os = endpoint.getOutputStream()
os.write("CERTIFICATES ${infoHashString}\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
InputStream is = endpoint.getInputStream()
String code = DataUtil.readTillRN(is)
if (!code.startsWith("200"))
throw new IOException("invalid code $code")
// parse all headers
Map<String,String> headers = new HashMap<>()
String header
while((header = DataUtil.readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS) {
int colon = header.indexOf(':')
if (colon == -1 || colon == header.length() - 1)
throw new IOException("invalid header $header")
String key = header.substring(0, colon)
String value = header.substring(colon + 1)
headers[key] = value.trim()
}
if (!headers.containsKey("Count"))
throw new IOException("No count header")
int count = Integer.parseInt(headers['Count'])
// start pulling the certs
eventBus.publish(new CertificateFetchEvent(status : CertificateFetchStatus.FETCHING, count : count))
DataInputStream dis = new DataInputStream(is)
for (int i = 0; i < count; i++) {
int size = dis.readUnsignedShort()
byte [] tmp = new byte[size]
dis.readFully(tmp)
Certificate cert = null
try {
cert = new Certificate(new ByteArrayInputStream(tmp))
} catch (IOException | InvalidSignatureException ignore) {
continue
}
eventBus.publish(new CertificateFetchedEvent(certificate : cert))
}
} catch (Exception bad) {
log.log(Level.WARNING,"Fetching certificates failed", bad)
eventBus.publish(new CertificateFetchEvent(status : CertificateFetchStatus.FAILED))
} finally {
endpoint?.close()
}
})
}
}

View File

@ -0,0 +1,8 @@
package com.muwire.core.filecert
import com.muwire.core.Event
class CertificateFetchEvent extends Event {
CertificateFetchStatus status
int count
}

View File

@ -0,0 +1,5 @@
package com.muwire.core.filecert;
public enum CertificateFetchStatus {
CONNECTING, FETCHING, DONE, FAILED
}

View File

@ -0,0 +1,7 @@
package com.muwire.core.filecert
import com.muwire.core.Event
class CertificateFetchedEvent extends Event {
Certificate certificate
}

View File

@ -12,6 +12,7 @@ import com.muwire.core.Persona
import groovy.util.logging.Log
import net.i2p.data.Base64
import net.i2p.data.SigningPrivateKey
import net.i2p.util.ConcurrentHashSet
@Log
class CertificateManager {
@ -51,14 +52,14 @@ class CertificateManager {
Set<Certificate> existing = byInfoHash.get(cert.infoHash)
if (existing == null) {
existing = new HashSet<>()
existing = new ConcurrentHashSet<>()
byInfoHash.put(cert.infoHash, existing)
}
existing.add(cert)
existing = byIssuer.get(cert.issuer)
if (existing == null) {
existing = new HashSet<>()
existing = new ConcurrentHashSet<>()
byIssuer.put(cert.issuer, existing)
}
existing.add(cert)
@ -73,30 +74,45 @@ class CertificateManager {
long timestamp = System.currentTimeMillis()
Certificate cert = new Certificate(infoHash, name, timestamp, me, spk)
boolean added = true
Set<Certificate> existing = byInfoHash.get(cert.infoHash)
if (existing == null) {
existing = new HashSet<>()
byInfoHash.put(cert.infoHash, existing)
}
added &= existing.add(cert)
existing = byIssuer.get(cert.issuer)
if (existing == null) {
existing = new HashSet<>()
byIssuer.put(cert.issuer, existing)
}
added &= existing.add(cert)
if (added) {
String infoHashString = Base64.encode(infoHash.getRoot())
File certFile = new File(certDir, "${infoHashString}${name}.mwcert")
certFile.withOutputStream { cert.write(it) }
if (addToMaps(cert)) {
saveCert(cert)
eventBus.publish(new CertificateCreatedEvent(certificate : cert))
}
}
void onUIImportCertificateEvent(UIImportCertificateEvent e) {
Certificate cert = e.certificate
if (!addToMaps(cert))
return
saveCert(cert)
}
private void saveCert(Certificate cert) {
String infoHashString = Base64.encode(cert.infoHash.getRoot())
File certFile = new File(certDir, "${infoHashString}_${cert.issuer.getHumanReadableName()}.mwcert")
certFile.withOutputStream { cert.write(it) }
}
private boolean addToMaps(Certificate cert) {
boolean added = true
Set<Certificate> existing = byInfoHash.get(cert.infoHash)
if (existing == null) {
existing = new ConcurrentHashSet<>()
byInfoHash.put(cert.infoHash, existing)
}
added &= existing.add(cert)
existing = byIssuer.get(cert.issuer)
if (existing == null) {
existing = new ConcurrentHashSet<>()
byIssuer.put(cert.issuer, existing)
}
added &= existing.add(cert)
added
}
boolean hasLocalCertificate(InfoHash infoHash) {
if (!byInfoHash.containsKey(infoHash))
return false
@ -106,5 +122,12 @@ class CertificateManager {
return true
}
return false
}
Set<Certificate> getByInfoHash(InfoHash infoHash) {
Set<Certificate> rv = new HashSet<>()
if (byInfoHash.containsKey(infoHash))
rv.addAll(byInfoHash.get(infoHash))
rv
}
}

View File

@ -0,0 +1,10 @@
package com.muwire.core.filecert
import com.muwire.core.Event
import com.muwire.core.InfoHash
import com.muwire.core.Persona
class UIFetchCertificatesEvent extends Event {
Persona host
InfoHash infoHash
}

View File

@ -0,0 +1,7 @@
package com.muwire.core.filecert
import com.muwire.core.Event
class UIImportCertificateEvent extends Event {
Certificate certificate
}

View File

@ -99,6 +99,10 @@ class ResultsParser {
boolean browse = false
if (json.browse != null)
browse = json.browse
int certificates = 0
if (json.certificates != null)
certificates = json.certificates
return new UIResultEvent( sender : p,
name : name,
@ -108,7 +112,8 @@ class ResultsParser {
sources : sources,
comment : comment,
browse : browse,
uuid: uuid)
uuid: uuid,
certificates : certificates)
} catch (Exception e) {
throw new InvalidSearchResultException("parsing search result failed",e)
}

View File

@ -3,6 +3,7 @@ package com.muwire.core.search
import com.muwire.core.SharedFile
import com.muwire.core.connection.Endpoint
import com.muwire.core.connection.I2PConnector
import com.muwire.core.filecert.CertificateManager
import com.muwire.core.files.FileHasher
import com.muwire.core.util.DataUtil
import com.muwire.core.Persona
@ -46,12 +47,14 @@ class ResultsSender {
private final Persona me
private final EventBus eventBus
private final MuWireSettings settings
private final CertificateManager certificateManager
ResultsSender(EventBus eventBus, I2PConnector connector, Persona me, MuWireSettings settings) {
ResultsSender(EventBus eventBus, I2PConnector connector, Persona me, MuWireSettings settings, CertificateManager certificateManager) {
this.connector = connector;
this.eventBus = eventBus
this.me = me
this.settings = settings
this.certificateManager = certificateManager
}
void sendResults(UUID uuid, SharedFile[] results, Destination target, boolean oobInfohash, boolean compressedResults) {
@ -70,6 +73,7 @@ class ResultsSender {
if (it.getComment() != null) {
comment = DataUtil.readi18nString(Base64.decode(it.getComment()))
}
int certificates = certificateManager.getByInfoHash(it.getInfoHash()).size()
def uiResultEvent = new UIResultEvent( sender : me,
name : it.getFile().getName(),
size : length,
@ -77,7 +81,8 @@ class ResultsSender {
pieceSize : pieceSize,
uuid : uuid,
sources : suggested,
comment : comment
comment : comment,
certificates : certificates
)
uiResultEvents << uiResultEvent
}
@ -108,7 +113,8 @@ class ResultsSender {
me.write(os)
os.writeShort((short)results.length)
results.each {
def obj = sharedFileToObj(it, settings.browseFiles)
int certificates = certificateManager.getByInfoHash(it.getInfoHash()).size()
def obj = sharedFileToObj(it, settings.browseFiles, certificates)
def json = jsonOutput.toJson(obj)
os.writeShort((short)json.length())
os.write(json.getBytes(StandardCharsets.US_ASCII))
@ -125,9 +131,10 @@ class ResultsSender {
os.write("Sender: ${me.toBase64()}\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Count: $results.length\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("\r\n".getBytes(StandardCharsets.US_ASCII))
int certificates = certificateManager.getByInfoHash(it.getInfoHash()).size()
DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(os))
results.each {
def obj = sharedFileToObj(it, settings.browseFiles)
def obj = sharedFileToObj(it, settings.browseFiles, certificates)
def json = jsonOutput.toJson(obj)
dos.writeShort((short)json.length())
dos.write(json.getBytes(StandardCharsets.US_ASCII))
@ -143,7 +150,7 @@ class ResultsSender {
}
}
public static def sharedFileToObj(SharedFile sf, boolean browseFiles) {
public static def sharedFileToObj(SharedFile sf, boolean browseFiles, int certificates) {
byte [] name = sf.getFile().getName().getBytes(StandardCharsets.UTF_8)
def baos = new ByteArrayOutputStream()
def daos = new DataOutputStream(baos)
@ -166,6 +173,7 @@ class ResultsSender {
obj.comment = sf.getComment()
obj.browse = browseFiles
obj.certificates = certificates
obj
}
}

View File

@ -16,6 +16,7 @@ class UIResultEvent extends Event {
int pieceSize
String comment
boolean browse
int certificates
@Override
public String toString() {