From 22b2ce20299dbd8cf3539d885ea5fc1d73c547f9 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 30 Sep 2025 15:36:06 -0400 Subject: [PATCH 1/4] Add missing SQL_SERVER mappings in python. Use SCHEMAIO jar for JDBCIO managed IO. --- .../schemaio-expansion-service/build.gradle | 7 +++++++ sdks/python/apache_beam/transforms/external.py | 13 +++++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/sdks/java/extensions/schemaio-expansion-service/build.gradle b/sdks/java/extensions/schemaio-expansion-service/build.gradle index 12ee92a9e109..486392a56827 100644 --- a/sdks/java/extensions/schemaio-expansion-service/build.gradle +++ b/sdks/java/extensions/schemaio-expansion-service/build.gradle @@ -76,3 +76,10 @@ task runExpansionService (type: JavaExec) { classpath = sourceSets.test.runtimeClasspath args = [project.findProperty("constructionService.port") ?: "8097"] } + +shadowJar { + manifest { + attributes(["Multi-Release": true]) + } + outputs.upToDateWhen { false } +} \ No newline at end of file diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 3f9f56a54139..8d7ac85de5f9 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -71,6 +71,9 @@ _GCP_EXPANSION_SERVICE_JAR_TARGET = ( "sdks:java:io:google-cloud-platform:expansion-service:shadowJar") +_SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET = ( + "sdks:java:extensions:schemaio-expansion-service:shadowJar") + # A mapping from supported managed transforms URNs to expansion service jars # that include the corresponding transforms. MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING = { @@ -81,10 +84,12 @@ ManagedTransforms.Urns.KAFKA_WRITE.urn: _IO_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.BIGQUERY_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.BIGQUERY_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.POSTGRES_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, - ManagedTransforms.Urns.POSTGRES_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.MYSQL_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, - ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, + ManagedTransforms.Urns.POSTGRES_READ.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.POSTGRES_WRITE.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.MYSQL_READ.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.MYSQL_WRITE.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.SQL_SERVER_READ.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.SQL_SERVER_WRITE.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long } From 3a6d555f4e5c7b26d3ca17a637204046a85f2c6d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 30 Sep 2025 15:41:04 -0400 Subject: [PATCH 2/4] Update parameter check for managed io for postgres so it is consistent with other dbs --- .../ReadFromPostgresSchemaTransformProvider.java | 10 +++++++--- .../WriteToPostgresSchemaTransformProvider.java | 10 +++++++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java index 834e7a0a4927..05011be73796 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java @@ -59,14 +59,18 @@ protected String jdbcType() { JdbcReadSchemaTransformConfiguration configuration) { String jdbcType = configuration.getJdbcType(); if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) { - throw new IllegalArgumentException( - String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType)); + LOG.warn( + "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.", + jdbcType(), + jdbcType, + jdbcType()); + configuration = configuration.toBuilder().setJdbcType(jdbcType()).build(); } List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql = configuration.getConnectionInitSql(); if (connectionInitSql != null && !connectionInitSql.isEmpty()) { - LOG.warn("Postgres does not support connectionInitSql, ignoring."); + throw new IllegalArgumentException("Postgres does not support connectionInitSql."); } Boolean disableAutoCommit = configuration.getDisableAutoCommit(); diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java index 97074742dbed..64581c2b01be 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java @@ -59,14 +59,18 @@ protected String jdbcType() { JdbcWriteSchemaTransformConfiguration configuration) { String jdbcType = configuration.getJdbcType(); if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) { - throw new IllegalArgumentException( - String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType)); + LOG.warn( + "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.", + jdbcType(), + jdbcType, + jdbcType()); + configuration = configuration.toBuilder().setJdbcType(jdbcType()).build(); } List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql = configuration.getConnectionInitSql(); if (connectionInitSql != null && !connectionInitSql.isEmpty()) { - LOG.warn("Postgres does not support connectionInitSql, ignoring."); + throw new IllegalArgumentException("Postgres does not support connectionInitSql."); } // Override "connectionInitSql" for postgres From adfd431c41bb13b7d76f002a4d270c6490c61f2c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 8 Oct 2025 13:56:02 -0400 Subject: [PATCH 3/4] Address feedback from reviewers --- sdks/java/expansion-service/container/Dockerfile | 1 - .../schemaio-expansion-service/build.gradle | 1 - sdks/python/apache_beam/transforms/external.py | 15 ++++++--------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/sdks/java/expansion-service/container/Dockerfile b/sdks/java/expansion-service/container/Dockerfile index 2688a3176713..968f5cd2ac25 100644 --- a/sdks/java/expansion-service/container/Dockerfile +++ b/sdks/java/expansion-service/container/Dockerfile @@ -28,7 +28,6 @@ WORKDIR /opt/apache/beam COPY target/avro.jar jars/ COPY target/beam-sdks-java-io-expansion-service.jar jars/ COPY target/beam-sdks-java-io-google-cloud-platform-expansion-service.jar jars/ -COPY target/beam-sdks-java-extensions-schemaio-expansion-service.jar jars/ # Copy licenses COPY target/LICENSE /opt/apache/beam/ diff --git a/sdks/java/extensions/schemaio-expansion-service/build.gradle b/sdks/java/extensions/schemaio-expansion-service/build.gradle index 486392a56827..e33d6b96b636 100644 --- a/sdks/java/extensions/schemaio-expansion-service/build.gradle +++ b/sdks/java/extensions/schemaio-expansion-service/build.gradle @@ -81,5 +81,4 @@ shadowJar { manifest { attributes(["Multi-Release": true]) } - outputs.upToDateWhen { false } } \ No newline at end of file diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 8d7ac85de5f9..4bcb61a814f3 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -71,9 +71,6 @@ _GCP_EXPANSION_SERVICE_JAR_TARGET = ( "sdks:java:io:google-cloud-platform:expansion-service:shadowJar") -_SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET = ( - "sdks:java:extensions:schemaio-expansion-service:shadowJar") - # A mapping from supported managed transforms URNs to expansion service jars # that include the corresponding transforms. MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING = { @@ -84,12 +81,12 @@ ManagedTransforms.Urns.KAFKA_WRITE.urn: _IO_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.BIGQUERY_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.BIGQUERY_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.POSTGRES_READ.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.POSTGRES_WRITE.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.MYSQL_READ.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.MYSQL_WRITE.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.SQL_SERVER_READ.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.SQL_SERVER_WRITE.urn: _SCHEMAIO_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.POSTGRES_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.POSTGRES_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.MYSQL_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.SQL_SERVER_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.SQL_SERVER_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long } From cc5ed6f4c4543b45a50a28c6fe1ab4b693cdf457 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 8 Oct 2025 13:59:57 -0400 Subject: [PATCH 4/4] Fix lints. --- sdks/python/apache_beam/transforms/external.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 4bcb61a814f3..03f079b04b52 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -81,10 +81,10 @@ ManagedTransforms.Urns.KAFKA_WRITE.urn: _IO_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.BIGQUERY_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.BIGQUERY_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.POSTGRES_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.POSTGRES_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.POSTGRES_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.MYSQL_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long - ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.MYSQL_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, + ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.SQL_SERVER_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long ManagedTransforms.Urns.SQL_SERVER_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long }