got rid of the last recursive mutexes. abstracted the threading primitives (and switched over to use asio's internal ones).

This commit is contained in:
Arvid Norberg
2009-10-20 02:49:56 +00:00
parent a5fb1b3455
commit 8a5b7d5d36
41 changed files with 449 additions and 484 deletions

View File

@@ -84,7 +84,7 @@ namespace libtorrent
#if defined TORRENT_DEBUG || defined TORRENT_DISK_STATS
bool disk_buffer_pool::is_disk_buffer(char* buffer
,boost::mutex::scoped_lock& l) const
, mutex::scoped_lock& l) const
{
TORRENT_ASSERT(m_magic == 0x1337);
#ifdef TORRENT_DISK_STATS
@@ -100,14 +100,14 @@ namespace libtorrent
bool disk_buffer_pool::is_disk_buffer(char* buffer) const
{
mutex_t::scoped_lock l(m_pool_mutex);
mutex::scoped_lock l(m_pool_mutex);
return is_disk_buffer(buffer, l);
}
#endif
char* disk_buffer_pool::allocate_buffer(char const* category)
{
mutex_t::scoped_lock l(m_pool_mutex);
mutex::scoped_lock l(m_pool_mutex);
TORRENT_ASSERT(m_magic == 0x1337);
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
char* ret = page_aligned_allocator::malloc(m_block_size);
@@ -142,7 +142,7 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS
void disk_buffer_pool::rename_buffer(char* buf, char const* category)
{
mutex_t::scoped_lock l(m_pool_mutex);
mutex::scoped_lock l(m_pool_mutex);
TORRENT_ASSERT(is_disk_buffer(buf, l));
TORRENT_ASSERT(m_categories.find(m_buf_to_category[buf])
!= m_categories.end());
@@ -161,7 +161,7 @@ namespace libtorrent
void disk_buffer_pool::free_buffer(char* buf)
{
TORRENT_ASSERT(buf);
mutex_t::scoped_lock l(m_pool_mutex);
mutex::scoped_lock l(m_pool_mutex);
TORRENT_ASSERT(m_magic == 0x1337);
TORRENT_ASSERT(is_disk_buffer(buf, l));
#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
@@ -195,7 +195,7 @@ namespace libtorrent
char* disk_buffer_pool::allocate_buffers(int num_blocks, char const* category)
{
mutex_t::scoped_lock l(m_pool_mutex);
mutex::scoped_lock l(m_pool_mutex);
TORRENT_ASSERT(m_magic == 0x1337);
#ifdef TORRENT_DISABLE_POOL_ALLOCATOR
char* ret = page_aligned_allocator::malloc(m_block_size * num_blocks);
@@ -230,7 +230,7 @@ namespace libtorrent
{
TORRENT_ASSERT(buf);
TORRENT_ASSERT(num_blocks >= 1);
mutex_t::scoped_lock l(m_pool_mutex);
mutex::scoped_lock l(m_pool_mutex);
TORRENT_ASSERT(m_magic == 0x1337);
TORRENT_ASSERT(is_disk_buffer(buf, l));
#if defined TORRENT_DISK_STATS || defined TORRENT_STATS
@@ -266,7 +266,7 @@ namespace libtorrent
{
TORRENT_ASSERT(m_magic == 0x1337);
#ifndef TORRENT_DISABLE_POOL_ALLOCATOR
mutex_t::scoped_lock l(m_pool_mutex);
mutex::scoped_lock l(m_pool_mutex);
m_pool.release_memory();
#endif
}
@@ -285,7 +285,7 @@ namespace libtorrent
, m_ios(ios)
, m_queue_callback(queue_callback)
, m_work(io_service::work(m_ios))
, m_disk_io_thread(boost::ref(*this))
, m_disk_io_thread(boost::bind(&disk_io_thread::thread_fun, this))
{
#ifdef TORRENT_DISK_STATS
m_log.open("disk_io_thread.log", std::ios::trunc);
@@ -299,12 +299,12 @@ namespace libtorrent
void disk_io_thread::join()
{
mutex_t::scoped_lock l(m_queue_mutex);
mutex::scoped_lock l(m_queue_mutex);
disk_io_job j;
m_waiting_to_shutdown = true;
j.action = disk_io_job::abort_thread;
m_jobs.insert(m_jobs.begin(), j);
m_signal.notify_all();
m_signal.signal(l);
l.unlock();
m_disk_io_thread.join();
@@ -315,7 +315,7 @@ namespace libtorrent
void disk_io_thread::get_cache_info(sha1_hash const& ih, std::vector<cached_piece_info>& ret) const
{
mutex_t::scoped_lock l(m_piece_mutex);
mutex::scoped_lock l(m_piece_mutex);
ret.clear();
ret.reserve(m_pieces.size());
for (cache_t::const_iterator i = m_pieces.begin()
@@ -352,7 +352,7 @@ namespace libtorrent
cache_status disk_io_thread::status() const
{
mutex_t::scoped_lock l(m_piece_mutex);
mutex::scoped_lock l(m_piece_mutex);
m_cache_stats.total_used_buffers = in_use();
m_cache_stats.queued_bytes = m_queue_buffer_size;
return m_cache_stats;
@@ -361,7 +361,7 @@ namespace libtorrent
// aborts read operations
void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s)
{
mutex_t::scoped_lock l(m_queue_mutex);
mutex::scoped_lock l(m_queue_mutex);
// read jobs are aborted, write and move jobs are syncronized
for (std::list<disk_io_job>::iterator i = m_jobs.begin();
i != m_jobs.end();)
@@ -388,7 +388,7 @@ namespace libtorrent
disk_io_job j;
j.action = disk_io_job::abort_torrent;
j.storage = s;
add_job(j);
add_job(j, l);
}
bool range_overlap(int start1, int length1, int start2, int length2)
@@ -420,7 +420,7 @@ namespace libtorrent
disk_io_thread::cache_t::iterator disk_io_thread::find_cached_piece(
disk_io_thread::cache_t& cache
, disk_io_job const& j, mutex_t::scoped_lock& l)
, disk_io_job const& j, mutex::scoped_lock& l)
{
for (cache_t::iterator i = cache.begin()
, end(cache.end()); i != end; ++i)
@@ -435,7 +435,7 @@ namespace libtorrent
{
ptime now = time_now();
mutex_t::scoped_lock l(m_piece_mutex);
mutex::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK;
// flush write cache
@@ -467,7 +467,7 @@ namespace libtorrent
}
// returns the number of blocks that were freed
int disk_io_thread::free_piece(cached_piece_entry& p, mutex_t::scoped_lock& l)
int disk_io_thread::free_piece(cached_piece_entry& p, mutex::scoped_lock& l)
{
int piece_size = p.storage->info()->piece_size(p.piece);
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
@@ -490,7 +490,7 @@ namespace libtorrent
int disk_io_thread::clear_oldest_read_piece(
int num_blocks
, cache_t::iterator ignore
, mutex_t::scoped_lock& l)
, mutex::scoped_lock& l)
{
INVARIANT_CHECK;
@@ -564,7 +564,7 @@ namespace libtorrent
}
int disk_io_thread::flush_contiguous_blocks(disk_io_thread::cache_t::iterator e
, mutex_t::scoped_lock& l, int lower_limit)
, mutex::scoped_lock& l, int lower_limit)
{
// first find the largest range of contiguous blocks
int len = 0;
@@ -600,7 +600,7 @@ namespace libtorrent
}
// flushes 'blocks' blocks from the cache
int disk_io_thread::flush_cache_blocks(mutex_t::scoped_lock& l
int disk_io_thread::flush_cache_blocks(mutex::scoped_lock& l
, int blocks, cache_t::iterator ignore, int options)
{
// first look if there are any read cache entries that can
@@ -647,7 +647,7 @@ namespace libtorrent
}
int disk_io_thread::flush_and_remove(disk_io_thread::cache_t::iterator e
, mutex_t::scoped_lock& l)
, mutex::scoped_lock& l)
{
int ret = flush_range(e, 0, INT_MAX, l);
m_pieces.erase(e);
@@ -655,7 +655,7 @@ namespace libtorrent
}
int disk_io_thread::flush_range(disk_io_thread::cache_t::iterator e
, int start, int end, mutex_t::scoped_lock& l)
, int start, int end, mutex::scoped_lock& l)
{
INVARIANT_CHECK;
@@ -761,7 +761,7 @@ namespace libtorrent
// returns -1 on failure
int disk_io_thread::cache_block(disk_io_job& j
, boost::function<void(int,disk_io_job const&)>& handler
, mutex_t::scoped_lock& l)
, mutex::scoped_lock& l)
{
INVARIANT_CHECK;
TORRENT_ASSERT(find_cached_piece(m_pieces, j, l) == m_pieces.end());
@@ -796,7 +796,7 @@ namespace libtorrent
// fills a piece with data from disk, returns the total number of bytes
// read or -1 if there was an error
int disk_io_thread::read_into_piece(cached_piece_entry& p, int start_block
, int options, int num_blocks, mutex_t::scoped_lock& l)
, int options, int num_blocks, mutex::scoped_lock& l)
{
int piece_size = p.storage->info()->piece_size(p.piece);
int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
@@ -904,7 +904,7 @@ namespace libtorrent
// piece regardless of the offset in j
// this is used for seed-mode, where we need to read the entire piece to calculate
// the hash
int disk_io_thread::cache_read_piece(disk_io_job const& j, mutex_t::scoped_lock& l)
int disk_io_thread::cache_read_piece(disk_io_job const& j, mutex::scoped_lock& l)
{
INVARIANT_CHECK;
@@ -933,7 +933,7 @@ namespace libtorrent
// returns -1 on read error, -2 if there isn't any space in the cache
// or the number of bytes read
int disk_io_thread::cache_read_block(disk_io_job const& j, mutex_t::scoped_lock& l)
int disk_io_thread::cache_read_block(disk_io_job const& j, mutex::scoped_lock& l)
{
INVARIANT_CHECK;
@@ -1041,7 +1041,7 @@ namespace libtorrent
{
TORRENT_ASSERT(j.buffer);
mutex_t::scoped_lock l(m_piece_mutex);
mutex::scoped_lock l(m_piece_mutex);
cache_t::iterator p
= find_cached_piece(m_read_pieces, j, l);
@@ -1103,7 +1103,7 @@ namespace libtorrent
}
int disk_io_thread::copy_from_piece(cache_t::iterator p, bool& hit
, disk_io_job const& j, mutex_t::scoped_lock& l)
, disk_io_job const& j, mutex::scoped_lock& l)
{
TORRENT_ASSERT(j.buffer);
@@ -1165,7 +1165,7 @@ namespace libtorrent
{
TORRENT_ASSERT(j.buffer);
mutex_t::scoped_lock l(m_piece_mutex);
mutex::scoped_lock l(m_piece_mutex);
if (!m_settings.use_read_cache) return -2;
cache_t::iterator p
@@ -1202,10 +1202,22 @@ namespace libtorrent
size_type disk_io_thread::queue_buffer_size() const
{
mutex_t::scoped_lock l(m_queue_mutex);
mutex::scoped_lock l(m_queue_mutex);
return m_queue_buffer_size;
}
void disk_io_thread::add_job(disk_io_job const& j
, mutex::scoped_lock& l
, boost::function<void(int, disk_io_job const&)> const& f)
{
m_jobs.push_back(j);
m_jobs.back().callback.swap(const_cast<boost::function<void(int, disk_io_job const&)>&>(f));
if (j.action == disk_io_job::write)
m_queue_buffer_size += j.buffer_size;
m_signal.signal(l);
}
void disk_io_thread::add_job(disk_io_job const& j
, boost::function<void(int, disk_io_job const&)> const& f)
{
@@ -1215,56 +1227,8 @@ namespace libtorrent
|| j.action == disk_io_job::abort_thread
|| j.action == disk_io_job::update_settings);
TORRENT_ASSERT(j.buffer_size <= m_block_size);
mutex_t::scoped_lock l(m_queue_mutex);
std::list<disk_io_job>::reverse_iterator i = m_jobs.rbegin();
if (j.action == disk_io_job::read)
{
// when we're reading, we may not skip
// ahead of any write operation that overlaps
// the region we're reading
for (; i != m_jobs.rend(); i++)
{
// if *i should come before j, stop
// and insert j before i
if (*i < j) break;
// if we come across a write operation that
// overlaps the region we're reading, we need
// to stop
if (i->action == disk_io_job::write
&& i->storage == j.storage
&& i->piece == j.piece
&& range_overlap(i->offset, i->buffer_size
, j.offset, j.buffer_size))
break;
}
}
else if (j.action == disk_io_job::write)
{
for (; i != m_jobs.rend(); ++i)
{
if (*i < j)
{
if (i != m_jobs.rbegin()
&& i.base()->storage.get() != j.storage.get())
i = m_jobs.rbegin();
break;
}
}
}
// if we are placed in front of all other jobs, put it on the back of
// the queue, to sweep the disk in the same direction, and to avoid
// starvation. The exception is if the priority is higher than the
// job at the front of the queue
if (i == m_jobs.rend() && (m_jobs.empty() || j.priority <= m_jobs.back().priority))
i = m_jobs.rbegin();
std::list<disk_io_job>::iterator k = m_jobs.insert(i.base(), j);
k->callback.swap(const_cast<boost::function<void(int, disk_io_job const&)>&>(f));
if (j.action == disk_io_job::write)
m_queue_buffer_size += j.buffer_size;
m_signal.notify_all();
mutex::scoped_lock l(m_queue_mutex);
add_job(j, l, f);
}
bool disk_io_thread::test_error(disk_io_job& j)
@@ -1278,8 +1242,7 @@ namespace libtorrent
j.error = ec;
j.error_file = j.storage->error_file();
#ifdef TORRENT_DEBUG
std::cout << "ERROR: '" << ec.message() << " in "
<< j.error_file << std::endl;
printf("ERROR: '%s' in %s\n", ec.message().c_str(), j.error_file.c_str());
#endif
j.storage->clear_error();
return true;
@@ -1340,7 +1303,7 @@ namespace libtorrent
return action_flags[j.action] & buffer_operation;
}
void disk_io_thread::operator()()
void disk_io_thread::thread_fun()
{
size_type elevator_position = 0;
int elevator_direction = 1;
@@ -1350,7 +1313,7 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " idle" << std::endl;
#endif
mutex_t::scoped_lock jl(m_queue_mutex);
mutex::scoped_lock jl(m_queue_mutex);
while (m_jobs.empty() && !m_abort)
{
@@ -1359,13 +1322,14 @@ namespace libtorrent
// if (!m_signal.timed_wait(jl, boost::posix_time::seconds(1)))
// flush_expired_pieces();
m_signal.wait(jl);
m_signal.clear(jl);
}
if (m_abort && m_jobs.empty())
{
jl.unlock();
mutex_t::scoped_lock l(m_piece_mutex);
mutex::scoped_lock l(m_piece_mutex);
// flush all disk caches
for (cache_t::iterator i = m_pieces.begin()
, end(m_pieces.end()); i != end; ++i)
@@ -1521,7 +1485,7 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " abort_torrent " << std::endl;
#endif
mutex_t::scoped_lock jl(m_queue_mutex);
mutex::scoped_lock jl(m_queue_mutex);
for (std::list<disk_io_job>::iterator i = m_jobs.begin();
i != m_jobs.end();)
{
@@ -1545,7 +1509,7 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " abort_thread " << std::endl;
#endif
mutex_t::scoped_lock jl(m_queue_mutex);
mutex::scoped_lock jl(m_queue_mutex);
for (std::list<disk_io_job>::iterator i = m_jobs.begin();
i != m_jobs.end();)
@@ -1702,7 +1666,7 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " write " << j.buffer_size << std::endl;
#endif
mutex_t::scoped_lock l(m_piece_mutex);
mutex::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK;
if (in_use() >= m_settings.cache_size)
@@ -1762,7 +1726,7 @@ namespace libtorrent
#ifdef TORRENT_DISK_STATS
m_log << log_time() << " hash" << std::endl;
#endif
mutex_t::scoped_lock l(m_piece_mutex);
mutex::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK;
cache_t::iterator i
@@ -1817,7 +1781,7 @@ namespace libtorrent
#endif
TORRENT_ASSERT(j.buffer == 0);
mutex_t::scoped_lock l(m_piece_mutex);
mutex::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK;
for (cache_t::iterator i = m_pieces.begin(); i != m_pieces.end();)
@@ -1846,7 +1810,7 @@ namespace libtorrent
#endif
TORRENT_ASSERT(j.buffer == 0);
mutex_t::scoped_lock l(m_piece_mutex);
mutex::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK;
for (cache_t::iterator i = m_read_pieces.begin();
@@ -1874,7 +1838,7 @@ namespace libtorrent
#endif
TORRENT_ASSERT(j.buffer == 0);
mutex_t::scoped_lock l(m_piece_mutex);
mutex::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK;
cache_t::iterator i = std::remove_if(
@@ -1929,8 +1893,7 @@ namespace libtorrent
if (sleep_time < 0) sleep_time = 0;
TORRENT_ASSERT(sleep_time < 5 * 1000);
boost::thread::sleep(boost::get_system_time()
+ boost::posix_time::milliseconds(sleep_time));
sleep(sleep_time);
}
m_last_file_check = time_now_hires();
#endif