diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 1b270b903729..97c318be18b3 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -610,3 +610,17 @@ def _create_combine_phase_operation( factory.state_sampler), transform_proto.unique_name, consumers) + + +@BeamTransformFactory.register_urn(urns.FLATTEN_TRANSFORM, None) +def create(factory, transform_id, transform_proto, unused_parameter, consumers): + return factory.augment_oldstyle_op( + operations.create_operation( + transform_proto.unique_name, + operation_specs.WorkerFlatten( + None, + [factory.get_only_output_coder(transform_proto)]), + factory.counter_factory, + factory.state_sampler), + transform_proto.unique_name, + consumers)