added disk cache for write operations

This commit is contained in:
Arvid Norberg
2008-02-08 10:22:05 +00:00
parent 196f9c3544
commit 8cf0510144
18 changed files with 645 additions and 166 deletions

View File

@@ -34,22 +34,30 @@ POSSIBILITY OF SUCH DAMAGE.
#include <deque>
#include "libtorrent/disk_io_thread.hpp"
#ifdef _WIN32
#include <malloc.h>
#define alloca(s) _alloca(s)
#endif
#ifdef TORRENT_DISK_STATS
#include "libtorrent/time.hpp"
#endif
namespace libtorrent
{
disk_io_thread::disk_io_thread(int block_size)
disk_io_thread::disk_io_thread(asio::io_service& ios, int block_size)
: m_abort(false)
, m_queue_buffer_size(0)
, m_num_cached_blocks(0)
, m_cache_size(128) // 128 * 16kB = 2MB
, m_pool(block_size)
#ifndef NDEBUG
, m_block_size(block_size)
#endif
, m_writes(0)
, m_blocks_written(0)
, m_ios(ios)
, m_disk_io_thread(boost::ref(*this))
{
#ifdef TORRENT_STATS
@@ -100,6 +108,44 @@ namespace libtorrent
m_disk_io_thread.join();
}
void disk_io_thread::get_cache_info(sha1_hash const& ih, std::vector<cached_piece_info>& ret) const
{
mutex_t::scoped_lock l(m_mutex);
ret.clear();
ret.reserve(m_pieces.size());
for (std::vector<cached_piece_entry>::const_iterator i = m_pieces.begin()
, end(m_pieces.end()); i != end; ++i)
{
torrent_info const& ti = *i->storage->info();
if (ti.info_hash() != ih) continue;
cached_piece_info info;
info.piece = i->piece;
info.last_write = i->last_write;
int blocks_in_piece = (ti.piece_size(i->piece) + (16 * 1024) - 1) / (16 * 1024);
info.blocks.resize(blocks_in_piece);
for (int b = 0; b < blocks_in_piece; ++b)
if (i->blocks[b]) info.blocks[b] = true;
ret.push_back(info);
}
}
cache_status disk_io_thread::status() const
{
mutex_t::scoped_lock l(m_mutex);
cache_status st;
st.blocks_written = m_blocks_written;
st.writes = m_writes;
st.write_size = m_num_cached_blocks;
return st;
}
void disk_io_thread::set_cache_size(int s)
{
mutex_t::scoped_lock l(m_mutex);
TORRENT_ASSERT(s >= 0);
m_cache_size = s;
}
// aborts read operations
void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s)
{
@@ -115,7 +161,7 @@ namespace libtorrent
}
if (i->action == disk_io_job::read)
{
i->callback(-1, *i);
if (i->callback) m_ios.post(bind(i->callback, -1, *i));
m_jobs.erase(i++);
continue;
}
@@ -150,14 +196,130 @@ namespace libtorrent
return false;
}
}
std::vector<disk_io_thread::cached_piece_entry>::iterator disk_io_thread::find_cached_piece(
disk_io_job const& j, mutex_t::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
for (std::vector<cached_piece_entry>::iterator i = m_pieces.begin()
, end(m_pieces.end()); i != end; ++i)
{
if (i->storage != j.storage || i->piece != j.piece) continue;
return i;
}
return m_pieces.end();
}
void disk_io_thread::flush_oldest_piece(mutex_t::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
std::vector<cached_piece_entry>::iterator i = std::min_element(
m_pieces.begin(), m_pieces.end()
, bind(&cached_piece_entry::last_write, _1)
< bind(&cached_piece_entry::last_write, _1));
if (i == m_pieces.end()) return;
flush_and_remove(i, l);
}
void disk_io_thread::flush_and_remove(std::vector<disk_io_thread::cached_piece_entry>::iterator e
, mutex_t::scoped_lock& l)
{
flush(e, l);
m_pieces.erase(e);
}
void disk_io_thread::flush(std::vector<disk_io_thread::cached_piece_entry>::iterator e
, mutex_t::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
cached_piece_entry& p = *e;
int piece_size = p.storage->info()->piece_size(p.piece);
TORRENT_ASSERT(piece_size > 0);
// char* buf = (char*)alloca(piece_size);
std::vector<char> temp(piece_size);
char* buf = &temp[0];
TORRENT_ASSERT(buf != 0);
int blocks_in_piece = (piece_size + (16 * 1024) - 1) / (16 * 1024);
int buffer_size = 0;
int offset = 0;
for (int i = 0; i <= blocks_in_piece; ++i)
{
if (i == blocks_in_piece || p.blocks[i] == 0)
{
if (buffer_size == 0) continue;
TORRENT_ASSERT(buffer_size <= i * 16 * 1024);
l.unlock();
p.storage->write_impl(buf, p.piece, (std::min)(i * 16 * 1024, piece_size) - buffer_size, buffer_size);
l.lock();
++m_writes;
// std::cerr << " flushing p: " << p.piece << " bytes: " << buffer_size << std::endl;
buffer_size = 0;
offset = 0;
continue;
}
int block_size = (std::min)(piece_size - offset, 16 * 1024);
TORRENT_ASSERT(offset + block_size <= piece_size);
TORRENT_ASSERT(offset + block_size > 0);
std::memcpy(buf + offset, p.blocks[i], block_size);
offset += 16 * 1024;
free_buffer(p.blocks[i], l);
p.blocks[i] = 0;
buffer_size += block_size;
++m_blocks_written;
--m_num_cached_blocks;
}
TORRENT_ASSERT(buffer_size == 0);
// std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_num_cached_blocks << std::endl;
#ifndef NDEBUG
for (int i = 0; i < blocks_in_piece; ++i)
TORRENT_ASSERT(p.blocks[i] == 0);
#endif
}
void disk_io_thread::cache_block(disk_io_job& j, mutex_t::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
TORRENT_ASSERT(find_cached_piece(j, l) == m_pieces.end());
cached_piece_entry p;
int piece_size = j.storage->info()->piece_size(j.piece);
int blocks_in_piece = (piece_size + (16 * 1024) - 1) / (16 * 1024);
p.piece = j.piece;
p.storage = j.storage;
p.last_write = time_now();
p.num_blocks = 1;
p.blocks.reset(new char*[blocks_in_piece]);
std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
int block = j.offset / (16 * 1024);
// std::cerr << " adding cache entry for p: " << j.piece << " block: " << block << " cached_blocks: " << m_num_cached_blocks << std::endl;
p.blocks[block] = j.buffer;
++m_num_cached_blocks;
m_pieces.push_back(p);
}
void disk_io_thread::add_job(disk_io_job const& j
, boost::function<void(int, disk_io_job const&)> const& f)
{
TORRENT_ASSERT(!j.callback);
TORRENT_ASSERT(j.storage);
TORRENT_ASSERT(j.buffer_size <= 16 * 1024);
mutex_t::scoped_lock l(m_mutex);
#ifndef NDEBUG
if (j.action == disk_io_job::write)
{
std::vector<cached_piece_entry>::iterator p = find_cached_piece(j, l);
if (p != m_pieces.end())
{
int block = j.offset / (16 * 1024);
char const* buffer = p->blocks[block];
TORRENT_ASSERT(buffer == 0);
}
}
#endif
std::deque<disk_io_job>::reverse_iterator i = m_jobs.rbegin();
if (j.action == disk_io_job::read)
{
@@ -212,15 +374,27 @@ namespace libtorrent
char* disk_io_thread::allocate_buffer()
{
mutex_t::scoped_lock l(m_mutex);
return allocate_buffer(l);
}
void disk_io_thread::free_buffer(char* buf)
{
mutex_t::scoped_lock l(m_mutex);
free_buffer(buf, l);
}
char* disk_io_thread::allocate_buffer(mutex_t::scoped_lock& l)
{
TORRENT_ASSERT(l.locked());
#ifdef TORRENT_STATS
++m_allocations;
#endif
return (char*)m_pool.ordered_malloc();
}
void disk_io_thread::free_buffer(char* buf)
void disk_io_thread::free_buffer(char* buf, mutex_t::scoped_lock& l)
{
mutex_t::scoped_lock l(m_mutex);
TORRENT_ASSERT(l.locked());
#ifdef TORRENT_STATS
--m_allocations;
#endif
@@ -288,27 +462,53 @@ namespace libtorrent
// usleep(300);
break;
case disk_io_job::write:
{
mutex_t::scoped_lock l(m_mutex);
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " write " << j.buffer_size << std::endl;
#endif
std::vector<cached_piece_entry>::iterator p = find_cached_piece(j, l);
int block = j.offset / (16 * 1024);
TORRENT_ASSERT(j.buffer);
TORRENT_ASSERT(j.buffer_size <= m_block_size);
j.storage->write_impl(j.buffer, j.piece, j.offset
, j.buffer_size);
if (p != m_pieces.end())
{
TORRENT_ASSERT(p->blocks[block] == 0);
if (p->blocks[block]) free_buffer(p->blocks[block]);
p->blocks[block] = j.buffer;
++m_num_cached_blocks;
++p->num_blocks;
p->last_write = time_now();
// std::cerr << " adding cache entry for p: " << j.piece
// << " block: " << block
// << " cached_blocks: " << m_num_cached_blocks << std::endl;
}
else
{
cache_block(j, l);
}
free_current_buffer = false;
if (m_num_cached_blocks >= m_cache_size)
flush_oldest_piece(l);
// simulates a slow drive
// usleep(300);
break;
}
case disk_io_job::hash:
{
{
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " hash" << std::endl;
m_log << log_time() << " hash" << std::endl;
#endif
sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
j.str.resize(20);
std::memcpy(&j.str[0], &h[0], 20);
}
mutex_t::scoped_lock l(m_mutex);
std::vector<cached_piece_entry>::iterator i = find_cached_piece(j, l);
if (i != m_pieces.end()) flush_and_remove(i, l);
l.unlock();
sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
j.str.resize(20);
std::memcpy(&j.str[0], &h[0], 20);
break;
}
case disk_io_job::move_storage:
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " move" << std::endl;
@@ -317,17 +517,50 @@ namespace libtorrent
j.str = j.storage->save_path().string();
break;
case disk_io_job::release_files:
{
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " release" << std::endl;
#endif
mutex_t::scoped_lock l(m_mutex);
std::vector<cached_piece_entry>::iterator i = std::remove_if(
m_pieces.begin(), m_pieces.end(), bind(&cached_piece_entry::storage, _1) == j.storage);
for (std::vector<cached_piece_entry>::iterator k = i; k != m_pieces.end(); ++k)
flush(k, l);
m_pieces.erase(i, m_pieces.end());
m_pool.release_memory();
l.unlock();
j.storage->release_files_impl();
break;
}
case disk_io_job::delete_files:
{
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " delete" << std::endl;
#endif
mutex_t::scoped_lock l(m_mutex);
std::vector<cached_piece_entry>::iterator i = std::remove_if(
m_pieces.begin(), m_pieces.end(), bind(&cached_piece_entry::storage, _1) == j.storage);
for (std::vector<cached_piece_entry>::iterator k = i; k != m_pieces.end(); ++k)
{
torrent_info const& ti = *k->storage->info();
int blocks_in_piece = (ti.piece_size(k->piece) + (16 * 1024) - 1) / (16 * 1024);
for (int j = 0; j < blocks_in_piece; ++j)
{
if (k->blocks[j] == 0) continue;
free_buffer(k->blocks[j], l);
k->blocks[j] = 0;
}
}
m_pieces.erase(i, m_pieces.end());
m_pool.release_memory();
l.unlock();
j.storage->delete_files_impl();
break;
}
}
}
catch (std::exception& e)
@@ -343,8 +576,8 @@ namespace libtorrent
// if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl;
// else std::cerr << "DISK THREAD: invoking callback" << std::endl;
try { if (handler) handler(ret, j); }
catch (std::exception&) {}
if (handler) m_ios.post(bind(handler, ret, j));
#ifndef NDEBUG
m_current.storage = 0;