Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

Expand Down Expand Up @@ -50,6 +49,7 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -132,10 +132,13 @@
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
Expand Down Expand Up @@ -646,19 +649,29 @@ public static TypedRead<TableRow> readTableRowsWithSchema() {
BigQueryUtils.tableRowFromBeamRow());
}

private static class TableSchemaFunction
implements Serializable, Function<@Nullable String, @Nullable TableSchema> {
@Override
public @Nullable TableSchema apply(@Nullable String input) {
return BigQueryHelpers.fromJsonString(input, TableSchema.class);
}
}

@VisibleForTesting
static class GenericDatumTransformer<T> implements DatumReader<T> {
private final SerializableFunction<SchemaAndRecord, T> parseFn;
private final TableSchema tableSchema;
private final Supplier<TableSchema> tableSchema;
private GenericDatumReader<T> reader;
private org.apache.avro.Schema writerSchema;

public GenericDatumTransformer(
SerializableFunction<SchemaAndRecord, T> parseFn,
TableSchema tableSchema,
String tableSchema,
org.apache.avro.Schema writer) {
this.parseFn = parseFn;
this.tableSchema = tableSchema;
this.tableSchema =
Suppliers.memoize(
Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(tableSchema)));
this.writerSchema = writer;
this.reader = new GenericDatumReader<>(this.writerSchema);
}
Expand All @@ -676,7 +689,7 @@ public void setSchema(org.apache.avro.Schema schema) {
@Override
public T read(T reuse, Decoder in) throws IOException {
GenericRecord record = (GenericRecord) this.reader.read(reuse, in);
return parseFn.apply(new SchemaAndRecord(record, this.tableSchema));
return parseFn.apply(new SchemaAndRecord(record, this.tableSchema.get()));
}
}

Expand Down Expand Up @@ -708,9 +721,16 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
.setDatumReaderFactory(
(SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>)
input -> {
TableSchema safeInput = checkStateNotNull(input);
return (AvroSource.DatumReaderFactory<T>)
(writer, reader) -> new GenericDatumTransformer<>(parseFn, safeInput, writer);
try {
String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(input);
return (AvroSource.DatumReaderFactory<T>)
(writer, reader) ->
new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer);
} catch (IOException e) {
LOG.warn(
String.format("Error while converting table schema %s to JSON!", input), e);
return null;
}
})
// TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed.
.setParseFn(parseFn)
Expand Down Expand Up @@ -3366,7 +3386,9 @@ private <DestinationT> WriteResult expandTyped(
@SuppressWarnings({"unchecked", "nullness"})
Descriptors.Descriptor descriptor =
(Descriptors.Descriptor)
checkStateNotNull(writeProtoClass.getMethod("getDescriptor")).invoke(null);
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
writeProtoClass.getMethod("getDescriptor"))
.invoke(null);
TableSchema tableSchema =
TableRowToStorageApiProto.protoSchemaToTableSchema(
TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -144,11 +143,18 @@ public void evaluate() throws Throwable {

private SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>
datumReaderFactoryFn =
input ->
(AvroSource.DatumReaderFactory<TableRow>)
(writer, reader) ->
new BigQueryIO.GenericDatumTransformer<>(
BigQueryIO.TableRowParser.INSTANCE, checkStateNotNull(input), writer);
(SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>)
input -> {
try {
String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input);
return (AvroSource.DatumReaderFactory<TableRow>)
(writer, reader) ->
new BigQueryIO.GenericDatumTransformer<>(
BigQueryIO.TableRowParser.INSTANCE, jsonSchema, writer);
} catch (IOException e) {
return null;
}
};

private static class MyData implements Serializable {
private String name;
Expand Down