Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ci/licenses_golden/licenses_flutter
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,13 @@ FILE: ../../../flutter/fml/synchronization/sync_switch_unittest.cc
FILE: ../../../flutter/fml/synchronization/waitable_event.cc
FILE: ../../../flutter/fml/synchronization/waitable_event.h
FILE: ../../../flutter/fml/synchronization/waitable_event_unittest.cc
FILE: ../../../flutter/fml/task_queue_id.h
FILE: ../../../flutter/fml/task_runner.cc
FILE: ../../../flutter/fml/task_runner.h
FILE: ../../../flutter/fml/task_source.cc
FILE: ../../../flutter/fml/task_source.h
FILE: ../../../flutter/fml/task_source_grade.h
FILE: ../../../flutter/fml/task_source_unittests.cc
FILE: ../../../flutter/fml/thread.cc
FILE: ../../../flutter/fml/thread.h
FILE: ../../../flutter/fml/thread_local.cc
Expand Down
4 changes: 4 additions & 0 deletions fml/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ source_set("fml") {
"synchronization/sync_switch.h",
"synchronization/waitable_event.cc",
"synchronization/waitable_event.h",
"task_queue_id.h",
"task_runner.cc",
"task_runner.h",
"task_source.cc",
"task_source.h",
"thread.cc",
"thread.h",
"thread_local.cc",
Expand Down Expand Up @@ -261,6 +264,7 @@ if (enable_unittests) {
"synchronization/semaphore_unittest.cc",
"synchronization/sync_switch_unittest.cc",
"synchronization/waitable_event_unittest.cc",
"task_source_unittests.cc",
"thread_local_unittests.cc",
"thread_unittests.cc",
"time/time_delta_unittest.cc",
Expand Down
16 changes: 12 additions & 4 deletions fml/delayed_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ namespace fml {

DelayedTask::DelayedTask(size_t order,
const fml::closure& task,
fml::TimePoint target_time)
: order_(order), task_(task), target_time_(target_time) {}

DelayedTask::DelayedTask(const DelayedTask& other) = default;
fml::TimePoint target_time,
fml::TaskSourceGrade task_source_grade)
: order_(order),
task_(task),
target_time_(target_time),
task_source_grade_(task_source_grade) {}

DelayedTask::~DelayedTask() = default;

DelayedTask::DelayedTask(const DelayedTask& other) = default;

const fml::closure& DelayedTask::GetTask() const {
return task_;
}
Expand All @@ -25,6 +29,10 @@ fml::TimePoint DelayedTask::GetTargetTime() const {
return target_time_;
}

fml::TaskSourceGrade DelayedTask::GetTaskSourceGrade() const {
return task_source_grade_;
}

bool DelayedTask::operator>(const DelayedTask& other) const {
if (target_time_ == other.target_time_) {
return order_ > other.order_;
Expand Down
7 changes: 6 additions & 1 deletion fml/delayed_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <queue>

#include "flutter/fml/closure.h"
#include "flutter/fml/task_source_grade.h"
#include "flutter/fml/time/time_point.h"

namespace fml {
Expand All @@ -16,7 +17,8 @@ class DelayedTask {
public:
DelayedTask(size_t order,
const fml::closure& task,
fml::TimePoint target_time);
fml::TimePoint target_time,
fml::TaskSourceGrade task_source_grade);

DelayedTask(const DelayedTask& other);

Expand All @@ -26,12 +28,15 @@ class DelayedTask {

fml::TimePoint GetTargetTime() const;

fml::TaskSourceGrade GetTaskSourceGrade() const;

bool operator>(const DelayedTask& other) const;

private:
size_t order_;
fml::closure task_;
fml::TimePoint target_time_;
fml::TaskSourceGrade task_source_grade_;
};

using DelayedTaskQueue = std::priority_queue<DelayedTask,
Expand Down
124 changes: 89 additions & 35 deletions fml/message_loop_task_queues.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,54 @@
#include "flutter/fml/message_loop_task_queues.h"

#include <iostream>
#include <memory>

#include "flutter/fml/make_copyable.h"
#include "flutter/fml/message_loop_impl.h"
#include "flutter/fml/task_source.h"
#include "flutter/fml/thread_local.h"

namespace fml {

std::mutex MessageLoopTaskQueues::creation_mutex_;

const size_t TaskQueueId::kUnmerged = ULONG_MAX;

// Guarded by creation_mutex_.
fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;

TaskQueueEntry::TaskQueueEntry()
: owner_of(_kUnmerged), subsumed_by(_kUnmerged) {
namespace {

// iOS prior to version 9 prevents c++11 thread_local and __thread specefier,
// having us resort to boxed enum containers.
class TaskSourceGradeHolder {
public:
TaskSourceGrade task_source_grade;

explicit TaskSourceGradeHolder(TaskSourceGrade task_source_grade_arg)
: task_source_grade(task_source_grade_arg) {}
};
} // namespace

// Guarded by creation_mutex_.
FML_THREAD_LOCAL ThreadLocalUniquePtr<TaskSourceGradeHolder>
tls_task_source_grade;

TaskQueueEntry::TaskQueueEntry(TaskQueueId created_for_arg)
: owner_of(_kUnmerged),
subsumed_by(_kUnmerged),
created_for(created_for_arg) {
wakeable = NULL;
task_observers = TaskObservers();
delayed_tasks = DelayedTaskQueue();
task_source = std::make_unique<TaskSource>(created_for);
}

fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::GetInstance() {
std::scoped_lock creation(creation_mutex_);
if (!instance_) {
instance_ = fml::MakeRefCounted<MessageLoopTaskQueues>();
tls_task_source_grade.reset(
new TaskSourceGradeHolder{TaskSourceGrade::kUnspecified});
}
return instance_;
}
Expand All @@ -38,7 +63,7 @@ TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
std::lock_guard guard(queue_mutex_);
TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
++task_queue_id_counter_;
queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>();
queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>(loop_id);
return loop_id;
}

Expand All @@ -63,24 +88,36 @@ void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
const auto& queue_entry = queue_entries_.at(queue_id);
FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
TaskQueueId subsumed = queue_entry->owner_of;
queue_entry->delayed_tasks = {};
queue_entry->task_source->ShutDown();
if (subsumed != _kUnmerged) {
queue_entries_.at(subsumed)->delayed_tasks = {};
queue_entries_.at(subsumed)->task_source->ShutDown();
}
}

void MessageLoopTaskQueues::RegisterTask(TaskQueueId queue_id,
const fml::closure& task,
fml::TimePoint target_time) {
TaskSourceGrade MessageLoopTaskQueues::GetCurrentTaskSourceGrade() {
std::scoped_lock creation(creation_mutex_);
return tls_task_source_grade.get()->task_source_grade;
}

void MessageLoopTaskQueues::RegisterTask(
TaskQueueId queue_id,
const fml::closure& task,
fml::TimePoint target_time,
fml::TaskSourceGrade task_source_grade) {
std::lock_guard guard(queue_mutex_);
size_t order = order_++;
const auto& queue_entry = queue_entries_.at(queue_id);
queue_entry->delayed_tasks.push({order, task, target_time});
queue_entry->task_source->RegisterTask(
{order, task, target_time, task_source_grade});
TaskQueueId loop_to_wake = queue_id;
if (queue_entry->subsumed_by != _kUnmerged) {
loop_to_wake = queue_entry->subsumed_by;
}
WakeUpUnlocked(loop_to_wake, GetNextWakeTimeUnlocked(loop_to_wake));

// This can happen when the secondary tasks are paused.
if (HasPendingTasksUnlocked(loop_to_wake)) {
WakeUpUnlocked(loop_to_wake, GetNextWakeTimeUnlocked(loop_to_wake));
}
}

bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
Expand All @@ -94,20 +131,25 @@ fml::closure MessageLoopTaskQueues::GetNextTaskToRun(TaskQueueId queue_id,
if (!HasPendingTasksUnlocked(queue_id)) {
return nullptr;
}
TaskQueueId top_queue = _kUnmerged;
const auto& top = PeekNextTaskUnlocked(queue_id, top_queue);
TaskSource::TopTask top = PeekNextTaskUnlocked(queue_id);

if (!HasPendingTasksUnlocked(queue_id)) {
WakeUpUnlocked(queue_id, fml::TimePoint::Max());
} else {
WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
}

if (top.GetTargetTime() > from_time) {
if (top.task.GetTargetTime() > from_time) {
return nullptr;
}
fml::closure invocation = top.GetTask();
queue_entries_.at(top_queue)->delayed_tasks.pop();
fml::closure invocation = top.task.GetTask();
queue_entries_.at(top.task_queue_id)
->task_source->PopTask(top.task.GetTaskSourceGrade());
{
std::scoped_lock creation(creation_mutex_);
const auto task_source_grade = top.task.GetTaskSourceGrade();
tls_task_source_grade.reset(new TaskSourceGradeHolder{task_source_grade});
}
return invocation;
}

Expand All @@ -126,12 +168,12 @@ size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
}

size_t total_tasks = 0;
total_tasks += queue_entry->delayed_tasks.size();
total_tasks += queue_entry->task_source->GetNumPendingTasks();

TaskQueueId subsumed = queue_entry->owner_of;
if (subsumed != _kUnmerged) {
const auto& subsumed_entry = queue_entries_.at(subsumed);
total_tasks += subsumed_entry->delayed_tasks.size();
total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
}
return total_tasks;
}
Expand Down Expand Up @@ -248,6 +290,20 @@ TaskQueueId MessageLoopTaskQueues::GetSubsumedTaskQueueId(
return queue_entries_.at(owner)->owner_of;
}

void MessageLoopTaskQueues::PauseSecondarySource(TaskQueueId queue_id) {
std::lock_guard guard(queue_mutex_);
queue_entries_.at(queue_id)->task_source->PauseSecondary();
}

void MessageLoopTaskQueues::ResumeSecondarySource(TaskQueueId queue_id) {
std::lock_guard guard(queue_mutex_);
queue_entries_.at(queue_id)->task_source->ResumeSecondary();
// Schedule a wake as needed.
if (HasPendingTasksUnlocked(queue_id)) {
WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
}
}

// Subsumed queues will never have pending tasks.
// Owning queues will consider both their and their subsumed tasks.
bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
Expand All @@ -258,7 +314,7 @@ bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
return false;
}

if (!entry->delayed_tasks.empty()) {
if (!entry->task_source->IsEmpty()) {
return true;
}

Expand All @@ -267,37 +323,35 @@ bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
// this is not an owner and queue is empty.
return false;
} else {
return !queue_entries_.at(subsumed)->delayed_tasks.empty();
return !queue_entries_.at(subsumed)->task_source->IsEmpty();
}
}

fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
TaskQueueId queue_id) const {
TaskQueueId tmp = _kUnmerged;
return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime();
return PeekNextTaskUnlocked(queue_id).task.GetTargetTime();
}

const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked(
TaskQueueId owner,
TaskQueueId& top_queue_id) const {
TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked(
TaskQueueId owner) const {
FML_DCHECK(HasPendingTasksUnlocked(owner));
const auto& entry = queue_entries_.at(owner);
const TaskQueueId subsumed = entry->owner_of;
if (subsumed == _kUnmerged) {
top_queue_id = owner;
return entry->delayed_tasks.top();
return entry->task_source->Top();
}

const auto& owner_tasks = entry->delayed_tasks;
const auto& subsumed_tasks = queue_entries_.at(subsumed)->delayed_tasks;
TaskSource* owner_tasks = entry->task_source.get();
TaskSource* subsumed_tasks = queue_entries_.at(subsumed)->task_source.get();

// we are owning another task queue
const bool subsumed_has_task = !subsumed_tasks.empty();
const bool owner_has_task = !owner_tasks.empty();
const bool subsumed_has_task = !subsumed_tasks->IsEmpty();
const bool owner_has_task = !owner_tasks->IsEmpty();
fml::TaskQueueId top_queue_id = owner;
if (owner_has_task && subsumed_has_task) {
const auto owner_task = owner_tasks.top();
const auto subsumed_task = subsumed_tasks.top();
if (owner_task > subsumed_task) {
const auto owner_task = owner_tasks->Top();
const auto subsumed_task = subsumed_tasks->Top();
if (owner_task.task > subsumed_task.task) {
top_queue_id = subsumed;
} else {
top_queue_id = owner;
Expand All @@ -307,7 +361,7 @@ const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked(
} else {
top_queue_id = subsumed;
}
return queue_entries_.at(top_queue_id)->delayed_tasks.top();
return queue_entries_.at(top_queue_id)->task_source->Top();
}

} // namespace fml
Loading