Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ci/licenses_golden/licenses_flutter
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ FILE: ../../../flutter/fml/message_loop.cc
FILE: ../../../flutter/fml/message_loop.h
FILE: ../../../flutter/fml/message_loop_impl.cc
FILE: ../../../flutter/fml/message_loop_impl.h
FILE: ../../../flutter/fml/message_loop_task_queue.cc
FILE: ../../../flutter/fml/message_loop_task_queue.h
FILE: ../../../flutter/fml/message_loop_task_queue_unittests.cc
FILE: ../../../flutter/fml/message_loop_unittests.cc
FILE: ../../../flutter/fml/message_unittests.cc
FILE: ../../../flutter/fml/native_library.h
Expand Down
3 changes: 3 additions & 0 deletions fml/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ source_set("fml") {
"message_loop.h",
"message_loop_impl.cc",
"message_loop_impl.h",
"message_loop_task_queue.cc",
"message_loop_task_queue.h",
"native_library.h",
"paths.cc",
"paths.h",
Expand Down Expand Up @@ -198,6 +200,7 @@ executable("fml_unittests") {
"file_unittest.cc",
"memory/ref_counted_unittest.cc",
"memory/weak_ptr_unittest.cc",
"message_loop_task_queue_unittests.cc",
"message_loop_unittests.cc",
"message_unittests.cc",
"paths_unittests.cc",
Expand Down
79 changes: 19 additions & 60 deletions fml/message_loop_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,37 @@ fml::RefPtr<MessageLoopImpl> MessageLoopImpl::Create() {
#endif
}

MessageLoopImpl::MessageLoopImpl() : order_(0), terminated_(false) {}
MessageLoopImpl::MessageLoopImpl() : terminated_(false) {
task_queue_ = std::make_unique<MessageLoopTaskQueue>();
}

MessageLoopImpl::~MessageLoopImpl() = default;

void MessageLoopImpl::PostTask(fml::closure task, fml::TimePoint target_time) {
FML_DCHECK(task != nullptr);
RegisterTask(task, target_time);
FML_DCHECK(task != nullptr);
if (terminated_) {
// If the message loop has already been terminated, PostTask should destruct
// |task| synchronously within this function.
return;
}
const auto wake_up = task_queue_->RegisterTask(task, target_time);
WakeUp(wake_up);
}

void MessageLoopImpl::AddTaskObserver(intptr_t key, fml::closure callback) {
FML_DCHECK(callback != nullptr);
FML_DCHECK(MessageLoop::GetCurrent().GetLoopImpl().get() == this)
<< "Message loop task observer must be added on the same thread as the "
"loop.";
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
task_observers_[key] = std::move(callback);
task_queue_->AddTaskObserver(key, callback);
}

void MessageLoopImpl::RemoveTaskObserver(intptr_t key) {
FML_DCHECK(MessageLoop::GetCurrent().GetLoopImpl().get() == this)
<< "Message loop task observer must be removed from the same thread as "
"the loop.";
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
task_observers_.erase(key);
task_queue_->RemoveTaskObserver(key);
}

void MessageLoopImpl::DoRun() {
Expand All @@ -88,8 +95,7 @@ void MessageLoopImpl::DoRun() {
// should be destructed on the message loop's thread. We have just returned
// from the implementations |Run| method which we know is on the correct
// thread. Drop all pending tasks on the floor.
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
delayed_tasks_ = {};
task_queue_->Dispose();
}

void MessageLoopImpl::DoTerminate() {
Expand All @@ -109,31 +115,8 @@ void MessageLoopImpl::SwapTaskQueues(const fml::RefPtr<MessageLoopImpl>& other)
std::unique_lock<std::mutex> t2(other->tasks_flushing_mutex_,
std::defer_lock);

// task_observers locks
std::unique_lock<std::mutex> o1(observers_mutex_, std::defer_lock);
std::unique_lock<std::mutex> o2(other->observers_mutex_, std::defer_lock);

// delayed_tasks locks
std::unique_lock<std::mutex> d1(delayed_tasks_mutex_, std::defer_lock);
std::unique_lock<std::mutex> d2(other->delayed_tasks_mutex_, std::defer_lock);

std::lock(t1, t2, o1, o2, d1, d2);

std::swap(task_observers_, other->task_observers_);
std::swap(delayed_tasks_, other->delayed_tasks_);
}

void MessageLoopImpl::RegisterTask(fml::closure task,
fml::TimePoint target_time) {
FML_DCHECK(task != nullptr);
if (terminated_) {
// If the message loop has already been terminated, PostTask should destruct
// |task| synchronously within this function.
return;
}
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
delayed_tasks_.push({++order_, std::move(task), target_time});
WakeUp(delayed_tasks_.top().GetTargetTime());
std::lock(t1, t2);
task_queue_->Swap(*other->task_queue_);
}

void MessageLoopImpl::FlushTasks(FlushType type) {
Expand All @@ -148,36 +131,12 @@ void MessageLoopImpl::FlushTasks(FlushType type) {
// will lead us to run invocations on the wrong thread.
std::lock_guard<std::mutex> task_flush_lock(tasks_flushing_mutex_);

{
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);

if (delayed_tasks_.empty()) {
return;
}

auto now = fml::TimePoint::Now();
while (!delayed_tasks_.empty()) {
const auto& top = delayed_tasks_.top();
if (top.GetTargetTime() > now) {
break;
}
invocations.emplace_back(std::move(top.GetTask()));
delayed_tasks_.pop();
if (type == FlushType::kSingle) {
break;
}
}

WakeUp(delayed_tasks_.empty() ? fml::TimePoint::Max()
: delayed_tasks_.top().GetTargetTime());
}
const auto wake_up = task_queue_->GetTasksToRunNow(type, invocations);
WakeUp(wake_up);

for (const auto& invocation : invocations) {
invocation();
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
for (const auto& observer : task_observers_) {
observer.second();
}
task_queue_->NotifyObservers();
}
}

Expand Down
15 changes: 2 additions & 13 deletions fml/message_loop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "flutter/fml/macros.h"
#include "flutter/fml/memory/ref_counted.h"
#include "flutter/fml/message_loop.h"
#include "flutter/fml/message_loop_task_queue.h"
#include "flutter/fml/synchronization/thread_annotations.h"
#include "flutter/fml/time/time_point.h"

Expand Down Expand Up @@ -61,21 +62,9 @@ class MessageLoopImpl : public fml::RefCountedThreadSafe<MessageLoopImpl> {
private:
std::mutex tasks_flushing_mutex_;

std::mutex observers_mutex_;
std::map<intptr_t, fml::closure> task_observers_
FML_GUARDED_BY(observers_mutex_);

std::mutex delayed_tasks_mutex_;
DelayedTaskQueue delayed_tasks_ FML_GUARDED_BY(delayed_tasks_mutex_);
size_t order_ FML_GUARDED_BY(delayed_tasks_mutex_);
std::unique_ptr<MessageLoopTaskQueue> task_queue_;
std::atomic_bool terminated_;

void RegisterTask(fml::closure task, fml::TimePoint target_time);

enum class FlushType {
kSingle,
kAll,
};
void FlushTasks(FlushType type);

FML_DISALLOW_COPY_AND_ASSIGN(MessageLoopImpl);
Expand Down
97 changes: 97 additions & 0 deletions fml/message_loop_task_queue.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#define FML_USED_ON_EMBEDDER

#include "flutter/fml/message_loop_task_queue.h"

namespace fml {

MessageLoopTaskQueue::MessageLoopTaskQueue() : order_(0) {}

MessageLoopTaskQueue::~MessageLoopTaskQueue() = default;

void MessageLoopTaskQueue::Dispose() {
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
delayed_tasks_ = {};
}

fml::TimePoint MessageLoopTaskQueue::RegisterTask(fml::closure task,
fml::TimePoint target_time) {
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
delayed_tasks_.push({++order_, std::move(task), target_time});
return delayed_tasks_.top().GetTargetTime();
}

bool MessageLoopTaskQueue::HasPendingTasks() {
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
return !delayed_tasks_.empty();
}

fml::TimePoint MessageLoopTaskQueue::GetTasksToRunNow(
FlushType type,
std::vector<fml::closure>& invocations) {
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);

const auto now = fml::TimePoint::Now();
while (!delayed_tasks_.empty()) {
const auto& top = delayed_tasks_.top();
if (top.GetTargetTime() > now) {
break;
}
invocations.emplace_back(std::move(top.GetTask()));
delayed_tasks_.pop();
if (type == FlushType::kSingle) {
break;
}
}

if (delayed_tasks_.empty()) {
return fml::TimePoint::Max();
} else {
return delayed_tasks_.top().GetTargetTime();
}
}

size_t MessageLoopTaskQueue::GetNumPendingTasks() {
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
return delayed_tasks_.size();
}

void MessageLoopTaskQueue::AddTaskObserver(intptr_t key,
fml::closure callback) {
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
task_observers_[key] = std::move(callback);
}

void MessageLoopTaskQueue::RemoveTaskObserver(intptr_t key) {
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
task_observers_.erase(key);
}

void MessageLoopTaskQueue::NotifyObservers() {
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
for (const auto& observer : task_observers_) {
observer.second();
}
}

// Thread safety analysis disabled as it does not account for defered locks.
void MessageLoopTaskQueue::Swap(MessageLoopTaskQueue& other)
FML_NO_THREAD_SAFETY_ANALYSIS {
// task_observers locks
std::unique_lock<std::mutex> o1(observers_mutex_, std::defer_lock);
std::unique_lock<std::mutex> o2(other.observers_mutex_, std::defer_lock);

// delayed_tasks locks
std::unique_lock<std::mutex> d1(delayed_tasks_mutex_, std::defer_lock);
std::unique_lock<std::mutex> d2(other.delayed_tasks_mutex_, std::defer_lock);

std::lock(o1, o2, d1, d2);

std::swap(task_observers_, other.task_observers_);
std::swap(delayed_tasks_, other.delayed_tasks_);
}

} // namespace fml
75 changes: 75 additions & 0 deletions fml/message_loop_task_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef FLUTTER_FML_MESSAGE_LOOP_TASK_QUEUE_H_
#define FLUTTER_FML_MESSAGE_LOOP_TASK_QUEUE_H_

#include <map>
#include <mutex>
#include <vector>

#include "flutter/fml/closure.h"
#include "flutter/fml/delayed_task.h"
#include "flutter/fml/macros.h"
#include "flutter/fml/memory/ref_counted.h"
#include "flutter/fml/synchronization/thread_annotations.h"

namespace fml {

enum class FlushType {
kSingle,
kAll,
};

// This class keeps track of all the tasks and observers that
// need to be run on it's MessageLoopImpl.
class MessageLoopTaskQueue {
public:
// Lifecycle.

MessageLoopTaskQueue();

~MessageLoopTaskQueue();

void Dispose();

// Tasks methods.

fml::TimePoint RegisterTask(fml::closure task, fml::TimePoint target_time);

bool HasPendingTasks();

// Returns the wake up time.
fml::TimePoint GetTasksToRunNow(FlushType type,
std::vector<fml::closure>& invocations);

size_t GetNumPendingTasks();

// Observers methods.

void AddTaskObserver(intptr_t key, fml::closure callback);

void RemoveTaskObserver(intptr_t key);

void NotifyObservers();

// Misc.

void Swap(MessageLoopTaskQueue& other);

private:
std::mutex observers_mutex_;
std::map<intptr_t, fml::closure> task_observers_
FML_GUARDED_BY(observers_mutex_);

std::mutex delayed_tasks_mutex_;
DelayedTaskQueue delayed_tasks_ FML_GUARDED_BY(delayed_tasks_mutex_);
size_t order_ FML_GUARDED_BY(delayed_tasks_mutex_);

FML_DISALLOW_COPY_ASSIGN_AND_MOVE(MessageLoopTaskQueue);
};

} // namespace fml

#endif // FLUTTER_FML_MESSAGE_LOOP_TASK_QUEUE_H_
Loading