storage fixes

This commit is contained in:
Arvid Norberg
2008-04-12 22:08:07 +00:00
parent dbfd400536
commit d4cfa126aa
11 changed files with 199 additions and 110 deletions

View File

@@ -331,11 +331,15 @@ namespace libtorrent
struct TORRENT_EXPORT file_error_alert: torrent_alert
{
file_error_alert(
const torrent_handle& h
std::string const& f
, const torrent_handle& h
, const std::string& msg)
: torrent_alert(h, alert::fatal, msg)
, file(f)
{}
std::string file;
virtual std::auto_ptr<alert> clone() const
{ return std::auto_ptr<alert>(new file_error_alert(*this)); }
};

View File

@@ -39,20 +39,20 @@ namespace libtorrent
{
namespace aux { class session_impl; }
class disk_io_thread;
struct TORRENT_EXPORT disk_buffer_holder
{
disk_buffer_holder(aux::session_impl& ses, char* buf)
: m_ses(ses), m_buf(buf) {}
disk_buffer_holder(aux::session_impl& ses, char* buf);
disk_buffer_holder(disk_io_thread& iothread, char* buf);
~disk_buffer_holder();
char* release();
char* buffer() const { return m_buf; }
private:
aux::session_impl& m_ses;
disk_io_thread& m_iothread;
char* m_buf;
};
}
#endif

View File

@@ -94,6 +94,10 @@ namespace libtorrent
// to the error message
std::string str;
// on error, this is set to the path of the
// file the disk operation failed on
std::string error_file;
// priority decides whether or not this
// job will skip entries in the queue or
// not. It always skips in front of entries

View File

@@ -161,9 +161,18 @@ namespace libtorrent
// non-zero return value indicates an error
virtual bool delete_files() = 0;
virtual std::string const& error() const = 0;
virtual std::string const& error_file() const = 0;
virtual void clear_error() = 0;
void set_error(std::string const& file, std::string const& msg) const
{
m_error_file = file;
m_error = msg;
}
std::string const& error() const { return m_error; }
std::string const& error_file() const { return m_error_file; }
void clear_error() { m_error.clear(); m_error_file.clear(); }
mutable std::string m_error;
mutable std::string m_error_file;
virtual ~storage_interface() {}
};
@@ -254,6 +263,7 @@ namespace libtorrent
void mark_failed(int index);
std::string const& error() const { return m_storage->error(); }
std::string const& error_file() const { return m_storage->error_file(); }
void clear_error() { m_storage->clear_error(); }
int slot_for(int piece) const;

View File

@@ -467,12 +467,12 @@ namespace libtorrent
// completed() is called immediately after it.
void finished();
void async_verify_piece(int piece_index, boost::function<void(bool)> const&);
void async_verify_piece(int piece_index, boost::function<void(int)> const&);
// this is called from the peer_connection
// each time a piece has failed the hash
// test
void piece_finished(int index, bool passed_hash_check);
void piece_finished(int index, int passed_hash_check);
void piece_failed(int index);
void received_redundant_data(int num_bytes)
{ TORRENT_ASSERT(num_bytes > 0); m_total_redundant_bytes += num_bytes; }
@@ -564,7 +564,7 @@ namespace libtorrent
void on_storage_moved(int ret, disk_io_job const& j);
void on_piece_verified(int ret, disk_io_job const& j
, boost::function<void(bool)> f);
, boost::function<void(int)> f);
void try_next_tracker();
int prioritize_tracker(int tracker_index);

View File

@@ -32,9 +32,23 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/disk_buffer_holder.hpp"
#include "libtorrent/aux_/session_impl.hpp"
#include "libtorrent/disk_io_thread.hpp"
namespace libtorrent
{
disk_buffer_holder::disk_buffer_holder(aux::session_impl& ses, char* buf)
: m_iothread(ses.m_disk_thread), m_buf(buf)
{
TORRENT_ASSERT(buf == 0 || m_iothread.is_disk_buffer(buf));
}
disk_buffer_holder::disk_buffer_holder(disk_io_thread& iothread, char* buf)
: m_iothread(iothread), m_buf(buf)
{
TORRENT_ASSERT(buf == 0 || m_iothread.is_disk_buffer(buf));
}
char* disk_buffer_holder::release()
{
char* ret = m_buf;
@@ -44,7 +58,7 @@ namespace libtorrent
disk_buffer_holder::~disk_buffer_holder()
{
if (m_buf) m_ses.free_disk_buffer(m_buf);
if (m_buf) m_iothread.free_buffer(m_buf);
}
}

View File

@@ -33,6 +33,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/storage.hpp"
#include <deque>
#include "libtorrent/disk_io_thread.hpp"
#include "libtorrent/disk_buffer_holder.hpp"
#include <boost/scoped_array.hpp>
#ifdef _WIN32
@@ -400,6 +401,7 @@ namespace libtorrent
l.unlock();
ret += p.storage->read_impl(buf.get(), p.piece, start_block * m_block_size, buffer_size);
l.lock();
if (!p.storage->error().empty()) { return -1; }
++m_cache_stats.reads;
}
@@ -419,6 +421,7 @@ namespace libtorrent
{
l.unlock();
ret += p.storage->read_impl(p.blocks[i], p.piece, piece_offset, block_size);
if (!p.storage->error().empty()) { return -1; }
l.lock();
++m_cache_stats.reads;
}
@@ -741,6 +744,13 @@ namespace libtorrent
m_signal.wait(l);
if (m_abort && m_jobs.empty()) return;
// if there's a buffer in this job, it will be freed
// when this holder is destructed, unless it has been
// released.
disk_buffer_holder holder(*this
, m_jobs.front().action != disk_io_job::check_fastresume
? m_jobs.front().buffer : 0);
boost::function<void(int, disk_io_job const&)> handler;
handler.swap(m_jobs.front().callback);
@@ -763,12 +773,13 @@ namespace libtorrent
std::string const& error_string = j.storage->error();
if (!error_string.empty())
{
#ifndef NDEBUG
std::cout << "ERROR: " << error_string << std::endl;
#endif
j.str = error_string;
j.error_file = j.storage->error_file();
j.storage->clear_error();
ret = -1;
#ifndef NDEBUG
std::cout << "ERROR: " << error_string << " " << j.error_file << std::endl;
#endif
}
else
{
@@ -791,15 +802,16 @@ namespace libtorrent
break;
}
disk_buffer_holder read_holder(*this, j.buffer);
ret = try_read_from_cache(j, l);
// -2 means there's no space in the read cache
// or that the read cache is disabled
if (ret == -1)
{
free_buffer(j.buffer, l);
j.buffer = 0;
j.str = j.storage->error();
j.error_file = j.storage->error_file();
j.storage->clear_error();
break;
}
@@ -810,14 +822,15 @@ namespace libtorrent
, j.buffer_size);
if (ret < 0)
{
free_buffer(j.buffer);
j.str = j.storage->error();
j.error_file = j.storage->error_file();
j.storage->clear_error();
break;
}
l.lock();
++m_cache_stats.blocks_read;
}
read_holder.release();
break;
}
case disk_io_job::write:
@@ -849,6 +862,10 @@ namespace libtorrent
{
cache_block(j, l);
}
// we've now inserted the buffer
// in the cache, we should not
// free it at the end
holder.release();
if (m_cache_stats.cache_size >= m_cache_size)
flush_oldest_piece(l);
break;
@@ -864,19 +881,34 @@ namespace libtorrent
cache_t::iterator i
= find_cached_piece(m_pieces, j, l);
if (i != m_pieces.end()) flush_and_remove(i, l);
if (i != m_pieces.end())
{
flush_and_remove(i, l);
std::string const& e = j.storage->error();
if (!e.empty())
{
j.str = e;
j.error_file = j.storage->error_file();
ret = -1;
j.storage->clear_error();
j.storage->mark_failed(j.piece);
break;
}
}
l.unlock();
sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
std::string const& e = j.storage->error();
if (!e.empty())
{
j.str = e;
j.error_file = j.storage->error_file();
ret = -1;
j.storage->clear_error();
j.storage->mark_failed(j.piece);
break;
}
ret = (j.storage->info()->hash_for_piece(j.piece) == h)?0:-1;
if (ret == -1) j.storage->mark_failed(j.piece);
ret = (j.storage->info()->hash_for_piece(j.piece) == h)?0:-2;
if (ret == -2) j.storage->mark_failed(j.piece);
break;
}
case disk_io_job::move_storage:
@@ -889,6 +921,7 @@ namespace libtorrent
if (ret != 0)
{
j.str = j.storage->error();
j.error_file = j.storage->error_file();
j.storage->clear_error();
break;
}
@@ -925,6 +958,7 @@ namespace libtorrent
if (ret != 0)
{
j.str = j.storage->error();
j.error_file = j.storage->error_file();
j.storage->clear_error();
}
break;
@@ -962,6 +996,7 @@ namespace libtorrent
if (ret != 0)
{
j.str = j.storage->error();
j.error_file = j.storage->error_file();
j.storage->clear_error();
}
break;
@@ -1030,7 +1065,7 @@ namespace libtorrent
#ifndef BOOST_NO_EXCEPTIONS
} catch (std::exception&)
{
if (j.buffer) free_buffer(j.buffer);
TORRENT_ASSERT(false);
}
#endif
}

View File

@@ -297,8 +297,7 @@ namespace libtorrent
if (!view.valid())
{
m_error = "failed to open file for reading";
m_error_file = (m_save_path / file_iter->path).string();
set_error((m_save_path / file_iter->path).string(), "failed to open file for reading");
return -1;
}
TORRENT_ASSERT(view.const_addr() != 0);
@@ -359,9 +358,7 @@ namespace libtorrent
if (!view.valid())
{
m_error = "failed to open file '";
m_error += (m_save_path / file_iter->path).string();
m_error += "'for reading";
set_error((m_save_path / file_iter->path).string(), "failed to open for reading");
return -1;
}
TORRENT_ASSERT(view.const_addr() != 0);
@@ -370,7 +367,7 @@ namespace libtorrent
}
catch (std::exception& e)
{
m_error = e.what();
set_error("", e.what());
return -1;
}
@@ -417,8 +414,7 @@ namespace libtorrent
if (!view.valid())
{
m_error = "failed to open file for writing";
m_error_file = (m_save_path / file_iter->path).string();
set_error((m_save_path / file_iter->path).string(), "failed to open file for writing");
return -1;
}
TORRENT_ASSERT(view.addr() != 0);
@@ -475,8 +471,7 @@ namespace libtorrent
if (!view.valid())
{
m_error = "failed to open file for reading";
m_error_file = (m_save_path / file_iter->path).string();
set_error((m_save_path / file_iter->path).string(), "failed to open file for reading");
return -1;
}
TORRENT_ASSERT(view.addr() != 0);
@@ -485,8 +480,7 @@ namespace libtorrent
}
catch (std::exception& e)
{
m_error = e.what();
m_error_file = (m_save_path / file_iter->path).string();
set_error((m_save_path / file_iter->path).string(), e.what());
return -1;
}
return size;
@@ -644,8 +638,7 @@ namespace libtorrent
{
if (rd.type() != entry::dictionary_t)
{
m_error = "invalid fastresume file";
m_error_file.clear();
set_error("", "invalid fastresume file");
return true;
}
std::vector<std::pair<size_type, std::time_t> > file_sizes
@@ -792,18 +785,10 @@ namespace libtorrent
}
}
if (!error.empty())
{
m_error.swap(error);
m_error_file.swap(error_file);
}
if (!error.empty()) set_error(error_file, error);
return result != 0;
}
std::string const& error() const { return m_error; }
std::string const& error_file() const { return m_error_file; }
void clear_error() { m_error.clear(); m_error_file.clear(); }
private:
boost::intrusive_ptr<torrent_info const> m_info;
@@ -813,9 +798,6 @@ namespace libtorrent
buffer m_scratch_buffer;
static mapped_file_pool m_pool;
mutable std::string m_error;
mutable std::string m_error_file;
};
storage_interface* mapped_storage_constructor(boost::intrusive_ptr<torrent_info const> ti

View File

@@ -1587,8 +1587,7 @@ namespace libtorrent
if (t->alerts().should_post(alert::fatal))
{
std::string err = "torrent paused: disk write error, " + j.str;
t->alerts().post_alert(file_error_alert(t->get_handle(), err));
t->alerts().post_alert(file_error_alert(j.error_file, t->get_handle(), j.str));
}
t->pause();
return;
@@ -2641,13 +2640,7 @@ namespace libtorrent
if (t->alerts().should_post(alert::fatal))
{
std::string err = "torrent paused: disk read error";
if (!j.str.empty())
{
err += ", ";
err += j.str;
}
t->alerts().post_alert(file_error_alert(t->get_handle(), err));
t->alerts().post_alert(file_error_alert(j.error_file, t->get_handle(), j.str));
}
t->pause();
return;

View File

@@ -376,10 +376,6 @@ namespace libtorrent
bool write_resume_data(entry& rd) const;
sha1_hash hash_for_slot(int slot, partial_hash& ph, int piece_size);
std::string const& error() const { return m_error; }
std::string const& error_file() const { return m_error_file; }
void clear_error() { m_error.clear(); m_error_file.clear(); }
int read_impl(char* buf, int slot, int offset, int size, bool fill_zero);
~storage()
@@ -394,9 +390,6 @@ namespace libtorrent
// temporary storage for moving pieces
buffer m_scratch_buffer;
mutable std::string m_error;
mutable std::string m_error_file;
};
sha1_hash storage::hash_for_slot(int slot, partial_hash& ph, int piece_size)
@@ -406,7 +399,7 @@ namespace libtorrent
hasher whole;
int slot_size1 = piece_size;
m_scratch_buffer.resize(slot_size1);
read_impl(&m_scratch_buffer[0], slot, 0, slot_size1, true);
int read_result = read_impl(&m_scratch_buffer[0], slot, 0, slot_size1, true);
if (ph.offset > 0)
partial.update(&m_scratch_buffer[0], ph.offset);
whole.update(&m_scratch_buffer[0], slot_size1);
@@ -469,8 +462,7 @@ namespace libtorrent
}
catch (std::exception& e)
{
m_error_file = (m_save_path / file_iter->path).string();
m_error = e.what();
set_error((m_save_path / file_iter->path).string(), e.what());
return true;
}
#endif
@@ -493,8 +485,7 @@ namespace libtorrent
}
catch (std::exception& e)
{
m_error_file = (m_save_path / file_iter->path).string();
m_error = e.what();
set_error((m_save_path / file_iter->path).string(), e.what());
return true;
}
#endif
@@ -570,8 +561,7 @@ namespace libtorrent
{
if (rd.type() != entry::dictionary_t)
{
m_error_file.clear();
m_error = "invalid fastresume file";
set_error("", "invalid fastresume file");
return true;
}
std::vector<std::pair<size_type, std::time_t> > file_sizes
@@ -860,14 +850,12 @@ namespace libtorrent
, error));
if (!in)
{
m_error_file = (m_save_path / file_iter->path).string();
m_error = error;
set_error((m_save_path / file_iter->path).string(), error);
return -1;
}
if (!in->error().empty())
{
m_error_file = (m_save_path / file_iter->path).string();
m_error = in->error();
set_error((m_save_path / file_iter->path).string(), in->error());
return -1;
}
TORRENT_ASSERT(file_offset < file_iter->size);
@@ -879,8 +867,7 @@ namespace libtorrent
// the file was not big enough
if (!fill_zero)
{
m_error_file = (m_save_path / file_iter->path).string();
m_error = "seek failed";
set_error((m_save_path / file_iter->path).string(), "seek failed");
return -1;
}
std::memset(buf + buf_pos, 0, size - buf_pos);
@@ -930,8 +917,7 @@ namespace libtorrent
if (actual_read > 0) buf_pos += actual_read;
if (!fill_zero)
{
m_error_file = (m_save_path / file_iter->path).string();
m_error = "read failed";
set_error((m_save_path / file_iter->path).string(), "read failed");
return -1;
}
std::memset(buf + buf_pos, 0, size - buf_pos);
@@ -960,14 +946,12 @@ namespace libtorrent
this, path, file::in, error);
if (!in)
{
m_error_file = path.string();
m_error = error;
set_error(path.string(), error);
return -1;
}
if (!in->error().empty())
{
m_error_file = (m_save_path / file_iter->path).string();
m_error = in->error();
set_error((m_save_path / file_iter->path).string(), in->error());
return -1;
}
size_type pos = in->seek(file_iter->file_base);
@@ -975,8 +959,7 @@ namespace libtorrent
{
if (!fill_zero)
{
m_error_file = (m_save_path / file_iter->path).string();
m_error = "seek failed";
set_error((m_save_path / file_iter->path).string(), "seek failed");
return -1;
}
std::memset(buf + buf_pos, 0, size - buf_pos);
@@ -987,7 +970,6 @@ namespace libtorrent
return result;
}
// throws file_error if it fails to write
int storage::write(
const char* buf
, int slot
@@ -1029,14 +1011,12 @@ namespace libtorrent
if (!out)
{
m_error_file += p.string();
m_error = error;
set_error(p.string(), error);
return -1;
}
if (!out->error().empty())
{
m_error_file += p.string();
m_error = out->error();
set_error(p.string(), out->error());
return -1;
}
TORRENT_ASSERT(file_offset < file_iter->size);
@@ -1046,8 +1026,7 @@ namespace libtorrent
if (pos != file_offset + file_iter->file_base)
{
m_error_file = (m_save_path / file_iter->path).string();
m_error = "seek failed";
set_error((m_save_path / file_iter->path).string(), "seek failed");
return -1;
}
@@ -1085,8 +1064,7 @@ namespace libtorrent
if (written != write_bytes)
{
m_error_file = (m_save_path / file_iter->path).string();
m_error = "write failed";
set_error((m_save_path / file_iter->path).string(), "write failed");
return -1;
}
@@ -1113,14 +1091,12 @@ namespace libtorrent
if (!out)
{
m_error_file = p.string();
m_error = error;
set_error(p.string(), error);
return -1;
}
if (!out->error().empty())
{
m_error_file += p.string();
m_error = out->error();
set_error(p.string(), out->error());
return -1;
}
@@ -1128,8 +1104,7 @@ namespace libtorrent
if (pos != file_iter->file_base)
{
m_error_file = (m_save_path / file_iter->path).string();
m_error = "seek failed";
set_error((m_save_path / file_iter->path).string(), "seek failed");
return -1;
}
}
@@ -1380,12 +1355,28 @@ namespace libtorrent
TORRENT_ASSERT(size > 0);
TORRENT_ASSERT(piece_index >= 0 && piece_index < m_info->num_pieces());
int slot = allocate_slot_for_piece(piece_index);
int ret = m_storage->write(buf, slot, offset, size);
// only save the partial hash if the write succeeds
if (ret != size) return ret;
// std::ofstream out("partial_hash.log", std::ios::app);
if (offset == 0)
{
partial_hash& ph = m_piece_hasher[piece_index];
TORRENT_ASSERT(ph.offset == 0);
ph.offset = size;
ph.h.update(buf, size);
/*
out << time_now_string() << " NEW ["
" s: " << this
<< " p: " << piece_index
<< " off: " << offset
<< " size: " << size
<< " entries: " << m_piece_hasher.size()
<< " ]" << std::endl;
*/
}
else
{
@@ -1399,14 +1390,43 @@ namespace libtorrent
#endif
if (offset == i->second.offset)
{
/*
out << time_now_string() << " UPDATING ["
" s: " << this
<< " p: " << piece_index
<< " off: " << offset
<< " size: " << size
<< " entries: " << m_piece_hasher.size()
<< " ]" << std::endl;
*/
i->second.offset += size;
i->second.h.update(buf, size);
}
/* else
{
out << time_now_string() << " SKIPPING (out of order) ["
" s: " << this
<< " p: " << piece_index
<< " off: " << offset
<< " size: " << size
<< " entries: " << m_piece_hasher.size()
<< " ]" << std::endl;
}
*/ }
/* else
{
out << time_now_string() << " SKIPPING (no entry) ["
" s: " << this
<< " p: " << piece_index
<< " off: " << offset
<< " size: " << size
<< " entries: " << m_piece_hasher.size()
<< " ]" << std::endl;
}
*/
}
int slot = allocate_slot_for_piece(piece_index);
return m_storage->write(buf, slot, offset, size);
return ret;
}
int piece_manager::identify_data(

View File

@@ -433,7 +433,7 @@ namespace libtorrent
{
if (m_ses.m_alerts.should_post(alert::fatal))
{
m_ses.m_alerts.post_alert(file_error_alert(get_handle(), j.str));
m_ses.m_alerts.post_alert(file_error_alert(j.error_file, get_handle(), j.str));
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
(*m_ses.m_logger) << time_now_string() << ": fatal disk error ["
" error: " << j.str <<
@@ -519,6 +519,7 @@ namespace libtorrent
std::fill(m_have_pieces.begin(), m_have_pieces.end(), false);
if (!fastresume_rejected)
{
TORRENT_ASSERT(m_resume_data.type() == entry::dictionary_t);
// parse slots
entry const* slots_ent = m_resume_data.find_key("slots");
if (slots_ent != 0 && slots_ent->type() == entry::list_t)
@@ -614,7 +615,7 @@ namespace libtorrent
{
if (m_ses.m_alerts.should_post(alert::fatal))
{
m_ses.m_alerts.post_alert(file_error_alert(get_handle(), j.str));
m_ses.m_alerts.post_alert(file_error_alert(j.error_file, get_handle(), j.str));
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
(*m_ses.m_logger) << time_now_string() << ": fatal disk error ["
" error: " << j.str <<
@@ -1138,7 +1139,11 @@ namespace libtorrent
return make_tuple(total_done, wanted_done);
}
void torrent::piece_finished(int index, bool passed_hash_check)
// passed_hash_check
// 0: success, piece passed check
// -1: disk failure
// -2: piece failed check
void torrent::piece_finished(int index, int passed_hash_check)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
@@ -1151,7 +1156,7 @@ namespace libtorrent
bool was_finished = m_picker->num_filtered() + num_pieces()
== torrent_file().num_pieces();
if (passed_hash_check)
if (passed_hash_check == 0)
{
if (m_ses.m_alerts.should_post(alert::debug))
{
@@ -1176,12 +1181,17 @@ namespace libtorrent
finished();
}
}
else
else if (passed_hash_check == -2)
{
piece_failed(index);
}
else
{
TORRENT_ASSERT(passed_hash_check == -1);
m_picker->restore_piece(index);
}
m_policy.piece_finished(index, passed_hash_check);
m_policy.piece_finished(index, passed_hash_check == 0);
if (!was_seed && is_seed())
{
@@ -2859,6 +2869,9 @@ namespace libtorrent
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
TORRENT_ASSERT(m_resume_data.type() == entry::dictionary_t
|| m_resume_data.type() == entry::undefined_t);
TORRENT_ASSERT(m_bandwidth_queue[0].size() <= m_connections.size());
TORRENT_ASSERT(m_bandwidth_queue[1].size() <= m_connections.size());
@@ -3271,7 +3284,7 @@ namespace libtorrent
m_deficit_counter += points;
}
void torrent::async_verify_piece(int piece_index, boost::function<void(bool)> const& f)
void torrent::async_verify_piece(int piece_index, boost::function<void(int)> const& f)
{
// INVARIANT_CHECK;
@@ -3299,10 +3312,24 @@ namespace libtorrent
}
void torrent::on_piece_verified(int ret, disk_io_job const& j
, boost::function<void(bool)> f)
, boost::function<void(int)> f)
{
session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
f(ret == 0);
// return value:
// 0: success, piece passed hash check
// -1: disk failure
// -2: hash check failed
if (ret == -1)
{
if (alerts().should_post(alert::fatal))
{
alerts().post_alert(file_error_alert(j.error_file, get_handle(), j.str));
}
pause();
}
f(ret);
}
const tcp::endpoint& torrent::current_tracker() const