From e305ffa6bf734db80af9f10d41a01596548c41e5 Mon Sep 17 00:00:00 2001
From: Kaushik Iska
Date: Wed, 5 Feb 2020 13:41:30 -0800
Subject: [PATCH] [WIP] Simplify task queues locking mechanism

We now have a single mutex guarding all accesses to the underlying
task heaps. This replaces the more granular but bug-prone mechanism of
striped (per-queue) locks.

This also re-enables the GPUThreadMerger tests that are currently
disabled due to their flaky nature.
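
For context, the new locking model reduces to something like the
self-contained sketch below. TaskQueueRegistry, Entry, and DisposeBoth
are illustrative names for this sketch only, not the fml API:

    #include <cstddef>
    #include <functional>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <vector>

    // Minimal sketch of the single-mutex model: one lock serializes
    // every access to the map of queue entries.
    class TaskQueueRegistry {
     public:
      size_t CreateQueue() {
        std::lock_guard<std::mutex> guard(mutex_);
        const size_t id = counter_++;
        entries_[id] = std::make_unique<Entry>();
        return id;
      }

      void RegisterTask(size_t id, std::function<void()> task) {
        std::lock_guard<std::mutex> guard(mutex_);
        entries_.at(id)->tasks.push_back(std::move(task));
      }

      // An operation spanning two entries (say, an owner queue and a
      // queue it has subsumed) needs no lock-ordering protocol: the
      // one mutex already serializes access to every entry.
      void DisposeBoth(size_t a, size_t b) {
        std::lock_guard<std::mutex> guard(mutex_);
        entries_.erase(a);
        entries_.erase(b);
      }

     private:
      struct Entry {
        std::vector<std::function<void()>> tasks;
      };
      std::mutex mutex_;  // Single mutex guarding all entries.
      std::map<size_t, std::unique_ptr<Entry>> entries_;
      size_t counter_ = 0;
    };

The trade is coarser locking, but the critical sections here are
short, and there is no longer any lock ordering to get wrong when an
operation spans an owner and a subsumed queue, as Merge previously had
to do with two striped locks.
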
---
 fml/gpu_thread_merger_unittests.cc |  9 ++--
 fml/message_loop_task_queues.cc    | 76 ++++++++++-------------------
 fml/message_loop_task_queues.h     |  7 +--
 3 files changed, 28 insertions(+), 64 deletions(-)

diff --git a/fml/gpu_thread_merger_unittests.cc b/fml/gpu_thread_merger_unittests.cc
index a8c3681712a0f..66f21c0fc494d 100644
--- a/fml/gpu_thread_merger_unittests.cc
+++ b/fml/gpu_thread_merger_unittests.cc
@@ -14,8 +14,7 @@
 #include "flutter/fml/task_runner.h"
 #include "gtest/gtest.h"
 
-// TODO(49007): Flaky. Investigate, fix and re-enable.
-TEST(GpuThreadMerger, DISABLED_RemainMergedTillLeaseExpires) {
+TEST(GpuThreadMerger, RemainMergedTillLeaseExpires) {
   fml::MessageLoop* loop1 = nullptr;
   fml::AutoResetWaitableEvent latch1;
   fml::AutoResetWaitableEvent term1;
@@ -62,8 +61,7 @@ TEST(GpuThreadMerger, DISABLED_RemainMergedTillLeaseExpires) {
   thread2.join();
 }
 
-// TODO(49007): Flaky. Investigate, fix and re-enable.
-TEST(GpuThreadMerger, DISABLED_IsNotOnRasterizingThread) {
+TEST(GpuThreadMerger, IsNotOnRasterizingThread) {
   fml::MessageLoop* loop1 = nullptr;
   fml::AutoResetWaitableEvent latch1;
   std::thread thread1([&loop1, &latch1]() {
@@ -148,8 +146,7 @@ TEST(GpuThreadMerger, DISABLED_IsNotOnRasterizingThread) {
   thread2.join();
 }
 
-// TODO(49007): Flaky. Investigate, fix and re-enable.
-TEST(GpuThreadMerger, DISABLED_LeaseExtension) {
+TEST(GpuThreadMerger, LeaseExtension) {
   fml::MessageLoop* loop1 = nullptr;
   fml::AutoResetWaitableEvent latch1;
   fml::AutoResetWaitableEvent term1;
diff --git a/fml/message_loop_task_queues.cc b/fml/message_loop_task_queues.cc
index 0efc33e0414ad..52838676feab6 100644
--- a/fml/message_loop_task_queues.cc
+++ b/fml/message_loop_task_queues.cc
@@ -8,6 +8,8 @@
 #include "flutter/fml/make_copyable.h"
 #include "flutter/fml/message_loop_impl.h"
 
+#include <mutex>
+
 namespace fml {
 
 std::mutex MessageLoopTaskQueues::creation_mutex_;
@@ -32,44 +34,36 @@ fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
 }
 
 TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
-  fml::UniqueLock lock(*queue_meta_mutex_);
+  std::lock_guard guard(queue_mutex_);
   TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
   ++task_queue_id_counter_;
-
   queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>();
-  queue_locks_[loop_id] = std::make_unique<std::mutex>();
-
   return loop_id;
 }
 
 MessageLoopTaskQueues::MessageLoopTaskQueues()
-    : queue_meta_mutex_(fml::SharedMutex::Create()),
-      task_queue_id_counter_(0),
-      order_(0) {}
+    : task_queue_id_counter_(0), order_(0) {}
 
 MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;
 
 void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
-  std::scoped_lock queue_lock(GetMutex(queue_id));
-
+  std::lock_guard guard(queue_mutex_);
   const auto& queue_entry = queue_entries_.at(queue_id);
   FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
   TaskQueueId subsumed = queue_entry->owner_of;
   queue_entries_.erase(queue_id);
   if (subsumed != _kUnmerged) {
-    std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
     queue_entries_.erase(subsumed);
   }
 }
 
 void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
-  std::scoped_lock queue_lock(GetMutex(queue_id));
+  std::lock_guard guard(queue_mutex_);
   const auto& queue_entry = queue_entries_.at(queue_id);
   FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
   TaskQueueId subsumed = queue_entry->owner_of;
   queue_entry->delayed_tasks = {};
   if (subsumed != _kUnmerged) {
-    std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
     queue_entries_.at(subsumed)->delayed_tasks = {};
   }
 }
@@ -77,10 +71,9 @@ void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
 void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
                                          const fml::closure& task,
                                          fml::TimePoint target_time) {
-  std::scoped_lock queue_lock(GetMutex(queue_id));
-
+  std::lock_guard guard(queue_mutex_);
   size_t order = order_++;
-  const auto& queue_entry = queue_entries_[queue_id];
+  const auto& queue_entry = queue_entries_.at(queue_id);
   queue_entry->delayed_tasks.push({order, task, target_time});
   TaskQueueId loop_to_wake = queue_id;
   if (queue_entry->subsumed_by != _kUnmerged) {
@@ -91,8 +84,7 @@ void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
 }
 
 bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
-  std::scoped_lock queue_lock(GetMutex(queue_id));
-
+  std::lock_guard guard(queue_mutex_);
   return HasPendingTasksUnlocked(queue_id);
 }
 
@@ -100,8 +92,7 @@ void MessageLoopTaskQueues::GetTasksToRunNow(
     TaskQueueId queue_id,
     FlushType type,
     std::vector<fml::closure>& invocations) {
-  std::scoped_lock queue_lock(GetMutex(queue_id));
-
+  std::lock_guard guard(queue_mutex_);
   if (!HasPendingTasksUnlocked(queue_id)) {
     return;
   }
@@ -115,7 +106,7 @@ void MessageLoopTaskQueues::GetTasksToRunNow(
       break;
     }
     invocations.emplace_back(std::move(top.GetTask()));
-    queue_entries_[top_queue]->delayed_tasks.pop();
+    queue_entries_.at(top_queue)->delayed_tasks.pop();
     if (type == FlushType::kSingle) {
       break;
     }
@@ -136,8 +127,7 @@ void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
 }
 
 size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
-  std::scoped_lock queue_lock(GetMutex(queue_id));
-
+  std::lock_guard guard(queue_mutex_);
   const auto& queue_entry = queue_entries_.at(queue_id);
   if (queue_entry->subsumed_by != _kUnmerged) {
     return 0;
@@ -148,7 +138,6 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
 
   TaskQueueId subsumed = queue_entry->owner_of;
   if (subsumed != _kUnmerged) {
-    std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
     const auto& subsumed_entry = queue_entries_.at(subsumed);
     total_tasks += subsumed_entry->delayed_tasks.size();
   }
@@ -158,22 +147,20 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
 
 void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id,
                                             intptr_t key,
                                             const fml::closure& callback) {
-  std::scoped_lock queue_lock(GetMutex(queue_id));
-
+  std::lock_guard guard(queue_mutex_);
   FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
-  queue_entries_[queue_id]->task_observers[key] = std::move(callback);
+  queue_entries_.at(queue_id)->task_observers[key] = std::move(callback);
 }
 
 void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
                                                intptr_t key) {
-  std::scoped_lock queue_lock(GetMutex(queue_id));
-
-  queue_entries_[queue_id]->task_observers.erase(key);
+  std::lock_guard guard(queue_mutex_);
+  queue_entries_.at(queue_id)->task_observers.erase(key);
 }
 
 std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
     TaskQueueId queue_id) const {
-  std::scoped_lock queue_lock(GetMutex(queue_id));
+  std::lock_guard guard(queue_mutex_);
   std::vector<fml::closure> observers;
   if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) {
@@ -186,7 +173,6 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
 
   TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
   if (subsumed != _kUnmerged) {
-    std::scoped_lock subsumed_lock(*queue_locks_.at(subsumed));
     for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
       observers.push_back(observer.second);
     }
@@ -197,9 +183,8 @@ std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
 
 void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
                                         fml::Wakeable* wakeable) {
-  std::scoped_lock queue_lock(GetMutex(queue_id));
-
-  FML_CHECK(!queue_entries_[queue_id]->wakeable)
+  std::lock_guard guard(queue_mutex_);
+  FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
       << "Wakeable can only be set once.";
   queue_entries_.at(queue_id)->wakeable = wakeable;
 }
@@ -208,12 +193,7 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
   if (owner == subsumed) {
     return true;
   }
-
-  std::mutex& owner_mutex = GetMutex(owner);
-  std::mutex& subsumed_mutex = GetMutex(subsumed);
-
-  std::scoped_lock lock(owner_mutex, subsumed_mutex);
-
+  std::lock_guard guard(queue_mutex_);
   auto& owner_entry = queue_entries_.at(owner);
   auto& subsumed_entry = queue_entries_.at(subsumed);
@@ -242,15 +222,14 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
 }
 
 bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
-  std::scoped_lock owner_lock(GetMutex(owner));
-
-  auto& owner_entry = queue_entries_[owner];
+  std::lock_guard guard(queue_mutex_);
+  const auto& owner_entry = queue_entries_.at(owner);
   const TaskQueueId subsumed = owner_entry->owner_of;
   if (subsumed == _kUnmerged) {
     return false;
   }
 
-  queue_entries_[subsumed]->subsumed_by = _kUnmerged;
+  queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
   owner_entry->owner_of = _kUnmerged;
 
   if (HasPendingTasksUnlocked(owner)) {
@@ -266,17 +245,10 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
 
 bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
                                  TaskQueueId subsumed) const {
-  std::scoped_lock owner_lock(GetMutex(owner));
+  std::lock_guard guard(queue_mutex_);
   return subsumed == queue_entries_.at(owner)->owner_of || owner == subsumed;
 }
 
-std::mutex& MessageLoopTaskQueues::GetMutex(TaskQueueId queue_id) const {
-  fml::SharedLock queue_reader(*queue_meta_mutex_);
-  FML_DCHECK(queue_locks_.count(queue_id) && queue_entries_.count(queue_id))
-      << "Trying to acquire a lock on an invalid queue_id: " << queue_id;
-  return *queue_locks_.at(queue_id);
-}
-
 // Subsumed queues will never have pending tasks.
 // Owning queues will consider both their and their subsumed tasks.
 bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
diff --git a/fml/message_loop_task_queues.h b/fml/message_loop_task_queues.h
index 67c46991a6f13..8c1aa3121506f 100644
--- a/fml/message_loop_task_queues.h
+++ b/fml/message_loop_task_queues.h
@@ -127,16 +127,12 @@ class MessageLoopTaskQueues
  private:
   class MergedQueuesRunner;
 
-  using Mutexes = std::vector<std::unique_ptr<std::mutex>>;
-
   MessageLoopTaskQueues();
 
   ~MessageLoopTaskQueues();
 
   void WakeUpUnlocked(TaskQueueId queue_id, fml::TimePoint time) const;
 
-  std::mutex& GetMutex(TaskQueueId queue_id) const;
-
   bool HasPendingTasksUnlocked(TaskQueueId queue_id) const;
 
   const DelayedTask& PeekNextTaskUnlocked(TaskQueueId queue_id,
@@ -147,9 +143,8 @@ class MessageLoopTaskQueues
   static std::mutex creation_mutex_;
   static fml::RefPtr<MessageLoopTaskQueues> instance_;
 
-  std::unique_ptr<fml::SharedMutex> queue_meta_mutex_;
+  mutable std::mutex queue_mutex_;
   std::map<TaskQueueId, std::unique_ptr<TaskQueueEntry>> queue_entries_;
-  std::map<TaskQueueId, std::unique_ptr<std::mutex>> queue_locks_;
 
   size_t task_queue_id_counter_;