bandwidth limiter fixes. proper priority that cannot starve connections
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
#include "test.hpp"
|
||||
|
||||
#include "libtorrent/bandwidth_manager.hpp"
|
||||
#include "libtorrent/bandwidth_queue_entry.hpp"
|
||||
#include "libtorrent/bandwidth_limit.hpp"
|
||||
#include "libtorrent/socket.hpp"
|
||||
#include "libtorrent/stat.hpp"
|
||||
#include "libtorrent/time.hpp"
|
||||
@@ -17,6 +19,43 @@ const float sample_time = 6.f; // seconds
|
||||
|
||||
//#define VERBOSE_LOGGING
|
||||
|
||||
|
||||
struct peer_connection: intrusive_ptr_base<peer_connection>
|
||||
{
|
||||
typedef torrent torrent_type;
|
||||
|
||||
peer_connection(io_service& ios, boost::shared_ptr<torrent> const& t
|
||||
, int prio, bool ignore_limits, std::string name);
|
||||
|
||||
bool ignore_bandwidth_limits() { return m_ignore_limits; }
|
||||
int max_assignable_bandwidth(int channel) const
|
||||
{ return m_bandwidth_limit[channel].max_assignable(); }
|
||||
boost::weak_ptr<torrent> associated_torrent() const
|
||||
{ return m_torrent; }
|
||||
bool is_disconnecting() const { return m_abort; }
|
||||
void assign_bandwidth(int channel, int amount);
|
||||
void on_transfer(int channel, int amount);
|
||||
void start();
|
||||
void stop() { m_abort = true; }
|
||||
void expire_bandwidth(int channel, int amount);
|
||||
void tick();
|
||||
|
||||
int bandwidth_throttle(int channel) const
|
||||
{ return m_bandwidth_limit[channel].throttle(); }
|
||||
|
||||
void throttle(int limit) { m_bandwidth_limit[0].throttle(limit); }
|
||||
|
||||
bandwidth_limit m_bandwidth_limit[1];
|
||||
boost::weak_ptr<torrent> m_torrent;
|
||||
int m_priority;
|
||||
bool m_ignore_limits;
|
||||
bool m_abort;
|
||||
libtorrent::stat m_stats;
|
||||
io_service& m_ios;
|
||||
std::string m_name;
|
||||
bool m_writing;
|
||||
};
|
||||
|
||||
struct torrent
|
||||
{
|
||||
torrent(bandwidth_manager<peer_connection, torrent>& m)
|
||||
@@ -43,10 +82,13 @@ struct torrent
|
||||
|
||||
void request_bandwidth(int channel
|
||||
, boost::intrusive_ptr<peer_connection> const& p
|
||||
, bool non_prioritized)
|
||||
, int max_block_size
|
||||
, int priority)
|
||||
{
|
||||
TEST_CHECK(m_bandwidth_limit[channel].throttle() > 0);
|
||||
int block_size = m_bandwidth_limit[channel].throttle() / 10;
|
||||
TORRENT_ASSERT(max_block_size > 0);
|
||||
TORRENT_ASSERT(m_bandwidth_limit[channel].throttle() > 0);
|
||||
int block_size = (std::min)(m_bandwidth_limit[channel].throttle() / 10
|
||||
, max_block_size);
|
||||
if (block_size <= 0) block_size = 1;
|
||||
|
||||
if (m_bandwidth_limit[channel].max_assignable() > 0)
|
||||
@@ -55,7 +97,7 @@ struct torrent
|
||||
std::cerr << time_now_string()
|
||||
<< ": request bandwidth " << block_size << std::endl;
|
||||
#endif
|
||||
perform_bandwidth_request(channel, p, block_size, non_prioritized);
|
||||
perform_bandwidth_request(channel, p, block_size, priority);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -63,13 +105,16 @@ struct torrent
|
||||
std::cerr << time_now_string()
|
||||
<< ": queue bandwidth request" << block_size << std::endl;
|
||||
#endif
|
||||
|
||||
// skip forward in the queue until we find a prioritized peer
|
||||
// or hit the front of it.
|
||||
queue_t::reverse_iterator i = m_bandwidth_queue[channel].rbegin();
|
||||
while (i != m_bandwidth_queue[channel].rend() && i->non_prioritized) ++i;
|
||||
while (i != m_bandwidth_queue[channel].rend() && priority > i->priority)
|
||||
{
|
||||
++i->priority;
|
||||
++i;
|
||||
}
|
||||
m_bandwidth_queue[channel].insert(i.base(), bw_queue_entry<peer_connection>(
|
||||
p, block_size, non_prioritized));
|
||||
p, block_size, priority));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,10 +123,10 @@ struct torrent
|
||||
void perform_bandwidth_request(int channel
|
||||
, boost::intrusive_ptr<peer_connection> const& p
|
||||
, int block_size
|
||||
, bool non_prioritized)
|
||||
, int priority)
|
||||
{
|
||||
m_bandwidth_manager.request_bandwidth(p
|
||||
, block_size, non_prioritized);
|
||||
, block_size, priority);
|
||||
m_bandwidth_limit[channel].assign(block_size);
|
||||
}
|
||||
bandwidth_limit m_bandwidth_limit[1];
|
||||
@@ -90,99 +135,79 @@ struct torrent
|
||||
bandwidth_manager<peer_connection, torrent>& m_bandwidth_manager;
|
||||
};
|
||||
|
||||
struct peer_connection: intrusive_ptr_base<peer_connection>
|
||||
{
|
||||
peer_connection(io_service& ios, boost::shared_ptr<torrent> const& t
|
||||
, bool prio, bool ignore_limits, std::string name)
|
||||
: m_torrent(t)
|
||||
, m_prioritized(prio)
|
||||
, m_ignore_limits(ignore_limits)
|
||||
, m_abort(false)
|
||||
, m_ios(ios)
|
||||
, m_name(name)
|
||||
, m_writing(false)
|
||||
{}
|
||||
peer_connection::peer_connection(io_service& ios, boost::shared_ptr<torrent> const& t
|
||||
, int prio, bool ignore_limits, std::string name)
|
||||
: m_torrent(t)
|
||||
, m_priority(prio)
|
||||
, m_ignore_limits(ignore_limits)
|
||||
, m_abort(false)
|
||||
, m_ios(ios)
|
||||
, m_name(name)
|
||||
, m_writing(false)
|
||||
{}
|
||||
|
||||
bool ignore_bandwidth_limits() { return m_ignore_limits; }
|
||||
int max_assignable_bandwidth(int channel) const
|
||||
{ return m_bandwidth_limit[channel].max_assignable(); }
|
||||
boost::weak_ptr<torrent> associated_torrent() const
|
||||
{ return m_torrent; }
|
||||
bool is_disconnecting() const { return m_abort; }
|
||||
void assign_bandwidth(int channel, int amount)
|
||||
{
|
||||
TEST_CHECK(m_writing);
|
||||
void peer_connection::assign_bandwidth(int channel, int amount)
|
||||
{
|
||||
TEST_CHECK(m_writing);
|
||||
#ifdef VERBOSE_LOGGING
|
||||
std::cerr << time_now_string() << ": [" << m_name
|
||||
<< "] assign bandwidth, " << amount << std::endl;
|
||||
std::cerr << time_now_string() << ": [" << m_name
|
||||
<< "] assign bandwidth, " << amount << std::endl;
|
||||
#endif
|
||||
TEST_CHECK(amount > 0);
|
||||
m_bandwidth_limit[channel].assign(amount);
|
||||
m_ios.post(boost::bind(&peer_connection::on_transfer, self(), channel, amount));
|
||||
}
|
||||
void on_transfer(int channel, int amount)
|
||||
TEST_CHECK(amount > 0);
|
||||
m_bandwidth_limit[channel].assign(amount);
|
||||
m_ios.post(boost::bind(&peer_connection::on_transfer, self(), channel, amount));
|
||||
}
|
||||
|
||||
void peer_connection::on_transfer(int channel, int amount)
|
||||
{
|
||||
TEST_CHECK(m_writing);
|
||||
m_writing = false;
|
||||
m_stats.sent_bytes(amount, 0);
|
||||
|
||||
boost::shared_ptr<torrent> t = m_torrent.lock();
|
||||
if (!t) return;
|
||||
if (m_bandwidth_limit[channel].max_assignable() > 0)
|
||||
{
|
||||
TEST_CHECK(m_writing);
|
||||
m_writing = false;
|
||||
m_stats.sent_bytes(amount, 0);
|
||||
|
||||
boost::shared_ptr<torrent> t = m_torrent.lock();
|
||||
if (!t) return;
|
||||
if (m_bandwidth_limit[channel].max_assignable() > 0)
|
||||
{
|
||||
m_writing = true;
|
||||
t->request_bandwidth(0, this, !m_prioritized);
|
||||
}
|
||||
m_writing = true;
|
||||
t->request_bandwidth(0, this, 32 * 1024, m_priority);
|
||||
}
|
||||
void start()
|
||||
}
|
||||
|
||||
void peer_connection::start()
|
||||
{
|
||||
boost::shared_ptr<torrent> t = m_torrent.lock();
|
||||
if (!t) return;
|
||||
m_writing = true;
|
||||
t->request_bandwidth(0, this, 32 * 1024, m_priority);
|
||||
}
|
||||
|
||||
void peer_connection::expire_bandwidth(int channel, int amount)
|
||||
{
|
||||
TEST_CHECK(amount > 0);
|
||||
#ifdef VERBOSE_LOGGING
|
||||
std::cerr << time_now_string() << ": [" << m_name
|
||||
<< "] expire bandwidth, " << amount << std::endl;
|
||||
#endif
|
||||
m_bandwidth_limit[channel].expire(amount);
|
||||
|
||||
if (!m_writing && m_bandwidth_limit[channel].max_assignable() > 0)
|
||||
{
|
||||
boost::shared_ptr<torrent> t = m_torrent.lock();
|
||||
if (!t) return;
|
||||
m_writing = true;
|
||||
t->request_bandwidth(0, this, !m_prioritized);
|
||||
t->request_bandwidth(0, this, 32 * 1024, m_priority);
|
||||
}
|
||||
void stop() { m_abort = true; }
|
||||
void expire_bandwidth(int channel, int amount)
|
||||
{
|
||||
TEST_CHECK(amount > 0);
|
||||
}
|
||||
|
||||
void peer_connection::tick()
|
||||
{
|
||||
#ifdef VERBOSE_LOGGING
|
||||
std::cerr << time_now_string() << ": [" << m_name
|
||||
<< "] expire bandwidth, " << amount << std::endl;
|
||||
std::cerr << time_now_string() << ": [" << m_name
|
||||
<< "] tick, rate: " << m_stats.upload_rate() << std::endl;
|
||||
#endif
|
||||
m_bandwidth_limit[channel].expire(amount);
|
||||
m_stats.second_tick(1.f);
|
||||
}
|
||||
|
||||
if (!m_writing && m_bandwidth_limit[channel].max_assignable() > 0)
|
||||
{
|
||||
boost::shared_ptr<torrent> t = m_torrent.lock();
|
||||
if (!t) return;
|
||||
m_writing = true;
|
||||
t->request_bandwidth(0, this, !m_prioritized);
|
||||
}
|
||||
}
|
||||
void tick()
|
||||
{
|
||||
#ifdef VERBOSE_LOGGING
|
||||
std::cerr << time_now_string() << ": [" << m_name
|
||||
<< "] tick, rate: " << m_stats.upload_rate() << std::endl;
|
||||
#endif
|
||||
m_stats.second_tick(1.f);
|
||||
}
|
||||
|
||||
int bandwidth_throttle(int channel) const
|
||||
{ return m_bandwidth_limit[channel].throttle(); }
|
||||
|
||||
void throttle(int limit) { m_bandwidth_limit[0].throttle(limit); }
|
||||
|
||||
bandwidth_limit m_bandwidth_limit[1];
|
||||
boost::weak_ptr<torrent> m_torrent;
|
||||
bool m_prioritized;
|
||||
bool m_ignore_limits;
|
||||
bool m_abort;
|
||||
libtorrent::stat m_stats;
|
||||
io_service& m_ios;
|
||||
std::string m_name;
|
||||
bool m_writing;
|
||||
};
|
||||
|
||||
void torrent::expire_bandwidth(int channel, int amount)
|
||||
{
|
||||
@@ -201,12 +226,11 @@ void torrent::expire_bandwidth(int channel, int amount)
|
||||
m_bandwidth_queue[channel].pop_front();
|
||||
if (qe.peer->max_assignable_bandwidth(channel) <= 0)
|
||||
{
|
||||
TORRENT_ASSERT(m_bandwidth_manager.is_in_history(qe.peer.get()));
|
||||
if (!qe.peer->is_disconnecting()) tmp.push_back(qe);
|
||||
continue;
|
||||
}
|
||||
perform_bandwidth_request(channel, qe.peer
|
||||
, qe.max_block_size, qe.non_prioritized);
|
||||
, qe.max_block_size, qe.priority);
|
||||
}
|
||||
m_bandwidth_queue[channel].insert(m_bandwidth_queue[channel].begin(), tmp.begin(), tmp.end());
|
||||
}
|
||||
@@ -311,7 +335,7 @@ void spawn_connections(connections_t& v, io_service& ios
|
||||
{
|
||||
for (int i = 0; i < num; ++i)
|
||||
{
|
||||
v.push_back(new peer_connection(ios, t, true, false
|
||||
v.push_back(new peer_connection(ios, t, 200, false
|
||||
, prefix + boost::lexical_cast<std::string>(i)));
|
||||
}
|
||||
}
|
||||
@@ -545,7 +569,7 @@ void test_peer_priority(int limit, bool torrent_limit)
|
||||
connections_t v;
|
||||
std::copy(v1.begin(), v1.end(), std::back_inserter(v));
|
||||
boost::intrusive_ptr<peer_connection> p(
|
||||
new peer_connection(ios, t1, false, false, "no-priority"));
|
||||
new peer_connection(ios, t1, 0, false, "no-priority"));
|
||||
v.push_back(p);
|
||||
run_test(ios, v);
|
||||
|
||||
@@ -581,7 +605,7 @@ void test_no_starvation(int limit)
|
||||
connections_t v;
|
||||
std::copy(v1.begin(), v1.end(), std::back_inserter(v));
|
||||
boost::intrusive_ptr<peer_connection> p(
|
||||
new peer_connection(ios, t2, false, false, "no-priority"));
|
||||
new peer_connection(ios, t2, 0, false, "no-priority"));
|
||||
v.push_back(p);
|
||||
run_test(ios, v);
|
||||
|
||||
|
Reference in New Issue
Block a user