wip on pinging swarm members
This commit is contained in:
@ -1,14 +1,113 @@
|
||||
package com.muwire.tracker
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.logging.Level
|
||||
|
||||
import javax.annotation.PostConstruct
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Component
|
||||
|
||||
import com.muwire.core.Constants
|
||||
import com.muwire.core.Core
|
||||
|
||||
import groovy.json.JsonOutput
|
||||
import groovy.util.logging.Log
|
||||
import net.i2p.client.I2PSession
|
||||
import net.i2p.client.I2PSessionMuxedListener
|
||||
import net.i2p.client.SendMessageOptions
|
||||
import net.i2p.client.datagram.I2PDatagramMaker
|
||||
import net.i2p.data.Base64
|
||||
|
||||
@Component
|
||||
@Log
|
||||
class Pinger {
|
||||
private final Core core
|
||||
@Autowired
|
||||
private Core core
|
||||
|
||||
Pinger(Core core) {
|
||||
this.core = core
|
||||
@Autowired
|
||||
private SwarmManager swarmManager
|
||||
|
||||
@Autowired
|
||||
private TrackerProperties trackerProperties
|
||||
|
||||
private final Map<UUID, PingInProgress> inFlight = new ConcurrentHashMap<>()
|
||||
private final Timer expiryTimer = new Timer("pinger-timer",true)
|
||||
|
||||
@PostConstruct
|
||||
private void registerListener() {
|
||||
core.getI2pSession().addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, Constants.TRACKER_PORT)
|
||||
expiryTimer.schedule({expirePings()} as TimerTask, 1000, 1000)
|
||||
}
|
||||
|
||||
private void expirePings() {
|
||||
final long now = System.currentTimeMillis()
|
||||
for(Iterator<UUID> iter = inFlight.keySet().iterator(); iter.hasNext();) {
|
||||
UUID uuid = iter.next()
|
||||
PingInProgress ping = inFlight.get(uuid)
|
||||
if (now - ping.pingTime > trackerProperties.getSwarmParameters().getPingTimeout() * 1000L) {
|
||||
iter.remove()
|
||||
swarmManager.fail(ping.target)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ping(SwarmManager.HostAndIH target, long now) {
|
||||
UUID uuid = UUID.randomUUID()
|
||||
def ping = new PingInProgress(target, now)
|
||||
inFlight.put(uuid, ping)
|
||||
|
||||
def message = [:]
|
||||
message.type = "TrackerPing"
|
||||
message.version = 1
|
||||
message.infoHash = Base64.encode(target.getInfoHash().getRoot())
|
||||
message.uuid = uuid.toString()
|
||||
|
||||
message = JsonOutput.toJson(message)
|
||||
def maker = new I2PDatagramMaker(core.getI2pSession())
|
||||
message = maker.makeI2PDatagram(message.bytes)
|
||||
def options = new SendMessageOptions()
|
||||
options.setSendLeaseSet(true)
|
||||
core.getI2pSession().sendMessage(target.getHost().getPersona().getDestination(), message, 0, message.length, I2PSession.PROTO_DATAGRAM,
|
||||
Constants.TRACKER_PORT, Constants.TRACKER_PORT, options)
|
||||
}
|
||||
|
||||
private static class PingInProgress {
|
||||
private final SwarmManager.HostAndIH target
|
||||
private final long pingTime
|
||||
PingInProgress(SwarmManager.HostAndIH target, long pingTime) {
|
||||
this.target = target
|
||||
this.pingTime = pingTime
|
||||
}
|
||||
}
|
||||
|
||||
private class Listener implements I2PSessionMuxedListener {
|
||||
|
||||
@Override
|
||||
public void messageAvailable(I2PSession session, int msgId, long size) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reportAbuse(I2PSession session, int severity) {
|
||||
log.warning("reportabuse $session $severity")
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnected(I2PSession session) {
|
||||
log.severe("disconnected")
|
||||
}
|
||||
|
||||
@Override
|
||||
public void errorOccurred(I2PSession session, String message, Throwable error) {
|
||||
log.log(Level.SEVERE,message,error)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -90,6 +90,8 @@ class SwarmManager {
|
||||
}
|
||||
|
||||
log.info("will ping $toPing")
|
||||
|
||||
toPing.each { pinger.ping(it, now) }
|
||||
}
|
||||
|
||||
private void query(Swarm swarm) {
|
||||
@ -132,9 +134,13 @@ class SwarmManager {
|
||||
swarms.get(infoHash)?.info()
|
||||
}
|
||||
|
||||
private static class HostAndIH {
|
||||
private final Host host
|
||||
private final InfoHash infoHash
|
||||
void fail(HostAndIH target) {
|
||||
swarms.get(target.infoHash)?.fail(target.host)
|
||||
}
|
||||
|
||||
public static class HostAndIH {
|
||||
final Host host
|
||||
final InfoHash infoHash
|
||||
HostAndIH(Host host, InfoHash infoHash) {
|
||||
this.host = host
|
||||
this.infoHash = infoHash
|
||||
|
@ -25,5 +25,7 @@ class TrackerProperties {
|
||||
int pingInterval = 15
|
||||
/** how long to wait before declaring a host is dead, in minutes */
|
||||
int expiry = 60
|
||||
/** how long to wait for a host to respond to a ping, in seconds */
|
||||
int pingTimeout = 20
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user