diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java index 5b3c83ec3650..14d742294c5b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; -import static junit.framework.TestCase.assertNull; import static org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTable.METHOD_PROPERTY; import static org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTable.WRITE_DISPOSITION_PROPERTY; import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone; @@ -42,7 +41,6 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamPushDownIOSourceRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; @@ -740,12 +738,10 @@ public void testSQLRead_withDirectRead_withProjectPushDown() { BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement); PCollection output = BeamSqlRelUtils.toPCollection(readPipeline, relNode); - // Calc is not dropped because BigQuery does not support field reordering yet. - assertThat(relNode, instanceOf(BeamCalcRel.class)); - assertThat(relNode.getInput(0), instanceOf(BeamPushDownIOSourceRel.class)); + assertThat(relNode, instanceOf(BeamPushDownIOSourceRel.class)); // IO projects fields in the same order they are defined in the schema. assertThat( - relNode.getInput(0).getRowType().getFieldNames(), + relNode.getRowType().getFieldNames(), containsInAnyOrder("c_tinyint", "c_integer", "c_varchar")); // Field reordering is done in a Calc assertThat( @@ -816,15 +812,9 @@ public void testSQLRead_withDirectRead_withProjectAndFilterPushDown() { BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement); PCollection output = BeamSqlRelUtils.toPCollection(readPipeline, relNode); - assertThat(relNode, instanceOf(BeamCalcRel.class)); - // Predicate should be pushed-down to IO level - assertNull(((BeamCalcRel) relNode).getProgram().getCondition()); - - assertThat(relNode.getInput(0), instanceOf(BeamPushDownIOSourceRel.class)); + assertThat(relNode, instanceOf(BeamPushDownIOSourceRel.class)); // Unused fields should not be projected by an IO - assertThat( - relNode.getInput(0).getRowType().getFieldNames(), - containsInAnyOrder("c_varchar", "c_integer")); + assertThat(relNode.getRowType().getFieldNames(), containsInAnyOrder("c_varchar", "c_integer")); assertThat( output.getSchema(),