From 04496cdf66d454e3ea250f6f6e1bdc5f8f3f59fc Mon Sep 17 00:00:00 2001 From: Bastien Orivel Date: Tue, 7 Apr 2026 17:55:04 +0200 Subject: [PATCH 1/4] Parallelize get_in_tree_template and populate_jsone_context The .taskcluster.yml fetch and the json-e context population (which fetches the pushlog, scm level) are independent. Run them concurrently with `asyncio.gather`. With a mocked implementation for hg/tc, that sets every hg request to take 5s and every TC request to take 0.5s, this reduces the time to verify CoT on a signing task from 42s to 27s. --- src/scriptworker/cot/verify.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/scriptworker/cot/verify.py b/src/scriptworker/cot/verify.py index bedad9d9..76ce0a82 100644 --- a/src/scriptworker/cot/verify.py +++ b/src/scriptworker/cot/verify.py @@ -1565,8 +1565,10 @@ async def get_jsone_context_and_template(chain, parent_link, decision_link, task if tasks_for in ("action", "pr-action"): jsone_context, tmpl = await get_action_context_and_template(chain, parent_link, decision_link, tasks_for) else: - tmpl = await get_in_tree_template(decision_link) - jsone_context = await populate_jsone_context(chain, parent_link, decision_link, tasks_for) + tmpl, jsone_context = await asyncio.gather( + get_in_tree_template(decision_link), + populate_jsone_context(chain, parent_link, decision_link, tasks_for), + ) return jsone_context, tmpl From 0c153a8974c14aa140669c435b81632f7b61a12b Mon Sep 17 00:00:00 2001 From: Bastien Orivel Date: Tue, 7 Apr 2026 17:59:23 +0200 Subject: [PATCH 2/4] Parallelize verify_task_types and verify_worker_impls across all links Run all task type and worker impl verification functions concurrently with asyncio.gather instead of sequentially. Each verification function only mutates its own link object, never another link's state, so concurrent execution is safe. 
The log output from different links might interleave now, but given the difference in performance I think that it's a worthy tradeoff. With a mocked implementation for hg/tc, that sets every hg request to take 5s and every TC request to take 0.5s, this reduces the time to verify CoT on a signing task from 27s to 11s. With a beetmover task, it goes from 40s to 20s. --- src/scriptworker/cot/verify.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/scriptworker/cot/verify.py b/src/scriptworker/cot/verify.py index 76ce0a82..0212b7db 100644 --- a/src/scriptworker/cot/verify.py +++ b/src/scriptworker/cot/verify.py @@ -1805,14 +1805,14 @@ async def verify_task_types(chain): """ valid_task_types = get_valid_task_types() task_count = {} + tasks = [] for obj in chain.get_all_links_in_chain(): task_type = obj.task_type log.info("Verifying {} {} as a {} task...".format(obj.name, obj.task_id, task_type)) task_count.setdefault(task_type, 0) task_count[task_type] += 1 - # Run tests synchronously for now. We can parallelize if efficiency - # is more important than a single simple logfile. - await valid_task_types[task_type](chain, obj) + tasks.append(valid_task_types[task_type](chain, obj)) + await asyncio.gather(*tasks) return task_count @@ -1882,12 +1882,12 @@ async def verify_worker_impls(chain): """ valid_worker_impls = get_valid_worker_impls() + tasks = [] for obj in chain.get_all_links_in_chain(): worker_impl = obj.worker_impl log.info("Verifying {} {} as a {} task...".format(obj.name, obj.task_id, worker_impl)) - # Run tests synchronously for now. We can parallelize if efficiency - # is more important than a single simple logfile. 
- await valid_worker_impls[worker_impl](chain, obj) + tasks.append(valid_worker_impls[worker_impl](chain, obj)) + await asyncio.gather(*tasks) # get_source_url {{{1 From 03a09511f5c048cf40ff41d481c4ef772165f444 Mon Sep 17 00:00:00 2001 From: Bastien Orivel Date: Wed, 8 Apr 2026 15:46:53 +0200 Subject: [PATCH 3/4] Parallelize sibling task definition fetching in `build_task_dependencies` For each task, fetch its direct dependencies in parallel instead of one at a time. We can't really do much more without knowing about the graph itself since we don't want duplicates and might have diamond shapes. And we need the task definition (which is what we're trying to get in the first place) to get that graph shape... This changes `build_link` (renamed to `add_link`) so that it only fetches a single task definition and adds it to the chain instead of recursing directly. Recursing into children is now done in `build_task_dependencies`, essentially transforming the traversal from a DFS to a BFS. With a mocked implementation for hg/tc, that sets every hg request to take 5s and every TC request to take 0.5s, this reduces the time to verify CoT on a signing task from 11s to 10s. On a beetmover task, the impact is much more visible since the graph is much deeper and I'm seeing improvements from 20s to 15s. --- src/scriptworker/cot/verify.py | 32 ++++++++++++++++++++------------ tests/test_cot_verify.py | 2 +- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/scriptworker/cot/verify.py b/src/scriptworker/cot/verify.py index 0212b7db..a3dd4dcc 100644 --- a/src/scriptworker/cot/verify.py +++ b/src/scriptworker/cot/verify.py @@ -619,8 +619,8 @@ def _sort_dependencies_by_name_then_task_id(dependencies): # build_task_dependencies {{{1 -async def build_link(chain, task_name, task_id): - """Build a LinkOfTrust and add it to the chain. +async def add_link(chain, task_name, task_id): + """Fetch a task definition and add it as a LinkOfTrust to the chain. 
Args: chain (ChainOfTrust): the chain of trust to add to. @@ -633,14 +633,11 @@ async def build_link(chain, task_name, task_id): """ link = LinkOfTrust(chain.context, task_name, task_id) json_path = link.get_artifact_full_path("task.json") - task_defn = await retry_get_task_definition(chain.context.queue, task_id, exception=CoTError) - link.task = task_defn + link.task = await retry_get_task_definition(chain.context.queue, task_id, exception=CoTError) chain.links.append(link) - # write task json to disk makedirs(os.path.dirname(json_path)) with open(json_path, "w") as fh: - fh.write(format_json(task_defn)) - await build_task_dependencies(chain, task_defn, task_name, task_id) + fh.write(format_json(link.task)) async def build_task_dependencies(chain, task, name, my_task_id): @@ -661,9 +658,21 @@ async def build_task_dependencies(chain, task, name, my_task_id): raise CoTError("Too deep recursion!\n{}".format(name)) sorted_dependencies = find_sorted_task_dependencies(task, name, my_task_id) + seen = set(chain.dependent_task_ids()) + new_deps = [] for task_name, task_id in sorted_dependencies: - if task_id not in chain.dependent_task_ids(): - await build_link(chain, task_name, task_id) + if task_id not in seen: + seen.add(task_id) + new_deps.append((task_name, task_id)) + + if not new_deps: + return + + await asyncio.gather(*[add_link(chain, task_name, task_id) for task_name, task_id in new_deps]) + + for task_name, task_id in new_deps: + link = chain.get_link(task_id) + await build_task_dependencies(chain, link.task, task_name, task_id) # download_cot {{{1 @@ -2044,9 +2053,8 @@ async def verify_chain_of_trust(chain, *, check_task=False): try: # build LinkOfTrust objects if check_task: - await build_link(chain, chain.name, chain.task_id) - else: - await build_task_dependencies(chain, chain.task, chain.name, chain.task_id) + await add_link(chain, chain.name, chain.task_id) + await build_task_dependencies(chain, chain.task, chain.name, chain.task_id) # download the 
signed chain of trust artifacts await download_cot(chain) # verify the signatures and populate the ``link.cot``s diff --git a/tests/test_cot_verify.py b/tests/test_cot_verify.py index d06ac5a0..20434fb4 100644 --- a/tests/test_cot_verify.py +++ b/tests/test_cot_verify.py @@ -2208,7 +2208,7 @@ async def maybe_die(*args): if exc is not None: raise exc("blah") - for func in ("build_task_dependencies", "build_link", "download_cot", "download_cot_artifacts", "verify_task_types", "verify_worker_impls"): + for func in ("build_task_dependencies", "add_link", "download_cot", "download_cot_artifacts", "verify_task_types", "verify_worker_impls"): mocker.patch.object(cotverify, func, new=noop_async) mocker.patch.object(cotverify, "verify_cot_signatures", new=noop_sync) mocker.patch.object(cotverify, "trace_back_to_tree", new=maybe_die) From 2497adcbfed4dc5f3b48ae8944ed1957f4699a83 Mon Sep 17 00:00:00 2001 From: Bastien Orivel Date: Wed, 8 Apr 2026 19:13:35 +0200 Subject: [PATCH 4/4] Parallelize the recursion in `build_task_dependencies` Pass the `seen` set downwards in recursion to avoid grabbing the same dependency twice (in case of diamond-shaped graphs). In the signing case it's a small change (-0.5s) but in the beetmover case it's a 3.5s win. --- src/scriptworker/cot/verify.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/scriptworker/cot/verify.py b/src/scriptworker/cot/verify.py index a3dd4dcc..58821d1c 100644 --- a/src/scriptworker/cot/verify.py +++ b/src/scriptworker/cot/verify.py @@ -640,7 +640,7 @@ async def add_link(chain, task_name, task_id): fh.write(format_json(link.task)) -async def build_task_dependencies(chain, task, name, my_task_id): +async def build_task_dependencies(chain, task, name, my_task_id, seen=None): """Recursively build the task dependencies of a task. Args: @@ -648,6 +648,7 @@ async def build_task_dependencies(chain, task, name, my_task_id): task (dict): the task definition to operate on. 
name (str): the name of the task to operate on. my_task_id (str): the taskId of the task to operate on. + seen (set): shared set of already-seen task IDs to avoid duplicates Raises: CoTError: on failure. @@ -658,7 +659,8 @@ async def build_task_dependencies(chain, task, name, my_task_id): raise CoTError("Too deep recursion!\n{}".format(name)) sorted_dependencies = find_sorted_task_dependencies(task, name, my_task_id) - seen = set(chain.dependent_task_ids()) + if seen is None: + seen = set(chain.dependent_task_ids()) new_deps = [] for task_name, task_id in sorted_dependencies: if task_id not in seen: @@ -670,9 +672,7 @@ async def build_task_dependencies(chain, task, name, my_task_id): await asyncio.gather(*[add_link(chain, task_name, task_id) for task_name, task_id in new_deps]) - for task_name, task_id in new_deps: - link = chain.get_link(task_id) - await build_task_dependencies(chain, link.task, task_name, task_id) + await asyncio.gather(*[build_task_dependencies(chain, chain.get_link(task_id).task, task_name, task_id, seen) for task_name, task_id in new_deps]) # download_cot {{{1