back-ported connection tester to trunk

This commit is contained in:
Arvid Norberg
2011-11-27 21:15:33 +00:00
parent 413c04abcf
commit 72dd233d99

View File

@@ -44,6 +44,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <iostream> #include <iostream>
#include <boost/array.hpp> #include <boost/array.hpp>
#include <boost/detail/atomic_count.hpp>
using namespace libtorrent; using namespace libtorrent;
using namespace libtorrent::detail; // for write_* and read_* using namespace libtorrent::detail; // for write_* and read_*
@@ -65,6 +66,24 @@ void generate_block(boost::uint32_t* buffer, int piece, int start, int length)
int local_if_counter = 0; int local_if_counter = 0;
bool local_bind = false; bool local_bind = false;
// number of seeds we've spawned. The test is terminated
// when this reaches zero, for dual tests
boost::detail::atomic_count num_seeds(0);
// the kind of test to run. Upload sends data to a
// bittorrent client, download requests data from
// a client and dual uploads and downloads from a client
// at the same time (this is presumably the most realistic
// test)
enum { none, upload_test, download_test, dual_test } test_mode = none;
// the number of suggest messages received (total across all peers)
boost::detail::atomic_count num_suggest(0);
// the number of requests made from suggested pieces
boost::detail::atomic_count num_suggested_requests(0);
struct peer_conn struct peer_conn
{ {
peer_conn(io_service& ios, int num_pieces, int blocks_pp, tcp::endpoint const& ep peer_conn(io_service& ios, int num_pieces, int blocks_pp, tcp::endpoint const& ep
@@ -82,6 +101,7 @@ struct peer_conn
, num_pieces(num_pieces) , num_pieces(num_pieces)
, start_time(time_now_hires()) , start_time(time_now_hires())
{ {
if (seed) ++num_seeds;
pieces.reserve(num_pieces); pieces.reserve(num_pieces);
if (local_bind) if (local_bind)
{ {
@@ -108,6 +128,7 @@ struct peer_conn
} }
stream_socket s; stream_socket s;
char write_buf_proto[100];
boost::uint32_t write_buffer[17*1024/4]; boost::uint32_t write_buffer[17*1024/4];
boost::uint32_t buffer[17*1024/4]; boost::uint32_t buffer[17*1024/4];
int read_pos; int read_pos;
@@ -120,6 +141,8 @@ struct peer_conn
}; };
int state; int state;
std::vector<int> pieces; std::vector<int> pieces;
std::vector<int> suggested_pieces;
int current_piece; // the piece we're currently requesting blocks from
int block; int block;
int blocks_per_piece; int blocks_per_piece;
char const* info_hash; char const* info_hash;
@@ -195,10 +218,15 @@ struct peer_conn
{ {
if (fast_extension) if (fast_extension)
{ {
// have_all and unchoke char* ptr = write_buf_proto;
static char msg[] = "\0\0\0\x01\x0e\0\0\0\x01\x01"; // have_all
write_uint32(1, ptr);
write_uint8(0xe, ptr);
// unchoke
write_uint32(1, ptr);
write_uint8(1, ptr);
error_code ec; error_code ec;
boost::asio::async_write(s, libtorrent::asio::buffer(msg, sizeof(msg) - 1) boost::asio::async_write(s, libtorrent::asio::buffer(write_buf_proto, ptr - write_buf_proto)
, boost::bind(&peer_conn::on_have_all_sent, this, _1, _2)); , boost::bind(&peer_conn::on_have_all_sent, this, _1, _2));
} }
else else
@@ -233,12 +261,28 @@ struct peer_conn
, boost::bind(&peer_conn::on_msg_length, this, _1, _2)); , boost::bind(&peer_conn::on_msg_length, this, _1, _2));
} }
void write_request() bool write_request()
{ {
if (pieces.empty()) return; if (pieces.empty() && suggested_pieces.empty() && current_piece == -1) return false;
int piece = pieces.back();
if (current_piece == -1)
{
if (suggested_pieces.size() > 0)
{
current_piece = suggested_pieces.front();
suggested_pieces.erase(suggested_pieces.begin());
++num_suggested_requests;
}
else if (pieces.size() > 0)
{
current_piece = pieces.front();
pieces.erase(pieces.begin());
}
else
{
TORRENT_ASSERT(false);
}
}
char msg[] = "\0\0\0\xd\x06" char msg[] = "\0\0\0\xd\x06"
" " // piece " " // piece
" " // offset " " // offset
@@ -246,19 +290,21 @@ struct peer_conn
char* m = (char*)malloc(sizeof(msg)); char* m = (char*)malloc(sizeof(msg));
memcpy(m, msg, sizeof(msg)); memcpy(m, msg, sizeof(msg));
char* ptr = m + 5; char* ptr = m + 5;
write_uint32(piece, ptr); write_uint32(current_piece, ptr);
write_uint32(block * 16 * 1024, ptr); write_uint32(block * 16 * 1024, ptr);
write_uint32(16 * 1024, ptr); write_uint32(16 * 1024, ptr);
error_code ec; error_code ec;
boost::asio::async_write(s, libtorrent::asio::buffer(m, sizeof(msg) - 1) boost::asio::async_write(s, libtorrent::asio::buffer(m, sizeof(msg) - 1)
, boost::bind(&peer_conn::on_req_sent, this, m, _1, _2)); , boost::bind(&peer_conn::on_req_sent, this, m, _1, _2));
++outstanding_requests;
++block; ++block;
if (block == blocks_per_piece) if (block == blocks_per_piece)
{ {
block = 0; block = 0;
pieces.pop_back(); current_piece = -1;
} }
return true;
} }
void on_req_sent(char* m, error_code const& ec, size_t bytes_transferred) void on_req_sent(char* m, error_code const& ec, size_t bytes_transferred)
@@ -270,8 +316,6 @@ struct peer_conn
return; return;
} }
++outstanding_requests;
work_download(); work_download();
} }
@@ -286,11 +330,14 @@ struct peer_conn
float down = (boost::int64_t(blocks_received) * 0x4000) / time / 1000.f; float down = (boost::int64_t(blocks_received) * 0x4000) / time / 1000.f;
printf("%s sent: %d received: %d duration: %d ms up: %.1fMB/s down: %.1fMB/s\n" printf("%s sent: %d received: %d duration: %d ms up: %.1fMB/s down: %.1fMB/s\n"
, tmp, blocks_sent, blocks_received, time, up, down); , tmp, blocks_sent, blocks_received, time, up, down);
if (seed) --num_seeds;
} }
void work_download() void work_download()
{ {
if (pieces.empty() if (pieces.empty()
&& suggested_pieces.empty()
&& current_piece == -1
&& outstanding_requests == 0 && outstanding_requests == 0
&& blocks_received >= num_pieces * blocks_per_piece) && blocks_received >= num_pieces * blocks_per_piece)
{ {
@@ -299,10 +346,9 @@ struct peer_conn
} }
// send requests // send requests
if (outstanding_requests < 20 && !pieces.empty()) if (outstanding_requests < 40)
{ {
write_request(); if (write_request()) return;
return;
} }
// read message // read message
@@ -338,6 +384,13 @@ struct peer_conn
char* ptr = (char*)buffer; char* ptr = (char*)buffer;
int msg = read_uint8(ptr); int msg = read_uint8(ptr);
if (test_mode == dual_test && num_seeds == 0)
{
TORRENT_ASSERT(!seed);
close("NO MORE SEEDS, test done", error_code());
return;
}
//printf("msg: %d len: %d\n", msg, int(bytes_transferred)); //printf("msg: %d len: %d\n", msg, int(bytes_transferred));
if (seed) if (seed)
@@ -400,10 +453,28 @@ struct peer_conn
} }
std::random_shuffle(pieces.begin(), pieces.end()); std::random_shuffle(pieces.begin(), pieces.end());
} }
else if (msg == 7) else if (msg == 7) // piece
{ {
++blocks_received; ++blocks_received;
--outstanding_requests; --outstanding_requests;
int piece = detail::read_int32(ptr);
int start = detail::read_int32(ptr);
if ((start + bytes_transferred) / 0x4000 == blocks_per_piece)
{
write_have(piece);
return;
}
}
else if (msg == 13) // suggest
{
int piece = detail::read_int32(ptr);
std::vector<int>::iterator i = std::find(pieces.begin(), pieces.end(), piece);
if (i != pieces.end())
{
pieces.erase(i);
suggested_pieces.push_back(piece);
++num_suggest;
}
} }
work_download(); work_download();
} }
@@ -412,22 +483,27 @@ struct peer_conn
void write_piece(int piece, int start, int length) void write_piece(int piece, int start, int length)
{ {
generate_block(write_buffer, piece, start, length); generate_block(write_buffer, piece, start, length);
static char msg[] = " \x07" char* ptr = write_buf_proto;
" " // piece
" "; // start
char* ptr = msg;
write_uint32(9 + length, ptr); write_uint32(9 + length, ptr);
assert(length == 0x4000); assert(length == 0x4000);
assert(*ptr == 7); write_uint8(7, ptr);
++ptr; // skip message id
write_uint32(piece, ptr); write_uint32(piece, ptr);
write_uint32(start, ptr); write_uint32(start, ptr);
boost::array<libtorrent::asio::const_buffer, 2> vec; boost::array<libtorrent::asio::const_buffer, 2> vec;
vec[0] = libtorrent::asio::buffer(msg, sizeof(msg)-1); vec[0] = libtorrent::asio::buffer(write_buf_proto, ptr - write_buf_proto);
vec[1] = libtorrent::asio::buffer(write_buffer, length); vec[1] = libtorrent::asio::buffer(write_buffer, length);
boost::asio::async_write(s, vec, boost::bind(&peer_conn::on_have_all_sent, this, _1, _2)); boost::asio::async_write(s, vec, boost::bind(&peer_conn::on_have_all_sent, this, _1, _2));
++blocks_sent; ++blocks_sent;
} }
void write_have(int piece)
{
char* ptr = write_buf_proto;
write_uint32(5, ptr);
write_uint8(4, ptr);
write_uint32(piece, ptr);
boost::asio::async_write(s, asio::buffer(write_buf_proto, 9), boost::bind(&peer_conn::on_have_all_sent, this, _1, _2));
}
}; };
void print_usage() void print_usage()
@@ -548,8 +624,6 @@ int main(int argc, char* argv[])
{ {
if (argc <= 1) print_usage(); if (argc <= 1) print_usage();
enum { none, upload_test, download_test, dual_test } test_mode = none;
if (strcmp(argv[1], "gen-torrent") == 0) if (strcmp(argv[1], "gen-torrent") == 0)
{ {
if (argc != 4) print_usage(); if (argc != 4) print_usage();
@@ -718,8 +792,10 @@ int main(int argc, char* argv[])
} }
printf("=========================\n" printf("=========================\n"
"suggests: %d suggested-requests: %d\n"
"total sent: %.1f %% received: %.1f %%\n" "total sent: %.1f %% received: %.1f %%\n"
"rate sent: %.1f MB/s received: %.1f MB/s\n" "rate sent: %.1f MB/s received: %.1f MB/s\n"
, int(num_suggest), int(num_suggested_requests)
, total_sent * 0x4000 * 100.f / float(ti.total_size()) , total_sent * 0x4000 * 100.f / float(ti.total_size())
, total_received * 0x4000 * 100.f / float(ti.total_size()) , total_received * 0x4000 * 100.f / float(ti.total_size())
, up, down); , up, down);