From d3d7f7b86c7c0ac8a935aa248f9ccb38da9f27a2 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Sun, 27 Nov 2022 06:38:04 +0000 Subject: [PATCH 1/5] add method to sort schema fields by name --- .../org/apache/beam/sdk/schemas/Schema.java | 12 +++++++++ .../TypedSchemaTransformProvider.java | 8 +++++- .../apache/beam/sdk/schemas/SchemaTest.java | 26 +++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java index 2a6ca630e7e1..e301eddac8a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java @@ -264,6 +264,18 @@ public static Schema of(Field... fields) { return Schema.builder().addFields(fields).build(); } + /** Returns an identical Schema with sorted fields. */ + public Schema sorted() { + Schema sortedSchema = this.fields.stream() + .sorted(Comparator.comparing(Field::getName)) + .collect(Schema.toSchema()) + .withOptions(getOptions()); + sortedSchema.setUUID(getUUID()); + sortedSchema.setEncodingPositions(getEncodingPositions()); + + return sortedSchema; + } + /** Returns a copy of the Schema with the options set. */ public Schema withOptions(Options options) { return new Schema(fields, getOptions().toBuilder().addOptions(options).build()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index 944f33a596de..d65ab0633320 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.schemas.transforms; +import java.util.Comparator; import java.util.List; import java.util.Optional; import org.apache.beam.sdk.annotations.Experimental; @@ -25,6 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.values.Row; @@ -40,6 +42,9 @@ */ @Internal @Experimental(Kind.SCHEMAS) +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20506) +}) public abstract class TypedSchemaTransformProvider implements SchemaTransformProvider { protected abstract Class configurationClass(); @@ -61,7 +66,8 @@ Optional> dependencies(ConfigT configuration, PipelineOptions optio @Override public final Schema configurationSchema() { try { - return SchemaRegistry.createDefault().getSchema(configurationClass()); + // Sort the fields by name to ensure a consistent schema is produced + return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted(); } catch (NoSuchSchemaException e) { throw new RuntimeException( "Unable to find schema for " diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java index 5477885c62bb..7f98290f686e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java @@ -23,9 +23,12 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import java.util.Objects; +import java.util.UUID; import java.util.stream.Stream; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.Options; import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.junit.Rule; import org.junit.Test; @@ -188,6 +191,29 @@ public void testCollector() { assertEquals(FieldType.STRING, schema.getField(1).getType()); } + @Test + public void testSorted() { + Options testOptions = Options.builder().setOption("test_str_option", FieldType.STRING, "test_str") + .setOption("test_bool_option", FieldType.BOOLEAN, true).build(); + + Schema schema1 = + Schema.builder().addStringField("d").addInt32Field("c").addStringField("b").addByteField("a").build() + .withOptions(testOptions); + + Schema sortedSchema1 = schema1.sorted(); + + Schema schema2 = + Schema.builder().addByteField("a").addStringField("b").addInt32Field("c").addStringField("d").build() + .withOptions(testOptions); + schema2.setEncodingPositions(schema1.getEncodingPositions()); + + assertEquals(true, schema1.equivalent(sortedSchema1)); + assertEquals(true, + Objects.equals(sortedSchema1.getFields(), schema2.getFields()) + && Objects.equals(sortedSchema1.getOptions(), schema2.getOptions()) + && Objects.equals(sortedSchema1.getEncodingPositions(), schema2.getEncodingPositions())); + } + @Test public void testEquivalent() { final Schema expectedNested1 = From 2a13bf0ed466bc061a178cf5fb4e64724d4b3bb9 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Mon, 28 Nov 2022 13:32:30 +0000 Subject: [PATCH 2/5] spotless --- .../org/apache/beam/sdk/schemas/Schema.java | 9 +++--- .../TypedSchemaTransformProvider.java | 4 +-- .../apache/beam/sdk/schemas/SchemaTest.java | 28 ++++++++++++++----- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java index e301eddac8a9..bd2382ce119d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java @@ -266,10 +266,11 @@ public static Schema of(Field... fields) { /** Returns an identical Schema with sorted fields. */ public Schema sorted() { - Schema sortedSchema = this.fields.stream() - .sorted(Comparator.comparing(Field::getName)) - .collect(Schema.toSchema()) - .withOptions(getOptions()); + Schema sortedSchema = + this.fields.stream() + .sorted(Comparator.comparing(Field::getName)) + .collect(Schema.toSchema()) + .withOptions(getOptions()); sortedSchema.setUUID(getUUID()); sortedSchema.setEncodingPositions(getEncodingPositions()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index d65ab0633320..3f143c5f5b8f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.schemas.transforms; -import java.util.Comparator; import java.util.List; import java.util.Optional; import org.apache.beam.sdk.annotations.Experimental; @@ -26,7 +25,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.values.Row; @@ -43,7 +41,7 @@ @Internal @Experimental(Kind.SCHEMAS) @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20506) + "nullness" // TODO(https://github.com/apache/beam/issues/20506) }) public abstract class TypedSchemaTransformProvider implements SchemaTransformProvider { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java index 7f98290f686e..a294a31fbe43 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import java.util.Objects; -import java.util.UUID; import java.util.stream.Stream; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -193,25 +192,40 @@ public void testCollector() { @Test public void testSorted() { - Options testOptions = Options.builder().setOption("test_str_option", FieldType.STRING, "test_str") - .setOption("test_bool_option", FieldType.BOOLEAN, true).build(); + Options testOptions = + Options.builder() + .setOption("test_str_option", FieldType.STRING, "test_str") + .setOption("test_bool_option", FieldType.BOOLEAN, true) + .build(); Schema schema1 = - Schema.builder().addStringField("d").addInt32Field("c").addStringField("b").addByteField("a").build() + Schema.builder() + .addStringField("d") + .addInt32Field("c") + .addStringField("b") + .addByteField("a") + .build() .withOptions(testOptions); Schema sortedSchema1 = schema1.sorted(); Schema schema2 = - Schema.builder().addByteField("a").addStringField("b").addInt32Field("c").addStringField("d").build() + Schema.builder() + .addByteField("a") + .addStringField("b") + .addInt32Field("c") + .addStringField("d") + .build() .withOptions(testOptions); schema2.setEncodingPositions(schema1.getEncodingPositions()); assertEquals(true, schema1.equivalent(sortedSchema1)); - assertEquals(true, + assertEquals( + true, Objects.equals(sortedSchema1.getFields(), schema2.getFields()) && Objects.equals(sortedSchema1.getOptions(), schema2.getOptions()) - && Objects.equals(sortedSchema1.getEncodingPositions(), schema2.getEncodingPositions())); + && Objects.equals( + sortedSchema1.getEncodingPositions(), schema2.getEncodingPositions())); } @Test From aa00671bf97cea21e90034d7af4271a99b9bbfda Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 29 Nov 2022 17:16:50 +0000 Subject: [PATCH 3/5] make test more readable --- .../transforms/TypedSchemaTransformProvider.java | 3 --- .../org/apache/beam/sdk/schemas/SchemaTest.java | 16 ++++++++-------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index 3f143c5f5b8f..c3ae723d1d6f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java @@ -40,9 +40,6 @@ */ @Internal @Experimental(Kind.SCHEMAS) -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20506) -}) public abstract class TypedSchemaTransformProvider implements SchemaTransformProvider { protected abstract Class configurationClass(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java index a294a31fbe43..ada1a92832fc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java @@ -198,7 +198,7 @@ public void testSorted() { .setOption("test_bool_option", FieldType.BOOLEAN, true) .build(); - Schema schema1 = + Schema unorderedSchema = Schema.builder() .addStringField("d") .addInt32Field("c") @@ -207,9 +207,9 @@ public void testSorted() { .build() .withOptions(testOptions); - Schema sortedSchema1 = schema1.sorted(); + Schema unorderedSchemaAfterSorting = unorderedSchema.sorted(); - Schema schema2 = + Schema sortedSchema = Schema.builder() .addByteField("a") .addStringField("b") @@ -217,15 +217,15 @@ public void testSorted() { .addStringField("d") .build() .withOptions(testOptions); - schema2.setEncodingPositions(schema1.getEncodingPositions()); + sortedSchema.setEncodingPositions(unorderedSchema.getEncodingPositions()); - assertEquals(true, schema1.equivalent(sortedSchema1)); + assertEquals(true, unorderedSchema.equivalent(unorderedSchemaAfterSorting)); assertEquals( true, - Objects.equals(sortedSchema1.getFields(), schema2.getFields()) - && Objects.equals(sortedSchema1.getOptions(), schema2.getOptions()) + Objects.equals(unorderedSchemaAfterSorting.getFields(), sortedSchema.getFields()) + && Objects.equals(unorderedSchemaAfterSorting.getOptions(), sortedSchema.getOptions()) && Objects.equals( - sortedSchema1.getEncodingPositions(), schema2.getEncodingPositions())); + unorderedSchemaAfterSorting.getEncodingPositions(), sortedSchema.getEncodingPositions())); } @Test From 5a3d9d4cf43d16f5c4a3b411a911b43c86e8fcd6 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 29 Nov 2022 18:20:42 +0000 Subject: [PATCH 4/5] add test that breaks when new attributes are added --- .../org/apache/beam/sdk/schemas/Schema.java | 2 + .../apache/beam/sdk/schemas/SchemaTest.java | 43 ++++++++++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java index bd2382ce119d..9a0805f52d2d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java @@ -266,6 +266,8 @@ public static Schema of(Field... fields) { /** Returns an identical Schema with sorted fields. */ public Schema sorted() { + // Create a new schema and copy over the appropriate Schema object attributes: + // {fields, uuid, encodingPositions, options} Schema sortedSchema = this.fields.stream() .sorted(Comparator.comparing(Field::getName)) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java index ada1a92832fc..da05753bca22 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java @@ -23,7 +23,11 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -225,7 +229,44 @@ public void testSorted() { Objects.equals(unorderedSchemaAfterSorting.getFields(), sortedSchema.getFields()) && Objects.equals(unorderedSchemaAfterSorting.getOptions(), sortedSchema.getOptions()) && Objects.equals( - unorderedSchemaAfterSorting.getEncodingPositions(), sortedSchema.getEncodingPositions())); + unorderedSchemaAfterSorting.getEncodingPositions(), + sortedSchema.getEncodingPositions())); + } + + @Test + public void testSortedMethodIncludesAllSchemaFields() { + // This test is most likely to break when new Schema object attributes are added. It is designed + // this way to make sure that the Schema::sorted() method is updated to return a full sorted + // copy. + + // Schema object attributes that are accounted for in Schema::sorted(). + // Note: Only the appropriate ones are copied over. + List attributesAccountedForInSorted = + Arrays.asList( + "fieldIndices", + "encodingPositions", + "encodingPositionsOverridden", + "fields", + "hashCode", + "uuid", + "options"); + + // Current attributes in Schema object. + List currentAttributes = + Arrays.stream(Schema.class.getDeclaredFields()) + .map(java.lang.reflect.Field::getName) + .collect(Collectors.toList()); + + List differences = new ArrayList<>(currentAttributes); + differences.removeAll(attributesAccountedForInSorted); + + assertEquals( + String.format( + "Detected attributes %s in Schema object that are not accounted for in Schema::sorted(). " + + "If appropriate, sorted() should copy over these attributes as well. Either way, update this test after checking.", + differences.toString()), + currentAttributes, + attributesAccountedForInSorted); } @Test From 76b0db04a087ac1b612b17fd7990617ff840d693 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Wed, 30 Nov 2022 04:48:08 +0000 Subject: [PATCH 5/5] ignore synthetic fields --- .../src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java index da05753bca22..47746b599259 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java @@ -254,6 +254,7 @@ public void testSortedMethodIncludesAllSchemaFields() { // Current attributes in Schema object. List currentAttributes = Arrays.stream(Schema.class.getDeclaredFields()) + .filter(field -> !field.isSynthetic()) .map(java.lang.reflect.Field::getName) .collect(Collectors.toList());