From 801bd716810a4794557908a7c0e7d3ac931fe07a Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Thu, 31 Aug 2023 20:57:28 +0000 Subject: [PATCH 1/4] Rethrow error converting TableSchema to JSON --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 58d769312444..a17fc299cea1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -721,15 +722,16 @@ public static TypedRead read(SerializableFunction par .setDatumReaderFactory( (SerializableFunction>) input -> { + TableSchema safeInput = checkStateNotNull(input); try { - String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(input); + String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(safeInput); return (AvroSource.DatumReaderFactory) (writer, reader) -> new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer); } catch (IOException e) { - LOG.warn( - String.format("Error while converting table schema %s to JSON!", input), e); - return null; + throw new IllegalStateException( + String.format( + "error converting TableSchema to JSON: %s, error: %s", safeInput, e)); } }) // TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed. @@ -3386,9 +3388,7 @@ private WriteResult expandTyped( @SuppressWarnings({"unchecked", "nullness"}) Descriptors.Descriptor descriptor = (Descriptors.Descriptor) - org.apache.beam.sdk.util.Preconditions.checkStateNotNull( - writeProtoClass.getMethod("getDescriptor")) - .invoke(null); + checkStateNotNull(writeProtoClass.getMethod("getDescriptor")).invoke(null); TableSchema tableSchema = TableRowToStorageApiProto.protoSchemaToTableSchema( TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor)); From 806dcb085392d66c1fd241549683a0e8b957f3bc Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Mon, 11 Sep 2023 21:31:07 +0000 Subject: [PATCH 2/4] Remove need to parse TableSchema to/from JSON --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index a17fc299cea1..3653475d5afc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -138,8 +138,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; @@ -661,7 +659,7 @@ private static class TableSchemaFunction @VisibleForTesting static class GenericDatumTransformer implements DatumReader { private final SerializableFunction parseFn; - private final Supplier tableSchema; + private final TableSchema tableSchema; private GenericDatumReader reader; private org.apache.avro.Schema writerSchema; @@ -670,9 +668,17 @@ public GenericDatumTransformer( String tableSchema, org.apache.avro.Schema writer) { this.parseFn = parseFn; - this.tableSchema = - Suppliers.memoize( - Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(tableSchema))); + this.tableSchema = new TableSchemaFunction().apply(tableSchema); + this.writerSchema = writer; + this.reader = new GenericDatumReader<>(this.writerSchema); + } + + public GenericDatumTransformer( + SerializableFunction parseFn, + TableSchema tableSchema, + org.apache.avro.Schema writer) { + this.parseFn = parseFn; + this.tableSchema = tableSchema; this.writerSchema = writer; this.reader = new GenericDatumReader<>(this.writerSchema); } @@ -690,7 +696,7 @@ public void setSchema(org.apache.avro.Schema schema) { @Override public T read(T reuse, Decoder in) throws IOException { GenericRecord record = (GenericRecord) this.reader.read(reuse, in); - return parseFn.apply(new SchemaAndRecord(record, this.tableSchema.get())); + return parseFn.apply(new SchemaAndRecord(record, this.tableSchema)); } } @@ -723,16 +729,8 @@ public static TypedRead read(SerializableFunction par (SerializableFunction>) input -> { TableSchema safeInput = checkStateNotNull(input); - try { - String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(safeInput); - return (AvroSource.DatumReaderFactory) - (writer, reader) -> - new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer); - } catch (IOException e) { - throw new IllegalStateException( - String.format( - "error converting TableSchema to JSON: %s, error: %s", safeInput, e)); - } + return (AvroSource.DatumReaderFactory) + (writer, reader) -> new GenericDatumTransformer<>(parseFn, safeInput, writer); }) // TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed. .setParseFn(parseFn) From 9d2dd5e35550b0c2159933a4ec587afe1c74cb22 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Sat, 16 Sep 2023 00:21:56 +0000 Subject: [PATCH 3/4] Remove GenericDatumTransformer's JSON string param --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 10 ---------- .../io/gcp/bigquery/BigQueryIOReadTest.java | 18 ++++++------------ 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 3653475d5afc..4834d88e0b85 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -663,16 +663,6 @@ static class GenericDatumTransformer implements DatumReader { private GenericDatumReader reader; private org.apache.avro.Schema writerSchema; - public GenericDatumTransformer( - SerializableFunction parseFn, - String tableSchema, - org.apache.avro.Schema writer) { - this.parseFn = parseFn; - this.tableSchema = new TableSchemaFunction().apply(tableSchema); - this.writerSchema = writer; - this.reader = new GenericDatumReader<>(this.writerSchema); - } - public GenericDatumTransformer( SerializableFunction parseFn, TableSchema tableSchema, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java index bc75ba8bd9ba..e274a8ac68ef 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; @@ -143,18 +144,11 @@ public void evaluate() throws Throwable { private SerializableFunction> datumReaderFactoryFn = - (SerializableFunction>) - input -> { - try { - String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input); - return (AvroSource.DatumReaderFactory) - (writer, reader) -> - new BigQueryIO.GenericDatumTransformer<>( - BigQueryIO.TableRowParser.INSTANCE, jsonSchema, writer); - } catch (IOException e) { - return null; - } - }; + input -> + (AvroSource.DatumReaderFactory) + (writer, reader) -> + new BigQueryIO.GenericDatumTransformer<>( + BigQueryIO.TableRowParser.INSTANCE, checkStateNotNull(input), writer); private static class MyData implements Serializable { private String name; From e8a738a1aacf2f2e30e0c2671771d35709de0765 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Sat, 16 Sep 2023 01:15:22 +0000 Subject: [PATCH 4/4] Remove unused TableSchemaFunction --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 4834d88e0b85..3c006d24d037 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -50,7 +50,6 @@ import com.google.protobuf.DynamicMessage; import com.google.protobuf.Message; import java.io.IOException; -import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.util.Collections; import java.util.List; @@ -133,7 +132,6 @@ import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; @@ -648,14 +646,6 @@ public static TypedRead readTableRowsWithSchema() { BigQueryUtils.tableRowFromBeamRow()); } - private static class TableSchemaFunction - implements Serializable, Function<@Nullable String, @Nullable TableSchema> { - @Override - public @Nullable TableSchema apply(@Nullable String input) { - return BigQueryHelpers.fromJsonString(input, TableSchema.class); - } - } - @VisibleForTesting static class GenericDatumTransformer implements DatumReader { private final SerializableFunction parseFn;