remove uTP delayed ack and instead send acks when the udp socket has been drained. simplify the udp socket to use null_buffers and allocate less memory for buffers. this also eliminated the race condition when resizing the udp socket receive buffer which greatly simplified it
This commit is contained in:
@@ -55,18 +55,16 @@ using namespace libtorrent;
|
||||
udp_socket::udp_socket(asio::io_service& ios
|
||||
, udp_socket::callback_t const& c
|
||||
, udp_socket::callback2_t const& c2
|
||||
, udp_socket::drain_callback_t const& dc
|
||||
, connection_queue& cc)
|
||||
: m_callback(c)
|
||||
, m_callback2(c2)
|
||||
, m_drained_callback(dc)
|
||||
, m_ipv4_sock(ios)
|
||||
, m_v4_buf_size(0)
|
||||
, m_v4_buf(0)
|
||||
, m_reallocate_buffer4(false)
|
||||
, m_buf_size(0)
|
||||
, m_buf(0)
|
||||
#if TORRENT_USE_IPV6
|
||||
, m_ipv6_sock(ios)
|
||||
, m_v6_buf_size(0)
|
||||
, m_v6_buf(0)
|
||||
, m_reallocate_buffer6(false)
|
||||
#endif
|
||||
, m_bind_port(0)
|
||||
, m_v4_outstanding(0)
|
||||
@@ -91,19 +89,14 @@ udp_socket::udp_socket(asio::io_service& ios
|
||||
#endif
|
||||
#endif
|
||||
|
||||
m_v4_buf_size = 2000;
|
||||
m_v4_buf = (char*)malloc(m_v4_buf_size);
|
||||
#if TORRENT_USE_IPV6
|
||||
m_v6_buf_size = 2000;
|
||||
m_v6_buf = (char*)malloc(m_v6_buf_size);
|
||||
#endif
|
||||
m_buf_size = 2000;
|
||||
m_buf = (char*)malloc(m_buf_size);
|
||||
}
|
||||
|
||||
udp_socket::~udp_socket()
|
||||
{
|
||||
free(m_v4_buf);
|
||||
free(m_buf);
|
||||
#if TORRENT_USE_IPV6
|
||||
free(m_v6_buf);
|
||||
TORRENT_ASSERT_VAL(m_v6_outstanding == 0, m_v6_outstanding);
|
||||
#endif
|
||||
TORRENT_ASSERT_VAL(m_v4_outstanding == 0, m_v4_outstanding);
|
||||
@@ -214,45 +207,8 @@ void udp_socket::send(udp::endpoint const& ep, char const* p, int len
|
||||
#endif
|
||||
}
|
||||
|
||||
void udp_socket::maybe_realloc_buffers(int which)
|
||||
{
|
||||
TORRENT_ASSERT(is_single_thread());
|
||||
bool no_mem = false;
|
||||
if (m_reallocate_buffer4 && (which & 1) && m_v4_outstanding == 0)
|
||||
{
|
||||
TORRENT_ASSERT(m_v4_outstanding == 0);
|
||||
void* tmp = realloc(m_v4_buf, m_v4_buf_size);
|
||||
if (tmp != 0) m_v4_buf = (char*)tmp;
|
||||
else no_mem = true;
|
||||
m_reallocate_buffer4 = false;
|
||||
}
|
||||
#if TORRENT_USE_IPV6
|
||||
if (m_reallocate_buffer6 && (which & 2) && m_v6_outstanding == 0)
|
||||
{
|
||||
TORRENT_ASSERT(m_v6_outstanding == 0);
|
||||
void* tmp = realloc(m_v6_buf, m_v6_buf_size);
|
||||
if (tmp != 0) m_v6_buf = (char*)tmp;
|
||||
else no_mem = true;
|
||||
m_reallocate_buffer6 = false;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (no_mem)
|
||||
{
|
||||
free(m_v4_buf);
|
||||
m_v4_buf = 0;
|
||||
m_v4_buf_size = 0;
|
||||
#if TORRENT_USE_IPV6
|
||||
free(m_v6_buf);
|
||||
m_v6_buf = 0;
|
||||
m_v6_buf_size = 0;
|
||||
#endif
|
||||
if (m_callback) m_callback(error::no_memory, m_v4_ep, 0, 0);
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_transferred)
|
||||
// called whenever the socket is readable
|
||||
void udp_socket::on_read(udp::socket* s)
|
||||
{
|
||||
#if defined TORRENT_ASIO_DEBUGGING
|
||||
complete_async("udp_socket::on_read");
|
||||
@@ -283,16 +239,34 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
|
||||
CHECK_MAGIC;
|
||||
if (!m_callback) return;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
error_code ec;
|
||||
udp::endpoint ep;
|
||||
size_t bytes_transferred = s->receive_from(asio::buffer(m_buf, m_buf_size), ep, 0, ec);
|
||||
if (ec == asio::error::would_block) break;
|
||||
on_read_impl(s, ep, ec, bytes_transferred);
|
||||
}
|
||||
if (m_drained_callback) m_drained_callback();
|
||||
setup_read(s);
|
||||
}
|
||||
|
||||
void udp_socket::on_read_impl(udp::socket* s, udp::endpoint const& ep
|
||||
, error_code const& e, std::size_t bytes_transferred)
|
||||
{
|
||||
TORRENT_ASSERT(m_magic == 0x1337);
|
||||
TORRENT_ASSERT(is_single_thread());
|
||||
|
||||
if (e)
|
||||
{
|
||||
TORRENT_TRY {
|
||||
|
||||
#if TORRENT_USE_IPV6
|
||||
if (s == &m_ipv6_sock)
|
||||
m_callback(e, m_v6_ep, 0, 0);
|
||||
m_callback(e, ep, 0, 0);
|
||||
else
|
||||
#endif
|
||||
m_callback(e, m_v4_ep, 0, 0);
|
||||
m_callback(e, ep, 0, 0);
|
||||
|
||||
} TORRENT_CATCH (std::exception&) {}
|
||||
|
||||
@@ -315,106 +289,42 @@ void udp_socket::on_read(udp::socket* s, error_code const& e, std::size_t bytes_
|
||||
|
||||
if (m_abort) return;
|
||||
|
||||
#if defined TORRENT_ASIO_DEBUGGING
|
||||
add_outstanding_async("udp_socket::on_read");
|
||||
#endif
|
||||
#if TORRENT_USE_IPV6
|
||||
if (s == &m_ipv6_sock && num_outstanding() == 0)
|
||||
{
|
||||
maybe_realloc_buffers(2);
|
||||
if (m_abort) return;
|
||||
++m_v6_outstanding;
|
||||
s->async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size)
|
||||
, m_v6_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2));
|
||||
}
|
||||
else
|
||||
#endif
|
||||
if (m_v4_outstanding == 0)
|
||||
{
|
||||
maybe_realloc_buffers(1);
|
||||
if (m_abort) return;
|
||||
++m_v4_outstanding;
|
||||
s->async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size)
|
||||
, m_v4_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2));
|
||||
}
|
||||
|
||||
#ifdef TORRENT_DEBUG
|
||||
m_started = true;
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
|
||||
TORRENT_TRY {
|
||||
|
||||
if (m_tunnel_packets)
|
||||
{
|
||||
// if the source IP doesn't match the proxy's, ignore the packet
|
||||
if (ep == m_proxy_addr)
|
||||
unwrap(e, m_buf, bytes_transferred);
|
||||
}
|
||||
else
|
||||
{
|
||||
m_callback(e, ep, m_buf, bytes_transferred);
|
||||
}
|
||||
|
||||
} TORRENT_CATCH (std::exception&) {}
|
||||
}
|
||||
|
||||
void udp_socket::setup_read(udp::socket* s)
|
||||
{
|
||||
if (m_abort) return;
|
||||
|
||||
#if TORRENT_USE_IPV6
|
||||
if (s == &m_ipv6_sock)
|
||||
{
|
||||
TORRENT_TRY {
|
||||
|
||||
if (m_tunnel_packets)
|
||||
{
|
||||
// if the source IP doesn't match the proxy's, ignore the packet
|
||||
if (m_v6_ep == m_proxy_addr)
|
||||
unwrap(e, m_v6_buf, bytes_transferred);
|
||||
}
|
||||
else
|
||||
{
|
||||
m_callback(e, m_v6_ep, m_v6_buf, bytes_transferred);
|
||||
}
|
||||
|
||||
} TORRENT_CATCH (std::exception&) {}
|
||||
|
||||
if (m_abort) return;
|
||||
|
||||
if (num_outstanding() == 0)
|
||||
{
|
||||
maybe_realloc_buffers(2);
|
||||
if (m_abort) return;
|
||||
|
||||
#if defined TORRENT_ASIO_DEBUGGING
|
||||
add_outstanding_async("udp_socket::on_read");
|
||||
#endif
|
||||
++m_v6_outstanding;
|
||||
s->async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size)
|
||||
, m_v6_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2));
|
||||
}
|
||||
}
|
||||
++m_v6_outstanding;
|
||||
else
|
||||
#endif // TORRENT_USE_IPV6
|
||||
{
|
||||
|
||||
TORRENT_TRY {
|
||||
|
||||
if (m_tunnel_packets)
|
||||
{
|
||||
// if the source IP doesn't match the proxy's, ignore the packet
|
||||
if (m_v4_ep == m_proxy_addr)
|
||||
unwrap(e, m_v4_buf, bytes_transferred);
|
||||
}
|
||||
else
|
||||
{
|
||||
m_callback(e, m_v4_ep, m_v4_buf, bytes_transferred);
|
||||
}
|
||||
|
||||
} TORRENT_CATCH (std::exception&) {}
|
||||
|
||||
if (m_abort) return;
|
||||
|
||||
if (m_v4_outstanding == 0)
|
||||
{
|
||||
maybe_realloc_buffers(1);
|
||||
if (m_abort) return;
|
||||
#endif
|
||||
++m_v4_outstanding;
|
||||
|
||||
#if defined TORRENT_ASIO_DEBUGGING
|
||||
add_outstanding_async("udp_socket::on_read");
|
||||
#endif
|
||||
++m_v4_outstanding;
|
||||
s->async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size)
|
||||
, m_v4_ep, boost::bind(&udp_socket::on_read, this, s, _1, _2));
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef TORRENT_DEBUG
|
||||
m_started = true;
|
||||
add_outstanding_async("udp_socket::on_read");
|
||||
#endif
|
||||
udp::endpoint ep;
|
||||
s->async_receive_from(asio::null_buffers()
|
||||
, ep, boost::bind(&udp_socket::on_read, this, s));
|
||||
}
|
||||
|
||||
void udp_socket::wrap(udp::endpoint const& ep, char const* p, int len, error_code& ec)
|
||||
@@ -575,14 +485,26 @@ void udp_socket::close()
|
||||
void udp_socket::set_buf_size(int s)
|
||||
{
|
||||
TORRENT_ASSERT(is_single_thread());
|
||||
if (s > m_v4_buf_size)
|
||||
bool no_mem = false;
|
||||
void* tmp = realloc(m_buf, s);
|
||||
if (tmp != 0)
|
||||
{
|
||||
m_v4_buf_size = s;
|
||||
m_reallocate_buffer4 = true;
|
||||
#if TORRENT_USE_IPV6
|
||||
m_v6_buf_size = s;
|
||||
m_reallocate_buffer6 = true;
|
||||
#endif
|
||||
m_buf = (char*)tmp;
|
||||
m_buf_size = s;
|
||||
}
|
||||
else
|
||||
{
|
||||
no_mem = true;
|
||||
}
|
||||
|
||||
if (no_mem)
|
||||
{
|
||||
free(m_buf);
|
||||
m_buf = 0;
|
||||
m_buf_size = 0;
|
||||
udp::endpoint ep;
|
||||
if (m_callback) m_callback(error::no_memory, ep, 0, 0);
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -605,18 +527,11 @@ void udp_socket::bind(udp::endpoint const& ep, error_code& ec)
|
||||
if (ec) return;
|
||||
m_ipv4_sock.bind(ep, ec);
|
||||
if (ec) return;
|
||||
udp::socket::non_blocking_io ioc(true);
|
||||
m_ipv4_sock.io_control(ioc, ec);
|
||||
if (ec) return;
|
||||
if (m_v4_outstanding == 0)
|
||||
{
|
||||
maybe_realloc_buffers(1);
|
||||
if (m_abort) return;
|
||||
#if defined TORRENT_ASIO_DEBUGGING
|
||||
add_outstanding_async("udp_socket::on_read");
|
||||
#endif
|
||||
++m_v4_outstanding;
|
||||
m_ipv4_sock.async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size)
|
||||
, m_v4_ep, boost::bind(&udp_socket::on_read, this, &m_ipv4_sock
|
||||
, _1, _2));
|
||||
}
|
||||
setup_read(&m_ipv4_sock);
|
||||
}
|
||||
#if TORRENT_USE_IPV6
|
||||
else
|
||||
@@ -625,18 +540,11 @@ void udp_socket::bind(udp::endpoint const& ep, error_code& ec)
|
||||
if (ec) return;
|
||||
m_ipv6_sock.bind(ep, ec);
|
||||
if (ec) return;
|
||||
udp::socket::non_blocking_io ioc(true);
|
||||
m_ipv6_sock.io_control(ioc, ec);
|
||||
if (ec) return;
|
||||
if (m_v6_outstanding == 0)
|
||||
{
|
||||
maybe_realloc_buffers(2);
|
||||
if (m_abort) return;
|
||||
#if defined TORRENT_ASIO_DEBUGGING
|
||||
add_outstanding_async("udp_socket::on_read");
|
||||
#endif
|
||||
++m_v6_outstanding;
|
||||
m_ipv6_sock.async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size)
|
||||
, m_v6_ep, boost::bind(&udp_socket::on_read, this, &m_ipv6_sock
|
||||
, _1, _2));
|
||||
}
|
||||
setup_read(&m_ipv6_sock);
|
||||
}
|
||||
#endif
|
||||
#ifdef TORRENT_DEBUG
|
||||
@@ -660,41 +568,24 @@ void udp_socket::bind(int port)
|
||||
if (m_ipv6_sock.is_open()) m_ipv6_sock.close(ec);
|
||||
#endif
|
||||
|
||||
maybe_realloc_buffers();
|
||||
if (m_abort) return;
|
||||
|
||||
m_ipv4_sock.open(udp::v4(), ec);
|
||||
if (!ec)
|
||||
{
|
||||
#if defined TORRENT_ASIO_DEBUGGING
|
||||
add_outstanding_async("udp_socket::on_read");
|
||||
#endif
|
||||
m_ipv4_sock.bind(udp::endpoint(address_v4::any(), port), ec);
|
||||
if (m_v4_outstanding == 0)
|
||||
{
|
||||
++m_v4_outstanding;
|
||||
m_ipv4_sock.async_receive_from(asio::buffer(m_v4_buf, m_v4_buf_size)
|
||||
, m_v4_ep, boost::bind(&udp_socket::on_read, this, &m_ipv4_sock
|
||||
, _1, _2));
|
||||
}
|
||||
setup_read(&m_ipv4_sock);
|
||||
}
|
||||
#if TORRENT_USE_IPV6
|
||||
m_ipv6_sock.open(udp::v6(), ec);
|
||||
if (!ec)
|
||||
{
|
||||
#if defined TORRENT_ASIO_DEBUGGING
|
||||
add_outstanding_async("udp_socket::on_read");
|
||||
#endif
|
||||
m_ipv6_sock.set_option(v6only(true), ec);
|
||||
m_ipv6_sock.bind(udp::endpoint(address_v6::any(), port), ec);
|
||||
|
||||
if (m_v6_outstanding == 0)
|
||||
{
|
||||
++m_v6_outstanding;
|
||||
m_ipv6_sock.async_receive_from(asio::buffer(m_v6_buf, m_v6_buf_size)
|
||||
, m_v6_ep, boost::bind(&udp_socket::on_read, this, &m_ipv6_sock
|
||||
, _1, _2));
|
||||
}
|
||||
setup_read(&m_ipv6_sock);
|
||||
}
|
||||
#endif // TORRENT_USE_IPV6
|
||||
|
||||
@@ -1189,8 +1080,9 @@ void udp_socket::hung_up(error_code const& e)
|
||||
rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios
|
||||
, callback_t const& c
|
||||
, callback2_t const& c2
|
||||
, drain_callback_t const& dc
|
||||
, connection_queue& cc)
|
||||
: udp_socket(ios, c, c2, cc)
|
||||
: udp_socket(ios, c, c2, dc, cc)
|
||||
, m_timer(ios)
|
||||
, m_queue_size_limit(200)
|
||||
, m_rate_limit(4000)
|
||||
|
Reference in New Issue
Block a user