diff --git a/ci/licenses_golden/licenses_flutter b/ci/licenses_golden/licenses_flutter index e69dd41f3f3d2..db42ac8f387e0 100644 --- a/ci/licenses_golden/licenses_flutter +++ b/ci/licenses_golden/licenses_flutter @@ -134,6 +134,9 @@ FILE: ../../../flutter/fml/message_loop.cc FILE: ../../../flutter/fml/message_loop.h FILE: ../../../flutter/fml/message_loop_impl.cc FILE: ../../../flutter/fml/message_loop_impl.h +FILE: ../../../flutter/fml/message_loop_task_queue.cc +FILE: ../../../flutter/fml/message_loop_task_queue.h +FILE: ../../../flutter/fml/message_loop_task_queue_unittests.cc FILE: ../../../flutter/fml/message_loop_unittests.cc FILE: ../../../flutter/fml/message_unittests.cc FILE: ../../../flutter/fml/native_library.h diff --git a/fml/BUILD.gn b/fml/BUILD.gn index 506d4c3508561..279afe4e206e7 100644 --- a/fml/BUILD.gn +++ b/fml/BUILD.gn @@ -46,6 +46,8 @@ source_set("fml") { "message_loop.h", "message_loop_impl.cc", "message_loop_impl.h", + "message_loop_task_queue.cc", + "message_loop_task_queue.h", "native_library.h", "paths.cc", "paths.h", @@ -198,6 +200,7 @@ executable("fml_unittests") { "file_unittest.cc", "memory/ref_counted_unittest.cc", "memory/weak_ptr_unittest.cc", + "message_loop_task_queue_unittests.cc", "message_loop_unittests.cc", "message_unittests.cc", "paths_unittests.cc", diff --git a/fml/message_loop_impl.cc b/fml/message_loop_impl.cc index f1efe0a36e8d4..5d8f4b9984c00 100644 --- a/fml/message_loop_impl.cc +++ b/fml/message_loop_impl.cc @@ -39,13 +39,22 @@ fml::RefPtr MessageLoopImpl::Create() { #endif } -MessageLoopImpl::MessageLoopImpl() : order_(0), terminated_(false) {} +MessageLoopImpl::MessageLoopImpl() : terminated_(false) { + task_queue_ = std::make_unique(); +} MessageLoopImpl::~MessageLoopImpl() = default; void MessageLoopImpl::PostTask(fml::closure task, fml::TimePoint target_time) { FML_DCHECK(task != nullptr); - RegisterTask(task, target_time); + FML_DCHECK(task != nullptr); + if (terminated_) { + // If the message loop has already been terminated, PostTask should destruct + // |task| synchronously within this function. + return; + } + const auto wake_up = task_queue_->RegisterTask(task, target_time); + WakeUp(wake_up); } void MessageLoopImpl::AddTaskObserver(intptr_t key, fml::closure callback) { @@ -53,16 +62,14 @@ void MessageLoopImpl::AddTaskObserver(intptr_t key, fml::closure callback) { FML_DCHECK(MessageLoop::GetCurrent().GetLoopImpl().get() == this) << "Message loop task observer must be added on the same thread as the " "loop."; - std::lock_guard observers_lock(observers_mutex_); - task_observers_[key] = std::move(callback); + task_queue_->AddTaskObserver(key, callback); } void MessageLoopImpl::RemoveTaskObserver(intptr_t key) { FML_DCHECK(MessageLoop::GetCurrent().GetLoopImpl().get() == this) << "Message loop task observer must be removed from the same thread as " "the loop."; - std::lock_guard observers_lock(observers_mutex_); - task_observers_.erase(key); + task_queue_->RemoveTaskObserver(key); } void MessageLoopImpl::DoRun() { @@ -88,8 +95,7 @@ void MessageLoopImpl::DoRun() { // should be destructed on the message loop's thread. We have just returned // from the implementations |Run| method which we know is on the correct // thread. Drop all pending tasks on the floor. - std::lock_guard lock(delayed_tasks_mutex_); - delayed_tasks_ = {}; + task_queue_->Dispose(); } void MessageLoopImpl::DoTerminate() { @@ -109,31 +115,8 @@ void MessageLoopImpl::SwapTaskQueues(const fml::RefPtr& other) std::unique_lock t2(other->tasks_flushing_mutex_, std::defer_lock); - // task_observers locks - std::unique_lock o1(observers_mutex_, std::defer_lock); - std::unique_lock o2(other->observers_mutex_, std::defer_lock); - - // delayed_tasks locks - std::unique_lock d1(delayed_tasks_mutex_, std::defer_lock); - std::unique_lock d2(other->delayed_tasks_mutex_, std::defer_lock); - - std::lock(t1, t2, o1, o2, d1, d2); - - std::swap(task_observers_, other->task_observers_); - std::swap(delayed_tasks_, other->delayed_tasks_); -} - -void MessageLoopImpl::RegisterTask(fml::closure task, - fml::TimePoint target_time) { - FML_DCHECK(task != nullptr); - if (terminated_) { - // If the message loop has already been terminated, PostTask should destruct - // |task| synchronously within this function. - return; - } - std::lock_guard lock(delayed_tasks_mutex_); - delayed_tasks_.push({++order_, std::move(task), target_time}); - WakeUp(delayed_tasks_.top().GetTargetTime()); + std::lock(t1, t2); + task_queue_->Swap(*other->task_queue_); } void MessageLoopImpl::FlushTasks(FlushType type) { @@ -148,36 +131,12 @@ void MessageLoopImpl::FlushTasks(FlushType type) { // will lead us to run invocations on the wrong thread. std::lock_guard task_flush_lock(tasks_flushing_mutex_); - { - std::lock_guard lock(delayed_tasks_mutex_); - - if (delayed_tasks_.empty()) { - return; - } - - auto now = fml::TimePoint::Now(); - while (!delayed_tasks_.empty()) { - const auto& top = delayed_tasks_.top(); - if (top.GetTargetTime() > now) { - break; - } - invocations.emplace_back(std::move(top.GetTask())); - delayed_tasks_.pop(); - if (type == FlushType::kSingle) { - break; - } - } - - WakeUp(delayed_tasks_.empty() ? fml::TimePoint::Max() - : delayed_tasks_.top().GetTargetTime()); - } + const auto wake_up = task_queue_->GetTasksToRunNow(type, invocations); + WakeUp(wake_up); for (const auto& invocation : invocations) { invocation(); - std::lock_guard observers_lock(observers_mutex_); - for (const auto& observer : task_observers_) { - observer.second(); - } + task_queue_->NotifyObservers(); } } diff --git a/fml/message_loop_impl.h b/fml/message_loop_impl.h index e4aad707fa5a4..8fa85aed007ce 100644 --- a/fml/message_loop_impl.h +++ b/fml/message_loop_impl.h @@ -17,6 +17,7 @@ #include "flutter/fml/macros.h" #include "flutter/fml/memory/ref_counted.h" #include "flutter/fml/message_loop.h" +#include "flutter/fml/message_loop_task_queue.h" #include "flutter/fml/synchronization/thread_annotations.h" #include "flutter/fml/time/time_point.h" @@ -61,21 +62,9 @@ class MessageLoopImpl : public fml::RefCountedThreadSafe { private: std::mutex tasks_flushing_mutex_; - std::mutex observers_mutex_; - std::map task_observers_ - FML_GUARDED_BY(observers_mutex_); - - std::mutex delayed_tasks_mutex_; - DelayedTaskQueue delayed_tasks_ FML_GUARDED_BY(delayed_tasks_mutex_); - size_t order_ FML_GUARDED_BY(delayed_tasks_mutex_); + std::unique_ptr task_queue_; std::atomic_bool terminated_; - void RegisterTask(fml::closure task, fml::TimePoint target_time); - - enum class FlushType { - kSingle, - kAll, - }; void FlushTasks(FlushType type); FML_DISALLOW_COPY_AND_ASSIGN(MessageLoopImpl); diff --git a/fml/message_loop_task_queue.cc b/fml/message_loop_task_queue.cc new file mode 100644 index 0000000000000..11927da2703ef --- /dev/null +++ b/fml/message_loop_task_queue.cc @@ -0,0 +1,97 @@ +// Copyright 2013 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#define FML_USED_ON_EMBEDDER + +#include "flutter/fml/message_loop_task_queue.h" + +namespace fml { + +MessageLoopTaskQueue::MessageLoopTaskQueue() : order_(0) {} + +MessageLoopTaskQueue::~MessageLoopTaskQueue() = default; + +void MessageLoopTaskQueue::Dispose() { + std::lock_guard lock(delayed_tasks_mutex_); + delayed_tasks_ = {}; +} + +fml::TimePoint MessageLoopTaskQueue::RegisterTask(fml::closure task, + fml::TimePoint target_time) { + std::lock_guard lock(delayed_tasks_mutex_); + delayed_tasks_.push({++order_, std::move(task), target_time}); + return delayed_tasks_.top().GetTargetTime(); +} + +bool MessageLoopTaskQueue::HasPendingTasks() { + std::lock_guard lock(delayed_tasks_mutex_); + return !delayed_tasks_.empty(); +} + +fml::TimePoint MessageLoopTaskQueue::GetTasksToRunNow( + FlushType type, + std::vector& invocations) { + std::lock_guard lock(delayed_tasks_mutex_); + + const auto now = fml::TimePoint::Now(); + while (!delayed_tasks_.empty()) { + const auto& top = delayed_tasks_.top(); + if (top.GetTargetTime() > now) { + break; + } + invocations.emplace_back(std::move(top.GetTask())); + delayed_tasks_.pop(); + if (type == FlushType::kSingle) { + break; + } + } + + if (delayed_tasks_.empty()) { + return fml::TimePoint::Max(); + } else { + return delayed_tasks_.top().GetTargetTime(); + } +} + +size_t MessageLoopTaskQueue::GetNumPendingTasks() { + std::lock_guard lock(delayed_tasks_mutex_); + return delayed_tasks_.size(); +} + +void MessageLoopTaskQueue::AddTaskObserver(intptr_t key, + fml::closure callback) { + std::lock_guard observers_lock(observers_mutex_); + task_observers_[key] = std::move(callback); +} + +void MessageLoopTaskQueue::RemoveTaskObserver(intptr_t key) { + std::lock_guard observers_lock(observers_mutex_); + task_observers_.erase(key); +} + +void MessageLoopTaskQueue::NotifyObservers() { + std::lock_guard observers_lock(observers_mutex_); + for (const auto& observer : task_observers_) { + observer.second(); + } +} + +// Thread safety analysis disabled as it does not account for defered locks. +void MessageLoopTaskQueue::Swap(MessageLoopTaskQueue& other) + FML_NO_THREAD_SAFETY_ANALYSIS { + // task_observers locks + std::unique_lock o1(observers_mutex_, std::defer_lock); + std::unique_lock o2(other.observers_mutex_, std::defer_lock); + + // delayed_tasks locks + std::unique_lock d1(delayed_tasks_mutex_, std::defer_lock); + std::unique_lock d2(other.delayed_tasks_mutex_, std::defer_lock); + + std::lock(o1, o2, d1, d2); + + std::swap(task_observers_, other.task_observers_); + std::swap(delayed_tasks_, other.delayed_tasks_); +} + +} // namespace fml diff --git a/fml/message_loop_task_queue.h b/fml/message_loop_task_queue.h new file mode 100644 index 0000000000000..393565dedf1a1 --- /dev/null +++ b/fml/message_loop_task_queue.h @@ -0,0 +1,75 @@ +// Copyright 2013 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef FLUTTER_FML_MESSAGE_LOOP_TASK_QUEUE_H_ +#define FLUTTER_FML_MESSAGE_LOOP_TASK_QUEUE_H_ + +#include +#include +#include + +#include "flutter/fml/closure.h" +#include "flutter/fml/delayed_task.h" +#include "flutter/fml/macros.h" +#include "flutter/fml/memory/ref_counted.h" +#include "flutter/fml/synchronization/thread_annotations.h" + +namespace fml { + +enum class FlushType { + kSingle, + kAll, +}; + +// This class keeps track of all the tasks and observers that +// need to be run on it's MessageLoopImpl. +class MessageLoopTaskQueue { + public: + // Lifecycle. + + MessageLoopTaskQueue(); + + ~MessageLoopTaskQueue(); + + void Dispose(); + + // Tasks methods. + + fml::TimePoint RegisterTask(fml::closure task, fml::TimePoint target_time); + + bool HasPendingTasks(); + + // Returns the wake up time. + fml::TimePoint GetTasksToRunNow(FlushType type, + std::vector& invocations); + + size_t GetNumPendingTasks(); + + // Observers methods. + + void AddTaskObserver(intptr_t key, fml::closure callback); + + void RemoveTaskObserver(intptr_t key); + + void NotifyObservers(); + + // Misc. + + void Swap(MessageLoopTaskQueue& other); + + private: + std::mutex observers_mutex_; + std::map task_observers_ + FML_GUARDED_BY(observers_mutex_); + + std::mutex delayed_tasks_mutex_; + DelayedTaskQueue delayed_tasks_ FML_GUARDED_BY(delayed_tasks_mutex_); + size_t order_ FML_GUARDED_BY(delayed_tasks_mutex_); + + FML_DISALLOW_COPY_ASSIGN_AND_MOVE(MessageLoopTaskQueue); +}; + +} // namespace fml + +#endif // FLUTTER_FML_MESSAGE_LOOP_TASK_QUEUE_H_ diff --git a/fml/message_loop_task_queue_unittests.cc b/fml/message_loop_task_queue_unittests.cc new file mode 100644 index 0000000000000..06077bcb5a265 --- /dev/null +++ b/fml/message_loop_task_queue_unittests.cc @@ -0,0 +1,70 @@ +// Copyright 2013 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#define FML_USED_ON_EMBEDDER + +#include "flutter/fml/message_loop_task_queue.h" +#include "gtest/gtest.h" + +TEST(MessageLoopTaskQueue, StartsWithNoPendingTasks) { + auto task_queue = std::make_unique(); + ASSERT_FALSE(task_queue->HasPendingTasks()); +} + +TEST(MessageLoopTaskQueue, RegisterOneTask) { + auto task_queue = std::make_unique(); + const auto time = fml::TimePoint::Max(); + const auto wake_time = task_queue->RegisterTask([] {}, time); + ASSERT_TRUE(task_queue->HasPendingTasks()); + ASSERT_TRUE(task_queue->GetNumPendingTasks() == 1); + ASSERT_TRUE(wake_time == time); +} + +TEST(MessageLoopTaskQueue, RegisterTwoTasksAndCount) { + auto task_queue = std::make_unique(); + task_queue->RegisterTask([] {}, fml::TimePoint::Now()); + task_queue->RegisterTask([] {}, fml::TimePoint::Max()); + ASSERT_TRUE(task_queue->HasPendingTasks()); + ASSERT_TRUE(task_queue->GetNumPendingTasks() == 2); +} + +TEST(MessageLoopTaskQueue, PreserveTaskOrdering) { + auto task_queue = std::make_unique(); + int test_val = 0; + + // order: 0 + task_queue->RegisterTask([&test_val]() { test_val = 1; }, + fml::TimePoint::Now()); + + // order: 1 + task_queue->RegisterTask([&test_val]() { test_val = 2; }, + fml::TimePoint::Now()); + + std::vector invocations; + task_queue->GetTasksToRunNow(fml::FlushType::kAll, invocations); + + int expected_value = 1; + + for (auto& invocation : invocations) { + invocation(); + ASSERT_TRUE(test_val == expected_value); + expected_value++; + } +} + +TEST(MessageLoopTaskQueue, AddRemoveNotifyObservers) { + auto task_queue = std::make_unique(); + + int test_val = 0; + intptr_t key = 123; + + task_queue->AddTaskObserver(key, [&test_val]() { test_val = 1; }); + task_queue->NotifyObservers(); + ASSERT_TRUE(test_val == 1); + + test_val = 0; + task_queue->RemoveTaskObserver(key); + task_queue->NotifyObservers(); + ASSERT_TRUE(test_val == 0); +}