Description
What would you like to happen?
We're in the early stages of using Beam to run jobs on Dataproc with Flink (per the setup guide here). All of the documentation I've been able to find, along with the tests in interactive_runner_test.py, suggests that clusters will be auto-created and their references cached when created as part of a pipeline. However, configure_for_flink will skip caching the cluster metadata if a master URL isn't already defined.
I would expect the documentation to be clearer, or for ClusterMetadata to be auto-cached in configure_for_flink; any chance we could make either/both of those happen? We have enterprise-specific configurations that don't jibe with creating clusters ad hoc, so we'd prefer to simply pass along the master URL. I'm currently planning to set a manual entry in ie.current_env().clusters, but let me know if there's another recommended approach.
Snippet:
import apache_beam as beam
import apache_beam.runners.interactive.interactive_environment as ie
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.portability.flink_runner import FlinkRunner
from apache_beam.options.pipeline_options import PipelineOptions
opts = {
    "region": "us-central1",
    "project_id": "project-1",
    "flink_version": "1.12",
    "flink_submit_uber_jar": True,
    "environment_type": "DOCKER",
    "environment_config": "image:tag",
    "flink_master": "000.00.00.000:8080",
}
options = PipelineOptions.from_dictionary(opts)
runner = InteractiveRunner(underlying_runner=FlinkRunner())
p = beam.Pipeline(runner=runner, options=options)
runner.configure_for_flink(p, options) # can be ib.collect/ib.show/etc
ie.current_env().clusters.__dict__
which returns:
{'dataproc_cluster_managers': {},
'master_urls': {},
'pipelines': {},
'default_cluster_metadata': None}
Issue Priority
Priority: 2
Issue Component
Component: beam-community