diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java index 1d8116cb7bb2..9b01b352c333 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java @@ -17,9 +17,12 @@ */ package org.apache.beam.sdk.schemas.transforms; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.transforms.DoFn; @@ -70,7 +73,9 @@ public static PTransform, PCollection> fromR * *

This function allows converting between two types as long as the two types have * compatible schemas. Two schemas are said to be compatible if they recursively - * have fields with the same names, but possibly different orders. + * have fields with the same names, but possibly different orders. If the source schema can be + * unboxed to match the target schema (i.e. the source schema contains a single field that is + * compatible with the target schema), then conversion also succeeds. */ public static PTransform, PCollection> to( Class clazz) { @@ -82,7 +87,9 @@ public static PTransform, PCollectionThis function allows converting between two types as long as the two types have * compatible schemas. Two schemas are said to be compatible if they recursively - * have fields with the same names, but possibly different orders. + * have fields with the same names, but possibly different orders. If the source schema can be + * unboxed to match the target schema (i.e. the source schema contains a single field that is + * compatible with the target schema), then conversion also succeeds. 
*/ public static PTransform, PCollection> to( TypeDescriptor typeDescriptor) { @@ -92,11 +99,24 @@ public static PTransform, PCollection extends PTransform, PCollection> { TypeDescriptor outputTypeDescriptor; + Schema unboxedSchema = null; ConvertTransform(TypeDescriptor outputTypeDescriptor) { this.outputTypeDescriptor = outputTypeDescriptor; } + @Nullable + private static Schema getBoxedNestedSchema(Schema schema) { + if (schema.getFieldCount() != 1) { + return null; + } + FieldType fieldType = schema.getField(0).getType(); + if (!fieldType.getTypeName().isCompositeType()) { + return null; + } + return fieldType.getRowSchema(); + } + @Override @SuppressWarnings("unchecked") public PCollection expand(PCollection input) { @@ -124,15 +144,21 @@ public PCollection expand(PCollection input) { registry.getSchema(outputTypeDescriptor), registry.getToRowFunction(outputTypeDescriptor), registry.getFromRowFunction(outputTypeDescriptor)); - // assert matches input schema. - // TODO: Properly handle nullable. - if (!outputSchemaCoder.getSchema().assignableToIgnoreNullable(input.getSchema())) { - throw new RuntimeException( - "Cannot convert between types that don't have equivalent schemas." - + " input schema: " - + input.getSchema() - + " output schema: " - + outputSchemaCoder.getSchema()); + + Schema outputSchema = outputSchemaCoder.getSchema(); + if (!outputSchema.assignableToIgnoreNullable(input.getSchema())) { + // We also support unboxing nested Row schemas, so attempt that. + // TODO: Support unboxing to primitive types as well. + unboxedSchema = getBoxedNestedSchema(input.getSchema()); + if (unboxedSchema == null || !outputSchema.assignableToIgnoreNullable(unboxedSchema)) { + Schema checked = (unboxedSchema == null) ? input.getSchema() : unboxedSchema; + throw new RuntimeException( + "Cannot convert between types that don't have equivalent schemas." 
+ + " input schema: " + + checked + + " output schema: " + + outputSchemaCoder.getSchema()); + } } } catch (NoSuchSchemaException e) { throw new RuntimeException("No schema registered for " + outputTypeDescriptor); @@ -145,7 +171,9 @@ public PCollection expand(PCollection input) { new DoFn() { @ProcessElement public void processElement(@Element Row row, OutputReceiver o) { - o.output(outputSchemaCoder.getFromRowFunction().apply(row)); + // Read the row, potentially unboxing if necessary. + Row input = (unboxedSchema == null) ? row : row.getValue(0); + o.output(outputSchemaCoder.getFromRowFunction().apply(input)); } })) .setSchema( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java index 8686a7a279bc..077cc33f34de 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java @@ -64,7 +64,8 @@ * *

{@code
  * PCollection events = readUserEvents();
- * PCollection rows = event.apply(Select.fieldNames("location.*"));
+ * PCollection rows = event.apply(Select.fieldNames("location"))
+ *                              .apply(Convert.to(Location.class));
  * }
*/ @Experimental(Kind.SCHEMAS) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java index e3baf68df954..37742a5c06f7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java @@ -34,32 +34,84 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps; -/** Helper methods to select fields from a Schema. */ +/** Helper methods to select subrows out of rows. */ public class SelectHelpers { - // Currently we don't flatten selected nested fields. + + private static Schema union(Iterable schemas) { + Schema.Builder unioned = Schema.builder(); + for (Schema schema : schemas) { + unioned.addFields(schema.getFields()); + } + return unioned.build(); + } + + /** + * Get the output schema resulting from selecting the given {@link FieldAccessDescriptor} from the + * given schema. + * + *

Fields are always extracted and then stored in a new Row. For example, consider the + * following Java POJOs: + * + *

{@code
+   *  class UserEvent {
+   *    String userId;
+   *    String eventId;
+   *    int eventType;
+   *    Location location;
+   * }
+   * }
+ * + *
{@code
+   * class Location {
+   *   double latitude;
+   *   double longitude;
+   * }
+   * }
+ * + *

If selecting just the location field, then the returned schema will wrap that of the + * singular field being selected; in this case the returned schema will be a Row containing a + * single Location field. If location.latitude is selected, then the returned Schema will be a Row + * containing a double latitude field. + * + *

The same holds true when selecting from lists or maps. For example: + * + *

{@code
+   * class EventList {
+   *   List events;
+   * }
+   * }
+ * + *

If selecting events.location.latitude, the returned schema will contain a single array of + * Row, where that Row contains a single double latitude field; it will not contain an array of + * double. + */ public static Schema getOutputSchema( Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) { if (fieldAccessDescriptor.getAllFields()) { return inputSchema; } - Schema.Builder builder = new Schema.Builder(); + + List schemas = Lists.newArrayList(); + Schema.Builder builder = Schema.builder(); for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) { builder.addField(inputSchema.getField(fieldId)); } + schemas.add(builder.build()); for (Map.Entry nested : fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) { FieldDescriptor fieldDescriptor = nested.getKey(); + FieldAccessDescriptor nestedAccess = nested.getValue(); Field field = inputSchema.getField(checkNotNull(fieldDescriptor.getFieldId())); - FieldType outputType = - getOutputSchemaHelper( - field.getType(), nested.getValue(), fieldDescriptor.getQualifiers(), 0); - builder.addField(field.getName(), outputType); + Schema outputSchema = + getOutputSchemaHelper(field.getType(), nestedAccess, fieldDescriptor.getQualifiers(), 0); + schemas.add(outputSchema); } - return builder.build(); + + return union(schemas); } - private static FieldType getOutputSchemaHelper( + private static Schema getOutputSchemaHelper( FieldType inputFieldType, FieldAccessDescriptor fieldAccessDescriptor, List qualifiers, @@ -68,34 +120,45 @@ private static FieldType getOutputSchemaHelper( // We have walked through any containers, and are at a row type. Extract the subschema // for the row, preserving nullable attributes. 
checkArgument(inputFieldType.getTypeName().isCompositeType()); - return FieldType.row(getOutputSchema(inputFieldType.getRowSchema(), fieldAccessDescriptor)) - .withNullable(inputFieldType.getNullable()); + return getOutputSchema(inputFieldType.getRowSchema(), fieldAccessDescriptor); } Qualifier qualifier = qualifiers.get(qualifierPosition); + Schema.Builder builder = Schema.builder(); switch (qualifier.getKind()) { case LIST: checkArgument(qualifier.getList().equals(ListQualifier.ALL)); FieldType componentType = checkNotNull(inputFieldType.getCollectionElementType()); - FieldType outputComponent = + Schema outputComponent = getOutputSchemaHelper( - componentType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1) - .withNullable(componentType.getNullable()); - return FieldType.array(outputComponent).withNullable(inputFieldType.getNullable()); + componentType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1); + for (Field field : outputComponent.getFields()) { + Field newField = + Field.of(field.getName(), FieldType.array(field.getType())) + .withNullable(inputFieldType.getNullable()); + builder.addField(newField); + } + return builder.build(); case MAP: checkArgument(qualifier.getMap().equals(MapQualifier.ALL)); FieldType keyType = checkNotNull(inputFieldType.getMapKeyType()); FieldType valueType = checkNotNull(inputFieldType.getMapValueType()); - FieldType outputValueType = + Schema outputValueSchema = getOutputSchemaHelper( - valueType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1) - .withNullable(valueType.getNullable()); - return FieldType.map(keyType, outputValueType).withNullable(inputFieldType.getNullable()); + valueType, fieldAccessDescriptor, qualifiers, qualifierPosition + 1); + for (Field field : outputValueSchema.getFields()) { + Field newField = + Field.of(field.getName(), FieldType.map(keyType, field.getType())) + .withNullable(inputFieldType.getNullable()); + builder.addField(newField); + } + return builder.build(); 
default: throw new RuntimeException("unexpected"); } } + /** Select a sub Row from an input Row. */ public static Row selectRow( Row input, FieldAccessDescriptor fieldAccessDescriptor, @@ -106,47 +169,73 @@ public static Row selectRow( } Row.Builder output = Row.withSchema(outputSchema); + selectIntoRow(input, output, fieldAccessDescriptor); + return output.build(); + } + + /** Select out of a given {@link Row} object. */ + public static void selectIntoRow( + Row input, Row.Builder output, FieldAccessDescriptor fieldAccessDescriptor) { + if (fieldAccessDescriptor.getAllFields()) { + output.addValues(input.getValues()); + return; + } + for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) { // TODO: Once we support specific qualifiers (like array slices), extract them here. output.addValue(input.getValue(fieldId)); } + Schema outputSchema = output.getSchema(); for (Map.Entry nested : fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) { FieldDescriptor field = nested.getKey(); - String fieldName = inputSchema.nameOf(checkNotNull(field.getFieldId())); - FieldType nestedInputType = inputSchema.getField(field.getFieldId()).getType(); - FieldType nestedOutputType = outputSchema.getField(fieldName).getType(); - Object value = - selectRowHelper( - field.getQualifiers(), - 0, - input.getValue(fieldName), - nested.getValue(), - nestedInputType, - nestedOutputType); - output.addValue(value); + FieldAccessDescriptor nestedAccess = nested.getValue(); + FieldType nestedInputType = input.getSchema().getField(field.getFieldId()).getType(); + FieldType nestedOutputType = outputSchema.getField(output.nextFieldId()).getType(); + selectIntoRowHelper( + field.getQualifiers(), + input.getValue(field.getFieldId()), + output, + nestedAccess, + nestedInputType, + nestedOutputType); } - return output.build(); } @SuppressWarnings("unchecked") - private static Object selectRowHelper( + private static void selectIntoRowHelper( List qualifiers, - int qualifierPosition, Object 
value, + Row.Builder output, FieldAccessDescriptor fieldAccessDescriptor, FieldType inputType, FieldType outputType) { - if (qualifierPosition >= qualifiers.size()) { + if (qualifiers.isEmpty()) { Row row = (Row) value; - return selectRow( - row, fieldAccessDescriptor, inputType.getRowSchema(), outputType.getRowSchema()); + selectIntoRow(row, output, fieldAccessDescriptor); + return; } - if (fieldAccessDescriptor.getAllFields()) { - // Since we are selecting all fields (and we do not yet support array slicing), short circuit. - return value; + // There are qualifiers. That means that the result will be either a list or a map, so + // construct the result and add that to our Row. + selectIntoRowWithQualifiers( + qualifiers, 0, value, output, fieldAccessDescriptor, inputType, outputType); + } + + private static void selectIntoRowWithQualifiers( + List qualifiers, + int qualifierPosition, + Object value, + Row.Builder output, + FieldAccessDescriptor fieldAccessDescriptor, + FieldType inputType, + FieldType outputType) { + if (qualifierPosition >= qualifiers.size()) { + // We have already constructed all arrays and maps. What remains must be a Row. + Row row = (Row) value; + selectIntoRow(row, output, fieldAccessDescriptor); + return; } Qualifier qualifier = qualifiers.get(qualifierPosition); @@ -156,38 +245,87 @@ private static Object selectRowHelper( FieldType nestedInputType = checkNotNull(inputType.getCollectionElementType()); FieldType nestedOutputType = checkNotNull(outputType.getCollectionElementType()); List list = (List) value; - List selectedList = Lists.newArrayListWithCapacity(list.size()); + + // When selecting multiple subelements under a list, we distribute the select + // resulting in multiple lists. For example, if there is a field "list" with type + // {a: string, b: int}[], selecting list.a, list.b results in a schema of type + // {a: string[], b: int[]}. This preserves the invariant that the name selected always + // appears in the top-level schema. 
+ Schema tempSchema = Schema.builder().addField("a", nestedInputType).build(); + FieldAccessDescriptor tempAccessDescriptor = + FieldAccessDescriptor.create() + .withNestedField("a", fieldAccessDescriptor) + .resolve(tempSchema); + // TODO: doing this on each element might be inefficient. Consider caching this, or + // using codegen based on the schema. + Schema nestedSchema = getOutputSchema(tempSchema, tempAccessDescriptor); + + List> selectedLists = + Lists.newArrayListWithCapacity(nestedSchema.getFieldCount()); + for (int i = 0; i < nestedSchema.getFieldCount(); i++) { + selectedLists.add(Lists.newArrayListWithCapacity(list.size())); + } for (Object o : list) { - Object selected = - selectRowHelper( - qualifiers, - qualifierPosition + 1, - o, - fieldAccessDescriptor, - nestedInputType, - nestedOutputType); - selectedList.add(selected); + Row.Builder selectElementBuilder = Row.withSchema(nestedSchema); + selectIntoRowWithQualifiers( + qualifiers, + qualifierPosition + 1, + o, + selectElementBuilder, + fieldAccessDescriptor, + nestedInputType, + nestedOutputType); + + Row elementBeforeDistribution = selectElementBuilder.build(); + for (int i = 0; i < nestedSchema.getFieldCount(); ++i) { + selectedLists.get(i).add(elementBeforeDistribution.getValue(i)); + } } - return selectedList; + for (List aList : selectedLists) { + output.addValue(aList); + } + break; } case MAP: { FieldType nestedInputType = checkNotNull(inputType.getMapValueType()); FieldType nestedOutputType = checkNotNull(outputType.getMapValueType()); + + // When selecting multiple subelements under a map, we distribute the select + // resulting in multiple maps. The semantics are the same as for lists above (except we + // only support subelement select for map values, not for map keys). 
+ Schema tempSchema = Schema.builder().addField("a", nestedInputType).build(); + FieldAccessDescriptor tempAccessDescriptor = + FieldAccessDescriptor.create() + .withNestedField("a", fieldAccessDescriptor) + .resolve(tempSchema); + Schema nestedSchema = getOutputSchema(tempSchema, tempAccessDescriptor); + List selectedMaps = Lists.newArrayListWithExpectedSize(nestedSchema.getFieldCount()); + for (int i = 0; i < nestedSchema.getFieldCount(); ++i) { + selectedMaps.add(Maps.newHashMap()); + } + Map map = (Map) value; - Map selectedMap = Maps.newHashMapWithExpectedSize(map.size()); for (Map.Entry entry : map.entrySet()) { - Object selected = - selectRowHelper( - qualifiers, - qualifierPosition + 1, - entry.getValue(), - fieldAccessDescriptor, - nestedInputType, - nestedOutputType); - selectedMap.put(entry.getKey(), selected); + Row.Builder selectValueBuilder = Row.withSchema(nestedSchema); + selectIntoRowWithQualifiers( + qualifiers, + qualifierPosition + 1, + entry.getValue(), + selectValueBuilder, + fieldAccessDescriptor, + nestedInputType, + nestedOutputType); + + Row valueBeforeDistribution = selectValueBuilder.build(); + for (int i = 0; i < nestedSchema.getFieldCount(); ++i) { + selectedMaps.get(i).put(entry.getKey(), valueBeforeDistribution.getValue(i)); + } + } + for (Map aMap : selectedMaps) { + output.addValue(aMap); } - return selectedMap; + break; } default: throw new RuntimeException("Unexpected type " + qualifier.getKind()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java index 47958e2d6119..1f6e38617006 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java @@ -500,6 +500,17 @@ public static class Builder { this.schema = schema; } + public int nextFieldId() { + if (fieldValueGetterFactory != null) { + throw new RuntimeException("Not supported"); + } + return 
values.size(); + } + + public Schema getSchema() { + return schema; + } + public Builder addValue(@Nullable Object values) { this.values.add(values); return this; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java index cd8ec7a8b888..0229c55e9557 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java @@ -216,22 +216,16 @@ public void testGroupByNestedKey() { new OuterPOJO(new POJO("key2", 2L, "value4")))) .apply(Group.byFieldNames("inner.field1", "inner.field2")); - Schema selectedSchema = - Schema.builder().addStringField("field1").addInt64Field("field2").build(); - Schema keySchema = Schema.builder().addRowField("inner", selectedSchema).build(); + Schema keySchema = Schema.builder().addStringField("field1").addInt64Field("field2").build(); List>> expected = ImmutableList.of( KV.of( - Row.withSchema(keySchema) - .addValue(Row.withSchema(selectedSchema).addValues("key1", 1L).build()) - .build(), + Row.withSchema(keySchema).addValues("key1", 1L).build(), ImmutableList.of( new OuterPOJO(new POJO("key1", 1L, "value1")), new OuterPOJO(new POJO("key1", 1L, "value2")))), KV.of( - Row.withSchema(keySchema) - .addValue(Row.withSchema(selectedSchema).addValues("key2", 2L).build()) - .build(), + Row.withSchema(keySchema).addValues("key2", 2L).build(), ImmutableList.of( new OuterPOJO(new POJO("key2", 2L, "value3")), new OuterPOJO(new POJO("key2", 2L, "value4"))))); @@ -535,8 +529,7 @@ public void testByKeyWithSchemaAggregateFnNestedFields() { .aggregateField("inner.field3", Sum.ofIntegers(), "field3_sum") .aggregateField("inner.field1", Top.largestLongsFn(1), "field1_top")); - Schema innerKeySchema = Schema.builder().addInt64Field("field2").build(); - Schema keySchema = Schema.builder().addRowField("inner", 
innerKeySchema).build(); + Schema keySchema = Schema.builder().addInt64Field("field2").build(); Schema valueSchema = Schema.builder() .addInt64Field("field1_sum") @@ -547,14 +540,10 @@ public void testByKeyWithSchemaAggregateFnNestedFields() { List> expected = ImmutableList.of( KV.of( - Row.withSchema(keySchema) - .addValue(Row.withSchema(innerKeySchema).addValue(1L).build()) - .build(), + Row.withSchema(keySchema).addValue(1L).build(), Row.withSchema(valueSchema).addValue(3L).addValue(5).addArray(2L).build()), KV.of( - Row.withSchema(keySchema) - .addValue(Row.withSchema(innerKeySchema).addValue(2L).build()) - .build(), + Row.withSchema(keySchema).addValue(2L).build(), Row.withSchema(valueSchema).addValue(7L).addValue(9).addArray(4L).build())); PAssert.that(aggregations).satisfies(actual -> containsKvs(expected, actual)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java index 3238ebd9d72b..f2728e0e7aee 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java @@ -127,52 +127,6 @@ public int hashCode() { } } - /** A pojo matching the schema results from selection field2.*. */ - @DefaultSchema(JavaFieldSchema.class) - static class POJO2NestedAll { - POJO1 field2 = new POJO1(); - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - POJO2NestedAll that = (POJO2NestedAll) o; - return Objects.equals(field2, that.field2); - } - - @Override - public int hashCode() { - return Objects.hash(field2); - } - } - - /** A pojo matching the schema results from selection field2.field1, field2.field3. 
*/ - @DefaultSchema(JavaFieldSchema.class) - static class POJO2NestedPartial { - POJO1Selected field2 = new POJO1Selected(); - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - POJO2NestedPartial that = (POJO2NestedPartial) o; - return Objects.equals(field2, that.field2); - } - - @Override - public int hashCode() { - return Objects.hash(field2); - } - } - @Test @Category(NeedsRunner.class) public void testSelectMissingFieldName() { @@ -216,24 +170,36 @@ public void testSimpleSelect() { @Test @Category(NeedsRunner.class) public void testSelectNestedAll() { - PCollection pojos = + PCollection pojos = + pipeline + .apply(Create.of(new POJO2())) + .apply(Select.fieldNames("field2")) + .apply(Convert.to(POJO1.class)); + PAssert.that(pojos).containsInAnyOrder(new POJO1()); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testSelectNestedAllWildcard() { + PCollection pojos = pipeline .apply(Create.of(new POJO2())) .apply(Select.fieldNames("field2.*")) - .apply(Convert.to(POJO2NestedAll.class)); - PAssert.that(pojos).containsInAnyOrder(new POJO2NestedAll()); + .apply(Convert.to(POJO1.class)); + PAssert.that(pojos).containsInAnyOrder(new POJO1()); pipeline.run(); } @Test @Category(NeedsRunner.class) public void testSelectNestedPartial() { - PCollection pojos = + PCollection pojos = pipeline .apply(Create.of(new POJO2())) .apply(Select.fieldNames("field2.field1", "field2.field3")) - .apply(Convert.to(POJO2NestedPartial.class)); - PAssert.that(pojos).containsInAnyOrder(new POJO2NestedPartial()); + .apply(Convert.to(POJO1Selected.class)); + PAssert.that(pojos).containsInAnyOrder(new POJO1Selected()); pipeline.run(); } @@ -295,8 +261,8 @@ public int hashCode() { @DefaultSchema(JavaFieldSchema.class) static class PartialRowSingleArray { - List field1 = - ImmutableList.of(new POJO1Selected(), new POJO1Selected(), new POJO1Selected()); + List 
field1 = ImmutableList.of("field1", "field1", "field1"); + List field3 = ImmutableList.of(3.14, 3.14, 3.14); @Override public boolean equals(Object o) { @@ -307,12 +273,12 @@ public boolean equals(Object o) { return false; } PartialRowSingleArray that = (PartialRowSingleArray) o; - return Objects.equals(field1, that.field1); + return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); } @Override public int hashCode() { - return Objects.hash(field1); + return Objects.hash(field1, field3); } } @@ -356,11 +322,16 @@ public int hashCode() { @DefaultSchema(JavaFieldSchema.class) static class PartialRowSingleMap { - Map field1 = + Map field1 = ImmutableMap.of( - "key1", new POJO1Selected(), - "key2", new POJO1Selected(), - "key3", new POJO1Selected()); + "key1", "field1", + "key2", "field1", + "key3", "field1"); + Map field3 = + ImmutableMap.of( + "key1", 3.14, + "key2", 3.14, + "key3", 3.14); @Override public boolean equals(Object o) { @@ -371,12 +342,12 @@ public boolean equals(Object o) { return false; } PartialRowSingleMap that = (PartialRowSingleMap) o; - return Objects.equals(field1, that.field1); + return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); } @Override public int hashCode() { - return Objects.hash(field1); + return Objects.hash(field1, field3); } } @@ -427,8 +398,17 @@ static class PartialRowMultipleArray { private static final List> POJO_LIST_LIST = ImmutableList.of(POJO_LIST, POJO_LIST, POJO_LIST); - List>> field1 = - ImmutableList.of(POJO_LIST_LIST, POJO_LIST_LIST, POJO_LIST_LIST); + private static final List STRING_LIST = ImmutableList.of("field1", "field1", "field1"); + private static final List> STRING_LISTLIST = + ImmutableList.of(STRING_LIST, STRING_LIST, STRING_LIST); + List>> field1 = + ImmutableList.of(STRING_LISTLIST, STRING_LISTLIST, STRING_LISTLIST); + + private static final List DOUBLE_LIST = ImmutableList.of(3.14, 3.14, 3.14); + private static final List> DOUBLE_LISTLIST = + 
ImmutableList.of(DOUBLE_LIST, DOUBLE_LIST, DOUBLE_LIST); + List>> field3 = + ImmutableList.of(DOUBLE_LISTLIST, DOUBLE_LISTLIST, DOUBLE_LISTLIST); @Override public boolean equals(Object o) { @@ -507,21 +487,37 @@ public int hashCode() { @DefaultSchema(JavaFieldSchema.class) static class PartialRowMultipleMaps { - static final Map POJO_MAP = + static final Map STRING_MAP = ImmutableMap.of( - "key1", new POJO1Selected(), - "key2", new POJO1Selected(), - "key3", new POJO1Selected()); - static final Map> POJO_MAP_MAP = + "key1", "field1", + "key2", "field1", + "key3", "field1"); + static final Map> STRING_MAPMAP = ImmutableMap.of( - "key1", POJO_MAP, - "key2", POJO_MAP, - "key3", POJO_MAP); - Map>> field1 = + "key1", STRING_MAP, + "key2", STRING_MAP, + "key3", STRING_MAP); + Map>> field1 = ImmutableMap.of( - "key1", POJO_MAP_MAP, - "key2", POJO_MAP_MAP, - "key3", POJO_MAP_MAP); + "key1", STRING_MAPMAP, + "key2", STRING_MAPMAP, + "key3", STRING_MAPMAP); + static final Map DOUBLE_MAP = + ImmutableMap.of( + "key1", 3.14, + "key2", 3.14, + "key3", 3.14); + static final Map> DOUBLE_MAPMAP = + ImmutableMap.of( + "key1", DOUBLE_MAP, + "key2", DOUBLE_MAP, + "key3", DOUBLE_MAP); + + Map>> field3 = + ImmutableMap.of( + "key1", DOUBLE_MAPMAP, + "key2", DOUBLE_MAPMAP, + "key3", DOUBLE_MAPMAP); @Override public boolean equals(Object o) { @@ -532,12 +528,12 @@ public boolean equals(Object o) { return false; } PartialRowMultipleMaps that = (PartialRowMultipleMaps) o; - return Objects.equals(field1, that.field1); + return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); } @Override public int hashCode() { - return Objects.hash(field1); + return Objects.hash(field1, field3); } } @@ -592,15 +588,20 @@ public int hashCode() { @DefaultSchema(JavaFieldSchema.class) static class PartialRowNestedArraysAndMaps { - static final List POJO_LIST = - ImmutableList.of(new POJO1Selected(), new POJO1Selected(), new POJO1Selected()); - static final Map> POJO_MAP_LIST = + 
static final Map> STRING_MAP = ImmutableMap.of( - "key1", POJO_LIST, - "key2", POJO_LIST, - "key3", POJO_LIST); - List>> field1 = - ImmutableList.of(POJO_MAP_LIST, POJO_MAP_LIST, POJO_MAP_LIST); + "key1", ImmutableList.of("field1", "field1", "field1"), + "key2", ImmutableList.of("field1", "field1", "field1"), + "key3", ImmutableList.of("field1", "field1", "field1")); + List>> field1 = ImmutableList.of(STRING_MAP, STRING_MAP, STRING_MAP); + + static final Map> DOUBLE_MAP = + ImmutableMap.of( + "key1", ImmutableList.of(3.14, 3.14, 3.14), + "key2", ImmutableList.of(3.14, 3.14, 3.14), + "key3", ImmutableList.of(3.14, 3.14, 3.14)); + + List>> field3 = ImmutableList.of(DOUBLE_MAP, DOUBLE_MAP, DOUBLE_MAP); @Override public boolean equals(Object o) { @@ -611,12 +612,17 @@ public boolean equals(Object o) { return false; } PartialRowNestedArraysAndMaps that = (PartialRowNestedArraysAndMaps) o; - return Objects.equals(field1, that.field1); + return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3); } @Override public int hashCode() { - return Objects.hash(field1); + return Objects.hash(field1, field3); + } + + @Override + public String toString() { + return "PartialRowNestedArraysAndMaps{" + "field1=" + field1 + ", field3=" + field3 + '}'; } } @@ -637,6 +643,7 @@ public void testSelectRowNestedListsAndMaps() { .apply("convert2", Convert.to(PartialRowNestedArraysAndMaps.class)); PAssert.that(selected).containsInAnyOrder(new PartialRowNestedArraysAndMaps()); + PAssert.that(selected2).containsInAnyOrder(new PartialRowNestedArraysAndMaps()); pipeline.run(); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java new file mode 100644 index 000000000000..77183bf08e18 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.utils; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; +import org.junit.Test; + +/** Tests for {@link SelectHelpers}. 
*/ +public class SelectHelpersTest { + static final Schema FLAT_SCHEMA = + Schema.builder() + .addStringField("field1") + .addInt32Field("field2") + .addDoubleField("field3") + .build(); + static final Row FLAT_ROW = Row.withSchema(FLAT_SCHEMA).addValues("first", 42, 3.14).build(); + + static final Schema NESTED_SCHEMA = + Schema.builder().addRowField("nested", FLAT_SCHEMA).addStringField("foo").build(); + static final Row NESTED_ROW = Row.withSchema(NESTED_SCHEMA).addValues(FLAT_ROW, "").build(); + + static final Schema DOUBLE_NESTED_SCHEMA = + Schema.builder().addRowField("nested2", NESTED_SCHEMA).build(); + static final Row DOUBLE_NESTED_ROW = + Row.withSchema(DOUBLE_NESTED_SCHEMA).addValue(NESTED_ROW).build(); + + static final Schema ARRAY_SCHEMA = + Schema.builder() + .addArrayField("primitiveArray", FieldType.INT32) + .addArrayField("rowArray", FieldType.row(FLAT_SCHEMA)) + .addArrayField("arrayOfRowArray", FieldType.array(FieldType.row(FLAT_SCHEMA))) + .addArrayField("nestedRowArray", FieldType.row(NESTED_SCHEMA)) + .build(); + static final Row ARRAY_ROW = + Row.withSchema(ARRAY_SCHEMA) + .addArray(1, 2) + .addArray(FLAT_ROW, FLAT_ROW) + .addArray(ImmutableList.of(FLAT_ROW), ImmutableList.of(FLAT_ROW)) + .addArray(NESTED_ROW, NESTED_ROW) + .build(); + + static final Schema MAP_SCHEMA = + Schema.builder().addMapField("map", FieldType.INT32, FieldType.row(FLAT_SCHEMA)).build(); + static final Row MAP_ROW = + Row.withSchema(MAP_SCHEMA).addValue(ImmutableMap.of(1, FLAT_ROW)).build(); + + static final Schema MAP_ARRAY_SCHEMA = + Schema.builder() + .addMapField("map", FieldType.INT32, FieldType.array(FieldType.row(FLAT_SCHEMA))) + .build(); + static final Row MAP_ARRAY_ROW = + Row.withSchema(MAP_ARRAY_SCHEMA) + .addValue(ImmutableMap.of(1, ImmutableList.of(FLAT_ROW))) + .build(); + + @Test + public void testSelectAll() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("*").resolve(FLAT_SCHEMA); + Schema outputSchema = 
SelectHelpers.getOutputSchema(FLAT_SCHEMA, fieldAccessDescriptor); + assertEquals(FLAT_SCHEMA, outputSchema); + + Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, FLAT_SCHEMA, outputSchema); + assertEquals(FLAT_ROW, row); + } + + @Test + public void testsSimpleSelectSingle() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("field1").resolve(FLAT_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = Schema.builder().addStringField("field1").build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, FLAT_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build(); + assertEquals(expectedRow, row); + } + + @Test + public void testsSimpleSelectMultiple() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("field1", "field3").resolve(FLAT_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = + Schema.builder().addStringField("field1").addDoubleField("field3").build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, FLAT_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addValues("first", 3.14).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectedNested() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("nested").resolve(NESTED_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = Schema.builder().addRowField("nested", FLAT_SCHEMA).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = + SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, NESTED_SCHEMA, outputSchema); + Row 
expectedRow = Row.withSchema(expectedSchema).addValue(FLAT_ROW).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectedNestedSingle() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("nested.field1").resolve(NESTED_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = Schema.builder().addStringField("field1").build(); + assertEquals(expectedSchema, outputSchema); + + Row row = + SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, NESTED_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectedNestedWildcard() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("nested.*").resolve(NESTED_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, fieldAccessDescriptor); + assertEquals(FLAT_SCHEMA, outputSchema); + + Row row = + SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, NESTED_SCHEMA, outputSchema); + assertEquals(FLAT_ROW, row); + } + + @Test + public void testSelectDoubleNested() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("nested2.nested.field1").resolve(DOUBLE_NESTED_SCHEMA); + Schema outputSchema = + SelectHelpers.getOutputSchema(DOUBLE_NESTED_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = Schema.builder().addStringField("field1").build(); + assertEquals(expectedSchema, outputSchema); + + Row row = + SelectHelpers.selectRow( + DOUBLE_NESTED_ROW, fieldAccessDescriptor, DOUBLE_NESTED_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectArrayOfPrimitive() { + FieldAccessDescriptor fieldAccessDescriptor = + 
FieldAccessDescriptor.withFieldNames("primitiveArray").resolve(ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = + Schema.builder().addArrayField("primitiveArray", FieldType.INT32).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addArray(1, 2).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectArrayOfRow() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("rowArray").resolve(ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = + Schema.builder().addArrayField("rowArray", FieldType.row(FLAT_SCHEMA)).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addArray(FLAT_ROW, FLAT_ROW).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectArrayOfRowPartial() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("rowArray[].field1").resolve(ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); + + Schema expectedSchema = Schema.builder().addArrayField("field1", FieldType.STRING).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addArray("first", "first").build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectArrayOfRowArray() { + FieldAccessDescriptor fieldAccessDescriptor = + 
FieldAccessDescriptor.withFieldNames("arrayOfRowArray[][].field1").resolve(ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); + + Schema expectedSchema = + Schema.builder().addArrayField("field1", FieldType.array(FieldType.STRING)).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); + + Row expectedRow = + Row.withSchema(expectedSchema) + .addArray(ImmutableList.of("first"), ImmutableList.of("first")) + .build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectArrayOfNestedRow() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("nestedRowArray[].nested.field1") + .resolve(ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, fieldAccessDescriptor); + + Schema expectedElementSchema = Schema.builder().addStringField("field1").build(); + Schema expectedSchema = Schema.builder().addArrayField("field1", FieldType.STRING).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, ARRAY_SCHEMA, outputSchema); + Row expectedRow = Row.withSchema(expectedSchema).addArray("first", "first").build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectMapOfRowSelectSingle() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("map{}.field1").resolve(MAP_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA, fieldAccessDescriptor); + + Schema expectedValueSchema = Schema.builder().addStringField("field1").build(); + Schema expectedSchema = + Schema.builder().addMapField("field1", FieldType.INT32, FieldType.STRING).build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, MAP_SCHEMA, outputSchema); + Row expectedRow = 
Row.withSchema(expectedSchema).addValue(ImmutableMap.of(1, "first")).build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectMapOfRowSelectAll() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("map{}.*").resolve(MAP_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA, fieldAccessDescriptor); + Schema expectedSchema = + Schema.builder() + .addMapField("field1", FieldType.INT32, FieldType.STRING) + .addMapField("field2", FieldType.INT32, FieldType.INT32) + .addMapField("field3", FieldType.INT32, FieldType.DOUBLE) + .build(); + assertEquals(expectedSchema, outputSchema); + + Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, MAP_SCHEMA, outputSchema); + Row expectedRow = + Row.withSchema(expectedSchema) + .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(0))) + .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(1))) + .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(2))) + .build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectMapOfArray() { + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("map.field1").resolve(MAP_ARRAY_SCHEMA); + Schema outputSchema = SelectHelpers.getOutputSchema(MAP_ARRAY_SCHEMA, fieldAccessDescriptor); + + Schema expectedSchema = + Schema.builder() + .addMapField("field1", FieldType.INT32, FieldType.array(FieldType.STRING)) + .build(); + assertEquals(expectedSchema, outputSchema); + + Row row = + SelectHelpers.selectRow( + MAP_ARRAY_ROW, fieldAccessDescriptor, MAP_ARRAY_SCHEMA, outputSchema); + + Row expectedRow = + Row.withSchema(expectedSchema) + .addValue(ImmutableMap.of(1, ImmutableList.of("first"))) + .build(); + assertEquals(expectedRow, row); + } + + @Test + public void testSelectFieldOfRecord() { + Schema f1 = Schema.builder().addInt64Field("f0").build(); + Schema f2 = Schema.builder().addRowField("f1", f1).build(); + Schema f3 = Schema.builder().addRowField("f2", f2).build(); + + 
Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42} + Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}} + Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0": 42}}} + + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("f2.f1").resolve(f3); + + Schema outputSchema = SelectHelpers.getOutputSchema(f3, fieldAccessDescriptor); + + Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor, r3.getSchema(), outputSchema); + + assertEquals(f2, outputSchema); + assertEquals(r2, out); + } + + @Test + public void testSelectFieldOfRecordOrRecord() { + Schema f1 = Schema.builder().addInt64Field("f0").build(); + Schema f2 = Schema.builder().addRowField("f1", f1).build(); + Schema f3 = Schema.builder().addRowField("f2", f2).build(); + Schema f4 = Schema.builder().addRowField("f3", f3).build(); + + Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42} + Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}} + Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0": 42}}} + Row r4 = Row.withSchema(f4).addValue(r3).build(); // {"f3": {"f2": {"f1": {"f0": 42}}}} + + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("f3.f2").resolve(f4); + + Schema outputSchema = SelectHelpers.getOutputSchema(f4, fieldAccessDescriptor); + + Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, r4.getSchema(), outputSchema); + + assertEquals(f3, outputSchema); + assertEquals(r3, out); + } + + @Test + public void testArrayRowArray() { + Schema f1 = Schema.builder().addStringField("f0").build(); + Schema f2 = Schema.builder().addArrayField("f1", FieldType.row(f1)).build(); + Schema f3 = Schema.builder().addRowField("f2", f2).build(); + Schema f4 = Schema.builder().addArrayField("f3", FieldType.row(f3)).build(); + + Row r1 = Row.withSchema(f1).addValue("first").build(); + Row r2 = Row.withSchema(f2).addArray(r1, r1).build(); + 
Row r3 = Row.withSchema(f3).addValue(r2).build(); + Row r4 = Row.withSchema(f4).addArray(r3, r3).build(); + + FieldAccessDescriptor fieldAccessDescriptor = + FieldAccessDescriptor.withFieldNames("f3.f2.f1.f0").resolve(f4); + Schema outputSchema = SelectHelpers.getOutputSchema(f4, fieldAccessDescriptor); + Schema expectedSchema = + Schema.builder().addArrayField("f0", FieldType.array(FieldType.STRING)).build(); + assertEquals(expectedSchema, outputSchema); + Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, r4.getSchema(), outputSchema); + Row expected = + Row.withSchema(outputSchema) + .addArray(Lists.newArrayList("first", "first"), Lists.newArrayList("first", "first")) + .build(); + assertEquals(expected, out); + } +}