diff --git a/CMakeLists.txt b/CMakeLists.txt index 9de16c1f6..ad50ff465 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -66,7 +66,6 @@ set(sources # -- kademlia -- set(kademlia_sources - closest_nodes dht_tracker node refresh diff --git a/Jamfile b/Jamfile index efcaccd41..ac5bb2cd4 100755 --- a/Jamfile +++ b/Jamfile @@ -403,7 +403,6 @@ SOURCES = ; KADEMLIA_SOURCES = - closest_nodes dht_tracker node refresh diff --git a/include/libtorrent/Makefile.am b/include/libtorrent/Makefile.am index 8ad274d3f..e4e92bc84 100644 --- a/include/libtorrent/Makefile.am +++ b/include/libtorrent/Makefile.am @@ -99,7 +99,6 @@ nobase_include_HEADERS = \ extensions/ut_metadata.hpp \ extensions/ut_pex.hpp \ \ - kademlia/closest_nodes.hpp \ kademlia/dht_tracker.hpp \ kademlia/find_data.hpp \ kademlia/logging.hpp \ diff --git a/include/libtorrent/kademlia/closest_nodes.hpp b/include/libtorrent/kademlia/closest_nodes.hpp deleted file mode 100644 index 2ff1cd565..000000000 --- a/include/libtorrent/kademlia/closest_nodes.hpp +++ /dev/null @@ -1,104 +0,0 @@ -/* - -Copyright (c) 2006, Arvid Norberg & Daniel Wallin -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions -are met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in - the documentation and/or other materials provided with the distribution. - * Neither the name of the author nor the names of its - contributors may be used to endorse or promote products derived - from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -POSSIBILITY OF SUCH DAMAGE. - -*/ - -#ifndef CLOSEST_NODES_050323_HPP -#define CLOSEST_NODES_050323_HPP - -#include - -#include -#include -#include -#include -#include - -#include - -namespace libtorrent { namespace dht -{ - -class rpc_manager; - -// -------- closest nodes ----------- - -class closest_nodes : public traversal_algorithm -{ -public: - typedef boost::function< - void(std::vector const&) - > done_callback; - - closest_nodes( - node_impl& node - , node_id target - , done_callback const& callback - ); - - virtual char const* name() const { return "closest nodes"; } - -private: - void done(); - void invoke(node_id const& id, udp::endpoint addr); - - done_callback m_done_callback; -}; - -class closest_nodes_observer : public observer -{ -public: - closest_nodes_observer( - boost::intrusive_ptr const& algorithm - , node_id self) - : observer(algorithm->allocator()) - , m_algorithm(algorithm) - , m_self(self) - {} - ~closest_nodes_observer(); - - void send(msg& p) - { - p.info_hash = m_algorithm->target(); - } - - void timeout(); - void reply(msg const&); - void abort() { m_algorithm = 0; } - -private: - boost::intrusive_ptr m_algorithm; - node_id const m_self; -}; - -} } // namespace libtorrent::dht - -#endif // CLOSEST_NODES_050323_HPP - diff --git a/include/libtorrent/kademlia/dht_tracker.hpp b/include/libtorrent/kademlia/dht_tracker.hpp index 7be5cbb56..106dd1b64 100644 --- a/include/libtorrent/kademlia/dht_tracker.hpp +++ b/include/libtorrent/kademlia/dht_tracker.hpp @@ -77,6 +77,7 @@ namespace libtorrent { namespace dht { friend void intrusive_ptr_add_ref(dht_tracker const*); friend void intrusive_ptr_release(dht_tracker const*); + friend void send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags); dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock , dht_settings const& settings, entry const* state = 0); @@ -113,8 +114,8 @@ namespace libtorrent { namespace dht void refresh_timeout(error_code const& e); void tick(error_code const& e); - void on_bootstrap(); - void send_packet(msg const& m); + void on_bootstrap(std::vector > const&); + void send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags); void incoming_error(char const* msg, lazy_entry const& e, udp::endpoint const& ep); @@ -170,13 +171,6 @@ namespace libtorrent { namespace dht int m_failed_announces; int m_total_message_input; - int m_az_message_input; - int m_ut_message_input; - int m_lt_message_input; - int m_mp_message_input; - int m_gr_message_input; - int m_mo_message_input; - int m_total_in_bytes; int m_total_out_bytes; diff --git a/include/libtorrent/kademlia/find_data.hpp b/include/libtorrent/kademlia/find_data.hpp index 37f62157b..9565004af 100644 --- a/include/libtorrent/kademlia/find_data.hpp +++ b/include/libtorrent/kademlia/find_data.hpp @@ -56,13 +56,14 @@ class node_impl; // -------- find data ----------- +//TODO: rename this to find_peers class find_data : public traversal_algorithm { public: typedef boost::function const&)> data_callback; typedef boost::function > const&)> nodes_callback; - void got_data(msg const* m); + void got_peers(std::vector const& peers); void got_write_token(node_id const& n, std::string const& write_token) { m_write_tokens[n] = write_token; } @@ -71,12 +72,16 @@ public: , nodes_callback const& ncallback); virtual char const* name() const { return "get_peers"; } + node_id const target() const { return m_target; } +protected: + + void done(); + private: - void done(); - void invoke(node_id const& id, udp::endpoint addr); + virtual void invoke(node_id const& id, udp::endpoint addr); data_callback m_data_callback; nodes_callback m_nodes_callback; @@ -96,20 +101,21 @@ public: , m_self(self) {} ~find_data_observer(); - - void send(msg& m) - { - m.reply = false; - m.message_id = messages::get_peers; - m.info_hash = m_algorithm->target(); - } - void timeout(); void reply(msg const&); void abort() { m_algorithm = 0; } + // with verbose logging, we log the size and + // offset of this structs members, so we need + // access to all of them +#ifndef TORRENT_DHT_VERBOSE_LOGGING private: +#endif boost::intrusive_ptr m_algorithm; + // the node this observer sent a message to + // this is used to mark the result in the right + // node when we get a response + // TODO: replace this with the observer this-pointer node_id const m_self; }; diff --git a/include/libtorrent/kademlia/msg.hpp b/include/libtorrent/kademlia/msg.hpp index 841fe00da..dda06548f 100644 --- a/include/libtorrent/kademlia/msg.hpp +++ b/include/libtorrent/kademlia/msg.hpp @@ -35,6 +35,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +#include "libtorrent/lazy_entry.hpp" #if BOOST_VERSION < 103500 #include #else @@ -45,7 +46,7 @@ namespace libtorrent { namespace dht { typedef std::vector packet_t; - +/* namespace messages { enum { ping = 0, find_node = 1, get_peers = 2, announce_peer = 3, error = 4 }; @@ -74,10 +75,8 @@ struct msg // the message. udp::endpoint addr; // if this is a nodes response, these are the nodes - typedef std::vector nodes_t; nodes_t nodes; - typedef std::vector peers_t; peers_t peers; // similar to transaction_id but for write operations. @@ -94,7 +93,20 @@ struct msg int error_code; std::string error_msg; }; +*/ +typedef std::vector nodes_t; +typedef std::vector peers_t; + +struct msg +{ + // the message + lazy_entry const& message; + + // the address of the process sending or receiving + // the message. + udp::endpoint addr; +}; } } diff --git a/include/libtorrent/kademlia/node.hpp b/include/libtorrent/kademlia/node.hpp index b2d5f6059..a5a927d3f 100644 --- a/include/libtorrent/kademlia/node.hpp +++ b/include/libtorrent/kademlia/node.hpp @@ -41,6 +41,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include @@ -108,13 +109,6 @@ public: , m_token(write_token) {} - void send(msg& m) - { - m.port = m_listen_port; - m.info_hash = m_info_hash; - m.write_token = m_token; - } - void timeout() {} void reply(msg const&) {} void abort() {} @@ -129,16 +123,16 @@ class node_impl : boost::noncopyable { typedef std::map table_t; public: - node_impl(libtorrent::aux::session_impl& ses, boost::function const& f - , dht_settings const& settings, boost::optional nid); + node_impl(libtorrent::aux::session_impl& ses + , void (*f)(void*, entry const&, udp::endpoint const&, int) + , dht_settings const& settings, boost::optional nid + , void* userdata); virtual ~node_impl() {} - void refresh(node_id const& id, boost::function0 f); + void refresh(node_id const& id, find_data::nodes_callback const& f); void bootstrap(std::vector const& nodes - , boost::function0 f); - void find_node(node_id const& id, boost::function< - void(std::vector const&)> f); + , find_data::nodes_callback const& f); void add_router_node(udp::endpoint router); void unreachable(udp::endpoint const& ep); @@ -173,8 +167,10 @@ public: void announce(sha1_hash const& info_hash, int listen_port , boost::function const&)> f); - bool verify_token(msg const& m); - std::string generate_token(msg const& m); + bool verify_token(std::string const& token, char const* info_hash + , udp::endpoint const& addr); + + std::string generate_token(udp::endpoint const& addr, char const* info_hash); // the returned time is the delay until connection_timeout() // should be called again the next time @@ -212,7 +208,7 @@ protected: // is called when a find data request is received. Should // return false if the data is not stored on this node. If // the data is stored, it should be serialized into 'data'. - bool on_find(msg const& m, std::vector& peers) const; + bool on_find(sha1_hash const& info_hash, std::vector& peers) const; // this is called when a store request is received. The data // is store-parameters and the data to be stored. @@ -233,7 +229,7 @@ private: // since it might have references to it std::set m_running_requests; - void incoming_request(msg const& h); + void incoming_request(msg const& h, entry& e); node_id m_id; @@ -250,6 +246,8 @@ private: int m_secret[2]; libtorrent::aux::session_impl& m_ses; + void (*m_send)(void*, entry const&, udp::endpoint const&, int); + void* m_userdata; }; diff --git a/include/libtorrent/kademlia/node_entry.hpp b/include/libtorrent/kademlia/node_entry.hpp index 807babaaa..2d5c6f706 100644 --- a/include/libtorrent/kademlia/node_entry.hpp +++ b/include/libtorrent/kademlia/node_entry.hpp @@ -79,6 +79,7 @@ struct node_entry udp::endpoint ep() const { return udp::endpoint(addr, port); } bool confirmed() const { return timeout_count == 0; } + // TODO: replace with a union of address_v4 and address_v6 address addr; boost::uint16_t port; // the number of times this node has failed to diff --git a/include/libtorrent/kademlia/observer.hpp b/include/libtorrent/kademlia/observer.hpp index 8d2f8a9ee..239e47cff 100644 --- a/include/libtorrent/kademlia/observer.hpp +++ b/include/libtorrent/kademlia/observer.hpp @@ -48,15 +48,26 @@ struct msg; TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*); TORRENT_EXPORT void intrusive_ptr_release(observer const*); +// intended struct layout (on 32 bit architectures) +// offset size alignment field +// 0 8 8 sent +// 8 8 4 m_refs +// 16 4 4 pool_allocator +// 20 16 4 m_addr +// 36 2 2 m_port +// 38 1 1 m_is_v6, m_in_constructor +// 39 1 1 +// 40 + struct observer : boost::noncopyable { friend TORRENT_EXPORT void intrusive_ptr_add_ref(observer const*); friend TORRENT_EXPORT void intrusive_ptr_release(observer const*); observer(boost::pool<>& p) - : sent(time_now()) - , pool_allocator(p) + : m_sent() , m_refs(0) + , pool_allocator(p) { #ifdef TORRENT_DEBUG m_in_constructor = true; @@ -68,10 +79,6 @@ struct observer : boost::noncopyable TORRENT_ASSERT(!m_in_constructor); } - // these two callbacks lets the observer add - // information to the message before it's sent - virtual void send(msg& m) = 0; - // this is called when a reply is received virtual void reply(msg const& m) = 0; @@ -85,21 +92,38 @@ struct observer : boost::noncopyable // is being destructed virtual void abort() = 0; -#if TORRENT_USE_IPV6 - address target_addr; -#else - address_v4 target_addr; -#endif - boost::uint16_t port; - udp::endpoint target_ep() const { return udp::endpoint(target_addr, port); } - ptime sent; -#ifdef TORRENT_DEBUG - bool m_in_constructor; -#endif + ptime sent() const { return m_sent; } + + void set_target(udp::endpoint const& ep); + address target_addr() const; + udp::endpoint target_ep() const; + + // with verbose logging, we log the size and + // offset of this structs members, so we need + // access to all of them +#ifndef TORRENT_DHT_VERBOSE_LOGGING private: - boost::pool<>& pool_allocator; +#endif + + ptime m_sent; + // reference counter for intrusive_ptr mutable boost::detail::atomic_count m_refs; + boost::pool<>& pool_allocator; + union addr_t + { +#if TORRENT_USE_IPV6 + address_v6::bytes_type v6; +#endif + address_v4::bytes_type v4; + } m_addr; + + boost::uint16_t m_port; + + bool m_is_v6:1; +#ifdef TORRENT_DEBUG + bool m_in_constructor:1; +#endif }; typedef boost::intrusive_ptr observer_ptr; @@ -107,3 +131,4 @@ typedef boost::intrusive_ptr observer_ptr; } } #endif + diff --git a/include/libtorrent/kademlia/refresh.hpp b/include/libtorrent/kademlia/refresh.hpp index 36dd5964f..03ed6d993 100644 --- a/include/libtorrent/kademlia/refresh.hpp +++ b/include/libtorrent/kademlia/refresh.hpp @@ -33,119 +33,33 @@ POSSIBILITY OF SUCH DAMAGE. #ifndef REFRESH_050324_HPP #define REFRESH_050324_HPP -#include - #include #include -#include -#include +#include #include namespace libtorrent { namespace dht { -#ifdef TORRENT_DHT_VERBOSE_LOGGING -TORRENT_DECLARE_LOG(refresh); -#endif - class routing_table; class rpc_manager; -class refresh : public traversal_algorithm +class refresh : public find_data { public: - typedef std::vector::iterator InIt; - typedef boost::function done_callback; + typedef find_data::nodes_callback done_callback; - void ping_reply(node_id id); - void ping_timeout(node_id id, bool prevent_request = false); - - refresh(node_impl& node, node_id target, InIt first, InIt last + refresh(node_impl& node, node_id target , done_callback const& callback); - virtual char const* name() const { return "refresh"; } + virtual char const* name() const; private: - void done(); void invoke(node_id const& id, udp::endpoint addr); - - void invoke_pings_or_finish(bool prevent_request = false); - - int m_max_active_pings; - int m_active_pings; - - done_callback m_done_callback; - - std::vector::iterator m_leftover_nodes_iterator; }; -class refresh_observer : public observer -{ -public: - refresh_observer( - boost::intrusive_ptr const& algorithm - , node_id self) - : observer(algorithm->allocator()) - , m_algorithm(algorithm) - , m_self(self) - {} - ~refresh_observer(); - - void send(msg& m) - { - m.info_hash = m_algorithm->target(); - } - - void timeout(); - void reply(msg const& m); - void abort() { m_algorithm = 0; } - - -private: - boost::intrusive_ptr m_algorithm; - node_id const m_self; -}; - -class ping_observer : public observer -{ -public: - ping_observer( - boost::intrusive_ptr const& algorithm - , node_id self) - : observer(algorithm->allocator()) - , m_self(self) - , m_algorithm(algorithm) - {} - ~ping_observer(); - - void send(msg& p) {} - void timeout(); - void reply(msg const& m); - void abort() { m_algorithm = 0; } - - -private: - node_id const m_self; - boost::intrusive_ptr m_algorithm; -}; - -inline refresh::refresh( - node_impl& node - , node_id target - , refresh::InIt first - , refresh::InIt last - , done_callback const& callback) - : traversal_algorithm(node, target, first, last) - , m_max_active_pings(10) - , m_active_pings(0) - , m_done_callback(callback) -{ - boost::intrusive_ptr self(this); - add_requests(); -} - } } // namespace libtorrent::dht #endif // REFRESH_050324_HPP diff --git a/include/libtorrent/kademlia/rpc_manager.hpp b/include/libtorrent/kademlia/rpc_manager.hpp index 0a08be34d..ca86bf671 100644 --- a/include/libtorrent/kademlia/rpc_manager.hpp +++ b/include/libtorrent/kademlia/rpc_manager.hpp @@ -65,7 +65,6 @@ struct null_observer : public observer null_observer(boost::pool<>& allocator): observer(allocator) {} virtual void reply(msg const&) {} virtual void timeout() {} - virtual void send(msg&) {} void abort() {} }; @@ -74,11 +73,11 @@ class routing_table; class rpc_manager { public: - typedef boost::function1 fun; - typedef boost::function1 send_fun; + typedef void (*send_fun)(void* userdata, entry const&, udp::endpoint const&, int); - rpc_manager(fun const& incoming_fun, node_id const& our_id - , routing_table& table, send_fun const& sf); + rpc_manager(node_id const& our_id + , routing_table& table, send_fun const& sf + , void* userdata); ~rpc_manager(); void unreachable(udp::endpoint const& ep); @@ -87,10 +86,10 @@ public: bool incoming(msg const&); time_duration tick(); - void invoke(int message_id, udp::endpoint target + void invoke(entry& e, udp::endpoint target , observer_ptr o); - void reply(msg& m); + void add_our_id(entry& e); #ifdef TORRENT_DEBUG size_t allocation_size() const; @@ -124,8 +123,8 @@ private: // waiting for to time out int m_oldest_transaction_id; - fun m_incoming; send_fun m_send; + void* m_userdata; node_id m_our_id; routing_table& m_table; ptime m_timer; diff --git a/include/libtorrent/kademlia/traversal_algorithm.hpp b/include/libtorrent/kademlia/traversal_algorithm.hpp index 405b9118b..9f2f62eca 100644 --- a/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -67,21 +67,11 @@ public: void status(dht_lookup& l); virtual char const* name() const { return "traversal_algorithm"; } + virtual void start(); node_id const& target() const { return m_target; } -protected: - - template - traversal_algorithm(node_impl& node, node_id target, InIt start, InIt end); - - void add_requests(); void add_entry(node_id const& id, udp::endpoint addr, unsigned char flags); - void add_router_entries(); - void init(); - - virtual void done() = 0; - virtual void invoke(node_id const& id, udp::endpoint addr) = 0; struct result { @@ -89,11 +79,33 @@ protected: : id(id), addr(addr), flags(f) {} node_id id; + // TODO: replace with union of address_v4 and address_v6 and a port udp::endpoint addr; enum { queried = 1, initial = 2, no_id = 4 }; unsigned char flags; }; +protected: + + traversal_algorithm::traversal_algorithm( + node_impl& node + , node_id target) + : m_ref_count(0) + , m_node(node) + , m_target(target) + , m_invoke_count(0) + , m_branch_factor(3) + , m_responses(0) + , m_timeouts(0) + {} + + void add_requests(); + void add_router_entries(); + void init(); + + virtual void done() = 0; + virtual void invoke(node_id const& id, udp::endpoint addr) = 0; + std::vector::iterator last_iterator(); friend void intrusive_ptr_add_ref(traversal_algorithm* p) @@ -119,33 +131,6 @@ protected: int m_timeouts; }; -template -traversal_algorithm::traversal_algorithm( - node_impl& node - , node_id target - , InIt start // <- nodes to initiate traversal with - , InIt end) - : m_ref_count(0) - , m_node(node) - , m_target(target) - , m_invoke_count(0) - , m_branch_factor(3) - , m_responses(0) - , m_timeouts(0) -{ - using boost::bind; - - for (InIt i = start; i != end; ++i) - { - add_entry(i->id, udp::endpoint(i->addr, i->port), result::initial); - } - - // in case the routing table is empty, use the - // router nodes in the table - if (start == end) add_router_entries(); - init(); -} - } } // namespace libtorrent::dht #endif // TRAVERSAL_ALGORITHM_050324_HPP diff --git a/include/libtorrent/lazy_entry.hpp b/include/libtorrent/lazy_entry.hpp index 9bea1a225..cb5678181 100644 --- a/include/libtorrent/lazy_entry.hpp +++ b/include/libtorrent/lazy_entry.hpp @@ -230,7 +230,7 @@ namespace libtorrent char const* m_end; }; - TORRENT_EXPORT std::string print_entry(lazy_entry const& e); + TORRENT_EXPORT std::string print_entry(lazy_entry const& e, bool single_line = false); #if TORRENT_USE_IOSTREAM TORRENT_EXPORT std::ostream& operator<<(std::ostream& os, lazy_entry const& e); #endif diff --git a/include/libtorrent/socket_io.hpp b/include/libtorrent/socket_io.hpp index e21afa12b..2cbaeb0ce 100644 --- a/include/libtorrent/socket_io.hpp +++ b/include/libtorrent/socket_io.hpp @@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/socket.hpp" #include "libtorrent/io.hpp" #include "libtorrent/error_code.hpp" +#include "libtorrent/lazy_entry.hpp" #include namespace libtorrent @@ -109,6 +110,27 @@ namespace libtorrent return Endpoint(addr, port); } #endif + + template + void read_endpoint_list(libtorrent::lazy_entry const* n, std::vector& epl) + { + using namespace libtorrent; + if (n->type() != lazy_entry::list_t) return; + for (int i = 0; i < n->list_size(); ++i) + { + lazy_entry const* e = n->list_at(i); + if (e->type() != lazy_entry::string_t) return; + if (e->string_length() < 6) continue; + char const* in = e->string_ptr(); + if (e->string_length() == 6) + epl.push_back(read_v4_endpoint(in)); +#if TORRENT_USE_IPV6 + else if (e->string_length() == 18) + epl.push_back(read_v6_endpoint(in)); +#endif + } + } + } diff --git a/src/Makefile.am b/src/Makefile.am index ce94c5664..7e72f5046 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -2,7 +2,6 @@ lib_LTLIBRARIES = libtorrent-rasterbar.la if ENABLE_DHT KADEMLIA_SOURCES = \ - kademlia/closest_nodes.cpp \ kademlia/dht_tracker.cpp \ kademlia/find_data.cpp \ kademlia/node.cpp \ diff --git a/src/kademlia/closest_nodes.cpp b/src/kademlia/closest_nodes.cpp deleted file mode 100644 index 646155659..000000000 --- a/src/kademlia/closest_nodes.cpp +++ /dev/null @@ -1,120 +0,0 @@ -/* - -Copyright (c) 2006, Arvid Norberg & Daniel Wallin -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions -are met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in - the documentation and/or other materials provided with the distribution. - * Neither the name of the author nor the names of its - contributors may be used to endorse or promote products derived - from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -POSSIBILITY OF SUCH DAMAGE. - -*/ - -#include "libtorrent/pch.hpp" - -#include -#include -#include -#include -#include "libtorrent/assert.hpp" - -namespace libtorrent { namespace dht -{ - -closest_nodes_observer::~closest_nodes_observer() -{ - if (m_algorithm) m_algorithm->failed(m_self, true); -} - -void closest_nodes_observer::reply(msg const& in) -{ - if (!m_algorithm) - { - TORRENT_ASSERT(false); - return; - } - - if (!in.nodes.empty()) - { - for (msg::nodes_t::const_iterator i = in.nodes.begin() - , end(in.nodes.end()); i != end; ++i) - { - m_algorithm->traverse(i->id, i->ep()); - } - } - m_algorithm->finished(m_self); - m_algorithm = 0; -} - -void closest_nodes_observer::timeout() -{ - if (!m_algorithm) return; - m_algorithm->failed(m_self); - m_algorithm = 0; -} - -closest_nodes::closest_nodes( - node_impl& node - , node_id target - , done_callback const& callback) - : traversal_algorithm(node, target, node.m_table.begin(), node.m_table.end()) - , m_done_callback(callback) -{ - boost::intrusive_ptr self(this); - add_requests(); -} - -void closest_nodes::invoke(node_id const& id, udp::endpoint addr) -{ - TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(closest_nodes_observer)); - void* ptr = m_node.m_rpc.allocator().malloc(); - if (ptr == 0) - { - done(); - return; - } - m_node.m_rpc.allocator().set_next_size(10); - observer_ptr o(new (ptr) closest_nodes_observer(this, id)); -#ifdef TORRENT_DEBUG - o->m_in_constructor = false; -#endif - m_node.m_rpc.invoke(messages::find_node, addr, o); -} - -void closest_nodes::done() -{ - std::vector results; - int num_results = m_node.m_table.bucket_size(); - for (std::vector::iterator i = m_results.begin() - , end(m_results.end()); i != end && num_results > 0; ++i) - { - if (i->flags & result::no_id) continue; - if ((i->flags & result::queried) == 0) continue; - results.push_back(node_entry(i->id, i->addr)); - --num_results; - } - m_done_callback(results); -} - -} } // namespace libtorrent::dht - diff --git a/src/kademlia/dht_tracker.cpp b/src/kademlia/dht_tracker.cpp index 37d7ff63d..8d6d555ea 100644 --- a/src/kademlia/dht_tracker.cpp +++ b/src/kademlia/dht_tracker.cpp @@ -62,7 +62,6 @@ using libtorrent::dht::node_impl; using libtorrent::dht::node_id; using libtorrent::dht::packet_t; using libtorrent::dht::msg; -namespace messages = libtorrent::dht::messages; using namespace libtorrent::detail; enum @@ -85,26 +84,6 @@ namespace } }; - template - void read_endpoint_list(libtorrent::lazy_entry const* n, std::vector& epl) - { - using namespace libtorrent; - if (n->type() != lazy_entry::list_t) return; - for (int i = 0; i < n->list_size(); ++i) - { - lazy_entry const* e = n->list_at(i); - if (e->type() != lazy_entry::string_t) return; - if (e->string_length() < 6) continue; - char const* in = e->string_ptr(); - if (e->string_length() == 6) - epl.push_back(read_v4_endpoint(in)); -#if TORRENT_USE_IPV6 - else if (e->string_length() == 18) - epl.push_back(read_v6_endpoint(in)); -#endif - } - } - template void read_endpoint_list(libtorrent::entry const* n, std::vector& epl) { @@ -132,6 +111,16 @@ namespace namespace libtorrent { namespace dht { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + int g_az_message_input = 0; + int g_ut_message_input = 0; + int g_lt_message_input = 0; + int g_mp_message_input = 0; + int g_gr_message_input = 0; + int g_mo_message_input = 0; + int g_unknown_message_input = 0; +#endif + void intrusive_ptr_add_ref(dht_tracker const* c) { TORRENT_ASSERT(c != 0); @@ -147,6 +136,55 @@ namespace libtorrent { namespace dht delete c; } +#ifdef TORRENT_DHT_VERBOSE_LOGGING + std::string parse_dht_client(lazy_entry const& e) + { + lazy_entry const* ver = e.dict_find_string("v"); + if (!ver) return "generic"; + std::string const& client = ver->string_value(); + if (client.size() < 2) + { + ++g_unknown_message_input; + return client; + } + else if (std::equal(client.begin(), client.begin() + 2, "Az")) + { + ++g_az_message_input; + return "Azureus"; + } + else if (std::equal(client.begin(), client.begin() + 2, "UT")) + { + ++g_ut_message_input; + return "uTorrent"; + } + else if (std::equal(client.begin(), client.begin() + 2, "LT")) + { + ++g_lt_message_input; + return "libtorrent"; + } + else if (std::equal(client.begin(), client.begin() + 2, "MP")) + { + ++g_mp_message_input; + return "MooPolice"; + } + else if (std::equal(client.begin(), client.begin() + 2, "GR")) + { + ++g_gr_message_input; + return "GetRight"; + } + else if (std::equal(client.begin(), client.begin() + 2, "MO")) + { + ++g_mo_message_input; + return "Mono Torrent"; + } + else + { + ++g_unknown_message_input; + return client; + } + } +#endif + #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_DEFINE_LOG(dht_tracker) #endif @@ -168,11 +206,17 @@ namespace libtorrent { namespace dht return boost::optional(node_id(nid->string().c_str())); } + void send_callback(void* userdata, entry const& e, udp::endpoint const& addr, int flags) + { + dht_tracker* self = (dht_tracker*)userdata; + self->send_packet(e, addr, flags); + } + // class that puts the networking and the kademlia node in a single // unit and connecting them together. dht_tracker::dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock , dht_settings const& settings, entry const* state) - : m_dht(ses, bind(&dht_tracker::send_packet, this, _1), settings, extract_node_id(state)) + : m_dht(ses, &send_callback, settings, extract_node_id(state), this) , m_ses(ses) , m_sock(sock) , m_last_new_key(time_now() - minutes(key_refresh)) @@ -198,19 +242,13 @@ namespace libtorrent { namespace dht m_announces = 0; m_failed_announces = 0; m_total_message_input = 0; - m_az_message_input = 0; - m_ut_message_input = 0; - m_lt_message_input = 0; - m_mp_message_input = 0; - m_gr_message_input = 0; - m_mo_message_input = 0; m_total_in_bytes = 0; m_total_out_bytes = 0; m_queries_out_bytes = 0; // turns on and off individual components' logging - rpc_log().enable(false); +// rpc_log().enable(false); // node_log().enable(false); traversal_log().enable(false); // dht_tracker_log.enable(false); @@ -219,21 +257,6 @@ namespace libtorrent { namespace dht #endif } - void dht_tracker::incoming_error(char const* message, lazy_entry const& e, udp::endpoint const& ep) - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << "ERROR: '" << message << "' " << e; -#endif - msg reply; - reply.reply = true; - reply.message_id = messages::error; - reply.error_code = 203; // Protocol error - reply.error_msg = message; - reply.addr = ep; - reply.transaction_id = ""; - send_packet(reply); - } - void dht_tracker::start(entry const& bootstrap) { std::vector initial_nodes; @@ -262,8 +285,7 @@ namespace libtorrent { namespace dht m_refresh_timer.expires_from_now(seconds(5), ec); m_refresh_timer.async_wait(bind(&dht_tracker::refresh_timeout, self(), _1)); - - m_dht.bootstrap(initial_nodes, bind(&dht_tracker::on_bootstrap, self())); + m_dht.bootstrap(initial_nodes, bind(&dht_tracker::on_bootstrap, self(), _1)); } void dht_tracker::stop() @@ -387,12 +409,12 @@ namespace libtorrent { namespace dht << "\t" << m_announces / float(tick_period) << "\t" << m_failed_announces / float(tick_period) << "\t" << (m_total_message_input / float(tick_period)) - << "\t" << (m_az_message_input / float(tick_period)) - << "\t" << (m_ut_message_input / float(tick_period)) - << "\t" << (m_lt_message_input / float(tick_period)) - << "\t" << (m_mp_message_input / float(tick_period)) - << "\t" << (m_gr_message_input / float(tick_period)) - << "\t" << (m_mo_message_input / float(tick_period)) + << "\t" << (g_az_message_input / float(tick_period)) + << "\t" << (g_ut_message_input / float(tick_period)) + << "\t" << (g_lt_message_input / float(tick_period)) + << "\t" << (g_mp_message_input / float(tick_period)) + << "\t" << (g_gr_message_input / float(tick_period)) + << "\t" << (g_mo_message_input / float(tick_period)) << "\t" << (m_total_in_bytes / float(tick_period*60)) << "\t" << (m_total_out_bytes / float(tick_period*60)) << "\t" << (m_queries_out_bytes / float(tick_period*60)) @@ -405,9 +427,13 @@ namespace libtorrent { namespace dht m_announces = 0; m_failed_announces = 0; m_total_message_input = 0; - m_az_message_input = 0; - m_ut_message_input = 0; - m_lt_message_input = 0; + g_az_message_input = 0; + g_ut_message_input = 0; + g_lt_message_input = 0; + g_mp_message_input = 0; + g_gr_message_input = 0; + g_mo_message_input = 0; + g_unknown_message_input = 0; m_total_in_bytes = 0; m_total_out_bytes = 0; m_queries_out_bytes = 0; @@ -459,7 +485,7 @@ namespace libtorrent { namespace dht #ifdef TORRENT_DHT_VERBOSE_LOGGING if (match->count == 20) { - TORRENT_LOG(dht_tracker) << time_now_string() << " BANNING PEER [ ip: " + TORRENT_LOG(dht_tracker) << " BANNING PEER [ ip: " << ep << " time: " << total_milliseconds((now - match->limit) + seconds(5)) / 1000.f << " count: " << match->count << " ]"; } @@ -493,385 +519,35 @@ namespace libtorrent { namespace dht TORRENT_ASSERT(bytes_transferred > 0); + extern void incoming_error(entry& e, char const* msg); + lazy_entry e; int ret = lazy_bdecode(buf, buf + bytes_transferred, e); if (ret != 0) { - incoming_error("invalid bencoding", e, ep); + TORRENT_LOG(dht_tracker) << "<== " << ep << " ERROR: Invalid bencoding"; return; } -#ifdef TORRENT_DHT_VERBOSE_LOGGING - std::stringstream log_line; - log_line << "RECEIVED [" - " ip: " << ep; -#endif + libtorrent::dht::msg m = {e, ep}; if (e.type() != lazy_entry::dict_t) { - incoming_error("message is not a dictionary", e, ep); - return; - } - - libtorrent::dht::msg m; - m.message_id = 0; - m.addr = ep; - - lazy_entry const* transaction = e.dict_find_string("t"); - if (!transaction) - { - incoming_error("missing or invalid transaction id", e, ep); - return; - } - - m.transaction_id = transaction->string_value(); - #ifdef TORRENT_DHT_VERBOSE_LOGGING - lazy_entry const* ver = e.dict_find_string("v"); - if (!ver) - { - log_line << " c: generic"; - } - else - { - std::string const& client = ver->string_value(); - if (client.size() < 2) - { - log_line << " c: " << client; - } - else if (std::equal(client.begin(), client.begin() + 2, "Az")) - { - ++m_az_message_input; - log_line << " c: Azureus"; - } - else if (std::equal(client.begin(), client.begin() + 2, "UT")) - { - ++m_ut_message_input; - log_line << " c: uTorrent"; - } - else if (std::equal(client.begin(), client.begin() + 2, "LT")) - { - ++m_lt_message_input; - log_line << " c: libtorrent"; - } - else if (std::equal(client.begin(), client.begin() + 2, "MP")) - { - ++m_mp_message_input; - log_line << " c: MooPolice"; - } - else if (std::equal(client.begin(), client.begin() + 2, "GR")) - { - ++m_gr_message_input; - log_line << " c: GetRight"; - } - else if (std::equal(client.begin(), client.begin() + 2, "MO")) - { - ++m_mo_message_input; - log_line << " c: Mono Torrent"; - } - else - { - log_line << " c: " << client; - } - } + TORRENT_LOG(dht_tracker) << "<== " << ep << " ERROR: not a dictionary: " + << print_entry(e, true); #endif - - lazy_entry const* y = e.dict_find_string("y"); - if (!y || y->string_length() < 1) - { - incoming_error("missing or invalid message type", e, ep); - return; - } - - char msg_type = *y->string_ptr(); - - if (msg_type == 'r') - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " t: " << to_hex(m.transaction_id); -#endif - - m.reply = true; - lazy_entry const* r = e.dict_find_dict("r"); - if (!r) - { - incoming_error("missing or invalid reply dict", e, ep); - return; - } - - lazy_entry const* id = r->dict_find_string("id"); - if (!id) - { - incoming_error("missing or invalid id", e, ep); - return; - } - if (id->string_length() != 20) - { - incoming_error("invalid node id (not 20 bytes)", e, ep); - return; - } - std::copy(id->string_ptr(), id->string_ptr() - + id->string_length(), m.id.begin()); - -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " id: " << m.id; -#endif - lazy_entry const* n = r->dict_find_list("values"); - if (n) - { - m.peers.clear(); - if (n->list_size() == 1 && n->list_at(0)->type() == lazy_entry::string_t) - { - // assume it's mainline format - char const* peers = n->list_at(0)->string_ptr(); - char const* end = peers + n->list_at(0)->string_length(); - - while (end - peers >= 6) - m.peers.push_back(read_v4_endpoint(peers)); - } - else - { - // assume it's uTorrent/libtorrent format - read_endpoint_list(n, m.peers); - } -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " p: " << m.peers.size(); -#endif - } - - m.nodes.clear(); - n = r->dict_find_string("nodes"); - if (n) - { - char const* nodes = n->string_ptr(); - char const* end = nodes + n->string_length(); - - while (end - nodes >= 26) - { - node_id id; - std::copy(nodes, nodes + 20, id.begin()); - nodes += 20; - m.nodes.push_back(libtorrent::dht::node_entry( - id, read_v4_endpoint(nodes))); - } - -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " n: " << m.nodes.size(); -#endif - } - - n = r->dict_find_list("nodes2"); - if (n) - { - for (int i = 0; i < n->list_size(); ++i) - { - lazy_entry const* p = n->list_at(0); - if (p->type() != lazy_entry::string_t) continue; - if (p->string_length() < 6 + 20) continue; - char const* in = p->string_ptr(); - - node_id id; - std::copy(in, in + 20, id.begin()); - in += 20; - if (p->string_length() == 6 + 20) - m.nodes.push_back(libtorrent::dht::node_entry( - id, read_v4_endpoint(in))); -#if TORRENT_USE_IPV6 - else if (p->string_length() == 18 + 20) - m.nodes.push_back(libtorrent::dht::node_entry( - id, read_v6_endpoint(in))); -#endif - } -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " n2: " << m.nodes.size(); -#endif - } - - lazy_entry const* token = r->dict_find_string("token"); - if (token) - { - m.write_token = token->string_value(); - TORRENT_ASSERT(m.write_token.size() == token->string_length()); -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " token: " << to_hex(m.write_token); -#endif - } - } - else if (msg_type == 'q') - { - m.reply = false; - lazy_entry const* a = e.dict_find_dict("a"); - if (!a) - { - incoming_error("missing or invalid argument dictionary", e, ep); - return; - } - - lazy_entry const* id = a->dict_find_string("id"); - if (!id) - { - incoming_error("missing or invalid node id", e, ep); - return; - } - if (id->string_length() != 20) - { - incoming_error("invalid node id (not 20 bytes)", e, ep); - return; - } - std::copy(id->string_ptr(), id->string_ptr() - + id->string_length(), m.id.begin()); - - lazy_entry const* q = e.dict_find_string("q"); - if (!q) - { - incoming_error("invalid or missing query string", e, ep); - return; - } - std::string request_kind = q->string_value(); -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " q: " << request_kind; -#endif - - if (request_kind == "ping") - { - m.message_id = libtorrent::dht::messages::ping; - } - else if (request_kind == "find_node") - { - lazy_entry const* target = a->dict_find_string("target"); - if (!target) - { - incoming_error("missing or invalid target", e, ep); - return; - } - - if (target->string_length() != 20) - { - incoming_error("invalid target (not 20 bytes)", e, ep); - return; - } - std::copy(target->string_ptr(), target->string_ptr() - + target->string_length(), m.info_hash.begin()); -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " t: " << boost::lexical_cast(m.info_hash); -#endif - - m.message_id = libtorrent::dht::messages::find_node; - } - else if (request_kind == "get_peers") - { - lazy_entry const* info_hash = a->dict_find_string("info_hash"); - if (!info_hash) - { - incoming_error("missing or invalid info_hash", e, ep); - return; - } - - if (info_hash->string_length() != 20) - { - incoming_error("invalid info_hash (not 20 bytes)", e, ep); - return; - } - std::copy(info_hash->string_ptr(), info_hash->string_ptr() - + info_hash->string_length(), m.info_hash.begin()); - m.message_id = libtorrent::dht::messages::get_peers; -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " ih: " << boost::lexical_cast(m.info_hash); -#endif - } - else if (request_kind == "announce_peer") - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - ++m_announces; -#endif - lazy_entry const* info_hash = a->dict_find_string("info_hash"); - if (!info_hash) - { - incoming_error("missing or invalid info_hash", e, ep); - return; - } - - if (info_hash->string_length() != 20) - { - incoming_error("invalid info_hash (not 20 bytes)", e, ep); - return; - } - std::copy(info_hash->string_ptr(), info_hash->string_ptr() - + info_hash->string_length(), m.info_hash.begin()); - m.port = a->dict_find_int_value("port", -1); - if (m.port == -1) - { - incoming_error("missing or invalid port in announce_peer message", e, ep); - return; - } - lazy_entry const* token = a->dict_find_string("token"); - if (!token) - { - incoming_error("missing or invalid token in announce peer", e, ep); - return; - } - m.write_token = token->string_value(); - m.message_id = libtorrent::dht::messages::announce_peer; -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " token: " << to_hex(m.write_token); - log_line << " ih: " << boost::lexical_cast(m.info_hash); - log_line << " p: " << m.port; - - if (!m_dht.verify_token(m)) - ++m_failed_announces; -#endif - } - else - { - incoming_error("unknown query", e, ep); - return; - } - } - else if (msg_type == 'e') - { - m.message_id = messages::error; - m.error_code = 0; - lazy_entry const* list = e.dict_find_list("e"); - if (!list) - { - list = e.dict_find_string("e"); - if (!list) - { - incoming_error("missing or invalid 'e' in error message", e, ep); - return; - } - m.error_msg = list->string_value(); - } - else - { - if (list->list_size() > 0 && list->list_at(0)->type() == lazy_entry::int_t) - m.error_code = list->list_at(0)->int_value(); - if (list->list_size() > 1 && list->list_at(1)->type() == lazy_entry::string_t) - m.error_msg = list->list_at(1)->string_value(); - } -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << log_line.str() << " ]"; - TORRENT_LOG(dht_tracker) << "ERROR: incoming error: " << m.error_code - << " " << m.error_msg; -#endif - return; - } - else - { - incoming_error("unknown message", e, ep); + entry r; + incoming_error(r, "message is not a dictionary"); + send_packet(r, ep, 0); return; } #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << log_line.str() << " ]"; -// TORRENT_LOG(dht_tracker) << std::string(buf, buf + bytes_transferred); - if (!m.reply) - { - ++m_queries_received[m.message_id]; - m_queries_bytes_received[m.message_id] += int(bytes_transferred); - } + parse_dht_client(e); + TORRENT_LOG(dht_tracker) << "<== " << ep << " " << print_entry(e, true); #endif - TORRENT_ASSERT(m.message_id != messages::error); + m_dht.incoming(m); } @@ -945,217 +621,52 @@ namespace libtorrent { namespace dht m_dht.add_router_node(host->endpoint()); } - void dht_tracker::on_bootstrap() + void dht_tracker::on_bootstrap(std::vector > const&) {} - namespace - { - void write_nodes_entry(entry& r, libtorrent::dht::msg const& m) - { - bool ipv6_nodes = false; - entry& n = r["nodes"]; - std::back_insert_iterator out(n.string()); - for (msg::nodes_t::const_iterator i = m.nodes.begin() - , end(m.nodes.end()); i != end; ++i) - { - if (!i->addr.is_v4()) - { - ipv6_nodes = true; - continue; - } - std::copy(i->id.begin(), i->id.end(), out); - write_endpoint(udp::endpoint(i->addr, i->port), out); - } - - if (ipv6_nodes) - { - entry& p = r["nodes2"]; - std::string endpoint; - for (msg::nodes_t::const_iterator i = m.nodes.begin() - , end(m.nodes.end()); i != end; ++i) - { - if (!i->addr.is_v6()) continue; - endpoint.resize(18 + 20); - std::string::iterator out = endpoint.begin(); - std::copy(i->id.begin(), i->id.end(), out); - out += 20; - write_endpoint(udp::endpoint(i->addr, i->port), out); - endpoint.resize(out - endpoint.begin()); - p.list().push_back(entry(endpoint)); - } - } - } - } - - void dht_tracker::send_packet(msg const& m) + void dht_tracker::send_packet(libtorrent::entry const& e, udp::endpoint const& addr, int send_flags) { using libtorrent::bencode; using libtorrent::entry; - int send_flags = 0; - entry e(entry::dictionary_t); - TORRENT_ASSERT(!m.transaction_id.empty() || m.message_id == messages::error); - e["t"] = m.transaction_id; - static char const version_str[] = {'L', 'T' - , LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR}; - e["v"] = std::string(version_str, version_str + 4); - -#ifdef TORRENT_DHT_VERBOSE_LOGGING - std::stringstream log_line; - log_line << "SENDING [ ip: " << m.addr - << " t: " << to_hex(m.transaction_id); -#endif - - if (m.message_id == messages::error) - { - TORRENT_ASSERT(m.reply); - e["y"] = "e"; - entry error_list(entry::list_t); - TORRENT_ASSERT(m.error_code > 200 && m.error_code <= 204); - error_list.list().push_back(entry(m.error_code)); - error_list.list().push_back(entry(m.error_msg)); - e["e"] = error_list; -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " err: " << m.error_code - << " msg: " << m.error_msg; -#endif - } - else if (m.reply) - { - e["y"] = "r"; - e["r"] = entry(entry::dictionary_t); - entry& r = e["r"]; - r["id"] = std::string((char*)m.id.begin(), (char*)m.id.end()); - if (!m.write_token.empty()) - { - r["token"] = m.write_token; -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " token: " << to_hex(m.write_token); -#endif - } - -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " r: " << messages::ids[m.message_id] - << " id: " << m.id; -#endif - - switch (m.message_id) - { - case messages::ping: - break; - case messages::find_node: - { - write_nodes_entry(r, m); - break; - } - case messages::get_peers: - { - write_nodes_entry(r, m); - - if (!m.peers.empty()) - { - r["values"] = entry(entry::list_t); - entry& p = r["values"]; - std::string endpoint; - for (msg::peers_t::const_iterator i = m.peers.begin() - , end(m.peers.end()); i != end; ++i) - { - endpoint.resize(18); - std::string::iterator out = endpoint.begin(); - write_endpoint(*i, out); - endpoint.resize(out - endpoint.begin()); - p.list().push_back(entry(endpoint)); - } -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " values: " << m.peers.size(); -#endif - } - break; - } - - case messages::announce_peer: - break; - break; - } - } - else - { - // set bit 1 of send_flags to indicate that - // this packet should not be dropped by the - // rate limiter. - e["y"] = "q"; - e["a"] = entry(entry::dictionary_t); - entry& a = e["a"]; - a["id"] = std::string((char*)m.id.begin(), (char*)m.id.end()); - - TORRENT_ASSERT(m.message_id <= messages::error); - e["q"] = messages::ids[m.message_id]; - -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " q: " << messages::ids[m.message_id] - << " id: " << m.id; -#endif - - switch (m.message_id) - { - case messages::find_node: - { - send_flags = 1; - a["target"] = std::string((char*)m.info_hash.begin(), (char*)m.info_hash.end()); -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " target: " << boost::lexical_cast(m.info_hash); -#endif - break; - } - case messages::get_peers: - { - send_flags = 1; - a["info_hash"] = std::string((char*)m.info_hash.begin(), (char*)m.info_hash.end()); -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " ih: " << boost::lexical_cast(m.info_hash); -#endif - break; - } - case messages::announce_peer: - send_flags = 1; - a["port"] = m.port; - a["info_hash"] = std::string((char*)m.info_hash.begin(), (char*)m.info_hash.end()); - a["token"] = m.write_token; -#ifdef TORRENT_DHT_VERBOSE_LOGGING - log_line << " port: " << m.port - << " ih: " << boost::lexical_cast(m.info_hash) - << " token: " << to_hex(m.write_token); -#endif - break; - default: break; - } - - } m_send_buf.clear(); bencode(std::back_inserter(m_send_buf), e); + +#ifdef TORRENT_DHT_VERBOSE_LOGGING + std::stringstream log_line; + lazy_entry print; + int ret = lazy_bdecode(&m_send_buf[0], &m_send_buf[0] + m_send_buf.size(), print); + TORRENT_ASSERT(ret == 0); + log_line << print_entry(print, true); +#endif + error_code ec; - if (m_sock.send(m.addr, &m_send_buf[0], (int)m_send_buf.size(), ec, send_flags)) + if (m_sock.send(addr, &m_send_buf[0], (int)m_send_buf.size(), ec, send_flags)) { // account for IP and UDP overhead - m_sent_bytes += m_send_buf.size() + (m.addr.address().is_v6() ? 48 : 28); + m_sent_bytes += m_send_buf.size() + (addr.address().is_v6() ? 48 : 28); #ifdef TORRENT_DHT_VERBOSE_LOGGING m_total_out_bytes += m_send_buf.size(); - if (m.reply) + if (e["y"].string() == "r") { - ++m_replies_sent[m.message_id]; - m_replies_bytes_sent[m.message_id] += int(m_send_buf.size()); + // TODO: fix this stats logging +// ++m_replies_sent[e["r"]]; +// m_replies_bytes_sent[e["r"]] += int(m_send_buf.size()); } - else + else if (e["y"].string() == "q") { m_queries_out_bytes += m_send_buf.size(); } + TORRENT_LOG(dht_tracker) << "==> " << addr << " " << log_line.str(); #endif } #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << log_line.str() << " ]"; -// TORRENT_LOG(dht_tracker) << std::string(m_send_buf.begin(), m_send_buf.end()); + else + { + TORRENT_LOG(dht_tracker) << "==> " << addr << " DROPPED " << log_line.str(); + } #endif } diff --git a/src/kademlia/find_data.cpp b/src/kademlia/find_data.cpp index 6fc8b0060..53eb5de47 100644 --- a/src/kademlia/find_data.cpp +++ b/src/kademlia/find_data.cpp @@ -38,10 +38,20 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include +#include namespace libtorrent { namespace dht { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_DECLARE_LOG(dht_tracker); +#endif + +using detail::read_v4_endpoint; +using detail::read_v6_endpoint; +using detail::read_endpoint_list; + find_data_observer::~find_data_observer() { if (m_algorithm) m_algorithm->failed(m_self); @@ -55,22 +65,116 @@ void find_data_observer::reply(msg const& m) return; } - if (!m.write_token.empty()) - m_algorithm->got_write_token(m.id, m.write_token); +#ifdef TORRENT_DHT_VERBOSE_LOGGING + std::stringstream log_line; + log_line << " incoming get_peer response [ "; +#endif - if (!m.peers.empty()) - m_algorithm->got_data(&m); - - if (!m.nodes.empty()) + lazy_entry const* r = m.message.dict_find_dict("r"); + if (!r) { - for (msg::nodes_t::const_iterator i = m.nodes.begin() - , end(m.nodes.end()); i != end; ++i) +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(dht_tracker) << " missing response dict"; +#endif + return; + } + + lazy_entry const* id = r->dict_find_string("id"); + if (!id || id->string_length() != 20) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(dht_tracker) << " invalid id in response"; +#endif + return; + } + + lazy_entry const* token = r->dict_find_string("token"); + if (token) + { + m_algorithm->got_write_token(node_id(id->string_ptr()), token->string_value()); +#ifdef TORRENT_DHT_VERBOSE_LOGGING + log_line << " token: " << to_hex(token->string_value()); +#endif + } + + // look for peers + lazy_entry const* n = r->dict_find_list("values"); + if (n) + { + std::vector peer_list; + if (n->list_size() == 1 && n->list_at(0)->type() == lazy_entry::string_t) { - m_algorithm->traverse(i->id, udp::endpoint(i->addr, i->port)); + // assume it's mainline format + char const* peers = n->list_at(0)->string_ptr(); + char const* end = peers + n->list_at(0)->string_length(); + +#ifdef TORRENT_DHT_VERBOSE_LOGGING + log_line << " p: " << ((end - peers) / 6); +#endif + while (end - peers >= 6) + peer_list.push_back(read_v4_endpoint(peers)); + } + else + { + // assume it's uTorrent/libtorrent format + read_endpoint_list(n, peer_list); +#ifdef TORRENT_DHT_VERBOSE_LOGGING + log_line << " p: " << n->list_size(); +#endif + } + m_algorithm->got_peers(peer_list); + } + + // look for nodes + n = r->dict_find_string("nodes"); + if (n) + { + std::vector node_list; + char const* nodes = n->string_ptr(); + char const* end = nodes + n->string_length(); + +#ifdef TORRENT_DHT_VERBOSE_LOGGING + log_line << " nodes: " << ((end - nodes) / 26); +#endif + while (end - nodes >= 26) + { + node_id id; + std::copy(nodes, nodes + 20, id.begin()); + nodes += 20; + m_algorithm->traverse(id, read_v4_endpoint(nodes)); + } + } + + n = r->dict_find_list("nodes2"); + if (n) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + log_line << " nodes2: " << n->list_size(); +#endif + for (int i = 0; i < n->list_size(); ++i) + { + lazy_entry const* p = n->list_at(0); + if (p->type() != lazy_entry::string_t) continue; + if (p->string_length() < 6 + 20) continue; + char const* in = p->string_ptr(); + + node_id id; + std::copy(in, in + 20, id.begin()); + in += 20; + if (p->string_length() == 6 + 20) + m_algorithm->traverse(id, read_v4_endpoint(in)); +#if TORRENT_USE_IPV6 + else if (p->string_length() == 18 + 20) + m_algorithm->traverse(id, read_v6_endpoint(in)); +#endif } } m_algorithm->finished(m_self); m_algorithm = 0; +#ifdef TORRENT_DHT_VERBOSE_LOGGING + log_line << " ]"; + TORRENT_LOG(dht_tracker) << log_line.str(); +#endif } void find_data_observer::timeout() @@ -86,14 +190,17 @@ find_data::find_data( , node_id target , data_callback const& dcallback , nodes_callback const& ncallback) - : traversal_algorithm(node, target, node.m_table.begin(), node.m_table.end()) + : traversal_algorithm(node, target) , m_data_callback(dcallback) , m_nodes_callback(ncallback) , m_target(target) , m_done(false) { - boost::intrusive_ptr self(this); - add_requests(); + for (routing_table::const_iterator i = node.m_table.begin() + , end(node.m_table.end()); i != end; ++i) + { + add_entry(i->id, i->ep(), result::initial); + } } void find_data::invoke(node_id const& id, udp::endpoint addr) @@ -116,12 +223,17 @@ void find_data::invoke(node_id const& id, udp::endpoint addr) #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif - m_node.m_rpc.invoke(messages::get_peers, addr, o); + entry e; + e["y"] = "q"; + e["q"] = "get_peers"; + entry& a = e["a"]; + a["info_hash"] = id.to_string(); + m_node.m_rpc.invoke(e, addr, o); } -void find_data::got_data(msg const* m) +void find_data::got_peers(std::vector const& peers) { - m_data_callback(m->peers); + m_data_callback(peers); } void find_data::done() diff --git a/src/kademlia/node.cpp b/src/kademlia/node.cpp index 74899fe8b..02f7bc966 100644 --- a/src/kademlia/node.cpp +++ b/src/kademlia/node.cpp @@ -42,6 +42,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/hasher.hpp" #include "libtorrent/random_sample.hpp" #include "libtorrent/alert_types.hpp" +#include "libtorrent/socket.hpp" #include "libtorrent/aux_/session_impl.hpp" #include "libtorrent/kademlia/node_id.hpp" #include "libtorrent/kademlia/rpc_manager.hpp" @@ -49,7 +50,6 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/kademlia/node.hpp" #include "libtorrent/kademlia/refresh.hpp" -#include "libtorrent/kademlia/closest_nodes.hpp" #include "libtorrent/kademlia/find_data.hpp" using boost::bind; @@ -57,6 +57,8 @@ using boost::bind; namespace libtorrent { namespace dht { +using detail::write_endpoint; + #ifdef _MSC_VER namespace { @@ -93,24 +95,26 @@ void purge_peers(std::set& peers) void nop() {} node_impl::node_impl(libtorrent::aux::session_impl& ses - , boost::function const& f + , void (*f)(void*, entry const&, udp::endpoint const&, int) , dht_settings const& settings - , boost::optional nid) + , boost::optional nid + , void* userdata) : m_settings(settings) , m_id(nid ? *nid : generate_id()) , m_table(m_id, 8, settings) - , m_rpc(bind(&node_impl::incoming_request, this, _1) - , m_id, m_table, f) + , m_rpc(m_id, m_table, f, userdata) , m_last_tracker_tick(time_now()) , m_ses(ses) + , m_send(f) + , m_userdata(userdata) { m_secret[0] = std::rand(); m_secret[1] = std::rand(); } -bool node_impl::verify_token(msg const& m) +bool node_impl::verify_token(std::string const& token, char const* info_hash + , udp::endpoint const& addr) { - std::string const& token = m.write_token; if (token.length() != 4) { #ifdef TORRENT_DHT_VERBOSE_LOGGING @@ -121,11 +125,11 @@ bool node_impl::verify_token(msg const& m) hasher h1; error_code ec; - std::string address = m.addr.address().to_string(ec); + std::string address = addr.address().to_string(ec); if (ec) return false; h1.update(&address[0], address.length()); h1.update((char*)&m_secret[0], sizeof(m_secret[0])); - h1.update((char*)&m.info_hash[0], sha1_hash::size); + h1.update((char*)info_hash, sha1_hash::size); sha1_hash h = h1.final(); if (std::equal(token.begin(), token.end(), (signed char*)&h[0])) @@ -134,24 +138,24 @@ bool node_impl::verify_token(msg const& m) hasher h2; h2.update(&address[0], address.length()); h2.update((char*)&m_secret[1], sizeof(m_secret[1])); - h2.update((char*)&m.info_hash[0], sha1_hash::size); + h2.update((char*)info_hash, sha1_hash::size); h = h2.final(); if (std::equal(token.begin(), token.end(), (signed char*)&h[0])) return true; return false; } -std::string node_impl::generate_token(msg const& m) +std::string node_impl::generate_token(udp::endpoint const& addr, char const* info_hash) { std::string token; token.resize(4); hasher h; error_code ec; - std::string address = m.addr.address().to_string(ec); + std::string address = addr.address().to_string(ec); TORRENT_ASSERT(!ec); h.update(&address[0], address.length()); h.update((char*)&m_secret[0], sizeof(m_secret[0])); - h.update((char*)&m.info_hash[0], sha1_hash::size); + h.update(info_hash, sha1_hash::size); sha1_hash hash = h.final(); std::copy(hash.begin(), hash.begin() + 4, (signed char*)&token[0]); @@ -159,40 +163,30 @@ std::string node_impl::generate_token(msg const& m) } void node_impl::refresh(node_id const& id - , boost::function0 f) + , find_data::nodes_callback const& f) { - // use the 'bucket size' closest nodes - // to start the refresh with - std::vector start; - start.reserve(m_table.bucket_size()); - m_table.find_node(id, start, routing_table::include_failed); - new dht::refresh(*this, id, start.begin(), start.end(), f); + boost::intrusive_ptr r(new dht::refresh(*this, id, f)); + r->start(); } void node_impl::bootstrap(std::vector const& nodes - , boost::function0 f) + , find_data::nodes_callback const& f) { -/* -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(node) << "bootrapping: " << nodes.size(); + boost::intrusive_ptr r(new dht::refresh(*this, m_id, f)); + for (std::vector::const_iterator i = nodes.begin() , end(nodes.end()); i != end; ++i) - TORRENT_LOG(node) << " " << *i; -#endif -*/ - std::vector start; - start.reserve(nodes.size()); - std::copy(nodes.begin(), nodes.end(), std::back_inserter(start)); - new dht::refresh(*this, m_id, start.begin(), start.end(), f); + { + r->add_entry(node_id(0), *i, traversal_algorithm::result::initial); + } + + r->start(); } void node_impl::refresh() { - std::vector start; - start.reserve(m_table.size().get<0>()); - std::copy(m_table.begin(), m_table.end(), std::back_inserter(start)); - - new dht::refresh(*this, m_id, start.begin(), start.end(), bind(&nop)); + boost::intrusive_ptr r(new dht::refresh(*this, m_id, boost::bind(&nop))); + r->start(); } int node_impl::bucket_size(int bucket) @@ -234,11 +228,8 @@ void node_impl::refresh_bucket(int bucket) TORRENT_ASSERT(distance_exp(m_id, target) == bucket); - std::vector start; - start.reserve(m_table.bucket_size()); - m_table.find_node(target, start, routing_table::include_failed); - - new dht::refresh(*this, target, start.begin(), start.end(), bind(&nop)); + boost::intrusive_ptr ta(new dht::refresh(*this, target, bind(&nop))); + ta->start(); m_table.touch_bucket(bucket); } @@ -249,9 +240,46 @@ void node_impl::unreachable(udp::endpoint const& ep) void node_impl::incoming(msg const& m) { - if (m_rpc.incoming(m)) + extern void incoming_error(entry& e, char const* msg); + + // is this a reply? + lazy_entry const* y_ent = m.message.dict_find_string("y"); + if (!y_ent || y_ent->string_length() == 0) { - refresh(); + entry e; + incoming_error(e, "missing 'y' entry"); + m_send(m_userdata, e, m.addr, 0); + return; + } + + char y = *(y_ent->string_ptr()); + + switch (y) + { + case 'r': + { + if (m_rpc.incoming(m)) refresh(); + break; + } + case 'q': + { + TORRENT_ASSERT(m.message.dict_find_string_value("y") == "q"); + entry e; + incoming_request(m, e); + m_send(m_userdata, e, m.addr, 0); + break; + } + case 'e': + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + lazy_entry const* err = m.message.dict_find_list("e"); + if (err && err->list_size() >= 2) + { + TORRENT_LOG(node) << "INCOMING ERROR: " << err->list_string_value_at(1); + } +#endif + break; + } } } @@ -282,7 +310,13 @@ namespace #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif - rpc.invoke(messages::announce_peer, i->first.ep(), o); + entry e; + e["y"] = "q"; + e["q"] = "announce_peer"; + entry& a = e["a"]; + a["port"] = listen_port; + a["token"] = i->second; + rpc.invoke(e, i->first.ep(), o); } } } @@ -306,7 +340,10 @@ void node_impl::add_node(udp::endpoint node) #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif - m_rpc.invoke(messages::ping, node, o); + entry e; + e["y"] = "q"; + e["q"] = "ping"; + m_rpc.invoke(e, node, o); } void node_impl::announce(sha1_hash const& info_hash, int listen_port @@ -317,8 +354,10 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port #endif // search for nodes with ids close to id or with peers // for info-hash id. then send announce_peer to them. - new find_data(*this, info_hash, f, boost::bind(&announce_fun, _1, boost::ref(m_rpc) - , listen_port, info_hash)); + boost::intrusive_ptr ta(new find_data(*this, info_hash, f + , boost::bind(&announce_fun, _1, boost::ref(m_rpc) + , listen_port, info_hash))); + ta->start(); } time_duration node_impl::refresh_timeout() @@ -388,30 +427,6 @@ time_duration node_impl::connection_timeout() void node_impl::on_announce(msg const& m, msg& reply) { - if (m_ses.m_alerts.should_post()) - m_ses.m_alerts.post_alert(dht_announce_alert( - m.addr.address(), m.port, m.info_hash)); - - if (!verify_token(m)) - { - reply.message_id = messages::error; - reply.error_code = 203; - reply.error_msg = "Incorrect token in announce_peer"; - return; - } - - // the token was correct. That means this - // node is not spoofing its address. So, let - // the table get a chance to add it. - m_table.node_seen(m.id, m.addr); - - torrent_entry& v = m_map[m.info_hash]; - peer_entry e; - e.addr = tcp::endpoint(m.addr.address(), m.port); - e.added = time_now(); - std::set::iterator i = v.peers.find(e); - if (i != v.peers.end()) v.peers.erase(i++); - v.peers.insert(i, e); } namespace @@ -438,12 +453,12 @@ void node_impl::status(session_status& s) } } -bool node_impl::on_find(msg const& m, std::vector& peers) const +bool node_impl::on_find(sha1_hash const& info_hash, std::vector& peers) const { if (m_ses.m_alerts.should_post()) - m_ses.m_alerts.post_alert(dht_get_peers_alert(m.info_hash)); + m_ses.m_alerts.post_alert(dht_get_peers_alert(info_hash)); - table_t::const_iterator i = m_map.find(m.info_hash); + table_t::const_iterator i = m_map.find(info_hash); if (i == m_map.end()) return false; torrent_entry const& v = i->second; @@ -454,75 +469,206 @@ bool node_impl::on_find(msg const& m, std::vector& peers) const random_sample_n(boost::make_transform_iterator(v.peers.begin(), &get_endpoint) , boost::make_transform_iterator(v.peers.end(), &get_endpoint) , std::back_inserter(peers), num); -/* -#ifdef TORRENT_DHT_VERBOSE_LOGGING - for (std::vector::iterator i = peers.begin() - , end(peers.end()); i != end; ++i) - { - TORRENT_LOG(node) << " " << *i; - } -#endif -*/ return true; } -void node_impl::incoming_request(msg const& m) +namespace { - msg reply; - reply.message_id = m.message_id; - reply.addr = m.addr; - reply.reply = true; - reply.transaction_id = m.transaction_id; - - switch (m.message_id) + void write_nodes_entry(entry& r, nodes_t const& nodes) { - case messages::ping: - break; - case messages::get_peers: + bool ipv6_nodes = false; + entry& n = r["nodes"]; + std::back_insert_iterator out(n.string()); + for (nodes_t::const_iterator i = nodes.begin() + , end(nodes.end()); i != end; ++i) { - reply.info_hash = m.info_hash; - reply.write_token = generate_token(m); - - on_find(m, reply.peers); - // always return nodes as well as peers - m_table.find_node(m.info_hash, reply.nodes, 0); -/* -#ifdef TORRENT_DHT_VERBOSE_LOGGING - for (std::vector::iterator i = reply.nodes.begin() - , end(reply.nodes.end()); i != end; ++i) + if (!i->addr.is_v4()) { - TORRENT_LOG(node) << " " << i->id << " " << i->ep(); + ipv6_nodes = true; + continue; } -#endif -*/ + std::copy(i->id.begin(), i->id.end(), out); + write_endpoint(udp::endpoint(i->addr, i->port), out); } - break; - case messages::find_node: + + if (ipv6_nodes) { - reply.info_hash = m.info_hash; - - m_table.find_node(m.info_hash, reply.nodes, 0); -/* -#ifdef TORRENT_DHT_VERBOSE_LOGGING - for (std::vector::iterator i = reply.nodes.begin() - , end(reply.nodes.end()); i != end; ++i) + entry& p = r["nodes2"]; + std::string endpoint; + for (nodes_t::const_iterator i = nodes.begin() + , end(nodes.end()); i != end; ++i) { - TORRENT_LOG(node) << " " << i->id << " " << i->ep(); + if (!i->addr.is_v6()) continue; + endpoint.resize(18 + 20); + std::string::iterator out = endpoint.begin(); + std::copy(i->id.begin(), i->id.end(), out); + out += 20; + write_endpoint(udp::endpoint(i->addr, i->port), out); + endpoint.resize(out - endpoint.begin()); + p.list().push_back(entry(endpoint)); } -#endif -*/ } - break; - case messages::announce_peer: - on_announce(m, reply); - break; - default: - TORRENT_ASSERT(false); - }; + } +} - m_table.heard_about(m.id, m.addr); - m_rpc.reply(reply); +void incoming_error(entry& e, char const* msg) +{ + e["y"] = "e"; + entry::list_type& l = e["e"].list(); + l.push_back(entry(203)); + l.push_back(entry(msg)); +} + +// build response +void node_impl::incoming_request(msg const& m, entry& e) +{ + e = entry(entry::dictionary_t); + e["y"] = "r"; + e["t"] = m.message.dict_find_string_value("t"); + + lazy_entry const* query_ent = m.message.dict_find_string("q"); + if (query_ent == 0) + { + incoming_error(e, "missing 'q' key"); + return; + } + + char const* query = query_ent->string_cstr(); + + lazy_entry const* arg_ent = m.message.dict_find_dict("a"); + if (arg_ent == 0) + { + incoming_error(e, "missing 'a' key"); + return; + } + + lazy_entry const* node_id_ent = arg_ent->dict_find_string("id"); + if (node_id_ent == 0 || node_id_ent->string_length() != 20) + { + incoming_error(e, "missing 'id' key"); + return; + } + + node_id id(node_id_ent->string_ptr()); + + m_table.heard_about(id, m.addr); + + entry& reply = e["r"]; + m_rpc.add_our_id(reply); + + + if (strcmp(query, "ping") == 0) + { + // we already have 't' and 'id' in the response + // no more left to add + } + else if (strcmp(query, "get_peers") == 0) + { + lazy_entry const* info_hash_ent = arg_ent->dict_find_string("info_hash"); + if (info_hash_ent == 0 || info_hash_ent->string_length() != 20) + { + incoming_error(e, "missing 'info-hash' key"); + return; + } + + reply["token"] = generate_token(m.addr, info_hash_ent->string_ptr()); + + sha1_hash info_hash(info_hash_ent->string_ptr()); + nodes_t n; + // always return nodes as well as peers + m_table.find_node(info_hash, n, 0); + write_nodes_entry(reply, n); + + peers_t p; + on_find(info_hash, p); + if (!p.empty()) + { + entry::list_type& pe = reply["values"].list(); + std::string endpoint; + for (peers_t::const_iterator i = p.begin() + , end(p.end()); i != end; ++i) + { + endpoint.resize(18); + std::string::iterator out = endpoint.begin(); + write_endpoint(*i, out); + endpoint.resize(out - endpoint.begin()); + pe.push_back(entry(endpoint)); + } +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(node) << " values: " << p.size(); +#endif + } + } + else if (strcmp(query, "find_node") == 0) + { + lazy_entry const* target_ent = arg_ent->dict_find_string("target"); + if (target_ent == 0 || target_ent->string_length() != 20) + { + incoming_error(e, "missing 'target' key"); + return; + } + + sha1_hash target(target_ent->string_ptr()); + nodes_t n; + // always return nodes as well as peers + m_table.find_node(target, n, 0); + write_nodes_entry(reply, n); + } + else if (strcmp(query, "announce_peer") == 0) + { + lazy_entry const* info_hash_ent = arg_ent->dict_find_string("info_hash"); + if (info_hash_ent == 0 || info_hash_ent->string_length() != 20) + { + incoming_error(e, "missing 'info-hash' key"); + return; + } + + int port = arg_ent->dict_find_int_value("port", -1); + if (port < 0 || port >= 65536) + { + incoming_error(e, "invalid 'port' in announce"); + return; + } + + sha1_hash info_hash(info_hash_ent->string_ptr()); + + if (m_ses.m_alerts.should_post()) + m_ses.m_alerts.post_alert(dht_announce_alert( + m.addr.address(), port, info_hash)); + + lazy_entry const* token = arg_ent->dict_find_string("token"); + if (!token) + { + incoming_error(e, "missing 'token' key in announce"); + return; + } + + if (!verify_token(token->string_value(), info_hash_ent->string_ptr(), m.addr)) + { + incoming_error(e, "invalid token in announce"); + return; + } + + // the token was correct. That means this + // node is not spoofing its address. So, let + // the table get a chance to add it. + m_table.node_seen(id, m.addr); + + torrent_entry& v = m_map[info_hash]; + peer_entry e; + e.addr = tcp::endpoint(m.addr.address(), port); + e.added = time_now(); + std::set::iterator i = v.peers.find(e); + if (i != v.peers.end()) v.peers.erase(i++); + v.peers.insert(i, e); + } + else + { + incoming_error(e, "unknown message"); + return; + } } } } // namespace libtorrent::dht + diff --git a/src/kademlia/refresh.cpp b/src/kademlia/refresh.cpp index fe9d38017..57e4a1183 100644 --- a/src/kademlia/refresh.cpp +++ b/src/kademlia/refresh.cpp @@ -33,76 +33,30 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/pch.hpp" #include -#include #include -#include #include -#include #include -#include - -using boost::bind; - namespace libtorrent { namespace dht { -#ifdef TORRENT_DHT_VERBOSE_LOGGING -TORRENT_DEFINE_LOG(refresh) -#endif - -refresh_observer::~refresh_observer() +refresh::refresh( + node_impl& node + , node_id target + , done_callback const& callback) + : find_data(node, target, find_data::data_callback(), callback) { - if (m_algorithm) m_algorithm->failed(m_self, true); } -void refresh_observer::reply(msg const& in) +char const* refresh::name() const { - if (!m_algorithm) return; - - if (!in.nodes.empty()) - { - for (msg::nodes_t::const_iterator i = in.nodes.begin() - , end(in.nodes.end()); i != end; ++i) - { - m_algorithm->traverse(i->id, udp::endpoint(i->addr, i->port)); - } - } - m_algorithm->finished(m_self); - m_algorithm = 0; -} - -void refresh_observer::timeout() -{ - if (!m_algorithm) return; - m_algorithm->failed(m_self); - m_algorithm = 0; -} - -ping_observer::~ping_observer() -{ - if (m_algorithm) m_algorithm->ping_timeout(m_self, true); -} - -void ping_observer::reply(msg const& m) -{ - if (!m_algorithm) return; - - m_algorithm->ping_reply(m_self); - m_algorithm = 0; -} - -void ping_observer::timeout() -{ - if (!m_algorithm) return; - m_algorithm->ping_timeout(m_self); - m_algorithm = 0; + return "refresh"; } void refresh::invoke(node_id const& nid, udp::endpoint addr) { - TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(refresh_observer)); + TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(find_data_observer)); void* ptr = m_node.m_rpc.allocator().malloc(); if (ptr == 0) { @@ -110,84 +64,16 @@ void refresh::invoke(node_id const& nid, udp::endpoint addr) return; } m_node.m_rpc.allocator().set_next_size(10); - observer_ptr o(new (ptr) refresh_observer( this, nid)); + observer_ptr o(new (ptr) find_data_observer(this, nid)); #ifdef TORRENT_DEBUG o->m_in_constructor = false; #endif - - m_node.m_rpc.invoke(messages::find_node, addr, o); -} - -void refresh::done() -{ - int max_results = m_node.m_table.bucket_size(); - m_leftover_nodes_iterator = (int)m_results.size() > max_results ? - m_results.begin() + max_results : m_results.end(); - - invoke_pings_or_finish(); -} - -void refresh::ping_reply(node_id nid) -{ - m_active_pings--; - invoke_pings_or_finish(); -} - -void refresh::ping_timeout(node_id nid, bool prevent_request) -{ - m_active_pings--; - invoke_pings_or_finish(prevent_request); -} - -void refresh::invoke_pings_or_finish(bool prevent_request) -{ - if (prevent_request) - { - --m_max_active_pings; - if (m_max_active_pings <= 0) - m_max_active_pings = 1; - } - else - { - while (m_active_pings < m_max_active_pings) - { - if (m_leftover_nodes_iterator == m_results.end()) break; - - result const& node = *m_leftover_nodes_iterator; - - // Skip initial nodes - if (node.flags & result::initial) - { - ++m_leftover_nodes_iterator; - continue; - } - -#ifndef BOOST_NO_EXCEPTIONS - try - { -#endif - TORRENT_ASSERT(m_node.m_rpc.allocation_size() >= sizeof(ping_observer)); - void* ptr = m_node.m_rpc.allocator().malloc(); - if (ptr == 0) return; - m_node.m_rpc.allocator().set_next_size(10); - observer_ptr o(new (ptr) ping_observer(this, node.id)); -#ifdef TORRENT_DEBUG - o->m_in_constructor = false; -#endif - m_node.m_rpc.invoke(messages::ping, node.addr, o); - ++m_active_pings; - ++m_leftover_nodes_iterator; -#ifndef BOOST_NO_EXCEPTIONS - } - catch (std::exception& e) {} -#endif - } - } - - if (m_active_pings == 0) - { - m_done_callback(); - } + entry e; + e["y"] = "q"; + e["q"] = "find_node"; + entry& a = e["a"]; + a["target"] = target().to_string(); + m_node.m_rpc.invoke(e, addr, o); } } } // namespace libtorrent::dht diff --git a/src/kademlia/rpc_manager.cpp b/src/kademlia/rpc_manager.cpp index be4cc519a..d73f8a7e2 100644 --- a/src/kademlia/rpc_manager.cpp +++ b/src/kademlia/rpc_manager.cpp @@ -47,7 +47,6 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include #include #include #include @@ -87,14 +86,49 @@ void intrusive_ptr_release(observer const* o) } } +void observer::set_target(udp::endpoint const& ep) +{ +#ifdef TORRENT_DHT_VERBOSE_LOGGING + // use high resolution timers for logging + m_sent = time_now_hires(); +#else + m_sent = time_now(); +#endif + + m_port = ep.port(); +#if TORRENT_USE_IPV6 + if (ep.address().is_v6()) + { + m_is_v6 = true; + m_addr.v6 = ep.address().to_v6().to_bytes(); + } + else +#endif + { + m_is_v6 = false; + m_addr.v4 = ep.address().to_v4().to_bytes(); + } +} + +address observer::target_addr() const +{ + if (m_is_v6) + return address_v6(m_addr.v6); + else + return address_v4(m_addr.v4); +} + +udp::endpoint observer::target_ep() const +{ + return udp::endpoint(target_addr(), m_port); +} + + node_id generate_id(); typedef mpl::vector< - closest_nodes_observer - , find_data_observer + find_data_observer , announce_observer - , refresh_observer - , ping_observer , null_observer > observer_types; @@ -102,13 +136,14 @@ typedef mpl::max_element< mpl::transform_view > >::type max_observer_type_iter; -rpc_manager::rpc_manager(fun const& f, node_id const& our_id - , routing_table& table, send_fun const& sf) +rpc_manager::rpc_manager(node_id const& our_id + , routing_table& table, send_fun const& sf + , void* userdata) : m_pool_allocator(sizeof(mpl::deref::type), 10) , m_next_transaction_id(std::rand() % max_transactions) , m_oldest_transaction_id(m_next_transaction_id) - , m_incoming(f) , m_send(sf) + , m_userdata(userdata) , m_our_id(our_id) , m_table(table) , m_timer(time_now()) @@ -119,12 +154,23 @@ rpc_manager::rpc_manager(fun const& f, node_id const& our_id #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(rpc) << "Constructing"; - TORRENT_LOG(rpc) << " closest_nodes_observer: " << sizeof(closest_nodes_observer); - TORRENT_LOG(rpc) << " find_data_observer: " << sizeof(find_data_observer); TORRENT_LOG(rpc) << " announce_observer: " << sizeof(announce_observer); - TORRENT_LOG(rpc) << " refresh_observer: " << sizeof(refresh_observer); - TORRENT_LOG(rpc) << " ping_observer: " << sizeof(ping_observer); TORRENT_LOG(rpc) << " null_observer: " << sizeof(null_observer); + +#define PRINT_OFFSETOF(x, y) TORRENT_LOG(rpc) << " +" << offsetof(x, y) << ": " #y + + TORRENT_LOG(rpc) << " observer: " << sizeof(observer); + PRINT_OFFSETOF(observer, pool_allocator); + PRINT_OFFSETOF(observer, m_sent); + PRINT_OFFSETOF(observer, m_refs); + PRINT_OFFSETOF(observer, m_addr); + PRINT_OFFSETOF(observer, m_port); + + TORRENT_LOG(rpc) << " find_data_observer: " << sizeof(find_data_observer); + PRINT_OFFSETOF(find_data_observer, m_algorithm); + PRINT_OFFSETOF(find_data_observer, m_self); + +#undef PRINT_OFFSETOF #endif } @@ -201,96 +247,91 @@ void rpc_manager::unreachable(udp::endpoint const& ep) } } +// defined in node.cpp +void incoming_error(entry& e, char const* msg); + bool rpc_manager::incoming(msg const& m) { INVARIANT_CHECK; if (m_destructing) return false; - if (m.reply) + // we only deal with replies, not queries + TORRENT_ASSERT(m.message.dict_find_string_value("y") == "r"); + + // if we don't have the transaction id in our + // request list, ignore the packet + + std::string transaction_id = m.message.dict_find_string_value("t"); + + std::string::const_iterator i = transaction_id.begin(); + int tid = transaction_id.size() != 2 ? -1 : io::read_uint16(i); + + observer_ptr o; + + if (tid >= (int)m_transactions.size() || tid < 0) { - // if we don't have the transaction id in our - // request list, ignore the packet +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(rpc) << "Reply with invalid transaction id size: " + << transaction_id.size() << " from " << m.addr; +#endif + entry e; + incoming_error(e, "invalid transaction id"); + m_send(m_userdata, e, m.addr, 0); + return false; + } - if (m.transaction_id.size() < 2) - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Reply with invalid transaction id size: " - << m.transaction_id.size() << " from " << m.addr; -#endif - msg reply; - reply.reply = true; - reply.message_id = messages::error; - reply.error_code = 203; // Protocol error - reply.error_msg = "reply with invalid transaction id, size " - + boost::lexical_cast(m.transaction_id.size()); - reply.addr = m.addr; - reply.transaction_id = ""; - m_send(reply); - return false; - } - - std::string::const_iterator i = m.transaction_id.begin(); - int tid = io::read_uint16(i); + o = m_transactions[tid]; - if (tid >= (int)m_transactions.size() - || tid < 0) - { + if (!o) + { #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Reply with invalid transaction id: " - << tid << " from " << m.addr; -#endif - msg reply; - reply.reply = true; - reply.message_id = messages::error; - reply.error_code = 203; // Protocol error - reply.error_msg = "reply with invalid transaction id"; - reply.addr = m.addr; - reply.transaction_id = ""; - m_send(reply); - return false; - } - - observer_ptr o = m_transactions[tid]; - - if (!o) - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Reply with unknown transaction id: " - << tid << " from " << m.addr << " (possibly timed out)"; -#endif - return false; - } - - if (m.addr.address() != o->target_addr) - { -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Reply with incorrect address and valid transaction id: " - << tid << " from " << m.addr << " expected: " << o->target_addr; -#endif - return false; - } - -#ifdef TORRENT_DHT_VERBOSE_LOGGING - std::ofstream reply_stats("round_trip_ms.log", std::ios::app); - reply_stats << m.addr << "\t" << total_milliseconds(time_now() - o->sent) - << std::endl; -#endif -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Reply with transaction id: " + TORRENT_LOG(rpc) << "Reply to a timed out request " << tid << " from " << m.addr; #endif - o->reply(m); - m_transactions[tid] = 0; - return m_table.node_seen(m.id, m.addr); + return false; } - else + + if (m.addr.address() != o->target_addr()) { - TORRENT_ASSERT(m.message_id != messages::error); - // this is an incoming request - m_incoming(m); +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(rpc) << "Reply with incorrect address and valid transaction id: " + << tid << " from " << m.addr << " expected: " << o->target_addr(); +#endif + return false; } - return false; + +#ifdef TORRENT_DHT_VERBOSE_LOGGING + std::ofstream reply_stats("round_trip_ms.log", std::ios::app); + reply_stats << m.addr << "\t" << total_milliseconds(time_now_hires() - o->sent()) + << std::endl; +#endif + + lazy_entry const* ret_ent = m.message.dict_find_dict("r"); + if (ret_ent == 0) + { + entry e; + incoming_error(e, "missing 'r' key"); + m_send(m_userdata, e, m.addr, 0); + return false; + } + + lazy_entry const* node_id_ent = ret_ent->dict_find_string("id"); + if (node_id_ent == 0 || node_id_ent->string_length() != 20) + { + entry e; + incoming_error(e, "missing 'id' key"); + m_send(m_userdata, e, m.addr, 0); + return false; + } + +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(rpc) << "Reply with transaction id: " + << tid << " from " << m.addr; +#endif + o->reply(m); + m_transactions[tid] = 0; + return m_table.node_seen(node_id(node_id_ent->string_ptr()), m.addr); } time_duration rpc_manager::tick() @@ -316,7 +357,7 @@ time_duration rpc_manager::tick() observer_ptr o = m_transactions[m_oldest_transaction_id]; if (!o) continue; - time_duration diff = o->sent + milliseconds(timeout_ms) - time_now(); + time_duration diff = o->sent() + milliseconds(timeout_ms) - time_now(); if (diff > seconds(0)) { if (diff < seconds(1)) @@ -372,7 +413,7 @@ unsigned int rpc_manager::new_transaction_id(observer_ptr o) #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(rpc) << "[new_transaction_id] Aborting message with transaction id: " << m_next_transaction_id << " sent to " << o->target_ep() - << " " << total_seconds(time_now() - o->sent) << " seconds ago"; + << " " << total_seconds(time_now() - o->sent()) << " seconds ago"; #endif m_transactions[m_next_transaction_id] = 0; TORRENT_ASSERT(m_oldest_transaction_id == m_next_transaction_id); @@ -406,7 +447,12 @@ void rpc_manager::update_oldest_transaction_id() } } -void rpc_manager::invoke(int message_id, udp::endpoint target_addr +void rpc_manager::add_our_id(entry& e) +{ + e["id"] = m_our_id.to_string(); +} + +void rpc_manager::invoke(entry& e, udp::endpoint target_addr , observer_ptr o) { INVARIANT_CHECK; @@ -417,11 +463,10 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr return; } - msg m; - m.message_id = message_id; - m.reply = false; - m.id = m_our_id; - m.addr = target_addr; + e["y"] = "q"; + entry& a = e["a"]; + add_our_id(a); + TORRENT_ASSERT(!m_transactions[m_next_transaction_id]); #ifdef TORRENT_DEBUG int potential_new_id = m_next_transaction_id; @@ -430,25 +475,18 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr try { #endif - m.transaction_id.clear(); - std::back_insert_iterator out(m.transaction_id); + std::string transaction_id; + transaction_id.resize(2); + char* out = &transaction_id[0]; io::write_uint16(m_next_transaction_id, out); + e["t"] = transaction_id; - o->send(m); - - o->sent = time_now(); -#if TORRENT_USE_IPV6 - o->target_addr = target_addr.address(); -#else - o->target_addr = target_addr.address().to_v4(); -#endif - o->port = target_addr.port(); + o->set_target(target_addr); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Invoking " << messages::ids[message_id] - << " -> " << target_addr; + TORRENT_LOG(rpc) << "Invoking " << e["q"].string() << " -> " << target_addr; #endif - m_send(m); + m_send(m_userdata, e, target_addr, 1); new_transaction_id(o); #ifndef BOOST_NO_EXCEPTIONS } @@ -460,7 +498,7 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr } #endif } - +/* void rpc_manager::reply(msg& m) { INVARIANT_CHECK; @@ -472,6 +510,6 @@ void rpc_manager::reply(msg& m) m_send(m); } - +*/ } } // namespace libtorrent::dht diff --git a/src/kademlia/traversal_algorithm.cpp b/src/kademlia/traversal_algorithm.cpp index a63bfe388..539f21097 100644 --- a/src/kademlia/traversal_algorithm.cpp +++ b/src/kademlia/traversal_algorithm.cpp @@ -82,6 +82,16 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig } } +void traversal_algorithm::start() +{ + add_requests(); + + // in case the routing table is empty, use the + // router nodes in the table + if (m_results.empty()) add_router_entries(); + init(); +} + boost::pool<>& traversal_algorithm::allocator() const { return m_node.m_rpc.allocator(); diff --git a/src/lazy_bdecode.cpp b/src/lazy_bdecode.cpp index 7fcf4e176..c0c3e61fc 100644 --- a/src/lazy_bdecode.cpp +++ b/src/lazy_bdecode.cpp @@ -372,7 +372,7 @@ namespace libtorrent } #endif // TORRENT_USE_IOSTREAM - std::string print_entry(lazy_entry const& e) + std::string print_entry(lazy_entry const& e, bool single_line) { std::string ret; switch (e.type()) @@ -420,7 +420,9 @@ namespace libtorrent || (e.list_at(0)->type() == lazy_entry::string_t && (e.list_at(0)->string_length() < 10 || e.list_size() < 2) - && e.list_size() < 5)); + && e.list_size() < 5)) + || single_line; + if (!one_liner) ret += "\n"; for (int i = 0; i < e.list_size(); ++i) { @@ -435,12 +437,13 @@ namespace libtorrent case lazy_entry::dict_t: { ret += "{"; - bool one_liner = (e.dict_size() == 0 + bool one_liner = ((e.dict_size() == 0 || e.dict_at(0).second->type() == lazy_entry::int_t || (e.dict_at(0).second->type() == lazy_entry::string_t && e.dict_at(0).second->string_length() < 30) || e.dict_at(0).first.size() < 10) - && e.dict_size() < 5; + && e.dict_size() < 5) + || single_line; if (!one_liner) ret += "\n"; for (int i = 0; i < e.dict_size(); ++i) diff --git a/src/session_impl.cpp b/src/session_impl.cpp index ecc2492ec..32d7c051a 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -289,11 +289,8 @@ namespace aux { PRINT_SIZEOF(policy::ipv6_peer) #endif - PRINT_SIZEOF(dht::closest_nodes_observer) PRINT_SIZEOF(dht::find_data_observer) PRINT_SIZEOF(dht::announce_observer) - PRINT_SIZEOF(dht::refresh_observer) - PRINT_SIZEOF(dht::ping_observer) PRINT_SIZEOF(dht::null_observer) #undef PRINT_OFFSETOF #undef PRINT_SIZEOF