Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ examples:
"\u000A": 10
"\u00c8\u0001": 200
"\u00e8\u0007": 1000
"\u00a9\u0046": 9001
"\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1

---
Expand Down Expand Up @@ -275,3 +276,25 @@ examples:
"\u007f\u00f0\0\0\0\0\0\0": "Infinity"
"\u00ff\u00f0\0\0\0\0\0\0": "-Infinity"
"\u007f\u00f8\0\0\0\0\0\0": "NaN"

---

coder:
urn: "beam:coder:row:v1"
# str: string, i32: int32, f64: float64, arr: array[string]
payload: "\n\t\n\x03str\x1a\x02\x10\x07\n\t\n\x03i32\x1a\x02\x10\x03\n\t\n\x03f64\x1a\x02\x10\x06\n\r\n\x03arr\x1a\x06\x1a\x04\n\x02\x10\x07\x12$4e5e554c-d4c1-4a5d-b5e1-f3293a6b9f05"
nested: false
examples:
"\u0004\u0000\u0003foo\u00a9\u0046\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a\0\0\0\u0003\u0003foo\u0003bar\u0003baz": {str: "foo", i32: 9001, f64: "0.1", arr: ["foo", "bar", "baz"]}

---

coder:
urn: "beam:coder:row:v1"
# str: nullable string, i32: nullable int32, f64: nullable float64
payload: "\n\x0b\n\x03str\x1a\x04\x08\x01\x10\x07\n\x0b\n\x03i32\x1a\x04\x08\x01\x10\x03\n\x0b\n\x03f64\x1a\x04\x08\x01\x10\x06\x12$b20c6545-57af-4bc8-b2a9-51ace21c7393"
nested: false
examples:
"\u0003\u0001\u0007": {str: null, i32: null, f64: null}
"\u0003\u0001\u0004\u0003foo\u00a9\u0046": {str: "foo", i32: 9001, f64: null}
"\u0003\u0000\u0003foo\u00a9\u0046\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a": {str: "foo", i32: 9001, f64: "0.1"}
44 changes: 44 additions & 0 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,50 @@ message StandardCoders {
// Components: Coder for a single element.
// Experimental.
STATE_BACKED_ITERABLE = 9 [(beam_urn) = "beam:coder:state_backed_iterable:v1"];

// Additional Standard Coders
// --------------------------
// The following coders are not required to be implemented for an SDK or
// runner to support the Beam model, but enable users to take advantage of
// schema-aware transforms.

// Encodes a "row", an element with a known schema, defined by an
// instance of Schema from schema.proto.
//
// A row is encoded as the concatenation of:
// - The number of attributes in the schema, encoded with
// beam:coder:varint:v1. This makes it possible to detect certain
// allowed schema changes (appending or removing columns) in
// long-running streaming pipelines.
// - A byte array representing a packed bitset indicating null fields (a
// 1 indicating a null) encoded with beam:coder:bytes:v1. The unused
// bits in the last byte must be set to 0. If there are no nulls an
// empty byte array is encoded.
// The two-byte bitset (not including the lenghth-prefix) for the row
// [NULL, 0, 0, 0, NULL, 0, 0, NULL, 0, NULL] would be
// [0b10010001, 0b00000010]
// - An encoding for each non-null field, concatenated together.
//
// Schema types are mapped to coders as follows:
// AtomicType:
// BYTE: not yet a standard coder (BEAM-7996)
// INT16: not yet a standard coder (BEAM-7996)
// INT32: beam:coder:varint:v1
// INT64: beam:coder:varint:v1
// FLOAT: not yet a standard coder (BEAM-7996)
// DOUBLE: beam:coder:double:v1
// STRING: beam:coder:string_utf8:v1
// BOOLEAN: beam:coder:bool:v1
// BYTES: beam:coder:bytes:v1
// ArrayType: beam:coder:iterable:v1 (always has a known length)
// MapType: not yet a standard coder (BEAM-7996)
// RowType: beam:coder:row:v1
// LogicalType: Uses the coder for its representation.
//
// The payload for RowCoder is an instance of Schema.
// Components: None
// Experimental.
ROW = 13 [(beam_urn) = "beam:coder:row:v1"];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,22 @@
*/
package org.apache.beam.runners.core.construction;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.Collections;
import java.util.List;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/** {@link CoderTranslator} implementations for known coder types. */
Expand Down Expand Up @@ -118,6 +124,33 @@ public FullWindowedValueCoder<?> fromComponents(List<Coder<?>> components) {
};
}

static CoderTranslator<RowCoder> row() {
return new CoderTranslator<RowCoder>() {
@Override
public List<? extends Coder<?>> getComponents(RowCoder from) {
return ImmutableList.of();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So for the time being, we're inlining everything, rather than using components. Was there a bug tracking doing better for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes right now there's just a fixed mapping from fieldtype to coder. There's not a bug filed for using components, I was thinking that we would just continue inlining everything. Do you think we should plan on using components instead? What does that get us?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For coders, if one had a coder T one was likely to have KV<K, T> for various K, an Iterable, WindowedValue for possibly several window types, and various other permutations. Coupled with the fact that leaf coders were often huge serialized blobs made for some pretty significant savings.

Maybe this'll be less of an issue in the streaming world. I think it should not be a blocker assuming we'll be able to update this in the (short-term) future.

}

@Override
public byte[] getPayload(RowCoder from) {
return SchemaTranslation.schemaToProto(from.getSchema()).toByteArray();
}

@Override
public RowCoder fromComponents(List<Coder<?>> components, byte[] payload) {
checkArgument(
components.isEmpty(), "Expected empty component list, but received: " + components);
Schema schema;
try {
schema = SchemaTranslation.fromProto(SchemaApi.Schema.parseFrom(payload));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Unable to parse schema for RowCoder: ", e);
}
return RowCoder.of(schema);
}
};
}

public abstract static class SimpleStructuredCoderTranslator<T extends Coder<?>>
implements CoderTranslator<T> {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
.put(GlobalWindow.Coder.class, ModelCoders.GLOBAL_WINDOW_CODER_URN)
.put(FullWindowedValueCoder.class, ModelCoders.WINDOWED_VALUE_CODER_URN)
.put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN)
.put(RowCoder.class, ModelCoders.ROW_CODER_URN)
.build();

public static final Set<String> WELL_KNOWN_CODER_URNS = BEAM_MODEL_CODER_URNS.values();
Expand All @@ -79,6 +81,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
.put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix())
.put(FullWindowedValueCoder.class, CoderTranslators.fullWindowedValue())
.put(DoubleCoder.class, CoderTranslators.atomic(DoubleCoder.class))
.put(RowCoder.class, CoderTranslators.row())
.build();

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ private ModelCoders() {}

public static final String WINDOWED_VALUE_CODER_URN = getUrn(StandardCoders.Enum.WINDOWED_VALUE);

public static final String ROW_CODER_URN = getUrn(StandardCoders.Enum.ROW);

private static final Set<String> MODEL_CODER_URNS =
ImmutableSet.of(
BYTES_CODER_URN,
Expand All @@ -67,7 +69,8 @@ private ModelCoders() {}
GLOBAL_WINDOW_CODER_URN,
INTERVAL_WINDOW_CODER_URN,
WINDOWED_VALUE_CODER_URN,
DOUBLE_CODER_URN);
DOUBLE_CODER_URN,
ROW_CODER_URN);

public static Set<String> urns() {
return MODEL_CODER_URNS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.schemas.LogicalTypes;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
Expand All @@ -60,8 +64,8 @@

/** Tests for {@link CoderTranslation}. */
public class CoderTranslationTest {
private static final Set<StructuredCoder<?>> KNOWN_CODERS =
ImmutableSet.<StructuredCoder<?>>builder()
private static final Set<Coder<?>> KNOWN_CODERS =
ImmutableSet.<Coder<?>>builder()
.add(ByteArrayCoder.of())
.add(BooleanCoder.of())
.add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
Expand All @@ -76,6 +80,13 @@ public class CoderTranslationTest {
FullWindowedValueCoder.of(
IterableCoder.of(VarLongCoder.of()), IntervalWindowCoder.of()))
.add(DoubleCoder.of())
.add(
RowCoder.of(
Schema.of(
Field.of("i16", FieldType.INT16),
Field.of("array", FieldType.array(FieldType.STRING)),
Field.of("map", FieldType.map(FieldType.STRING, FieldType.INT32)),
Field.of("bar", FieldType.logicalType(LogicalTypes.FixedBytes.of(123))))))
.build();

/**
Expand Down
Loading