more IPv6 fixes. support for multihomed machines by listening on multiple interfaces. added listen_succeeded_alert to advertize which interfaces are being listened on

This commit is contained in:
Arvid Norberg
2007-09-22 16:27:29 +00:00
parent 022c38d5f4
commit 2e93b92cb4
8 changed files with 285 additions and 157 deletions

View File

@@ -73,6 +73,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/socket.hpp"
#include "libtorrent/aux_/session_impl.hpp"
#include "libtorrent/kademlia/dht_tracker.hpp"
#include "libtorrent/enum_net.hpp"
#ifndef TORRENT_DISABLE_ENCRYPTION
@@ -512,9 +513,8 @@ namespace detail
, m_download_channel(m_io_service, peer_connection::download_channel)
, m_upload_channel(m_io_service, peer_connection::upload_channel)
, m_tracker_manager(m_settings, m_tracker_proxy)
, m_listen_port_range(listen_port_range)
, m_listen_port_retries(listen_port_range.second - listen_port_range.first)
, m_listen_interface(address::from_string(listen_interface), listen_port_range.first)
, m_external_listen_port(0)
, m_abort(false)
, m_max_uploads(8)
, m_max_connections(200)
@@ -661,129 +661,212 @@ namespace detail
*i = ' ';
}
void session_impl::open_listen_port()
tcp::endpoint session_impl::get_ipv6_interface() const
{
try
{
// create listener socket
m_listen_socket.reset(new socket_acceptor(m_io_service));
return m_ipv6_interface;
}
for(;;)
{
try
{
m_listen_socket->open(m_listen_interface.protocol());
m_listen_socket->set_option(socket_acceptor::reuse_address(true));
m_listen_socket->bind(m_listen_interface);
m_listen_socket->listen();
m_listen_interface = m_listen_socket->local_endpoint();
m_external_listen_port = m_listen_interface.port();
break;
}
catch (asio::system_error& e)
{
// TODO: make sure this is correct
if (e.code() == asio::error::host_not_found
|| m_listen_interface.port() == 0)
{
if (m_alerts.should_post(alert::fatal))
{
std::string msg = "cannot listen on the given interface '"
+ m_listen_interface.address().to_string() + "'";
m_alerts.post_alert(listen_failed_alert(msg));
}
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
std::string msg = "cannot listen on the given interface '"
+ m_listen_interface.address().to_string() + "'";
(*m_logger) << msg << "\n";
#endif
assert(m_listen_socket.unique());
m_listen_socket.reset();
break;
}
m_listen_socket->close();
m_listen_interface.port(m_listen_interface.port() + 1);
if (m_listen_interface.port() > m_listen_port_range.second)
{
std::stringstream msg;
msg << "none of the ports in the range ["
<< m_listen_port_range.first
<< ", " << m_listen_port_range.second
<< "] could be opened for listening";
m_alerts.post_alert(listen_failed_alert(msg.str()));
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << msg.str() << "\n";
#endif
m_listen_socket.reset();
break;
}
}
}
session_impl::listen_socket_t session_impl::setup_listener(tcp::endpoint ep, int retries)
{
asio::error_code ec;
listen_socket_t s;
s.sock.reset(new socket_acceptor(m_io_service));
s.sock->open(ep.protocol(), ec);
s.sock->set_option(socket_acceptor::reuse_address(true), ec);
s.sock->bind(ep, ec);
while (ec && retries > 0)
{
ec = asio::error_code();
assert(!ec);
--retries;
ep.port(ep.port() + 1);
s.sock->bind(ep, ec);
}
catch (asio::system_error& e)
if (ec)
{
// instead of giving up, try
// let the OS pick a port
ep.port(0);
ec = asio::error_code();
s.sock->bind(ep, ec);
}
if (ec)
{
// not even that worked, give up
if (m_alerts.should_post(alert::fatal))
{
std::string msg = "cannot bind to the given interface '"
+ boost::lexical_cast<std::string>(ep) + "' " + ec.message();
m_alerts.post_alert(listen_failed_alert(ep, msg));
}
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
std::string msg = "cannot bind to the given interface '"
+ boost::lexical_cast<std::string>(ep) + "' " + ec.message();
(*m_logger) << msg << "\n";
#endif
return listen_socket_t();
}
s.external_port = s.sock->local_endpoint(ec).port();
s.sock->listen(0, ec);
if (ec)
{
if (m_alerts.should_post(alert::fatal))
{
m_alerts.post_alert(listen_failed_alert(
std::string("failed to open listen port: ") + e.what()));
std::string msg = "cannot listen the given interface '"
+ boost::lexical_cast<std::string>(ep) + "' " + ec.message();
m_alerts.post_alert(listen_failed_alert(ep, msg));
}
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
std::string msg = "cannot listen the given interface '"
+ boost::lexical_cast<std::string>(ep) + "' " + ec.message();
(*m_logger) << msg << "\n";
#endif
return listen_socket_t();
}
if (m_alerts.should_post(alert::fatal))
{
std::string msg = "listening on interface "
+ boost::lexical_cast<std::string>(ep);
m_alerts.post_alert(listen_succeeded_alert(ep, msg));
}
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << "listening on: " << ep.address().to_string() << ":" << ep.port()
<< " external port: " << s.external_port << "\n";
#endif
return s;
}
void session_impl::open_listen_port() throw()
{
// close the open listen sockets
m_listen_sockets.clear();
m_incoming_connection = false;
if (is_any(m_listen_interface.address()))
{
// this means we should open two listen sockets
// one for IPv4 and one for IPv6
listen_socket_t s = setup_listener(
tcp::endpoint(address_v4::any(), m_listen_interface.port())
, m_listen_port_retries);
if (s.sock)
{
m_listen_sockets.push_back(s);
async_accept(s.sock);
}
s = setup_listener(
tcp::endpoint(address_v6::any(), m_listen_interface.port())
, m_listen_port_retries);
if (s.sock)
{
m_listen_sockets.push_back(s);
async_accept(s.sock);
}
}
else
{
// we should only open a single listen socket, that
// binds to the given interface
listen_socket_t s = setup_listener(
m_listen_interface, m_listen_port_retries);
if (s.sock)
{
m_listen_sockets.push_back(s);
async_accept(s.sock);
}
}
if (m_listen_socket)
m_ipv6_interface = tcp::endpoint();
for (std::list<listen_socket_t>::const_iterator i = m_listen_sockets.begin()
, end(m_listen_sockets.end()); i != end; ++i)
{
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << "listening on port: " << m_listen_interface.port()
<< " external port: " << m_external_listen_port << "\n";
#endif
async_accept();
if (m_natpmp.get())
m_natpmp->set_mappings(m_listen_interface.port(), 0);
if (m_upnp.get())
m_upnp->set_mappings(m_listen_interface.port(), 0);
asio::error_code ec;
tcp::endpoint ep = i->sock->local_endpoint(ec);
if (ec || ep.address().is_v4()) continue;
std::vector<address> const& ifs = enum_net_interfaces(m_io_service, ec);
for (std::vector<address>::const_iterator i = ifs.begin()
, end(ifs.end()); i != end; ++i)
{
if (i->is_v4() || i->to_v6().is_link_local() || i->to_v6().is_loopback()) continue;
m_ipv6_interface = tcp::endpoint(*i, ep.port());
break;
}
break;
}
if (!m_listen_sockets.empty())
{
asio::error_code ec;
tcp::endpoint local = m_listen_sockets.front().sock->local_endpoint(ec);
if (!ec)
{
if (m_natpmp.get()) m_natpmp->set_mappings(local.port(), 0);
if (m_upnp.get()) m_upnp->set_mappings(local.port(), 0);
}
}
}
void session_impl::async_accept()
void session_impl::async_accept(boost::shared_ptr<socket_acceptor> const& listener)
{
shared_ptr<socket_type> c(new socket_type(m_io_service));
c->instantiate<stream_socket>();
m_listen_socket->async_accept(c->get<stream_socket>()
listener->async_accept(c->get<stream_socket>()
, bind(&session_impl::on_incoming_connection, this, c
, weak_ptr<socket_acceptor>(m_listen_socket), _1));
, boost::weak_ptr<socket_acceptor>(listener), _1));
}
void session_impl::on_incoming_connection(shared_ptr<socket_type> const& s
, weak_ptr<socket_acceptor> const& listen_socket, asio::error_code const& e) try
, weak_ptr<socket_acceptor> listen_socket, asio::error_code const& e) try
{
if (listen_socket.expired())
return;
boost::shared_ptr<socket_acceptor> listener = listen_socket.lock();
if (!listener) return;
if (e == asio::error::operation_aborted)
return;
if (e == asio::error::operation_aborted) return;
mutex_t::scoped_lock l(m_mutex);
assert(listen_socket.lock() == m_listen_socket);
if (m_abort) return;
asio::error_code ec;
if (e)
{
tcp::endpoint ep = listener->local_endpoint(ec);
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
std::string msg = "error accepting connection on '"
+ m_listen_interface.address().to_string() + "'";
+ boost::lexical_cast<std::string>(ep) + "' " + e.message();
(*m_logger) << msg << "\n";
#endif
assert(m_listen_socket.unique());
// try any random port
m_listen_interface.port(0);
open_listen_port();
if (m_alerts.should_post(alert::fatal))
{
std::string msg = "error accepting connection on '"
+ boost::lexical_cast<std::string>(ep) + "' " + ec.message();
m_alerts.post_alert(listen_failed_alert(ep, msg));
}
return;
}
async_accept();
async_accept(listener);
// we got a connection request!
m_incoming_connection = true;
tcp::endpoint endp = s->remote_endpoint();
tcp::endpoint endp = s->remote_endpoint(ec);
if (ec)
{
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << endp << " <== INCOMING CONNECTION FAILED, could "
"not retrieve remote endpoint " << ec.message() << "\n";
#endif
return;
}
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
(*m_logger) << endp << " <== INCOMING CONNECTION\n";
@@ -803,28 +886,22 @@ namespace detail
// check if we have any active torrents
// if we don't reject the connection
if (m_torrents.empty())
if (m_torrents.empty()) return;
bool has_active_torrent = false;
for (torrent_map::iterator i = m_torrents.begin()
, end(m_torrents.end()); i != end; ++i)
{
return;
}
else
{
bool has_active_torrent = false;
for (torrent_map::iterator i = m_torrents.begin()
, end(m_torrents.end()); i != end; ++i)
if (!i->second->is_paused())
{
if (!i->second->is_paused())
{
has_active_torrent = true;
break;
}
has_active_torrent = true;
break;
}
if (!has_active_torrent)
return;
}
if (!has_active_torrent) return;
boost::intrusive_ptr<peer_connection> c(
new bt_peer_connection(*this, s, 0));
new bt_peer_connection(*this, s, 0));
#ifndef NDEBUG
c->m_in_constructor = false;
#endif
@@ -1064,7 +1141,9 @@ namespace detail
if (t.should_request())
{
tracker_request req = t.generate_tracker_request();
req.listen_port = m_external_listen_port;
req.listen_port = 0;
if (!m_listen_sockets.empty())
req.listen_port = m_listen_sockets.front().external_port;
req.key = m_key;
m_tracker_manager.queue_request(m_strand, m_half_open, req
, t.tracker_login(), m_listen_interface.address(), i->second);
@@ -1274,10 +1353,9 @@ namespace detail
{
eh_initializer();
if (m_listen_port_range.first != 0 && m_listen_port_range.second != 0)
{
session_impl::mutex_t::scoped_lock l(m_mutex);
open_listen_port();
if (m_listen_interface.port() != 0) open_listen_port();
}
ptime timer = time_now();
@@ -1331,8 +1409,10 @@ namespace detail
&& !i->second->trackers().empty())
{
tracker_request req = i->second->generate_tracker_request();
assert(m_external_listen_port > 0);
req.listen_port = m_external_listen_port;
assert(!m_listen_sockets.empty());
req.listen_port = 0;
if (!m_listen_sockets.empty())
req.listen_port = m_listen_sockets.front().external_port;
req.key = m_key;
std::string login = i->second->tracker_login();
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
@@ -1347,8 +1427,8 @@ namespace detail
}
}
// close listen socket
m_listen_socket.reset();
// close the listen sockets
m_listen_sockets.clear();
ptime start(time_now());
l.unlock();
@@ -1477,10 +1557,12 @@ namespace detail
, bool paused
, void* userdata)
{
assert(!save_path.empty());
// if you get this assert, you haven't managed to
// open a listen port. call listen_on() first.
assert(m_external_listen_port > 0);
assert(!save_path.empty());
if (m_listen_sockets.empty())
throw std::runtime_error("no listen socket opened");
if (ti->begin_files() == ti->end_files())
throw std::runtime_error("no files in torrent");
@@ -1489,7 +1571,7 @@ namespace detail
mutex_t::scoped_lock l(m_mutex);
mutex::scoped_lock l2(m_checker_impl.m_mutex);
INVARIANT_CHECK;
// INVARIANT_CHECK;
if (is_aborted())
throw std::runtime_error("session is closing");
@@ -1573,7 +1655,7 @@ namespace detail
// lock the session
session_impl::mutex_t::scoped_lock l(m_mutex);
INVARIANT_CHECK;
// INVARIANT_CHECK;
// is the torrent already active?
if (!find_torrent(info_hash).expired())
@@ -1628,8 +1710,10 @@ namespace detail
{
tracker_request req = t.generate_tracker_request();
assert(req.event == tracker_request::stopped);
assert(m_external_listen_port > 0);
req.listen_port = m_external_listen_port;
assert(!m_listen_sockets.empty());
req.listen_port = 0;
if (!m_listen_sockets.empty())
req.listen_port = m_listen_sockets.front().external_port;
req.key = m_key;
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
@@ -1684,19 +1768,15 @@ namespace detail
if (net_interface && std::strlen(net_interface) > 0)
new_interface = tcp::endpoint(address::from_string(net_interface), port_range.first);
else
new_interface = tcp::endpoint(address(), port_range.first);
new_interface = tcp::endpoint(address_v4::any(), port_range.first);
m_listen_port_range = port_range;
m_listen_port_retries = port_range.second - port_range.first;
// if the interface is the same and the socket is open
// don't do anything
if (new_interface == m_listen_interface
&& m_listen_socket) return true;
&& !m_listen_sockets.empty()) return true;
if (m_listen_socket)
m_listen_socket.reset();
m_incoming_connection = false;
m_listen_interface = new_interface;
open_listen_port();
@@ -1723,13 +1803,14 @@ namespace detail
(*m_logger) << time_now_string() << "\n";
#endif
return m_listen_socket;
return !m_listen_sockets.empty();
}
unsigned short session_impl::listen_port() const
{
mutex_t::scoped_lock l(m_mutex);
return m_external_listen_port;
if (m_listen_sockets.empty()) return 0;
return m_listen_sockets.front().external_port;;
}
void session_impl::announce_lsd(sha1_hash const& ih)
@@ -1777,7 +1858,8 @@ namespace detail
if (tcp_port != 0)
{
m_external_listen_port = tcp_port;
if (!m_listen_sockets.empty())
m_listen_sockets.front().external_port = tcp_port;
if (m_alerts.should_post(alert::info))
{
std::stringstream msg;
@@ -1801,7 +1883,7 @@ namespace detail
{
mutex_t::scoped_lock l(m_mutex);
INVARIANT_CHECK;
// INVARIANT_CHECK;
session_status s;
s.has_incoming_connections = m_incoming_connection;
@@ -1945,7 +2027,7 @@ namespace detail
bool session_impl::is_listening() const
{
mutex_t::scoped_lock l(m_mutex);
return m_listen_socket;
return !m_listen_sockets.empty();
}
session_impl::~session_impl()