From 02668e8f75459b586bc1ccf5b10bb997cac626b8 Mon Sep 17 00:00:00 2001 From: Arvid Norberg Date: Thu, 26 Aug 2010 17:00:24 +0000 Subject: [PATCH] fix message posting to work from multiple threads --- CMakeLists.txt | 1 + Jamfile | 1 + include/libtorrent/alert.hpp | 2 +- include/libtorrent/disk_io_thread.hpp | 2 +- include/libtorrent/thread.hpp | 29 ++++--- src/Makefile.am | 1 + src/session.cpp | 25 +++--- src/thread.cpp | 109 ++++++++++++++++++++++++++ src/torrent_handle.cpp | 10 +-- test/Jamfile | 1 + test/setup_transfer.cpp | 4 +- test/test_storage.cpp | 4 +- test/test_threads.cpp | 73 +++++++++++++++++ 13 files changed, 219 insertions(+), 43 deletions(-) create mode 100644 src/thread.cpp create mode 100644 test/test_threads.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 9e31b9c58..d68e0674a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,6 +40,7 @@ set(sources socks5_stream stat storage + thread time torrent torrent_handle diff --git a/Jamfile b/Jamfile index 1c0a1c2e5..ceb91ff49 100755 --- a/Jamfile +++ b/Jamfile @@ -409,6 +409,7 @@ SOURCES = magnet_uri parse_url ConvertUTF + thread # -- extensions -- metadata_transfer diff --git a/include/libtorrent/alert.hpp b/include/libtorrent/alert.hpp index fb62c3c76..32d9b6a4d 100644 --- a/include/libtorrent/alert.hpp +++ b/include/libtorrent/alert.hpp @@ -150,7 +150,7 @@ namespace libtorrent { private: std::deque m_alerts; mutable mutex m_mutex; - condition m_condition; + event m_condition; int m_alert_mask; size_t m_queue_size_limit; boost::function)> m_dispatch; diff --git a/include/libtorrent/disk_io_thread.hpp b/include/libtorrent/disk_io_thread.hpp index b418262fe..b32f26d14 100644 --- a/include/libtorrent/disk_io_thread.hpp +++ b/include/libtorrent/disk_io_thread.hpp @@ -406,7 +406,7 @@ namespace libtorrent // this mutex only protects m_jobs, m_queue_buffer_size // and m_abort mutable mutex m_queue_mutex; - condition m_signal; + event m_signal; bool m_abort; bool m_waiting_to_shutdown; std::list m_jobs; diff --git a/include/libtorrent/thread.hpp b/include/libtorrent/thread.hpp index 2c417098a..cdd7dc15c 100644 --- a/include/libtorrent/thread.hpp +++ b/include/libtorrent/thread.hpp @@ -44,26 +44,31 @@ POSSIBILITY OF SUCH DAMAGE. #include #include -#ifdef TORRENT_BEOS -#include -#endif - namespace libtorrent { typedef boost::asio::detail::thread thread; typedef boost::asio::detail::mutex mutex; - typedef boost::asio::detail::event condition; + typedef boost::asio::detail::event event; - inline void sleep(int milliseconds) + void sleep(int milliseconds); + + struct condition { -#if defined TORRENT_WINDOWS || defined TORRENT_CYGWIN - Sleep(milliseconds); -#elif defined TORRENT_BEOS - snooze_until(system_time() + boost::int64_t(milliseconds) * 1000, B_SYSTEM_TIMEBASE); + condition(); + ~condition(); + void wait(mutex::scoped_lock& l); + void signal_all(mutex::scoped_lock& l); + private: +#ifdef BOOST_HAS_PTHREADS + pthread_cond_t m_cond; +#elif defined TORRENT_WINDOWS || defined TORRENT_CYGWIN + HANDLE m_sem; + mutex m_mutex; + int m_num_waiters; #else - usleep(milliseconds * 1000); +#error not implemented #endif - } + }; } #endif diff --git a/src/Makefile.am b/src/Makefile.am index 9c804f86f..ef8d06ffa 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -69,6 +69,7 @@ libtorrent_rasterbar_la_SOURCES = \ socks5_stream.cpp \ stat.cpp \ storage.cpp \ + thread.cpp \ torrent.cpp \ torrent_handle.cpp \ torrent_info.cpp \ diff --git a/src/session.cpp b/src/session.cpp index 5eb898327..8f729b0f3 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -237,7 +237,7 @@ namespace libtorrent *ret = f(); mutex::scoped_lock l(*m); *done = true; - e->signal(l); + e->signal_all(l); } void fun_wrap(bool* done, condition* e, mutex* m, boost::function f) @@ -245,7 +245,7 @@ namespace libtorrent f(); mutex::scoped_lock l(*m); *done = true; - e->signal(l); + e->signal_all(l); } #define TORRENT_ASYNC_CALL(x) \ @@ -260,55 +260,48 @@ namespace libtorrent #define TORRENT_SYNC_CALL(x) \ bool done = false; \ mutex::scoped_lock l(m_impl->mut); \ - m_impl->cond.clear(l); \ m_impl->m_io_service.post(boost::bind(&fun_wrap, &done, &m_impl->cond, &m_impl->mut, boost::function(boost::bind(&session_impl:: x, m_impl.get())))); \ - do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done) + do { m_impl->cond.wait(l); } while(!done) #define TORRENT_SYNC_CALL1(x, a1) \ bool done = false; \ mutex::scoped_lock l(m_impl->mut); \ - m_impl->cond.clear(l); \ m_impl->m_io_service.post(boost::bind(&fun_wrap, &done, &m_impl->cond, &m_impl->mut, boost::function(boost::bind(&session_impl:: x, m_impl.get(), a1)))); \ - do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done) + do { m_impl->cond.wait(l); } while(!done) #define TORRENT_SYNC_CALL2(x, a1, a2) \ bool done = false; \ mutex::scoped_lock l(m_impl->mut); \ - m_impl->cond.clear(l); \ m_impl->m_io_service.post(boost::bind(&fun_wrap, &done, &m_impl->cond, &m_impl->mut, boost::function(boost::bind(&session_impl:: x, m_impl.get(), a1, a2)))); \ - do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done) + do { m_impl->cond.wait(l); } while(!done) #define TORRENT_SYNC_CALL_RET(type, x) \ bool done = false; \ type r; \ mutex::scoped_lock l(m_impl->mut); \ - m_impl->cond.clear(l); \ m_impl->m_io_service.post(boost::bind(&fun_ret, &r, &done, &m_impl->cond, &m_impl->mut, boost::function(boost::bind(&session_impl:: x, m_impl.get())))); \ - do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done) + do { m_impl->cond.wait(l); } while(!done) #define TORRENT_SYNC_CALL_RET1(type, x, a1) \ bool done = false; \ type r; \ mutex::scoped_lock l(m_impl->mut); \ - m_impl->cond.clear(l); \ m_impl->m_io_service.post(boost::bind(&fun_ret, &r, &done, &m_impl->cond, &m_impl->mut, boost::function(boost::bind(&session_impl:: x, m_impl.get(), a1)))); \ - do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done) + do { m_impl->cond.wait(l); } while(!done) #define TORRENT_SYNC_CALL_RET2(type, x, a1, a2) \ bool done = false; \ type r; \ mutex::scoped_lock l(m_impl->mut); \ - m_impl->cond.clear(l); \ m_impl->m_io_service.post(boost::bind(&fun_ret, &r, &done, &m_impl->cond, &m_impl->mut, boost::function(boost::bind(&session_impl:: x, m_impl.get(), a1, a2)))); \ - do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done) + do { m_impl->cond.wait(l); } while(!done) #define TORRENT_SYNC_CALL_RET3(type, x, a1, a2, a3) \ bool done = false; \ type r; \ mutex::scoped_lock l(m_impl->mut); \ - m_impl->cond.clear(l); \ m_impl->m_io_service.post(boost::bind(&fun_ret, &r, &done, &m_impl->cond, &m_impl->mut, boost::function(boost::bind(&session_impl:: x, m_impl.get(), a1, a2, a3)))); \ - do { m_impl->cond.wait(l); m_impl->cond.clear(l); } while(!done) + do { m_impl->cond.wait(l); } while(!done) session::session( fingerprint const& id diff --git a/src/thread.cpp b/src/thread.cpp new file mode 100644 index 000000000..39fda92c1 --- /dev/null +++ b/src/thread.cpp @@ -0,0 +1,109 @@ +/* + +Copyright (c) 2010, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "libtorrent/thread.hpp" +#include "libtorrent/assert.hpp" + +#ifdef TORRENT_BEOS +#include +#endif + +namespace libtorrent +{ + void sleep(int milliseconds) + { +#if defined TORRENT_WINDOWS || defined TORRENT_CYGWIN + Sleep(milliseconds); +#elif defined TORRENT_BEOS + snooze_until(system_time() + boost::int64_t(milliseconds) * 1000, B_SYSTEM_TIMEBASE); +#else + usleep(milliseconds * 1000); +#endif + } + +#ifdef BOOST_HAS_PTHREADS + + condition::condition() + { + pthread_cond_init(&m_cond, 0); + } + + condition::~condition() + { + pthread_cond_destroy(&m_cond); + } + + void condition::wait(mutex::scoped_lock& l) + { + TORRENT_ASSERT(l.locked()); + // wow, this is quite a hack + pthread_cond_wait(&m_cond, (::pthread_mutex_t*)&l.mutex()); + } + + void condition::signal_all(mutex::scoped_lock& l) + { + TORRENT_ASSERT(l.locked()); + pthread_cond_broadcast(&m_cond); + } +#elif defined TORRENT_WINDOWS || defined TORRENT_CYGWIN + condition::condition() + : m_num_waiters(0) + { + m_sem = CreateSemaphore(0, 0, INT_MAX, 0); + } + + condition::~condition() + { + CloseHandle(m_sem); + } + + void condition::wait(mutex::scoped_lock& l) + { + TORRENT_ASSERT(l.locked()); + ++m_num_waiters; + l.unlock(); + WaitForSingleObject(m_sem, INFINITE); + l.lock(); + --m_num_waiters; + } + + void condition::signal_all(mutex::scoped_lock& l) + { + TORRENT_ASSERT(l.locked()); + ReleaseSemaphore(m_sem, m_num_waiters, 0); + } +#else +#error not implemented +#endif + +} + diff --git a/src/torrent_handle.cpp b/src/torrent_handle.cpp index 15a7a41c4..01cb464ee 100644 --- a/src/torrent_handle.cpp +++ b/src/torrent_handle.cpp @@ -82,7 +82,7 @@ namespace libtorrent *ret = f(); mutex::scoped_lock l(*m); *done = true; - e->signal(l); + e->signal_all(l); } // defined in session.cpp @@ -118,7 +118,6 @@ namespace libtorrent bool done = false; \ session_impl& ses = t->session(); \ mutex::scoped_lock l(ses.mut); \ - ses.cond.clear(l); \ ses.m_io_service.post(boost::bind(&fun_wrap, &done, &ses.cond, &ses.mut, boost::function(boost::bind(&torrent:: x, t)))); \ do { ses.cond.wait(l); } while(!done) @@ -128,7 +127,6 @@ namespace libtorrent bool done = false; \ session_impl& ses = t->session(); \ mutex::scoped_lock l(ses.mut); \ - ses.cond.clear(l); \ ses.m_io_service.post(boost::bind(&fun_wrap, &done, &ses.cond, &ses.mut, boost::function(boost::bind(&torrent:: x, t, a1)))); \ t.reset(); \ do { ses.cond.wait(l); } while(!done); } @@ -139,7 +137,6 @@ namespace libtorrent bool done = false; \ session_impl& ses = t->session(); \ mutex::scoped_lock l(ses.mut); \ - ses.cond.clear(l); \ ses.m_io_service.post(boost::bind(&fun_wrap, &done, &ses.cond, &ses.mut, boost::function(boost::bind(&torrent:: x, t, a1, a2)))); \ t.reset(); \ do { ses.cond.wait(l); } while(!done); } @@ -150,7 +147,6 @@ namespace libtorrent bool done = false; \ session_impl& ses = t->session(); \ mutex::scoped_lock l(ses.mut); \ - ses.cond.clear(l); \ ses.m_io_service.post(boost::bind(&fun_wrap, &done, &ses.cond, &ses.mut, boost::function(boost::bind(&torrent:: x, t, a1, a2, a3)))); \ t.reset(); \ do { ses.cond.wait(l); } while(!done); } @@ -162,7 +158,6 @@ namespace libtorrent session_impl& ses = t->session(); \ type r; \ mutex::scoped_lock l(ses.mut); \ - ses.cond.clear(l); \ ses.m_io_service.post(boost::bind(&fun_ret, &r, &done, &ses.cond, &ses.mut, boost::function(boost::bind(&torrent:: x, t)))); \ t.reset(); \ do { ses.cond.wait(l); } while(!done) @@ -174,7 +169,6 @@ namespace libtorrent session_impl& ses = t->session(); \ type r; \ mutex::scoped_lock l(ses.mut); \ - ses.cond.clear(l); \ ses.m_io_service.post(boost::bind(&fun_ret, &r, &done, &ses.cond, &ses.mut, boost::function(boost::bind(&torrent:: x, t, a1)))); \ t.reset(); \ do { ses.cond.wait(l); } while(!done) @@ -186,7 +180,6 @@ namespace libtorrent session_impl& ses = t->session(); \ type r; \ mutex::scoped_lock l(ses.mut); \ - ses.cond.clear(l); \ ses.m_io_service.post(boost::bind(&fun_ret, &r, &done, &ses.cond, &ses.mut, boost::function(boost::bind(&torrent:: x, t, a1, a2)))); \ t.reset(); \ do { ses.cond.wait(l); } while(!done) @@ -734,7 +727,6 @@ namespace libtorrent bool done = false; session_impl& ses = t->session(); mutex::scoped_lock l(ses.mut); - ses.cond.clear(l); ses.m_io_service.post(boost::bind(&fun_wrap, &done, &ses.cond , &ses.mut, boost::function(boost::bind( &piece_manager::write_resume_data, &t->filesystem(), boost::ref(ret))))); diff --git a/test/Jamfile b/test/Jamfile index ed378448b..8a2c1efc9 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -17,6 +17,7 @@ project ; test-suite libtorrent : + [ run test_threads.cpp ] [ run test_bandwidth_limiter.cpp ] [ run test_buffer.cpp ] [ run test_piece_picker.cpp ] diff --git a/test/setup_transfer.cpp b/test/setup_transfer.cpp index 60c3bd3a0..44675640d 100644 --- a/test/setup_transfer.cpp +++ b/test/setup_transfer.cpp @@ -345,7 +345,7 @@ setup_transfer(session* ses1, session* ses2, session* ses3 boost::asio::io_service* tracker_ios = 0; boost::shared_ptr tracker_server; libtorrent::mutex tracker_lock; -libtorrent::condition tracker_initialized; +libtorrent::event tracker_initialized; bool udp_failed = false; @@ -491,7 +491,7 @@ void udp_tracker_thread(int* port) boost::asio::io_service* web_ios = 0; boost::shared_ptr web_server; libtorrent::mutex web_lock; -libtorrent::condition web_initialized; +libtorrent::event web_initialized; void stop_web_server() { diff --git a/test/test_storage.cpp b/test/test_storage.cpp index 71ddb9623..c06197acb 100644 --- a/test/test_storage.cpp +++ b/test/test_storage.cpp @@ -240,8 +240,8 @@ struct test_storage : storage_interface } private: - condition m_ready_condition; - condition m_condition; + event m_ready_condition; + event m_condition; libtorrent::mutex m_mutex; bool m_started; bool m_ready; diff --git a/test/test_threads.cpp b/test/test_threads.cpp new file mode 100644 index 000000000..b92fea06e --- /dev/null +++ b/test/test_threads.cpp @@ -0,0 +1,73 @@ +/* + +Copyright (c) 2010, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include +#include +#include "libtorrent/thread.hpp" +#include "test.hpp" + +using namespace libtorrent; + +void fun(condition* s, mutex* m, int i) +{ + fprintf(stderr, "thread %d waiting\n", i); + mutex::scoped_lock l(*m); + s->wait(l); + fprintf(stderr, "thread %d done\n", i); +} + +int test_main() +{ + condition cond; + mutex m; + std::list threads; + for (int i = 0; i < 20; ++i) + { + threads.push_back(new thread(boost::bind(&fun, &cond, &m, i))); + } + + // make sure all threads are waiting on the condition + sleep(10); + + mutex::scoped_lock l(m); + cond.signal_all(l); + l.unlock(); + + for (std::list::iterator i = threads.begin(); i != threads.end(); ++i) + { + (*i)->join(); + delete *i; + } + + return 0; +} +