removed the session mutex for improved performance

This commit is contained in:
Arvid Norberg
2010-07-14 04:16:38 +00:00
parent 3310198dae
commit 4e576f93fd
21 changed files with 753 additions and 566 deletions

View File

@@ -750,8 +750,10 @@ namespace aux {
m_thread.reset(new thread(boost::bind(&session_impl::main_thread, this)));
}
void session_impl::save_state(entry& e, boost::uint32_t flags, mutex::scoped_lock& l) const
void session_impl::save_state(entry* eptr, boost::uint32_t flags) const
{
entry& e = *eptr;
if (flags & session::save_settings)
{
// TODO: move these to session_settings
@@ -780,12 +782,7 @@ namespace aux {
#ifndef TORRENT_DISABLE_DHT
if (m_dht && (flags & session::save_dht_state))
{
condition cond;
entry& state = e["dht state"];
bool done = false;
m_io_service.post(boost::bind(&session_impl::on_dht_state_callback
, this, boost::ref(cond), boost::ref(state), boost::ref(done)));
while (!done) cond.wait(l);
e["dht state"] = m_dht->state();
}
#endif
@@ -814,29 +811,29 @@ namespace aux {
}
void session_impl::load_state(lazy_entry const& e)
void session_impl::load_state(lazy_entry const* e)
{
lazy_entry const* settings;
if (e.type() != lazy_entry::dict_t) return;
if (e->type() != lazy_entry::dict_t) return;
set_upload_rate_limit(e.dict_find_int_value("upload_rate_limit", 0));
set_download_rate_limit(e.dict_find_int_value("download_rate_limit", 0));
set_local_upload_rate_limit(e.dict_find_int_value("local_upload_rate_limit", 0));
set_local_download_rate_limit(e.dict_find_int_value("local_download_rate_limit", 0));
set_max_uploads(e.dict_find_int_value("max_uploads", 0));
set_max_half_open_connections(e.dict_find_int_value("max_half_open_connections", 0));
set_max_connections(e.dict_find_int_value("max_connections", 0));
set_upload_rate_limit(e->dict_find_int_value("upload_rate_limit", 0));
set_download_rate_limit(e->dict_find_int_value("download_rate_limit", 0));
set_local_upload_rate_limit(e->dict_find_int_value("local_upload_rate_limit", 0));
set_local_download_rate_limit(e->dict_find_int_value("local_download_rate_limit", 0));
set_max_uploads(e->dict_find_int_value("max_uploads", 0));
set_max_half_open_connections(e->dict_find_int_value("max_half_open_connections", 0));
set_max_connections(e->dict_find_int_value("max_connections", 0));
for (int i = 0; i < sizeof(all_settings)/sizeof(all_settings[0]); ++i)
{
session_category const& c = all_settings[i];
settings = e.dict_find_dict(c.name);
settings = e->dict_find_dict(c.name);
if (!settings) continue;
load_struct(*settings, reinterpret_cast<char*>(this) + c.offset, c.map, c.num_entries);
}
#ifndef TORRENT_DISABLE_DHT
settings = e.dict_find_dict("dht");
settings = e->dict_find_dict("dht");
if (settings)
{
dht_settings s;
@@ -844,7 +841,7 @@ namespace aux {
, sizeof(dht_settings_map)/sizeof(dht_settings_map[0]));
set_dht_settings(s);
}
settings = e.dict_find_dict("dht state");
settings = e->dict_find_dict("dht state");
if (settings)
{
m_dht_state = *settings;
@@ -853,7 +850,7 @@ namespace aux {
#endif
#if TORRENT_USE_I2P
settings = e.dict_find_dict("i2p");
settings = e->dict_find_dict("i2p");
if (settings)
{
proxy_settings s;
@@ -863,7 +860,7 @@ namespace aux {
}
#endif
#ifndef TORRENT_DISABLE_GEO_IP
settings = e.dict_find_dict("AS map");
settings = e->dict_find_dict("AS map");
if (settings)
{
for (int i = 0; i < settings->dict_size(); ++i)
@@ -1115,6 +1112,15 @@ namespace aux {
// the uTP connections cannot be closed gracefully
m_udp_socket.close();
m_external_udp_port = 0;
#ifndef TORRENT_DISABLE_GEO_IP
if (m_asnum_db) GeoIP_delete(m_asnum_db);
if (m_country_db) GeoIP_delete(m_country_db);
m_asnum_db = 0;
m_country_db = 0;
#endif
m_disk_thread.abort();
}
void session_impl::set_port_filter(port_filter const& f)
@@ -1635,13 +1641,12 @@ namespace aux {
void session_impl::on_accept_connection(shared_ptr<socket_type> const& s
, weak_ptr<socket_acceptor> listen_socket, error_code const& e)
{
TORRENT_ASSERT(is_network_thread());
boost::shared_ptr<socket_acceptor> listener = listen_socket.lock();
if (!listener) return;
if (e == asio::error::operation_aborted) return;
mutex::scoped_lock l(m_mutex);
if (m_abort) return;
error_code ec;
@@ -1918,8 +1923,8 @@ namespace aux {
// wake them up
void session_impl::on_disk_queue()
{
mutex::scoped_lock l(m_mutex);
TORRENT_ASSERT(is_network_thread());
for (connection_map::iterator i = m_connections.begin();
i != m_connections.end();)
{
@@ -1948,7 +1953,7 @@ namespace aux {
void session_impl::on_tick(error_code const& e)
{
mutex::scoped_lock l(m_mutex);
TORRENT_ASSERT(is_network_thread());
ptime now = time_now_hires();
aux::g_current_time = now;
@@ -2438,9 +2443,9 @@ namespace aux {
void session_impl::on_dht_announce(error_code const& e)
{
TORRENT_ASSERT(is_network_thread());
if (e) return;
mutex::scoped_lock l(m_mutex);
if (m_abort) return;
// announce to DHT every 15 minutes
@@ -2464,9 +2469,9 @@ namespace aux {
void session_impl::on_lsd_announce(error_code const& e)
{
TORRENT_ASSERT(is_network_thread());
if (e) return;
mutex::scoped_lock l(m_mutex);
if (m_abort) return;
// announce on local network every 5 minutes
@@ -2944,6 +2949,12 @@ namespace aux {
void session_impl::main_thread()
{
#ifdef TORRENT_DEBUG
#if defined BOOST_HAS_PTHREADS
m_network_thread = pthread_self();
#endif
#endif
TORRENT_ASSERT(is_network_thread());
eh_initializer();
bool stop_loop = false;
@@ -2961,7 +2972,6 @@ namespace aux {
}
m_io_service.reset();
mutex::scoped_lock l(m_mutex);
stop_loop = m_abort;
}
@@ -2969,7 +2979,6 @@ namespace aux {
(*m_logger) << time_now_string() << " locking mutex\n";
#endif
mutex::scoped_lock l(m_mutex);
/*
#ifdef TORRENT_DEBUG
for (torrent_map::iterator i = m_torrents.begin();
@@ -2993,6 +3002,8 @@ namespace aux {
// session is locked!
boost::weak_ptr<torrent> session_impl::find_torrent(sha1_hash const& info_hash)
{
TORRENT_ASSERT(is_network_thread());
std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i
= m_torrents.find(info_hash);
#ifdef TORRENT_DEBUG
@@ -3274,7 +3285,7 @@ namespace aux {
void session_impl::on_lsd_peer(tcp::endpoint peer, sha1_hash const& ih)
{
mutex::scoped_lock l(m_mutex);
TORRENT_ASSERT(is_network_thread());
INVARIANT_CHECK;
@@ -3308,7 +3319,8 @@ namespace aux {
void session_impl::on_port_mapping(int mapping, int port
, error_code const& ec, int map_transport)
{
mutex::scoped_lock l(m_mutex);
TORRENT_ASSERT(is_network_thread());
TORRENT_ASSERT(map_transport >= 0 && map_transport <= 1);
if (mapping == m_udp_mapping[map_transport] && port != 0)
@@ -3521,30 +3533,15 @@ namespace aux {
m_dht_settings = settings;
}
void session_impl::on_dht_state_callback(condition& c
, entry& e, bool& done) const
{
mutex::scoped_lock l(m_mutex);
if (m_dht) e = m_dht->state();
done = true;
c.signal(l);
}
#ifndef TORRENT_NO_DEPRECATE
entry session_impl::dht_state(mutex::scoped_lock& l) const
entry session_impl::dht_state() const
{
condition cond;
if (!m_dht) return entry();
entry e;
bool done = false;
m_io_service.post(boost::bind(&session_impl::on_dht_state_callback
, this, boost::ref(cond), boost::ref(e), boost::ref(done)));
while (!done) cond.wait(l);
return e;
return m_dht->state();
}
#endif
void session_impl::add_dht_node(std::pair<std::string, int> const& node)
void session_impl::add_dht_node_name(std::pair<std::string, int> const& node)
{
TORRENT_ASSERT(m_dht);
m_dht->add_node(node);
@@ -3584,22 +3581,15 @@ namespace aux {
session_impl::~session_impl()
{
mutex::scoped_lock l(m_mutex);
#if defined BOOST_HAS_PTHREADS
TORRENT_ASSERT(!is_network_thread());
#endif
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << time_now_string() << "\n\n *** shutting down session *** \n\n";
#endif
abort();
TORRENT_ASSERT(m_connections.empty());
m_io_service.post(boost::bind(&session_impl::abort, this));
#ifndef TORRENT_DISABLE_GEO_IP
if (m_asnum_db) GeoIP_delete(m_asnum_db);
if (m_country_db) GeoIP_delete(m_country_db);
m_asnum_db = 0;
m_country_db = 0;
#endif
l.unlock();
// we need to wait for the disk-io thread to
// die first, to make sure it won't post any
// more messages to the io_service containing references
@@ -3702,7 +3692,7 @@ namespace aux {
m_upload_channel.throttle(bytes_per_second);
}
void session_impl::set_alert_dispatch(boost::function<void(alert const&)> const& fun)
void session_impl::set_alert_dispatch(boost::function<void(std::auto_ptr<alert>)> const& fun)
{
m_alerts.set_dispatch_function(fun);
}
@@ -3712,9 +3702,7 @@ namespace aux {
// too expensive
// INVARIANT_CHECK;
if (m_alerts.pending())
return m_alerts.get();
return std::auto_ptr<alert>(0);
return m_alerts.get();
}
alert const* session_impl::wait_for_alert(time_duration max_wait)
@@ -3765,10 +3753,22 @@ namespace aux {
m_lsd->use_broadcast(true);
}
void session_impl::start_natpmp(natpmp* n)
natpmp* session_impl::start_natpmp()
{
INVARIANT_CHECK;
if (m_natpmp) return m_natpmp.get();
// the natpmp constructor may fail and call the callbacks
// into the session_impl.
natpmp* n = new (std::nothrow) natpmp(m_io_service
, m_listen_interface.address()
, boost::bind(&session_impl::on_port_mapping
, this, _1, _2, _3, 0)
, boost::bind(&session_impl::on_port_map_log
, this, _1, 0));
if (n == 0) return 0;
m_natpmp = n;
if (m_listen_interface.port() > 0)
@@ -3783,10 +3783,25 @@ namespace aux {
}
}
void session_impl::start_upnp(upnp* u)
upnp* session_impl::start_upnp()
{
INVARIANT_CHECK;
if (m_upnp) return m_upnp.get();
// the upnp constructor may fail and call the callbacks
upnp* u = new (std::nothrow) upnp(m_io_service
, m_half_open
, m_listen_interface.address()
, m_settings.user_agent
, boost::bind(&session_impl::on_port_mapping
, this, _1, _2, _3, 1)
, boost::bind(&session_impl::on_port_map_log
, this, _1, 1)
, m_settings.upnp_ignore_nonrouters);
if (u == 0) return 0;
m_upnp = u;
m_upnp->discover_device();