From cdfffd80097fd022fafd9512c2ed27ac14a15621 Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Tue, 19 Sep 2023 11:12:12 -0400
Subject: [PATCH] Revert "Remove TableSchema to JSON conversion. (#28274)"

This reverts commit 7e830593e61ba1fbff16411b5825bfb4aea53ba2.
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java  | 40 ++++++++++++++-----
 .../io/gcp/bigquery/BigQueryIOReadTest.java   | 18 ++++++---
 2 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3c006d24d037..58d769312444 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -19,7 +19,6 @@
 
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
@@ -50,6 +49,7 @@
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.Message;
 import java.io.IOException;
+import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
 import java.util.List;
@@ -132,10 +132,13 @@
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
@@ -646,19 +649,29 @@ public static TypedRead<Row> readTableRowsWithSchema() {
         BigQueryUtils.tableRowFromBeamRow());
   }
 
+  private static class TableSchemaFunction
+      implements Serializable, Function<@Nullable String, @Nullable TableSchema> {
+    @Override
+    public @Nullable TableSchema apply(@Nullable String input) {
+      return BigQueryHelpers.fromJsonString(input, TableSchema.class);
+    }
+  }
+
   @VisibleForTesting
   static class GenericDatumTransformer<T> implements DatumReader<T> {
     private final SerializableFunction<SchemaAndRecord, T> parseFn;
-    private final TableSchema tableSchema;
+    private final Supplier<TableSchema> tableSchema;
     private GenericDatumReader<T> reader;
     private org.apache.avro.Schema writerSchema;
 
     public GenericDatumTransformer(
         SerializableFunction<SchemaAndRecord, T> parseFn,
-        TableSchema tableSchema,
+        String tableSchema,
         org.apache.avro.Schema writer) {
       this.parseFn = parseFn;
-      this.tableSchema = tableSchema;
+      this.tableSchema =
+          Suppliers.memoize(
+              Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(tableSchema)));
       this.writerSchema = writer;
       this.reader = new GenericDatumReader<>(this.writerSchema);
     }
@@ -676,7 +689,7 @@ public void setSchema(org.apache.avro.Schema schema) {
     @Override
     public T read(T reuse, Decoder in) throws IOException {
       GenericRecord record = (GenericRecord) this.reader.read(reuse, in);
-      return parseFn.apply(new SchemaAndRecord(record, this.tableSchema));
+      return parseFn.apply(new SchemaAndRecord(record, this.tableSchema.get()));
     }
   }
 
@@ -708,9 +721,16 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
         .setDatumReaderFactory(
             (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>)
                 input -> {
-                  TableSchema safeInput = checkStateNotNull(input);
-                  return (AvroSource.DatumReaderFactory<T>)
-                      (writer, reader) -> new GenericDatumTransformer<>(parseFn, safeInput, writer);
+                  try {
+                    String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(input);
+                    return (AvroSource.DatumReaderFactory<T>)
+                        (writer, reader) ->
+                            new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer);
+                  } catch (IOException e) {
+                    LOG.warn(
+                        String.format("Error while converting table schema %s to JSON!", input), e);
+                    return null;
+                  }
                 })
         // TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed.
         .setParseFn(parseFn)
@@ -3366,7 +3386,9 @@ private WriteResult expandTyped(
         @SuppressWarnings({"unchecked", "nullness"})
         Descriptors.Descriptor descriptor =
             (Descriptors.Descriptor)
-                checkStateNotNull(writeProtoClass.getMethod("getDescriptor")).invoke(null);
+                org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
+                        writeProtoClass.getMethod("getDescriptor"))
+                    .invoke(null);
         TableSchema tableSchema =
             TableRowToStorageApiProto.protoSchemaToTableSchema(
                 TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index e274a8ac68ef..bc75ba8bd9ba 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -19,7 +19,6 @@
 
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
@@ -144,11 +143,18 @@ public void evaluate() throws Throwable {
 
   private SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>
       datumReaderFactoryFn =
-          input ->
-              (AvroSource.DatumReaderFactory<TableRow>)
-                  (writer, reader) ->
-                      new BigQueryIO.GenericDatumTransformer<>(
-                          BigQueryIO.TableRowParser.INSTANCE, checkStateNotNull(input), writer);
+          (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>)
+              input -> {
+                try {
+                  String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input);
+                  return (AvroSource.DatumReaderFactory<TableRow>)
+                      (writer, reader) ->
+                          new BigQueryIO.GenericDatumTransformer<>(
+                              BigQueryIO.TableRowParser.INSTANCE, jsonSchema, writer);
+                } catch (IOException e) {
+                  return null;
+                }
+              };
 
   private static class MyData implements Serializable {
     private String name;
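
For context on the pattern this revert restores: the schema is handed to GenericDatumTransformer as a JSON String and is only parsed into a TableSchema lazily, and at most once, via Suppliers.memoize(Suppliers.compose(...)). Below is a minimal, self-contained sketch of that idiom (not part of the patch), written against plain Guava rather than Beam's vendored copy. LazySchemaHolder and ParseFields are hypothetical stand-ins for GenericDatumTransformer and TableSchemaFunction; in BigQueryIO the real parse step is BigQueryHelpers.fromJsonString(json, TableSchema.class).

import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in showing the memoized, lazily parsed schema idiom. */
public class LazySchemaHolder implements Serializable {

  /** Serializable parse step: raw String in, parsed structure out. */
  private static class ParseFields implements Serializable, Function<String, List<String>> {
    @Override
    public List<String> apply(String csv) {
      // Stand-in for the real JSON-to-TableSchema conversion.
      return Arrays.asList(csv.split(","));
    }
  }

  // Only the raw String is captured eagerly; parsing is deferred to the first get()
  // and the result is cached afterwards.
  private final Supplier<List<String>> fields;

  public LazySchemaHolder(String csvFields) {
    this.fields =
        Suppliers.memoize(Suppliers.compose(new ParseFields(), Suppliers.ofInstance(csvFields)));
  }

  public List<String> getFields() {
    return fields.get();
  }

  public static void main(String[] args) {
    LazySchemaHolder holder = new LazySchemaHolder("name,number");
    System.out.println(holder.getFields()); // [name, number] -- parsed lazily, exactly once.
  }
}

All three Suppliers factory methods used here return serializable suppliers when their inputs are serializable, which is presumably why TableSchemaFunction implements Serializable in the patch: the holder can be shipped to workers while only the JSON String travels in its serialized form.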