Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.RenameFields;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
Expand Down Expand Up @@ -132,7 +135,8 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
"Wrong number of inputs for %s: %s",
BeamIOSinkRel.class.getSimpleName(),
pinput);
PCollection<Row> input = pinput.get(0);
Schema schema = CalciteUtils.toSchema(getExpectedInputRowType(0));
PCollection<Row> input = pinput.get(0).apply(RenameFields.<Row>create()).setRowSchema(schema);

sqlTable.buildIOWriter(input);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testSimpleInsert() throws Exception {
+ "event_timestamp TIMESTAMP, \n"
+ "attributes MAP<VARCHAR, VARCHAR>, \n"
+ "payload ROW< \n"
+ " id INTEGER, \n"
+ " id BIGINT, \n"
+ " name VARCHAR \n"
+ " > \n"
+ ") \n"
Expand Down Expand Up @@ -111,7 +111,7 @@ public void testSimpleInsertFlat() throws Exception {
String pubsubTableString =
"CREATE EXTERNAL TABLE pubsub_topic (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "id INTEGER, \n"
+ "id BIGINT, \n"
+ "name VARCHAR \n"
+ ") \n"
+ "TYPE 'pubsub' \n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.TableModify.Operation;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexBuilder;
Expand Down Expand Up @@ -170,7 +171,7 @@ public void testToEnumerable_count() {
RelOptTableImpl.create(null, type, ImmutableList.of(), null),
null,
new BeamValuesRel(cluster, type, tuples, null),
null,
Operation.INSERT,
null,
null,
false,
Expand Down