diff --git a/include/libtorrent/http_stream.hpp b/include/libtorrent/http_stream.hpp index a7139c4ee..2bd124b43 100644 --- a/include/libtorrent/http_stream.hpp +++ b/include/libtorrent/http_stream.hpp @@ -44,6 +44,30 @@ public: m_sock.async_read_some(buffers, handler); } + template + std::size_t read_some(Mutable_Buffers const& buffers, asio::error_code& ec) + { + return m_sock.read_some(buffers, ec); + } + + template + std::size_t read_some(Mutable_Buffers const& buffers) + { + return m_sock.read_some(buffers); + } + + template + void io_control(IO_Control_Command& ioc) + { + m_sock.io_control(ioc); + } + + template + void io_control(IO_Control_Command& ioc, asio::error_code& ec) + { + m_sock.io_control(ioc, ec); + } + template void async_write_some(Const_Buffers const& buffers, Handler const& handler) { diff --git a/include/libtorrent/socks5_stream.hpp b/include/libtorrent/socks5_stream.hpp index 01cddab7e..9e8a0d04b 100644 --- a/include/libtorrent/socks5_stream.hpp +++ b/include/libtorrent/socks5_stream.hpp @@ -41,6 +41,30 @@ public: m_sock.async_read_some(buffers, handler); } + template + std::size_t read_some(Mutable_Buffers const& buffers, asio::error_code& ec) + { + return m_sock.read_some(buffers, ec); + } + + template + std::size_t read_some(Mutable_Buffers const& buffers) + { + return m_sock.read_some(buffers); + } + + template + void io_control(IO_Control_Command& ioc) + { + m_sock.io_control(ioc); + } + + template + void io_control(IO_Control_Command& ioc, asio::error_code& ec) + { + m_sock.io_control(ioc, ec); + } + template void async_write_some(Const_Buffers const& buffers, Handler const& handler) { diff --git a/include/libtorrent/variant_stream.hpp b/include/libtorrent/variant_stream.hpp index 784eefbb0..8f9888519 100644 --- a/include/libtorrent/variant_stream.hpp +++ b/include/libtorrent/variant_stream.hpp @@ -43,6 +43,45 @@ namespace aux {} }; +// -------------- io_control ----------- + + template + struct io_control_visitor_ec: boost::static_visitor<> + { + io_control_visitor_ec(IO_Control_Command& ioc, asio::error_code& ec) + : ioc(ioc), ec(ec) {} + + template + void operator()(T* p) const + { + p->io_control(ioc, ec); + } + + void operator()(boost::blank) const + {} + + IO_Control_Command& ioc; + asio::error_code& ec; + }; + + template + struct io_control_visitor + : boost::static_visitor<> + { + io_control_visitor(IO_Control_Command& ioc) + : ioc(ioc) {} + + template + void operator()(T* p) const + { + p->io_control(ioc); + } + + void operator()(boost::blank) const + {} + + IO_Control_Command& ioc; + }; // -------------- async_connect ----------- template @@ -294,6 +333,46 @@ namespace aux Handler const& handler; }; +// -------------- read_some ----------- + + template + struct read_some_visitor + : boost::static_visitor + { + read_some_visitor(Mutable_Buffers const& buffers) + : buffers(buffers) + {} + + template + std::size_t operator()(T* p) const + { return p->read_some(buffers); } + + std::size_t operator()(boost::blank) const + { return 0; } + + Mutable_Buffers const& buffers; + }; + + template + struct read_some_visitor_ec + : boost::static_visitor + { + read_some_visitor_ec(Mutable_Buffers const& buffers, asio::error_code& ec) + : buffers(buffers) + , ec(ec) + {} + + template + std::size_t operator()(T* p) const + { return p->read_some(buffers, ec); } + + std::size_t operator()(boost::blank) const + { return 0; } + + Mutable_Buffers const& buffers; + asio::error_code& ec; + }; + // -------------- async_write_some ----------- template @@ -452,6 +531,26 @@ public: boost::apply_visitor(aux::delete_visitor(), m_variant); } + template + std::size_t read_some(Mutable_Buffers const& buffers, asio::error_code& ec) + { + assert(instantiated()); + return boost::apply_visitor( + aux::read_some_visitor_ec(buffers, ec) + , m_variant + ); + } + + template + std::size_t read_some(Mutable_Buffers const& buffers) + { + assert(instantiated()); + return boost::apply_visitor( + aux::read_some_visitor(buffers) + , m_variant + ); + } + template void async_read_some(Mutable_Buffers const& buffers, Handler const& handler) { @@ -481,6 +580,25 @@ public: ); } + template + void io_control(IO_Control_Command& ioc) + { + assert(instantiated()); + boost::apply_visitor( + aux::io_control_visitor(ioc), m_variant + ); + } + + template + void io_control(IO_Control_Command& ioc, asio::error_code& ec) + { + assert(instantiated()); + boost::apply_visitor( + aux::io_control_visitor_ec(ioc, ec) + , m_variant + ); + } + void bind(endpoint_type const& endpoint) { assert(instantiated()); diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index 0d33acff9..14a7faddc 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -74,6 +74,7 @@ namespace libtorrent delete c; } + // outbound connection peer_connection::peer_connection( session_impl& ses , boost::weak_ptr tor @@ -147,6 +148,7 @@ namespace libtorrent init(); } + // incoming connection peer_connection::peer_connection( session_impl& ses , boost::shared_ptr s @@ -198,6 +200,8 @@ namespace libtorrent , m_in_constructor(true) #endif { + tcp::socket::non_blocking_io ioc(true); + m_socket->io_control(ioc); #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES std::fill(m_country, m_country + 2, 0); #endif @@ -2004,7 +2008,6 @@ namespace libtorrent && !m_connecting && t) { - assert(t); if (m_bandwidth_limit[download_channel].max_assignable() > 0) { #ifdef TORRENT_VERBOSE_LOGGING @@ -2076,7 +2079,6 @@ namespace libtorrent bool m_cond; }; - // -------------------------- // RECEIVE DATA // -------------------------- @@ -2091,8 +2093,6 @@ namespace libtorrent assert(m_reading); m_reading = false; - // correct the dl quota usage, if not all of the buffer was actually read - m_bandwidth_limit[download_channel].use_quota(bytes_transferred); if (error) { @@ -2103,28 +2103,46 @@ namespace libtorrent throw std::runtime_error(error.message()); } - if (m_disconnecting) return; + do + { + // correct the dl quota usage, if not all of the buffer was actually read + m_bandwidth_limit[download_channel].use_quota(bytes_transferred); + + if (m_disconnecting) return; - assert(m_packet_size > 0); - assert(bytes_transferred > 0); + assert(m_packet_size > 0); + assert(bytes_transferred > 0); - m_last_receive = time_now(); - m_recv_pos += bytes_transferred; - assert(m_recv_pos <= int(m_recv_buffer.size())); + m_last_receive = time_now(); + m_recv_pos += bytes_transferred; + assert(m_recv_pos <= int(m_recv_buffer.size())); - { - INVARIANT_CHECK; - on_receive(error, bytes_transferred); - } + { + INVARIANT_CHECK; + on_receive(error, bytes_transferred); + } - assert(m_packet_size > 0); + assert(m_packet_size > 0); - if (m_peer_choked - && m_recv_pos == 0 - && (m_recv_buffer.capacity() - m_packet_size) > 128) - { - std::vector(m_packet_size).swap(m_recv_buffer); + if (m_peer_choked + && m_recv_pos == 0 + && (m_recv_buffer.capacity() - m_packet_size) > 128) + { + std::vector(m_packet_size).swap(m_recv_buffer); + } + + if (m_bandwidth_limit[download_channel].quota_left() == 0) break; + + int max_receive = std::min( + m_bandwidth_limit[download_channel].quota_left() + , m_packet_size - m_recv_pos); + asio::error_code ec; + bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos] + , max_receive), ec); + if (ec && ec != asio::error::would_block) + throw asio::system_error(ec); } + while (bytes_transferred > 0); setup_receive(); } @@ -2195,6 +2213,11 @@ namespace libtorrent m_queued = false; assert(m_connecting); m_socket->open(t->get_interface().protocol()); + + // set the socket to non-blocking, so that we can + // read the entire buffer on each read event we get + tcp::socket::non_blocking_io ioc(true); + m_socket->io_control(ioc); m_socket->bind(t->get_interface()); m_socket->async_connect(m_remote , bind(&peer_connection::on_connection_complete, self(), _1)); diff --git a/src/torrent.cpp b/src/torrent.cpp index 36650eb4f..219215d29 100755 --- a/src/torrent.cpp +++ b/src/torrent.cpp @@ -1557,6 +1557,9 @@ namespace libtorrent } catch (std::exception& exc) { +#ifndef NDEBUG + std::cerr << exc.what() << std::endl; +#endif assert(false); };