DHT refactoring and support for storing arbitrary data with put

This commit is contained in:
Arvid Norberg
2013-12-27 04:28:25 +00:00
parent c9bfa1279e
commit d6b1aa4c36
22 changed files with 1756 additions and 394 deletions

View File

@@ -32,6 +32,7 @@ POSSIBILITY OF SUCH DAMAGE.
#ifndef TORRENT_DISABLE_DHT
#include "libtorrent/config.hpp"
#include "libtorrent/session.hpp"
#include "libtorrent/kademlia/node.hpp" // for verify_message
#include "libtorrent/bencode.hpp"
@@ -41,6 +42,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/kademlia/node_id.hpp"
#include "libtorrent/kademlia/routing_table.hpp"
#include "libtorrent/kademlia/item.hpp"
#include <numeric>
#include "test.hpp"
@@ -57,6 +59,8 @@ POSSIBILITY OF SUCH DAMAGE.
using namespace libtorrent;
using namespace libtorrent::dht;
void nop() {}
sha1_hash to_hash(char const* s)
{
sha1_hash ret;
@@ -84,13 +88,13 @@ void node_push_back(void* userdata, libtorrent::dht::node_entry const& n)
void nop(void* userdata, libtorrent::dht::node_entry const& n) {}
std::list<std::pair<udp::endpoint, entry> > g_responses;
std::list<std::pair<udp::endpoint, entry> > g_sent_packets;
struct mock_socket : udp_socket_interface
{
bool send_packet(entry& msg, udp::endpoint const& ep, int flags)
{
g_responses.push_back(std::make_pair(ep, msg));
g_sent_packets.push_back(std::make_pair(ep, msg));
return true;
}
};
@@ -121,7 +125,23 @@ boost::array<char, 64> generate_key()
static const std::string no;
void send_dht_msg(node_impl& node, char const* msg, udp::endpoint const& ep
std::list<std::pair<udp::endpoint, entry> >::iterator
find_packet(udp::endpoint ep)
{
return std::find_if(g_sent_packets.begin(), g_sent_packets.end()
, boost::bind(&std::pair<udp::endpoint, entry>::first, _1) == ep);
}
void lazy_from_entry(entry const& e, lazy_entry& l)
{
error_code ec;
static char inbuf[1500];
int len = bencode(inbuf, e);
int ret = lazy_bdecode(inbuf, inbuf + len, l, ec);
TEST_CHECK(ret == 0);
}
void send_dht_request(node_impl& node, char const* msg, udp::endpoint const& ep
, lazy_entry* reply, char const* t = "10", char const* info_hash = 0
, char const* name = 0, std::string const token = std::string(), int port = 0
, char const* target = 0, entry const* value = 0
@@ -171,22 +191,81 @@ void send_dht_msg(node_impl& node, char const* msg, udp::endpoint const& ep
node.incoming(m);
// by now the node should have invoked the send function and put the
// response in g_responses
// response in g_sent_packets
std::list<std::pair<udp::endpoint, entry> >::iterator i
= std::find_if(g_responses.begin(), g_responses.end()
, boost::bind(&std::pair<udp::endpoint, entry>::first, _1) == ep);
if (i == g_responses.end())
= find_packet(ep);
if (i == g_sent_packets.end())
{
TEST_ERROR("not response from DHT node");
return;
}
static char inbuf[1500];
int len = bencode(inbuf, i->second);
g_responses.erase(i);
int ret = lazy_bdecode(inbuf, inbuf + len, *reply, ec);
TEST_CHECK(ret == 0);
lazy_from_entry(i->second, *reply);
g_sent_packets.erase(i);
}
namespace libtorrent { namespace dht { namespace detail
{
// defined in node.cpp
void write_nodes_entry(entry& r, nodes_t const& nodes);
} } }
void write_peers(entry::dictionary_type& r, std::set<tcp::endpoint> const& peers)
{
entry::list_type& pe = r["values"].list();
for (std::set<tcp::endpoint>::const_iterator it = peers.begin()
; it != peers.end(); ++it)
{
std::string endpoint(18, '\0');
std::string::iterator out = endpoint.begin();
libtorrent::detail::write_endpoint(*it, out);
endpoint.resize(out - endpoint.begin());
pe.push_back(entry(endpoint));
}
}
void send_dht_response(node_impl& node, lazy_entry const& request, udp::endpoint const& ep
, nodes_t const& nodes = nodes_t()
, std::string const token = std::string(), int port = 0
, std::set<tcp::endpoint> const& peers = std::set<tcp::endpoint>()
, char const* target = 0, entry const* value = 0
, std::string const key = std::string(), std::string const sig = std::string()
, int seq = -1, sha1_hash const* nid = NULL)
{
entry e;
e["y"] = "r";
e["t"] = request.dict_find_string_value("t");
// e["ip"] = endpoint_to_bytes(ep);
entry::dictionary_type& r = e["r"].dict();
if (nid == NULL) r["id"] = generate_next().to_string();
else r["id"] = nid->to_string();
if (!token.empty()) r["token"] = token;
if (port) r["p"] = port;
if (!nodes.empty()) dht::detail::write_nodes_entry(e["r"], nodes);
if (!peers.empty()) write_peers(r, peers);
if (value) r["v"] = *value;
if (!sig.empty()) r["sig"] = sig;
if (!key.empty()) r["k"] = key;
if (seq >= 0) r["seq"] = seq;
char msg_buf[1500];
int size = bencode(msg_buf, e);
#if defined TORRENT_DEBUG && TORRENT_USE_IOSTREAM
// this yields a lot of output. too much
// std::cerr << "sending: " << e << "\n";
#endif
#ifdef TORRENT_USE_VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED(msg_buf, size);
#endif
lazy_entry decoded;
error_code ec;
lazy_bdecode(msg_buf, msg_buf + size, decoded, ec);
if (ec) fprintf(stderr, "lazy_bdecode failed: %s\n", ec.message().c_str());
dht::msg m(decoded, ep);
node.incoming(m);
}
struct announce_item
@@ -220,7 +299,7 @@ void announce_immutable_items(node_impl& node, udp::endpoint const* eps
{
if ((i % items[j].num_peers) == 0) continue;
lazy_entry response;
send_dht_msg(node, "get", eps[i], &response, "10", 0
send_dht_request(node, "get", eps[i], &response, "10", 0
, 0, no, 0, (char const*)&items[j].target[0]);
key_desc_t desc[] =
@@ -258,7 +337,7 @@ void announce_immutable_items(node_impl& node, udp::endpoint const* eps
TEST_EQUAL(addr, eps[i].address());
}
send_dht_msg(node, "put", eps[i], &response, "10", 0
send_dht_request(node, "put", eps[i], &response, "10", 0
, 0, token, 0, (char const*)&items[j].target[0], &items[j].ent);
key_desc_t desc2[] =
@@ -287,7 +366,7 @@ void announce_immutable_items(node_impl& node, udp::endpoint const* eps
for (int j = 0; j < num_items; ++j)
{
lazy_entry response;
send_dht_msg(node, "get", eps[j], &response, "10", 0
send_dht_request(node, "get", eps[j], &response, "10", 0
, 0, no, 0, (char const*)&items[j].target[0]);
key_desc_t desc[] =
@@ -327,12 +406,37 @@ struct print_alert : alert_dispatcher
}
};
int sum_distance_exp(int s, node_entry const& e, node_id const& ref)
{
return s + distance_exp(e.id, ref);
}
// TODO: 3 test find_data, obfuscated_get_peers and bootstrap
std::vector<tcp::endpoint> g_got_peers;
void get_peers_cb(std::vector<tcp::endpoint> const& peers)
{
g_got_peers.insert(g_got_peers.end(), peers.begin(), peers.end());
}
std::vector<dht::item> g_got_items;
dht::item g_put_item;
int g_put_count;
bool get_item_cb(dht::item& i)
{
if (!i.empty())
g_got_items.push_back(i);
if (!g_put_item.empty())
{
i = g_put_item;
g_put_count++;
return true;
}
return false;
}
// TODO: 3 test obfuscated_get_peers
int test_main()
{
dht_settings sett;
@@ -346,13 +450,13 @@ int test_main()
// DHT should be running on port 48199 now
lazy_entry response;
lazy_entry const* parsed[10];
lazy_entry const* parsed[11];
char error_string[200];
bool ret;
// ====== ping ======
udp::endpoint source(address::from_string("10.0.0.1"), 20);
send_dht_msg(node, "ping", source, &response, "10");
send_dht_request(node, "ping", source, &response, "10");
dht::key_desc_t pong_desc[] = {
{"y", lazy_entry::string_t, 1, 0},
@@ -376,7 +480,7 @@ int test_main()
// ====== invalid message ======
send_dht_msg(node, "find_node", source, &response, "10");
send_dht_request(node, "find_node", source, &response, "10");
dht::key_desc_t err_desc[] = {
{"y", lazy_entry::string_t, 1, 0},
@@ -406,7 +510,7 @@ int test_main()
// ====== get_peers ======
send_dht_msg(node, "get_peers", source, &response, "10", "01010101010101010101");
send_dht_request(node, "get_peers", source, &response, "10", "01010101010101010101");
dht::key_desc_t peer1_desc[] = {
{"y", lazy_entry::string_t, 1, 0},
@@ -433,7 +537,7 @@ int test_main()
// ====== announce ======
send_dht_msg(node, "announce_peer", source, &response, "10", "01010101010101010101", "test", token, 8080);
send_dht_request(node, "announce_peer", source, &response, "10", "01010101010101010101", "test", token, 8080);
dht::key_desc_t ann_desc[] = {
{"y", lazy_entry::string_t, 1, 0},
@@ -458,7 +562,7 @@ int test_main()
for (int i = 0; i < 100; ++i)
{
source = udp::endpoint(rand_v4(), 6000);
send_dht_msg(node, "get_peers", source, &response, "10", "01010101010101010101");
send_dht_request(node, "get_peers", source, &response, "10", "01010101010101010101");
ret = dht::verify_message(&response, peer1_desc, parsed, 4, error_string, sizeof(error_string));
if (ret)
@@ -473,14 +577,14 @@ int test_main()
fprintf(stderr, " invalid get_peers response: %s\n", error_string);
}
response.clear();
send_dht_msg(node, "announce_peer", source, &response, "10", "01010101010101010101"
send_dht_request(node, "announce_peer", source, &response, "10", "01010101010101010101"
, "test", token, 8080, 0, 0, false, i >= 50);
response.clear();
}
// ====== get_peers ======
send_dht_msg(node, "get_peers", source, &response, "10", "01010101010101010101"
send_dht_request(node, "get_peers", source, &response, "10", "01010101010101010101"
, 0, no, 0, 0, 0, true);
dht::key_desc_t peer2_desc[] = {
@@ -524,7 +628,7 @@ int test_main()
// http://libtorrent.org/dht_sec.html
source = udp::endpoint(address::from_string("124.31.75.21"), 20);
node_id nid = to_hash("1712f6c70c5d6a4ec8a88e4c6ab4c28b95eee401");
send_dht_msg(node, "find_node", source, &response, "10", 0, 0, std::string()
send_dht_request(node, "find_node", source, &response, "10", 0, 0, std::string()
, 0, "0101010101010101010101010101010101010101", 0, false, false, std::string(), std::string(), -1, 0, &nid);
dht::key_desc_t nodes_desc[] = {
@@ -549,7 +653,7 @@ int test_main()
// verify that we reject invalid node IDs
// this is now an invalid node-id for 'source'
nid[0] = 0x18;
send_dht_msg(node, "find_node", source, &response, "10", 0, 0, std::string()
send_dht_request(node, "find_node", source, &response, "10", 0, 0, std::string()
, 0, "0101010101010101010101010101010101010101", 0, false, false, std::string(), std::string(), -1, 0, &nid);
ret = dht::verify_message(&response, err_desc, parsed, 2, error_string, sizeof(error_string));
@@ -648,18 +752,18 @@ int test_main()
fprintf(stderr, "generating ed25519 keys\n");
unsigned char seed[32];
ed25519_create_seed(seed);
unsigned char private_key[64];
unsigned char public_key[32];
char private_key[item_sk_len];
char public_key[item_pk_len];
ed25519_create_keypair(public_key, private_key, seed);
ed25519_create_keypair((unsigned char*)public_key, (unsigned char*)private_key, seed);
fprintf(stderr, "pub: %s priv: %s\n"
, to_hex(std::string((char*)public_key, 32)).c_str()
, to_hex(std::string((char*)private_key, 64)).c_str());
, to_hex(std::string(public_key, item_pk_len)).c_str()
, to_hex(std::string(private_key, item_sk_len)).c_str());
TEST_CHECK(ret);
send_dht_msg(node, "get", source, &response, "10", 0
, 0, no, 0, (char*)&hasher((char*)public_key, 32).final()[0]
send_dht_request(node, "get", source, &response, "10", 0
, 0, no, 0, (char*)&hasher(public_key, item_pk_len).final()[0]
, 0, false, false, std::string(), std::string(), 64);
key_desc_t desc[] =
@@ -686,22 +790,20 @@ int test_main()
TEST_ERROR(error_string);
}
unsigned char signature[64];
char signature[item_sig_len];
char buffer[1200];
int seq = 4;
int pos = snprintf(buffer, sizeof(buffer), "3:seqi%de1:v", seq);
char* ptr = buffer + pos;
pos += bencode(ptr, items[0].ent);
ed25519_sign(signature, (unsigned char*)buffer, pos, public_key, private_key);
TEST_EQUAL(ed25519_verify(signature, (unsigned char*)buffer, pos, public_key), 1);
std::pair<const char*, int> itemv(buffer, bencode(buffer, items[0].ent));
sign_mutable_item(itemv, seq, public_key, private_key, signature);
TEST_EQUAL(verify_mutable_item(itemv, seq, public_key, signature), true);
#ifdef TORRENT_USE_VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED(signature, 64);
VALGRIND_CHECK_MEM_IS_DEFINED(signature, item_sig_len);
#endif
send_dht_msg(node, "put", source, &response, "10", 0
send_dht_request(node, "put", source, &response, "10", 0
, 0, token, 0, 0, &items[0].ent, false, false
, std::string((char*)public_key, 32)
, std::string((char*)signature, 64), seq);
, std::string(public_key, item_pk_len)
, std::string(signature, item_sig_len), seq);
key_desc_t desc2[] =
{
@@ -722,8 +824,8 @@ int test_main()
TEST_ERROR(error_string);
}
send_dht_msg(node, "get", source, &response, "10", 0
, 0, no, 0, (char*)&hasher((char*)public_key, 32).final()[0]
send_dht_request(node, "get", source, &response, "10", 0
, 0, no, 0, (char*)&hasher(public_key, item_pk_len).final()[0]
, 0, false, false, std::string(), std::string(), 64);
key_desc_t desc3[] =
@@ -759,23 +861,21 @@ int test_main()
// also test that invalid signatures fail!
pos = snprintf(buffer, sizeof(buffer), "3:seqi%de1:v", seq);
ptr = buffer + pos;
pos += bencode(ptr, items[0].ent);
ed25519_sign(signature, (unsigned char*)buffer, pos, public_key, private_key);
TEST_EQUAL(ed25519_verify(signature, (unsigned char*)buffer, pos, public_key), 1);
itemv.second = bencode(buffer, items[0].ent);
sign_mutable_item(itemv, seq, public_key, private_key, signature);
TEST_EQUAL(verify_mutable_item(itemv, seq, public_key, signature), 1);
#ifdef TORRENT_USE_VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED(signature, 64);
VALGRIND_CHECK_MEM_IS_DEFINED(signature, item_sig_len);
#endif
// break the signature
signature[2] ^= 0xaa;
TEST_CHECK(ed25519_verify(signature, (unsigned char*)buffer, pos, public_key) != 1);
TEST_CHECK(verify_mutable_item(itemv, seq, public_key, signature) != 1);
send_dht_msg(node, "put", source, &response, "10", 0
send_dht_request(node, "put", source, &response, "10", 0
, 0, token, 0, 0, &items[0].ent, false, false
, std::string((char*)public_key, 32)
, std::string((char*)signature, 64), seq);
, std::string(public_key, item_pk_len)
, std::string(signature, item_sig_len), seq);
key_desc_t desc_error[] =
{
@@ -801,23 +901,21 @@ int test_main()
// === test CAS put ===
// this is the hash that we expect to be there
sha1_hash cas = hasher(buffer, pos).final();
sha1_hash cas = mutable_item_cas(itemv, seq);
// increment sequence number
++seq;
pos = snprintf(buffer, sizeof(buffer), "3:seqi%de1:v", seq);
ptr = buffer + pos;
// put item 1
pos += bencode(ptr, items[1].ent);
ed25519_sign(signature, (unsigned char*)buffer, pos, public_key, private_key);
TEST_EQUAL(ed25519_verify(signature, (unsigned char*)buffer, pos, public_key), 1);
itemv.second = bencode(buffer, items[1].ent);
sign_mutable_item(itemv, seq, public_key, private_key, signature);
TEST_EQUAL(verify_mutable_item(itemv, seq, public_key, signature), 1);
#ifdef TORRENT_USE_VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED(signature, 64);
VALGRIND_CHECK_MEM_IS_DEFINED(signature, item_sig_len);
#endif
send_dht_msg(node, "put", source, &response, "10", 0
send_dht_request(node, "put", source, &response, "10", 0
, 0, token, 0, 0, &items[1].ent, false, false
, std::string((char*)public_key, 32)
, std::string((char*)signature, 64), seq
, std::string(public_key, item_pk_len)
, std::string(signature, item_sig_len), seq
, (char const*)&cas[0]);
ret = verify_message(&response, desc2, parsed, 1, error_string, sizeof(error_string));
@@ -837,10 +935,10 @@ int test_main()
// put the same message again. This should fail because the
// CAS hash is outdated, it's not the hash of the value that's
// stored anymore
send_dht_msg(node, "put", source, &response, "10", 0
send_dht_request(node, "put", source, &response, "10", 0
, 0, token, 0, 0, &items[1].ent, false, false
, std::string((char*)public_key, 32)
, std::string((char*)signature, 64), seq
, std::string(public_key, item_pk_len)
, std::string(signature, item_sig_len), seq
, (char const*)&cas[0]);
ret = verify_message(&response, desc_error, parsed, 2, error_string, sizeof(error_string));
@@ -1231,6 +1329,471 @@ int test_main()
, rs[i], to_hex(id.to_string()).c_str());
}
}
// test traversal algorithms
dht::key_desc_t ping_desc[] = {
{"y", lazy_entry::string_t, 1, 0},
{"t", lazy_entry::string_t, 2, 0},
{"q", lazy_entry::string_t, 4, 0},
{"a", lazy_entry::dict_t, 0, key_desc_t::parse_children},
{"id", lazy_entry::string_t, 20, key_desc_t::last_child},
};
dht::key_desc_t find_node_desc[] = {
{"y", lazy_entry::string_t, 1, 0},
{"t", lazy_entry::string_t, 2, 0},
{"q", lazy_entry::string_t, 9, 0},
{"a", lazy_entry::dict_t, 0, key_desc_t::parse_children},
{"id", lazy_entry::string_t, 20, 0},
{"target", lazy_entry::string_t, 20, key_desc_t::last_child},
};
dht::key_desc_t get_peers_desc[] = {
{"y", lazy_entry::string_t, 1, 0},
{"t", lazy_entry::string_t, 2, 0},
{"q", lazy_entry::string_t, 9, 0},
{"a", lazy_entry::dict_t, 0, key_desc_t::parse_children},
{"id", lazy_entry::string_t, 20, 0},
{"info_hash", lazy_entry::string_t, 20, key_desc_t::last_child},
};
dht::key_desc_t get_item_desc[] = {
{"y", lazy_entry::string_t, 1, 0},
{"t", lazy_entry::string_t, 2, 0},
{"q", lazy_entry::string_t, 3, 0},
{"a", lazy_entry::dict_t, 0, key_desc_t::parse_children},
{"id", lazy_entry::string_t, 20, 0},
{"target", lazy_entry::string_t, 20, key_desc_t::last_child},
};
// bootstrap
do
{
dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
std::vector<udp::endpoint> nodesv;
nodesv.push_back(initial_node);
node.bootstrap(nodesv, boost::bind(&nop));
TEST_EQUAL(g_sent_packets.size(), 1);
if (g_sent_packets.empty()) break;
TEST_EQUAL(g_sent_packets.front().first, initial_node);
lazy_from_entry(g_sent_packets.front().second, response);
ret = verify_message(&response, find_node_desc, parsed, 6, error_string, sizeof(error_string));
if (ret)
{
TEST_EQUAL(parsed[0]->string_value(), "q");
TEST_EQUAL(parsed[2]->string_value(), "find_node");
if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "find_node") break;
}
else
{
fprintf(stderr, " invalid find_node request: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
break;
}
udp::endpoint found_node(address_v4::from_string("5.5.5.5"), 2235);
nodes_t nodes;
nodes.push_back(found_node);
g_sent_packets.clear();
send_dht_response(node, response, initial_node, nodes);
TEST_EQUAL(g_sent_packets.size(), 1);
if (g_sent_packets.empty()) break;
TEST_EQUAL(g_sent_packets.front().first, found_node);
lazy_from_entry(g_sent_packets.front().second, response);
ret = verify_message(&response, find_node_desc, parsed, 6, error_string, sizeof(error_string));
if (ret)
{
TEST_EQUAL(parsed[0]->string_value(), "q");
TEST_EQUAL(parsed[2]->string_value(), "find_node");
if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "find_node") break;
}
else
{
fprintf(stderr, " invalid find_node request: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
break;
}
g_sent_packets.clear();
send_dht_response(node, response, found_node);
TEST_CHECK(g_sent_packets.empty());
TEST_EQUAL(node.num_global_nodes(), 3);
} while (false);
// get_peers
do
{
dht::node_id target = to_hash("1234876923549721020394873245098347598635");
dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
node.announce(target, 1234, false, get_peers_cb);
TEST_EQUAL(g_sent_packets.size(), 1);
if (g_sent_packets.empty()) break;
TEST_EQUAL(g_sent_packets.front().first, initial_node);
lazy_from_entry(g_sent_packets.front().second, response);
ret = verify_message(&response, get_peers_desc, parsed, 6, error_string, sizeof(error_string));
if (ret)
{
TEST_EQUAL(parsed[0]->string_value(), "q");
TEST_EQUAL(parsed[2]->string_value(), "get_peers");
TEST_EQUAL(parsed[5]->string_value(), target.to_string());
if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "get_peers") break;
}
else
{
fprintf(stderr, " invalid get_peers request: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
break;
}
std::set<tcp::endpoint> peers[2];
peers[0].insert(tcp::endpoint(address_v4::from_string("4.1.1.1"), 4111));
peers[0].insert(tcp::endpoint(address_v4::from_string("4.1.1.2"), 4112));
peers[0].insert(tcp::endpoint(address_v4::from_string("4.1.1.3"), 4113));
udp::endpoint next_node(address_v4::from_string("5.5.5.5"), 2235);
nodes_t nodes;
nodes.push_back(next_node);
g_sent_packets.clear();
send_dht_response(node, response, initial_node, nodes, "10", 1234, peers[0]);
TEST_EQUAL(g_sent_packets.size(), 1);
if (g_sent_packets.empty()) break;
TEST_EQUAL(g_sent_packets.front().first, next_node);
lazy_from_entry(g_sent_packets.front().second, response);
ret = verify_message(&response, get_peers_desc, parsed, 6, error_string, sizeof(error_string));
if (ret)
{
TEST_EQUAL(parsed[0]->string_value(), "q");
TEST_EQUAL(parsed[2]->string_value(), "get_peers");
TEST_EQUAL(parsed[5]->string_value(), target.to_string());
if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "get_peers") break;
}
else
{
fprintf(stderr, " invalid get_peers request: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
break;
}
peers[1].insert(tcp::endpoint(address_v4::from_string("4.1.1.4"), 4114));
peers[1].insert(tcp::endpoint(address_v4::from_string("4.1.1.5"), 4115));
peers[1].insert(tcp::endpoint(address_v4::from_string("4.1.1.6"), 4116));
g_sent_packets.clear();
send_dht_response(node, response, next_node, nodes_t(), "11", 1234, peers[1]);
TEST_CHECK(g_sent_packets.empty());
for (int i = 0; i < 2; ++i)
for (std::set<tcp::endpoint>::iterator peer = peers[i].begin(); peer != peers[i].end(); ++peer)
{
TEST_CHECK(std::find(g_got_peers.begin(), g_got_peers.end(), *peer) != g_got_peers.end());
}
g_got_peers.clear();
} while (false);
// immutable get
do
{
dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
node.get_item(items[0].target, get_item_cb);
TEST_EQUAL(g_sent_packets.size(), 1);
if (g_sent_packets.empty()) break;
TEST_EQUAL(g_sent_packets.front().first, initial_node);
lazy_from_entry(g_sent_packets.front().second, response);
ret = verify_message(&response, get_item_desc, parsed, 6, error_string, sizeof(error_string));
if (ret)
{
TEST_EQUAL(parsed[0]->string_value(), "q");
TEST_EQUAL(parsed[2]->string_value(), "get");
TEST_EQUAL(parsed[5]->string_value(), items[0].target.to_string());
if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "get") break;
}
else
{
fprintf(stderr, " invalid get request: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
break;
}
g_sent_packets.clear();
send_dht_response(node, response, initial_node, nodes_t(), "10", 1234, std::set<tcp::endpoint>()
, NULL, &items[0].ent);
TEST_CHECK(g_sent_packets.empty());
TEST_EQUAL(g_got_items.size(), 1);
if (g_got_items.empty()) break;
TEST_EQUAL(g_got_items.front().value(), items[0].ent);
g_got_items.clear();
} while (false);
// mutable get
do
{
dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0);
udp::endpoint initial_node(address_v4::from_string("4.4.4.4"), 1234);
node.m_table.add_node(initial_node);
sha1_hash target = hasher(public_key, item_pk_len).final();
node.get_item(target, get_item_cb);
TEST_EQUAL(g_sent_packets.size(), 1);
if (g_sent_packets.empty()) break;
TEST_EQUAL(g_sent_packets.front().first, initial_node);
lazy_from_entry(g_sent_packets.front().second, response);
ret = verify_message(&response, get_item_desc, parsed, 6, error_string, sizeof(error_string));
if (ret)
{
TEST_EQUAL(parsed[0]->string_value(), "q");
TEST_EQUAL(parsed[2]->string_value(), "get");
TEST_EQUAL(parsed[5]->string_value(), target.to_string());
if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "get") break;
}
else
{
fprintf(stderr, " invalid get request: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
break;
}
g_sent_packets.clear();
itemv.second = bencode(buffer, items[0].ent);
sign_mutable_item(itemv, seq, public_key, private_key, signature);
send_dht_response(node, response, initial_node, nodes_t(), "10", 1234, std::set<tcp::endpoint>()
, NULL, &items[0].ent, std::string(public_key, item_pk_len), std::string(signature, item_sig_len), seq);
TEST_CHECK(g_sent_packets.empty());
TEST_EQUAL(g_got_items.size(), 1);
if (g_got_items.empty()) break;
TEST_EQUAL(g_got_items.front().value(), items[0].ent);
TEST_CHECK(memcmp(g_got_items.front().pk(), public_key, item_pk_len) == 0);
TEST_CHECK(memcmp(g_got_items.front().sig(), signature, item_sig_len) == 0);
TEST_EQUAL(g_got_items.front().seq(), seq);
g_got_items.clear();
} while (false);
dht::key_desc_t put_immutable_item_desc[] = {
{"y", lazy_entry::string_t, 1, 0},
{"t", lazy_entry::string_t, 2, 0},
{"q", lazy_entry::string_t, 3, 0},
{"a", lazy_entry::dict_t, 0, key_desc_t::parse_children},
{"id", lazy_entry::string_t, 20, 0},
{"token", lazy_entry::string_t, 2, 0},
{"v", lazy_entry::none_t, 0, key_desc_t::last_child},
};
dht::key_desc_t put_mutable_item_desc[] = {
{"y", lazy_entry::string_t, 1, 0},
{"t", lazy_entry::string_t, 2, 0},
{"q", lazy_entry::string_t, 3, 0},
{"a", lazy_entry::dict_t, 0, key_desc_t::parse_children},
{"id", lazy_entry::string_t, 20, 0},
{"cas", lazy_entry::string_t, 20, key_desc_t::optional},
{"k", lazy_entry::string_t, item_pk_len, 0},
{"seq", lazy_entry::int_t, 0, 0},
{"sig", lazy_entry::string_t, item_sig_len, 0},
{"token", lazy_entry::string_t, 2, 0},
{"v", lazy_entry::none_t, 0, key_desc_t::last_child},
};
// immutable put
do
{
dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0);
enum { num_test_nodes = 2 };
node_entry nodes[num_test_nodes] =
{ node_entry(generate_next(), udp::endpoint(address_v4::from_string("4.4.4.4"), 1234))
, node_entry(generate_next(), udp::endpoint(address_v4::from_string("5.5.5.5"), 1235)) };
for (int i = 0; i < num_test_nodes; ++i)
node.m_table.add_node(nodes[i]);
g_put_item.assign(items[0].ent);
node.get_item(items[0].target, get_item_cb);
TEST_EQUAL(g_sent_packets.size(), num_test_nodes);
if (g_sent_packets.size() != num_test_nodes) break;
for (int i = 0; i < num_test_nodes; ++i)
{
std::list<std::pair<udp::endpoint, entry> >::iterator packet = find_packet(nodes[i].ep());
TEST_CHECK(packet != g_sent_packets.end());
if (packet == g_sent_packets.end()) continue;
lazy_from_entry(packet->second, response);
ret = verify_message(&response, get_item_desc, parsed, 6, error_string, sizeof(error_string));
if (!ret)
{
fprintf(stderr, " invalid get request: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
continue;
}
char t[10];
snprintf(t, sizeof(t), "%02d", i);
send_dht_response(node, response, nodes[i].ep(), nodes_t(), t, 1234,
std::set<tcp::endpoint>(), 0, 0, std::string(), std::string(), -1, &nodes[i].id);
g_sent_packets.erase(packet);
}
TEST_EQUAL(g_put_count, 1);
TEST_EQUAL(g_sent_packets.size(), num_test_nodes);
if (g_sent_packets.size() != num_test_nodes) break;
itemv.second = bencode(buffer, items[0].ent);
for (int i = 0; i < num_test_nodes; ++i)
{
std::list<std::pair<udp::endpoint, entry> >::iterator packet = find_packet(nodes[i].ep());
TEST_CHECK(packet != g_sent_packets.end());
if (packet == g_sent_packets.end()) continue;
lazy_from_entry(packet->second, response);
ret = verify_message(&response, put_immutable_item_desc, parsed, 7, error_string, sizeof(error_string));
if (ret)
{
TEST_EQUAL(parsed[0]->string_value(), "q");
TEST_EQUAL(parsed[2]->string_value(), "put");
std::pair<const char*, int> v = parsed[6]->data_section();
TEST_EQUAL(v.second, itemv.second);
TEST_CHECK(memcmp(v.first, itemv.first, itemv.second) == 0);
char t[10];
snprintf(t, sizeof(t), "%02d", i);
TEST_EQUAL(parsed[5]->string_value(), t);
if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "put") continue;
}
else
{
fprintf(stderr, " invalid immutable put request: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
continue;
}
}
g_sent_packets.clear();
g_put_item.clear();
g_put_count = 0;
} while (false);
// mutable put
do
{
dht::node_impl node(&ad, &s, sett, node_id::min(), ext, 0);
enum { num_test_nodes = 2 };
node_entry nodes[num_test_nodes] =
{ node_entry(generate_next(), udp::endpoint(address_v4::from_string("4.4.4.4"), 1234))
, node_entry(generate_next(), udp::endpoint(address_v4::from_string("5.5.5.5"), 1235)) };
for (int i = 0; i < num_test_nodes; ++i)
node.m_table.add_node(nodes[i]);
sha1_hash target = hasher(public_key, item_pk_len).final();
g_put_item.assign(items[0].ent, seq, public_key, private_key);
std::string sig(g_put_item.sig(), item_sig_len);
node.get_item(target, get_item_cb);
TEST_EQUAL(g_sent_packets.size(), num_test_nodes);
if (g_sent_packets.size() != num_test_nodes) break;
for (int i = 0; i < num_test_nodes; ++i)
{
std::list<std::pair<udp::endpoint, entry> >::iterator packet = find_packet(nodes[i].ep());
TEST_CHECK(packet != g_sent_packets.end());
if (packet == g_sent_packets.end()) continue;
lazy_from_entry(packet->second, response);
ret = verify_message(&response, get_item_desc, parsed, 6, error_string, sizeof(error_string));
if (!ret)
{
fprintf(stderr, " invalid mutable put request: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
continue;
}
char t[10];
snprintf(t, sizeof(t), "%02d", i);
send_dht_response(node, response, nodes[i].ep(), nodes_t(), t, 1234,
std::set<tcp::endpoint>(), 0, 0, std::string(), std::string(), -1, &nodes[i].id);
g_sent_packets.erase(packet);
}
TEST_EQUAL(g_put_count, 1);
TEST_EQUAL(g_sent_packets.size(), num_test_nodes);
if (g_sent_packets.size() != num_test_nodes) break;
itemv.second = bencode(buffer, items[0].ent);
for (int i = 0; i < num_test_nodes; ++i)
{
std::list<std::pair<udp::endpoint, entry> >::iterator packet = find_packet(nodes[i].ep());
TEST_CHECK(packet != g_sent_packets.end());
if (packet == g_sent_packets.end()) continue;
lazy_from_entry(packet->second, response);
ret = verify_message(&response, put_mutable_item_desc, parsed, 11, error_string, sizeof(error_string));
if (ret)
{
TEST_EQUAL(parsed[0]->string_value(), "q");
TEST_EQUAL(parsed[2]->string_value(), "put");
TEST_EQUAL(parsed[6]->string_value(), std::string(public_key, item_pk_len));
TEST_EQUAL(parsed[7]->int_value(), seq);
TEST_EQUAL(parsed[8]->string_value(), sig);
std::pair<const char*, int> v = parsed[10]->data_section();
TEST_EQUAL(v.second, itemv.second);
TEST_CHECK(memcmp(v.first, itemv.first, itemv.second) == 0);
char t[10];
snprintf(t, sizeof(t), "%02d", i);
TEST_EQUAL(parsed[9]->string_value(), t);
if (parsed[0]->string_value() != "q" || parsed[2]->string_value() != "put") continue;
}
else
{
fprintf(stderr, " invalid put request: %s\n", print_entry(response).c_str());
TEST_ERROR(error_string);
continue;
}
}
g_sent_packets.clear();
g_put_item.clear();
g_put_count = 0;
} while (false);
return 0;
}