2 changes: 2 additions & 0 deletions ci/licenses_golden/licenses_flutter
@@ -124,6 +124,7 @@ FILE: ../../../flutter/fml/memory/weak_ptr.h
FILE: ../../../flutter/fml/memory/weak_ptr_internal.cc
FILE: ../../../flutter/fml/memory/weak_ptr_internal.h
FILE: ../../../flutter/fml/memory/weak_ptr_unittest.cc
FILE: ../../../flutter/fml/merged_queues_runner.cc
FILE: ../../../flutter/fml/message.cc
FILE: ../../../flutter/fml/message.h
FILE: ../../../flutter/fml/message_loop.cc
@@ -133,6 +134,7 @@ FILE: ../../../flutter/fml/message_loop_impl.h
FILE: ../../../flutter/fml/message_loop_task_queues.cc
FILE: ../../../flutter/fml/message_loop_task_queues.h
FILE: ../../../flutter/fml/message_loop_task_queues_benchmark.cc
FILE: ../../../flutter/fml/message_loop_task_queues_merge_unmerge_unittests.cc
FILE: ../../../flutter/fml/message_loop_task_queues_unittests.cc
FILE: ../../../flutter/fml/message_loop_unittests.cc
FILE: ../../../flutter/fml/message_unittests.cc
2 changes: 2 additions & 0 deletions fml/BUILD.gn
@@ -40,6 +40,7 @@ source_set("fml") {
"memory/weak_ptr.h",
"memory/weak_ptr_internal.cc",
"memory/weak_ptr_internal.h",
"merged_queues_runner.cc",
"message.cc",
"message.h",
"message_loop.cc",
@@ -199,6 +200,7 @@ executable("fml_unittests") {
"file_unittest.cc",
"memory/ref_counted_unittest.cc",
"memory/weak_ptr_unittest.cc",
"message_loop_task_queues_merge_unmerge_unittests.cc",
"message_loop_task_queues_unittests.cc",
"message_loop_unittests.cc",
"message_unittests.cc",
58 changes: 58 additions & 0 deletions fml/merged_queues_runner.cc
@@ -0,0 +1,58 @@
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#define FML_USED_ON_EMBEDDER

#include "flutter/fml/message_loop_task_queues.h"

namespace fml {

// RAII class for managing merged locks.
class MessageLoopTaskQueues::MergedQueuesRunner {
public:
// TODO (kaushikiska): refactor mutexes outside of MessageLoopTaskQueues
// for better DI.
MergedQueuesRunner(MessageLoopTaskQueues& task_queues,
TaskQueueId owner,
MutexType type = MutexType::kTasks)
: owner_(owner),
subsumed_(MessageLoopTaskQueues::_kUnmerged),
task_queues_(task_queues),
type_(type) {
task_queues_.GetMutex(owner, type).lock();
subsumed_ = task_queues_.owner_to_subsumed_[owner];
if (isMerged(subsumed_)) {
task_queues_.GetMutex(subsumed_, type).lock();
}
}

// Invokes the closure on the owner first, then on the subsumed queue (if any).
void InvokeMerged(std::function<void(const TaskQueueId)> closure) {
closure(owner_);
if (isMerged(subsumed_)) {
closure(subsumed_);
}
}

~MergedQueuesRunner() {
if (isMerged(subsumed_)) {
task_queues_.GetMutex(subsumed_, type_).unlock();
}
task_queues_.GetMutex(owner_, type_).unlock();
}

private:
bool isMerged(TaskQueueId queue_id) {
return queue_id != MessageLoopTaskQueues::_kUnmerged;
}

const TaskQueueId owner_;
TaskQueueId subsumed_;
Contributor: Let's make task queue ID a struct with a single member of size_t.

Contributor (Author): Done.

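Following up on that exchange, here is a minimal sketch of what a size_t-backed TaskQueueId could look like. It is inferred from how the type is used in this diff (explicit construction from a size_t, the TaskQueueId::kUnmerged sentinel, and use as a vector index), not copied from the definition that actually landed:

```cpp
#include <cstddef>

// Sketch only: TaskQueueId as a thin wrapper over a single size_t,
// inferred from usage in this diff.
class TaskQueueId {
 public:
  // Sentinel meaning "no merged queue"; this diff defines it as ULONG_MAX.
  static const size_t kUnmerged;

  explicit TaskQueueId(size_t value) : value_(value) {}

  // Implicit conversion back to size_t lets a TaskQueueId index vectors
  // (e.g. delayed_tasks_[queue_id]) and compare against other IDs.
  operator size_t() const { return value_; }

 private:
  size_t value_;
};
```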
MessageLoopTaskQueues& task_queues_;
const MutexType type_;

FML_DISALLOW_COPY_ASSIGN_AND_MOVE(MergedQueuesRunner);
};

} // namespace fml
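As a usage note: every method in this patch that touches a possibly-merged queue follows the same pattern, constructing the runner on the stack to lock the owner (and, when merged, the subsumed queue), then visiting both queues through InvokeMerged. A condensed sketch of the pattern follows; the method itself is hypothetical and not part of this PR, but the locking mirrors Dispose() and NotifyObservers() below:

```cpp
// Hypothetical example: count task observers across a merged pair of queues.
size_t MessageLoopTaskQueues::CountObserversForIllustration(
    TaskQueueId queue_id) {
  // Locks queue_id's observers mutex, plus the subsumed queue's if the
  // queues are merged; both unlock when the runner goes out of scope.
  MergedQueuesRunner merged_observers =
      MergedQueuesRunner(*this, queue_id, MutexType::kObservers);

  size_t count = 0;
  merged_observers.InvokeMerged(
      [&](TaskQueueId queue) { count += task_observers_[queue].size(); });
  return count;
}
```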
4 changes: 4 additions & 0 deletions fml/message_loop_impl.cc
Expand Up @@ -148,4 +148,8 @@ void MessageLoopImpl::RunSingleExpiredTaskNow() {
FlushTasks(FlushType::kSingle);
}

TaskQueueId MessageLoopImpl::GetTaskQueueId() const {
return queue_id_;
}

} // namespace fml
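The new accessor exists so callers can feed a loop's queue ID into the merge APIs added below. A sketch of that pairing; the helper function is hypothetical, and only GetTaskQueueId() and Merge() come from this PR:

```cpp
#include "flutter/fml/message_loop_impl.h"
#include "flutter/fml/message_loop_task_queues.h"

// Hypothetical helper: merge the task queues behind two message loops.
bool MergeLoops(const fml::RefPtr<fml::MessageLoopImpl>& owner_loop,
                const fml::RefPtr<fml::MessageLoopImpl>& subsumed_loop) {
  const fml::TaskQueueId owner = owner_loop->GetTaskQueueId();
  const fml::TaskQueueId subsumed = subsumed_loop->GetTaskQueueId();
  // Merge() returns false if either queue already participates in a merge.
  return fml::MessageLoopTaskQueues::GetInstance()->Merge(owner, subsumed);
}
```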
2 changes: 2 additions & 0 deletions fml/message_loop_impl.h
@@ -45,6 +45,8 @@ class MessageLoopImpl : public Wakeable,

void DoTerminate();

virtual TaskQueueId GetTaskQueueId() const;

void SwapTaskQueues(const fml::RefPtr<MessageLoopImpl>& other);

protected:
190 changes: 170 additions & 20 deletions fml/message_loop_task_queues.cc
@@ -5,11 +5,15 @@
#define FML_USED_ON_EMBEDDER

#include "flutter/fml/message_loop_task_queues.h"
#include "flutter/fml/merged_queues_runner.cc"
#include "flutter/fml/message_loop_impl.h"

namespace fml {

std::mutex MessageLoopTaskQueues::creation_mutex_;
const size_t TaskQueueId::kUnmerged = ULONG_MAX;
const TaskQueueId MessageLoopTaskQueues::_kUnmerged =
TaskQueueId(TaskQueueId::kUnmerged);
fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;

fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
@@ -22,7 +26,7 @@ fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {

TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
std::scoped_lock creation(queue_meta_mutex_);
TaskQueueId loop_id = task_queue_id_counter_;
TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
++task_queue_id_counter_;

observers_mutexes_.push_back(std::make_unique<std::mutex>());
@@ -33,6 +37,9 @@ TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
delayed_tasks_.push_back(DelayedTaskQueue());
wakeables_.push_back(NULL);

owner_to_subsumed_.push_back(_kUnmerged);
subsumed_to_owner_.push_back(_kUnmerged);

return loop_id;
}

@@ -42,8 +49,9 @@ MessageLoopTaskQueues::MessageLoopTaskQueues()
MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;

void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks));
delayed_tasks_[queue_id] = {};
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);
merged_tasks.InvokeMerged(
[&](TaskQueueId queue_id) { delayed_tasks_[queue_id] = {}; });
}

void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
@@ -52,39 +60,47 @@ void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks));
size_t order = order_++;
delayed_tasks_[queue_id].push({order, std::move(task), target_time});
WakeUp(queue_id, delayed_tasks_[queue_id].top().GetTargetTime());
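// If this queue has been subsumed, its owner is responsible for waking up
// and draining the merged set of tasks, so redirect the wake-up to it.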
TaskQueueId loop_to_wake = queue_id;
if (subsumed_to_owner_[queue_id] != _kUnmerged) {
loop_to_wake = subsumed_to_owner_[queue_id];
}
WakeUp(loop_to_wake, delayed_tasks_[queue_id].top().GetTargetTime());
}

bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks));
return !delayed_tasks_[queue_id].empty();
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);
return HasPendingTasksUnlocked(queue_id);
}

void MessageLoopTaskQueues::GetTasksToRunNow(
TaskQueueId queue_id,
FlushType type,
std::vector<fml::closure>& invocations) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks));
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);

if (!HasPendingTasksUnlocked(queue_id)) {
return;
}

const auto now = fml::TimePoint::Now();
DelayedTaskQueue& tasks = delayed_tasks_[queue_id];

while (!tasks.empty()) {
const auto& top = tasks.top();
while (HasPendingTasksUnlocked(queue_id)) {
TaskQueueId top_queue = _kUnmerged;
const auto& top = PeekNextTaskUnlocked(queue_id, top_queue);
if (top.GetTargetTime() > now) {
break;
}
invocations.emplace_back(std::move(top.GetTask()));
tasks.pop();
delayed_tasks_[top_queue].pop();
if (type == FlushType::kSingle) {
Contributor: We no longer make use of this in the concurrent task queue, so let's just get rid of FlushType, with the default being multiple.

Contributor (Author): Created flutter/flutter#36083; will follow this up.

break;
}
}

if (tasks.empty()) {
if (!HasPendingTasksUnlocked(queue_id)) {
WakeUp(queue_id, fml::TimePoint::Max());
} else {
WakeUp(queue_id, tasks.top().GetTargetTime());
WakeUp(queue_id, GetNextWakeTimeUnlocked(queue_id));
}
}

@@ -96,8 +112,14 @@ void MessageLoopTaskQueues::WakeUp(TaskQueueId queue_id, fml::TimePoint time) {
}

size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kTasks));
return delayed_tasks_[queue_id].size();
MergedQueuesRunner merged_tasks = MergedQueuesRunner(*this, queue_id);
if (subsumed_to_owner_[queue_id] != _kUnmerged) {
return 0;
}
size_t total_tasks = 0;
merged_tasks.InvokeMerged(
[&](TaskQueueId queue) { total_tasks += delayed_tasks_[queue].size(); });
return total_tasks;
}

void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id,
@@ -114,10 +136,14 @@ void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
}

void MessageLoopTaskQueues::NotifyObservers(TaskQueueId queue_id) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kObservers));
for (const auto& observer : task_observers_[queue_id]) {
observer.second();
}
MergedQueuesRunner merged_observers =
MergedQueuesRunner(*this, queue_id, MutexType::kObservers);

merged_observers.InvokeMerged([&](TaskQueueId queue) {
for (const auto& observer : task_observers_[queue]) {
observer.second();
}
});
}

// Thread safety analysis disabled as it does not account for deferred locks.
@@ -131,7 +157,7 @@ void MessageLoopTaskQueues::Swap(TaskQueueId primary, TaskQueueId secondary)
std::mutex& t1 = GetMutex(primary, MutexType::kTasks);
std::mutex& t2 = GetMutex(secondary, MutexType::kTasks);

std::scoped_lock(o1, o2, t1, t2);
std::scoped_lock lock(o1, o2, t1, t2);

std::swap(task_observers_[primary], task_observers_[secondary]);
std::swap(delayed_tasks_[primary], delayed_tasks_[secondary]);
@@ -140,9 +166,133 @@ void MessageLoopTaskQueues::Swap(TaskQueueId primary, TaskQueueId secondary)
void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
fml::Wakeable* wakeable) {
std::scoped_lock lock(GetMutex(queue_id, MutexType::kWakeables));
FML_CHECK(!wakeables_[queue_id]) << "Wakeable can only be set once.";
wakeables_[queue_id] = wakeable;
}

bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
// task_observers locks
std::mutex& o1 = GetMutex(owner, MutexType::kObservers);
std::mutex& o2 = GetMutex(subsumed, MutexType::kObservers);

// delayed_tasks locks
std::mutex& t1 = GetMutex(owner, MutexType::kTasks);
std::mutex& t2 = GetMutex(subsumed, MutexType::kTasks);

std::scoped_lock lock(o1, o2, t1, t2);

if (owner == subsumed) {
return true;
}

if (owner_to_subsumed_[owner] == subsumed) {
return true;
}

std::vector<TaskQueueId> owner_subsumed_keys = {
owner_to_subsumed_[owner], owner_to_subsumed_[subsumed],
subsumed_to_owner_[owner], subsumed_to_owner_[subsumed]};

for (auto key : owner_subsumed_keys) {
if (key != _kUnmerged) {
return false;
}
}

owner_to_subsumed_[owner] = subsumed;
subsumed_to_owner_[subsumed] = owner;

if (HasPendingTasksUnlocked(owner)) {
WakeUp(owner, GetNextWakeTimeUnlocked(owner));
}

return true;
}

bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
MergedQueuesRunner merged_observers =
MergedQueuesRunner(*this, owner, MutexType::kObservers);
MergedQueuesRunner merged_tasks =
MergedQueuesRunner(*this, owner, MutexType::kTasks);

const TaskQueueId subsumed = owner_to_subsumed_[owner];
if (subsumed == _kUnmerged) {
return false;
}

subsumed_to_owner_[subsumed] = _kUnmerged;
owner_to_subsumed_[owner] = _kUnmerged;

if (HasPendingTasksUnlocked(owner)) {
WakeUp(owner, GetNextWakeTimeUnlocked(owner));
}

if (HasPendingTasksUnlocked(subsumed)) {
WakeUp(subsumed, GetNextWakeTimeUnlocked(subsumed));
}

return true;
}

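To make the observable semantics concrete, here is a unit-test-style sketch in the spirit of the merge/unmerge tests this PR adds; the test below is illustrative and not copied from message_loop_task_queues_merge_unmerge_unittests.cc:

```cpp
#include "flutter/fml/message_loop_task_queues.h"
#include "gtest/gtest.h"

// Illustrative only; not one of the actual tests added in this PR.
TEST(MessageLoopTaskQueuesMergeUnmerge, CountsCombineUnderOwner) {
  auto queues = fml::MessageLoopTaskQueues::GetInstance();
  auto owner = queues->CreateTaskQueue();
  auto subsumed = queues->CreateTaskQueue();

  queues->RegisterTask(subsumed, []() {}, fml::TimePoint::Now());
  ASSERT_TRUE(queues->Merge(owner, subsumed));

  // The owner reports its own tasks plus the subsumed queue's tasks ...
  ASSERT_EQ(queues->GetNumPendingTasks(owner), 1u);
  // ... while a subsumed queue always reports zero.
  ASSERT_EQ(queues->GetNumPendingTasks(subsumed), 0u);

  ASSERT_TRUE(queues->Unmerge(owner));
  ASSERT_EQ(queues->GetNumPendingTasks(subsumed), 1u);
}
```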
bool MessageLoopTaskQueues::Owns(TaskQueueId owner, TaskQueueId subsumed) {
MergedQueuesRunner merged_observers = MergedQueuesRunner(*this, owner);
return subsumed == owner_to_subsumed_[owner] || owner == subsumed;
}

// A subsumed queue never reports pending tasks; its owner does.
// An owning queue considers both its own tasks and its subsumed queue's.
bool MessageLoopTaskQueues::HasPendingTasksUnlocked(TaskQueueId queue_id) {
if (subsumed_to_owner_[queue_id] != _kUnmerged) {
return false;
}

if (!delayed_tasks_[queue_id].empty()) {
return true;
}

const TaskQueueId subsumed = owner_to_subsumed_[queue_id];
if (subsumed == _kUnmerged) {
// This queue is not an owner, and its own task list is empty.
return false;
} else {
return !delayed_tasks_[subsumed].empty();
}
}

fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
TaskQueueId queue_id) {
TaskQueueId tmp = _kUnmerged;
return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime();
}

const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked(
TaskQueueId owner,
TaskQueueId& top_queue_id) {
FML_DCHECK(HasPendingTasksUnlocked(owner));
const TaskQueueId subsumed = owner_to_subsumed_[owner];
if (subsumed == _kUnmerged) {
top_queue_id = owner;
return delayed_tasks_[owner].top();
}
// This queue owns another task queue, so consider both.
const bool subsumed_has_task = !delayed_tasks_[subsumed].empty();
const bool owner_has_task = !delayed_tasks_[owner].empty();
if (owner_has_task && subsumed_has_task) {
const auto owner_task = delayed_tasks_[owner].top();
const auto subsumed_task = delayed_tasks_[subsumed].top();
if (owner_task > subsumed_task) {
top_queue_id = subsumed;
} else {
top_queue_id = owner;
}
} else if (owner_has_task) {
top_queue_id = owner;
} else {
top_queue_id = subsumed;
}
return delayed_tasks_[top_queue_id].top();
}

std::mutex& MessageLoopTaskQueues::GetMutex(TaskQueueId queue_id,
MutexType type) {
std::scoped_lock lock(queue_meta_mutex_);