From 7a95d220c7845582599aa985c521d2a614f480bf Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 18 Oct 2024 15:59:13 -0700 Subject: [PATCH 01/11] [#32564] Allow Beam devs in the repo with Go installed to auto build prism. --- .../runners/portability/prism_runner.py | 99 +++++++++++++------ 1 file changed, 71 insertions(+), 28 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index eeccaf5748ce..4c2b5a31b317 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -27,6 +27,7 @@ import platform import shutil import stat +import subprocess import typing import urllib import zipfile @@ -167,38 +168,80 @@ def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str: def path_to_binary(self) -> str: if self._path is not None: - if not os.path.exists(self._path): - url = urllib.parse.urlparse(self._path) - if not url.scheme: - raise ValueError( - 'Unable to parse binary URL "%s". If using a full URL, make ' - 'sure the scheme is specified. If using a local file xpath, ' - 'make sure the file exists; you may have to first build prism ' - 'using `go build `.' % (self._path)) - - # We have a URL, see if we need to construct a valid file name. - if self._path.startswith(GITHUB_DOWNLOAD_PREFIX): - # If this URL starts with the download prefix, let it through. - return self._path - # The only other valid option is a github release page. - if not self._path.startswith(GITHUB_TAG_PREFIX): - raise ValueError( - 'Provided --prism_location URL is not an Apache Beam Github ' - 'Release page URL or download URL: %s' % (self._path)) - # Get the root tag for this URL - root_tag = os.path.basename(os.path.normpath(self._path)) - return self.construct_download_url( - root_tag, platform.system(), platform.machine()) - return self._path - else: - if '.dev' in self._version: + # The path is overidden, check various cases. + if os.path.exists(self._path): + # The path is local and exists, use directly. + return self.bin_path + + # Check if the path is a URL. + url = urllib.parse.urlparse(self._path) + if not url.scheme: raise ValueError( - 'Unable to derive URL for dev versions "%s". Please provide an ' - 'alternate version to derive the release URL with the ' - '--prism_beam_version_override flag.' % (self._version)) + 'Unable to parse binary URL "%s". If using a full URL, make ' + 'sure the scheme is specified. If using a local file xpath, ' + 'make sure the file exists; you may have to first build prism ' + 'using `go build `.' % (self._path)) + + # We have a URL, see if we need to construct a valid file name. + if self._path.startswith(GITHUB_DOWNLOAD_PREFIX): + # If this URL starts with the download prefix, let it through. + return self._path + # The only other valid option is a github release page. + if not self._path.startswith(GITHUB_TAG_PREFIX): + raise ValueError( + 'Provided --prism_location URL is not an Apache Beam Github ' + 'Release page URL or download URL: %s' % (self._path)) + # Get the root tag for this URL + root_tag = os.path.basename(os.path.normpath(self._path)) + return self.construct_download_url( + root_tag, platform.system(), platform.machine()) + + if '.dev' not in self._version: + # Not a development version, so construct the production download URL return self.construct_download_url( self._version, platform.system(), platform.machine()) + # This is a development version! Assume Go is installed. + # Set the install directory to the cache location. + envdict = dict(os.environ) | {"GOBIN": self.BIN_CACHE} + PRISMPKG = "github.com/apache/beam/sdks/v2/go/cmd/prism" + + process = subprocess.run(["go", "install", PRISMPKG], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=envdict) + if process.returncode == 0: + # Successfully installed + return '%s/prism' % (self.BIN_CACHE) + + # We failed to build for some reason. + output = process.stdout.decode("utf-8") + if "not in a module" not in output and "no required module provides" not in output: + # This branch handles two classes of failures: + # 1. Go isn't installed, so it needs to be installed by the Beam SDK developer. + # 2. Go is installed, and they are building in a local version of Prism, + # but there was a compile error, that the developer should address. + # Either way, the @latest fallback either would fail, or hide the error, so fail now. + _LOGGER.info(output) + raise ValueError( + 'Unable to install a local of Prism: "%s";\n' + 'Likely Go is not installed, or a local change to Prism did not compile.\n' + 'Please install Go (see https://go.dev/doc/install) to enable automatic local builds.\n' + 'Alternatively provide an alternate version with the ' + '--prism_beam_version_override flag.\nCaptured output:\n %s' % (self._version, output)) + + # Go is installed and claims we're not in a Go module that has access to the Prism package. + # Fallback to using the @latest version of prism, which works everywhere. + process = subprocess.run(["go", "install", PRISMPKG + "@latest"], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=envdict) + + if process.returncode == 0: + return '%s/prism' % (self.BIN_CACHE) + + output = process.stdout.decode("utf-8") + raise ValueError( + 'We were unable to execute the subprocess "%s" to automatically build prism. \n' + 'Alternatively provide an alternate version with the ' + '--prism_beam_version_override flag.\nCaptured output:\n %s' % (process.args, output)) + def subprocess_cmd_and_endpoint( self) -> typing.Tuple[typing.List[typing.Any], str]: bin_path = self.local_bin( From aaf15894d4e4d2598d494f26fe54caf10d16f83e Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:02:06 -0700 Subject: [PATCH 02/11] [prism] Have python attempt to use prism, but fallback to direct runner otherwise. --- .../runners/direct/direct_runner.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 49b6622816ce..e9a58d053da4 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -110,6 +110,27 @@ def visit_transform(self, applied_ptransform): if timer.time_domain == TimeDomain.REAL_TIME: self.supported_by_fnapi_runner = False + class _PrismRunnerSupportVisitor(PipelineVisitor): + """Visitor determining if a Pipeline can be run on the PrismRunner.""" + def accept(self, pipeline): + self.supported_by_prism_runner = True + pipeline.visit(self) + return self.supported_by_prism_runner + + def visit_transform(self, applied_ptransform): + transform = applied_ptransform.transform + if isinstance(transform, beam.ParDo): + dofn = transform.dofn + # It's uncertain if the Prism Runner supports execution of CombineFns + # with deferred side inputs. + if isinstance(dofn, CombineValuesDoFn): + args, kwargs = transform.raw_side_inputs + args_to_check = itertools.chain(args, kwargs.values()) + if any(isinstance(arg, ArgumentPlaceholder) + for arg in args_to_check): + self.supported_by_prism_runner = False + + tryingPrism = False # Check whether all transforms used in the pipeline are supported by the # FnApiRunner, and the pipeline was not meant to be run as streaming. if _FnApiRunnerSupportVisitor().accept(pipeline): @@ -122,8 +143,26 @@ def visit_transform(self, applied_ptransform): beam_provision_api_pb2.ProvisionInfo( pipeline_options=encoded_options)) runner = fn_runner.FnApiRunner(provision_info=provision_info) + elif _PrismRunnerSupportVisitor().accept(pipeline): + _LOGGER.info('Running pipeline with PrismRunner.') + from apache_beam.runners.portability import prism_runner + runner = prism_runner.PrismRunner() + tryingPrism = True else: runner = BundleBasedDirectRunner() + + if tryingPrism: + try: + pr = runner.run_pipeline(pipeline, options) + if pr.state.is_terminal() and pr.state != PipelineState.DONE: + _LOGGER.info('Pipeline failed on PrismRunner, falling back toDirectRunner.') + runner = BundleBasedDirectRunner() + else: + return pr + except Exception as e: + _LOGGER.info('Exception with PrismRunner:\n %s\n' % (e)) + _LOGGER.info('Falling back to DirectRunner') + runner = BundleBasedDirectRunner() return runner.run_pipeline(pipeline, options) From 8cef00221ffd2b6ba2284b70af697a1f49be56f2 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:18:12 -0700 Subject: [PATCH 03/11] Add comments. --- sdks/python/apache_beam/runners/direct/direct_runner.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index e9a58d053da4..b0e5ae5e8f94 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -154,12 +154,16 @@ def visit_transform(self, applied_ptransform): if tryingPrism: try: pr = runner.run_pipeline(pipeline, options) + # This is non-blocking, so if the state is *already* finished, something + # probably failed on job submission. if pr.state.is_terminal() and pr.state != PipelineState.DONE: _LOGGER.info('Pipeline failed on PrismRunner, falling back toDirectRunner.') runner = BundleBasedDirectRunner() else: return pr except Exception as e: + # If prism fails in Preparing the portable job, then the PortableRunner + # code raises an exception. Catch it, log it, and use the Direct runner instead. _LOGGER.info('Exception with PrismRunner:\n %s\n' % (e)) _LOGGER.info('Falling back to DirectRunner') runner = BundleBasedDirectRunner() From 5148f607bb6acd35fdaf9f02e35a611e4355699e Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 21 Oct 2024 09:30:12 -0700 Subject: [PATCH 04/11] Attempt to format. --- .../runners/direct/direct_runner.py | 29 +++++++++++++++---- .../runners/portability/prism_runner.py | 27 ++++++++++------- 2 files changed, 41 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index b0e5ae5e8f94..14b7928c85be 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -66,6 +66,7 @@ class SwitchingDirectRunner(PipelineRunner): which supports streaming execution and certain primitives not yet implemented in the FnApiRunner. """ + def is_fnapi_compatible(self): return BundleBasedDirectRunner.is_fnapi_compatible() @@ -78,6 +79,7 @@ def run_pipeline(self, pipeline, options): class _FnApiRunnerSupportVisitor(PipelineVisitor): """Visitor determining if a Pipeline can be run on the FnApiRunner.""" + def accept(self, pipeline): self.supported_by_fnapi_runner = True pipeline.visit(self) @@ -112,6 +114,7 @@ def visit_transform(self, applied_ptransform): class _PrismRunnerSupportVisitor(PipelineVisitor): """Visitor determining if a Pipeline can be run on the PrismRunner.""" + def accept(self, pipeline): self.supported_by_prism_runner = True pipeline.visit(self) @@ -143,21 +146,22 @@ def visit_transform(self, applied_ptransform): beam_provision_api_pb2.ProvisionInfo( pipeline_options=encoded_options)) runner = fn_runner.FnApiRunner(provision_info=provision_info) - elif _PrismRunnerSupportVisitor().accept(pipeline): + elif _PrismRunnerSupportVisitor().accept(pipeline): _LOGGER.info('Running pipeline with PrismRunner.') from apache_beam.runners.portability import prism_runner runner = prism_runner.PrismRunner() tryingPrism = True else: runner = BundleBasedDirectRunner() - + if tryingPrism: - try: + try: pr = runner.run_pipeline(pipeline, options) - # This is non-blocking, so if the state is *already* finished, something + # This is non-blocking, so if the state is *already* finished, something # probably failed on job submission. if pr.state.is_terminal() and pr.state != PipelineState.DONE: - _LOGGER.info('Pipeline failed on PrismRunner, falling back toDirectRunner.') + _LOGGER.info( + 'Pipeline failed on PrismRunner, falling back toDirectRunner.') runner = BundleBasedDirectRunner() else: return pr @@ -180,6 +184,7 @@ def visit_transform(self, applied_ptransform): @typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]]) class _GroupByKeyOnly(PTransform): """A group by key transform, ignoring windows.""" + def infer_output_type(self, input_type): key_type, value_type = trivial_inference.key_value_types(input_type) return typehints.KV[key_type, typehints.Iterable[value_type]] @@ -193,6 +198,7 @@ def expand(self, pcoll): @typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]]) class _GroupAlsoByWindow(ParDo): """The GroupAlsoByWindow transform.""" + def __init__(self, windowing): super().__init__(_GroupAlsoByWindowDoFn(windowing)) self.windowing = windowing @@ -267,6 +273,7 @@ def from_runner_api_parameter(unused_ptransform, payload, context): @typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]]) class _GroupByKey(PTransform): """The DirectRunner GroupByKey implementation.""" + def expand(self, pcoll): # Imported here to avoid circular dependencies. # pylint: disable=wrong-import-order, wrong-import-position @@ -329,6 +336,7 @@ def _get_transform_overrides(pipeline_options): from apache_beam.runners.direct.sdf_direct_runner import SplittableParDoOverride class CombinePerKeyOverride(PTransformOverride): + def matches(self, applied_ptransform): if isinstance(applied_ptransform.transform, CombinePerKey): return applied_ptransform.inputs[0].windowing.is_default() @@ -346,6 +354,7 @@ def get_replacement_transform_for_applied_ptransform( return transform class StreamingGroupByKeyOverride(PTransformOverride): + def matches(self, applied_ptransform): # Note: we match the exact class, since we replace it with a subclass. return applied_ptransform.transform.__class__ == _GroupByKeyOnly @@ -357,6 +366,7 @@ def get_replacement_transform_for_applied_ptransform( return transform class StreamingGroupAlsoByWindowOverride(PTransformOverride): + def matches(self, applied_ptransform): # Note: we match the exact class, since we replace it with a subclass. transform = applied_ptransform.transform @@ -373,6 +383,7 @@ def get_replacement_transform_for_applied_ptransform( return transform class TestStreamOverride(PTransformOverride): + def matches(self, applied_ptransform): from apache_beam.testing.test_stream import TestStream self.applied_ptransform = applied_ptransform @@ -388,6 +399,7 @@ class GroupByKeyPTransformOverride(PTransformOverride): This replaces the Beam implementation as a primitive. """ + def matches(self, applied_ptransform): # Imported here to avoid circular dependencies. # pylint: disable=wrong-import-order, wrong-import-position @@ -429,6 +441,7 @@ def get_replacement_transform_for_applied_ptransform( class _DirectReadFromPubSub(PTransform): + def __init__(self, source): self._source = source @@ -504,6 +517,7 @@ def _get_pubsub_transform_overrides(pipeline_options): from apache_beam.pipeline import PTransformOverride class ReadFromPubSubOverride(PTransformOverride): + def matches(self, applied_ptransform): return isinstance( applied_ptransform.transform, beam_pubsub.ReadFromPubSub) @@ -517,6 +531,7 @@ def get_replacement_transform_for_applied_ptransform( return _DirectReadFromPubSub(applied_ptransform.transform._source) class WriteToPubSubOverride(PTransformOverride): + def matches(self, applied_ptransform): return isinstance(applied_ptransform.transform, beam_pubsub.WriteToPubSub) @@ -533,6 +548,7 @@ def get_replacement_transform_for_applied_ptransform( class BundleBasedDirectRunner(PipelineRunner): """Executes a single pipeline on the local machine.""" + @staticmethod def is_fnapi_compatible(): return False @@ -555,6 +571,7 @@ def run_pipeline(self, pipeline, options): class VerifyNoCrossLanguageTransforms(PipelineVisitor): """Visitor determining whether a Pipeline uses a TestStream.""" + def visit_transform(self, applied_ptransform): if isinstance(applied_ptransform.transform, ExternalTransform): raise RuntimeError( @@ -568,6 +585,7 @@ def visit_transform(self, applied_ptransform): # If the TestStream I/O is used, use a mock test clock. class TestStreamUsageVisitor(PipelineVisitor): """Visitor determining whether a Pipeline uses a TestStream.""" + def __init__(self): self.uses_test_stream = False @@ -618,6 +636,7 @@ def visit_transform(self, applied_ptransform): class DirectPipelineResult(PipelineResult): """A DirectPipelineResult provides access to info about a pipeline.""" + def __init__(self, executor, evaluation_context): super().__init__(PipelineState.RUNNING) self._executor = executor diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 4c2b5a31b317..cbdc6715104b 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -56,6 +56,7 @@ class PrismRunner(portable_runner.PortableRunner): """A runner for launching jobs on Prism, automatically downloading and starting a Prism instance if needed. """ + def default_environment( self, options: pipeline_options.PipelineOptions) -> environments.Environment: @@ -197,7 +198,7 @@ def path_to_binary(self) -> str: root_tag, platform.system(), platform.machine()) if '.dev' not in self._version: - # Not a development version, so construct the production download URL + # Not a development version, so construct the production download URL return self.construct_download_url( self._version, platform.system(), platform.machine()) @@ -207,12 +208,14 @@ def path_to_binary(self) -> str: PRISMPKG = "github.com/apache/beam/sdks/v2/go/cmd/prism" process = subprocess.run(["go", "install", PRISMPKG], - stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=envdict) + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=envdict) if process.returncode == 0: # Successfully installed return '%s/prism' % (self.BIN_CACHE) - - # We failed to build for some reason. + + # We failed to build for some reason. output = process.stdout.decode("utf-8") if "not in a module" not in output and "no required module provides" not in output: # This branch handles two classes of failures: @@ -226,21 +229,25 @@ def path_to_binary(self) -> str: 'Likely Go is not installed, or a local change to Prism did not compile.\n' 'Please install Go (see https://go.dev/doc/install) to enable automatic local builds.\n' 'Alternatively provide an alternate version with the ' - '--prism_beam_version_override flag.\nCaptured output:\n %s' % (self._version, output)) - - # Go is installed and claims we're not in a Go module that has access to the Prism package. + '--prism_beam_version_override flag.\nCaptured output:\n %s' % + (self._version, output)) + + # Go is installed and claims we're not in a Go module that has access to the Prism package. # Fallback to using the @latest version of prism, which works everywhere. process = subprocess.run(["go", "install", PRISMPKG + "@latest"], - stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=envdict) + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=envdict) if process.returncode == 0: return '%s/prism' % (self.BIN_CACHE) - + output = process.stdout.decode("utf-8") raise ValueError( 'We were unable to execute the subprocess "%s" to automatically build prism. \n' 'Alternatively provide an alternate version with the ' - '--prism_beam_version_override flag.\nCaptured output:\n %s' % (process.args, output)) + '--prism_beam_version_override flag.\nCaptured output:\n %s' % + (process.args, output)) def subprocess_cmd_and_endpoint( self) -> typing.Tuple[typing.List[typing.Any], str]: From eae9edfe0275258ef1a4755adadd0a0fe306e2e6 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 21 Oct 2024 10:27:02 -0700 Subject: [PATCH 05/11] Fix flag references in prism_runner.py --- .../apache_beam/runners/portability/prism_runner.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index cbdc6715104b..2148ab9827fa 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -221,15 +221,15 @@ def path_to_binary(self) -> str: # This branch handles two classes of failures: # 1. Go isn't installed, so it needs to be installed by the Beam SDK developer. # 2. Go is installed, and they are building in a local version of Prism, - # but there was a compile error, that the developer should address. + # but there was a compile error that the developer should address. # Either way, the @latest fallback either would fail, or hide the error, so fail now. _LOGGER.info(output) raise ValueError( 'Unable to install a local of Prism: "%s";\n' 'Likely Go is not installed, or a local change to Prism did not compile.\n' 'Please install Go (see https://go.dev/doc/install) to enable automatic local builds.\n' - 'Alternatively provide an alternate version with the ' - '--prism_beam_version_override flag.\nCaptured output:\n %s' % + 'Alternatively provide a binary with the --prism_location flag.' + '\nCaptured output:\n %s' % (self._version, output)) # Go is installed and claims we're not in a Go module that has access to the Prism package. @@ -245,8 +245,8 @@ def path_to_binary(self) -> str: output = process.stdout.decode("utf-8") raise ValueError( 'We were unable to execute the subprocess "%s" to automatically build prism. \n' - 'Alternatively provide an alternate version with the ' - '--prism_beam_version_override flag.\nCaptured output:\n %s' % + 'Alternatively provide an alternate binary with the --prism_location flag.' + '\nCaptured output:\n %s' % (process.args, output)) def subprocess_cmd_and_endpoint( From ba48a1a00c632b008c059bf525fb74132ec08eff Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 21 Oct 2024 10:27:19 -0700 Subject: [PATCH 06/11] Filter out processing time tests from Prism. --- sdks/python/apache_beam/runners/direct/direct_runner.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 14b7928c85be..e7c845d311e1 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -132,6 +132,12 @@ def visit_transform(self, applied_ptransform): if any(isinstance(arg, ArgumentPlaceholder) for arg in args_to_check): self.supported_by_prism_runner = False + if userstate.is_stateful_dofn(dofn): + # https://github.com/apache/beam/issues/32786 - Remove once Real time clock is used. + _, timer_specs = userstate.get_dofn_specs(dofn) + for timer in timer_specs: + if timer.time_domain == TimeDomain.REAL_TIME: + self.supported_by_prism_runner = False tryingPrism = False # Check whether all transforms used in the pipeline are supported by the From 1777b04d0c88d1388d023f12eb21324f2b9b8c9d Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 21 Oct 2024 11:45:47 -0700 Subject: [PATCH 07/11] Format with yapf 0.29.0 which beam relys on. --- .../runners/direct/direct_runner.py | 18 ------------------ .../runners/portability/prism_runner.py | 10 ++++------ 2 files changed, 4 insertions(+), 24 deletions(-) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index e7c845d311e1..2e7942da3b56 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -66,7 +66,6 @@ class SwitchingDirectRunner(PipelineRunner): which supports streaming execution and certain primitives not yet implemented in the FnApiRunner. """ - def is_fnapi_compatible(self): return BundleBasedDirectRunner.is_fnapi_compatible() @@ -79,7 +78,6 @@ def run_pipeline(self, pipeline, options): class _FnApiRunnerSupportVisitor(PipelineVisitor): """Visitor determining if a Pipeline can be run on the FnApiRunner.""" - def accept(self, pipeline): self.supported_by_fnapi_runner = True pipeline.visit(self) @@ -114,7 +112,6 @@ def visit_transform(self, applied_ptransform): class _PrismRunnerSupportVisitor(PipelineVisitor): """Visitor determining if a Pipeline can be run on the PrismRunner.""" - def accept(self, pipeline): self.supported_by_prism_runner = True pipeline.visit(self) @@ -190,7 +187,6 @@ def visit_transform(self, applied_ptransform): @typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]]) class _GroupByKeyOnly(PTransform): """A group by key transform, ignoring windows.""" - def infer_output_type(self, input_type): key_type, value_type = trivial_inference.key_value_types(input_type) return typehints.KV[key_type, typehints.Iterable[value_type]] @@ -204,7 +200,6 @@ def expand(self, pcoll): @typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]]) class _GroupAlsoByWindow(ParDo): """The GroupAlsoByWindow transform.""" - def __init__(self, windowing): super().__init__(_GroupAlsoByWindowDoFn(windowing)) self.windowing = windowing @@ -279,7 +274,6 @@ def from_runner_api_parameter(unused_ptransform, payload, context): @typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]]) class _GroupByKey(PTransform): """The DirectRunner GroupByKey implementation.""" - def expand(self, pcoll): # Imported here to avoid circular dependencies. # pylint: disable=wrong-import-order, wrong-import-position @@ -342,7 +336,6 @@ def _get_transform_overrides(pipeline_options): from apache_beam.runners.direct.sdf_direct_runner import SplittableParDoOverride class CombinePerKeyOverride(PTransformOverride): - def matches(self, applied_ptransform): if isinstance(applied_ptransform.transform, CombinePerKey): return applied_ptransform.inputs[0].windowing.is_default() @@ -360,7 +353,6 @@ def get_replacement_transform_for_applied_ptransform( return transform class StreamingGroupByKeyOverride(PTransformOverride): - def matches(self, applied_ptransform): # Note: we match the exact class, since we replace it with a subclass. return applied_ptransform.transform.__class__ == _GroupByKeyOnly @@ -372,7 +364,6 @@ def get_replacement_transform_for_applied_ptransform( return transform class StreamingGroupAlsoByWindowOverride(PTransformOverride): - def matches(self, applied_ptransform): # Note: we match the exact class, since we replace it with a subclass. transform = applied_ptransform.transform @@ -389,7 +380,6 @@ def get_replacement_transform_for_applied_ptransform( return transform class TestStreamOverride(PTransformOverride): - def matches(self, applied_ptransform): from apache_beam.testing.test_stream import TestStream self.applied_ptransform = applied_ptransform @@ -405,7 +395,6 @@ class GroupByKeyPTransformOverride(PTransformOverride): This replaces the Beam implementation as a primitive. """ - def matches(self, applied_ptransform): # Imported here to avoid circular dependencies. # pylint: disable=wrong-import-order, wrong-import-position @@ -447,7 +436,6 @@ def get_replacement_transform_for_applied_ptransform( class _DirectReadFromPubSub(PTransform): - def __init__(self, source): self._source = source @@ -523,7 +511,6 @@ def _get_pubsub_transform_overrides(pipeline_options): from apache_beam.pipeline import PTransformOverride class ReadFromPubSubOverride(PTransformOverride): - def matches(self, applied_ptransform): return isinstance( applied_ptransform.transform, beam_pubsub.ReadFromPubSub) @@ -537,7 +524,6 @@ def get_replacement_transform_for_applied_ptransform( return _DirectReadFromPubSub(applied_ptransform.transform._source) class WriteToPubSubOverride(PTransformOverride): - def matches(self, applied_ptransform): return isinstance(applied_ptransform.transform, beam_pubsub.WriteToPubSub) @@ -554,7 +540,6 @@ def get_replacement_transform_for_applied_ptransform( class BundleBasedDirectRunner(PipelineRunner): """Executes a single pipeline on the local machine.""" - @staticmethod def is_fnapi_compatible(): return False @@ -577,7 +562,6 @@ def run_pipeline(self, pipeline, options): class VerifyNoCrossLanguageTransforms(PipelineVisitor): """Visitor determining whether a Pipeline uses a TestStream.""" - def visit_transform(self, applied_ptransform): if isinstance(applied_ptransform.transform, ExternalTransform): raise RuntimeError( @@ -591,7 +575,6 @@ def visit_transform(self, applied_ptransform): # If the TestStream I/O is used, use a mock test clock. class TestStreamUsageVisitor(PipelineVisitor): """Visitor determining whether a Pipeline uses a TestStream.""" - def __init__(self): self.uses_test_stream = False @@ -642,7 +625,6 @@ def visit_transform(self, applied_ptransform): class DirectPipelineResult(PipelineResult): """A DirectPipelineResult provides access to info about a pipeline.""" - def __init__(self, executor, evaluation_context): super().__init__(PipelineState.RUNNING) self._executor = executor diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 2148ab9827fa..868cfa91b30a 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -56,7 +56,6 @@ class PrismRunner(portable_runner.PortableRunner): """A runner for launching jobs on Prism, automatically downloading and starting a Prism instance if needed. """ - def default_environment( self, options: pipeline_options.PipelineOptions) -> environments.Environment: @@ -229,11 +228,11 @@ def path_to_binary(self) -> str: 'Likely Go is not installed, or a local change to Prism did not compile.\n' 'Please install Go (see https://go.dev/doc/install) to enable automatic local builds.\n' 'Alternatively provide a binary with the --prism_location flag.' - '\nCaptured output:\n %s' % - (self._version, output)) + '\nCaptured output:\n %s' % (self._version, output)) # Go is installed and claims we're not in a Go module that has access to the Prism package. - # Fallback to using the @latest version of prism, which works everywhere. + + # Fallback to using the @latest version of prism, which works everywhere. process = subprocess.run(["go", "install", PRISMPKG + "@latest"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -246,8 +245,7 @@ def path_to_binary(self) -> str: raise ValueError( 'We were unable to execute the subprocess "%s" to automatically build prism. \n' 'Alternatively provide an alternate binary with the --prism_location flag.' - '\nCaptured output:\n %s' % - (process.args, output)) + '\nCaptured output:\n %s' % (process.args, output)) def subprocess_cmd_and_endpoint( self) -> typing.Tuple[typing.List[typing.Any], str]: From dc98cfacc7cc6f1c679112f57c9138207b9e14c0 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 21 Oct 2024 13:16:54 -0700 Subject: [PATCH 08/11] Fix local path, and use older merge syntax. --- sdks/python/apache_beam/runners/portability/prism_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 868cfa91b30a..8100b9a9157d 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -171,7 +171,7 @@ def path_to_binary(self) -> str: # The path is overidden, check various cases. if os.path.exists(self._path): # The path is local and exists, use directly. - return self.bin_path + return self._path # Check if the path is a URL. url = urllib.parse.urlparse(self._path) @@ -203,7 +203,7 @@ def path_to_binary(self) -> str: # This is a development version! Assume Go is installed. # Set the install directory to the cache location. - envdict = dict(os.environ) | {"GOBIN": self.BIN_CACHE} + envdict = {**os.environ, "GOBIN": self.BIN_CACHE} PRISMPKG = "github.com/apache/beam/sdks/v2/go/cmd/prism" process = subprocess.run(["go", "install", PRISMPKG], From 6f5a58e651cf385ff3b498d10a6c9d233e0778e8 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 21 Oct 2024 13:17:09 -0700 Subject: [PATCH 09/11] Avoid using python teststream with prism. --- sdks/python/apache_beam/runners/direct/direct_runner.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 2e7942da3b56..8d0e10095145 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -119,6 +119,9 @@ def accept(self, pipeline): def visit_transform(self, applied_ptransform): transform = applied_ptransform.transform + # Python SDK assumes the direct runner TestStream implementation is being used. + if isinstance(transform, TestStream): + self.supported_by_prism_runner = False if isinstance(transform, beam.ParDo): dofn = transform.dofn # It's uncertain if the Prism Runner supports execution of CombineFns From 3bbe5987c4b1ed50442e091983fefd9a0eb90307 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 21 Oct 2024 13:36:56 -0700 Subject: [PATCH 10/11] formatter --- sdks/python/apache_beam/runners/portability/prism_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 8100b9a9157d..dcf72b997d11 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -203,7 +203,7 @@ def path_to_binary(self) -> str: # This is a development version! Assume Go is installed. # Set the install directory to the cache location. - envdict = {**os.environ, "GOBIN": self.BIN_CACHE} + envdict = {**os.environ, "GOBIN": self.BIN_CACHE} PRISMPKG = "github.com/apache/beam/sdks/v2/go/cmd/prism" process = subprocess.run(["go", "install", PRISMPKG], From 55ef41a07e77eec2270b33faf706a3d8bd8fc954 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 21 Oct 2024 14:30:05 -0700 Subject: [PATCH 11/11] Delint --- .../runners/direct/direct_runner.py | 9 ++++-- .../runners/portability/prism_runner.py | 30 ++++++++++++------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 8d0e10095145..8b8937653688 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -119,7 +119,8 @@ def accept(self, pipeline): def visit_transform(self, applied_ptransform): transform = applied_ptransform.transform - # Python SDK assumes the direct runner TestStream implementation is being used. + # Python SDK assumes the direct runner TestStream implementation is + # being used. if isinstance(transform, TestStream): self.supported_by_prism_runner = False if isinstance(transform, beam.ParDo): @@ -133,7 +134,8 @@ def visit_transform(self, applied_ptransform): for arg in args_to_check): self.supported_by_prism_runner = False if userstate.is_stateful_dofn(dofn): - # https://github.com/apache/beam/issues/32786 - Remove once Real time clock is used. + # https://github.com/apache/beam/issues/32786 - + # Remove once Real time clock is used. _, timer_specs = userstate.get_dofn_specs(dofn) for timer in timer_specs: if timer.time_domain == TimeDomain.REAL_TIME: @@ -173,7 +175,8 @@ def visit_transform(self, applied_ptransform): return pr except Exception as e: # If prism fails in Preparing the portable job, then the PortableRunner - # code raises an exception. Catch it, log it, and use the Direct runner instead. + # code raises an exception. Catch it, log it, and use the Direct runner + # instead. _LOGGER.info('Exception with PrismRunner:\n %s\n' % (e)) _LOGGER.info('Falling back to DirectRunner') runner = BundleBasedDirectRunner() diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index dcf72b997d11..77dc8a214e8e 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -209,42 +209,50 @@ def path_to_binary(self) -> str: process = subprocess.run(["go", "install", PRISMPKG], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - env=envdict) + env=envdict, + check=False) if process.returncode == 0: # Successfully installed return '%s/prism' % (self.BIN_CACHE) # We failed to build for some reason. output = process.stdout.decode("utf-8") - if "not in a module" not in output and "no required module provides" not in output: + if ("not in a module" not in output) and ( + "no required module provides" not in output): # This branch handles two classes of failures: - # 1. Go isn't installed, so it needs to be installed by the Beam SDK developer. + # 1. Go isn't installed, so it needs to be installed by the Beam SDK + # developer. # 2. Go is installed, and they are building in a local version of Prism, # but there was a compile error that the developer should address. - # Either way, the @latest fallback either would fail, or hide the error, so fail now. + # Either way, the @latest fallback either would fail, or hide the error, + # so fail now. _LOGGER.info(output) raise ValueError( 'Unable to install a local of Prism: "%s";\n' - 'Likely Go is not installed, or a local change to Prism did not compile.\n' - 'Please install Go (see https://go.dev/doc/install) to enable automatic local builds.\n' + 'Likely Go is not installed, or a local change to Prism did not ' + 'compile.\nPlease install Go (see https://go.dev/doc/install) to ' + 'enable automatic local builds.\n' 'Alternatively provide a binary with the --prism_location flag.' '\nCaptured output:\n %s' % (self._version, output)) - # Go is installed and claims we're not in a Go module that has access to the Prism package. + # Go is installed and claims we're not in a Go module that has access to + # the Prism package. - # Fallback to using the @latest version of prism, which works everywhere. + # Fallback to using the @latest version of prism, which works everywhere. process = subprocess.run(["go", "install", PRISMPKG + "@latest"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - env=envdict) + env=envdict, + check=False) if process.returncode == 0: return '%s/prism' % (self.BIN_CACHE) output = process.stdout.decode("utf-8") raise ValueError( - 'We were unable to execute the subprocess "%s" to automatically build prism. \n' - 'Alternatively provide an alternate binary with the --prism_location flag.' + 'We were unable to execute the subprocess "%s" to automatically ' + 'build prism.\nAlternatively provide an alternate binary with the ' + '--prism_location flag.' '\nCaptured output:\n %s' % (process.args, output)) def subprocess_cmd_and_endpoint(