diff --git a/.gitignore b/.gitignore index ff2faad05fd8..401b4f614b53 100644 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,7 @@ sdks/python/LICENSE sdks/python/NOTICE sdks/python/README.md sdks/python/apache_beam/portability/api/*pb2*.* +sdks/python/apache_beam/portability/common_urns.py # Ignore IntelliJ files. **/.idea/**/* diff --git a/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md b/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md new file mode 100644 index 000000000000..f20c9e486bf5 --- /dev/null +++ b/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md @@ -0,0 +1,132 @@ + + +# Apache Beam URNs + +This file serves as a central place to enumerate and document the various +URNs used in the Beam portability APIs. + + +## Core Transforms + +### urn:beam:transform:pardo:v1 + +TODO(BEAM-3595): Change this to beam:transform:pardo:v1. + +Represents Beam's parallel do operation. + +Payload: A serialized ParDoPayload proto. + +### beam:transform:group_by_key:v1 + +Represents Beam's group-by-key operation. + +Payload: None + +### beam:transform:window_into:v1 + +Payload: A windowing strategy id. + +### beam:transform:flatten:v1 + +### beam:transform:read:v1 + + +## Combining + +If any of the combine operations are produced by an SDK, it is assumed that +the SDK understands the last three combine helper operations. + +### beam:transform:combine_globally:v1 + +### beam:transform:combine_per_key:v1 + +### beam:transform:combine_grouped_values:v1 + +### beam:transform:combine_pgbkcv:v1 + +### beam:transform:combine_merge_accumulators:v1 + +### beam:transform:combine_extract_outputs:v1 + + +## Other common transforms + +### beam:transform:reshuffle:v1 + + +## WindowFns + +### beam:windowfn:global_windows:v0.1 + +TODO(BEAM-3595): Change this to beam:windowfn:global_windows:v1 + +### beam:windowfn:fixed_windows:v0.1 + +TODO(BEAM-3595): Change this to beam:windowfn:fixed_windows:v1 + +### beam:windowfn:sliding_windows:v0.1 + +TODO(BEAM-3595): Change this to beam:windowfn:sliding_windows:v1 + +### beam:windowfn:session_windows:v0.1 + +TODO(BEAM-3595): Change this to beam:windowfn:session_windows:v1 + + +## Coders + +### beam:coder:bytes:v1 + +Components: None + +### beam:coder:varint:v1 + +Components: None + +### beam:coder:kv:v1 + +Components: The key and value coder, in that order. + +### beam:coder:iterable:v1 + +Encodes an iterable of elements. + +Components: Coder for a single element. + +## Internal coders + +The following coders are typically not specified by manually by the user, +but are used at runtime and must be supported by every SDK. + +### beam:coder:length_prefix:v1 + +### beam:coder:global_window:v1 + +### beam:coder:interval_window:v1 + +### beam:coder:windowed_value:v1 + + +## Side input access + +### beam:side_input:iterable:v1 + +### beam:side_input:multimap:v1 + diff --git a/pom.xml b/pom.xml index 4cbb38794b5e..dc138d3210fd 100644 --- a/pom.xml +++ b/pom.xml @@ -1950,6 +1950,7 @@ **/sdks/python/NOTICE **/sdks/python/README.md **/sdks/python/apache_beam/portability/api/*pb2*.* + **/sdks/python/apache_beam/portability/common_urns.py **/sdks/python/**/*.c **/sdks/python/**/*.so **/sdks/python/**/*.egg diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java index b595bc914f53..77152b846ae8 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.core.construction; +import static org.apache.beam.runners.core.construction.UrnUtils.validateCommonUrn; + import com.google.auto.service.AutoService; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BiMap; @@ -41,14 +43,14 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { @VisibleForTesting static final BiMap, String> BEAM_MODEL_CODER_URNS = ImmutableBiMap., String>builder() - .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1") - .put(KvCoder.class, "urn:beam:coders:kv:0.1") - .put(VarLongCoder.class, "urn:beam:coders:varint:0.1") - .put(IntervalWindowCoder.class, "urn:beam:coders:interval_window:0.1") - .put(IterableCoder.class, "urn:beam:coders:stream:0.1") - .put(LengthPrefixCoder.class, "urn:beam:coders:length_prefix:0.1") - .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1") - .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1") + .put(ByteArrayCoder.class, validateCommonUrn("beam:coder:bytes:v1")) + .put(KvCoder.class, validateCommonUrn("beam:coder:kv:v1")) + .put(VarLongCoder.class, validateCommonUrn("beam:coder:varint:v1")) + .put(IntervalWindowCoder.class, validateCommonUrn("beam:coder:interval_window:v1")) + .put(IterableCoder.class, validateCommonUrn("beam:coder:iterable:v1")) + .put(LengthPrefixCoder.class, validateCommonUrn("beam:coder:length_prefix:v1")) + .put(GlobalWindow.Coder.class, validateCommonUrn("beam:coder:global_window:v1")) + .put(FullWindowedValueCoder.class, validateCommonUrn("beam:coder:windowed_value:v1")) .build(); @VisibleForTesting diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 238fb849b3af..f19f2a17ce73 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.runners.core.construction.UrnUtils.validateCommonUrn; import com.google.auto.value.AutoValue; import com.google.common.base.Joiner; @@ -51,27 +52,34 @@ */ public class PTransformTranslation { - public static final String PAR_DO_TRANSFORM_URN = "urn:beam:transform:pardo:v1"; - public static final String FLATTEN_TRANSFORM_URN = "urn:beam:transform:flatten:v1"; - public static final String GROUP_BY_KEY_TRANSFORM_URN = "urn:beam:transform:groupbykey:v1"; - public static final String READ_TRANSFORM_URN = "urn:beam:transform:read:v1"; - public static final String WINDOW_TRANSFORM_URN = "urn:beam:transform:window:v1"; + public static final String PAR_DO_TRANSFORM_URN = + validateCommonUrn("urn:beam:transform:pardo:v1"); + public static final String FLATTEN_TRANSFORM_URN = + validateCommonUrn("beam:transform:flatten:v1"); + public static final String GROUP_BY_KEY_TRANSFORM_URN = + validateCommonUrn("beam:transform:group_by_key:v1"); + public static final String READ_TRANSFORM_URN = + validateCommonUrn("beam:transform:read:v1"); + public static final String WINDOW_TRANSFORM_URN = + validateCommonUrn("beam:transform:window_into:v1"); public static final String TEST_STREAM_TRANSFORM_URN = "urn:beam:transform:teststream:v1"; // Not strictly a primitive transform - public static final String COMBINE_TRANSFORM_URN = "urn:beam:transform:combine:v1"; + public static final String COMBINE_TRANSFORM_URN = + validateCommonUrn("beam:transform:combine_per_key:v1"); - public static final String RESHUFFLE_URN = "urn:beam:transform:reshuffle:v1"; + public static final String RESHUFFLE_URN = + validateCommonUrn("beam:transform:reshuffle:v1"); // Less well-known. And where shall these live? - public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1"; + public static final String WRITE_FILES_TRANSFORM_URN = "beam:transform:write_files:0.1"; /** * @deprecated runners should move away from translating `CreatePCollectionView` and treat this as * part of the translation for a `ParDo` side input. */ @Deprecated - public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1"; + public static final String CREATE_VIEW_TRANSFORM_URN = "beam:transform:create_view:v1"; private static final Map, TransformPayloadTranslator> KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators(); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UrnUtils.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UrnUtils.java new file mode 100644 index 000000000000..3932390349bb --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UrnUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import com.google.common.io.CharStreams; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Utilities for dealing with URNs. */ +public class UrnUtils { + + private static final String STANDARD_URNS_PATH = "/org/apache/beam/model/common_urns.md"; + private static final Pattern URN_REGEX = Pattern.compile("\\b(urn:)?beam:\\S+:v[0-9.]+"); + private static final Set COMMON_URNS = extractUrnsFromPath(STANDARD_URNS_PATH); + + private static Set extractUrnsFromPath(String path) { + String contents; + try { + contents = CharStreams.toString(new InputStreamReader( + UrnUtils.class.getResourceAsStream(path))); + } catch (IOException exn) { + throw new RuntimeException(exn); + } + Set urns = new HashSet<>(); + Matcher m = URN_REGEX.matcher(contents); + while (m.find()) { + urns.add(m.group()); + } + return urns; + } + + public static String validateCommonUrn(String urn) { + if (!URN_REGEX.matcher(urn).matches()) { + throw new IllegalArgumentException( + String.format("'%s' does not match '%s'", urn, URN_REGEX)); + } + if (!COMMON_URNS.contains(urn)) { + throw new IllegalArgumentException( + String.format("'%s' is not found in '%s'", urn, STANDARD_URNS_PATH)); + } + return urn; + } +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 0b9c26f1f4cb..0da6f7e572b7 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -200,6 +200,13 @@ public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime. public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"; public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"; public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"; + static { + // Out-of-line to facilitate use in the case statements below. + UrnUtils.validateCommonUrn(GLOBAL_WINDOWS_FN); + UrnUtils.validateCommonUrn(FIXED_WINDOWS_FN); + UrnUtils.validateCommonUrn(SLIDING_WINDOWS_FN); + UrnUtils.validateCommonUrn(SESSION_WINDOWS_FN); + } // This URN says that the WindowFn is just a UDF blob the Java SDK understands // TODO: standardize such things public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1"; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UrnUtilsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UrnUtilsTest.java new file mode 100644 index 000000000000..b7e0ba756e51 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UrnUtilsTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import org.junit.Test; + +/** + * Tests for UrnUtils. + */ +public class UrnUtilsTest { + + private static final String GOOD_URN = "beam:coder:bytes:v1"; + private static final String MISSING_URN = "beam:fake:v1"; + private static final String BAD_URN = "Beam"; + + @Test + public void testGoodUrnSuccedes() { + assertEquals(GOOD_URN, UrnUtils.validateCommonUrn(GOOD_URN)); + } + + @Test + public void testMissingUrnFails() { + try { + UrnUtils.validateCommonUrn(MISSING_URN); + fail("Should have rejected " + MISSING_URN); + } catch (IllegalArgumentException exn) { + // expected + } + } + + @Test + public void testBadUrnFails() { + try { + UrnUtils.validateCommonUrn(BAD_URN); + fail("Should have rejected " + BAD_URN); + } catch (IllegalArgumentException exn) { + // expected + } + } +} diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java index ac7e745b939f..a7b23ab72dde 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java @@ -29,21 +29,22 @@ /** * Utilities for replacing or wrapping unknown coders with {@link LengthPrefixCoder}. * - *

TODO: Support a dynamic list of well known coders using either registration or manual listing. + *

TODO: Support a dynamic list of well known coders using either registration or manual listing, + * possibly from ModelCoderRegistrar. */ public class LengthPrefixUnknownCoders { - private static final String BYTES_CODER_TYPE = "urn:beam:coders:bytes:0.1"; - private static final String LENGTH_PREFIX_CODER_TYPE = "urn:beam:coders:length_prefix:0.1"; + private static final String BYTES_CODER_TYPE = "beam:coder:bytes:v1"; + private static final String LENGTH_PREFIX_CODER_TYPE = "beam:coder:length_prefix:v1"; private static final Set WELL_KNOWN_CODER_URNS = ImmutableSet.of( BYTES_CODER_TYPE, - "urn:beam:coders:kv:0.1", - "urn:beam:coders:varint:0.1", - "urn:beam:coders:interval_window:0.1", - "urn:beam:coders:stream:0.1", + "beam:coder:kv:v1", + "beam:coder:varint:v1", + "beam:coder:interval_window:v1", + "beam:coder:stream:v1", LENGTH_PREFIX_CODER_TYPE, - "urn:beam:coders:global_window:0.1", - "urn:beam:coders:windowed_value:0.1"); + "beam:coder:global_window:v1", + "beam:coder:windowed_value:v1"); /** * Recursively traverse the coder tree and wrap the first unknown coder in every branch with a diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 64902b592e31..f76625869879 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -25,11 +25,13 @@ import cPickle as pickle import google.protobuf +from google.protobuf import wrappers_pb2 from apache_beam.coders import coder_impl +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.utils import proto_utils -from apache_beam.utils import urns # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: @@ -263,8 +265,8 @@ def from_runner_api(cls, coder_proto, context): def to_runner_api_parameter(self, context): return ( - urns.PICKLED_CODER, - google.protobuf.wrappers_pb2.BytesValue(value=serialize_coder(self)), + python_urns.PICKLED_CODER, + wrappers_pb2.BytesValue(value=serialize_coder(self)), ()) @staticmethod @@ -284,7 +286,8 @@ def from_runner_api_parameter(unused_payload, components, unused_context): return cls() -@Coder.register_urn(urns.PICKLED_CODER, google.protobuf.wrappers_pb2.BytesValue) +@Coder.register_urn( + python_urns.PICKLED_CODER, google.protobuf.wrappers_pb2.BytesValue) def _pickle_from_runner_api_parameter(payload, components, context): return deserialize_coder(payload.value) @@ -363,7 +366,7 @@ def __hash__(self): return hash(type(self)) -Coder.register_structured_urn(urns.BYTES_CODER, BytesCoder) +Coder.register_structured_urn(common_urns.BYTES_CODER, BytesCoder) class VarIntCoder(FastCoder): @@ -382,7 +385,7 @@ def __hash__(self): return hash(type(self)) -Coder.register_structured_urn(urns.VAR_INT_CODER, VarIntCoder) +Coder.register_structured_urn(common_urns.VARINT_CODER, VarIntCoder) class FloatCoder(FastCoder): @@ -736,11 +739,11 @@ def __hash__(self): def to_runner_api_parameter(self, context): if self.is_kv_coder(): - return urns.KV_CODER, None, self.coders() + return common_urns.KV_CODER, None, self.coders() else: return super(TupleCoder, self).to_runner_api_parameter(context) - @Coder.register_urn(urns.KV_CODER, None) + @Coder.register_urn(common_urns.KV_CODER, None) def from_runner_api_parameter(unused_payload, components, unused_context): return TupleCoder(components) @@ -829,7 +832,7 @@ def __hash__(self): return hash((type(self), self._elem_coder)) -Coder.register_structured_urn(urns.ITERABLE_CODER, IterableCoder) +Coder.register_structured_urn(common_urns.ITERABLE_CODER, IterableCoder) class GlobalWindowCoder(SingletonCoder): @@ -845,7 +848,8 @@ def as_cloud_object(self): } -Coder.register_structured_urn(urns.GLOBAL_WINDOW_CODER, GlobalWindowCoder) +Coder.register_structured_urn( + common_urns.GLOBAL_WINDOW_CODER, GlobalWindowCoder) class IntervalWindowCoder(FastCoder): @@ -869,7 +873,8 @@ def __hash__(self): return hash(type(self)) -Coder.register_structured_urn(urns.INTERVAL_WINDOW_CODER, IntervalWindowCoder) +Coder.register_structured_urn( + common_urns.INTERVAL_WINDOW_CODER, IntervalWindowCoder) class WindowedValueCoder(FastCoder): @@ -928,7 +933,8 @@ def __hash__(self): (self.wrapped_value_coder, self.timestamp_coder, self.window_coder)) -Coder.register_structured_urn(urns.WINDOWED_VALUE_CODER, WindowedValueCoder) +Coder.register_structured_urn( + common_urns.WINDOWED_VALUE_CODER, WindowedValueCoder) class LengthPrefixCoder(FastCoder): @@ -972,4 +978,5 @@ def __hash__(self): return hash((type(self), self._value_coder)) -Coder.register_structured_urn(urns.LENGTH_PREFIX_CODER, LengthPrefixCoder) +Coder.register_structured_urn( + common_urns.LENGTH_PREFIX_CODER, LengthPrefixCoder) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index fc7a2f3f7b14..eb79f4d3c661 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -36,6 +36,8 @@ from apache_beam import coders from apache_beam import pvalue +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.pvalue import AsIter from apache_beam.pvalue import AsSingleton @@ -75,7 +77,7 @@ class SourceBase(HasDisplayData, urns.RunnerApiFn): """Base class for all sources that can be passed to beam.io.Read(...). """ - urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_SOURCE) + urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_SOURCE) class BoundedSource(SourceBase): @@ -832,7 +834,7 @@ def display_data(self): 'source_dd': self.source} def to_runner_api_parameter(self, context): - return (urns.READ_TRANSFORM, + return (common_urns.READ_TRANSFORM, beam_runner_api_pb2.ReadPayload( source=self.source.to_runner_api(context), is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED @@ -845,7 +847,7 @@ def from_runner_api_parameter(parameter, context): ptransform.PTransform.register_urn( - urns.READ_TRANSFORM, + common_urns.READ_TRANSFORM, beam_runner_api_pb2.ReadPayload, Read.from_runner_api_parameter) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 4ac5ea86bf28..4c484288f743 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -61,13 +61,13 @@ from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import TypeOptions from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator +from apache_beam.portability import common_urns from apache_beam.pvalue import PCollection from apache_beam.runners import PipelineRunner from apache_beam.runners import create_runner from apache_beam.transforms import ptransform from apache_beam.typehints import TypeCheckError from apache_beam.typehints import typehints -from apache_beam.utils import urns from apache_beam.utils.annotations import deprecated __all__ = ['Pipeline', 'PTransformOverride'] @@ -828,7 +828,7 @@ def is_side_input(tag): None if tag == 'None' else tag: context.pcollections.get_by_id(id) for tag, id in proto.outputs.items()} # This annotation is expected by some runners. - if proto.spec.urn == urns.PARDO_TRANSFORM: + if proto.spec.urn == common_urns.PARDO_TRANSFORM: result.transform.output_tags = set(proto.outputs.keys()).difference( {'None'}) if not result.parts: diff --git a/sdks/python/apache_beam/portability/python_urns.py b/sdks/python/apache_beam/portability/python_urns.py new file mode 100644 index 000000000000..a284b5fe66c0 --- /dev/null +++ b/sdks/python/apache_beam/portability/python_urns.py @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Enumeration of URNs specific to the Python SDK. + +For internal use only; no backwards-compatibility guarantees.""" + +PICKLED_CODER = "beam:coder:pickled_python:v1" +PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v1" +PICKLED_DOFN = "beam:dofn:pickled_python:v1" +PICKLED_DOFN_INFO = "beam:dofn:pickled_python_info:v1" +PICKLED_SOURCE = "beam:source:pickled_python:v1" +PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v1" +PICKLED_WINDOW_MAPPING_FN = "beam:window_mapping_fn:pickled_python:v1" +PICKLED_WINDOWFN = "beam:windowfn:pickled_python:v1" +PICKLED_VIEWFN = "beam:view_fn:pickled_python_data:v1" diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 3987fd5dfff3..2aca33e667f6 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -31,8 +31,9 @@ from apache_beam import coders from apache_beam import typehints from apache_beam.internal import pickler +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.utils import urns __all__ = [ 'PCollection', @@ -301,7 +302,7 @@ def _side_input_data(self): view_options = self._view_options() from_runtime_iterable = type(self)._from_runtime_iterable return SideInputData( - urns.ITERABLE_ACCESS, + common_urns.ITERABLE_SIDE_INPUT, self._window_mapping_fn, lambda iterable: from_runtime_iterable(iterable, view_options), self._input_element_coder()) @@ -354,17 +355,18 @@ def to_runner_api(self, unused_context): urn=self.access_pattern), view_fn=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.PICKLED_PYTHON_VIEWFN, + urn=python_urns.PICKLED_VIEWFN, payload=pickler.dumps((self.view_fn, self.coder)))), window_mapping_fn=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.PICKLED_WINDOW_MAPPING_FN, + urn=python_urns.PICKLED_WINDOW_MAPPING_FN, payload=pickler.dumps(self.window_mapping_fn)))) @staticmethod def from_runner_api(proto, unused_context): - assert proto.view_fn.spec.urn == urns.PICKLED_PYTHON_VIEWFN - assert proto.window_mapping_fn.spec.urn == urns.PICKLED_WINDOW_MAPPING_FN + assert proto.view_fn.spec.urn == python_urns.PICKLED_VIEWFN + assert (proto.window_mapping_fn.spec.urn == + python_urns.PICKLED_WINDOW_MAPPING_FN) return SideInputData( proto.access_pattern.urn, pickler.loads(proto.window_mapping_fn.spec.payload), @@ -442,7 +444,7 @@ def _from_runtime_iterable(it, options): def _side_input_data(self): return SideInputData( - urns.ITERABLE_ACCESS, + common_urns.ITERABLE_SIDE_INPUT, self._window_mapping_fn, lambda iterable: iterable, self._input_element_coder()) @@ -473,7 +475,7 @@ def _from_runtime_iterable(it, options): def _side_input_data(self): return SideInputData( - urns.ITERABLE_ACCESS, + common_urns.ITERABLE_SIDE_INPUT, self._window_mapping_fn, list, self._input_element_coder()) @@ -501,7 +503,7 @@ def _from_runtime_iterable(it, options): def _side_input_data(self): return SideInputData( - urns.ITERABLE_ACCESS, + common_urns.ITERABLE_SIDE_INPUT, self._window_mapping_fn, dict, self._input_element_coder()) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 341f06f9a6aa..3d5d14a1ead7 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -21,6 +21,7 @@ import copy import logging import Queue as queue +import re import threading import time from concurrent import futures @@ -35,6 +36,7 @@ from apache_beam.coders.coder_impl import create_OutputStream from apache_beam.internal import pickler from apache_beam.metrics.execution import MetricsEnvironment +from apache_beam.portability import common_urns from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.portability.api import beam_runner_api_pb2 @@ -46,7 +48,6 @@ from apache_beam.transforms import trigger from apache_beam.transforms.window import GlobalWindows from apache_beam.utils import proto_utils -from apache_beam.utils import urns # This module is experimental. No backwards-compatibility guarantees. @@ -143,7 +144,7 @@ class _WindowGroupingBuffer(object): def __init__(self, side_input_data): # Here's where we would use a different type of partitioning # (e.g. also by key) for a different access pattern. - assert side_input_data.access_pattern == urns.ITERABLE_ACCESS + assert side_input_data.access_pattern == common_urns.ITERABLE_SIDE_INPUT self._windowed_value_coder = side_input_data.coder self._window_coder = side_input_data.coder.window_coder self._value_coder = side_input_data.coder.wrapped_value_coder @@ -251,12 +252,12 @@ def fuse(self, other): union(self.must_follow, other.must_follow)) def is_flatten(self): - return any(transform.spec.urn == urns.FLATTEN_TRANSFORM + return any(transform.spec.urn == common_urns.FLATTEN_TRANSFORM for transform in self.transforms) def side_inputs(self): for transform in self.transforms: - if transform.spec.urn == urns.PARDO_TRANSFORM: + if transform.spec.urn == common_urns.PARDO_TRANSFORM: payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.ParDoPayload) for side_input in payload.side_inputs: @@ -264,7 +265,7 @@ def side_inputs(self): def has_as_main_input(self, pcoll): for transform in self.transforms: - if transform.spec.urn == urns.PARDO_TRANSFORM: + if transform.spec.urn == common_urns.PARDO_TRANSFORM: payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.ParDoPayload) local_side_inputs = payload.side_inputs @@ -311,14 +312,14 @@ def windowed_coder_id(coder_id): proto = beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.WINDOWED_VALUE_CODER)), + urn=common_urns.WINDOWED_VALUE_CODER)), component_coder_ids=[coder_id, window_coder_id]) return add_or_get_coder_id(proto) for stage in stages: assert len(stage.transforms) == 1 transform = stage.transforms[0] - if transform.spec.urn == urns.COMBINE_PER_KEY_TRANSFORM: + if transform.spec.urn == common_urns.COMBINE_PER_KEY_TRANSFORM: combine_payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.CombinePayload) @@ -338,14 +339,14 @@ def windowed_coder_id(coder_id): key_accumulator_coder = beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.KV_CODER)), + urn=common_urns.KV_CODER)), component_coder_ids=[key_coder_id, accumulator_coder_id]) key_accumulator_coder_id = add_or_get_coder_id(key_accumulator_coder) accumulator_iter_coder = beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.ITERABLE_CODER)), + urn=common_urns.ITERABLE_CODER)), component_coder_ids=[accumulator_coder_id]) accumulator_iter_coder_id = add_or_get_coder_id( accumulator_iter_coder) @@ -353,7 +354,7 @@ def windowed_coder_id(coder_id): key_accumulator_iter_coder = beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.KV_CODER)), + urn=common_urns.KV_CODER)), component_coder_ids=[key_coder_id, accumulator_iter_coder_id]) key_accumulator_iter_coder_id = add_or_get_coder_id( key_accumulator_iter_coder) @@ -397,7 +398,7 @@ def make_stage(base_stage, transform): beam_runner_api_pb2.PTransform( unique_name=transform.unique_name + '/Precombine', spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.PRECOMBINE_TRANSFORM, + urn=common_urns.COMBINE_PGBKCV_TRANSFORM, payload=transform.spec.payload), inputs=transform.inputs, outputs={'out': precombined_pcoll_id})) @@ -407,7 +408,7 @@ def make_stage(base_stage, transform): beam_runner_api_pb2.PTransform( unique_name=transform.unique_name + '/Group', spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.GROUP_BY_KEY_TRANSFORM), + urn=common_urns.GROUP_BY_KEY_TRANSFORM), inputs={'in': precombined_pcoll_id}, outputs={'out': grouped_pcoll_id})) @@ -416,7 +417,7 @@ def make_stage(base_stage, transform): beam_runner_api_pb2.PTransform( unique_name=transform.unique_name + '/Merge', spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.MERGE_ACCUMULATORS_TRANSFORM, + urn=common_urns.COMBINE_MERGE_ACCUMULATORS_TRANSFORM, payload=transform.spec.payload), inputs={'in': grouped_pcoll_id}, outputs={'out': merged_pcoll_id})) @@ -426,7 +427,7 @@ def make_stage(base_stage, transform): beam_runner_api_pb2.PTransform( unique_name=transform.unique_name + '/ExtractOutputs', spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.EXTRACT_OUTPUTS_TRANSFORM, + urn=common_urns.COMBINE_EXTRACT_OUTPUTS_TRANSFORM, payload=transform.spec.payload), inputs={'in': merged_pcoll_id}, outputs=transform.outputs)) @@ -437,12 +438,13 @@ def make_stage(base_stage, transform): def expand_gbk(stages): """Transforms each GBK into a write followed by a read. """ - good_coder_urns = set(beam.coders.Coder._known_urns.keys()) - set([ - urns.PICKLED_CODER]) + good_coder_urns = set( + value for key, value in common_urns.__dict__.items() + if re.match('[A-Z][A-Z_]*$', key)) coders = pipeline_components.coders for coder_id, coder_proto in coders.items(): - if coder_proto.spec.spec.urn == urns.BYTES_CODER: + if coder_proto.spec.spec.urn == common_urns.BYTES_CODER: bytes_coder_id = coder_id break else: @@ -456,7 +458,7 @@ def wrap_unknown_coders(coder_id, with_bytes): if (coder_id, with_bytes) not in coder_substitutions: wrapped_coder_id = None coder_proto = coders[coder_id] - if coder_proto.spec.spec.urn == urns.LENGTH_PREFIX_CODER: + if coder_proto.spec.spec.urn == common_urns.LENGTH_PREFIX_CODER: coder_substitutions[coder_id, with_bytes] = ( bytes_coder_id if with_bytes else coder_id) elif coder_proto.spec.spec.urn in good_coder_urns: @@ -483,7 +485,7 @@ def wrap_unknown_coders(coder_id, with_bytes): len_prefix_coder_proto = beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.LENGTH_PREFIX_CODER)), + urn=common_urns.LENGTH_PREFIX_CODER)), component_coder_ids=[coder_id]) coders[wrapped_coder_id].CopyFrom(len_prefix_coder_proto) coder_substitutions[coder_id, with_bytes] = wrapped_coder_id @@ -500,7 +502,7 @@ def fix_pcoll_coder(pcoll): for stage in stages: assert len(stage.transforms) == 1 transform = stage.transforms[0] - if transform.spec.urn == urns.GROUP_BY_KEY_TRANSFORM: + if transform.spec.urn == common_urns.GROUP_BY_KEY_TRANSFORM: for pcoll_id in transform.inputs.values(): fix_pcoll_coder(pipeline_components.pcollections[pcoll_id]) for pcoll_id in transform.outputs.values(): @@ -547,7 +549,7 @@ def sink_flattens(stages): for stage in stages: assert len(stage.transforms) == 1 transform = stage.transforms[0] - if transform.spec.urn == urns.FLATTEN_TRANSFORM: + if transform.spec.urn == common_urns.FLATTEN_TRANSFORM: # This is used later to correlate the read and writes. param = str("materialize:%s" % transform.unique_name) output_pcoll_id, = transform.outputs.values() @@ -773,7 +775,8 @@ def process(stage): coders.populate_map(pipeline_components.coders) known_composites = set( - [urns.GROUP_BY_KEY_TRANSFORM, urns.COMBINE_PER_KEY_TRANSFORM]) + [common_urns.GROUP_BY_KEY_TRANSFORM, + common_urns.COMBINE_PER_KEY_TRANSFORM]) def leaf_transforms(root_ids): for root_id in root_ids: @@ -851,7 +854,7 @@ def extract_endpoints(stage): transform.spec.payload = data_operation_spec.SerializeToString() else: transform.spec.payload = "" - elif transform.spec.urn == urns.PARDO_TRANSFORM: + elif transform.spec.urn == common_urns.PARDO_TRANSFORM: payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.ParDoPayload) for tag, si in payload.side_inputs.items(): diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 3c14a6f6781d..22cf46d8c92d 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -26,13 +26,13 @@ import json import logging -from google.protobuf import wrappers_pb2 - import apache_beam as beam from apache_beam.coders import WindowedValueCoder from apache_beam.coders import coder_impl from apache_beam.internal import pickler from apache_beam.io import iobase +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners import pipeline_context @@ -42,7 +42,6 @@ from apache_beam.transforms import sideinputs from apache_beam.utils import counters from apache_beam.utils import proto_utils -from apache_beam.utils import urns # This module is experimental. No backwards-compatibility guarantees. @@ -50,9 +49,7 @@ DATA_INPUT_URN = 'urn:org.apache.beam:source:runner:0.1' DATA_OUTPUT_URN = 'urn:org.apache.beam:sink:runner:0.1' IDENTITY_DOFN_URN = 'urn:org.apache.beam:dofn:identity:0.1' -PYTHON_ITERABLE_VIEWFN_URN = 'urn:org.apache.beam:viewfn:iterable:python:0.1' -PYTHON_CODER_URN = 'urn:org.apache.beam:coder:python:0.1' -# TODO(vikasrk): Fix this once runner sends appropriate python urns. +# TODO(vikasrk): Fix this once runner sends appropriate common_urns. OLD_DATAFLOW_RUNNER_HARNESS_PARDO_URN = 'urn:beam:dofn:javasdk:0.1' OLD_DATAFLOW_RUNNER_HARNESS_READ_URN = 'urn:org.apache.beam:source:java:0.1' @@ -199,7 +196,7 @@ def create_execution_tree(self, descriptor): self.state_sampler, self.state_handler) def is_side_input(transform_proto, tag): - if transform_proto.spec.urn == urns.PARDO_TRANSFORM: + if transform_proto.spec.urn == common_urns.PARDO_TRANSFORM: return tag in proto_utils.parse_Bytes( transform_proto.spec.payload, beam_runner_api_pb2.ParDoPayload).side_inputs @@ -417,7 +414,7 @@ def create(factory, transform_id, transform_proto, parameter, consumers): @BeamTransformFactory.register_urn( - urns.READ_TRANSFORM, beam_runner_api_pb2.ReadPayload) + common_urns.READ_TRANSFORM, beam_runner_api_pb2.ReadPayload) def create(factory, transform_id, transform_proto, parameter, consumers): source = iobase.SourceBase.from_runner_api(parameter.source, factory.context) spec = operation_specs.WorkerRead( @@ -440,9 +437,9 @@ def create(factory, transform_id, transform_proto, serialized_fn, consumers): @BeamTransformFactory.register_urn( - urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload) + common_urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload) def create(factory, transform_id, transform_proto, parameter, consumers): - assert parameter.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO + assert parameter.do_fn.spec.urn == python_urns.PICKLED_DOFN_INFO serialized_fn = parameter.do_fn.spec.payload return _create_pardo_operation( factory, transform_id, transform_proto, consumers, @@ -513,18 +510,7 @@ def _create_simple_pardo_operation( @BeamTransformFactory.register_urn( - urns.GROUP_ALSO_BY_WINDOW_TRANSFORM, wrappers_pb2.BytesValue) -def create(factory, transform_id, transform_proto, parameter, consumers): - # Perhaps this hack can go away once all apply overloads are gone. - from apache_beam.transforms.core import _GroupAlsoByWindowDoFn - return _create_simple_pardo_operation( - factory, transform_id, transform_proto, consumers, - _GroupAlsoByWindowDoFn( - factory.context.windowing_strategies.get_by_id(parameter.value))) - - -@BeamTransformFactory.register_urn( - urns.WINDOW_INTO_TRANSFORM, beam_runner_api_pb2.WindowingStrategy) + common_urns.WINDOW_INTO_TRANSFORM, beam_runner_api_pb2.WindowingStrategy) def create(factory, transform_id, transform_proto, parameter, consumers): class WindowIntoDoFn(beam.DoFn): def __init__(self, windowing): @@ -557,7 +543,7 @@ def create(factory, transform_id, transform_proto, unused_parameter, consumers): @BeamTransformFactory.register_urn( - urns.PRECOMBINE_TRANSFORM, beam_runner_api_pb2.CombinePayload) + common_urns.COMBINE_PGBKCV_TRANSFORM, beam_runner_api_pb2.CombinePayload) def create(factory, transform_id, transform_proto, payload, consumers): # TODO: Combine side inputs. serialized_combine_fn = pickler.dumps( @@ -577,14 +563,16 @@ def create(factory, transform_id, transform_proto, payload, consumers): @BeamTransformFactory.register_urn( - urns.MERGE_ACCUMULATORS_TRANSFORM, beam_runner_api_pb2.CombinePayload) + common_urns.COMBINE_MERGE_ACCUMULATORS_TRANSFORM, + beam_runner_api_pb2.CombinePayload) def create(factory, transform_id, transform_proto, payload, consumers): return _create_combine_phase_operation( factory, transform_proto, payload, consumers, 'merge') @BeamTransformFactory.register_urn( - urns.EXTRACT_OUTPUTS_TRANSFORM, beam_runner_api_pb2.CombinePayload) + common_urns.COMBINE_EXTRACT_OUTPUTS_TRANSFORM, + beam_runner_api_pb2.CombinePayload) def create(factory, transform_id, transform_proto, payload, consumers): return _create_combine_phase_operation( factory, transform_proto, payload, consumers, 'extract') @@ -609,7 +597,7 @@ def _create_combine_phase_operation( consumers) -@BeamTransformFactory.register_urn(urns.FLATTEN_TRANSFORM, None) +@BeamTransformFactory.register_urn(common_urns.FLATTEN_TRANSFORM, None) def create(factory, transform_id, transform_proto, unused_parameter, consumers): return factory.augment_oldstyle_op( operations.create_operation( diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index b4b8b7f64e0a..6e07e3357d4d 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -23,8 +23,6 @@ import inspect import types -from google.protobuf import wrappers_pb2 - from apache_beam import coders from apache_beam import pvalue from apache_beam import typehints @@ -32,6 +30,8 @@ from apache_beam.internal import pickler from apache_beam.internal import util from apache_beam.options.pipeline_options import TypeOptions +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.transforms import ptransform from apache_beam.transforms.display import DisplayDataItem @@ -378,7 +378,7 @@ def is_process_bounded(self): return False # Method is a classmethod return True - urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_DO_FN) + urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_DOFN) def _fn_takes_side_inputs(fn): @@ -582,7 +582,7 @@ def maybe_from_callable(fn): def get_accumulator_coder(self): return coders.registry.get_coder(object) - urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_COMBINE_FN) + urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_COMBINE_FN) class CallableWrapperCombineFn(CombineFn): @@ -855,11 +855,11 @@ def to_runner_api_parameter(self, context): "expected instance of ParDo, but got %s" % self.__class__ picked_pardo_fn_data = pickler.dumps(self._pardo_fn_data()) return ( - urns.PARDO_TRANSFORM, + common_urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload( do_fn=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.PICKLED_DO_FN_INFO, + urn=python_urns.PICKLED_DOFN_INFO, payload=picked_pardo_fn_data)), # It'd be nice to name these according to their actual # names/positions in the orignal argument list, but such a @@ -871,9 +871,9 @@ def to_runner_api_parameter(self, context): for ix, si in enumerate(self.side_inputs)})) @PTransform.register_urn( - urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload) + common_urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload) def from_runner_api_parameter(pardo_payload, context): - assert pardo_payload.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO + assert pardo_payload.do_fn.spec.urn == python_urns.PICKLED_DOFN_INFO fn, args, kwargs, si_tags_and_types, windowing = pickler.loads( pardo_payload.do_fn.spec.payload) if si_tags_and_types: @@ -1228,11 +1228,11 @@ def to_runner_api_parameter(self, context): else: combine_fn = self.fn return ( - urns.COMBINE_PER_KEY_TRANSFORM, + common_urns.COMBINE_PER_KEY_TRANSFORM, _combine_payload(combine_fn, context)) @PTransform.register_urn( - urns.COMBINE_PER_KEY_TRANSFORM, beam_runner_api_pb2.CombinePayload) + common_urns.COMBINE_PER_KEY_TRANSFORM, beam_runner_api_pb2.CombinePayload) def from_runner_api_parameter(combine_payload, context): return CombinePerKey( CombineFn.from_runner_api(combine_payload.combine_fn, context)) @@ -1266,11 +1266,12 @@ def to_runner_api_parameter(self, context): else: combine_fn = self.fn return ( - urns.COMBINE_GROUPED_VALUES_TRANSFORM, + common_urns.COMBINE_GROUPED_VALUES_TRANSFORM, _combine_payload(combine_fn, context)) @PTransform.register_urn( - urns.COMBINE_GROUPED_VALUES_TRANSFORM, beam_runner_api_pb2.CombinePayload) + common_urns.COMBINE_GROUPED_VALUES_TRANSFORM, + beam_runner_api_pb2.CombinePayload) def from_runner_api_parameter(combine_payload, context): return CombineValues( CombineFn.from_runner_api(combine_payload.combine_fn, context)) @@ -1395,9 +1396,9 @@ def expand(self, pcoll): | 'GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing)) def to_runner_api_parameter(self, unused_context): - return urns.GROUP_BY_KEY_TRANSFORM, None + return common_urns.GROUP_BY_KEY_TRANSFORM, None - @PTransform.register_urn(urns.GROUP_BY_KEY_TRANSFORM, None) + @PTransform.register_urn(common_urns.GROUP_BY_KEY_TRANSFORM, None) def from_runner_api_parameter(unused_payload, unused_context): return GroupByKey() @@ -1414,13 +1415,6 @@ def expand(self, pcoll): self._check_pcollection(pcoll) return pvalue.PCollection(pcoll.pipeline) - def to_runner_api_parameter(self, unused_context): - return urns.GROUP_BY_KEY_ONLY_TRANSFORM, None - - @PTransform.register_urn(urns.GROUP_BY_KEY_ONLY_TRANSFORM, None) - def from_runner_api_parameter(unused_payload, unused_context): - return _GroupByKeyOnly() - @typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]]) @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]]) @@ -1435,18 +1429,6 @@ def expand(self, pcoll): self._check_pcollection(pcoll) return pvalue.PCollection(pcoll.pipeline) - def to_runner_api_parameter(self, context): - return ( - urns.GROUP_ALSO_BY_WINDOW_TRANSFORM, - wrappers_pb2.BytesValue(value=context.windowing_strategies.get_id( - self.windowing))) - - @PTransform.register_urn( - urns.GROUP_ALSO_BY_WINDOW_TRANSFORM, wrappers_pb2.BytesValue) - def from_runner_api_parameter(payload, context): - return _GroupAlsoByWindow( - context.windowing_strategies.get_by_id(payload.value)) - class _GroupAlsoByWindowDoFn(DoFn): # TODO(robertwb): Support combiner lifting. @@ -1644,7 +1626,7 @@ def expand(self, pcoll): def to_runner_api_parameter(self, context): return ( - urns.WINDOW_INTO_TRANSFORM, + common_urns.WINDOW_INTO_TRANSFORM, self.windowing.to_runner_api(context)) @staticmethod @@ -1658,7 +1640,7 @@ def from_runner_api_parameter(proto, context): PTransform.register_urn( - urns.WINDOW_INTO_TRANSFORM, + common_urns.WINDOW_INTO_TRANSFORM, # TODO(robertwb): Update WindowIntoPayload to include the full strategy. # (Right now only WindowFn is used, but we need this to reconstitute the # WindowInto transform, and in the future will need it at runtime to @@ -1715,7 +1697,7 @@ def get_windowing(self, inputs): return super(Flatten, self).get_windowing(inputs) def to_runner_api_parameter(self, context): - return urns.FLATTEN_TRANSFORM, None + return common_urns.FLATTEN_TRANSFORM, None @staticmethod def from_runner_api_parameter(unused_parameter, unused_context): @@ -1723,7 +1705,7 @@ def from_runner_api_parameter(unused_parameter, unused_context): PTransform.register_urn( - urns.FLATTEN_TRANSFORM, None, Flatten.from_runner_api_parameter) + common_urns.FLATTEN_TRANSFORM, None, Flatten.from_runner_api_parameter) class Create(PTransform): diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 24904956d4d1..c7fc641804dc 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -51,6 +51,7 @@ class and wrapper class that allows lambda functions to be used as from apache_beam import pvalue from apache_beam.internal import pickler from apache_beam.internal import util +from apache_beam.portability import python_urns from apache_beam.transforms.display import DisplayDataItem from apache_beam.transforms.display import HasDisplayData from apache_beam.typehints import typehints @@ -60,7 +61,6 @@ class and wrapper class that allows lambda functions to be used as from apache_beam.typehints.trivial_inference import instance_to_type from apache_beam.typehints.typehints import validate_composite_type_param from apache_beam.utils import proto_utils -from apache_beam.utils import urns __all__ = [ 'PTransform', @@ -555,7 +555,7 @@ def from_runner_api(cls, proto, context): context) def to_runner_api_parameter(self, context): - return (urns.PICKLED_TRANSFORM, + return (python_urns.PICKLED_TRANSFORM, wrappers_pb2.BytesValue(value=pickler.dumps(self))) @staticmethod @@ -564,7 +564,7 @@ def from_runner_api_parameter(spec_parameter, unused_context): PTransform.register_urn( - urns.PICKLED_TRANSFORM, + python_urns.PICKLED_TRANSFORM, wrappers_pb2.BytesValue, PTransform.from_runner_api_parameter) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index ee9d6f971871..c250e8c6d365 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -55,6 +55,8 @@ from google.protobuf import timestamp_pb2 from apache_beam.coders import coders +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.portability.api import standard_window_fns_pb2 from apache_beam.transforms import timeutil @@ -172,7 +174,7 @@ def get_transformed_output_time(self, window, input_timestamp): # pylint: disab # By default, just return the input timestamp. return input_timestamp - urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_WINDOW_FN) + urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_WINDOWFN) class BoundedWindow(object): @@ -306,9 +308,9 @@ def __ne__(self, other): return not self == other def to_runner_api_parameter(self, context): - return urns.GLOBAL_WINDOWS_FN, None + return common_urns.GLOBAL_WINDOWS_WINDOWFN, None - @urns.RunnerApiFn.register_urn(urns.GLOBAL_WINDOWS_FN, None) + @urns.RunnerApiFn.register_urn(common_urns.GLOBAL_WINDOWS_WINDOWFN, None) def from_runner_api_parameter(unused_fn_parameter, unused_context): return GlobalWindows() @@ -349,7 +351,7 @@ def __ne__(self, other): return not self == other def to_runner_api_parameter(self, context): - return (urns.FIXED_WINDOWS_FN, + return (common_urns.FIXED_WINDOWS_WINDOWFN, standard_window_fns_pb2.FixedWindowsPayload( size=proto_utils.from_micros( duration_pb2.Duration, self.size.micros), @@ -357,7 +359,8 @@ def to_runner_api_parameter(self, context): timestamp_pb2.Timestamp, self.offset.micros))) @urns.RunnerApiFn.register_urn( - urns.FIXED_WINDOWS_FN, standard_window_fns_pb2.FixedWindowsPayload) + common_urns.FIXED_WINDOWS_WINDOWFN, + standard_window_fns_pb2.FixedWindowsPayload) def from_runner_api_parameter(fn_parameter, unused_context): return FixedWindows( size=Duration(micros=fn_parameter.size.ToMicroseconds()), @@ -404,7 +407,7 @@ def __eq__(self, other): and self.period == other.period) def to_runner_api_parameter(self, context): - return (urns.SLIDING_WINDOWS_FN, + return (common_urns.SLIDING_WINDOWS_WINDOWFN, standard_window_fns_pb2.SlidingWindowsPayload( size=proto_utils.from_micros( duration_pb2.Duration, self.size.micros), @@ -414,7 +417,7 @@ def to_runner_api_parameter(self, context): duration_pb2.Duration, self.period.micros))) @urns.RunnerApiFn.register_urn( - urns.SLIDING_WINDOWS_FN, + common_urns.SLIDING_WINDOWS_WINDOWFN, standard_window_fns_pb2.SlidingWindowsPayload) def from_runner_api_parameter(fn_parameter, unused_context): return SlidingWindows( @@ -471,13 +474,14 @@ def __eq__(self, other): return self.gap_size == other.gap_size def to_runner_api_parameter(self, context): - return (urns.SESSION_WINDOWS_FN, + return (common_urns.SESSION_WINDOWS_WINDOWFN, standard_window_fns_pb2.SessionsPayload( gap_size=proto_utils.from_micros( duration_pb2.Duration, self.gap_size.micros))) @urns.RunnerApiFn.register_urn( - urns.SESSION_WINDOWS_FN, standard_window_fns_pb2.SessionsPayload) + common_urns.SESSION_WINDOWS_WINDOWFN, + standard_window_fns_pb2.SessionsPayload) def from_runner_api_parameter(fn_parameter, unused_context): return Sessions( gap_size=Duration(micros=fn_parameter.gap_size.ToMicroseconds())) diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py index bd02fe1dfb78..e62fbcd0948c 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -25,48 +25,6 @@ from apache_beam.internal import pickler from apache_beam.utils import proto_utils -PICKLED_WINDOW_FN = "beam:windowfn:pickled_python:v0.1" -GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1" -FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1" -SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1" -SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1" - -PICKLED_DO_FN = "beam:dofn:pickled_python:v0.1" -PICKLED_DO_FN_INFO = "beam:dofn:pickled_python_info:v0.1" -PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1" - -PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1" -PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1" -GROUP_BY_KEY_TRANSFORM = "urn:beam:transform:groupbykey:v1" -GROUP_BY_KEY_ONLY_TRANSFORM = "beam:ptransform:group_by_key_only:v0.1" -GROUP_ALSO_BY_WINDOW_TRANSFORM = "beam:ptransform:group_also_by_window:v0.1" -COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1" -COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1" -PRECOMBINE_TRANSFORM = "beam:ptransform:combine_pre:v0.1" -MERGE_ACCUMULATORS_TRANSFORM = "beam:ptransform:combine_merge_accumulators:v0.1" -EXTRACT_OUTPUTS_TRANSFORM = "beam:ptransform:combine_extract_outputs:v0.1" -FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1" -READ_TRANSFORM = "beam:ptransform:read:v0.1" -RESHUFFLE_TRANSFORM = "beam:ptransform:reshuffle:v0.1" -WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1" - -PICKLED_SOURCE = "beam:source:pickled_python:v0.1" - -PICKLED_CODER = "beam:coder:pickled_python:v0.1" -BYTES_CODER = "urn:beam:coders:bytes:0.1" -VAR_INT_CODER = "urn:beam:coders:varint:0.1" -INTERVAL_WINDOW_CODER = "urn:beam:coders:interval_window:0.1" -ITERABLE_CODER = "urn:beam:coders:stream:0.1" -KV_CODER = "urn:beam:coders:kv:0.1" -LENGTH_PREFIX_CODER = "urn:beam:coders:length_prefix:0.1" -GLOBAL_WINDOW_CODER = "urn:beam:coders:global_window:0.1" -WINDOWED_VALUE_CODER = "urn:beam:coders:windowed_value:0.1" - -ITERABLE_ACCESS = "urn:beam:sideinput:iterable" -MULTIMAP_ACCESS = "urn:beam:sideinput:multimap" -PICKLED_PYTHON_VIEWFN = "beam:view_fn:pickled_python_data:v0.1" -PICKLED_WINDOW_MAPPING_FN = "beam:window_mapping_fn:pickled_python:v0.1" - class RunnerApiFn(object): """Abstract base class that provides urn registration utilities. diff --git a/sdks/python/setup.py b/sdks/python/setup.py index d0034f7dee74..722dd2c42341 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -23,6 +23,7 @@ import os import pkg_resources import platform +import re import shutil import subprocess import sys @@ -147,6 +148,35 @@ def run(self): return original_cmd +def generate_common_urns(): + src = os.path.join( + os.path.dirname(__file__), + '../../' + 'model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md') + out = os.path.join( + os.path.dirname(__file__), + 'apache_beam/portability/common_urns.py') + src_time = os.path.getmtime(src) if os.path.exists(src) else -1 + out_time = os.path.getmtime(out) if os.path.exists(out) else -1 + if src_time > out_time: + print 'Regenerating common_urns module.' + urns = {} + for m in re.finditer( + r'\b(?:urn:)?beam:(\S+):(\S+):(v\S+)', open(src).read()): + kind, name, version = m.groups() + var_name = name.upper() + '_' + kind.upper() + if var_name in urns: + var_name += '_' + version.upper().replace('.', '_') + urns[var_name] = m.group(0) + open(out, 'w').write( + '# Autogenerated from common_urns.md\n' + + '# pylint: disable=line-too-long\n\n' + + '\n'.join('%s = "%s"' % urn + for urn in sorted(urns.items(), key=lambda kv: kv[1])) + + '\n') +generate_common_urns() + + setuptools.setup( name=PACKAGE_NAME, version=PACKAGE_VERSION,