From 8bca5d79091895cdf80ef10f23ad2818fc739b33 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 1 Oct 2020 12:09:20 -0700 Subject: [PATCH 01/11] [BEAM-2914] Add portable merging window support to Python. --- .../portability/fn_api_runner/execution.py | 217 +++++++++++++++++- .../fn_api_runner/fn_runner_test.py | 32 +++ .../runners/worker/bundle_processor.py | 42 +++- 3 files changed, 279 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py index 5b8e91ca2b57..d7cdf056613d 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py @@ -22,6 +22,8 @@ import collections import copy import itertools +import uuid +import weakref from typing import TYPE_CHECKING from typing import Any from typing import DefaultDict @@ -51,6 +53,7 @@ from apache_beam.runners.portability.fn_api_runner.translations import split_buffer_id from apache_beam.runners.portability.fn_api_runner.translations import unique_name from apache_beam.runners.worker import bundle_processor +from apache_beam.transforms import core from apache_beam.transforms import trigger from apache_beam.transforms import window from apache_beam.transforms.window import GlobalWindow @@ -298,6 +301,199 @@ def from_runner_api_parameter(window_coder_id, context): context.coders[window_coder_id.decode('utf-8')]) +class GenericMergingWindowFn(window.WindowFn): + + URN = 'internal-generic-merging' + + TO_SDK_TRANSFORM = 'read' + FROM_SDK_TRANSFORM = 'write' + + _HANDLES = {} + + def __init__(self, execution_context, windowing_strategy_proto): + self._worker_handle = None + self._handle_id = handle_id = uuid.uuid4().hex + self._HANDLES[handle_id] = self + # ExecutionContexts are expensive, we don't want to keep them in the + # static dictionary forever. Instead we hold a weakref and pop self + # out of the dict once this context goes away. + self._execution_context_ref = weakref.ref( + execution_context, lambda _: self._HANDLES.pop(handle_id, None)) + self._windowing_strategy_proto = windowing_strategy_proto + self._process_bundle_descriptor = None + self._counter = 0 + + def payload(self): + return self._handle_id.encode('utf-8') + + @staticmethod + @window.urns.RunnerApiFn.register_urn(URN, bytes) + def from_runner_api_parameter(handle_id, unused_context): + return GenericMergingWindowFn._HANDLES[handle_id.decode('utf-8')] + + def assign(self, assign_context): + raise NotImplementedError() + + def merge(self, merge_context): + worker_handler = self.worker_handle() + + process_bundle_id = self.uid('process') + to_worker = worker_handler.data_conn.output_stream( + process_bundle_id, self.TO_SDK_TRANSFORM) + to_worker.write( + self.windowed_input_coder_impl.encode_nested( + window.GlobalWindows.windowed_value((b'', merge_context.windows)))) + to_worker.close() + + process_bundle_req = beam_fn_api_pb2.InstructionRequest( + instruction_id=process_bundle_id, + process_bundle=beam_fn_api_pb2.ProcessBundleRequest( + process_bundle_descriptor_id=self._bundle_processor_id)) + result_future = worker_handler.control_conn.push(process_bundle_req) + for output in worker_handler.data_conn.input_elements( + process_bundle_id, + self.FROM_SDK_TRANSFORM, + abort_callback=lambda: + (result_future.is_done() and result_future.get().error)): + if isinstance(output, beam_fn_api_pb2.Elements.Data): + windowed_result = self.windowed_output_coder_impl.decode_nested( + output.data) + for merge_result, originals in windowed_result.value[1][1]: + merge_context.merge(originals, merge_result) + else: + raise RuntimeError("Unexpected data: %s" % output) + + result = result_future.get() + if result.error: + raise RuntimeError(result.error) + # The result was "returned" via the merge callbacks on merge_context above. + + def get_window_coder(self): + return self._execution_context_ref().pipeline_context.coders[ + self._windowing_strategy_proto.window_coder_id] + + def worker_handle(self): + if self._worker_handle is None: + worker_handler_manager = self._execution_context_ref( + ).worker_handler_manager + self._worker_handler = worker_handler_manager.get_worker_handlers( + self._windowing_strategy_proto.environment_id, 1)[0] + process_bundle_decriptor = self.make_process_bundle_descriptor( + self._worker_handler.data_api_service_descriptor(), + self._worker_handler.state_api_service_descriptor()) + worker_handler_manager.register_process_bundle_descriptor( + process_bundle_decriptor) + return self._worker_handler + + def make_process_bundle_descriptor( + self, state_api_service_descriptor, data_api_service_descriptor): + """Creates a ProcessBundleDescriptor for invoking the WindowFn's + merge operation. + """ + def make_channel_payload(coder_id): + data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id) + if data_api_service_descriptor: + data_spec.api_service_descriptor.url = (data_api_service_descriptor.url) + return data_spec.SerializeToString() + + pipeline_context = self._execution_context_ref().pipeline_context + global_windowing_strategy_id = self.uid('global_windowing_strategy') + global_windowing_strategy_proto = core.Windowing( + window.GlobalWindows()).to_runner_api(pipeline_context) + coders = dict(pipeline_context.coders.get_id_to_proto_map()) + + def make_coder(urn, *components): + coder_proto = beam_runner_api_pb2.Coder( + spec=beam_runner_api_pb2.FunctionSpec(urn=urn), + component_coder_ids=components) + coder_id = self.uid('coder') + coders[coder_id] = coder_proto + pipeline_context.coders.put_proto(coder_id, coder_proto) + return coder_id + + bytes_coder_id = make_coder(common_urns.coders.BYTES.urn) + window_coder_id = self._windowing_strategy_proto.window_coder_id + global_window_coder_id = make_coder(common_urns.coders.GLOBAL_WINDOW.urn) + iter_window_coder_id = make_coder( + common_urns.coders.ITERABLE.urn, window_coder_id) + input_coder_id = make_coder( + common_urns.coders.KV.urn, bytes_coder_id, iter_window_coder_id) + output_coder_id = make_coder( + common_urns.coders.KV.urn, + bytes_coder_id, + make_coder( + common_urns.coders.KV.urn, + iter_window_coder_id, + make_coder( + common_urns.coders.ITERABLE.urn, + make_coder( + common_urns.coders.KV.urn, + window_coder_id, + iter_window_coder_id)))) + windowed_input_coder_id = make_coder( + common_urns.coders.WINDOWED_VALUE.urn, + input_coder_id, + global_window_coder_id) + windowed_output_coder_id = make_coder( + common_urns.coders.WINDOWED_VALUE.urn, + output_coder_id, + global_window_coder_id) + + self.windowed_input_coder_impl = pipeline_context.coders[ + windowed_input_coder_id].get_impl() + self.windowed_output_coder_impl = pipeline_context.coders[ + windowed_output_coder_id].get_impl() + + self._bundle_processor_id = self.uid('merge_windows') + return beam_fn_api_pb2.ProcessBundleDescriptor( + id=self._bundle_processor_id, + transforms={ + self.TO_SDK_TRANSFORM: beam_runner_api_pb2.PTransform( + unique_name='MergeWindows/Read', + spec=beam_runner_api_pb2.FunctionSpec( + urn=bundle_processor.DATA_INPUT_URN, + payload=make_channel_payload(windowed_input_coder_id)), + outputs={'input': 'input'}), + 'Merge': beam_runner_api_pb2.PTransform( + unique_name='MergeWindows/Merge', + spec=beam_runner_api_pb2.FunctionSpec( + urn=common_urns.primitives.MERGE_WINDOWS.urn, + payload=self._windowing_strategy_proto.window_fn. + SerializeToString()), + inputs={'input': 'input'}, + outputs={'output': 'output'}), + self.FROM_SDK_TRANSFORM: beam_runner_api_pb2.PTransform( + unique_name='MergeWindows/Write', + spec=beam_runner_api_pb2.FunctionSpec( + urn=bundle_processor.DATA_OUTPUT_URN, + payload=make_channel_payload(windowed_output_coder_id)), + inputs={'output': 'output'}), + }, + pcollections={ + 'input': beam_runner_api_pb2.PCollection( + unique_name='input', + windowing_strategy_id=global_windowing_strategy_id, + coder_id=input_coder_id), + 'output': beam_runner_api_pb2.PCollection( + unique_name='output', + windowing_strategy_id=global_windowing_strategy_id, + coder_id=output_coder_id), + }, + coders=coders, + windowing_strategies={ + global_windowing_strategy_id: global_windowing_strategy_proto, + }, + environments=dict( + self._execution_context_ref().pipeline_components.environments. + items()), + state_api_service_descriptor=state_api_service_descriptor, + timer_api_service_descriptor=data_api_service_descriptor) + + def uid(self, name=''): + self._counter += 1 + return '%s_%s_%s' % (self._handle_id, name, self._counter) + + class FnApiRunnerExecutionContext(object): """ :var pcoll_buffers: (dict): Mapping of @@ -400,23 +596,22 @@ def _make_safe_windowing_strategy(self, id): windowing_strategy_proto = self.pipeline_components.windowing_strategies[id] if windowing_strategy_proto.window_fn.urn in SAFE_WINDOW_FNS: return id - elif (windowing_strategy_proto.merge_status == - beam_runner_api_pb2.MergeStatus.NON_MERGING) or True: + else: safe_id = id + '_safe' while safe_id in self.pipeline_components.windowing_strategies: safe_id += '_' safe_proto = copy.copy(windowing_strategy_proto) - safe_proto.window_fn.urn = GenericNonMergingWindowFn.URN - safe_proto.window_fn.payload = ( - windowing_strategy_proto.window_coder_id.encode('utf-8')) + if (windowing_strategy_proto.merge_status == + beam_runner_api_pb2.MergeStatus.NON_MERGING): + safe_proto.window_fn.urn = GenericNonMergingWindowFn.URN + safe_proto.window_fn.payload = ( + windowing_strategy_proto.window_coder_id.encode('utf-8')) + else: + window_fn = GenericMergingWindowFn(self, windowing_strategy_proto) + safe_proto.window_fn.urn = GenericMergingWindowFn.URN + safe_proto.window_fn.payload = window_fn.payload() self.pipeline_context.windowing_strategies.put_proto(safe_id, safe_proto) return safe_id - elif windowing_strategy_proto.window_fn.urn == python_urns.PICKLED_WINDOWFN: - return id - else: - raise NotImplementedError( - '[BEAM-10119] Unknown merging WindowFn: %s' % - windowing_strategy_proto) @property def state_servicer(self): diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 91343b554d76..3bd5a0a9a253 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -20,6 +20,7 @@ from __future__ import print_function import collections +import gc import logging import os import random @@ -46,6 +47,7 @@ from tenacity import stop_after_attempt import apache_beam as beam +from apache_beam.coders import coders from apache_beam.coders.coders import StrUtf8Coder from apache_beam.io import restriction_trackers from apache_beam.io.watermark_estimators import ManualWatermarkEstimator @@ -709,6 +711,22 @@ def test_windowing(self): | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1])))) assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102])])) + def test_custom_merging_window(self): + with self.create_pipeline() as p: + res = ( + p + | beam.Create([1, 2, 100, 101, 102]) + | beam.Map(lambda t: window.TimestampedValue(('k', t), t)) + | beam.WindowInto(CustomMergingWindowFn()) + | beam.GroupByKey() + | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1])))) + assert_that( + res, equal_to([('k', [1]), ('k', [101]), ('k', [2, 100, 102])])) + gc.collect() + from apache_beam.runners.portability.fn_api_runner.execution import GenericMergingWindowFn + self.assertEqual(GenericMergingWindowFn._HANDLES, {}) + + @unittest.skip('BEAM-9119: test is flaky') def test_large_elements(self): with self.create_pipeline() as p: @@ -1836,6 +1854,20 @@ def test_gbk_many_values(self): assert_that(r, equal_to([VALUES_PER_ELEMENT * NUM_OF_ELEMENTS])) +# TODO(robertwb): Why does pickling break when this is inlined? +class CustomMergingWindowFn(window.WindowFn): + def assign(self, assign_context): + return [window.IntervalWindow( + assign_context.timestamp, assign_context.timestamp + 1)] + def merge(self, merge_context): + evens = [w for w in merge_context.windows if w.start % 2 == 0] + if evens: + merge_context.merge(evens, window.IntervalWindow( + min(w.start for w in evens), max(w.end for w in evens))) + def get_window_coder(self): + return coders.IntervalWindowCoder() + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index ed2390d93c47..966f01274d6f 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -74,6 +74,7 @@ from apache_beam.transforms import core from apache_beam.transforms import sideinputs from apache_beam.transforms import userstate +from apache_beam.transforms import window from apache_beam.utils import counters from apache_beam.utils import proto_utils from apache_beam.utils import timestamp @@ -86,7 +87,6 @@ from apache_beam.runners.sdf_utils import SplitResultResidual from apache_beam.runners.worker import data_plane from apache_beam.runners.worker import sdk_worker - from apache_beam.transforms import window from apache_beam.utils import windowed_value # This module is experimental. No backwards-compatibility guarantees. @@ -1833,3 +1833,43 @@ def process(self, element): return _create_simple_pardo_operation( factory, transform_id, transform_proto, consumers, MapWindows()) + + +@BeamTransformFactory.register_urn( + common_urns.primitives.MERGE_WINDOWS.urn, beam_runner_api_pb2.FunctionSpec) +def create_map_windows( + factory, # type: BeamTransformFactory + transform_id, # type: str + transform_proto, # type: beam_runner_api_pb2.PTransform + mapping_fn_spec, # type: beam_runner_api_pb2.FunctionSpec + consumers # type: Dict[str, List[operations.Operation]] +): + assert mapping_fn_spec.urn == python_urns.PICKLED_WINDOWFN + window_fn = pickler.loads(mapping_fn_spec.payload) + + class MergeWindows(beam.DoFn): + def process(self, element): + nonce, windows = element + + original_windows = set(windows) # type: Set[window.BoundedWindow] + merged_windows = collections.defaultdict(set) # type: MutableMapping[window.BoundedWindow, Set[window.BoundedWindow]] + + class RecordingMergeContext(window.WindowFn.MergeContext): + def merge( + self, + to_be_merged, # type: Collection[window.BoundedWindow] + merge_result, # type: window.BoundedWindow + ): + originals = merged_windows[merge_result] + for window in to_be_merged: + if window in original_windows: + originals.add(window) + original_windows.remove(window) + else: + originals.update(merged_windows.pop(window)) + + window_fn.merge(RecordingMergeContext(windows)) + yield nonce, (original_windows, merged_windows.items()) + + return _create_simple_pardo_operation( + factory, transform_id, transform_proto, consumers, MergeWindows()) From 23dc93d72863032d9f6e12a9299c7272f4425f4a Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 2 Oct 2020 16:39:30 -0700 Subject: [PATCH 02/11] yapf --- .../portability/fn_api_runner/fn_runner_test.py | 15 ++++++++++----- .../runners/worker/bundle_processor.py | 4 +++- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 3bd5a0a9a253..12a86bcefe87 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -726,7 +726,6 @@ def test_custom_merging_window(self): from apache_beam.runners.portability.fn_api_runner.execution import GenericMergingWindowFn self.assertEqual(GenericMergingWindowFn._HANDLES, {}) - @unittest.skip('BEAM-9119: test is flaky') def test_large_elements(self): with self.create_pipeline() as p: @@ -1857,13 +1856,19 @@ def test_gbk_many_values(self): # TODO(robertwb): Why does pickling break when this is inlined? class CustomMergingWindowFn(window.WindowFn): def assign(self, assign_context): - return [window.IntervalWindow( - assign_context.timestamp, assign_context.timestamp + 1)] + return [ + window.IntervalWindow( + assign_context.timestamp, assign_context.timestamp + 1) + ] + def merge(self, merge_context): evens = [w for w in merge_context.windows if w.start % 2 == 0] if evens: - merge_context.merge(evens, window.IntervalWindow( - min(w.start for w in evens), max(w.end for w in evens))) + merge_context.merge( + evens, + window.IntervalWindow( + min(w.start for w in evens), max(w.end for w in evens))) + def get_window_coder(self): return coders.IntervalWindowCoder() diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 966f01274d6f..50f546ae7518 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -1852,7 +1852,9 @@ def process(self, element): nonce, windows = element original_windows = set(windows) # type: Set[window.BoundedWindow] - merged_windows = collections.defaultdict(set) # type: MutableMapping[window.BoundedWindow, Set[window.BoundedWindow]] + merged_windows = collections.defaultdict( + set + ) # type: MutableMapping[window.BoundedWindow, Set[window.BoundedWindow]] class RecordingMergeContext(window.WindowFn.MergeContext): def merge( From 6afead40fe3350c3bd69d80e1c71abd9dfe28f94 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 2 Oct 2020 17:30:55 -0700 Subject: [PATCH 03/11] Skip flink, grpc fixes. --- .../apache_beam/runners/portability/flink_runner_test.py | 3 +++ .../runners/portability/fn_api_runner/execution.py | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index eb8361175724..16360fb54ad5 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -384,6 +384,9 @@ def test_callbacks_with_exception(self): def test_register_finalizations(self): raise unittest.SkipTest("BEAM-6868") + def test_custom_merging_window(self): + raise unittest.SkipTest("BEAM-11004") + # Inherits all other tests. class FlinkRunnerTestOptimized(FlinkRunnerTest): diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py index d7cdf056613d..5e0261a9a6e1 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py @@ -352,7 +352,7 @@ def merge(self, merge_context): result_future = worker_handler.control_conn.push(process_bundle_req) for output in worker_handler.data_conn.input_elements( process_bundle_id, - self.FROM_SDK_TRANSFORM, + [self.FROM_SDK_TRANSFORM], abort_callback=lambda: (result_future.is_done() and result_future.get().error)): if isinstance(output, beam_fn_api_pb2.Elements.Data): @@ -386,7 +386,7 @@ def worker_handle(self): return self._worker_handler def make_process_bundle_descriptor( - self, state_api_service_descriptor, data_api_service_descriptor): + self, data_api_service_descriptor, state_api_service_descriptor): """Creates a ProcessBundleDescriptor for invoking the WindowFn's merge operation. """ From 8c95a28b2b8c89801dbe9e6cd1af83aa8c2e3c62 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 2 Oct 2020 18:05:17 -0700 Subject: [PATCH 04/11] yapf --- .../apache_beam/runners/portability/fn_api_runner/execution.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py index 5e0261a9a6e1..d8560ad6d943 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py @@ -351,8 +351,7 @@ def merge(self, merge_context): process_bundle_descriptor_id=self._bundle_processor_id)) result_future = worker_handler.control_conn.push(process_bundle_req) for output in worker_handler.data_conn.input_elements( - process_bundle_id, - [self.FROM_SDK_TRANSFORM], + process_bundle_id, [self.FROM_SDK_TRANSFORM], abort_callback=lambda: (result_future.is_done() and result_future.get().error)): if isinstance(output, beam_fn_api_pb2.Elements.Data): From e647cfe6d3c1bf4e289e60074f14b5ebd0dbb151 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 6 Oct 2020 11:44:13 -0700 Subject: [PATCH 05/11] fix merge --- .../runners/portability/flink_runner_test.py | 148 ------------------ 1 file changed, 148 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index 2e0da08e66bd..9beaa912b47b 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -435,154 +435,6 @@ def create_options(self): options.view_as(StandardOptions).streaming = True return options - def test_sql(self): - with self.create_pipeline() as p: - output = ( - p - | 'Create' >> beam.Create([Row(x, str(x)) for x in range(5)]) - | 'Sql' >> SqlTransform( - """SELECT col1, col2 || '*' || col2 as col2, - power(col1, 2) as col3 - FROM PCOLLECTION - """, - expansion_service=self.get_expansion_service())) - assert_that( - output, - equal_to([(x, '{x}*{x}'.format(x=x), x * x) for x in range(5)])) - - def test_flattened_side_input(self): - # Blocked on support for transcoding - # https://jira.apache.org/jira/browse/BEAM-6523 - super(FlinkRunnerTest, - self).test_flattened_side_input(with_transcoding=False) - - def test_metrics(self): - super(FlinkRunnerTest, self).test_metrics(check_gauge=False) - - def test_flink_metrics(self): - """Run a simple DoFn that increments a counter and verifies state - caching metrics. Verifies that its expected value is written to a - temporary file by the FileReporter""" - - counter_name = 'elem_counter' - state_spec = userstate.BagStateSpec('state', VarIntCoder()) - - class DoFn(beam.DoFn): - def __init__(self): - self.counter = Metrics.counter(self.__class__, counter_name) - _LOGGER.info('counter: %s' % self.counter.metric_name) - - def process(self, kv, state=beam.DoFn.StateParam(state_spec)): - # Trigger materialization - list(state.read()) - state.add(1) - self.counter.inc() - - options = self.create_options() - # Test only supports parallelism of 1 - options._all_options['parallelism'] = 1 - # Create multiple bundles to test cache metrics - options._all_options['max_bundle_size'] = 10 - options._all_options['max_bundle_time_millis'] = 95130590130 - experiments = options.view_as(DebugOptions).experiments or [] - experiments.append('state_cache_size=123') - options.view_as(DebugOptions).experiments = experiments - with Pipeline(self.get_runner(), options) as p: - # pylint: disable=expression-not-assigned - ( - p - | "create" >> beam.Create(list(range(0, 110))) - | "mapper" >> beam.Map(lambda x: (x % 10, 'val')) - | "stateful" >> beam.ParDo(DoFn())) - - lines_expected = {'counter: 110'} - if streaming: - lines_expected.update([ - # Gauges for the last finished bundle - 'stateful.beam.metric:statecache:capacity: 123', - 'stateful.beam.metric:statecache:size: 10', - 'stateful.beam.metric:statecache:get: 20', - 'stateful.beam.metric:statecache:miss: 0', - 'stateful.beam.metric:statecache:hit: 20', - 'stateful.beam.metric:statecache:put: 0', - 'stateful.beam.metric:statecache:evict: 0', - # Counters - 'stateful.beam.metric:statecache:get_total: 220', - 'stateful.beam.metric:statecache:miss_total: 10', - 'stateful.beam.metric:statecache:hit_total: 210', - 'stateful.beam.metric:statecache:put_total: 10', - 'stateful.beam.metric:statecache:evict_total: 0', - ]) - else: - # Batch has a different processing model. All values for - # a key are processed at once. - lines_expected.update([ - # Gauges - 'stateful).beam.metric:statecache:capacity: 123', - # For the first key, the cache token will not be set yet. - # It's lazily initialized after first access in StateRequestHandlers - 'stateful).beam.metric:statecache:size: 10', - # We have 11 here because there are 110 / 10 elements per key - 'stateful).beam.metric:statecache:get: 12', - 'stateful).beam.metric:statecache:miss: 1', - 'stateful).beam.metric:statecache:hit: 11', - # State is flushed back once per key - 'stateful).beam.metric:statecache:put: 1', - 'stateful).beam.metric:statecache:evict: 0', - # Counters - 'stateful).beam.metric:statecache:get_total: 120', - 'stateful).beam.metric:statecache:miss_total: 10', - 'stateful).beam.metric:statecache:hit_total: 110', - 'stateful).beam.metric:statecache:put_total: 10', - 'stateful).beam.metric:statecache:evict_total: 0', - ]) - lines_actual = set() - with open(self.test_metrics_path, 'r') as f: - for line in f: - for metric_str in lines_expected: - metric_name = metric_str.split()[0] - if metric_str in line: - lines_actual.add(metric_str) - elif metric_name in line: - lines_actual.add(line) - self.assertSetEqual(lines_actual, lines_expected) - - def test_sdf_with_watermark_tracking(self): - raise unittest.SkipTest("BEAM-2939") - - def test_sdf_with_sdf_initiated_checkpointing(self): - raise unittest.SkipTest("BEAM-2939") - - def test_callbacks_with_exception(self): - raise unittest.SkipTest("BEAM-6868") - - def test_register_finalizations(self): - raise unittest.SkipTest("BEAM-6868") - - # Inherits all other tests. - - class FlinkRunnerTestOptimized(FlinkRunnerTest): - # TODO: Remove these tests after resolving BEAM-7248 and enabling - # PortableRunnerOptimized - def create_options(self): - options = super(FlinkRunnerTestOptimized, self).create_options() - options.view_as(DebugOptions).experiments = [ - 'pre_optimize=all' - ] + options.view_as(DebugOptions).experiments - return options - - def test_external_transform(self): - raise unittest.SkipTest("BEAM-7252") - - def test_expand_kafka_read(self): - raise unittest.SkipTest("BEAM-7252") - - def test_expand_kafka_write(self): - raise unittest.SkipTest("BEAM-7252") - - def test_sql(self): - raise unittest.SkipTest("BEAM-7252") - if __name__ == '__main__': # Run the tests. From 3eb748ef37e12e35722fb0e449931e9af833d006 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 6 Oct 2020 11:48:01 -0700 Subject: [PATCH 06/11] merge fix --- sdks/python/apache_beam/runners/worker/bundle_processor.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 7315a628a1ca..1c64ae6c5537 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -88,11 +88,8 @@ from apache_beam.runners.sdf_utils import SplitResultResidual from apache_beam.runners.worker import data_plane from apache_beam.runners.worker import sdk_worker -<<<<<<< HEAD -======= from apache_beam.transforms import window from apache_beam.transforms.core import Windowing ->>>>>>> master from apache_beam.utils import windowed_value # This module is experimental. No backwards-compatibility guarantees. From 8af2665ff9e7241dd49cf351b1753ecd61e5786e Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 12 Oct 2020 09:53:48 -0700 Subject: [PATCH 07/11] lint --- sdks/python/apache_beam/runners/worker/bundle_processor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 1c64ae6c5537..66311c63534e 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -88,7 +88,6 @@ from apache_beam.runners.sdf_utils import SplitResultResidual from apache_beam.runners.worker import data_plane from apache_beam.runners.worker import sdk_worker - from apache_beam.transforms import window from apache_beam.transforms.core import Windowing from apache_beam.utils import windowed_value From 37986fe315fa8abc719a86eb1ba763157d3368d6 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 12 Oct 2020 17:31:47 -0700 Subject: [PATCH 08/11] lint --- sdks/python/apache_beam/runners/worker/bundle_processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 66311c63534e..493eb0494c04 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -1854,7 +1854,7 @@ def process(self, element): @BeamTransformFactory.register_urn( common_urns.primitives.MERGE_WINDOWS.urn, beam_runner_api_pb2.FunctionSpec) -def create_map_windows( +def create_merge_windows( factory, # type: BeamTransformFactory transform_id, # type: str transform_proto, # type: beam_runner_api_pb2.PTransform @@ -1876,7 +1876,7 @@ def process(self, element): class RecordingMergeContext(window.WindowFn.MergeContext): def merge( self, - to_be_merged, # type: Collection[window.BoundedWindow] + to_be_merged, # type: Iterable[window.BoundedWindow] merge_result, # type: window.BoundedWindow ): originals = merged_windows[merge_result] From 2d29890ef16c2d247bbde2976a16a9dc00f5252a Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 8 Feb 2021 13:58:30 -0800 Subject: [PATCH 09/11] lint --- .../apache_beam/runners/portability/fn_api_runner/execution.py | 1 - sdks/python/apache_beam/runners/worker/bundle_processor.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py index 01e08b873d37..1c86cadb7ef4 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py @@ -72,7 +72,6 @@ from apache_beam.runners.portability.fn_api_runner.fn_runner import DataOutput from apache_beam.runners.portability.fn_api_runner.fn_runner import OutputTimers from apache_beam.runners.portability.fn_api_runner.translations import DataSideInput - from apache_beam.transforms import core from apache_beam.transforms.window import BoundedWindow ENCODED_IMPULSE_VALUE = WindowedValueCoder( diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index c2eec0b90542..f05228e95b14 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -1898,6 +1898,7 @@ def merge( return _create_simple_pardo_operation( factory, transform_id, transform_proto, consumers, MergeWindows()) + @BeamTransformFactory.register_urn(common_urns.primitives.TO_STRING.urn, None) def create_to_string_fn( factory, # type: BeamTransformFactory From 9978c432405fbe5dfcf2d93bb7620a8f815cd6a1 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 8 Feb 2021 14:30:40 -0800 Subject: [PATCH 10/11] mypy --- .../portability/fn_api_runner/execution.py | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py index 1c86cadb7ef4..493b67a9182e 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py @@ -347,35 +347,52 @@ class GenericMergingWindowFn(window.WindowFn): TO_SDK_TRANSFORM = 'read' FROM_SDK_TRANSFORM = 'write' - _HANDLES = {} + _HANDLES = {} # type: Dict[str, GenericMergingWindowFn] def __init__(self, execution_context, windowing_strategy_proto): - self._worker_handler = None + # type: (FnApiRunnerExecutionContext, beam_runner_api_pb2.WindowingStrategy) -> None + self._worker_handler = None # type: Optional[worker_handlers.WorkerHandler] self._handle_id = handle_id = uuid.uuid4().hex self._HANDLES[handle_id] = self # ExecutionContexts are expensive, we don't want to keep them in the # static dictionary forever. Instead we hold a weakref and pop self # out of the dict once this context goes away. - self._execution_context_ref = weakref.ref( + self._execution_context_ref_obj = weakref.ref( execution_context, lambda _: self._HANDLES.pop(handle_id, None)) self._windowing_strategy_proto = windowing_strategy_proto - self._process_bundle_descriptor = None self._counter = 0 + # Lazily created in make_process_bundle_descriptor() + self._process_bundle_descriptor = None + self._bundle_processor_id = None # type: Optional[str] + self.windowed_input_coder_impl = None # type: Optional[CoderImpl] + self.windowed_output_coder_impl = None # type: Optional[CoderImpl] + + def _execution_context_ref(self): + # type: () -> FnApiRunnerExecutionContext + result = self._execution_context_ref_obj() + assert result is not None + return result def payload(self): + # type: () -> bytes return self._handle_id.encode('utf-8') @staticmethod @window.urns.RunnerApiFn.register_urn(URN, bytes) def from_runner_api_parameter(handle_id, unused_context): + # type: (bytes, Any) -> GenericMergingWindowFn return GenericMergingWindowFn._HANDLES[handle_id.decode('utf-8')] def assign(self, assign_context): + # type: (window.WindowFn.AssignContext) -> Iterable[window.BoundedWindow] raise NotImplementedError() def merge(self, merge_context): + # type: (window.WindowFn.MergeContext) -> None worker_handler = self.worker_handle() + assert self.windowed_input_coder_impl is not None + assert self.windowed_output_coder_impl is not None process_bundle_id = self.uid('process') to_worker = worker_handler.data_conn.output_stream( process_bundle_id, self.TO_SDK_TRANSFORM) @@ -392,7 +409,7 @@ def merge(self, merge_context): for output in worker_handler.data_conn.input_elements( process_bundle_id, [self.FROM_SDK_TRANSFORM], abort_callback=lambda: - (result_future.is_done() and result_future.get().error)): + bool(result_future.is_done() and result_future.get().error)): if isinstance(output, beam_fn_api_pb2.Elements.Data): windowed_result = self.windowed_output_coder_impl.decode_nested( output.data) @@ -407,10 +424,12 @@ def merge(self, merge_context): # The result was "returned" via the merge callbacks on merge_context above. def get_window_coder(self): + # type: () -> coders.Coder return self._execution_context_ref().pipeline_context.coders[ self._windowing_strategy_proto.window_coder_id] def worker_handle(self): + # type: () -> worker_handlers.WorkerHandler if self._worker_handler is None: worker_handler_manager = self._execution_context_ref( ).worker_handler_manager @@ -425,10 +444,12 @@ def worker_handle(self): def make_process_bundle_descriptor( self, data_api_service_descriptor, state_api_service_descriptor): + # type: (Optional[endpoints_pb2.ApiServiceDescriptor], Optional[endpoints_pb2.ApiServiceDescriptor]) -> beam_fn_api_pb2.ProcessBundleDescriptor """Creates a ProcessBundleDescriptor for invoking the WindowFn's merge operation. """ def make_channel_payload(coder_id): + # type: (str) -> bytes data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id) if data_api_service_descriptor: data_spec.api_service_descriptor.url = (data_api_service_descriptor.url) @@ -441,6 +462,7 @@ def make_channel_payload(coder_id): coders = dict(pipeline_context.coders.get_id_to_proto_map()) def make_coder(urn, *components): + # type: (str, str) -> str coder_proto = beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.FunctionSpec(urn=urn), component_coder_ids=components) @@ -528,6 +550,7 @@ def make_coder(urn, *components): timer_api_service_descriptor=data_api_service_descriptor) def uid(self, name=''): + # type: (str) -> str self._counter += 1 return '%s_%s_%s' % (self._handle_id, name, self._counter) From 8c107568f5e21f13c947bbbfcedbefdeed6f376b Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 8 Feb 2021 15:06:44 -0800 Subject: [PATCH 11/11] yapf --- .../runners/portability/fn_api_runner/execution.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py index 493b67a9182e..a08aa5fa8475 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py @@ -408,8 +408,8 @@ def merge(self, merge_context): result_future = worker_handler.control_conn.push(process_bundle_req) for output in worker_handler.data_conn.input_elements( process_bundle_id, [self.FROM_SDK_TRANSFORM], - abort_callback=lambda: - bool(result_future.is_done() and result_future.get().error)): + abort_callback=lambda: bool(result_future.is_done() and result_future. + get().error)): if isinstance(output, beam_fn_api_pb2.Elements.Data): windowed_result = self.windowed_output_coder_impl.decode_nested( output.data) @@ -445,6 +445,7 @@ def worker_handle(self): def make_process_bundle_descriptor( self, data_api_service_descriptor, state_api_service_descriptor): # type: (Optional[endpoints_pb2.ApiServiceDescriptor], Optional[endpoints_pb2.ApiServiceDescriptor]) -> beam_fn_api_pb2.ProcessBundleDescriptor + """Creates a ProcessBundleDescriptor for invoking the WindowFn's merge operation. """