optimize disk I/O elevator algorithm to spend less time picking job

Arvid Norberg
2010-01-12 01:56:48 +00:00
parent c76ebe23c7
commit a1299c3a10
6 changed files with 174 additions and 163 deletions
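In outline: the old dispatch loop rescanned the whole m_jobs list every time it picked a job, so choosing the next read cost O(n) per pick. After this commit, reads are deferred into a std::multimap keyed by physical disk offset, paying O(log n) once on insert, and the elevator then services neighboring map entries directly. A minimal sketch of the enqueue half (the disk_job type and physical_offset() helper are simplified stand-ins, not the real libtorrent API):

```cpp
#include <cstdint>
#include <map>
#include <utility>

// simplified stand-in for libtorrent's disk_io_job
struct disk_job
{
	std::int64_t piece_offset; // logical offset within the torrent
	// ... callback, buffer, storage, etc. omitted
};

typedef std::multimap<std::int64_t, disk_job> read_jobs_t;

// hypothetical stand-in for storage::physical_offset(), which maps
// a logical offset to a physical position on disk
std::int64_t physical_offset(disk_job const& j) { return j.piece_offset; }

// defer a read: a single O(log n) insert keeps the set sorted by
// physical offset; a multimap is used because two jobs may resolve
// to the same physical offset
void defer_read(read_jobs_t& sorted_read_jobs, disk_job const& j)
{
	sorted_read_jobs.insert(std::make_pair(physical_offset(j), j));
}
```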

@@ -52,6 +52,10 @@ POSSIBILITY OF SUCH DAMAGE.
 namespace libtorrent
 {
+	bool should_cancel_on_abort(disk_io_job const& j);
+	bool is_read_operation(disk_io_job const& j);
+	bool operation_has_buffer(disk_io_job const& j);
+
 	disk_buffer_pool::disk_buffer_pool(int block_size)
 		: m_block_size(block_size)
 		, m_in_use(0)
@@ -371,15 +375,9 @@ namespace libtorrent
 				++i;
 				continue;
 			}
-			if (i->action == disk_io_job::read)
+			if (should_cancel_on_abort(*i))
 			{
-				post_callback(i->callback, *i, -1);
-				m_jobs.erase(i++);
-				continue;
-			}
-			if (i->action == disk_io_job::check_files)
-			{
-				post_callback(i->callback, *i, piece_manager::disk_check_aborted);
+				post_callback(i->callback, *i, -3);
 				m_jobs.erase(i++);
 				continue;
 			}
@@ -1320,34 +1318,34 @@ namespace libtorrent
 	enum action_flags_t
 	{
 		read_operation = 1
-		, fence_operation = 2
-		, buffer_operation = 4
+		, buffer_operation = 2
+		, cancel_on_abort = 4
 	};

 	static const boost::uint8_t action_flags[] =
 	{
-		read_operation + buffer_operation // read
+		read_operation + buffer_operation + cancel_on_abort // read
 		, buffer_operation // write
 		, 0 // hash
-		, fence_operation // move_storage
-		, fence_operation // release_files
-		, fence_operation // delete_files
-		, fence_operation // check_fastresume
-		, read_operation // check_files
-		, fence_operation // save_resume_data
-		, fence_operation // rename_file
-		, fence_operation // abort_thread
-		, fence_operation // clear_read_cache
-		, fence_operation // abort_torrent
-		, 0 // update_settings
-		, read_operation // read_and_hash
+		, 0 // move_storage
+		, 0 // release_files
+		, 0 // delete_files
+		, 0 // check_fastresume
+		, read_operation + cancel_on_abort // check_files
+		, 0 // save_resume_data
+		, 0 // rename_file
+		, 0 // abort_thread
+		, 0 // clear_read_cache
+		, 0 // abort_torrent
+		, cancel_on_abort // update_settings
+		, read_operation + cancel_on_abort // read_and_hash
 		, 0 // finalize_file
 	};

-	bool is_fence_operation(disk_io_job const& j)
+	bool should_cancel_on_abort(disk_io_job const& j)
 	{
 		TORRENT_ASSERT(j.action >= 0 && j.action < sizeof(action_flags));
-		return action_flags[j.action] & fence_operation;
+		return action_flags[j.action] & cancel_on_abort;
 	}

 	bool is_read_operation(disk_io_job const& j)
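A note on the table above: reclassifying actions with a cancel_on_abort bit is what lets the abort paths below share one predicate instead of testing action types case by case. A standalone sketch of the pattern, with an abbreviated action list (names are illustrative, not the full libtorrent enum):

```cpp
#include <cassert>
#include <cstdint>

// bit flags describing properties of each job type
enum action_flags_t
{
	read_operation = 1    // job reads from disk
	, buffer_operation = 2 // job owns a disk buffer
	, cancel_on_abort = 4  // job is cancelled when its torrent aborts
};

enum action_t { read, write, hash, check_files, num_actions };

// one entry per action_t value, indexed by a job's action
static const std::uint8_t action_flags[num_actions] =
{
	read_operation + buffer_operation + cancel_on_abort // read
	, buffer_operation                                  // write
	, 0                                                 // hash
	, read_operation + cancel_on_abort                  // check_files
};

// one table lookup answers the question for every action type
bool should_cancel_on_abort(action_t action)
{
	assert(action >= 0 && action < num_actions);
	return (action_flags[action] & cancel_on_abort) != 0;
}
```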
@@ -1364,9 +1362,13 @@ namespace libtorrent
 	void disk_io_thread::thread_fun()
 	{
 		size_type elevator_position = 0;
 		// 1 = forward in list, -1 = backwards in list
 		int elevator_direction = 1;
+
+		typedef std::multimap<size_type, disk_io_job> read_jobs_t;
+		read_jobs_t sorted_read_jobs;
+		read_jobs_t::iterator elevator_job_pos = sorted_read_jobs.begin();

 		for (;;)
 		{
 #ifdef TORRENT_DISK_STATS
@@ -1374,7 +1376,7 @@ namespace libtorrent
 #endif
 			mutex::scoped_lock jl(m_queue_mutex);

-			while (m_jobs.empty() && !m_abort)
+			while (m_jobs.empty() && sorted_read_jobs.empty() && !m_abort)
 			{
 				// if there hasn't been an event in one second
 				// see if we should flush the cache
@@ -1404,111 +1406,96 @@ namespace libtorrent
 				return;
 			}

-			std::list<disk_io_job>::iterator selected_job = m_jobs.begin();
+			disk_io_job j;

-			if (m_settings.allow_reordered_disk_operations
-				&& is_read_operation(*selected_job))
+			if (!m_jobs.empty())
 			{
-				// Before reading the current block, read any
-				// blocks between the read head and the queued
-				// block, elevator style
+				// we have a job in the job queue. If it's
+				// a read operation and we are allowed to
+				// reorder jobs, sort it into the read job
+				// list and continue, otherwise just pop it
+				// and use it later
+				j = m_jobs.front();
+				m_jobs.pop_front();
+
+				jl.unlock();

-				std::list<disk_io_job>::iterator best_job, i;
-				size_type score, best_score = (size_type)-1;
+				bool defer = false;

-				for (;;)
+				if (is_read_operation(j))
 				{
-					for (i = m_jobs.begin(); i != m_jobs.end(); ++i)
-					{
-						// ignore fence_operations
-						if (is_fence_operation(*i))
-							continue;
-
-						// always prioritize all disk-I/O jobs
-						// that are not read operations
-						if (!is_read_operation(*i))
-						{
-							best_job = i;
-							best_score = 0;
-							break;
-						}
-
-						// at this point the operation we're looking
-						// at is a read operation. If this read operation
-						// can be fully satisfied by the read cache, handle
-						// it immediately
-						if (m_settings.use_read_cache)
-						{
-							// unfortunately we need to lock the cache
-							// if the cache querying function would be
-							// made asynchronous, this would not be
-							// necessary anymore
-							mutex::scoped_lock l(m_piece_mutex);
-							cache_t::iterator p
-								= find_cached_piece(m_read_pieces, *i, l);
-							// if it's a cache hit, process the job immediately
-							if (p != m_read_pieces.end() && is_cache_hit(p, *i, l))
-							{
-								best_job = i;
-								best_score = 0;
-								break;
-							}
-						}
-
-						// we only need to query for physical offset
-						// for read operations, since those are
-						// the only ones we re-order
-						if (i->phys_offset == -1)
-							i->phys_offset = i->storage->physical_offset(i->piece, i->offset);
-
-						if (elevator_direction > 0)
-						{
-							score = i->phys_offset - elevator_position;
-							if (i->phys_offset >= elevator_position
-								&& (score < best_score
-								|| best_score == (size_type)-1))
-							{
-								best_score = score;
-								best_job = i;
-							}
-						}
-						else
-						{
-							score = elevator_position - i->phys_offset;
-							if (i->phys_offset <= elevator_position
-								&& (score < best_score
-								|| best_score == (size_type)-1))
-							{
-								best_score = score;
-								best_job = i;
-							}
-						}
-					}
-					if (best_score != (size_type)-1)
-						break;
-					elevator_direction = -elevator_direction;
+					defer = true;
+
+					// at this point the operation we're looking
+					// at is a read operation. If this read operation
+					// can be fully satisfied by the read cache, handle
+					// it immediately
+					if (m_settings.use_read_cache)
+					{
+#ifdef TORRENT_DISK_STATS
+						m_log << log_time() << " check_cache_hit" << std::endl;
+#endif
+						// unfortunately we need to lock the cache
+						// if the cache querying function would be
+						// made asynchronous, this would not be
+						// necessary anymore
+						mutex::scoped_lock l(m_piece_mutex);
+						cache_t::iterator p
+							= find_cached_piece(m_read_pieces, j, l);
+						// if it's a cache hit, process the job immediately
+						if (p != m_read_pieces.end() && is_cache_hit(p, j, l))
+							defer = false;
+					}
 				}

-				selected_job = best_job;
-				// only update the elevator position for read jobs
-				if (is_read_operation(*selected_job))
-					elevator_position = selected_job->phys_offset;
+				if (m_settings.allow_reordered_disk_operations && defer)
+				{
+#ifdef TORRENT_DISK_STATS
+					m_log << log_time() << " sorting_job" << std::endl;
+#endif
+					size_type phys_off = j.storage->physical_offset(j.piece, j.offset);
+					sorted_read_jobs.insert(std::pair<size_type, disk_io_job>(phys_off, j));
+					continue;
+				}
+			}
+			else
+			{
+				// the job queue is empty, pick the next read job
+				// from the sorted job list. So we don't need the
+				// job queue lock anymore
+				jl.unlock();
+
+				TORRENT_ASSERT(!sorted_read_jobs.empty());
+
+				// if we've reached the end, change the elevator direction
+				if (elevator_job_pos == sorted_read_jobs.end() && elevator_direction == 1)
+				{
+					elevator_direction = -1;
+					--elevator_job_pos;
+				}
+
+				j = elevator_job_pos->second;
+				read_jobs_t::iterator to_erase = elevator_job_pos;
+
+				// if we've reached the beginning of the sorted list,
+				// change the elevator direction
+				if (elevator_job_pos == sorted_read_jobs.begin() && elevator_direction == -1)
+					elevator_direction = 1;
+
+				// move the elevator before erasing the job we're processing
+				// to keep the iterator valid
+				if (elevator_direction > 0) ++elevator_job_pos;
+				else --elevator_job_pos;
+
+				sorted_read_jobs.erase(to_erase);
 			}

 			// if there's a buffer in this job, it will be freed
 			// when this holder is destructed, unless it has been
 			// released.
 			disk_buffer_holder holder(*this
-				, operation_has_buffer(*selected_job) ? selected_job->buffer : 0);
-
-			boost::function<void(int, disk_io_job const&)> handler;
-			handler.swap(selected_job->callback);
-
-			disk_io_job j = *selected_job;
-			m_jobs.erase(selected_job);
+				, operation_has_buffer(j) ? j.buffer : 0);

 			if (j.action == disk_io_job::write)
 			{
 				TORRENT_ASSERT(m_queue_buffer_size >= j.buffer_size);
@@ -1526,7 +1513,6 @@ namespace libtorrent
 				// can trigger all the connections waiting for this event
 				post = true;
 			}
-			jl.unlock();

 			if (post) m_ios.post(m_queue_callback);
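The pick side deserves a close read: the elevator position is a live iterator into the multimap, the direction flips at either end, and the iterator is advanced past the serviced entry before that entry is erased, so it is never left dangling. A self-contained sketch of that sweep, assuming a simplified job type in place of disk_io_job:

```cpp
#include <cstdint>
#include <map>

struct disk_job { int id; }; // simplified payload
typedef std::multimap<std::int64_t, disk_job> read_jobs_t;

// pick the next job elevator-style, mirroring the iterator handling
// in disk_io_thread::thread_fun(); `pos` and `direction` persist
// across calls, and `jobs` must be non-empty
disk_job pick_next(read_jobs_t& jobs
	, read_jobs_t::iterator& pos, int& direction)
{
	// ran off the end while sweeping forward: turn around
	if (pos == jobs.end() && direction == 1)
	{
		direction = -1;
		--pos;
	}

	disk_job j = pos->second;
	read_jobs_t::iterator to_erase = pos;

	// reached the first element while sweeping backward: turn around
	if (pos == jobs.begin() && direction == -1)
		direction = 1;

	// step off the element before erasing it, keeping `pos` valid
	if (direction > 0) ++pos;
	else --pos;

	jobs.erase(to_erase);
	return j;
}
```

New reads inserted between picks do not disturb this state: multimap insertion never invalidates existing iterators, so `pos` stays usable across loop iterations.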
@@ -1576,14 +1562,27 @@ namespace libtorrent
 						++i;
 						continue;
 					}
-					if (i->action == disk_io_job::check_files)
+					if (should_cancel_on_abort(*i))
 					{
-						post_callback(i->callback, *i, piece_manager::disk_check_aborted);
+						post_callback(i->callback, *i, -3);
 						m_jobs.erase(i++);
 						continue;
 					}
 					++i;
 				}
 				jl.unlock();
+
+				// now clear all the read jobs
+				for (read_jobs_t::iterator i = sorted_read_jobs.begin();
+					i != sorted_read_jobs.end();)
+				{
+					if (i->second.storage != j.storage)
+					{
+						++i;
+						continue;
+					}
+					post_callback(i->second.callback, i->second, -3);
+					sorted_read_jobs.erase(i++);
+				}
 				break;
 			}
@@ -1591,25 +1590,33 @@ namespace libtorrent
 			case disk_io_job::abort_thread:
 #ifdef TORRENT_DISK_STATS
 				m_log << log_time() << " abort_thread " << std::endl;
 #endif
+				// clear all read jobs
 				mutex::scoped_lock jl(m_queue_mutex);

 				for (std::list<disk_io_job>::iterator i = m_jobs.begin();
 					i != m_jobs.end();)
 				{
-					if (i->action == disk_io_job::read)
+					if (should_cancel_on_abort(*i))
 					{
-						post_callback(i->callback, *i, -1);
-						m_jobs.erase(i++);
-						continue;
-					}
-					if (i->action == disk_io_job::check_files)
-					{
-						post_callback(i->callback, *i, piece_manager::disk_check_aborted);
+						post_callback(i->callback, *i, -3);
 						m_jobs.erase(i++);
 						continue;
 					}
 					++i;
 				}
+				jl.unlock();
+
+				for (read_jobs_t::iterator i = sorted_read_jobs.begin();
+					i != sorted_read_jobs.end();)
+				{
+					if (i->second.storage != j.storage)
+					{
+						++i;
+						continue;
+					}
+					post_callback(i->second.callback, i->second, -3);
+					sorted_read_jobs.erase(i++);
+				}
+
 				m_abort = true;
 				break;
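Both abort cases drain sorted_read_jobs with the same erase-while-iterating idiom: post the failure callback, then erase through i++ so the loop iterator has already moved off the doomed node before it is removed. A sketch of the idiom, with a hypothetical cancel() standing in for post_callback and `storage` reduced to a plain id:

```cpp
#include <cstdint>
#include <iostream>
#include <map>

struct disk_job { int storage; }; // `storage` simplified to an id
typedef std::multimap<std::int64_t, disk_job> read_jobs_t;

// hypothetical stand-in for post_callback(..., -3)
void cancel(disk_job const& j) { std::cout << "cancelled\n"; }

// remove every queued read belonging to `storage`, notifying each
// owner; erase(i++) removes the current node only after the
// iterator has advanced, so the loop never dereferences a dead node
void drain(read_jobs_t& jobs, int storage)
{
	for (read_jobs_t::iterator i = jobs.begin(); i != jobs.end();)
	{
		if (i->second.storage != storage) { ++i; continue; }
		cancel(i->second);
		jobs.erase(i++);
	}
}
```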
@@ -1777,7 +1784,7 @@ namespace libtorrent
 					--p->num_blocks;
 				}
 				p->blocks[block].buf = j.buffer;
-				p->blocks[block].callback.swap(handler);
+				p->blocks[block].callback.swap(j.callback);
 #ifdef TORRENT_DISK_STATS
 				rename_buffer(j.buffer, "write cache");
 #endif
@@ -1790,7 +1797,7 @@ namespace libtorrent
 				}
 				else
 				{
-					if (cache_block(j, handler, l) < 0)
+					if (cache_block(j, j.callback, l) < 0)
 					{
 						l.unlock();
 						file::iovec_t iov = {j.buffer, j.buffer_size};
@@ -1998,9 +2005,9 @@ namespace libtorrent
 #ifndef BOOST_NO_EXCEPTIONS
 					try {
 #endif
-					TORRENT_ASSERT(handler);
-					if (handler && ret == piece_manager::need_full_check)
-						post_callback(handler, j, ret);
+					TORRENT_ASSERT(j.callback);
+					if (j.callback && ret == piece_manager::need_full_check)
+						post_callback(j.callback, j, ret);
 #ifndef BOOST_NO_EXCEPTIONS
 					} catch (std::exception&) {}
 #endif
@@ -2016,7 +2023,7 @@ namespace libtorrent
 					// if the check is not done, add it at the end of the job queue
 					if (ret == piece_manager::need_full_check)
 					{
-						add_job(j, handler);
+						add_job(j, j.callback);
 						continue;
 					}
 					break;
@@ -2057,7 +2064,7 @@ namespace libtorrent
 			}
 #endif

-//			if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl;
+//			if (!j.callback) std::cerr << "DISK THREAD: no callback specified" << std::endl;
 //			else std::cerr << "DISK THREAD: invoking callback" << std::endl;
 #ifndef BOOST_NO_EXCEPTIONS
 			try {
@@ -2069,7 +2076,7 @@ namespace libtorrent
 				&& j.buffer != 0)
 				rename_buffer(j.buffer, "posted send buffer");
 #endif
-			post_callback(handler, j, ret);
+			post_callback(j.callback, j, ret);
 #ifndef BOOST_NO_EXCEPTIONS
 			} catch (std::exception&)
 			{
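The remaining hunks are mechanical fallout: since the job is now copied into a local disk_io_job and may wait in sorted_read_jobs across loop iterations, the completion handler has to travel inside the job instead of living in a separate `handler` local. A sketch of that ownership, using std::function where the 2010 code used boost::function:

```cpp
#include <functional>
#include <iostream>

// the callback is a member of the job, so a job can be queued,
// re-sorted, and copied without losing its completion handler
struct disk_job
{
	int ret;
	std::function<void(int)> callback;
};

void post_callback(disk_job const& j)
{
	// equivalent of post_callback(j.callback, j, ret): the handler
	// is read straight off the job being completed
	if (j.callback) j.callback(j.ret);
}

int main()
{
	disk_job j;
	j.ret = 0;
	j.callback = [](int ret) { std::cout << "done: " << ret << "\n"; };
	post_callback(j);
}
```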