distinguish between discovered sources and verified sources. Only propagate and persist verified sources
This commit is contained in:
@ -25,6 +25,7 @@ import com.muwire.core.connection.LeafConnectionManager
|
||||
import com.muwire.core.connection.UltrapeerConnectionManager
|
||||
import com.muwire.core.download.DownloadManager
|
||||
import com.muwire.core.download.SourceDiscoveredEvent
|
||||
import com.muwire.core.download.SourceVerifiedEvent
|
||||
import com.muwire.core.download.UIDownloadCancelledEvent
|
||||
import com.muwire.core.download.UIDownloadEvent
|
||||
import com.muwire.core.download.UIDownloadPausedEvent
|
||||
@ -285,6 +286,7 @@ public class Core {
|
||||
log.info("initializing mesh manager")
|
||||
MeshManager meshManager = new MeshManager(fileManager, home, props)
|
||||
eventBus.register(SourceDiscoveredEvent.class, meshManager)
|
||||
eventBus.register(SourceVerifiedEvent.class, meshManager)
|
||||
|
||||
log.info "initializing persistence service"
|
||||
persisterService = new PersisterService(new File(home, "files.json"), eventBus, 60000, fileManager)
|
||||
|
@ -215,6 +215,8 @@ class DownloadSession {
|
||||
pieces.markPartial(piece, 0)
|
||||
throw new BadHashException("bad hash on piece $piece")
|
||||
}
|
||||
|
||||
eventBus.publish(new SourceVerifiedEvent(infoHash : infoHash, source : endpoint.destination))
|
||||
} finally {
|
||||
try { channel?.close() } catch (IOException ignore) {}
|
||||
DataUtil.tryUnmap(mapped)
|
||||
|
@ -0,0 +1,11 @@
|
||||
package com.muwire.core.download
|
||||
|
||||
import com.muwire.core.Event
|
||||
import com.muwire.core.InfoHash
|
||||
|
||||
import net.i2p.data.Destination
|
||||
|
||||
class SourceVerifiedEvent extends Event {
|
||||
InfoHash infoHash
|
||||
Destination source
|
||||
}
|
@ -1,15 +1,29 @@
|
||||
package com.muwire.core.mesh
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.stream.Collectors
|
||||
|
||||
import com.muwire.core.InfoHash
|
||||
import com.muwire.core.Persona
|
||||
import com.muwire.core.download.Pieces
|
||||
import com.muwire.core.util.DataUtil
|
||||
|
||||
import net.i2p.data.Base64
|
||||
import net.i2p.data.Destination
|
||||
import net.i2p.util.ConcurrentHashSet
|
||||
|
||||
/**
|
||||
* Representation of a download mesh.
|
||||
*
|
||||
* Two data structures - collection of known sources and collection of sources
|
||||
* we have successfully transferred data with.
|
||||
*
|
||||
* @author zab
|
||||
*/
|
||||
class Mesh {
|
||||
private final InfoHash infoHash
|
||||
private final Set<Persona> sources = new ConcurrentHashSet<>()
|
||||
private final Map<Destination,Persona> sources = new HashMap<>()
|
||||
private final Set<Destination> verified = new HashSet<>()
|
||||
final Pieces pieces
|
||||
|
||||
Mesh(InfoHash infoHash, Pieces pieces) {
|
||||
@ -17,12 +31,37 @@ class Mesh {
|
||||
this.pieces = pieces
|
||||
}
|
||||
|
||||
Set<Persona> getRandom(int n, Persona exclude) {
|
||||
List<Persona> tmp = new ArrayList<>(sources)
|
||||
tmp.remove(exclude)
|
||||
synchronized Set<Persona> getRandom(int n, Persona exclude) {
|
||||
List<Destination> tmp = new ArrayList<>(verified)
|
||||
if (exclude != null)
|
||||
tmp.remove(exclude.destination)
|
||||
Collections.shuffle(tmp)
|
||||
if (tmp.size() < n)
|
||||
return tmp
|
||||
tmp[0..n-1]
|
||||
if (tmp.size() > n)
|
||||
tmp = tmp[0..n-1]
|
||||
tmp.collect(new HashSet<>(), { sources[it] })
|
||||
}
|
||||
|
||||
synchronized void add(Persona persona) {
|
||||
sources.put(persona.destination, persona)
|
||||
}
|
||||
|
||||
synchronized void verify(Destination d) {
|
||||
verified.add(d)
|
||||
}
|
||||
|
||||
synchronized def toJson() {
|
||||
def json = [:]
|
||||
json.timestamp = System.currentTimeMillis()
|
||||
json.infoHash = Base64.encode(infoHash.getRoot())
|
||||
|
||||
Set<Persona> toPersist = new HashSet<>(sources.values())
|
||||
toPersist.retainAll { verified.contains(it.destination) }
|
||||
json.sources = toPersist.collect {it.toBase64()}
|
||||
json.nPieces = pieces.nPieces
|
||||
List<Integer> downloaded = pieces.getDownloaded()
|
||||
if( downloaded.size() > pieces.nPieces)
|
||||
return null
|
||||
json.xHave = DataUtil.encodeXHave(downloaded, pieces.nPieces)
|
||||
json
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import com.muwire.core.MuWireSettings
|
||||
import com.muwire.core.Persona
|
||||
import com.muwire.core.download.Pieces
|
||||
import com.muwire.core.download.SourceDiscoveredEvent
|
||||
import com.muwire.core.download.SourceVerifiedEvent
|
||||
import com.muwire.core.files.FileManager
|
||||
import com.muwire.core.util.DataUtil
|
||||
|
||||
@ -56,25 +57,25 @@ class MeshManager {
|
||||
Mesh mesh = meshes.get(e.infoHash)
|
||||
if (mesh == null)
|
||||
return
|
||||
mesh.sources.add(e.source)
|
||||
save()
|
||||
mesh.add(e.source)
|
||||
}
|
||||
|
||||
void onSourceVerifiedEvent(SourceVerifiedEvent e) {
|
||||
Mesh mesh = meshes.get(e.infoHash)
|
||||
if (mesh == null)
|
||||
return
|
||||
mesh.verify(e.source)
|
||||
save()
|
||||
}
|
||||
|
||||
private void save() {
|
||||
File meshFile = new File(home, "mesh.json")
|
||||
synchronized(meshes) {
|
||||
meshFile.withPrintWriter { writer ->
|
||||
meshes.values().each { mesh ->
|
||||
def json = [:]
|
||||
json.timestamp = System.currentTimeMillis()
|
||||
json.infoHash = Base64.encode(mesh.infoHash.getRoot())
|
||||
json.sources = mesh.sources.stream().map({it.toBase64()}).collect(Collectors.toList())
|
||||
json.nPieces = mesh.pieces.nPieces
|
||||
List<Integer> downloaded = mesh.pieces.getDownloaded()
|
||||
if( downloaded.size() > mesh.pieces.nPieces)
|
||||
return
|
||||
json.xHave = DataUtil.encodeXHave(downloaded, mesh.pieces.nPieces)
|
||||
writer.println(JsonOutput.toJson(json))
|
||||
def json = mesh.toJson()
|
||||
if (json != null)
|
||||
writer.println(JsonOutput.toJson(json))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -99,7 +100,8 @@ class MeshManager {
|
||||
Mesh mesh = new Mesh(infoHash, pieces)
|
||||
json.sources.each { source ->
|
||||
Persona persona = new Persona(new ByteArrayInputStream(Base64.decode(source)))
|
||||
mesh.sources.add(persona)
|
||||
mesh.add(persona)
|
||||
mesh.verify(persona.destination) // assume if persisted it was verified
|
||||
}
|
||||
|
||||
if (json.xHave != null) {
|
||||
|
@ -11,6 +11,7 @@ import com.muwire.core.connection.Endpoint
|
||||
import com.muwire.core.download.DownloadManager
|
||||
import com.muwire.core.download.Downloader
|
||||
import com.muwire.core.download.SourceDiscoveredEvent
|
||||
import com.muwire.core.download.SourceVerifiedEvent
|
||||
import com.muwire.core.files.FileManager
|
||||
import com.muwire.core.files.PersisterFolderService
|
||||
import com.muwire.core.mesh.Mesh
|
||||
@ -123,6 +124,7 @@ public class UploadManager {
|
||||
eventBus.publish(new UploadEvent(uploader : uploader))
|
||||
try {
|
||||
uploader.respond()
|
||||
eventBus.publish(new SourceVerifiedEvent(infoHash : request.infoHash, source : request.downloader.destination))
|
||||
} finally {
|
||||
decrementUploads(request.downloader)
|
||||
eventBus.publish(new UploadFinishedEvent(uploader : uploader))
|
||||
@ -259,6 +261,7 @@ public class UploadManager {
|
||||
eventBus.publish(new UploadEvent(uploader : uploader))
|
||||
try {
|
||||
uploader.respond()
|
||||
eventBus.publish(new SourceVerifiedEvent(infoHash : request.infoHash, source : request.downloader.destination))
|
||||
} finally {
|
||||
eventBus.publish(new UploadFinishedEvent(uploader : uploader))
|
||||
}
|
||||
|
Reference in New Issue
Block a user