factor out disk_buffer_pool from disk_io_thread. refactored the file open modes to be platform independent. gave the disk_io_thread its own copy of session_settings that it shares with storage. added an unaligned_read implementation to storage. Added options to session_settings on when to open files in unbuffered mode. Added unit tests for unaligned reads

This commit is contained in:
Arvid Norberg
2009-01-21 07:31:49 +00:00
parent 9a9f08f70f
commit 00808473e7
11 changed files with 423 additions and 316 deletions

View File

@@ -44,26 +44,102 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent
{
disk_io_thread::disk_io_thread(asio::io_service& ios, int block_size)
: m_abort(false)
, m_queue_buffer_size(0)
, m_cache_size(512) // 512 * 16kB = 8MB
, m_cache_expiry(60) // 1 minute
, m_coalesce_writes(false)
, m_coalesce_reads(false)
, m_use_read_cache(true)
, m_disk_io_no_buffer(true)
disk_buffer_pool::disk_buffer_pool(int block_size)
: m_block_size(block_size)
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
, m_pool(block_size)
#endif
, m_block_size(block_size)
, m_ios(ios)
, m_disk_io_thread(boost::ref(*this))
{
#ifdef TORRENT_STATS
m_allocations = 0;
#endif
}
#ifdef TORRENT_DEBUG
bool disk_buffer_pool::is_disk_buffer(char* buffer) const
{
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
return true;
#else
mutex_t::scoped_lock l(m_pool_mutex);
return m_pool.is_from(buffer);
#endif
}
#endif
char* disk_buffer_pool::allocate_buffer()
{
mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
++m_allocations;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
return page_aligned_allocator::malloc(m_block_size);
#else
return (char*)m_pool.ordered_malloc();
#endif
}
void disk_buffer_pool::free_buffer(char* buf)
{
TORRENT_ASSERT(buf);
mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
--m_allocations;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
page_aligned_allocator::free(buf);
#else
m_pool.ordered_free(buf);
#endif
}
char* disk_buffer_pool::allocate_buffers(int num_blocks)
{
mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
m_allocations += num_blocks;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
return page_aligned_allocator::malloc(m_block_size * num_blocks);
#else
return (char*)m_pool.ordered_malloc(num_blocks);
#endif
}
void disk_buffer_pool::free_buffers(char* buf, int num_blocks)
{
TORRENT_ASSERT(buf);
TORRENT_ASSERT(num_blocks >= 1);
mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
m_allocations -= num_blocks;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
page_aligned_allocator::free(buf);
#else
m_pool.ordered_free(buf, num_blocks);
#endif
}
void disk_buffer_pool::release_memory()
{
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
mutex_t::scoped_lock l(m_pool_mutex);
m_pool.release_memory();
#endif
}
// ------- disk_io_thread ------
disk_io_thread::disk_io_thread(asio::io_service& ios, int block_size)
: disk_buffer_pool(block_size)
, m_abort(false)
, m_queue_buffer_size(0)
, m_ios(ios)
, m_disk_io_thread(boost::ref(*this))
{
#ifdef TORRENT_DISK_STATS
m_log.open("disk_io_thread.log", std::ios::trunc);
#endif
@@ -222,7 +298,7 @@ namespace libtorrent
< bind(&cached_piece_entry::last_use, _2));
if (i == m_pieces.end()) break;
int age = total_seconds(now - i->last_use);
if (age < m_cache_expiry) break;
if (age < m_settings.cache_expiry) break;
flush_and_remove(i, l);
}
@@ -235,7 +311,7 @@ namespace libtorrent
< bind(&cached_piece_entry::last_use, _2));
if (i == m_read_pieces.end()) break;
int age = total_seconds(now - i->last_use);
if (age < m_cache_expiry) break;
if (age < m_settings.cache_expiry) break;
free_piece(*i, l);
m_read_pieces.erase(i);
}
@@ -319,7 +395,7 @@ namespace libtorrent
boost::scoped_array<char> buf;
file::iovec_t* iov = 0;
int iov_counter = 0;
if (m_coalesce_writes) buf.reset(new (std::nothrow) char[piece_size]);
if (m_settings.coalesce_writes) buf.reset(new (std::nothrow) char[piece_size]);
else iov = TORRENT_ALLOCA(file::iovec_t, blocks_in_piece);
for (int i = 0; i <= blocks_in_piece; ++i)
@@ -417,7 +493,7 @@ namespace libtorrent
int end_block = start_block;
for (int i = start_block; i < blocks_in_piece
&& m_cache_stats.cache_size < m_cache_size; ++i)
&& m_cache_stats.cache_size < m_settings.cache_size; ++i)
{
// this is a block that is already allocated
// stop allocating and don't read more than
@@ -444,7 +520,7 @@ namespace libtorrent
boost::scoped_array<char> buf;
boost::scoped_array<file::iovec_t> iov;
int iov_counter = 0;
if (m_coalesce_reads) buf.reset(new (std::nothrow) char[buffer_size]);
if (m_settings.coalesce_reads) buf.reset(new (std::nothrow) char[buffer_size]);
else iov.reset(new file::iovec_t[end_block - start_block]);
int ret = 0;
@@ -454,6 +530,7 @@ namespace libtorrent
file::iovec_t b = { buf.get(), buffer_size };
ret = p.storage->read_impl(&b, p.piece, start_block * m_block_size, 1);
l.lock();
TORRENT_ASSERT(ret == buffer_size || p.storage->error());
if (p.storage->error()) { return -1; }
++m_cache_stats.reads;
}
@@ -486,6 +563,7 @@ namespace libtorrent
l.unlock();
ret = p.storage->read_impl(iov.get(), p.piece, start_block * m_block_size, iov_counter);
l.lock();
TORRENT_ASSERT(ret == buffer_size || p.storage->error());
if (p.storage->error()) { return -1; }
++m_cache_stats.reads;
}
@@ -499,14 +577,14 @@ namespace libtorrent
, cache_t::iterator ignore
, mutex_t::scoped_lock& l)
{
if (m_cache_size - m_cache_stats.cache_size < num_blocks)
if (m_settings.cache_size - m_cache_stats.cache_size < num_blocks)
{
// there's not enough room in the cache, clear a piece
// from the read cache
if (!clear_oldest_read_piece(ignore, l)) return false;
}
return m_cache_size - m_cache_stats.cache_size >= num_blocks;
return m_settings.cache_size - m_cache_stats.cache_size >= num_blocks;
}
// returns -1 on read error, -2 if there isn't any space in the cache
@@ -597,7 +675,7 @@ namespace libtorrent
// when writing, there may be a one block difference, right before an old piece
// is flushed
TORRENT_ASSERT(m_cache_stats.cache_size <= m_cache_size + 1);
TORRENT_ASSERT(m_cache_stats.cache_size <= m_settings.cache_size + 1);
}
#endif
@@ -606,7 +684,7 @@ namespace libtorrent
TORRENT_ASSERT(j.buffer);
mutex_t::scoped_lock l(m_piece_mutex);
if (!m_use_read_cache) return -2;
if (!m_settings.use_read_cache) return -2;
cache_t::iterator p
= find_cached_piece(m_read_pieces, j, l);
@@ -729,73 +807,6 @@ namespace libtorrent
m_signal.notify_all();
}
#ifdef TORRENT_DEBUG
bool disk_io_thread::is_disk_buffer(char* buffer) const
{
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
return true;
#else
mutex_t::scoped_lock l(m_pool_mutex);
return m_pool.is_from(buffer);
#endif
}
#endif
char* disk_io_thread::allocate_buffer()
{
mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
++m_allocations;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
return page_aligned_allocator::malloc(m_block_size);
#else
return (char*)m_pool.ordered_malloc();
#endif
}
void disk_io_thread::free_buffer(char* buf)
{
TORRENT_ASSERT(buf);
mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
--m_allocations;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
page_aligned_allocator::free(buf);
#else
m_pool.ordered_free(buf);
#endif
}
char* disk_io_thread::allocate_buffers(int num_blocks)
{
mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
m_allocations += num_blocks;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
return page_aligned_allocator::malloc(m_block_size * num_blocks);
#else
return (char*)m_pool.ordered_malloc(num_blocks);
#endif
}
void disk_io_thread::free_buffers(char* buf, int num_blocks)
{
TORRENT_ASSERT(buf);
TORRENT_ASSERT(num_blocks >= 1);
mutex_t::scoped_lock l(m_pool_mutex);
#ifdef TORRENT_STATS
m_allocations -= num_blocks;
#endif
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
page_aligned_allocator::free(buf);
#else
m_pool.ordered_free(buf, num_blocks);
#endif
}
bool disk_io_thread::test_error(disk_io_job& j)
{
error_code const& ec = j.storage->error();
@@ -868,6 +879,9 @@ namespace libtorrent
try {
#endif
if (j.storage && j.storage->get_storage_impl()->m_settings == 0)
j.storage->get_storage_impl()->m_settings = &m_settings;
switch (j.action)
{
case disk_io_job::update_settings:
@@ -880,11 +894,7 @@ namespace libtorrent
TORRENT_ASSERT(s.cache_size >= 0);
TORRENT_ASSERT(s.cache_expiry > 0);
mutex_t::scoped_lock l(m_piece_mutex);
m_cache_size = s.cache_size;
m_cache_expiry = s.cache_expiry;
m_use_read_cache = s.use_read_cache;
m_disk_io_no_buffer = s.disk_io_no_buffer;
m_settings = s;
}
case disk_io_job::abort_torrent:
{
@@ -1024,7 +1034,7 @@ namespace libtorrent
// in the cache, we should not
// free it at the end
holder.release();
if (m_cache_stats.cache_size >= m_cache_size)
if (m_cache_stats.cache_size >= m_settings.cache_size)
flush_oldest_piece(l);
break;
}
@@ -1098,12 +1108,8 @@ namespace libtorrent
}
}
l.unlock();
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
{
mutex_t::scoped_lock l(m_pool_mutex);
m_pool.release_memory();
}
#endif
release_memory();
ret = j.storage->release_files_impl();
if (ret != 0) test_error(j);
break;
@@ -1132,12 +1138,7 @@ namespace libtorrent
}
}
l.unlock();
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
{
mutex_t::scoped_lock l(m_pool_mutex);
m_pool.release_memory();
}
#endif
release_memory();
ret = 0;
break;
}
@@ -1168,12 +1169,8 @@ namespace libtorrent
}
m_pieces.erase(i, m_pieces.end());
l.unlock();
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
{
mutex_t::scoped_lock l(m_pool_mutex);
m_pool.release_memory();
}
#endif
release_memory();
ret = j.storage->delete_files_impl();
if (ret != 0) test_error(j);
break;