move web_server for unit tests over to python

This commit is contained in:
Arvid Norberg
2013-11-03 08:15:51 +00:00
parent 5812e8415d
commit b4aad2a481
11 changed files with 204 additions and 614 deletions

View File

@@ -1,4 +1,4 @@
/*
/*
Copyright (c) 2008, Arvid Norberg
All rights reserved.
@@ -85,11 +85,6 @@ void report_failure(char const* err, char const* file, int line)
int print_failures()
{
for (std::vector<std::string>::iterator i = failure_strings.begin()
, end(failure_strings.end()); i != end; ++i)
{
fputs(i->c_str(), stderr);
}
fprintf(stderr, "\n\n\x1b[41m == %d TEST(S) FAILED ==\x1b[0m\n\n\n", tests_failure);
return tests_failure;
}
@@ -380,20 +375,25 @@ pid_type async_run(char const* cmdline)
#endif
}
void stop_process(pid_type p)
{
#ifdef _WIN32
HANDLE proc = OpenProcess(PROCESS_TERMINATE | SYNCHRONIZE, FALSE, p);
TerminateProcess(proc, 138);
CloseHandle(proc);
#else
printf("killing pid: %d\n", p);
kill(p, SIGKILL);
#endif
}
void stop_all_proxies()
{
std::map<int, proxy_t> proxies = running_proxies;
for (std::map<int, proxy_t>::iterator i = proxies.begin()
, end(proxies.end()); i != end; ++i)
{
#ifdef _WIN32
HANDLE proc = OpenProcess(PROCESS_TERMINATE | SYNCHRONIZE, FALSE, i->second.pid);
TerminateProcess(proc, 138);
CloseHandle(proc);
#else
printf("killing pid: %d\n", i->second.pid);
kill(i->second.pid, SIGKILL);
#endif
stop_process(i->second.pid);
running_proxies.erase(i->second.pid);
}
}
@@ -449,7 +449,7 @@ int start_proxy(int proxy_type)
fprintf(stderr, "%s starting proxy on port %d (%s %s)...\n", time_now_string(), port, type, auth);
fprintf(stderr, "%s\n", buf);
int r = async_run(buf);
pid_type r = async_run(buf);
if (r == 0) exit(1);
proxy_t t = { r, proxy_type };
running_proxies.insert(std::make_pair(port, t));
@@ -790,7 +790,6 @@ int start_tracker()
}
boost::detail::atomic_count g_udp_tracker_requests(0);
boost::detail::atomic_count g_http_tracker_requests(0);
void on_udp_receive(error_code const& ec, size_t bytes_transferred, udp::endpoint const* from, char* buffer, udp::socket* sock)
{
@@ -909,584 +908,34 @@ void udp_tracker_thread(int* port)
}
}
boost::asio::io_service* web_ios = 0;
boost::shared_ptr<libtorrent::thread> web_server;
libtorrent::mutex web_lock;
libtorrent::event web_initialized;
bool stop_thread = false;
pid_type web_server_pid = 0;
static void terminate_web_thread()
int start_web_server(bool ssl, bool chunked_encoding)
{
stop_thread = true;
web_ios->stop();
web_ios = 0;
unsigned int seed = total_microseconds(time_now_hires() - min_time());
printf("random seed: %u\n", seed);
std::srand(seed);
int port = 5000 + (rand() % 55000);
char buf[200];
snprintf(buf, sizeof(buf), "python ../web_server.py %d %d %d"
, port, chunked_encoding , ssl);
fprintf(stderr, "%s starting web_server on port %d...\n", time_now_string(), port);
fprintf(stderr, "%s\n", buf);
pid_type r = async_run(buf);
if (r == 0) exit(1);
web_server_pid = r;
fprintf(stderr, "%s launched\n", time_now_string());
test_sleep(500);
return port;
}
void stop_web_server()
{
fprintf(stderr, "%s: stop_web_server()\n", time_now_string());
if (web_server && web_ios)
{
fprintf(stderr, "%s: stopping web server thread\n", time_now_string());
web_ios->post(&terminate_web_thread);
web_server->join();
web_server.reset();
}
remove("server.pem");
fprintf(stderr, "%s: stop_web_server() done\n", time_now_string());
if (web_server_pid == 0) return;
stop_process(web_server_pid);
web_server_pid = 0;
}
void web_server_thread(int* port, bool ssl, bool chunked);
int start_web_server(bool ssl, bool chunked_encoding)
{
stop_web_server();
stop_thread = false;
{
libtorrent::mutex::scoped_lock l(web_lock);
web_initialized.clear(l);
}
int port = 0;
web_server.reset(new libtorrent::thread(boost::bind(
&web_server_thread, &port, ssl, chunked_encoding)));
{
libtorrent::mutex::scoped_lock l(web_lock);
web_initialized.wait(l);
}
// create this directory so that the path
// "relative/../test_file" can resolve
error_code ec;
create_directory("relative", ec);
// test_sleep(100);
return port;
}
void send_response(socket_type& s, error_code& ec
, int code, char const* status_message, char const** extra_header
, int len)
{
char msg[600];
int pkt_len = snprintf(msg, sizeof(msg), "HTTP/1.1 %d %s\r\n"
"content-length: %d\r\n"
"%s"
"%s"
"%s"
"%s"
"\r\n"
, code, status_message, len
, extra_header[0]
, extra_header[1]
, extra_header[2]
, extra_header[3]);
DLOG(stderr, ">> %s\n", msg);
write(s, boost::asio::buffer(msg, pkt_len), boost::asio::transfer_all(), ec);
if (ec) fprintf(stderr, "*** send failed: %s\n", ec.message().c_str());
}
void on_accept(error_code& accept_ec, error_code const& ec, bool* done)
{
accept_ec = ec;
*done = true;
}
void send_content(socket_type& s, char const* file, int size, bool chunked)
{
error_code ec;
if (chunked)
{
int chunk_size = 13;
char head[20];
std::vector<boost::asio::const_buffer> bufs(3);
bufs[2] = asio::const_buffer("\r\n", 2);
while (chunk_size > 0)
{
chunk_size = std::min(chunk_size, size);
int len = snprintf(head, sizeof(head), "%x\r\n", chunk_size);
bufs[0] = asio::const_buffer(head, len);
if (chunk_size == 0)
{
// terminate
bufs.erase(bufs.begin()+1);
}
else
{
bufs[1] = asio::const_buffer(file, chunk_size);
}
write(s, bufs, boost::asio::transfer_all(), ec);
if (ec) fprintf(stderr, "*** send failed: %s\n", ec.message().c_str());
size -= chunk_size;
file += chunk_size;
chunk_size *= 2;
}
}
else
{
write(s, boost::asio::buffer(file, size), boost::asio::transfer_all(), ec);
// DLOG(stderr, " >> %s\n", std::string(file, size).c_str());
if (ec) fprintf(stderr, "*** send failed: %s\n", ec.message().c_str());
}
}
void on_read(error_code const& ec, size_t bytes_transferred, size_t* bt, error_code* e, bool* done)
{
DLOG(stderr, "on_read %d [ ec: %s ]\n", int(bytes_transferred), ec.message().c_str());
*bt = bytes_transferred;
*e = ec;
*done = true;
}
void on_read_timeout(error_code const& ec, bool* timed_out)
{
if (ec) return;
fprintf(stderr, "read timed out\n");
*timed_out = true;
}
void web_server_thread(int* port, bool ssl, bool chunked)
{
io_service ios;
socket_acceptor acceptor(ios);
error_code ec;
acceptor.open(tcp::v4(), ec);
if (ec)
{
fprintf(stderr, "Error opening listen socket: %s\n", ec.message().c_str());
libtorrent::mutex::scoped_lock l(web_lock);
web_initialized.signal(l);
return;
}
acceptor.set_option(socket_acceptor::reuse_address(true), ec);
if (ec)
{
fprintf(stderr, "Error setting listen socket to reuse addr: %s\n", ec.message().c_str());
libtorrent::mutex::scoped_lock l(web_lock);
web_initialized.signal(l);
return;
}
acceptor.bind(tcp::endpoint(address_v4::any(), 0), ec);
if (ec)
{
fprintf(stderr, "Error binding listen socket to port 0: %s\n", ec.message().c_str());
libtorrent::mutex::scoped_lock l(web_lock);
web_initialized.signal(l);
return;
}
*port = acceptor.local_endpoint().port();
acceptor.listen(10, ec);
if (ec)
{
fprintf(stderr, "Error listening on socket: %s\n", ec.message().c_str());
libtorrent::mutex::scoped_lock l(web_lock);
web_initialized.signal(l);
return;
}
web_ios = &ios;
char buf[10000];
int len = 0;
int offset = 0;
bool connection_close = false;
socket_type s(ios);
void* ctx = 0;
#ifdef TORRENT_USE_OPENSSL
boost::asio::ssl::context ssl_ctx(ios, boost::asio::ssl::context::sslv23_server);
if (ssl)
{
ssl_ctx.use_certificate_chain_file("../ssl/server.pem");
ssl_ctx.use_private_key_file("../ssl/server.pem", asio::ssl::context::pem);
ssl_ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
ctx = &ssl_ctx;
}
#endif
proxy_settings p;
instantiate_connection(ios, p, s, ctx);
fprintf(stderr, "web server initialized on port %d%s\n", *port, ssl ? " [SSL]" : "");
{
libtorrent::mutex::scoped_lock l(web_lock);
web_initialized.signal(l);
}
for (;;)
{
if (connection_close)
{
error_code ec;
#ifdef TORRENT_USE_OPENSSL
if (ssl)
{
DLOG(stderr, "shutting down SSL connection\n");
s.get<ssl_stream<stream_socket> >()->shutdown(ec);
if (ec) fprintf(stderr, "SSL shutdown failed: %s\n", ec.message().c_str());
ec.clear();
}
#endif
DLOG(stderr, "closing connection\n");
s.close(ec);
if (ec) fprintf(stderr, "close failed: %s\n", ec.message().c_str());
connection_close = false;
}
if (!s.is_open())
{
len = 0;
offset = 0;
error_code ec;
instantiate_connection(ios, p, s, ctx);
stream_socket* sock;
#ifdef TORRENT_USE_OPENSSL
if (ssl) sock = &s.get<ssl_stream<stream_socket> >()->next_layer();
else
#endif
sock = s.get<stream_socket>();
bool accept_done = false;
fprintf(stderr, "HTTP waiting for incoming connection\n");
acceptor.async_accept(*sock, boost::bind(&on_accept, boost::ref(ec), _1, &accept_done));
while (!accept_done)
{
error_code e;
ios.reset();
if (stop_thread || ios.run_one(e) == 0)
{
fprintf(stderr, "%s: io_service stopped: %s\n", time_now_string(), e.message().c_str());
break;
}
}
if (stop_thread) break;
if (ec)
{
fprintf(stderr, "%s: accept failed: %s\n", time_now_string(), ec.message().c_str());
return;
}
fprintf(stderr, "%s: accepting incoming connection\n", time_now_string());
if (!s.is_open())
{
fprintf(stderr, "%s: incoming connection closed\n", time_now_string());
continue;
}
#ifdef TORRENT_USE_OPENSSL
if (ssl)
{
DLOG(stderr, "%s: SSL handshake\n", time_now_string());
s.get<ssl_stream<stream_socket> >()->accept_handshake(ec);
if (ec)
{
fprintf(stderr, "SSL handshake failed: %s\n", ec.message().c_str());
connection_close = true;
continue;
}
}
#endif
}
http_parser p;
bool failed = false;
do
{
p.reset();
bool error = false;
p.incoming(buffer::const_interval(buf + offset, buf + len), error);
char const* extra_header[4] = {"","","",""};
TEST_CHECK(error == false);
if (error)
{
fprintf(stderr, "HTTP parse failed\n");
failed = true;
break;
}
while (!p.finished())
{
TORRENT_ASSERT(len <= int(sizeof(buf)));
size_t received = 0;
bool done = false;
bool timed_out = false;
DLOG(stderr, "async_read_some %d bytes [ len: %d ]\n", int(sizeof(buf) - len), len);
s.async_read_some(boost::asio::buffer(&buf[len]
, sizeof(buf) - len), boost::bind(&on_read, _1, _2, &received, &ec, &done));
deadline_timer timer(ios);
timer.expires_at(time_now_hires() + seconds(2));
timer.async_wait(boost::bind(&on_read_timeout, _1, &timed_out));
while (!done && !timed_out)
{
error_code e;
ios.reset();
if (stop_thread || ios.run_one(e) == 0)
{
fprintf(stderr, "HTTP io_service stopped: %s\n", e.message().c_str());
break;
}
}
if (timed_out)
{
fprintf(stderr, "HTTP read timed out, closing connection\n");
failed = true;
break;
}
// fprintf(stderr, "read: %d\n", int(received));
if (ec || received <= 0)
{
fprintf(stderr, "HTTP read failed: \"%s\" (%s) received: %d\n"
, ec.message().c_str(), ec.category().name(), int(received));
failed = true;
break;
}
timer.cancel(ec);
if (ec)
fprintf(stderr, "HTTP timer.cancel failed: %s\n", ec.message().c_str());
len += received;
p.incoming(buffer::const_interval(buf + offset, buf + len), error);
TEST_CHECK(error == false);
if (error)
{
fprintf(stderr, "HTTP parse failed\n");
failed = true;
break;
}
}
std::string connection = p.header("connection");
std::string via = p.header("via");
if (p.protocol() == "HTTP/1.0")
{
DLOG(stderr, "*** HTTP/1.0, closing connection when done\n");
connection_close = true;
}
DLOG(stderr, "REQ: %s", std::string(buf + offset, p.body_start()).c_str());
if (failed)
{
fprintf(stderr, "*** connection failed\n");
connection_close = true;
break;
}
offset += int(p.body_start() + p.content_length());
// fprintf(stderr, "offset: %d len: %d\n", offset, len);
if (p.method() != "get" && p.method() != "post")
{
fprintf(stderr, "*** incorrect method: %s\n", p.method().c_str());
connection_close = true;
break;
}
std::string path = p.path();
std::vector<char> file_buf;
if (path.substr(0, 4) == "http")
{
// remove the http://hostname and the first / of the path
path = path.substr(path.find("://")+3);
path = path.substr(path.find_first_of('/')+1);
}
else
{
// remove the / from the path
path = path.substr(1);
}
// fprintf(stderr, "%s: [HTTP] %s\n", time_now_string(), path.c_str());
if (path == "redirect")
{
extra_header[0] = "Location: /test_file\r\n";
send_response(s, ec, 301, "Moved Permanently", extra_header, 0);
break;
}
if (path == "infinite_redirect")
{
extra_header[0] = "Location: /infinite_redirect\r\n";
send_response(s, ec, 301, "Moved Permanently", extra_header, 0);
break;
}
if (path == "relative/redirect")
{
extra_header[0] = "Location: ../test_file\r\n";
send_response(s, ec, 301, "Moved Permanently", extra_header, 0);
break;
}
if (path.substr(0, 8) == "announce")
{
fprintf(stderr, "%s\n", path.c_str());
entry announce;
announce["interval"] = 1800;
announce["complete"] = 1;
announce["incomplete"] = 1;
announce["peers"].string();
std::vector<char> buf;
bencode(std::back_inserter(buf), announce);
++g_http_tracker_requests;
fprintf(stderr, "[HTTP]: announce [%d]\n", int(g_http_tracker_requests));
send_response(s, ec, 200, "OK", extra_header, buf.size());
write(s, boost::asio::buffer(&buf[0], buf.size()), boost::asio::transfer_all(), ec);
if (ec)
fprintf(stderr, "[HTTP] *** send response failed: %s\n", ec.message().c_str());
continue;
}
if (filename(path).substr(0, 5) == "seed?")
{
char const* piece = strstr(path.c_str(), "&piece=");
if (piece == 0) piece = strstr(path.c_str(), "?piece=");
if (piece == 0)
{
fprintf(stderr, "invalid web seed request: %s\n", path.c_str());
break;
}
boost::uint64_t idx = atoi(piece + 7);
char const* range = strstr(path.c_str(), "&ranges=");
if (range == 0) range = strstr(path.c_str(), "?ranges=");
int range_end = 0;
int range_start = 0;
if (range)
{
range_start = atoi(range + 8);
range = strchr(range, '-');
if (range == 0)
{
fprintf(stderr, "invalid web seed request: %s\n", path.c_str());
break;
}
range_end = atoi(range + 1);
}
else
{
range_start = 0;
// assume piece size of 64kiB
range_end = 64*1024+1;
}
int size = range_end - range_start + 1;
boost::uint64_t off = idx * 64 * 1024 + range_start;
std::vector<char> file_buf;
error_code ec;
std::string file_path = path.substr(0, path.find_first_of('?'));
int res = load_file(file_path.c_str(), file_buf, ec);
if (res == -1 || file_buf.empty())
{
fprintf(stderr, "file not found: %s\n", file_path.c_str());
send_response(s, ec, 404, "Not Found", extra_header, 0);
continue;
}
send_response(s, ec, 200, "OK", extra_header, size);
DLOG(stderr, "sending %d bytes of payload [%d, %d) piece: %d\n"
, size, int(off), int(off + size), int(idx));
write(s, boost::asio::buffer(&file_buf[0] + off, size)
, boost::asio::transfer_all(), ec);
if (ec)
fprintf(stderr, "*** send failed: %s\n", ec.message().c_str());
else
{
DLOG(stderr, "*** done\n");
}
memmove(buf, buf + offset, len - offset);
len -= offset;
offset = 0;
continue;
}
DLOG(stderr, ">> serving file %s\n", path.c_str());
error_code ec;
int res = load_file(path, file_buf, ec, 8000000);
if (res == -1)
{
fprintf(stderr, ">> file not found: %s\n", path.c_str());
send_response(s, ec, 404, "Not Found", extra_header, 0);
continue;
}
if (res != 0)
{
// this means the file was either too big or couldn't be read
fprintf(stderr, ">> file too big: %s\n", path.c_str());
send_response(s, ec, 503, "Internal Error", extra_header, 0);
continue;
}
// serve file
if (extension(path) == ".gz")
{
extra_header[0] = "Content-Encoding: gzip\r\n";
}
if (chunked)
{
extra_header[2] = "Transfer-Encoding: chunked\r\n";
}
if (!p.header("range").empty())
{
std::string range = p.header("range");
int start, end;
sscanf(range.c_str(), "bytes=%d-%d", &start, &end);
char eh[400];
snprintf(eh, sizeof(eh), "Content-Range: bytes %d-%d\r\n", start, end);
extra_header[1] = eh;
if (end - start + 1 >= 1000)
{
DLOG(stderr, "request size: %.2f kB\n", int(end - start + 1)/1000.f);
}
else
{
DLOG(stderr, "request size: %d Bytes\n", int(end - start + 1));
}
send_response(s, ec, 206, "Partial", extra_header, end - start + 1);
if (!file_buf.empty())
{
send_content(s, &file_buf[0] + start, end - start + 1, chunked);
}
DLOG(stderr, "send %d bytes of payload\n", end - start + 1);
}
else
{
send_response(s, ec, 200, "OK", extra_header, file_buf.size());
if (!file_buf.empty())
send_content(s, &file_buf[0], file_buf.size(), chunked);
}
DLOG(stderr, "%d bytes left in receive buffer. offset: %d\n", len - offset, offset);
memmove(buf, buf + offset, len - offset);
len -= offset;
offset = 0;
} while (offset < len);
}
web_ios = 0;
fprintf(stderr, "%s: exiting web server thread\n", time_now_string());
}