Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
b4b0025
[BEAM-10529] add java and generic components of nullable xlang tests
johnjcasey Feb 14, 2022
798fd53
[BEAM-10529] fix test case
johnjcasey Feb 14, 2022
8743549
[BEAM-10529] add coders and typehints to support nullable xlang coders
johnjcasey Feb 22, 2022
1ce6605
[BEAM-10529] update external builder to support nullable coder
johnjcasey Feb 22, 2022
04868d6
Merge remote-tracking branch 'origin/master' into feature/BEAM-10529-…
johnjcasey Feb 22, 2022
a6a38d0
[BEAM-10529] clean up coders.py
johnjcasey Feb 22, 2022
e3e0223
[BEAM-10529] add coder translation test
johnjcasey Feb 22, 2022
30f06d6
[BEAM-10529] add additional check to typecoder to not accidentally mi…
johnjcasey Feb 23, 2022
24dd876
[BEAM-10529] add test to retrieve nullable coder from typehint
johnjcasey Feb 23, 2022
e642e7c
[BEAM-10529] run spotless
johnjcasey Feb 23, 2022
f55074c
Merge remote-tracking branch 'origin/feature/BEAM-10529-nullable-Xlan…
johnjcasey Feb 23, 2022
4dd1c54
[BEAM-10529] add go nullable coder
johnjcasey Mar 8, 2022
43c84e6
[BEAM-10529] cleanup extra println
johnjcasey Mar 8, 2022
3e93b56
[BEAM-10529] improve comments, clean up python
johnjcasey Mar 9, 2022
48b6f34
Merge remote-tracking branch 'origin/feature/BEAM-10529-nullable-Xlan…
johnjcasey Mar 9, 2022
a078029
[BEAM-10529] remove changes to kafkaIO to simplify pr
johnjcasey Mar 9, 2022
258d7a9
[BEAM-10529] add coders to go exec, add asf license text
johnjcasey Mar 9, 2022
af2930f
Merge remote-tracking branch 'origin/master' into feature/BEAM-10529-…
johnjcasey Mar 9, 2022
d19c47c
[BEAM-10529] clean up error handlign
johnjcasey Mar 9, 2022
269b414
[BEAM-10529] update go fromyaml to handle nullable cases
johnjcasey Mar 9, 2022
4656b31
[BEAM-10529] add unit test, register nullable coder in dataflow.go
johnjcasey Mar 9, 2022
59d3d73
[BEAM-10529] remove mistaken commit
johnjcasey Mar 9, 2022
b54da74
Merge branch 'apache:master' into feature/BEAM-10529-nullable-Xlang-C…
johnjcasey Mar 14, 2022
f60e4ea
[BEAM-10529] add argument check to CoderTranslators
johnjcasey Mar 29, 2022
1805a49
[BEAM-10529] Address python comments & cleanup
johnjcasey Mar 29, 2022
40556b7
Merge remote-tracking branch 'origin/feature/BEAM-10529-nullable-Xlan…
johnjcasey Mar 29, 2022
488e1d1
Merge remote-tracking branch 'origin/master' into feature/BEAM-10529-…
johnjcasey Mar 29, 2022
cc243f1
[BEAM-10529] address go comments
johnjcasey Mar 30, 2022
a343a57
Merge remote-tracking branch 'origin/master' into feature/BEAM-10529-…
johnjcasey Apr 1, 2022
c9761f5
[BEAM-10529] remove extra check that was added in error
johnjcasey Apr 1, 2022
c5ae489
[BEAM-10529] fix typo
johnjcasey Apr 1, 2022
d0a5979
[BEAM-10529] re-order check for nonetype to prevent attribute errors
johnjcasey Apr 1, 2022
962ec4b
Merge remote-tracking branch 'origin/feature/BEAM-10529-nullable-Xlan…
johnjcasey Apr 1, 2022
dbe1bb3
[BEAM-10529] change isinstance to ==
johnjcasey Apr 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -569,3 +569,15 @@ coder:
examples:
"\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0067\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {window: {end: 1454293425000, span: 3600000}}
"\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0075\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {window: {end: -9223372036854410, span: 365}}


---
coder:
urn: "beam:coder:nullable:v1"
components: [{urn: "beam:coder:bytes:v1"}]
nested: true

examples:
"\u0001\u0003\u0061\u0062\u0063" : "abc"
"\u0001\u000a\u006d\u006f\u0072\u0065\u0020\u0062\u0079\u0074\u0065\u0073" : "more bytes"
"\u0000" : null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, only the last byte here will be in the an encoded element that has value null ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep. 0000 means null, 0001 means "the rest of this is non-null"

16 changes: 11 additions & 5 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1043,11 +1043,8 @@ message StandardCoders {
// - Followed by N interleaved keys and values, encoded with their
// corresponding coder.
//
// Nullable types in container types (ArrayType, MapType) are encoded by:
// - A one byte null indicator, 0x00 for null values, or 0x01 for present
// values.
// - For present values the null indicator is followed by the value
// encoded with it's corresponding coder.
// Nullable types in container types (ArrayType, MapType) per the
// encoding described for general Nullable types below.
//
// Well known logical types:
// beam:logical_type:micros_instant:v1
Expand Down Expand Up @@ -1085,6 +1082,15 @@ message StandardCoders {
// Components: the user key coder.
// Experimental.
SHARDED_KEY = 15 [(beam_urn) = "beam:coder:sharded_key:v1"];

// Wraps a coder of a potentially null value
// A Nullable Type is encoded by:
// - A one byte null indicator, 0x00 for null values, or 0x01 for present
// values.
// - For present values the null indicator is followed by the value
// encoded with it's corresponding coder.
// Components: single coder for the value
NULLABLE = 17 [(beam_urn) = "beam:coder:nullable:v1"];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -204,6 +205,22 @@ public List<? extends Coder<?>> getComponents(TimestampPrefixingWindowCoder<?> f
};
}

static CoderTranslator<NullableCoder<?>> nullable() {
return new SimpleStructuredCoderTranslator<NullableCoder<?>>() {
@Override
protected NullableCoder<?> fromComponents(List<Coder<?>> components) {
checkArgument(
components.size() == 1, "Expected one component, but received: " + components);
return NullableCoder.of(components.get(0));
}

@Override
public List<? extends Coder<?>> getComponents(NullableCoder<?> from) {
return from.getComponents();
}
};
}

public abstract static class SimpleStructuredCoderTranslator<T extends Coder<?>>
implements CoderTranslator<T> {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
.put(RowCoder.class, ModelCoders.ROW_CODER_URN)
.put(ShardedKey.Coder.class, ModelCoders.SHARDED_KEY_CODER_URN)
.put(TimestampPrefixingWindowCoder.class, ModelCoders.CUSTOM_WINDOW_CODER_URN)
.put(NullableCoder.class, ModelCoders.NULLABLE_CODER_URN)
.build();

private static final Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
Expand All @@ -96,6 +98,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
.put(RowCoder.class, CoderTranslators.row())
.put(ShardedKey.Coder.class, CoderTranslators.shardedKey())
.put(TimestampPrefixingWindowCoder.class, CoderTranslators.timestampPrefixingWindow())
.put(NullableCoder.class, CoderTranslators.nullable())
.build();

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ private ModelCoders() {}

public static final String SHARDED_KEY_CODER_URN = getUrn(StandardCoders.Enum.SHARDED_KEY);

public static final String NULLABLE_CODER_URN = getUrn(StandardCoders.Enum.NULLABLE);

static {
checkState(
STATE_BACKED_ITERABLE_CODER_URN.equals(getUrn(StandardCoders.Enum.STATE_BACKED_ITERABLE)));
Expand All @@ -90,7 +92,8 @@ private ModelCoders() {}
ROW_CODER_URN,
PARAM_WINDOWED_VALUE_CODER_URN,
STATE_BACKED_ITERABLE_CODER_URN,
SHARDED_KEY_CODER_URN);
SHARDED_KEY_CODER_URN,
NULLABLE_CODER_URN);

public static Set<String> urns() {
return MODEL_CODER_URNS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class CoderTranslationTest {
Field.of("bar", FieldType.logicalType(FixedBytes.of(123))))))
.add(ShardedKey.Coder.of(StringUtf8Coder.of()))
.add(TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of()))
.add(NullableCoder.of(ByteArrayCoder.of()))
.build();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.IterableLikeCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
Expand Down Expand Up @@ -134,6 +135,7 @@ public class CommonCoderTest {
.put(getUrn(StandardCoders.Enum.SHARDED_KEY), ShardedKey.Coder.class)
.put(getUrn(StandardCoders.Enum.CUSTOM_WINDOW), TimestampPrefixingWindowCoder.class)
.put(getUrn(StandardCoders.Enum.STATE_BACKED_ITERABLE), StateBackedIterable.Coder.class)
.put(getUrn(StandardCoders.Enum.NULLABLE), NullableCoder.class)
.build();

@AutoValue
Expand Down Expand Up @@ -201,7 +203,7 @@ abstract static class OneCoderTestSpec {
@SuppressWarnings("mutable")
abstract byte[] getSerialized();

abstract Object getValue();
abstract @Nullable Object getValue();

static OneCoderTestSpec create(
CommonCoder coder, boolean nested, byte[] serialized, Object value) {
Expand Down Expand Up @@ -382,6 +384,17 @@ private static Object convertValue(Object value, CommonCoder coderSpec, Coder co
Map<String, Object> kvMap = (Map<String, Object>) value;
Coder windowCoder = ((TimestampPrefixingWindowCoder) coder).getWindowCoder();
return convertValue(kvMap.get("window"), coderSpec.getComponents().get(0), windowCoder);
} else if (s.equals(getUrn(StandardCoders.Enum.NULLABLE))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we generating individual tests for coders here or are we lumping everything into a single test ? If it's latter probably we should consider adding tests specifically for the new case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a single test in the java sense, but there are 3 test cases in the yaml file for nullable coders.

if (coderSpec.getComponents().size() == 1
&& coderSpec.getComponents().get(0).getUrn().equals(getUrn(StandardCoders.Enum.BYTES))) {
if (value == null) {
return null;
} else {
return ((String) value).getBytes(StandardCharsets.ISO_8859_1);
}
} else {
throw new IllegalStateException("Unknown or missing nested coder for nullable coder");
}
} else {
throw new IllegalStateException("Unknown coder URN: " + coderSpec.getUrn());
}
Expand Down Expand Up @@ -575,6 +588,8 @@ private void verifyDecodedValue(CommonCoder coder, Object expectedValue, Object
assertEquals(expectedValue, actualValue);
} else if (s.equals(getUrn(StandardCoders.Enum.CUSTOM_WINDOW))) {
assertEquals(expectedValue, actualValue);
} else if (s.equals(getUrn(StandardCoders.Enum.NULLABLE))) {
assertThat(expectedValue, equalTo(actualValue));
} else {
throw new IllegalStateException("Unknown coder URN: " + coder.getUrn());
}
Expand Down
19 changes: 17 additions & 2 deletions sdks/go/pkg/beam/core/graph/coder/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ const (
VarInt Kind = "varint"
Double Kind = "double"
Row Kind = "R"
Nullable Kind = "N"
Timer Kind = "T"
PaneInfo Kind = "PI"
WindowedValue Kind = "W"
Expand Down Expand Up @@ -198,7 +199,7 @@ type Coder struct {
Kind Kind
T typex.FullType

Components []*Coder // WindowedValue, KV, CoGBK
Components []*Coder // WindowedValue, KV, CoGBK, Nullable
Custom *CustomCoder // Custom
Window *WindowCoder // WindowedValue

Expand Down Expand Up @@ -260,7 +261,7 @@ func (c *Coder) String() string {
switch c.Kind {
case WindowedValue, ParamWindowedValue, Window, Timer:
ret += fmt.Sprintf("!%v", c.Window)
case KV, CoGBK, Bytes, Bool, VarInt, Double, String, LP: // No additional info.
case KV, CoGBK, Bytes, Bool, VarInt, Double, String, LP, Nullable: // No additional info.
default:
ret += fmt.Sprintf("[%v]", c.T)
}
Expand Down Expand Up @@ -394,6 +395,20 @@ func NewKV(components []*Coder) *Coder {
}
}

func NewN(component *Coder) *Coder {
coders := []*Coder{component}
checkCodersNotNil(coders)
return &Coder{
Kind: Nullable,
T: typex.New(typex.NullableType, component.T),
Components: coders,
}
}

func IsNullable(c *Coder) bool {
return c.Kind == Nullable
}

// IsCoGBK returns true iff the coder is for a CoGBK type.
func IsCoGBK(c *Coder) bool {
return c.Kind == CoGBK
Expand Down
61 changes: 61 additions & 0 deletions sdks/go/pkg/beam/core/graph/coder/coder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ func TestCoder_String(t *testing.T) {
}, {
want: "KV<bytes,varint>",
c: NewKV([]*Coder{bytes, ints}),
}, {
want: "N<bytes>",
c: NewN(bytes),
}, {
want: "CoGBK<bytes,varint,bytes>",
c: NewCoGBK([]*Coder{bytes, ints, bytes}),
Expand Down Expand Up @@ -277,6 +280,10 @@ func TestCoder_Equals(t *testing.T) {
want: true,
a: NewKV([]*Coder{custom1, ints}),
b: NewKV([]*Coder{customSame, ints}),
}, {
want: true,
a: NewN(custom1),
b: NewN(customSame),
}, {
want: true,
a: NewCoGBK([]*Coder{custom1, ints, customSame}),
Expand Down Expand Up @@ -517,6 +524,60 @@ func TestNewKV(t *testing.T) {
}
}

func TestNewNullable(t *testing.T) {
bytes := NewBytes()

tests := []struct {
name string
component *Coder
shouldpanic bool
want *Coder
}{
{
name: "nil",
component: nil,
shouldpanic: true,
},
{
name: "empty",
component: &Coder{},
shouldpanic: true,
},
{
name: "bytes",
component: bytes,
shouldpanic: false,
want: &Coder{
Kind: Nullable,
T: typex.New(typex.NullableType, bytes.T),
Components: []*Coder{bytes},
},
},
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
if test.shouldpanic {
defer func() {
if p := recover(); p != nil {
t.Log(p)
return
}
t.Fatalf("NewNullable(%v): want panic", test.component)
}()
}
got := NewN(test.component)
if !IsNullable(got) {
t.Errorf("IsNullable(%v) = false, want true", got)
}
if test.want != nil && !test.want.Equals(got) {
t.Fatalf("NewNullable(%v) = %v, want %v", test.component, got, test.want)
}
})
}
}

func TestNewCoGBK(t *testing.T) {
bytes := NewBytes()
ints := NewVarInt()
Expand Down
32 changes: 0 additions & 32 deletions sdks/go/pkg/beam/core/graph/coder/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,6 @@ func mapDecoder(rt reflect.Type, decodeToKey, decodeToElem typeDecoderFieldRefle
}
}

// containerNilDecoder handles when a value is nillable for map or iterable components.
// Nillable types have an extra byte prefixing them indicating nil status.
func containerNilDecoder(decodeToElem func(reflect.Value, io.Reader) error) func(reflect.Value, io.Reader) error {
return func(ret reflect.Value, r io.Reader) error {
hasValue, err := DecodeBool(r)
if err != nil {
return err
}
if !hasValue {
return nil
}
if err := decodeToElem(ret, r); err != nil {
return err
}
return nil
}
}

// mapEncoder reflectively encodes a map or array type using the beam map encoding.
func mapEncoder(rt reflect.Type, encodeKey, encodeValue typeEncoderFieldReflect) func(reflect.Value, io.Writer) error {
return func(rv reflect.Value, w io.Writer) error {
Expand Down Expand Up @@ -132,17 +114,3 @@ func mapEncoder(rt reflect.Type, encodeKey, encodeValue typeEncoderFieldReflect)
return nil
}
}

// containerNilEncoder handles when a value is nillable for map or iterable components.
// Nillable types have an extra byte prefixing them indicating nil status.
func containerNilEncoder(encodeElem func(reflect.Value, io.Writer) error) func(reflect.Value, io.Writer) error {
return func(rv reflect.Value, w io.Writer) error {
if rv.IsNil() {
return EncodeBool(false, w)
}
if err := EncodeBool(true, w); err != nil {
return err
}
return encodeElem(rv, w)
}
}
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/graph/coder/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func TestEncodeDecodeMap(t *testing.T) {
v.Set(reflect.New(reflectx.Uint8))
return byteDec(v.Elem(), r)
}
byteCtnrPtrEnc := containerNilEncoder(bytePtrEnc)
byteCtnrPtrDec := containerNilDecoder(bytePtrDec)
byteCtnrPtrEnc := NullableEncoder(bytePtrEnc)
byteCtnrPtrDec := NullableDecoder(bytePtrDec)

ptrByte := byte(42)

Expand Down
Loading