From beee914277e975348187bd533724be6cec1273c1 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Tue, 27 Sep 2005 08:07:24 +0000 Subject: [PATCH] separated the queue of blocks to be requested from peers and those allocated for peers. This sets a hard limit on the remote request queue size and works better with BitComet --- include/libtorrent/peer_connection.hpp | 7 ++ include/libtorrent/policy.hpp | 12 ++ src/peer_connection.cpp | 151 +++++++++++++++++-------- src/policy.cpp | 17 +-- 4 files changed, 125 insertions(+), 62 deletions(-) diff --git a/include/libtorrent/peer_connection.hpp b/include/libtorrent/peer_connection.hpp index 73b628089..38a4a8d64 100755 --- a/include/libtorrent/peer_connection.hpp +++ b/include/libtorrent/peer_connection.hpp @@ -139,6 +139,7 @@ namespace libtorrent bool has_piece(int i) const; const std::deque& download_queue() const; + const std::deque& request_queue() const; const std::deque& upload_queue() const; // returns the block currently being @@ -283,6 +284,7 @@ namespace libtorrent private: + void send_block_requests(); bool dispatch_message(int received); // if we don't have all metadata @@ -324,6 +326,7 @@ namespace libtorrent msg_request, msg_piece, msg_cancel, + msg_dht_port, // extension protocol message msg_extension_list = 20, msg_extended, @@ -451,6 +454,10 @@ namespace libtorrent // the peer std::vector m_announce_queue; + // the blocks we have reserved in the piece + // picker and will send to this peer. + std::deque m_request_queue; + // the queue of blocks we have requested // from this peer std::deque m_download_queue; diff --git a/include/libtorrent/policy.hpp b/include/libtorrent/policy.hpp index 7c5720330..c980b292f 100755 --- a/include/libtorrent/policy.hpp +++ b/include/libtorrent/policy.hpp @@ -59,6 +59,18 @@ namespace libtorrent class address; class peer_connection; + enum + { + // the limits of the download queue size + max_request_queue = 48, + min_request_queue = 2, + + // the amount of free upload allowed before + // the peer is choked + free_upload_amount = 4 * 16 * 1024 + }; + + class policy { public: diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index c4f214dc5..8ea1b8047 100755 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -386,12 +386,17 @@ namespace libtorrent return m_have_piece[i]; } - const std::deque& peer_connection::download_queue() const + std::deque const& peer_connection::request_queue() const + { + return m_request_queue; + } + + std::deque const& peer_connection::download_queue() const { return m_download_queue; } - const std::deque& peer_connection::upload_queue() const + std::deque const& peer_connection::upload_queue() const { return m_requests; } @@ -401,7 +406,7 @@ namespace libtorrent m_statistics.add_stat(downloaded, uploaded); } - const std::vector& peer_connection::get_bitfield() const + std::vector const& peer_connection::get_bitfield() const { return m_have_piece; } @@ -584,7 +589,16 @@ namespace libtorrent { m_torrent->picker().abort_download(*i); } + for (std::deque::const_iterator i = m_request_queue.begin() + , end(m_request_queue.end()); i != end; ++i) + { + // since this piece was skipped, clear it and allow it to + // be requested from other peers + m_torrent->picker().abort_download(*i); + } m_download_queue.clear(); + m_request_queue.clear(); + #ifndef NDEBUG // m_torrent->picker().integrity_check(m_torrent); #endif @@ -1011,6 +1025,7 @@ namespace libtorrent // skipped blocks. m_download_queue.erase(m_download_queue.begin() , boost::next(b)); + send_block_requests(); } else { @@ -1480,7 +1495,7 @@ namespace libtorrent assert(it != m_download_queue.end()); m_download_queue.erase(it); - + send_block_requests(); int block_offset = block.block_index * m_torrent->block_size(); int block_size @@ -1515,6 +1530,76 @@ namespace libtorrent send_buffer_updated(); } + void peer_connection::send_block_requests() + { + // TODO: calculate the desired request queue each tick instead. + // TODO: make this constant user-settable + const int queue_time = 3; // seconds + // (if the latency is more than this, the download will stall) + // so, the queue size is 5 * down_rate / 16 kiB (16 kB is the size of each request) + // the minimum request size is 2 and the maximum is 48 + // the block size doesn't have to be 16. So we first query the torrent for it + const int block_size = m_torrent->block_size(); + assert(block_size > 0); + + int desired_queue_size = static_cast(queue_time + * statistics().download_rate() / block_size); + if (desired_queue_size > max_request_queue) desired_queue_size = max_request_queue; + if (desired_queue_size < min_request_queue) desired_queue_size = min_request_queue; + + if ((int)m_download_queue.size() >= desired_queue_size) return; + + while (!m_request_queue.empty() + && (int)m_download_queue.size() < desired_queue_size) + { + piece_block block = m_request_queue.front(); + m_request_queue.pop_front(); + m_download_queue.push_back(block); + + int block_offset = block.block_index * m_torrent->block_size(); + int block_size + = std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset, + m_torrent->block_size()); + assert(block_size > 0); + assert(block_size <= m_torrent->block_size()); + + char buf[] = {0,0,0,13, msg_request}; + + buffer::interval i = m_send_buffer.allocate(17); + + std::copy(buf, buf + 5, i.begin); + i.begin += 5; + + // index + detail::write_int32(block.piece_index, i.begin); + // begin + detail::write_int32(block_offset, i.begin); + // length + detail::write_int32(block_size, i.begin); + + assert(i.begin == i.end); + using namespace boost::posix_time; + +#ifdef TORRENT_VERBOSE_LOGGING + (*m_logger) << to_simple_string(second_clock::universal_time()) + << " ==> REQUEST [ " + "piece: " << block.piece_index << " | " + "b: " << block.block_index << " | " + "s: " << block_offset << " | " + "l: " << block_size << " ]\n"; + + peer_request r; + r.piece = block.piece_index; + r.start = block_offset; + r.length = block_size; + assert(verify_piece(r)); +#endif + } + m_last_piece = second_clock::universal_time(); + send_buffer_updated(); + + } + void peer_connection::send_request(piece_block block) { INVARIANT_CHECK; @@ -1527,50 +1612,8 @@ namespace libtorrent assert(!m_torrent->picker().is_downloading(block)); m_torrent->picker().mark_as_downloading(block, m_socket->sender()); - - m_download_queue.push_back(block); - - int block_offset = block.block_index * m_torrent->block_size(); - int block_size - = std::min((int)m_torrent->torrent_file().piece_size(block.piece_index)-block_offset, - m_torrent->block_size()); - assert(block_size > 0); - assert(block_size <= m_torrent->block_size()); - - char buf[] = {0,0,0,13, msg_request}; - - buffer::interval i = m_send_buffer.allocate(17); - - std::copy(buf, buf + 5, i.begin); - i.begin += 5; - - // index - detail::write_int32(block.piece_index, i.begin); - // begin - detail::write_int32(block_offset, i.begin); - // length - detail::write_int32(block_size, i.begin); - - assert(i.begin == i.end); - using namespace boost::posix_time; - -#ifdef TORRENT_VERBOSE_LOGGING - (*m_logger) << to_simple_string(second_clock::universal_time()) - << " ==> REQUEST [ " - "piece: " << block.piece_index << " | " - "b: " << block.block_index << " | " - "s: " << block_offset << " | " - "l: " << block_size << " ]\n"; - - peer_request r; - r.piece = block.piece_index; - r.start = block_offset; - r.length = block_size; - assert(verify_piece(r)); -#endif - - m_last_piece = second_clock::universal_time(); - send_buffer_updated(); + m_request_queue.push_back(block); + send_block_requests(); } void peer_connection::send_metadata(std::pair req) @@ -1884,8 +1927,9 @@ namespace libtorrent ptime now(second_clock::universal_time()); + // TODO: the timeout should be user-settable if (!m_download_queue.empty() - && now - m_last_piece > seconds(30)) + && now - m_last_piece > seconds(15)) { // this peer isn't sending the pieces we've // requested (this has been observed by BitComet) @@ -1904,7 +1948,16 @@ namespace libtorrent // be requested from other peers picker.abort_download(*i); } + for (std::deque::const_iterator i = m_request_queue.begin() + , end(m_request_queue.end()); i != end; ++i) + { + // since this piece was skipped, clear it and allow it to + // be requested from other peers + picker.abort_download(*i); + } + m_download_queue.clear(); + m_request_queue.clear(); // this will trigger new picking of pieces m_torrent->get_policy().unchoked(*this); diff --git a/src/policy.cpp b/src/policy.cpp index 07246c3bf..576b6a394 100755 --- a/src/policy.cpp +++ b/src/policy.cpp @@ -58,17 +58,6 @@ using namespace boost::posix_time; namespace { - enum - { - // the limits of the download queue size - max_request_queue = 48, - min_request_queue = 2, - - // the amount of free upload allowed before - // the peer is choked - free_upload_amount = 4 * 16 * 1024 - }; - using namespace libtorrent; // the case where ignore_peer is motivated is if two peers @@ -83,7 +72,8 @@ namespace // this will make the number of requests linearly dependent // on the rate in which we download from the peer. // we want the queue to represent: - const int queue_time = 5; // seconds + // TODO: make this constant user-settable + const int queue_time = 3; // seconds // (if the latency is more than this, the download will stall) // so, the queue size is 5 * down_rate / 16 kiB (16 kB is the size of each request) // the minimum request size is 2 and the maximum is 48 @@ -98,7 +88,8 @@ namespace assert(desired_queue_size >= min_request_queue); - int num_requests = desired_queue_size - (int)c.download_queue().size(); + int num_requests = desired_queue_size - (int)c.download_queue().size() + - (int)c.request_queue().size(); // if our request queue is already full, we // don't have to make any new requests yet