introduced a proper half open TCP connection limit. Also exposed the connection queue to let clients use the same connection limiter as libtorrent. UPnP connections and tracker connection are now also limited as well as peer connections and web seeds

This commit is contained in:
Arvid Norberg
2007-05-05 00:29:33 +00:00
parent 4000cc6ac3
commit 49bd69cad4
26 changed files with 488 additions and 216 deletions

View File

@@ -480,13 +480,13 @@ namespace libtorrent { namespace detail
, m_dl_bandwidth_manager(m_io_service, peer_connection::download_channel)
, m_ul_bandwidth_manager(m_io_service, peer_connection::upload_channel)
, m_tracker_manager(m_settings, m_tracker_proxy)
, m_half_open(m_io_service)
, m_listen_port_range(listen_port_range)
, m_listen_interface(address::from_string(listen_interface), listen_port_range.first)
, m_external_listen_port(0)
, m_abort(false)
, m_max_uploads(-1)
, m_max_connections(-1)
, m_half_open_limit(-1)
, m_incoming_connection(false)
, m_files(40)
, m_last_tick(time_now())
@@ -496,12 +496,13 @@ namespace libtorrent { namespace detail
#endif
, m_natpmp(m_io_service, m_listen_interface.address()
, bind(&session_impl::on_port_mapping, this, _1, _2, _3))
, m_upnp(m_io_service, m_listen_interface.address()
, m_upnp(m_io_service, m_half_open, m_listen_interface.address()
, m_settings.user_agent
, bind(&session_impl::on_port_mapping, this, _1, _2, _3))
, m_lsd(m_io_service, m_listen_interface.address()
, bind(&session_impl::on_lsd_peer, this, _1, _2))
, m_timer(m_io_service)
, m_next_connect_torrent(0)
, m_checker_impl(*this)
{
@@ -698,35 +699,6 @@ namespace libtorrent { namespace detail
if (m_listen_socket) async_accept();
}
void session_impl::process_connection_queue()
{
while (!m_connection_queue.empty())
{
if ((int)m_half_open.size() >= m_half_open_limit
&& m_half_open_limit > 0)
return;
connection_queue::value_type c = m_connection_queue.front();
try
{
m_connection_queue.pop_front();
assert(c->associated_torrent().lock().get());
c->connect();
m_half_open.insert(std::make_pair(c->get_socket(), c));
}
catch (std::exception& e)
{
c->disconnect();
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << "connect failed [" << c->remote() << "]: "
<< e.what() << "\n";
#endif
}
}
}
void session_impl::async_accept()
{
shared_ptr<socket_type> c(new socket_type(m_io_service));
@@ -808,46 +780,21 @@ namespace libtorrent { namespace detail
connection_map::iterator p = m_connections.find(s);
// the connection may have been disconnected in the receive or send phase
if (p != m_connections.end())
if (p == m_connections.end()) return;
if (m_alerts.should_post(alert::debug))
{
if (m_alerts.should_post(alert::debug))
{
m_alerts.post_alert(
peer_error_alert(
a
, p->second->pid()
, message));
}
m_alerts.post_alert(
peer_error_alert(
a
, p->second->pid()
, message));
}
#if defined(TORRENT_VERBOSE_LOGGING)
(*p->second->m_logger) << "*** CONNECTION FAILED " << message << "\n";
(*p->second->m_logger) << "*** CONNECTION FAILED " << message << "\n";
#endif
p->second->set_failed();
p->second->disconnect();
}
else
{
// the error was not in one of the connected
// conenctions. Look among the half-open ones.
p = m_half_open.find(s);
if (p != m_half_open.end())
{
if (m_alerts.should_post(alert::debug))
{
m_alerts.post_alert(
peer_error_alert(
a
, p->second->pid()
, message));
}
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << "CLOSED: " << a.address().to_string()
<< " " << message << "\n";
#endif
p->second->set_failed();
p->second->disconnect();
}
}
p->second->set_failed();
p->second->disconnect();
}
#ifndef NDEBUG
catch (...)
@@ -861,43 +808,9 @@ namespace libtorrent { namespace detail
mutex_t::scoped_lock l(m_mutex);
assert(p->is_disconnecting());
if (p->is_connecting())
{
assert(p->is_local());
assert(m_connections.find(p->get_socket()) == m_connections.end());
// Since this peer is still connecting, will not be
// in the list of completed connections.
connection_map::iterator i = m_half_open.find(p->get_socket());
if (i == m_half_open.end())
{
// this connection is not in the half-open list, so it
// has to be in the queue, waiting to be connected.
connection_queue::iterator j = std::find(
m_connection_queue.begin(), m_connection_queue.end(), p);
// if this connection was closed while being connected
// it has been removed from the connection queue and
// not yet put into the half-open queue.
if (j != m_connection_queue.end())
m_connection_queue.erase(j);
}
else
{
m_half_open.erase(i);
process_connection_queue();
}
}
else
{
assert(m_half_open.find(p->get_socket()) == m_half_open.end());
assert(std::find(m_connection_queue.begin()
, m_connection_queue.end(), p) == m_connection_queue.end());
connection_map::iterator i = m_connections.find(p->get_socket());
// assert (i != m_connections.end());
if (i != m_connections.end())
m_connections.erase(i);
}
connection_map::iterator i = m_connections.find(p->get_socket());
if (i != m_connections.end())
m_connections.erase(i);
}
void session_impl::set_peer_id(peer_id const& id)
@@ -933,7 +846,34 @@ namespace libtorrent { namespace detail
m_timer.expires_from_now(seconds(1));
m_timer.async_wait(m_strand.wrap(
bind(&session_impl::second_tick, this, _1)));
// let torrents connect to peers if they want to
// if there are any torrents and any free slots
if (!m_torrents.empty() && m_half_open.free_slots())
{
torrent_map::iterator next_connect_torrent = m_torrents.begin();
if (m_next_connect_torrent < int(m_torrents.size()))
std::advance(next_connect_torrent, m_next_connect_torrent);
else
m_next_connect_torrent = 0;
torrent_map::iterator i = next_connect_torrent;
do
{
torrent& t = *i->second;
if (t.want_more_peers())
t.try_connect_peer();
++m_next_connect_torrent;
if (!m_half_open.free_slots()) break;
++i;
if (i == m_torrents.end())
{
assert(m_next_connect_torrent == int(m_torrents.size()));
i = m_torrents.begin();
m_next_connect_torrent = 0;
}
} while (i != next_connect_torrent);
}
// do the second_tick() on each connection
// this will update their statistics (download and upload speeds)
// also purge sockets that have timed out
@@ -972,8 +912,8 @@ namespace libtorrent { namespace detail
// check each torrent for tracker updates
// TODO: do this in a timer-event in each torrent instead
for (std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i
= m_torrents.begin(); i != m_torrents.end();)
for (torrent_map::iterator i = m_torrents.begin();
i != m_torrents.end();)
{
torrent& t = *i->second;
assert(!t.is_aborted());
@@ -982,8 +922,8 @@ namespace libtorrent { namespace detail
tracker_request req = t.generate_tracker_request();
req.listen_port = m_external_listen_port;
req.key = m_key;
m_tracker_manager.queue_request(m_strand, req, t.tracker_login()
, m_listen_interface.address(), i->second);
m_tracker_manager.queue_request(m_strand, m_half_open, req
, t.tracker_login(), m_listen_interface.address(), i->second);
if (m_alerts.should_post(alert::info))
{
@@ -1033,7 +973,7 @@ namespace libtorrent { namespace detail
assert(false);
#endif
}; // msvc 7.1 seems to require this
/*
void session_impl::connection_completed(
boost::intrusive_ptr<peer_connection> const& p) try
{
@@ -1055,7 +995,7 @@ namespace libtorrent { namespace detail
assert(false);
#endif
};
*/
void session_impl::operator()()
{
eh_initializer();
@@ -1115,9 +1055,11 @@ namespace libtorrent { namespace detail
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
boost::shared_ptr<tracker_logger> tl(new tracker_logger(*this));
m_tracker_loggers.push_back(tl);
m_tracker_manager.queue_request(m_strand, req, login, m_listen_interface.address(), tl);
m_tracker_manager.queue_request(m_strand, m_half_open, req, login
, m_listen_interface.address(), tl);
#else
m_tracker_manager.queue_request(m_strand, req, login, m_listen_interface.address());
m_tracker_manager.queue_request(m_strand, m_half_open, req, login
, m_listen_interface.address());
#endif
}
}
@@ -1141,11 +1083,6 @@ namespace libtorrent { namespace detail
assert(m_abort);
m_abort = true;
m_connection_queue.clear();
while (!m_half_open.empty())
m_half_open.begin()->second->disconnect();
while (!m_connections.empty())
m_connections.begin()->second->disconnect();
@@ -1412,10 +1349,10 @@ namespace libtorrent { namespace detail
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
boost::shared_ptr<tracker_logger> tl(new tracker_logger(*this));
m_tracker_loggers.push_back(tl);
m_tracker_manager.queue_request(m_strand, req
m_tracker_manager.queue_request(m_strand, m_half_open, req
, t.tracker_login(), m_listen_interface.address(), tl);
#else
m_tracker_manager.queue_request(m_strand, req
m_tracker_manager.queue_request(m_strand, m_half_open, req
, t.tracker_login(), m_listen_interface.address());
#endif
@@ -1775,7 +1712,7 @@ namespace libtorrent { namespace detail
assert(limit > 0 || limit == -1);
mutex_t::scoped_lock l(m_mutex);
m_half_open_limit = limit;
m_half_open.limit(limit);
}
void session_impl::set_upload_rate_limit(int bytes_per_second)
@@ -1801,7 +1738,7 @@ namespace libtorrent { namespace detail
int session_impl::num_connections() const
{
mutex_t::scoped_lock l(m_mutex);
return m_connections.size() + m_half_open.size();
return m_connections.size();
}
@@ -1835,31 +1772,10 @@ namespace libtorrent { namespace detail
void session_impl::check_invariant(const char *place)
{
assert(place);
for (connection_map::iterator i = m_half_open.begin();
i != m_half_open.end(); ++i)
{
assert(i->second->is_connecting());
}
for (connection_map::iterator i = m_connections.begin();
i != m_connections.end(); ++i)
{
assert(i->second);
assert(!i->second->is_connecting());
if (i->second->is_connecting())
{
std::ofstream error_log("error.log", std::ios_base::app);
boost::intrusive_ptr<peer_connection> p = i->second;
error_log << "peer_connection::is_connecting() " << p->is_connecting() << "\n";
error_log << "peer_connection::can_write() " << p->can_write() << "\n";
error_log << "peer_connection::can_read() " << p->can_read() << "\n";
error_log << "peer_connection::get_peer_id " << p->pid() << "\n";
error_log << "place: " << place << "\n";
error_log.flush();
assert(false);
}
boost::shared_ptr<torrent> t = i->second->associated_torrent().lock();
if (t)