From 5c12c46476d173b913c704e46fa80845dc597575 Mon Sep 17 00:00:00 2001 From: Fei Xie Date: Sat, 26 Aug 2023 18:03:38 -0500 Subject: [PATCH 1/2] Add a test' --- .../sdk/io/jdbc/JdbcSchemaIOProvider.java | 4 +- .../sdk/io/jdbc/JdbcSchemaIOProviderTest.java | 40 ++++++++++++++++++- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java index c37ddf26abac..d1a029219c39 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java @@ -204,9 +204,9 @@ protected JdbcIO.DataSourceConfiguration getDataSourceConfiguration() { dataSourceConfiguration = dataSourceConfiguration.withConnectionInitSqls(initSqls); } - @Nullable Integer maxConnections = config.getInt32("maxConnections"); + @Nullable Short maxConnections = config.getInt16("maxConnections"); if (maxConnections != null) { - dataSourceConfiguration = dataSourceConfiguration.withMaxConnections(maxConnections); + dataSourceConfiguration = dataSourceConfiguration.withMaxConnections(maxConnections.intValue()); } @Nullable String driverJars = config.getString("driverJars"); diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java index d91eaaef6e62..89a766ff53a8 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java @@ -20,6 +20,9 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Objects; import javax.sql.DataSource; import org.apache.beam.sdk.io.common.DatabaseTestHelper; import org.apache.beam.sdk.io.common.TestRow; @@ -35,6 +38,9 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + @RunWith(JUnit4.class) public class JdbcSchemaIOProviderTest { @@ -81,8 +87,7 @@ public void testPartitionedRead() { } // This test shouldn't work because we only support numeric and datetime columns and we are trying - // to use a string - // column as our partition source + // to use a string column as our partition source. @Test public void testPartitionedReadThatShouldntWork() throws Exception { JdbcSchemaIOProvider provider = new JdbcSchemaIOProvider(); @@ -110,6 +115,37 @@ public void testPartitionedReadThatShouldntWork() throws Exception { throw new Exception("Did not throw an exception"); } + // This test verifies we can read back existing data source configuration. It also serves as sanity check that the + // getDataSourceConfiguration should keep in sync with JdbcSchemaIOProvider.configurationSchema. Otherwise the test + // would throw an exception. + @Test + public void testAbleToReadDataSourceConfiguration() { + JdbcSchemaIOProvider provider = new JdbcSchemaIOProvider(); + + Row config = + Row.withSchema(provider.configurationSchema()) + .withFieldValue("driverClassName", "className") + .withFieldValue("jdbcUrl", "url") + .withFieldValue("username", "user") + .withFieldValue("password", "passwd") + .withFieldValue("connectionProperties", "connectionProp") + .withFieldValue("connectionInitSqls", new ArrayList<>(Collections.singleton("initSql"))) + .withFieldValue("maxConnections", (short) 3) + .withFieldValue("driverJars", "test.jar") + .build(); + JdbcSchemaIOProvider.JdbcSchemaIO schemaIO = + provider.from(READ_TABLE_NAME, config, Schema.builder().build()); + JdbcIO.DataSourceConfiguration dataSourceConf = schemaIO.getDataSourceConfiguration(); + assertEquals("className", Objects.requireNonNull(dataSourceConf.getDriverClassName()).get()); + assertEquals("url", Objects.requireNonNull(dataSourceConf.getUrl()).get()); + assertEquals("user", Objects.requireNonNull(dataSourceConf.getUsername()).get()); + assertEquals("passwd", Objects.requireNonNull(dataSourceConf.getPassword()).get()); + assertEquals("connectionProp", Objects.requireNonNull(dataSourceConf.getConnectionProperties()).get()); + assertEquals(new ArrayList<>(Collections.singleton("initSql")), Objects.requireNonNull(dataSourceConf.getConnectionInitSqls()).get()); + assertEquals(3, (int) dataSourceConf.getMaxConnections().get()); + assertEquals("test.jar", Objects.requireNonNull(dataSourceConf.getDriverJars()).get()); + } + /** Create test data that is consistent with that generated by TestRow. */ private static void addInitialData(DataSource dataSource, String tableName) throws SQLException { try (Connection connection = dataSource.getConnection()) { From ff3c658f1bb883333e408618c9edf1aab44a20fa Mon Sep 17 00:00:00 2001 From: Fei Xie Date: Sun, 27 Aug 2023 11:25:05 -0500 Subject: [PATCH 2/2] Format --- .../beam/sdk/io/jdbc/JdbcSchemaIOProvider.java | 3 ++- .../sdk/io/jdbc/JdbcSchemaIOProviderTest.java | 18 +++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java index d1a029219c39..c68b33a02607 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java @@ -206,7 +206,8 @@ protected JdbcIO.DataSourceConfiguration getDataSourceConfiguration() { @Nullable Short maxConnections = config.getInt16("maxConnections"); if (maxConnections != null) { - dataSourceConfiguration = dataSourceConfiguration.withMaxConnections(maxConnections.intValue()); + dataSourceConfiguration = + dataSourceConfiguration.withMaxConnections(maxConnections.intValue()); } @Nullable String driverJars = config.getString("driverJars"); diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java index 89a766ff53a8..ed380d813625 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProviderTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.jdbc; +import static org.junit.Assert.assertEquals; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -38,9 +40,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - @RunWith(JUnit4.class) public class JdbcSchemaIOProviderTest { @@ -115,8 +114,10 @@ public void testPartitionedReadThatShouldntWork() throws Exception { throw new Exception("Did not throw an exception"); } - // This test verifies we can read back existing data source configuration. It also serves as sanity check that the - // getDataSourceConfiguration should keep in sync with JdbcSchemaIOProvider.configurationSchema. Otherwise the test + // This test verifies we can read back existing data source configuration. It also serves as + // sanity check that the + // getDataSourceConfiguration should keep in sync with JdbcSchemaIOProvider.configurationSchema. + // Otherwise the test // would throw an exception. @Test public void testAbleToReadDataSourceConfiguration() { @@ -140,8 +141,11 @@ public void testAbleToReadDataSourceConfiguration() { assertEquals("url", Objects.requireNonNull(dataSourceConf.getUrl()).get()); assertEquals("user", Objects.requireNonNull(dataSourceConf.getUsername()).get()); assertEquals("passwd", Objects.requireNonNull(dataSourceConf.getPassword()).get()); - assertEquals("connectionProp", Objects.requireNonNull(dataSourceConf.getConnectionProperties()).get()); - assertEquals(new ArrayList<>(Collections.singleton("initSql")), Objects.requireNonNull(dataSourceConf.getConnectionInitSqls()).get()); + assertEquals( + "connectionProp", Objects.requireNonNull(dataSourceConf.getConnectionProperties()).get()); + assertEquals( + new ArrayList<>(Collections.singleton("initSql")), + Objects.requireNonNull(dataSourceConf.getConnectionInitSqls()).get()); assertEquals(3, (int) dataSourceConf.getMaxConnections().get()); assertEquals("test.jar", Objects.requireNonNull(dataSourceConf.getDriverJars()).get()); }