diff --git a/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy b/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy index cf61ed81..9c184bc6 100644 --- a/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy +++ b/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy @@ -16,7 +16,7 @@ class MuWireSettings { boolean allowTrustLists int trustListInterval Set trustSubscriptions - int downloadRetryInterval + int downloadRetryInterval, downloadMaxFailures int totalUploadSlots int uploadSlotsPerUser int updateCheckInterval @@ -80,6 +80,7 @@ class MuWireSettings { if (incompleteLocationProp != null) incompleteLocation = new File(incompleteLocationProp) downloadRetryInterval = Integer.parseInt(props.getProperty("downloadRetryInterval","60")) + downloadMaxFailures = Integer.parseInt(props.getProperty("downloadMaxFailures","10")) updateCheckInterval = Integer.parseInt(props.getProperty("updateCheckInterval","24")) lastUpdateCheck = Long.parseLong(props.getProperty("lastUpdateChec","0")) autoDownloadUpdate = Boolean.parseBoolean(props.getProperty("autoDownloadUpdate","true")) @@ -154,6 +155,7 @@ class MuWireSettings { if (incompleteLocation != null) props.setProperty("incompleteLocation", incompleteLocation.getAbsolutePath()) props.setProperty("downloadRetryInterval", String.valueOf(downloadRetryInterval)) + props.setProperty("downloadMaxFailures", String.valueOf(downloadMaxFailures)) props.setProperty("updateCheckInterval", String.valueOf(updateCheckInterval)) props.setProperty("lastUpdateCheck", String.valueOf(lastUpdateCheck)) props.setProperty("autoDownloadUpdate", String.valueOf(autoDownloadUpdate)) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy index 98bb9947..75bff753 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy @@ -100,7 +100,7 @@ public class DownloadManager { Pieces pieces = getPieces(infoHash, size, pieceSize, sequential) def downloader = new Downloader(eventBus, this, chatServer, me, target, size, infoHash, pieceSize, connector, destinations, - incompletes, pieces) + incompletes, pieces, muSettings.downloadMaxFailures) downloaders.put(infoHash, downloader) persistDownloaders() executor.execute({downloader.download()} as Runnable) @@ -163,7 +163,7 @@ public class DownloadManager { Pieces pieces = getPieces(infoHash, (long)json.length, json.pieceSizePow2, sequential) def downloader = new Downloader(eventBus, this, chatServer, me, file, (long)json.length, - infoHash, json.pieceSizePow2, connector, destinations, incompletes, pieces) + infoHash, json.pieceSizePow2, connector, destinations, incompletes, pieces, muSettings.downloadMaxFailures) if (json.paused != null) downloader.paused = json.paused diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index 58373753..0bff9077 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -59,6 +59,9 @@ public class Downloader { final int pieceSizePow2 private final Map activeWorkers = new ConcurrentHashMap<>() private final Set successfulDestinations = new ConcurrentHashSet<>() + /** LOCKING: itself */ + private final Map failingDestinations = new HashMap<>() + private final int maxFailures private volatile boolean cancelled, paused @@ -74,7 +77,7 @@ public class Downloader { public Downloader(EventBus eventBus, DownloadManager downloadManager, ChatServer chatServer, Persona me, File file, long length, InfoHash infoHash, int pieceSizePow2, I2PConnector connector, Set destinations, - File incompletes, Pieces pieces) { + File incompletes, Pieces pieces, int maxFailures) { this.eventBus = eventBus this.me = me this.downloadManager = downloadManager @@ -91,6 +94,7 @@ public class Downloader { this.pieceSize = 1 << pieceSizePow2 this.pieces = pieces this.nPieces = pieces.nPieces + this.maxFailures = maxFailures } public synchronized InfoHash getInfoHash() { @@ -120,7 +124,7 @@ public class Downloader { void download() { readPieces() destinations.each { - if (it != me.destination) { + if (it != me.destination && !isHopeless(it)) { def worker = new DownloadWorker(it) activeWorkers.put(it, worker) executorService.submit(worker) @@ -273,11 +277,22 @@ public class Downloader { public int getTotalWorkers() { return activeWorkers.size(); } + + public int countHopelessSources() { + synchronized(failingDestinations) { + return destinations.count { isHopeless(it)} + } + } + + public boolean hasLiveSources() { + destinations.size() > countHopelessSources() + } public void resume() { paused = false readPieces() - destinations.each { destination -> + destinations.stream().filter({!isHopeless(it)}).forEach { destination -> + log.fine("resuming source ${destination.toBase32()}") def worker = activeWorkers.get(destination) if (worker != null) { if (worker.currentState == WorkerState.FINISHED) { @@ -294,8 +309,9 @@ public class Downloader { } void addSource(Destination d) { - if (activeWorkers.containsKey(d)) + if (activeWorkers.containsKey(d) || isHopeless(d)) return + destinations.add(d) DownloadWorker newWorker = new DownloadWorker(d) activeWorkers.put(d, newWorker) executorService.submit(newWorker) @@ -351,6 +367,28 @@ public class Downloader { try {os?.close() } catch (IOException ignore) {} } } + + private boolean isHopeless(Destination d) { + if (maxFailures < 0) + return false + synchronized(failingDestinations) { + return !successfulDestinations.contains(d) && + failingDestinations.containsKey(d) && + failingDestinations[d] >= maxFailures + } + } + + private void markFailed(Destination d) { + log.fine("marking failed ${d.toBase32()}") + synchronized(failingDestinations) { + Integer count = failingDestinations.get(d) + if (count == null) { + failingDestinations.put(d, 1) + } else { + failingDestinations.put(d, count + 1) + } + } + } class DownloadWorker implements Runnable { private final Destination destination @@ -395,6 +433,7 @@ public class Downloader { } } catch (Exception bad) { log.log(Level.WARNING,"Exception while downloading",DataUtil.findRoot(bad)) + markFailed(destination) } finally { writePieces() currentState = WorkerState.FINISHED