What happened?
PCollection<Row> rows = pipeline.apply(JdbcIO.<Row>readWithPartitions()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            input.driverClassName, input.url)
        .withUsername(input.username)
        .withPassword(input.password))
    .withTable(input.tableName)
    .withPartitionColumn(input.primeKeyForPartitionRead)
    .withRowOutput());
The table has a VARCHAR column named "name", which JdbcIO reads as the Beam type LOGICAL_TYPE NOT NULL.
Then I run a SqlTransform over the rows:
Schema schemaAfterSql = Schema.builder().addNullableField("id", Schema.FieldType.INT64).addNullableField("name", Schema.FieldType.STRING).build();
String selectQuery = "SELECT id, name FROM PCOLLECTION";
PCollection<Row> processedRows = rows.apply("EnforcePolicy", SqlTransform.query(selectQuery)).setRowSchema(schemaAfterSql);
This fails with the following error:
"An exception occured while executing the Java class. Unable to parse query SELECT id, policy_name FROM PCOLLECTION: java.lang.IllegalArgumentException: Cannot find a matching Calcite SqlTypeName for Beam type: LOGICAL_TYPE NOT NULL"
Issue Priority
Priority: 0
Issue Component
Component: io-java-jdbc