diff --git a/include/libtorrent/aux_/session_impl.hpp b/include/libtorrent/aux_/session_impl.hpp index b7d79ec66..933b26675 100644 --- a/include/libtorrent/aux_/session_impl.hpp +++ b/include/libtorrent/aux_/session_impl.hpp @@ -146,6 +146,7 @@ namespace libtorrent #endif ); ~session_impl(); + void start(); #ifndef TORRENT_DISABLE_EXTENSIONS void add_extension(boost::function( diff --git a/src/session_impl.cpp b/src/session_impl.cpp index 7aa93f78a..b9f6b26b1 100644 --- a/src/session_impl.cpp +++ b/src/session_impl.cpp @@ -750,14 +750,26 @@ namespace aux { url_random((char*)&m_peer_id[print.length()], (char*)&m_peer_id[0] + 20); + m_thread.reset(new thread(boost::bind(&session_impl::main_thread, this))); + } + + void session_impl::start() + { + // this is where we should set up all async operations. This + // is called from within the network thread as opposed to the + // constructor which is called from the main thread + + error_code ec; m_timer.expires_from_now(milliseconds(m_settings.tick_interval), ec); m_timer.async_wait(boost::bind(&session_impl::on_tick, this, _1)); + TORRENT_ASSERT(!ec); int delay = (std::max)(m_settings.local_service_announce_interval / (std::max)(int(m_torrents.size()), 1), 1); m_lsd_announce_timer.expires_from_now(seconds(delay), ec); m_lsd_announce_timer.async_wait( boost::bind(&session_impl::on_lsd_announce, this, _1)); + TORRENT_ASSERT(!ec); #ifndef TORRENT_DISABLE_DHT delay = (std::max)(m_settings.dht_announce_interval @@ -765,16 +777,17 @@ namespace aux { m_dht_announce_timer.expires_from_now(seconds(delay), ec); m_dht_announce_timer.async_wait( boost::bind(&session_impl::on_dht_announce, this, _1)); + TORRENT_ASSERT(!ec); #endif // no reuse_address open_listen_port(false); - - m_thread.reset(new thread(boost::bind(&session_impl::main_thread, this))); } void session_impl::save_state(entry* eptr, boost::uint32_t flags) const { + TORRENT_ASSERT(is_network_thread()); + entry& e = *eptr; if (flags & session::save_settings) @@ -836,6 +849,8 @@ namespace aux { void session_impl::set_proxy(proxy_settings const& s) { + TORRENT_ASSERT(is_network_thread()); + m_proxy = s; // in case we just set a socks proxy, we might have to // open the socks incoming connection @@ -845,6 +860,8 @@ namespace aux { void session_impl::load_state(lazy_entry const* e) { + TORRENT_ASSERT(is_network_thread()); + lazy_entry const* settings; if (e->type() != lazy_entry::dict_t) return; @@ -927,12 +944,16 @@ namespace aux { char const* session_impl::country_for_ip(address const& a) { + TORRENT_ASSERT(is_network_thread()); + if (!a.is_v4() || m_country_db == 0) return 0; return GeoIP_country_code_by_ipnum(m_country_db, a.to_v4().to_ulong()); } int session_impl::as_for_ip(address const& a) { + TORRENT_ASSERT(is_network_thread()); + if (!a.is_v4() || m_asnum_db == 0) return 0; char* name = GeoIP_name_by_ipnum(m_asnum_db, a.to_v4().to_ulong()); if (name == 0) return 0; @@ -943,6 +964,8 @@ namespace aux { std::string session_impl::as_name_for_ip(address const& a) { + TORRENT_ASSERT(is_network_thread()); + if (!a.is_v4() || m_asnum_db == 0) return std::string(); char* name = GeoIP_name_by_ipnum(m_asnum_db, a.to_v4().to_ulong()); if (name == 0) return std::string(); @@ -954,6 +977,8 @@ namespace aux { std::pair* session_impl::lookup_as(int as) { + TORRENT_ASSERT(is_network_thread()); + std::map::iterator i = m_as_peak.lower_bound(as); if (i == m_as_peak.end() || i->first != as) @@ -966,6 +991,8 @@ namespace aux { void session_impl::load_asnum_db(std::string file) { + TORRENT_ASSERT(is_network_thread()); + if (m_asnum_db) GeoIP_delete(m_asnum_db); m_asnum_db = GeoIP_open(file.c_str(), GEOIP_STANDARD); // return m_asnum_db; @@ -974,6 +1001,8 @@ namespace aux { #if TORRENT_USE_WSTRING void session_impl::load_asnum_dbw(std::wstring file) { + TORRENT_ASSERT(is_network_thread()); + if (m_asnum_db) GeoIP_delete(m_asnum_db); std::string utf8; wchar_utf8(file, utf8); @@ -983,6 +1012,8 @@ namespace aux { void session_impl::load_country_dbw(std::wstring file) { + TORRENT_ASSERT(is_network_thread()); + if (m_country_db) GeoIP_delete(m_country_db); std::string utf8; wchar_utf8(file, utf8); @@ -993,6 +1024,8 @@ namespace aux { void session_impl::load_country_db(std::string file) { + TORRENT_ASSERT(is_network_thread()); + if (m_country_db) GeoIP_delete(m_country_db); m_country_db = GeoIP_open(file.c_str(), GEOIP_STANDARD); // return m_country_db; @@ -1004,6 +1037,7 @@ namespace aux { void session_impl::add_extension( boost::function(torrent*, void*)> ext) { + TORRENT_ASSERT(is_network_thread()); TORRENT_ASSERT_VAL(ext, ext); typedef boost::shared_ptr(*function_t)(torrent*, void*); @@ -1022,12 +1056,16 @@ namespace aux { #ifndef TORRENT_DISABLE_DHT void session_impl::add_dht_node(udp::endpoint n) { + TORRENT_ASSERT(is_network_thread()); + if (m_dht) m_dht->add_node(n); } #endif void session_impl::pause() { + TORRENT_ASSERT(is_network_thread()); + if (m_paused) return; #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING (*m_logger) << time_now_string() << " *** session paused ***\n"; @@ -1043,6 +1081,8 @@ namespace aux { void session_impl::resume() { + TORRENT_ASSERT(is_network_thread()); + if (!m_paused) return; m_paused = false; for (torrent_map::iterator i = m_torrents.begin() @@ -1055,6 +1095,8 @@ namespace aux { void session_impl::abort() { + TORRENT_ASSERT(is_network_thread()); + if (m_abort) return; #if defined TORRENT_LOGGING (*m_logger) << time_now_string() << " *** ABORT CALLED ***\n"; @@ -1449,6 +1491,8 @@ namespace aux { void session_impl::open_listen_port(bool reuse_address) { + TORRENT_ASSERT(is_network_thread()); + // close the open listen sockets m_listen_sockets.clear(); m_incoming_connection = false; @@ -3028,6 +3072,9 @@ namespace aux { TORRENT_ASSERT(is_network_thread()); eh_initializer(); + // initialize async operations + start(); + bool stop_loop = false; while (!stop_loop) { diff --git a/src/udp_socket.cpp b/src/udp_socket.cpp index a86c7346d..fac96c965 100644 --- a/src/udp_socket.cpp +++ b/src/udp_socket.cpp @@ -36,6 +36,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/connection_queue.hpp" #include "libtorrent/escape_string.hpp" #include "libtorrent/socket_io.hpp" +#include "libtorrent/error.hpp" #include #include #include @@ -79,7 +80,7 @@ udp_socket::~udp_socket() #ifdef TORRENT_DEBUG TORRENT_ASSERT(m_magic == 0x1337); TORRENT_ASSERT(!m_callback || !m_started); - TORRENT_ASSERT(m_outstanding == 0); + TORRENT_ASSERT_VAL(m_outstanding == 0, m_outstanding); m_magic = 0; #endif } @@ -420,11 +421,17 @@ void udp_socket::close() TORRENT_ASSERT(m_magic == 0x1337); error_code ec; + // if we close the socket here, we can't shut down + // utp connections or NAT-PMP. We need to cancel the + // outstanding operations m_ipv4_sock.cancel(ec); + TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec); #if TORRENT_USE_IPV6 m_ipv6_sock.cancel(ec); + TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec); #endif m_socks5_sock.cancel(ec); + TORRENT_ASSERT_VAL(!ec || ec == error::bad_descriptor, ec); m_resolver.cancel(); m_abort = true; @@ -615,7 +622,7 @@ void udp_socket::on_connected(error_code const& e) write_uint8(0, p); // no authentication write_uint8(2, p); // username/password } - TORRENT_ASSERT(p - m_tmp_buf < sizeof(m_tmp_buf)); + TORRENT_ASSERT_VAL(p - m_tmp_buf < sizeof(m_tmp_buf), (p - m_tmp_buf)); asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf) , boost::bind(&udp_socket::handshake1, this, _1)); } @@ -666,7 +673,7 @@ void udp_socket::handshake2(error_code const& e) write_string(m_proxy_settings.username, p); write_uint8(m_proxy_settings.password.size(), p); write_string(m_proxy_settings.password, p); - TORRENT_ASSERT(p - m_tmp_buf < sizeof(m_tmp_buf)); + TORRENT_ASSERT_VAL(p - m_tmp_buf < sizeof(m_tmp_buf), (p - m_tmp_buf)); asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf) , boost::bind(&udp_socket::handshake3, this, _1)); } @@ -732,7 +739,7 @@ void udp_socket::socks_forward_udp(mutex::scoped_lock& l) port = m_ipv6_sock.local_endpoint(ec).port(); #endif detail::write_uint16(port , p); - TORRENT_ASSERT(p - m_tmp_buf < sizeof(m_tmp_buf)); + TORRENT_ASSERT_VAL(p - m_tmp_buf < sizeof(m_tmp_buf), (p - m_tmp_buf)); asio::async_write(m_socks5_sock, asio::buffer(m_tmp_buf, p - m_tmp_buf) , boost::bind(&udp_socket::connect1, this, _1)); } @@ -828,7 +835,7 @@ rate_limited_udp_socket::rate_limited_udp_socket(io_service& ios error_code ec; m_timer.expires_from_now(seconds(1), ec); m_timer.async_wait(boost::bind(&rate_limited_udp_socket::on_tick, this, _1)); - TORRENT_ASSERT(!ec); + TORRENT_ASSERT_VAL(!ec, ec); } bool rate_limited_udp_socket::send(udp::endpoint const& ep, char const* p, int len, error_code& ec, int flags)