clean up the udp socket and its consumers by adding an observer interface and have the udp tracker, utp socket manager and dht tracker subscribe to it instead of going through the session_impl. This probably improves performance a tiny bit but primarily improves modularization and testability

This commit is contained in:
Arvid Norberg
2012-06-22 04:21:20 +00:00
parent 14c9d8d7e9
commit d73bbf5053
10 changed files with 164 additions and 229 deletions

View File

@@ -188,6 +188,7 @@ namespace libtorrent
, dht::dht_observer , dht::dht_observer
, boost::noncopyable , boost::noncopyable
, initialize_timer , initialize_timer
, udp_socket_observer
, boost::enable_shared_from_this<session_impl> , boost::enable_shared_from_this<session_impl>
{ {
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
@@ -884,13 +885,8 @@ namespace libtorrent
deadline_timer m_dht_announce_timer; deadline_timer m_dht_announce_timer;
#endif #endif
void on_receive_udp(error_code const& e bool incoming_packet(error_code const& ec
, udp::endpoint const& ep, char const* buf, int len); , udp::endpoint const&, char const* buf, int size);
void on_receive_udp_hostname(error_code const& e
, char const* hostname, char const* buf, int len);
void on_udp_socket_drained();
// see m_external_listen_port. This is the same // see m_external_listen_port. This is the same
// but for the udp port used by the DHT. // but for the udp port used by the DHT.

View File

@@ -71,11 +71,12 @@ namespace libtorrent { namespace dht
TORRENT_EXTRA_EXPORT void intrusive_ptr_add_ref(dht_tracker const*); TORRENT_EXTRA_EXPORT void intrusive_ptr_add_ref(dht_tracker const*);
TORRENT_EXTRA_EXPORT void intrusive_ptr_release(dht_tracker const*); TORRENT_EXTRA_EXPORT void intrusive_ptr_release(dht_tracker const*);
struct dht_tracker : udp_socket_interface struct dht_tracker : udp_socket_interface, udp_socket_observer
{ {
friend void intrusive_ptr_add_ref(dht_tracker const*); friend void intrusive_ptr_add_ref(dht_tracker const*);
friend void intrusive_ptr_release(dht_tracker const*); friend void intrusive_ptr_release(dht_tracker const*);
// TODO: take a udp_socket_interface here instead. Move udp_socket_interface down into libtorrent core
dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock dht_tracker(libtorrent::aux::session_impl& ses, rate_limited_udp_socket& sock
, dht_settings const& settings, entry const* state = 0); , dht_settings const& settings, entry const* state = 0);
@@ -96,8 +97,8 @@ namespace libtorrent { namespace dht
// translate bittorrent kademlia message into the generic kademlia message // translate bittorrent kademlia message into the generic kademlia message
// used by the library // used by the library
void on_receive(udp::endpoint const& ep, char const* pkt, int size); virtual bool incoming_packet(error_code const& ec
void on_unreachable(udp::endpoint const& ep); , udp::endpoint const&, char const* buf, int size);
private: private:

View File

@@ -63,6 +63,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/intrusive_ptr_base.hpp" #include "libtorrent/intrusive_ptr_base.hpp"
#include "libtorrent/size_type.hpp" #include "libtorrent/size_type.hpp"
#include "libtorrent/union_endpoint.hpp" #include "libtorrent/union_endpoint.hpp"
#include "libtorrent/udp_socket.hpp" // for udp_socket_observer
#ifdef TORRENT_USE_OPENSSL #ifdef TORRENT_USE_OPENSSL
#include <boost/asio/ssl/context.hpp> #include <boost/asio/ssl/context.hpp>
#endif #endif
@@ -263,7 +264,7 @@ namespace libtorrent
const tracker_request m_req; const tracker_request m_req;
}; };
class TORRENT_EXTRA_EXPORT tracker_manager: boost::noncopyable class TORRENT_EXTRA_EXPORT tracker_manager: public udp_socket_observer, boost::noncopyable
{ {
public: public:
@@ -289,11 +290,13 @@ namespace libtorrent
void sent_bytes(int bytes); void sent_bytes(int bytes);
void received_bytes(int bytes); void received_bytes(int bytes);
bool incoming_udp(error_code const& e, udp::endpoint const& ep, char const* buf, int size); virtual bool incoming_packet(error_code const& e, udp::endpoint const& ep
, char const* buf, int size);
// this is only used for SOCKS packets, since // this is only used for SOCKS packets, since
// they may be addressed to hostname // they may be addressed to hostname
bool incoming_udp(error_code const& e, char const* hostname, char const* buf, int size); virtual bool incoming_packet(error_code const& e, char const* hostname
, char const* buf, int size);
private: private:

View File

@@ -42,25 +42,28 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/deadline_timer.hpp" #include "libtorrent/deadline_timer.hpp"
#include <deque> #include <deque>
#include <boost/function/function4.hpp>
#include <boost/function/function0.hpp>
namespace libtorrent namespace libtorrent
{ {
class connection_queue; class connection_queue;
struct udp_socket_observer
{
// return true if the packet was handled (it won't be
// propagated to the next observer)
virtual bool incoming_packet(error_code const& ec
, udp::endpoint const&, char const* buf, int size) = 0;
virtual bool incoming_packet(error_code const& ec
, char const* hostname, char const* buf, int size) { return false; }
// called every time the socket is drained of packets
virtual void socket_drained() {}
};
class udp_socket class udp_socket
{ {
public: public:
// TODO: instead of these callbacks, support observers udp_socket(io_service& ios, connection_queue& cc);
typedef boost::function<void(error_code const& ec
, udp::endpoint const&, char const* buf, int size)> callback_t;
typedef boost::function<void(error_code const& ec
, char const*, char const* buf, int size)> callback2_t;
typedef boost::function<void()> drain_callback_t;
udp_socket(io_service& ios, callback_t const& c, callback2_t const& c2
, drain_callback_t const& dc, connection_queue& cc);
~udp_socket(); ~udp_socket();
enum flags_t { dont_drop = 1, peer_connection = 2 }; enum flags_t { dont_drop = 1, peer_connection = 2 };
@@ -75,6 +78,9 @@ namespace libtorrent
} }
io_service& get_io_service() { return m_ipv4_sock.get_io_service(); } io_service& get_io_service() { return m_ipv4_sock.get_io_service(); }
void subscribe(udp_socket_observer* o);
void unsubscribe(udp_socket_observer* o);
// this is only valid when using a socks5 proxy // this is only valid when using a socks5 proxy
void send_hostname(char const* hostname, int port, char const* p, int len, error_code& ec); void send_hostname(char const* hostname, int port, char const* p, int len, error_code& ec);
@@ -140,15 +146,11 @@ namespace libtorrent
public: public:
#endif #endif
// callback for regular incoming packets std::vector<udp_socket_observer*> m_observers;
callback_t m_callback;
// callback for proxied incoming packets with a domain void call_handler(error_code const& ec, udp::endpoint const& ep, char const* buf, int size);
// name as source void call_handler(error_code const& ec, const char* host, char const* buf, int size);
callback2_t m_callback2; void call_drained_handler();
// called every time we drain the udp sockets
drain_callback_t m_drained_callback;
void setup_read(udp::socket* s); void setup_read(udp::socket* s);
void on_read(udp::socket* s); void on_read(udp::socket* s);
@@ -171,8 +173,6 @@ namespace libtorrent
void wrap(char const* hostname, int port, char const* p, int len, error_code& ec); void wrap(char const* hostname, int port, char const* p, int len, error_code& ec);
void unwrap(error_code const& e, char const* buf, int size); void unwrap(error_code const& e, char const* buf, int size);
bool maybe_clear_callback();
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
#if defined BOOST_HAS_PTHREADS #if defined BOOST_HAS_PTHREADS
mutable pthread_t m_thread; mutable pthread_t m_thread;
@@ -230,8 +230,7 @@ namespace libtorrent
struct rate_limited_udp_socket : public udp_socket struct rate_limited_udp_socket : public udp_socket
{ {
rate_limited_udp_socket(io_service& ios, callback_t const& c rate_limited_udp_socket(io_service& ios, connection_queue& cc);
, callback2_t const& c2, drain_callback_t const& dc, connection_queue& cc);
void set_rate_limit(int limit) { m_rate_limit = limit; } void set_rate_limit(int limit) { m_rate_limit = limit; }
bool can_send() const { return int(m_queue.size()) >= m_queue_size_limit; } bool can_send() const { return int(m_queue.size()) >= m_queue_size_limit; }
bool send(udp::endpoint const& ep, char const* p, int len, error_code& ec, int flags = 0); bool send(udp::endpoint const& ep, char const* p, int len, error_code& ec, int flags = 0);

View File

@@ -47,7 +47,7 @@ namespace libtorrent
typedef boost::function<void(boost::shared_ptr<socket_type> const&)> incoming_utp_callback_t; typedef boost::function<void(boost::shared_ptr<socket_type> const&)> incoming_utp_callback_t;
struct utp_socket_manager struct utp_socket_manager : udp_socket_observer
{ {
utp_socket_manager(session_settings const& sett, udp_socket& s, incoming_utp_callback_t cb); utp_socket_manager(session_settings const& sett, udp_socket& s, incoming_utp_callback_t cb);
~utp_socket_manager(); ~utp_socket_manager();
@@ -55,8 +55,12 @@ namespace libtorrent
void get_status(utp_status& s) const; void get_status(utp_status& s) const;
// return false if this is not a uTP packet // return false if this is not a uTP packet
bool incoming_packet(char const* p, int size, udp::endpoint const& ep); virtual bool incoming_packet(error_code const& ec, udp::endpoint const& ep
void socket_drained(); , char const* p, int size);
virtual bool incoming_packet(error_code const& ec, char const* host, char const* p, int size)
{ return false; }
virtual void socket_drained();
void tick(ptime now); void tick(ptime now);

View File

@@ -421,17 +421,26 @@ namespace libtorrent { namespace dht
} }
void dht_tracker::on_unreachable(udp::endpoint const& ep)
{
m_dht.unreachable(ep);
}
// translate bittorrent kademlia message into the generice kademlia message // translate bittorrent kademlia message into the generice kademlia message
// used by the library // used by the library
void dht_tracker::on_receive(udp::endpoint const& ep, char const* buf, int bytes_transferred) bool dht_tracker::incoming_packet(error_code const& ec
, udp::endpoint const& ep, char const* buf, int size)
{ {
if (ec)
{
if (ec == asio::error::connection_refused
|| ec == asio::error::connection_reset
|| ec == asio::error::connection_aborted)
{
m_dht.unreachable(ep);
}
return false;
}
if (size <= 20 || *buf != 'd' || buf[size-1] != 'e') return false;
// account for IP and UDP overhead // account for IP and UDP overhead
m_received_bytes += bytes_transferred + (ep.address().is_v6() ? 48 : 28); m_received_bytes += size + (ep.address().is_v6() ? 48 : 28);
node_ban_entry* match = 0; node_ban_entry* match = 0;
node_ban_entry* min = m_ban_nodes; node_ban_entry* min = m_ban_nodes;
@@ -464,7 +473,7 @@ namespace libtorrent { namespace dht
// we've received 20 messages in less than 5 seconds from // we've received 20 messages in less than 5 seconds from
// this node. Ignore it until it's silent for 5 minutes // this node. Ignore it until it's silent for 5 minutes
match->limit = now + minutes(5); match->limit = now + minutes(5);
return; return true;
} }
// we got 50 messages from this peer, but it was in // we got 50 messages from this peer, but it was in
@@ -488,19 +497,19 @@ namespace libtorrent { namespace dht
using libtorrent::entry; using libtorrent::entry;
using libtorrent::bdecode; using libtorrent::bdecode;
TORRENT_ASSERT(bytes_transferred > 0); TORRENT_ASSERT(size > 0);
lazy_entry e; lazy_entry e;
int pos; int pos;
error_code ec; error_code err;
int ret = lazy_bdecode(buf, buf + bytes_transferred, e, ec, &pos, 10, 500); int ret = lazy_bdecode(buf, buf + size, e, err, &pos, 10, 500);
if (ret != 0) if (ret != 0)
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << "<== " << ep << " ERROR: " TORRENT_LOG(dht_tracker) << "<== " << ep << " ERROR: "
<< ec.message() << " pos: " << pos; << err.message() << " pos: " << pos;
#endif #endif
return; return false;
} }
libtorrent::dht::msg m(e, ep); libtorrent::dht::msg m(e, ep);
@@ -516,7 +525,7 @@ namespace libtorrent { namespace dht
// entry r; // entry r;
// libtorrent::dht::incoming_error(r, "message is not a dictionary"); // libtorrent::dht::incoming_error(r, "message is not a dictionary");
// send_packet(r, ep, 0); // send_packet(r, ep, 0);
return; return false;
} }
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
@@ -525,6 +534,7 @@ namespace libtorrent { namespace dht
#endif #endif
m_dht.incoming(m); m_dht.incoming(m);
return true;
} }
void add_node_fun(void* userdata, node_entry const& e) void add_node_fun(void* userdata, node_entry const& e)

View File

@@ -641,11 +641,7 @@ namespace aux {
, m_dht_announce_timer(m_io_service) , m_dht_announce_timer(m_io_service)
#endif #endif
, m_external_udp_port(0) , m_external_udp_port(0)
, m_udp_socket(m_io_service , m_udp_socket(m_io_service, m_half_open)
, boost::bind(&session_impl::on_receive_udp, this, _1, _2, _3, _4)
, boost::bind(&session_impl::on_receive_udp_hostname, this, _1, _2, _3, _4)
, boost::bind(&session_impl::on_udp_socket_drained, this)
, m_half_open)
, m_utp_socket_manager(m_settings, m_udp_socket , m_utp_socket_manager(m_settings, m_udp_socket
, boost::bind(&session_impl::incoming_connection, this, _1)) , boost::bind(&session_impl::incoming_connection, this, _1))
, m_boost_connections(0) , m_boost_connections(0)
@@ -670,6 +666,10 @@ namespace aux {
memset(m_redundant_bytes, 0, sizeof(m_redundant_bytes)); memset(m_redundant_bytes, 0, sizeof(m_redundant_bytes));
m_udp_socket.set_rate_limit(m_settings.dht_upload_rate_limit); m_udp_socket.set_rate_limit(m_settings.dht_upload_rate_limit);
m_udp_socket.subscribe(&m_tracker_manager);
m_udp_socket.subscribe(&m_utp_socket_manager);
m_udp_socket.subscribe(this);
m_disk_queues[0] = 0; m_disk_queues[0] = 0;
m_disk_queues[1] = 0; m_disk_queues[1] = 0;
@@ -2448,65 +2448,20 @@ namespace aux {
} }
#endif #endif
void session_impl::on_receive_udp(error_code const& e bool session_impl::incoming_packet(error_code const& ec
, udp::endpoint const& ep, char const* buf, int len) , udp::endpoint const& ep, char const* buf, int size)
{ {
#ifdef TORRENT_STATS #ifdef TORRENT_STATS
++m_num_messages[on_udp_counter]; ++m_num_messages[on_udp_counter];
#endif #endif
if (e)
{
if (e == asio::error::connection_refused
|| e == asio::error::connection_reset
|| e == asio::error::connection_aborted)
{
#ifndef TORRENT_DISABLE_DHT
if (m_dht) m_dht->on_unreachable(ep);
#endif
if (m_tracker_manager.incoming_udp(e, ep, buf, len))
m_stat.received_tracker_bytes(len + 28);
}
if (ec)
{
// don't bubble up operation aborted errors to the user // don't bubble up operation aborted errors to the user
if (e != asio::error::operation_aborted if (ec != asio::error::operation_aborted
&& m_alerts.should_post<udp_error_alert>()) && m_alerts.should_post<udp_error_alert>())
m_alerts.post_alert(udp_error_alert(ep, e)); m_alerts.post_alert(udp_error_alert(ep, ec));
return;
} }
#ifndef TORRENT_DISABLE_DHT
if (len > 20 && *buf == 'd' && buf[len-1] == 'e' && m_dht)
{
// this is probably a dht message
m_dht->on_receive(ep, buf, len);
return;
}
#endif
if (m_utp_socket_manager.incoming_packet(buf, len, ep))
return;
// maybe it's a udp tracker response
if (m_tracker_manager.incoming_udp(e, ep, buf, len))
m_stat.received_tracker_bytes(len + 28);
}
void session_impl::on_receive_udp_hostname(error_code const& e
, char const* hostname, char const* buf, int len)
{
// it's probably a udp tracker response
if (m_tracker_manager.incoming_udp(e, hostname, buf, len))
{
m_stat.received_tracker_bytes(len + 28);
}
}
// this is called every time all packets have been read from
// the udp socket. The utp_socket_manager uses this event to
// trigger a flush of deferred ACKs
void session_impl::on_udp_socket_drained()
{
m_utp_socket_manager.socket_drained();
} }
void session_impl::async_accept(boost::shared_ptr<socket_acceptor> const& listener, bool ssl) void session_impl::async_accept(boost::shared_ptr<socket_acceptor> const& listener, bool ssl)
@@ -5405,6 +5360,8 @@ namespace aux {
m_dht->start(startup_state); m_dht->start(startup_state);
m_udp_socket.subscribe(m_dht.get());
// announce all torrents we have to the DHT // announce all torrents we have to the DHT
for (torrent_map::const_iterator i = m_torrents.begin() for (torrent_map::const_iterator i = m_torrents.begin()
, end(m_torrents.end()); i != end; ++i) , end(m_torrents.end()); i != end; ++i)
@@ -5416,6 +5373,7 @@ namespace aux {
void session_impl::stop_dht() void session_impl::stop_dht()
{ {
if (!m_dht) return; if (!m_dht) return;
m_udp_socket.unsubscribe(m_dht.get());
m_dht->stop(); m_dht->stop();
m_dht = 0; m_dht = 0;
} }
@@ -5543,6 +5501,10 @@ namespace aux {
if (m_thread) m_thread->join(); if (m_thread) m_thread->join();
m_udp_socket.unsubscribe(this);
m_udp_socket.unsubscribe(&m_utp_socket_manager);
m_udp_socket.unsubscribe(&m_tracker_manager);
TORRENT_ASSERT(m_torrents.empty()); TORRENT_ASSERT(m_torrents.empty());
TORRENT_ASSERT(m_connections.empty()); TORRENT_ASSERT(m_connections.empty());
TORRENT_ASSERT(m_connections.empty()); TORRENT_ASSERT(m_connections.empty());

View File

@@ -274,9 +274,10 @@ namespace libtorrent
con->start(); con->start();
} }
bool tracker_manager::incoming_udp(error_code const& e bool tracker_manager::incoming_packet(error_code const& e
, udp::endpoint const& ep, char const* buf, int size) , udp::endpoint const& ep, char const* buf, int size)
{ {
// m_ses.m_stat.received_tracker_bytes(len + 28);
for (tracker_connections_t::iterator i = m_connections.begin(); for (tracker_connections_t::iterator i = m_connections.begin();
i != m_connections.end();) i != m_connections.end();)
{ {
@@ -288,9 +289,10 @@ namespace libtorrent
return false; return false;
} }
bool tracker_manager::incoming_udp(error_code const& e bool tracker_manager::incoming_packet(error_code const& e
, char const* hostname, char const* buf, int size) , char const* hostname, char const* buf, int size)
{ {
// m_ses.m_stat.received_tracker_bytes(len + 28);
for (tracker_connections_t::iterator i = m_connections.begin(); for (tracker_connections_t::iterator i = m_connections.begin();
i != m_connections.end();) i != m_connections.end();)
{ {

View File

@@ -53,14 +53,8 @@ POSSIBILITY OF SUCH DAMAGE.
using namespace libtorrent; using namespace libtorrent;
udp_socket::udp_socket(asio::io_service& ios udp_socket::udp_socket(asio::io_service& ios
, udp_socket::callback_t const& c
, udp_socket::callback2_t const& c2
, udp_socket::drain_callback_t const& dc
, connection_queue& cc) , connection_queue& cc)
: m_callback(c) : m_ipv4_sock(ios)
, m_callback2(c2)
, m_drained_callback(dc)
, m_ipv4_sock(ios)
, m_buf_size(0) , m_buf_size(0)
, m_buf(0) , m_buf(0)
#if TORRENT_USE_IPV6 #if TORRENT_USE_IPV6
@@ -101,7 +95,6 @@ udp_socket::~udp_socket()
#endif #endif
TORRENT_ASSERT_VAL(m_v4_outstanding == 0, m_v4_outstanding); TORRENT_ASSERT_VAL(m_v4_outstanding == 0, m_v4_outstanding);
TORRENT_ASSERT(m_magic == 0x1337); TORRENT_ASSERT(m_magic == 0x1337);
TORRENT_ASSERT(!m_callback || !m_started);
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS #if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
m_magic = 0; m_magic = 0;
#endif #endif
@@ -150,21 +143,6 @@ void udp_socket::send_hostname(char const* hostname, int port
qp.flags = 0; qp.flags = 0;
} }
bool udp_socket::maybe_clear_callback()
{
if (m_outstanding_ops + m_v4_outstanding
#if TORRENT_USE_IPV6
+ m_v6_outstanding
#endif
== 0)
{
// "this" may be destructed in the callback
m_callback.clear();
return true;
}
return false;
}
void udp_socket::send(udp::endpoint const& ep, char const* p, int len void udp_socket::send(udp::endpoint const& ep, char const* p, int len
, error_code& ec, int flags) , error_code& ec, int flags)
{ {
@@ -230,14 +208,9 @@ void udp_socket::on_read(udp::socket* s)
--m_v4_outstanding; --m_v4_outstanding;
} }
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
CHECK_MAGIC; CHECK_MAGIC;
if (!m_callback) return;
for (;;) for (;;)
{ {
@@ -247,10 +220,62 @@ void udp_socket::on_read(udp::socket* s)
if (ec == asio::error::would_block) break; if (ec == asio::error::would_block) break;
on_read_impl(s, ep, ec, bytes_transferred); on_read_impl(s, ep, ec, bytes_transferred);
} }
if (m_drained_callback) m_drained_callback(); call_drained_handler();
setup_read(s); setup_read(s);
} }
void udp_socket::call_handler(error_code const& ec, udp::endpoint const& ep, char const* buf, int size)
{
for (std::vector<udp_socket_observer*>::iterator i = m_observers.begin()
, end(m_observers.end()); i != end; ++i)
{
TORRENT_TRY {
if ((*i)->incoming_packet(ec, ep, buf, size))
break;
} TORRENT_CATCH (std::exception&) {}
}
}
void udp_socket::call_handler(error_code const& ec, const char* host, char const* buf, int size)
{
for (std::vector<udp_socket_observer*>::iterator i = m_observers.begin()
, end(m_observers.end()); i != end; ++i)
{
TORRENT_TRY {
if ((*i)->incoming_packet(ec, host, buf, size))
break;
} TORRENT_CATCH (std::exception&) {}
}
}
void udp_socket::subscribe(udp_socket_observer* o)
{
TORRENT_ASSERT(std::find(m_observers.begin(), m_observers.end(), o) == m_observers.end());
m_observers.push_back(o);
}
void udp_socket::unsubscribe(udp_socket_observer* o)
{
std::vector<udp_socket_observer*>::iterator i = std::find(m_observers.begin(), m_observers.end(), o);
if (i == m_observers.end()) return;
m_observers.erase(i);
}
void udp_socket::call_drained_handler()
{
for (std::vector<udp_socket_observer*>::iterator i = m_observers.begin()
, end(m_observers.end()); i != end; ++i)
{
TORRENT_TRY {
(*i)->socket_drained();
} TORRENT_CATCH (std::exception&) {}
}
}
void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep
, error_code const& e, std::size_t bytes_transferred) , error_code const& e, std::size_t bytes_transferred)
{ {
@@ -259,16 +284,7 @@ void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep
if (e) if (e)
{ {
TORRENT_TRY { call_handler(e, ep, 0, 0);
#if TORRENT_USE_IPV6
if (s == &m_ipv6_sock)
m_callback(e, ep, 0, 0);
else
#endif
m_callback(e, ep, 0, 0);
} TORRENT_CATCH (std::exception&) {}
// don't stop listening on recoverable errors // don't stop listening on recoverable errors
if (e != asio::error::host_unreachable if (e != asio::error::host_unreachable
@@ -283,7 +299,6 @@ void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep
#endif #endif
&& e != asio::error::message_size) && e != asio::error::message_size)
{ {
maybe_clear_callback();
return; return;
} }
@@ -302,7 +317,7 @@ void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep
} }
else else
{ {
m_callback(e, ep, m_buf, bytes_transferred); call_handler(e, ep, m_buf, bytes_transferred);
} }
} TORRENT_CATCH (std::exception&) {} } TORRENT_CATCH (std::exception&) {}
@@ -421,11 +436,11 @@ void udp_socket::unwrap(error_code const& e, char const* buf, int size)
if (len > (buf + size) - p) return; if (len > (buf + size) - p) return;
std::string hostname(p, p + len); std::string hostname(p, p + len);
p += len; p += len;
m_callback2(e, hostname.c_str(), p, size - (p - buf)); call_handler(e, hostname.c_str(), p, size - (p - buf));
return; return;
} }
m_callback(e, sender, p, size - (p - buf)); call_handler(e, sender, p, size - (p - buf));
} }
#if !defined BOOST_ASIO_ENABLE_CANCELIO && defined TORRENT_WINDOWS #if !defined BOOST_ASIO_ENABLE_CANCELIO && defined TORRENT_WINDOWS
@@ -472,14 +487,9 @@ void udp_socket::close()
// ops counter for that // ops counter for that
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
} }
maybe_clear_callback();
} }
void udp_socket::set_buf_size(int s) void udp_socket::set_buf_size(int s)
@@ -503,7 +513,7 @@ void udp_socket::set_buf_size(int s)
m_buf = 0; m_buf = 0;
m_buf_size = 0; m_buf_size = 0;
udp::endpoint ep; udp::endpoint ep;
if (m_callback) m_callback(error::no_memory, ep, 0, 0); call_handler(error::no_memory, ep, 0, 0);
close(); close();
} }
} }
@@ -625,11 +635,7 @@ void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i)
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
CHECK_MAGIC; CHECK_MAGIC;
if (e == asio::error::operation_aborted) return; if (e == asio::error::operation_aborted) return;
@@ -638,9 +644,7 @@ void udp_socket::on_name_lookup(error_code const& e, tcp::resolver::iterator i)
if (e) if (e)
{ {
TORRENT_TRY { call_handler(e, udp::endpoint(), 0, 0);
if (m_callback) m_callback(e, udp::endpoint(), 0, 0);
} TORRENT_CATCH (std::exception&) {}
return; return;
} }
@@ -662,11 +666,7 @@ void udp_socket::on_timeout()
{ {
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
CHECK_MAGIC; CHECK_MAGIC;
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
@@ -680,11 +680,6 @@ void udp_socket::on_connect(int ticket)
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort)
{
maybe_clear_callback();
return;
}
CHECK_MAGIC; CHECK_MAGIC;
if (m_abort) return; if (m_abort) return;
@@ -715,11 +710,7 @@ void udp_socket::on_connected(error_code const& e)
#endif #endif
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
CHECK_MAGIC; CHECK_MAGIC;
@@ -734,17 +725,11 @@ void udp_socket::on_connected(error_code const& e)
// ops counter for that // ops counter for that
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
if (e) if (e)
{ {
TORRENT_TRY { call_handler(e, udp::endpoint(), 0, 0);
if (m_callback) m_callback(e, udp::endpoint(), 0, 0);
} TORRENT_CATCH (std::exception&) {}
return; return;
} }
@@ -781,11 +766,7 @@ void udp_socket::handshake1(error_code const& e)
#endif #endif
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
CHECK_MAGIC; CHECK_MAGIC;
if (e) return; if (e) return;
@@ -807,11 +788,7 @@ void udp_socket::handshake2(error_code const& e)
#endif #endif
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
CHECK_MAGIC; CHECK_MAGIC;
if (e) return; if (e) return;
@@ -869,11 +846,7 @@ void udp_socket::handshake3(error_code const& e)
#endif #endif
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
CHECK_MAGIC; CHECK_MAGIC;
if (e) return; if (e) return;
@@ -895,11 +868,7 @@ void udp_socket::handshake4(error_code const& e)
#endif #endif
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
CHECK_MAGIC; CHECK_MAGIC;
if (e) return; if (e) return;
@@ -958,11 +927,7 @@ void udp_socket::connect1(error_code const& e)
#endif #endif
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
CHECK_MAGIC; CHECK_MAGIC;
if (e) return; if (e) return;
@@ -987,7 +952,6 @@ void udp_socket::connect2(error_code const& e)
if (m_abort) if (m_abort)
{ {
m_queue.clear(); m_queue.clear();
maybe_clear_callback();
return; return;
} }
@@ -1062,11 +1026,7 @@ void udp_socket::hung_up(error_code const& e)
#endif #endif
TORRENT_ASSERT(m_outstanding_ops > 0); TORRENT_ASSERT(m_outstanding_ops > 0);
--m_outstanding_ops; --m_outstanding_ops;
if (m_abort) if (m_abort) return;
{
maybe_clear_callback();
return;
}
CHECK_MAGIC; CHECK_MAGIC;
TORRENT_ASSERT(is_single_thread()); TORRENT_ASSERT(is_single_thread());
@@ -1078,11 +1038,8 @@ void udp_socket::hung_up(error_code const& e)
} }
rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios
, callback_t const& c
, callback2_t const& c2
, drain_callback_t const& dc
, connection_queue& cc) , connection_queue& cc)
: udp_socket(ios, c, c2, dc, cc) : udp_socket(ios, cc)
, m_timer(ios) , m_timer(ios)
, m_queue_size_limit(200) , m_queue_size_limit(200)
, m_rate_limit(4000) , m_rate_limit(4000)

View File

@@ -199,7 +199,8 @@ namespace libtorrent
return m_sock.local_endpoint(ec); return m_sock.local_endpoint(ec);
} }
bool utp_socket_manager::incoming_packet(char const* p, int size, udp::endpoint const& ep) bool utp_socket_manager::incoming_packet(error_code const& ec, udp::endpoint const& ep
, char const* p, int size)
{ {
// UTP_LOGV("incoming packet size:%d\n", size); // UTP_LOGV("incoming packet size:%d\n", size);