improved piece timeout and peer snubbing logic
This commit is contained in:
@@ -82,6 +82,7 @@ namespace libtorrent
|
||||
, m_last_receive(time_now())
|
||||
, m_last_sent(time_now())
|
||||
, m_requested(min_time())
|
||||
, m_timeout_extend(0)
|
||||
, m_remote_dl_update(time_now())
|
||||
, m_became_uninterested(time_now())
|
||||
, m_became_uninteresting(time_now())
|
||||
@@ -121,7 +122,6 @@ namespace libtorrent
|
||||
, m_failed(false)
|
||||
, m_ignore_bandwidth_limits(false)
|
||||
, m_have_all(false)
|
||||
, m_assume_fifo(false)
|
||||
, m_disconnecting(false)
|
||||
, m_connecting(true)
|
||||
, m_queued(true)
|
||||
@@ -185,6 +185,7 @@ namespace libtorrent
|
||||
, m_last_receive(time_now())
|
||||
, m_last_sent(time_now())
|
||||
, m_requested(min_time())
|
||||
, m_timeout_extend(0)
|
||||
, m_remote_dl_update(time_now())
|
||||
, m_became_uninterested(time_now())
|
||||
, m_became_uninteresting(time_now())
|
||||
@@ -223,7 +224,6 @@ namespace libtorrent
|
||||
, m_failed(false)
|
||||
, m_ignore_bandwidth_limits(false)
|
||||
, m_have_all(false)
|
||||
, m_assume_fifo(false)
|
||||
, m_disconnecting(false)
|
||||
, m_connecting(false)
|
||||
, m_queued(false)
|
||||
@@ -574,7 +574,7 @@ namespace libtorrent
|
||||
return m_request_queue;
|
||||
}
|
||||
|
||||
std::deque<piece_block> const& peer_connection::download_queue() const
|
||||
std::deque<pending_block> const& peer_connection::download_queue() const
|
||||
{
|
||||
return m_download_queue;
|
||||
}
|
||||
@@ -851,9 +851,10 @@ namespace libtorrent
|
||||
|
||||
if (is_disconnecting()) return;
|
||||
|
||||
std::deque<piece_block>::iterator i = std::find_if(
|
||||
std::deque<pending_block>::iterator i = std::find_if(
|
||||
m_download_queue.begin(), m_download_queue.end()
|
||||
, bind(match_request, boost::cref(r), _1, t->block_size()));
|
||||
, bind(match_request, boost::cref(r), bind(&pending_block::block, _1)
|
||||
, t->block_size()));
|
||||
|
||||
#ifdef TORRENT_VERBOSE_LOGGING
|
||||
(*m_logger) << time_now_string()
|
||||
@@ -863,7 +864,7 @@ namespace libtorrent
|
||||
piece_block b(-1, 0);
|
||||
if (i != m_download_queue.end())
|
||||
{
|
||||
b = *i;
|
||||
b = i->block;
|
||||
m_download_queue.erase(i);
|
||||
|
||||
// if the peer is in parole mode, keep the request
|
||||
@@ -1512,11 +1513,11 @@ namespace libtorrent
|
||||
TORRENT_ASSERT(p.length == t->block_size()
|
||||
|| p.length == t->torrent_file().total_size() % t->block_size());
|
||||
|
||||
std::deque<piece_block>::iterator b
|
||||
= std::find(
|
||||
std::deque<pending_block>::iterator b
|
||||
= std::find_if(
|
||||
m_download_queue.begin()
|
||||
, m_download_queue.end()
|
||||
, block_finished);
|
||||
, has_block(block_finished));
|
||||
|
||||
if (b == m_download_queue.end())
|
||||
{
|
||||
@@ -1535,39 +1536,41 @@ namespace libtorrent
|
||||
return;
|
||||
}
|
||||
|
||||
if (m_assume_fifo)
|
||||
for (std::deque<pending_block>::iterator i = m_download_queue.begin();
|
||||
i != b;)
|
||||
{
|
||||
for (std::deque<piece_block>::iterator i = m_download_queue.begin();
|
||||
i != b; ++i)
|
||||
{
|
||||
|
||||
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
|
||||
(*m_logger) << time_now_string()
|
||||
<< " *** SKIPPED_PIECE [ piece: " << i->piece_index << " | "
|
||||
"b: " << i->block_index << " ] ***\n";
|
||||
(*m_logger) << time_now_string()
|
||||
<< " *** SKIPPED_PIECE [ piece: " << i->piece_index << " | "
|
||||
"b: " << i->block_index << " ] ***\n";
|
||||
#endif
|
||||
// since this piece was skipped, clear it and allow it to
|
||||
// be requested from other peers
|
||||
// TODO: send cancel?
|
||||
picker.abort_download(*i);
|
||||
|
||||
++i->skipped;
|
||||
// if the number of times a block is skipped by out of order
|
||||
// blocks exceeds the size of the outstanding queue, assume that
|
||||
// the other end dropped the request.
|
||||
if (i->skipped > m_desired_queue_size)
|
||||
{
|
||||
if (m_ses.m_alerts.should_post<request_dropped_alert>())
|
||||
m_ses.m_alerts.post_alert(request_dropped_alert(t->get_handle()
|
||||
, i->block.block_index, i->block.piece_index));
|
||||
picker.abort_download(i->block);
|
||||
i = m_download_queue.erase(i);
|
||||
}
|
||||
else
|
||||
{
|
||||
++i;
|
||||
}
|
||||
|
||||
// remove the request that just finished
|
||||
// from the download queue plus the
|
||||
// skipped blocks.
|
||||
m_download_queue.erase(m_download_queue.begin(), b);
|
||||
b = m_download_queue.begin();
|
||||
TORRENT_ASSERT(*b == block_finished);
|
||||
}
|
||||
|
||||
if (total_seconds(time_now() - m_requested) < m_ses.settings().request_timeout)
|
||||
m_snubbed = false;
|
||||
|
||||
|
||||
// if the block we got is already finished, then ignore it
|
||||
if (picker.is_downloaded(block_finished))
|
||||
{
|
||||
t->received_redundant_data(p.length);
|
||||
|
||||
m_download_queue.erase(b);
|
||||
m_timeout_extend = 0;
|
||||
|
||||
if (!m_download_queue.empty())
|
||||
m_requested = time_now();
|
||||
@@ -1577,12 +1580,25 @@ namespace libtorrent
|
||||
return;
|
||||
}
|
||||
|
||||
if (total_seconds(time_now() - m_requested)
|
||||
< m_ses.settings().request_timeout
|
||||
&& m_snubbed)
|
||||
{
|
||||
m_snubbed = false;
|
||||
if (m_ses.m_alerts.should_post<peer_unsnubbed_alert>())
|
||||
{
|
||||
m_ses.m_alerts.post_alert(peer_unsnubbed_alert(t->get_handle()
|
||||
, m_remote, m_peer_id));
|
||||
}
|
||||
}
|
||||
|
||||
fs.async_write(p, data, bind(&peer_connection::on_disk_write_complete
|
||||
, self(), _1, _2, p, t));
|
||||
m_outstanding_writing_bytes += p.length;
|
||||
TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
|
||||
m_download_queue.erase(b);
|
||||
|
||||
m_timeout_extend = 0;
|
||||
if (!m_download_queue.empty())
|
||||
m_requested = time_now();
|
||||
|
||||
@@ -1902,8 +1918,10 @@ namespace libtorrent
|
||||
TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
|
||||
TORRENT_ASSERT(!t->picker().is_requested(block) || (t->picker().num_peers(block) > 0));
|
||||
TORRENT_ASSERT(!t->have_piece(block.piece_index));
|
||||
TORRENT_ASSERT(std::find(m_download_queue.begin(), m_download_queue.end(), block) == m_download_queue.end());
|
||||
TORRENT_ASSERT(std::find(m_request_queue.begin(), m_request_queue.end(), block) == m_request_queue.end());
|
||||
TORRENT_ASSERT(std::find_if(m_download_queue.begin(), m_download_queue.end()
|
||||
, has_block(block)) == m_download_queue.end());
|
||||
TORRENT_ASSERT(std::find(m_request_queue.begin(), m_request_queue.end()
|
||||
, block) == m_request_queue.end());
|
||||
|
||||
piece_picker::piece_state_t state;
|
||||
peer_speed_t speed = peer_speed();
|
||||
@@ -1955,18 +1973,20 @@ namespace libtorrent
|
||||
// cancelled, then just ignore the cancel.
|
||||
if (!t->picker().is_requested(block)) return;
|
||||
|
||||
std::deque<piece_block>::iterator it
|
||||
= std::find(m_download_queue.begin(), m_download_queue.end(), block);
|
||||
std::deque<pending_block>::iterator it
|
||||
= std::find_if(m_download_queue.begin(), m_download_queue.end(), has_block(block));
|
||||
if (it == m_download_queue.end())
|
||||
{
|
||||
it = std::find(m_request_queue.begin(), m_request_queue.end(), block);
|
||||
std::deque<piece_block>::iterator rit = std::find(m_request_queue.begin()
|
||||
, m_request_queue.end(), block);
|
||||
|
||||
// when a multi block is received, it is cancelled
|
||||
// from all peers, so if this one hasn't requested
|
||||
// the block, just ignore to cancel it.
|
||||
if (it == m_request_queue.end()) return;
|
||||
if (rit == m_request_queue.end()) return;
|
||||
|
||||
t->picker().abort_download(block);
|
||||
m_request_queue.erase(it);
|
||||
m_request_queue.erase(rit);
|
||||
// since we found it in the request queue, it means it hasn't been
|
||||
// sent yet, so we don't have to send a cancel.
|
||||
return;
|
||||
@@ -2260,7 +2280,7 @@ namespace libtorrent
|
||||
|
||||
while (!m_download_queue.empty())
|
||||
{
|
||||
picker.abort_download(m_download_queue.back());
|
||||
picker.abort_download(m_download_queue.back().block);
|
||||
m_download_queue.pop_back();
|
||||
}
|
||||
while (!m_request_queue.empty())
|
||||
@@ -2354,7 +2374,8 @@ namespace libtorrent
|
||||
p.send_quota = m_bandwidth_limit[upload_channel].quota_left();
|
||||
p.receive_quota = m_bandwidth_limit[download_channel].quota_left();
|
||||
if (m_download_queue.empty()) p.request_timeout = -1;
|
||||
else p.request_timeout = total_seconds(m_requested - now) + m_ses.settings().request_timeout;
|
||||
else p.request_timeout = total_seconds(m_requested - now) + m_ses.settings().request_timeout
|
||||
+ m_timeout_extend;
|
||||
#ifndef TORRENT_DISABLE_GEO_IP
|
||||
p.inet_as_name = m_inet_as_name;
|
||||
#endif
|
||||
@@ -2608,29 +2629,12 @@ namespace libtorrent
|
||||
}
|
||||
|
||||
if (!m_download_queue.empty()
|
||||
&& now > m_requested + seconds(m_ses.settings().request_timeout)
|
||||
&& t->has_picker())
|
||||
&& now > m_requested + seconds(m_ses.settings().request_timeout
|
||||
+ m_timeout_extend))
|
||||
{
|
||||
if (!m_snubbed)
|
||||
{
|
||||
m_snubbed = true;
|
||||
if (m_ses.m_alerts.should_post<peer_snubbed_alert>())
|
||||
{
|
||||
m_ses.m_alerts.post_alert(peer_snubbed_alert(t->get_handle()
|
||||
, m_remote, m_peer_id));
|
||||
}
|
||||
}
|
||||
m_desired_queue_size = 1;
|
||||
piece_picker& picker = t->picker();
|
||||
// the front request timed out!
|
||||
picker.abort_download(m_download_queue[0]);
|
||||
m_download_queue.pop_front();
|
||||
if (!m_download_queue.empty())
|
||||
m_requested = time_now();
|
||||
request_a_block(*t, *this);
|
||||
send_block_requests();
|
||||
snub_peer();
|
||||
}
|
||||
|
||||
|
||||
// if we haven't sent something in too long, send a keep-alive
|
||||
keep_alive();
|
||||
|
||||
@@ -2686,7 +2690,8 @@ namespace libtorrent
|
||||
}
|
||||
|
||||
if (!m_download_queue.empty()
|
||||
&& now - m_last_piece > seconds(m_ses.settings().piece_timeout))
|
||||
&& now - m_last_piece > seconds(m_ses.settings().piece_timeout
|
||||
+ m_timeout_extend))
|
||||
{
|
||||
// this peer isn't sending the pieces we've
|
||||
// requested (this has been observed by BitComet)
|
||||
@@ -2698,51 +2703,7 @@ namespace libtorrent
|
||||
<< " " << total_seconds(now - m_last_piece) << "] ***\n";
|
||||
#endif
|
||||
|
||||
if (!m_snubbed)
|
||||
{
|
||||
m_snubbed = true;
|
||||
if (m_ses.m_alerts.should_post<peer_snubbed_alert>())
|
||||
{
|
||||
m_ses.m_alerts.post_alert(peer_snubbed_alert(t->get_handle()
|
||||
, m_remote, m_peer_id));
|
||||
}
|
||||
}
|
||||
m_desired_queue_size = 1;
|
||||
|
||||
if (t->is_seed())
|
||||
{
|
||||
m_download_queue.clear();
|
||||
m_request_queue.clear();
|
||||
}
|
||||
else
|
||||
{
|
||||
piece_picker& picker = t->picker();
|
||||
|
||||
std::deque<piece_block> dl(m_download_queue);
|
||||
for (std::deque<piece_block>::iterator i = dl.begin()
|
||||
, end(dl.end()); i != end; ++i)
|
||||
{
|
||||
piece_block const& r = m_download_queue.back();
|
||||
#ifdef TORRENT_VERBOSE_LOGGING
|
||||
(*m_logger) << time_now_string()
|
||||
<< " ==> CANCEL [ piece: " << r.piece_index
|
||||
<< " | block: " << r.block_index
|
||||
<< " ]\n";
|
||||
#endif
|
||||
write_cancel(t->to_req(r));
|
||||
}
|
||||
while (!m_request_queue.empty())
|
||||
{
|
||||
piece_block const& r = m_request_queue.back();
|
||||
picker.abort_download(r);
|
||||
m_request_queue.pop_back();
|
||||
}
|
||||
|
||||
m_assume_fifo = true;
|
||||
|
||||
request_a_block(*t, *this);
|
||||
send_block_requests();
|
||||
}
|
||||
snub_peer();
|
||||
}
|
||||
|
||||
// If the client sends more data
|
||||
@@ -2802,6 +2763,71 @@ namespace libtorrent
|
||||
fill_send_buffer();
|
||||
}
|
||||
|
||||
void peer_connection::snub_peer()
|
||||
{
|
||||
boost::shared_ptr<torrent> t = m_torrent.lock();
|
||||
TORRENT_ASSERT(t);
|
||||
|
||||
if (!m_snubbed)
|
||||
{
|
||||
m_snubbed = true;
|
||||
if (m_ses.m_alerts.should_post<peer_snubbed_alert>())
|
||||
{
|
||||
m_ses.m_alerts.post_alert(peer_snubbed_alert(t->get_handle()
|
||||
, m_remote, m_peer_id));
|
||||
}
|
||||
}
|
||||
m_desired_queue_size = 1;
|
||||
|
||||
if (!t->has_picker()) return;
|
||||
piece_picker& picker = t->picker();
|
||||
|
||||
piece_block r(-1, -1);
|
||||
// time out the last request in the queue
|
||||
if (!m_request_queue.empty())
|
||||
{
|
||||
r = m_request_queue.back();
|
||||
m_request_queue.pop_back();
|
||||
}
|
||||
else
|
||||
{
|
||||
TORRENT_ASSERT(!m_download_queue.empty());
|
||||
r = m_download_queue.back().block;
|
||||
|
||||
// only time out a request if it blocks the piece
|
||||
// from being completed (i.e. no free blocks to
|
||||
// request from it)
|
||||
piece_picker::downloading_piece p;
|
||||
picker.piece_info(r.piece_index, p);
|
||||
int free_blocks = picker.blocks_in_piece(r.piece_index)
|
||||
- p.finished - p.writing - p.requested;
|
||||
if (free_blocks > 0)
|
||||
{
|
||||
m_timeout_extend += m_ses.settings().request_timeout;
|
||||
return;
|
||||
}
|
||||
|
||||
if (m_ses.m_alerts.should_post<block_timeout_alert>())
|
||||
{
|
||||
m_ses.m_alerts.post_alert(block_timeout_alert(t->get_handle()
|
||||
, r.block_index, r.piece_index));
|
||||
}
|
||||
m_download_queue.pop_back();
|
||||
}
|
||||
if (!m_download_queue.empty() || !m_request_queue.empty())
|
||||
m_timeout_extend += m_ses.settings().request_timeout;
|
||||
|
||||
request_a_block(*t, *this);
|
||||
send_block_requests();
|
||||
|
||||
// abort the block after the new one has
|
||||
// been requested in order to prevent it from
|
||||
// picking the same block again, stalling the
|
||||
// same piece indefinitely.
|
||||
if (r != piece_block(-1, -1))
|
||||
picker.abort_download(r);
|
||||
}
|
||||
|
||||
void peer_connection::fill_send_buffer()
|
||||
{
|
||||
INVARIANT_CHECK;
|
||||
@@ -3530,7 +3556,8 @@ namespace libtorrent
|
||||
TORRENT_ASSERT(m_bandwidth_limit[upload_channel].quota_left() == 0);
|
||||
|
||||
std::set<piece_block> unique;
|
||||
std::copy(m_download_queue.begin(), m_download_queue.end(), std::inserter(unique, unique.begin()));
|
||||
std::transform(m_download_queue.begin(), m_download_queue.end()
|
||||
, std::inserter(unique, unique.begin()), boost::bind(&pending_block::block, _1));
|
||||
std::copy(m_request_queue.begin(), m_request_queue.end(), std::inserter(unique, unique.begin()));
|
||||
TORRENT_ASSERT(unique.size() == m_download_queue.size() + m_request_queue.size());
|
||||
if (m_peer_info)
|
||||
@@ -3565,9 +3592,9 @@ namespace libtorrent
|
||||
for (std::deque<piece_block>::const_iterator i = p.request_queue().begin()
|
||||
, end(p.request_queue().end()); i != end; ++i)
|
||||
++num_requests[*i];
|
||||
for (std::deque<piece_block>::const_iterator i = p.download_queue().begin()
|
||||
for (std::deque<pending_block>::const_iterator i = p.download_queue().begin()
|
||||
, end(p.download_queue().end()); i != end; ++i)
|
||||
++num_requests[*i];
|
||||
++num_requests[i->block];
|
||||
}
|
||||
for (std::map<piece_block, int>::iterator i = num_requests.begin()
|
||||
, end(num_requests.end()); i != end; ++i)
|
||||
|
Reference in New Issue
Block a user