Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,21 @@ public static Schema of(Field... fields) {
return Schema.builder().addFields(fields).build();
}

/**
 * Returns a copy of this Schema whose fields are ordered by field name.
 *
 * <p>The fields are sorted with {@code Comparator.comparing(Field::getName)} and collected into
 * a new Schema. This schema's options are then added to the new schema, and its UUID and
 * encoding positions are applied to it through the corresponding setters.
 *
 * @return a new Schema with the same fields sorted by name, carrying this schema's options, UUID
 *     and encoding positions
 */
public Schema sorted() {
// Create a new schema and copy over the appropriate Schema object attributes:
// {fields, uuid, encodingPositions, options}
Schema sortedSchema =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an assertion or a unit test that will break if any new properties are added to the Schema object that are not copied over here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, PTAL

this.fields.stream()
.sorted(Comparator.comparing(Field::getName))
.collect(Schema.toSchema())
.withOptions(getOptions());
// The UUID and encoding positions are not part of the collected schema, so they are set on the
// new instance explicitly after it has been built.
sortedSchema.setUUID(getUUID());
sortedSchema.setEncodingPositions(getEncodingPositions());

return sortedSchema;
}

/** Returns a copy of the Schema with the options set. */
public Schema withOptions(Options options) {
return new Schema(fields, getOptions().toBuilder().addOptions(options).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ Optional<List<String>> dependencies(ConfigT configuration, PipelineOptions optio
@Override
public final Schema configurationSchema() {
try {
return SchemaRegistry.createDefault().getSchema(configurationClass());
// Sort the fields by name to ensure a consistent schema is produced
return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.Options;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -188,6 +194,82 @@ public void testCollector() {
assertEquals(FieldType.STRING, schema.getField(1).getType());
}

@Test
public void testSorted() {
  // Options attached to both schemas so the test can verify that sorted() carries them over.
  Options testOptions =
      Options.builder()
          .setOption("test_str_option", FieldType.STRING, "test_str")
          .setOption("test_bool_option", FieldType.BOOLEAN, true)
          .build();

  // Fields deliberately declared in reverse alphabetical order.
  Schema unorderedSchema =
      Schema.builder()
          .addStringField("d")
          .addInt32Field("c")
          .addStringField("b")
          .addByteField("a")
          .build()
          .withOptions(testOptions);

  Schema unorderedSchemaAfterSorting = unorderedSchema.sorted();

  // Hand-built reference: the same fields in alphabetical order, with the same options and the
  // encoding positions of the original (unsorted) schema.
  Schema sortedSchema =
      Schema.builder()
          .addByteField("a")
          .addStringField("b")
          .addInt32Field("c")
          .addStringField("d")
          .build()
          .withOptions(testOptions);
  sortedSchema.setEncodingPositions(unorderedSchema.getEncodingPositions());

  assertTrue(unorderedSchema.equivalent(unorderedSchemaAfterSorting));

  // Each attribute is asserted separately so that a failure names the attribute that differs
  // and prints both values.
  assertEquals(sortedSchema.getFields(), unorderedSchemaAfterSorting.getFields());
  assertEquals(sortedSchema.getOptions(), unorderedSchemaAfterSorting.getOptions());
  assertEquals(
      sortedSchema.getEncodingPositions(), unorderedSchemaAfterSorting.getEncodingPositions());
}

@Test
public void testSortedMethodIncludesAllSchemaFields() {
  // This test is most likely to break when new Schema object attributes are added. It is designed
  // this way to make sure that the Schema::sorted() method is updated to return a full sorted
  // copy.

  // Schema object attributes that are accounted for in Schema::sorted().
  // Note: Only the appropriate ones are copied over.
  // Sorted so that the comparison below does not depend on declaration order.
  List<String> attributesAccountedForInSorted =
      Stream.of(
              "fieldIndices",
              "encodingPositions",
              "encodingPositionsOverridden",
              "fields",
              "hashCode",
              "uuid",
              "options")
          .sorted()
          .collect(Collectors.toList());

  // Current attributes in Schema object. Class#getDeclaredFields() returns fields in no
  // particular order, so the names are sorted before they are compared. Synthetic fields
  // (compiler- or tool-generated) are ignored.
  List<String> currentAttributes =
      Arrays.stream(Schema.class.getDeclaredFields())
          .filter(field -> !field.isSynthetic())
          .map(java.lang.reflect.Field::getName)
          .sorted()
          .collect(Collectors.toList());

  // Attributes present on Schema but missing from the accounted-for list; used in the message.
  List<String> differences = new ArrayList<>(currentAttributes);
  differences.removeAll(attributesAccountedForInSorted);

  // Expected value first (the hard-coded list), actual value second (what reflection found).
  assertEquals(
      String.format(
          "Detected attributes %s in Schema object that are not accounted for in Schema::sorted(). "
              + "If appropriate, sorted() should copy over these attributes as well. Either way, update this test after checking.",
          differences.toString()),
      attributesAccountedForInSorted,
      currentAttributes);
}

@Test
public void testEquivalent() {
final Schema expectedNested1 =
Expand Down