From d91702414d3b0765f93a286e74338aa8389b9d3a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 21 Jun 2019 13:12:32 -0700 Subject: [PATCH 1/9] Add the functionality to merge and unmerge MessageLoopTaskQueues This introduces a notion of a "owning" and "subsumed" queue ids. Owning queue will take care of the tasks submitted to both that and it's subsumed queue. - The tasks submitted still maintain the queue affinity - Same for the task observers - Also adds MergedQueuesRunner which grabs both the locks owner and subsumed queues in RAII fashion. --- .gitignore | 1 + fml/BUILD.gn | 2 + fml/merged_queues_runner.cc | 55 +++++ fml/message_loop_task_queues.cc | 176 +++++++++++++-- fml/message_loop_task_queues.h | 32 +++ ...oop_task_queues_merge_unmerge_unittests.cc | 210 ++++++++++++++++++ fml/message_loop_task_queues_unittests.cc | 16 -- 7 files changed, 459 insertions(+), 33 deletions(-) create mode 100644 fml/merged_queues_runner.cc create mode 100644 fml/message_loop_task_queues_merge_unmerge_unittests.cc diff --git a/.gitignore b/.gitignore index 870f951370586..97b22719c9a1c 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ Thumbs.db .idea pubspec.lock .vscode/ +compile_commands.json diff --git a/fml/BUILD.gn b/fml/BUILD.gn index dbe4d49b00f61..fb8fcbc42cd9f 100644 --- a/fml/BUILD.gn +++ b/fml/BUILD.gn @@ -40,6 +40,7 @@ source_set("fml") { "memory/weak_ptr.h", "memory/weak_ptr_internal.cc", "memory/weak_ptr_internal.h", + "merged_queues_runner.cc", "message.cc", "message.h", "message_loop.cc", @@ -201,6 +202,7 @@ executable("fml_unittests") { "file_unittest.cc", "memory/ref_counted_unittest.cc", "memory/weak_ptr_unittest.cc", + "message_loop_task_queues_merge_unmerge_unittests.cc", "message_loop_task_queues_unittests.cc", "message_loop_unittests.cc", "message_unittests.cc", diff --git a/fml/merged_queues_runner.cc b/fml/merged_queues_runner.cc new file mode 100644 index 0000000000000..db8901a92f0f8 --- /dev/null +++ b/fml/merged_queues_runner.cc @@ -0,0 +1,55 @@ +// Copyright 2013 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#define FML_USED_ON_EMBEDDER + +#include "flutter/fml/message_loop_task_queues.h" + +namespace fml { + +// RAII class for managing merged locks. +class MessageLoopTaskQueues::MergedQueuesRunner { + public: + // TODO (kaushikiska): refactor mutexes out side of MessageLoopTaskQueues + // for better DI. + MergedQueuesRunner(MessageLoopTaskQueues* task_queues, + TaskQueueId owner, + MutexType type = MutexType::kTasks) + : owner_(owner), task_queues_(task_queues), type_(type) { + task_queues_->GetMutex(owner, type).lock(); + subsumed_ = task_queues->owner_to_subsumed_[owner]; + if (isMerged(subsumed_)) { + task_queues_->GetMutex(subsumed_, type).lock(); + } + } + + // First invokes on owner and then subsumed (if present). + void InvokeMerged(std::function closure) { + closure(owner_); + if (isMerged(subsumed_)) { + closure(subsumed_); + } + } + + ~MergedQueuesRunner() { + if (isMerged(subsumed_)) { + task_queues_->GetMutex(subsumed_, type_).unlock(); + } + task_queues_->GetMutex(owner_, type_).unlock(); + } + + private: + bool isMerged(TaskQueueId queue_id) { + return queue_id != MessageLoopTaskQueues::_kUnmerged; + } + + const TaskQueueId owner_; + TaskQueueId subsumed_; + MessageLoopTaskQueues* task_queues_; + const MutexType type_; + + FML_DISALLOW_COPY_ASSIGN_AND_MOVE(MergedQueuesRunner); +}; + +} // namespace fml diff --git a/fml/message_loop_task_queues.cc b/fml/message_loop_task_queues.cc index ab641eff68828..187cdfecc7337 100644 --- a/fml/message_loop_task_queues.cc +++ b/fml/message_loop_task_queues.cc @@ -5,11 +5,13 @@ #define FML_USED_ON_EMBEDDER #include "flutter/fml/message_loop_task_queues.h" +#include "flutter/fml/merged_queues_runner.cc" #include "flutter/fml/message_loop_impl.h" namespace fml { std::mutex MessageLoopTaskQueues::creation_mutex_; +const size_t MessageLoopTaskQueues::_kUnmerged; fml::RefPtr MessageLoopTaskQueues::instance_; fml::RefPtr MessageLoopTaskQueues::GetInstance() { @@ -33,6 +35,9 @@ TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() { delayed_tasks_.push_back(DelayedTaskQueue()); wakeables_.push_back(NULL); + owner_to_subsumed_.push_back(_kUnmerged); + subsumed_to_owner_.push_back(_kUnmerged); + return loop_id; } @@ -42,8 +47,9 @@ MessageLoopTaskQueues::MessageLoopTaskQueues() MessageLoopTaskQueues::~MessageLoopTaskQueues() = default; void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) { - std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks)); - delayed_tasks_[queue_id] = {}; + MergedQueuesRunner merged_tasks = MergedQueuesRunner(this, queue_id); + merged_tasks.InvokeMerged( + [&](TaskQueueId queue_id) { delayed_tasks_[queue_id] = {}; }); } void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id, @@ -56,35 +62,39 @@ void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id, } bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) { - std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks)); - return !delayed_tasks_[queue_id].empty(); + MergedQueuesRunner merged_tasks = MergedQueuesRunner(this, queue_id); + return HasPendingTasksUnlocked(queue_id); } void MessageLoopTaskQueues::GetTasksToRunNow( TaskQueueId queue_id, FlushType type, std::vector& invocations) { - std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks)); + MergedQueuesRunner merged_tasks = MergedQueuesRunner(this, queue_id); + + if (!HasPendingTasksUnlocked(queue_id)) { + return; + } const auto now = fml::TimePoint::Now(); - DelayedTaskQueue& tasks = delayed_tasks_[queue_id]; - while (!tasks.empty()) { - const auto& top = tasks.top(); + while (HasPendingTasksUnlocked(queue_id)) { + TaskQueueId top_queue; + const auto& top = PeekNextTaskUnlocked(queue_id, top_queue); if (top.GetTargetTime() > now) { break; } invocations.emplace_back(std::move(top.GetTask())); - tasks.pop(); + delayed_tasks_[top_queue].pop(); if (type == FlushType::kSingle) { break; } } - if (tasks.empty()) { + if (!HasPendingTasksUnlocked(queue_id)) { WakeUp(queue_id, fml::TimePoint::Max()); } else { - WakeUp(queue_id, tasks.top().GetTargetTime()); + WakeUp(queue_id, GetNextWakeTimeUnlocked(queue_id)); } } @@ -96,8 +106,14 @@ void MessageLoopTaskQueues::WakeUp(TaskQueueId queue_id, fml::TimePoint time) { } size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) { - std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks)); - return delayed_tasks_[queue_id].size(); + MergedQueuesRunner merged_tasks = MergedQueuesRunner(this, queue_id); + if (subsumed_to_owner_[queue_id] != _kUnmerged) { + return 0; + } + size_t total_tasks = 0; + merged_tasks.InvokeMerged( + [&](TaskQueueId queue) { total_tasks += delayed_tasks_[queue].size(); }); + return total_tasks; } void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id, @@ -114,10 +130,14 @@ void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id, } void MessageLoopTaskQueues::NotifyObservers(TaskQueueId queue_id) { - std::scoped_lock lock(GetMutex(queue_id, MutexType::kObservers)); - for (const auto& observer : task_observers_[queue_id]) { - observer.second(); - } + MergedQueuesRunner merged_observers = + MergedQueuesRunner(this, queue_id, MutexType::kObservers); + + merged_observers.InvokeMerged([&](TaskQueueId queue) { + for (const auto& observer : task_observers_[queue]) { + observer.second(); + } + }); } // Thread safety analysis disabled as it does not account for defered locks. @@ -140,9 +160,131 @@ void MessageLoopTaskQueues::Swap(TaskQueueId primary, TaskQueueId secondary) void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id, fml::Wakeable* wakeable) { std::scoped_lock lock(GetMutex(queue_id, MutexType::kWakeables)); + FML_CHECK(!wakeables_[queue_id]) << "Wakeable can only be set once."; wakeables_[queue_id] = wakeable; } +bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) { + // task_observers locks + std::mutex& o1 = GetMutex(owner, MutexType::kObservers); + std::mutex& o2 = GetMutex(subsumed, MutexType::kObservers); + + // delayed_tasks locks + std::mutex& t1 = GetMutex(owner, MutexType::kTasks); + std::mutex& t2 = GetMutex(subsumed, MutexType::kTasks); + + std::scoped_lock(o1, o2, t1, t2); + + if (owner == subsumed) { + return true; + } + + if (owner_to_subsumed_[owner] == subsumed) { + return true; + } + + std::vector owner_subsumed_keys = { + owner_to_subsumed_[owner], owner_to_subsumed_[subsumed], + subsumed_to_owner_[owner], subsumed_to_owner_[subsumed]}; + + for (auto key : owner_subsumed_keys) { + if (key != _kUnmerged) { + return false; + } + } + + owner_to_subsumed_[owner] = subsumed; + subsumed_to_owner_[subsumed] = owner; + + if (HasPendingTasksUnlocked(owner)) { + WakeUp(owner, GetNextWakeTimeUnlocked(owner)); + } + + return true; +} + +bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) { + // task_observers locks + std::mutex& o = GetMutex(owner, MutexType::kObservers); + + // delayed_tasks locks + std::mutex& t = GetMutex(owner, MutexType::kTasks); + + std::scoped_lock(o, t); + + if (owner_to_subsumed_[owner] == _kUnmerged) { + return false; + } + + const size_t subsumed = owner_to_subsumed_[owner]; + subsumed_to_owner_[subsumed] = _kUnmerged; + owner_to_subsumed_[owner] = _kUnmerged; + + if (HasPendingTasksUnlocked(owner)) { + WakeUp(owner, GetNextWakeTimeUnlocked(owner)); + } + + if (HasPendingTasksUnlocked(subsumed)) { + WakeUp(subsumed, GetNextWakeTimeUnlocked(subsumed)); + } + + return true; +} + +// Subsumed queues will never have pending tasks. +// Owning queues will consider both their and their subsumed tasks. +bool MessageLoopTaskQueues::HasPendingTasksUnlocked(TaskQueueId queue_id) { + if (subsumed_to_owner_[queue_id] != _kUnmerged) { + return false; + } + + if (!delayed_tasks_[queue_id].empty()) { + return true; + } + + const size_t subsumed = owner_to_subsumed_[queue_id]; + if (subsumed == _kUnmerged) { + // this is not an owner and queue is empty. + return false; + } else { + return !delayed_tasks_[subsumed].empty(); + } +} + +fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked( + TaskQueueId queue_id) { + TaskQueueId tmp; + return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime(); +} + +const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked( + TaskQueueId owner, + TaskQueueId& top_queue_id) { + FML_DCHECK(HasPendingTasksUnlocked(owner)); + const size_t subsumed = owner_to_subsumed_[owner]; + if (subsumed == _kUnmerged) { + top_queue_id = owner; + return delayed_tasks_[owner].top(); + } + // we are owning another task queue + const bool subsumed_has_task = !delayed_tasks_[subsumed].empty(); + const bool owner_has_task = !delayed_tasks_[owner].empty(); + if (owner_has_task && subsumed_has_task) { + const auto owner_task = delayed_tasks_[owner].top(); + const auto subsumed_task = delayed_tasks_[subsumed].top(); + if (owner_task > subsumed_task) { + top_queue_id = subsumed; + } else { + top_queue_id = owner; + } + } else if (owner_has_task) { + top_queue_id = owner; + } else { + top_queue_id = subsumed; + } + return delayed_tasks_[top_queue_id].top(); +} + std::mutex& MessageLoopTaskQueues::GetMutex(TaskQueueId queue_id, MutexType type) { std::scoped_lock lock(queue_meta_mutex_); diff --git a/fml/message_loop_task_queues.h b/fml/message_loop_task_queues.h index f86ad9be51657..e69adcc3569ea 100644 --- a/fml/message_loop_task_queues.h +++ b/fml/message_loop_task_queues.h @@ -69,7 +69,27 @@ class MessageLoopTaskQueues void SetWakeable(TaskQueueId queue_id, fml::Wakeable* wakeable); + // Invariants for merge and un-merge + // 1. RegisterTask will always submit to the queue_id that is passed + // to it. It is not aware of whether a queue is merged or not. Same with + // task observers. + // 2. When we get the tasks to run now, we look at both the queue_ids + // for the owner, subsumed will spin. + // 3. Each task queue can only be merged and subsumed once. + // + // Methods currently aware of the merged state of the queues: + // HasPendingTasks, GetTasksToRunNow, GetNumPendingTasks + + // This method returns false if either the owner or subsumed has already been + // merged with something else. + bool Merge(TaskQueueId owner, TaskQueueId subsumed); + + // Will return false if the owner has not been merged before. + bool Unmerge(TaskQueueId owner); + private: + class MergedQueuesRunner; + enum class MutexType { kTasks, kObservers, @@ -85,6 +105,13 @@ class MessageLoopTaskQueues void WakeUp(TaskQueueId queue_id, fml::TimePoint time); + bool HasPendingTasksUnlocked(TaskQueueId queue_id); + + const DelayedTask& PeekNextTaskUnlocked(TaskQueueId queue_id, + TaskQueueId& top_queue_id); + + fml::TimePoint GetNextWakeTimeUnlocked(TaskQueueId queue_id); + std::mutex& GetMutex(TaskQueueId queue_id, MutexType type); static std::mutex creation_mutex_; @@ -104,6 +131,11 @@ class MessageLoopTaskQueues std::vector task_observers_; std::vector delayed_tasks_; + static const size_t _kUnmerged = ULONG_MAX; + // These are guarded by delayed_tasks_mutexes_ + std::vector owner_to_subsumed_; + std::vector subsumed_to_owner_; + std::atomic_int order_; FML_FRIEND_MAKE_REF_COUNTED(MessageLoopTaskQueues); diff --git a/fml/message_loop_task_queues_merge_unmerge_unittests.cc b/fml/message_loop_task_queues_merge_unmerge_unittests.cc new file mode 100644 index 0000000000000..45f50d11c524f --- /dev/null +++ b/fml/message_loop_task_queues_merge_unmerge_unittests.cc @@ -0,0 +1,210 @@ +// Copyright 2013 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#define FML_USED_ON_EMBEDDER + +#include + +#include "flutter/fml/message_loop_task_queues.h" +#include "flutter/fml/synchronization/count_down_latch.h" +#include "flutter/fml/synchronization/waitable_event.h" +#include "gtest/gtest.h" + +class TestWakeable : public fml::Wakeable { + public: + using WakeUpCall = std::function; + + TestWakeable(WakeUpCall call) : wake_up_call_(call) {} + + void WakeUp(fml::TimePoint time_point) override { wake_up_call_(time_point); } + + private: + WakeUpCall wake_up_call_; +}; + +TEST(MessageLoopTaskQueueMergeUnmerge, + AfterMergePrimaryTasksServicedOnPrimary) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_1)); + + task_queue->Merge(queue_id_1, queue_id_2); + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + + ASSERT_EQ(2u, task_queue->GetNumPendingTasks(queue_id_1)); + ASSERT_EQ(0u, task_queue->GetNumPendingTasks(queue_id_2)); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, + AfterMergeSecondaryTasksAlsoServicedOnPrimary) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + task_queue->RegisterTask( + queue_id_2, []() {}, fml::TimePoint::Now()); + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_2)); + + task_queue->Merge(queue_id_1, queue_id_2); + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_1)); + ASSERT_EQ(0u, task_queue->GetNumPendingTasks(queue_id_2)); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, MergeUnmergeTasksPreserved) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + task_queue->RegisterTask( + queue_id_2, []() {}, fml::TimePoint::Now()); + + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_1)); + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_2)); + + task_queue->Merge(queue_id_1, queue_id_2); + + ASSERT_EQ(2u, task_queue->GetNumPendingTasks(queue_id_1)); + ASSERT_EQ(0u, task_queue->GetNumPendingTasks(queue_id_2)); + + task_queue->Unmerge(queue_id_1); + + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_1)); + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_2)); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, MergeFailIfAlreadyMergedOrSubsumed) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + auto queue_id_3 = task_queue->CreateTaskQueue(); + + task_queue->Merge(queue_id_1, queue_id_2); + + ASSERT_FALSE(task_queue->Merge(queue_id_1, queue_id_3)); + ASSERT_FALSE(task_queue->Merge(queue_id_2, queue_id_3)); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, UnmergeFailsOnSubsumed) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + task_queue->Merge(queue_id_1, queue_id_2); + + ASSERT_FALSE(task_queue->Unmerge(queue_id_2)); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, MergeInvokesBothWakeables) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + fml::CountDownLatch latch(2); + + task_queue->SetWakeable( + queue_id_1, + new TestWakeable([&](fml::TimePoint wake_time) { latch.CountDown(); })); + task_queue->SetWakeable( + queue_id_2, + new TestWakeable([&](fml::TimePoint wake_time) { latch.CountDown(); })); + + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + + task_queue->Merge(queue_id_1, queue_id_2); + + std::vector invocations; + task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations); + + latch.Wait(); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, + MergeUnmergeInvokesBothWakeablesSeparately) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + fml::AutoResetWaitableEvent latch_1, latch_2; + + task_queue->SetWakeable( + queue_id_1, + new TestWakeable([&](fml::TimePoint wake_time) { latch_1.Signal(); })); + task_queue->SetWakeable( + queue_id_2, + new TestWakeable([&](fml::TimePoint wake_time) { latch_2.Signal(); })); + + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + task_queue->RegisterTask( + queue_id_2, []() {}, fml::TimePoint::Now()); + + task_queue->Merge(queue_id_1, queue_id_2); + task_queue->Unmerge(queue_id_1); + + std::vector invocations; + + task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations); + latch_1.Wait(); + + task_queue->GetTasksToRunNow(queue_id_2, fml::FlushType::kAll, invocations); + latch_2.Wait(); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, GetTasksToRunNowBlocksMerge) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + fml::AutoResetWaitableEvent wake_up_start, wake_up_end, merge_start, + merge_end; + + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + task_queue->SetWakeable(queue_id_1, + new TestWakeable([&](fml::TimePoint wake_time) { + wake_up_start.Signal(); + wake_up_end.Wait(); + })); + + std::thread tasks_to_run_now_thread([&]() { + std::vector invocations; + task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations); + }); + + wake_up_start.Wait(); + bool merge_done = false; + + std::thread merge_thread([&]() { + merge_start.Signal(); + task_queue->Merge(queue_id_1, queue_id_2); + merge_done = true; + merge_end.Signal(); + }); + + merge_start.Wait(); + ASSERT_FALSE(merge_done); + wake_up_end.Signal(); + + merge_end.Wait(); + ASSERT_TRUE(merge_done); + + tasks_to_run_now_thread.join(); + merge_thread.join(); +} diff --git a/fml/message_loop_task_queues_unittests.cc b/fml/message_loop_task_queues_unittests.cc index 49b09a49695e3..8b83f22c9cc32 100644 --- a/fml/message_loop_task_queues_unittests.cc +++ b/fml/message_loop_task_queues_unittests.cc @@ -113,22 +113,6 @@ TEST(MessageLoopTaskQueue, WakeUpIndependentOfTime) { ASSERT_TRUE(num_wakes == 2); } -TEST(MessageLoopTaskQueue, WakeUpWithMaxIfNoInvocations) { - auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); - auto queue_id = task_queue->CreateTaskQueue(); - fml::AutoResetWaitableEvent ev; - - task_queue->SetWakeable(queue_id, - new TestWakeable([&ev](fml::TimePoint wake_time) { - ASSERT_TRUE(wake_time == fml::TimePoint::Max()); - ev.Signal(); - })); - - std::vector invocations; - task_queue->GetTasksToRunNow(queue_id, fml::FlushType::kAll, invocations); - ev.Wait(); -} - TEST(MessageLoopTaskQueue, WokenUpWithNewerTime) { auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); auto queue_id = task_queue->CreateTaskQueue(); From 3edf1a91f001b80c58ae36b8c47727add8beb4da Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 21 Jun 2019 16:45:51 -0700 Subject: [PATCH 2/9] fix licenses --- ci/licenses_golden/licenses_flutter | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ci/licenses_golden/licenses_flutter b/ci/licenses_golden/licenses_flutter index d877f2acf507a..e23432d3552ed 100644 --- a/ci/licenses_golden/licenses_flutter +++ b/ci/licenses_golden/licenses_flutter @@ -126,6 +126,7 @@ FILE: ../../../flutter/fml/memory/weak_ptr.h FILE: ../../../flutter/fml/memory/weak_ptr_internal.cc FILE: ../../../flutter/fml/memory/weak_ptr_internal.h FILE: ../../../flutter/fml/memory/weak_ptr_unittest.cc +FILE: ../../../flutter/fml/merged_queues_runner.cc FILE: ../../../flutter/fml/message.cc FILE: ../../../flutter/fml/message.h FILE: ../../../flutter/fml/message_loop.cc @@ -135,6 +136,7 @@ FILE: ../../../flutter/fml/message_loop_impl.h FILE: ../../../flutter/fml/message_loop_task_queues.cc FILE: ../../../flutter/fml/message_loop_task_queues.h FILE: ../../../flutter/fml/message_loop_task_queues_benchmark.cc +FILE: ../../../flutter/fml/message_loop_task_queues_merge_unmerge_unittests.cc FILE: ../../../flutter/fml/message_loop_task_queues_unittests.cc FILE: ../../../flutter/fml/message_loop_unittests.cc FILE: ../../../flutter/fml/message_unittests.cc From 3930258a797011a9204576ad8540d1fab1ba8507 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 3 Jul 2019 10:24:50 -0700 Subject: [PATCH 3/9] Expose TaskQueueId from TaskRunner interface - Also use task queue id to verify if we are running in the same thread. - This is to enable merging the backed message loop task queues to enable dynamic thread merging in IOS. --- fml/concurrent_message_loop.cc | 6 ++++++ fml/concurrent_message_loop.h | 3 +++ fml/message_loop_impl.cc | 4 ++++ fml/message_loop_impl.h | 2 ++ fml/task_runner.cc | 9 ++++++++- fml/task_runner.h | 3 +++ 6 files changed, 26 insertions(+), 1 deletion(-) diff --git a/fml/concurrent_message_loop.cc b/fml/concurrent_message_loop.cc index 0c25db27619fe..437e9bcee4aeb 100644 --- a/fml/concurrent_message_loop.cc +++ b/fml/concurrent_message_loop.cc @@ -67,4 +67,10 @@ void ConcurrentMessageLoop::WorkerMain() { shutdown_latch_.CountDown(); } +TaskQueueId ConcurrentMessageLoop::GetTaskQueueId() const { + FML_CHECK(false) + << "ConcurrentMessageLoop is not backed by MessageLoopTaskQueues."; + return 0; +} + } // namespace fml diff --git a/fml/concurrent_message_loop.h b/fml/concurrent_message_loop.h index 7879d05239f83..5e3e4883cfdfa 100644 --- a/fml/concurrent_message_loop.h +++ b/fml/concurrent_message_loop.h @@ -41,6 +41,9 @@ class ConcurrentMessageLoop : public MessageLoopImpl { // |fml::MessageLoopImpl| void WakeUp(fml::TimePoint time_point) override; + // |fml::MessageLoopImpl| + TaskQueueId GetTaskQueueId() const override; + static void WorkerMain(ConcurrentMessageLoop* loop); void WorkerMain(); diff --git a/fml/message_loop_impl.cc b/fml/message_loop_impl.cc index 5adf8ddac5fe3..6181278a67138 100644 --- a/fml/message_loop_impl.cc +++ b/fml/message_loop_impl.cc @@ -148,4 +148,8 @@ void MessageLoopImpl::RunSingleExpiredTaskNow() { FlushTasks(FlushType::kSingle); } +TaskQueueId MessageLoopImpl::GetTaskQueueId() const { + return queue_id_; +} + } // namespace fml diff --git a/fml/message_loop_impl.h b/fml/message_loop_impl.h index 0b89cf82a43d9..a84208d636d1f 100644 --- a/fml/message_loop_impl.h +++ b/fml/message_loop_impl.h @@ -45,6 +45,8 @@ class MessageLoopImpl : public Wakeable, void DoTerminate(); + virtual TaskQueueId GetTaskQueueId() const; + void SwapTaskQueues(const fml::RefPtr& other); protected: diff --git a/fml/task_runner.cc b/fml/task_runner.cc index 2c4cfe4b638a2..7d59b3cac1f2c 100644 --- a/fml/task_runner.cc +++ b/fml/task_runner.cc @@ -32,11 +32,18 @@ void TaskRunner::PostDelayedTask(fml::closure task, fml::TimeDelta delay) { loop_->PostTask(std::move(task), fml::TimePoint::Now() + delay); } +TaskQueueId TaskRunner::GetTaskQueueId() { + FML_DCHECK(loop_); + return loop_->GetTaskQueueId(); +} + bool TaskRunner::RunsTasksOnCurrentThread() { if (!fml::MessageLoop::IsInitializedForCurrentThread()) { return false; } - return MessageLoop::GetCurrent().GetLoopImpl() == loop_; + const auto current_loop_id = + MessageLoop::GetCurrent().GetLoopImpl()->GetTaskQueueId(); + return current_loop_id == loop_->GetTaskQueueId(); } void TaskRunner::RunNowOrPostTask(fml::RefPtr runner, diff --git a/fml/task_runner.h b/fml/task_runner.h index 72c4219029f34..b7569058af17f 100644 --- a/fml/task_runner.h +++ b/fml/task_runner.h @@ -9,6 +9,7 @@ #include "flutter/fml/macros.h" #include "flutter/fml/memory/ref_counted.h" #include "flutter/fml/memory/ref_ptr.h" +#include "flutter/fml/message_loop_task_queues.h" #include "flutter/fml/time/time_point.h" namespace fml { @@ -27,6 +28,8 @@ class TaskRunner : public fml::RefCountedThreadSafe { virtual bool RunsTasksOnCurrentThread(); + virtual TaskQueueId GetTaskQueueId(); + static void RunNowOrPostTask(fml::RefPtr runner, fml::closure task); From a33cb8e237519243818309b02500c69c530d0648 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 10 Jul 2019 12:39:46 -0700 Subject: [PATCH 4/9] Fix wake-up and also add a method to check Owns --- fml/message_loop_task_queues.cc | 11 ++++++++++- fml/message_loop_task_queues.h | 3 +++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/fml/message_loop_task_queues.cc b/fml/message_loop_task_queues.cc index 187cdfecc7337..0424a0e7d7a2e 100644 --- a/fml/message_loop_task_queues.cc +++ b/fml/message_loop_task_queues.cc @@ -58,7 +58,11 @@ void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id, std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks)); size_t order = order_++; delayed_tasks_[queue_id].push({order, std::move(task), target_time}); - WakeUp(queue_id, delayed_tasks_[queue_id].top().GetTargetTime()); + TaskQueueId loop_to_wake = queue_id; + if (subsumed_to_owner_[queue_id] != _kUnmerged) { + loop_to_wake = subsumed_to_owner_[queue_id]; + } + WakeUp(loop_to_wake, delayed_tasks_[queue_id].top().GetTargetTime()); } bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) { @@ -231,6 +235,11 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) { return true; } +bool MessageLoopTaskQueues::Owns(TaskQueueId owner, TaskQueueId subsumed) { + MergedQueuesRunner merged_observers = MergedQueuesRunner(this, owner); + return subsumed == owner_to_subsumed_[owner]; +} + // Subsumed queues will never have pending tasks. // Owning queues will consider both their and their subsumed tasks. bool MessageLoopTaskQueues::HasPendingTasksUnlocked(TaskQueueId queue_id) { diff --git a/fml/message_loop_task_queues.h b/fml/message_loop_task_queues.h index e69adcc3569ea..8054dee603dfa 100644 --- a/fml/message_loop_task_queues.h +++ b/fml/message_loop_task_queues.h @@ -87,6 +87,9 @@ class MessageLoopTaskQueues // Will return false if the owner has not been merged before. bool Unmerge(TaskQueueId owner); + // Returns true if owner owns the subsumed task queue. + bool Owns(TaskQueueId owner, TaskQueueId subsumed); + private: class MergedQueuesRunner; From 997cbfc062a8087aaed139043f47efe1f959df19 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 10 Jul 2019 12:56:13 -0700 Subject: [PATCH 5/9] fix merge conflicts for realz --- fml/concurrent_message_loop.cc | 6 ------ fml/concurrent_message_loop.h | 3 --- 2 files changed, 9 deletions(-) diff --git a/fml/concurrent_message_loop.cc b/fml/concurrent_message_loop.cc index 90329651e3adf..482ceb479018d 100644 --- a/fml/concurrent_message_loop.cc +++ b/fml/concurrent_message_loop.cc @@ -123,10 +123,4 @@ void ConcurrentTaskRunner::PostTask(fml::closure task) { task(); } -TaskQueueId ConcurrentMessageLoop::GetTaskQueueId() const { - FML_CHECK(false) - << "ConcurrentMessageLoop is not backed by MessageLoopTaskQueues."; - return 0; -} - } // namespace fml diff --git a/fml/concurrent_message_loop.h b/fml/concurrent_message_loop.h index 90da5cca02872..0cfd85c90ec0f 100644 --- a/fml/concurrent_message_loop.h +++ b/fml/concurrent_message_loop.h @@ -41,9 +41,6 @@ class ConcurrentMessageLoop std::queue tasks_; bool shutdown_ = false; - // |fml::MessageLoopImpl| - TaskQueueId GetTaskQueueId() const override; - ConcurrentMessageLoop(size_t worker_count); void WorkerMain(); From b897c5a8ac15a98e13c776efd6fbd385f7ae8791 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 12 Jul 2019 11:05:46 -0700 Subject: [PATCH 6/9] Fix scope of scoped locks and make queue_id a struct --- .gitignore | 1 - fml/merged_queues_runner.cc | 19 ++++++++------ fml/message_loop_task_queues.cc | 45 +++++++++++++++------------------ fml/message_loop_task_queues.h | 16 +++++++++--- 4 files changed, 44 insertions(+), 37 deletions(-) diff --git a/.gitignore b/.gitignore index e04d20b59e12f..225943503b13f 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,5 @@ Thumbs.db .idea pubspec.lock .vscode/ -compile_commands.json docs/doxygen/ xcuserdata diff --git a/fml/merged_queues_runner.cc b/fml/merged_queues_runner.cc index db8901a92f0f8..6c8e510bdf9bc 100644 --- a/fml/merged_queues_runner.cc +++ b/fml/merged_queues_runner.cc @@ -13,14 +13,17 @@ class MessageLoopTaskQueues::MergedQueuesRunner { public: // TODO (kaushikiska): refactor mutexes out side of MessageLoopTaskQueues // for better DI. - MergedQueuesRunner(MessageLoopTaskQueues* task_queues, + MergedQueuesRunner(MessageLoopTaskQueues& task_queues, TaskQueueId owner, MutexType type = MutexType::kTasks) - : owner_(owner), task_queues_(task_queues), type_(type) { - task_queues_->GetMutex(owner, type).lock(); - subsumed_ = task_queues->owner_to_subsumed_[owner]; + : owner_(owner), + subsumed_(task_queues_._kUnmerged), + task_queues_(task_queues), + type_(type) { + task_queues_.GetMutex(owner, type).lock(); + subsumed_ = task_queues_.owner_to_subsumed_[owner]; if (isMerged(subsumed_)) { - task_queues_->GetMutex(subsumed_, type).lock(); + task_queues_.GetMutex(subsumed_, type).lock(); } } @@ -34,9 +37,9 @@ class MessageLoopTaskQueues::MergedQueuesRunner { ~MergedQueuesRunner() { if (isMerged(subsumed_)) { - task_queues_->GetMutex(subsumed_, type_).unlock(); + task_queues_.GetMutex(subsumed_, type_).unlock(); } - task_queues_->GetMutex(owner_, type_).unlock(); + task_queues_.GetMutex(owner_, type_).unlock(); } private: @@ -46,7 +49,7 @@ class MessageLoopTaskQueues::MergedQueuesRunner { const TaskQueueId owner_; TaskQueueId subsumed_; - MessageLoopTaskQueues* task_queues_; + MessageLoopTaskQueues& task_queues_; const MutexType type_; FML_DISALLOW_COPY_ASSIGN_AND_MOVE(MergedQueuesRunner); diff --git a/fml/message_loop_task_queues.cc b/fml/message_loop_task_queues.cc index 0424a0e7d7a2e..5880ad8b9904f 100644 --- a/fml/message_loop_task_queues.cc +++ b/fml/message_loop_task_queues.cc @@ -11,7 +11,7 @@ namespace fml { std::mutex MessageLoopTaskQueues::creation_mutex_; -const size_t MessageLoopTaskQueues::_kUnmerged; +const TaskQueueId MessageLoopTaskQueues::_kUnmerged = TaskQueueId(ULONG_MAX); fml::RefPtr MessageLoopTaskQueues::instance_; fml::RefPtr MessageLoopTaskQueues::GetInstance() { @@ -24,7 +24,7 @@ fml::RefPtr MessageLoopTaskQueues::GetInstance() { TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() { std::scoped_lock creation(queue_meta_mutex_); - TaskQueueId loop_id = task_queue_id_counter_; + TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_); ++task_queue_id_counter_; observers_mutexes_.push_back(std::make_unique()); @@ -47,7 +47,7 @@ MessageLoopTaskQueues::MessageLoopTaskQueues() MessageLoopTaskQueues::~MessageLoopTaskQueues() = default; void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) { - MergedQueuesRunner merged_tasks = MergedQueuesRunner(this, queue_id); + MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id); merged_tasks.InvokeMerged( [&](TaskQueueId queue_id) { delayed_tasks_[queue_id] = {}; }); } @@ -66,7 +66,7 @@ void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id, } bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) { - MergedQueuesRunner merged_tasks = MergedQueuesRunner(this, queue_id); + MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id); return HasPendingTasksUnlocked(queue_id); } @@ -74,7 +74,7 @@ void MessageLoopTaskQueues::GetTasksToRunNow( TaskQueueId queue_id, FlushType type, std::vector& invocations) { - MergedQueuesRunner merged_tasks = MergedQueuesRunner(this, queue_id); + MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id); if (!HasPendingTasksUnlocked(queue_id)) { return; @@ -83,7 +83,7 @@ void MessageLoopTaskQueues::GetTasksToRunNow( const auto now = fml::TimePoint::Now(); while (HasPendingTasksUnlocked(queue_id)) { - TaskQueueId top_queue; + TaskQueueId top_queue = _kUnmerged; const auto& top = PeekNextTaskUnlocked(queue_id, top_queue); if (top.GetTargetTime() > now) { break; @@ -110,7 +110,7 @@ void MessageLoopTaskQueues::WakeUp(TaskQueueId queue_id, fml::TimePoint time) { } size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) { - MergedQueuesRunner merged_tasks = MergedQueuesRunner(this, queue_id); + MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id); if (subsumed_to_owner_[queue_id] != _kUnmerged) { return 0; } @@ -135,7 +135,7 @@ void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id, void MessageLoopTaskQueues::NotifyObservers(TaskQueueId queue_id) { MergedQueuesRunner merged_observers = - MergedQueuesRunner(this, queue_id, MutexType::kObservers); + MergedQueuesRunner(*this, queue_id, MutexType::kObservers); merged_observers.InvokeMerged([&](TaskQueueId queue) { for (const auto& observer : task_observers_[queue]) { @@ -155,7 +155,7 @@ void MessageLoopTaskQueues::Swap(TaskQueueId primary, TaskQueueId secondary) std::mutex& t1 = GetMutex(primary, MutexType::kTasks); std::mutex& t2 = GetMutex(secondary, MutexType::kTasks); - std::scoped_lock(o1, o2, t1, t2); + std::scoped_lock lock(o1, o2, t1, t2); std::swap(task_observers_[primary], task_observers_[secondary]); std::swap(delayed_tasks_[primary], delayed_tasks_[secondary]); @@ -177,7 +177,7 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) { std::mutex& t1 = GetMutex(owner, MutexType::kTasks); std::mutex& t2 = GetMutex(subsumed, MutexType::kTasks); - std::scoped_lock(o1, o2, t1, t2); + std::scoped_lock lock(o1, o2, t1, t2); if (owner == subsumed) { return true; @@ -187,7 +187,7 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) { return true; } - std::vector owner_subsumed_keys = { + std::vector owner_subsumed_keys = { owner_to_subsumed_[owner], owner_to_subsumed_[subsumed], subsumed_to_owner_[owner], subsumed_to_owner_[subsumed]}; @@ -208,19 +208,16 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) { } bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) { - // task_observers locks - std::mutex& o = GetMutex(owner, MutexType::kObservers); - - // delayed_tasks locks - std::mutex& t = GetMutex(owner, MutexType::kTasks); - - std::scoped_lock(o, t); + MergedQueuesRunner merged_observers = + MergedQueuesRunner(*this, owner, MutexType::kObservers); + MergedQueuesRunner merged_tasks = + MergedQueuesRunner(*this, owner, MutexType::kTasks); - if (owner_to_subsumed_[owner] == _kUnmerged) { + const TaskQueueId subsumed = owner_to_subsumed_[owner]; + if (subsumed == _kUnmerged) { return false; } - const size_t subsumed = owner_to_subsumed_[owner]; subsumed_to_owner_[subsumed] = _kUnmerged; owner_to_subsumed_[owner] = _kUnmerged; @@ -236,7 +233,7 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) { } bool MessageLoopTaskQueues::Owns(TaskQueueId owner, TaskQueueId subsumed) { - MergedQueuesRunner merged_observers = MergedQueuesRunner(this, owner); + MergedQueuesRunner merged_observers = MergedQueuesRunner(*this, owner); return subsumed == owner_to_subsumed_[owner]; } @@ -251,7 +248,7 @@ bool MessageLoopTaskQueues::HasPendingTasksUnlocked(TaskQueueId queue_id) { return true; } - const size_t subsumed = owner_to_subsumed_[queue_id]; + const TaskQueueId subsumed = owner_to_subsumed_[queue_id]; if (subsumed == _kUnmerged) { // this is not an owner and queue is empty. return false; @@ -262,7 +259,7 @@ bool MessageLoopTaskQueues::HasPendingTasksUnlocked(TaskQueueId queue_id) { fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked( TaskQueueId queue_id) { - TaskQueueId tmp; + TaskQueueId tmp = _kUnmerged; return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime(); } @@ -270,7 +267,7 @@ const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked( TaskQueueId owner, TaskQueueId& top_queue_id) { FML_DCHECK(HasPendingTasksUnlocked(owner)); - const size_t subsumed = owner_to_subsumed_[owner]; + const TaskQueueId subsumed = owner_to_subsumed_[owner]; if (subsumed == _kUnmerged) { top_queue_id = owner; return delayed_tasks_[owner].top(); diff --git a/fml/message_loop_task_queues.h b/fml/message_loop_task_queues.h index 8054dee603dfa..cc5e2787f4261 100644 --- a/fml/message_loop_task_queues.h +++ b/fml/message_loop_task_queues.h @@ -18,7 +18,15 @@ namespace fml { -typedef size_t TaskQueueId; +class TaskQueueId { + public: + explicit TaskQueueId(size_t value) : value_(value) {} + + operator int() const { return value_; } + + private: + size_t value_; +}; enum class FlushType { kSingle, @@ -134,10 +142,10 @@ class MessageLoopTaskQueues std::vector task_observers_; std::vector delayed_tasks_; - static const size_t _kUnmerged = ULONG_MAX; + static const TaskQueueId _kUnmerged; // These are guarded by delayed_tasks_mutexes_ - std::vector owner_to_subsumed_; - std::vector subsumed_to_owner_; + std::vector owner_to_subsumed_; + std::vector subsumed_to_owner_; std::atomic_int order_; From c9d4e82d42a0f6ddd45781cb1a339ff54e38ee6b Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 12 Jul 2019 11:09:32 -0700 Subject: [PATCH 7/9] fix benchmarks --- fml/message_loop_task_queues_benchmark.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fml/message_loop_task_queues_benchmark.cc b/fml/message_loop_task_queues_benchmark.cc index 7b7808586c2a9..6a59a0aab604c 100644 --- a/fml/message_loop_task_queues_benchmark.cc +++ b/fml/message_loop_task_queues_benchmark.cc @@ -35,13 +35,13 @@ static void BM_RegisterAndGetTasks(benchmark::State& state) { &tasks_registered]() { for (int j = 0; j < num_tasks_per_queue; j++) { task_queue->RegisterTask( - task_runner_id, [] {}, past); + TaskQueueId(task_runner_id), [] {}, past); } tasks_registered.CountDown(); tasks_registered.Wait(); std::vector invocations; - task_queue->GetTasksToRunNow(task_runner_id, fml::FlushType::kAll, - invocations); + task_queue->GetTasksToRunNow(TaskQueueId(task_runner_id), + fml::FlushType::kAll, invocations); assert(invocations.size() == num_tasks_per_queue); tasks_done.CountDown(); }); From 926e9c2c7a7ed0f4edbbf179ad6bc63aa7aa842d Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 12 Jul 2019 11:14:18 -0700 Subject: [PATCH 8/9] fix some ownership semantics --- fml/message_loop_task_queues.cc | 2 +- fml/task_runner.cc | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/fml/message_loop_task_queues.cc b/fml/message_loop_task_queues.cc index 5880ad8b9904f..8c5501ec75810 100644 --- a/fml/message_loop_task_queues.cc +++ b/fml/message_loop_task_queues.cc @@ -234,7 +234,7 @@ bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) { bool MessageLoopTaskQueues::Owns(TaskQueueId owner, TaskQueueId subsumed) { MergedQueuesRunner merged_observers = MergedQueuesRunner(*this, owner); - return subsumed == owner_to_subsumed_[owner]; + return subsumed == owner_to_subsumed_[owner] || owner == subsumed; } // Subsumed queues will never have pending tasks. diff --git a/fml/task_runner.cc b/fml/task_runner.cc index 7d59b3cac1f2c..c82f2ac7da7e7 100644 --- a/fml/task_runner.cc +++ b/fml/task_runner.cc @@ -41,9 +41,23 @@ bool TaskRunner::RunsTasksOnCurrentThread() { if (!fml::MessageLoop::IsInitializedForCurrentThread()) { return false; } - const auto current_loop_id = + const auto current_queue_id = MessageLoop::GetCurrent().GetLoopImpl()->GetTaskQueueId(); - return current_loop_id == loop_->GetTaskQueueId(); + const auto loop_queue_id = loop_->GetTaskQueueId(); + + if (current_queue_id == loop_queue_id) { + return true; + } + + auto queues = MessageLoopTaskQueues::GetInstance(); + if (queues->Owns(current_queue_id, loop_queue_id)) { + return true; + } + if (queues->Owns(loop_queue_id, current_queue_id)) { + return true; + } + + return false; } void TaskRunner::RunNowOrPostTask(fml::RefPtr runner, From 3b96103cf5b1287092b9c268ba607ce6b9516d02 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 12 Jul 2019 16:31:16 -0700 Subject: [PATCH 9/9] Zero out TaskQueueId --- fml/message_loop_task_queues.cc | 4 +++- fml/message_loop_task_queues.h | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/fml/message_loop_task_queues.cc b/fml/message_loop_task_queues.cc index 8c5501ec75810..0933379eb8185 100644 --- a/fml/message_loop_task_queues.cc +++ b/fml/message_loop_task_queues.cc @@ -11,7 +11,9 @@ namespace fml { std::mutex MessageLoopTaskQueues::creation_mutex_; -const TaskQueueId MessageLoopTaskQueues::_kUnmerged = TaskQueueId(ULONG_MAX); +const size_t TaskQueueId::kUnmerged = ULONG_MAX; +const TaskQueueId MessageLoopTaskQueues::_kUnmerged = + TaskQueueId(TaskQueueId::kUnmerged); fml::RefPtr MessageLoopTaskQueues::instance_; fml::RefPtr MessageLoopTaskQueues::GetInstance() { diff --git a/fml/message_loop_task_queues.h b/fml/message_loop_task_queues.h index cc5e2787f4261..935d5019c637b 100644 --- a/fml/message_loop_task_queues.h +++ b/fml/message_loop_task_queues.h @@ -20,12 +20,14 @@ namespace fml { class TaskQueueId { public: + static const size_t kUnmerged; + explicit TaskQueueId(size_t value) : value_(value) {} operator int() const { return value_; } private: - size_t value_; + size_t value_ = kUnmerged; }; enum class FlushType {