diff --git a/build.gradle.kts b/build.gradle.kts
index 8dcdc14f04e7..dd9bf14ceb54 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -44,6 +44,7 @@ tasks.rat {
     "**/package-list",
     "**/test.avsc",
+    "**/logical-types.avsc",
     "**/user.avsc",
     "**/test/resources/**/*.txt",
     "**/test/resources/**/*.csv",
diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts
index cd8aed6d3a67..d2f89cc62325 100644
--- a/buildSrc/build.gradle.kts
+++ b/buildSrc/build.gradle.kts
@@ -45,7 +45,7 @@ dependencies {
   implementation("com.github.spotbugs.snom:spotbugs-gradle-plugin:5.0.14")
   runtimeOnly("com.google.protobuf:protobuf-gradle-plugin:0.8.13") // Enable proto code generation
-  runtimeOnly("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.9.1") // Enable Avro code generation
+  runtimeOnly("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.9.1") // Enable Avro code generation. Version 1.1.0 is the last version supporting Avro 1.10.2
   runtimeOnly("com.diffplug.spotless:spotless-plugin-gradle:5.6.1") // Enable a code formatting plugin
   runtimeOnly("gradle.plugin.com.dorongold.plugins:task-tree:1.5") // Adds a 'taskTree' task to print task dependency tree
   runtimeOnly("gradle.plugin.com.github.johnrengelman:shadow:7.1.1") // Enable shading Java dependencies
diff --git a/sdks/java/extensions/avro/build.gradle b/sdks/java/extensions/avro/build.gradle
index 6631779e609c..3d22befaf4d6 100644
--- a/sdks/java/extensions/avro/build.gradle
+++ b/sdks/java/extensions/avro/build.gradle
@@ -67,6 +67,7 @@ dependencies {
   implementation library.java.error_prone_annotations
   implementation library.java.avro
   implementation library.java.joda_time
+  implementation library.java.commons_lang3
   testImplementation(project(path: ":sdks:java:core", configuration: "shadowTest")) {
     // Exclude Avro dependencies from "core" since Avro support moved to this extension
     exclude group: "org.apache.avro", module: "avro"
@@ -143,9 +144,11 @@ avroVersions.each { k, v ->
       main = "org.apache.avro.tool.Main"
       args = [
         "compile",
+        "-bigDecimal", // Use BigDecimal for the decimal logical type, similar to what the gradle-avro-plugin does
        "schema",
        "src/test/avro/org/apache/beam/sdk/extensions/avro/io/user.avsc",
        "src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/test.avsc",
+       "src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc",
        "build/generated/sources/avro$k/test/java"
      ]
    }
@@ -161,4 +164,4 @@ static def createTaskNames(Map prefixMap, String suffix) {
   return prefixMap.keySet().stream()
       .map { version -> "avroVersion${version}${suffix}" }
       .collect(Collectors.toList())
-}
\ No newline at end of file
+}
diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
index da7daf605d89..460bfaec4a36 100644
--- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
+++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
@@ -34,6 +34,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
@@ -49,6 +50,7 @@
 import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
 import net.bytebuddy.matcher.ElementMatchers;
 import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversion;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -61,6 +63,7 @@
 import org.apache.avro.reflect.AvroName;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.avro.util.Utf8;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.extensions.avro.schemas.AvroRecordSchema;
@@ -97,6 +100,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -154,6 +158,15 @@ public class AvroUtils {
       new ForLoadedType(ReadableInstant.class);
   private static final ForLoadedType JODA_INSTANT = new ForLoadedType(Instant.class);
 
+  private static final GenericData GENERIC_DATA_WITH_DEFAULT_CONVERSIONS;
+
+  static {
+    GENERIC_DATA_WITH_DEFAULT_CONVERSIONS = new GenericData();
+    addLogicalTypeConversions(GENERIC_DATA_WITH_DEFAULT_CONVERSIONS);
+    GENERIC_DATA_WITH_DEFAULT_CONVERSIONS.addLogicalTypeConversion(
+        new Conversions.DecimalConversion());
+  }
+
   // contains workarounds for third-party methods that accept nullable arguments but lack proper
   // annotations
   private static class NullnessCheckerWorkarounds {
@@ -552,23 +565,43 @@ public static org.apache.avro.Schema toAvroSchema(Schema beamSchema) {
    * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during
    * conversion. If Schema is not provided, one is inferred from the AVRO schema.
    */
-  public static Row toBeamRowStrict(GenericRecord record, @Nullable Schema schema) {
+  public static Row toBeamRowStrict(
+      GenericRecord record, @Nullable Schema schema, @Nullable GenericData genericData) {
     if (schema == null) {
       schema = toBeamSchema(record.getSchema());
     }
 
+    if (genericData == null) {
+      if (record instanceof SpecificRecordBase) {
+        // In the case of a SpecificRecord, the MODEL$ GenericData already has the specific
+        // conversions registered
+        genericData = getGenericData((SpecificRecordBase) record);
+      } else {
+        genericData = GENERIC_DATA_WITH_DEFAULT_CONVERSIONS;
+      }
+    }
+
     Row.Builder builder = Row.withSchema(schema);
     org.apache.avro.Schema avroSchema = record.getSchema();
 
     for (Field field : schema.getFields()) {
       Object value = record.get(field.getName());
       org.apache.avro.Schema fieldAvroSchema = avroSchema.getField(field.getName()).schema();
-      builder.addValue(convertAvroFieldStrict(value, fieldAvroSchema, field.getType()));
+      builder.addValue(
+          convertAvroFieldStrict(value, fieldAvroSchema, field.getType(), genericData));
     }
 
     return builder.build();
   }
 
+  /**
+   * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during
+   * conversion. If Schema is not provided, one is inferred from the AVRO schema.
+   */
+  public static Row toBeamRowStrict(GenericRecord record, @Nullable Schema schema) {
+    return toBeamRowStrict(record, schema, null);
+  }
+
   /**
    * Convert from a Beam Row to an AVRO GenericRecord. The Avro Schema is inferred from the Beam
    * schema on the row.
@@ -1323,6 +1356,67 @@ private static org.apache.avro.Schema getFieldSchema(
     }
   }
 
+  private static Object convertLogicalType(
+      @PolyNull Object value,
+      @Nonnull org.apache.avro.Schema avroSchema,
+      @Nonnull FieldType fieldType,
+      @Nonnull GenericData genericData) {
+    TypeWithNullability type = new TypeWithNullability(avroSchema);
+    LogicalType logicalType = LogicalTypes.fromSchema(type.type);
+    if (logicalType == null) {
+      return null;
+    }
+
+    Object rawType = value;
+
+    Conversion<?> conversion = genericData.getConversionByClass(value.getClass(), logicalType);
+    Class<?> convertedType = null;
+    if (conversion != null) {
+      convertedType = conversion.getConvertedType();
+      if (convertedType.isInstance(value)) {
+        rawType = Conversions.convertToRawType(value, avroSchema, logicalType, conversion);
+      }
+    }
+
+    // Switch on the string name because some LogicalType classes are not available in all
+    // versions of Avro
+    switch (logicalType.getName()) {
+      case "date":
+        return convertDateStrict(
+            checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType),
+            fieldType);
+      case "time-millis":
+        return checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType);
+      case "time-micros":
+      case "timestamp-micros":
+      case "local-timestamp-millis":
+      case "local-timestamp-micros":
+        return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType);
+      case "timestamp-millis":
+        return convertDateTimeStrict(
+            checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType),
+            fieldType);
+      case "decimal":
+        {
+          if (rawType instanceof GenericFixed) {
+            // Decimal can be backed by a ByteBuffer or a GenericFixed. In the case of
+            // GenericFixed, we convert it to a ByteBuffer here
+            rawType = ByteBuffer.wrap(((GenericFixed) rawType).bytes());
+          }
+          ByteBuffer byteBuffer =
+              checkRawType(
+                  ByteBuffer.class, value, logicalType, rawType, conversion, convertedType);
+          Conversion<BigDecimal> decimalConversion = new Conversions.DecimalConversion();
+          BigDecimal bigDecimal =
+              decimalConversion.fromBytes(byteBuffer.duplicate(), type.type, logicalType);
+          return convertDecimal(bigDecimal, fieldType);
+        }
+      case "uuid":
+        return UUID.fromString(rawType.toString()).toString();
+    }
+    return null;
+  }
+
   /**
    * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during
    * conversion.
@@ -1330,44 +1424,26 @@ private static org.apache.avro.Schema getFieldSchema(
    * @param value {@link GenericRecord} or any nested value
    * @param avroSchema schema for value
    * @param fieldType target beam field type
+   * @param genericData {@link GenericData} instance to use for conversions
    * @return value converted for {@link Row}
    */
-  @SuppressWarnings("unchecked")
   public static @PolyNull Object convertAvroFieldStrict(
       @PolyNull Object value,
       @Nonnull org.apache.avro.Schema avroSchema,
-      @Nonnull FieldType fieldType) {
+      @Nonnull FieldType fieldType,
+      @Nonnull GenericData genericData) {
+
     if (value == null) {
       return null;
     }
+    Object convertedLogicalType = convertLogicalType(value, avroSchema, fieldType, genericData);
 
-    TypeWithNullability type = new TypeWithNullability(avroSchema);
-    LogicalType logicalType = LogicalTypes.fromSchema(type.type);
-    if (logicalType != null) {
-      if (logicalType instanceof LogicalTypes.Decimal) {
-        ByteBuffer byteBuffer = (ByteBuffer) value;
-        BigDecimal bigDecimal =
-            new Conversions.DecimalConversion()
-                .fromBytes(byteBuffer.duplicate(), type.type, logicalType);
-        return convertDecimal(bigDecimal, fieldType);
-      } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
-        if (value instanceof ReadableInstant) {
-          return convertDateTimeStrict(((ReadableInstant) value).getMillis(), fieldType);
-        } else {
-          return convertDateTimeStrict((Long) value, fieldType);
-        }
-      } else if (logicalType instanceof LogicalTypes.Date) {
-        if (value instanceof ReadableInstant) {
-          int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
-          return convertDateStrict(epochDays, fieldType);
-        } else if (value instanceof java.time.LocalDate) {
-          return convertDateStrict((int) ((java.time.LocalDate) value).toEpochDay(), fieldType);
-        } else {
-          return convertDateStrict((Integer) value, fieldType);
-        }
-      }
+    if (convertedLogicalType != null) {
+      return convertedLogicalType;
     }
 
+    TypeWithNullability type = new TypeWithNullability(avroSchema);
+
     switch (type.type.getType()) {
       case FIXED:
         return convertFixedStrict((GenericFixed) value, fieldType);
@@ -1402,14 +1478,15 @@ private static org.apache.avro.Schema getFieldSchema(
         return convertEnumStrict(value, fieldType);
 
       case ARRAY:
-        return convertArrayStrict((List<Object>) value, type.type.getElementType(), fieldType);
+        return convertArrayStrict(
+            (List<Object>) value, type.type.getElementType(), fieldType, genericData);
 
       case MAP:
         return convertMapStrict(
-            (Map<CharSequence, Object>) value, type.type.getValueType(), fieldType);
+            (Map<CharSequence, Object>) value, type.type.getValueType(), fieldType, genericData);
 
       case UNION:
-        return convertUnionStrict(value, type.type, fieldType);
+        return convertUnionStrict(value, type.type, fieldType, genericData);
 
       case NULL:
         throw new IllegalArgumentException("Can't convert 'null' to non-nullable field");
@@ -1419,6 +1496,24 @@ private static org.apache.avro.Schema getFieldSchema(
     }
   }
 
+  /**
+   * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during
+   * conversion.
+   *
+   * @param value {@link GenericRecord} or any nested value
+   * @param avroSchema schema for value
+   * @param fieldType target beam field type
+   * @return value converted for {@link Row}
+   */
+  @SuppressWarnings("unchecked")
+  public static @PolyNull Object convertAvroFieldStrict(
+      @PolyNull Object value,
+      @Nonnull org.apache.avro.Schema avroSchema,
+      @Nonnull FieldType fieldType) {
+    return convertAvroFieldStrict(
+        value, avroSchema, fieldType, GENERIC_DATA_WITH_DEFAULT_CONVERSIONS);
+  }
+
   private static Object convertRecordStrict(GenericRecord record, FieldType fieldType) {
     checkTypeName(fieldType.getTypeName(), TypeName.ROW, "record");
     return toBeamRowStrict(record, fieldType.getRowSchema());
@@ -1495,7 +1590,10 @@ private static Object convertEnumStrict(Object value, FieldType fieldType) {
   }
 
   private static Object convertUnionStrict(
-      Object value, org.apache.avro.Schema unionAvroSchema, FieldType fieldType) {
+      Object value,
+      org.apache.avro.Schema unionAvroSchema,
+      FieldType fieldType,
+      GenericData genericData) {
     checkTypeName(fieldType.getTypeName(), TypeName.LOGICAL_TYPE, "oneOfType");
     checkArgument(
         checkNotNull(fieldType.getLogicalType()).getIdentifier().equals(OneOfType.IDENTIFIER));
@@ -1503,19 +1601,24 @@ private static Object convertUnionStrict(
     int fieldNumber = GenericData.get().resolveUnion(unionAvroSchema, value);
     FieldType baseFieldType = oneOfType.getOneOfSchema().getField(fieldNumber).getType();
     Object convertedValue =
-        convertAvroFieldStrict(value, unionAvroSchema.getTypes().get(fieldNumber), baseFieldType);
+        convertAvroFieldStrict(
+            value, unionAvroSchema.getTypes().get(fieldNumber), baseFieldType, genericData);
     return oneOfType.createValue(fieldNumber, convertedValue);
   }
 
   private static Object convertArrayStrict(
-      List<Object> values, org.apache.avro.Schema elemAvroSchema, FieldType fieldType) {
+      List<Object> values,
+      org.apache.avro.Schema elemAvroSchema,
+      FieldType fieldType,
+      GenericData genericData) {
     checkTypeName(fieldType.getTypeName(), TypeName.ARRAY, "array");
 
     List<Object> ret = new ArrayList<>(values.size());
     FieldType elemFieldType = fieldType.getCollectionElementType();
 
     for (Object value : values) {
-      ret.add(convertAvroFieldStrict(value, elemAvroSchema, checkNotNull(elemFieldType)));
+      ret.add(
+          convertAvroFieldStrict(value, elemAvroSchema, checkNotNull(elemFieldType), genericData));
     }
 
     return ret;
@@ -1524,7 +1627,8 @@ private static Object convertArrayStrict(
   private static Object convertMapStrict(
       Map<CharSequence, Object> values,
       org.apache.avro.Schema valueAvroSchema,
-      FieldType fieldType) {
+      FieldType fieldType,
+      GenericData genericData) {
     checkTypeName(fieldType.getTypeName(), TypeName.MAP, "map");
     FieldType mapKeyType = checkNotNull(fieldType.getMapKeyType());
     FieldType mapValueType = checkNotNull(fieldType.getMapValueType());
@@ -1539,7 +1643,7 @@ private static Object convertMapStrict(
     for (Map.Entry<CharSequence, Object> value : values.entrySet()) {
       ret.put(
           convertStringStrict(value.getKey(), mapKeyType),
-          convertAvroFieldStrict(value.getValue(), valueAvroSchema, mapValueType));
+          convertAvroFieldStrict(value.getValue(), valueAvroSchema, mapValueType, genericData));
     }
 
     return ret;
@@ -1563,4 +1667,46 @@ private static org.apache.avro.Schema buildHiveLogicalTypeSchema(
             hiveLogicalType, size);
     return new org.apache.avro.Schema.Parser().parse(schemaJson);
   }
+
+  static GenericData getGenericData(SpecificRecordBase record) {
+    try {
+      return record.getSpecificData();
+    } catch (NoSuchMethodError e) {
+      try {
+        // SpecificRecordBase.getSpecificData() was not available in Avro 1.8.2
+        return (GenericData) FieldUtils.readStaticField(record.getClass(), "MODEL$", true);
+      } catch (IllegalAccessException ex) {
+        throw new IllegalArgumentException(
+            "Unable to access MODEL$ field in SpecificRecordBase class", ex);
+      }
+    }
+  }
+
+  private static <T> T checkRawType(
+      Class<T> desiredRawType,
+      Object value,
+      LogicalType logicalType,
+      Object rawType,
+      Conversion<?> conversion,
+      Class<?> convertedType) {
+    String msg =
+        String.format(
+            "Value %s of class %s is not a supported type for logical type %s (%s). "
+                + "The underlying Avro built-in raw type should be an instance of %s. "
+                + "However, it is an instance of %s and has value %s. "
+                + "Generic data has conversion %s, convertedType %s",
+            value,
+            value.getClass(),
+            logicalType.getName(),
+            logicalType,
+            desiredRawType,
+            rawType.getClass(),
+            rawType,
+            conversion,
+            convertedType);
+    if (!desiredRawType.isInstance(rawType)) {
+      throw new IllegalArgumentException(msg);
+    }
+    return (T) rawType;
+  }
 }
diff --git a/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc b/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc
new file mode 100644
index 000000000000..1fddfde068ff
--- /dev/null
+++ b/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc
@@ -0,0 +1,123 @@
+{
+  "type": "record",
+  "name": "LogicalTypesExample",
+  "namespace": "org.apache.beam.sdk.extensions.avro.schemas.logicaltypes",
+  "fields": [
+    {
+      "name": "dateField",
+      "type": {
+        "type": "int",
+        "logicalType": "date"
+      }
+    },
+    {
+      "name": "timeMillisField",
+      "type": {
+        "type": "int",
+        "logicalType": "time-millis"
+      }
+    },
+    {
+      "name": "timeMicrosField",
+      "type": {
+        "type": "long",
+        "logicalType": "time-micros"
+      }
+    },
+    {
+      "name": "timestampMillisField",
+      "type": {
+        "type": "long",
+        "logicalType": "timestamp-millis"
+      }
+    },
+    {
+      "name": "timestampMicrosField",
+      "type": {
+        "type": "long",
+        "logicalType": "timestamp-micros"
+      }
+    },
+    {
+      "name": "localTimestampMillisField",
+      "type": {
+        "type": "long",
+        "logicalType": "local-timestamp-millis"
+      }
+    },
+    {
+      "name": "localTimestampMicrosField",
+      "type": {
+        "type": "long",
+        "logicalType": "local-timestamp-micros"
+      }
+    },
+    {
+      "name": "decimalSmall",
+      "type": {
+        "type": "bytes",
+        "logicalType": "decimal",
+        "precision": 5,
+        "scale": 2
+      }
+    },
+    {
+      "name": "decimalMedium",
+      "type": {
+        "type": "bytes",
+        "logicalType": "decimal",
+        "precision": 10,
+        "scale": 4
+      }
+    },
+    {
+      "name": "decimalLarge",
+      "type": {
+        "type": "bytes",
+        "logicalType": "decimal",
+        "precision": 20,
+        "scale": 6
+      }
+    },
+    {
+      "name": "fixedDecimalSmall",
+      "type": {
+        "type": "fixed",
+        "size": 4,
+        "logicalType": "decimal",
+        "precision": 6,
+        "scale": 2,
+        "name": "fixedDecimalSmall"
+      }
+    },
+    {
+      "name": "fixedDecimalMedium",
+      "type": {
+        "type": "fixed",
+        "size": 8,
+        "logicalType": "decimal",
+        "precision": 14,
+        "scale": 4,
+        "name": "fixedDecimalMedium"
+      }
+    },
+    {
+      "name": "fixedDecimalLarge",
+      "type": {
+        "type": "fixed",
+        "size": 12,
+        "logicalType": "decimal",
+        "precision": 22,
+        "scale": 6,
+        "name": "fixedDecimalLarge"
+      }
+    },
+    {
+      "name": "uuidField",
+      "type": {
+        "type": "string",
+        "logicalType": "uuid"
+      }
+    }
+  ]
+}
diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
index 85781c4b8d0e..7cda1e9dba5a 100644
--- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
@@ -27,20 +27,26 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.JDBCType;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalUnit;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.avro.util.Utf8;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
 import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUserFactory;
+import org.apache.beam.sdk.extensions.avro.schemas.logicaltypes.LogicalTypesExample;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -60,6 +66,7 @@
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Days;
 import org.joda.time.Instant;
@@ -115,6 +122,157 @@ public void supportsAnyAvroSchema(
     }
   }
 
+  @Test
+  public void supportsAllLogicalTypes() {
+
+    BigDecimal bigDecimalPrecision5Scale2 = new BigDecimal("123.45");
+    BigDecimal bigDecimalPrecision10Scale4 = new BigDecimal("12345.6789");
+    BigDecimal bigDecimalPrecision20Scale6 = new BigDecimal("1234567.123456");
+    UUID uuid = java.util.UUID.fromString("aa5961a8-a14a-4e8c-91a9-e5d3f35389e8");
+
+    long timestampMicros = 1739543415001000L;
+    long timeMicros = 52215000500L;
+
+    DateTime dateTime = new DateTime(2025, 2, 17, 0, 0, 0, DateTimeZone.UTC);
+
+    SpecificRecordBase genericRecord =
+        getSpecificRecordWithLogicalTypes(
+            dateTime,
+            timeMicros,
+            timestampMicros,
+            bigDecimalPrecision5Scale2,
+            bigDecimalPrecision10Scale4,
+            bigDecimalPrecision20Scale6,
+            uuid);
+
+    Row expected =
+        getRowWithLogicalTypes(
+            dateTime,
+            timeMicros,
+            timestampMicros,
+            bigDecimalPrecision5Scale2,
+            bigDecimalPrecision10Scale4,
+            bigDecimalPrecision20Scale6,
+            uuid);
+
+    GenericData genericData;
+    switch (VERSION_AVRO) {
+      case "1.8.2":
+        // SpecificRecords generated with 1.8.2 have no registered conversions. This is still a
+        // supported case, as the user can pass a GenericData with the appropriate conversions to
+        // AvroUtils.toBeamRowStrict.
+        // Basically GenericRecords can contain objects of any type, as long as the user provides
+        // the appropriate conversions.
+        genericData = new GenericData();
+        genericData.addLogicalTypeConversion(new AvroJodaTimeConversions.DateConversion());
+        genericData.addLogicalTypeConversion(new AvroJodaTimeConversions.TimeConversion());
+        genericData.addLogicalTypeConversion(new AvroJodaTimeConversions.TimestampConversion());
+        genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+        break;
+      case "1.9.2":
+        // SpecificRecords generated with 1.9.2 have some registered conversions, but not all. We
+        // can add the missing ones manually.
+        genericData = AvroUtils.getGenericData(genericRecord);
+        genericData.addLogicalTypeConversion(new AvroJavaTimeConversions.TimeMicrosConversion());
+        genericData.addLogicalTypeConversion(
+            new AvroJavaTimeConversions.TimestampMicrosConversion());
+        break;
+      default:
+        // SpecificRecords generated with 1.10.0+ have all conversions registered. Passing null to
+        // toBeamRowStrict ensures that the GenericData of the record is used as is.
+        genericData = null;
+    }
+
+    Row actual = AvroUtils.toBeamRowStrict(genericRecord, null, genericData);
+
+    assertEquals(expected, actual);
+  }
+
+  private static Row getRowWithLogicalTypes(
+      DateTime dateTime,
+      long timeMicros,
+      long timestampMicros,
+      BigDecimal bigDecimalPrecision5Scale2,
+      BigDecimal bigDecimalPrecision10Scale4,
+      BigDecimal bigDecimalPrecision20Scale6,
+      UUID uuid) {
+    return Row.withSchema(AvroUtils.toBeamSchema(LogicalTypesExample.getClassSchema()))
+        .withFieldValue("dateField", dateTime)
+        .withFieldValue("timeMillisField", (int) (timeMicros / 1000))
+        .withFieldValue("timeMicrosField", timeMicros)
+        .withFieldValue("timestampMillisField", jodaInstant(timestampMicros))
+        .withFieldValue("timestampMicrosField", timestampMicros)
+        .withFieldValue("localTimestampMillisField", timestampMicros / 1000)
+        .withFieldValue("localTimestampMicrosField", timestampMicros)
+        .withFieldValue("decimalSmall", bigDecimalPrecision5Scale2)
+        .withFieldValue("decimalMedium", bigDecimalPrecision10Scale4)
+        .withFieldValue("decimalLarge", bigDecimalPrecision20Scale6)
+        .withFieldValue("fixedDecimalSmall", bigDecimalPrecision5Scale2)
+        .withFieldValue("fixedDecimalMedium", bigDecimalPrecision10Scale4)
+        .withFieldValue("fixedDecimalLarge", bigDecimalPrecision20Scale6)
+        .withFieldValue("uuidField", uuid.toString())
+        .build();
+  }
+
+  private static LogicalTypesExample getSpecificRecordWithLogicalTypes(
+      org.joda.time.DateTime dateTime,
+      long timeMicros,
+      long timestampMicros,
+      BigDecimal bigDecimalPrecision5Scale2,
+      BigDecimal bigDecimalPrecision10Scale4,
+      BigDecimal bigDecimalPrecision20Scale6,
+      UUID uuid) {
+
+    java.time.LocalDate localDate =
+        java.time.LocalDate.of(
+            dateTime.get(DateTimeFieldType.year()),
+            dateTime.get(DateTimeFieldType.monthOfYear()),
+            dateTime.get(DateTimeFieldType.dayOfMonth()));
+    LogicalTypesExample r = new LogicalTypesExample();
+
+    if (VERSION_AVRO.equals("1.8.2")) {
+      // Avro 1.8.2 does not support java.time, must use joda time
+      r.put("dateField", dateTime.toLocalDate());
+      r.put("timeMillisField", jodaLocalTime(timeMicros));
+      r.put("timeMicrosField", timeMicros);
+      r.put("timestampMillisField", jodaInstant(timestampMicros).toDateTime());
+      r.put("timestampMicrosField", timestampMicros);
+    } else {
+      r.put("dateField", localDate);
+      r.put("timeMillisField", javaLocalTime(timeMicros, ChronoUnit.MILLIS));
+      r.put("timeMicrosField", javaLocalTime(timeMicros, ChronoUnit.MICROS));
+      r.put("timestampMillisField", javaInstant(timestampMicros, ChronoUnit.MILLIS));
+      r.put("timestampMicrosField", javaInstant(timestampMicros, ChronoUnit.MICROS));
+    }
+    if (VERSION_AVRO.equals("1.8.2") || VERSION_AVRO.equals("1.9.2")) {
+      // local-timestamp-millis and local-timestamp-micros only in 1.10.0+
+      r.put("localTimestampMillisField", timestampMicros / 1000);
+      r.put("localTimestampMicrosField", timestampMicros);
+    } else {
+      r.put(
+          "localTimestampMillisField", javaLocalDateTimeAtUtc(timestampMicros, ChronoUnit.MILLIS));
+      r.put(
+          "localTimestampMicrosField", javaLocalDateTimeAtUtc(timestampMicros, ChronoUnit.MICROS));
+    }
+
+    r.put("decimalSmall", bigDecimalPrecision5Scale2);
+    r.put("decimalMedium", bigDecimalPrecision10Scale4);
+    r.put("decimalLarge", bigDecimalPrecision20Scale6);
+    r.put("fixedDecimalSmall", bigDecimalPrecision5Scale2);
+    r.put("fixedDecimalMedium", bigDecimalPrecision10Scale4);
+    r.put("fixedDecimalLarge", bigDecimalPrecision20Scale6);
+
+    try {
+      r.put("uuidField", uuid.toString());
+    } catch (ClassCastException e) {
+      // the avro tools version used by gradle-avro-plugin is more recent and uses UUID, while the
+      // ones used for backward compatibility tests (1.8.2, 1.9.2 and 1.10.2) use CharSequence
+      r.put("uuidField", uuid);
+    }
+
+    return r;
+  }
+
   @Property(trials = 1000)
   @SuppressWarnings("unchecked")
   public void avroToBeamRoundTrip(
@@ -356,6 +514,28 @@ private static GenericRecord getGenericRecord() {
         .build();
   }
 
+  private static java.time.Instant javaInstant(long micros, TemporalUnit temporalUnit) {
+    return java.time.Instant.ofEpochSecond(micros / 1000000, micros * 1000 % 1000000000)
+        .truncatedTo(temporalUnit);
+  }
+
+  private static java.time.LocalDateTime javaLocalDateTimeAtUtc(
+      long micros, TemporalUnit temporalUnit) {
+    return javaInstant(micros, temporalUnit).atOffset(java.time.ZoneOffset.UTC).toLocalDateTime();
+  }
+
+  private static org.joda.time.Instant jodaInstant(long micros) {
+    return org.joda.time.Instant.ofEpochMilli(micros / 1000);
+  }
+
+  private static java.time.LocalTime javaLocalTime(long micros, TemporalUnit temporalUnit) {
+    return java.time.LocalTime.ofNanoOfDay(micros * 1000).truncatedTo(temporalUnit);
+  }
+
+  private static org.joda.time.LocalTime jodaLocalTime(long micros) {
+    return org.joda.time.LocalTime.fromMillisOfDay(micros / 1000);
+  }
+
   @Test
   public void testFromAvroSchema() {
     assertEquals(getBeamSchema(), AvroUtils.toBeamSchema(getAvroSchema()));
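
Reviewer note: the snippet below is a minimal usage sketch of the new three-argument AvroUtils.toBeamRowStrict overload introduced by this change; it is not part of the patch. The "Payment" record name and its "amount" field are hypothetical, and the GenericData setup mirrors what the test above does for the decimal logical type.

import java.math.BigDecimal;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;

public class ToBeamRowStrictExample {
  public static void main(String[] args) {
    // Hypothetical schema with a single decimal(10,2) field backed by bytes.
    Schema avroSchema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"Payment\",\"fields\":["
                    + "{\"name\":\"amount\",\"type\":{\"type\":\"bytes\","
                    + "\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}}]}");

    // Register the conversion the record's in-memory value relies on. Passing null instead of
    // this GenericData falls back to the record's own model (for SpecificRecords) or to the
    // default conversions added by this patch.
    GenericData genericData = new GenericData();
    genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());

    GenericRecord record =
        new GenericRecordBuilder(avroSchema).set("amount", new BigDecimal("19.99")).build();

    // The Beam schema is inferred from the Avro schema when null is passed.
    Row row = AvroUtils.toBeamRowStrict(record, null, genericData);
    System.out.println(row);
  }
}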