From e63a42ffe10cfbc3160d732ed12862fe797e1b63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Sat, 22 Jun 2024 11:56:31 +0200 Subject: [PATCH 1/3] casdprocessmanager.py: Add stubs for the Remote Execution API --- src/buildstream/_cas/casdprocessmanager.py | 37 ++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py index ab497a8ac..0d5cd6a8d 100644 --- a/src/buildstream/_cas/casdprocessmanager.py +++ b/src/buildstream/_cas/casdprocessmanager.py @@ -31,6 +31,7 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc from .._protos.build.buildgrid import local_cas_pb2_grpc from .._protos.google.bytestream import bytestream_pb2_grpc +from .._protos.google.longrunning import operations_pb2_grpc from .. import _site from .. import utils @@ -125,6 +126,9 @@ def __init__(self, path, log_dir, log_level, cache_quota, remote_cache_spec, pro self._local_cas = None self._asset_fetch = None self._asset_push = None + self._exec_service = None + self._operations_service = None + self._ac_service = None self._shutdown_requested = False self._lock = threading.Lock() @@ -282,6 +286,9 @@ def release_resources(self, messenger=None): self._shutdown_requested = True with self._lock: if self._casd_channel: + self._ac_service = None + self._operations_service = None + self._exec_service = None self._asset_push = None self._asset_fetch = None self._local_cas = None @@ -374,6 +381,9 @@ def _establish_connection(self): self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) self._asset_fetch = remote_asset_pb2_grpc.FetchStub(self._casd_channel) self._asset_push = remote_asset_pb2_grpc.PushStub(self._casd_channel) + self._exec_service = remote_execution_pb2_grpc.ExecutionStub(self._casd_channel) + self._operations_service = operations_pb2_grpc.OperationsStub(self._casd_channel) + self._ac_service = remote_execution_pb2_grpc.ActionCacheStub(self._casd_channel) # get_cas(): # @@ -415,3 +425,30 @@ def get_asset_push(self): if self._casd_channel is None: self._establish_connection() return self._asset_push + + # get_exec_service(): + # + # Return Remote Execution stub for buildbox-casd channel. + # + def get_exec_service(self): + if self._casd_channel is None: + self._establish_connection() + return self._exec_service + + # get_operations_service(): + # + # Return Operations stub for buildbox-casd channel. + # + def get_operations_service(self): + if self._casd_channel is None: + self._establish_connection() + return self._operations_service + + # get_ac_service(): + # + # Return Action Cache stub for buildbox-casd channel. + # + def get_ac_service(self): + if self._casd_channel is None: + self._establish_connection() + return self._ac_service From aec62cd58bbdb03fbe9753dc2c31915f6de39045 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Sat, 22 Jun 2024 11:57:33 +0200 Subject: [PATCH 2/3] _sandboxremote.py: Use buildbox-casd as remote execution proxy buildbox-casd is already used as CAS and RA proxy. Using buildbox-casd for all remote connections aims to improve robustness (e.g., consistent retry behavior) and enables support for token-based authentication. --- src/buildstream/sandbox/_sandboxremote.py | 142 +++++++++++++++++----- tests/remotecache/simple.py | 3 +- 2 files changed, 115 insertions(+), 30 deletions(-) diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index 1e8598d6c..ef8ef7de8 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -15,19 +15,84 @@ # Jim MacArthur import shutil -from functools import partial import grpc from ._sandboxreapi import SandboxREAPI from .. import _signals +from .._remote import BaseRemote from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc +from .._protos.build.buildgrid import local_cas_pb2 from .._protos.google.rpc import code_pb2 from .._exceptions import BstError, SandboxError from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc from .._cas import CASRemote +class ExecutionRemote(BaseRemote): + def __init__(self, spec, casd): + super().__init__(spec) + self.casd = casd + self.instance_name = None + self.exec_service = None + self.operations_service = None + + def close(self): + self.exec_service = None + self.operations_service = None + super().close() + + def _configure_protocols(self): + local_cas = self.casd.get_local_cas() + request = local_cas_pb2.GetInstanceNameForRemotesRequest() + self.spec.to_localcas_remote(request.execution) + try: + response = local_cas.GetInstanceNameForRemotes(request) + self.instance_name = response.instance_name + self.exec_service = self.casd.get_exec_service() + self.operations_service = self.casd.get_operations_service() + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == grpc.StatusCode.INVALID_ARGUMENT: + # buildbox-casd is too old to support execution service remotes. + # Fall back to direct connection. + self.instance_name = self.spec.instance_name + self.channel = self.spec.open_channel() + self.exec_service = remote_execution_pb2_grpc.ExecutionStub(self.channel) + self.operations_service = operations_pb2_grpc.OperationsStub(self.channel) + else: + raise + + +class ActionCacheRemote(BaseRemote): + def __init__(self, spec, casd): + super().__init__(spec) + self.casd = casd + self.instance_name = None + self.ac_service = None + + def close(self): + self.ac_service = None + super().close() + + def _configure_protocols(self): + local_cas = self.casd.get_local_cas() + request = local_cas_pb2.GetInstanceNameForRemotesRequest() + self.spec.to_localcas_remote(request.action_cache) + try: + response = local_cas.GetInstanceNameForRemotes(request) + self.instance_name = response.instance_name + self.ac_service = self.casd.get_ac_service() + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == grpc.StatusCode.INVALID_ARGUMENT: + # buildbox-casd is too old to support action cache remotes. + # Fall back to direct connection. + self.instance_name = self.spec.instance_name + self.channel = self.spec.open_channel() + self.ac_service = remote_execution_pb2_grpc.ActionCacheStub(self.channel) + else: + raise + + # SandboxRemote() # # This isn't really a sandbox, it's a stub which sends all the sources and build @@ -39,6 +104,7 @@ def __init__(self, *args, **kwargs): context = self._get_context() cascache = context.get_cascache() + casd = context.get_casd() specs = context.remote_execution_specs if specs is None: @@ -62,15 +128,33 @@ def __init__(self, *args, **kwargs): self.own_storage_remote = False self.storage_remote = cascache.get_default_remote() - def run_remote_command(self, channel, action_digest): + self.exec_remote = ExecutionRemote(self.exec_spec, casd) + try: + self.exec_remote.init() + except grpc.RpcError as e: + raise SandboxError( + "Failed to contact remote execution service at {}: {}".format(self.exec_spec.url, e) + ) from e + + if self.action_spec: + self.ac_remote = ActionCacheRemote(self.action_spec, casd) + try: + self.ac_remote.init() + except grpc.RpcError as e: + raise SandboxError( + "Failed to contact action cache service at {}: {}".format(self.action_spec.url, e) + ) from e + else: + self.ac_remote = None + + def run_remote_command(self, action_digest): # Sends an execution request to the remote execution server. # # This function blocks until it gets a response from the server. - # Try to create a communication channel to the BuildGrid server. - stub = remote_execution_pb2_grpc.ExecutionStub(channel) + stub = self.exec_remote.exec_service request = remote_execution_pb2.ExecuteRequest( - instance_name=self.exec_spec.instance_name, action_digest=action_digest, skip_cache_lookup=False + instance_name=self.exec_remote.instance_name, action_digest=action_digest, skip_cache_lookup=False ) def __run_remote_command(stub, execute_request=None, running_operation=None): @@ -117,7 +201,7 @@ def __run_remote_command(stub, execute_request=None, running_operation=None): operation = None with self._get_context().messenger.timed_activity( "Waiting for the remote build to complete", element_name=self._get_element_name() - ), _signals.terminator(partial(self.cancel_operation, channel)): + ), _signals.terminator(self.cancel_operation): operation = __run_remote_command(stub, execute_request=request) if operation is None: return None @@ -128,12 +212,12 @@ def __run_remote_command(stub, execute_request=None, running_operation=None): return operation - def cancel_operation(self, channel): + def cancel_operation(self): # If we don't have the name can't send request. if self.operation_name is None: return - stub = operations_pb2_grpc.OperationsStub(channel) + stub = self.exec_remote.operations_service request = operations_pb2.CancelOperationRequest(name=str(self.operation_name)) try: @@ -205,10 +289,8 @@ def _execute_action(self, action, flags): raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e # Now request to execute the action - channel = self.exec_spec.open_channel() - with channel: - operation = self.run_remote_command(channel, action_digest) - action_result = self._extract_action_result(operation) + operation = self.run_remote_command(action_digest) + action_result = self._extract_action_result(operation) # Fetch outputs for output_directory in action_result.output_directories: @@ -243,25 +325,23 @@ def _check_action_cache(self, action_digest): # # Should return either the action response or None if not found, raise # Sandboxerror if other grpc error was raised - if not self.action_spec: + if not self.ac_remote: return None - channel = self.action_spec.open_channel() - with channel: - request = remote_execution_pb2.GetActionResultRequest( - instance_name=self.action_spec.instance_name, action_digest=action_digest - ) - stub = remote_execution_pb2_grpc.ActionCacheStub(channel) - try: - result = stub.GetActionResult(request) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise SandboxError("Failed to query action cache: {} ({})".format(e.code(), e.details())) - return None - else: - context = self._get_context() - context.messenger.info("Action result found in action cache", element_name=self._get_element_name()) - return result + request = remote_execution_pb2.GetActionResultRequest( + instance_name=self.ac_remote.instance_name, action_digest=action_digest + ) + stub = self.ac_remote.ac_service + try: + result = stub.GetActionResult(request) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise SandboxError("Failed to query action cache: {} ({})".format(e.code(), e.details())) + return None + else: + context = self._get_context() + context.messenger.info("Action result found in action cache", element_name=self._get_element_name()) + return result @staticmethod def _extract_action_result(operation): @@ -288,5 +368,9 @@ def _extract_action_result(operation): return execution_response.result def _cleanup(self): + if self.ac_remote: + self.ac_remote.close() + if self.exec_remote: + self.exec_remote.close() if self.own_storage_remote: self.storage_remote.close() diff --git a/tests/remotecache/simple.py b/tests/remotecache/simple.py index 77d08b888..6cbc45da4 100644 --- a/tests/remotecache/simple.py +++ b/tests/remotecache/simple.py @@ -80,4 +80,5 @@ def test_remote_autotools_build_no_cache(cli, datafiles): result.assert_success() assert """WARNING Failed to initialize remote""" in result.stderr - assert """Remote initialisation failed with status UNAVAILABLE: DNS resolution failed""" in result.stderr + assert """Remote initialisation failed with status UNAVAILABLE""" in result.stderr + assert """DNS resolution failed""" in result.stderr From b099526533ca58330748cc24e9f54ddaa6d5c35c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Fri, 5 Jul 2024 09:21:31 +0200 Subject: [PATCH 3/3] .github/common.env: Update CI images to use BuildBox 1.2.8 --- .github/common.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/common.env b/.github/common.env index f2b67727b..ceb8adef4 100644 --- a/.github/common.env +++ b/.github/common.env @@ -1,6 +1,6 @@ # Shared common variables -CI_IMAGE_VERSION=master-1091773482 +CI_IMAGE_VERSION=master-1361451963 CI_TOXENV_MAIN=py37,py38,py39,py310,py311,py312 CI_TOXENV_PLUGINS=py37-plugins,py38-plugins,py39-plugins,py310-plugins,py311-plugins,py312-plugins CI_TOXENV_ALL="${CI_TOXENV_MAIN},${CI_TOXENV_PLUGINS}"