cache recent responders and always forward queries to them. Thanks to qtm for the idea
This commit is contained in:
@ -70,6 +70,7 @@ import com.muwire.core.hostcache.HostDiscoveredEvent
|
||||
import com.muwire.core.mesh.MeshManager
|
||||
import com.muwire.core.search.BrowseManager
|
||||
import com.muwire.core.search.QueryEvent
|
||||
import com.muwire.core.search.ResponderCache
|
||||
import com.muwire.core.search.ResultsEvent
|
||||
import com.muwire.core.search.ResultsSender
|
||||
import com.muwire.core.search.SearchEvent
|
||||
@ -306,10 +307,16 @@ public class Core {
|
||||
eventBus.register(HostDiscoveredEvent.class, hostCache)
|
||||
eventBus.register(ConnectionEvent.class, hostCache)
|
||||
|
||||
|
||||
log.info("initializing responder cache")
|
||||
ResponderCache responderCache = new ResponderCache((int)(Math.sqrt(props.peerConnections)) + 1)
|
||||
eventBus.register(UIResultBatchEvent.class, responderCache)
|
||||
|
||||
|
||||
log.info("initializing connection manager")
|
||||
connectionManager = props.isLeaf() ?
|
||||
new LeafConnectionManager(eventBus, me, 3, hostCache, props) :
|
||||
new UltrapeerConnectionManager(eventBus, me, props.peerConnections, props.leafConnections, hostCache, trustService, props)
|
||||
new UltrapeerConnectionManager(eventBus, me, props.peerConnections, props.leafConnections, hostCache, responderCache, trustService, props)
|
||||
eventBus.register(TrustEvent.class, connectionManager)
|
||||
eventBus.register(ConnectionEvent.class, connectionManager)
|
||||
eventBus.register(DisconnectionEvent.class, connectionManager)
|
||||
|
@ -8,6 +8,7 @@ import com.muwire.core.MuWireSettings
|
||||
import com.muwire.core.Persona
|
||||
import com.muwire.core.hostcache.HostCache
|
||||
import com.muwire.core.search.QueryEvent
|
||||
import com.muwire.core.search.ResponderCache
|
||||
import com.muwire.core.trust.TrustService
|
||||
|
||||
import groovy.util.logging.Log
|
||||
@ -18,6 +19,7 @@ class UltrapeerConnectionManager extends ConnectionManager {
|
||||
|
||||
final int maxPeers, maxLeafs
|
||||
final TrustService trustService
|
||||
final ResponderCache responderCache
|
||||
|
||||
final Map<Destination, PeerConnection> peerConnections = new ConcurrentHashMap()
|
||||
final Map<Destination, LeafConnection> leafConnections = new ConcurrentHashMap()
|
||||
@ -27,11 +29,12 @@ class UltrapeerConnectionManager extends ConnectionManager {
|
||||
UltrapeerConnectionManager() {}
|
||||
|
||||
public UltrapeerConnectionManager(EventBus eventBus, Persona me, int maxPeers, int maxLeafs,
|
||||
HostCache hostCache, TrustService trustService, MuWireSettings settings) {
|
||||
HostCache hostCache, ResponderCache responderCache, TrustService trustService, MuWireSettings settings) {
|
||||
super(eventBus, me, hostCache, settings)
|
||||
this.maxPeers = maxPeers
|
||||
this.maxLeafs = maxLeafs
|
||||
this.trustService = trustService
|
||||
this.responderCache = responderCache
|
||||
}
|
||||
@Override
|
||||
public void drop(Destination d) {
|
||||
@ -53,9 +56,11 @@ class UltrapeerConnectionManager extends ConnectionManager {
|
||||
peerConnections.values().each {
|
||||
// 1. do not send query back to originator
|
||||
// 2. if firstHop forward to everyone
|
||||
// 3. otherwise to randomized sqrt of neighbors
|
||||
// 3. otherwise to everyone who has recently responded to us + randomized sqrt of neighbors
|
||||
if (e.getReceivedOn() != it.getEndpoint().getDestination() &&
|
||||
(e.firstHop || random.nextInt(connCount) < treshold))
|
||||
(e.firstHop ||
|
||||
responderCache.hasResponded(it.endpoint.destination) ||
|
||||
random.nextInt(connCount) < treshold))
|
||||
it.sendQuery(e)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,25 @@
|
||||
package com.muwire.core.search
|
||||
|
||||
import com.muwire.core.util.FixedSizeFIFOSet
|
||||
|
||||
import net.i2p.data.Destination
|
||||
|
||||
/**
|
||||
* Caches destinations that have recently responded to with results.
|
||||
*/
|
||||
class ResponderCache {
|
||||
|
||||
private final FixedSizeFIFOSet<Destination> cache
|
||||
|
||||
ResponderCache(int capacity) {
|
||||
cache = new FixedSizeFIFOSet<>(capacity)
|
||||
}
|
||||
|
||||
synchronized void onUIResultBatchEvent(UIResultBatchEvent e) {
|
||||
cache.add(e.results[0].sender.destination)
|
||||
}
|
||||
|
||||
synchronized boolean hasResponded(Destination d) {
|
||||
cache.contains(d)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user