big change in the way storage is checked. The checker thread can now check the fastresume data of a new torrent without waiting for a currently checking one

This commit is contained in:
Arvid Norberg
2005-10-13 07:59:05 +00:00
parent 7af0fad1ba
commit 189a8756ee
13 changed files with 975 additions and 598 deletions

View File

@@ -70,14 +70,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/peer_connection.hpp"
#include "libtorrent/ip_filter.hpp"
#if defined(_MSC_VER) && _MSC_VER < 1300
namespace std
{
using ::srand;
using ::isprint;
};
#endif
using namespace boost::posix_time;
namespace libtorrent { namespace detail
@@ -100,81 +92,106 @@ namespace libtorrent { namespace detail
void checker_impl::operator()()
{
eh_initializer();
// if we're currently performing a full file check,
// this is the torrent being processed
boost::shared_ptr<piece_checker_data> processing;
boost::shared_ptr<piece_checker_data> t;
for (;;)
{
piece_checker_data* t = 0;
{
boost::mutex::scoped_lock l(m_mutex);
// if the job queue is empty and
// we shouldn't abort
// wait for a signal
if (m_torrents.empty() && !m_abort)
m_cond.wait(l);
if (m_abort) return;
assert(!m_torrents.empty());
t = &m_torrents.front();
if (t->abort)
{
m_torrents.pop_front();
continue;
}
t->processing = true;
}
// temporary torrent used while checking fastresume data
try
{
assert(t != 0);
std::string error_msg;
t->parse_resume_data(t->resume_data, t->torrent_ptr->torrent_file(), error_msg);
// clear the resume data now that it has been used
// (the fast resume data is now parsed and stored in t)
t->resume_data = entry();
t->torrent_ptr->check_files(*t, m_mutex);
// lock the session to add the new torrent
boost::mutex::scoped_lock l(m_mutex);
if (t->abort)
t.reset();
{
m_torrents.pop_front();
continue;
}
boost::mutex::scoped_lock l2(m_ses.m_mutex);
boost::mutex::scoped_lock l(m_mutex);
if (!error_msg.empty() && m_ses.m_alerts.should_post(alert::warning))
{
m_ses.m_alerts.post_alert(fastresume_rejected_alert(
t->torrent_ptr->get_handle()
, error_msg));
}
m_ses.m_torrents.insert(std::make_pair(t->info_hash, t->torrent_ptr));
if (t->torrent_ptr->is_seed() && m_ses.m_alerts.should_post(alert::info))
{
m_ses.m_alerts.post_alert(torrent_finished_alert(
t->torrent_ptr->get_handle()
, "torrent is complete"));
// if the job queue is empty and
// we shouldn't abort
// wait for a signal
if (m_torrents.empty() && !m_abort && !processing)
m_cond.wait(l);
if (m_abort) return;
if (!m_torrents.empty())
{
t = m_torrents.front();
if (t->abort)
{
m_torrents.pop_front();
continue;
}
}
}
peer_id id;
std::fill(id.begin(), id.end(), 0);
for (std::vector<address>::const_iterator i = t->peers.begin();
i != t->peers.end(); ++i)
if (t)
{
t->torrent_ptr->get_policy().peer_from_tracker(*i, id);
std::string error_msg;
t->parse_resume_data(t->resume_data, t->torrent_ptr->torrent_file(), error_msg);
if (!error_msg.empty() && m_ses.m_alerts.should_post(alert::warning))
{
boost::mutex::scoped_lock l2(m_ses.m_mutex);
m_ses.m_alerts.post_alert(fastresume_rejected_alert(
t->torrent_ptr->get_handle()
, error_msg));
}
// clear the resume data now that it has been used
// (the fast resume data is now parsed and stored in t)
t->resume_data = entry();
bool up_to_date = t->torrent_ptr->check_fastresume(*t);
if (up_to_date)
{
// lock the session to add the new torrent
boost::mutex::scoped_lock l(m_ses.m_mutex);
boost::mutex::scoped_lock l2(m_mutex);
assert(m_torrents.front() == t);
t->torrent_ptr->files_checked(t->unfinished_pieces);
m_torrents.pop_front();
m_ses.m_torrents.insert(std::make_pair(t->info_hash, t->torrent_ptr));
if (t->torrent_ptr->is_seed() && m_ses.m_alerts.should_post(alert::info))
{
m_ses.m_alerts.post_alert(torrent_finished_alert(
t->torrent_ptr->get_handle()
, "torrent is complete"));
}
peer_id id;
std::fill(id.begin(), id.end(), 0);
for (std::vector<address>::const_iterator i = t->peers.begin();
i != t->peers.end(); ++i)
{
t->torrent_ptr->get_policy().peer_from_tracker(*i, id);
}
continue;
}
// lock the checker while we move the torrent from
// m_torrents to m_processing
{
boost::mutex::scoped_lock l(m_mutex);
assert(m_torrents.front() == t);
m_torrents.pop_front();
m_processing.push_back(t);
if (!processing)
{
processing = t;
processing->processing = true;
}
}
}
m_torrents.pop_front();
}
catch(const std::exception& e)
{
// This will happen if the storage fails to initialize
boost::mutex::scoped_lock l(m_mutex);
boost::mutex::scoped_lock l2(m_ses.m_mutex);
boost::mutex::scoped_lock l(m_ses.m_mutex);
boost::mutex::scoped_lock l2(m_mutex);
if (m_ses.m_alerts.should_post(alert::fatal))
{
m_ses.m_alerts.post_alert(
@@ -182,41 +199,156 @@ namespace libtorrent { namespace detail
t->torrent_ptr->get_handle()
, e.what()));
}
assert(!m_torrents.empty());
m_torrents.pop_front();
}
catch(...)
{
#ifndef NDEBUG
std::cerr << "error while checking resume data\n";
#endif
boost::mutex::scoped_lock l(m_mutex);
assert(!m_torrents.empty());
m_torrents.pop_front();
assert(false);
}
if (!processing) continue;
try
{
assert(processing);
float finished = false;
float progress = 0.f;
boost::tie(finished, progress) = processing->torrent_ptr->check_files();
{
boost::mutex::scoped_lock l(m_mutex);
processing->progress = progress;
if (processing->abort)
{
assert(!m_processing.empty());
assert(m_processing.front() == processing);
processing.reset();
m_processing.pop_front();
if (!m_processing.empty())
{
processing = m_processing.front();
processing->processing = true;
}
continue;
}
}
if (finished)
{
// lock the session to add the new torrent
boost::mutex::scoped_lock l(m_ses.m_mutex);
boost::mutex::scoped_lock l2(m_mutex);
assert(!m_processing.empty());
assert(m_processing.front() == processing);
processing->torrent_ptr->files_checked(processing->unfinished_pieces);
m_ses.m_torrents.insert(std::make_pair(
processing->info_hash, processing->torrent_ptr));
if (processing->torrent_ptr->is_seed()
&& m_ses.m_alerts.should_post(alert::info))
{
m_ses.m_alerts.post_alert(torrent_finished_alert(
processing->torrent_ptr->get_handle()
, "torrent is complete"));
}
peer_id id;
std::fill(id.begin(), id.end(), 0);
for (std::vector<address>::const_iterator i = processing->peers.begin();
i != processing->peers.end(); ++i)
{
processing->torrent_ptr->get_policy().peer_from_tracker(*i, id);
}
processing.reset();
m_processing.pop_front();
if (!m_processing.empty())
{
processing = m_processing.front();
processing->processing = true;
}
}
}
catch(const std::exception& e)
{
// This will happen if the storage fails to initialize
boost::mutex::scoped_lock l(m_ses.m_mutex);
boost::mutex::scoped_lock l2(m_mutex);
if (m_ses.m_alerts.should_post(alert::fatal))
{
m_ses.m_alerts.post_alert(
file_error_alert(
processing->torrent_ptr->get_handle()
, e.what()));
}
assert(!m_processing.empty());
processing.reset();
m_processing.pop_front();
if (!m_processing.empty())
{
processing = m_processing.front();
processing->processing = true;
}
}
catch(...)
{
#ifndef NDEBUG
std::cerr << "error while checking files\n";
#endif
assert(false);
boost::mutex::scoped_lock l(m_mutex);
m_torrents.pop_front();
assert(!m_processing.empty());
processing.reset();
m_processing.pop_front();
if (!m_processing.empty())
{
processing = m_processing.front();
processing->processing = true;
}
assert(false);
}
}
}
detail::piece_checker_data* checker_impl::find_torrent(sha1_hash const& info_hash)
{
for (std::deque<piece_checker_data>::iterator i
for (std::deque<boost::shared_ptr<piece_checker_data> >::iterator i
= m_torrents.begin(); i != m_torrents.end(); ++i)
{
if (i->info_hash == info_hash) return &(*i);
if ((*i)->info_hash == info_hash) return i->get();
}
for (std::deque<boost::shared_ptr<piece_checker_data> >::iterator i
= m_processing.begin(); i != m_processing.end(); ++i)
{
if ((*i)->info_hash == info_hash) return i->get();
}
return 0;
}
void checker_impl::remove_torrent(sha1_hash const& info_hash)
{
for (std::deque<piece_checker_data>::iterator i
for (std::deque<boost::shared_ptr<piece_checker_data> >::iterator i
= m_torrents.begin(); i != m_torrents.end(); ++i)
{
if (i->info_hash == info_hash)
if ((*i)->info_hash == info_hash)
{
assert((*i)->processing == false);
m_torrents.erase(i);
return;
}
}
assert(false);
}
session_impl::session_impl(
@@ -389,8 +521,7 @@ namespace libtorrent { namespace detail
}
for (std::vector<boost::shared_ptr<libtorrent::socket> >::iterator i =
writable_clients.begin(); i != writable_clients.end();
++i)
writable_clients.begin(); i != writable_clients.end(); ++i)
{
assert((*i)->is_writable());
}
@@ -969,16 +1100,16 @@ namespace libtorrent
std::vector<torrent_handle> session::get_torrents()
{
boost::mutex::scoped_lock l(m_checker_impl.m_mutex);
boost::mutex::scoped_lock l2(m_impl.m_mutex);
boost::mutex::scoped_lock l(m_impl.m_mutex);
boost::mutex::scoped_lock l2(m_checker_impl.m_mutex);
std::vector<torrent_handle> ret;
for (std::deque<detail::piece_checker_data>::iterator i
for (std::deque<boost::shared_ptr<detail::piece_checker_data> >::iterator i
= m_checker_impl.m_torrents.begin()
, end(m_checker_impl.m_torrents.end()); i != end; ++i)
{
if (i->abort) continue;
if ((*i)->abort) continue;
ret.push_back(torrent_handle(&m_impl, &m_checker_impl
, i->info_hash));
, (*i)->info_hash));
}
for (detail::session_impl::torrent_map::iterator i
@@ -1041,14 +1172,15 @@ namespace libtorrent
// the checker thread and store it before starting
// the thread
boost::shared_ptr<torrent> torrent_ptr(
new torrent(m_impl, metadata, save_path, m_impl.m_listen_interface
, compact_mode, block_size));
new torrent(m_impl, m_checker_impl, metadata, save_path
, m_impl.m_listen_interface, compact_mode, block_size));
detail::piece_checker_data d;
d.torrent_ptr = torrent_ptr;
d.save_path = save_path;
d.info_hash = ti.info_hash();
d.resume_data = resume_data;
boost::shared_ptr<detail::piece_checker_data> d(
new detail::piece_checker_data);
d->torrent_ptr = torrent_ptr;
d->save_path = save_path;
d->info_hash = ti.info_hash();
d->resume_data = resume_data;
// add the torrent to the queue to be checked
m_checker_impl.m_torrents.push_back(d);
@@ -1105,7 +1237,7 @@ namespace libtorrent
// the checker thread and store it before starting
// the thread
boost::shared_ptr<torrent> torrent_ptr(
new torrent(m_impl, tracker_url, info_hash, save_path
new torrent(m_impl, m_checker_impl, tracker_url, info_hash, save_path
, m_impl.m_listen_interface, compact_mode, block_size));
m_impl.m_torrents.insert(
@@ -1222,7 +1354,7 @@ namespace libtorrent
// abort the currently checking torrent
if (!m_checker_impl.m_torrents.empty())
{
m_checker_impl.m_torrents.front().abort = true;
m_checker_impl.m_torrents.front()->abort = true;
}
m_checker_impl.m_cond.notify_one();
}