Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

Expand Down Expand Up @@ -49,7 +50,6 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -132,13 +132,10 @@
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
Expand Down Expand Up @@ -649,29 +646,19 @@ public static TypedRead<TableRow> readTableRowsWithSchema() {
BigQueryUtils.tableRowFromBeamRow());
}

private static class TableSchemaFunction
implements Serializable, Function<@Nullable String, @Nullable TableSchema> {
@Override
public @Nullable TableSchema apply(@Nullable String input) {
return BigQueryHelpers.fromJsonString(input, TableSchema.class);
}
}

@VisibleForTesting
static class GenericDatumTransformer<T> implements DatumReader<T> {
private final SerializableFunction<SchemaAndRecord, T> parseFn;
private final Supplier<TableSchema> tableSchema;
private final TableSchema tableSchema;
private GenericDatumReader<T> reader;
private org.apache.avro.Schema writerSchema;

public GenericDatumTransformer(
SerializableFunction<SchemaAndRecord, T> parseFn,
String tableSchema,
TableSchema tableSchema,
org.apache.avro.Schema writer) {
this.parseFn = parseFn;
this.tableSchema =
Suppliers.memoize(
Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(tableSchema)));
this.tableSchema = tableSchema;
this.writerSchema = writer;
this.reader = new GenericDatumReader<>(this.writerSchema);
}
Expand All @@ -689,7 +676,7 @@ public void setSchema(org.apache.avro.Schema schema) {
@Override
public T read(T reuse, Decoder in) throws IOException {
GenericRecord record = (GenericRecord) this.reader.read(reuse, in);
return parseFn.apply(new SchemaAndRecord(record, this.tableSchema.get()));
return parseFn.apply(new SchemaAndRecord(record, this.tableSchema));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removes the use of a Supplier function to store the TableSchema and simply uses a class property to hold the TableSchema.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dug into this: it was added back in 7fde976 (https://issues.apache.org/jira/browse/BEAM-2532) and then refactored into BigQueryIO.java by #22718. Is BEAM-2532 no longer an issue?

Copy link
Contributor

@Abacn Abacn Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Answering myself) I think that after the change in #22718 the supplier is no longer needed, as it no longer involves preserving a TableSchema in a SerializableFunction. GenericDatumTransformer is now a named static class that does not implement Serializable.

}
}

Expand Down Expand Up @@ -721,16 +708,9 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
.setDatumReaderFactory(
(SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>)
input -> {
try {
String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(input);
return (AvroSource.DatumReaderFactory<T>)
(writer, reader) ->
new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Analyzing the code further, I saw that the original GenericDatumTransformer constructor received a JSON string, which it would later re-parse into a TableSchema via a Supplier function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was because TableSchema is not serializable.

} catch (IOException e) {
LOG.warn(
String.format("Error while converting table schema %s to JSON!", input), e);
return null;
}
TableSchema safeInput = checkStateNotNull(input);
return (AvroSource.DatumReaderFactory<T>)
(writer, reader) -> new GenericDatumTransformer<>(parseFn, safeInput, writer);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This uses the new GenericDatumTransformer constructor that just takes the TableSchema input directly instead of needing to convert to JSON and then back again to the TableSchema later.

})
// TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed.
.setParseFn(parseFn)
Expand Down Expand Up @@ -3386,9 +3366,7 @@ private <DestinationT> WriteResult expandTyped(
@SuppressWarnings({"unchecked", "nullness"})
Descriptors.Descriptor descriptor =
(Descriptors.Descriptor)
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
writeProtoClass.getMethod("getDescriptor"))
.invoke(null);
checkStateNotNull(writeProtoClass.getMethod("getDescriptor")).invoke(null);
TableSchema tableSchema =
TableRowToStorageApiProto.protoSchemaToTableSchema(
TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -143,18 +144,11 @@ public void evaluate() throws Throwable {

private SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>
datumReaderFactoryFn =
(SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>)
input -> {
try {
String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input);
return (AvroSource.DatumReaderFactory<TableRow>)
(writer, reader) ->
new BigQueryIO.GenericDatumTransformer<>(
BigQueryIO.TableRowParser.INSTANCE, jsonSchema, writer);
} catch (IOException e) {
return null;
}
};
input ->
(AvroSource.DatumReaderFactory<TableRow>)
(writer, reader) ->
new BigQueryIO.GenericDatumTransformer<>(
BigQueryIO.TableRowParser.INSTANCE, checkStateNotNull(input), writer);

private static class MyData implements Serializable {
private String name;
Expand Down