diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index 02a5dd18e2c6..31232eb60671 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -80,6 +80,10 @@ message ManagedTransforms { "beam:schematransform:org.apache.beam:postgres_read:v1"]; POSTGRES_WRITE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:postgres_write:v1"]; + MYSQL_READ = 9 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:mysql_read:v1"]; + MYSQL_WRITE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:mysql_write:v1"]; } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java new file mode 100644 index 000000000000..3367248b7198 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/MySqlSchemaTransformTranslation.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.providers.ReadFromMySqlSchemaTransformProvider.MySqlReadSchemaTransform; +import static org.apache.beam.sdk.io.jdbc.providers.WriteToMySqlSchemaTransformProvider.MySqlWriteSchemaTransform; +import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator; + +import com.google.auto.service.AutoService; +import java.util.Map; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +public class MySqlSchemaTransformTranslation { + static class MySqlReadSchemaTransformTranslator + extends SchemaTransformPayloadTranslator { + @Override + public SchemaTransformProvider provider() { + return new ReadFromMySqlSchemaTransformProvider(); + } + + @Override + public Row toConfigRow(MySqlReadSchemaTransform transform) { + return transform.getConfigurationRow(); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map< + ? extends Class, + ? extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + ., PTransformTranslation.TransformPayloadTranslator>builder() + .put(MySqlReadSchemaTransform.class, new MySqlReadSchemaTransformTranslator()) + .build(); + } + } + + static class MySqlWriteSchemaTransformTranslator + extends SchemaTransformPayloadTranslator { + @Override + public SchemaTransformProvider provider() { + return new WriteToMySqlSchemaTransformProvider(); + } + + @Override + public Row toConfigRow(MySqlWriteSchemaTransform transform) { + return transform.getConfigurationRow(); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map< + ? extends Class, + ? extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + ., PTransformTranslation.TransformPayloadTranslator>builder() + .put(MySqlWriteSchemaTransform.class, new MySqlWriteSchemaTransformTranslator()) + .build(); + } + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java index 3d0135ef8ecd..b51ee7236415 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromMySqlSchemaTransformProvider.java @@ -18,20 +18,28 @@ package org.apache.beam.sdk.io.jdbc.providers; import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import com.google.auto.service.AutoService; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoService(SchemaTransformProvider.class) public class ReadFromMySqlSchemaTransformProvider extends JdbcReadSchemaTransformProvider { + private static final Logger LOG = + LoggerFactory.getLogger(ReadFromMySqlSchemaTransformProvider.class); + @Override public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:mysql_read:v1"; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_READ); } @Override @@ -43,4 +51,35 @@ public String description() { protected String jdbcType() { return MYSQL; } + + @Override + public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcReadSchemaTransformConfiguration configuration) { + String jdbcType = configuration.getJdbcType(); + if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) { + LOG.warn( + "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.", + jdbcType(), + jdbcType, + jdbcType()); + configuration = configuration.toBuilder().setJdbcType(jdbcType()).build(); + } + + Integer fetchSize = configuration.getFetchSize(); + if (fetchSize != null + && fetchSize > 0 + && configuration.getJdbcUrl() != null + && !configuration.getJdbcUrl().contains("useCursorFetch=true")) { + throw new IllegalArgumentException( + "It is required to set useCursorFetch=true" + + " in the JDBC URL when using fetchSize for MySQL"); + } + return new MySqlReadSchemaTransform(configuration); + } + + public static class MySqlReadSchemaTransform extends JdbcReadSchemaTransform { + public MySqlReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) { + super(config, MYSQL); + } + } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java index 57f085220162..9f38fccf65ba 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToMySqlSchemaTransformProvider.java @@ -18,20 +18,28 @@ package org.apache.beam.sdk.io.jdbc.providers; import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MYSQL; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import com.google.auto.service.AutoService; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoService(SchemaTransformProvider.class) public class WriteToMySqlSchemaTransformProvider extends JdbcWriteSchemaTransformProvider { + private static final Logger LOG = + LoggerFactory.getLogger(WriteToMySqlSchemaTransformProvider.class); + @Override public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:mysql_write:v1"; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE); } @Override @@ -43,4 +51,25 @@ public String description() { protected String jdbcType() { return MYSQL; } + + @Override + public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcWriteSchemaTransformConfiguration configuration) { + String jdbcType = configuration.getJdbcType(); + if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) { + LOG.warn( + "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.", + jdbcType(), + jdbcType, + jdbcType()); + configuration = configuration.toBuilder().setJdbcType(jdbcType()).build(); + } + return new MySqlWriteSchemaTransform(configuration); + } + + public static class MySqlWriteSchemaTransform extends JdbcWriteSchemaTransform { + public MySqlWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) { + super(config, MYSQL); + } + } } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java new file mode 100644 index 000000000000..cfc48b6a8a0b --- /dev/null +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/MysqlSchemaTransformTranslationTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; +import static org.apache.beam.sdk.io.jdbc.providers.MySqlSchemaTransformTranslation.MySqlReadSchemaTransformTranslator; +import static org.apache.beam.sdk.io.jdbc.providers.MySqlSchemaTransformTranslation.MySqlWriteSchemaTransformTranslator; +import static org.apache.beam.sdk.io.jdbc.providers.ReadFromMySqlSchemaTransformProvider.MySqlReadSchemaTransform; +import static org.apache.beam.sdk.io.jdbc.providers.WriteToMySqlSchemaTransformProvider.MySqlWriteSchemaTransform; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.io.jdbc.JdbcIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.construction.BeamUrns; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +public class MysqlSchemaTransformTranslationTest { + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + static final WriteToMySqlSchemaTransformProvider WRITE_PROVIDER = + new WriteToMySqlSchemaTransformProvider(); + static final ReadFromMySqlSchemaTransformProvider READ_PROVIDER = + new ReadFromMySqlSchemaTransformProvider(); + + static final Row READ_CONFIG = + Row.withSchema(READ_PROVIDER.configurationSchema()) + .withFieldValue("jdbc_url", "jdbc:mysql://host:port/database") + .withFieldValue("location", "test_table") + .withFieldValue("connection_properties", "some_property") + .withFieldValue("connection_init_sql", ImmutableList.builder().build()) + .withFieldValue("driver_class_name", null) + .withFieldValue("driver_jars", null) + .withFieldValue("disable_auto_commit", true) + .withFieldValue("fetch_size", null) + .withFieldValue("num_partitions", 5) + .withFieldValue("output_parallelization", true) + .withFieldValue("partition_column", "col") + .withFieldValue("read_query", null) + .withFieldValue("username", "my_user") + .withFieldValue("password", "my_pass") + .build(); + + static final Row WRITE_CONFIG = + Row.withSchema(WRITE_PROVIDER.configurationSchema()) + .withFieldValue("jdbc_url", "jdbc:mysql://host:port/database") + .withFieldValue("location", "test_table") + .withFieldValue("autosharding", true) + .withFieldValue("connection_init_sql", ImmutableList.builder().build()) + .withFieldValue("connection_properties", "some_property") + .withFieldValue("driver_class_name", null) + .withFieldValue("driver_jars", null) + .withFieldValue("batch_size", 100L) + .withFieldValue("username", "my_user") + .withFieldValue("password", "my_pass") + .withFieldValue("write_statement", null) + .build(); + + @Test + public void testRecreateWriteTransformFromRow() { + MySqlWriteSchemaTransform writeTransform = + (MySqlWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG); + + MySqlWriteSchemaTransformTranslator translator = new MySqlWriteSchemaTransformTranslator(); + Row translatedRow = translator.toConfigRow(writeTransform); + + MySqlWriteSchemaTransform writeTransformFromRow = + translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create()); + + assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow()); + } + + @Test + public void testWriteTransformProtoTranslation() + throws InvalidProtocolBufferException, IOException { + // First build a pipeline + Pipeline p = Pipeline.create(); + Schema inputSchema = Schema.builder().addStringField("name").build(); + PCollection input = + p.apply( + Create.of( + Collections.singletonList( + Row.withSchema(inputSchema).addValue("test").build()))) + .setRowSchema(inputSchema); + + MySqlWriteSchemaTransform writeTransform = + (MySqlWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG); + PCollectionRowTuple.of("input", input).apply(writeTransform); + + // Then translate the pipeline to a proto and extract MySqlWriteSchemaTransform proto + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + List writeTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(WRITE_PROVIDER.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, writeTransformProto.size()); + RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec); + Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); + + assertEquals(WRITE_CONFIG, rowFromSpec); + + // Use the information in the proto to recreate the MySqlWriteSchemaTransform + MySqlWriteSchemaTransformTranslator translator = new MySqlWriteSchemaTransformTranslator(); + MySqlWriteSchemaTransform writeTransformFromSpec = + translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); + + assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow()); + } + + @Test + public void testReCreateReadTransformFromRow() { + // setting a subset of fields here. + MySqlReadSchemaTransform readTransform = + (MySqlReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG); + + MySqlReadSchemaTransformTranslator translator = new MySqlReadSchemaTransformTranslator(); + Row row = translator.toConfigRow(readTransform); + + MySqlReadSchemaTransform readTransformFromRow = + translator.fromConfigRow(row, PipelineOptionsFactory.create()); + + assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow()); + } + + @Test + public void testReadTransformProtoTranslation() + throws InvalidProtocolBufferException, IOException { + // First build a pipeline + Pipeline p = Pipeline.create(); + + MySqlReadSchemaTransform readTransform = + (MySqlReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG); + + // Mock inferBeamSchema since it requires database connection. + Schema expectedSchema = Schema.builder().addStringField("name").build(); + try (MockedStatic mock = Mockito.mockStatic(JdbcIO.ReadRows.class)) { + mock.when(() -> JdbcIO.ReadRows.inferBeamSchema(Mockito.any(), Mockito.any())) + .thenReturn(expectedSchema); + PCollectionRowTuple.empty(p).apply(readTransform); + } + + // Then translate the pipeline to a proto and extract MySqlReadSchemaTransform proto + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + List readTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(READ_PROVIDER.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, readTransformProto.size()); + RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec); + Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); + assertEquals(READ_CONFIG, rowFromSpec); + + // Use the information in the proto to recreate the MySqlReadSchemaTransform + MySqlReadSchemaTransformTranslator translator = new MySqlReadSchemaTransformTranslator(); + MySqlReadSchemaTransform readTransformFromSpec = + translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); + + assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow()); + } +} diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index cda84629a7d7..4f45eeac861e 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -97,6 +97,7 @@ public class Managed { public static final String KAFKA = "kafka"; public static final String BIGQUERY = "bigquery"; public static final String POSTGRES = "postgres"; + public static final String MYSQL = "mysql"; // Supported SchemaTransforms public static final Map READ_TRANSFORMS = @@ -106,6 +107,7 @@ public class Managed { .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ)) .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ)) .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ)) + .put(MYSQL, getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_READ)) .build(); public static final Map WRITE_TRANSFORMS = ImmutableMap.builder() @@ -113,6 +115,7 @@ public class Managed { .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE)) .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE)) .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE)) + .put(MYSQL, getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE)) .build(); /** diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index b22ed6e0c645..3f9f56a54139 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -83,6 +83,8 @@ ManagedTransforms.Urns.BIGQUERY_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long ManagedTransforms.Urns.POSTGRES_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, ManagedTransforms.Urns.POSTGRES_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long + ManagedTransforms.Urns.MYSQL_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, + ManagedTransforms.Urns.MYSQL_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, } diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index 72dfb6fd9a0a..03449236ac92 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -86,6 +86,7 @@ KAFKA = "kafka" BIGQUERY = "bigquery" POSTGRES = "postgres" +MYSQL = "mysql" __all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"] @@ -98,6 +99,7 @@ class Read(PTransform): KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn, BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn, POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn, + MYSQL: ManagedTransforms.Urns.MYSQL_READ.urn, } def __init__( @@ -140,6 +142,7 @@ class Write(PTransform): KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn, BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn, POSTGRES: ManagedTransforms.Urns.POSTGRES_WRITE.urn, + MYSQL: ManagedTransforms.Urns.MYSQL_WRITE.urn, } def __init__(