From 512273a5077e1aaf6ab8506595d7d1e36e4d7a02 Mon Sep 17 00:00:00 2001 From: Scott Mansell Date: Mon, 23 Jan 2023 19:48:07 +1300 Subject: [PATCH 01/10] WorkQueueThread: Add flush capability --- Source/Core/Common/WorkQueueThread.h | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index 6039f7b3b9..f74c164e60 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -70,6 +70,19 @@ public: } } + // Doesn't return until the most recent function invocation has finished. + void Flush() + { + if (m_thread.joinable()) + { + m_flush.Set(); + Clear(); + m_flushed.Wait(); + } + } + + bool IsFlushing() const { return m_flush.IsSet() || m_shutdown.IsSet(); } + private: void ThreadLoop() { @@ -83,7 +96,14 @@ private: { std::unique_lock lg(m_lock); if (m_items.empty()) + { + if (m_flush.IsSet()) + { + m_flush.Clear(); + m_flushed.Set(); + } break; + } T item{std::move(m_items.front())}; m_items.pop(); lg.unlock(); @@ -101,6 +121,8 @@ private: Common::Event m_wakeup; Common::Flag m_shutdown; Common::Flag m_cancelled; + Common::Flag m_flush; + Common::Event m_flushed; std::mutex m_lock; std::queue m_items; }; From 9badcc6eb8b95252bac91305b06e1c0ebaea329a Mon Sep 17 00:00:00 2001 From: Robin Kertels Date: Fri, 27 Jan 2023 15:05:51 +0100 Subject: [PATCH 02/10] WorkQueueThread: Add Push --- Source/Core/Common/WorkQueueThread.h | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index f74c164e60..7d3d9395fb 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -42,6 +42,26 @@ public: m_wakeup.Set(); } + void Push(T&& item) + { + if (!m_cancelled.IsSet()) + { + std::lock_guard lg(m_lock); + m_items.push(item); + } + m_wakeup.Set(); + } + + void Push(const T& item) + { + if (!m_cancelled.IsSet()) + { + std::lock_guard lg(m_lock); + m_items.push(item); + } + m_wakeup.Set(); + } + void Clear() { { From 9affbfe683ac568d89a00bf6eb20af588ccd1b03 Mon Sep 17 00:00:00 2001 From: Robin Kertels Date: Sun, 29 Jan 2023 16:16:29 +0100 Subject: [PATCH 03/10] WorkQueueThread: Implement proper Flush and rename the existing Flush to FlushOne. --- Source/Core/Common/WorkQueueThread.h | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index 7d3d9395fb..2a756f3d7e 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -91,7 +91,7 @@ public: } // Doesn't return until the most recent function invocation has finished. - void Flush() + void FlushOne() { if (m_thread.joinable()) { @@ -101,6 +101,17 @@ public: } } + // Doesn't return until the queue is empty. + void Flush() + { + if (m_thread.joinable()) + { + m_flush.Set(); + m_wakeup.Set(); + m_flushed.Wait(); + } + } + bool IsFlushing() const { return m_flush.IsSet() || m_shutdown.IsSet(); } private: From 94a0c50bf889dea188f1993202657e8f202a302c Mon Sep 17 00:00:00 2001 From: Robin Kertels Date: Sun, 29 Jan 2023 17:07:36 +0100 Subject: [PATCH 04/10] WorkQueueThread: Rework without Flags/Events --- Source/Core/Common/WorkQueueThread.h | 142 +++++++++++++++------------ 1 file changed, 80 insertions(+), 62 deletions(-) diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index 2a756f3d7e..a6cb71b7f9 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -25,8 +25,8 @@ public: void Reset(std::function function) { Shutdown(); - m_shutdown.Clear(); - m_cancelled.Clear(); + std::lock_guard lg(m_lock); + m_cancelled = false; m_function = std::move(function); m_thread = std::thread(&WorkQueueThread::ThreadLoop, this); } @@ -34,85 +34,107 @@ public: template void EmplaceItem(Args&&... args) { - if (!m_cancelled.IsSet()) + std::lock_guard lg(m_lock); + if (m_shutdown) { - std::lock_guard lg(m_lock); - m_items.emplace(std::forward(args)...); + return; } - m_wakeup.Set(); + m_items.emplace(std::forward(args)...); + m_idle = false; + m_worker_cond_var.notify_one(); } void Push(T&& item) { - if (!m_cancelled.IsSet()) + std::lock_guard lg(m_lock); + if (m_cancelled) { - std::lock_guard lg(m_lock); - m_items.push(item); + return; } - m_wakeup.Set(); + m_items.push(item); + m_idle = false; + m_worker_cond_var.notify_one(); } void Push(const T& item) { - if (!m_cancelled.IsSet()) - { std::lock_guard lg(m_lock); + if (m_cancelled) + { + return; + } m_items.push(item); - } - m_wakeup.Set(); + m_idle = false; + m_worker_cond_var.notify_one(); } void Clear() { - { - std::lock_guard lg(m_lock); - m_items = std::queue(); - } - m_wakeup.Set(); + std::lock_guard lg(m_lock); + m_items = std::queue(); + m_worker_cond_var.notify_one(); } void Cancel() { - m_cancelled.Set(); - Clear(); - Shutdown(); - } + if (!m_thread.joinable()) + { + return; + } - bool IsCancelled() const { return m_cancelled.IsSet(); } + { + std::unique_lock lg(m_lock); + m_items = std::queue(); + m_cancelled = true; + m_shutdown = true; + m_worker_cond_var.notify_one(); + } + m_thread.join(); + } void Shutdown() { - if (m_thread.joinable()) + if (!m_thread.joinable()) { - m_shutdown.Set(); - m_wakeup.Set(); - m_thread.join(); + return; } + + { + std::unique_lock lg(m_lock); + m_shutdown = true; + m_worker_cond_var.notify_one(); + } + m_thread.join(); } // Doesn't return until the most recent function invocation has finished. - void FlushOne() + void ClearAndFlush() { - if (m_thread.joinable()) + if (!m_thread.joinable()) { - m_flush.Set(); - Clear(); - m_flushed.Wait(); + return; } + + std::unique_lock lg(m_lock); + m_items = std::queue(); + m_wait_cond_var.wait(lg, [&] { + return m_idle; + }); } - // Doesn't return until the queue is empty. + // Doesn't return until the most recent function invocation has finished. void Flush() { - if (m_thread.joinable()) + if (!m_thread.joinable()) { - m_flush.Set(); - m_wakeup.Set(); - m_flushed.Wait(); + return; } - } - bool IsFlushing() const { return m_flush.IsSet() || m_shutdown.IsSet(); } + std::unique_lock lg(m_lock); + m_wait_cond_var.wait(lg, [&] { + return m_idle; + }); + } private: void ThreadLoop() @@ -121,41 +143,37 @@ private: while (true) { - m_wakeup.Wait(); - - while (true) + std::unique_lock lg(m_lock); + if (m_items.empty()) { - std::unique_lock lg(m_lock); - if (m_items.empty()) + m_idle = true; + m_wait_cond_var.notify_all(); + m_worker_cond_var.wait(lg, [&] { + return m_shutdown || !m_items.empty(); + }); + if (m_shutdown) { - if (m_flush.IsSet()) - { - m_flush.Clear(); - m_flushed.Set(); - } break; } - T item{std::move(m_items.front())}; - m_items.pop(); - lg.unlock(); - - m_function(std::move(item)); + continue; } + T item{std::move(m_items.front())}; + m_items.pop(); + lg.unlock(); - if (m_shutdown.IsSet()) - break; + m_function(std::move(item)); } } std::function m_function; std::thread m_thread; - Common::Event m_wakeup; - Common::Flag m_shutdown; - Common::Flag m_cancelled; - Common::Flag m_flush; - Common::Event m_flushed; std::mutex m_lock; std::queue m_items; + std::condition_variable m_wait_cond_var; + std::condition_variable m_worker_cond_var; + bool m_idle = true; + bool m_shutdown = false; + bool m_cancelled = false; }; } // namespace Common From acdb0c5be12ae3db1f09283a0d491fe07f178683 Mon Sep 17 00:00:00 2001 From: Scott Mansell Date: Sat, 4 Feb 2023 11:31:49 +1300 Subject: [PATCH 05/10] WorkQueueThread: Implement thread name Otherwise we will end up with a dozen threads named "WorkQueueThread" --- Source/Core/Common/WorkQueueThread.h | 11 ++++++++--- Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp | 2 +- Source/Core/Core/HW/EXI/EXI_DeviceMic.h | 2 +- Source/Core/Core/IOS/Network/IP/Top.h | 2 +- Source/Core/Core/IOS/Network/KD/NetKDRequest.h | 2 +- Source/Core/Core/State.cpp | 2 +- Source/Core/DolphinQt/GameList/GameTracker.h | 2 +- 7 files changed, 14 insertions(+), 9 deletions(-) diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index a6cb71b7f9..c3315ac26f 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -5,6 +5,7 @@ #include #include +#include #include #include "Common/Event.h" @@ -19,8 +20,11 @@ template class WorkQueueThread { public: - WorkQueueThread() = default; - WorkQueueThread(std::function function) { Reset(std::move(function)); } + WorkQueueThread(std::string name) : m_thread_name(name){}; + WorkQueueThread(std::function function, std::string name) : m_thread_name(name) + { + Reset(std::move(function)); + } ~WorkQueueThread() { Shutdown(); } void Reset(std::function function) { @@ -139,7 +143,7 @@ public: private: void ThreadLoop() { - Common::SetCurrentThreadName("WorkQueueThread"); + Common::SetCurrentThreadName(m_thread_name.c_str()); while (true) { @@ -166,6 +170,7 @@ private: } std::function m_function; + std::string m_thread_name; std::thread m_thread; std::mutex m_lock; std::queue m_items; diff --git a/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp b/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp index 31de1ddb5a..7aac4177b5 100644 --- a/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp +++ b/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp @@ -200,7 +200,7 @@ CEXIMic::CEXIMic(int index) : slot(index) #ifdef _WIN32 , - m_work_queue([](const std::function& func) { func(); }) + m_work_queue([](const std::function& func) { func(); }, "Mic Worker") #endif { m_position = 0; diff --git a/Source/Core/Core/HW/EXI/EXI_DeviceMic.h b/Source/Core/Core/HW/EXI/EXI_DeviceMic.h index a831ef33e5..937665525b 100644 --- a/Source/Core/Core/HW/EXI/EXI_DeviceMic.h +++ b/Source/Core/Core/HW/EXI/EXI_DeviceMic.h @@ -102,7 +102,7 @@ private: int samples_avail; #ifdef _WIN32 - Common::WorkQueueThread> m_work_queue; + Common::WorkQueueThread> m_work_queue{"Mic Worker"}; bool m_coinit_success = false; bool m_should_couninit = false; #endif diff --git a/Source/Core/Core/IOS/Network/IP/Top.h b/Source/Core/Core/IOS/Network/IP/Top.h index cc41180860..1e5f86cb29 100644 --- a/Source/Core/Core/IOS/Network/IP/Top.h +++ b/Source/Core/Core/IOS/Network/IP/Top.h @@ -120,7 +120,7 @@ private: IPCReply HandleICMPPingRequest(const IOCtlVRequest& request); Common::SocketContext m_socket_context; - Common::WorkQueueThread m_work_queue; + Common::WorkQueueThread m_work_queue{"Network Worker"}; std::mutex m_async_reply_lock; std::queue m_async_replies; }; diff --git a/Source/Core/Core/IOS/Network/KD/NetKDRequest.h b/Source/Core/Core/IOS/Network/KD/NetKDRequest.h index aae52a11f5..8d2cb9153c 100644 --- a/Source/Core/Core/IOS/Network/KD/NetKDRequest.h +++ b/Source/Core/Core/IOS/Network/KD/NetKDRequest.h @@ -54,7 +54,7 @@ private: NWC24::NWC24Config config; NWC24::NWC24Dl m_dl_list; - Common::WorkQueueThread m_work_queue; + Common::WorkQueueThread m_work_queue{"WiiConnect24 Worker"}; std::mutex m_async_reply_lock; std::queue m_async_replies; // TODO: Maybe move away from Common::HttpRequest? diff --git a/Source/Core/Core/State.cpp b/Source/Core/Core/State.cpp index 405af72f4a..f045afadb5 100644 --- a/Source/Core/Core/State.cpp +++ b/Source/Core/Core/State.cpp @@ -85,7 +85,7 @@ struct CompressAndDumpState_args static std::mutex s_save_thread_mutex; // Queue for compressing and writing savestates to disk. -static Common::WorkQueueThread s_save_thread; +static Common::WorkQueueThread s_save_thread("Savestate Worker"); // Keeps track of savestate writes that are currently happening, so we don't load a state while // another one is still saving. This is particularly important so if you save to a slot and then diff --git a/Source/Core/DolphinQt/GameList/GameTracker.h b/Source/Core/DolphinQt/GameList/GameTracker.h index 97ec5bb268..1cfd02e62a 100644 --- a/Source/Core/DolphinQt/GameList/GameTracker.h +++ b/Source/Core/DolphinQt/GameList/GameTracker.h @@ -87,7 +87,7 @@ private: // game path -> directories that track it QMap> m_tracked_files; QVector m_tracked_paths; - Common::WorkQueueThread m_load_thread; + Common::WorkQueueThread m_load_thread{"GameList Tracker"}; UICommon::GameFileCache m_cache; Common::Event m_cache_loaded_event; Common::Event m_initial_games_emitted_event; From 6594532f103f85d2dc7362506040b3c25cbacb42 Mon Sep 17 00:00:00 2001 From: Scott Mansell Date: Sat, 4 Feb 2023 12:32:58 +1300 Subject: [PATCH 06/10] WorkQueueThread: rework Cancel/Shutdown workflow - Cancel doesn't shut down anymore. Allowing it to be used multiple times thoughout the life of the WorkQueue - Remove Clear, so we only have Cancel semantics - Add IsCancelling so work items can abort early if cancelling - Replace m_cancelled and m_thread.joinable() guars with m_shutdown. - Rename Flush to WaitForCompletion (As it's ambiguous if a function called flush should be blocking or not) - Add documentation --- Source/Core/Common/WorkQueueThread.h | 123 ++++++++---------- .../Core/DolphinQt/GameList/GameTracker.cpp | 4 +- 2 files changed, 55 insertions(+), 72 deletions(-) diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index c3315ac26f..f7c70edee8 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -3,13 +3,13 @@ #pragma once +#include +#include #include #include #include #include -#include "Common/Event.h" -#include "Common/Flag.h" #include "Common/Thread.h" // A thread that executes the given function for every item placed into its queue. @@ -26,120 +26,104 @@ public: Reset(std::move(function)); } ~WorkQueueThread() { Shutdown(); } + + // Shuts the current work thread down (if any) and starts a new thread with the given function + // Note: Some consumers of this API push items to the queue before starting the thread. void Reset(std::function function) { Shutdown(); std::lock_guard lg(m_lock); - m_cancelled = false; + m_shutdown = false; m_function = std::move(function); m_thread = std::thread(&WorkQueueThread::ThreadLoop, this); } + // Adds an item to the work queue template void EmplaceItem(Args&&... args) { std::lock_guard lg(m_lock); if (m_shutdown) - { return; - } + m_items.emplace(std::forward(args)...); m_idle = false; m_worker_cond_var.notify_one(); } + // Adds an item to the work queue void Push(T&& item) { std::lock_guard lg(m_lock); - if (m_cancelled) - { + if (m_shutdown) return; - } + m_items.push(item); m_idle = false; m_worker_cond_var.notify_one(); } + // Adds an item to the work queue void Push(const T& item) { - std::lock_guard lg(m_lock); - if (m_cancelled) - { - return; - } - m_items.push(item); - m_idle = false; - m_worker_cond_var.notify_one(); + std::lock_guard lg(m_lock); + if (m_shutdown) + return; + + m_items.push(item); + m_idle = false; + m_worker_cond_var.notify_one(); } - void Clear() + // Empties the queue + // If the worker polls IsCanceling(), it can abort it's work when Cancelling + void Cancel() { - std::lock_guard lg(m_lock); + std::unique_lock lg(m_lock); + if (m_shutdown) + return; + + m_cancelling = true; m_items = std::queue(); m_worker_cond_var.notify_one(); } - void Cancel() + // Tells the worker to shut down when it's queue is empty + // Blocks until the worker thread exits. + // If cancel is true, will Cancel before before telling the worker to exit + void Shutdown(bool cancel = false) { - if (!m_thread.joinable()) - { - return; - } - { std::unique_lock lg(m_lock); - m_items = std::queue(); - m_cancelled = true; + if (m_shutdown || !m_thread.joinable()) + return; + + if (cancel) + { + m_cancelling = true; + m_items = std::queue(); + } + m_shutdown = true; m_worker_cond_var.notify_one(); } + m_thread.join(); } - void Shutdown() + // Blocks until all items in the queue have been processed (or cancelled) + void WaitForCompletion() { - if (!m_thread.joinable()) - { - return; - } - - { - std::unique_lock lg(m_lock); - m_shutdown = true; - m_worker_cond_var.notify_one(); - } - m_thread.join(); - } - - // Doesn't return until the most recent function invocation has finished. - void ClearAndFlush() - { - if (!m_thread.joinable()) - { - return; - } - std::unique_lock lg(m_lock); - m_items = std::queue(); - m_wait_cond_var.wait(lg, [&] { - return m_idle; - }); - } - - // Doesn't return until the most recent function invocation has finished. - void Flush() - { - if (!m_thread.joinable()) - { + if (m_idle) // Only check m_idle, we want this to work even another thread called Shutdown return; - } - std::unique_lock lg(m_lock); - m_wait_cond_var.wait(lg, [&] { - return m_idle; - }); + m_wait_cond_var.wait(lg, [&] { return m_idle; }); } + // For the worker to check if it should abort it's work early. + bool IsCancelling() const { return m_cancelling.load(); } + private: void ThreadLoop() { @@ -151,14 +135,13 @@ private: if (m_items.empty()) { m_idle = true; + m_cancelling = false; m_wait_cond_var.notify_all(); - m_worker_cond_var.wait(lg, [&] { - return m_shutdown || !m_items.empty(); - }); + m_worker_cond_var.wait( + lg, [&] { return m_shutdown || m_cancelling.load() || !m_items.empty(); }); + if (m_shutdown) - { break; - } continue; } T item{std::move(m_items.front())}; @@ -176,9 +159,9 @@ private: std::queue m_items; std::condition_variable m_wait_cond_var; std::condition_variable m_worker_cond_var; + std::atomic m_cancelling = false; bool m_idle = true; bool m_shutdown = false; - bool m_cancelled = false; }; } // namespace Common diff --git a/Source/Core/DolphinQt/GameList/GameTracker.cpp b/Source/Core/DolphinQt/GameList/GameTracker.cpp index 7ee0c04e3a..6298bbdca1 100644 --- a/Source/Core/DolphinQt/GameList/GameTracker.cpp +++ b/Source/Core/DolphinQt/GameList/GameTracker.cpp @@ -37,7 +37,7 @@ GameTracker::GameTracker(QObject* parent) : QFileSystemWatcher(parent) connect(qApp, &QApplication::aboutToQuit, this, [this] { m_processing_halted = true; - m_load_thread.Cancel(); + m_load_thread.Shutdown(true); }); connect(this, &QFileSystemWatcher::directoryChanged, this, &GameTracker::UpdateDirectory); connect(this, &QFileSystemWatcher::fileChanged, this, &GameTracker::UpdateFile); @@ -203,7 +203,7 @@ void GameTracker::RemoveDirectory(const QString& dir) void GameTracker::RefreshAll() { m_processing_halted = true; - m_load_thread.Clear(); + m_load_thread.Cancel(); m_load_thread.EmplaceItem(Command{CommandType::ResumeProcessing, {}}); if (m_needs_purge) From 7c4fcc30a369763b9fd5f04964d275c707fb17c2 Mon Sep 17 00:00:00 2001 From: Scott Mansell Date: Sat, 4 Feb 2023 15:56:27 +1300 Subject: [PATCH 07/10] WorkQueueThread: provide name and function at same time --- Source/Core/AudioCommon/CubebStream.cpp | 2 +- Source/Core/Common/WorkQueueThread.h | 9 +++++---- Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp | 2 +- Source/Core/Core/HW/EXI/EXI_DeviceMic.h | 2 +- Source/Core/Core/IOS/Network/IP/Top.cpp | 2 +- Source/Core/Core/IOS/Network/IP/Top.h | 2 +- Source/Core/Core/IOS/Network/KD/NetKDRequest.cpp | 2 +- Source/Core/Core/IOS/Network/KD/NetKDRequest.h | 2 +- Source/Core/Core/State.cpp | 4 ++-- Source/Core/DolphinQt/GameList/GameTracker.cpp | 2 +- Source/Core/DolphinQt/GameList/GameTracker.h | 2 +- 11 files changed, 16 insertions(+), 15 deletions(-) diff --git a/Source/Core/AudioCommon/CubebStream.cpp b/Source/Core/AudioCommon/CubebStream.cpp index f4566cb57e..92ab886580 100644 --- a/Source/Core/AudioCommon/CubebStream.cpp +++ b/Source/Core/AudioCommon/CubebStream.cpp @@ -39,7 +39,7 @@ void CubebStream::StateCallback(cubeb_stream* stream, void* user_data, cubeb_sta CubebStream::CubebStream() #ifdef _WIN32 - : m_work_queue([](const std::function& func) { func(); }) + : m_work_queue("Cubeb Worker", [](const std::function& func) { func(); }) { Common::Event sync_event; m_work_queue.EmplaceItem([this, &sync_event] { diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index f7c70edee8..74074a326a 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -20,19 +20,20 @@ template class WorkQueueThread { public: - WorkQueueThread(std::string name) : m_thread_name(name){}; - WorkQueueThread(std::function function, std::string name) : m_thread_name(name) + WorkQueueThread() = default; + WorkQueueThread(const std::string name, std::function function) { - Reset(std::move(function)); + Reset(std::move(name), std::move(function)); } ~WorkQueueThread() { Shutdown(); } // Shuts the current work thread down (if any) and starts a new thread with the given function // Note: Some consumers of this API push items to the queue before starting the thread. - void Reset(std::function function) + void Reset(const std::string& name, std::function function) { Shutdown(); std::lock_guard lg(m_lock); + m_thread_name = std::move(name); m_shutdown = false; m_function = std::move(function); m_thread = std::thread(&WorkQueueThread::ThreadLoop, this); diff --git a/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp b/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp index 7aac4177b5..e88cd6162c 100644 --- a/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp +++ b/Source/Core/Core/HW/EXI/EXI_DeviceMic.cpp @@ -200,7 +200,7 @@ CEXIMic::CEXIMic(int index) : slot(index) #ifdef _WIN32 , - m_work_queue([](const std::function& func) { func(); }, "Mic Worker") + m_work_queue("Mic Worker", [](const std::function& func) { func(); }) #endif { m_position = 0; diff --git a/Source/Core/Core/HW/EXI/EXI_DeviceMic.h b/Source/Core/Core/HW/EXI/EXI_DeviceMic.h index 937665525b..a831ef33e5 100644 --- a/Source/Core/Core/HW/EXI/EXI_DeviceMic.h +++ b/Source/Core/Core/HW/EXI/EXI_DeviceMic.h @@ -102,7 +102,7 @@ private: int samples_avail; #ifdef _WIN32 - Common::WorkQueueThread> m_work_queue{"Mic Worker"}; + Common::WorkQueueThread> m_work_queue; bool m_coinit_success = false; bool m_should_couninit = false; #endif diff --git a/Source/Core/Core/IOS/Network/IP/Top.cpp b/Source/Core/Core/IOS/Network/IP/Top.cpp index ac512be37c..27487918d0 100644 --- a/Source/Core/Core/IOS/Network/IP/Top.cpp +++ b/Source/Core/Core/IOS/Network/IP/Top.cpp @@ -65,7 +65,7 @@ enum SOResultCode : s32 NetIPTopDevice::NetIPTopDevice(Kernel& ios, const std::string& device_name) : Device(ios, device_name) { - m_work_queue.Reset([this](AsyncTask task) { + m_work_queue.Reset("Network Worker", [this](AsyncTask task) { const IPCReply reply = task.handler(); { std::lock_guard lg(m_async_reply_lock); diff --git a/Source/Core/Core/IOS/Network/IP/Top.h b/Source/Core/Core/IOS/Network/IP/Top.h index 1e5f86cb29..cc41180860 100644 --- a/Source/Core/Core/IOS/Network/IP/Top.h +++ b/Source/Core/Core/IOS/Network/IP/Top.h @@ -120,7 +120,7 @@ private: IPCReply HandleICMPPingRequest(const IOCtlVRequest& request); Common::SocketContext m_socket_context; - Common::WorkQueueThread m_work_queue{"Network Worker"}; + Common::WorkQueueThread m_work_queue; std::mutex m_async_reply_lock; std::queue m_async_replies; }; diff --git a/Source/Core/Core/IOS/Network/KD/NetKDRequest.cpp b/Source/Core/Core/IOS/Network/KD/NetKDRequest.cpp index 1e8063a60d..8c5f80b6d4 100644 --- a/Source/Core/Core/IOS/Network/KD/NetKDRequest.cpp +++ b/Source/Core/Core/IOS/Network/KD/NetKDRequest.cpp @@ -150,7 +150,7 @@ s32 NWC24MakeUserID(u64* nwc24_id, u32 hollywood_id, u16 id_ctr, HardwareModel h NetKDRequestDevice::NetKDRequestDevice(Kernel& ios, const std::string& device_name) : Device(ios, device_name), config{ios.GetFS()}, m_dl_list{ios.GetFS()} { - m_work_queue.Reset([this](AsyncTask task) { + m_work_queue.Reset("WiiConnect24 Worker", [this](AsyncTask task) { const IPCReply reply = task.handler(); { std::lock_guard lg(m_async_reply_lock); diff --git a/Source/Core/Core/IOS/Network/KD/NetKDRequest.h b/Source/Core/Core/IOS/Network/KD/NetKDRequest.h index 8d2cb9153c..aae52a11f5 100644 --- a/Source/Core/Core/IOS/Network/KD/NetKDRequest.h +++ b/Source/Core/Core/IOS/Network/KD/NetKDRequest.h @@ -54,7 +54,7 @@ private: NWC24::NWC24Config config; NWC24::NWC24Dl m_dl_list; - Common::WorkQueueThread m_work_queue{"WiiConnect24 Worker"}; + Common::WorkQueueThread m_work_queue; std::mutex m_async_reply_lock; std::queue m_async_replies; // TODO: Maybe move away from Common::HttpRequest? diff --git a/Source/Core/Core/State.cpp b/Source/Core/Core/State.cpp index f045afadb5..fe1a270b2c 100644 --- a/Source/Core/Core/State.cpp +++ b/Source/Core/Core/State.cpp @@ -85,7 +85,7 @@ struct CompressAndDumpState_args static std::mutex s_save_thread_mutex; // Queue for compressing and writing savestates to disk. -static Common::WorkQueueThread s_save_thread("Savestate Worker"); +static Common::WorkQueueThread s_save_thread; // Keeps track of savestate writes that are currently happening, so we don't load a state while // another one is still saving. This is particularly important so if you save to a slot and then @@ -724,7 +724,7 @@ void Init() if (lzo_init() != LZO_E_OK) PanicAlertFmtT("Internal LZO Error - lzo_init() failed"); - s_save_thread.Reset([](CompressAndDumpState_args args) { + s_save_thread.Reset("Savestate Worker", [](CompressAndDumpState_args args) { CompressAndDumpState(args); { diff --git a/Source/Core/DolphinQt/GameList/GameTracker.cpp b/Source/Core/DolphinQt/GameList/GameTracker.cpp index 6298bbdca1..9428729997 100644 --- a/Source/Core/DolphinQt/GameList/GameTracker.cpp +++ b/Source/Core/DolphinQt/GameList/GameTracker.cpp @@ -55,7 +55,7 @@ GameTracker::GameTracker(QObject* parent) : QFileSystemWatcher(parent) m_load_thread.EmplaceItem(Command{CommandType::UpdateMetadata, {}}); }); - m_load_thread.Reset([this](Command command) { + m_load_thread.Reset("GameList Tracker", [this](Command command) { switch (command.type) { case CommandType::LoadCache: diff --git a/Source/Core/DolphinQt/GameList/GameTracker.h b/Source/Core/DolphinQt/GameList/GameTracker.h index 1cfd02e62a..97ec5bb268 100644 --- a/Source/Core/DolphinQt/GameList/GameTracker.h +++ b/Source/Core/DolphinQt/GameList/GameTracker.h @@ -87,7 +87,7 @@ private: // game path -> directories that track it QMap> m_tracked_files; QVector m_tracked_paths; - Common::WorkQueueThread m_load_thread{"GameList Tracker"}; + Common::WorkQueueThread m_load_thread; UICommon::GameFileCache m_cache; Common::Event m_cache_loaded_event; Common::Event m_initial_games_emitted_event; From 9c012b09b35727265fbac45ed25b5ea87e592f77 Mon Sep 17 00:00:00 2001 From: Scott Mansell Date: Sat, 4 Feb 2023 16:42:50 +1300 Subject: [PATCH 08/10] Address review feedback --- Source/Core/Common/WorkQueueThread.h | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index 74074a326a..604ccffb29 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include "Common/Thread.h" @@ -21,19 +22,19 @@ class WorkQueueThread { public: WorkQueueThread() = default; - WorkQueueThread(const std::string name, std::function function) + WorkQueueThread(const std::string_view name, std::function function) { - Reset(std::move(name), std::move(function)); + Reset(name, std::move(function)); } ~WorkQueueThread() { Shutdown(); } // Shuts the current work thread down (if any) and starts a new thread with the given function // Note: Some consumers of this API push items to the queue before starting the thread. - void Reset(const std::string& name, std::function function) + void Reset(const std::string_view name, std::function function) { Shutdown(); std::lock_guard lg(m_lock); - m_thread_name = std::move(name); + m_thread_name = name; m_shutdown = false; m_function = std::move(function); m_thread = std::thread(&WorkQueueThread::ThreadLoop, this); @@ -59,7 +60,7 @@ public: if (m_shutdown) return; - m_items.push(item); + m_items.push(std::move(item)); m_idle = false; m_worker_cond_var.notify_one(); } @@ -122,7 +123,7 @@ public: m_wait_cond_var.wait(lg, [&] { return m_idle; }); } - // For the worker to check if it should abort it's work early. + // If the worker polls IsCanceling(), it can abort its work when Cancelling bool IsCancelling() const { return m_cancelling.load(); } private: From 271ffde71d5ac2f7ecc2d5850358731eb0613c3c Mon Sep 17 00:00:00 2001 From: Scott Mansell Date: Sun, 5 Feb 2023 17:17:16 +1300 Subject: [PATCH 09/10] Prevent WaitForCompletion shutdown deadlock. Adjust shutdown order to prevent potential deadlocks when one thread calls Shutdown, and another calls WaitForCompletion. --- Source/Core/Common/WorkQueueThread.h | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index 604ccffb29..f7eb06f82f 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -93,6 +93,7 @@ public: // Tells the worker to shut down when it's queue is empty // Blocks until the worker thread exits. // If cancel is true, will Cancel before before telling the worker to exit + // Otherwise, all currently queued items will complete before the worker exits void Shutdown(bool cancel = false) { { @@ -117,10 +118,13 @@ public: void WaitForCompletion() { std::unique_lock lg(m_lock); - if (m_idle) // Only check m_idle, we want this to work even another thread called Shutdown + // don't check m_shutdown, because it gets set to request a shutdown, and we want to wait until + // after the shutdown completes. + // We also check m_cancelling, because we want to ensure the worker acknowledges our cancel. + if (m_idle && !m_cancelling.load()) return; - m_wait_cond_var.wait(lg, [&] { return m_idle; }); + m_wait_cond_var.wait(lg, [&] { return m_idle && m_cancelling.load(); }); } // If the worker polls IsCanceling(), it can abort its work when Cancelling @@ -134,17 +138,16 @@ private: while (true) { std::unique_lock lg(m_lock); - if (m_items.empty()) + while (m_items.empty()) { m_idle = true; m_cancelling = false; m_wait_cond_var.notify_all(); + if (m_shutdown) + return; + m_worker_cond_var.wait( lg, [&] { return m_shutdown || m_cancelling.load() || !m_items.empty(); }); - - if (m_shutdown) - break; - continue; } T item{std::move(m_items.front())}; m_items.pop(); From 2ff155f74280ea2fdbbeb0536f66b3b04a92930b Mon Sep 17 00:00:00 2001 From: Scott Mansell Date: Sun, 5 Feb 2023 17:20:00 +1300 Subject: [PATCH 10/10] Optimise cond_var predicate order m_items.empty() is by far the most likely reason for a notification. --- Source/Core/Common/WorkQueueThread.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index f7eb06f82f..be3c3198d4 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -147,7 +147,7 @@ private: return; m_worker_cond_var.wait( - lg, [&] { return m_shutdown || m_cancelling.load() || !m_items.empty(); }); + lg, [&] { return !m_items.empty() || m_shutdown || m_cancelling.load(); }); } T item{std::move(m_items.front())}; m_items.pop();