Give up on download sources after a number of attempts

This commit is contained in:
Zlatin Balevsky
2020-09-23 14:00:52 +01:00
parent c73a821c67
commit 140231e362
3 changed files with 48 additions and 7 deletions

View File

@ -16,7 +16,7 @@ class MuWireSettings {
boolean allowTrustLists
int trustListInterval
Set<Persona> trustSubscriptions
int downloadRetryInterval
int downloadRetryInterval, downloadMaxFailures
int totalUploadSlots
int uploadSlotsPerUser
int updateCheckInterval
@ -80,6 +80,7 @@ class MuWireSettings {
if (incompleteLocationProp != null)
incompleteLocation = new File(incompleteLocationProp)
downloadRetryInterval = Integer.parseInt(props.getProperty("downloadRetryInterval","60"))
downloadMaxFailures = Integer.parseInt(props.getProperty("downloadMaxFailures","10"))
updateCheckInterval = Integer.parseInt(props.getProperty("updateCheckInterval","24"))
lastUpdateCheck = Long.parseLong(props.getProperty("lastUpdateChec","0"))
autoDownloadUpdate = Boolean.parseBoolean(props.getProperty("autoDownloadUpdate","true"))
@ -154,6 +155,7 @@ class MuWireSettings {
if (incompleteLocation != null)
props.setProperty("incompleteLocation", incompleteLocation.getAbsolutePath())
props.setProperty("downloadRetryInterval", String.valueOf(downloadRetryInterval))
props.setProperty("downloadMaxFailures", String.valueOf(downloadMaxFailures))
props.setProperty("updateCheckInterval", String.valueOf(updateCheckInterval))
props.setProperty("lastUpdateCheck", String.valueOf(lastUpdateCheck))
props.setProperty("autoDownloadUpdate", String.valueOf(autoDownloadUpdate))

View File

@ -100,7 +100,7 @@ public class DownloadManager {
Pieces pieces = getPieces(infoHash, size, pieceSize, sequential)
def downloader = new Downloader(eventBus, this, chatServer, me, target, size,
infoHash, pieceSize, connector, destinations,
incompletes, pieces)
incompletes, pieces, muSettings.downloadMaxFailures)
downloaders.put(infoHash, downloader)
persistDownloaders()
executor.execute({downloader.download()} as Runnable)
@ -163,7 +163,7 @@ public class DownloadManager {
Pieces pieces = getPieces(infoHash, (long)json.length, json.pieceSizePow2, sequential)
def downloader = new Downloader(eventBus, this, chatServer, me, file, (long)json.length,
infoHash, json.pieceSizePow2, connector, destinations, incompletes, pieces)
infoHash, json.pieceSizePow2, connector, destinations, incompletes, pieces, muSettings.downloadMaxFailures)
if (json.paused != null)
downloader.paused = json.paused

View File

@ -59,6 +59,9 @@ public class Downloader {
final int pieceSizePow2
private final Map<Destination, DownloadWorker> activeWorkers = new ConcurrentHashMap<>()
private final Set<Destination> successfulDestinations = new ConcurrentHashSet<>()
/** LOCKING: itself */
private final Map<Destination, Integer> failingDestinations = new HashMap<>()
private final int maxFailures
private volatile boolean cancelled, paused
@ -74,7 +77,7 @@ public class Downloader {
public Downloader(EventBus eventBus, DownloadManager downloadManager, ChatServer chatServer,
Persona me, File file, long length, InfoHash infoHash,
int pieceSizePow2, I2PConnector connector, Set<Destination> destinations,
File incompletes, Pieces pieces) {
File incompletes, Pieces pieces, int maxFailures) {
this.eventBus = eventBus
this.me = me
this.downloadManager = downloadManager
@ -91,6 +94,7 @@ public class Downloader {
this.pieceSize = 1 << pieceSizePow2
this.pieces = pieces
this.nPieces = pieces.nPieces
this.maxFailures = maxFailures
}
public synchronized InfoHash getInfoHash() {
@ -120,7 +124,7 @@ public class Downloader {
void download() {
readPieces()
destinations.each {
if (it != me.destination) {
if (it != me.destination && !isHopeless(it)) {
def worker = new DownloadWorker(it)
activeWorkers.put(it, worker)
executorService.submit(worker)
@ -273,11 +277,22 @@ public class Downloader {
public int getTotalWorkers() {
return activeWorkers.size();
}
public int countHopelessSources() {
synchronized(failingDestinations) {
return destinations.count { isHopeless(it)}
}
}
public boolean hasLiveSources() {
destinations.size() > countHopelessSources()
}
public void resume() {
paused = false
readPieces()
destinations.each { destination ->
destinations.stream().filter({!isHopeless(it)}).forEach { destination ->
log.fine("resuming source ${destination.toBase32()}")
def worker = activeWorkers.get(destination)
if (worker != null) {
if (worker.currentState == WorkerState.FINISHED) {
@ -294,8 +309,9 @@ public class Downloader {
}
void addSource(Destination d) {
if (activeWorkers.containsKey(d))
if (activeWorkers.containsKey(d) || isHopeless(d))
return
destinations.add(d)
DownloadWorker newWorker = new DownloadWorker(d)
activeWorkers.put(d, newWorker)
executorService.submit(newWorker)
@ -351,6 +367,28 @@ public class Downloader {
try {os?.close() } catch (IOException ignore) {}
}
}
private boolean isHopeless(Destination d) {
if (maxFailures < 0)
return false
synchronized(failingDestinations) {
return !successfulDestinations.contains(d) &&
failingDestinations.containsKey(d) &&
failingDestinations[d] >= maxFailures
}
}
private void markFailed(Destination d) {
log.fine("marking failed ${d.toBase32()}")
synchronized(failingDestinations) {
Integer count = failingDestinations.get(d)
if (count == null) {
failingDestinations.put(d, 1)
} else {
failingDestinations.put(d, count + 1)
}
}
}
class DownloadWorker implements Runnable {
private final Destination destination
@ -395,6 +433,7 @@ public class Downloader {
}
} catch (Exception bad) {
log.log(Level.WARNING,"Exception while downloading",DataUtil.findRoot(bad))
markFailed(destination)
} finally {
writePieces()
currentState = WorkerState.FINISHED