Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildSrc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ dependencies {
implementation("com.github.spotbugs.snom:spotbugs-gradle-plugin:5.0.3")

runtimeOnly("com.google.protobuf:protobuf-gradle-plugin:0.8.13") // Enable proto code generation
runtimeOnly("com.commercehub.gradle.plugin:gradle-avro-plugin:0.11.0") // Enable Avro code generation
runtimeOnly("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.2.0") // Enable Avro code generation
runtimeOnly("com.diffplug.spotless:spotless-plugin-gradle:5.6.1") // Enable a code formatting plugin
runtimeOnly("gradle.plugin.com.palantir.gradle.docker:gradle-docker:0.22.0") // Enable building Docker containers
runtimeOnly("gradle.plugin.com.dorongold.plugins:task-tree:1.5") // Adds a 'taskTree' task to print task dependency tree
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,8 @@ class BeamModulePlugin implements Plugin<Project> {
antlr_runtime : "org.antlr:antlr4-runtime:4.7",
args4j : "args4j:args4j:2.33",
auto_value_annotations : "com.google.auto.value:auto-value-annotations:$autovalue_version",
avro : "org.apache.avro:avro:1.8.2",
avro_tests : "org.apache.avro:avro:1.8.2:tests",
avro : "org.apache.avro:avro:1.9.2",
avro_tests : "org.apache.avro:avro:1.9.2:tests",
aws_java_sdk_cloudwatch : "com.amazonaws:aws-java-sdk-cloudwatch:$aws_java_sdk_version",
aws_java_sdk_core : "com.amazonaws:aws-java-sdk-core:$aws_java_sdk_version",
aws_java_sdk_dynamodb : "com.amazonaws:aws-java-sdk-dynamodb:$aws_java_sdk_version",
Expand Down Expand Up @@ -695,6 +695,7 @@ class BeamModulePlugin implements Plugin<Project> {
vendored_guava_26_0_jre : "org.apache.beam:beam-vendor-guava-26_0-jre:0.1",
vendored_calcite_1_28_0 : "org.apache.beam:beam-vendor-calcite-1_28_0:0.2",
woodstox_core_asl : "org.codehaus.woodstox:woodstox-core-asl:4.4.1",
xz_java : "org.tukaani:xz:1.9",
zstd_jni : "com.github.luben:zstd-jni:1.5.2-1",
quickcheck_core : "com.pholser:junit-quickcheck-core:$quickcheck_version",
quickcheck_generators : "com.pholser:junit-quickcheck-generators:$quickcheck_version",
Expand Down Expand Up @@ -2149,7 +2150,7 @@ class BeamModulePlugin implements Plugin<Project> {

// TODO: Decide whether this should be inlined into the one project that relies on it
// or be left here.
project.ext.applyAvroNature = { project.apply plugin: "com.commercehub.gradle.plugin.avro" }
project.ext.applyAvroNature = { project.apply plugin: "com.github.davidmc24.gradle.plugin.avro" }

project.ext.applyAntlrNature = {
project.apply plugin: 'antlr'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<suppress checks=".*" files=".+[\\\/]generated-src[\\\/].+\.java" />
<suppress checks=".*" files=".+[\\\/]generated-sources[\\\/].+\.java" />
<suppress checks=".*" files=".+[\\\/]generated-test-sources[\\\/].+\.java" />
<suppress checks=".*" files=".+[\\\/]generated-test-avro-java[\\\/].+\.java" />

<!-- Suppression on JavadocMethod violations.

Expand Down
2 changes: 1 addition & 1 deletion sdks/java/container/license_scripts/dep_urls_java.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ paranamer:
'2.8':
license: "https://raw.githubusercontent.com/paul-hammant/paranamer/master/LICENSE.txt"
xz:
'1.5': # The original repo is down. This license is taken from https://tukaani.org/xz/java.html.
'1.9': # The original repo is down. This license is taken from https://tukaani.org/xz/java.html.
license: "file://{}/xz/COPYING"
jackson-bom:
'2.13.0':
Expand Down
1 change: 1 addition & 0 deletions sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,6 @@ dependencies {
shadowTest library.java.avro_tests
shadowTest library.java.zstd_jni
shadowTest library.java.jamm
shadowTest library.java.xz_java
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to add this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because, starting from Avro 1.9.0, this dependency became `provided`, but we still need it for tests.

testRuntimeOnly library.java.slf4j_jdk14
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ private Object readResolve() throws IOException, ClassNotFoundException {
* Serializable}'s usage of the {@link #writeReplace} method. Kryo doesn't utilize Java's
* serialization and hence is able to encode the {@link Schema} object directly.
*/
private static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
// writeReplace makes this object serializable. This is a limitation of FindBugs as discussed
// here:
// http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@
* the start offset of the block is greater than or equal to the start offset of the source and less
* than the end offset of the source.
*
* <p>To use XZ-encoded Avro files, please include an explicit dependency on {@code xz-1.8.jar},
* which has been marked as optional in the Maven {@code sdk/pom.xml}.
* <p>To use XZ-encoded Avro files, please include an explicit dependency on {@code xz-1.9.jar},
* which has been marked as optional in the Maven {@code org.apache.avro:avro} dependency.
*
* <pre>{@code
* <dependency>
* <groupId>org.tukaani</groupId>
* <artifactId>xz</artifactId>
* <version>1.8</version>
* <version>1.9</version>
* </dependency>
* }</pre>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Instant;

/** Function to convert a {@link Row} to a user type using a creator factory. */
@Experimental(Kind.SCHEMAS)
Expand Down Expand Up @@ -119,6 +121,8 @@ public <ValueT> ValueT fromRow(
return (ValueT)
fromIterableValue(
type.getCollectionElementType(), (Iterable) value, elementType, typeFactory);
} else if (TypeName.DATETIME.equals(type.getTypeName())) {
return fromDatetimeValue(value, fieldType);
}
if (TypeName.MAP.equals(type.getTypeName())) {
return (ValueT)
Expand Down Expand Up @@ -200,6 +204,19 @@ private <ElementT> Iterable fromIterableValue(
typeFactory));
}

@SuppressWarnings("unchecked")
private <ValueT> ValueT fromDatetimeValue(ValueT value, Type fieldType) {
  // Beam schemas store DATETIME values using Joda types internally; when the
  // destination field is declared with a java.time type, convert accordingly.
  // Matching is done on the fully-qualified type name of the target field.
  String targetTypeName = fieldType.getTypeName();
  if ("java.time.LocalDate".equals(targetTypeName)) {
    // NOTE(review): Joda DateTime applies the default time zone when wrapping
    // the incoming instant — confirm this matches the intended date semantics.
    DateTime jodaDateTime = new DateTime(value);
    return (ValueT)
        java.time.LocalDate.of(
            jodaDateTime.getYear(),
            jodaDateTime.getMonthOfYear(),
            jodaDateTime.getDayOfMonth());
  }
  if ("java.time.Instant".equals(targetTypeName)) {
    // Joda and java.time instants are both epoch-millis based; convert directly.
    return (ValueT) java.time.Instant.ofEpochMilli(((Instant) value).getMillis());
  }
  // Any other target type: pass the (Joda) value through unchanged.
  return value;
}

@SuppressWarnings("unchecked")
private Map<?, ?> fromMapValue(
FieldType keyType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.values;

import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -128,6 +129,14 @@ private <T> T getValue(FieldType type, Object fieldValue, @Nullable Integer cach
cachedMaps.computeIfAbsent(
cacheKey, i -> getMapValue(type.getMapKeyType(), type.getMapValueType(), map))
: (T) getMapValue(type.getMapKeyType(), type.getMapValueType(), map);
} else if (type.getTypeName().equals(TypeName.DATETIME)) {
if (fieldValue instanceof java.time.LocalDate) {
Instant instant = java.time.Instant.parse(fieldValue + "T00:00:00.00Z");
return (T) org.joda.time.Instant.ofEpochMilli(instant.toEpochMilli());
} else if (fieldValue instanceof java.time.Instant) {
return (T) org.joda.time.Instant.ofEpochMilli(((Instant) fieldValue).toEpochMilli());
}
return (T) fieldValue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this logic should be in the Avro getters, instead of branching on this instance check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is specific to the Avro case: since the Beam schema uses Joda time internally, java.time values need to be converted in any case, at least until we switch to java.time (if we ever do).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aromanenko-dev I think @TheNeuralBit is right. Based on benchmarks I've done just recently, branching in RowWithGetters doesn't perform well. In #17172 I'm suggesting to push all of the current code down into the getters.

The GetterBasedSchemaProviders (except the Avro one) only support Joda ReadableInstant (type is the method return type or field type) as DATETIME. Attempting to use java time would most likely fail during schema generation (or generate a row schema with nested internal fields)

For Avro there's already a conversion layer in place you could leverage for that. For DATETIME it's using these converters:

} else {
if (type.isLogicalType(OneOfType.IDENTIFIER)) {
OneOfType oneOfType = type.getLogicalType(OneOfType.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
Expand Down Expand Up @@ -85,7 +86,6 @@
import org.hamcrest.TypeSafeMatcher;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.LocalDate;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -112,8 +112,9 @@ public class AvroCoderTest {
"mystring",
ByteBuffer.wrap(new byte[] {1, 2, 3, 4}),
new fixed4(new byte[] {1, 2, 3, 4}),
new LocalDate(1979, 3, 14),
new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4),
java.time.LocalDate.of(1979, 3, 14),
java.time.Instant.ofEpochMilli(
new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4).getMillis()),
TestEnum.abc,
AVRO_NESTED_SPECIFIC_RECORD,
ImmutableList.of(AVRO_NESTED_SPECIFIC_RECORD, AVRO_NESTED_SPECIFIC_RECORD),
Expand Down Expand Up @@ -284,6 +285,7 @@ public void testKryoSerialization() throws Exception {
// Kryo instantiation
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.addDefaultSerializer(AvroCoder.SerializableSchemaSupplier.class, JavaSerializer.class);

// Serialization of object without any memoization
ByteArrayOutputStream coderWithoutMemoizationBos = new ByteArrayOutputStream();
Expand Down Expand Up @@ -350,10 +352,7 @@ public void testDisableReflectionEncoding() {
AvroCoder.of(Pojo.class, false);
fail("When userReclectApi is disable, schema should not be generated through reflection");
} catch (AvroRuntimeException e) {
String message =
"avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: "
+ "org.apache.avro.AvroRuntimeException: "
+ "Not a Specific class: class org.apache.beam.sdk.coders.AvroCoderTest$Pojo";
String message = "Not a Specific class: class org.apache.beam.sdk.coders.AvroCoderTest$Pojo";
assertEquals(message, e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ public String toString() {
"mystring",
ByteBuffer.wrap(BYTE_ARRAY),
new fixed4(BYTE_ARRAY),
DATE,
DATE_TIME,
java.time.LocalDate.of(DATE.getYear(), DATE.getMonthOfYear(), DATE.getDayOfMonth()),
java.time.Instant.ofEpochMilli(DATE_TIME.getMillis()),
TestEnum.abc,
AVRO_NESTED_SPECIFIC_RECORD,
ImmutableList.of(AVRO_NESTED_SPECIFIC_RECORD, AVRO_NESTED_SPECIFIC_RECORD),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.RandomData;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.util.RandomData;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
Expand Down Expand Up @@ -98,7 +98,7 @@ public void avroToBeamRoundTrip(
@From(RecordSchemaGenerator.class) org.apache.avro.Schema avroSchema) {

Schema schema = AvroUtils.toBeamSchema(avroSchema);
Iterable iterable = new RandomData(avroSchema, 10);
Iterable iterable = new RandomData(avroSchema, 10, true);
List<GenericRecord> records = Lists.newArrayList((Iterable<GenericRecord>) iterable);

for (GenericRecord record : records) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ public void testGcpApiSurface() throws Exception {
classesInPackage("org.apache.avro"),
classesInPackage("org.apache.beam"),
classesInPackage("org.apache.commons.logging"),
classesInPackage("org.codehaus.jackson"),
Copy link
Contributor Author

@aromanenko-dev aromanenko-dev Apr 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer a dependency of Avro.

classesInPackage("org.joda.time"),
Matchers.<Class<?>>equalTo(org.threeten.bp.Duration.class),
Matchers.<Class<?>>equalTo(org.threeten.bp.format.ResolverStyle.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void testConvertGenericRecordToTableRow() throws Exception {
}
// After a Field is added to a Schema, it is assigned a position, so we can't simply reuse
// the existing Field.
avroFields.add(new Schema.Field(field.name(), schema, field.doc(), field.defaultValue()));
avroFields.add(new Schema.Field(field.name(), schema, field.doc(), field.defaultVal()));
}
Schema avroSchema = Schema.createRecord(avroFields);

Expand Down