Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.AvroCoder;
Expand All @@ -54,13 +55,15 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.Builder;
Expand Down Expand Up @@ -162,8 +165,8 @@ public static Iterable<Coder<?>> data() {
CoGbkResultSchema.of(
ImmutableList.of(new TupleTag<Long>(), new TupleTag<byte[]>())),
UnionCoder.of(ImmutableList.of(VarLongCoder.of(), ByteArrayCoder.of()))))
.add(SchemaCoder.of(Schema.builder().build()))
.add(SchemaCoder.of(TEST_SCHEMA));
.add(SchemaCoder.of(Schema.builder().build(), new RowIdentity(), new RowIdentity()))
.add(SchemaCoder.of(TEST_SCHEMA, new RowIdentity(), new RowIdentity()));
for (Class<? extends Coder> atomicCoder :
DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) {
dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build());
Expand Down Expand Up @@ -268,4 +271,25 @@ public List<? extends Coder<?>> getCoderArguments() {
@Override
public void verifyDeterministic() throws NonDeterministicException {}
}

/** Hack to satisfy SchemaCoder.equals until BEAM-8146 is fixed. */
private static class RowIdentity implements SerializableFunction<Row, Row> {
@Override
public Row apply(Row input) {
return input;
}

@Override
public int hashCode() {
return Objects.hash(getClass());
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
return o != null && getClass() == o.getClass();
}
Comment on lines +276 to +293
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note I moved this RowIdentity class here just to make CloudObjectsTest happy until #9493 is in, and we can pass in some real SchemaCoder instances created by the providers.

}
}
228 changes: 7 additions & 221 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,236 +17,22 @@
*/
package org.apache.beam.sdk.coders;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/** A {@link Coder} for {@link Row}. It wraps the {@link Coder} for each element directly. */
/** A sub-class of SchemaCoder that can only encode {@link Row} instances. */
@Experimental
public class RowCoder extends CustomCoder<Row> {
// This contains a map of primitive types to their coders.
static final ImmutableMap<TypeName, Coder> CODER_MAP =
ImmutableMap.<TypeName, Coder>builder()
.put(TypeName.BYTE, ByteCoder.of())
.put(TypeName.BYTES, ByteArrayCoder.of())
.put(TypeName.INT16, BigEndianShortCoder.of())
.put(TypeName.INT32, VarIntCoder.of())
.put(TypeName.INT64, VarLongCoder.of())
.put(TypeName.DECIMAL, BigDecimalCoder.of())
.put(TypeName.FLOAT, FloatCoder.of())
.put(TypeName.DOUBLE, DoubleCoder.of())
.put(TypeName.STRING, StringUtf8Coder.of())
.put(TypeName.DATETIME, InstantCoder.of())
.put(TypeName.BOOLEAN, BooleanCoder.of())
.build();

private static final ImmutableMap<TypeName, Integer> ESTIMATED_FIELD_SIZES =
ImmutableMap.<TypeName, Integer>builder()
.put(TypeName.BYTE, Byte.BYTES)
.put(TypeName.INT16, Short.BYTES)
.put(TypeName.INT32, Integer.BYTES)
.put(TypeName.INT64, Long.BYTES)
.put(TypeName.FLOAT, Float.BYTES)
.put(TypeName.DOUBLE, Double.BYTES)
.put(TypeName.DECIMAL, 32)
.put(TypeName.BOOLEAN, 1)
.put(TypeName.DATETIME, Long.BYTES)
.build();

private final Schema schema;

public UUID getId() {
return id;
}

private final UUID id;
@Nullable private transient Coder<Row> delegateCoder = null;

public class RowCoder extends SchemaCoder<Row> {
public static RowCoder of(Schema schema) {
UUID id = (schema.getUUID() == null) ? UUID.randomUUID() : schema.getUUID();
return new RowCoder(schema, id);
}

private RowCoder(Schema schema, UUID id) {
if (schema.getUUID() != null) {
checkArgument(
schema.getUUID().equals(id),
"Schema has a UUID that doesn't match argument to constructor. %s v.s. %s",
schema.getUUID(),
id);
} else {
// Clone the schema before modifying the Java object.
schema = SerializableUtils.clone(schema);
setSchemaIds(schema, id);
}
this.schema = schema;
this.id = id;
}

// Sets the schema id, and then recursively ensures that all schemas have ids set.
private void setSchemaIds(Schema schema, UUID id) {
if (schema.getUUID() == null) {
schema.setUUID(id);
}
for (Field field : schema.getFields()) {
setSchemaIds(field.getType());
}
return new RowCoder(schema);
}

private void setSchemaIds(FieldType fieldType) {
switch (fieldType.getTypeName()) {
case ROW:
setSchemaIds(fieldType.getRowSchema(), UUID.randomUUID());
return;
case MAP:
setSchemaIds(fieldType.getMapKeyType());
setSchemaIds(fieldType.getMapValueType());
return;
case LOGICAL_TYPE:
setSchemaIds(fieldType.getLogicalType().getBaseType());
return;

case ARRAY:
setSchemaIds(fieldType.getCollectionElementType());
return;

default:
return;
}
}

// Return the generated coder class for this schema.
private Coder<Row> getDelegateCoder() {
if (delegateCoder == null) {
// RowCoderGenerator caches based on id, so if a new instance of this RowCoder is
// deserialized, we don't need to run ByteBuddy again to construct the class.
delegateCoder = RowCoderGenerator.generate(schema);
}
return delegateCoder;
}

@Override
public void encode(Row value, OutputStream outStream) throws IOException {
getDelegateCoder().encode(value, outStream);
}

@Override
public Row decode(InputStream inStream) throws IOException {
return getDelegateCoder().decode(inStream);
}

public Schema getSchema() {
return schema;
}

@Override
public void verifyDeterministic()
throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
verifyDeterministic(schema);
}

private void verifyDeterministic(Schema schema)
throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {

List<Coder<?>> coders =
schema.getFields().stream()
.map(Field::getType)
.map(RowCoder::coderForFieldType)
.collect(Collectors.toList());

Coder.verifyDeterministic(this, "All fields must have deterministic encoding", coders);
}

@Override
public boolean consistentWithEquals() {
return true;
}

/** Returns the coder used for a given primitive type. */
public static <T> Coder<T> coderForFieldType(FieldType fieldType) {
switch (fieldType.getTypeName()) {
case ROW:
return (Coder<T>) RowCoder.of(fieldType.getRowSchema());
case ARRAY:
return (Coder<T>) ListCoder.of(coderForFieldType(fieldType.getCollectionElementType()));
case MAP:
return (Coder<T>)
MapCoder.of(
coderForFieldType(fieldType.getMapKeyType()),
coderForFieldType(fieldType.getMapValueType()));
default:
return (Coder<T>) CODER_MAP.get(fieldType.getTypeName());
}
}

/** Return the estimated serialized size of a give row object. */
public static long estimatedSizeBytes(Row row) {
Schema schema = row.getSchema();
int fieldCount = schema.getFieldCount();
int bitmapSize = (((fieldCount - 1) >> 6) + 1) * 8;

int fieldsSize = 0;
for (int i = 0; i < schema.getFieldCount(); ++i) {
fieldsSize += (int) estimatedSizeBytes(schema.getField(i).getType(), row.getValue(i));
}
return (long) bitmapSize + fieldsSize;
}

private static long estimatedSizeBytes(FieldType typeDescriptor, Object value) {
switch (typeDescriptor.getTypeName()) {
case LOGICAL_TYPE:
return estimatedSizeBytes(typeDescriptor.getLogicalType().getBaseType(), value);
case ROW:
return estimatedSizeBytes((Row) value);
case ARRAY:
List list = (List) value;
long listSizeBytes = 0;
for (Object elem : list) {
listSizeBytes += estimatedSizeBytes(typeDescriptor.getCollectionElementType(), elem);
}
return 4 + listSizeBytes;
case BYTES:
byte[] bytes = (byte[]) value;
return 4L + bytes.length;
case MAP:
Map<Object, Object> map = (Map<Object, Object>) value;
long mapSizeBytes = 0;
for (Map.Entry<Object, Object> elem : map.entrySet()) {
mapSizeBytes +=
typeDescriptor.getMapKeyType().getTypeName().equals(TypeName.STRING)
? ((String) elem.getKey()).length()
: ESTIMATED_FIELD_SIZES.get(typeDescriptor.getMapKeyType().getTypeName());
mapSizeBytes += estimatedSizeBytes(typeDescriptor.getMapValueType(), elem.getValue());
}
return 4 + mapSizeBytes;
case STRING:
// Not always accurate - String.getBytes().length() would be more accurate here, but slower.
return ((String) value).length();
default:
return ESTIMATED_FIELD_SIZES.get(typeDescriptor.getTypeName());
}
}

@Override
public String toString() {
String string = "Schema: " + schema + " UUID: " + id + " delegateCoder: " + getDelegateCoder();
return string;
private RowCoder(Schema schema) {
super(schema, SerializableFunctions.identity(), SerializableFunctions.identity());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.ByteBuddy;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.modifier.FieldManifestation;
Expand Down Expand Up @@ -114,7 +115,7 @@ public abstract class RowCoderGenerator {
// Initialize the CODER_MAP with the StackManipulations to create the primitive coders.
// Assumes that each class contains a static of() constructor method.
CODER_MAP = Maps.newHashMap();
for (Map.Entry<TypeName, Coder> entry : RowCoder.CODER_MAP.entrySet()) {
for (Map.Entry<TypeName, Coder> entry : SchemaCoder.CODER_MAP.entrySet()) {
StackManipulation stackManipulation =
MethodInvocation.invoke(
new ForLoadedType(entry.getValue().getClass())
Expand Down
Loading