From b733fb33f391a9d6a2a7d92503a512fed83a56da Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Wed, 19 Feb 2025 16:30:56 +0100 Subject: [PATCH 01/20] #34009 avro generic record to beam row conversion added support for all logical types and conversions --- build.gradle.kts | 1 + buildSrc/build.gradle.kts | 2 +- sdks/java/extensions/avro/build.gradle | 5 +- .../avro/schemas/utils/AvroUtils.java | 218 +++++++++++++++--- .../schemas/logicaltypes/logical-types.avsc | 140 +++++++++++ .../avro/schemas/utils/AvroUtilsTest.java | 128 ++++++++++ 6 files changed, 454 insertions(+), 40 deletions(-) create mode 100644 sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc diff --git a/build.gradle.kts b/build.gradle.kts index 0adb29058479..f94f093406b9 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -44,6 +44,7 @@ tasks.rat { "**/package-list", "**/test.avsc", + "**/logical-types.avsc", "**/user.avsc", "**/test/resources/**/*.txt", "**/test/resources/**/*.csv", diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index cd8aed6d3a67..7057dbde8866 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -45,7 +45,7 @@ dependencies { implementation("com.github.spotbugs.snom:spotbugs-gradle-plugin:5.0.14") runtimeOnly("com.google.protobuf:protobuf-gradle-plugin:0.8.13") // Enable proto code generation - runtimeOnly("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.9.1") // Enable Avro code generation + runtimeOnly("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.1.0") // Enable Avro code generation. Version 1.1.0 is the last supporting avro 1.10.2 runtimeOnly("com.diffplug.spotless:spotless-plugin-gradle:5.6.1") // Enable a code formatting plugin runtimeOnly("gradle.plugin.com.dorongold.plugins:task-tree:1.5") // Adds a 'taskTree' task to print task dependency tree runtimeOnly("gradle.plugin.com.github.johnrengelman:shadow:7.1.1") // Enable shading Java dependencies diff --git a/sdks/java/extensions/avro/build.gradle b/sdks/java/extensions/avro/build.gradle index 6631779e609c..63c825b178d5 100644 --- a/sdks/java/extensions/avro/build.gradle +++ b/sdks/java/extensions/avro/build.gradle @@ -67,6 +67,7 @@ dependencies { implementation library.java.error_prone_annotations implementation library.java.avro implementation library.java.joda_time + implementation 'org.apache.commons:commons-lang3' testImplementation(project(path: ":sdks:java:core", configuration: "shadowTest")) { // Exclude Avro dependencies from "core" since Avro support moved to this extension exclude group: "org.apache.avro", module: "avro" @@ -143,9 +144,11 @@ avroVersions.each { k, v -> main = "org.apache.avro.tool.Main" args = [ "compile", + "-bigDecimal", // Use BigDecimal for logical type decimal, similarly to what gradle-avro-plugin does "schema", "src/test/avro/org/apache/beam/sdk/extensions/avro/io/user.avsc", "src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/test.avsc", + "src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc", "build/generated/sources/avro$k/test/java" ] } @@ -161,4 +164,4 @@ static def createTaskNames(Map prefixMap, String suffix) { return prefixMap.keySet().stream() .map { version -> "avroVersion${version}${suffix}" } .collect(Collectors.toList()) -} \ No newline at end of file +} diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java 
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index da7daf605d89..bd21cea72f41 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -49,6 +50,7 @@ import net.bytebuddy.implementation.bytecode.member.MethodInvocation; import net.bytebuddy.matcher.ElementMatchers; import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Conversion; import org.apache.avro.Conversions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; @@ -61,6 +63,7 @@ import org.apache.avro.reflect.AvroName; import org.apache.avro.reflect.ReflectData; import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.avro.util.Utf8; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.avro.schemas.AvroRecordSchema; @@ -97,6 +100,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.commons.lang3.reflect.FieldUtils; import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -154,6 +158,15 @@ public class AvroUtils { new ForLoadedType(ReadableInstant.class); private static final ForLoadedType JODA_INSTANT = new ForLoadedType(Instant.class); + private static final GenericData GENERIC_DATA_WITH_DEFAULT_CONVERSIONS; + + static { + GENERIC_DATA_WITH_DEFAULT_CONVERSIONS = new GenericData(); + addLogicalTypeConversions(GENERIC_DATA_WITH_DEFAULT_CONVERSIONS); + GENERIC_DATA_WITH_DEFAULT_CONVERSIONS.addLogicalTypeConversion( + new Conversions.DecimalConversion()); + } + // contains workarounds for third-party methods that accept nullable arguments but lack proper // annotations private static class NullnessCheckerWorkarounds { @@ -552,23 +565,43 @@ public static org.apache.avro.Schema toAvroSchema(Schema beamSchema) { * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during * conversion. If Schema is not provided, one is inferred from the AVRO schema. 
*/ - public static Row toBeamRowStrict(GenericRecord record, @Nullable Schema schema) { + public static Row toBeamRowStrict( + GenericRecord record, @Nullable Schema schema, @Nullable GenericData genericData) { if (schema == null) { schema = toBeamSchema(record.getSchema()); } + if (genericData == null) { + if (record instanceof SpecificRecordBase) { + // in case of SpecificRecord, the MODEL$ GenericData already has registered the specific + // conversions + genericData = getGenericData((SpecificRecordBase) record); + } else { + genericData = GENERIC_DATA_WITH_DEFAULT_CONVERSIONS; + } + } + Row.Builder builder = Row.withSchema(schema); org.apache.avro.Schema avroSchema = record.getSchema(); for (Field field : schema.getFields()) { Object value = record.get(field.getName()); org.apache.avro.Schema fieldAvroSchema = avroSchema.getField(field.getName()).schema(); - builder.addValue(convertAvroFieldStrict(value, fieldAvroSchema, field.getType())); + builder.addValue( + convertAvroFieldStrict(value, fieldAvroSchema, field.getType(), genericData)); } return builder.build(); } + /** + * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during + * conversion. If Schema is not provided, one is inferred from the AVRO schema. + */ + public static Row toBeamRowStrict(GenericRecord record, @Nullable Schema schema) { + return toBeamRowStrict(record, schema, null); + } + /** * Convert from a Beam Row to an AVRO GenericRecord. The Avro Schema is inferred from the Beam * schema on the row. @@ -1323,6 +1356,64 @@ private static org.apache.avro.Schema getFieldSchema( } } + private static Object convertLogicalType( + @PolyNull Object value, + @Nonnull org.apache.avro.Schema avroSchema, + @Nonnull FieldType fieldType, + @Nonnull GenericData genericData) { + TypeWithNullability type = new TypeWithNullability(avroSchema); + LogicalType logicalType = LogicalTypes.fromSchema(type.type); + if (logicalType == null) { + return null; + } + + Object rawType = value; + + Conversion conversion = genericData.getConversionByClass(value.getClass(), logicalType); + Class convertedType = null; + if (conversion != null) { + convertedType = conversion.getConvertedType(); + if (convertedType.isInstance(value)) { + rawType = Conversions.convertToRawType(value, avroSchema, logicalType, conversion); + } + } + + if (logicalType instanceof LogicalTypes.Date) { + return convertDateStrict( + checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType), + fieldType); + } else if (logicalType instanceof LogicalTypes.TimeMillis) { + return checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType); + } else if (logicalType instanceof LogicalTypes.TimeMicros) { + return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); + } else if (logicalType instanceof LogicalTypes.TimestampMillis) { + return convertDateTimeStrict( + checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType), + fieldType); + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); + } else if ("local-timestamp-millis".equals(logicalType.getName())) { + return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); + } else if ("local-timestamp-micros".equals(logicalType.getName())) { + return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); + } else if (logicalType instanceof 
LogicalTypes.Decimal) { + if (rawType instanceof GenericFixed) { + // Decimal can be backed by ByteBuffer or GenericFixed. in case of GenericFixed, we convert + // it to ByteBuffer here + rawType = ByteBuffer.wrap(((GenericFixed) rawType).bytes()); + } + ByteBuffer byteBuffer = + checkRawType(ByteBuffer.class, value, logicalType, rawType, conversion, convertedType); + Conversion decimalConversion = new Conversions.DecimalConversion(); + BigDecimal bigDecimal = + decimalConversion.fromBytes(byteBuffer.duplicate(), type.type, logicalType); + return convertDecimal(bigDecimal, fieldType); + } else if (LogicalTypes.uuid().equals(logicalType)) { + return UUID.fromString(rawType.toString()).toString(); + } + return null; + } + /** * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during * conversion. @@ -1330,44 +1421,26 @@ private static org.apache.avro.Schema getFieldSchema( * @param value {@link GenericRecord} or any nested value * @param avroSchema schema for value * @param fieldType target beam field type + * @param genericData {@link GenericData} instance to use for conversions * @return value converted for {@link Row} */ - @SuppressWarnings("unchecked") public static @PolyNull Object convertAvroFieldStrict( @PolyNull Object value, @Nonnull org.apache.avro.Schema avroSchema, - @Nonnull FieldType fieldType) { + @Nonnull FieldType fieldType, + @Nonnull GenericData genericData) { + if (value == null) { return null; } + Object convertedLogicalType = convertLogicalType(value, avroSchema, fieldType, genericData); - TypeWithNullability type = new TypeWithNullability(avroSchema); - LogicalType logicalType = LogicalTypes.fromSchema(type.type); - if (logicalType != null) { - if (logicalType instanceof LogicalTypes.Decimal) { - ByteBuffer byteBuffer = (ByteBuffer) value; - BigDecimal bigDecimal = - new Conversions.DecimalConversion() - .fromBytes(byteBuffer.duplicate(), type.type, logicalType); - return convertDecimal(bigDecimal, fieldType); - } else if (logicalType instanceof LogicalTypes.TimestampMillis) { - if (value instanceof ReadableInstant) { - return convertDateTimeStrict(((ReadableInstant) value).getMillis(), fieldType); - } else { - return convertDateTimeStrict((Long) value, fieldType); - } - } else if (logicalType instanceof LogicalTypes.Date) { - if (value instanceof ReadableInstant) { - int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays(); - return convertDateStrict(epochDays, fieldType); - } else if (value instanceof java.time.LocalDate) { - return convertDateStrict((int) ((java.time.LocalDate) value).toEpochDay(), fieldType); - } else { - return convertDateStrict((Integer) value, fieldType); - } - } + if (convertedLogicalType != null) { + return convertedLogicalType; } + TypeWithNullability type = new TypeWithNullability(avroSchema); + switch (type.type.getType()) { case FIXED: return convertFixedStrict((GenericFixed) value, fieldType); @@ -1402,14 +1475,15 @@ private static org.apache.avro.Schema getFieldSchema( return convertEnumStrict(value, fieldType); case ARRAY: - return convertArrayStrict((List) value, type.type.getElementType(), fieldType); + return convertArrayStrict( + (List) value, type.type.getElementType(), fieldType, genericData); case MAP: return convertMapStrict( - (Map) value, type.type.getValueType(), fieldType); + (Map) value, type.type.getValueType(), fieldType, genericData); case UNION: - return convertUnionStrict(value, type.type, fieldType); + return convertUnionStrict(value, type.type, 
fieldType, genericData); case NULL: throw new IllegalArgumentException("Can't convert 'null' to non-nullable field"); @@ -1419,6 +1493,24 @@ private static org.apache.avro.Schema getFieldSchema( } } + /** + * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during + * conversion. + * + * @param value {@link GenericRecord} or any nested value + * @param avroSchema schema for value + * @param fieldType target beam field type + * @return value converted for {@link Row} + */ + @SuppressWarnings("unchecked") + public static @PolyNull Object convertAvroFieldStrict( + @PolyNull Object value, + @Nonnull org.apache.avro.Schema avroSchema, + @Nonnull FieldType fieldType) { + return convertAvroFieldStrict( + value, avroSchema, fieldType, GENERIC_DATA_WITH_DEFAULT_CONVERSIONS); + } + private static Object convertRecordStrict(GenericRecord record, FieldType fieldType) { checkTypeName(fieldType.getTypeName(), TypeName.ROW, "record"); return toBeamRowStrict(record, fieldType.getRowSchema()); @@ -1495,7 +1587,10 @@ private static Object convertEnumStrict(Object value, FieldType fieldType) { } private static Object convertUnionStrict( - Object value, org.apache.avro.Schema unionAvroSchema, FieldType fieldType) { + Object value, + org.apache.avro.Schema unionAvroSchema, + FieldType fieldType, + GenericData genericData) { checkTypeName(fieldType.getTypeName(), TypeName.LOGICAL_TYPE, "oneOfType"); checkArgument( checkNotNull(fieldType.getLogicalType()).getIdentifier().equals(OneOfType.IDENTIFIER)); @@ -1503,19 +1598,24 @@ private static Object convertUnionStrict( int fieldNumber = GenericData.get().resolveUnion(unionAvroSchema, value); FieldType baseFieldType = oneOfType.getOneOfSchema().getField(fieldNumber).getType(); Object convertedValue = - convertAvroFieldStrict(value, unionAvroSchema.getTypes().get(fieldNumber), baseFieldType); + convertAvroFieldStrict( + value, unionAvroSchema.getTypes().get(fieldNumber), baseFieldType, genericData); return oneOfType.createValue(fieldNumber, convertedValue); } private static Object convertArrayStrict( - List values, org.apache.avro.Schema elemAvroSchema, FieldType fieldType) { + List values, + org.apache.avro.Schema elemAvroSchema, + FieldType fieldType, + GenericData genericData) { checkTypeName(fieldType.getTypeName(), TypeName.ARRAY, "array"); List ret = new ArrayList<>(values.size()); FieldType elemFieldType = fieldType.getCollectionElementType(); for (Object value : values) { - ret.add(convertAvroFieldStrict(value, elemAvroSchema, checkNotNull(elemFieldType))); + ret.add( + convertAvroFieldStrict(value, elemAvroSchema, checkNotNull(elemFieldType), genericData)); } return ret; @@ -1524,7 +1624,8 @@ private static Object convertArrayStrict( private static Object convertMapStrict( Map values, org.apache.avro.Schema valueAvroSchema, - FieldType fieldType) { + FieldType fieldType, + GenericData genericData) { checkTypeName(fieldType.getTypeName(), TypeName.MAP, "map"); FieldType mapKeyType = checkNotNull(fieldType.getMapKeyType()); FieldType mapValueType = checkNotNull(fieldType.getMapValueType()); @@ -1539,7 +1640,7 @@ private static Object convertMapStrict( for (Map.Entry value : values.entrySet()) { ret.put( convertStringStrict(value.getKey(), mapKeyType), - convertAvroFieldStrict(value.getValue(), valueAvroSchema, mapValueType)); + convertAvroFieldStrict(value.getValue(), valueAvroSchema, mapValueType, genericData)); } return ret; @@ -1563,4 +1664,45 @@ private static org.apache.avro.Schema buildHiveLogicalTypeSchema( 
hiveLogicalType, size); return new org.apache.avro.Schema.Parser().parse(schemaJson); } + + private static GenericData getGenericData(SpecificRecordBase record) { + try { + return record.getSpecificData(); + } catch (NoSuchMethodError e) { + try { + // SpecificRecordBase.getSpecificData() was not available in avro 1.8.2 + return (GenericData) FieldUtils.readStaticField(record.getClass(), "MODEL$", true); + } catch (IllegalAccessException ex) { + throw new IllegalArgumentException( + "Unable to access MODEL$ field in SpecificRecordBase class", ex); + } + } + } + + private static T checkRawType( + Class desiredRawType, + Object value, + LogicalType logicalType, + Object rawType, + Conversion conversion, + Class convertedType) { + String msg = + String.format( + "Value %s of class %s is not a supported type for logical type %s. " + + "Underlying avro built-in raw type should be an instance of %s. " + + "However it is an instance of %s and has value %s. " + + "Generic data has conversion %s, convertedType %s", + value, + value.getClass(), + logicalType, + desiredRawType, + rawType.getClass(), + rawType, + conversion, + convertedType); + if (!desiredRawType.isInstance(rawType)) { + throw new IllegalArgumentException(msg); + } + return (T) rawType; + } } diff --git a/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc b/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc new file mode 100644 index 000000000000..1dd1d8426f6d --- /dev/null +++ b/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +{ + "type": "record", + "name": "LogicalTypesExample", + "namespace": "org.apache.beam.sdk.extensions.avro.schemas.logicaltypes", + "fields": [ + { + "name": "dateField", + "type": { + "type": "int", + "logicalType": "date" + } + }, + { + "name": "timeMillisField", + "type": { + "type": "int", + "logicalType": "time-millis" + } + }, + { + "name": "timeMicrosField", + "type": { + "type": "long", + "logicalType": "time-micros" + } + }, + { + "name": "timestampMillisField", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "timestampMicrosField", + "type": { + "type": "long", + "logicalType": "timestamp-micros" + } + }, + { + "name": "localTimestampMillisField", + "type": { + "type": "long", + "logicalType": "local-timestamp-millis" + } + }, + { + "name": "localTimestampMicrosField", + "type": { + "type": "long", + "logicalType": "local-timestamp-micros" + } + }, + { + "name": "decimalSmall", + "type": { + "type": "bytes", + "logicalType": "decimal", + "precision": 5, + "scale": 2 + } + }, + { + "name": "decimalMedium", + "type": { + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 4 + } + }, + { + "name": "decimalLarge", + "type": { + "type": "bytes", + "logicalType": "decimal", + "precision": 20, + "scale": 6 + } + }, + { + "name": "fixedDecimalSmall", + "type": { + "type": "fixed", + "size": 4, + "logicalType": "decimal", + "precision": 6, + "scale": 2, + "name": "fixedDecimalSmall" + } + }, + { + "name": "fixedDecimalMedium", + "type": { + "type": "fixed", + "size": 8, + "logicalType": "decimal", + "precision": 14, + "scale": 4, + "name": "fixedDecimalMedium" + } + }, + { + "name": "fixedDecimalLarge", + "type": { + "type": "fixed", + "size": 12, + "logicalType": "decimal", + "precision": 22, + "scale": 6, + "name": "fixedDecimalLarge" + } + }, + { + "name": "uuidField", + "type": { + "type": "string", + "logicalType": "uuid" + } + } + ] +} diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java index 85781c4b8d0e..1b00007bae6d 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java @@ -27,8 +27,11 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.JDBCType; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.avro.Conversions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; @@ -41,6 +44,7 @@ import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser; import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUserFactory; +import org.apache.beam.sdk.extensions.avro.schemas.logicaltypes.LogicalTypesExample; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -60,6 +64,7 @@ import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; +import org.joda.time.DateTimeFieldType; import org.joda.time.DateTimeZone; import org.joda.time.Days; import 
org.joda.time.Instant; @@ -115,6 +120,111 @@ public void supportsAnyAvroSchema( } } + @Test + public void supportsAllLogicalTypes() { + if (VERSION_AVRO.equals("1.8.2") || VERSION_AVRO.equals("1.9.2")) { + // Skip this test for Avro 1.8.2 and 1.9.2 as they do not support all logical types + // and do not register all conversions to the GenericRecord.MODEL$. In user code, + // those older versions can still be used; if conversions are needed, a GenericData with the + // appropriate conversions can be passed to AvroUtils.toBeamRowStrict + return; + } + + BigDecimal bigDecimalPrecision5Scale2 = new BigDecimal("123.45"); + BigDecimal bigDecimalPrecision10Scale4 = new BigDecimal("12345.6789"); + BigDecimal bigDecimalPrecision20Scale6 = new BigDecimal("1234567.123456"); + UUID uuid = java.util.UUID.fromString("aa5961a8-a14a-4e8c-91a9-e5d3f35389e8"); + + long timestampMicros = 1739543415001000L; + long timeMicros = 52215000500L; + + DateTime dateTime = new DateTime(2025, 2, 17, 0, 0, 0, DateTimeZone.UTC); + + GenericRecord specificRecord = + getSpecificRecordWithLogicalTypes( + dateTime, + timeMicros, + timestampMicros, + bigDecimalPrecision5Scale2, + bigDecimalPrecision10Scale4, + bigDecimalPrecision20Scale6, + uuid); + + Row expected = + getRowWithLogicalTypes( + dateTime, + timeMicros, + timestampMicros, + bigDecimalPrecision5Scale2, + bigDecimalPrecision10Scale4, + bigDecimalPrecision20Scale6, + uuid); + + Row actual = AvroUtils.toBeamRowStrict(specificRecord, null, null); + + assertEquals(expected, actual); + } + + private static Row getRowWithLogicalTypes( + DateTime dateTime, + long timeMicros, + long timestampMicros, + BigDecimal bigDecimalPrecision5Scale2, + BigDecimal bigDecimalPrecision10Scale4, + BigDecimal bigDecimalPrecision20Scale6, + UUID uuid) { + return Row.withSchema(AvroUtils.toBeamSchema(LogicalTypesExample.getClassSchema())) + .withFieldValue("dateField", dateTime) + .withFieldValue("timeMillisField", (int) (timeMicros / 1000)) + .withFieldValue("timeMicrosField", timeMicros) + .withFieldValue("timestampMillisField", jodaInstant(timestampMicros)) + .withFieldValue("timestampMicrosField", timestampMicros) + .withFieldValue("localTimestampMillisField", timestampMicros / 1000) + .withFieldValue("localTimestampMicrosField", timestampMicros) + .withFieldValue("decimalSmall", bigDecimalPrecision5Scale2) + .withFieldValue("decimalMedium", bigDecimalPrecision10Scale4) + .withFieldValue("decimalLarge", bigDecimalPrecision20Scale6) + .withFieldValue("fixedDecimalSmall", bigDecimalPrecision5Scale2) + .withFieldValue("fixedDecimalMedium", bigDecimalPrecision10Scale4) + .withFieldValue("fixedDecimalLarge", bigDecimalPrecision20Scale6) + .withFieldValue("uuidField", uuid.toString()) + .build(); + } + + private static LogicalTypesExample getSpecificRecordWithLogicalTypes( + org.joda.time.DateTime dateTime, + long timeMicros, + long timestampMicros, + BigDecimal bigDecimalPrecision5Scale2, + BigDecimal bigDecimalPrecision10Scale4, + BigDecimal bigDecimalPrecision20Scale6, + UUID uuid) { + + java.time.LocalDate localDate = + java.time.LocalDate.of( + dateTime.get(DateTimeFieldType.year()), + dateTime.get(DateTimeFieldType.monthOfYear()), + dateTime.get(DateTimeFieldType.dayOfMonth())); + LogicalTypesExample r = new LogicalTypesExample(); + + r.put("dateField", localDate); + r.put("timeMillisField", javaLocalTime(timeMicros, ChronoUnit.MILLIS)); + r.put("timeMicrosField", javaLocalTime(timeMicros, ChronoUnit.MICROS)); + r.put("timestampMillisField", javaInstant(timestampMicros, 
ChronoUnit.MILLIS)); + r.put("timestampMicrosField", javaInstant(timestampMicros, ChronoUnit.MICROS)); + r.put("localTimestampMillisField", javaLocalDateTimeAtUtc(timestampMicros, ChronoUnit.MILLIS)); + r.put("localTimestampMicrosField", javaLocalDateTimeAtUtc(timestampMicros, ChronoUnit.MICROS)); + r.put("decimalSmall", bigDecimalPrecision5Scale2); + r.put("decimalMedium", bigDecimalPrecision10Scale4); + r.put("decimalLarge", bigDecimalPrecision20Scale6); + r.put("fixedDecimalSmall", bigDecimalPrecision5Scale2); + r.put("fixedDecimalMedium", bigDecimalPrecision10Scale4); + r.put("fixedDecimalLarge", bigDecimalPrecision20Scale6); + r.put("uuidField", uuid.toString()); + + return r; + } + @Property(trials = 1000) @SuppressWarnings("unchecked") public void avroToBeamRoundTrip( @@ -356,6 +466,24 @@ private static GenericRecord getGenericRecord() { .build(); } + private static java.time.Instant javaInstant(long micros, TemporalUnit temporalUnit) { + return java.time.Instant.ofEpochSecond(micros / 1000000, micros * 1000 % 1000000000) + .truncatedTo(temporalUnit); + } + + private static java.time.LocalDateTime javaLocalDateTimeAtUtc( + long micros, TemporalUnit temporalUnit) { + return javaInstant(micros, temporalUnit).atOffset(java.time.ZoneOffset.UTC).toLocalDateTime(); + } + + private static org.joda.time.Instant jodaInstant(long micros) { + return org.joda.time.Instant.ofEpochMilli(micros / 1000); + } + + private static java.time.LocalTime javaLocalTime(long micros, TemporalUnit temporalUnit) { + return java.time.LocalTime.ofNanoOfDay(micros * 1000).truncatedTo(temporalUnit); + } + @Test public void testFromAvroSchema() { assertEquals(getBeamSchema(), AvroUtils.toBeamSchema(getAvroSchema())); From efeffb99702c12ecae3b5e79ea3c57dbb0f73842 Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Wed, 19 Feb 2025 20:46:49 +0100 Subject: [PATCH 02/20] using string comparison to avoid class not found issues with earlier versions of avro --- .../avro/schemas/utils/AvroUtils.java | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index bd21cea72f41..1758e2948f58 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -1378,38 +1378,41 @@ private static Object convertLogicalType( } } - if (logicalType instanceof LogicalTypes.Date) { - return convertDateStrict( - checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType), - fieldType); - } else if (logicalType instanceof LogicalTypes.TimeMillis) { - return checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType); - } else if (logicalType instanceof LogicalTypes.TimeMicros) { - return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); - } else if (logicalType instanceof LogicalTypes.TimestampMillis) { - return convertDateTimeStrict( - checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType), - fieldType); - } else if (logicalType instanceof LogicalTypes.TimestampMicros) { - return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); - } else if ("local-timestamp-millis".equals(logicalType.getName())) 
{ - return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); - } else if ("local-timestamp-micros".equals(logicalType.getName())) { - return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); - } else if (logicalType instanceof LogicalTypes.Decimal) { - if (rawType instanceof GenericFixed) { - // Decimal can be backed by ByteBuffer or GenericFixed. in case of GenericFixed, we convert - // it to ByteBuffer here - rawType = ByteBuffer.wrap(((GenericFixed) rawType).bytes()); - } - ByteBuffer byteBuffer = - checkRawType(ByteBuffer.class, value, logicalType, rawType, conversion, convertedType); - Conversion decimalConversion = new Conversions.DecimalConversion(); - BigDecimal bigDecimal = - decimalConversion.fromBytes(byteBuffer.duplicate(), type.type, logicalType); - return convertDecimal(bigDecimal, fieldType); - } else if (LogicalTypes.uuid().equals(logicalType)) { - return UUID.fromString(rawType.toString()).toString(); + // switch on string name because some LogicalType classes are not available in all versions of + // Avro + switch (logicalType.getName()) { + case "date": + return convertDateStrict( + checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType), + fieldType); + case "time-millis": + return checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType); + case "time-micros": + case "timestamp-micros": + case "local-timestamp-millis": + case "local-timestamp-micros": + return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); + case "timestamp-millis": + return convertDateTimeStrict( + checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType), + fieldType); + case "decimal": + { + if (rawType instanceof GenericFixed) { + // Decimal can be backed by ByteBuffer or GenericFixed. 
in case of GenericFixed, we + // convert it to ByteBuffer here + rawType = ByteBuffer.wrap(((GenericFixed) rawType).bytes()); + } + ByteBuffer byteBuffer = + checkRawType( + ByteBuffer.class, value, logicalType, rawType, conversion, convertedType); + Conversion decimalConversion = new Conversions.DecimalConversion(); + BigDecimal bigDecimal = + decimalConversion.fromBytes(byteBuffer.duplicate(), type.type, logicalType); + return convertDecimal(bigDecimal, fieldType); + } + case "uuid": + return UUID.fromString(rawType.toString()).toString(); } return null; } From d5a461833e8f124219ea9d6378d94e857cdc6b05 Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Wed, 19 Feb 2025 20:46:49 +0100 Subject: [PATCH 03/20] using string comparison to avoid class not found issues with earlier versions of avro --- .../avro/schemas/utils/AvroUtils.java | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index bd21cea72f41..1758e2948f58 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -1378,38 +1378,41 @@ private static Object convertLogicalType( } } - if (logicalType instanceof LogicalTypes.Date) { - return convertDateStrict( - checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType), - fieldType); - } else if (logicalType instanceof LogicalTypes.TimeMillis) { - return checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType); - } else if (logicalType instanceof LogicalTypes.TimeMicros) { - return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); - } else if (logicalType instanceof LogicalTypes.TimestampMillis) { - return convertDateTimeStrict( - checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType), - fieldType); - } else if (logicalType instanceof LogicalTypes.TimestampMicros) { - return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); - } else if ("local-timestamp-millis".equals(logicalType.getName())) { - return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); - } else if ("local-timestamp-micros".equals(logicalType.getName())) { - return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); - } else if (logicalType instanceof LogicalTypes.Decimal) { - if (rawType instanceof GenericFixed) { - // Decimal can be backed by ByteBuffer or GenericFixed. 
in case of GenericFixed, we convert - // it to ByteBuffer here - rawType = ByteBuffer.wrap(((GenericFixed) rawType).bytes()); - } - ByteBuffer byteBuffer = - checkRawType(ByteBuffer.class, value, logicalType, rawType, conversion, convertedType); - Conversion decimalConversion = new Conversions.DecimalConversion(); - BigDecimal bigDecimal = - decimalConversion.fromBytes(byteBuffer.duplicate(), type.type, logicalType); - return convertDecimal(bigDecimal, fieldType); - } else if (LogicalTypes.uuid().equals(logicalType)) { - return UUID.fromString(rawType.toString()).toString(); + // switch on string name because some LogicalType classes are not available in all versions of + // Avro + switch (logicalType.getName()) { + case "date": + return convertDateStrict( + checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType), + fieldType); + case "time-millis": + return checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType); + case "time-micros": + case "timestamp-micros": + case "local-timestamp-millis": + case "local-timestamp-micros": + return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); + case "timestamp-millis": + return convertDateTimeStrict( + checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType), + fieldType); + case "decimal": + { + if (rawType instanceof GenericFixed) { + // Decimal can be backed by ByteBuffer or GenericFixed. in case of GenericFixed, we + // convert it to ByteBuffer here + rawType = ByteBuffer.wrap(((GenericFixed) rawType).bytes()); + } + ByteBuffer byteBuffer = + checkRawType( + ByteBuffer.class, value, logicalType, rawType, conversion, convertedType); + Conversion decimalConversion = new Conversions.DecimalConversion(); + BigDecimal bigDecimal = + decimalConversion.fromBytes(byteBuffer.duplicate(), type.type, logicalType); + return convertDecimal(bigDecimal, fieldType); + } + case "uuid": + return UUID.fromString(rawType.toString()).toString(); } return null; } From 7546865b506abe452deb496c9deaa9ba94f537b1 Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Thu, 20 Feb 2025 19:33:26 +0100 Subject: [PATCH 04/20] com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.9.1 --- buildSrc/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index 7057dbde8866..d2f89cc62325 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -45,7 +45,7 @@ dependencies { implementation("com.github.spotbugs.snom:spotbugs-gradle-plugin:5.0.14") runtimeOnly("com.google.protobuf:protobuf-gradle-plugin:0.8.13") // Enable proto code generation - runtimeOnly("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.1.0") // Enable Avro code generation. Version 1.1.0 is the last supporting avro 1.10.2 + runtimeOnly("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.9.1") // Enable Avro code generation. 
runtimeOnly("com.diffplug.spotless:spotless-plugin-gradle:5.6.1") // Enable a code formatting plugin runtimeOnly("gradle.plugin.com.dorongold.plugins:task-tree:1.5") // Adds a 'taskTree' task to print task dependency tree runtimeOnly("gradle.plugin.com.github.johnrengelman:shadow:7.1.1") // Enable shading Java dependencies From ee332cbaa243be68a60c9c229583c72c493800c4 Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Wed, 19 Feb 2025 20:46:49 +0100 Subject: [PATCH 05/20] using string comparison to avoid class not found issues with earlier versions of avro --- .../avro/schemas/utils/AvroUtils.java | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index bd21cea72f41..1758e2948f58 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -1378,38 +1378,41 @@ private static Object convertLogicalType( } } - if (logicalType instanceof LogicalTypes.Date) { - return convertDateStrict( - checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType), - fieldType); - } else if (logicalType instanceof LogicalTypes.TimeMillis) { - return checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType); - } else if (logicalType instanceof LogicalTypes.TimeMicros) { - return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); - } else if (logicalType instanceof LogicalTypes.TimestampMillis) { - return convertDateTimeStrict( - checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType), - fieldType); - } else if (logicalType instanceof LogicalTypes.TimestampMicros) { - return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); - } else if ("local-timestamp-millis".equals(logicalType.getName()))
in case of GenericFixed, we convert - // it to ByteBuffer here - rawType = ByteBuffer.wrap(((GenericFixed) rawType).bytes()); - } - ByteBuffer byteBuffer = - checkRawType(ByteBuffer.class, value, logicalType, rawType, conversion, convertedType); - Conversion decimalConversion = new Conversions.DecimalConversion(); - BigDecimal bigDecimal = - decimalConversion.fromBytes(byteBuffer.duplicate(), type.type, logicalType); - return convertDecimal(bigDecimal, fieldType); - } else if (LogicalTypes.uuid().equals(logicalType)) { - return UUID.fromString(rawType.toString()).toString(); + // switch on string name because some LogicalType classes are not available in all versions of + // Avro + switch (logicalType.getName()) { + case "date": + return convertDateStrict( + checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType), + fieldType); + case "time-millis": + return checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType); + case "time-micros": + case "timestamp-micros": + case "local-timestamp-millis": + case "local-timestamp-micros": + return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType); + case "timestamp-millis": + return convertDateTimeStrict( + checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType), + fieldType); + case "decimal": + { + if (rawType instanceof GenericFixed) { + // Decimal can be backed by ByteBuffer or GenericFixed. in case of GenericFixed, we + // convert it to ByteBuffer here + rawType = ByteBuffer.wrap(((GenericFixed) rawType).bytes()); + } + ByteBuffer byteBuffer = + checkRawType( + ByteBuffer.class, value, logicalType, rawType, conversion, convertedType); + Conversion decimalConversion = new Conversions.DecimalConversion(); + BigDecimal bigDecimal = + decimalConversion.fromBytes(byteBuffer.duplicate(), type.type, logicalType); + return convertDecimal(bigDecimal, fieldType); + } + case "uuid": + return UUID.fromString(rawType.toString()).toString(); } return null; } From 774fbba76fec19f3177be556870f214193272b1f Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Thu, 20 Feb 2025 19:33:26 +0100 Subject: [PATCH 06/20] com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.9.1 --- buildSrc/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index 7057dbde8866..d2f89cc62325 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -45,7 +45,7 @@ dependencies { implementation("com.github.spotbugs.snom:spotbugs-gradle-plugin:5.0.14") runtimeOnly("com.google.protobuf:protobuf-gradle-plugin:0.8.13") // Enable proto code generation - runtimeOnly("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.1.0") // Enable Avro code generation. Version 1.1.0 is the last supporting avro 1.10.2 + runtimeOnly("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.9.1") // Enable Avro code generation. 
runtimeOnly("com.diffplug.spotless:spotless-plugin-gradle:5.6.1") // Enable a code formatting plugin runtimeOnly("gradle.plugin.com.dorongold.plugins:task-tree:1.5") // Adds a 'taskTree' task to print task dependency tree runtimeOnly("gradle.plugin.com.github.johnrengelman:shadow:7.1.1") // Enable shading Java dependencies From 2f3051c90fbb77aa5b095e6e6ab979c425b3e913 Mon Sep 17 00:00:00 2001 From: synenka <97878236+synenka@users.noreply.github.com> Date: Wed, 19 Feb 2025 16:38:46 +0100 Subject: [PATCH 07/20] Add `types.Unalias` to types assertions and types switches to get an underlying type instead of types.Alias (#33868) --- sdks/go/pkg/beam/util/starcgenx/starcgenx.go | 41 +++++++++++++------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/sdks/go/pkg/beam/util/starcgenx/starcgenx.go b/sdks/go/pkg/beam/util/starcgenx/starcgenx.go index b5cd0ddc8eb8..22d2be6e43f1 100644 --- a/sdks/go/pkg/beam/util/starcgenx/starcgenx.go +++ b/sdks/go/pkg/beam/util/starcgenx/starcgenx.go @@ -15,7 +15,7 @@ // Package starcgenx is a Static Analysis Type Assertion shim and Registration Code Generator // which provides an extractor to extract types from a package, in order to generate -// approprate shimsr a package so code can be generated for it. +// appropriate shims for a package so code can be generated for it. // // It's written for use by the starcgen tool, but separate to permit // alternative "go/importer" Importers for accessing types from imported packages. @@ -336,6 +336,7 @@ func (e *Extractor) isRequired(ident string, obj types.Object, idsRequired, idsF // or it's receiver type identifier needs to be in the filtered identifiers. if idsRequired[ident] { idsFound[ident] = true + e.Printf("isRequired found: %s\n", ident) return true } // Check if this is a function. @@ -347,10 +348,10 @@ func (e *Extractor) isRequired(ident string, obj types.Object, idsRequired, idsF if recv := sig.Recv(); recv != nil && graph.IsLifecycleMethod(ident) { // We don't want to care about pointers, so dereference to value type. t := recv.Type() - p, ok := t.(*types.Pointer) + p, ok := types.Unalias(t).(*types.Pointer) for ok { t = p.Elem() - p, ok = t.(*types.Pointer) + p, ok = types.Unalias(t).(*types.Pointer) } ts := types.TypeString(t, e.qualifier) e.Printf("recv %v has %v, ts: %s %s--- ", recv, sig, ts, ident) @@ -384,6 +385,8 @@ func (e *Extractor) fromObj(fset *token.FileSet, id *ast.Ident, obj types.Object ident = obj.Name() } if !e.isRequired(ident, obj, idsRequired, idsFound) { + e.Printf("%s: %q with package %q is not required \n", + fset.Position(id.Pos()), id.Name, pkg.Name()) return } @@ -391,7 +394,7 @@ func (e *Extractor) fromObj(fset *token.FileSet, id *ast.Ident, obj types.Object case *types.Var: // Vars are tricky since they could be anything, and anywhere (package scope, parameters, etc) // eg. Flags, or Field Tags, among others. - // I'm increasingly convinced that we should simply igonore vars. + // I'm increasingly convinced that we should simply ignore vars. // Do nothing for vars. case *types.Func: sig := obj.Type().(*types.Signature) @@ -405,10 +408,10 @@ func (e *Extractor) fromObj(fset *token.FileSet, id *ast.Ident, obj types.Object } // This must be a structural DoFn! We should generate a closure wrapper for it.
t := recv.Type() - p, ok := t.(*types.Pointer) + p, ok := types.Unalias(t).(*types.Pointer) for ok { t = p.Elem() - p, ok = t.(*types.Pointer) + p, ok = types.Unalias(t).(*types.Pointer) } ts := types.TypeString(t, e.qualifier) mthdMap := e.wraps[ts] @@ -453,6 +456,10 @@ func (e *Extractor) extractType(ot *types.TypeName) { // A single level is safe since the code we're analysing imports it, // so we can assume the generated code can access it too. if ot.IsAlias() { + if t, ok := ot.Type().(*types.Alias); ok { + ot = t.Obj() + name = types.TypeString(t, e.qualifier) + } if t, ok := ot.Type().(*types.Named); ok { ot = t.Obj() name = types.TypeString(t, e.qualifier) @@ -461,7 +468,7 @@ func (e *Extractor) extractType(ot *types.TypeName) { // Only register non-universe types (eg. avoid `error` and similar) if pkg := ot.Pkg(); pkg != nil { path := pkg.Path() - e.imports[pkg.Path()] = struct{}{} + e.imports[path] = struct{}{} // Do not add universal types to be registered. if path == shimx.TypexImport { @@ -484,17 +491,17 @@ func (e *Extractor) extractFromContainer(t types.Type) types.Type { // Container types need to be iteratively unwrapped until we're at the base type, // so we can get the import if necessary. for { - if s, ok := t.(*types.Slice); ok { + if s, ok := types.Unalias(t).(*types.Slice); ok { t = s.Elem() continue } - if p, ok := t.(*types.Pointer); ok { + if p, ok := types.Unalias(t).(*types.Pointer); ok { t = p.Elem() continue } - if a, ok := t.(*types.Array); ok { + if a, ok := types.Unalias(t).(*types.Array); ok { t = a.Elem() continue } @@ -510,9 +517,17 @@ func (e *Extractor) extractFromTuple(tuple *types.Tuple) { t := e.extractFromContainer(s.Type()) // Here's where we ensure we register new imports. + if at, ok := t.(*types.Alias); ok { + if pkg := at.Obj().Pkg(); pkg != nil { + e.imports[pkg.Path()] = struct{}{} + } + } if t, ok := t.(*types.Named); ok { if pkg := t.Obj().Pkg(); pkg != nil { + e.Printf("extractType: adding import path %q for %v\n", pkg.Path(), t) e.imports[pkg.Path()] = struct{}{} + } else { + e.Printf("extractType: %v has no package to import\n", t) } e.extractType(t.Obj()) } @@ -683,7 +698,7 @@ func (e *Extractor) makeEmitter(sig *types.Signature) (shimx.Emitter, bool) { // makeInput checks if the given signature is an iterator or not, and if so, // returns a shimx.Input struct for the signature for use by the code -// generator. The canonical check for an iterater signature is in the +// generator. The canonical check for an iterator signature is in the // funcx.UnfoldIter function which uses the reflect library, // and this logic is replicated here. func (e *Extractor) makeInput(sig *types.Signature) (shimx.Input, bool) { @@ -692,13 +707,13 @@ func (e *Extractor) makeInput(sig *types.Signature) (shimx.Input, bool) { return shimx.Input{}, false } // Iterators must return a bool. - if b, ok := r.At(0).Type().(*types.Basic); !ok || b.Kind() != types.Bool { + if b, ok := types.Unalias(r.At(0).Type()).(*types.Basic); !ok || b.Kind() != types.Bool { return shimx.Input{}, false } p := sig.Params() for i := 0; i < p.Len(); i++ { // All params for iterators must be pointers. 
- if _, ok := p.At(i).Type().(*types.Pointer); !ok { + if _, ok := types.Unalias(p.At(i).Type()).(*types.Pointer); !ok { return shimx.Input{}, false } } From 364ffe8a1fffdc483383c7b56ed5dd6304a947d2 Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Wed, 19 Feb 2025 22:35:47 +0400 Subject: [PATCH 08/20] Revert huggingface transformers to 4.30.0 (#34025) --- .../apache_beam/ml/inference/huggingface_tests_requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/inference/huggingface_tests_requirements.txt b/sdks/python/apache_beam/ml/inference/huggingface_tests_requirements.txt index ce783ffe6d90..adb4816cab6b 100644 --- a/sdks/python/apache_beam/ml/inference/huggingface_tests_requirements.txt +++ b/sdks/python/apache_beam/ml/inference/huggingface_tests_requirements.txt @@ -16,5 +16,5 @@ # torch>=1.7.1 -transformers==4.48.0 +transformers==4.30.0 tensorflow>=2.12.0 \ No newline at end of file From 6290844769b4ca5429e666221be3a848a19c105d Mon Sep 17 00:00:00 2001 From: martin trieu Date: Wed, 19 Feb 2025 12:18:09 -0800 Subject: [PATCH 09/20] add endpoint type to WorkerMetadataResponse proto (#33953) * add endpoint type to WorkerMetadataResponse proto * add default value to endpoint_type --- .../worker/windmill/src/main/proto/windmill.proto | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index e1b52547fe73..c58edab0fb47 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -926,6 +926,15 @@ message WorkerMetadataResponse { // Used to set gRPC authority. 
optional string external_endpoint = 5; + enum EndpointType { + UNKNOWN = 0; + CLOUDPATH = 1; + BORG = 2; + DIRECTPATH = 3; + } + + optional EndpointType endpoint_type = 6 [default = CLOUDPATH]; + reserved 4; } From d21542339cde54a8f3146df2a374dc76b3001138 Mon Sep 17 00:00:00 2001 From: scwhittle Date: Wed, 19 Feb 2025 20:19:36 +0000 Subject: [PATCH 10/20] add hashcode/equals to WaitTest helper classes to avoid log error (#34006) --- .../apache/beam/sdk/transforms/WaitTest.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WaitTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WaitTest.java index b539bc8aabcf..05739c0f451d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WaitTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WaitTest.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -95,6 +96,22 @@ public String toString() { .add("watermarkUpdate", watermarkUpdate) .toString(); } + + @Override + public int hashCode() { + return Objects.hash(processingTime, element, watermarkUpdate); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof Event)) { + return false; + } + Event otherEvent = (Event) other; + return Objects.equals(processingTime, otherEvent.processingTime) + && Objects.equals(watermarkUpdate, otherEvent.watermarkUpdate) + && Objects.equals(element, otherEvent.element); + } } /** @@ -238,6 +255,21 @@ public WindowExpirationValue(@Nullable Instant watermarkAdvance, long value) { this.watermarkAdvance = watermarkAdvance; this.value = value; } + + @Override + public boolean equals(Object other) { + if (!(other instanceof WindowExpirationValue)) { + return false; + } + WindowExpirationValue otherValue = (WindowExpirationValue) other; + return Objects.equals(watermarkAdvance, otherValue.watermarkAdvance) + && value == otherValue.value; + } + + @Override + public int hashCode() { + return Objects.hash(watermarkAdvance, value); + } } @Test From 666a0d28c3602aa589ac1b61ca2574a886480f50 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 19 Feb 2025 21:23:22 -0500 Subject: [PATCH 11/20] Add enable_lineage experiment to Dataflow tests (#34027) --- .../beam_PostCommit_Java_ValidatesRunner_Dataflow.json | 5 +---- runners/google-cloud-dataflow-java/build.gradle | 2 ++ 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index dbf131c70637..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -1,7 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", - "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test", - "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test", - "https://github.com/apache/beam/pull/33921": "noting that PR #33921 should run this test" + "modification": 1 } diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle index bf39672773be..3138b1481d33 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -166,6 +166,7 @@ def legacyPipelineOptions = [ "--tempRoot=${dataflowValidatesTempRoot}", "--dataflowWorkerJar=${dataflowLegacyWorkerJar}", "--workerHarnessContainerImage=", + "--experiments=enable_lineage" ] def runnerV2PipelineOptions = [ @@ -176,6 +177,7 @@ def runnerV2PipelineOptions = [ "--sdkContainerImage=${dockerJavaImageContainer}:${dockerTag}", "--experiments=use_unified_worker,use_runner_v2", "--firestoreDb=${firestoreDb}", + "--experiments=enable_lineage" ] def commonLegacyExcludeCategories = [ From 5c79a5db3d601afd7e3b505ab27d617bfcb1ccdd Mon Sep 17 00:00:00 2001 From: Luv Agarwal Date: Thu, 20 Feb 2025 20:40:27 +0530 Subject: [PATCH 12/20] Add UUID support in SpannerSchema (#34034) * Add UUID support in Spanner Schema * Add test --- .../org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java | 6 ++++++ .../apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java | 8 ++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java index fa44cadeba0a..e851e29c23fb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java @@ -181,6 +181,9 @@ private static Type parseSpannerType(String spannerType, Dialect dialect) { if (spannerType.startsWith("STRING")) { return Type.string(); } + if ("UUID".equals(spannerType)) { + return Type.string(); + } if (spannerType.startsWith("BYTES")) { return Type.bytes(); } @@ -260,6 +263,9 @@ private static Type parseSpannerType(String spannerType, Dialect dialect) { if (spannerType.startsWith("JSONB")) { return Type.pgJsonb(); } + if ("UUID".equals(spannerType)) { + return Type.string(); + } throw new IllegalArgumentException("Unknown spanner type " + spannerType); default: throw new IllegalArgumentException("Unrecognized dialect: " + dialect.name()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java index 1e89326d1e8c..fefe7dc1ef85 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java @@ -41,10 +41,11 @@ public void testSingleTable() throws Exception { .addColumn("test", "protoVal", "PROTO") .addColumn("test", "enumVal", "ENUM") .addColumn("test", "tokens", "TOKENLIST") + .addColumn("test", "uuidCol", "UUID") .build(); assertEquals(1, schema.getTables().size()); - assertEquals(7, schema.getColumns("test").size()); + assertEquals(8, schema.getColumns("test").size()); assertEquals(1, schema.getKeyParts("test").size()); assertEquals(Type.json(), schema.getColumns("test").get(3).getType()); assertEquals( @@ -52,6 +53,7 @@ public void testSingleTable() throws Exception { assertEquals( Type.protoEnum("customer.app.TestEnum"), schema.getColumns("test").get(5).getType()); assertEquals(Type.bytes(), 
schema.getColumns("test").get(6).getType()); + assertEquals(Type.string(), schema.getColumns("test").get(7).getType()); } @Test @@ -84,12 +86,14 @@ public void testSinglePgTable() throws Exception { .addColumn("test", "numericVal", "numeric") .addColumn("test", "commitTime", "spanner.commit_timestamp") .addColumn("test", "jsonbCol", "jsonb") + .addColumn("test", "uuidCol", "uuid") .build(); assertEquals(1, schema.getTables().size()); - assertEquals(5, schema.getColumns("test").size()); + assertEquals(6, schema.getColumns("test").size()); assertEquals(1, schema.getKeyParts("test").size()); assertEquals(Type.timestamp(), schema.getColumns("test").get(3).getType()); + assertEquals(Type.string(), schema.getColumns("test").get(5).getType()); } @Test From 656b7a078e14aafad316c5ac68c85a26afac664e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Thu, 20 Feb 2025 16:32:55 +0000 Subject: [PATCH 13/20] fix dashboard link (#34023) --- website/www/site/content/en/documentation/io/testing.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/www/site/content/en/documentation/io/testing.md b/website/www/site/content/en/documentation/io/testing.md index 124ee7ac29a0..f35a1bcb3843 100644 --- a/website/www/site/content/en/documentation/io/testing.md +++ b/website/www/site/content/en/documentation/io/testing.md @@ -334,7 +334,7 @@ If you modified/added new Jenkins job definitions in your Pull Request, run the As mentioned before, we measure the performance of IOITs by gathering test execution times from Jenkins jobs that run periodically. The consequent results are stored in a database (BigQuery), therefore we can display them in a form of plots. -The dashboard gathering all the results is available here: [Performance Testing Dashboard](http://metrics.beam.apache.org/d/1/getting-started?orgId=1&viewPanel=123125) +The dashboard gathering all the results is available here: [Performance Testing Dashboard](http://35.193.202.176/d/1/getting-started?orgId=1&viewPanel=123125) ### Implementing Integration Tests {#implementing-integration-tests} From e8057410a8fc54e5f4474df1715451c3a9394d41 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Thu, 20 Feb 2025 10:18:00 -0800 Subject: [PATCH 14/20] [Go SDK] Add missing type inspection case for Alias types. 
(#34039) --- sdks/go/pkg/beam/util/starcgenx/starcgenx.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/go/pkg/beam/util/starcgenx/starcgenx.go b/sdks/go/pkg/beam/util/starcgenx/starcgenx.go index 22d2be6e43f1..b3231fd41274 100644 --- a/sdks/go/pkg/beam/util/starcgenx/starcgenx.go +++ b/sdks/go/pkg/beam/util/starcgenx/starcgenx.go @@ -521,6 +521,7 @@ func (e *Extractor) extractFromTuple(tuple *types.Tuple) { if pkg := at.Obj().Pkg(); pkg != nil { e.imports[pkg.Path()] = struct{}{} } + e.extractType(at.Obj()) } if t, ok := t.(*types.Named); ok { if pkg := t.Obj().Pkg(); pkg != nil { From dc3ddfca9599e91dc21b919bdc6ebafa996752bb Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Fri, 21 Feb 2025 12:50:48 +0100 Subject: [PATCH 15/20] removed unneeded license header --- .../schemas/logicaltypes/logical-types.avsc | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc b/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc index 1dd1d8426f6d..1fddfde068ff 100644 --- a/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc +++ b/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc @@ -1,20 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ { "type": "record", "name": "LogicalTypesExample", From 1f4d9a56fdce73f90ff662dec19f1413df8b6c06 Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Fri, 21 Feb 2025 13:03:09 +0100 Subject: [PATCH 16/20] remove unneeded license header --- .../schemas/logicaltypes/logical-types.avsc | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc b/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc index 1dd1d8426f6d..1fddfde068ff 100644 --- a/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc +++ b/sdks/java/extensions/avro/src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc @@ -1,20 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ { "type": "record", "name": "LogicalTypesExample", From 6ff042ededd9d909b5344ee04618a29cee1ca87b Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Tue, 25 Feb 2025 10:10:56 +0100 Subject: [PATCH 17/20] Added tests for specific records generated with avro 1.8.2 and 1.9.2, and to add custom conversions --- .../avro/schemas/utils/AvroUtils.java | 5 +- .../avro/schemas/utils/AvroUtilsTest.java | 77 +++++++++++++++---- 2 files changed, 64 insertions(+), 18 deletions(-) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index 1758e2948f58..460bfaec4a36 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -1668,7 +1668,7 @@ private static org.apache.avro.Schema buildHiveLogicalTypeSchema( return new org.apache.avro.Schema.Parser().parse(schemaJson); } - private static GenericData getGenericData(SpecificRecordBase record) { + static GenericData getGenericData(SpecificRecordBase record) { try { return record.getSpecificData(); } catch (NoSuchMethodError e) { @@ -1691,12 +1691,13 @@ private static T checkRawType( Class convertedType) { String msg = String.format( - "Value %s of class %s is not a supported type for logical type %s. " + "Value %s of class %s is not a supported type for logical type %s (%s). " + "Underlying avro built-in raw type should be instance of %s. " + "However it is instance of %s and has value %s ." 
+ "Generic data has conversion %s, convertedType %s", value, value.getClass(), + logicalType.getName(), logicalType, desiredRawType, rawType.getClass(), diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java index 1b00007bae6d..35183f0044b6 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java @@ -36,9 +36,11 @@ import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.reflect.ReflectData; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.avro.util.Utf8; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; @@ -122,13 +124,6 @@ public void supportsAnyAvroSchema( @Test public void supportsAllLogicalTypes() { - if (VERSION_AVRO.equals("1.8.2") || VERSION_AVRO.equals("1.9.2")) { - // Skip this test for Avro 1.8.2 and 1.9.2 as they do not support all logical types - // and do not register all conversions to the GenericRecord.MODEL$. In user code, - // those older versions can still be used; if conversions are needed, a GenericData with the - // appropriate conversions can be passed to AvroUtils.toBeamRowStrict - return; - } BigDecimal bigDecimalPrecision5Scale2 = new BigDecimal("123.45"); BigDecimal bigDecimalPrecision10Scale4 = new BigDecimal("12345.6789"); @@ -140,7 +135,7 @@ public void supportsAllLogicalTypes() { DateTime dateTime = new DateTime(2025, 2, 17, 0, 0, 0, DateTimeZone.UTC); - GenericRecord specificRecord = + SpecificRecordBase genericRecord = getSpecificRecordWithLogicalTypes( dateTime, timeMicros, @@ -160,7 +155,35 @@ public void supportsAllLogicalTypes() { bigDecimalPrecision20Scale6, uuid); - Row actual = AvroUtils.toBeamRowStrict(specificRecord, null, null); + GenericData genericData; + switch (VERSION_AVRO) { + case "1.8.2": + // SpecificRecords generated with 1.8.2 have no registered conversions. Still this is a + // supported case, as the user can pass a GenericData with the appropriate conversions to + // AvroUtils.toBeamRowStrict. + // Basically GenericRecords can contain objects of any type, as long as the user provides + // the appropriate conversions. + genericData = new GenericData(); + genericData.addLogicalTypeConversion(new AvroJodaTimeConversions.DateConversion()); + genericData.addLogicalTypeConversion(new AvroJodaTimeConversions.TimeConversion()); + genericData.addLogicalTypeConversion(new AvroJodaTimeConversions.TimestampConversion()); + genericData.addLogicalTypeConversion(new Conversions.DecimalConversion()); + break; + case "1.9.2": + // SpecificRecords generated with 1.9.2 have some registered conversions, but not all. We + // can add the missing ones manually. + genericData = AvroUtils.getGenericData(genericRecord); + genericData.addLogicalTypeConversion(new AvroJavaTimeConversions.TimeMicrosConversion()); + genericData.addLogicalTypeConversion( + new AvroJavaTimeConversions.TimestampMicrosConversion()); + break; + default: + // SpecificRecords generated with 1.10.0+ have all conversions registered. 
Passing null to + // toBeamRowStrict ensures that the GenericData of the record is used as is. + genericData = null; + } + + Row actual = AvroUtils.toBeamRowStrict(genericRecord, null, genericData); assertEquals(expected, actual); } @@ -207,13 +230,31 @@ private static LogicalTypesExample getSpecificRecordWithLogicalTypes( dateTime.get(DateTimeFieldType.dayOfMonth())); LogicalTypesExample r = new LogicalTypesExample(); - r.put("dateField", localDate); - r.put("timeMillisField", javaLocalTime(timeMicros, ChronoUnit.MILLIS)); - r.put("timeMicrosField", javaLocalTime(timeMicros, ChronoUnit.MICROS)); - r.put("timestampMillisField", javaInstant(timestampMicros, ChronoUnit.MILLIS)); - r.put("timestampMicrosField", javaInstant(timestampMicros, ChronoUnit.MICROS)); - r.put("localTimestampMillisField", javaLocalDateTimeAtUtc(timestampMicros, ChronoUnit.MILLIS)); - r.put("localTimestampMicrosField", javaLocalDateTimeAtUtc(timestampMicros, ChronoUnit.MICROS)); + if (VERSION_AVRO.equals("1.8.2")) { + // Avro 1.8.2 does not support java.time, must use joda time + r.put("dateField", dateTime.toLocalDate()); + r.put("timeMillisField", jodaLocalTime(timeMicros)); + r.put("timeMicrosField", timeMicros); + r.put("timestampMillisField", jodaInstant(timestampMicros).toDateTime()); + r.put("timestampMicrosField", timestampMicros); + } else { + r.put("dateField", localDate); + r.put("timeMillisField", javaLocalTime(timeMicros, ChronoUnit.MILLIS)); + r.put("timeMicrosField", javaLocalTime(timeMicros, ChronoUnit.MICROS)); + r.put("timestampMillisField", javaInstant(timestampMicros, ChronoUnit.MILLIS)); + r.put("timestampMicrosField", javaInstant(timestampMicros, ChronoUnit.MICROS)); + } + if (VERSION_AVRO.equals("1.8.2") || VERSION_AVRO.equals("1.9.2")) { + // local-timestamp-millis and local-timestamp-micros only in 1.10.0+ + r.put("localTimestampMillisField", timestampMicros / 1000); + r.put("localTimestampMicrosField", timestampMicros); + } else { + r.put( + "localTimestampMillisField", javaLocalDateTimeAtUtc(timestampMicros, ChronoUnit.MILLIS)); + r.put( + "localTimestampMicrosField", javaLocalDateTimeAtUtc(timestampMicros, ChronoUnit.MICROS)); + } + r.put("decimalSmall", bigDecimalPrecision5Scale2); r.put("decimalMedium", bigDecimalPrecision10Scale4); r.put("decimalLarge", bigDecimalPrecision20Scale6); @@ -484,6 +525,10 @@ private static java.time.LocalTime javaLocalTime(long micros, TemporalUnit tempo return java.time.LocalTime.ofNanoOfDay(micros * 1000).truncatedTo(temporalUnit); } + private static org.joda.time.LocalTime jodaLocalTime(long micros) { + return org.joda.time.LocalTime.fromMillisOfDay(micros / 1000); + } + @Test public void testFromAvroSchema() { assertEquals(getBeamSchema(), AvroUtils.toBeamSchema(getAvroSchema())); From d4bbf462da04a4c9913b0e457ada5370ed749dbd Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Tue, 25 Feb 2025 12:27:45 +0100 Subject: [PATCH 18/20] Supporting different UUID representations in different avro versions --- .../sdk/extensions/avro/schemas/utils/AvroUtilsTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java index 35183f0044b6..7cda1e9dba5a 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java +++ 
b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java @@ -261,7 +261,14 @@ private static LogicalTypesExample getSpecificRecordWithLogicalTypes( r.put("fixedDecimalSmall", bigDecimalPrecision5Scale2); r.put("fixedDecimalMedium", bigDecimalPrecision10Scale4); r.put("fixedDecimalLarge", bigDecimalPrecision20Scale6); - r.put("uuidField", uuid.toString()); + + try { + r.put("uuidField", uuid.toString()); + } catch (ClassCastException e) { + // the avro tools version used by gradle-avro-plugin is more recent and uses UUID, while the + // ones used for backward compatibility tests (1.8.2, 1.9.2 and 1.10.2) use CharSequence + r.put("uuidField", uuid); + } return r; } From 58931ebba5d08d0ceb96ca7419e795fc20d7e3d0 Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Tue, 25 Feb 2025 15:52:04 +0100 Subject: [PATCH 19/20] Spotless fixes --- .../bigquery/StorageApiSinkRowUpdateIT.java | 7 ++-- .../StorageApiSinkSchemaUpdateIT.java | 34 +++++++++++++------ 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java index fa7a3d048750..42702a10d1c9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java @@ -24,10 +24,9 @@ import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.bigquery.storage.v1.Exceptions; import java.io.IOException; import java.util.List; - -import com.google.cloud.bigquery.storage.v1.Exceptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; @@ -212,12 +211,12 @@ private void runPipelineAndWait(Pipeline p) { while (root != null && root.getCause() != null) { root = root.getCause(); } - // Tolerate a StreamWriterClosedException, which sometimes happens after all writes have been flushed. + // Tolerate a StreamWriterClosedException, which sometimes happens after all writes have been + // flushed. 
if (root instanceof Exceptions.StreamWriterClosedException) { return; } throw e; } } - } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java index ab9a472b7088..434e8bf3f4b7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.java @@ -228,7 +228,8 @@ public void setup() { public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState counter) throws Exception { int current = firstNonNull(counter.read(), 0); - // We update schema early on to leave a healthy amount of time for the StreamWriter to recognize it, + // We update schema early on to leave a healthy amount of time for the StreamWriter to + // recognize it, // ensuring that subsequent writers are created with the updated schema. if (current == SCHEMA_UPDATE_TRIGGER) { for (Map.Entry entry : newSchemas.entrySet()) { @@ -246,8 +247,10 @@ public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState entry : newSchemas.entrySet()) { - TableSchema currentSchema = bqClient.getTableResource(projectId, datasetId, entry.getKey()).getSchema(); - TableSchema expectedSchema = BigQueryHelpers.fromJsonString(entry.getValue(), TableSchema.class); + TableSchema currentSchema = + bqClient.getTableResource(projectId, datasetId, entry.getKey()).getSchema(); + TableSchema expectedSchema = + BigQueryHelpers.fromJsonString(entry.getValue(), TableSchema.class); if (currentSchema.getFields().size() != expectedSchema.getFields().size()) { schemaPropagated = false; break; @@ -261,7 +264,9 @@ public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState getIdFromInstant = waitLonger ? (Function & Serializable) @@ -666,7 +676,8 @@ public void runDynamicDestinationsWithAutoSchemaUpdate(boolean useAtLeastOnce) t write = write .withMethod(Write.Method.STORAGE_WRITE_API) - .withTriggeringFrequency(Duration.standardSeconds(changeTableSchema ? LONG_WAIT_SECONDS : 1)); + .withTriggeringFrequency( + Duration.standardSeconds(changeTableSchema ? LONG_WAIT_SECONDS : 1)); } int numRows = TOTAL_N; @@ -674,9 +685,12 @@ public void runDynamicDestinationsWithAutoSchemaUpdate(boolean useAtLeastOnce) t Instant start = new Instant(0); // We give a healthy waiting period between each element to give Storage API streams a chance to // recognize the new schema. Apply on relevant tests. - Duration interval = changeTableSchema ? Duration.standardSeconds(LONG_WAIT_SECONDS) : Duration.millis(1); + Duration interval = + changeTableSchema ? Duration.standardSeconds(LONG_WAIT_SECONDS) : Duration.millis(1); Duration stop = - changeTableSchema ? Duration.standardSeconds((numRows - 1) * LONG_WAIT_SECONDS) : Duration.millis(numRows - 1); + changeTableSchema + ? Duration.standardSeconds((numRows - 1) * LONG_WAIT_SECONDS) + : Duration.millis(numRows - 1); Function getIdFromInstant = changeTableSchema ? 
(Function & Serializable) From 6f1fcff8755f4db025e2eaf16ebbd0b71ff69037 Mon Sep 17 00:00:00 2001 From: Alfredo Scaccialepre Date: Mon, 17 Mar 2025 14:19:44 +0100 Subject: [PATCH 20/20] fix dependency typo --- sdks/java/extensions/avro/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/extensions/avro/build.gradle b/sdks/java/extensions/avro/build.gradle index 63c825b178d5..3d22befaf4d6 100644 --- a/sdks/java/extensions/avro/build.gradle +++ b/sdks/java/extensions/avro/build.gradle @@ -67,7 +67,7 @@ dependencies { implementation library.java.error_prone_annotations implementation library.java.avro implementation library.java.joda_time - implementation 'org.apache.commons:commons-lang3' + implementation library.java.commons_lang3 testImplementation(project(path: ":sdks:java:core", configuration: "shadowTest")) { // Exclude Avro dependencies from "core" since Avro support moved to this extension exclude group: "org.apache.avro", module: "avro"
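
Note (not part of the patch series): a minimal usage sketch of the conversion path these commits exercise — turning an Avro GenericRecord carrying logical-type values into a Beam Row through the three-argument AvroUtils.toBeamRowStrict overload that AvroUtilsTest above calls. Registering conversions on a GenericData model mirrors what the test does for Avro 1.8.2/1.9.2, where generated records do not carry all conversions; the class name and the shape of the helper are illustrative, not taken from the patches.

    import org.apache.avro.Conversions;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
    import org.apache.beam.sdk.values.Row;

    // Illustrative helper, assuming the toBeamRowStrict(record, schema, genericData)
    // overload introduced in this patch series.
    public class GenericRecordToRowSketch {

      public static Row toRow(GenericRecord record) {
        // Older Avro versions (1.8.2, 1.9.2) do not register every logical-type
        // conversion on the generated model, so supply the ones the record needs
        // explicitly on a GenericData instance.
        GenericData model = new GenericData();
        model.addLogicalTypeConversion(new Conversions.DecimalConversion());

        // Passing the model as the third argument lets toBeamRowStrict resolve
        // logical types (for example decimal backed by bytes/fixed) instead of
        // failing on the raw type; with a null Beam schema, one is inferred from
        // the record's Avro schema.
        return AvroUtils.toBeamRowStrict(record, null, model);
      }
    }

For records generated with Avro 1.10.0 or later, the test above passes null for the GenericData argument instead, since the generated SpecificData model already carries all required conversions.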