From 98d906031d8274902e6ddd410807e026e0e14dc6 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 21 Dec 2017 13:03:53 -0800 Subject: [PATCH 1/2] [BEAM-3207] Create a standard location to enumerate and document URNs. URNs are listed in a markdown file in the pipeline definitions module. This file is used to auto-generate URN constants for the Python SDK and validate URN constants in the Java SDK (though eventually it'd be good to auto-generate them in this case as well). The format of these common URNs has been normalized to org.apache.beam:type:name:vN[.M] SDK-specific URNs are left as they are. Further fleshing out the definitions and specifications of all these URNs, as well as making sure they are used ubiquitously, can be deferred to a later PR now that there is a central location to work from. --- .gitignore | 1 + .../org/apache/beam/model/common_urns.md | 122 ++++++++++++++++++ pom.xml | 1 + .../core/construction/CoderTranslation.java | 4 + .../construction/ModelCoderRegistrar.java | 18 +-- .../construction/PTransformTranslation.java | 26 ++-- .../runners/core/construction/UrnUtils.java | 63 +++++++++ .../WindowingStrategyTranslation.java | 15 ++- .../core/construction/UrnUtilsTest.java | 59 +++++++++ runners/java-fn-execution/build.gradle | 1 + .../graph/LengthPrefixUnknownCoders.java | 30 +++-- sdks/python/apache_beam/coders/coders.py | 33 +++-- sdks/python/apache_beam/io/iobase.py | 8 +- sdks/python/apache_beam/pipeline.py | 4 +- .../apache_beam/portability/python_urns.py | 30 +++++ sdks/python/apache_beam/pvalue.py | 20 +-- .../runners/portability/fn_api_runner.py | 49 +++---- .../runners/worker/bundle_processor.py | 40 ++---- sdks/python/apache_beam/transforms/core.py | 56 +++----- .../apache_beam/transforms/ptransform.py | 6 +- sdks/python/apache_beam/transforms/window.py | 22 ++-- sdks/python/apache_beam/utils/urns.py | 42 ------ sdks/python/setup.py | 29 +++++ 23 files changed, 480 insertions(+), 199 deletions(-) create mode 100644 
model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md create mode 100644 runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UrnUtils.java create mode 100644 runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UrnUtilsTest.java create mode 100644 sdks/python/apache_beam/portability/python_urns.py diff --git a/.gitignore b/.gitignore index ff2faad05fd8..401b4f614b53 100644 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,7 @@ sdks/python/LICENSE sdks/python/NOTICE sdks/python/README.md sdks/python/apache_beam/portability/api/*pb2*.* +sdks/python/apache_beam/portability/common_urns.py # Ignore IntelliJ files. **/.idea/**/* diff --git a/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md b/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md new file mode 100644 index 000000000000..0c23a03329a2 --- /dev/null +++ b/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md @@ -0,0 +1,122 @@ + + +# Apache Beam URNs + +This file serves as a central place to enumerate and document the various +URNs used in the Beam portability APIs. + + +## Core Transforms + +### beam:transform:pardo:v1 + +Represents Beam's parallel do operation. + +Payload: A serialized ParDoPayload proto. + +### beam:transform:group_by_key:v1 + +Represents Beam's group-by-key operation. + +Payload: None + +### beam:transform:window_into:v1 + +Payload: A windowing strategy id. + +### beam:transform:flatten:v1 + +### beam:transform:read:v1 + + +## Combining + +If any of the combine operations are produced by an SDK, it is assumed that +the SDK understands the last three combine helper operations. 
+ +### beam:transform:combine_globally:v1 + +### beam:transform:combine_per_key:v1 + +### beam:transform:combine_grouped_values:v1 + +### beam:transform:combine_pgbkcv:v1 + +### beam:transform:combine_merge_accumulators:v1 + +### beam:transform:combine_extract_outputs:v1 + + +## Other common transforms + +### beam:transform:reshuffle:v1 + + +## WindowFns + +### beam:windowfn:global_windows:v1 + +### beam:windowfn:fixed_windows:v1 + +### beam:windowfn:sliding_windows:v1 + +### beam:windowfn:session_windows:v1 + + +## Coders + +### beam:coder:bytes:v1 + +Components: None + +### beam:coder:varint:v1 + +Components: None + +### beam:coder:kv:v1 + +Components: The key and value coder, in that order. + +### beam:coder:iterable:v1 + +Encodes an iterable of elements. + +Components: Coder for a single element. + +## Internal coders + +The following coders are typically not specified manually by the user, +but are used at runtime and must be supported by every SDK. + +### beam:coder:length_prefix:v1 + +### beam:coder:global_window:v1 + +### beam:coder:interval_window:v1 + +### beam:coder:windowed_value:v1 + + +## Side input access + +### beam:side_input:iterable:v1 + +### beam:side_input:multimap:v1 + diff --git a/pom.xml b/pom.xml index 4cbb38794b5e..dc138d3210fd 100644 --- a/pom.xml +++ b/pom.xml @@ -1950,6 +1950,7 @@ **/sdks/python/NOTICE **/sdks/python/README.md **/sdks/python/apache_beam/portability/api/*pb2*.* + **/sdks/python/apache_beam/portability/common_urns.py **/sdks/python/**/*.c **/sdks/python/**/*.so **/sdks/python/**/*.egg diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java index 4806fb416b11..486fff08dd8b 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java +++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java @@ -53,8 +53,12 @@ public class CoderTranslation { private static BiMap, String> loadCoderURNs() { ImmutableBiMap.Builder, String> coderUrns = ImmutableBiMap.builder(); for (CoderTranslatorRegistrar registrar : ServiceLoader.load(CoderTranslatorRegistrar.class)) { + System.out.println(registrar + " " + registrar.getCoderURNs()); coderUrns.putAll(registrar.getCoderURNs()); } + if (coderUrns.build().size() == 0) { + //throw new RuntimeException(); + } return coderUrns.build(); } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java index b595bc914f53..77152b846ae8 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.core.construction; +import static org.apache.beam.runners.core.construction.UrnUtils.validateCommonUrn; + import com.google.auto.service.AutoService; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BiMap; @@ -41,14 +43,14 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { @VisibleForTesting static final BiMap, String> BEAM_MODEL_CODER_URNS = ImmutableBiMap., String>builder() - .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1") - .put(KvCoder.class, "urn:beam:coders:kv:0.1") - .put(VarLongCoder.class, "urn:beam:coders:varint:0.1") - .put(IntervalWindowCoder.class, "urn:beam:coders:interval_window:0.1") - .put(IterableCoder.class, "urn:beam:coders:stream:0.1") - .put(LengthPrefixCoder.class, "urn:beam:coders:length_prefix:0.1") - .put(GlobalWindow.Coder.class, 
"urn:beam:coders:global_window:0.1") - .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1") + .put(ByteArrayCoder.class, validateCommonUrn("beam:coder:bytes:v1")) + .put(KvCoder.class, validateCommonUrn("beam:coder:kv:v1")) + .put(VarLongCoder.class, validateCommonUrn("beam:coder:varint:v1")) + .put(IntervalWindowCoder.class, validateCommonUrn("beam:coder:interval_window:v1")) + .put(IterableCoder.class, validateCommonUrn("beam:coder:iterable:v1")) + .put(LengthPrefixCoder.class, validateCommonUrn("beam:coder:length_prefix:v1")) + .put(GlobalWindow.Coder.class, validateCommonUrn("beam:coder:global_window:v1")) + .put(FullWindowedValueCoder.class, validateCommonUrn("beam:coder:windowed_value:v1")) .build(); @VisibleForTesting diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 238fb849b3af..164d5526551a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.runners.core.construction.UrnUtils.validateCommonUrn; import com.google.auto.value.AutoValue; import com.google.common.base.Joiner; @@ -51,27 +52,34 @@ */ public class PTransformTranslation { - public static final String PAR_DO_TRANSFORM_URN = "urn:beam:transform:pardo:v1"; - public static final String FLATTEN_TRANSFORM_URN = "urn:beam:transform:flatten:v1"; - public static final String GROUP_BY_KEY_TRANSFORM_URN = "urn:beam:transform:groupbykey:v1"; - public static final String READ_TRANSFORM_URN = "urn:beam:transform:read:v1"; - public static 
final String WINDOW_TRANSFORM_URN = "urn:beam:transform:window:v1"; + public static final String PAR_DO_TRANSFORM_URN = + validateCommonUrn("beam:transform:pardo:v1"); + public static final String FLATTEN_TRANSFORM_URN = + validateCommonUrn("beam:transform:flatten:v1"); + public static final String GROUP_BY_KEY_TRANSFORM_URN = + validateCommonUrn("beam:transform:group_by_key:v1"); + public static final String READ_TRANSFORM_URN = + validateCommonUrn("beam:transform:read:v1"); + public static final String WINDOW_TRANSFORM_URN = + validateCommonUrn("beam:transform:window_into:v1"); public static final String TEST_STREAM_TRANSFORM_URN = "urn:beam:transform:teststream:v1"; // Not strictly a primitive transform - public static final String COMBINE_TRANSFORM_URN = "urn:beam:transform:combine:v1"; + public static final String COMBINE_TRANSFORM_URN = + validateCommonUrn("beam:transform:combine_per_key:v1"); - public static final String RESHUFFLE_URN = "urn:beam:transform:reshuffle:v1"; + public static final String RESHUFFLE_URN = + validateCommonUrn("beam:transform:reshuffle:v1"); // Less well-known. And where shall these live? - public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1"; + public static final String WRITE_FILES_TRANSFORM_URN = "beam:transform:write_files:0.1"; /** * @deprecated runners should move away from translating `CreatePCollectionView` and treat this as * part of the translation for a `ParDo` side input. 
*/ @Deprecated - public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1"; + public static final String CREATE_VIEW_TRANSFORM_URN = "beam:transform:create_view:v1"; private static final Map, TransformPayloadTranslator> KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators(); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UrnUtils.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UrnUtils.java new file mode 100644 index 000000000000..a65ee7b62dfc --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UrnUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import com.google.common.io.CharStreams; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Utilities for dealing with URNs. 
*/ +public class UrnUtils { + + private static final String STANDARD_URNS_PATH = "/org/apache/beam/model/common_urns.md"; + private static final Pattern URN_REGEX = Pattern.compile("\\bbeam:\\S+:v[0-9.]+"); + private static final Set COMMON_URNS = extractUrnsFromPath(STANDARD_URNS_PATH); + + private static Set extractUrnsFromPath(String path) { + String contents; + try { + contents = CharStreams.toString(new InputStreamReader( + UrnUtils.class.getResourceAsStream(path))); + } catch (IOException exn) { + throw new RuntimeException(exn); + } + Set urns = new HashSet<>(); + Matcher m = URN_REGEX.matcher(contents); + while (m.find()) { + urns.add(m.group()); + } + return urns; + } + + public static String validateCommonUrn(String urn) { + if (!URN_REGEX.matcher(urn).matches()) { + throw new IllegalArgumentException( + String.format("'%s' does not match '%s'", urn, URN_REGEX)); + } + if (!COMMON_URNS.contains(urn)) { + throw new IllegalArgumentException( + String.format("'%s' is not found in '%s'", urn, STANDARD_URNS_PATH)); + } + return urn; + } +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 0b9c26f1f4cb..2f46b6fa503c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -196,10 +196,17 @@ public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime. 
} } - public static final String GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1"; - public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"; - public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"; - public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"; + public static final String GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v1"; + public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v1"; + public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v1"; + public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v1"; + { + // Out-of-line to facilitate use in the case statements below. + UrnUtils.validateCommonUrn(GLOBAL_WINDOWS_FN); + UrnUtils.validateCommonUrn(FIXED_WINDOWS_FN); + UrnUtils.validateCommonUrn(SLIDING_WINDOWS_FN); + UrnUtils.validateCommonUrn(SESSION_WINDOWS_FN); + } // This URN says that the WindowFn is just a UDF blob the Java SDK understands // TODO: standardize such things public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1"; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UrnUtilsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UrnUtilsTest.java new file mode 100644 index 000000000000..b7e0ba756e51 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UrnUtilsTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import org.junit.Test; + +/** + * Tests for UrnUtils. + */ +public class UrnUtilsTest { + + private static final String GOOD_URN = "beam:coder:bytes:v1"; + private static final String MISSING_URN = "beam:fake:v1"; + private static final String BAD_URN = "Beam"; + + @Test + public void testGoodUrnSuccedes() { + assertEquals(GOOD_URN, UrnUtils.validateCommonUrn(GOOD_URN)); + } + + @Test + public void testMissingUrnFails() { + try { + UrnUtils.validateCommonUrn(MISSING_URN); + fail("Should have rejected " + MISSING_URN); + } catch (IllegalArgumentException exn) { + // expected + } + } + + @Test + public void testBadUrnFails() { + try { + UrnUtils.validateCommonUrn(BAD_URN); + fail("Should have rejected " + BAD_URN); + } catch (IllegalArgumentException exn) { + // expected + } + } +} diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle index b1aa9e8c0ca6..26c65e6775ff 100644 --- a/runners/java-fn-execution/build.gradle +++ b/runners/java-fn-execution/build.gradle @@ -33,6 +33,7 @@ dependencies { compile library.java.guava shadow project(path: ":model:pipeline", configuration: "shadow") shadow project(path: ":model:fn-execution", configuration: "shadow") + shadow project(path: ":runners:core-construction-java", configuration: "shadow") shadow project(path: ":sdks:java:core", configuration: "shadow") shadow project(path: ":sdks:java:fn-execution", configuration: "shadow") 
shadow library.java.grpc_core diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java index ac7e745b939f..a1b7223324a2 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java @@ -18,11 +18,13 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import java.util.Map; import java.util.Set; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Coder; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents; +import org.apache.beam.runners.core.construction.ModelCoderRegistrar; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; @@ -32,18 +34,24 @@ *

TODO: Support a dynamic list of well known coders using either registration or manual listing. */ public class LengthPrefixUnknownCoders { - private static final String BYTES_CODER_TYPE = "urn:beam:coders:bytes:0.1"; - private static final String LENGTH_PREFIX_CODER_TYPE = "urn:beam:coders:length_prefix:0.1"; + static { + assert new ModelCoderRegistrar().getCoderURNs().size() > 0; + } + private static final Map, String> MODEL_CODER_URNS = + new ModelCoderRegistrar().getCoderURNs(); + private static final String BYTES_CODER_TYPE = MODEL_CODER_URNS.get(ByteArrayCoder.class); + private static final String LENGTH_PREFIX_CODER_TYPE = MODEL_CODER_URNS.get(LengthPrefixCoder.class); private static final Set WELL_KNOWN_CODER_URNS = - ImmutableSet.of( - BYTES_CODER_TYPE, - "urn:beam:coders:kv:0.1", - "urn:beam:coders:varint:0.1", - "urn:beam:coders:interval_window:0.1", - "urn:beam:coders:stream:0.1", - LENGTH_PREFIX_CODER_TYPE, - "urn:beam:coders:global_window:0.1", - "urn:beam:coders:windowed_value:0.1"); + ImmutableSet.copyOf(MODEL_CODER_URNS.values()); + // ImmutableSet.of( +// BYTES_CODER_TYPE, +// "urn:beam:coders:kv:0.1", +// "urn:beam:coders:varint:0.1", +// "urn:beam:coders:interval_window:0.1", +// "urn:beam:coders:stream:0.1", +// LENGTH_PREFIX_CODER_TYPE, +// "urn:beam:coders:global_window:0.1", +// "urn:beam:coders:windowed_value:0.1"); /** * Recursively traverse the coder tree and wrap the first unknown coder in every branch with a diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 64902b592e31..f76625869879 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -25,11 +25,13 @@ import cPickle as pickle import google.protobuf +from google.protobuf import wrappers_pb2 from apache_beam.coders import coder_impl +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import 
beam_runner_api_pb2 from apache_beam.utils import proto_utils -from apache_beam.utils import urns # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: @@ -263,8 +265,8 @@ def from_runner_api(cls, coder_proto, context): def to_runner_api_parameter(self, context): return ( - urns.PICKLED_CODER, - google.protobuf.wrappers_pb2.BytesValue(value=serialize_coder(self)), + python_urns.PICKLED_CODER, + wrappers_pb2.BytesValue(value=serialize_coder(self)), ()) @staticmethod @@ -284,7 +286,8 @@ def from_runner_api_parameter(unused_payload, components, unused_context): return cls() -@Coder.register_urn(urns.PICKLED_CODER, google.protobuf.wrappers_pb2.BytesValue) +@Coder.register_urn( + python_urns.PICKLED_CODER, google.protobuf.wrappers_pb2.BytesValue) def _pickle_from_runner_api_parameter(payload, components, context): return deserialize_coder(payload.value) @@ -363,7 +366,7 @@ def __hash__(self): return hash(type(self)) -Coder.register_structured_urn(urns.BYTES_CODER, BytesCoder) +Coder.register_structured_urn(common_urns.BYTES_CODER, BytesCoder) class VarIntCoder(FastCoder): @@ -382,7 +385,7 @@ def __hash__(self): return hash(type(self)) -Coder.register_structured_urn(urns.VAR_INT_CODER, VarIntCoder) +Coder.register_structured_urn(common_urns.VARINT_CODER, VarIntCoder) class FloatCoder(FastCoder): @@ -736,11 +739,11 @@ def __hash__(self): def to_runner_api_parameter(self, context): if self.is_kv_coder(): - return urns.KV_CODER, None, self.coders() + return common_urns.KV_CODER, None, self.coders() else: return super(TupleCoder, self).to_runner_api_parameter(context) - @Coder.register_urn(urns.KV_CODER, None) + @Coder.register_urn(common_urns.KV_CODER, None) def from_runner_api_parameter(unused_payload, components, unused_context): return TupleCoder(components) @@ -829,7 +832,7 @@ def __hash__(self): return hash((type(self), self._elem_coder)) -Coder.register_structured_urn(urns.ITERABLE_CODER, IterableCoder) 
+Coder.register_structured_urn(common_urns.ITERABLE_CODER, IterableCoder) class GlobalWindowCoder(SingletonCoder): @@ -845,7 +848,8 @@ def as_cloud_object(self): } -Coder.register_structured_urn(urns.GLOBAL_WINDOW_CODER, GlobalWindowCoder) +Coder.register_structured_urn( + common_urns.GLOBAL_WINDOW_CODER, GlobalWindowCoder) class IntervalWindowCoder(FastCoder): @@ -869,7 +873,8 @@ def __hash__(self): return hash(type(self)) -Coder.register_structured_urn(urns.INTERVAL_WINDOW_CODER, IntervalWindowCoder) +Coder.register_structured_urn( + common_urns.INTERVAL_WINDOW_CODER, IntervalWindowCoder) class WindowedValueCoder(FastCoder): @@ -928,7 +933,8 @@ def __hash__(self): (self.wrapped_value_coder, self.timestamp_coder, self.window_coder)) -Coder.register_structured_urn(urns.WINDOWED_VALUE_CODER, WindowedValueCoder) +Coder.register_structured_urn( + common_urns.WINDOWED_VALUE_CODER, WindowedValueCoder) class LengthPrefixCoder(FastCoder): @@ -972,4 +978,5 @@ def __hash__(self): return hash((type(self), self._value_coder)) -Coder.register_structured_urn(urns.LENGTH_PREFIX_CODER, LengthPrefixCoder) +Coder.register_structured_urn( + common_urns.LENGTH_PREFIX_CODER, LengthPrefixCoder) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index fc7a2f3f7b14..eb79f4d3c661 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -36,6 +36,8 @@ from apache_beam import coders from apache_beam import pvalue +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.pvalue import AsIter from apache_beam.pvalue import AsSingleton @@ -75,7 +77,7 @@ class SourceBase(HasDisplayData, urns.RunnerApiFn): """Base class for all sources that can be passed to beam.io.Read(...). 
""" - urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_SOURCE) + urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_SOURCE) class BoundedSource(SourceBase): @@ -832,7 +834,7 @@ def display_data(self): 'source_dd': self.source} def to_runner_api_parameter(self, context): - return (urns.READ_TRANSFORM, + return (common_urns.READ_TRANSFORM, beam_runner_api_pb2.ReadPayload( source=self.source.to_runner_api(context), is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED @@ -845,7 +847,7 @@ def from_runner_api_parameter(parameter, context): ptransform.PTransform.register_urn( - urns.READ_TRANSFORM, + common_urns.READ_TRANSFORM, beam_runner_api_pb2.ReadPayload, Read.from_runner_api_parameter) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 4ac5ea86bf28..4c484288f743 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -61,13 +61,13 @@ from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import TypeOptions from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator +from apache_beam.portability import common_urns from apache_beam.pvalue import PCollection from apache_beam.runners import PipelineRunner from apache_beam.runners import create_runner from apache_beam.transforms import ptransform from apache_beam.typehints import TypeCheckError from apache_beam.typehints import typehints -from apache_beam.utils import urns from apache_beam.utils.annotations import deprecated __all__ = ['Pipeline', 'PTransformOverride'] @@ -828,7 +828,7 @@ def is_side_input(tag): None if tag == 'None' else tag: context.pcollections.get_by_id(id) for tag, id in proto.outputs.items()} # This annotation is expected by some runners. 
- if proto.spec.urn == urns.PARDO_TRANSFORM: + if proto.spec.urn == common_urns.PARDO_TRANSFORM: result.transform.output_tags = set(proto.outputs.keys()).difference( {'None'}) if not result.parts: diff --git a/sdks/python/apache_beam/portability/python_urns.py b/sdks/python/apache_beam/portability/python_urns.py new file mode 100644 index 000000000000..a284b5fe66c0 --- /dev/null +++ b/sdks/python/apache_beam/portability/python_urns.py @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Enumeration of URNs specific to the Python SDK. 
+ +For internal use only; no backwards-compatibility guarantees.""" + +PICKLED_CODER = "beam:coder:pickled_python:v1" +PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v1" +PICKLED_DOFN = "beam:dofn:pickled_python:v1" +PICKLED_DOFN_INFO = "beam:dofn:pickled_python_info:v1" +PICKLED_SOURCE = "beam:source:pickled_python:v1" +PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v1" +PICKLED_WINDOW_MAPPING_FN = "beam:window_mapping_fn:pickled_python:v1" +PICKLED_WINDOWFN = "beam:windowfn:pickled_python:v1" +PICKLED_VIEWFN = "beam:view_fn:pickled_python_data:v1" diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 3987fd5dfff3..2aca33e667f6 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -31,8 +31,9 @@ from apache_beam import coders from apache_beam import typehints from apache_beam.internal import pickler +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.utils import urns __all__ = [ 'PCollection', @@ -301,7 +302,7 @@ def _side_input_data(self): view_options = self._view_options() from_runtime_iterable = type(self)._from_runtime_iterable return SideInputData( - urns.ITERABLE_ACCESS, + common_urns.ITERABLE_SIDE_INPUT, self._window_mapping_fn, lambda iterable: from_runtime_iterable(iterable, view_options), self._input_element_coder()) @@ -354,17 +355,18 @@ def to_runner_api(self, unused_context): urn=self.access_pattern), view_fn=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.PICKLED_PYTHON_VIEWFN, + urn=python_urns.PICKLED_VIEWFN, payload=pickler.dumps((self.view_fn, self.coder)))), window_mapping_fn=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.PICKLED_WINDOW_MAPPING_FN, + urn=python_urns.PICKLED_WINDOW_MAPPING_FN, payload=pickler.dumps(self.window_mapping_fn)))) @staticmethod def 
from_runner_api(proto, unused_context): - assert proto.view_fn.spec.urn == urns.PICKLED_PYTHON_VIEWFN - assert proto.window_mapping_fn.spec.urn == urns.PICKLED_WINDOW_MAPPING_FN + assert proto.view_fn.spec.urn == python_urns.PICKLED_VIEWFN + assert (proto.window_mapping_fn.spec.urn == + python_urns.PICKLED_WINDOW_MAPPING_FN) return SideInputData( proto.access_pattern.urn, pickler.loads(proto.window_mapping_fn.spec.payload), @@ -442,7 +444,7 @@ def _from_runtime_iterable(it, options): def _side_input_data(self): return SideInputData( - urns.ITERABLE_ACCESS, + common_urns.ITERABLE_SIDE_INPUT, self._window_mapping_fn, lambda iterable: iterable, self._input_element_coder()) @@ -473,7 +475,7 @@ def _from_runtime_iterable(it, options): def _side_input_data(self): return SideInputData( - urns.ITERABLE_ACCESS, + common_urns.ITERABLE_SIDE_INPUT, self._window_mapping_fn, list, self._input_element_coder()) @@ -501,7 +503,7 @@ def _from_runtime_iterable(it, options): def _side_input_data(self): return SideInputData( - urns.ITERABLE_ACCESS, + common_urns.ITERABLE_SIDE_INPUT, self._window_mapping_fn, dict, self._input_element_coder()) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 341f06f9a6aa..3d5d14a1ead7 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -21,6 +21,7 @@ import copy import logging import Queue as queue +import re import threading import time from concurrent import futures @@ -35,6 +36,7 @@ from apache_beam.coders.coder_impl import create_OutputStream from apache_beam.internal import pickler from apache_beam.metrics.execution import MetricsEnvironment +from apache_beam.portability import common_urns from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.portability.api import beam_runner_api_pb2 @@ 
-46,7 +48,6 @@ from apache_beam.transforms import trigger from apache_beam.transforms.window import GlobalWindows from apache_beam.utils import proto_utils -from apache_beam.utils import urns # This module is experimental. No backwards-compatibility guarantees. @@ -143,7 +144,7 @@ class _WindowGroupingBuffer(object): def __init__(self, side_input_data): # Here's where we would use a different type of partitioning # (e.g. also by key) for a different access pattern. - assert side_input_data.access_pattern == urns.ITERABLE_ACCESS + assert side_input_data.access_pattern == common_urns.ITERABLE_SIDE_INPUT self._windowed_value_coder = side_input_data.coder self._window_coder = side_input_data.coder.window_coder self._value_coder = side_input_data.coder.wrapped_value_coder @@ -251,12 +252,12 @@ def fuse(self, other): union(self.must_follow, other.must_follow)) def is_flatten(self): - return any(transform.spec.urn == urns.FLATTEN_TRANSFORM + return any(transform.spec.urn == common_urns.FLATTEN_TRANSFORM for transform in self.transforms) def side_inputs(self): for transform in self.transforms: - if transform.spec.urn == urns.PARDO_TRANSFORM: + if transform.spec.urn == common_urns.PARDO_TRANSFORM: payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.ParDoPayload) for side_input in payload.side_inputs: @@ -264,7 +265,7 @@ def side_inputs(self): def has_as_main_input(self, pcoll): for transform in self.transforms: - if transform.spec.urn == urns.PARDO_TRANSFORM: + if transform.spec.urn == common_urns.PARDO_TRANSFORM: payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.ParDoPayload) local_side_inputs = payload.side_inputs @@ -311,14 +312,14 @@ def windowed_coder_id(coder_id): proto = beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.WINDOWED_VALUE_CODER)), + urn=common_urns.WINDOWED_VALUE_CODER)), component_coder_ids=[coder_id, window_coder_id]) return 
add_or_get_coder_id(proto) for stage in stages: assert len(stage.transforms) == 1 transform = stage.transforms[0] - if transform.spec.urn == urns.COMBINE_PER_KEY_TRANSFORM: + if transform.spec.urn == common_urns.COMBINE_PER_KEY_TRANSFORM: combine_payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.CombinePayload) @@ -338,14 +339,14 @@ def windowed_coder_id(coder_id): key_accumulator_coder = beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.KV_CODER)), + urn=common_urns.KV_CODER)), component_coder_ids=[key_coder_id, accumulator_coder_id]) key_accumulator_coder_id = add_or_get_coder_id(key_accumulator_coder) accumulator_iter_coder = beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.ITERABLE_CODER)), + urn=common_urns.ITERABLE_CODER)), component_coder_ids=[accumulator_coder_id]) accumulator_iter_coder_id = add_or_get_coder_id( accumulator_iter_coder) @@ -353,7 +354,7 @@ def windowed_coder_id(coder_id): key_accumulator_iter_coder = beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.KV_CODER)), + urn=common_urns.KV_CODER)), component_coder_ids=[key_coder_id, accumulator_iter_coder_id]) key_accumulator_iter_coder_id = add_or_get_coder_id( key_accumulator_iter_coder) @@ -397,7 +398,7 @@ def make_stage(base_stage, transform): beam_runner_api_pb2.PTransform( unique_name=transform.unique_name + '/Precombine', spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.PRECOMBINE_TRANSFORM, + urn=common_urns.COMBINE_PGBKCV_TRANSFORM, payload=transform.spec.payload), inputs=transform.inputs, outputs={'out': precombined_pcoll_id})) @@ -407,7 +408,7 @@ def make_stage(base_stage, transform): beam_runner_api_pb2.PTransform( unique_name=transform.unique_name + '/Group', spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.GROUP_BY_KEY_TRANSFORM), + 
urn=common_urns.GROUP_BY_KEY_TRANSFORM), inputs={'in': precombined_pcoll_id}, outputs={'out': grouped_pcoll_id})) @@ -416,7 +417,7 @@ def make_stage(base_stage, transform): beam_runner_api_pb2.PTransform( unique_name=transform.unique_name + '/Merge', spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.MERGE_ACCUMULATORS_TRANSFORM, + urn=common_urns.COMBINE_MERGE_ACCUMULATORS_TRANSFORM, payload=transform.spec.payload), inputs={'in': grouped_pcoll_id}, outputs={'out': merged_pcoll_id})) @@ -426,7 +427,7 @@ def make_stage(base_stage, transform): beam_runner_api_pb2.PTransform( unique_name=transform.unique_name + '/ExtractOutputs', spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.EXTRACT_OUTPUTS_TRANSFORM, + urn=common_urns.COMBINE_EXTRACT_OUTPUTS_TRANSFORM, payload=transform.spec.payload), inputs={'in': merged_pcoll_id}, outputs=transform.outputs)) @@ -437,12 +438,13 @@ def make_stage(base_stage, transform): def expand_gbk(stages): """Transforms each GBK into a write followed by a read. """ - good_coder_urns = set(beam.coders.Coder._known_urns.keys()) - set([ - urns.PICKLED_CODER]) + good_coder_urns = set( + value for key, value in common_urns.__dict__.items() + if re.match('[A-Z][A-Z_]*$', key)) coders = pipeline_components.coders for coder_id, coder_proto in coders.items(): - if coder_proto.spec.spec.urn == urns.BYTES_CODER: + if coder_proto.spec.spec.urn == common_urns.BYTES_CODER: bytes_coder_id = coder_id break else: @@ -456,7 +458,7 @@ def wrap_unknown_coders(coder_id, with_bytes): if (coder_id, with_bytes) not in coder_substitutions: wrapped_coder_id = None coder_proto = coders[coder_id] - if coder_proto.spec.spec.urn == urns.LENGTH_PREFIX_CODER: + if coder_proto.spec.spec.urn == common_urns.LENGTH_PREFIX_CODER: coder_substitutions[coder_id, with_bytes] = ( bytes_coder_id if with_bytes else coder_id) elif coder_proto.spec.spec.urn in good_coder_urns: @@ -483,7 +485,7 @@ def wrap_unknown_coders(coder_id, with_bytes): len_prefix_coder_proto = 
beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.LENGTH_PREFIX_CODER)), + urn=common_urns.LENGTH_PREFIX_CODER)), component_coder_ids=[coder_id]) coders[wrapped_coder_id].CopyFrom(len_prefix_coder_proto) coder_substitutions[coder_id, with_bytes] = wrapped_coder_id @@ -500,7 +502,7 @@ def fix_pcoll_coder(pcoll): for stage in stages: assert len(stage.transforms) == 1 transform = stage.transforms[0] - if transform.spec.urn == urns.GROUP_BY_KEY_TRANSFORM: + if transform.spec.urn == common_urns.GROUP_BY_KEY_TRANSFORM: for pcoll_id in transform.inputs.values(): fix_pcoll_coder(pipeline_components.pcollections[pcoll_id]) for pcoll_id in transform.outputs.values(): @@ -547,7 +549,7 @@ def sink_flattens(stages): for stage in stages: assert len(stage.transforms) == 1 transform = stage.transforms[0] - if transform.spec.urn == urns.FLATTEN_TRANSFORM: + if transform.spec.urn == common_urns.FLATTEN_TRANSFORM: # This is used later to correlate the read and writes. 
param = str("materialize:%s" % transform.unique_name) output_pcoll_id, = transform.outputs.values() @@ -773,7 +775,8 @@ def process(stage): coders.populate_map(pipeline_components.coders) known_composites = set( - [urns.GROUP_BY_KEY_TRANSFORM, urns.COMBINE_PER_KEY_TRANSFORM]) + [common_urns.GROUP_BY_KEY_TRANSFORM, + common_urns.COMBINE_PER_KEY_TRANSFORM]) def leaf_transforms(root_ids): for root_id in root_ids: @@ -851,7 +854,7 @@ def extract_endpoints(stage): transform.spec.payload = data_operation_spec.SerializeToString() else: transform.spec.payload = "" - elif transform.spec.urn == urns.PARDO_TRANSFORM: + elif transform.spec.urn == common_urns.PARDO_TRANSFORM: payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.ParDoPayload) for tag, si in payload.side_inputs.items(): diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 3c14a6f6781d..22cf46d8c92d 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -26,13 +26,13 @@ import json import logging -from google.protobuf import wrappers_pb2 - import apache_beam as beam from apache_beam.coders import WindowedValueCoder from apache_beam.coders import coder_impl from apache_beam.internal import pickler from apache_beam.io import iobase +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners import pipeline_context @@ -42,7 +42,6 @@ from apache_beam.transforms import sideinputs from apache_beam.utils import counters from apache_beam.utils import proto_utils -from apache_beam.utils import urns # This module is experimental. No backwards-compatibility guarantees. 
@@ -50,9 +49,7 @@ DATA_INPUT_URN = 'urn:org.apache.beam:source:runner:0.1' DATA_OUTPUT_URN = 'urn:org.apache.beam:sink:runner:0.1' IDENTITY_DOFN_URN = 'urn:org.apache.beam:dofn:identity:0.1' -PYTHON_ITERABLE_VIEWFN_URN = 'urn:org.apache.beam:viewfn:iterable:python:0.1' -PYTHON_CODER_URN = 'urn:org.apache.beam:coder:python:0.1' -# TODO(vikasrk): Fix this once runner sends appropriate python urns. +# TODO(vikasrk): Fix this once runner sends appropriate common_urns. OLD_DATAFLOW_RUNNER_HARNESS_PARDO_URN = 'urn:beam:dofn:javasdk:0.1' OLD_DATAFLOW_RUNNER_HARNESS_READ_URN = 'urn:org.apache.beam:source:java:0.1' @@ -199,7 +196,7 @@ def create_execution_tree(self, descriptor): self.state_sampler, self.state_handler) def is_side_input(transform_proto, tag): - if transform_proto.spec.urn == urns.PARDO_TRANSFORM: + if transform_proto.spec.urn == common_urns.PARDO_TRANSFORM: return tag in proto_utils.parse_Bytes( transform_proto.spec.payload, beam_runner_api_pb2.ParDoPayload).side_inputs @@ -417,7 +414,7 @@ def create(factory, transform_id, transform_proto, parameter, consumers): @BeamTransformFactory.register_urn( - urns.READ_TRANSFORM, beam_runner_api_pb2.ReadPayload) + common_urns.READ_TRANSFORM, beam_runner_api_pb2.ReadPayload) def create(factory, transform_id, transform_proto, parameter, consumers): source = iobase.SourceBase.from_runner_api(parameter.source, factory.context) spec = operation_specs.WorkerRead( @@ -440,9 +437,9 @@ def create(factory, transform_id, transform_proto, serialized_fn, consumers): @BeamTransformFactory.register_urn( - urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload) + common_urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload) def create(factory, transform_id, transform_proto, parameter, consumers): - assert parameter.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO + assert parameter.do_fn.spec.urn == python_urns.PICKLED_DOFN_INFO serialized_fn = parameter.do_fn.spec.payload return _create_pardo_operation( factory, transform_id, 
transform_proto, consumers, @@ -513,18 +510,7 @@ def _create_simple_pardo_operation( @BeamTransformFactory.register_urn( - urns.GROUP_ALSO_BY_WINDOW_TRANSFORM, wrappers_pb2.BytesValue) -def create(factory, transform_id, transform_proto, parameter, consumers): - # Perhaps this hack can go away once all apply overloads are gone. - from apache_beam.transforms.core import _GroupAlsoByWindowDoFn - return _create_simple_pardo_operation( - factory, transform_id, transform_proto, consumers, - _GroupAlsoByWindowDoFn( - factory.context.windowing_strategies.get_by_id(parameter.value))) - - -@BeamTransformFactory.register_urn( - urns.WINDOW_INTO_TRANSFORM, beam_runner_api_pb2.WindowingStrategy) + common_urns.WINDOW_INTO_TRANSFORM, beam_runner_api_pb2.WindowingStrategy) def create(factory, transform_id, transform_proto, parameter, consumers): class WindowIntoDoFn(beam.DoFn): def __init__(self, windowing): @@ -557,7 +543,7 @@ def create(factory, transform_id, transform_proto, unused_parameter, consumers): @BeamTransformFactory.register_urn( - urns.PRECOMBINE_TRANSFORM, beam_runner_api_pb2.CombinePayload) + common_urns.COMBINE_PGBKCV_TRANSFORM, beam_runner_api_pb2.CombinePayload) def create(factory, transform_id, transform_proto, payload, consumers): # TODO: Combine side inputs. 
serialized_combine_fn = pickler.dumps( @@ -577,14 +563,16 @@ def create(factory, transform_id, transform_proto, payload, consumers): @BeamTransformFactory.register_urn( - urns.MERGE_ACCUMULATORS_TRANSFORM, beam_runner_api_pb2.CombinePayload) + common_urns.COMBINE_MERGE_ACCUMULATORS_TRANSFORM, + beam_runner_api_pb2.CombinePayload) def create(factory, transform_id, transform_proto, payload, consumers): return _create_combine_phase_operation( factory, transform_proto, payload, consumers, 'merge') @BeamTransformFactory.register_urn( - urns.EXTRACT_OUTPUTS_TRANSFORM, beam_runner_api_pb2.CombinePayload) + common_urns.COMBINE_EXTRACT_OUTPUTS_TRANSFORM, + beam_runner_api_pb2.CombinePayload) def create(factory, transform_id, transform_proto, payload, consumers): return _create_combine_phase_operation( factory, transform_proto, payload, consumers, 'extract') @@ -609,7 +597,7 @@ def _create_combine_phase_operation( consumers) -@BeamTransformFactory.register_urn(urns.FLATTEN_TRANSFORM, None) +@BeamTransformFactory.register_urn(common_urns.FLATTEN_TRANSFORM, None) def create(factory, transform_id, transform_proto, unused_parameter, consumers): return factory.augment_oldstyle_op( operations.create_operation( diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index b4b8b7f64e0a..6e07e3357d4d 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -23,8 +23,6 @@ import inspect import types -from google.protobuf import wrappers_pb2 - from apache_beam import coders from apache_beam import pvalue from apache_beam import typehints @@ -32,6 +30,8 @@ from apache_beam.internal import pickler from apache_beam.internal import util from apache_beam.options.pipeline_options import TypeOptions +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.transforms import 
ptransform from apache_beam.transforms.display import DisplayDataItem @@ -378,7 +378,7 @@ def is_process_bounded(self): return False # Method is a classmethod return True - urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_DO_FN) + urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_DOFN) def _fn_takes_side_inputs(fn): @@ -582,7 +582,7 @@ def maybe_from_callable(fn): def get_accumulator_coder(self): return coders.registry.get_coder(object) - urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_COMBINE_FN) + urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_COMBINE_FN) class CallableWrapperCombineFn(CombineFn): @@ -855,11 +855,11 @@ def to_runner_api_parameter(self, context): "expected instance of ParDo, but got %s" % self.__class__ picked_pardo_fn_data = pickler.dumps(self._pardo_fn_data()) return ( - urns.PARDO_TRANSFORM, + common_urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload( do_fn=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.PICKLED_DO_FN_INFO, + urn=python_urns.PICKLED_DOFN_INFO, payload=picked_pardo_fn_data)), # It'd be nice to name these according to their actual # names/positions in the orignal argument list, but such a @@ -871,9 +871,9 @@ def to_runner_api_parameter(self, context): for ix, si in enumerate(self.side_inputs)})) @PTransform.register_urn( - urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload) + common_urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload) def from_runner_api_parameter(pardo_payload, context): - assert pardo_payload.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO + assert pardo_payload.do_fn.spec.urn == python_urns.PICKLED_DOFN_INFO fn, args, kwargs, si_tags_and_types, windowing = pickler.loads( pardo_payload.do_fn.spec.payload) if si_tags_and_types: @@ -1228,11 +1228,11 @@ def to_runner_api_parameter(self, context): else: combine_fn = self.fn return ( - urns.COMBINE_PER_KEY_TRANSFORM, + common_urns.COMBINE_PER_KEY_TRANSFORM, _combine_payload(combine_fn, context)) 
@PTransform.register_urn( - urns.COMBINE_PER_KEY_TRANSFORM, beam_runner_api_pb2.CombinePayload) + common_urns.COMBINE_PER_KEY_TRANSFORM, beam_runner_api_pb2.CombinePayload) def from_runner_api_parameter(combine_payload, context): return CombinePerKey( CombineFn.from_runner_api(combine_payload.combine_fn, context)) @@ -1266,11 +1266,12 @@ def to_runner_api_parameter(self, context): else: combine_fn = self.fn return ( - urns.COMBINE_GROUPED_VALUES_TRANSFORM, + common_urns.COMBINE_GROUPED_VALUES_TRANSFORM, _combine_payload(combine_fn, context)) @PTransform.register_urn( - urns.COMBINE_GROUPED_VALUES_TRANSFORM, beam_runner_api_pb2.CombinePayload) + common_urns.COMBINE_GROUPED_VALUES_TRANSFORM, + beam_runner_api_pb2.CombinePayload) def from_runner_api_parameter(combine_payload, context): return CombineValues( CombineFn.from_runner_api(combine_payload.combine_fn, context)) @@ -1395,9 +1396,9 @@ def expand(self, pcoll): | 'GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing)) def to_runner_api_parameter(self, unused_context): - return urns.GROUP_BY_KEY_TRANSFORM, None + return common_urns.GROUP_BY_KEY_TRANSFORM, None - @PTransform.register_urn(urns.GROUP_BY_KEY_TRANSFORM, None) + @PTransform.register_urn(common_urns.GROUP_BY_KEY_TRANSFORM, None) def from_runner_api_parameter(unused_payload, unused_context): return GroupByKey() @@ -1414,13 +1415,6 @@ def expand(self, pcoll): self._check_pcollection(pcoll) return pvalue.PCollection(pcoll.pipeline) - def to_runner_api_parameter(self, unused_context): - return urns.GROUP_BY_KEY_ONLY_TRANSFORM, None - - @PTransform.register_urn(urns.GROUP_BY_KEY_ONLY_TRANSFORM, None) - def from_runner_api_parameter(unused_payload, unused_context): - return _GroupByKeyOnly() - @typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]]) @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]]) @@ -1435,18 +1429,6 @@ def expand(self, pcoll): self._check_pcollection(pcoll) return pvalue.PCollection(pcoll.pipeline) - def 
to_runner_api_parameter(self, context): - return ( - urns.GROUP_ALSO_BY_WINDOW_TRANSFORM, - wrappers_pb2.BytesValue(value=context.windowing_strategies.get_id( - self.windowing))) - - @PTransform.register_urn( - urns.GROUP_ALSO_BY_WINDOW_TRANSFORM, wrappers_pb2.BytesValue) - def from_runner_api_parameter(payload, context): - return _GroupAlsoByWindow( - context.windowing_strategies.get_by_id(payload.value)) - class _GroupAlsoByWindowDoFn(DoFn): # TODO(robertwb): Support combiner lifting. @@ -1644,7 +1626,7 @@ def expand(self, pcoll): def to_runner_api_parameter(self, context): return ( - urns.WINDOW_INTO_TRANSFORM, + common_urns.WINDOW_INTO_TRANSFORM, self.windowing.to_runner_api(context)) @staticmethod @@ -1658,7 +1640,7 @@ def from_runner_api_parameter(proto, context): PTransform.register_urn( - urns.WINDOW_INTO_TRANSFORM, + common_urns.WINDOW_INTO_TRANSFORM, # TODO(robertwb): Update WindowIntoPayload to include the full strategy. # (Right now only WindowFn is used, but we need this to reconstitute the # WindowInto transform, and in the future will need it at runtime to @@ -1715,7 +1697,7 @@ def get_windowing(self, inputs): return super(Flatten, self).get_windowing(inputs) def to_runner_api_parameter(self, context): - return urns.FLATTEN_TRANSFORM, None + return common_urns.FLATTEN_TRANSFORM, None @staticmethod def from_runner_api_parameter(unused_parameter, unused_context): @@ -1723,7 +1705,7 @@ def from_runner_api_parameter(unused_parameter, unused_context): PTransform.register_urn( - urns.FLATTEN_TRANSFORM, None, Flatten.from_runner_api_parameter) + common_urns.FLATTEN_TRANSFORM, None, Flatten.from_runner_api_parameter) class Create(PTransform): diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 24904956d4d1..c7fc641804dc 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -51,6 +51,7 @@ class and wrapper class that allows 
lambda functions to be used as from apache_beam import pvalue from apache_beam.internal import pickler from apache_beam.internal import util +from apache_beam.portability import python_urns from apache_beam.transforms.display import DisplayDataItem from apache_beam.transforms.display import HasDisplayData from apache_beam.typehints import typehints @@ -60,7 +61,6 @@ class and wrapper class that allows lambda functions to be used as from apache_beam.typehints.trivial_inference import instance_to_type from apache_beam.typehints.typehints import validate_composite_type_param from apache_beam.utils import proto_utils -from apache_beam.utils import urns __all__ = [ 'PTransform', @@ -555,7 +555,7 @@ def from_runner_api(cls, proto, context): context) def to_runner_api_parameter(self, context): - return (urns.PICKLED_TRANSFORM, + return (python_urns.PICKLED_TRANSFORM, wrappers_pb2.BytesValue(value=pickler.dumps(self))) @staticmethod @@ -564,7 +564,7 @@ def from_runner_api_parameter(spec_parameter, unused_context): PTransform.register_urn( - urns.PICKLED_TRANSFORM, + python_urns.PICKLED_TRANSFORM, wrappers_pb2.BytesValue, PTransform.from_runner_api_parameter) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index ee9d6f971871..c250e8c6d365 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -55,6 +55,8 @@ from google.protobuf import timestamp_pb2 from apache_beam.coders import coders +from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.portability.api import standard_window_fns_pb2 from apache_beam.transforms import timeutil @@ -172,7 +174,7 @@ def get_transformed_output_time(self, window, input_timestamp): # pylint: disab # By default, just return the input timestamp. 
return input_timestamp - urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_WINDOW_FN) + urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_WINDOWFN) class BoundedWindow(object): @@ -306,9 +308,9 @@ def __ne__(self, other): return not self == other def to_runner_api_parameter(self, context): - return urns.GLOBAL_WINDOWS_FN, None + return common_urns.GLOBAL_WINDOWS_WINDOWFN, None - @urns.RunnerApiFn.register_urn(urns.GLOBAL_WINDOWS_FN, None) + @urns.RunnerApiFn.register_urn(common_urns.GLOBAL_WINDOWS_WINDOWFN, None) def from_runner_api_parameter(unused_fn_parameter, unused_context): return GlobalWindows() @@ -349,7 +351,7 @@ def __ne__(self, other): return not self == other def to_runner_api_parameter(self, context): - return (urns.FIXED_WINDOWS_FN, + return (common_urns.FIXED_WINDOWS_WINDOWFN, standard_window_fns_pb2.FixedWindowsPayload( size=proto_utils.from_micros( duration_pb2.Duration, self.size.micros), @@ -357,7 +359,8 @@ def to_runner_api_parameter(self, context): timestamp_pb2.Timestamp, self.offset.micros))) @urns.RunnerApiFn.register_urn( - urns.FIXED_WINDOWS_FN, standard_window_fns_pb2.FixedWindowsPayload) + common_urns.FIXED_WINDOWS_WINDOWFN, + standard_window_fns_pb2.FixedWindowsPayload) def from_runner_api_parameter(fn_parameter, unused_context): return FixedWindows( size=Duration(micros=fn_parameter.size.ToMicroseconds()), @@ -404,7 +407,7 @@ def __eq__(self, other): and self.period == other.period) def to_runner_api_parameter(self, context): - return (urns.SLIDING_WINDOWS_FN, + return (common_urns.SLIDING_WINDOWS_WINDOWFN, standard_window_fns_pb2.SlidingWindowsPayload( size=proto_utils.from_micros( duration_pb2.Duration, self.size.micros), @@ -414,7 +417,7 @@ def to_runner_api_parameter(self, context): duration_pb2.Duration, self.period.micros))) @urns.RunnerApiFn.register_urn( - urns.SLIDING_WINDOWS_FN, + common_urns.SLIDING_WINDOWS_WINDOWFN, standard_window_fns_pb2.SlidingWindowsPayload) def from_runner_api_parameter(fn_parameter, 
unused_context): return SlidingWindows( @@ -471,13 +474,14 @@ def __eq__(self, other): return self.gap_size == other.gap_size def to_runner_api_parameter(self, context): - return (urns.SESSION_WINDOWS_FN, + return (common_urns.SESSION_WINDOWS_WINDOWFN, standard_window_fns_pb2.SessionsPayload( gap_size=proto_utils.from_micros( duration_pb2.Duration, self.gap_size.micros))) @urns.RunnerApiFn.register_urn( - urns.SESSION_WINDOWS_FN, standard_window_fns_pb2.SessionsPayload) + common_urns.SESSION_WINDOWS_WINDOWFN, + standard_window_fns_pb2.SessionsPayload) def from_runner_api_parameter(fn_parameter, unused_context): return Sessions( gap_size=Duration(micros=fn_parameter.gap_size.ToMicroseconds())) diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py index bd02fe1dfb78..e62fbcd0948c 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -25,48 +25,6 @@ from apache_beam.internal import pickler from apache_beam.utils import proto_utils -PICKLED_WINDOW_FN = "beam:windowfn:pickled_python:v0.1" -GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1" -FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1" -SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1" -SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1" - -PICKLED_DO_FN = "beam:dofn:pickled_python:v0.1" -PICKLED_DO_FN_INFO = "beam:dofn:pickled_python_info:v0.1" -PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1" - -PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1" -PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1" -GROUP_BY_KEY_TRANSFORM = "urn:beam:transform:groupbykey:v1" -GROUP_BY_KEY_ONLY_TRANSFORM = "beam:ptransform:group_by_key_only:v0.1" -GROUP_ALSO_BY_WINDOW_TRANSFORM = "beam:ptransform:group_also_by_window:v0.1" -COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1" -COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1" -PRECOMBINE_TRANSFORM = 
"beam:ptransform:combine_pre:v0.1" -MERGE_ACCUMULATORS_TRANSFORM = "beam:ptransform:combine_merge_accumulators:v0.1" -EXTRACT_OUTPUTS_TRANSFORM = "beam:ptransform:combine_extract_outputs:v0.1" -FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1" -READ_TRANSFORM = "beam:ptransform:read:v0.1" -RESHUFFLE_TRANSFORM = "beam:ptransform:reshuffle:v0.1" -WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1" - -PICKLED_SOURCE = "beam:source:pickled_python:v0.1" - -PICKLED_CODER = "beam:coder:pickled_python:v0.1" -BYTES_CODER = "urn:beam:coders:bytes:0.1" -VAR_INT_CODER = "urn:beam:coders:varint:0.1" -INTERVAL_WINDOW_CODER = "urn:beam:coders:interval_window:0.1" -ITERABLE_CODER = "urn:beam:coders:stream:0.1" -KV_CODER = "urn:beam:coders:kv:0.1" -LENGTH_PREFIX_CODER = "urn:beam:coders:length_prefix:0.1" -GLOBAL_WINDOW_CODER = "urn:beam:coders:global_window:0.1" -WINDOWED_VALUE_CODER = "urn:beam:coders:windowed_value:0.1" - -ITERABLE_ACCESS = "urn:beam:sideinput:iterable" -MULTIMAP_ACCESS = "urn:beam:sideinput:multimap" -PICKLED_PYTHON_VIEWFN = "beam:view_fn:pickled_python_data:v0.1" -PICKLED_WINDOW_MAPPING_FN = "beam:window_mapping_fn:pickled_python:v0.1" - class RunnerApiFn(object): """Abstract base class that provides urn registration utilities. 
diff --git a/sdks/python/setup.py b/sdks/python/setup.py index d0034f7dee74..d524b706bb49 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -23,6 +23,7 @@ import os import pkg_resources import platform +import re import shutil import subprocess import sys @@ -147,6 +148,34 @@ def run(self): return original_cmd +def generate_common_urns(): + src = os.path.join( + os.path.dirname(__file__), + '../../' + 'model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md') + out = os.path.join( + os.path.dirname(__file__), + 'apache_beam/portability/common_urns.py') + src_time = os.path.getmtime(src) if os.path.exists(src) else -1 + out_time = os.path.getmtime(out) if os.path.exists(out) else -1 + if src_time > out_time: + print 'Regenerating common_urns module.' + urns = {} + for m in re.finditer(r'\bbeam:(\S+):(\S+):(v\S+)', open(src).read()): + kind, name, version = m.groups() + var_name = name.upper() + '_' + kind.upper() + if var_name in urns: + var_name += '_' + version.upper().replace('.', '_') + urns[var_name] = m.group(0) + open(out, 'w').write( + '# Autogenerated from common_urns.md\n' + + '# pylint: disable=line-too-long\n\n' + + '\n'.join('%s = "%s"' % urn + for urn in sorted(urns.items(), key=lambda kv: kv[1])) + + '\n') +generate_common_urns() + + setuptools.setup( name=PACKAGE_NAME, version=PACKAGE_VERSION, From 04c399c4d231849c90ed90b38620b12fd7f4e22b Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 1 Feb 2018 12:40:29 -0800 Subject: [PATCH 2/2] Revert URNs that are currently hard-coded in the Dataflow worker. 
--- .../org/apache/beam/model/common_urns.md | 20 ++++++++--- .../core/construction/CoderTranslation.java | 4 --- .../construction/PTransformTranslation.java | 2 +- .../runners/core/construction/UrnUtils.java | 2 +- .../WindowingStrategyTranslation.java | 10 +++--- runners/java-fn-execution/build.gradle | 1 - .../graph/LengthPrefixUnknownCoders.java | 33 ++++++++----------- sdks/python/setup.py | 3 +- 8 files changed, 37 insertions(+), 38 deletions(-) diff --git a/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md b/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md index 0c23a03329a2..f20c9e486bf5 100644 --- a/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md +++ b/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md @@ -25,7 +25,9 @@ URNs used in the Beam portability APIs. ## Core Transforms -### beam:transform:pardo:v1 +### urn:beam:transform:pardo:v1 + +TODO(BEAM-3595): Change this to beam:transform:pardo:v1. Represents Beam's parallel do operation. @@ -71,13 +73,21 @@ the SDK understands the last three combine helper operations. 
## WindowFns -### beam:windowfn:global_windows:v1 +### beam:windowfn:global_windows:v0.1 + +TODO(BEAM-3595): Change this to beam:windowfn:global_windows:v1 + +### beam:windowfn:fixed_windows:v0.1 + +TODO(BEAM-3595): Change this to beam:windowfn:fixed_windows:v1 + +### beam:windowfn:sliding_windows:v0.1 -### beam:windowfn:fixed_windows:v1 +TODO(BEAM-3595): Change this to beam:windowfn:sliding_windows:v1 -### beam:windowfn:sliding_windows:v1 +### beam:windowfn:session_windows:v0.1 -### beam:windowfn:session_windows:v1 +TODO(BEAM-3595): Change this to beam:windowfn:session_windows:v1 ## Coders diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java index 486fff08dd8b..4806fb416b11 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java @@ -53,12 +53,8 @@ public class CoderTranslation { private static BiMap, String> loadCoderURNs() { ImmutableBiMap.Builder, String> coderUrns = ImmutableBiMap.builder(); for (CoderTranslatorRegistrar registrar : ServiceLoader.load(CoderTranslatorRegistrar.class)) { - System.out.println(registrar + " " + registrar.getCoderURNs()); coderUrns.putAll(registrar.getCoderURNs()); } - if (coderUrns.build().size() == 0) { - //throw new RuntimeException(); - } return coderUrns.build(); } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 164d5526551a..f19f2a17ce73 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -53,7 +53,7 @@ public class PTransformTranslation { public static final String PAR_DO_TRANSFORM_URN = - validateCommonUrn("beam:transform:pardo:v1"); + validateCommonUrn("urn:beam:transform:pardo:v1"); public static final String FLATTEN_TRANSFORM_URN = validateCommonUrn("beam:transform:flatten:v1"); public static final String GROUP_BY_KEY_TRANSFORM_URN = diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UrnUtils.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UrnUtils.java index a65ee7b62dfc..3932390349bb 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UrnUtils.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UrnUtils.java @@ -30,7 +30,7 @@ public class UrnUtils { private static final String STANDARD_URNS_PATH = "/org/apache/beam/model/common_urns.md"; - private static final Pattern URN_REGEX = Pattern.compile("\\bbeam:\\S+:v[0-9.]+"); + private static final Pattern URN_REGEX = Pattern.compile("\\b(urn:)?beam:\\S+:v[0-9.]+"); private static final Set COMMON_URNS = extractUrnsFromPath(STANDARD_URNS_PATH); private static Set extractUrnsFromPath(String path) { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 2f46b6fa503c..0da6f7e572b7 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -196,11 +196,11 @@ public static TimestampCombiner 
timestampCombinerFromProto(RunnerApi.OutputTime. } } - public static final String GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v1"; - public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v1"; - public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v1"; - public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v1"; - { + public static final String GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1"; + public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"; + public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"; + public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"; + static { // Out-of-line to facilitate use in the case statements below. UrnUtils.validateCommonUrn(GLOBAL_WINDOWS_FN); UrnUtils.validateCommonUrn(FIXED_WINDOWS_FN); diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle index 26c65e6775ff..b1aa9e8c0ca6 100644 --- a/runners/java-fn-execution/build.gradle +++ b/runners/java-fn-execution/build.gradle @@ -33,7 +33,6 @@ dependencies { compile library.java.guava shadow project(path: ":model:pipeline", configuration: "shadow") shadow project(path: ":model:fn-execution", configuration: "shadow") - shadow project(path: ":runners:core-construction-java", configuration: "shadow") shadow project(path: ":sdks:java:core", configuration: "shadow") shadow project(path: ":sdks:java:fn-execution", configuration: "shadow") shadow library.java.grpc_core diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java index a1b7223324a2..a7b23ab72dde 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java +++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java @@ -18,40 +18,33 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; -import java.util.Map; import java.util.Set; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Coder; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents; -import org.apache.beam.runners.core.construction.ModelCoderRegistrar; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; /** * Utilities for replacing or wrapping unknown coders with {@link LengthPrefixCoder}. * - *

TODO: Support a dynamic list of well known coders using either registration or manual listing. + *

TODO: Support a dynamic list of well known coders using either registration or manual listing, + * possibly from ModelCoderRegistrar. */ public class LengthPrefixUnknownCoders { - static { - assert new ModelCoderRegistrar().getCoderURNs().size() > 0; - } - private static final Map, String> MODEL_CODER_URNS = - new ModelCoderRegistrar().getCoderURNs(); - private static final String BYTES_CODER_TYPE = MODEL_CODER_URNS.get(ByteArrayCoder.class); - private static final String LENGTH_PREFIX_CODER_TYPE = MODEL_CODER_URNS.get(LengthPrefixCoder.class); + private static final String BYTES_CODER_TYPE = "beam:coder:bytes:v1"; + private static final String LENGTH_PREFIX_CODER_TYPE = "beam:coder:length_prefix:v1"; private static final Set WELL_KNOWN_CODER_URNS = - ImmutableSet.copyOf(MODEL_CODER_URNS.values()); - // ImmutableSet.of( -// BYTES_CODER_TYPE, -// "urn:beam:coders:kv:0.1", -// "urn:beam:coders:varint:0.1", -// "urn:beam:coders:interval_window:0.1", -// "urn:beam:coders:stream:0.1", -// LENGTH_PREFIX_CODER_TYPE, -// "urn:beam:coders:global_window:0.1", -// "urn:beam:coders:windowed_value:0.1"); + ImmutableSet.of( + BYTES_CODER_TYPE, + "beam:coder:kv:v1", + "beam:coder:varint:v1", + "beam:coder:interval_window:v1", + "beam:coder:stream:v1", + LENGTH_PREFIX_CODER_TYPE, + "beam:coder:global_window:v1", + "beam:coder:windowed_value:v1"); /** * Recursively traverse the coder tree and wrap the first unknown coder in every branch with a diff --git a/sdks/python/setup.py b/sdks/python/setup.py index d524b706bb49..722dd2c42341 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -161,7 +161,8 @@ def generate_common_urns(): if src_time > out_time: print 'Regenerating common_urns module.' urns = {} - for m in re.finditer(r'\bbeam:(\S+):(\S+):(v\S+)', open(src).read()): + for m in re.finditer( + r'\b(?:urn:)?beam:(\S+):(\S+):(v\S+)', open(src).read()): kind, name, version = m.groups() var_name = name.upper() + '_' + kind.upper() if var_name in urns: