From 245e6b61d947e3623c870769185a7bb31d3de835 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Thu, 29 Aug 2019 14:41:10 -0700 Subject: [PATCH 1/4] Add ValidatesRunner test to AvroSchemaTest --- .../beam/sdk/schemas/AvroSchemaTest.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java index 4ffad67646ec..c30e50a47a71 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java @@ -32,9 +32,16 @@ import org.apache.avro.reflect.AvroSchema; import org.apache.avro.util.Utf8; import org.apache.beam.sdk.schemas.LogicalTypes.FixedBytes; +import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.transforms.Select; import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; @@ -43,7 +50,9 @@ import org.joda.time.DateTimeZone; import org.joda.time.Days; import org.joda.time.LocalDate; +import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; /** Tests for AVRO schema classes. 
*/ public class AvroSchemaTest { @@ -406,4 +415,21 @@ public void testRowToPojo() { new AvroRecordSchema().fromRowFunction(TypeDescriptor.of(AvroPojo.class)); assertEquals(AVRO_POJO, fromRow.apply(ROW_FOR_POJO)); } + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Test + @Category(ValidatesRunner.class) + public void testAvroPipeline() { + PCollection input = pipeline.apply(Create.of(ROW_FOR_POJO)).setRowSchema(POJO_SCHEMA); + + PCollection output = input.apply(Select.fieldNames("fixed")); + PAssert.that(output) + .containsInAnyOrder( + Row.withSchema(Schema.of(Field.of("fixed", FieldType.logicalType(FixedBytes.of(4))))) + .addValue(BYTE_ARRAY) + .build()); + + pipeline.run(); + } } From d7db5b3dd277334c66eeeb09bab8deba47b87b96 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Wed, 4 Sep 2019 11:44:11 -0700 Subject: [PATCH 2/4] Make validates runner test do a group by to ensure there's a fusion break --- .../apache/beam/sdk/schemas/AvroSchemaTest.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java index c30e50a47a71..f10733237fa1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java @@ -34,13 +34,14 @@ import org.apache.beam.sdk.schemas.LogicalTypes.FixedBytes; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.transforms.Select; +import org.apache.beam.sdk.schemas.transforms.Group; import org.apache.beam.sdk.schemas.utils.AvroUtils; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import 
org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; @@ -420,15 +421,17 @@ public void testRowToPojo() { @Test @Category(ValidatesRunner.class) - public void testAvroPipeline() { + public void testAvroPipelineGroupBy() { PCollection input = pipeline.apply(Create.of(ROW_FOR_POJO)).setRowSchema(POJO_SCHEMA); - PCollection output = input.apply(Select.fieldNames("fixed")); + PCollection<KV<Row, Iterable<Row>>> output = input.apply(Group.byFieldNames("string")); PAssert.that(output) .containsInAnyOrder( - Row.withSchema(Schema.of(Field.of("fixed", FieldType.logicalType(FixedBytes.of(4))))) - .addValue(BYTE_ARRAY) - .build()); + KV.of( + Row.withSchema(Schema.of(Field.of("string", FieldType.STRING))) + .addValue("mystring") + .build(), + ImmutableList.of(ROW_FOR_POJO))); pipeline.run(); } From fb93067aee5af06025ce0318f8a29d6d5ebf3500 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Wed, 4 Sep 2019 11:48:55 -0700 Subject: [PATCH 3/4] Break logical types on dataflow --- .../runners/core/construction/SchemaTranslation.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java index 26b154d12ed2..4f1100828577 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java @@ -98,16 +98,7 @@ private static RunnerApi.Schema.FieldType toProto(FieldType fieldType) { break; case LOGICAL_TYPE: - LogicalType logicalType = fieldType.getLogicalType(); - builder.setLogicalType(
RunnerApi.Schema.LogicalType.newBuilder() - .setId(logicalType.getIdentifier()) - .setArgs(logicalType.getArgument()) - .setBaseType(toProto(logicalType.getBaseType())) - .setSerializedClass( - ByteString.copyFrom(SerializableUtils.serializeToByteArray(logicalType))) - .build()); - break; + throw new IllegalArgumentException("Logical types are not supported."); default: break; From 80ce9ede84f35b2674d07f3e40497c89464976e4 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Wed, 4 Sep 2019 15:53:13 -0700 Subject: [PATCH 4/4] Revert "Break logical types on dataflow" This reverts commit fb93067aee5af06025ce0318f8a29d6d5ebf3500. --- .../runners/core/construction/SchemaTranslation.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java index 4f1100828577..26b154d12ed2 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java @@ -98,7 +98,16 @@ private static RunnerApi.Schema.FieldType toProto(FieldType fieldType) { break; case LOGICAL_TYPE: - throw new IllegalArgumentException("Logical types are not supported."); + LogicalType logicalType = fieldType.getLogicalType(); + builder.setLogicalType( + RunnerApi.Schema.LogicalType.newBuilder() + .setId(logicalType.getIdentifier()) + .setArgs(logicalType.getArgument()) + .setBaseType(toProto(logicalType.getBaseType())) + .setSerializedClass( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(logicalType))) + .build()); + break; default: break;