fix uTP edge case where udp socket buffer fills up

This commit is contained in:
Arvid Norberg
2012-07-01 18:44:46 +00:00
parent 96aa1f162b
commit bd4f173bc5
7 changed files with 186 additions and 15 deletions

View File

@@ -74,6 +74,10 @@ udp_socket::udp_socket(asio::io_service& ios
, m_tunnel_packets(false)
, m_abort(false)
, m_outstanding_ops(0)
#if TORRENT_USE_IPV6
, m_v6_write_subscribed(false)
#endif
, m_v4_write_subscribed(false)
{
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
m_magic = 0x1337;
@@ -178,13 +182,47 @@ void udp_socket::send(udp::endpoint const& ep, char const* p, int len
}
#if TORRENT_USE_IPV6
if (ep.address().is_v4() && m_ipv4_sock.is_open())
if (ep.address().is_v6() && m_ipv6_sock.is_open())
m_ipv6_sock.send_to(asio::buffer(p, len), ep, 0, ec);
else
#endif
m_ipv4_sock.send_to(asio::buffer(p, len), ep, 0, ec);
if (ec == error::would_block)
{
#if TORRENT_USE_IPV6
else
m_ipv6_sock.send_to(asio::buffer(p, len), ep, 0, ec);
if (ep.address().is_v6() && m_ipv6_sock.is_open())
{
if (!m_v6_write_subscribed)
{
m_ipv6_sock.async_send(asio::null_buffers()
, boost::bind(&udp_socket::on_writable, this, _1, &m_ipv6_sock));
m_v6_write_subscribed = true;
}
}
else
#endif
{
if (!m_v4_write_subscribed)
{
m_ipv4_sock.async_send(asio::null_buffers()
, boost::bind(&udp_socket::on_writable, this, _1, &m_ipv4_sock));
m_v4_write_subscribed = true;
}
}
}
}
void udp_socket::on_writable(error_code const& ec, udp::socket* s)
{
#if TORRENT_USE_IPV6
if (s == &m_ipv6_sock)
m_v6_write_subscribed = false;
else
#endif
m_v4_write_subscribed = false;
call_writable_handler();
}
// called whenever the socket is readable
@@ -290,6 +328,26 @@ void udp_socket::call_drained_handler()
m_observers_locked = false;
}
void udp_socket::call_writable_handler()
{
m_observers_locked = true;
for (std::vector<udp_socket_observer*>::iterator i = m_observers.begin();
i != m_observers.end();)
{
TORRENT_TRY {
(*i)->writable();
} TORRENT_CATCH (std::exception&) {}
if (*i == NULL) i = m_observers.erase(i);
else ++i;
}
if (!m_added_observers.empty())
{
m_observers.insert(m_observers.end(), m_added_observers.begin(), m_added_observers.end());
m_added_observers.clear();
}
m_observers_locked = false;
}
void udp_socket::subscribe(udp_socket_observer* o)
{
TORRENT_ASSERT(std::find(m_observers.begin(), m_observers.end(), o) == m_observers.end());