From bdcac3d36c8dacbe4781938012b71c17a8bf8201 Mon Sep 17 00:00:00 2001 From: Daniel Oliveira Date: Fri, 12 Jan 2018 16:20:29 -0800 Subject: [PATCH] [BEAM-3126] Creating flatten operation for bundle processor. Added the flatten URN to the URNs registered with the BeamTransformFactory. --- .../apache_beam/runners/worker/bundle_processor.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 1b270b903729..97c318be18b3 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -610,3 +610,17 @@ def _create_combine_phase_operation( factory.state_sampler), transform_proto.unique_name, consumers) + + +@BeamTransformFactory.register_urn(urns.FLATTEN_TRANSFORM, None) +def create(factory, transform_id, transform_proto, unused_parameter, consumers): + return factory.augment_oldstyle_op( + operations.create_operation( + transform_proto.unique_name, + operation_specs.WorkerFlatten( + None, + [factory.get_only_output_coder(transform_proto)]), + factory.counter_factory, + factory.state_sampler), + transform_proto.unique_name, + consumers)