diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java index f672621f4759..ffe707fa6819 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java @@ -24,7 +24,10 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.RenameFields; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -132,7 +135,8 @@ public PCollection expand(PCollectionList pinput) { "Wrong number of inputs for %s: %s", BeamIOSinkRel.class.getSimpleName(), pinput); - PCollection input = pinput.get(0); + Schema schema = CalciteUtils.toSchema(getExpectedInputRowType(0)); + PCollection input = pinput.get(0).apply(RenameFields.create()).setRowSchema(schema); sqlTable.buildIOWriter(input); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java index b45986c207df..16d6c11a33b6 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.java @@ -57,7 +57,7 @@ public void testSimpleInsert() throws Exception { + "event_timestamp TIMESTAMP, \n" + "attributes MAP, \n" + "payload ROW< \n" - + " id INTEGER, \n" + + " id BIGINT, \n" + " name VARCHAR \n" + " > \n" + ") \n" @@ -111,7 +111,7 @@ public void testSimpleInsertFlat() throws Exception { String pubsubTableString = "CREATE EXTERNAL TABLE pubsub_topic (\n" + "event_timestamp TIMESTAMP, \n" - + "id INTEGER, \n" + + "id BIGINT, \n" + "name VARCHAR \n" + ") \n" + "TYPE 'pubsub' \n" diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java index 58b0fc7c27a3..c8db3ba68aff 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java @@ -47,6 +47,7 @@ import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptCluster; import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.volcano.VolcanoPlanner; import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.prepare.RelOptTableImpl; +import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.TableModify.Operation; import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.type.RelDataType; import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexBuilder; @@ -170,7 +171,7 @@ public void testToEnumerable_count() { RelOptTableImpl.create(null, type, ImmutableList.of(), null), null, new BeamValuesRel(cluster, type, tuples, null), - null, + Operation.INSERT, null, null, false,