From 65bd7c0bda0a878fbcfb06669d930241a89b3830 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sun, 21 Sep 2025 22:47:33 -0400 Subject: [PATCH 1/2] Add sqlserver read and write to managed io --- .../pipeline/v1/external_transforms.proto | 4 + ...dFromSqlServerSchemaTransformProvider.java | 43 +++- .../SqlServerSchemaTransformTranslation.java | 93 +++++++ ...iteToSqlServerSchemaTransformProvider.java | 43 +++- ...lServerSchemaTransformTranslationTest.java | 235 ++++++++++++++++++ .../org/apache/beam/sdk/managed/Managed.java | 3 + sdks/python/apache_beam/transforms/managed.py | 3 + 7 files changed, 422 insertions(+), 2 deletions(-) create mode 100644 sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslation.java create mode 100644 sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslationTest.java diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index 31232eb60671..043a72dd34f2 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -84,6 +84,10 @@ message ManagedTransforms { "beam:schematransform:org.apache.beam:mysql_read:v1"]; MYSQL_WRITE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:mysql_write:v1"]; + SQL_SERVER_READ = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:sql_server_read:v1"]; + SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:sql_server_write:v1"]; } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java index e4767177bb2f..8a4f02d775a8 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java @@ -18,20 +18,30 @@ package org.apache.beam.sdk.io.jdbc.providers; import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import com.google.auto.service.AutoService; +import java.util.Collections; +import java.util.List; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoService(SchemaTransformProvider.class) public class ReadFromSqlServerSchemaTransformProvider extends JdbcReadSchemaTransformProvider { + private static final Logger LOG = + LoggerFactory.getLogger(ReadFromSqlServerSchemaTransformProvider.class); + @Override public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:sql_server_read:v1"; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_READ); } @Override @@ -43,4 +53,35 @@ public String description() { protected String jdbcType() { return MSSQL; } + + @Override + public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcReadSchemaTransformConfiguration configuration) { + String jdbcType = configuration.getJdbcType(); + if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) { + LOG.warn( + "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.", + jdbcType(), + jdbcType, + jdbcType()); + configuration = configuration.toBuilder().setJdbcType(jdbcType()).build(); + } + + List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql = + configuration.getConnectionInitSql(); + if (connectionInitSql != null && !connectionInitSql.isEmpty()) { + LOG.warn("SQL Server does not support connectionInitSql, ignoring."); + } + + // Override "connectionInitSql" for sqlserver + configuration = configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build(); + return new SqlServerReadSchemaTransform(configuration); + } + + public static class SqlServerReadSchemaTransform extends JdbcReadSchemaTransform { + public SqlServerReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) { + super(config, MSSQL); + config.validate(MSSQL); + } + } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslation.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslation.java new file mode 100644 index 000000000000..cea52f8d9620 --- /dev/null +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslation.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.sdk.io.jdbc.providers.ReadFromSqlServerSchemaTransformProvider.SqlServerReadSchemaTransform; +import static org.apache.beam.sdk.io.jdbc.providers.WriteToSqlServerSchemaTransformProvider.SqlServerWriteSchemaTransform; +import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator; + +import com.google.auto.service.AutoService; +import java.util.Map; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +public class SqlServerSchemaTransformTranslation { + static class SqlServerReadSchemaTransformTranslator + extends SchemaTransformPayloadTranslator { + @Override + public SchemaTransformProvider provider() { + return new ReadFromSqlServerSchemaTransformProvider(); + } + + @Override + public Row toConfigRow(SqlServerReadSchemaTransform transform) { + return transform.getConfigurationRow(); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map< + ? extends Class, + ? extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + ., PTransformTranslation.TransformPayloadTranslator>builder() + .put(SqlServerReadSchemaTransform.class, new SqlServerReadSchemaTransformTranslator()) + .build(); + } + } + + static class SqlServerWriteSchemaTransformTranslator + extends SchemaTransformPayloadTranslator { + @Override + public SchemaTransformProvider provider() { + return new WriteToSqlServerSchemaTransformProvider(); + } + + @Override + public Row toConfigRow(SqlServerWriteSchemaTransform transform) { + return transform.getConfigurationRow(); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map< + ? extends Class, + ? extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + ., PTransformTranslation.TransformPayloadTranslator>builder() + .put(SqlServerWriteSchemaTransform.class, new SqlServerWriteSchemaTransformTranslator()) + .build(); + } + } +} diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java index 9e849f4e49e2..42986399ae47 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java @@ -18,20 +18,30 @@ package org.apache.beam.sdk.io.jdbc.providers; import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import com.google.auto.service.AutoService; +import java.util.Collections; +import java.util.List; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoService(SchemaTransformProvider.class) public class WriteToSqlServerSchemaTransformProvider extends JdbcWriteSchemaTransformProvider { + private static final Logger LOG = + LoggerFactory.getLogger(WriteToSqlServerSchemaTransformProvider.class); + @Override public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:sql_server_write:v1"; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_WRITE); } @Override @@ -43,4 +53,35 @@ public String description() { protected String jdbcType() { return MSSQL; } + + @Override + public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + JdbcWriteSchemaTransformConfiguration configuration) { + String jdbcType = configuration.getJdbcType(); + if (jdbcType != null && !jdbcType.isEmpty() && !jdbcType.equals(jdbcType())) { + LOG.warn( + "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.", + jdbcType(), + jdbcType, + jdbcType()); + configuration = configuration.toBuilder().setJdbcType(jdbcType()).build(); + } + + List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql = + configuration.getConnectionInitSql(); + if (connectionInitSql != null && !connectionInitSql.isEmpty()) { + LOG.warn("SQL Server does not support connectionInitSql, ignoring."); + } + + // Override "connectionInitSql" for sqlserver + configuration = configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build(); + return new SqlServerWriteSchemaTransform(configuration); + } + + public static class SqlServerWriteSchemaTransform extends JdbcWriteSchemaTransform { + public SqlServerWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) { + super(config, MSSQL); + config.validate(MSSQL); + } + } } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslationTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslationTest.java new file mode 100644 index 000000000000..d8890987fbf2 --- /dev/null +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslationTest.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jdbc.providers; + +import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; +import static org.apache.beam.sdk.io.jdbc.providers.ReadFromSqlServerSchemaTransformProvider.SqlServerReadSchemaTransform; +import static org.apache.beam.sdk.io.jdbc.providers.SqlServerSchemaTransformTranslation.SqlServerReadSchemaTransformTranslator; +import static org.apache.beam.sdk.io.jdbc.providers.SqlServerSchemaTransformTranslation.SqlServerWriteSchemaTransformTranslator; +import static org.apache.beam.sdk.io.jdbc.providers.WriteToSqlServerSchemaTransformProvider.SqlServerWriteSchemaTransform; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.io.jdbc.JdbcIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.construction.BeamUrns; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +public class SqlServerSchemaTransformTranslationTest { + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + static final WriteToSqlServerSchemaTransformProvider WRITE_PROVIDER = + new WriteToSqlServerSchemaTransformProvider(); + static final ReadFromSqlServerSchemaTransformProvider READ_PROVIDER = + new ReadFromSqlServerSchemaTransformProvider(); + + static final Row READ_CONFIG = + Row.withSchema(READ_PROVIDER.configurationSchema()) + .withFieldValue("jdbc_url", "jdbc:sqlserver://host:port;databaseName=database") + .withFieldValue("location", "test_table") + .withFieldValue("connection_properties", "some_property") + .withFieldValue("connection_init_sql", ImmutableList.builder().build()) + .withFieldValue("driver_class_name", null) + .withFieldValue("driver_jars", null) + .withFieldValue("disable_auto_commit", true) + .withFieldValue("fetch_size", 10) + .withFieldValue("num_partitions", 5) + .withFieldValue("output_parallelization", true) + .withFieldValue("partition_column", "col") + .withFieldValue("read_query", null) + .withFieldValue("username", "my_user") + .withFieldValue("password", "my_pass") + .build(); + + static final Row WRITE_CONFIG = + Row.withSchema(WRITE_PROVIDER.configurationSchema()) + .withFieldValue("jdbc_url", "jdbc:sqlserver://host:port;databaseName=database") + .withFieldValue("location", "test_table") + .withFieldValue("autosharding", true) + .withFieldValue("connection_init_sql", ImmutableList.builder().build()) + .withFieldValue("connection_properties", "some_property") + .withFieldValue("driver_class_name", null) + .withFieldValue("driver_jars", null) + .withFieldValue("batch_size", 100L) + .withFieldValue("username", "my_user") + .withFieldValue("password", "my_pass") + .withFieldValue("write_statement", null) + .build(); + + @Test + public void testRecreateWriteTransformFromRow() { + SqlServerWriteSchemaTransform writeTransform = + (SqlServerWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG); + + SqlServerWriteSchemaTransformTranslator translator = + new SqlServerWriteSchemaTransformTranslator(); + Row translatedRow = translator.toConfigRow(writeTransform); + + SqlServerWriteSchemaTransform writeTransformFromRow = + translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create()); + + assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow()); + } + + @Test + public void testWriteTransformProtoTranslation() + throws InvalidProtocolBufferException, IOException { + // First build a pipeline + Pipeline p = Pipeline.create(); + Schema inputSchema = Schema.builder().addStringField("name").build(); + PCollection input = + p.apply( + Create.of( + Collections.singletonList( + Row.withSchema(inputSchema).addValue("test").build()))) + .setRowSchema(inputSchema); + + SqlServerWriteSchemaTransform writeTransform = + (SqlServerWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG); + PCollectionRowTuple.of("input", input).apply(writeTransform); + + // Then translate the pipeline to a proto and extract SqlServerWriteSchemaTransform proto + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + List writeTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(WRITE_PROVIDER.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, writeTransformProto.size()); + RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec); + Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); + + assertEquals(WRITE_CONFIG, rowFromSpec); + + // Use the information in the proto to recreate the SqlServerWriteSchemaTransform + SqlServerWriteSchemaTransformTranslator translator = + new SqlServerWriteSchemaTransformTranslator(); + SqlServerWriteSchemaTransform writeTransformFromSpec = + translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); + + assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow()); + } + + @Test + public void testReCreateReadTransformFromRow() { + // setting a subset of fields here. + SqlServerReadSchemaTransform readTransform = + (SqlServerReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG); + + SqlServerReadSchemaTransformTranslator translator = + new SqlServerReadSchemaTransformTranslator(); + Row row = translator.toConfigRow(readTransform); + + SqlServerReadSchemaTransform readTransformFromRow = + translator.fromConfigRow(row, PipelineOptionsFactory.create()); + + assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow()); + } + + @Test + public void testReadTransformProtoTranslation() + throws InvalidProtocolBufferException, IOException { + // First build a pipeline + Pipeline p = Pipeline.create(); + + SqlServerReadSchemaTransform readTransform = + (SqlServerReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG); + + // Mock inferBeamSchema since it requires database connection. + Schema expectedSchema = Schema.builder().addStringField("name").build(); + try (MockedStatic mock = Mockito.mockStatic(JdbcIO.ReadRows.class)) { + mock.when(() -> JdbcIO.ReadRows.inferBeamSchema(Mockito.any(), Mockito.any())) + .thenReturn(expectedSchema); + PCollectionRowTuple.empty(p).apply(readTransform); + } + + // Then translate the pipeline to a proto and extract SqlServerReadSchemaTransform proto + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + List readTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(READ_PROVIDER.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, readTransformProto.size()); + RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec); + Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); + assertEquals(READ_CONFIG, rowFromSpec); + + // Use the information in the proto to recreate the SqlServerReadSchemaTransform + SqlServerReadSchemaTransformTranslator translator = + new SqlServerReadSchemaTransformTranslator(); + SqlServerReadSchemaTransform readTransformFromSpec = + translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); + + assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow()); + } +} diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 4f45eeac861e..a5e7d879b441 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -98,6 +98,7 @@ public class Managed { public static final String BIGQUERY = "bigquery"; public static final String POSTGRES = "postgres"; public static final String MYSQL = "mysql"; + public static final String SQL_SERVER = "sqlserver"; // Supported SchemaTransforms public static final Map READ_TRANSFORMS = @@ -108,6 +109,7 @@ public class Managed { .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ)) .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ)) .put(MYSQL, getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_READ)) + .put(SQL_SERVER, getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_READ)) .build(); public static final Map WRITE_TRANSFORMS = ImmutableMap.builder() @@ -116,6 +118,7 @@ public class Managed { .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE)) .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE)) .put(MYSQL, getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE)) + .put(SQL_SERVER, getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_WRITE)) .build(); /** diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index 03449236ac92..33ba8d41a99f 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -87,6 +87,7 @@ BIGQUERY = "bigquery" POSTGRES = "postgres" MYSQL = "mysql" +SQL_SERVER = "sqlserver" __all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"] @@ -100,6 +101,7 @@ class Read(PTransform): BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn, POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn, MYSQL: ManagedTransforms.Urns.MYSQL_READ.urn, + SQL_SERVER: ManagedTransforms.Urns.SQL_SERVER_READ.urn, } def __init__( @@ -143,6 +145,7 @@ class Write(PTransform): BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn, POSTGRES: ManagedTransforms.Urns.POSTGRES_WRITE.urn, MYSQL: ManagedTransforms.Urns.MYSQL_WRITE.urn, + SQL_SERVER: ManagedTransforms.Urns.SQL_SERVER_WRITE.urn } def __init__( From 3cc723d936dabbefacf7c4b7a3d534ad028ca77f Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 23 Sep 2025 21:51:58 -0400 Subject: [PATCH 2/2] Address reviewer's comment. --- .../providers/ReadFromSqlServerSchemaTransformProvider.java | 2 +- .../jdbc/providers/WriteToSqlServerSchemaTransformProvider.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java index 8a4f02d775a8..eec6660aa88b 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java @@ -70,7 +70,7 @@ protected String jdbcType() { List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql = configuration.getConnectionInitSql(); if (connectionInitSql != null && !connectionInitSql.isEmpty()) { - LOG.warn("SQL Server does not support connectionInitSql, ignoring."); + throw new IllegalArgumentException("SQL Server does not support connectionInitSql."); } // Override "connectionInitSql" for sqlserver diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java index 42986399ae47..dc26c240958b 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java @@ -70,7 +70,7 @@ protected String jdbcType() { List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql = configuration.getConnectionInitSql(); if (connectionInitSql != null && !connectionInitSql.isEmpty()) { - LOG.warn("SQL Server does not support connectionInitSql, ignoring."); + throw new IllegalArgumentException("SQL Server does not support connectionInitSql."); } // Override "connectionInitSql" for sqlserver