diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java index 4ffad67646ec..f10733237fa1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java @@ -32,9 +32,17 @@ import org.apache.avro.reflect.AvroSchema; import org.apache.avro.util.Utf8; import org.apache.beam.sdk.schemas.LogicalTypes.FixedBytes; +import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.transforms.Group; import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; @@ -43,7 +51,9 @@ import org.joda.time.DateTimeZone; import org.joda.time.Days; import org.joda.time.LocalDate; +import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; /** Tests for AVRO schema classes. */ public class AvroSchemaTest { @@ -406,4 +416,23 @@ public void testRowToPojo() { new AvroRecordSchema().fromRowFunction(TypeDescriptor.of(AvroPojo.class)); assertEquals(AVRO_POJO, fromRow.apply(ROW_FOR_POJO)); } + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Test + @Category(ValidatesRunner.class) + public void testAvroPipelineGroupBy() { + PCollection input = pipeline.apply(Create.of(ROW_FOR_POJO)).setRowSchema(POJO_SCHEMA); + + PCollection>> output = input.apply(Group.byFieldNames("string")); + PAssert.that(output) + .containsInAnyOrder( + KV.of( + Row.withSchema(Schema.of(Field.of("string", FieldType.STRING))) + .addValue("mystring") + .build(), + ImmutableList.of(ROW_FOR_POJO))); + + pipeline.run(); + } }