diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java index 2a6ca630e7e1..9a0805f52d2d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java @@ -264,6 +264,21 @@ public static Schema of(Field... fields) { return Schema.builder().addFields(fields).build(); } + /** Returns an identical Schema with sorted fields. */ + public Schema sorted() { + // Create a new schema and copy over the appropriate Schema object attributes: + // {fields, uuid, encodingPositions, options} + Schema sortedSchema = + this.fields.stream() + .sorted(Comparator.comparing(Field::getName)) + .collect(Schema.toSchema()) + .withOptions(getOptions()); + sortedSchema.setUUID(getUUID()); + sortedSchema.setEncodingPositions(getEncodingPositions()); + + return sortedSchema; + } + /** Returns a copy of the Schema with the options set. */ public Schema withOptions(Options options) { return new Schema(fields, getOptions().toBuilder().addOptions(options).build()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index 944f33a596de..c3ae723d1d6f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java @@ -61,7 +61,8 @@ Optional> dependencies(ConfigT configuration, PipelineOptions optio @Override public final Schema configurationSchema() { try { - return SchemaRegistry.createDefault().getSchema(configurationClass()); + // Sort the fields by name to ensure a consistent schema is produced + return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted(); } catch (NoSuchSchemaException e) { throw new RuntimeException( "Unable to find schema for " diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java index 5477885c62bb..47746b599259 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java @@ -23,9 +23,15 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.Options; import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.junit.Rule; import org.junit.Test; @@ -188,6 +194,82 @@ public void testCollector() { assertEquals(FieldType.STRING, schema.getField(1).getType()); } + @Test + public void testSorted() { + Options testOptions = + Options.builder() + .setOption("test_str_option", FieldType.STRING, "test_str") + .setOption("test_bool_option", FieldType.BOOLEAN, true) + .build(); + + Schema unorderedSchema = + Schema.builder() + .addStringField("d") + .addInt32Field("c") + .addStringField("b") + .addByteField("a") + .build() + .withOptions(testOptions); + + Schema unorderedSchemaAfterSorting = unorderedSchema.sorted(); + + Schema sortedSchema = + Schema.builder() + .addByteField("a") + .addStringField("b") + .addInt32Field("c") + .addStringField("d") + .build() + .withOptions(testOptions); + sortedSchema.setEncodingPositions(unorderedSchema.getEncodingPositions()); + + assertEquals(true, unorderedSchema.equivalent(unorderedSchemaAfterSorting)); + assertEquals( + true, + Objects.equals(unorderedSchemaAfterSorting.getFields(), sortedSchema.getFields()) + && Objects.equals(unorderedSchemaAfterSorting.getOptions(), sortedSchema.getOptions()) + && Objects.equals( + unorderedSchemaAfterSorting.getEncodingPositions(), + sortedSchema.getEncodingPositions())); + } + + @Test + public void testSortedMethodIncludesAllSchemaFields() { + // This test is most likely to break when new Schema object attributes are added. It is designed + // this way to make sure that the Schema::sorted() method is updated to return a full sorted + // copy. + + // Schema object attributes that are accounted for in Schema::sorted(). + // Note: Only the appropriate ones are copied over. + List attributesAccountedForInSorted = + Arrays.asList( + "fieldIndices", + "encodingPositions", + "encodingPositionsOverridden", + "fields", + "hashCode", + "uuid", + "options"); + + // Current attributes in Schema object. + List currentAttributes = + Arrays.stream(Schema.class.getDeclaredFields()) + .filter(field -> !field.isSynthetic()) + .map(java.lang.reflect.Field::getName) + .collect(Collectors.toList()); + + List differences = new ArrayList<>(currentAttributes); + differences.removeAll(attributesAccountedForInSorted); + + assertEquals( + String.format( + "Detected attributes %s in Schema object that are not accounted for in Schema::sorted(). " + + "If appropriate, sorted() should copy over these attributes as well. Either way, update this test after checking.", + differences.toString()), + currentAttributes, + attributesAccountedForInSorted); + } + @Test public void testEquivalent() { final Schema expectedNested1 =