From f2470de45edd40d8b2a36cf6d7cf61f6272766dc Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Wed, 25 May 2022 18:59:46 -0700 Subject: [PATCH] [BEAM-14517] Add files_to_stage option to Python SDK --- .../apache_beam/options/pipeline_options.py | 9 +++ .../apache_beam/runners/portability/stager.py | 68 ++++++++++++------- .../runners/portability/stager_test.py | 29 ++++++++ 3 files changed, 81 insertions(+), 25 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 5aa29c0fd96e..09e7aa750fff 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1114,6 +1114,15 @@ def _add_argparse_args(cls, parser): 'During job submission a source distribution will be built and ' 'the worker will install the resulting package before running any ' 'custom code.')) + parser.add_argument( + '--file_to_stage', + '--files_to_stage', + dest='files_to_stage', + action='append', + default=None, + help=( + 'Files to stage to the artifact service and be accessible to ' + 'workers.')) parser.add_argument( '--beam_plugin', '--beam_plugins', diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 743beb490ed7..3fc16fd77953 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -206,6 +206,17 @@ def create_job_resources(options, # type: PipelineOptions pickler.set_library(setup_options.pickle_library) + if setup_options.files_to_stage is not None: + files_to_stage = setup_options.files_to_stage + if len(files_to_stage) != len(set(files_to_stage)): + raise RuntimeError( + 'Duplicated entries are found in %s. It was specified in the ' + '--files_to_stage command line option.' 
% files_to_stage) + resources.extend( + Stager._create_files_to_stage( + files_to_stage, temp_dir=temp_dir, + option_hint='--files_to_stage')) + # We can skip boot dependencies: apache beam sdk, python packages from # requirements.txt, python packages from extra_packages and workflow tarball # if we know we are using a dependency pre-installed sdk container image. @@ -351,9 +362,15 @@ def create_job_resources(options, # type: PipelineOptions jar_packages = options.view_as(DebugOptions).lookup_experiment( 'jar_packages') if jar_packages is not None: + _LOGGER.warning( + 'jar_packages experimental flag is not necessary and will be removed ' + 'in the future version of Beam.') resources.extend( - Stager._create_jar_packages( - jar_packages.split(','), temp_dir=temp_dir)) + Stager._create_files_to_stage( + jar_packages.split(','), + temp_dir=temp_dir, + option_hint="--experiment='jar_packages='", + check_extension='.jar')) # Pickle the main session if requested. # We will create the pickled main session locally and then copy it to the @@ -521,13 +538,14 @@ def _is_remote_path(path): return path.find('://') != -1 @staticmethod - def _create_jar_packages(jar_packages, temp_dir): + def _create_files_to_stage( + files_to_stage, temp_dir, option_hint, check_extension=None): # type: (...) -> List[beam_runner_api_pb2.ArtifactInformation] - """Creates a list of local jar packages for Java SDK Harness. + """Creates a list of local files for Python SDK Harness. - :param jar_packages: Ordered list of local paths to jar packages to be - staged. Only packages on localfile system and GCS are supported. + :param files_to_stage: Ordered list of local paths to files to be + staged. Only files on local file system and GCS are supported. :param temp_dir: Temporary folder where the resource building can happen. :return: A list of tuples of local file paths and file names (no paths) for the resource staged.
All the files are assumed to be staged in @@ -538,36 +556,36 @@ def _create_jar_packages(jar_packages, temp_dir): """ resources = [] # type: List[beam_runner_api_pb2.ArtifactInformation] staging_temp_dir = tempfile.mkdtemp(dir=temp_dir) - local_packages = [] # type: List[str] - for package in jar_packages: - if not os.path.basename(package).endswith('.jar'): + local_files = [] # type: List[str] + for file in files_to_stage: + if check_extension is not None and not os.path.basename(file).endswith( + check_extension): raise RuntimeError( - 'The --experiment=\'jar_packages=\' option expects a full path ' - 'ending with ".jar" instead of %s' % package) - - if not os.path.isfile(package): - if Stager._is_remote_path(package): - # Download remote package. - _LOGGER.info( - 'Downloading jar package: %s locally before staging', package) - _, last_component = FileSystems.split(package) + 'The %s option expects a full path ending with "%s" instead of %s' % + (option_hint, check_extension, file)) + + if not os.path.isfile(file): + if Stager._is_remote_path(file): + # Download remote file. + _LOGGER.info('Downloading file: %s locally before staging', file) + _, last_component = FileSystems.split(file) local_file_path = FileSystems.join(staging_temp_dir, last_component) - Stager._download_file(package, local_file_path) + Stager._download_file(file, local_file_path) else: raise RuntimeError( 'The file %s cannot be found. It was specified in the ' - '--experiment=\'jar_packages=\' command line option.' % package) + '%s command line option.' 
% (file, option_hint)) else: - local_packages.append(package) + local_files.append(file) - local_packages.extend([ + local_files.extend([ FileSystems.join(staging_temp_dir, f) for f in os.listdir(staging_temp_dir) ]) - for package in local_packages: - basename = os.path.basename(package) - resources.append(Stager._create_file_stage_to_artifact(package, basename)) + for file in local_files: + basename = os.path.basename(file) + resources.append(Stager._create_file_stage_to_artifact(file, basename)) return resources diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index f87d4a33bc4e..14d97b237f81 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -745,6 +745,35 @@ def test_with_jar_packages(self): options, staging_location=staging_dir)[1]) self.assertEqual(['/tmp/remote/remote.jar'], self.remote_copied_files) + def test_with_files_to_stage(self): + staging_dir = self.make_temp_dir() + source_dir = self.make_temp_dir() + self.create_temp_file(os.path.join(source_dir, 'abc.txt'), 'nothing') + self.create_temp_file(os.path.join(source_dir, 'xyz.bin'), 'nothing') + self.create_temp_file(os.path.join(source_dir, 'ijk.tmp'), 'nothing') + + options = PipelineOptions() + self.update_options(options) + options.view_as(SetupOptions).files_to_stage = [ + os.path.join(source_dir, 'abc.txt'), + os.path.join(source_dir, 'xyz.bin'), + os.path.join(source_dir, 'ijk.tmp'), + '/tmp/remote/remote.jpg' + ] + + self.remote_copied_files = [] + + with mock.patch('apache_beam.runners.portability.stager_test' + '.stager.Stager._download_file', + staticmethod(self.file_copy)): + with mock.patch('apache_beam.runners.portability.stager_test' + '.stager.Stager._is_remote_path', + staticmethod(self.is_remote_path)): + self.assertEqual(['abc.txt', 'xyz.bin', 'ijk.tmp', 'remote.jpg'], + 
self.stager.create_and_stage_job_resources( + options, staging_location=staging_dir)[1]) + self.assertEqual(['/tmp/remote/remote.jpg'], self.remote_copied_files) + def test_remove_dependency_from_requirements(self): requirements_cache_dir = self.make_temp_dir() requirements = ['apache_beam\n', 'avro-python3\n', 'fastavro\n', 'numpy\n']