fixed race condition in peer_connection, fixed assert in torrent destructor, updated tests

This commit is contained in:
Arvid Norberg
2006-05-28 19:03:54 +00:00
parent 58698d6aea
commit 94628fa78d
20 changed files with 521 additions and 322 deletions

View File

@@ -117,21 +117,39 @@ namespace libtorrent { namespace detail
if (m_torrents.empty() && !m_abort && !processing)
m_cond.wait(l);
if (m_abort) return;
if (m_abort)
{
// no lock is needed here, because the main thread
// has already been shut down by now
processing.reset();
t.reset();
std::for_each(m_torrents.begin(), m_torrents.end()
, boost::bind(&torrent::abort
, boost::bind(&shared_ptr<torrent>::get
, boost::bind(&piece_checker_data::torrent_ptr, _1))));
m_torrents.clear();
std::for_each(m_processing.begin(), m_processing.end()
, boost::bind(&torrent::abort
, boost::bind(&shared_ptr<torrent>::get
, boost::bind(&piece_checker_data::torrent_ptr, _1))));
m_processing.clear();
return;
}
if (!m_torrents.empty())
{
t = m_torrents.front();
if (t->abort)
{
if (processing->torrent_ptr->num_peers())
{
m_ses.m_torrents.insert(std::make_pair(
t->info_hash, t->torrent_ptr));
t->torrent_ptr->abort();
}
// make sure the locking order is
// consistent to avoid dead locks
// we need to lock the session because closing
// torrents assume to have access to it
l.unlock();
session_impl::mutex_t::scoped_lock l2(m_ses.m_mutex);
l.lock();
t->torrent_ptr->abort();
m_torrents.pop_front();
continue;
}
@@ -145,12 +163,11 @@ namespace libtorrent { namespace detail
if (!error_msg.empty() && m_ses.m_alerts.should_post(alert::warning))
{
session_impl::mutex_t::scoped_lock l2(m_ses.m_mutex);
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
m_ses.m_alerts.post_alert(fastresume_rejected_alert(
t->torrent_ptr->get_handle()
, error_msg));
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
(*m_ses.m_logger) << "fastresume data for "
<< t->torrent_ptr->torrent_file().name() << " rejected: "
<< error_msg << "\n";
@@ -172,21 +189,31 @@ namespace libtorrent { namespace detail
t->torrent_ptr->files_checked(t->unfinished_pieces);
m_torrents.pop_front();
m_ses.m_torrents.insert(std::make_pair(t->info_hash, t->torrent_ptr));
if (t->torrent_ptr->is_seed() && m_ses.m_alerts.should_post(alert::info))
{
m_ses.m_alerts.post_alert(torrent_finished_alert(
t->torrent_ptr->get_handle()
, "torrent is complete"));
}
peer_id id;
std::fill(id.begin(), id.end(), 0);
for (std::vector<tcp::endpoint>::const_iterator i = t->peers.begin();
i != t->peers.end(); ++i)
// we cannot add the torrent if the session is aborted.
if (!m_ses.m_abort)
{
t->torrent_ptr->get_policy().peer_from_tracker(*i, id);
m_ses.m_torrents.insert(std::make_pair(t->info_hash, t->torrent_ptr));
if (t->torrent_ptr->is_seed() && m_ses.m_alerts.should_post(alert::info))
{
m_ses.m_alerts.post_alert(torrent_finished_alert(
t->torrent_ptr->get_handle()
, "torrent is complete"));
}
peer_id id;
std::fill(id.begin(), id.end(), 0);
for (std::vector<tcp::endpoint>::const_iterator i = t->peers.begin();
i != t->peers.end(); ++i)
{
t->torrent_ptr->get_policy().peer_from_tracker(*i, id);
}
}
else
{
t->torrent_ptr->abort();
}
t.reset();
continue;
}
@@ -202,6 +229,7 @@ namespace libtorrent { namespace detail
{
processing = t;
processing->processing = true;
t.reset();
}
}
}
@@ -219,12 +247,7 @@ namespace libtorrent { namespace detail
t->torrent_ptr->get_handle()
, e.what()));
}
if (t->torrent_ptr->num_peers())
{
m_ses.m_torrents.insert(std::make_pair(
t->info_hash, t->torrent_ptr));
t->torrent_ptr->abort();
}
t->torrent_ptr->abort();
assert(!m_torrents.empty());
m_torrents.pop_front();
@@ -258,12 +281,7 @@ namespace libtorrent { namespace detail
assert(!m_processing.empty());
assert(m_processing.front() == processing);
if (processing->torrent_ptr->num_peers())
{
m_ses.m_torrents.insert(std::make_pair(
processing->info_hash, processing->torrent_ptr));
processing->torrent_ptr->abort();
}
processing->torrent_ptr->abort();
processing.reset();
m_processing.pop_front();
@@ -284,23 +302,33 @@ namespace libtorrent { namespace detail
assert(!m_processing.empty());
assert(m_processing.front() == processing);
processing->torrent_ptr->files_checked(processing->unfinished_pieces);
m_ses.m_torrents.insert(std::make_pair(
processing->info_hash, processing->torrent_ptr));
if (processing->torrent_ptr->is_seed()
&& m_ses.m_alerts.should_post(alert::info))
// TODO: factor out the adding of torrents to the session
// and to the checker thread to avoid duplicating the
// check for abortion.
if (!m_ses.m_abort)
{
m_ses.m_alerts.post_alert(torrent_finished_alert(
processing->torrent_ptr->get_handle()
, "torrent is complete"));
}
processing->torrent_ptr->files_checked(processing->unfinished_pieces);
m_ses.m_torrents.insert(std::make_pair(
processing->info_hash, processing->torrent_ptr));
if (processing->torrent_ptr->is_seed()
&& m_ses.m_alerts.should_post(alert::info))
{
m_ses.m_alerts.post_alert(torrent_finished_alert(
processing->torrent_ptr->get_handle()
, "torrent is complete"));
}
peer_id id;
std::fill(id.begin(), id.end(), 0);
for (std::vector<tcp::endpoint>::const_iterator i = processing->peers.begin();
peer_id id;
std::fill(id.begin(), id.end(), 0);
for (std::vector<tcp::endpoint>::const_iterator i = processing->peers.begin();
i != processing->peers.end(); ++i)
{
processing->torrent_ptr->get_policy().peer_from_tracker(*i, id);
}
}
else
{
processing->torrent_ptr->get_policy().peer_from_tracker(*i, id);
processing->torrent_ptr->abort();
}
processing.reset();
m_processing.pop_front();
@@ -311,7 +339,7 @@ namespace libtorrent { namespace detail
}
}
}
catch(const std::exception& e)
catch(std::exception const& e)
{
// This will happen if the storage fails to initialize
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
@@ -326,12 +354,7 @@ namespace libtorrent { namespace detail
}
assert(!m_processing.empty());
if (processing->torrent_ptr->num_peers())
{
m_ses.m_torrents.insert(std::make_pair(
processing->info_hash, processing->torrent_ptr));
processing->torrent_ptr->abort();
}
processing->torrent_ptr->abort();
processing.reset();
m_processing.pop_front();
@@ -604,6 +627,8 @@ namespace libtorrent { namespace detail
mutex_t::scoped_lock l(m_mutex);
assert(listen_socket.lock() == m_listen_socket);
if (m_abort) return;
async_accept();
if (e)
{
@@ -715,6 +740,7 @@ namespace libtorrent { namespace detail
if (p->is_connecting())
{
assert(p->is_local());
assert(m_connections.find(p->get_socket()) == m_connections.end());
// Since this peer is still connecting, will not be
// in the list of completed connections.
connection_map::iterator i = m_half_open.find(p->get_socket());
@@ -725,6 +751,7 @@ namespace libtorrent { namespace detail
connection_queue::iterator j = std::find(
m_connection_queue.begin(), m_connection_queue.end(), p);
assert(j != m_connection_queue.end());
if (j != m_connection_queue.end())
m_connection_queue.erase(j);
}
@@ -736,7 +763,11 @@ namespace libtorrent { namespace detail
}
else
{
assert(m_half_open.find(p->get_socket()) == m_half_open.end());
assert(std::find(m_connection_queue.begin()
, m_connection_queue.end(), p) == m_connection_queue.end());
connection_map::iterator i = m_connections.find(p->get_socket());
// assert (i != m_connections.end());
if (i != m_connections.end())
m_connections.erase(i);
}
@@ -744,6 +775,8 @@ namespace libtorrent { namespace detail
void session_impl::second_tick(asio::error const& e) try
{
session_impl::mutex_t::scoped_lock l(m_mutex);
if (e)
{
#if defined(TORRENT_LOGGING)
@@ -753,9 +786,7 @@ namespace libtorrent { namespace detail
m_selector.interrupt();
return;
}
session_impl::mutex_t::scoped_lock l(m_mutex);
if (m_abort) return;
float tick_interval = (microsec_clock::universal_time()
- m_last_tick).total_milliseconds() / 1000.f;
@@ -777,26 +808,27 @@ namespace libtorrent { namespace detail
++i;
// if this socket has timed out
// close it.
if (j->second->has_timed_out())
peer_connection& c = *j->second;
if (c.has_timed_out())
{
if (m_alerts.should_post(alert::debug))
{
m_alerts.post_alert(
peer_error_alert(
j->second->remote()
, j->second->pid()
c.remote()
, c.pid()
, "connection timed out"));
}
#if defined(TORRENT_VERBOSE_LOGGING)
(*j->second->m_logger) << "*** CONNECTION TIMED OUT\n";
(*c.m_logger) << "*** CONNECTION TIMED OUT\n";
#endif
j->second->set_failed();
j->second->disconnect();
c.set_failed();
c.disconnect();
continue;
}
j->second->keep_alive();
c.keep_alive();
}
// check each torrent for tracker updates
@@ -923,40 +955,55 @@ namespace libtorrent { namespace detail
}
catch (std::exception& e)
{
std::cerr << e.what() << "\n";
#ifndef NDEBUG
std::cerr << e.what() << "\n";
std::string err = e.what();
#endif
assert(false);
}
{
session_impl::mutex_t::scoped_lock l(m_mutex);
session_impl::mutex_t::scoped_lock l(m_mutex);
m_tracker_manager.abort_all_requests();
for (std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i =
m_torrents.begin(); i != m_torrents.end(); ++i)
{
i->second->abort();
if (!i->second->is_paused() || i->second->should_request())
m_tracker_manager.abort_all_requests();
for (std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i =
m_torrents.begin(); i != m_torrents.end(); ++i)
{
tracker_request req = i->second->generate_tracker_request();
req.listen_port = m_listen_interface.port();
req.key = m_key;
std::string login = i->second->tracker_login();
m_tracker_manager.queue_request(m_selector, req, login);
i->second->abort();
if (!i->second->is_paused() || i->second->should_request())
{
tracker_request req = i->second->generate_tracker_request();
req.listen_port = m_listen_interface.port();
req.key = m_key;
std::string login = i->second->tracker_login();
m_tracker_manager.queue_request(m_selector, req, login);
}
}
}
m_timer.expires_from_now(boost::posix_time::seconds(
m_settings.stop_tracker_timeout));
m_timer.async_wait(bind(&demuxer::interrupt, &m_selector));
m_timer.expires_from_now(boost::posix_time::seconds(
m_settings.stop_tracker_timeout));
m_timer.async_wait(bind(&demuxer::interrupt, &m_selector));
}
m_selector.reset();
m_selector.run();
m_torrents.clear();
session_impl::mutex_t::scoped_lock l(m_mutex);
assert(m_abort);
m_abort = true;
m_connections.clear();
m_half_open.clear();
m_connection_queue.clear();
#ifndef NDEBUG
for (torrent_map::iterator i = m_torrents.begin();
i != m_torrents.end(); ++i)
{
assert(i->second->num_peers() == 0);
}
#endif
m_torrents.clear();
assert(m_torrents.empty());
assert(m_connections.empty());
@@ -1263,6 +1310,9 @@ namespace libtorrent
if (!m_impl.find_torrent(info_hash).expired())
throw duplicate_torrent();
// you cannot add new torrents to a session that is closing down
assert(!m_impl.m_abort);
// create the torrent and the data associated with
// the checker thread and store it before starting
// the thread
@@ -1407,6 +1457,14 @@ namespace libtorrent
m_impl.m_abort = true;
m_impl.m_selector.interrupt();
}
m_thread.join();
// it's important that the main thread is closed completely before
// the checker thread is terminated. Because all the connections
// have to be closed and removed from the torrents before they
// can be destructed. (because the weak pointers in the
// peer_connections will be invalidated when the torrents are
// destructed and then the invariant will be broken).
{
mutex::scoped_lock l(m_checker_impl.m_mutex);
@@ -1421,8 +1479,10 @@ namespace libtorrent
m_checker_impl.m_cond.notify_one();
}
m_thread.join();
m_checker_thread.join();
assert(m_impl.m_torrents.empty());
assert(m_impl.m_connections.empty());
}
void session::set_max_uploads(int limit)