From 62a4b2af71ad4f4013d702838c7f7a3367464354 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 3 May 2024 11:46:40 -0400 Subject: [PATCH 1/8] managed kafka read --- .../beam/sdk/schemas/utils/YamlUtils.java | 8 +++++ sdks/java/io/kafka/build.gradle | 1 + .../KafkaReadSchemaTransformProvider.java | 7 ++-- .../KafkaReadSchemaTransformProviderTest.java | 34 ++++++++++++++++++- .../org/apache/beam/sdk/managed/Managed.java | 7 +++- .../ManagedSchemaTransformProvider.java | 19 ++++++++++- .../managed/ManagedTransformConstants.java | 25 ++++++++++++++ 7 files changed, 94 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java index 122f2d1963b9..e631e166e8be 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.values.Row.toRow; import java.math.BigDecimal; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -181,4 +182,11 @@ public static String yamlStringFromMap(@Nullable Map map) { } return new Yaml().dumpAsMap(map); } + + public static Map yamlStringToMap(@Nullable String yaml) { + if (yaml == null || yaml.isEmpty()) { + return Collections.emptyMap(); + } + return new Yaml().load(yaml); + } } diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index 269ddb3f5eb2..3e095a2bacca 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -90,6 +90,7 @@ dependencies { provided library.java.everit_json_schema testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation project(":sdks:java:io:synthetic") + testImplementation project(":sdks:java:managed") testImplementation project(path: 
":sdks:java:extensions:avro", configuration: "testRuntimeMigration") testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration") testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index 2776c388f7cc..13240ea9dc40 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -151,11 +151,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } }; } - - if (format.equals("RAW")) { + if ("RAW".equals(format)) { beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build(); valueMapper = getRawBytesToRowFunction(beamSchema); - } else if (format.equals("PROTO")) { + } else if ("PROTO".equals(format)) { String fileDescriptorPath = configuration.getFileDescriptorPath(); String messageName = configuration.getMessageName(); if (fileDescriptorPath != null) { @@ -165,7 +164,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, messageName); valueMapper = ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName); } - } else if (format.equals("JSON")) { + } else if ("JSON".equals(format)) { beamSchema = JsonUtils.beamSchemaFromJsonSchema(inputSchema); valueMapper = JsonUtils.getJsonBytesToRowFunction(beamSchema); } else { diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index f6e231c758a5..7e0c12100780 100644 --- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -22,12 +22,19 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.ServiceLoader; import java.util.stream.Collectors; import java.util.stream.StreamSupport; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.utils.YamlUtils; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; @@ -51,7 +58,7 @@ public class KafkaReadSchemaTransformProviderTest { + " string name = 2;\n" + " bool active = 3;\n" + "\n" - + " // Nested field\n" + + " // Nested field\n\n" + " message Address {\n" + " string street = 1;\n" + " string city = 2;\n" @@ -284,4 +291,29 @@ public void testBuildTransformWithoutProtoSchemaFormat() { .setMessageName("MyMessage") .build())); } + + @Test + public void testBuildTransformWithManaged() { + List configs = Arrays.asList( + "topic: topic_1\n" + + "bootstrap_servers: some bootstrap\n" + + "data_format: RAW", + "topic: topic_2\n" + + "bootstrap_servers: some bootstrap\n" + + "schema: '{\"type\":\"record\",\"name\":\"my_record\",\"fields\":[{\"name\":\"bool\",\"type\":\"boolean\"}]}'", + "topic: topic_3\n" + + "bootstrap_servers: some bootstrap\n" + + "schema_registry_url: some-url\n" + + "schema_registry_subject: some-subject\n" + + "data_format: RAW", + "topic: 
topic_4\n" + + "bootstrap_servers: some bootstrap\n" + + "data_format: PROTO\n" + + "schema: '" + PROTO_SCHEMA + "'\n" + + "message_name: MyMessage"); + + for (String config: configs) { + Managed.read(Managed.KAFKA).withConfig(YamlUtils.yamlStringToMap(config)).expand(PCollectionRowTuple.empty(Pipeline.create())); + } + } } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index d24a3fd88ddc..7e4699597fc5 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.managed.ManagedTransformConstants.ICEBERG_READ; import static org.apache.beam.sdk.managed.ManagedTransformConstants.ICEBERG_WRITE; +import static org.apache.beam.sdk.managed.ManagedTransformConstants.KAFKA_READ; import com.google.auto.value.AutoValue; import java.util.ArrayList; @@ -80,10 +81,14 @@ public class Managed { // TODO: Dynamically generate a list of supported transforms public static final String ICEBERG = "iceberg"; + public static final String KAFKA = "kafka"; // Supported SchemaTransforms public static final Map READ_TRANSFORMS = - ImmutableMap.builder().put(ICEBERG, ICEBERG_READ).build(); + ImmutableMap.builder() + .put(ICEBERG, ICEBERG_READ) + .put(KAFKA, KAFKA_READ) + .build(); public static final Map WRITE_TRANSFORMS = ImmutableMap.builder().put(ICEBERG, ICEBERG_WRITE).build(); diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index e13741e86b4a..af101ecba6df 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ 
-17,6 +17,7 @@ */ package org.apache.beam.sdk.managed; +import static org.apache.beam.sdk.managed.ManagedTransformConstants.MAPPINGS; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import com.google.auto.service.AutoService; @@ -176,6 +177,7 @@ static class ManagedSchemaTransform extends SchemaTransform { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { + System.out.println("CONFIG: " + underlyingTransformConfig); return input.apply(underlyingTransformProvider.from(underlyingTransformConfig)); } @@ -202,7 +204,22 @@ Row getConfigurationRow() { static Row getRowConfig(ManagedConfig config, Schema transformSchema) { // May return an empty row (perhaps the underlying transform doesn't have any required // parameters) - return YamlUtils.toBeamRow(config.resolveUnderlyingConfig(), transformSchema, false); + String yamlConfig = config.resolveUnderlyingConfig(); + Map configMap = YamlUtils.yamlStringToMap(yamlConfig); + + Map mapping = MAPPINGS.get(config.getTransformIdentifier()); + if (mapping != null && configMap != null) { + Map remappedConfig = new HashMap<>(); + + for (Map.Entry entry : configMap.entrySet()) { + String key = + mapping.containsKey(entry.getKey()) ? 
mapping.get(entry.getKey()) : entry.getKey(); + remappedConfig.put(key, entry.getValue()); + } + configMap = remappedConfig; + } + + return YamlUtils.toBeamRow(configMap, transformSchema, false); } Map getAllProviders() { diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java index 48735d8c33a3..14a0c2eace1f 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java @@ -17,9 +17,34 @@ */ package org.apache.beam.sdk.managed; +import java.util.Map; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + /** This class contains constants for supported managed transform identifiers. */ public class ManagedTransformConstants { public static final String ICEBERG_READ = "beam:schematransform:org.apache.beam:iceberg_read:v1"; public static final String ICEBERG_WRITE = "beam:schematransform:org.apache.beam:iceberg_write:v1"; + public static final String KAFKA_READ = "beam:schematransform:org.apache.beam:kafka_read:v1"; + + public static final Map KAFKA_READ_MAPPINGS = + ImmutableMap.builder() + .put("topic", "topic") + .put("bootstrap_servers", "bootstrapServers") + .put("consumer_config_updates", "consumerConfigUpdates") + .put("confluent_schema_registry_url", "confluentSchemaRegistryUrl") + .put("confluent_schema_registry_subject", "confluentSchemaRegistrySubject") + .put("data_format", "format") + .put("schema", "schema") + .put("file_descriptor_path", "fileDescriptorPath") + .put("message_name", "messageName") + .build(); + + // Configuration parameter names exposed via the Managed interface may differ from the parameter + // names in the + // actual SchemaTransform implementation. 
+ // Any naming differences should be laid out here so that we can remap the keys before building + // the transform + public static final Map> MAPPINGS = + ImmutableMap.of(KAFKA_READ, KAFKA_READ_MAPPINGS); } From b8c5b36c25a77a265cb4747f40b43d7b036b9bb2 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 6 May 2024 15:01:59 -0400 Subject: [PATCH 2/8] managed kafka write --- .../KafkaReadSchemaTransformProviderTest.java | 67 +++++++++++-------- ...KafkaWriteSchemaTransformProviderTest.java | 58 ++++++++++++++++ .../org/apache/beam/sdk/managed/Managed.java | 13 ++-- .../ManagedSchemaTransformProvider.java | 13 ++-- .../managed/ManagedTransformConstants.java | 20 +++++- 5 files changed, 130 insertions(+), 41 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index 7e0c12100780..233983d8051a 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -17,24 +17,20 @@ */ package org.apache.beam.sdk.io.kafka; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.ServiceLoader; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.StreamSupport; - import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.managed.ManagedTransformConstants; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.utils.YamlUtils; 
import org.apache.beam.sdk.values.PCollectionRowTuple; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; @@ -294,26 +290,43 @@ public void testBuildTransformWithoutProtoSchemaFormat() { @Test public void testBuildTransformWithManaged() { - List configs = Arrays.asList( - "topic: topic_1\n" + - "bootstrap_servers: some bootstrap\n" + - "data_format: RAW", - "topic: topic_2\n" + - "bootstrap_servers: some bootstrap\n" + - "schema: '{\"type\":\"record\",\"name\":\"my_record\",\"fields\":[{\"name\":\"bool\",\"type\":\"boolean\"}]}'", - "topic: topic_3\n" + - "bootstrap_servers: some bootstrap\n" + - "schema_registry_url: some-url\n" + - "schema_registry_subject: some-subject\n" + - "data_format: RAW", - "topic: topic_4\n" + - "bootstrap_servers: some bootstrap\n" + - "data_format: PROTO\n" + - "schema: '" + PROTO_SCHEMA + "'\n" + - "message_name: MyMessage"); + List configs = + Arrays.asList( + "topic: topic_1\n" + "bootstrap_servers: some bootstrap\n" + "data_format: RAW", + "topic: topic_2\n" + + "bootstrap_servers: some bootstrap\n" + + "schema: '{\"type\":\"record\",\"name\":\"my_record\",\"fields\":[{\"name\":\"bool\",\"type\":\"boolean\"}]}'", + "topic: topic_3\n" + + "bootstrap_servers: some bootstrap\n" + + "schema_registry_url: some-url\n" + + "schema_registry_subject: some-subject\n" + + "data_format: RAW", + "topic: topic_4\n" + + "bootstrap_servers: some bootstrap\n" + + "data_format: PROTO\n" + + "schema: '" + + PROTO_SCHEMA + + "'\n" + + "message_name: MyMessage"); + + for (String config : configs) { + // Kafka Read SchemaTransform gets built in ManagedSchemaTransformProvider's expand + Managed.read(Managed.KAFKA) + .withConfig(YamlUtils.yamlStringToMap(config)) + 
.expand(PCollectionRowTuple.empty(Pipeline.create())); + } + } + + @Test + public void testManagedMappings() { + KafkaReadSchemaTransformProvider provider = new KafkaReadSchemaTransformProvider(); + Map mapping = ManagedTransformConstants.MAPPINGS.get(provider.identifier()); + + assertNotNull(mapping); - for (String config: configs) { - Managed.read(Managed.KAFKA).withConfig(YamlUtils.yamlStringToMap(config)).expand(PCollectionRowTuple.empty(Pipeline.create())); + List configSchemaFieldNames = provider.configurationSchema().getFieldNames(); + for (String paramName : mapping.values()) { + assertTrue(configSchemaFieldNames.contains(paramName)); } } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java index 48d463a8f436..cc0c3e8c2012 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java @@ -18,17 +18,23 @@ package org.apache.beam.sdk.io.kafka; import static org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.getRowToRawBytesFunction; +import static org.junit.Assert.*; import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; import org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform.ErrorCounterFn; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.managed.ManagedTransformConstants; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; import org.apache.beam.sdk.schemas.utils.JsonUtils; 
+import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -36,6 +42,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; @@ -185,4 +192,55 @@ public void testKafkaErrorFnProtoSuccess() { output.get(ERROR_TAG).setRowSchema(errorSchema); p.run().waitUntilFinish(); } + + private static final String PROTO_SCHEMA = + "syntax = \"proto3\";\n" + + "\n" + + "message MyMessage {\n" + + " int32 id = 1;\n" + + " string name = 2;\n" + + " bool active = 3;\n" + + "}"; + + @Test + public void testBuildTransformWithManaged() { + List configs = + Arrays.asList( + "topic: topic_1\n" + "bootstrap_servers: some bootstrap\n" + "data_format: RAW", + "topic: topic_2\n" + + "bootstrap_servers: some bootstrap\n" + + "producer_config_updates: {\"foo\": \"bar\"}\n" + + "data_format: AVRO", + "topic: topic_3\n" + + "bootstrap_servers: some bootstrap\n" + + "data_format: PROTO\n" + + "schema: '" + + PROTO_SCHEMA + + "'\n" + + "message_name: MyMessage"); + + for (String config : configs) { + // Kafka Write SchemaTransform gets built in ManagedSchemaTransformProvider's expand + Managed.write(Managed.KAFKA) + .withConfig(YamlUtils.yamlStringToMap(config)) + .expand( + PCollectionRowTuple.of( + "input", + Pipeline.create() + .apply(Create.empty(Schema.builder().addByteArrayField("bytes").build())))); + } + } + + @Test + public void testManagedMappings() { + KafkaWriteSchemaTransformProvider provider = new KafkaWriteSchemaTransformProvider(); + Map mapping = ManagedTransformConstants.MAPPINGS.get(provider.identifier()); + + assertNotNull(mapping); + + List configSchemaFieldNames = 
provider.configurationSchema().getFieldNames(); + for (String paramName : mapping.values()) { + assertTrue(configSchemaFieldNames.contains(paramName)); + } + } } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 7e4699597fc5..da4a0853fb39 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -17,10 +17,6 @@ */ package org.apache.beam.sdk.managed; -import static org.apache.beam.sdk.managed.ManagedTransformConstants.ICEBERG_READ; -import static org.apache.beam.sdk.managed.ManagedTransformConstants.ICEBERG_WRITE; -import static org.apache.beam.sdk.managed.ManagedTransformConstants.KAFKA_READ; - import com.google.auto.value.AutoValue; import java.util.ArrayList; import java.util.List; @@ -86,11 +82,14 @@ public class Managed { // Supported SchemaTransforms public static final Map READ_TRANSFORMS = ImmutableMap.builder() - .put(ICEBERG, ICEBERG_READ) - .put(KAFKA, KAFKA_READ) + .put(ICEBERG, ManagedTransformConstants.ICEBERG_READ) + .put(KAFKA, ManagedTransformConstants.KAFKA_READ) .build(); public static final Map WRITE_TRANSFORMS = - ImmutableMap.builder().put(ICEBERG, ICEBERG_WRITE).build(); + ImmutableMap.builder() + .put(ICEBERG, ManagedTransformConstants.ICEBERG_WRITE) + .put(KAFKA, ManagedTransformConstants.KAFKA_WRITE) + .build(); /** * Instantiates a {@link Managed.ManagedTransform} transform for the specified source. 
The diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index af101ecba6df..c8d20ffe29a8 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -177,7 +177,6 @@ static class ManagedSchemaTransform extends SchemaTransform { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { - System.out.println("CONFIG: " + underlyingTransformConfig); return input.apply(underlyingTransformProvider.from(underlyingTransformConfig)); } @@ -207,14 +206,18 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) { String yamlConfig = config.resolveUnderlyingConfig(); Map configMap = YamlUtils.yamlStringToMap(yamlConfig); + // The config Row object will be used to build the underlying SchemaTransform. + // If a mapping for the SchemaTransform exists, we use it to update parameter names to align + // with the underlying config schema Map mapping = MAPPINGS.get(config.getTransformIdentifier()); if (mapping != null && configMap != null) { Map remappedConfig = new HashMap<>(); - for (Map.Entry entry : configMap.entrySet()) { - String key = - mapping.containsKey(entry.getKey()) ? 
mapping.get(entry.getKey()) : entry.getKey(); - remappedConfig.put(key, entry.getValue()); + String paramName = entry.getKey(); + if (mapping.containsKey(paramName)) { + paramName = mapping.get(paramName); + } + remappedConfig.put(paramName, entry.getValue()); } configMap = remappedConfig; } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java index 14a0c2eace1f..5deae9c7afea 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java @@ -26,8 +26,9 @@ public class ManagedTransformConstants { public static final String ICEBERG_WRITE = "beam:schematransform:org.apache.beam:iceberg_write:v1"; public static final String KAFKA_READ = "beam:schematransform:org.apache.beam:kafka_read:v1"; + public static final String KAFKA_WRITE = "beam:schematransform:org.apache.beam:kafka_write:v1"; - public static final Map KAFKA_READ_MAPPINGS = + private static final Map KAFKA_READ_MAPPINGS = ImmutableMap.builder() .put("topic", "topic") .put("bootstrap_servers", "bootstrapServers") @@ -40,11 +41,26 @@ public class ManagedTransformConstants { .put("message_name", "messageName") .build(); + private static final Map KAFKA_WRITE_MAPPINGS = + ImmutableMap.builder() + .put("topic", "topic") + .put("bootstrap_servers", "bootstrapServers") + .put("producer_config_updates", "producerConfigUpdates") + .put("data_format", "format") + .put("file_descriptor_path", "fileDescriptorPath") + .put("message_name", "messageName") + .build(); + // Configuration parameter names exposed via the Managed interface may differ from the parameter // names in the // actual SchemaTransform implementation. 
// Any naming differences should be laid out here so that we can remap the keys before building // the transform + // Mappings don't need to include ALL underlying parameter names, as we may not want to expose + // every single parameter through the Managed interface public static final Map> MAPPINGS = - ImmutableMap.of(KAFKA_READ, KAFKA_READ_MAPPINGS); + ImmutableMap.>builder() + .put(KAFKA_READ, KAFKA_READ_MAPPINGS) + .put(KAFKA_WRITE, KAFKA_WRITE_MAPPINGS) + .build(); } From 591726bc73c04591f506da9c92c50cecb482a46e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 6 May 2024 15:12:50 -0400 Subject: [PATCH 3/8] docs --- .../KafkaReadSchemaTransformProviderTest.java | 3 ++- ...KafkaWriteSchemaTransformProviderTest.java | 3 ++- .../managed/ManagedTransformConstants.java | 25 +++++++++++++------ 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index 233983d8051a..be30085dc111 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -17,7 +17,8 @@ */ package org.apache.beam.sdk.io.kafka; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java index cc0c3e8c2012..60bff89b3555 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java +++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java @@ -18,7 +18,8 @@ package org.apache.beam.sdk.io.kafka; import static org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.getRowToRawBytesFunction; -import static org.junit.Assert.*; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import java.io.UnsupportedEncodingException; import java.util.Arrays; diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java index 5deae9c7afea..8165633cf15e 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java @@ -20,7 +20,23 @@ import java.util.Map; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -/** This class contains constants for supported managed transform identifiers. */ +/** + * This class contains constants for supported managed transforms, including: + * + *
 <ul> + *   <li>Identifiers of supported transforms + *   <li>Configuration parameter renaming + * </ul> + * + *

 <p>Configuration parameter names exposed via Managed interface may differ from the parameter + * names in the underlying SchemaTransform implementation. + * + *

 <p>Any naming differences are laid out in {@link ManagedTransformConstants#MAPPINGS} to update + * the configuration object before it's used to build the underlying transform. + * + *

Mappings don't need to include ALL underlying parameter names, as we may not want to expose + * every single parameter through the Managed interface. + */ public class ManagedTransformConstants { public static final String ICEBERG_READ = "beam:schematransform:org.apache.beam:iceberg_read:v1"; public static final String ICEBERG_WRITE = @@ -51,13 +67,6 @@ public class ManagedTransformConstants { .put("message_name", "messageName") .build(); - // Configuration parameter names exposed via the Managed interface may differ from the parameter - // names in the - // actual SchemaTransform implementation. - // Any naming differences should be laid out here so that we can remap the keys before building - // the transform - // Mappings don't need to include ALL underlying parameter names, as we may not want to expose - // every single parameter through the Managed interface public static final Map> MAPPINGS = ImmutableMap.>builder() .put(KAFKA_READ, KAFKA_READ_MAPPINGS) From 1a43ed2e6594a94dedad116c268ad3735e6141da Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 6 May 2024 15:59:14 -0400 Subject: [PATCH 4/8] spotless --- .../beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index be30085dc111..4b52e1009fc0 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.IOException; From 8a2acfe2282bfb60bce7a0afc50fc8fcd14f88b5 Mon 
Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 7 May 2024 15:14:15 -0400 Subject: [PATCH 5/8] spotless --- .../sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index 4b52e1009fc0..d5962a737baf 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -24,7 +24,11 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.ServiceLoader; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.beam.sdk.Pipeline; From 06b9bdbb216962ef687d9ca78ccff51252221bfa Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 8 May 2024 18:17:43 -0400 Subject: [PATCH 6/8] cleanup --- .../apache/beam/sdk/managed/ManagedSchemaTransformProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index c8d20ffe29a8..8805f8dcb7b2 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -207,7 +207,7 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) { Map configMap = YamlUtils.yamlStringToMap(yamlConfig); // The config Row object will be used to build the 
underlying SchemaTransform. - // If a mapping for the SchemaTransform exists, we use it to update parameter names to align + // If a mapping for the SchemaTransform exists, we use it to update parameter names and align // with the underlying config schema Map mapping = MAPPINGS.get(config.getTransformIdentifier()); if (mapping != null && configMap != null) { From 1404d1c019993ae5a3563f284f13506ef350d0be Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 9 May 2024 13:03:54 -0400 Subject: [PATCH 7/8] add debugging log --- .../beam/sdk/managed/ManagedSchemaTransformProvider.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 8805f8dcb7b2..79eb6347541c 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -50,10 +50,13 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoService(SchemaTransformProvider.class) public class ManagedSchemaTransformProvider extends TypedSchemaTransformProvider { + private static final Logger LOG = LoggerFactory.getLogger(ManagedSchemaTransformProvider.class); @Override public String identifier() { @@ -177,6 +180,11 @@ static class ManagedSchemaTransform extends SchemaTransform { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { + LOG.debug( + "Building transform \"{}\" with Row configuration: {}", + underlyingTransformProvider.identifier(), + underlyingTransformConfig); + return 
input.apply(underlyingTransformProvider.from(underlyingTransformConfig)); } From 96030e2bae5e3a9e2b7699c033c6755d3e5b8713 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 9 May 2024 15:19:02 -0400 Subject: [PATCH 8/8] add slf4j dependency --- sdks/java/managed/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/managed/build.gradle b/sdks/java/managed/build.gradle index f06df27429b1..add0d7f3cc0d 100644 --- a/sdks/java/managed/build.gradle +++ b/sdks/java/managed/build.gradle @@ -29,6 +29,7 @@ ext.summary = """Library that provides managed IOs.""" dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation library.java.vendored_guava_32_1_2_jre + implementation library.java.slf4j_api testImplementation library.java.junit testRuntimeOnly "org.yaml:snakeyaml:2.0"