From cee42ff5a1518e66a847b81b80f0cacf47aef68d Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Wed, 7 Oct 2009 20:51:02 +0000 Subject: [PATCH] more DHT simplifications --- examples/client_test.cpp | 5 +- include/libtorrent/kademlia/dht_tracker.hpp | 4 +- include/libtorrent/kademlia/find_data.hpp | 31 +- include/libtorrent/kademlia/node.hpp | 24 +- include/libtorrent/kademlia/observer.hpp | 51 ++- include/libtorrent/kademlia/refresh.hpp | 4 +- include/libtorrent/kademlia/rpc_manager.hpp | 25 +- .../kademlia/traversal_algorithm.hpp | 64 +++- include/libtorrent/torrent_handle.hpp | 7 +- src/kademlia/dht_tracker.cpp | 16 +- src/kademlia/find_data.cpp | 55 ++-- src/kademlia/node.cpp | 33 +- src/kademlia/refresh.cpp | 5 +- src/kademlia/rpc_manager.cpp | 302 +++++++----------- src/kademlia/traversal_algorithm.cpp | 54 ++-- 15 files changed, 305 insertions(+), 375 deletions(-) diff --git a/examples/client_test.cpp b/examples/client_test.cpp index fc7fafbe1..6dcfd6705 100644 --- a/examples/client_test.cpp +++ b/examples/client_test.cpp @@ -783,7 +783,10 @@ int main(int argc, char* argv[]) handles_t handles; session ses(fingerprint("LT", LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR, 0, 0) , session::add_default_plugins - , alert::all_categories & (~alert::dht_notification)); + , alert::all_categories + & ~(alert::dht_notification + + alert::progress_notification + + alert::debug_notification)); std::vector in; if (load_file(".ses_state", in) == 0) diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index 53aa424c4..e0bd057e0 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -77,7 +77,7 @@ namespace libtorrent { namespace dht { friend void intrusive_ptr_add_ref(dht_tracker const*); friend void intrusive_ptr_release(dht_tracker const*); - friend void send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags); + friend bool send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags); dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock , dht_settings const& settings, entry const* state = 0); @@ -115,7 +115,7 @@ namespace libtorrent { namespace dht void tick(error_code const& e); void on_bootstrap(std::vector > const&); - void send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags); + bool send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags); node_impl m_dht; libtorrent::aux::session_impl& m_ses; diff --git a/include/libtorrent/kademlia/find_data.hpp b/include/libtorrent/kademlia/find_data.hpp index 39f330e83..94d233e20 100644 --- a/include/libtorrent/kademlia/find_data.hpp +++ b/include/libtorrent/kademlia/find_data.hpp @@ -61,7 +61,7 @@ class find_data : public traversal_algorithm { public: typedef boost::function const&)> data_callback; - typedef boost::function > const&)> nodes_callback; + typedef boost::function > const&, bool)> nodes_callback; void got_peers(std::vector const& peers); void got_write_token(node_id const& n, std::string const& write_token) @@ -78,46 +78,27 @@ public: protected: void done(); + virtual bool invoke(node_id const& id, udp::endpoint addr); private: - virtual void invoke(node_id const& id, udp::endpoint addr); - data_callback m_data_callback; nodes_callback m_nodes_callback; std::map m_write_tokens; node_id const m_target; - bool m_done; + bool m_done:1; + bool m_got_peers:1; }; class find_data_observer : public observer { public: find_data_observer( - boost::intrusive_ptr const& algorithm + boost::intrusive_ptr const& algorithm , node_id self) - : observer(algorithm->allocator()) - , m_algorithm(algorithm) - , m_self(self) + : observer(algorithm) {} - ~find_data_observer(); - void short_timeout(); - void timeout(); void reply(msg const&); - void abort() { m_algorithm = 0; } - - // with verbose logging, we log the size and - // offset of this structs members, so we need - // access to all of them -#ifndef TORRENT_DHT_VERBOSE_LOGGING -private: -#endif - boost::intrusive_ptr m_algorithm; - // the node this observer sent a message to - // this is used to mark the result in the right - // node when we get a response - // TODO: replace this with the observer this-pointer - node_id const m_self; }; } } // namespace libtorrent::dht diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index 18d08ddcf..c26cb994f 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -154,25 +154,11 @@ struct null_type {}; class announce_observer : public observer { public: - announce_observer(boost::pool<>& allocator - , sha1_hash const& info_hash - , int listen_port - , std::string const& write_token) - : observer(allocator) - , m_info_hash(info_hash) - , m_listen_port(listen_port) - , m_token(write_token) + announce_observer(boost::intrusive_ptr const& algo) + : observer(algo) {} - void short_timeout() {} - void timeout() {} - void reply(msg const&) {} - void abort() {} - -private: - sha1_hash m_info_hash; - int m_listen_port; - std::string m_token; + void reply(msg const&) { m_done = true; } }; struct count_peers @@ -192,7 +178,7 @@ typedef std::map table_t; typedef std::map, search_torrent_entry> search_table_t; public: node_impl(libtorrent::aux::session_impl& ses - , void (*f)(void*, entry const&, udp::endpoint const&, int) + , bool (*f)(void*, entry const&, udp::endpoint const&, int) , dht_settings const& settings, boost::optional nid , void* userdata); @@ -317,7 +303,7 @@ private: int m_secret[2]; libtorrent::aux::session_impl& m_ses; - void (*m_send)(void*, entry const&, udp::endpoint const&, int); + bool (*m_send)(void*, entry const&, udp::endpoint const&, int); void* m_userdata; }; diff --git a/include/libtorrent/kademlia/observer.hpp b/include/libtorrent/kademlia/observer.hpp index 7aa9c109e..4c36bd0f1 100644 --- a/include/libtorrent/kademlia/observer.hpp +++ b/include/libtorrent/kademlia/observer.hpp @@ -43,6 +43,7 @@ namespace dht { struct observer; struct msg; +struct traversal_algorithm; // defined in rpc_manager.cpp TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*); @@ -55,7 +56,7 @@ TORRENT_EXPORT void intrusive_ptr_release(observer const*); // 16 4 4 pool_allocator // 20 16 4 m_addr // 36 2 2 m_port -// 38 1 1 m_is_v6, m_in_constructor +// 38 1 1 m_is_v6, m_short_timeout, m_in_constructor, m_was_sent // 39 1 1 // 40 @@ -64,37 +65,41 @@ struct observer : boost::noncopyable friend TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*); friend TORRENT_EXPORT void intrusive_ptr_release(observer const*); - observer(boost::pool<>& p) + observer(boost::intrusive_ptr const& a) : m_sent() , m_refs(0) - , pool_allocator(p) + , m_algorithm(a) + , m_is_v6(false) + , m_short_timeout(false) + , m_done(false) { + TORRENT_ASSERT(a); #ifdef TORRENT_DEBUG m_in_constructor = true; + m_was_sent = false; #endif } - virtual ~observer() - { - TORRENT_ASSERT(!m_in_constructor); - } + virtual ~observer(); // this is called when a reply is received virtual void reply(msg const& m) = 0; // this is called if no response has been received after // a few seconds, before the request has timed out - virtual void short_timeout() = 0; + void short_timeout(); + + bool has_short_timeout() const { return m_short_timeout; } // this is called when no reply has been received within // some timeout - virtual void timeout() = 0; + void timeout(); // if this is called the destructor should // not invoke any new messages, and should // only clean up. It means the rpc-manager // is being destructed - virtual void abort() = 0; + void abort(); ptime sent() const { return m_sent; } @@ -102,18 +107,25 @@ struct observer : boost::noncopyable address target_addr() const; udp::endpoint target_ep() const; - // with verbose logging, we log the size and - // offset of this structs members, so we need - // access to all of them + void set_transaction_id(boost::uint16_t tid) + { m_transaction_id = tid; } + + boost::uint16_t transaction_id() const + { return m_transaction_id; } + #ifndef TORRENT_DHT_VERBOSE_LOGGING -private: +protected: #endif + void done(); + ptime m_sent; // reference counter for intrusive_ptr mutable boost::detail::atomic_count m_refs; - boost::pool<>& pool_allocator; + + const boost::intrusive_ptr m_algorithm; + union addr_t { #if TORRENT_USE_IPV6 @@ -124,10 +136,19 @@ private: boost::uint16_t m_port; + // the transaction ID for this call + boost::uint16_t m_transaction_id; + bool m_is_v6:1; + bool m_short_timeout:1; + // when true, this observer has reported + // back to the traversal algorithm already + bool m_done:1; + #ifdef TORRENT_DEBUG public: bool m_in_constructor:1; + bool m_was_sent:1; #endif }; diff --git a/include/libtorrent/kademlia/refresh.hpp b/include/libtorrent/kademlia/refresh.hpp index 03ed6d993..dffea9d88 100644 --- a/include/libtorrent/kademlia/refresh.hpp +++ b/include/libtorrent/kademlia/refresh.hpp @@ -55,9 +55,9 @@ public: virtual char const* name() const; -private: +protected: - void invoke(node_id const& id, udp::endpoint addr); + virtual bool invoke(node_id const& id, udp::endpoint addr); }; } } // namespace libtorrent::dht diff --git a/include/libtorrent/kademlia/rpc_manager.hpp b/include/libtorrent/kademlia/rpc_manager.hpp index 035465640..8784574c9 100644 --- a/include/libtorrent/kademlia/rpc_manager.hpp +++ b/include/libtorrent/kademlia/rpc_manager.hpp @@ -60,11 +60,8 @@ TORRENT_DECLARE_LOG(rpc); struct null_observer : public observer { - null_observer(boost::pool<>& allocator): observer(allocator) {} - virtual void reply(msg const&) {} - virtual void short_timeout() {} - virtual void timeout() {} - void abort() {} + null_observer(boost::intrusive_ptr& a): observer(a) {} + virtual void reply(msg const&) { m_done = true; } }; class routing_table; @@ -72,7 +69,7 @@ class routing_table; class rpc_manager { public: - typedef void (*send_fun)(void* userdata, entry const&, udp::endpoint const&, int); + typedef bool (*send_fun)(void* userdata, entry const&, udp::endpoint const&, int); rpc_manager(node_id const& our_id , routing_table& table, send_fun const& sf @@ -85,7 +82,7 @@ public: bool incoming(msg const&); time_duration tick(); - void invoke(entry& e, udp::endpoint target + bool invoke(entry& e, udp::endpoint target , observer_ptr o); void add_our_id(entry& e); @@ -100,27 +97,17 @@ public: private: - enum { max_transactions = 2048 }; + enum { max_transaction_id = 0x10000 }; - unsigned int new_transaction_id(observer_ptr o); - void update_oldest_transaction_id(); - boost::uint32_t calc_connection_id(udp::endpoint addr); mutable boost::pool<> m_pool_allocator; - typedef boost::array - transactions_t; + typedef std::list transactions_t; transactions_t m_transactions; - std::vector m_aborted_transactions; // this is the next transaction id to be used int m_next_transaction_id; - // this is the oldest transaction id still - // (possibly) in use. This is the transaction - // that will time out first, the one we are - // waiting for to time out - int m_oldest_transaction_id; send_fun m_send; void* m_userdata; diff --git a/include/libtorrent/kademlia/traversal_algorithm.hpp b/include/libtorrent/kademlia/traversal_algorithm.hpp index cdcf8b044..e82755d91 100644 --- a/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -60,10 +60,10 @@ class traversal_algorithm : boost::noncopyable { public: void traverse(node_id const& id, udp::endpoint addr); - void finished(node_id const& id); + void finished(udp::endpoint const& ep); enum flags_t { prevent_request = 1, short_timeout = 2 }; - void failed(node_id const& id, int flags = 0); + void failed(udp::endpoint const& ep, int flags = 0); virtual ~traversal_algorithm(); boost::pool<>& allocator() const; void status(dht_lookup& l); @@ -77,18 +77,51 @@ public: struct result { - result(node_id const& id, udp::endpoint addr, unsigned char f = 0) - : id(id), addr(addr), flags(f) {} + result(node_id const& id, udp::endpoint ep, unsigned char f = 0) + : id(id), flags(f) + { + if (ep.address().is_v6()) + { + flags |= ipv6_address; + addr.v6 = ep.address().to_v6().to_bytes(); + } + else + { + flags &= ~ipv6_address; + addr.v4 = ep.address().to_v4().to_bytes(); + } + port = ep.port(); + } + + udp::endpoint endpoint() const + { + if (flags & ipv6_address) + return udp::endpoint(address_v6(addr.v6), port); + else + return udp::endpoint(address_v4(addr.v4), port); + } node_id id; - // TODO: replace with union of address_v4 and address_v6 and a port - udp::endpoint addr; - enum { queried = 1, initial = 2, no_id = 4, short_timeout = 8 }; + + union addr_t + { + address_v4::bytes_type v4; + address_v6::bytes_type v6; + } addr; + + boost::uint16_t port; + + enum { + queried = 1, + initial = 2, + no_id = 4, + short_timeout = 8, + failed = 16, + ipv6_address = 32 + }; unsigned char flags; }; -protected: - traversal_algorithm( node_impl& node , node_id target) @@ -99,14 +132,20 @@ protected: , m_branch_factor(3) , m_responses(0) , m_timeouts(0) - {} + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(traversal) << " [" << this << "] new traversal process"; +#endif + } + +protected: void add_requests(); void add_router_entries(); void init(); - virtual void done() = 0; - virtual void invoke(node_id const& id, udp::endpoint addr) = 0; + virtual void done() {} + virtual bool invoke(node_id const& id, udp::endpoint addr) { return false; } std::vector::iterator last_iterator(); @@ -126,7 +165,6 @@ protected: node_impl& m_node; node_id m_target; std::vector m_results; - std::set m_failed; int m_invoke_count; int m_branch_factor; int m_responses; diff --git a/include/libtorrent/torrent_handle.hpp b/include/libtorrent/torrent_handle.hpp index 8441a6876..f5b96e73f 100644 --- a/include/libtorrent/torrent_handle.hpp +++ b/include/libtorrent/torrent_handle.hpp @@ -308,12 +308,7 @@ namespace libtorrent { none, requested, writing, finished }; private: -#ifdef __SUNPRO_CC - // sunpro is strict about POD types in unions - struct -#else - union -#endif + union addr_t { address_v4::bytes_type v4; address_v6::bytes_type v6; diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 97ca90012..7257fff46 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -200,10 +200,10 @@ namespace libtorrent { namespace dht return boost::optional(node_id(nid->string().c_str())); } - void send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags) + bool send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags) { dht_tracker* self = (dht_tracker*)userdata; - self->send_packet(e, addr, flags); + return self->send_packet(e, addr, flags); } // class that puts the networking and the kademlia node in a single @@ -244,7 +244,7 @@ namespace libtorrent { namespace dht // rpc_log().enable(false); // node_log().enable(false); - traversal_log().enable(false); +// traversal_log().enable(false); // dht_tracker_log.enable(false); TORRENT_LOG(dht_tracker) << "starting DHT tracker with node id: " << m_dht.nid(); @@ -617,7 +617,7 @@ namespace libtorrent { namespace dht void dht_tracker::on_bootstrap(std::vector > const&) {} - void dht_tracker::send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags) + bool dht_tracker::send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags) { using libtorrent::bencode; using libtorrent::entry; @@ -636,6 +636,8 @@ namespace libtorrent { namespace dht error_code ec; if (m_sock.send(addr, &m_send_buf[0], (int)m_send_buf.size(), ec, send_flags)) { + if (ec) return false; + // account for IP and UDP overhead m_sent_bytes += m_send_buf.size() + (addr.address().is_v6() ? 48 : 28); @@ -654,13 +656,15 @@ namespace libtorrent { namespace dht } TORRENT_LOG(dht_tracker) << "==> " << addr << " " << log_line.str(); #endif + return true; } -#ifdef TORRENT_DHT_VERBOSE_LOGGING else { +#ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(dht_tracker) << "==> " << addr << " DROPPED " << log_line.str(); - } #endif + return false; + } } }} diff --git a/src/kademlia/find_data.cpp b/src/kademlia/find_data.cpp index aaf488860..6f131e763 100644 --- a/src/kademlia/find_data.cpp +++ b/src/kademlia/find_data.cpp @@ -52,29 +52,18 @@ using detail::read_v4_endpoint; using detail::read_v6_endpoint; using detail::read_endpoint_list; -find_data_observer::~find_data_observer() -{ - if (m_algorithm) m_algorithm->failed(m_self); -} - void find_data_observer::reply(msg const& m) { - if (!m_algorithm) - { - TORRENT_ASSERT(false); - return; - } - #ifdef TORRENT_DHT_VERBOSE_LOGGING std::stringstream log_line; - log_line << " incoming get_peer response [ "; + log_line << "[" << m_algorithm.get() << "] incoming get_peer response [ "; #endif lazy_entry const* r = m.message.dict_find_dict("r"); if (!r) { #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << " missing response dict"; + TORRENT_LOG(dht_tracker) << "[" << m_algorithm.get() << "] missing response dict"; #endif return; } @@ -83,7 +72,7 @@ void find_data_observer::reply(msg const& m) if (!id || id->string_length() != 20) { #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << " invalid id in response"; + TORRENT_LOG(dht_tracker) << "[" << m_algorithm.get() << "] invalid id in response"; #endif return; } @@ -91,7 +80,9 @@ void find_data_observer::reply(msg const& m) lazy_entry const* token = r->dict_find_string("token"); if (token) { - m_algorithm->got_write_token(node_id(id->string_ptr()), token->string_value()); + static_cast(m_algorithm.get())->got_write_token( + node_id(id->string_ptr()), token->string_value()); + #ifdef TORRENT_DHT_VERBOSE_LOGGING log_line << " token: " << to_hex(token->string_value()); #endif @@ -122,7 +113,7 @@ void find_data_observer::reply(msg const& m) log_line << " p: " << n->list_size(); #endif } - m_algorithm->got_peers(peer_list); + static_cast(m_algorithm.get())->got_peers(peer_list); } // look for nodes @@ -169,25 +160,11 @@ void find_data_observer::reply(msg const& m) #endif } } - m_algorithm->finished(m_self); - m_algorithm = 0; #ifdef TORRENT_DHT_VERBOSE_LOGGING log_line << " ]"; TORRENT_LOG(dht_tracker) << log_line.str(); #endif -} - -void find_data_observer::short_timeout() -{ - if (!m_algorithm) return; - m_algorithm->failed(m_self, traversal_algorithm::short_timeout); -} - -void find_data_observer::timeout() -{ - if (!m_algorithm) return; - m_algorithm->failed(m_self); - m_algorithm = 0; + done(); } find_data::find_data( @@ -200,6 +177,7 @@ find_data::find_data( , m_nodes_callback(ncallback) , m_target(target) , m_done(false) + , m_got_peers(false) { for (routing_table::const_iterator i = node.m_table.begin() , end(node.m_table.end()); i != end; ++i) @@ -208,12 +186,12 @@ find_data::find_data( } } -void find_data::invoke(node_id const& id, udp::endpoint addr) +bool find_data::invoke(node_id const& id, udp::endpoint addr) { if (m_done) { m_invoke_count = -1; - return; + return false; } TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); @@ -221,7 +199,7 @@ void find_data::invoke(node_id const& id, udp::endpoint addr) if (ptr == 0) { done(); - return; + return false; } m_node.m_rpc.allocator().set_next_size(10); observer_ptr o(new (ptr) find_data_observer(this, id)); @@ -233,11 +211,12 @@ void find_data::invoke(node_id const& id, udp::endpoint addr) e["q"] = "get_peers"; entry& a = e["a"]; a["info_hash"] = id.to_string(); - m_node.m_rpc.invoke(e, addr, o); + return m_node.m_rpc.invoke(e, addr, o); } void find_data::got_peers(std::vector const& peers) { + if (!peers.empty()) m_got_peers = true; m_data_callback(peers); } @@ -245,6 +224,8 @@ void find_data::done() { if (m_invoke_count != 0) return; + m_done = true; + std::vector > results; int num_results = m_node.m_table.bucket_size(); for (std::vector::iterator i = m_results.begin() @@ -254,10 +235,10 @@ void find_data::done() if ((i->flags & result::queried) == 0) continue; std::map::iterator j = m_write_tokens.find(i->id); if (j == m_write_tokens.end()) continue; - results.push_back(std::make_pair(node_entry(i->id, i->addr), j->second)); + results.push_back(std::make_pair(node_entry(i->id, i->endpoint()), j->second)); --num_results; } - m_nodes_callback(results); + m_nodes_callback(results, m_got_peers); } } } // namespace libtorrent::dht diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 8016ea8dc..4ecb7bff6 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -183,7 +183,7 @@ void nop() {} // instead, and make the dht_tracker less dependent on session_impl // which would make it simpler to unit test node_impl::node_impl(libtorrent::aux::session_impl& ses - , void (*f)(void*, entry const&, udp::endpoint const&, int) + , bool (*f)(void*, entry const&, udp::endpoint const&, int) , dht_settings const& settings , boost::optional nid , void* userdata) @@ -372,14 +372,18 @@ void node_impl::incoming(msg const& m) namespace { void announce_fun(std::vector > const& v - , rpc_manager& rpc, int listen_port, sha1_hash const& ih) + , node_impl& node, int listen_port, sha1_hash const& ih) { #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(node) << "sending announce_peer [ ih: " << ih << " p: " << listen_port << " nodes: " << v.size() << " ]" ; #endif - + + // create a dummy traversal_algorithm + boost::intrusive_ptr algo( + new traversal_algorithm(node, node_id::min())); + // store on the first k nodes for (std::vector >::const_iterator i = v.begin() , end(v.end()); i != end; ++i) @@ -388,11 +392,10 @@ namespace TORRENT_LOG(node) << " distance: " << (160 - distance_exp(ih, i->first.id)); #endif - void* ptr = rpc.allocator().malloc(); + void* ptr = node.m_rpc.allocator().malloc(); if (ptr == 0) return; - rpc.allocator().set_next_size(10); - observer_ptr o(new (ptr) announce_observer( - rpc.allocator(), ih, listen_port, i->second)); + node.m_rpc.allocator().set_next_size(10); + observer_ptr o(new (ptr) announce_observer(algo)); #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif @@ -400,9 +403,10 @@ namespace e["y"] = "q"; e["q"] = "announce_peer"; entry& a = e["a"]; + a["info_hash"] = ih.to_string(); a["port"] = listen_port; a["token"] = i->second; - rpc.invoke(e, i->first.ep(), o); + node.m_rpc.invoke(e, i->first.ep(), o); } } } @@ -422,7 +426,11 @@ void node_impl::add_node(udp::endpoint node) void* ptr = m_rpc.allocator().malloc(); if (ptr == 0) return; m_rpc.allocator().set_next_size(10); - observer_ptr o(new (ptr) null_observer(m_rpc.allocator())); + + // create a dummy traversal_algorithm + boost::intrusive_ptr algo( + new traversal_algorithm(*this, node_id::min())); + observer_ptr o(new (ptr) null_observer(algo)); #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif @@ -441,7 +449,7 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port // search for nodes with ids close to id or with peers // for info-hash id. then send announce_peer to them. boost::intrusive_ptr ta(new find_data(*this, info_hash, f - , boost::bind(&announce_fun, _1, boost::ref(m_rpc) + , boost::bind(&announce_fun, _1, boost::ref(*this) , listen_port, info_hash))); ta->start(); } @@ -583,6 +591,7 @@ bool node_impl::lookup_peers(sha1_hash const& info_hash, entry& reply) const if (i == m_map.end()) return false; torrent_entry const& v = i->second; + if (v.peers.empty()) return false; int num = (std::min)((int)v.peers.size(), m_settings.max_peers_reply); int t = 0; @@ -767,9 +776,9 @@ void node_impl::incoming_request(msg const& m, entry& e) m_table.find_node(info_hash, n, 0); write_nodes_entry(reply, n); - lookup_peers(info_hash, reply); + bool ret = lookup_peers(info_hash, reply); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(node) << " values: " << reply["values"].list().size(); + if (ret) TORRENT_LOG(node) << " values: " << reply["values"].list().size(); #endif } else if (strcmp(query, "find_node") == 0) diff --git a/src/kademlia/refresh.cpp b/src/kademlia/refresh.cpp index 57e4a1183..1b777ffdf 100644 --- a/src/kademlia/refresh.cpp +++ b/src/kademlia/refresh.cpp @@ -54,14 +54,14 @@ char const* refresh::name() const return "refresh"; } -void refresh::invoke(node_id const& nid, udp::endpoint addr) +bool refresh::invoke(node_id const& nid, udp::endpoint addr) { TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); void* ptr = m_node.m_rpc.allocator().malloc(); if (ptr == 0) { done(); - return; + return false; } m_node.m_rpc.allocator().set_next_size(10); observer_ptr o(new (ptr) find_data_observer(this, nid)); @@ -74,6 +74,7 @@ void refresh::invoke(node_id const& nid, udp::endpoint addr) entry& a = e["a"]; a["target"] = target().to_string(); m_node.m_rpc.invoke(e, addr, o); + return true; } } } // namespace libtorrent::dht diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index f94b507ae..6d29be3a6 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -80,7 +80,7 @@ void intrusive_ptr_release(observer const* o) TORRENT_ASSERT(o != 0); if (--o->m_refs == 0) { - boost::pool<>& p = o->pool_allocator; + boost::pool<>& p = o->m_algorithm->allocator(); (const_cast(o))->~observer(); p.free(const_cast(o)); } @@ -123,6 +123,36 @@ udp::endpoint observer::target_ep() const return udp::endpoint(target_addr(), m_port); } +void observer::abort() +{ + if (m_done) return; + m_done = true; + m_algorithm->failed(target_ep(), traversal_algorithm::prevent_request); +} + +void observer::done() +{ + if (m_done) return; + m_done = true; + m_algorithm->finished(target_ep()); +} + +void observer::short_timeout() +{ + if (m_short_timeout) return; + TORRENT_ASSERT(m_short_timeout == false); + m_short_timeout = true; + m_algorithm->failed(target_ep(), traversal_algorithm::short_timeout); +} + +// this is called when no reply has been received within +// some timeout +void observer::timeout() +{ + if (m_done) return; + m_done = true; + m_algorithm->failed(target_ep()); +} node_id generate_id(); @@ -140,8 +170,7 @@ rpc_manager::rpc_manager(node_id const& our_id , routing_table& table, send_fun const& sf , void* userdata) : m_pool_allocator(sizeof(mpl::deref::type), 10) - , m_next_transaction_id(std::rand() % max_transactions) - , m_oldest_transaction_id(m_next_transaction_id) + , m_next_transaction_id(std::rand() % max_transaction_id) , m_send(sf) , m_userdata(userdata) , m_our_id(our_id) @@ -154,24 +183,30 @@ rpc_manager::rpc_manager(node_id const& our_id #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(rpc) << "Constructing"; - TORRENT_LOG(rpc) << " announce_observer: " << sizeof(announce_observer); - TORRENT_LOG(rpc) << " null_observer: " << sizeof(null_observer); #define PRINT_OFFSETOF(x, y) TORRENT_LOG(rpc) << " +" << offsetof(x, y) << ": " #y TORRENT_LOG(rpc) << " observer: " << sizeof(observer); - PRINT_OFFSETOF(observer, pool_allocator); PRINT_OFFSETOF(observer, m_sent); PRINT_OFFSETOF(observer, m_refs); + PRINT_OFFSETOF(observer, m_algorithm); PRINT_OFFSETOF(observer, m_addr); PRINT_OFFSETOF(observer, m_port); + PRINT_OFFSETOF(observer, m_transaction_id); + TORRENT_LOG(rpc) << " announce_observer: " << sizeof(announce_observer); + TORRENT_LOG(rpc) << " null_observer: " << sizeof(null_observer); TORRENT_LOG(rpc) << " find_data_observer: " << sizeof(find_data_observer); - PRINT_OFFSETOF(find_data_observer, m_algorithm); - PRINT_OFFSETOF(find_data_observer, m_self); + + TORRENT_LOG(rpc) << " traversal_algorithm::result: " << sizeof(traversal_algorithm::result); + PRINT_OFFSETOF(traversal_algorithm::result, id); + PRINT_OFFSETOF(traversal_algorithm::result, addr); + PRINT_OFFSETOF(traversal_algorithm::result, port); + PRINT_OFFSETOF(traversal_algorithm::result, flags); #undef PRINT_OFFSETOF #endif + } rpc_manager::~rpc_manager() @@ -181,13 +216,11 @@ rpc_manager::~rpc_manager() #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(rpc) << "Destructing"; #endif - std::for_each(m_aborted_transactions.begin(), m_aborted_transactions.end() - , bind(&observer::abort, _1)); for (transactions_t::iterator i = m_transactions.begin() , end(m_transactions.end()); i != end; ++i) { - if (*i) (*i)->abort(); + (*i)->abort(); } } @@ -200,16 +233,13 @@ size_t rpc_manager::allocation_size() const void rpc_manager::check_invariant() const { - TORRENT_ASSERT(m_oldest_transaction_id >= 0); - TORRENT_ASSERT(m_oldest_transaction_id < max_transactions); TORRENT_ASSERT(m_next_transaction_id >= 0); - TORRENT_ASSERT(m_next_transaction_id < max_transactions); - TORRENT_ASSERT(!m_transactions[m_next_transaction_id]); + TORRENT_ASSERT(m_next_transaction_id < max_transaction_id); - for (int i = (m_next_transaction_id + 1) % max_transactions; - i != m_oldest_transaction_id; i = (i + 1) % max_transactions) + for (transactions_t::const_iterator i = m_transactions.begin() + , end(m_transactions.end()); i != end; ++i) { - TORRENT_ASSERT(!m_transactions[i]); + TORRENT_ASSERT(*i); } } #endif @@ -219,31 +249,20 @@ void rpc_manager::unreachable(udp::endpoint const& ep) #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(rpc) << time_now_string() << " PORT_UNREACHABLE [ ip: " << ep << " ]"; #endif - int num_active = m_oldest_transaction_id < m_next_transaction_id - ? m_next_transaction_id - m_oldest_transaction_id - : max_transactions - m_oldest_transaction_id + m_next_transaction_id; - TORRENT_ASSERT((m_oldest_transaction_id + num_active) % max_transactions - == m_next_transaction_id); - int tid = m_oldest_transaction_id; - for (int i = 0; i < num_active; ++i, ++tid) + + for (transactions_t::iterator i = m_transactions.begin(); + i != m_transactions.end();) { - if (tid >= max_transactions) tid = 0; - observer_ptr const& o = m_transactions[tid]; - if (!o) continue; + TORRENT_ASSERT(*i); + observer_ptr const& o = *i; if (o->target_ep() != ep) continue; - observer_ptr ptr = m_transactions[tid]; - m_transactions[tid] = 0; - if (tid == m_oldest_transaction_id) - { - ++m_oldest_transaction_id; - if (m_oldest_transaction_id >= max_transactions) - m_oldest_transaction_id = 0; - } + observer_ptr ptr = *i; + m_transactions.erase(i++); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << " found transaction [ tid: " << tid << " ]"; + TORRENT_LOG(rpc) << " found transaction [ tid: " << ptr->transaction_id() << " ]"; #endif ptr->timeout(); - return; + break; } } @@ -269,7 +288,18 @@ bool rpc_manager::incoming(msg const& m) observer_ptr o; - if (tid >= (int)m_transactions.size() || tid < 0) + for (transactions_t::iterator i = m_transactions.begin() + , end(m_transactions.end()); i != end; ++i) + { + TORRENT_ASSERT(*i); + if ((*i)->transaction_id() != tid) continue; + if (m.addr.address() != (*i)->target_addr()) continue; + o = *i; + m_transactions.erase(i); + break; + } + + if (!o) { #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(rpc) << "Reply with invalid transaction id size: " @@ -281,26 +311,6 @@ bool rpc_manager::incoming(msg const& m) return false; } - o = m_transactions[tid]; - - if (!o) - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Reply to a timed out request " - << tid << " from " << m.addr; -#endif - return false; - } - - if (m.addr.address() != o->target_addr()) - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Reply with incorrect address and valid transaction id: " - << tid << " from " << m.addr << " expected: " << o->target_addr(); -#endif - return false; - } - #ifdef TORRENT_DHT_VERBOSE_LOGGING std::ofstream reply_stats("round_trip_ms.log", std::ios::app); reply_stats << m.addr << "\t" << total_milliseconds(time_now_hires() - o->sent()) @@ -326,11 +336,10 @@ bool rpc_manager::incoming(msg const& m) } #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Reply with transaction id: " + TORRENT_LOG(rpc) << "[" << o->m_algorithm.get() << "] Reply with transaction id: " << tid << " from " << m.addr; #endif o->reply(m); - m_transactions[tid] = 0; return m_table.node_seen(node_id(node_id_ent->string_ptr()), m.addr); } @@ -338,26 +347,22 @@ time_duration rpc_manager::tick() { INVARIANT_CHECK; - const static int short_timeout = 2; - const static int timeout = 10; + const static int short_timeout = 3; + const static int timeout = 20; // look for observers that have timed out - if (m_next_transaction_id == m_oldest_transaction_id) return seconds(short_timeout); + if (m_transactions.empty()) return seconds(short_timeout); - std::vector timeouts; + std::list timeouts; time_duration ret = seconds(short_timeout); ptime now = time_now(); - for (;m_next_transaction_id != m_oldest_transaction_id; - m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions) + for (transactions_t::iterator i = m_transactions.begin(); + i != m_transactions.end();) { - TORRENT_ASSERT(m_oldest_transaction_id >= 0); - TORRENT_ASSERT(m_oldest_transaction_id < max_transactions); - - observer_ptr o = m_transactions[m_oldest_transaction_id]; - if (!o) continue; + observer_ptr o = *i; // if we reach an observer that hasn't timed out // break, because every observer after this one will @@ -369,34 +374,21 @@ time_duration rpc_manager::tick() break; } -#ifndef BOOST_NO_EXCEPTIONS - try - { -#endif - m_transactions[m_oldest_transaction_id] = 0; #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Timing out transaction id: " - << m_oldest_transaction_id << " from " << o->target_ep(); -#endif - timeouts.push_back(o); -#ifndef BOOST_NO_EXCEPTIONS - } catch (std::exception) {} + TORRENT_LOG(rpc) << "[" << o->m_algorithm.get() << "] Timing out transaction id: " + << (*i)->transaction_id() << " from " << o->target_ep(); #endif + m_transactions.erase(i++); + timeouts.push_back(o); } std::for_each(timeouts.begin(), timeouts.end(), bind(&observer::timeout, _1)); timeouts.clear(); - // clear the aborted transactions, will likely - // generate new requests. We need to swap, since the - // destrutors may add more observers to the m_aborted_transactions - std::vector().swap(m_aborted_transactions); - - for (int i = m_oldest_transaction_id; i != m_next_transaction_id; - i = (i + 1) % max_transactions) + for (transactions_t::iterator i = m_transactions.begin(); + i != m_transactions.end(); ++i) { - observer_ptr o = m_transactions[i]; - if (!o) continue; + observer_ptr o = *i; // if we reach an observer that hasn't timed out // break, because every observer after this one will @@ -408,6 +400,8 @@ time_duration rpc_manager::tick() break; } + if (o->has_short_timeout()) continue; + // TODO: don't call short_timeout() again if we've // already called it once timeouts.push_back(o); @@ -418,119 +412,57 @@ time_duration rpc_manager::tick() return ret; } -unsigned int rpc_manager::new_transaction_id(observer_ptr o) -{ - INVARIANT_CHECK; - - unsigned int tid = m_next_transaction_id; - m_next_transaction_id = (m_next_transaction_id + 1) % max_transactions; - if (m_transactions[m_next_transaction_id]) - { - // moving the observer into the set of aborted transactions - // it will prevent it from spawning new requests right now, - // since that would break the invariant - observer_ptr o = m_transactions[m_next_transaction_id]; - m_aborted_transactions.push_back(o); -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "[new_transaction_id] Aborting message with transaction id: " - << m_next_transaction_id << " sent to " << o->target_ep() - << " " << total_seconds(time_now() - o->sent()) << " seconds ago"; -#endif - m_transactions[m_next_transaction_id] = 0; - TORRENT_ASSERT(m_oldest_transaction_id == m_next_transaction_id); - } - TORRENT_ASSERT(!m_transactions[tid]); - m_transactions[tid] = o; - if (m_oldest_transaction_id == m_next_transaction_id) - { - m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions; -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "WARNING: transaction limit reached! Too many concurrent" - " messages! limit: " << (int)max_transactions; -#endif - update_oldest_transaction_id(); - } - - return tid; -} - -void rpc_manager::update_oldest_transaction_id() -{ - INVARIANT_CHECK; - - TORRENT_ASSERT(m_oldest_transaction_id != m_next_transaction_id); - while (!m_transactions[m_oldest_transaction_id]) - { - m_oldest_transaction_id = (m_oldest_transaction_id + 1) - % max_transactions; - if (m_oldest_transaction_id == m_next_transaction_id) - break; - } -} - void rpc_manager::add_our_id(entry& e) { e["id"] = m_our_id.to_string(); } -void rpc_manager::invoke(entry& e, udp::endpoint target_addr +bool rpc_manager::invoke(entry& e, udp::endpoint target_addr , observer_ptr o) { INVARIANT_CHECK; - if (m_destructing) - { - o->abort(); - return; - } + if (m_destructing) return false; e["y"] = "q"; entry& a = e["a"]; add_our_id(a); - TORRENT_ASSERT(!m_transactions[m_next_transaction_id]); -#ifdef TORRENT_DEBUG - int potential_new_id = m_next_transaction_id; -#endif -#ifndef BOOST_NO_EXCEPTIONS - try - { -#endif - std::string transaction_id; - transaction_id.resize(2); - char* out = &transaction_id[0]; - io::write_uint16(m_next_transaction_id, out); - e["t"] = transaction_id; + std::string transaction_id; + transaction_id.resize(2); + char* out = &transaction_id[0]; + io::write_uint16(m_next_transaction_id, out); + e["t"] = transaction_id; - o->set_target(target_addr); + o->set_target(target_addr); + o->set_transaction_id(m_next_transaction_id); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Invoking " << e["q"].string() << " -> " << target_addr; -#endif - m_send(m_userdata, e, target_addr, 1); - new_transaction_id(o); -#ifndef BOOST_NO_EXCEPTIONS - } - catch (std::exception& e) - { - // m_send may fail with "no route to host" - TORRENT_ASSERT(potential_new_id == m_next_transaction_id); - o->abort(); - } + TORRENT_LOG(rpc) << "[" << o->m_algorithm.get() << "] invoking " + << e["q"].string() << " -> " << target_addr; #endif + + if (m_send(m_userdata, e, target_addr, 1)) + { + m_transactions.push_back(o); + ++m_next_transaction_id; + m_next_transaction_id %= max_transaction_id; +#ifdef TORRENT_DEBUG + o->m_was_sent = true; +#endif + } + return true; } -/* -void rpc_manager::reply(msg& m) + +observer::~observer() { - INVARIANT_CHECK; - - if (m_destructing) return; - - TORRENT_ASSERT(m.reply); - m.id = m_our_id; - - m_send(m); + // if the message was sent, it must have been + // reported back to the traversal_algorithm as + // well. If it wasn't sent, it cannot have been + // reported back + TORRENT_ASSERT(m_was_sent == m_done); + TORRENT_ASSERT(!m_in_constructor); } -*/ + } } // namespace libtorrent::dht diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index a414344bc..ddbe258e1 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -50,8 +50,6 @@ TORRENT_DEFINE_LOG(traversal) void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsigned char flags) { - if (m_failed.find(addr) != m_failed.end()) return; - result entry(id, addr, flags); if (entry.id.is_all_zeros()) { @@ -76,7 +74,7 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig TORRENT_ASSERT(std::find_if(m_results.begin(), m_results.end() , bind(&result::id, _1) == id) == m_results.end()); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "adding result: " << id << " " << addr; + TORRENT_LOG(traversal) << "[" << this << "] adding result: " << id << " " << addr; #endif m_results.insert(i, entry); } @@ -84,12 +82,11 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig void traversal_algorithm::start() { - add_requests(); - // in case the routing table is empty, use the // router nodes in the table if (m_results.empty()) add_router_entries(); init(); + add_requests(); } boost::pool<>& traversal_algorithm::allocator() const @@ -101,20 +98,21 @@ void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr) { #ifdef TORRENT_DHT_VERBOSE_LOGGING if (id.is_all_zeros()) - TORRENT_LOG(traversal) << time_now_string() << " WARNING: node returned a list which included a node with id 0"; + TORRENT_LOG(traversal) << time_now_string() << "[" << this << "] WARNING: " + "node returned a list which included a node with id 0"; #endif add_entry(id, addr, 0); } -void traversal_algorithm::finished(node_id const& id) +void traversal_algorithm::finished(udp::endpoint const& ep) { std::vector::iterator i = std::find_if( m_results.begin() , m_results.end() , bind( - std::equal_to() - , bind(&result::id, _1) - , id + std::equal_to() + , bind(&result::endpoint, _1) + , ep ) ); @@ -138,18 +136,19 @@ void traversal_algorithm::finished(node_id const& id) // prevent request means that the total number of requests has // overflown. This query failed because it was the oldest one. // So, if this is true, don't make another request -void traversal_algorithm::failed(node_id const& id, int flags) +void traversal_algorithm::failed(udp::endpoint const& ep, int flags) { TORRENT_ASSERT(m_invoke_count >= 0); - TORRENT_ASSERT(!id.is_all_zeros()); + if (m_results.empty()) return; + std::vector::iterator i = std::find_if( m_results.begin() , m_results.end() , bind( - std::equal_to() - , bind(&result::id, _1) - , id + std::equal_to() + , bind(&result::endpoint, _1) + , ep ) ); @@ -172,9 +171,10 @@ void traversal_algorithm::failed(node_id const& id, int flags) } else { - m_failed.insert(i->addr); + i->flags |= result::failed; #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "failed: " << i->id << " " << i->addr; + TORRENT_LOG(traversal) << " [" << this << "] failed: " + << i->id << " " << i->endpoint(); #endif // if this flag is set, it means we increased the // branch factor for it, and we should restore it @@ -184,16 +184,12 @@ void traversal_algorithm::failed(node_id const& id, int flags) // don't tell the routing table about // node ids that we just generated ourself if ((i->flags & result::no_id) == 0) - m_node.m_table.node_failed(id); - m_results.erase(i); + m_node.m_table.node_failed(i->id); ++m_timeouts; --m_invoke_count; + TORRENT_ASSERT(m_invoke_count >= 0); } } - else - { - --m_invoke_count; - } if (flags & prevent_request) { @@ -228,22 +224,18 @@ void traversal_algorithm::add_requests() ) ); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(traversal) << "nodes left (" << this << "): " << (last_iterator() - i); + TORRENT_LOG(traversal) << " [" << this << "] nodes left (" + << this << "): " << (last_iterator() - i); #endif if (i == last_iterator()) break; -#ifndef BOOST_NO_EXCEPTIONS - try + if (invoke(i->id, i->endpoint())) { -#endif - invoke(i->id, i->addr); + TORRENT_ASSERT(m_invoke_count >= 0); ++m_invoke_count; i->flags |= result::queried; -#ifndef BOOST_NO_EXCEPTIONS } - catch (std::exception& e) {} -#endif } }