From ec61d60f8b5e116bf605561052ba59f7a03ebf43 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 3 Sep 2025 10:13:47 -0400 Subject: [PATCH 1/6] Add mysql read to managed io --- .../pipeline/v1/external_transforms.proto | 2 ++ .../ReadFromMySqlSchemaTransformProvider.java | 31 ++++++++++++++++++- .../org/apache/beam/sdk/managed/Managed.java | 2 ++ .../python/apache_beam/transforms/external.py | 1 + sdks/python/apache_beam/transforms/managed.py | 2 ++ 5 files changed, 37 insertions(+), 1 deletion(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index 02a5dd18e2c6..afe82382a3dd 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -80,6 +80,8 @@ message ManagedTransforms { "beam:schematransform:org.apache.beam:postgres_read:v1"]; POSTGRES_WRITE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:postgres_write:v1"]; + MYSQL_READ = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:mysql_read:v1"]; } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java index 3d0135ef8ecd..0ca3064f9528 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java @@ -18,20 +18,28 @@ package org.apache.beam.sdk.io.jdbc.providers; import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import com.google.auto.service.AutoService; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoService(SchemaTransformProvider.class) public class ReadFromMySqlSchemaTransformProvider extends JdbcReadSchemaTransformProvider { + private static final Logger LOG = + LoggerFactory.getLogger(ReadFromMySqlSchemaTransformProvider.class); + @Override public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:mysql_read:v1"; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_READ); } @Override @@ -43,4 +51,25 @@ public String description() { protected String jdbcType() { return MYSQL; } + + @Override + public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcReadSchemaTransformConfiguration configuration) { + String jdbcType = configuration.getJdbcType(); + if (jdbcType != null && !jdbcType.equals(jdbcType())) { + throw new IllegalArgumentException( + String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType)); + } + + Integer fetchSize = configuration.getFetchSize(); + if (fetchSize != null + && fetchSize > 0 + && configuration.getJdbcUrl() != null + && !configuration.getJdbcUrl().contains("useCursorFetch=true")) { + LOG.warn( + "The fetchSize option is ignored. It is required to set useCursorFetch=true" + + " in the JDBC URL when using fetchSize for MySQL"); + } + return super.from(configuration); + } } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index cda84629a7d7..37ea11dd68c7 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -97,6 +97,7 @@ public class Managed { public static final String KAFKA = "kafka"; public static final String BIGQUERY = "bigquery"; public static final String POSTGRES = "postgres"; + public static final String MYSQL = "mysql"; // Supported SchemaTransforms public static final Map READ_TRANSFORMS = @@ -106,6 +107,7 @@ public class Managed { .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ)) .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ)) .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ)) + .put(MYSQL, getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_READ)) .build(); public static final Map WRITE_TRANSFORMS = ImmutableMap.builder() diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index b22ed6e0c645..463d669f3649 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -83,6 +83,7 @@ ManagedTransforms.Urns.BIGQUERY_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long ManagedTransforms.Urns.POSTGRES_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.POSTGRES_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.MYSQL_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, } diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index 72dfb6fd9a0a..45c3b2a40acc 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -86,6 +86,7 @@ KAFKA = "kafka" BIGQUERY = "bigquery" POSTGRES = "postgres" +MYSQL = "mysql" __all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"] @@ -98,6 +99,7 @@ class Read(PTransform): KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn, BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn, POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn, + MYSQL: ManagedTransforms.Urns.MYSQL_READ.urn, } def __init__( From 254ee357e8374a034dcfe6a1733e91794b0e111d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 3 Sep 2025 10:23:14 -0400 Subject: [PATCH 2/6] Add mysql write to managed io --- .../model/pipeline/v1/external_transforms.proto | 2 ++ .../WriteToMySqlSchemaTransformProvider.java | 16 +++++++++++++++- .../org/apache/beam/sdk/managed/Managed.java | 1 + sdks/python/apache_beam/transforms/external.py | 1 + sdks/python/apache_beam/transforms/managed.py | 1 + 5 files changed, 20 insertions(+), 1 deletion(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index afe82382a3dd..31232eb60671 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -82,6 +82,8 @@ message ManagedTransforms { "beam:schematransform:org.apache.beam:postgres_write:v1"]; MYSQL_READ = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:mysql_read:v1"]; + MYSQL_WRITE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:mysql_write:v1"]; } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java index 57f085220162..9c234dfed988 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java @@ -18,9 +18,12 @@ package org.apache.beam.sdk.io.jdbc.providers; import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import com.google.auto.service.AutoService; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; @@ -31,7 +34,7 @@ public class WriteToMySqlSchemaTransformProvider extends JdbcWriteSchemaTransfor @Override public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:mysql_write:v1"; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE); } @Override @@ -43,4 +46,15 @@ public String description() { protected String jdbcType() { return MYSQL; } + + @Override + public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcWriteSchemaTransformConfiguration configuration) { + String jdbcType = configuration.getJdbcType(); + if (jdbcType != null && !jdbcType.equals(jdbcType())) { + throw new IllegalArgumentException( + String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType)); + } + return super.from(configuration); + } } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 37ea11dd68c7..4f45eeac861e 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -115,6 +115,7 @@ public class Managed { .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE)) .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE)) .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE)) + .put(MYSQL, getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE)) .build(); /** diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 463d669f3649..3f9f56a54139 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -84,6 +84,7 @@ ManagedTransforms.Urns.POSTGRES_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.POSTGRES_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long ManagedTransforms.Urns.MYSQL_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, + ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, } diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index 45c3b2a40acc..03449236ac92 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -142,6 +142,7 @@ class Write(PTransform): KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn, BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn, POSTGRES: ManagedTransforms.Urns.POSTGRES_WRITE.urn, + MYSQL: ManagedTransforms.Urns.MYSQL_WRITE.urn, } def __init__( From 74a7b5b0f18f10fa0a7da02ed93b8d743fa467db Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 3 Sep 2025 20:52:08 -0400 Subject: [PATCH 3/6] Add schema transform translation and test for mysql read and write --- .../MySqlSchemaTransformTranslation.java | 93 +++++++ .../ReadFromMySqlSchemaTransformProvider.java | 9 +- .../WriteToMySqlSchemaTransformProvider.java | 9 +- .../MysqlSchemaTransformTranslationTest.java | 231 ++++++++++++++++++ 4 files changed, 340 insertions(+), 2 deletions(-) create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java create mode 100644 sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java new file mode 100644 index 000000000000..3367248b7198 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.providers.ReadFromMySqlSchemaTransformProvider.MySqlReadSchemaTransform; +import static org.apache.beam.sdk.io.jdbc.providers.WriteToMySqlSchemaTransformProvider.MySqlWriteSchemaTransform; +import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator; + +import com.google.auto.service.AutoService; +import java.util.Map; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +public class MySqlSchemaTransformTranslation { + static class MySqlReadSchemaTransformTranslator + extends SchemaTransformPayloadTranslator { + @Override + public SchemaTransformProvider provider() { + return new ReadFromMySqlSchemaTransformProvider(); + } + + @Override + public Row toConfigRow(MySqlReadSchemaTransform transform) { + return transform.getConfigurationRow(); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map< + ? extends Class, + ? extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + ., PTransformTranslation.TransformPayloadTranslator>builder() + .put(MySqlReadSchemaTransform.class, new MySqlReadSchemaTransformTranslator()) + .build(); + } + } + + static class MySqlWriteSchemaTransformTranslator + extends SchemaTransformPayloadTranslator { + @Override + public SchemaTransformProvider provider() { + return new WriteToMySqlSchemaTransformProvider(); + } + + @Override + public Row toConfigRow(MySqlWriteSchemaTransform transform) { + return transform.getConfigurationRow(); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map< + ? extends Class, + ? extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + ., PTransformTranslation.TransformPayloadTranslator>builder() + .put(MySqlWriteSchemaTransform.class, new MySqlWriteSchemaTransformTranslator()) + .build(); + } + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java index 0ca3064f9528..2bf6928b5eb0 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java @@ -70,6 +70,13 @@ protected String jdbcType() { "The fetchSize option is ignored. It is required to set useCursorFetch=true" + " in the JDBC URL when using fetchSize for MySQL"); } - return super.from(configuration); + return new MySqlReadSchemaTransform(configuration); + } + + public static class MySqlReadSchemaTransform extends JdbcReadSchemaTransform { + public MySqlReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) { + super(config, MYSQL); + config.validate(MYSQL); + } } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java index 9c234dfed988..a283a64d29c4 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java @@ -55,6 +55,13 @@ protected String jdbcType() { throw new IllegalArgumentException( String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType)); } - return super.from(configuration); + return new MySqlWriteSchemaTransform(configuration); + } + + public static class MySqlWriteSchemaTransform extends JdbcWriteSchemaTransform { + public MySqlWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) { + super(config, MYSQL); + config.validate(MYSQL); + } } } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java new file mode 100644 index 000000000000..dbcd3cecf8bc --- /dev/null +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; +import static org.apache.beam.sdk.io.jdbc.providers.MySqlSchemaTransformTranslation.MySqlReadSchemaTransformTranslator; +import static org.apache.beam.sdk.io.jdbc.providers.MySqlSchemaTransformTranslation.MySqlWriteSchemaTransformTranslator; +import static org.apache.beam.sdk.io.jdbc.providers.ReadFromMySqlSchemaTransformProvider.MySqlReadSchemaTransform; +import static org.apache.beam.sdk.io.jdbc.providers.WriteToMySqlSchemaTransformProvider.MySqlWriteSchemaTransform; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.io.jdbc.JdbcIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.construction.BeamUrns; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +public class MysqlSchemaTransformTranslationTest { + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + static final WriteToMySqlSchemaTransformProvider WRITE_PROVIDER = + new WriteToMySqlSchemaTransformProvider(); + static final ReadFromMySqlSchemaTransformProvider READ_PROVIDER = + new ReadFromMySqlSchemaTransformProvider(); + + static final Row READ_CONFIG = + Row.withSchema(READ_PROVIDER.configurationSchema()) + .withFieldValue("jdbc_url", "jdbc:mysql://host:port/database") + .withFieldValue("location", "test_table") + .withFieldValue("connection_properties", "some_property") + .withFieldValue("connection_init_sql", ImmutableList.builder().build()) + .withFieldValue("driver_class_name", null) + .withFieldValue("driver_jars", null) + .withFieldValue("disable_auto_commit", true) + .withFieldValue("fetch_size", 10) + .withFieldValue("num_partitions", 5) + .withFieldValue("output_parallelization", true) + .withFieldValue("partition_column", "col") + .withFieldValue("read_query", null) + .withFieldValue("username", "my_user") + .withFieldValue("password", "my_pass") + .build(); + + static final Row WRITE_CONFIG = + Row.withSchema(WRITE_PROVIDER.configurationSchema()) + .withFieldValue("jdbc_url", "jdbc:mysql://host:port/database") + .withFieldValue("location", "test_table") + .withFieldValue("autosharding", true) + .withFieldValue("connection_init_sql", ImmutableList.builder().build()) + .withFieldValue("connection_properties", "some_property") + .withFieldValue("driver_class_name", null) + .withFieldValue("driver_jars", null) + .withFieldValue("batch_size", 100L) + .withFieldValue("username", "my_user") + .withFieldValue("password", "my_pass") + .withFieldValue("write_statement", null) + .build(); + + @Test + public void testRecreateWriteTransformFromRow() { + MySqlWriteSchemaTransform writeTransform = + (MySqlWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG); + + MySqlWriteSchemaTransformTranslator translator = new MySqlWriteSchemaTransformTranslator(); + Row translatedRow = translator.toConfigRow(writeTransform); + + MySqlWriteSchemaTransform writeTransformFromRow = + translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create()); + + assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow()); + } + + @Test + public void testWriteTransformProtoTranslation() + throws InvalidProtocolBufferException, IOException { + // First build a pipeline + Pipeline p = Pipeline.create(); + Schema inputSchema = Schema.builder().addStringField("name").build(); + PCollection input = + p.apply( + Create.of( + Collections.singletonList( + Row.withSchema(inputSchema).addValue("test").build()))) + .setRowSchema(inputSchema); + + MySqlWriteSchemaTransform writeTransform = + (MySqlWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG); + PCollectionRowTuple.of("input", input).apply(writeTransform); + + // Then translate the pipeline to a proto and extract MySqlWriteSchemaTransform proto + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + List writeTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(WRITE_PROVIDER.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, writeTransformProto.size()); + RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec); + Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); + + assertEquals(WRITE_CONFIG, rowFromSpec); + + // Use the information in the proto to recreate the MySqlWriteSchemaTransform + MySqlWriteSchemaTransformTranslator translator = new MySqlWriteSchemaTransformTranslator(); + MySqlWriteSchemaTransform writeTransformFromSpec = + translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); + + assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow()); + } + + @Test + public void testReCreateReadTransformFromRow() { + // setting a subset of fields here. + MySqlReadSchemaTransform readTransform = + (MySqlReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG); + + MySqlReadSchemaTransformTranslator translator = new MySqlReadSchemaTransformTranslator(); + Row row = translator.toConfigRow(readTransform); + + MySqlReadSchemaTransform readTransformFromRow = + translator.fromConfigRow(row, PipelineOptionsFactory.create()); + + assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow()); + } + + @Test + public void testReadTransformProtoTranslation() + throws InvalidProtocolBufferException, IOException { + // First build a pipeline + Pipeline p = Pipeline.create(); + + MySqlReadSchemaTransform readTransform = + (MySqlReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG); + + // Mock inferBeamSchema since it requires database connection. + Schema expectedSchema = Schema.builder().addStringField("name").build(); + try (MockedStatic mock = Mockito.mockStatic(JdbcIO.ReadRows.class)) { + mock.when(() -> JdbcIO.ReadRows.inferBeamSchema(Mockito.any(), Mockito.any())) + .thenReturn(expectedSchema); + PCollectionRowTuple.empty(p).apply(readTransform); + } + + // Then translate the pipeline to a proto and extract MySqlReadSchemaTransform proto + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + List readTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(READ_PROVIDER.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, readTransformProto.size()); + RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec); + Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); + assertEquals(READ_CONFIG, rowFromSpec); + + // Use the information in the proto to recreate the MySqlReadSchemaTransform + MySqlReadSchemaTransformTranslator translator = new MySqlReadSchemaTransformTranslator(); + MySqlReadSchemaTransform readTransformFromSpec = + translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); + + assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow()); + } +} From 3a4db0ca6880988f6d8a30badfdbd7efba18140d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 4 Sep 2025 21:08:14 -0400 Subject: [PATCH 4/6] Remove redundant config validation. --- .../io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java | 1 - .../io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java | 1 - 2 files changed, 2 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java index 2bf6928b5eb0..d3680eb53e3d 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java @@ -76,7 +76,6 @@ protected String jdbcType() { public static class MySqlReadSchemaTransform extends JdbcReadSchemaTransform { public MySqlReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) { super(config, MYSQL); - config.validate(MYSQL); } } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java index a283a64d29c4..600b9ea2af46 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java @@ -61,7 +61,6 @@ protected String jdbcType() { public static class MySqlWriteSchemaTransform extends JdbcWriteSchemaTransform { public MySqlWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) { super(config, MYSQL); - config.validate(MYSQL); } } } From 3121b0238da9ee6730652ab629aa471cf9e1e26c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 10 Sep 2025 10:29:53 -0400 Subject: [PATCH 5/6] Allow jdbcType to be empty. --- .../io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java | 2 +- .../io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java index d3680eb53e3d..06d933a58d7c 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java @@ -56,7 +56,7 @@ protected String jdbcType() { public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( JdbcReadSchemaTransformConfiguration configuration) { String jdbcType = configuration.getJdbcType(); - if (jdbcType != null && !jdbcType.equals(jdbcType())) { + if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) { throw new IllegalArgumentException( String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType)); } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java index 600b9ea2af46..02ced3366073 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java @@ -51,7 +51,7 @@ protected String jdbcType() { public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( JdbcWriteSchemaTransformConfiguration configuration) { String jdbcType = configuration.getJdbcType(); - if (jdbcType != null && !jdbcType.equals(jdbcType())) { + if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) { throw new IllegalArgumentException( String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType)); } From d23075545c4619dd7e56df7ad811d844643fa5ff Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 17 Sep 2025 12:02:41 -0400 Subject: [PATCH 6/6] Address reviewer's comments. --- .../ReadFromMySqlSchemaTransformProvider.java | 12 ++++++++---- .../WriteToMySqlSchemaTransformProvider.java | 13 +++++++++++-- .../MysqlSchemaTransformTranslationTest.java | 2 +- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java index 06d933a58d7c..b51ee7236415 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java @@ -57,8 +57,12 @@ protected String jdbcType() { JdbcReadSchemaTransformConfiguration configuration) { String jdbcType = configuration.getJdbcType(); if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) { - throw new IllegalArgumentException( - String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType)); + LOG.warn( + "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.", + jdbcType(), + jdbcType, + jdbcType()); + configuration = configuration.toBuilder().setJdbcType(jdbcType()).build(); } Integer fetchSize = configuration.getFetchSize(); @@ -66,8 +70,8 @@ protected String jdbcType() { && fetchSize > 0 && configuration.getJdbcUrl() != null && !configuration.getJdbcUrl().contains("useCursorFetch=true")) { - LOG.warn( - "The fetchSize option is ignored. It is required to set useCursorFetch=true" + throw new IllegalArgumentException( + "It is required to set useCursorFetch=true" + " in the JDBC URL when using fetchSize for MySQL"); } return new MySqlReadSchemaTransform(configuration); diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java index 02ced3366073..9f38fccf65ba 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java @@ -28,10 +28,15 @@ import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoService(SchemaTransformProvider.class) public class WriteToMySqlSchemaTransformProvider extends JdbcWriteSchemaTransformProvider { + private static final Logger LOG = + LoggerFactory.getLogger(WriteToMySqlSchemaTransformProvider.class); + @Override public @UnknownKeyFor @NonNull @Initialized String identifier() { return getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE); @@ -52,8 +57,12 @@ protected String jdbcType() { JdbcWriteSchemaTransformConfiguration configuration) { String jdbcType = configuration.getJdbcType(); if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) { - throw new IllegalArgumentException( - String.format("Wrong JDBC type. Expected '%s' but got '%s'", jdbcType(), jdbcType)); + LOG.warn( + "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.", + jdbcType(), + jdbcType, + jdbcType()); + configuration = configuration.toBuilder().setJdbcType(jdbcType()).build(); } return new MySqlWriteSchemaTransform(configuration); } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java index dbcd3cecf8bc..cfc48b6a8a0b 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java @@ -71,7 +71,7 @@ public class MysqlSchemaTransformTranslationTest { .withFieldValue("driver_class_name", null) .withFieldValue("driver_jars", null) .withFieldValue("disable_auto_commit", true) - .withFieldValue("fetch_size", 10) + .withFieldValue("fetch_size", null) .withFieldValue("num_partitions", 5) .withFieldValue("output_parallelization", true) .withFieldValue("partition_column", "col")