diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java index c37ddf26abac..c68b33a02607 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java @@ -204,9 +204,10 @@ protected JdbcIO.DataSourceConfiguration getDataSourceConfiguration() { dataSourceConfiguration = dataSourceConfiguration.withConnectionInitSqls(initSqls); } - @Nullable Integer maxConnections = config.getInt32("maxConnections"); + @Nullable Short maxConnections = config.getInt16("maxConnections"); if (maxConnections != null) { - dataSourceConfiguration = dataSourceConfiguration.withMaxConnections(maxConnections); + dataSourceConfiguration = + dataSourceConfiguration.withMaxConnections(maxConnections.intValue()); } @Nullable String driverJars = config.getString("driverJars"); diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java index d91eaaef6e62..ed380d813625 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java @@ -17,9 +17,14 @@ */ package org.apache.beam.sdk.io.jdbc; +import static org.junit.Assert.assertEquals; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Objects; import javax.sql.DataSource; import org.apache.beam.sdk.io.common.DatabaseTestHelper; import org.apache.beam.sdk.io.common.TestRow; @@ -81,8 +86,7 @@ public void testPartitionedRead() { } // This test shouldn't work because we only support numeric and datetime columns and we are trying - // to use a string - // column as our partition source + // to use a string column as our partition source. @Test public void testPartitionedReadThatShouldntWork() throws Exception { JdbcSchemaIOProvider provider = new JdbcSchemaIOProvider(); @@ -110,6 +114,42 @@ public void testPartitionedReadThatShouldntWork() throws Exception { throw new Exception("Did not throw an exception"); } + // This test verifies we can read back existing data source configuration. It also serves as + // sanity check that the + // getDataSourceConfiguration should keep in sync with JdbcSchemaIOProvider.configurationSchema. + // Otherwise the test + // would throw an exception. + @Test + public void testAbleToReadDataSourceConfiguration() { + JdbcSchemaIOProvider provider = new JdbcSchemaIOProvider(); + + Row config = + Row.withSchema(provider.configurationSchema()) + .withFieldValue("driverClassName", "className") + .withFieldValue("jdbcUrl", "url") + .withFieldValue("username", "user") + .withFieldValue("password", "passwd") + .withFieldValue("connectionProperties", "connectionProp") + .withFieldValue("connectionInitSqls", new ArrayList<>(Collections.singleton("initSql"))) + .withFieldValue("maxConnections", (short) 3) + .withFieldValue("driverJars", "test.jar") + .build(); + JdbcSchemaIOProvider.JdbcSchemaIO schemaIO = + provider.from(READ_TABLE_NAME, config, Schema.builder().build()); + JdbcIO.DataSourceConfiguration dataSourceConf = schemaIO.getDataSourceConfiguration(); + assertEquals("className", Objects.requireNonNull(dataSourceConf.getDriverClassName()).get()); + assertEquals("url", Objects.requireNonNull(dataSourceConf.getUrl()).get()); + assertEquals("user", Objects.requireNonNull(dataSourceConf.getUsername()).get()); + assertEquals("passwd", Objects.requireNonNull(dataSourceConf.getPassword()).get()); + assertEquals( + "connectionProp", Objects.requireNonNull(dataSourceConf.getConnectionProperties()).get()); + assertEquals( + new ArrayList<>(Collections.singleton("initSql")), + Objects.requireNonNull(dataSourceConf.getConnectionInitSqls()).get()); + assertEquals(3, (int) dataSourceConf.getMaxConnections().get()); + assertEquals("test.jar", Objects.requireNonNull(dataSourceConf.getDriverJars()).get()); + } + /** Create test data that is consistent with that generated by TestRow. */ private static void addInitialData(DataSource dataSource, String tableName) throws SQLException { try (Connection connection = dataSource.getConnection()) {