From ab51be172c875c804ec3c69cd90a273114bc771c Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 27 Sep 2019 16:03:45 -0700 Subject: [PATCH 1/4] Remove RowSize, RowSizeTest, and RowCoder.estimatedSizeBytes --- .../org/apache/beam/sdk/coders/RowCoder.java | 49 -------- .../beam/sdk/nexmark/model/sql/RowSize.java | 83 ------------- .../sdk/nexmark/model/sql/RowSizeTest.java | 109 ------------------ 3 files changed, 241 deletions(-) delete mode 100644 sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/RowSize.java delete mode 100644 sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/RowSizeTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java index f6cfe6ac34ae..f0180eb0506d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java @@ -23,7 +23,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -194,54 +193,6 @@ public static Coder coderForFieldType(FieldType fieldType) { } } - /** Return the estimated serialized size of a give row object. */ - public static long estimatedSizeBytes(Row row) { - Schema schema = row.getSchema(); - int fieldCount = schema.getFieldCount(); - int bitmapSize = (((fieldCount - 1) >> 6) + 1) * 8; - - int fieldsSize = 0; - for (int i = 0; i < schema.getFieldCount(); ++i) { - fieldsSize += (int) estimatedSizeBytes(schema.getField(i).getType(), row.getValue(i)); - } - return (long) bitmapSize + fieldsSize; - } - - private static long estimatedSizeBytes(FieldType typeDescriptor, Object value) { - switch (typeDescriptor.getTypeName()) { - case LOGICAL_TYPE: - return estimatedSizeBytes(typeDescriptor.getLogicalType().getBaseType(), value); - case ROW: - return estimatedSizeBytes((Row) value); - case ARRAY: - List list = (List) value; - long listSizeBytes = 0; - for (Object elem : list) { - listSizeBytes += estimatedSizeBytes(typeDescriptor.getCollectionElementType(), elem); - } - return 4 + listSizeBytes; - case BYTES: - byte[] bytes = (byte[]) value; - return 4L + bytes.length; - case MAP: - Map map = (Map) value; - long mapSizeBytes = 0; - for (Map.Entry elem : map.entrySet()) { - mapSizeBytes += - typeDescriptor.getMapKeyType().getTypeName().equals(TypeName.STRING) - ? ((String) elem.getKey()).length() - : ESTIMATED_FIELD_SIZES.get(typeDescriptor.getMapKeyType().getTypeName()); - mapSizeBytes += estimatedSizeBytes(typeDescriptor.getMapValueType(), elem.getValue()); - } - return 4 + mapSizeBytes; - case STRING: - // Not always accurate - String.getBytes().length() would be more accurate here, but slower. - return ((String) value).length(); - default: - return ESTIMATED_FIELD_SIZES.get(typeDescriptor.getTypeName()); - } - } - @Override public String toString() { String string = "Schema: " + schema + " UUID: " + id + " delegateCoder: " + getDelegateCoder(); diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/RowSize.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/RowSize.java deleted file mode 100644 index eb58c3e8481e..000000000000 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/sql/RowSize.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.nexmark.model.sql; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.RowCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.nexmark.model.KnownSize; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.Row; - -/** - * {@link KnownSize} implementation to estimate the size of a {@link Row}, similar to Java model. - * NexmarkLauncher/Queries infrastructure expects the events to be able to quickly provide the - * estimates of their sizes. - * - *

The {@link Row} size is calculated at creation time. - */ -public class RowSize implements KnownSize { - private static final Coder LONG_CODER = VarLongCoder.of(); - public static final Coder CODER = - new CustomCoder() { - @Override - public void encode(RowSize rowSize, OutputStream outStream) throws IOException { - - LONG_CODER.encode(rowSize.sizeInBytes(), outStream); - } - - @Override - public RowSize decode(InputStream inStream) throws IOException { - return new RowSize(LONG_CODER.decode(inStream)); - } - }; - - public static ParDo.SingleOutput parDo() { - return ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(RowSize.of(c.element())); - } - }); - } - - public static RowSize of(Row row) { - return new RowSize(sizeInBytes(row)); - } - - private static long sizeInBytes(Row row) { - return RowCoder.estimatedSizeBytes(row); - } - - private long sizeInBytes; - - private RowSize(long sizeInBytes) { - this.sizeInBytes = sizeInBytes; - } - - @Override - public long sizeInBytes() { - return sizeInBytes; - } -} diff --git a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/RowSizeTest.java b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/RowSizeTest.java deleted file mode 100644 index aa1709a74f13..000000000000 --- a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/model/sql/RowSizeTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.nexmark.model.sql; - -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import java.math.BigDecimal; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.SchemaCoder; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.joda.time.DateTime; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -/** Unit tests for {@link RowSize}. */ -public class RowSizeTest { - - private static final Schema ROW_TYPE = - Schema.builder() - .addByteField("f_tinyint") - .addInt16Field("f_smallint") - .addInt32Field("f_int") - .addInt64Field("f_bigint") - .addFloatField("f_float") - .addDoubleField("f_double") - .addDecimalField("f_decimal") - .addBooleanField("f_boolean") - .addField("f_time", CalciteUtils.TIME) - .addField("f_date", CalciteUtils.DATE) - .addDateTimeField("f_timestamp") - .addField("f_char", CalciteUtils.CHAR) - .addField("f_varchar", CalciteUtils.VARCHAR) - .build(); - - private static final long ROW_SIZE = 96L; - - private static final Row ROW = - Row.withSchema(ROW_TYPE) - .addValues( - (byte) 1, - (short) 2, - (int) 3, - (long) 4, - (float) 5.12, - (double) 6.32, - new BigDecimal(7), - false, - new DateTime().withDate(2019, 03, 02), - new DateTime(10L), - new DateTime(11L), - "12", - "13") - .build(); - - @Rule public TestPipeline testPipeline = TestPipeline.create(); - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testCalculatesCorrectSize() throws Exception { - assertEquals(ROW_SIZE, RowSize.of(ROW).sizeInBytes()); - } - - @Test - public void testParDoConvertsToRecordSize() throws Exception { - PCollection rows = - testPipeline.apply( - TestStream.create(SchemaCoder.of(ROW_TYPE)) - .addElements(ROW) - .advanceWatermarkToInfinity()); - - PAssert.that(rows).satisfies(new CorrectSize()); - - testPipeline.run(); - } - - static class CorrectSize implements SerializableFunction, Void> { - @Override - public Void apply(Iterable input) { - RowSize recordSize = RowSize.of(Iterables.getOnlyElement(input)); - assertThat(recordSize.sizeInBytes(), equalTo(ROW_SIZE)); - return null; - } - } -} From af182a15aada34e22dc34581066726550735d970 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 23 Aug 2019 17:03:31 -0700 Subject: [PATCH 2/4] Make RowCoder a trival sub-class of SchemaCoder ... so that it can be used as the java implementation of beam:coder:row:v1 in a later change All of its remaining functionality has been moved to SchemaCoder and RowCoderGenerator. --- .../org/apache/beam/sdk/coders/RowCoder.java | 179 +----------------- .../beam/sdk/coders/RowCoderGenerator.java | 3 +- .../apache/beam/sdk/schemas/SchemaCoder.java | 143 ++++++++++++-- .../schemas/transforms/SchemaAggregateFn.java | 5 +- 4 files changed, 143 insertions(+), 187 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java index f0180eb0506d..b3d502e53952 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java @@ -17,185 +17,20 @@ */ package org.apache.beam.sdk.coders; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import java.util.UUID; -import java.util.stream.Collectors; -import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.Schema.Field; -import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.Schema.TypeName; -import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -/** A {@link Coder} for {@link Row}. It wraps the {@link Coder} for each element directly. */ +/** A sub-class of SchemaCoder that can only encode {@link Row} instances. */ @Experimental -public class RowCoder extends CustomCoder { - // This contains a map of primitive types to their coders. - static final ImmutableMap CODER_MAP = - ImmutableMap.builder() - .put(TypeName.BYTE, ByteCoder.of()) - .put(TypeName.BYTES, ByteArrayCoder.of()) - .put(TypeName.INT16, BigEndianShortCoder.of()) - .put(TypeName.INT32, VarIntCoder.of()) - .put(TypeName.INT64, VarLongCoder.of()) - .put(TypeName.DECIMAL, BigDecimalCoder.of()) - .put(TypeName.FLOAT, FloatCoder.of()) - .put(TypeName.DOUBLE, DoubleCoder.of()) - .put(TypeName.STRING, StringUtf8Coder.of()) - .put(TypeName.DATETIME, InstantCoder.of()) - .put(TypeName.BOOLEAN, BooleanCoder.of()) - .build(); - - private static final ImmutableMap ESTIMATED_FIELD_SIZES = - ImmutableMap.builder() - .put(TypeName.BYTE, Byte.BYTES) - .put(TypeName.INT16, Short.BYTES) - .put(TypeName.INT32, Integer.BYTES) - .put(TypeName.INT64, Long.BYTES) - .put(TypeName.FLOAT, Float.BYTES) - .put(TypeName.DOUBLE, Double.BYTES) - .put(TypeName.DECIMAL, 32) - .put(TypeName.BOOLEAN, 1) - .put(TypeName.DATETIME, Long.BYTES) - .build(); - - private final Schema schema; - - public UUID getId() { - return id; - } - - private final UUID id; - @Nullable private transient Coder delegateCoder = null; - +public class RowCoder extends SchemaCoder { public static RowCoder of(Schema schema) { - UUID id = (schema.getUUID() == null) ? UUID.randomUUID() : schema.getUUID(); - return new RowCoder(schema, id); - } - - private RowCoder(Schema schema, UUID id) { - if (schema.getUUID() != null) { - checkArgument( - schema.getUUID().equals(id), - "Schema has a UUID that doesn't match argument to constructor. %s v.s. %s", - schema.getUUID(), - id); - } else { - // Clone the schema before modifying the Java object. - schema = SerializableUtils.clone(schema); - setSchemaIds(schema, id); - } - this.schema = schema; - this.id = id; - } - - // Sets the schema id, and then recursively ensures that all schemas have ids set. - private void setSchemaIds(Schema schema, UUID id) { - if (schema.getUUID() == null) { - schema.setUUID(id); - } - for (Field field : schema.getFields()) { - setSchemaIds(field.getType()); - } - } - - private void setSchemaIds(FieldType fieldType) { - switch (fieldType.getTypeName()) { - case ROW: - setSchemaIds(fieldType.getRowSchema(), UUID.randomUUID()); - return; - case MAP: - setSchemaIds(fieldType.getMapKeyType()); - setSchemaIds(fieldType.getMapValueType()); - return; - case LOGICAL_TYPE: - setSchemaIds(fieldType.getLogicalType().getBaseType()); - return; - - case ARRAY: - setSchemaIds(fieldType.getCollectionElementType()); - return; - - default: - return; - } - } - - // Return the generated coder class for this schema. - private Coder getDelegateCoder() { - if (delegateCoder == null) { - // RowCoderGenerator caches based on id, so if a new instance of this RowCoder is - // deserialized, we don't need to run ByteBuddy again to construct the class. - delegateCoder = RowCoderGenerator.generate(schema); - } - return delegateCoder; - } - - @Override - public void encode(Row value, OutputStream outStream) throws IOException { - getDelegateCoder().encode(value, outStream); - } - - @Override - public Row decode(InputStream inStream) throws IOException { - return getDelegateCoder().decode(inStream); - } - - public Schema getSchema() { - return schema; - } - - @Override - public void verifyDeterministic() - throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { - verifyDeterministic(schema); - } - - private void verifyDeterministic(Schema schema) - throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { - - List> coders = - schema.getFields().stream() - .map(Field::getType) - .map(RowCoder::coderForFieldType) - .collect(Collectors.toList()); - - Coder.verifyDeterministic(this, "All fields must have deterministic encoding", coders); - } - - @Override - public boolean consistentWithEquals() { - return true; - } - - /** Returns the coder used for a given primitive type. */ - public static Coder coderForFieldType(FieldType fieldType) { - switch (fieldType.getTypeName()) { - case ROW: - return (Coder) RowCoder.of(fieldType.getRowSchema()); - case ARRAY: - return (Coder) ListCoder.of(coderForFieldType(fieldType.getCollectionElementType())); - case MAP: - return (Coder) - MapCoder.of( - coderForFieldType(fieldType.getMapKeyType()), - coderForFieldType(fieldType.getMapValueType())); - default: - return (Coder) CODER_MAP.get(fieldType.getTypeName()); - } + return new RowCoder(schema); } - @Override - public String toString() { - String string = "Schema: " + schema + " UUID: " + id + " delegateCoder: " + getDelegateCoder(); - return string; + private RowCoder(Schema schema) { + super(schema, SerializableFunctions.identity(), SerializableFunctions.identity()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java index d8788d5201f5..b084a09e9fb7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.ByteBuddy; import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.modifier.FieldManifestation; @@ -114,7 +115,7 @@ public abstract class RowCoderGenerator { // Initialize the CODER_MAP with the StackManipulations to create the primitive coders. // Assumes that each class contains a static of() constructor method. CODER_MAP = Maps.newHashMap(); - for (Map.Entry entry : RowCoder.CODER_MAP.entrySet()) { + for (Map.Entry entry : SchemaCoder.CODER_MAP.entrySet()) { StackManipulation stackManipulation = MethodInvocation.invoke( new ForLoadedType(entry.getValue().getClass()) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java index 0199534c3aba..ff03f3d95df2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java @@ -20,28 +20,73 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.UUID; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.BigDecimalCoder; +import org.apache.beam.sdk.coders.BigEndianShortCoder; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.ByteCoder; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.coders.DoubleCoder; +import org.apache.beam.sdk.coders.FloatCoder; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.RowCoderGenerator; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** {@link SchemaCoder} is used as the coder for types that have schemas registered. */ @Experimental(Kind.SCHEMAS) public class SchemaCoder extends CustomCoder { - private final RowCoder rowCoder; + + // This contains a map of primitive types to their coders. + public static final ImmutableMap CODER_MAP = + ImmutableMap.builder() + .put(TypeName.BYTE, ByteCoder.of()) + .put(TypeName.BYTES, ByteArrayCoder.of()) + .put(TypeName.INT16, BigEndianShortCoder.of()) + .put(TypeName.INT32, VarIntCoder.of()) + .put(TypeName.INT64, VarLongCoder.of()) + .put(TypeName.DECIMAL, BigDecimalCoder.of()) + .put(TypeName.FLOAT, FloatCoder.of()) + .put(TypeName.DOUBLE, DoubleCoder.of()) + .put(TypeName.STRING, StringUtf8Coder.of()) + .put(TypeName.DATETIME, InstantCoder.of()) + .put(TypeName.BOOLEAN, BooleanCoder.of()) + .build(); + + private final Schema schema; private final SerializableFunction toRowFunction; private final SerializableFunction fromRowFunction; + @Nullable private transient Coder delegateCoder; - private SchemaCoder( + protected SchemaCoder( Schema schema, SerializableFunction toRowFunction, SerializableFunction fromRowFunction) { + if (schema.getUUID() == null) { + // Clone the schema before modifying the Java object. + schema = SerializableUtils.clone(schema); + setSchemaIds(schema); + } this.toRowFunction = toRowFunction; this.fromRowFunction = fromRowFunction; - this.rowCoder = RowCoder.of(schema); + this.schema = schema; } /** @@ -61,9 +106,26 @@ public static SchemaCoder of(Schema schema) { schema, SerializableFunctions.identity(), SerializableFunctions.identity()); } + /** Returns the coder used for a given primitive type. */ + public static Coder coderForFieldType(FieldType fieldType) { + switch (fieldType.getTypeName()) { + case ROW: + return (Coder) SchemaCoder.of(fieldType.getRowSchema()); + case ARRAY: + return (Coder) ListCoder.of(coderForFieldType(fieldType.getCollectionElementType())); + case MAP: + return (Coder) + MapCoder.of( + coderForFieldType(fieldType.getMapKeyType()), + coderForFieldType(fieldType.getMapValueType())); + default: + return (Coder) CODER_MAP.get(fieldType.getTypeName()); + } + } + /** Returns the schema associated with this type. */ public Schema getSchema() { - return rowCoder.getSchema(); + return schema; } /** Returns the toRow conversion function. */ @@ -76,28 +138,87 @@ public SerializableFunction getToRowFunction() { return toRowFunction; } + private Coder getDelegateCoder() { + if (delegateCoder == null) { + // RowCoderGenerator caches based on id, so if a new instance of this RowCoder is + // deserialized, we don't need to run ByteBuddy again to construct the class. + delegateCoder = RowCoderGenerator.generate(schema); + } + return delegateCoder; + } + @Override public void encode(T value, OutputStream outStream) throws IOException { - rowCoder.encode(toRowFunction.apply(value), outStream); + getDelegateCoder().encode(toRowFunction.apply(value), outStream); } @Override public T decode(InputStream inStream) throws IOException { - return fromRowFunction.apply(rowCoder.decode(inStream)); + return fromRowFunction.apply(getDelegateCoder().decode(inStream)); } @Override - public void verifyDeterministic() throws NonDeterministicException { - rowCoder.verifyDeterministic(); + public void verifyDeterministic() + throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { + verifyDeterministic(schema); + } + + private void verifyDeterministic(Schema schema) + throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { + + ImmutableList> coders = + schema.getFields().stream() + .map(Field::getType) + .map(SchemaCoder::coderForFieldType) + .collect(ImmutableList.toImmutableList()); + + Coder.verifyDeterministic(this, "All fields must have deterministic encoding", coders); } @Override public boolean consistentWithEquals() { - return rowCoder.consistentWithEquals(); + return true; } @Override public String toString() { - return "SchemaCoder: " + rowCoder.toString(); + return "SchemaCoder withSchema(Schema inputSchema, SerializableFunction toRowFuncti if (fieldAggregation.unnestedInputSubSchema.getFieldCount() == 1) { extractFunction = new ExtractSingleFieldFunction<>(fieldAggregation, toRowFunction); extractOutputCoder = - RowCoder.coderForFieldType( + SchemaCoder.coderForFieldType( fieldAggregation.unnestedInputSubSchema.getField(0).getType()); } else { extractFunction = new ExtractFieldsFunction<>(fieldAggregation, toRowFunction); - extractOutputCoder = RowCoder.of(fieldAggregation.inputSubSchema); + extractOutputCoder = SchemaCoder.of(fieldAggregation.inputSubSchema); } if (i == 0) { composedCombineFn = From c134f0a67a64e07844fa115f5d961c7b3f71713e Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 27 Sep 2019 16:21:55 -0700 Subject: [PATCH 3/4] Always use RowCoder.of when encoding rows --- .../main/java/org/apache/beam/sdk/schemas/SchemaCoder.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java index ff03f3d95df2..8e2db513ba98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.coders.RowCoderGenerator; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -44,7 +45,6 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; @@ -100,10 +100,9 @@ public static SchemaCoder of( return new SchemaCoder<>(schema, toRowFunction, fromRowFunction); } - /** Returns a {@link SchemaCoder} for {@link Row} classes. */ + /** Returns a {@link SchemaCoder} for {@link Row} instances with the given {@code schema}. */ public static SchemaCoder of(Schema schema) { - return new SchemaCoder<>( - schema, SerializableFunctions.identity(), SerializableFunctions.identity()); + return RowCoder.of(schema); } /** Returns the coder used for a given primitive type. */ From e1e7813ddf42852a395a5da2de27efd99c5a1693 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 1 Oct 2019 13:17:32 -0700 Subject: [PATCH 4/4] fixup! hack to make CloudObjectsTest happy for now --- .../beam/runners/dataflow/util/CloudObjectsTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index 215567e10797..836fd7822772 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder; import org.apache.beam.sdk.transforms.join.CoGbkResultSchema; import org.apache.beam.sdk.transforms.join.UnionCoder; @@ -143,7 +144,11 @@ public static Iterable> data() { CoGbkResultSchema.of( ImmutableList.of(new TupleTag(), new TupleTag())), UnionCoder.of(ImmutableList.of(VarLongCoder.of(), ByteArrayCoder.of())))) - .add(SchemaCoder.of(Schema.builder().build())); + .add( + SchemaCoder.of( + Schema.builder().build(), + SerializableFunctions.identity(), + SerializableFunctions.identity())); for (Class atomicCoder : DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) { dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build());