Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/common.env
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Shared common variables

CI_IMAGE_VERSION=master-1091773482
CI_IMAGE_VERSION=master-1361451963
CI_TOXENV_MAIN=py37,py38,py39,py310,py311,py312
CI_TOXENV_PLUGINS=py37-plugins,py38-plugins,py39-plugins,py310-plugins,py311-plugins,py312-plugins
CI_TOXENV_ALL="${CI_TOXENV_MAIN},${CI_TOXENV_PLUGINS}"
37 changes: 37 additions & 0 deletions src/buildstream/_cas/casdprocessmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
from .._protos.build.buildgrid import local_cas_pb2_grpc
from .._protos.google.bytestream import bytestream_pb2_grpc
from .._protos.google.longrunning import operations_pb2_grpc

from .. import _site
from .. import utils
Expand Down Expand Up @@ -125,6 +126,9 @@ def __init__(self, path, log_dir, log_level, cache_quota, remote_cache_spec, pro
self._local_cas = None
self._asset_fetch = None
self._asset_push = None
self._exec_service = None
self._operations_service = None
self._ac_service = None
self._shutdown_requested = False

self._lock = threading.Lock()
Expand Down Expand Up @@ -282,6 +286,9 @@ def release_resources(self, messenger=None):
self._shutdown_requested = True
with self._lock:
if self._casd_channel:
self._ac_service = None
self._operations_service = None
self._exec_service = None
self._asset_push = None
self._asset_fetch = None
self._local_cas = None
Expand Down Expand Up @@ -374,6 +381,9 @@ def _establish_connection(self):
self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
self._asset_fetch = remote_asset_pb2_grpc.FetchStub(self._casd_channel)
self._asset_push = remote_asset_pb2_grpc.PushStub(self._casd_channel)
self._exec_service = remote_execution_pb2_grpc.ExecutionStub(self._casd_channel)
self._operations_service = operations_pb2_grpc.OperationsStub(self._casd_channel)
self._ac_service = remote_execution_pb2_grpc.ActionCacheStub(self._casd_channel)

# get_cas():
#
Expand Down Expand Up @@ -415,3 +425,30 @@ def get_asset_push(self):
if self._casd_channel is None:
self._establish_connection()
return self._asset_push

# get_exec_service():
#
# Return Remote Execution stub for buildbox-casd channel.
#
def get_exec_service(self):
if self._casd_channel is None:
self._establish_connection()
return self._exec_service

# get_operations_service():
#
# Return Operations stub for buildbox-casd channel.
#
def get_operations_service(self):
if self._casd_channel is None:
self._establish_connection()
return self._operations_service

# get_ac_service():
#
# Return Action Cache stub for buildbox-casd channel.
#
def get_ac_service(self):
if self._casd_channel is None:
self._establish_connection()
return self._ac_service
142 changes: 113 additions & 29 deletions src/buildstream/sandbox/_sandboxremote.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,84 @@
# Jim MacArthur <jim.macarthur@codethink.co.uk>

import shutil
from functools import partial

import grpc

from ._sandboxreapi import SandboxREAPI
from .. import _signals
from .._remote import BaseRemote
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.build.buildgrid import local_cas_pb2
from .._protos.google.rpc import code_pb2
from .._exceptions import BstError, SandboxError
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from .._cas import CASRemote


class ExecutionRemote(BaseRemote):
def __init__(self, spec, casd):
super().__init__(spec)
self.casd = casd
self.instance_name = None
self.exec_service = None
self.operations_service = None

def close(self):
self.exec_service = None
self.operations_service = None
super().close()

def _configure_protocols(self):
local_cas = self.casd.get_local_cas()
request = local_cas_pb2.GetInstanceNameForRemotesRequest()
self.spec.to_localcas_remote(request.execution)
try:
response = local_cas.GetInstanceNameForRemotes(request)
self.instance_name = response.instance_name
self.exec_service = self.casd.get_exec_service()
self.operations_service = self.casd.get_operations_service()
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == grpc.StatusCode.INVALID_ARGUMENT:
# buildbox-casd is too old to support execution service remotes.
# Fall back to direct connection.
self.instance_name = self.spec.instance_name
self.channel = self.spec.open_channel()
self.exec_service = remote_execution_pb2_grpc.ExecutionStub(self.channel)
self.operations_service = operations_pb2_grpc.OperationsStub(self.channel)
else:
raise


class ActionCacheRemote(BaseRemote):
def __init__(self, spec, casd):
super().__init__(spec)
self.casd = casd
self.instance_name = None
self.ac_service = None

def close(self):
self.ac_service = None
super().close()

def _configure_protocols(self):
local_cas = self.casd.get_local_cas()
request = local_cas_pb2.GetInstanceNameForRemotesRequest()
self.spec.to_localcas_remote(request.action_cache)
try:
response = local_cas.GetInstanceNameForRemotes(request)
self.instance_name = response.instance_name
self.ac_service = self.casd.get_ac_service()
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == grpc.StatusCode.INVALID_ARGUMENT:
# buildbox-casd is too old to support action cache remotes.
# Fall back to direct connection.
self.instance_name = self.spec.instance_name
self.channel = self.spec.open_channel()
self.ac_service = remote_execution_pb2_grpc.ActionCacheStub(self.channel)
else:
raise


# SandboxRemote()
#
# This isn't really a sandbox, it's a stub which sends all the sources and build
Expand All @@ -39,6 +104,7 @@ def __init__(self, *args, **kwargs):

context = self._get_context()
cascache = context.get_cascache()
casd = context.get_casd()

specs = context.remote_execution_specs
if specs is None:
Expand All @@ -62,15 +128,33 @@ def __init__(self, *args, **kwargs):
self.own_storage_remote = False
self.storage_remote = cascache.get_default_remote()

def run_remote_command(self, channel, action_digest):
self.exec_remote = ExecutionRemote(self.exec_spec, casd)
try:
self.exec_remote.init()
except grpc.RpcError as e:
raise SandboxError(
"Failed to contact remote execution service at {}: {}".format(self.exec_spec.url, e)
) from e

if self.action_spec:
self.ac_remote = ActionCacheRemote(self.action_spec, casd)
try:
self.ac_remote.init()
except grpc.RpcError as e:
raise SandboxError(
"Failed to contact action cache service at {}: {}".format(self.action_spec.url, e)
) from e
else:
self.ac_remote = None

def run_remote_command(self, action_digest):
# Sends an execution request to the remote execution server.
#
# This function blocks until it gets a response from the server.

# Try to create a communication channel to the BuildGrid server.
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
stub = self.exec_remote.exec_service
request = remote_execution_pb2.ExecuteRequest(
instance_name=self.exec_spec.instance_name, action_digest=action_digest, skip_cache_lookup=False
instance_name=self.exec_remote.instance_name, action_digest=action_digest, skip_cache_lookup=False
)

def __run_remote_command(stub, execute_request=None, running_operation=None):
Expand Down Expand Up @@ -117,7 +201,7 @@ def __run_remote_command(stub, execute_request=None, running_operation=None):
operation = None
with self._get_context().messenger.timed_activity(
"Waiting for the remote build to complete", element_name=self._get_element_name()
), _signals.terminator(partial(self.cancel_operation, channel)):
), _signals.terminator(self.cancel_operation):
operation = __run_remote_command(stub, execute_request=request)
if operation is None:
return None
Expand All @@ -128,12 +212,12 @@ def __run_remote_command(stub, execute_request=None, running_operation=None):

return operation

def cancel_operation(self, channel):
def cancel_operation(self):
# If we don't have the name can't send request.
if self.operation_name is None:
return

stub = operations_pb2_grpc.OperationsStub(channel)
stub = self.exec_remote.operations_service
request = operations_pb2.CancelOperationRequest(name=str(self.operation_name))

try:
Expand Down Expand Up @@ -205,10 +289,8 @@ def _execute_action(self, action, flags):
raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e

# Now request to execute the action
channel = self.exec_spec.open_channel()
with channel:
operation = self.run_remote_command(channel, action_digest)
action_result = self._extract_action_result(operation)
operation = self.run_remote_command(action_digest)
action_result = self._extract_action_result(operation)

# Fetch outputs
for output_directory in action_result.output_directories:
Expand Down Expand Up @@ -243,25 +325,23 @@ def _check_action_cache(self, action_digest):
#
# Should return either the action response or None if not found, raise
# Sandboxerror if other grpc error was raised
if not self.action_spec:
if not self.ac_remote:
return None

channel = self.action_spec.open_channel()
with channel:
request = remote_execution_pb2.GetActionResultRequest(
instance_name=self.action_spec.instance_name, action_digest=action_digest
)
stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
try:
result = stub.GetActionResult(request)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise SandboxError("Failed to query action cache: {} ({})".format(e.code(), e.details()))
return None
else:
context = self._get_context()
context.messenger.info("Action result found in action cache", element_name=self._get_element_name())
return result
request = remote_execution_pb2.GetActionResultRequest(
instance_name=self.ac_remote.instance_name, action_digest=action_digest
)
stub = self.ac_remote.ac_service
try:
result = stub.GetActionResult(request)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise SandboxError("Failed to query action cache: {} ({})".format(e.code(), e.details()))
return None
else:
context = self._get_context()
context.messenger.info("Action result found in action cache", element_name=self._get_element_name())
return result

@staticmethod
def _extract_action_result(operation):
Expand All @@ -288,5 +368,9 @@ def _extract_action_result(operation):
return execution_response.result

def _cleanup(self):
if self.ac_remote:
self.ac_remote.close()
if self.exec_remote:
self.exec_remote.close()
if self.own_storage_remote:
self.storage_remote.close()
3 changes: 2 additions & 1 deletion tests/remotecache/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,5 @@ def test_remote_autotools_build_no_cache(cli, datafiles):
result.assert_success()

assert """WARNING Failed to initialize remote""" in result.stderr
assert """Remote initialisation failed with status UNAVAILABLE: DNS resolution failed""" in result.stderr
assert """Remote initialisation failed with status UNAVAILABLE""" in result.stderr
assert """DNS resolution failed""" in result.stderr