From 53878d6e9fd6c422b4a3fb2ca9c13c1a7cae3c65 Mon Sep 17 00:00:00 2001
From: Lukas Bindreiter
Date: Fri, 7 Nov 2025 16:37:27 +0100
Subject: [PATCH 1/2] Grouped and packed encoding of task submissions

---
Review notes (text between the "---" line and the diff is not applied by git-am):
 * Dropped the duplicated max_retries clause from the size-match CEL expression.
 * The dependencies_on_other_groups comment now names the actual `task_groups` field.
 * Blob ids on the "index" lines were NOT recomputed after these edits.
 * NOTE(review): ComputedTask.sub_tasks keeps its name but moves to field 5 with a new type;
   JSON/text-format readers keyed on the name `sub_tasks` will see a different shape - confirm
   that no such consumers exist.
 * NOTE(review): the removed `inputs` field carried max_items: 100000; none of the new repeated
   fields has a size bound - confirm that this is intentional.

 apis/workflows/v1/core.proto | 65 +++++++++++++++++++++++++++---------
 apis/workflows/v1/task.proto |  7 +++-
 2 files changed, 55 insertions(+), 17 deletions(-)

diff --git a/apis/workflows/v1/core.proto b/apis/workflows/v1/core.proto
index 70547e1..9f8403f 100644
--- a/apis/workflows/v1/core.proto
+++ b/apis/workflows/v1/core.proto
@@ -182,14 +182,6 @@ message Tasks {
 
 // TaskSubmission is a message of a task that is just about to be submitted, either by submitting a job or as a subtask.
 message TaskSubmission {
-  option (buf.validate.message).oneof = {
-    fields: [
-      "input",
-      "inputs"
-    ]
-    required: true
-  };
-
   // The cluster that this task should be run on
   string cluster_slug = 1;
   // The task identifier
@@ -206,15 +198,56 @@ message TaskSubmission {
 
   // The serialized task instance, if there is only a single instance.
   bytes input = 3 [(buf.validate.field).bytes.max_len = 2048];
+}
 
-  // A list of serialized task instances, all sharing the same task properties. This is useful for cases where we have
-  // a larger number of very similar subtasks, but only the input parameters vary.
-  repeated bytes inputs = 7 [(buf.validate.field).repeated = {
-    items: {
-      bytes: {max_len: 2048}
-    }
-    max_items: 100000 // maximum of 100k subtasks in a single subtask tree
-  }];
+// TaskSubmissions is a structure for representing a set of tasks about to be submitted, either as a job or as subtasks.
+// It is optimized for efficient serialization for cases where a large number of very similar tasks are submitted,
+// with potentially only the individual input parameters varying.
+// To reduce the serialization size, we keep a separate list/lookup table of unique task properties, that can then be
+// referenced by their index.
+message TaskSubmissions {
+  // Concrete instantiations of tasks, grouped by their dependencies and dependants, and referencing entries in the
+  // lookup tables also contained in this message by index.
+  repeated TaskSubmissionGroup task_groups = 1;
+  // Unique values of cluster slugs, referenced by index in the task instantiations.
+  repeated string cluster_slug_lookup = 2;
+  // Unique values of task identifiers, referenced by index in the task instantiations.
+  repeated TaskIdentifier identifier_lookup = 3;
+  // Unique values of display names, referenced by index in the task instantiations.
+  repeated string display_lookup = 4 [(buf.validate.field).repeated.items.string.min_len = 1];
+}
+
+// TaskSubmissionGroup is a structure for representing a list of submitted tasks, that all share the exact same
+// dependencies and dependants. Grouping tasks by their dependency edges, and then converting task dependencies to
+// group dependencies can help to drastically reduce the number of edges we need to serialize and transmit.
+// Dependants are not explicitly specified, since they can be inferred from the dependencies of the other groups
+// in the containing TaskSubmissions message. This means that the `dependencies_on_other_groups` field is not unique
+// across groups, since there may be two groups sharing the same dependencies but having different dependants.
+message TaskSubmissionGroup {
+  option (buf.validate.message).cel = {
+    id: "task_submission_group.task_fields_size_match"
+    message: "The length of the task related fields must match."
+    expression: "this.inputs.size() == this.identifier.size() && this.inputs.size() == this.cluster_slug.size() && this.inputs.size() == this.display.size() && this.inputs.size() == this.max_retries.size()"
+  };
+
+  // The indices of the groups that this submission group depends on. Indices refer to the `task_groups` field of the
+  // containing TaskSubmissions message.
+  repeated uint32 dependencies_on_other_groups = 1;
+  // The input parameters for each task.
+  // We explicitly don't group the fields into a submessage and then have a single repeated field for that submessage,
+  // to enable packed encoding of the repeated fields.
+  repeated bytes inputs = 2 [(buf.validate.field).repeated.items.bytes.max_len = 2048];
+  // Index of the task identifier in the identifier_lookup field of the containing TaskSubmissions message
+  // for each task.
+  repeated uint64 identifier = 3;
+  // Index of the cluster slug in the cluster_slug_lookup field of the containing TaskSubmissions message for each task,
+  // indicating the cluster that the task should be run on.
+  repeated uint64 cluster_slug = 4;
+  // Index of the display name in the display_lookup field of the containing TaskSubmissions message for each task,
+  // specifying a human-readable description of the task.
+  repeated uint64 display = 5;
+  // The maximum number of retries for each task. Not a pointer to a lookup table, since we just inline the values.
+  repeated int64 max_retries = 6;
 }
 
 // A lease for a task.
diff --git a/apis/workflows/v1/task.proto b/apis/workflows/v1/task.proto
index 6cf5b07..2ecd1b6 100644
--- a/apis/workflows/v1/task.proto
+++ b/apis/workflows/v1/task.proto
@@ -36,7 +36,12 @@ message ComputedTask {
   // If not set, the display message specified upon task submission will be kept.
   string display = 2;
   // A list of sub-tasks that the just computed task spawned.
-  repeated TaskSubmission sub_tasks = 3 [(buf.validate.field).repeated.max_items = 64];
+  repeated TaskSubmission legacy_sub_tasks = 3 [
+    deprecated = true,
+    (buf.validate.field).repeated.max_items = 64
+  ];
+  // The sub-tasks that the just computed task spawned, in the grouped encoding that replaces legacy_sub_tasks.
+  TaskSubmissions sub_tasks = 5;
   // A list of progress updates that the computed task wants to report.
   repeated Progress progress_updates = 4;
 }

From 5ce91ecb1d515691485dd2c0a183595bad6aedcc Mon Sep 17 00:00:00 2001
From: Lukas Bindreiter
Date: Mon, 10 Nov 2025 10:32:07 +0100
Subject: [PATCH 2/2] Variable naming to indicate pointer/value

---
Review notes (text between the "---" line and the diff is not applied by git-am):
 * The removed CEL expression line and the trailing context line of the second hunk were updated
   to match the revised text of patch 1/2.
 * Removed the stray comma before the parenthetical in the task_groups comment.
 * Blob ids on the "index" line were NOT recomputed after these edits.

 apis/workflows/v1/core.proto | 34 +++++++++++++++++++++++++---------
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git a/apis/workflows/v1/core.proto b/apis/workflows/v1/core.proto
index 9f8403f..574ca59 100644
--- a/apis/workflows/v1/core.proto
+++ b/apis/workflows/v1/core.proto
@@ -206,8 +206,9 @@ message TaskSubmission {
 // To reduce the serialization size, we keep a separate list/lookup table of unique task properties, that can then be
 // referenced by their index.
 message TaskSubmissions {
-  // Concrete instantiations of tasks, grouped by their dependencies and dependants, and referencing entries in the
-  // lookup tables also contained in this message by index.
+  // Concrete instantiations of tasks, grouped by their dependencies and dependants. Each group is uniquely defined by
+  // the set of groups that it depends on (dependencies_on_other_groups) and the set of groups that depend on it
+  // (which is implicitly given by the inverse of the dependencies on other groups).
   repeated TaskSubmissionGroup task_groups = 1;
   // Unique values of cluster slugs, referenced by index in the task instantiations.
   repeated string cluster_slug_lookup = 2;
@@ -225,9 +226,24 @@ message TaskSubmissions {
 // across groups, since there may be two groups sharing the same dependencies but having different dependants.
 message TaskSubmissionGroup {
   option (buf.validate.message).cel = {
-    id: "task_submission_group.task_fields_size_match"
-    message: "The length of the task related fields must match."
-    expression: "this.inputs.size() == this.identifier.size() && this.inputs.size() == this.cluster_slug.size() && this.inputs.size() == this.display.size() && this.inputs.size() == this.max_retries.size()"
+    id: "task_submission_group.identifiers_size_match"
+    message: "The number of inputs must match the number of task identifiers."
+    expression: "this.inputs.size() == this.identifier_pointers.size()"
+  };
+  option (buf.validate.message).cel = {
+    id: "task_submission_group.cluster_slugs_size_match"
+    message: "The number of cluster slugs must match the number of inputs."
+    expression: "this.inputs.size() == this.cluster_slug_pointers.size()"
+  };
+  option (buf.validate.message).cel = {
+    id: "task_submission_group.displays_size_match"
+    message: "The number of display pointers must match the number of inputs."
+    expression: "this.inputs.size() == this.display_pointers.size()"
+  };
+  option (buf.validate.message).cel = {
+    id: "task_submission_group.max_retries_size_match"
+    message: "The number of max_retries_values must match the number of inputs."
+    expression: "this.inputs.size() == this.max_retries_values.size()"
   };
 
   // The indices of the groups that this submission group depends on. Indices refer to the `task_groups` field of the
@@ -239,15 +255,15 @@ message TaskSubmissionGroup {
   repeated bytes inputs = 2 [(buf.validate.field).repeated.items.bytes.max_len = 2048];
   // Index of the task identifier in the identifier_lookup field of the containing TaskSubmissions message
   // for each task.
-  repeated uint64 identifier = 3;
+  repeated uint64 identifier_pointers = 3;
   // Index of the cluster slug in the cluster_slug_lookup field of the containing TaskSubmissions message for each task,
   // indicating the cluster that the task should be run on.
-  repeated uint64 cluster_slug = 4;
+  repeated uint64 cluster_slug_pointers = 4;
   // Index of the display name in the display_lookup field of the containing TaskSubmissions message for each task,
   // specifying a human-readable description of the task.
-  repeated uint64 display = 5;
+  repeated uint64 display_pointers = 5;
   // The maximum number of retries for each task. Not a pointer to a lookup table, since we just inline the values.
-  repeated int64 max_retries = 6;
+  repeated int64 max_retries_values = 6;
 }
 
 // A lease for a task.