From a8a1f4a1a0f39cc2a5eb0d4ac1babd47c37c175a Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Mon, 10 Jan 2022 11:38:57 -0800 Subject: [PATCH 1/3] add SchemaFieldNumber annotation --- .../beam/sdk/schemas/AutoValueSchema.java | 54 +++++++++++----- .../schemas/FieldValueTypeInformation.java | 25 ++++++-- .../beam/sdk/schemas/JavaBeanSchema.java | 38 +++++++++-- .../beam/sdk/schemas/JavaFieldSchema.java | 39 ++++++++--- .../annotations/SchemaFieldNumber.java | 64 +++++++++++++++++++ .../beam/sdk/schemas/utils/AvroUtils.java | 12 ++-- .../beam/sdk/schemas/AutoValueSchemaTest.java | 47 ++++++++++++++ .../beam/sdk/schemas/JavaBeanSchemaTest.java | 5 +- .../beam/sdk/schemas/JavaFieldSchemaTest.java | 23 ++++++- .../beam/sdk/schemas/utils/TestJavaBeans.java | 30 +++++++++ .../beam/sdk/schemas/utils/TestPOJOs.java | 39 +++++++++++ 11 files changed, 336 insertions(+), 40 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldNumber.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java index 0ddc5ebb3ccb..0e022546ce35 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java @@ -19,11 +19,11 @@ import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.beam.sdk.schemas.utils.AutoValueUtils; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory; @@ -32,6 +32,8 @@ import 
org.apache.beam.sdk.schemas.utils.ReflectUtils; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; /** A {@link SchemaProvider} for AutoValue classes. */ @@ -48,22 +50,44 @@ public static class AbstractGetterTypeSupplier implements FieldValueTypeSupplier @Override public List get(Class clazz) { + // If the generated class is passed in, we want to look at the base class to find the getters. Class targetClass = AutoValueUtils.getBaseAutoValueClass(clazz); - return ReflectUtils.getMethods(targetClass).stream() - .filter(ReflectUtils::isGetter) - // All AutoValue getters are marked abstract. - .filter(m -> Modifier.isAbstract(m.getModifiers())) - .filter(m -> !Modifier.isPrivate(m.getModifiers())) - .filter(m -> !Modifier.isProtected(m.getModifiers())) - .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class)) - .map(FieldValueTypeInformation::forGetter) - .map( - t -> { - SchemaFieldName fieldName = t.getMethod().getAnnotation(SchemaFieldName.class); - return (fieldName == null) ? t : t.withName(fieldName.value()); - }) - .collect(Collectors.toList()); + + List methods = + ReflectUtils.getMethods(targetClass).stream() + .filter(ReflectUtils::isGetter) + // All AutoValue getters are marked abstract. 
+ .filter(m -> Modifier.isAbstract(m.getModifiers())) + .filter(m -> !Modifier.isPrivate(m.getModifiers())) + .filter(m -> !Modifier.isProtected(m.getModifiers())) + .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class)) + .collect(Collectors.toList()); + List types = Lists.newArrayListWithCapacity(methods.size()); + for (int i = 0; i < methods.size(); ++i) { + types.add(FieldValueTypeInformation.forGetter(methods.get(i), i)); + } + types.sort(Comparator.comparing(FieldValueTypeInformation::getNumber)); + validateFieldNumbers(types); + return types; + } + } + + private static void validateFieldNumbers(List types) { + for (int i = 0; i < types.size(); ++i) { + FieldValueTypeInformation type = types.get(i); + @javax.annotation.Nullable Integer number = type.getNumber(); + if (number == null) { + throw new RuntimeException("Unexpected null number for " + type.getName()); + } + Preconditions.checkState( + number == i, + "Expected field number " + + i + + " for field + " + + type.getName() + + " instead got " + + number); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java index 022097052349..61b2553cbd8c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java @@ -30,6 +30,7 @@ import java.util.stream.Stream; import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat; import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; import org.apache.beam.sdk.schemas.utils.ReflectUtils; import org.apache.beam.sdk.values.TypeDescriptor; @@ -43,6 +44,9 @@ "rawtypes" }) public abstract class FieldValueTypeInformation implements Serializable { + /** Optionally returns 
the field index. */ + public abstract @Nullable Integer getNumber(); + /** Returns the field name. */ public abstract String getName(); @@ -74,6 +78,8 @@ public abstract class FieldValueTypeInformation implements Serializable { @AutoValue.Builder public abstract static class Builder { + public abstract Builder setNumber(@Nullable Integer number); + public abstract Builder setName(String name); public abstract Builder setNullable(boolean nullable); @@ -113,10 +119,11 @@ public static FieldValueTypeInformation forOneOf( .build(); } - public static FieldValueTypeInformation forField(Field field) { + public static FieldValueTypeInformation forField(Field field, int index) { TypeDescriptor type = TypeDescriptor.of(field.getGenericType()); return new AutoValue_FieldValueTypeInformation.Builder() .setName(getNameOverride(field.getName(), field)) + .setNumber(getNumberOverride(index, field)) .setNullable(hasNullableAnnotation(field)) .setType(type) .setRawType(type.getRawType()) @@ -128,10 +135,19 @@ public static FieldValueTypeInformation forField(Field field) { .build(); } + public static int getNumberOverride(int index, T member) { + @Nullable SchemaFieldNumber fieldNumber = member.getAnnotation(SchemaFieldNumber.class); + if (fieldNumber == null) { + return index; + } + return Integer.parseInt(fieldNumber.value()); + } + public static String getNameOverride( String original, T member) { - SchemaFieldName fieldName = member.getAnnotation(SchemaFieldName.class); - SchemaCaseFormat caseFormatAnnotation = member.getAnnotation(SchemaCaseFormat.class); + @Nullable SchemaFieldName fieldName = member.getAnnotation(SchemaFieldName.class); + @Nullable SchemaCaseFormat caseFormatAnnotation = member.getAnnotation(SchemaCaseFormat.class); + @Nullable SchemaCaseFormat classCaseFormatAnnotation = member.getDeclaringClass().getAnnotation(SchemaCaseFormat.class); if (fieldName != null) { @@ -151,7 +167,7 @@ public static String getNameOverride( } } - public static 
FieldValueTypeInformation forGetter(Method method) { + public static FieldValueTypeInformation forGetter(Method method, int index) { String name; if (method.getName().startsWith("get")) { name = ReflectUtils.stripPrefix(method.getName(), "get"); @@ -165,6 +181,7 @@ public static FieldValueTypeInformation forGetter(Method method) { boolean nullable = hasNullableReturnType(method); return new AutoValue_FieldValueTypeInformation.Builder() .setName(getNameOverride(name, method)) + .setNumber(getNumberOverride(index, method)) .setNullable(nullable) .setType(type) .setRawType(type.getRawType()) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java index d01aa36f3ef7..9f2f70df9236 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java @@ -19,6 +19,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Method; +import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; import org.apache.beam.sdk.annotations.Experimental; @@ -32,6 +33,8 @@ import org.apache.beam.sdk.schemas.utils.ReflectUtils; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -60,11 +63,36 @@ public static class GetterTypeSupplier implements FieldValueTypeSupplier { @Override public List get(Class clazz) { - return ReflectUtils.getMethods(clazz).stream() - .filter(ReflectUtils::isGetter) - .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class)) - .map(FieldValueTypeInformation::forGetter) - 
.collect(Collectors.toList()); + List methods = + ReflectUtils.getMethods(clazz).stream() + .filter(ReflectUtils::isGetter) + .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class)) + .collect(Collectors.toList()); + List types = Lists.newArrayListWithCapacity(methods.size()); + for (int i = 0; i < methods.size(); ++i) { + types.add(FieldValueTypeInformation.forGetter(methods.get(i), i)); + } + types.sort(Comparator.comparing(FieldValueTypeInformation::getNumber)); + validateFieldNumbers(types); + return types; + } + + private static void validateFieldNumbers(List types) { + for (int i = 0; i < types.size(); ++i) { + FieldValueTypeInformation type = types.get(i); + @javax.annotation.Nullable Integer number = type.getNumber(); + if (number == null) { + throw new RuntimeException("Unexpected null number for " + type.getName()); + } + Preconditions.checkState( + number == i, + "Expected field number " + + i + + " for field: " + + type.getName() + + " instead got " + + number); + } } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java index 363f4e9fe7d1..310d2c6429a1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java @@ -21,12 +21,13 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory; import 
org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier; @@ -34,6 +35,8 @@ import org.apache.beam.sdk.schemas.utils.ReflectUtils; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; /** * A {@link SchemaProvider} for Java POJO objects. @@ -58,16 +61,16 @@ public static class JavaFieldTypeSupplier implements FieldValueTypeSupplier { @Override public List get(Class clazz) { - List types = + List fields = ReflectUtils.getFields(clazz).stream() - .filter(f -> !f.isAnnotationPresent(SchemaIgnore.class)) - .map(FieldValueTypeInformation::forField) - .map( - t -> { - SchemaFieldName fieldName = t.getField().getAnnotation(SchemaFieldName.class); - return (fieldName != null) ? t.withName(fieldName.value()) : t; - }) + .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class)) .collect(Collectors.toList()); + List types = Lists.newArrayListWithCapacity(fields.size()); + for (int i = 0; i < fields.size(); ++i) { + types.add(FieldValueTypeInformation.forField(fields.get(i), i)); + } + types.sort(Comparator.comparing(FieldValueTypeInformation::getNumber)); + validateFieldNumbers(types); // If there are no creators registered, then make sure none of the schema fields are final, // as we (currently) have no way of creating classes in this case. 
@@ -91,6 +94,24 @@ public List get(Class clazz) { } } + private static void validateFieldNumbers(List types) { + for (int i = 0; i < types.size(); ++i) { + FieldValueTypeInformation type = types.get(i); + @Nullable Integer number = type.getNumber(); + if (number == null) { + throw new RuntimeException("Unexpected null number for " + type.getName()); + } + Preconditions.checkState( + number == i, + "Expected field number " + + i + + " for field + " + + type.getName() + + " instead got " + + number); + } + } + @Override public Schema schemaFor(TypeDescriptor typeDescriptor) { return POJOUtils.schemaFromPojoClass( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldNumber.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldNumber.java new file mode 100644 index 000000000000..729630900bde --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldNumber.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import javax.annotation.Nonnull; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * When used on a {@link org.apache.beam.sdk.schemas.JavaFieldSchema POJO} field, a {@link + * org.apache.beam.sdk.schemas.JavaBeanSchema Java Bean} getter, or an {@link + * org.apache.beam.sdk.schemas.AutoValueSchema AutoValue} getter, the generated field will have the + * specified index. There cannot be "gaps" in field numbers, or schema inference will fail. If used, + * all fields (or getters in the case of a bean) must be annotated. + * + *

For example, say we have a Java POJO whose fields we want to appear in our schema in a + * different order than they are declared: + * + *


+ *   {@literal @}DefaultSchema(JavaFieldSchema.class)
+ *   class MyClass {
+ *     {@literal @}SchemaFieldNumber("1")
+ *     public String user;
+ *
+ *     {@literal @}SchemaFieldNumber("0")
+ *     public int ageInYears;
+ *   }
+ * 
+ * + *

The resulting schema will have ageInYears first followed by user. + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD, ElementType.METHOD}) +@SuppressWarnings({ + "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556) +}) +@Experimental(Kind.SCHEMAS) +public @interface SchemaFieldNumber { + + /** The name to use for the generated schema field. */ + @Nonnull + String value(); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 0c0f70a5ef5b..1387d847e82d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -658,11 +658,13 @@ public List get(Class clazz) { @Override public List get(Class clazz, Schema schema) { Map mapping = getMapping(schema); + List methods = ReflectUtils.getMethods(clazz); List types = Lists.newArrayList(); - for (Method method : ReflectUtils.getMethods(clazz)) { + for (int i = 0; i < methods.size(); ++i) { + Method method = methods.get(i); if (ReflectUtils.isGetter(method)) { FieldValueTypeInformation fieldValueTypeInformation = - FieldValueTypeInformation.forGetter(method); + FieldValueTypeInformation.forGetter(method, i); String name = mapping.get(fieldValueTypeInformation.getName()); if (name != null) { types.add(fieldValueTypeInformation.withName(name)); @@ -706,10 +708,12 @@ private Map getMapping(Schema schema) { private static final class AvroPojoFieldValueTypeSupplier implements FieldValueTypeSupplier { @Override public List get(Class clazz) { + List classFields = ReflectUtils.getFields(clazz); Map types = Maps.newHashMap(); - for (java.lang.reflect.Field f : ReflectUtils.getFields(clazz)) { + for (int i = 0; i < classFields.size(); ++i) { + java.lang.reflect.Field f = classFields.get(i); if (!f.isAnnotationPresent(AvroIgnore.class)) { - 
FieldValueTypeInformation typeInformation = FieldValueTypeInformation.forField(f); + FieldValueTypeInformation typeInformation = FieldValueTypeInformation.forField(f, i); AvroName avroname = f.getAnnotation(AvroName.class); if (avroname != null) { typeInformation = typeInformation.withName(avroname.value()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java index db67a1f52030..cd8a824351e5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat; import org.apache.beam.sdk.schemas.annotations.SchemaCreate; import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.utils.SchemaTestUtils; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.Row; @@ -536,6 +537,52 @@ public void testFromRowStaticFactory() throws NoSuchSchemaException { verifyAutoValue(value); } + @AutoValue + @DefaultSchema(AutoValueSchema.class) + abstract static class SchemaFieldNumberSimpleClass { + @SchemaFieldNumber("1") + abstract String getStr(); + + @SchemaFieldNumber("0") + abstract Long getLng(); + } + + private static final Schema FIELD_NUMBER_SCHEMA = + Schema.of(Field.of("lng", FieldType.INT64), Field.of("str", FieldType.STRING)); + + @Test + public void testSchema_SchemaFieldNumber() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + Schema schema = registry.getSchema(SchemaFieldNumberSimpleClass.class); + SchemaTestUtils.assertSchemaEquivalent(FIELD_NUMBER_SCHEMA, schema); + } + + @Test + public void testFromRow_SchemaFieldNumber() throws NoSuchSchemaException { + SchemaRegistry 
registry = SchemaRegistry.createDefault(); + Row row = + Row.withSchema(FIELD_NUMBER_SCHEMA) + .withFieldValue("lng", 42L) + .withFieldValue("str", "value!") + .build(); + SchemaFieldNumberSimpleClass value = + registry.getFromRowFunction(SchemaFieldNumberSimpleClass.class).apply(row); + assertEquals("value!", value.getStr()); + assertEquals(42L, (long) value.getLng()); + } + + @Test + public void testToRow_SchemaFieldNumber() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + AutoValue_AutoValueSchemaTest_SchemaFieldNumberSimpleClass value = + new AutoValue_AutoValueSchemaTest_SchemaFieldNumberSimpleClass("another value!", 42L); + Row row = registry.getToRowFunction(SchemaFieldNumberSimpleClass.class).apply(value); + assertEquals("another value!", row.getValue("str")); + assertEquals((String) row.getValue(1), (String) row.getValue("str")); + assertEquals(42L, (long) row.getValue("lng")); + assertEquals((long) row.getValue(0), (long) row.getValue("lng")); + } + @AutoValue @DefaultSchema(AutoValueSchema.class) abstract static class SchemaFieldNameSimpleClass { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java index 7f5d864c325b..cc8ba12f490e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.schemas.utils.SchemaTestUtils.equivalentTo; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.ALL_NULLABLE_BEAN_SCHEMA; +import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.ANNOTATED_SIMPLE_BEAN_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.ARRAY_OF_BYTE_ARRAY_BEAM_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.CASE_FORMAT_BEAM_SCHEMA; import static 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.ITERABLE_BEAM_SCHEMA; @@ -464,14 +465,16 @@ public void testMapFieldSetters() throws NoSuchSchemaException { public void testAnnotations() throws NoSuchSchemaException { SchemaRegistry registry = SchemaRegistry.createDefault(); Schema schema = registry.getSchema(SimpleBeanWithAnnotations.class); - SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema); + SchemaTestUtils.assertSchemaEquivalent(ANNOTATED_SIMPLE_BEAN_SCHEMA, schema); SimpleBeanWithAnnotations pojo = createAnnotated("string"); Row row = registry.getToRowFunction(SimpleBeanWithAnnotations.class).apply(pojo); assertEquals(12, row.getFieldCount()); assertEquals("string", row.getString("str")); assertEquals((byte) 1, (Object) row.getByte("aByte")); + assertEquals(row.getValue(2), (Object) row.getByte("aByte")); assertEquals((short) 2, (Object) row.getInt16("aShort")); + assertEquals(row.getValue(1), (Object) row.getInt16("aShort")); assertEquals((int) 3, (Object) row.getInt32("anInt")); assertEquals((long) 4, (Object) row.getInt64("aLong")); assertTrue(row.getBoolean("aBoolean")); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java index 4f8f6bd8a600..90a4c2e4a9fc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.schemas; import static org.apache.beam.sdk.schemas.utils.SchemaTestUtils.equivalentTo; +import static org.apache.beam.sdk.schemas.utils.TestPOJOs.ANNOTATED_SIMPLE_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.CASE_FORMAT_POJO_SCHEMA; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.ENUMERATION; import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAYS_POJO_SCHEMA; @@ -157,6 +158,24 @@ 
private Row createSimpleRow(String name) { .build(); } + private Row createAnnotatedRow(String name) { + return Row.withSchema(ANNOTATED_SIMPLE_POJO_SCHEMA) + .addValues( + name, + (short) 2, + (byte) 1, + 3, + 4L, + true, + DATE, + INSTANT, + BYTE_ARRAY, + BYTE_BUFFER.array(), + BigDecimal.ONE, + new StringBuilder(name).append("builder").toString()) + .build(); + } + @Test public void testSchema() throws NoSuchSchemaException { SchemaRegistry registry = SchemaRegistry.createDefault(); @@ -512,9 +531,9 @@ public void testNestedNullValuesSetters() throws NoSuchSchemaException { public void testAnnotations() throws NoSuchSchemaException { SchemaRegistry registry = SchemaRegistry.createDefault(); Schema schema = registry.getSchema(AnnotatedSimplePojo.class); - SchemaTestUtils.assertSchemaEquivalent(SIMPLE_POJO_SCHEMA, schema); + SchemaTestUtils.assertSchemaEquivalent(ANNOTATED_SIMPLE_POJO_SCHEMA, schema); - Row simpleRow = createSimpleRow("string"); + Row simpleRow = createAnnotatedRow("string"); AnnotatedSimplePojo pojo = createAnnotated("string"); assertEquals(simpleRow, registry.getToRowFunction(AnnotatedSimplePojo.class).apply(pojo)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java index e1d76f1a2841..7be3b3e84ee6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat; import org.apache.beam.sdk.schemas.annotations.SchemaCreate; import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat; import 
org.checkerframework.checker.nullness.qual.Nullable; @@ -548,52 +549,64 @@ public String getUnknown() { return ""; } + @SchemaFieldNumber("0") public String getStr() { return str; } @SchemaFieldName("aByte") + @SchemaFieldNumber("2") public byte getTheByteByte() { return aByte; } @SchemaFieldName("aShort") + @SchemaFieldNumber("1") public short getNotAShort() { return aShort; } + @SchemaFieldNumber("3") public int getAnInt() { return anInt; } + @SchemaFieldNumber("4") public long getaLong() { return aLong; } + @SchemaFieldNumber("5") public boolean isaBoolean() { return aBoolean; } + @SchemaFieldNumber("6") public DateTime getDateTime() { return dateTime; } + @SchemaFieldNumber("7") public byte[] getBytes() { return bytes; } + @SchemaFieldNumber("8") public ByteBuffer getByteBuffer() { return byteBuffer; } + @SchemaFieldNumber("9") public Instant getInstant() { return instant; } + @SchemaFieldNumber("10") public BigDecimal getBigDecimal() { return bigDecimal; } + @SchemaFieldNumber("11") public StringBuilder getStringBuilder() { return stringBuilder; } @@ -641,6 +654,23 @@ public int hashCode() { } } + /** The schema for {@link SimpleBean}. * */ + public static final Schema ANNOTATED_SIMPLE_BEAN_SCHEMA = + Schema.builder() + .addStringField("str") + .addInt16Field("aShort") + .addByteField("aByte") + .addInt32Field("anInt") + .addInt64Field("aLong") + .addBooleanField("aBoolean") + .addDateTimeField("dateTime") + .addDateTimeField("instant") + .addByteArrayField("bytes") + .addByteArrayField("byteBuffer") + .addDecimalField("bigDecimal") + .addStringField("stringBuilder") + .build(); + /** A Bean containing a nested class. 
* */ @DefaultSchema(JavaBeanSchema.class) public static class NestedBean { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java index 058bd7d82a91..9067b2f37893 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat; import org.apache.beam.sdk.schemas.annotations.SchemaCreate; import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat; @@ -225,23 +226,44 @@ public int hashCode() { /** A POJO for testing annotations. */ @DefaultSchema(JavaFieldSchema.class) public static class AnnotatedSimplePojo { + @SchemaFieldNumber("0") public final String str; @SchemaFieldName("aByte") + @SchemaFieldNumber("2") public final byte theByte; @SchemaFieldName("aShort") + @SchemaFieldNumber("1") public final short theShort; + @SchemaFieldNumber("3") public final int anInt; + + @SchemaFieldNumber("4") public final long aLong; + + @SchemaFieldNumber("5") public final boolean aBoolean; + + @SchemaFieldNumber("6") public final DateTime dateTime; + + @SchemaFieldNumber("7") public final Instant instant; + + @SchemaFieldNumber("8") public final byte[] bytes; + + @SchemaFieldNumber("9") public final ByteBuffer byteBuffer; + + @SchemaFieldNumber("10") public final BigDecimal bigDecimal; + + @SchemaFieldNumber("11") public final StringBuilder stringBuilder; + @SchemaIgnore public final Integer pleaseIgnore; // Marked with SchemaCreate, so this will be called to construct instances. 
@@ -350,6 +372,23 @@ public String toString() { } } + /** The schema for {@link SimplePOJO}. * */ + public static final Schema ANNOTATED_SIMPLE_POJO_SCHEMA = + Schema.builder() + .addStringField("str") + .addInt16Field("aShort") + .addByteField("aByte") + .addInt32Field("anInt") + .addInt64Field("aLong") + .addBooleanField("aBoolean") + .addDateTimeField("dateTime") + .addDateTimeField("instant") + .addByteArrayField("bytes") + .addByteArrayField("byteBuffer") + .addDecimalField("bigDecimal") + .addStringField("stringBuilder") + .build(); + /** A simple POJO containing basic types. * */ @DefaultSchema(JavaFieldSchema.class) public static class SimplePOJO { From fd5b1a3cbc6af263d1bd64457e0665de90227ad5 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Mon, 10 Jan 2022 13:07:31 -0800 Subject: [PATCH 2/3] fix broken files --- .../beam/sdk/extensions/protobuf/ProtoMessageSchema.java | 7 ++++--- .../java/org/apache/beam/sdk/io/thrift/ThriftSchema.java | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java index c4b77382a51c..0d0cc61a5beb 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java @@ -65,7 +65,8 @@ public List get(Class clazz, Schema schema) { Multimap methods = ReflectUtils.getMethodsMap(clazz); List types = Lists.newArrayListWithCapacity(schema.getFieldCount()); - for (Field field : schema.getFields()) { + for (int i = 0; i < schema.getFieldCount(); ++i) { + Field field = schema.getField(i); if (field.getType().isLogicalType(OneOfType.IDENTIFIER)) { // This is a OneOf. Look for the getters for each OneOf option. 
OneOfType oneOfType = field.getType().getLogicalType(OneOfType.class); @@ -74,7 +75,7 @@ public List get(Class clazz, Schema schema) { Method method = getProtoGetter(methods, oneOfField.getName(), oneOfField.getType()); oneOfTypes.put( oneOfField.getName(), - FieldValueTypeInformation.forGetter(method).withName(field.getName())); + FieldValueTypeInformation.forGetter(method, i).withName(field.getName())); } // Add an entry that encapsulates information about all possible getters. types.add( @@ -84,7 +85,7 @@ public List get(Class clazz, Schema schema) { } else { // This is a simple field. Add the getter. Method method = getProtoGetter(methods, field.getName(), field.getType()); - types.add(FieldValueTypeInformation.forGetter(method).withName(field.getName())); + types.add(FieldValueTypeInformation.forGetter(method, i).withName(field.getName())); } } return types; diff --git a/sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java b/sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java index ce13f66112a5..f17e56a5782f 100644 --- a/sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java +++ b/sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java @@ -245,7 +245,7 @@ private FieldValueTypeInformation fieldValueTypeInfo(Class type, String field return FieldValueTypeInformation.forSetter(factoryMethods.get(0), ""); } else { try { - return FieldValueTypeInformation.forField(type.getDeclaredField(fieldName)); + return FieldValueTypeInformation.forField(type.getDeclaredField(fieldName), 0); } catch (NoSuchFieldException e) { throw new IllegalArgumentException(e); } From 457ad7e2bcafa24edddbba4d72166006ebe54aae Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Thu, 13 Jan 2022 14:35:40 -0800 Subject: [PATCH 3/3] add validation --- .../java/org/apache/beam/sdk/schemas/JavaBeanSchema.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java index 9f2f70df9236..d0d89ec442b2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.schemas.annotations.SchemaCaseFormat; import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory; import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier; @@ -119,6 +120,12 @@ public List get(Class clazz) { .map(FieldValueTypeInformation::forSetter) .map( t -> { + if (t.getMethod().getAnnotation(SchemaFieldNumber.class) != null) { + throw new RuntimeException( + String.format( + "@SchemaFieldNumber can only be used on getters in Java Beans. Found on setter '%s'", + t.getMethod().getName())); + } if (t.getMethod().getAnnotation(SchemaFieldName.class) != null) { throw new RuntimeException( String.format(