Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,15 @@ def _add_argparse_args(cls, parser):
'During job submission a source distribution will be built and '
'the worker will install the resulting package before running any '
'custom code.'))
parser.add_argument(
'--file_to_stage',
'--files_to_stage',
dest='files_to_stage',
action='append',
default=None,
help=(
'Files to stage to the artifact service and be accessible to '
'workers.'))
parser.add_argument(
'--beam_plugin',
'--beam_plugins',
Expand Down
68 changes: 43 additions & 25 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,17 @@ def create_job_resources(options, # type: PipelineOptions

pickler.set_library(setup_options.pickle_library)

if setup_options.files_to_stage is not None:
files_to_stage = setup_options.files_to_stage
if len(files_to_stage) != len(set(files_to_stage)):
raise RuntimeError(
'Duplicated entries are found in %s. It was specified in the '
'--files_to_stage command line option.' % files_to_stage)
resources.extend(
Stager._create_files_to_stage(
files_to_stage, temp_dir=temp_dir,
option_hint='--files_to_stage'))

# We can skip boot dependencies: apache beam sdk, python packages from
# requirements.txt, python packages from extra_packages and workflow tarball
# if we know we are using a dependency pre-installed sdk container image.
Expand Down Expand Up @@ -351,9 +362,15 @@ def create_job_resources(options, # type: PipelineOptions
jar_packages = options.view_as(DebugOptions).lookup_experiment(
'jar_packages')
if jar_packages is not None:
_LOGGER.warning(
'jar_packages experimental flag is not necessary and will be removed '
'in the future version of Beam.')
resources.extend(
Stager._create_jar_packages(
jar_packages.split(','), temp_dir=temp_dir))
Stager._create_files_to_stage(
jar_packages.split(','),
temp_dir=temp_dir,
option_hint="--experiment='jar_packages='",
check_extension='.jar'))

# Pickle the main session if requested.
# We will create the pickled main session locally and then copy it to the
Expand Down Expand Up @@ -521,13 +538,14 @@ def _is_remote_path(path):
return path.find('://') != -1

@staticmethod
def _create_jar_packages(jar_packages, temp_dir):
def _create_files_to_stage(
files_to_stage, temp_dir, option_hint, check_extension=None):
# type: (...) -> List[beam_runner_api_pb2.ArtifactInformation]

"""Creates a list of local jar packages for Java SDK Harness.
"""Creates a list of local files for Python SDK Harness.

:param jar_packages: Ordered list of local paths to jar packages to be
staged. Only packages on localfile system and GCS are supported.
:param files_to_stage: Ordered list of local paths to files to be
staged. Only files on localfile system and GCS are supported.
:param temp_dir: Temporary folder where the resource building can happen.
:return: A list of tuples of local file paths and file names (no paths) for
the resource staged. All the files are assumed to be staged in
Expand All @@ -538,36 +556,36 @@ def _create_jar_packages(jar_packages, temp_dir):
"""
resources = [] # type: List[beam_runner_api_pb2.ArtifactInformation]
staging_temp_dir = tempfile.mkdtemp(dir=temp_dir)
local_packages = [] # type: List[str]
for package in jar_packages:
if not os.path.basename(package).endswith('.jar'):
local_files = [] # type: List[str]
for file in files_to_stage:
if check_extension is not None and not os.path.basename(file).endswith(
check_extension):
raise RuntimeError(
'The --experiment=\'jar_packages=\' option expects a full path '
'ending with ".jar" instead of %s' % package)

if not os.path.isfile(package):
if Stager._is_remote_path(package):
# Download remote package.
_LOGGER.info(
'Downloading jar package: %s locally before staging', package)
_, last_component = FileSystems.split(package)
'The %s option expects a full path ending with "%s" instead of %s' %
(option_hint, check_extension, file))

if not os.path.isfile(file):
if Stager._is_remote_path(file):
# Download remote file.
_LOGGER.info('Downloading file: %s locally before staging', file)
_, last_component = FileSystems.split(file)
local_file_path = FileSystems.join(staging_temp_dir, last_component)
Stager._download_file(package, local_file_path)
Stager._download_file(file, local_file_path)
else:
raise RuntimeError(
'The file %s cannot be found. It was specified in the '
'--experiment=\'jar_packages=\' command line option.' % package)
'%s command line option.' % (file, option_hint))
else:
local_packages.append(package)
local_files.append(file)

local_packages.extend([
local_files.extend([
FileSystems.join(staging_temp_dir, f)
for f in os.listdir(staging_temp_dir)
])

for package in local_packages:
basename = os.path.basename(package)
resources.append(Stager._create_file_stage_to_artifact(package, basename))
for file in local_files:
basename = os.path.basename(file)
resources.append(Stager._create_file_stage_to_artifact(file, basename))

return resources

Expand Down
29 changes: 29 additions & 0 deletions sdks/python/apache_beam/runners/portability/stager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,35 @@ def test_with_jar_packages(self):
options, staging_location=staging_dir)[1])
self.assertEqual(['/tmp/remote/remote.jar'], self.remote_copied_files)

def test_with_files_to_stage(self):
staging_dir = self.make_temp_dir()
source_dir = self.make_temp_dir()
self.create_temp_file(os.path.join(source_dir, 'abc.txt'), 'nothing')
self.create_temp_file(os.path.join(source_dir, 'xyz.bin'), 'nothing')
self.create_temp_file(os.path.join(source_dir, 'ijk.tmp'), 'nothing')

options = PipelineOptions()
self.update_options(options)
options.view_as(SetupOptions).files_to_stage = [
os.path.join(source_dir, 'abc.txt'),
os.path.join(source_dir, 'xyz.bin'),
os.path.join(source_dir, 'ijk.tmp'),
'/tmp/remote/remote.jpg'
]

self.remote_copied_files = []

with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._download_file',
staticmethod(self.file_copy)):
with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._is_remote_path',
staticmethod(self.is_remote_path)):
self.assertEqual(['abc.txt', 'xyz.bin', 'ijk.tmp', 'remote.jpg'],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
self.assertEqual(['/tmp/remote/remote.jpg'], self.remote_copied_files)

def test_remove_dependency_from_requirements(self):
requirements_cache_dir = self.make_temp_dir()
requirements = ['apache_beam\n', 'avro-python3\n', 'fastavro\n', 'numpy\n']
Expand Down