some DHT work

This commit is contained in:
Arvid Norberg
2009-09-29 17:06:08 +00:00
parent 138fb8c679
commit dfccf0b412
12 changed files with 135 additions and 43 deletions

View File

@@ -1391,8 +1391,9 @@ int main(int argc, char* argv[])
for (std::vector<dht_lookup>::iterator i = sess_stat.active_requests.begin() for (std::vector<dht_lookup>::iterator i = sess_stat.active_requests.begin()
, end(sess_stat.active_requests.end()); i != end; ++i) , end(sess_stat.active_requests.end()); i != end; ++i)
{ {
snprintf(str, sizeof(str), " %s %d (%d) ( timeouts %d responses %d)\n" snprintf(str, sizeof(str), " %s in flight: %d [limit: %d] timeouts %d responses %d\n"
, i->type, i->outstanding_requests, i->branch_factor, i->timeouts, i->responses); , i->type, i->outstanding_requests, i->branch_factor, i->timeouts
, i->responses);
out += str; out += str;
} }
} }

View File

@@ -165,8 +165,6 @@ namespace libtorrent { namespace dht
int m_replies_bytes_sent[5]; int m_replies_bytes_sent[5];
int m_queries_bytes_received[5]; int m_queries_bytes_received[5];
int m_counter; int m_counter;
int m_announces;
int m_failed_announces;
int m_total_message_input; int m_total_message_input;
int m_total_in_bytes; int m_total_in_bytes;

View File

@@ -101,6 +101,7 @@ public:
, m_self(self) , m_self(self)
{} {}
~find_data_observer(); ~find_data_observer();
void short_timeout();
void timeout(); void timeout();
void reply(msg const&); void reply(msg const&);
void abort() { m_algorithm = 0; } void abort() { m_algorithm = 0; }

View File

@@ -164,6 +164,7 @@ public:
, m_token(write_token) , m_token(write_token)
{} {}
void short_timeout() {}
void timeout() {} void timeout() {}
void reply(msg const&) {} void reply(msg const&) {}
void abort() {} void abort() {}

View File

@@ -82,6 +82,10 @@ struct observer : boost::noncopyable
// this is called when a reply is received // this is called when a reply is received
virtual void reply(msg const& m) = 0; virtual void reply(msg const& m) = 0;
// this is called if no response has been received after
// a few seconds, before the request has timed out
virtual void short_timeout() = 0;
// this is called when no reply has been received within // this is called when no reply has been received within
// some timeout // some timeout
virtual void timeout() = 0; virtual void timeout() = 0;

View File

@@ -54,8 +54,6 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent { namespace dht namespace libtorrent { namespace dht
{ {
struct observer;
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DECLARE_LOG(rpc); TORRENT_DECLARE_LOG(rpc);
#endif #endif
@@ -64,6 +62,7 @@ struct null_observer : public observer
{ {
null_observer(boost::pool<>& allocator): observer(allocator) {} null_observer(boost::pool<>& allocator): observer(allocator) {}
virtual void reply(msg const&) {} virtual void reply(msg const&) {}
virtual void short_timeout() {}
virtual void timeout() {} virtual void timeout() {}
void abort() {} void abort() {}
}; };

View File

@@ -61,7 +61,9 @@ class traversal_algorithm : boost::noncopyable
public: public:
void traverse(node_id const& id, udp::endpoint addr); void traverse(node_id const& id, udp::endpoint addr);
void finished(node_id const& id); void finished(node_id const& id);
void failed(node_id const& id, bool prevent_request = false);
enum flags_t { prevent_request = 1, short_timeout = 2 };
void failed(node_id const& id, int flags = 0);
virtual ~traversal_algorithm(); virtual ~traversal_algorithm();
boost::pool<>& allocator() const; boost::pool<>& allocator() const;
void status(dht_lookup& l); void status(dht_lookup& l);
@@ -81,7 +83,7 @@ public:
node_id id; node_id id;
// TODO: replace with union of address_v4 and address_v6 and a port // TODO: replace with union of address_v4 and address_v6 and a port
udp::endpoint addr; udp::endpoint addr;
enum { queried = 1, initial = 2, no_id = 4 }; enum { queried = 1, initial = 2, no_id = 4, short_timeout = 8 };
unsigned char flags; unsigned char flags;
}; };

View File

@@ -110,6 +110,9 @@ namespace libtorrent { namespace dht
int g_gr_message_input = 0; int g_gr_message_input = 0;
int g_mo_message_input = 0; int g_mo_message_input = 0;
int g_unknown_message_input = 0; int g_unknown_message_input = 0;
int g_announces = 0;
int g_failed_announces = 0;
#endif #endif
void intrusive_ptr_add_ref(dht_tracker const* c) void intrusive_ptr_add_ref(dht_tracker const* c)
@@ -230,8 +233,8 @@ namespace libtorrent { namespace dht
std::fill_n(m_queries_bytes_received, 5, 0); std::fill_n(m_queries_bytes_received, 5, 0);
std::fill_n(m_replies_sent, 5, 0); std::fill_n(m_replies_sent, 5, 0);
std::fill_n(m_queries_received, 5, 0); std::fill_n(m_queries_received, 5, 0);
m_announces = 0; g_announces = 0;
m_failed_announces = 0; g_failed_announces = 0;
m_total_message_input = 0; m_total_message_input = 0;
m_total_in_bytes = 0; m_total_in_bytes = 0;
m_total_out_bytes = 0; m_total_out_bytes = 0;
@@ -396,8 +399,8 @@ namespace libtorrent { namespace dht
pc << "\t" << torrents pc << "\t" << torrents
<< "\t" << peers << "\t" << peers
<< "\t" << m_announces / float(tick_period) << "\t" << g_announces / float(tick_period)
<< "\t" << m_failed_announces / float(tick_period) << "\t" << g_failed_announces / float(tick_period)
<< "\t" << (m_total_message_input / float(tick_period)) << "\t" << (m_total_message_input / float(tick_period))
<< "\t" << (g_az_message_input / float(tick_period)) << "\t" << (g_az_message_input / float(tick_period))
<< "\t" << (g_ut_message_input / float(tick_period)) << "\t" << (g_ut_message_input / float(tick_period))
@@ -414,8 +417,8 @@ namespace libtorrent { namespace dht
std::fill_n(m_queries_bytes_received, 5, 0); std::fill_n(m_queries_bytes_received, 5, 0);
std::fill_n(m_replies_sent, 5, 0); std::fill_n(m_replies_sent, 5, 0);
std::fill_n(m_queries_received, 5, 0); std::fill_n(m_queries_received, 5, 0);
m_announces = 0; g_announces = 0;
m_failed_announces = 0; g_failed_announces = 0;
m_total_message_input = 0; m_total_message_input = 0;
g_az_message_input = 0; g_az_message_input = 0;
g_ut_message_input = 0; g_ut_message_input = 0;

View File

@@ -177,6 +177,12 @@ void find_data_observer::reply(msg const& m)
#endif #endif
} }
void find_data_observer::short_timeout()
{
if (!m_algorithm) return;
m_algorithm->failed(m_self, traversal_algorithm::short_timeout);
}
void find_data_observer::timeout() void find_data_observer::timeout()
{ {
if (!m_algorithm) return; if (!m_algorithm) return;
@@ -184,7 +190,6 @@ void find_data_observer::timeout()
m_algorithm = 0; m_algorithm = 0;
} }
find_data::find_data( find_data::find_data(
node_impl& node node_impl& node
, node_id target , node_id target

View File

@@ -794,6 +794,7 @@ void node_impl::incoming_request(msg const& m, entry& e)
} }
else if (strcmp(query, "announce_peer") == 0) else if (strcmp(query, "announce_peer") == 0)
{ {
extern int g_failed_announces;
key_desc_t msg_desc[] = { key_desc_t msg_desc[] = {
{"info_hash", lazy_entry::string_t, 20, 0}, {"info_hash", lazy_entry::string_t, 20, 0},
{"port", lazy_entry::int_t, 0, 0}, {"port", lazy_entry::int_t, 0, 0},
@@ -803,6 +804,7 @@ void node_impl::incoming_request(msg const& m, entry& e)
lazy_entry const* msg_keys[3]; lazy_entry const* msg_keys[3];
if (!verify_message(arg_ent, msg_desc, msg_keys, 3, error_string, sizeof(error_string))) if (!verify_message(arg_ent, msg_desc, msg_keys, 3, error_string, sizeof(error_string)))
{ {
++g_failed_announces;
incoming_error(e, error_string); incoming_error(e, error_string);
return; return;
} }
@@ -810,6 +812,7 @@ void node_impl::incoming_request(msg const& m, entry& e)
int port = msg_keys[1]->int_value(); int port = msg_keys[1]->int_value();
if (port < 0 || port >= 65536) if (port < 0 || port >= 65536)
{ {
++g_failed_announces;
incoming_error(e, "invalid 'port' in announce"); incoming_error(e, "invalid 'port' in announce");
return; return;
} }
@@ -822,6 +825,7 @@ void node_impl::incoming_request(msg const& m, entry& e)
if (!verify_token(msg_keys[2]->string_value(), msg_keys[0]->string_ptr(), m.addr)) if (!verify_token(msg_keys[2]->string_value(), msg_keys[0]->string_ptr(), m.addr))
{ {
++g_failed_announces;
incoming_error(e, "invalid token in announce"); incoming_error(e, "invalid token in announce");
return; return;
} }
@@ -838,6 +842,8 @@ void node_impl::incoming_request(msg const& m, entry& e)
std::set<peer_entry>::iterator i = v.peers.find(e); std::set<peer_entry>::iterator i = v.peers.find(e);
if (i != v.peers.end()) v.peers.erase(i++); if (i != v.peers.end()) v.peers.erase(i++);
v.peers.insert(i, e); v.peers.insert(i, e);
extern int g_announces;
++g_announces;
} }
else if (strcmp(query, "find_torrent") == 0) else if (strcmp(query, "find_torrent") == 0)
{ {

View File

@@ -338,15 +338,17 @@ time_duration rpc_manager::tick()
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
const int timeout_ms = 10 * 1000; const static int short_timeout = 2;
const static int timeout = 10;
// look for observers that has timed out // look for observers that have timed out
if (m_next_transaction_id == m_oldest_transaction_id) return milliseconds(timeout_ms); if (m_next_transaction_id == m_oldest_transaction_id) return seconds(short_timeout);
std::vector<observer_ptr> timeouts; std::vector<observer_ptr> timeouts;
time_duration ret = milliseconds(timeout_ms); time_duration ret = seconds(short_timeout);
ptime now = time_now();
for (;m_next_transaction_id != m_oldest_transaction_id; for (;m_next_transaction_id != m_oldest_transaction_id;
m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions) m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions)
@@ -357,19 +359,14 @@ time_duration rpc_manager::tick()
observer_ptr o = m_transactions[m_oldest_transaction_id]; observer_ptr o = m_transactions[m_oldest_transaction_id];
if (!o) continue; if (!o) continue;
time_duration diff = o->sent() + milliseconds(timeout_ms) - time_now(); // if we reach an observer that hasn't timed out
if (diff > seconds(0)) // break, because every observer after this one will
// also not have timed out yet
time_duration diff = now - o->sent();
if (diff < seconds(timeout))
{ {
if (diff < seconds(1)) ret = seconds(timeout) - diff;
{ break;
ret = seconds(1);
break;
}
else
{
ret = diff;
break;
}
} }
#ifndef BOOST_NO_EXCEPTIONS #ifndef BOOST_NO_EXCEPTIONS
@@ -389,11 +386,35 @@ time_duration rpc_manager::tick()
std::for_each(timeouts.begin(), timeouts.end(), bind(&observer::timeout, _1)); std::for_each(timeouts.begin(), timeouts.end(), bind(&observer::timeout, _1));
timeouts.clear(); timeouts.clear();
// clear the aborted transactions, will likely // clear the aborted transactions, will likely
// generate new requests. We need to swap, since the // generate new requests. We need to swap, since the
// destrutors may add more observers to the m_aborted_transactions // destrutors may add more observers to the m_aborted_transactions
std::vector<observer_ptr>().swap(m_aborted_transactions); std::vector<observer_ptr>().swap(m_aborted_transactions);
for (int i = m_oldest_transaction_id; i != m_next_transaction_id;
i = (i + 1) % max_transactions)
{
observer_ptr o = m_transactions[i];
if (!o) continue;
// if we reach an observer that hasn't timed out
// break, because every observer after this one will
// also not have timed out yet
time_duration diff = now - o->sent();
if (diff < seconds(short_timeout))
{
ret = seconds(short_timeout) - diff;
break;
}
// TODO: don't call short_timeout() again if we've
// already called it once
timeouts.push_back(o);
}
std::for_each(timeouts.begin(), timeouts.end(), bind(&observer::short_timeout, _1));
return ret; return ret;
} }

View File

@@ -108,8 +108,29 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr)
void traversal_algorithm::finished(node_id const& id) void traversal_algorithm::finished(node_id const& id)
{ {
std::vector<result>::iterator i = std::find_if(
m_results.begin()
, m_results.end()
, bind(
std::equal_to<node_id>()
, bind(&result::id, _1)
, id
)
);
TORRENT_ASSERT(i != m_results.end());
if (i != m_results.end())
{
// if this flag is set, it means we increased the
// branch factor for it, and we should restore it
if (i->flags & result::short_timeout)
--m_branch_factor;
}
++m_responses; ++m_responses;
--m_invoke_count; --m_invoke_count;
TORRENT_ASSERT(m_invoke_count >= 0);
add_requests(); add_requests();
if (m_invoke_count == 0) done(); if (m_invoke_count == 0) done();
} }
@@ -117,9 +138,9 @@ void traversal_algorithm::finished(node_id const& id)
// prevent request means that the total number of requests has // prevent request means that the total number of requests has
// overflown. This query failed because it was the oldest one. // overflown. This query failed because it was the oldest one.
// So, if this is true, don't make another request // So, if this is true, don't make another request
void traversal_algorithm::failed(node_id const& id, bool prevent_request) void traversal_algorithm::failed(node_id const& id, int flags)
{ {
--m_invoke_count; TORRENT_ASSERT(m_invoke_count >= 0);
TORRENT_ASSERT(!id.is_all_zeros()); TORRENT_ASSERT(!id.is_all_zeros());
std::vector<result>::iterator i = std::find_if( std::vector<result>::iterator i = std::find_if(
@@ -137,18 +158,44 @@ void traversal_algorithm::failed(node_id const& id, bool prevent_request)
if (i != m_results.end()) if (i != m_results.end())
{ {
TORRENT_ASSERT(i->flags & result::queried); TORRENT_ASSERT(i->flags & result::queried);
m_failed.insert(i->addr); if (flags & short_timeout)
{
// short timeout means that it has been more than
// two seconds since we sent the request, and that
// we'll most likely not get a response. But, in case
// we do get a late response, keep the handler
// around for some more, but open up the slot
// by increasing the branch factor
if ((i->flags & result::short_timeout) == 0)
++m_branch_factor;
i->flags |= result::short_timeout;
}
else
{
m_failed.insert(i->addr);
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << "failed: " << i->id << " " << i->addr; TORRENT_LOG(traversal) << "failed: " << i->id << " " << i->addr;
#endif #endif
// don't tell the routing table about // if this flag is set, it means we increased the
// node ids that we just generated ourself // branch factor for it, and we should restore it
if ((i->flags & result::no_id) == 0) if (i->flags & result::short_timeout)
m_node.m_table.node_failed(id); --m_branch_factor;
m_results.erase(i);
++m_timeouts; // don't tell the routing table about
// node ids that we just generated ourself
if ((i->flags & result::no_id) == 0)
m_node.m_table.node_failed(id);
m_results.erase(i);
++m_timeouts;
--m_invoke_count;
}
} }
if (prevent_request) else
{
--m_invoke_count;
}
if (flags & prevent_request)
{ {
--m_branch_factor; --m_branch_factor;
if (m_branch_factor <= 0) m_branch_factor = 1; if (m_branch_factor <= 0) m_branch_factor = 1;
@@ -186,13 +233,17 @@ void traversal_algorithm::add_requests()
if (i == last_iterator()) break; if (i == last_iterator()) break;
#ifndef BOOST_NO_EXCEPTIONS
try try
{ {
#endif
invoke(i->id, i->addr); invoke(i->id, i->addr);
++m_invoke_count; ++m_invoke_count;
i->flags |= result::queried; i->flags |= result::queried;
#ifndef BOOST_NO_EXCEPTIONS
} }
catch (std::exception& e) {} catch (std::exception& e) {}
#endif
} }
} }