multisource downloads, untested

This commit is contained in:
Zlatin Balevsky
2019-06-04 03:30:55 +01:00
parent e7240dcb6f
commit 3cea1870cd
7 changed files with 138 additions and 69 deletions

View File

@ -3,6 +3,7 @@ package com.muwire.core.download
import com.muwire.core.connection.I2PConnector
import net.i2p.data.Base64
import net.i2p.data.Destination
import com.muwire.core.EventBus
import com.muwire.core.Persona
@ -16,16 +17,13 @@ public class DownloadManager {
private final I2PConnector connector
private final Executor executor
private final File incompletes
private final String meB64
private final Persona me
public DownloadManager(EventBus eventBus, I2PConnector connector, File incompletes, Persona me) {
this.eventBus = eventBus
this.connector = connector
this.incompletes = incompletes
def baos = new ByteArrayOutputStream()
me.write(baos)
this.meB64 = Base64.encode(baos.toByteArray())
this.me = me
incompletes.mkdir()
@ -39,8 +37,18 @@ public class DownloadManager {
public void onUIDownloadEvent(UIDownloadEvent e) {
def downloader = new Downloader(this, meB64, e.target, e.result.size,
e.result.infohash, e.result.pieceSize, connector, e.result.sender.destination,
def size = e.result[0].size
def infohash = e.result[0].infohash
def pieceSize = e.result[0].pieceSize
Set<Destination> destinations = new HashSet<>()
e.result.each {
destinations.add(it.sender.destination)
}
def downloader = new Downloader(this, me, e.target, size,
infohash, pieceSize, connector, destinations,
incompletes)
executor.execute({downloader.download()} as Runnable)
eventBus.publish(new DownloadStartedEvent(downloader : downloader))

View File

@ -1,8 +1,12 @@
package com.muwire.core.download
import com.muwire.core.InfoHash
import com.muwire.core.Persona
import com.muwire.core.connection.Endpoint
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.logging.Level
import com.muwire.core.Constants
@ -14,35 +18,41 @@ import net.i2p.data.Destination
@Log
public class Downloader {
public enum DownloadState { CONNECTING, DOWNLOADING, FAILED, CANCELLED, FINISHED }
private enum WorkerState { CONNECTING, DOWNLOADING, FINISHED}
private static final ExecutorService executorService = Executors.newCachedThreadPool({r ->
Thread rv = new Thread(r)
rv.setName("download worker")
rv.setDaemon(true)
rv
})
private final DownloadManager downloadManager
private final String meB64
private final Persona me
private final File file
private final Pieces downloaded, claimed
private final long length
private final InfoHash infoHash
private final int pieceSize
private final I2PConnector connector
private final Destination destination
private final Set<Destination> destinations
private final int nPieces
private final File piecesFile
private final Map<Destination, DownloadWorker> activeWorkers = new ConcurrentHashMap<>()
private Endpoint endpoint
private volatile DownloadSession currentSession
private volatile DownloadState currentState
private volatile boolean cancelled
private volatile Thread downloadThread
public Downloader(DownloadManager downloadManager, String meB64, File file, long length, InfoHash infoHash,
int pieceSizePow2, I2PConnector connector, Destination destination,
public Downloader(DownloadManager downloadManager, Persona me, File file, long length, InfoHash infoHash,
int pieceSizePow2, I2PConnector connector, Set<Destination> destinations,
File incompletes) {
this.meB64 = meB64
this.me = me
this.downloadManager = downloadManager
this.file = file
this.infoHash = infoHash
this.length = length
this.connector = connector
this.destination = destination
this.destinations = destinations
this.piecesFile = new File(incompletes, file.getName()+".pieces")
this.pieceSize = 1 << pieceSizePow2
@ -55,36 +65,16 @@ public class Downloader {
downloaded = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO)
claimed = new Pieces(nPieces)
currentState = DownloadState.CONNECTING
}
void download() {
readPieces()
downloadThread = Thread.currentThread()
Endpoint endpoint = null
try {
endpoint = connector.connect(destination)
currentState = DownloadState.DOWNLOADING
boolean requestPerformed
while(!downloaded.isComplete()) {
currentSession = new DownloadSession(meB64, downloaded, claimed, infoHash, endpoint, file, pieceSize, length)
requestPerformed = currentSession.request()
if (!requestPerformed)
break
writePieces()
destinations.each {
if (it != me.destination) {
def worker = new DownloadWorker(it)
activeWorkers.put(it, worker)
executorService.submit(worker)
}
if (requestPerformed) {
currentState = DownloadState.FINISHED
piecesFile.delete()
} else log.info("request not performed")
} catch (Exception bad) {
log.log(Level.WARNING,"Exception while downloading",bad)
if (cancelled)
currentState = DownloadState.CANCELLED
else if (currentState != DownloadState.FINISHED)
currentState = DownloadState.FAILED
} finally {
endpoint?.close()
}
}
@ -109,29 +99,96 @@ public class Downloader {
downloaded.donePieces()
}
public int positionInPiece() {
if (currentSession == null)
return 0
currentSession.positionInPiece()
}
public int speed() {
if (currentSession == null)
return 0
currentSession.speed()
int total = 0
activeWorkers.values().each {
total += it.speed()
}
total
}
public DownloadState getCurrentState() {
currentState
if (cancelled)
return DownloadState.CANCELLED
boolean allFinished = true
activeWorkers.values().each {
allFinished &= it.currentState == WorkerState.FINISHED
}
if (allFinished) {
if (downloaded.isComplete())
return DownloadState.FINISHED
return DownloadState.FAILED
}
// if at least one is downloading...
boolean oneDownloading = false
activeWorkers.values().each {
if (it.currentState == WorkerState.DOWNLOADING) {
oneDownloading = true
return
}
}
if (oneDownloading)
return DownloadState.DOWNLOADING
return DownloadState.CONNECTING
}
public void cancel() {
cancelled = true
downloadThread?.interrupt()
activeWorkers.values().each {
it.cancel()
}
}
public void resume() {
currentState = DownloadState.CONNECTING
downloadManager.resume(this)
}
class DownloadWorker implements Runnable {
private final Destination destination
private volatile WorkerState currentState
private volatile Thread downloadThread
private Endpoint endpoint
private volatile DownloadSession currentSession
DownloadWorker(Destination destination) {
this.destination = destination
}
public void run() {
downloadThread = Thread.currentThread()
currentState = WorkerState.CONNECTING
Endpoint endpoint = null
try {
endpoint = connector.connect(destination)
currentState = WorkerState.DOWNLOADING
boolean requestPerformed
while(!downloaded.isComplete()) {
currentSession = new DownloadSession(me.toBase64(), downloaded, claimed, infoHash, endpoint, file, pieceSize, length)
requestPerformed = currentSession.request()
if (!requestPerformed)
break
writePieces()
}
} catch (Exception bad) {
log.log(Level.WARNING,"Exception while downloading",bad)
} finally {
currentState = WorkerState.FINISHED
endpoint?.close()
}
}
int speed() {
if (currentSession == null)
return 0
currentSession.speed()
}
void cancel() {
downloadThread?.interrupt()
}
}
}

View File

@ -5,6 +5,6 @@ import com.muwire.core.search.UIResultEvent
class UIDownloadEvent extends Event {
UIResultEvent result
UIResultEvent[] result
File target
}

View File

@ -71,8 +71,15 @@ class MainFrameController {
def result = selectedResult()
if (result == null)
return // TODO disable button
def file = new File(application.context.get("muwire-settings").downloadLocation, result.name)
core.eventBus.publish(new UIDownloadEvent(result : result, target : file))
def file = new File(application.context.get("muwire-settings").downloadLocation, result.name)
def selected = builder.getVariable("result-tabs").getSelectedComponent()
def group = selected.getClientProperty("mvc-group")
def resultsBucket = group.model.hashBucket[result.infohash]
core.eventBus.publish(new UIDownloadEvent(result : resultsBucket, target : file))
}
@ControllerAction

View File

@ -21,7 +21,7 @@ class SearchTabModel {
Core core
String uuid
def results = []
def hashCount = [:]
def hashBucket = [:]
void mvcGroupInit(Map<String, String> args) {
@ -35,11 +35,12 @@ class SearchTabModel {
void handleResult(UIResultEvent e) {
runInsideUIAsync {
Integer count = hashCount.get(e.infohash)
if (count == null)
count = 0
count++
hashCount[e.infohash] = count
def bucket = hashBucket.get(e.infohash)
if (bucket == null) {
bucket = []
hashBucket[e.infohash] = bucket
}
bucket << e
results << e
JTable table = builder.getVariable("results-table")

View File

@ -104,11 +104,7 @@ class MainFrameView {
int done = row.downloader.donePieces()
"$done/$pieces pieces"
})
closureColumn(header: "Piece", type: String, read: { row ->
int position = row.downloader.positionInPiece()
int pieceSize = row.downloader.pieceSize // TODO: fix for last piece
"$position/$pieceSize bytes"
})
closureColumn(header: "Sources", type: Integer, read : {row -> row.downloader.activeWorkers.size()})
closureColumn(header: "Speed (bytes/second)", type:Integer, read :{row -> row.downloader.speed()})
}
}

View File

@ -31,7 +31,7 @@ class SearchTabView {
tableModel(list: model.results) {
closureColumn(header: "Name", type: String, read : {row -> row.name})
closureColumn(header: "Size", preferredWidth: 150, type: Long, read : {row -> row.size})
closureColumn(header: "Sources", type : Integer, read : { row -> model.hashCount[row.infohash]})
closureColumn(header: "Sources", type : Integer, read : { row -> model.hashBucket[row.infohash].size()})
closureColumn(header: "Sender", type: String, read : {row -> row.sender.getHumanReadableName()})
closureColumn(header: "Trust", type: String, read : {row ->
model.core.trustService.getLevel(row.sender.destination)