From b4b0025acd695208d911febe1c480b491463592f Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Mon, 14 Feb 2022 14:16:26 -0500 Subject: [PATCH 01/25] [BEAM-10529] add java and generic components of nullable xlang tests --- .../model/fnexecution/v1/standard_coders.yaml | 24 +++++++++++++++++++ .../src/main/proto/beam_runner_api.proto | 8 +++++++ .../core/construction/CoderTranslators.java | 15 ++++++++++++ .../construction/ModelCoderRegistrar.java | 3 +++ .../core/construction/ModelCoders.java | 5 +++- .../core/construction/CommonCoderTest.java | 16 ++++++++++++- 6 files changed, 69 insertions(+), 2 deletions(-) diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml index df119baaa481..4736ce83f8b9 100644 --- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml +++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml @@ -496,3 +496,27 @@ coder: examples: "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0067\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {window: {end: 1454293425000, span: 3600000}} "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0075\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {window: {end: -9223372036854410, span: 365}} + + +--- +# code snippet: +# public static void main(String[] args) throws Exception{ +# Coder coder = NullableCoder.of(ByteArrayCoder.of()); +# byte[] bytes = CoderUtils.encodeToByteArray(coder, null); +# String str = new String(bytes, java.nio.charset.StandardCharsets.ISO_8859_1); +# StringBuilder example = new StringBuilder(); +# for(int i = 0; i < str.length(); i++){ +# example.append(CharUtils.unicodeEscaped(str.charAt(i))); +# } +# System.out.println(example); +# } + +coder: + urn: "beam:coder:nullable:v1" + components: [{urn: "beam:coder:bytes:v1"}] + contex: nested + +examples: + "\u0001\u0061\u0062\u0063" : "abc" + "\u0001\u006d\u006f\u0072\u0065\u0020\u0062\u0079\u0074\u0065\u0073" : "more bytes" + "\u0000" : null diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 539c9b83c350..a8d4be778594 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -1052,6 +1052,14 @@ message StandardCoders { // Components: the user key coder. // Experimental. SHARDED_KEY = 15 [(beam_urn) = "beam:coder:sharded_key:v1"]; + + //Wraps a coder of a potentially null value + // + // A Nullable coder encodes nullable values of wrapped coder value that does + // not tolerate null values. A Nullable coder uses exactly 1 byte per entry + // to indicate whether the value is null, then adds the encoding of the + // inner coder for non-null values. + NULLABLE = 17 [(beam_urn) = "beam:coder:nullable:v1"]; } } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java index 1838fa692e1b..a32d42711dbb 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; import org.apache.beam.sdk.schemas.Schema; @@ -204,6 +205,20 @@ public List> getComponents(TimestampPrefixingWindowCoder f }; } + static CoderTranslator> nullable(){ + return new SimpleStructuredCoderTranslator>() { + @Override + protected NullableCoder fromComponents(List> components) { + return NullableCoder.of(components.get(0)); + } + + @Override + public List> getComponents(NullableCoder from) { + return from.getComponents(); + } + }; + } + public abstract static class SimpleStructuredCoderTranslator> implements CoderTranslator { @Override diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java index 1fc8379977e0..e35dfd239549 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; @@ -76,6 +77,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { .put(RowCoder.class, ModelCoders.ROW_CODER_URN) .put(ShardedKey.Coder.class, ModelCoders.SHARDED_KEY_CODER_URN) .put(TimestampPrefixingWindowCoder.class, ModelCoders.CUSTOM_WINDOW_CODER_URN) + .put(NullableCoder.class, ModelCoders.NULLABLE_CODER_URN) .build(); public static final Set WELL_KNOWN_CODER_URNS = BEAM_MODEL_CODER_URNS.values(); @@ -99,6 +101,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { .put(RowCoder.class, CoderTranslators.row()) .put(ShardedKey.Coder.class, CoderTranslators.shardedKey()) .put(TimestampPrefixingWindowCoder.class, CoderTranslators.timestampPrefixingWindow()) + .put(NullableCoder.class, CoderTranslators.nullable()) .build(); static { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java index b616ffab462e..bc0ec755f4cc 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java @@ -67,6 +67,8 @@ private ModelCoders() {} public static final String SHARDED_KEY_CODER_URN = getUrn(StandardCoders.Enum.SHARDED_KEY); + public static final String NULLABLE_CODER_URN = getUrn(StandardCoders.Enum.NULLABLE); + static { checkState( STATE_BACKED_ITERABLE_CODER_URN.equals(getUrn(StandardCoders.Enum.STATE_BACKED_ITERABLE))); @@ -90,7 +92,8 @@ private ModelCoders() {} ROW_CODER_URN, PARAM_WINDOWED_VALUE_CODER_URN, STATE_BACKED_ITERABLE_CODER_URN, - SHARDED_KEY_CODER_URN); + SHARDED_KEY_CODER_URN, + NULLABLE_CODER_URN); public static Set urns() { return MODEL_CODER_URNS; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java index bf387ce576cd..59c15beaaa97 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; @@ -118,6 +119,7 @@ public class CommonCoderTest { .put(getUrn(StandardCoders.Enum.ROW), RowCoder.class) .put(getUrn(StandardCoders.Enum.SHARDED_KEY), ShardedKey.Coder.class) .put(getUrn(StandardCoders.Enum.CUSTOM_WINDOW), TimestampPrefixingWindowCoder.class) + .put(getUrn(StandardCoders.Enum.NULLABLE), NullableCoder.class) .build(); @AutoValue @@ -171,7 +173,7 @@ abstract static class OneCoderTestSpec { @SuppressWarnings("mutable") abstract byte[] getSerialized(); - abstract Object getValue(); + abstract @Nullable Object getValue(); static OneCoderTestSpec create( CommonCoder coder, boolean nested, byte[] serialized, Object value) { @@ -351,6 +353,16 @@ private static Object convertValue(Object value, CommonCoder coderSpec, Coder co Map kvMap = (Map) value; Coder windowCoder = ((TimestampPrefixingWindowCoder) coder).getWindowCoder(); return convertValue(kvMap.get("window"), coderSpec.getComponents().get(0), windowCoder); + } else if (s.equals(getUrn(StandardCoders.Enum.NULLABLE))) { + if (coderSpec.getComponents().size() == 1 && coderSpec.getComponents().get(0).getUrn().equals(getUrn(StandardCoders.Enum.BYTES))){ + if (value == null){ + return null; + } else { + return ((String) value).getBytes(StandardCharsets.ISO_8859_1); + } + } else { + throw new IllegalStateException("Unknown or missing nested coder for nullable coder"); + } } else { throw new IllegalStateException("Unknown coder URN: " + coderSpec.getUrn()); } @@ -510,6 +522,8 @@ private void verifyDecodedValue(CommonCoder coder, Object expectedValue, Object assertEquals(expectedValue, actualValue); } else if (s.equals(getUrn(StandardCoders.Enum.CUSTOM_WINDOW))) { assertEquals(expectedValue, actualValue); + } else if (s.equals(getUrn(StandardCoders.Enum.NULLABLE))){ + assertThat(expectedValue, equalTo(actualValue)); } else { throw new IllegalStateException("Unknown coder URN: " + coder.getUrn()); } From 798fd532832ebee9efbd08af3f2a2247537977bb Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Mon, 14 Feb 2022 14:42:51 -0500 Subject: [PATCH 02/25] [BEAM-10529] fix test case --- .../model/fnexecution/v1/standard_coders.yaml | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml index 4736ce83f8b9..7e17c27ce808 100644 --- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml +++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml @@ -499,24 +499,12 @@ examples: --- -# code snippet: -# public static void main(String[] args) throws Exception{ -# Coder coder = NullableCoder.of(ByteArrayCoder.of()); -# byte[] bytes = CoderUtils.encodeToByteArray(coder, null); -# String str = new String(bytes, java.nio.charset.StandardCharsets.ISO_8859_1); -# StringBuilder example = new StringBuilder(); -# for(int i = 0; i < str.length(); i++){ -# example.append(CharUtils.unicodeEscaped(str.charAt(i))); -# } -# System.out.println(example); -# } - coder: urn: "beam:coder:nullable:v1" components: [{urn: "beam:coder:bytes:v1"}] - contex: nested +nested: true examples: - "\u0001\u0061\u0062\u0063" : "abc" - "\u0001\u006d\u006f\u0072\u0065\u0020\u0062\u0079\u0074\u0065\u0073" : "more bytes" + "\u0001\u0003\u0061\u0062\u0063" : "abc" + "\u0001\u000a\u006d\u006f\u0072\u0065\u0020\u0062\u0079\u0074\u0065\u0073" : "more bytes" "\u0000" : null From 87435496681abc9605a2d66c6284cc8cf0c0c256 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 22 Feb 2022 12:33:44 -0500 Subject: [PATCH 03/25] [BEAM-10529] add coders and typehints to support nullable xlang coders --- sdks/python/apache_beam/coders/coders.py | 32 +++++++++++++------ .../coders/standard_coders_test.py | 4 ++- sdks/python/apache_beam/coders/typecoders.py | 5 ++- .../python/apache_beam/typehints/typehints.py | 3 ++ 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index d1ceef70b6ed..18c99e389868 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -326,10 +326,10 @@ def register_urn( @classmethod @overload def register_urn( - cls, - urn, # type: str - parameter_type, # type: Optional[Type[T]] - fn # type: Callable[[T, List[Coder], PipelineContext], Any] + cls, + urn, # type: str + parameter_type, # type: Optional[Type[T]] + fn # type: Callable[[T, List[Coder], PipelineContext], Any] ): # type: (...) -> None pass @@ -572,6 +572,16 @@ def _create_impl(self): def to_type_hint(self): return typehints.Optional[self._value_coder.to_type_hint()] + def _get_component_coders(self): + # type: () -> List[Coder] + return [self._value_coder] + + @classmethod + def from_type_hint(cls, typehint, registry): + value_type = list( + filter(lambda t: t is not type(None), typehint._inner_types()))[0] + return cls(registry.get_coder(value_type)) + def is_deterministic(self): # type: () -> bool return self._value_coder.is_deterministic() @@ -584,6 +594,9 @@ def __hash__(self): return hash(type(self)) + hash(self._value_coder) +Coder.register_structured_urn(common_urns.coders.NULLABLE.urn, NullableCoder) + + class VarIntCoder(FastCoder): """Variable-length integer coder.""" def _create_impl(self): @@ -1489,15 +1502,14 @@ def __hash__(self): class StateBackedIterableCoder(FastCoder): - DEFAULT_WRITE_THRESHOLD = 1 def __init__( - self, - element_coder, # type: Coder - read_state=None, # type: Optional[coder_impl.IterableStateReader] - write_state=None, # type: Optional[coder_impl.IterableStateWriter] - write_state_threshold=DEFAULT_WRITE_THRESHOLD): + self, + element_coder, # type: Coder + read_state=None, # type: Optional[coder_impl.IterableStateReader] + write_state=None, # type: Optional[coder_impl.IterableStateWriter] + write_state_threshold=DEFAULT_WRITE_THRESHOLD): self._element_coder = element_coder self._read_state = read_state self._write_state = write_state diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index acec22a46a55..6ef4bb13e7f0 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -191,7 +191,9 @@ class StandardCodersTest(unittest.TestCase): value_parser: ShardedKey( key=value_parser(x['key']), shard_id=x['shardId'].encode('utf-8')), 'beam:coder:custom_window:v1': lambda x, - window_parser: window_parser(x['window']) + window_parser: window_parser(x['window']), + 'beam:coder:nullable:v1': lambda x, + value_parser: x.encode('utf-8') if x else None } def test_standard_coders(self): diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 3bf2d15e7874..0b528be832ec 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -65,7 +65,6 @@ def MakeXyzs(v): """ # pytype: skip-file - from typing import Any from typing import Dict from typing import Iterable @@ -138,6 +137,10 @@ def get_coder(self, typehint): return coders.IterableCoder.from_type_hint(typehint, self) elif isinstance(typehint, typehints.ListConstraint): return coders.ListCoder.from_type_hint(typehint, self) + elif isinstance(typehint, + typehints.UnionConstraint) and typehint.contains_type( + type(None)): + return coders.NullableCoder.from_type_hint(typehint, self) elif typehint is None: # In some old code, None is used for Any. # TODO(robertwb): Clean this up. diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index 796178c299f8..d58a1f9ddabf 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -507,6 +507,9 @@ def _inner_types(self): for t in self.union_types: yield t + def contains_type(self, maybe_type): + return maybe_type in self.union_types + def _consistent_with_check_(self, sub): if isinstance(sub, UnionConstraint): # A union type is compatible if every possible type is compatible. From 1ce66059a9708f049c013a867225ac20c4529851 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 22 Feb 2022 13:09:58 -0500 Subject: [PATCH 04/25] [BEAM-10529] update external builder to support nullable coder --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 8164daf0be76..3c9384b59b29 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.coders.VoidCoder; @@ -763,11 +764,11 @@ private static Coder resolveCoder(Class deserializer) { continue; } if (returnType.equals(byte[].class)) { - return ByteArrayCoder.of(); + return NullableCoder.of(ByteArrayCoder.of()); } else if (returnType.equals(Integer.class)) { - return VarIntCoder.of(); + return NullableCoder.of(VarIntCoder.of()); } else if (returnType.equals(Long.class)) { - return VarLongCoder.of(); + return NullableCoder.of(VarLongCoder.of()); } else { throw new RuntimeException("Couldn't infer Coder from " + deserializer); } @@ -1701,15 +1702,15 @@ public PTransform> buildExternal( Class keyDeserializer = resolveClass(config.keyDeserializer); Coder keyCoder = Read.Builder.resolveCoder(keyDeserializer); - if (!(keyCoder instanceof ByteArrayCoder)) { + if (!(keyCoder instanceof NullableCoder && keyCoder.getCoderArguments().get(0) instanceof ByteArrayCoder)) { throw new RuntimeException( - "ExternalWithMetadata transform only supports keys of type byte[]"); + "ExternalWithMetadata transform only supports keys of type nullable(byte[])"); } Class valueDeserializer = resolveClass(config.valueDeserializer); Coder valueCoder = Read.Builder.resolveCoder(valueDeserializer); - if (!(valueCoder instanceof ByteArrayCoder)) { + if (!(valueCoder instanceof NullableCoder && valueCoder.getCoderArguments().get(0) instanceof ByteArrayCoder)) { throw new RuntimeException( - "ExternalWithMetadata transform only supports values of type byte[]"); + "ExternalWithMetadata transform only supports values of type nullable(byte[])"); } return readBuilder.build().externalWithMetadata(); From a6a38d0ccc9ee911a6747195b88f688245c5c1ed Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 22 Feb 2022 16:01:44 -0500 Subject: [PATCH 05/25] [BEAM-10529] clean up coders.py --- sdks/python/apache_beam/coders/coders.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 18c99e389868..0a191c4d579c 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -326,10 +326,10 @@ def register_urn( @classmethod @overload def register_urn( - cls, - urn, # type: str - parameter_type, # type: Optional[Type[T]] - fn # type: Callable[[T, List[Coder], PipelineContext], Any] + cls, + urn, # type: str + parameter_type, # type: Optional[Type[T]] + fn # type: Callable[[T, List[Coder], PipelineContext], Any] ): # type: (...) -> None pass @@ -1505,11 +1505,11 @@ class StateBackedIterableCoder(FastCoder): DEFAULT_WRITE_THRESHOLD = 1 def __init__( - self, - element_coder, # type: Coder - read_state=None, # type: Optional[coder_impl.IterableStateReader] - write_state=None, # type: Optional[coder_impl.IterableStateWriter] - write_state_threshold=DEFAULT_WRITE_THRESHOLD): + self, + element_coder, # type: Coder + read_state=None, # type: Optional[coder_impl.IterableStateReader] + write_state=None, # type: Optional[coder_impl.IterableStateWriter] + write_state_threshold=DEFAULT_WRITE_THRESHOLD): self._element_coder = element_coder self._read_state = read_state self._write_state = write_state From e3e02235a5afe4a8ce6e3bfd6ddc05e73a773528 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 22 Feb 2022 16:57:04 -0500 Subject: [PATCH 06/25] [BEAM-10529] add coder translation test --- .../beam/runners/core/construction/CoderTranslationTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java index 544f43d0f0f4..f759ebede63c 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -97,6 +98,7 @@ public class CoderTranslationTest { Field.of("bar", FieldType.logicalType(FixedBytes.of(123)))))) .add(ShardedKey.Coder.of(StringUtf8Coder.of())) .add(TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of())) + .add(NullableCoder.of(ByteArrayCoder.of())) .build(); /** From 30f06d69dfda19be1e3014913269e1a33d258ca0 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 23 Feb 2022 10:41:57 -0500 Subject: [PATCH 07/25] [BEAM-10529] add additional check to typecoder to not accidentally misidentify coders as nullable --- sdks/python/apache_beam/coders/typecoders.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 0b528be832ec..49140cef9506 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -137,9 +137,9 @@ def get_coder(self, typehint): return coders.IterableCoder.from_type_hint(typehint, self) elif isinstance(typehint, typehints.ListConstraint): return coders.ListCoder.from_type_hint(typehint, self) - elif isinstance(typehint, - typehints.UnionConstraint) and typehint.contains_type( - type(None)): + elif (isinstance(typehint, typehints.UnionConstraint) and + typehint.contains_type(type(None) and + len(list(typehint._inner_types())) == 2)): return coders.NullableCoder.from_type_hint(typehint, self) elif typehint is None: # In some old code, None is used for Any. From 24dd8767553d1fc89dc49bb364e012373d94e2e7 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 23 Feb 2022 13:08:32 -0500 Subject: [PATCH 08/25] [BEAM-10529] add test to retrieve nullable coder from typehint --- sdks/python/apache_beam/coders/typecoders_test.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdks/python/apache_beam/coders/typecoders_test.py b/sdks/python/apache_beam/coders/typecoders_test.py index 02f4565c5e2d..5727127e1ef8 100644 --- a/sdks/python/apache_beam/coders/typecoders_test.py +++ b/sdks/python/apache_beam/coders/typecoders_test.py @@ -140,6 +140,14 @@ def test_list_coder(self): self.assertIs( list, type(expected_coder.decode(expected_coder.encode(values)))) + def test_nullable_coder(self): + expected_coder = coders.NullableCoder(coders.BytesCoder()) + real_coder = typecoders.registry.get_coder( + typehints.UnionConstraint([type(None), type(bytes)])) + self.assertEqual(expected_coder, real_coder) + self.assertEqual(expected_coder.encode(None), real_coder.encode(None)) + self.assertEqual(expected_coder.encode(b'abc'), real_coder.encode(b'abc')) + if __name__ == '__main__': unittest.main() From e642e7c437ac10b1aaf1f6d2217e37b9e2ebb70e Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 23 Feb 2022 13:35:36 -0500 Subject: [PATCH 09/25] [BEAM-10529] run spotless --- .../beam/runners/core/construction/CoderTranslators.java | 2 +- .../beam/runners/fnexecution/wire/CommonCoderTest.java | 7 ++++--- .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 6 ++++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java index a32d42711dbb..79a04a01d0be 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java @@ -205,7 +205,7 @@ public List> getComponents(TimestampPrefixingWindowCoder f }; } - static CoderTranslator> nullable(){ + static CoderTranslator> nullable() { return new SimpleStructuredCoderTranslator>() { @Override protected NullableCoder fromComponents(List> components) { diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java index cdae6551c5d8..ca5274a358bf 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java @@ -385,8 +385,9 @@ private static Object convertValue(Object value, CommonCoder coderSpec, Coder co Coder windowCoder = ((TimestampPrefixingWindowCoder) coder).getWindowCoder(); return convertValue(kvMap.get("window"), coderSpec.getComponents().get(0), windowCoder); } else if (s.equals(getUrn(StandardCoders.Enum.NULLABLE))) { - if (coderSpec.getComponents().size() == 1 && coderSpec.getComponents().get(0).getUrn().equals(getUrn(StandardCoders.Enum.BYTES))){ - if (value == null){ + if (coderSpec.getComponents().size() == 1 + && coderSpec.getComponents().get(0).getUrn().equals(getUrn(StandardCoders.Enum.BYTES))) { + if (value == null) { return null; } else { return ((String) value).getBytes(StandardCharsets.ISO_8859_1); @@ -587,7 +588,7 @@ private void verifyDecodedValue(CommonCoder coder, Object expectedValue, Object assertEquals(expectedValue, actualValue); } else if (s.equals(getUrn(StandardCoders.Enum.CUSTOM_WINDOW))) { assertEquals(expectedValue, actualValue); - } else if (s.equals(getUrn(StandardCoders.Enum.NULLABLE))){ + } else if (s.equals(getUrn(StandardCoders.Enum.NULLABLE))) { assertThat(expectedValue, equalTo(actualValue)); } else { throw new IllegalStateException("Unknown coder URN: " + coder.getUrn()); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 5540b80f6e27..91497c8d068c 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1715,13 +1715,15 @@ public PTransform> buildExternal( Class keyDeserializer = resolveClass(config.keyDeserializer); Coder keyCoder = Read.Builder.resolveCoder(keyDeserializer); - if (!(keyCoder instanceof NullableCoder && keyCoder.getCoderArguments().get(0) instanceof ByteArrayCoder)) { + if (!(keyCoder instanceof NullableCoder + && keyCoder.getCoderArguments().get(0) instanceof ByteArrayCoder)) { throw new RuntimeException( "ExternalWithMetadata transform only supports keys of type nullable(byte[])"); } Class valueDeserializer = resolveClass(config.valueDeserializer); Coder valueCoder = Read.Builder.resolveCoder(valueDeserializer); - if (!(valueCoder instanceof NullableCoder && valueCoder.getCoderArguments().get(0) instanceof ByteArrayCoder)) { + if (!(valueCoder instanceof NullableCoder + && valueCoder.getCoderArguments().get(0) instanceof ByteArrayCoder)) { throw new RuntimeException( "ExternalWithMetadata transform only supports values of type nullable(byte[])"); } From 4dd1c54813960cbf1710be7f3198e087db91cf27 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 8 Mar 2022 15:28:26 -0500 Subject: [PATCH 10/25] [BEAM-10529] add go nullable coder --- sdks/go/pkg/beam/core/graph/coder/coder.go | 24 +++++- .../pkg/beam/core/graph/coder/coder_test.go | 60 ++++++++++++++ sdks/go/pkg/beam/core/graph/coder/map.go | 32 ------- sdks/go/pkg/beam/core/graph/coder/map_test.go | 4 +- sdks/go/pkg/beam/core/graph/coder/nil.go | 38 +++++++++ sdks/go/pkg/beam/core/graph/coder/nil_test.go | 83 +++++++++++++++++++ .../pkg/beam/core/graph/coder/row_decoder.go | 2 +- .../pkg/beam/core/graph/coder/row_encoder.go | 2 +- sdks/go/pkg/beam/core/runtime/exec/plan.go | 1 + sdks/go/pkg/beam/core/runtime/graphx/coder.go | 19 ++++- sdks/go/pkg/beam/core/typex/fulltype.go | 2 + sdks/go/pkg/beam/core/typex/special.go | 3 + 12 files changed, 231 insertions(+), 39 deletions(-) create mode 100644 sdks/go/pkg/beam/core/graph/coder/nil.go create mode 100644 sdks/go/pkg/beam/core/graph/coder/nil_test.go diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go index 6eea66b0d317..ef7446ee5d2f 100644 --- a/sdks/go/pkg/beam/core/graph/coder/coder.go +++ b/sdks/go/pkg/beam/core/graph/coder/coder.go @@ -169,6 +169,7 @@ const ( VarInt Kind = "varint" Double Kind = "double" Row Kind = "R" + Nullable Kind = "N" Timer Kind = "T" PaneInfo Kind = "PI" WindowedValue Kind = "W" @@ -198,7 +199,7 @@ type Coder struct { Kind Kind T typex.FullType - Components []*Coder // WindowedValue, KV, CoGBK + Components []*Coder // WindowedValue, KV, CoGBK, Nullable Custom *CustomCoder // Custom Window *WindowCoder // WindowedValue @@ -260,7 +261,7 @@ func (c *Coder) String() string { switch c.Kind { case WindowedValue, ParamWindowedValue, Window, Timer: ret += fmt.Sprintf("!%v", c.Window) - case KV, CoGBK, Bytes, Bool, VarInt, Double, String, LP: // No additional info. + case KV, CoGBK, Bytes, Bool, VarInt, Double, String, LP, Nullable: // No additional info. default: ret += fmt.Sprintf("[%v]", c.T) } @@ -394,6 +395,19 @@ func NewKV(components []*Coder) *Coder { } } +func NewN(component *Coder) *Coder { + checkCoderNotNil(component, "Nullable") + return &Coder{ + Kind: Nullable, + T: typex.New(typex.NullableType, component.T), + Components: []*Coder{component}, + } +} + +func IsNullable(c *Coder) bool { + return c.Kind == Nullable +} + // IsCoGBK returns true iff the coder is for a CoGBK type. func IsCoGBK(c *Coder) bool { return c.Kind == CoGBK @@ -440,3 +454,9 @@ func checkCodersNotNil(list []*Coder) { } } } + +func checkCoderNotNil(c *Coder, outercoder string) { + if c == nil { + panic(fmt.Sprintf("nil inner coder for %v coder", outercoder)) + } +} diff --git a/sdks/go/pkg/beam/core/graph/coder/coder_test.go b/sdks/go/pkg/beam/core/graph/coder/coder_test.go index 762ed848f589..efdca9698c65 100644 --- a/sdks/go/pkg/beam/core/graph/coder/coder_test.go +++ b/sdks/go/pkg/beam/core/graph/coder/coder_test.go @@ -168,6 +168,9 @@ func TestCoder_String(t *testing.T) { }, { want: "KV", c: NewKV([]*Coder{bytes, ints}), + }, { + want: "N", + c: NewN(bytes), }, { want: "CoGBK", c: NewCoGBK([]*Coder{bytes, ints, bytes}), @@ -277,6 +280,10 @@ func TestCoder_Equals(t *testing.T) { want: true, a: NewKV([]*Coder{custom1, ints}), b: NewKV([]*Coder{customSame, ints}), + }, { + want: true, + a: NewN(custom1), + b: NewN(customSame), }, { want: true, a: NewCoGBK([]*Coder{custom1, ints, customSame}), @@ -517,6 +524,59 @@ func TestNewKV(t *testing.T) { } } +func TestNewNullable(t *testing.T) { + bytes := NewBytes() + + tests := []struct { + name string + component *Coder + shouldpanic bool + want *Coder + }{{ + name: "nil", + component: nil, + shouldpanic: true, + }, + { + name: "empty", + component: &Coder{}, + shouldpanic: true, + }, + { + name: "bytes", + component: bytes, + shouldpanic: false, + want: &Coder{ + Kind: Nullable, + T: typex.New(typex.NullableType, bytes.T), + Components: []*Coder{bytes}, + }, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + if test.shouldpanic { + defer func() { + if p := recover(); p != nil { + t.Log(p) + return + } + t.Fatalf("NewNullable(%v): want panic", test.component) + }() + } + got := NewN(test.component) + if !IsNullable(got) { + t.Errorf("IsNullable(%v) = false, want true", got) + } + if test.want != nil && !test.want.Equals(got) { + t.Fatalf("NewNullable(%v) = %v, want %v", test.component, got, test.want) + } + }) + } +} + func TestNewCoGBK(t *testing.T) { bytes := NewBytes() ints := NewVarInt() diff --git a/sdks/go/pkg/beam/core/graph/coder/map.go b/sdks/go/pkg/beam/core/graph/coder/map.go index 2d72446bf444..30eee7b008a0 100644 --- a/sdks/go/pkg/beam/core/graph/coder/map.go +++ b/sdks/go/pkg/beam/core/graph/coder/map.go @@ -62,24 +62,6 @@ func mapDecoder(rt reflect.Type, decodeToKey, decodeToElem typeDecoderFieldRefle } } -// containerNilDecoder handles when a value is nillable for map or iterable components. -// Nillable types have an extra byte prefixing them indicating nil status. -func containerNilDecoder(decodeToElem func(reflect.Value, io.Reader) error) func(reflect.Value, io.Reader) error { - return func(ret reflect.Value, r io.Reader) error { - hasValue, err := DecodeBool(r) - if err != nil { - return err - } - if !hasValue { - return nil - } - if err := decodeToElem(ret, r); err != nil { - return err - } - return nil - } -} - // mapEncoder reflectively encodes a map or array type using the beam map encoding. func mapEncoder(rt reflect.Type, encodeKey, encodeValue typeEncoderFieldReflect) func(reflect.Value, io.Writer) error { return func(rv reflect.Value, w io.Writer) error { @@ -132,17 +114,3 @@ func mapEncoder(rt reflect.Type, encodeKey, encodeValue typeEncoderFieldReflect) return nil } } - -// containerNilEncoder handles when a value is nillable for map or iterable components. -// Nillable types have an extra byte prefixing them indicating nil status. -func containerNilEncoder(encodeElem func(reflect.Value, io.Writer) error) func(reflect.Value, io.Writer) error { - return func(rv reflect.Value, w io.Writer) error { - if rv.IsNil() { - return EncodeBool(false, w) - } - if err := EncodeBool(true, w); err != nil { - return err - } - return encodeElem(rv, w) - } -} diff --git a/sdks/go/pkg/beam/core/graph/coder/map_test.go b/sdks/go/pkg/beam/core/graph/coder/map_test.go index ee4c35afa609..3291f7fbd5a2 100644 --- a/sdks/go/pkg/beam/core/graph/coder/map_test.go +++ b/sdks/go/pkg/beam/core/graph/coder/map_test.go @@ -38,8 +38,8 @@ func TestEncodeDecodeMap(t *testing.T) { v.Set(reflect.New(reflectx.Uint8)) return byteDec(v.Elem(), r) } - byteCtnrPtrEnc := containerNilEncoder(bytePtrEnc) - byteCtnrPtrDec := containerNilDecoder(bytePtrDec) + byteCtnrPtrEnc := NullableEncoder(bytePtrEnc) + byteCtnrPtrDec := NullableDecoder(bytePtrDec) ptrByte := byte(42) diff --git a/sdks/go/pkg/beam/core/graph/coder/nil.go b/sdks/go/pkg/beam/core/graph/coder/nil.go new file mode 100644 index 000000000000..eb4bc7e1b4d3 --- /dev/null +++ b/sdks/go/pkg/beam/core/graph/coder/nil.go @@ -0,0 +1,38 @@ +package coder + +import ( + "io" + "reflect" +) + +// NullableDecoder handles when a value is nillable. +// Nillable types have an extra byte prefixing them indicating nil status. +func NullableDecoder(decodeToElem func(reflect.Value, io.Reader) error) func(reflect.Value, io.Reader) error { + return func(ret reflect.Value, r io.Reader) error { + hasValue, err := DecodeBool(r) + if err != nil { + return err + } + if !hasValue { + return nil + } + if err := decodeToElem(ret, r); err != nil { + return err + } + return nil + } +} + +// NullableEncoder handles when a value is nillable. +// Nillable types have an extra byte prefixing them indicating nil status. +func NullableEncoder(encodeElem func(reflect.Value, io.Writer) error) func(reflect.Value, io.Writer) error { + return func(rv reflect.Value, w io.Writer) error { + if rv.IsNil() { + return EncodeBool(false, w) + } + if err := EncodeBool(true, w); err != nil { + return err + } + return encodeElem(rv, w) + } +} diff --git a/sdks/go/pkg/beam/core/graph/coder/nil_test.go b/sdks/go/pkg/beam/core/graph/coder/nil_test.go new file mode 100644 index 000000000000..f3def723193e --- /dev/null +++ b/sdks/go/pkg/beam/core/graph/coder/nil_test.go @@ -0,0 +1,83 @@ +package coder + +import ( + "bytes" + "fmt" + "io" + "reflect" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/google/go-cmp/cmp" +) + +func TestEncodeDecodeNullable(t *testing.T) { + byteEnc := func(v reflect.Value, w io.Writer) error { + return EncodeByte(byte(v.Uint()), w) + } + byteDec := func(v reflect.Value, r io.Reader) error { + b, err := DecodeByte(r) + if err != nil { + return errors.Wrap(err, "error decoding single byte field") + } + v.SetUint(uint64(b)) + return nil + } + bytePtrEnc := func(v reflect.Value, w io.Writer) error { + return byteEnc(v.Elem(), w) + } + bytePtrDec := func(v reflect.Value, r io.Reader) error { + v.Set(reflect.New(reflectx.Uint8)) + return byteDec(v.Elem(), r) + } + byteCtnrPtrEnc := NullableEncoder(bytePtrEnc) + byteCtnrPtrDec := NullableDecoder(bytePtrDec) + + tests := []struct { + decoded interface{} + encoded []byte + }{ + { + decoded: (*byte)(nil), + encoded: []byte{0}, + }, + { + decoded: create(10), + encoded: []byte{1, 10}, + }, + { + decoded: create(20), + encoded: []byte{1, 20}, + }, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("encode %q", test.encoded), func(t *testing.T) { + var buf bytes.Buffer + encErr := byteCtnrPtrEnc(reflect.ValueOf(test.decoded), &buf) + if encErr != nil { + t.Fatalf("NullableEncoder(%q) = %v", test.decoded, encErr) + } + if d := cmp.Diff(test.encoded, buf.Bytes()); d != "" { + t.Errorf("NullableEncoder(%q) = %v, want %v diff(-want,+got):\n %v", test.decoded, buf.Bytes(), test.encoded, d) + } + }) + t.Run(fmt.Sprintf("decode %q", test.decoded), func(t *testing.T) { + buf := bytes.NewBuffer(test.encoded) + rv := reflect.New(reflect.TypeOf(test.decoded)).Elem() + decErr := byteCtnrPtrDec(rv, buf) + if decErr != nil { + t.Fatalf("NullableDecoder(%q) = %v", test.encoded, decErr) + } + if d := cmp.Diff(test.decoded, rv.Interface()); d != "" { + t.Errorf("NullableDecoder (%q) = %q, want %v diff(-want,+got):\n %v", test.encoded, rv.Interface(), test.decoded, d) + } + }) + } + +} + +func create(x byte) *byte { + return &x +} diff --git a/sdks/go/pkg/beam/core/graph/coder/row_decoder.go b/sdks/go/pkg/beam/core/graph/coder/row_decoder.go index 1e1fcad32154..9688ed9876c4 100644 --- a/sdks/go/pkg/beam/core/graph/coder/row_decoder.go +++ b/sdks/go/pkg/beam/core/graph/coder/row_decoder.go @@ -386,7 +386,7 @@ func (b *RowDecoderBuilder) containerDecoderForType(t reflect.Type) (typeDecoder return typeDecoderFieldReflect{}, err } if t.Kind() == reflect.Ptr { - return typeDecoderFieldReflect{decode: containerNilDecoder(dec.decode), addr: dec.addr}, nil + return typeDecoderFieldReflect{decode: NullableDecoder(dec.decode), addr: dec.addr}, nil } return dec, nil } diff --git a/sdks/go/pkg/beam/core/graph/coder/row_encoder.go b/sdks/go/pkg/beam/core/graph/coder/row_encoder.go index e12776459da8..cfc1a8e51a3d 100644 --- a/sdks/go/pkg/beam/core/graph/coder/row_encoder.go +++ b/sdks/go/pkg/beam/core/graph/coder/row_encoder.go @@ -262,7 +262,7 @@ func (b *RowEncoderBuilder) containerEncoderForType(t reflect.Type) (typeEncoder return typeEncoderFieldReflect{}, err } if t.Kind() == reflect.Ptr { - return typeEncoderFieldReflect{encode: containerNilEncoder(encf.encode), addr: encf.addr}, nil + return typeEncoderFieldReflect{encode: NullableEncoder(encf.encode), addr: encf.addr}, nil } return encf, nil } diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index 7f89ce37322c..a3dc3e910225 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -52,6 +52,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) { } if r, ok := u.(Root); ok { roots = append(roots, r) + fmt.Println("Roots: ", roots) } if s, ok := u.(*DataSource); ok { source = s diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go index a8b897538232..e70567d705bd 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go @@ -46,6 +46,7 @@ const ( urnParamWindowedValueCoder = "beam:coder:param_windowed_value:v1" urnTimerCoder = "beam:coder:timer:v1" urnRowCoder = "beam:coder:row:v1" + urnNullableCoder = "beam:coder:nullable:v1" urnGlobalWindow = "beam:coder:global_window:v1" urnIntervalWindow = "beam:coder:interval_window:v1" @@ -71,6 +72,7 @@ func knownStandardCoders() []string { urnGlobalWindow, urnIntervalWindow, urnRowCoder, + urnNullableCoder, // TODO(BEAM-10660): Add urnTimerCoder once finalized. } } @@ -368,6 +370,15 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder, return nil, err } return coder.NewR(typex.New(t)), nil + case urnNullableCoder: + if len(components) != 1 { + return nil, errors.Errorf("could not unmarshal nullable coder from %v, expected one component but got %d", c, len(components)) + } + elm, err := b.Coder(components[0]) + if err != nil { + return nil, err + } + return coder.NewN(elm), nil // Special handling for window coders so they can be treated as // a general coder. Generally window coders are not used outside of @@ -386,7 +397,6 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder, return nil, err } return &coder.Coder{Kind: coder.Window, T: typex.New(reflect.TypeOf((*struct{})(nil)).Elem()), Window: w}, nil - default: return nil, errors.Errorf("could not unmarshal coder from %v, unknown URN %v", c, urn) } @@ -465,6 +475,13 @@ func (b *CoderMarshaller) Add(c *coder.Coder) (string, error) { } return b.internBuiltInCoder(urnKVCoder, comp...), nil + case coder.Nullable: + comp, err := b.AddMulti(c.Components) + if err != nil { + return "", errors.Wrapf(err, "failed to marshal Nullable coder %v", c) + } + return b.internBuiltInCoder(urnNullableCoder, comp...), nil + case coder.CoGBK: comp, err := b.AddMulti(c.Components) if err != nil { diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go b/sdks/go/pkg/beam/core/typex/fulltype.go index cbfb443755ba..df5425a4e1a9 100644 --- a/sdks/go/pkg/beam/core/typex/fulltype.go +++ b/sdks/go/pkg/beam/core/typex/fulltype.go @@ -87,6 +87,8 @@ func printShortComposite(t reflect.Type) string { return "CoGBK" case KVType: return "KV" + case NullableType: + return "Nullable" default: return fmt.Sprintf("invalid(%v)", t) } diff --git a/sdks/go/pkg/beam/core/typex/special.go b/sdks/go/pkg/beam/core/typex/special.go index dd4199c5628e..24f6540fac93 100644 --- a/sdks/go/pkg/beam/core/typex/special.go +++ b/sdks/go/pkg/beam/core/typex/special.go @@ -38,6 +38,7 @@ var ( PaneInfoType = reflect.TypeOf((*PaneInfo)(nil)).Elem() KVType = reflect.TypeOf((*KV)(nil)).Elem() + NullableType = reflect.TypeOf((*Nullable)(nil)).Elem() CoGBKType = reflect.TypeOf((*CoGBK)(nil)).Elem() WindowedValueType = reflect.TypeOf((*WindowedValue)(nil)).Elem() ) @@ -85,6 +86,8 @@ type PaneInfo struct { type KV struct{} +type Nullable struct{} + type CoGBK struct{} type WindowedValue struct{} From 43c84e62b5b96b2c635aa224518ecc20d1cfe73f Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 8 Mar 2022 15:30:47 -0500 Subject: [PATCH 11/25] [BEAM-10529] cleanup extra println --- sdks/go/pkg/beam/core/runtime/exec/plan.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index a3dc3e910225..7f89ce37322c 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -52,7 +52,6 @@ func NewPlan(id string, units []Unit) (*Plan, error) { } if r, ok := u.(Root); ok { roots = append(roots, r) - fmt.Println("Roots: ", roots) } if s, ok := u.(*DataSource); ok { source = s From 3e93b56c886a8aa61fe91bc6cb31414c3bbb57b7 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 9 Mar 2022 10:04:11 -0500 Subject: [PATCH 12/25] [BEAM-10529] improve comments, clean up python --- .../src/main/proto/beam_runner_api.proto | 19 ++++++++----------- sdks/python/apache_beam/coders/typecoders.py | 4 +--- .../apache_beam/coders/typecoders_test.py | 3 +-- .../python/apache_beam/typehints/typehints.py | 7 +++++++ .../apache_beam/typehints/typehints_test.py | 8 ++++++++ 5 files changed, 25 insertions(+), 16 deletions(-) diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 2b67e81c2ebe..23d0a65bc14a 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -1043,11 +1043,8 @@ message StandardCoders { // - Followed by N interleaved keys and values, encoded with their // corresponding coder. // - // Nullable types in container types (ArrayType, MapType) are encoded by: - // - A one byte null indicator, 0x00 for null values, or 0x01 for present - // values. - // - For present values the null indicator is followed by the value - // encoded with it's corresponding coder. + // Nullable types in container types (ArrayType, MapType) per the + // encoding described for general Nullable types below. // // Well known logical types: // beam:logical_type:micros_instant:v1 @@ -1082,12 +1079,12 @@ message StandardCoders { // Experimental. SHARDED_KEY = 15 [(beam_urn) = "beam:coder:sharded_key:v1"]; - //Wraps a coder of a potentially null value - // - // A Nullable coder encodes nullable values of wrapped coder value that does - // not tolerate null values. A Nullable coder uses exactly 1 byte per entry - // to indicate whether the value is null, then adds the encoding of the - // inner coder for non-null values. + // Wraps a coder of a potentially null value + // A Nullable Type is encoded by: + // - A one byte null indicator, 0x00 for null values, or 0x01 for present + // values. + // - For present values the null indicator is followed by the value + // encoded with it's corresponding coder. NULLABLE = 17 [(beam_urn) = "beam:coder:nullable:v1"]; } } diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 49140cef9506..b81bea375874 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -137,9 +137,7 @@ def get_coder(self, typehint): return coders.IterableCoder.from_type_hint(typehint, self) elif isinstance(typehint, typehints.ListConstraint): return coders.ListCoder.from_type_hint(typehint, self) - elif (isinstance(typehint, typehints.UnionConstraint) and - typehint.contains_type(type(None) and - len(list(typehint._inner_types())) == 2)): + elif typehints.is_optional(typehint): return coders.NullableCoder.from_type_hint(typehint, self) elif typehint is None: # In some old code, None is used for Any. diff --git a/sdks/python/apache_beam/coders/typecoders_test.py b/sdks/python/apache_beam/coders/typecoders_test.py index 5727127e1ef8..f74483ad48dc 100644 --- a/sdks/python/apache_beam/coders/typecoders_test.py +++ b/sdks/python/apache_beam/coders/typecoders_test.py @@ -142,8 +142,7 @@ def test_list_coder(self): def test_nullable_coder(self): expected_coder = coders.NullableCoder(coders.BytesCoder()) - real_coder = typecoders.registry.get_coder( - typehints.UnionConstraint([type(None), type(bytes)])) + real_coder = typecoders.registry.get_coder(typehints.Optional(bytes)) self.assertEqual(expected_coder, real_coder) self.assertEqual(expected_coder.encode(None), real_coder.encode(None)) self.assertEqual(expected_coder.encode(b'abc'), real_coder.encode(b'abc')) diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index d58a1f9ddabf..fbb30e2e3347 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -604,6 +604,13 @@ def __getitem__(self, py_type): return Union[py_type, type(None)] +def is_optional(typehint): + return ( + isinstance(typehint, UnionConstraint) and + typehint.contains_type(type(None)) and + len(list(typehint._inner_types())) == 2) + + class TupleHint(CompositeTypeHint): """A Tuple type-hint. diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py index b3a4d636e9b5..a40c180fc463 100644 --- a/sdks/python/apache_beam/typehints/typehints_test.py +++ b/sdks/python/apache_beam/typehints/typehints_test.py @@ -320,6 +320,14 @@ def test_getitem_proxy_to_union(self): hint = typehints.Optional[int] self.assertTrue(isinstance(hint, typehints.UnionHint.UnionConstraint)) + def test_is_optional(self): + hint1 = typehints.Optional[int] + self.assertTrue(typehints.is_optional(hint1)) + hint2 = typehints.UnionConstraint({int, bytes}) + self.assertFalse(typehints.is_optional(hint2)) + hint3 = typehints.UnionConstraint({int, bytes, type(None)}) + self.assertFalse(typehints.is_optional(hint3)) + class TupleHintTestCase(TypeHintTestCase): def test_getitem_invalid_ellipsis_type_param(self): From a0780296c6613a107a4acd9dae12258480c44411 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 9 Mar 2022 10:09:26 -0500 Subject: [PATCH 13/25] [BEAM-10529] remove changes to kafkaIO to simplify pr --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 91497c8d068c..21fba4515294 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -48,7 +48,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.coders.VoidCoder; @@ -777,11 +776,11 @@ private static Coder resolveCoder(Class deserializer) { continue; } if (returnType.equals(byte[].class)) { - return NullableCoder.of(ByteArrayCoder.of()); + return ByteArrayCoder.of(); } else if (returnType.equals(Integer.class)) { - return NullableCoder.of(VarIntCoder.of()); + return VarIntCoder.of(); } else if (returnType.equals(Long.class)) { - return NullableCoder.of(VarLongCoder.of()); + return VarLongCoder.of(); } else { throw new RuntimeException("Couldn't infer Coder from " + deserializer); } @@ -1715,17 +1714,15 @@ public PTransform> buildExternal( Class keyDeserializer = resolveClass(config.keyDeserializer); Coder keyCoder = Read.Builder.resolveCoder(keyDeserializer); - if (!(keyCoder instanceof NullableCoder - && keyCoder.getCoderArguments().get(0) instanceof ByteArrayCoder)) { + if (!(keyCoder instanceof ByteArrayCoder)) { throw new RuntimeException( - "ExternalWithMetadata transform only supports keys of type nullable(byte[])"); + "ExternalWithMetadata transform only supports keys of type byte[]"); } Class valueDeserializer = resolveClass(config.valueDeserializer); Coder valueCoder = Read.Builder.resolveCoder(valueDeserializer); - if (!(valueCoder instanceof NullableCoder - && valueCoder.getCoderArguments().get(0) instanceof ByteArrayCoder)) { + if (!(valueCoder instanceof ByteArrayCoder)) { throw new RuntimeException( - "ExternalWithMetadata transform only supports values of type nullable(byte[])"); + "ExternalWithMetadata transform only supports values of type byte[]"); } return readBuilder.build().externalWithMetadata(); From 258d7a9ff7046e6391ef01508e744f56be7bc9b5 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 9 Mar 2022 13:52:18 -0500 Subject: [PATCH 14/25] [BEAM-10529] add coders to go exec, add asf license text --- sdks/go/pkg/beam/core/graph/coder/coder.go | 10 +-- sdks/go/pkg/beam/core/graph/coder/nil.go | 15 +++++ sdks/go/pkg/beam/core/graph/coder/nil_test.go | 15 +++++ sdks/go/pkg/beam/core/runtime/exec/coder.go | 65 +++++++++++++++++++ .../pkg/beam/core/runtime/exec/coder_test.go | 6 ++ 5 files changed, 104 insertions(+), 7 deletions(-) diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go index ef7446ee5d2f..27ca861d162a 100644 --- a/sdks/go/pkg/beam/core/graph/coder/coder.go +++ b/sdks/go/pkg/beam/core/graph/coder/coder.go @@ -396,7 +396,9 @@ func NewKV(components []*Coder) *Coder { } func NewN(component *Coder) *Coder { - checkCoderNotNil(component, "Nullable") + coders := make([]*Coder, 1) + coders[0] = component + checkCodersNotNil(coders) return &Coder{ Kind: Nullable, T: typex.New(typex.NullableType, component.T), @@ -454,9 +456,3 @@ func checkCodersNotNil(list []*Coder) { } } } - -func checkCoderNotNil(c *Coder, outercoder string) { - if c == nil { - panic(fmt.Sprintf("nil inner coder for %v coder", outercoder)) - } -} diff --git a/sdks/go/pkg/beam/core/graph/coder/nil.go b/sdks/go/pkg/beam/core/graph/coder/nil.go index eb4bc7e1b4d3..a7ed27cb6d5e 100644 --- a/sdks/go/pkg/beam/core/graph/coder/nil.go +++ b/sdks/go/pkg/beam/core/graph/coder/nil.go @@ -1,3 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package coder import ( diff --git a/sdks/go/pkg/beam/core/graph/coder/nil_test.go b/sdks/go/pkg/beam/core/graph/coder/nil_test.go index f3def723193e..89410b9c9395 100644 --- a/sdks/go/pkg/beam/core/graph/coder/nil_test.go +++ b/sdks/go/pkg/beam/core/graph/coder/nil_test.go @@ -1,3 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package coder import ( diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go index c7a19eae0470..5b108ba05f28 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/coder.go +++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go @@ -156,6 +156,12 @@ func MakeElementEncoder(c *coder.Coder) ElementEncoder { enc: enc, } + case coder.Nullable: + return &nullableEncoder{ + inner: MakeElementEncoder(c.Components[0]), + be: boolEncoder{}, + } + default: panic(fmt.Sprintf("Unexpected coder: %v", c)) } @@ -267,6 +273,12 @@ func MakeElementDecoder(c *coder.Coder) ElementDecoder { dec: dec, } + case coder.Nullable: + return &nullableDecoder{ + inner: MakeElementDecoder(c.Components[0]), + bd: boolDecoder{}, + } + default: panic(fmt.Sprintf("Unexpected coder: %v", c)) } @@ -609,6 +621,59 @@ func convertIfNeeded(v interface{}, allocated *FullValue) *FullValue { return allocated } +type nullableEncoder struct { + inner ElementEncoder + be boolEncoder +} + +func (n nullableEncoder) Encode(value *FullValue, writer io.Writer) error { + if value.Elm == nil { + err := n.be.Encode(&FullValue{Elm: false}, writer) + if err != nil { + return err + } + return nil + } + err := n.be.Encode(&FullValue{Elm: true}, writer) + if err != nil { + return err + } + err = n.inner.Encode(value, writer) + if err != nil { + return err + } + return nil +} + +type nullableDecoder struct { + inner ElementDecoder + bd boolDecoder +} + +func (n nullableDecoder) Decode(reader io.Reader) (*FullValue, error) { + hasValue, err := n.bd.Decode(reader) + if err != nil { + return nil, err + } + if !hasValue.Elm.(bool) { + return &FullValue{}, nil + } + val, err := n.inner.Decode(reader) + if err != nil { + return nil, err + } + return val, nil +} + +func (n nullableDecoder) DecodeTo(reader io.Reader, value *FullValue) error { + val, err := n.Decode(reader) + if err != nil { + return err + } + value = val + return nil +} + type iterableEncoder struct { t reflect.Type enc ElementEncoder diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder_test.go b/sdks/go/pkg/beam/core/runtime/exec/coder_test.go index 25812aca4e56..450a7e67e1c5 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/coder_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/coder_test.go @@ -64,6 +64,12 @@ func TestCoders(t *testing.T) { coder.NewDouble(), coder.NewBool()})}), val: &FullValue{Elm: int64(42), Elm2: &FullValue{Elm: float64(3.14), Elm2: true}}, + }, { + coder: coder.NewN(coder.NewBytes()), + val: &FullValue{}, + }, { + coder: coder.NewN(coder.NewBytes()), + val: &FullValue{Elm: []byte("myBytes")}, }, } { t.Run(fmt.Sprintf("%v", test.coder), func(t *testing.T) { From d19c47c14109f36a03cecb9616dab6fa386fa9ee Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 9 Mar 2022 14:22:32 -0500 Subject: [PATCH 15/25] [BEAM-10529] clean up error handlign --- sdks/go/pkg/beam/core/runtime/exec/coder.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go index 5b108ba05f28..fb52f448d33b 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/coder.go +++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go @@ -628,18 +628,15 @@ type nullableEncoder struct { func (n nullableEncoder) Encode(value *FullValue, writer io.Writer) error { if value.Elm == nil { - err := n.be.Encode(&FullValue{Elm: false}, writer) - if err != nil { + if err := n.be.Encode(&FullValue{Elm: false}, writer); err != nil { return err } return nil } - err := n.be.Encode(&FullValue{Elm: true}, writer) - if err != nil { + if err := n.be.Encode(&FullValue{Elm: true}, writer); err != nil { return err } - err = n.inner.Encode(value, writer) - if err != nil { + if err := n.inner.Encode(value, writer); err != nil { return err } return nil From 269b4148a7226a36a3b77575a73a836932e920a2 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 9 Mar 2022 15:31:56 -0500 Subject: [PATCH 16/25] [BEAM-10529] update go fromyaml to handle nullable cases --- sdks/go/pkg/beam/core/runtime/exec/coder.go | 2 +- sdks/go/test/regression/coders/fromyaml/fromyaml.go | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go index fb52f448d33b..2ed50be165fb 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/coder.go +++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go @@ -667,7 +667,7 @@ func (n nullableDecoder) DecodeTo(reader io.Reader, value *FullValue) error { if err != nil { return err } - value = val + value.Elm = val.Elm return nil } diff --git a/sdks/go/test/regression/coders/fromyaml/fromyaml.go b/sdks/go/test/regression/coders/fromyaml/fromyaml.go index 82d3e9fdb248..199ff4e2a91d 100644 --- a/sdks/go/test/regression/coders/fromyaml/fromyaml.go +++ b/sdks/go/test/regression/coders/fromyaml/fromyaml.go @@ -218,6 +218,19 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool { } return pass + case "beam:coder:nullable:v1": + if elem.Elm == nil || eg.Value == nil { + got, want = elem.Elm, eg.Value + } else { + got = string(elem.Elm.([]byte)) + switch egv := eg.Value.(type) { + case string: + want = egv + case []byte: + want = string(egv) + } + } + case "beam:coder:iterable:v1": pass := true gotrv := reflect.ValueOf(elem.Elm) From 4656b312bdd37bff49c70f97fe138ed237354649 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 9 Mar 2022 16:17:48 -0500 Subject: [PATCH 17/25] [BEAM-10529] add unit test, register nullable coder in dataflow.go --- .../beam/core/runtime/graphx/coder_test.go | 4 ++++ .../pkg/beam/core/runtime/graphx/dataflow.go | 24 +++++++++++++++++++ sdks/gorun.sh | 16 +++++++++++++ 3 files changed, 44 insertions(+) create mode 100755 sdks/gorun.sh diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go index 8296c9fd3381..aad15df0f23f 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go @@ -88,6 +88,10 @@ func TestMarshalUnmarshalCoders(t *testing.T) { "W", coder.NewW(coder.NewBytes(), coder.NewGlobalWindow()), }, + { + "N", + coder.NewN(coder.NewBytes()), + }, { "KV", coder.NewKV([]*coder.Coder{foo, bar}), diff --git a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go index 77aa6ca46a57..e2eec3b5bcc8 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go @@ -48,6 +48,7 @@ const ( doubleType = "kind:double" streamType = "kind:stream" pairType = "kind:pair" + nullableType = "kind:nullable" lengthPrefixType = "kind:length_prefix" rowType = "kind:row" @@ -117,6 +118,16 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) { } return &CoderRef{Type: pairType, Components: []*CoderRef{key, value}, IsPairLike: true}, nil + case coder.Nullable: + if len(c.Components) != 1 { + return nil, errors.Errorf("bad N: %v", c) + } + innerref, err := EncodeCoderRef(c.Components[0]) + if err != nil { + return nil, err + } + return &CoderRef{Type: nullableType, Components: []*CoderRef{innerref}}, nil + case coder.CoGBK: if len(c.Components) < 2 { return nil, errors.Errorf("bad CoGBK: %v", c) @@ -264,6 +275,19 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) { t := typex.New(root, key.T, value.T) return &coder.Coder{Kind: kind, T: t, Components: []*coder.Coder{key, value}}, nil + case nullableType: + if len(c.Components) != 1 { + return nil, errors.Errorf("bad nullable: %+v", c) + } + + inner, err := DecodeCoderRef(c.Components[0]) + if err != nil { + return nil, err + } + + t := typex.New(typex.NullableType, inner.T) + return &coder.Coder{Kind: coder.Nullable, T: t, Components: []*coder.Coder{inner}}, nil + case lengthPrefixType: if len(c.Components) != 1 { return nil, errors.Errorf("bad length prefix: %+v", c) diff --git a/sdks/gorun.sh b/sdks/gorun.sh new file mode 100755 index 000000000000..b0b21d6448f2 --- /dev/null +++ b/sdks/gorun.sh @@ -0,0 +1,16 @@ +#!/bin/bash +export PROJECT="$(gcloud config get-value project)" +export TEMP_LOCATION="gs://MY-BUCKET/temp" +export REGION="us-central1" +export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`" +export BOOTSTRAP_SERVERS="123.45.67.89:1234" +export EXPANSION_ADDR="localhost:1234" +go run ./sdks/go/examples/kafka/taxi.go --expansion_addr=$EXPANSION_ADDR +# --runner=DataflowRunner \ +# --temp_location=$TEMP_LOCATION \ +# --staging_location=$STAGING_LOCATION \ +# --project=$PROJECT \ +# --region=$REGION \ +# --job_name="${JOB_NAME}" \ +# --bootstrap_servers=$BOOTSTRAP_SERVER \ +# --experiments=use_portable_job_submission,use_runner_v2 \ From 59d3d73b933accc1607355dc98a9ee3146005d1e Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 9 Mar 2022 16:18:40 -0500 Subject: [PATCH 18/25] [BEAM-10529] remove mistaken commit --- sdks/gorun.sh | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100755 sdks/gorun.sh diff --git a/sdks/gorun.sh b/sdks/gorun.sh deleted file mode 100755 index b0b21d6448f2..000000000000 --- a/sdks/gorun.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/bash -export PROJECT="$(gcloud config get-value project)" -export TEMP_LOCATION="gs://MY-BUCKET/temp" -export REGION="us-central1" -export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`" -export BOOTSTRAP_SERVERS="123.45.67.89:1234" -export EXPANSION_ADDR="localhost:1234" -go run ./sdks/go/examples/kafka/taxi.go --expansion_addr=$EXPANSION_ADDR -# --runner=DataflowRunner \ -# --temp_location=$TEMP_LOCATION \ -# --staging_location=$STAGING_LOCATION \ -# --project=$PROJECT \ -# --region=$REGION \ -# --job_name="${JOB_NAME}" \ -# --bootstrap_servers=$BOOTSTRAP_SERVER \ -# --experiments=use_portable_job_submission,use_runner_v2 \ From f60e4ea97ee208f5965e022a97ce30602a7e036e Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 29 Mar 2022 13:26:54 -0400 Subject: [PATCH 19/25] [BEAM-10529] add argument check to CoderTranslators --- .../beam/runners/core/construction/CoderTranslators.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java index 79a04a01d0be..96fb185f3bac 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java @@ -209,6 +209,8 @@ static CoderTranslator> nullable() { return new SimpleStructuredCoderTranslator>() { @Override protected NullableCoder fromComponents(List> components) { + checkArgument( + components.size() == 1, "Expected one component component, but received: " + components); return NullableCoder.of(components.get(0)); } @@ -224,6 +226,7 @@ public abstract static class SimpleStructuredCoderTranslator> @Override public final T fromComponents( List> components, byte[] payload, TranslationContext context) { + checkArgument(components.size() == 1, "Expected one component, but received: "+ components); return fromComponents(components); } From 1805a49eb4558bd2315d65aebf2befc3e26a8b5b Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 29 Mar 2022 14:23:39 -0400 Subject: [PATCH 20/25] [BEAM-10529] Address python comments & cleanup --- sdks/python/apache_beam/coders/coders.py | 12 +++++++++--- sdks/python/apache_beam/coders/typecoders.py | 2 +- sdks/python/apache_beam/typehints/typehints.py | 15 ++++++++++++--- .../apache_beam/typehints/typehints_test.py | 6 +++--- 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 0a191c4d579c..ba5e4c4fd29b 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -578,9 +578,15 @@ def _get_component_coders(self): @classmethod def from_type_hint(cls, typehint, registry): - value_type = list( - filter(lambda t: t is not type(None), typehint._inner_types()))[0] - return cls(registry.get_coder(value_type)) + if typehints.is_nullable(typehint): + return cls( + registry.get_coder( + typehints.get_concrete_type_from_nullable(typehint))) + else: + raise TypeError( + 'Typehint is not of nullable type, ' + 'and cannot be converted to a NullableCoder', + typehint) def is_deterministic(self): # type: () -> bool diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index b81bea375874..a66ebe523697 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -137,7 +137,7 @@ def get_coder(self, typehint): return coders.IterableCoder.from_type_hint(typehint, self) elif isinstance(typehint, typehints.ListConstraint): return coders.ListCoder.from_type_hint(typehint, self) - elif typehints.is_optional(typehint): + elif typehints.is_nullable(typehint): return coders.NullableCoder.from_type_hint(typehint, self) elif typehint is None: # In some old code, None is used for Any. diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index fbb30e2e3347..c0ec3b7537db 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -503,7 +503,7 @@ def __repr__(self): return 'Union[%s]' % ( ', '.join(sorted(_unified_repr(t) for t in self.union_types))) - def _inner_types(self): + def inner_types(self): for t in self.union_types: yield t @@ -604,11 +604,20 @@ def __getitem__(self, py_type): return Union[py_type, type(None)] -def is_optional(typehint): +def is_nullable(typehint): return ( isinstance(typehint, UnionConstraint) and typehint.contains_type(type(None)) and - len(list(typehint._inner_types())) == 2) + len(list(typehint.inner_types())) == 2) + + +def get_concrete_type_from_nullable(typehint): + if is_nullable(typehint): + for inner_type in typehint.inner_types(): + if not inner_type.isInstance(type(None)): + return inner_type + else: + raise TypeError('Typehint is not of nullable type', typehint) class TupleHint(CompositeTypeHint): diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py index a40c180fc463..8818639035ff 100644 --- a/sdks/python/apache_beam/typehints/typehints_test.py +++ b/sdks/python/apache_beam/typehints/typehints_test.py @@ -322,11 +322,11 @@ def test_getitem_proxy_to_union(self): def test_is_optional(self): hint1 = typehints.Optional[int] - self.assertTrue(typehints.is_optional(hint1)) + self.assertTrue(typehints.is_nullable(hint1)) hint2 = typehints.UnionConstraint({int, bytes}) - self.assertFalse(typehints.is_optional(hint2)) + self.assertFalse(typehints.is_nullable(hint2)) hint3 = typehints.UnionConstraint({int, bytes, type(None)}) - self.assertFalse(typehints.is_optional(hint3)) + self.assertFalse(typehints.is_nullable(hint3)) class TupleHintTestCase(TypeHintTestCase): From cc243f18da28338b0351d9c053bbb8b222873148 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 30 Mar 2022 16:42:30 -0400 Subject: [PATCH 21/25] [BEAM-10529] address go comments --- model/pipeline/src/main/proto/beam_runner_api.proto | 1 + sdks/go/pkg/beam/core/graph/coder/coder.go | 5 ++--- sdks/go/pkg/beam/core/graph/coder/coder_test.go | 11 ++++++----- sdks/go/pkg/beam/core/runtime/exec/coder.go | 6 +++--- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index e27a1c771705..c1e318491f27 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -1089,6 +1089,7 @@ message StandardCoders { // values. // - For present values the null indicator is followed by the value // encoded with it's corresponding coder. + // Components: single coder for the value NULLABLE = 17 [(beam_urn) = "beam:coder:nullable:v1"]; } } diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go index 27ca861d162a..8424f7af8758 100644 --- a/sdks/go/pkg/beam/core/graph/coder/coder.go +++ b/sdks/go/pkg/beam/core/graph/coder/coder.go @@ -396,13 +396,12 @@ func NewKV(components []*Coder) *Coder { } func NewN(component *Coder) *Coder { - coders := make([]*Coder, 1) - coders[0] = component + coders := []*Coder{component} checkCodersNotNil(coders) return &Coder{ Kind: Nullable, T: typex.New(typex.NullableType, component.T), - Components: []*Coder{component}, + Components: coders, } } diff --git a/sdks/go/pkg/beam/core/graph/coder/coder_test.go b/sdks/go/pkg/beam/core/graph/coder/coder_test.go index efdca9698c65..44606dc1efb2 100644 --- a/sdks/go/pkg/beam/core/graph/coder/coder_test.go +++ b/sdks/go/pkg/beam/core/graph/coder/coder_test.go @@ -532,11 +532,12 @@ func TestNewNullable(t *testing.T) { component *Coder shouldpanic bool want *Coder - }{{ - name: "nil", - component: nil, - shouldpanic: true, - }, + }{ + { + name: "nil", + component: nil, + shouldpanic: true, + }, { name: "empty", component: &Coder{}, diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go index 2ed50be165fb..145209a492cd 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/coder.go +++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go @@ -626,7 +626,7 @@ type nullableEncoder struct { be boolEncoder } -func (n nullableEncoder) Encode(value *FullValue, writer io.Writer) error { +func (n *nullableEncoder) Encode(value *FullValue, writer io.Writer) error { if value.Elm == nil { if err := n.be.Encode(&FullValue{Elm: false}, writer); err != nil { return err @@ -647,7 +647,7 @@ type nullableDecoder struct { bd boolDecoder } -func (n nullableDecoder) Decode(reader io.Reader) (*FullValue, error) { +func (n *nullableDecoder) Decode(reader io.Reader) (*FullValue, error) { hasValue, err := n.bd.Decode(reader) if err != nil { return nil, err @@ -662,7 +662,7 @@ func (n nullableDecoder) Decode(reader io.Reader) (*FullValue, error) { return val, nil } -func (n nullableDecoder) DecodeTo(reader io.Reader, value *FullValue) error { +func (n *nullableDecoder) DecodeTo(reader io.Reader, value *FullValue) error { val, err := n.Decode(reader) if err != nil { return err From c9761f56edc250db5e8873d673a72c4829616b25 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 1 Apr 2022 10:24:39 -0400 Subject: [PATCH 22/25] [BEAM-10529] remove extra check that was added in error --- .../apache/beam/runners/core/construction/CoderTranslators.java | 1 - 1 file changed, 1 deletion(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java index 96fb185f3bac..9cdf62bcf6e1 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java @@ -226,7 +226,6 @@ public abstract static class SimpleStructuredCoderTranslator> @Override public final T fromComponents( List> components, byte[] payload, TranslationContext context) { - checkArgument(components.size() == 1, "Expected one component, but received: "+ components); return fromComponents(components); } From c5ae489469ab59633e1c9bb8af42de8f8f549037 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 1 Apr 2022 10:26:05 -0400 Subject: [PATCH 23/25] [BEAM-10529] fix typo --- .../apache/beam/runners/core/construction/CoderTranslators.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java index 9cdf62bcf6e1..59d14b608621 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java @@ -210,7 +210,7 @@ static CoderTranslator> nullable() { @Override protected NullableCoder fromComponents(List> components) { checkArgument( - components.size() == 1, "Expected one component component, but received: " + components); + components.size() == 1, "Expected one component, but received: " + components); return NullableCoder.of(components.get(0)); } From d0a59799d3634bac49a2f35675e4e37507a4a736 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 1 Apr 2022 11:52:03 -0400 Subject: [PATCH 24/25] [BEAM-10529] re-order check for nonetype to prevent attribute errors --- sdks/python/apache_beam/typehints/typehints.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index c0ec3b7537db..d07a11ea7483 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -614,7 +614,7 @@ def is_nullable(typehint): def get_concrete_type_from_nullable(typehint): if is_nullable(typehint): for inner_type in typehint.inner_types(): - if not inner_type.isInstance(type(None)): + if not type(None).isInstance(inner_type): return inner_type else: raise TypeError('Typehint is not of nullable type', typehint) From dbe1bb3cbecd3fd0de0cf01e79c702e2eef799c9 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Fri, 1 Apr 2022 12:56:54 -0400 Subject: [PATCH 25/25] [BEAM-10529] change isinstance to == --- sdks/python/apache_beam/typehints/typehints.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index 32fb1e791d48..45c2366dd8b8 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -614,7 +614,7 @@ def is_nullable(typehint): def get_concrete_type_from_nullable(typehint): if is_nullable(typehint): for inner_type in typehint.inner_types(): - if not type(None).isInstance(inner_type): + if not type(None) == inner_type: return inner_type else: raise TypeError('Typehint is not of nullable type', typehint)