diff --git a/ci/licenses_golden/licenses_flutter b/ci/licenses_golden/licenses_flutter index 7762d8304ab6d..c8750b66b4b5d 100644 --- a/ci/licenses_golden/licenses_flutter +++ b/ci/licenses_golden/licenses_flutter @@ -124,6 +124,7 @@ FILE: ../../../flutter/fml/memory/weak_ptr.h FILE: ../../../flutter/fml/memory/weak_ptr_internal.cc FILE: ../../../flutter/fml/memory/weak_ptr_internal.h FILE: ../../../flutter/fml/memory/weak_ptr_unittest.cc +FILE: ../../../flutter/fml/merged_queues_runner.cc FILE: ../../../flutter/fml/message.cc FILE: ../../../flutter/fml/message.h FILE: ../../../flutter/fml/message_loop.cc @@ -133,6 +134,7 @@ FILE: ../../../flutter/fml/message_loop_impl.h FILE: ../../../flutter/fml/message_loop_task_queues.cc FILE: ../../../flutter/fml/message_loop_task_queues.h FILE: ../../../flutter/fml/message_loop_task_queues_benchmark.cc +FILE: ../../../flutter/fml/message_loop_task_queues_merge_unmerge_unittests.cc FILE: ../../../flutter/fml/message_loop_task_queues_unittests.cc FILE: ../../../flutter/fml/message_loop_unittests.cc FILE: ../../../flutter/fml/message_unittests.cc diff --git a/fml/BUILD.gn b/fml/BUILD.gn index ac65ae1c29f31..6ee116e54d948 100644 --- a/fml/BUILD.gn +++ b/fml/BUILD.gn @@ -40,6 +40,7 @@ source_set("fml") { "memory/weak_ptr.h", "memory/weak_ptr_internal.cc", "memory/weak_ptr_internal.h", + "merged_queues_runner.cc", "message.cc", "message.h", "message_loop.cc", @@ -199,6 +200,7 @@ executable("fml_unittests") { "file_unittest.cc", "memory/ref_counted_unittest.cc", "memory/weak_ptr_unittest.cc", + "message_loop_task_queues_merge_unmerge_unittests.cc", "message_loop_task_queues_unittests.cc", "message_loop_unittests.cc", "message_unittests.cc", diff --git a/fml/merged_queues_runner.cc b/fml/merged_queues_runner.cc new file mode 100644 index 0000000000000..6c8e510bdf9bc --- /dev/null +++ b/fml/merged_queues_runner.cc @@ -0,0 +1,58 @@ +// Copyright 2013 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#define FML_USED_ON_EMBEDDER + +#include "flutter/fml/message_loop_task_queues.h" + +namespace fml { + +// RAII class for managing merged locks. +class MessageLoopTaskQueues::MergedQueuesRunner { + public: + // TODO (kaushikiska): refactor mutexes out side of MessageLoopTaskQueues + // for better DI. + MergedQueuesRunner(MessageLoopTaskQueues& task_queues, + TaskQueueId owner, + MutexType type = MutexType::kTasks) + : owner_(owner), + subsumed_(task_queues_._kUnmerged), + task_queues_(task_queues), + type_(type) { + task_queues_.GetMutex(owner, type).lock(); + subsumed_ = task_queues_.owner_to_subsumed_[owner]; + if (isMerged(subsumed_)) { + task_queues_.GetMutex(subsumed_, type).lock(); + } + } + + // First invokes on owner and then subsumed (if present). + void InvokeMerged(std::function closure) { + closure(owner_); + if (isMerged(subsumed_)) { + closure(subsumed_); + } + } + + ~MergedQueuesRunner() { + if (isMerged(subsumed_)) { + task_queues_.GetMutex(subsumed_, type_).unlock(); + } + task_queues_.GetMutex(owner_, type_).unlock(); + } + + private: + bool isMerged(TaskQueueId queue_id) { + return queue_id != MessageLoopTaskQueues::_kUnmerged; + } + + const TaskQueueId owner_; + TaskQueueId subsumed_; + MessageLoopTaskQueues& task_queues_; + const MutexType type_; + + FML_DISALLOW_COPY_ASSIGN_AND_MOVE(MergedQueuesRunner); +}; + +} // namespace fml diff --git a/fml/message_loop_impl.cc b/fml/message_loop_impl.cc index 5adf8ddac5fe3..6181278a67138 100644 --- a/fml/message_loop_impl.cc +++ b/fml/message_loop_impl.cc @@ -148,4 +148,8 @@ void MessageLoopImpl::RunSingleExpiredTaskNow() { FlushTasks(FlushType::kSingle); } +TaskQueueId MessageLoopImpl::GetTaskQueueId() const { + return queue_id_; +} + } // namespace fml diff --git a/fml/message_loop_impl.h b/fml/message_loop_impl.h index 0b89cf82a43d9..a84208d636d1f 100644 --- a/fml/message_loop_impl.h +++ b/fml/message_loop_impl.h @@ -45,6 +45,8 @@ class MessageLoopImpl : public Wakeable, void DoTerminate(); + virtual TaskQueueId GetTaskQueueId() const; + void SwapTaskQueues(const fml::RefPtr& other); protected: diff --git a/fml/message_loop_task_queues.cc b/fml/message_loop_task_queues.cc index ab641eff68828..0933379eb8185 100644 --- a/fml/message_loop_task_queues.cc +++ b/fml/message_loop_task_queues.cc @@ -5,11 +5,15 @@ #define FML_USED_ON_EMBEDDER #include "flutter/fml/message_loop_task_queues.h" +#include "flutter/fml/merged_queues_runner.cc" #include "flutter/fml/message_loop_impl.h" namespace fml { std::mutex MessageLoopTaskQueues::creation_mutex_; +const size_t TaskQueueId::kUnmerged = ULONG_MAX; +const TaskQueueId MessageLoopTaskQueues::_kUnmerged = + TaskQueueId(TaskQueueId::kUnmerged); fml::RefPtr MessageLoopTaskQueues::instance_; fml::RefPtr MessageLoopTaskQueues::GetInstance() { @@ -22,7 +26,7 @@ fml::RefPtr MessageLoopTaskQueues::GetInstance() { TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() { std::scoped_lock creation(queue_meta_mutex_); - TaskQueueId loop_id = task_queue_id_counter_; + TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_); ++task_queue_id_counter_; observers_mutexes_.push_back(std::make_unique()); @@ -33,6 +37,9 @@ TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() { delayed_tasks_.push_back(DelayedTaskQueue()); wakeables_.push_back(NULL); + owner_to_subsumed_.push_back(_kUnmerged); + subsumed_to_owner_.push_back(_kUnmerged); + return loop_id; } @@ -42,8 +49,9 @@ MessageLoopTaskQueues::MessageLoopTaskQueues() MessageLoopTaskQueues::~MessageLoopTaskQueues() = default; void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) { - std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks)); - delayed_tasks_[queue_id] = {}; + MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id); + merged_tasks.InvokeMerged( + [&](TaskQueueId queue_id) { delayed_tasks_[queue_id] = {}; }); } void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id, @@ -52,39 +60,47 @@ void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id, std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks)); size_t order = order_++; delayed_tasks_[queue_id].push({order, std::move(task), target_time}); - WakeUp(queue_id, delayed_tasks_[queue_id].top().GetTargetTime()); + TaskQueueId loop_to_wake = queue_id; + if (subsumed_to_owner_[queue_id] != _kUnmerged) { + loop_to_wake = subsumed_to_owner_[queue_id]; + } + WakeUp(loop_to_wake, delayed_tasks_[queue_id].top().GetTargetTime()); } bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) { - std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks)); - return !delayed_tasks_[queue_id].empty(); + MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id); + return HasPendingTasksUnlocked(queue_id); } void MessageLoopTaskQueues::GetTasksToRunNow( TaskQueueId queue_id, FlushType type, std::vector& invocations) { - std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks)); + MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id); + + if (!HasPendingTasksUnlocked(queue_id)) { + return; + } const auto now = fml::TimePoint::Now(); - DelayedTaskQueue& tasks = delayed_tasks_[queue_id]; - while (!tasks.empty()) { - const auto& top = tasks.top(); + while (HasPendingTasksUnlocked(queue_id)) { + TaskQueueId top_queue = _kUnmerged; + const auto& top = PeekNextTaskUnlocked(queue_id, top_queue); if (top.GetTargetTime() > now) { break; } invocations.emplace_back(std::move(top.GetTask())); - tasks.pop(); + delayed_tasks_[top_queue].pop(); if (type == FlushType::kSingle) { break; } } - if (tasks.empty()) { + if (!HasPendingTasksUnlocked(queue_id)) { WakeUp(queue_id, fml::TimePoint::Max()); } else { - WakeUp(queue_id, tasks.top().GetTargetTime()); + WakeUp(queue_id, GetNextWakeTimeUnlocked(queue_id)); } } @@ -96,8 +112,14 @@ void MessageLoopTaskQueues::WakeUp(TaskQueueId queue_id, fml::TimePoint time) { } size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) { - std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks)); - return delayed_tasks_[queue_id].size(); + MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id); + if (subsumed_to_owner_[queue_id] != _kUnmerged) { + return 0; + } + size_t total_tasks = 0; + merged_tasks.InvokeMerged( + [&](TaskQueueId queue) { total_tasks += delayed_tasks_[queue].size(); }); + return total_tasks; } void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id, @@ -114,10 +136,14 @@ void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id, } void MessageLoopTaskQueues::NotifyObservers(TaskQueueId queue_id) { - std::scoped_lock lock(GetMutex(queue_id, MutexType::kObservers)); - for (const auto& observer : task_observers_[queue_id]) { - observer.second(); - } + MergedQueuesRunner merged_observers = + MergedQueuesRunner(*this, queue_id, MutexType::kObservers); + + merged_observers.InvokeMerged([&](TaskQueueId queue) { + for (const auto& observer : task_observers_[queue]) { + observer.second(); + } + }); } // Thread safety analysis disabled as it does not account for defered locks. @@ -131,7 +157,7 @@ void MessageLoopTaskQueues::Swap(TaskQueueId primary, TaskQueueId secondary) std::mutex& t1 = GetMutex(primary, MutexType::kTasks); std::mutex& t2 = GetMutex(secondary, MutexType::kTasks); - std::scoped_lock(o1, o2, t1, t2); + std::scoped_lock lock(o1, o2, t1, t2); std::swap(task_observers_[primary], task_observers_[secondary]); std::swap(delayed_tasks_[primary], delayed_tasks_[secondary]); @@ -140,9 +166,133 @@ void MessageLoopTaskQueues::Swap(TaskQueueId primary, TaskQueueId secondary) void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id, fml::Wakeable* wakeable) { std::scoped_lock lock(GetMutex(queue_id, MutexType::kWakeables)); + FML_CHECK(!wakeables_[queue_id]) << "Wakeable can only be set once."; wakeables_[queue_id] = wakeable; } +bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) { + // task_observers locks + std::mutex& o1 = GetMutex(owner, MutexType::kObservers); + std::mutex& o2 = GetMutex(subsumed, MutexType::kObservers); + + // delayed_tasks locks + std::mutex& t1 = GetMutex(owner, MutexType::kTasks); + std::mutex& t2 = GetMutex(subsumed, MutexType::kTasks); + + std::scoped_lock lock(o1, o2, t1, t2); + + if (owner == subsumed) { + return true; + } + + if (owner_to_subsumed_[owner] == subsumed) { + return true; + } + + std::vector owner_subsumed_keys = { + owner_to_subsumed_[owner], owner_to_subsumed_[subsumed], + subsumed_to_owner_[owner], subsumed_to_owner_[subsumed]}; + + for (auto key : owner_subsumed_keys) { + if (key != _kUnmerged) { + return false; + } + } + + owner_to_subsumed_[owner] = subsumed; + subsumed_to_owner_[subsumed] = owner; + + if (HasPendingTasksUnlocked(owner)) { + WakeUp(owner, GetNextWakeTimeUnlocked(owner)); + } + + return true; +} + +bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) { + MergedQueuesRunner merged_observers = + MergedQueuesRunner(*this, owner, MutexType::kObservers); + MergedQueuesRunner merged_tasks = + MergedQueuesRunner(*this, owner, MutexType::kTasks); + + const TaskQueueId subsumed = owner_to_subsumed_[owner]; + if (subsumed == _kUnmerged) { + return false; + } + + subsumed_to_owner_[subsumed] = _kUnmerged; + owner_to_subsumed_[owner] = _kUnmerged; + + if (HasPendingTasksUnlocked(owner)) { + WakeUp(owner, GetNextWakeTimeUnlocked(owner)); + } + + if (HasPendingTasksUnlocked(subsumed)) { + WakeUp(subsumed, GetNextWakeTimeUnlocked(subsumed)); + } + + return true; +} + +bool MessageLoopTaskQueues::Owns(TaskQueueId owner, TaskQueueId subsumed) { + MergedQueuesRunner merged_observers = MergedQueuesRunner(*this, owner); + return subsumed == owner_to_subsumed_[owner] || owner == subsumed; +} + +// Subsumed queues will never have pending tasks. +// Owning queues will consider both their and their subsumed tasks. +bool MessageLoopTaskQueues::HasPendingTasksUnlocked(TaskQueueId queue_id) { + if (subsumed_to_owner_[queue_id] != _kUnmerged) { + return false; + } + + if (!delayed_tasks_[queue_id].empty()) { + return true; + } + + const TaskQueueId subsumed = owner_to_subsumed_[queue_id]; + if (subsumed == _kUnmerged) { + // this is not an owner and queue is empty. + return false; + } else { + return !delayed_tasks_[subsumed].empty(); + } +} + +fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked( + TaskQueueId queue_id) { + TaskQueueId tmp = _kUnmerged; + return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime(); +} + +const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked( + TaskQueueId owner, + TaskQueueId& top_queue_id) { + FML_DCHECK(HasPendingTasksUnlocked(owner)); + const TaskQueueId subsumed = owner_to_subsumed_[owner]; + if (subsumed == _kUnmerged) { + top_queue_id = owner; + return delayed_tasks_[owner].top(); + } + // we are owning another task queue + const bool subsumed_has_task = !delayed_tasks_[subsumed].empty(); + const bool owner_has_task = !delayed_tasks_[owner].empty(); + if (owner_has_task && subsumed_has_task) { + const auto owner_task = delayed_tasks_[owner].top(); + const auto subsumed_task = delayed_tasks_[subsumed].top(); + if (owner_task > subsumed_task) { + top_queue_id = subsumed; + } else { + top_queue_id = owner; + } + } else if (owner_has_task) { + top_queue_id = owner; + } else { + top_queue_id = subsumed; + } + return delayed_tasks_[top_queue_id].top(); +} + std::mutex& MessageLoopTaskQueues::GetMutex(TaskQueueId queue_id, MutexType type) { std::scoped_lock lock(queue_meta_mutex_); diff --git a/fml/message_loop_task_queues.h b/fml/message_loop_task_queues.h index f86ad9be51657..935d5019c637b 100644 --- a/fml/message_loop_task_queues.h +++ b/fml/message_loop_task_queues.h @@ -18,7 +18,17 @@ namespace fml { -typedef size_t TaskQueueId; +class TaskQueueId { + public: + static const size_t kUnmerged; + + explicit TaskQueueId(size_t value) : value_(value) {} + + operator int() const { return value_; } + + private: + size_t value_ = kUnmerged; +}; enum class FlushType { kSingle, @@ -69,7 +79,30 @@ class MessageLoopTaskQueues void SetWakeable(TaskQueueId queue_id, fml::Wakeable* wakeable); + // Invariants for merge and un-merge + // 1. RegisterTask will always submit to the queue_id that is passed + // to it. It is not aware of whether a queue is merged or not. Same with + // task observers. + // 2. When we get the tasks to run now, we look at both the queue_ids + // for the owner, subsumed will spin. + // 3. Each task queue can only be merged and subsumed once. + // + // Methods currently aware of the merged state of the queues: + // HasPendingTasks, GetTasksToRunNow, GetNumPendingTasks + + // This method returns false if either the owner or subsumed has already been + // merged with something else. + bool Merge(TaskQueueId owner, TaskQueueId subsumed); + + // Will return false if the owner has not been merged before. + bool Unmerge(TaskQueueId owner); + + // Returns true if owner owns the subsumed task queue. + bool Owns(TaskQueueId owner, TaskQueueId subsumed); + private: + class MergedQueuesRunner; + enum class MutexType { kTasks, kObservers, @@ -85,6 +118,13 @@ class MessageLoopTaskQueues void WakeUp(TaskQueueId queue_id, fml::TimePoint time); + bool HasPendingTasksUnlocked(TaskQueueId queue_id); + + const DelayedTask& PeekNextTaskUnlocked(TaskQueueId queue_id, + TaskQueueId& top_queue_id); + + fml::TimePoint GetNextWakeTimeUnlocked(TaskQueueId queue_id); + std::mutex& GetMutex(TaskQueueId queue_id, MutexType type); static std::mutex creation_mutex_; @@ -104,6 +144,11 @@ class MessageLoopTaskQueues std::vector task_observers_; std::vector delayed_tasks_; + static const TaskQueueId _kUnmerged; + // These are guarded by delayed_tasks_mutexes_ + std::vector owner_to_subsumed_; + std::vector subsumed_to_owner_; + std::atomic_int order_; FML_FRIEND_MAKE_REF_COUNTED(MessageLoopTaskQueues); diff --git a/fml/message_loop_task_queues_benchmark.cc b/fml/message_loop_task_queues_benchmark.cc index 7b7808586c2a9..6a59a0aab604c 100644 --- a/fml/message_loop_task_queues_benchmark.cc +++ b/fml/message_loop_task_queues_benchmark.cc @@ -35,13 +35,13 @@ static void BM_RegisterAndGetTasks(benchmark::State& state) { &tasks_registered]() { for (int j = 0; j < num_tasks_per_queue; j++) { task_queue->RegisterTask( - task_runner_id, [] {}, past); + TaskQueueId(task_runner_id), [] {}, past); } tasks_registered.CountDown(); tasks_registered.Wait(); std::vector invocations; - task_queue->GetTasksToRunNow(task_runner_id, fml::FlushType::kAll, - invocations); + task_queue->GetTasksToRunNow(TaskQueueId(task_runner_id), + fml::FlushType::kAll, invocations); assert(invocations.size() == num_tasks_per_queue); tasks_done.CountDown(); }); diff --git a/fml/message_loop_task_queues_merge_unmerge_unittests.cc b/fml/message_loop_task_queues_merge_unmerge_unittests.cc new file mode 100644 index 0000000000000..45f50d11c524f --- /dev/null +++ b/fml/message_loop_task_queues_merge_unmerge_unittests.cc @@ -0,0 +1,210 @@ +// Copyright 2013 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#define FML_USED_ON_EMBEDDER + +#include + +#include "flutter/fml/message_loop_task_queues.h" +#include "flutter/fml/synchronization/count_down_latch.h" +#include "flutter/fml/synchronization/waitable_event.h" +#include "gtest/gtest.h" + +class TestWakeable : public fml::Wakeable { + public: + using WakeUpCall = std::function; + + TestWakeable(WakeUpCall call) : wake_up_call_(call) {} + + void WakeUp(fml::TimePoint time_point) override { wake_up_call_(time_point); } + + private: + WakeUpCall wake_up_call_; +}; + +TEST(MessageLoopTaskQueueMergeUnmerge, + AfterMergePrimaryTasksServicedOnPrimary) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_1)); + + task_queue->Merge(queue_id_1, queue_id_2); + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + + ASSERT_EQ(2u, task_queue->GetNumPendingTasks(queue_id_1)); + ASSERT_EQ(0u, task_queue->GetNumPendingTasks(queue_id_2)); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, + AfterMergeSecondaryTasksAlsoServicedOnPrimary) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + task_queue->RegisterTask( + queue_id_2, []() {}, fml::TimePoint::Now()); + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_2)); + + task_queue->Merge(queue_id_1, queue_id_2); + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_1)); + ASSERT_EQ(0u, task_queue->GetNumPendingTasks(queue_id_2)); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, MergeUnmergeTasksPreserved) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + task_queue->RegisterTask( + queue_id_2, []() {}, fml::TimePoint::Now()); + + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_1)); + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_2)); + + task_queue->Merge(queue_id_1, queue_id_2); + + ASSERT_EQ(2u, task_queue->GetNumPendingTasks(queue_id_1)); + ASSERT_EQ(0u, task_queue->GetNumPendingTasks(queue_id_2)); + + task_queue->Unmerge(queue_id_1); + + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_1)); + ASSERT_EQ(1u, task_queue->GetNumPendingTasks(queue_id_2)); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, MergeFailIfAlreadyMergedOrSubsumed) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + auto queue_id_3 = task_queue->CreateTaskQueue(); + + task_queue->Merge(queue_id_1, queue_id_2); + + ASSERT_FALSE(task_queue->Merge(queue_id_1, queue_id_3)); + ASSERT_FALSE(task_queue->Merge(queue_id_2, queue_id_3)); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, UnmergeFailsOnSubsumed) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + task_queue->Merge(queue_id_1, queue_id_2); + + ASSERT_FALSE(task_queue->Unmerge(queue_id_2)); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, MergeInvokesBothWakeables) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + fml::CountDownLatch latch(2); + + task_queue->SetWakeable( + queue_id_1, + new TestWakeable([&](fml::TimePoint wake_time) { latch.CountDown(); })); + task_queue->SetWakeable( + queue_id_2, + new TestWakeable([&](fml::TimePoint wake_time) { latch.CountDown(); })); + + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + + task_queue->Merge(queue_id_1, queue_id_2); + + std::vector invocations; + task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations); + + latch.Wait(); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, + MergeUnmergeInvokesBothWakeablesSeparately) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + fml::AutoResetWaitableEvent latch_1, latch_2; + + task_queue->SetWakeable( + queue_id_1, + new TestWakeable([&](fml::TimePoint wake_time) { latch_1.Signal(); })); + task_queue->SetWakeable( + queue_id_2, + new TestWakeable([&](fml::TimePoint wake_time) { latch_2.Signal(); })); + + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + task_queue->RegisterTask( + queue_id_2, []() {}, fml::TimePoint::Now()); + + task_queue->Merge(queue_id_1, queue_id_2); + task_queue->Unmerge(queue_id_1); + + std::vector invocations; + + task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations); + latch_1.Wait(); + + task_queue->GetTasksToRunNow(queue_id_2, fml::FlushType::kAll, invocations); + latch_2.Wait(); +} + +TEST(MessageLoopTaskQueueMergeUnmerge, GetTasksToRunNowBlocksMerge) { + auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); + + auto queue_id_1 = task_queue->CreateTaskQueue(); + auto queue_id_2 = task_queue->CreateTaskQueue(); + + fml::AutoResetWaitableEvent wake_up_start, wake_up_end, merge_start, + merge_end; + + task_queue->RegisterTask( + queue_id_1, []() {}, fml::TimePoint::Now()); + task_queue->SetWakeable(queue_id_1, + new TestWakeable([&](fml::TimePoint wake_time) { + wake_up_start.Signal(); + wake_up_end.Wait(); + })); + + std::thread tasks_to_run_now_thread([&]() { + std::vector invocations; + task_queue->GetTasksToRunNow(queue_id_1, fml::FlushType::kAll, invocations); + }); + + wake_up_start.Wait(); + bool merge_done = false; + + std::thread merge_thread([&]() { + merge_start.Signal(); + task_queue->Merge(queue_id_1, queue_id_2); + merge_done = true; + merge_end.Signal(); + }); + + merge_start.Wait(); + ASSERT_FALSE(merge_done); + wake_up_end.Signal(); + + merge_end.Wait(); + ASSERT_TRUE(merge_done); + + tasks_to_run_now_thread.join(); + merge_thread.join(); +} diff --git a/fml/message_loop_task_queues_unittests.cc b/fml/message_loop_task_queues_unittests.cc index 49b09a49695e3..8b83f22c9cc32 100644 --- a/fml/message_loop_task_queues_unittests.cc +++ b/fml/message_loop_task_queues_unittests.cc @@ -113,22 +113,6 @@ TEST(MessageLoopTaskQueue, WakeUpIndependentOfTime) { ASSERT_TRUE(num_wakes == 2); } -TEST(MessageLoopTaskQueue, WakeUpWithMaxIfNoInvocations) { - auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); - auto queue_id = task_queue->CreateTaskQueue(); - fml::AutoResetWaitableEvent ev; - - task_queue->SetWakeable(queue_id, - new TestWakeable([&ev](fml::TimePoint wake_time) { - ASSERT_TRUE(wake_time == fml::TimePoint::Max()); - ev.Signal(); - })); - - std::vector invocations; - task_queue->GetTasksToRunNow(queue_id, fml::FlushType::kAll, invocations); - ev.Wait(); -} - TEST(MessageLoopTaskQueue, WokenUpWithNewerTime) { auto task_queue = fml::MessageLoopTaskQueues::GetInstance(); auto queue_id = task_queue->CreateTaskQueue(); diff --git a/fml/task_runner.cc b/fml/task_runner.cc index 2c4cfe4b638a2..c82f2ac7da7e7 100644 --- a/fml/task_runner.cc +++ b/fml/task_runner.cc @@ -32,11 +32,32 @@ void TaskRunner::PostDelayedTask(fml::closure task, fml::TimeDelta delay) { loop_->PostTask(std::move(task), fml::TimePoint::Now() + delay); } +TaskQueueId TaskRunner::GetTaskQueueId() { + FML_DCHECK(loop_); + return loop_->GetTaskQueueId(); +} + bool TaskRunner::RunsTasksOnCurrentThread() { if (!fml::MessageLoop::IsInitializedForCurrentThread()) { return false; } - return MessageLoop::GetCurrent().GetLoopImpl() == loop_; + const auto current_queue_id = + MessageLoop::GetCurrent().GetLoopImpl()->GetTaskQueueId(); + const auto loop_queue_id = loop_->GetTaskQueueId(); + + if (current_queue_id == loop_queue_id) { + return true; + } + + auto queues = MessageLoopTaskQueues::GetInstance(); + if (queues->Owns(current_queue_id, loop_queue_id)) { + return true; + } + if (queues->Owns(loop_queue_id, current_queue_id)) { + return true; + } + + return false; } void TaskRunner::RunNowOrPostTask(fml::RefPtr runner, diff --git a/fml/task_runner.h b/fml/task_runner.h index 72c4219029f34..b7569058af17f 100644 --- a/fml/task_runner.h +++ b/fml/task_runner.h @@ -9,6 +9,7 @@ #include "flutter/fml/macros.h" #include "flutter/fml/memory/ref_counted.h" #include "flutter/fml/memory/ref_ptr.h" +#include "flutter/fml/message_loop_task_queues.h" #include "flutter/fml/time/time_point.h" namespace fml { @@ -27,6 +28,8 @@ class TaskRunner : public fml::RefCountedThreadSafe { virtual bool RunsTasksOnCurrentThread(); + virtual TaskQueueId GetTaskQueueId(); + static void RunNowOrPostTask(fml::RefPtr runner, fml::closure task);