sdks/python/apache_beam/runners/interactive/cache_manager.py (9 changes: 8 additions & 1 deletion)
@@ -190,7 +190,14 @@ def __init__(self, cache_dir=None, cache_format='text'):
 
   def size(self, *labels):
     if self.exists(*labels):
-      return sum(os.path.getsize(path) for path in self._match(*labels))
+      matched_path = self._match(*labels)
+      # if any matched path has a gs:// prefix, it must be cached on GCS
+      if 'gs://' in matched_path[0]:
+        from apache_beam.io.gcp import gcsio
+        return sum(
+            sum(gcsio.GcsIO().list_prefix(path).values())
+            for path in matched_path)
+      return sum(os.path.getsize(path) for path in matched_path)
     return 0
 
   def exists(self, *labels):
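For context on the new GCS branch: list_prefix returns a dict mapping each object path under the prefix to its size in bytes, so the nested sum totals the cached bytes per matched path without downloading anything. A minimal sketch of the same idea in isolation (the bucket path is hypothetical):

    from apache_beam.io.gcp import gcsio

    def total_gcs_cache_size(prefix):
      # list_prefix returns {object_path: size_in_bytes, ...} for every
      # object under the given gs:// prefix.
      return sum(gcsio.GcsIO().list_prefix(prefix).values())

    # e.g. total_gcs_cache_size('gs://my-bucket/interactive-cache')  # hypothetical path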
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py

@@ -27,7 +27,6 @@
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive.utils import progress_indicated
-from apache_beam.version import __version__ as beam_version
 
 try:
   from google.cloud import dataproc_v1
@@ -67,9 +66,6 @@ class DataprocClusterManager:
   required for creating and deleting Dataproc clusters for use
   under Interactive Beam.
   """
-  IMAGE_VERSION = '2.0.31-debian10'
-  STAGING_LOG_NAME = 'dataproc-startup-script_output'
-
   def __init__(self, cluster_metadata: MasterURLIdentifier) -> None:
     """Initializes the DataprocClusterManager with properties required
     to interface with the Dataproc ClusterControllerClient.
@@ -162,13 +158,13 @@ def create_flink_cluster(self) -> None:
         'cluster_name': self.cluster_metadata.cluster_name,
         'config': {
             'software_config': {
-                'image_version': self.IMAGE_VERSION,
+                'image_version': ie.current_env().clusters.
+                DATAPROC_IMAGE_VERSION,
                 'optional_components': ['DOCKER', 'FLINK']
             },
             'gce_cluster_config': {
                 'metadata': {
-                    'flink-start-yarn-session': 'true',
-                    'PIP_PACKAGES': 'apache-beam[gcp]=={}'.format(beam_version)
+                    'flink-start-yarn-session': 'true'
                 },
                 'service_account_scopes': [
                     'https://www.googleapis.com/auth/cloud-platform'
@@ -310,7 +306,7 @@ def get_master_url_and_dashboard(
     """Returns the master_url of the current cluster."""
     startup_logs = []
     for file in self._fs._list(staging_bucket):
-      if self.STAGING_LOG_NAME in file.path:
+      if ie.current_env().clusters.DATAPROC_STAGING_LOG_NAME in file.path:
         startup_logs.append(file.path)
 
     for log in startup_logs:
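The net effect is that the Dataproc image version and staging log name are no longer class attributes of the cluster manager; they are read off the shared Clusters instance, so this module and the runner agree on one source of truth. A rough sketch of the lookup chain, assuming an initialized interactive environment:

    from apache_beam.runners.interactive import interactive_environment as ie

    # Both values now live on the shared Clusters instance.
    clusters = ie.current_env().clusters
    print(clusters.DATAPROC_IMAGE_VERSION)      # '2.0.31-debian10'
    print(clusters.DATAPROC_STAGING_LOG_NAME)   # 'dataproc-startup-script_output'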
sdks/python/apache_beam/runners/interactive/interactive_beam.py (19 changes: 12 additions & 7 deletions)
@@ -37,16 +37,13 @@
 from typing import DefaultDict
 from typing import Dict
 from typing import List
-from typing import Mapping
 from typing import Optional
 
 import pandas as pd
 
 import apache_beam as beam
 from apache_beam.dataframe.frame_base import DeferredBase
 from apache_beam.runners.interactive import interactive_environment as ie
-from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
-from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier
 from apache_beam.runners.interactive.display import pipeline_graph
 from apache_beam.runners.interactive.display.pcoll_visualization import visualize
 from apache_beam.runners.interactive.display.pcoll_visualization import visualize_computed_pcoll
@@ -349,17 +346,26 @@ class Clusters:
   Example of calling the Interactive Beam clusters describe method::
     ib.clusters.describe()
   """
+  # Explicitly set the Flink version here to ensure compatibility with 2.0
+  # Dataproc images:
+  # https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
+  DATAPROC_FLINK_VERSION = '1.12'
+  DATAPROC_IMAGE_VERSION = '2.0.31-debian10'
+  DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'
 
   def __init__(self) -> None:
     """Instantiates default values for Dataproc cluster interactions.
     """
     # Set the default_cluster_name that will be used when creating Dataproc
     # clusters.
     self.default_cluster_name = 'interactive-beam-cluster'
+    # Bidirectional 1-1 mapping between master_urls (str) to cluster metadata
+    # (MasterURLIdentifier), where self.master_urls.inverse is a mapping from
+    # MasterURLIdentifier -> str.
-    self.master_urls: Mapping[str, MasterURLIdentifier] = bidict()
+    self.master_urls = bidict()
     # self.dataproc_cluster_managers map string pipeline ids to instances of
     # DataprocClusterManager.
-    self.dataproc_cluster_managers: Dict[str, DataprocClusterManager] = {}
+    self.dataproc_cluster_managers = {}
     # self.master_urls_to_pipelines map string master_urls to lists of
     # pipelines that use the corresponding master_url.
     self.master_urls_to_pipelines: DefaultDict[
@@ -457,8 +463,7 @@ def cleanup(
 # Examples:
 # ib.clusters.describe(p)
 # Check the docstrings for detailed usages.
-# TODO(victorhc): Resolve connection issue and add a working example
-# clusters = Clusters()
+clusters = Clusters()
 
 
 def watch(watchable):
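With the TODO resolved, clusters is now a real module-level singleton, so the class docstring's example works as written. A short usage sketch (the pipeline argument is hypothetical):

    from apache_beam.runners.interactive import interactive_beam as ib

    # Returns {} until a Dataproc cluster has been created for some pipeline.
    ib.clusters.describe()

    # Per the class docstring, a specific pipeline can be passed as well:
    # ib.clusters.describe(p)  # p is a hypothetical user pipeline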
sdks/python/apache_beam/runners/interactive/interactive_environment.py

@@ -170,10 +170,8 @@ def __init__(self):
     self._test_stream_service_controllers = {}
     self._cached_source_signature = {}
     self._tracked_user_pipelines = UserPipelineTracker()
-    # TODO(victorhc): remove the cluster instantiation after the
-    # interactive_beam.clusters class has been enabled.
-    from apache_beam.runners.interactive.interactive_beam import Clusters
-    self.clusters = Clusters()
+    from apache_beam.runners.interactive.interactive_beam import clusters
+    self.clusters = clusters
 
     # Tracks the computation completeness of PCollections. PCollections tracked
     # here don't need to be re-computed when data introspection is needed.
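Because the environment now binds the imported module-level object instead of constructing a private Clusters, both access paths alias one instance (unless something reassigns it, as test_get_master_url_flink_master_provided does for isolation). A sketch of the invariant, assuming the [interactive] extra is installed:

    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.runners.interactive import interactive_environment as ie

    # The same Clusters object, whichever way it is reached.
    assert ie.current_env().clusters is ib.clusters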
sdks/python/apache_beam/runners/interactive/interactive_runner.py

@@ -145,7 +145,10 @@ def run_pipeline(self, pipeline, options):
     master_url = self._get_dataproc_cluster_master_url_if_applicable(
         user_pipeline)
     if master_url:
-      options.view_as(FlinkRunnerOptions).flink_master = master_url
+      flink_options = options.view_as(FlinkRunnerOptions)
+      flink_options.flink_master = master_url
+      flink_options.flink_version = ie.current_env(
+      ).clusters.DATAPROC_FLINK_VERSION
     pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)
 
     # The user_pipeline analyzed might be None if the pipeline given has nothing
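run_pipeline now pins the Flink version alongside the master URL, matching the comment on DATAPROC_FLINK_VERSION: the 2.0 Dataproc image line ships Flink 1.12, so the options are set to stay compatible with the cluster. A minimal sketch of the effective option wiring (the master URL is a placeholder):

    from apache_beam.options.pipeline_options import FlinkRunnerOptions
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.interactive import interactive_environment as ie

    options = PipelineOptions()
    flink_options = options.view_as(FlinkRunnerOptions)
    flink_options.flink_master = 'test.internal:1'  # placeholder master URL
    # Pin the client-side Flink version to the cluster's ('1.12').
    flink_options.flink_version = ie.current_env().clusters.DATAPROC_FLINK_VERSION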
sdks/python/apache_beam/runners/interactive/interactive_runner_test.py

@@ -31,6 +31,7 @@
 
 import apache_beam as beam
 from apache_beam.dataframe.convert import to_dataframe
+from apache_beam.options.pipeline_options import FlinkRunnerOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.direct import direct_runner

@@ -543,12 +544,39 @@ def test_get_master_url_flink_master_provided(self):
     from apache_beam.runners.portability.flink_runner import FlinkRunner
     p = beam.Pipeline(
         interactive_runner.InteractiveRunner(underlying_runner=FlinkRunner()),
-        options=PipelineOptions(
-            flink_master='--flink_master=example.internal:1'))
+        options=PipelineOptions(flink_master='--flink_master=test.internal:1'))
     runner._get_dataproc_cluster_master_url_if_applicable(p)
     self.assertEqual(ie.current_env().clusters.describe(), {})
     ie.current_env().clusters = ib.Clusters()
 
+  @unittest.skipIf(
+      not ie.current_env().is_interactive_ready,
+      '[interactive] dependency is not installed.')
+  @patch(
+      'apache_beam.runners.interactive.interactive_runner.'
+      'InteractiveRunner._get_dataproc_cluster_master_url_if_applicable',
+      return_value='test.internal:1')
+  def test_set_flink_dataproc_version(self, mock_get_master_url):
+    runner = interactive_runner.InteractiveRunner()
+    options = PipelineOptions()
+    p = beam.Pipeline(interactive_runner.InteractiveRunner())
+
+    # Watch the local scope for Interactive Beam so that values will be cached.
+    ib.watch(locals())
+
+    # This is normally done in the interactive_utils when a transform is
+    # applied but needs an IPython environment. So we manually run this here.
+    ie.current_env().track_user_pipelines()
+
+    # Run the pipeline
+    runner.run_pipeline(p, options)
+
+    # Check that the Flink version is set to the Dataproc image Flink version
+    # inside ib.clusters.
+    self.assertEqual(
+        options.view_as(FlinkRunnerOptions).flink_version,
+        ib.clusters.DATAPROC_FLINK_VERSION)
 
 
 if __name__ == '__main__':
   unittest.main()