From 6c5906d0a3a1ea5c2351be42d1f7677f1a90db6c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 29 Mar 2024 17:45:24 -0400 Subject: [PATCH 01/21] managed api for java --- sdks/java/io/kafka/build.gradle | 1 + .../KafkaWriteSchemaTransformProvider.java | 4 +- sdks/java/managed/build.gradle | 32 +++ .../org/apache/beam/sdk/managed/Managed.java | 163 ++++++++++++++ .../ManagedSchemaTransformProvider.java | 203 ++++++++++++++++++ .../apache/beam/sdk/managed/package-info.java | 20 ++ settings.gradle.kts | 2 + 7 files changed, 423 insertions(+), 2 deletions(-) create mode 100644 sdks/java/managed/build.gradle create mode 100644 sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java create mode 100644 sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java create mode 100644 sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index 269ddb3f5eb2..abcaff4bb7ff 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -50,6 +50,7 @@ kafkaVersions.each{k,v -> configurations.create("kafkaVersion$k")} dependencies { implementation library.java.vendored_guava_32_1_2_jre + implementation project(path: ':sdks:java:managed') provided library.java.jackson_dataformat_csv permitUnusedDeclared library.java.jackson_dataformat_csv implementation project(path: ":sdks:java:core", configuration: "shadow") diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java index 26f37b790ef8..eeeefb8665bd 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java @@ -28,6 +28,7 @@ import 
javax.annotation.Nullable; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; +import org.apache.beam.sdk.managed.ManagedSchemaTransformProvider; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.schemas.AutoValueSchema; @@ -35,7 +36,6 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; -import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; import org.apache.beam.sdk.schemas.utils.JsonUtils; @@ -58,7 +58,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@AutoService(SchemaTransformProvider.class) +@AutoService(ManagedSchemaTransformProvider.class) public class KafkaWriteSchemaTransformProvider extends TypedSchemaTransformProvider< KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransformConfiguration> { diff --git a/sdks/java/managed/build.gradle b/sdks/java/managed/build.gradle new file mode 100644 index 000000000000..05ec19d597f7 --- /dev/null +++ b/sdks/java/managed/build.gradle @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'org.apache.beam.module' +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.managed', +) + + +description = "Apache Beam :: SDKs :: Java :: Managed" +ext.summary = """Library that provides managed IOs.""" + + +dependencies { + implementation project(path: ":sdks:java:core", configuration: "shadow") + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") +} diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java new file mode 100644 index 000000000000..ff828625fdaf --- /dev/null +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.managed; + +import com.google.auto.value.AutoValue; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +public class Managed { + public static final String READ = "READ"; + public static final String WRITE = "WRITE"; + + public enum IO { + ICEBERG + } + + public static Read read() { + return new AutoValue_Managed_Read.Builder().build(); + } + + @AutoValue + public abstract static class Read extends SchemaTransform { + private final Map identifiers = + ImmutableMap.of(IO.ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1"); + + abstract IO getSource(); + + abstract @Nullable String getConfig(); + + abstract @Nullable String getConfigUrl(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setSource(IO source); + + abstract Builder setConfig(String config); + + abstract Builder setConfigUrl(String configUrl); + + abstract Read build(); + } + + public Read from(IO source) { + return toBuilder().setSource(source).build(); + } + + public Read withConfigUrl(String configUrl) { + return toBuilder().setConfigUrl(configUrl).build(); + } + + public Read withConfig(String config) { + return toBuilder().setConfigUrl(config).build(); + } + + public Read withConfig(Map config) { + return toBuilder().setConfigUrl(mapToYamlString(config)).build(); + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + String underlyingTransformIdentifier = identifiers.get(getSource()); + + ManagedSchemaTransformProvider.ManagedConfig managedConfig = + ManagedSchemaTransformProvider.ManagedConfig.builder() + .setIdentifier(underlyingTransformIdentifier) + .setType(READ) + .setConfig(getConfig()) + .setConfigUrl(getConfigUrl()) + .build(); + + 
SchemaTransform underlyingTransform = ManagedSchemaTransformProvider.of().from(managedConfig); + + return input.apply(underlyingTransform); + } + } + + public static Write write() { + return new AutoValue_Managed_Write.Builder().build(); + } + + @AutoValue + public abstract static class Write extends SchemaTransform { + private final Map identifiers = + ImmutableMap.of(IO.ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1"); + + abstract IO getSink(); + + abstract @Nullable String getConfig(); + + abstract @Nullable String getConfigUrl(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setSource(IO source); + + abstract Builder setConfig(String config); + + abstract Builder setConfigUrl(String configUrl); + + abstract Write build(); + } + + public Write to(IO source) { + return toBuilder().setSource(source).build(); + } + + public Write withConfigUrl(String configUrl) { + return toBuilder().setConfigUrl(configUrl).build(); + } + + public Write withConfig(String config) { + return toBuilder().setConfigUrl(config).build(); + } + + public Write withConfig(Map config) { + return toBuilder().setConfigUrl(mapToYamlString(config)).build(); + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + String underlyingTransformIdentifier = identifiers.get(getSink()); + + ManagedSchemaTransformProvider.ManagedConfig managedConfig = + ManagedSchemaTransformProvider.ManagedConfig.builder() + .setIdentifier(underlyingTransformIdentifier) + .setType(WRITE) + .setConfig(getConfig()) + .setConfigUrl(getConfigUrl()) + .build(); + + SchemaTransform underlyingTransform = ManagedSchemaTransformProvider.of().from(managedConfig); + + return input.apply(underlyingTransform); + } + } + + // TODO: implement this + private static String mapToYamlString(Map map) { + return ""; + } +} diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java new file mode 100644 index 000000000000..63fcbd1f3297 --- /dev/null +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.managed; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; + +@AutoService(SchemaTransformProvider.class) +public class ManagedSchemaTransformProvider + extends TypedSchemaTransformProvider { + public static final String INPUT_TAG = "input"; + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:managed:v1"; + } + + private Map schemaTransformProviders = new HashMap<>(); + + private ManagedSchemaTransformProvider() { + try { + for (SchemaTransformProvider schemaTransformProvider : + ServiceLoader.load(ManagedSchemaTransformProvider.class)) { + if (schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) { + throw new IllegalArgumentException( + "Found multiple SchemaTransformProvider implementations with the same identifier " + + schemaTransformProvider.identifier()); + } + 
schemaTransformProviders.put(schemaTransformProvider.identifier(), schemaTransformProvider); + } + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + } + + private static @Nullable ManagedSchemaTransformProvider managedProvider = null; + + public static ManagedSchemaTransformProvider of() { + if (managedProvider == null) { + managedProvider = new ManagedSchemaTransformProvider(); + } + return managedProvider; + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class ManagedConfig { + public static Builder builder() { + return new AutoValue_ManagedSchemaTransformProvider_ManagedConfig.Builder(); + } + + @SchemaFieldDescription("Identifier of the underlying IO to instantiate.") + public abstract String getIdentifier(); + + @SchemaFieldDescription("Specifies whether this is a read or write IO.") + public abstract String getType(); + + @SchemaFieldDescription("URL path to the YAML config file used to build the underlying IO.") + public abstract @Nullable String getConfigUrl(); + + @SchemaFieldDescription("YAML string config used to build the underlying IO.") + public abstract @Nullable String getConfig(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setIdentifier(String identifier); + + public abstract Builder setType(String type); + + public abstract Builder setConfigUrl(String configUrl); + + public abstract Builder setConfig(String config); + + public abstract ManagedConfig build(); + } + + protected void validate() { + boolean configExists = !Strings.isNullOrEmpty(getConfig()); + boolean configUrlExists = !Strings.isNullOrEmpty(getConfigUrl()); + checkArgument( + !(configExists && configUrlExists) && (configExists || configUrlExists), + "Please specify a config or a config URL, but not both."); + + Set validOperations = Sets.newHashSet(Managed.READ, Managed.WRITE); + checkArgument( + validOperations.contains(getType()), + "Invalid operation type. 
Please specify one of %s", + validOperations); + } + } + + @Override + protected SchemaTransform from(ManagedConfig managedConfig) { + checkArgument( + schemaTransformProviders.containsKey(managedConfig.getIdentifier()), + "Could not find transform with identifier %s, or it may not be supported", + managedConfig.getIdentifier()); + + SchemaTransformProvider schemaTransformProvider = + schemaTransformProviders.get(managedConfig.getIdentifier()); + + // parse config before expansion to check if it matches underlying transform's config schema + Schema transformConfigSchema = schemaTransformProvider.configurationSchema(); + Row transformConfig; + try { + transformConfig = getRowConfig(managedConfig, transformConfigSchema); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format( + "Specified configuration does not align with the underlying transform's configuration schema [%s].", + transformConfigSchema), + e); + } + + return new ManagedSchemaTransform(managedConfig, transformConfig, schemaTransformProvider); + } + + protected static class ManagedSchemaTransform extends SchemaTransform { + private final ManagedConfig managedConfig; + private final Row transformConfig; + private final SchemaTransformProvider underlyingTransformProvider; + + ManagedSchemaTransform( + ManagedConfig managedConfig, + Row transformConfig, + SchemaTransformProvider underlyingTransformProvider) { + managedConfig.validate(); + this.managedConfig = managedConfig; + this.transformConfig = transformConfig; + this.underlyingTransformProvider = underlyingTransformProvider; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + SchemaTransform underlyingTransform = underlyingTransformProvider.from(transformConfig); + + if (managedConfig.getType().equalsIgnoreCase(Managed.READ)) { + return PCollectionRowTuple.empty(input.getPipeline()).apply(underlyingTransform); + } else { + return input.apply(underlyingTransform); + } + } + } + + private static 
Row getRowConfig(ManagedConfig config, Schema transformSchema) throws Exception { + String transformYamlConfig; + if (!Strings.isNullOrEmpty(config.getConfigUrl())) { + try { + transformYamlConfig = + FileSystems.open(FileSystems.matchSingleFileSpec(config.getConfigUrl()).resourceId()) + .toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + transformYamlConfig = config.getConfig(); + } + + return yamlToBeamRow(transformYamlConfig, transformSchema); + } + + // TODO: implement this method + private static Row yamlToBeamRow(String yaml, Schema schema) throws Exception { + // parse yaml string and convert to Row + // throw an exception if there are missing required fields or if types don't match + return Row.nullRow(schema); + } +} diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java new file mode 100644 index 000000000000..e8e442c6b699 --- /dev/null +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Managed reads and writes. 
*/ +package org.apache.beam.sdk.managed; \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index ec11fd32fdd3..1e52e425b215 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -353,3 +353,5 @@ include("sdks:java:io:kafka:kafka-100") findProject(":sdks:java:io:kafka:kafka-100")?.name = "kafka-100" include("sdks:java:io:kafka:kafka-01103") findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103" +include("sdks:java:managed") +findProject(":sdks:java:managed")?.name = "managed" From 9ac92db0a245ffd97fc21d9d525c8f95b64fe69c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 29 Mar 2024 17:46:35 -0400 Subject: [PATCH 02/21] cleanup --- .../apache/beam/sdk/managed/ManagedSchemaTransformProvider.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 63fcbd1f3297..aafab16bdc4f 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -43,7 +43,6 @@ @AutoService(SchemaTransformProvider.class) public class ManagedSchemaTransformProvider extends TypedSchemaTransformProvider { - public static final String INPUT_TAG = "input"; @Override public String identifier() { From cb5f3825623219b173f1da50f5b1f1972463a497 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 29 Mar 2024 17:54:36 -0400 Subject: [PATCH 03/21] fix typo --- .../src/main/java/org/apache/beam/sdk/managed/Managed.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index ff828625fdaf..87f2465e4cb4 100644 --- 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -113,7 +113,7 @@ public abstract static class Write extends SchemaTransform { @AutoValue.Builder abstract static class Builder { - abstract Builder setSource(IO source); + abstract Builder setSink(IO source); abstract Builder setConfig(String config); @@ -123,7 +123,7 @@ abstract static class Builder { } public Write to(IO source) { - return toBuilder().setSource(source).build(); + return toBuilder().setSink(source).build(); } public Write withConfigUrl(String configUrl) { From c2cd910e01122e2b45d7bc51d0f4516901910522 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 1 Apr 2024 16:38:52 -0400 Subject: [PATCH 04/21] address comments --- .../KafkaWriteSchemaTransformProvider.java | 4 +- .../org/apache/beam/sdk/managed/Managed.java | 68 +++++++++++-------- .../ManagedSchemaTransformProvider.java | 67 +++++++----------- 3 files changed, 66 insertions(+), 73 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java index eeeefb8665bd..26f37b790ef8 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java @@ -28,7 +28,6 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; -import org.apache.beam.sdk.managed.ManagedSchemaTransformProvider; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.schemas.AutoValueSchema; @@ -36,6 +35,7 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import 
org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; import org.apache.beam.sdk.schemas.utils.JsonUtils; @@ -58,7 +58,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@AutoService(ManagedSchemaTransformProvider.class) +@AutoService(SchemaTransformProvider.class) public class KafkaWriteSchemaTransformProvider extends TypedSchemaTransformProvider< KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransformConfiguration> { diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 87f2465e4cb4..fbf5756c91b7 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -19,29 +19,36 @@ import com.google.auto.value.AutoValue; import java.util.Map; +import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.values.PCollectionRowTuple; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; public class Managed { - public static final String READ = "READ"; - public static final String WRITE = "WRITE"; + protected enum Type { + READ, + WRITE + } + public enum IO { ICEBERG } public static Read read() { - return new AutoValue_Managed_Read.Builder().build(); + return new AutoValue_Managed_Read.Builder() + .setPattern(Read.PATTERN) + .build(); } @AutoValue public abstract static class Read extends SchemaTransform { - private final Map identifiers = - ImmutableMap.of(IO.ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1"); + 
protected static final Pattern PATTERN = Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_read[\\w-]*:[\\w-]+"); + - abstract IO getSource(); + abstract String getSource(); + + abstract Pattern getPattern(); abstract @Nullable String getConfig(); @@ -51,7 +58,9 @@ public abstract static class Read extends SchemaTransform { @AutoValue.Builder abstract static class Builder { - abstract Builder setSource(IO source); + abstract Builder setSource(String source); + + abstract Builder setPattern(Pattern pattern); abstract Builder setConfig(String config); @@ -60,8 +69,8 @@ abstract static class Builder { abstract Read build(); } - public Read from(IO source) { - return toBuilder().setSource(source).build(); + public Read from(String identifier) { + return toBuilder().setSource(identifier).build(); } public Read withConfigUrl(String configUrl) { @@ -69,41 +78,41 @@ public Read withConfigUrl(String configUrl) { } public Read withConfig(String config) { - return toBuilder().setConfigUrl(config).build(); + return toBuilder().setConfig(config).build(); } public Read withConfig(Map config) { - return toBuilder().setConfigUrl(mapToYamlString(config)).build(); + return toBuilder().setConfig(mapToYamlString(config)).build(); } @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { - String underlyingTransformIdentifier = identifiers.get(getSource()); - ManagedSchemaTransformProvider.ManagedConfig managedConfig = ManagedSchemaTransformProvider.ManagedConfig.builder() - .setIdentifier(underlyingTransformIdentifier) - .setType(READ) + .setTransformIdentifier(getSource()) .setConfig(getConfig()) .setConfigUrl(getConfigUrl()) .build(); - SchemaTransform underlyingTransform = ManagedSchemaTransformProvider.of().from(managedConfig); + SchemaTransform underlyingTransform = ManagedSchemaTransformProvider.of(getPattern()).from(managedConfig); return input.apply(underlyingTransform); } } public static Write write() { - return new 
AutoValue_Managed_Write.Builder().build(); + return new AutoValue_Managed_Write.Builder() + .setPattern(Write.PATTERN) + .build(); } @AutoValue public abstract static class Write extends SchemaTransform { - private final Map identifiers = - ImmutableMap.of(IO.ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1"); + protected static final Pattern PATTERN = Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_write[\\w-]*:[\\w-]+"); + + abstract String getSink(); - abstract IO getSink(); + abstract Pattern getPattern(); abstract @Nullable String getConfig(); @@ -113,7 +122,9 @@ public abstract static class Write extends SchemaTransform { @AutoValue.Builder abstract static class Builder { - abstract Builder setSink(IO source); + abstract Builder setSink(String source); + + abstract Builder setPattern(Pattern pattern); abstract Builder setConfig(String config); @@ -122,7 +133,7 @@ abstract static class Builder { abstract Write build(); } - public Write to(IO source) { + public Write to(String source) { return toBuilder().setSink(source).build(); } @@ -131,26 +142,23 @@ public Write withConfigUrl(String configUrl) { } public Write withConfig(String config) { - return toBuilder().setConfigUrl(config).build(); + return toBuilder().setConfig(config).build(); } public Write withConfig(Map config) { - return toBuilder().setConfigUrl(mapToYamlString(config)).build(); + return toBuilder().setConfig(mapToYamlString(config)).build(); } @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { - String underlyingTransformIdentifier = identifiers.get(getSink()); - ManagedSchemaTransformProvider.ManagedConfig managedConfig = ManagedSchemaTransformProvider.ManagedConfig.builder() - .setIdentifier(underlyingTransformIdentifier) - .setType(WRITE) + .setTransformIdentifier(getSink()) .setConfig(getConfig()) .setConfigUrl(getConfigUrl()) .build(); - SchemaTransform underlyingTransform = ManagedSchemaTransformProvider.of().from(managedConfig); + 
SchemaTransform underlyingTransform = ManagedSchemaTransformProvider.of(getPattern()).from(managedConfig); return input.apply(underlyingTransform); } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index aafab16bdc4f..a5fdc91d5ec7 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -26,7 +26,9 @@ import java.util.Map; import java.util.ServiceLoader; import java.util.Set; +import java.util.regex.Pattern; import javax.annotation.Nullable; + import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; @@ -37,41 +39,44 @@ import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @AutoService(SchemaTransformProvider.class) -public class ManagedSchemaTransformProvider +class ManagedSchemaTransformProvider extends TypedSchemaTransformProvider { + private static final String MANAGED_NAMESPACE = "managed"; @Override public String identifier() { return "beam:schematransform:org.apache.beam:managed:v1"; } - private Map schemaTransformProviders = new HashMap<>(); + private final Map schemaTransformProviders = new HashMap<>(); - private ManagedSchemaTransformProvider() { + private ManagedSchemaTransformProvider(Pattern pattern) { try { - for (SchemaTransformProvider schemaTransformProvider : - 
ServiceLoader.load(ManagedSchemaTransformProvider.class)) { + for (SchemaTransformProvider schemaTransformProvider : ServiceLoader.load(SchemaTransformProvider.class)) { if (schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) { throw new IllegalArgumentException( - "Found multiple SchemaTransformProvider implementations with the same identifier " - + schemaTransformProvider.identifier()); + "Found multiple SchemaTransformProvider implementations with the same identifier " + + schemaTransformProvider.identifier()); } schemaTransformProviders.put(schemaTransformProvider.identifier(), schemaTransformProvider); } } catch (Exception e) { throw new RuntimeException(e.getMessage()); } + + schemaTransformProviders.entrySet().removeIf(e -> !pattern.matcher(e.getKey()).matches()); } private static @Nullable ManagedSchemaTransformProvider managedProvider = null; - public static ManagedSchemaTransformProvider of() { + public static ManagedSchemaTransformProvider of(Pattern pattern) { if (managedProvider == null) { - managedProvider = new ManagedSchemaTransformProvider(); + managedProvider = new ManagedSchemaTransformProvider(pattern); } return managedProvider; } @@ -84,10 +89,7 @@ public static Builder builder() { } @SchemaFieldDescription("Identifier of the underlying IO to instantiate.") - public abstract String getIdentifier(); - - @SchemaFieldDescription("Specifies whether this is a read or write IO.") - public abstract String getType(); + public abstract String getTransformIdentifier(); @SchemaFieldDescription("URL path to the YAML config file used to build the underlying IO.") public abstract @Nullable String getConfigUrl(); @@ -97,9 +99,7 @@ public static Builder builder() { @AutoValue.Builder public abstract static class Builder { - public abstract Builder setIdentifier(String identifier); - - public abstract Builder setType(String type); + public abstract Builder setTransformIdentifier(String identifier); public abstract Builder 
setConfigUrl(String configUrl); @@ -114,24 +114,16 @@ protected void validate() { checkArgument( !(configExists && configUrlExists) && (configExists || configUrlExists), "Please specify a config or a config URL, but not both."); - - Set validOperations = Sets.newHashSet(Managed.READ, Managed.WRITE); - checkArgument( - validOperations.contains(getType()), - "Invalid operation type. Please specify one of %s", - validOperations); } } @Override protected SchemaTransform from(ManagedConfig managedConfig) { - checkArgument( - schemaTransformProviders.containsKey(managedConfig.getIdentifier()), - "Could not find transform with identifier %s, or it may not be supported", - managedConfig.getIdentifier()); - - SchemaTransformProvider schemaTransformProvider = - schemaTransformProviders.get(managedConfig.getIdentifier()); + managedConfig.validate(); + SchemaTransformProvider schemaTransformProvider = Preconditions.checkNotNull( + schemaTransformProviders.get(managedConfig.getTransformIdentifier()), + "Could not find transform with identifier %s, or it may not be supported", + managedConfig.getTransformIdentifier()); // parse config before expansion to check if it matches underlying transform's config schema Schema transformConfigSchema = schemaTransformProvider.configurationSchema(); @@ -146,20 +138,16 @@ protected SchemaTransform from(ManagedConfig managedConfig) { e); } - return new ManagedSchemaTransform(managedConfig, transformConfig, schemaTransformProvider); + return new ManagedSchemaTransform(transformConfig, schemaTransformProvider); } protected static class ManagedSchemaTransform extends SchemaTransform { - private final ManagedConfig managedConfig; private final Row transformConfig; private final SchemaTransformProvider underlyingTransformProvider; ManagedSchemaTransform( - ManagedConfig managedConfig, Row transformConfig, SchemaTransformProvider underlyingTransformProvider) { - managedConfig.validate(); - this.managedConfig = managedConfig; this.transformConfig = 
transformConfig; this.underlyingTransformProvider = underlyingTransformProvider; } @@ -168,11 +156,7 @@ protected static class ManagedSchemaTransform extends SchemaTransform { public PCollectionRowTuple expand(PCollectionRowTuple input) { SchemaTransform underlyingTransform = underlyingTransformProvider.from(transformConfig); - if (managedConfig.getType().equalsIgnoreCase(Managed.READ)) { - return PCollectionRowTuple.empty(input.getPipeline()).apply(underlyingTransform); - } else { - return input.apply(underlyingTransform); - } + return input.apply(underlyingTransform); } } @@ -181,7 +165,7 @@ private static Row getRowConfig(ManagedConfig config, Schema transformSchema) th if (!Strings.isNullOrEmpty(config.getConfigUrl())) { try { transformYamlConfig = - FileSystems.open(FileSystems.matchSingleFileSpec(config.getConfigUrl()).resourceId()) + FileSystems.open(FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(config.getConfigUrl())).resourceId()) .toString(); } catch (IOException e) { throw new RuntimeException(e); @@ -190,13 +174,14 @@ private static Row getRowConfig(ManagedConfig config, Schema transformSchema) th transformYamlConfig = config.getConfig(); } - return yamlToBeamRow(transformYamlConfig, transformSchema); + return yamlToBeamRow(Preconditions.checkNotNull(transformYamlConfig), transformSchema); } // TODO: implement this method private static Row yamlToBeamRow(String yaml, Schema schema) throws Exception { // parse yaml string and convert to Row // throw an exception if there are missing required fields or if types don't match + System.out.println(yaml); return Row.nullRow(schema); } } From 9d52d6ca581efa92b3da7c93cbf41b7aaef07750 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 1 Apr 2024 16:40:42 -0400 Subject: [PATCH 05/21] spotless --- .../org/apache/beam/sdk/managed/Managed.java | 22 +++++++++---------- .../ManagedSchemaTransformProvider.java | 21 +++++++++--------- .../apache/beam/sdk/managed/package-info.java | 2 +- 3 files 
changed, 22 insertions(+), 23 deletions(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index fbf5756c91b7..4c0c56386baf 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -30,21 +30,18 @@ protected enum Type { WRITE } - public enum IO { ICEBERG } public static Read read() { - return new AutoValue_Managed_Read.Builder() - .setPattern(Read.PATTERN) - .build(); + return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build(); } @AutoValue public abstract static class Read extends SchemaTransform { - protected static final Pattern PATTERN = Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_read[\\w-]*:[\\w-]+"); - + protected static final Pattern PATTERN = + Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_read[\\w-]*:[\\w-]+"); abstract String getSource(); @@ -94,21 +91,21 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .setConfigUrl(getConfigUrl()) .build(); - SchemaTransform underlyingTransform = ManagedSchemaTransformProvider.of(getPattern()).from(managedConfig); + SchemaTransform underlyingTransform = + ManagedSchemaTransformProvider.of(getPattern()).from(managedConfig); return input.apply(underlyingTransform); } } public static Write write() { - return new AutoValue_Managed_Write.Builder() - .setPattern(Write.PATTERN) - .build(); + return new AutoValue_Managed_Write.Builder().setPattern(Write.PATTERN).build(); } @AutoValue public abstract static class Write extends SchemaTransform { - protected static final Pattern PATTERN = Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_write[\\w-]*:[\\w-]+"); + protected static final Pattern PATTERN = + Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_write[\\w-]*:[\\w-]+"); abstract String getSink(); @@ -158,7 
+155,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .setConfigUrl(getConfigUrl()) .build(); - SchemaTransform underlyingTransform = ManagedSchemaTransformProvider.of(getPattern()).from(managedConfig); + SchemaTransform underlyingTransform = + ManagedSchemaTransformProvider.of(getPattern()).from(managedConfig); return input.apply(underlyingTransform); } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index a5fdc91d5ec7..ccabc78a3df7 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -25,10 +25,8 @@ import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; -import java.util.Set; import java.util.regex.Pattern; import javax.annotation.Nullable; - import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; @@ -41,7 +39,6 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @AutoService(SchemaTransformProvider.class) class ManagedSchemaTransformProvider @@ -57,11 +54,12 @@ public String identifier() { private ManagedSchemaTransformProvider(Pattern pattern) { try { - for (SchemaTransformProvider schemaTransformProvider : ServiceLoader.load(SchemaTransformProvider.class)) { + for (SchemaTransformProvider schemaTransformProvider : + ServiceLoader.load(SchemaTransformProvider.class)) { if (schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) { throw new IllegalArgumentException( - "Found 
multiple SchemaTransformProvider implementations with the same identifier " - + schemaTransformProvider.identifier()); + "Found multiple SchemaTransformProvider implementations with the same identifier " + + schemaTransformProvider.identifier()); } schemaTransformProviders.put(schemaTransformProvider.identifier(), schemaTransformProvider); } @@ -120,7 +118,8 @@ protected void validate() { @Override protected SchemaTransform from(ManagedConfig managedConfig) { managedConfig.validate(); - SchemaTransformProvider schemaTransformProvider = Preconditions.checkNotNull( + SchemaTransformProvider schemaTransformProvider = + Preconditions.checkNotNull( schemaTransformProviders.get(managedConfig.getTransformIdentifier()), "Could not find transform with identifier %s, or it may not be supported", managedConfig.getTransformIdentifier()); @@ -146,8 +145,7 @@ protected static class ManagedSchemaTransform extends SchemaTransform { private final SchemaTransformProvider underlyingTransformProvider; ManagedSchemaTransform( - Row transformConfig, - SchemaTransformProvider underlyingTransformProvider) { + Row transformConfig, SchemaTransformProvider underlyingTransformProvider) { this.transformConfig = transformConfig; this.underlyingTransformProvider = underlyingTransformProvider; } @@ -165,7 +163,10 @@ private static Row getRowConfig(ManagedConfig config, Schema transformSchema) th if (!Strings.isNullOrEmpty(config.getConfigUrl())) { try { transformYamlConfig = - FileSystems.open(FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(config.getConfigUrl())).resourceId()) + FileSystems.open( + FileSystems.matchSingleFileSpec( + Preconditions.checkNotNull(config.getConfigUrl())) + .resourceId()) .toString(); } catch (IOException e) { throw new RuntimeException(e); diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java index e8e442c6b699..d129e4a7a225 100644 --- 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java @@ -17,4 +17,4 @@ */ /** Managed reads and writes. */ -package org.apache.beam.sdk.managed; \ No newline at end of file +package org.apache.beam.sdk.managed; From 5c40668f2f55999e43633cb24a927d91fbee9d5b Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 1 Apr 2024 16:48:33 -0400 Subject: [PATCH 06/21] cleanup --- .../main/java/org/apache/beam/sdk/managed/Managed.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 4c0c56386baf..8565f094dd52 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -25,15 +25,6 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; public class Managed { - protected enum Type { - READ, - WRITE - } - - public enum IO { - ICEBERG - } - public static Read read() { return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build(); } From db2e751292ed0522152502f4ae5af9b50dc9edec Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 4 Apr 2024 13:20:58 -0400 Subject: [PATCH 07/21] yaml utils --- .../beam/sdk/schemas/utils/YamlUtils.java | 139 +++++++++++ .../apache/beam/sdk/util/YamlUtilsTest.java | 224 ++++++++++++++++++ 2 files changed, 363 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java new file mode 100644 index 000000000000..06f74c1c431f --- /dev/null +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.utils; + +import static org.apache.beam.sdk.values.Row.toRow; + +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.yaml.snakeyaml.Yaml; + +public class YamlUtils { + public static final Map> YAML_VALUE_PARSERS = + ImmutableMap + .> + builder() + .put(Schema.TypeName.BYTE, Byte::valueOf) + .put(Schema.TypeName.INT16, Short::valueOf) + .put(Schema.TypeName.INT32, Integer::valueOf) + .put(Schema.TypeName.INT64, Long::valueOf) 
+ .put(Schema.TypeName.FLOAT, Float::valueOf) + .put(Schema.TypeName.DOUBLE, Double::valueOf) + .put(Schema.TypeName.DECIMAL, BigDecimal::new) + .put(Schema.TypeName.BOOLEAN, Boolean::valueOf) + .put(Schema.TypeName.STRING, str -> str) + .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str)) + .build(); + + public static Row toBeamRow(String yamlString, Schema schema) { + Yaml yaml = new Yaml(); + Object yamlMap = yaml.load(yamlString); + if (yamlMap == null) { + if (schema.getFieldCount() == 0) { + return Row.withSchema(schema).build(); + } else { + throw new IllegalArgumentException( + "Received an empty YAML value, but output schema is not empty"); + } + } + + Preconditions.checkArgument( + yamlMap instanceof Map, + "Expected a YAML mapping but got type '%s' instead.", + yamlMap.getClass()); + + return toBeamRow((Map) yamlMap, schema); + } + + private static @Nullable Object toBeamValue(Field field, @Nullable Object yamlValue) { + FieldType fieldType = field.getType(); + + if (yamlValue == null) { + if (fieldType.getNullable()) { + return null; + } else { + throw new IllegalArgumentException( + "Received null value for non-nullable field \"" + field.getName() + "\""); + } + } + + if (yamlValue instanceof String + || yamlValue instanceof Number + || yamlValue instanceof Boolean) { + String yamlStringValue = yamlValue.toString(); + if (YAML_VALUE_PARSERS.containsKey(fieldType.getTypeName())) { + return YAML_VALUE_PARSERS.get(fieldType.getTypeName()).apply(yamlStringValue); + } + } + + if (yamlValue instanceof byte[] && fieldType.getTypeName() == Schema.TypeName.BYTES) { + return yamlValue; + } + + if (yamlValue instanceof List) { + FieldType innerType = + Preconditions.checkNotNull( + fieldType.getCollectionElementType(), + "Cannot convert YAML type '%s` to `%s` because the YAML value is a List, but the output schema field does not define a collection type.", + yamlValue.getClass(), + fieldType); + return ((List) yamlValue) + .stream() + .map(v -> 
Preconditions.checkNotNull(toBeamValue(field.withType(innerType), v))) + .collect(Collectors.toList()); + } + + if (yamlValue instanceof Map) { + if (fieldType.getTypeName() == Schema.TypeName.ROW) { + Schema nestedSchema = + Preconditions.checkNotNull( + fieldType.getRowSchema(), + "Received a YAML '%s' type, but output schema field '%s' does not define a Row Schema", + yamlValue.getClass(), + fieldType); + return toBeamRow((Map) yamlValue, nestedSchema); + } else if (fieldType.getTypeName() == Schema.TypeName.MAP) { + return yamlValue; + } + } + + throw new UnsupportedOperationException( + String.format( + "Converting YAML type '%s' to '%s' is not supported", yamlValue.getClass(), fieldType)); + } + + @SuppressWarnings("nullness") + public static Row toBeamRow(Map yamlMap, Schema rowSchema) { + + return rowSchema.getFields().stream() + .map(field -> toBeamValue(field, yamlMap.get(field.getName()))) + .collect(toRow(rowSchema)); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java new file mode 100644 index 000000000000..dc9d26e0d4ef --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.utils.YamlUtils; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class YamlUtilsTest { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + public String makeNested(String input) { + return Arrays.stream(input.split("\n")) + .map(str -> " " + str) + .collect(Collectors.joining("\n")); + } + + @Test + public void testEmptyYamlString() { + Schema schema = Schema.builder().build(); + + assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow("", schema)); + } + + @Test + public void testInvalidEmptyYamlWithNonEmptySchema() { + Schema schema = Schema.builder().addStringField("dummy").build(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Received an empty YAML value, but output schema is not empty"); + + YamlUtils.toBeamRow("", schema); + } + + @Test + public void testNullableValues() { + String yamlString = "nullable_string:\n" + "nullable_integer:\n" + "nullable_boolean:\n"; + Schema schema = + Schema.builder() + .addNullableStringField("nullable_string") + .addNullableInt32Field("nullable_integer") + .addNullableBooleanField("nullable_boolean") + .build(); + + assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema)); + } + + @Test + public void testMissingNullableValues() { + String yamlString = "nullable_string:"; + Schema schema = 
+ Schema.builder() + .addNullableStringField("nullable_string") + .addNullableInt32Field("nullable_integer") + .addNullableBooleanField("nullable_boolean") + .build(); + + assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema)); + } + + @Test + public void testInvalidNullableValues() { + String yamlString = "nullable_string:\n" + "integer:"; + Schema schema = + Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Received null value for non-nullable field \"integer\""); + YamlUtils.toBeamRow(yamlString, schema); + } + + @Test + public void testInvalidMissingRequiredValues() { + String yamlString = "nullable_string:"; + Schema schema = + Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Received null value for non-nullable field \"integer\""); + + YamlUtils.toBeamRow(yamlString, schema); + } + + @Test + public void testExtraFieldsAreIgnored() { + String yamlString = "field1: val1\n" + "field2: val2"; + Schema schema = Schema.builder().addStringField("field1").build(); + Row expectedRow = Row.withSchema(schema).withFieldValue("field1", "val1").build(); + + assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema)); + } + + @Test + public void testInvalidTopLevelArray() { + String invalidYaml = "- top_level_list" + "- another_list"; + Schema schema = Schema.builder().build(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Expected a YAML mapping"); + YamlUtils.toBeamRow(invalidYaml, schema); + } + + final Schema FLAT_SCHEMA = + Schema.builder() + .addByteField("byte_field") + .addInt16Field("int16_field") + .addInt32Field("int32_field") + .addInt64Field("int64_field") + .addFloatField("float_field") + .addDoubleField("double_field") + .addDecimalField("decimal_field") + 
.addBooleanField("boolean_field") + .addStringField("string_field") + .addByteArrayField("bytes_field") + .build(); + + final Row FLAT_ROW = + Row.withSchema(FLAT_SCHEMA) + .withFieldValue("byte_field", Byte.valueOf("123")) + .withFieldValue("int16_field", Short.valueOf("16")) + .withFieldValue("int32_field", 32) + .withFieldValue("int64_field", 64L) + .withFieldValue("float_field", 123.456F) + .withFieldValue("double_field", 456.789) + .withFieldValue("decimal_field", BigDecimal.valueOf(789.123)) + .withFieldValue("boolean_field", true) + .withFieldValue("string_field", "some string") + .withFieldValue("bytes_field", BaseEncoding.base64().decode("abc")) + .build(); + + String FLAT_YAML = + "byte_field: 123\n" + + "int16_field: 16\n" + + "int32_field: 32\n" + + "int64_field: 64\n" + + "float_field: 123.456\n" + + "double_field: 456.789\n" + + "decimal_field: 789.123\n" + + "boolean_field: true\n" + + "string_field: some string\n" + + "bytes_field: abc"; + + @Test + public void testAllTypesFlat() { + assertEquals(FLAT_ROW, YamlUtils.toBeamRow(FLAT_YAML, FLAT_SCHEMA)); + } + + @Test + public void testAllTypesNested() { + String nestedFlatTypes = makeNested(FLAT_YAML); + String topLevelYaml = "top_string: abc\n" + "nested: \n" + nestedFlatTypes; + + Schema schema = + Schema.builder().addStringField("top_string").addRowField("nested", FLAT_SCHEMA).build(); + Row expectedRow = + Row.withSchema(schema) + .withFieldValue("top_string", "abc") + .withFieldValue("nested", FLAT_ROW) + .build(); + + assertEquals(expectedRow, YamlUtils.toBeamRow(topLevelYaml, schema)); + } + + String INT_ARRAY_YAML = "arr:\n" + " - 1\n" + " - 2\n" + " - 3\n" + " - 4\n" + " - 5\n"; + + Schema INT_ARRAY_SCHEMA = Schema.builder().addArrayField("arr", Schema.FieldType.INT32).build(); + + Row INT_ARRAY_ROW = + Row.withSchema(INT_ARRAY_SCHEMA) + .withFieldValue("arr", IntStream.range(1, 6).boxed().collect(Collectors.toList())) + .build(); + + @Test + public void testArray() { + 
assertEquals(INT_ARRAY_ROW, YamlUtils.toBeamRow(INT_ARRAY_YAML, INT_ARRAY_SCHEMA)); + } + + @Test + public void testNestedArray() { + String nestedArray = makeNested(INT_ARRAY_YAML); + String yamlString = "str_field: some string\n" + "nested: \n" + nestedArray; + + Schema schema = + Schema.builder() + .addStringField("str_field") + .addRowField("nested", INT_ARRAY_SCHEMA) + .build(); + + Row expectedRow = + Row.withSchema(schema) + .withFieldValue("str_field", "some string") + .withFieldValue("nested", INT_ARRAY_ROW) + .build(); + + assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema)); + } +} From a8c77d825485314dbf527c7f7ac94f5c932f3527 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 4 Apr 2024 13:21:29 -0400 Subject: [PATCH 08/21] snakeyaml dependency --- sdks/java/core/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 438a3fb1806c..0658d9e0d3f3 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -98,6 +98,7 @@ dependencies { permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema + implementation group: 'org.yaml', name: 'snakeyaml', version: '1.28' shadowTest library.java.everit_json_schema provided library.java.junit testImplementation "com.github.stefanbirkner:system-rules:1.19.0" From cbcdea6f60c041cda01ce348f84d50bffd4735c7 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 4 Apr 2024 14:28:52 -0400 Subject: [PATCH 09/21] address comments --- .../beam/sdk/schemas/utils/YamlUtils.java | 4 ++ .../org/apache/beam/sdk/managed/Managed.java | 59 ++++++++++--------- .../ManagedSchemaTransformProvider.java | 22 +++---- .../apache/beam/sdk/managed/ManagedTest.java | 2 + 4 files changed, 44 insertions(+), 43 deletions(-) create mode 100644 sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java index 06f74c1c431f..0f0aaf383046 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -136,4 +136,8 @@ public static Row toBeamRow(Map yamlMap, Schema rowSchema) { .map(field -> toBeamValue(field, yamlMap.get(field.getName()))) .collect(toRow(rowSchema)); } + + public static String yamlStringFromMap(Map map) { + return new Yaml().dumpAsMap(map); + } } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 8565f094dd52..d8a8614b1069 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -19,25 +19,27 @@ import com.google.auto.value.AutoValue; import java.util.Map; -import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; public class Managed { public static Read read() { - return new AutoValue_Managed_Read.Builder().setPattern(Read.PATTERN).build(); + return new AutoValue_Managed_Read.Builder().build(); } @AutoValue public abstract static class Read extends SchemaTransform { - protected static final Pattern PATTERN = - Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_read[\\w-]*:[\\w-]+"); + public static Map TRANSFORMS = + ImmutableMap.builder() + .put("iceberg", 
"beam:schematransform:org.apache.beam:iceberg_read:v1") + .build(); abstract String getSource(); - abstract Pattern getPattern(); - abstract @Nullable String getConfig(); abstract @Nullable String getConfigUrl(); @@ -48,8 +50,6 @@ public abstract static class Read extends SchemaTransform { abstract static class Builder { abstract Builder setSource(String source); - abstract Builder setPattern(Pattern pattern); - abstract Builder setConfig(String config); abstract Builder setConfigUrl(String configUrl); @@ -57,8 +57,13 @@ abstract static class Builder { abstract Read build(); } - public Read from(String identifier) { - return toBuilder().setSource(identifier).build(); + public Read from(String source) { + Preconditions.checkArgument( + TRANSFORMS.containsKey(source.toLowerCase()), + "An unsupported source was specified: '%s'. Please specify one of the following source: %s", + source, + TRANSFORMS.keySet()); + return toBuilder().setSource(TRANSFORMS.get(source.toLowerCase())).build(); } public Read withConfigUrl(String configUrl) { @@ -70,7 +75,7 @@ public Read withConfig(String config) { } public Read withConfig(Map config) { - return toBuilder().setConfig(mapToYamlString(config)).build(); + return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); } @Override @@ -83,25 +88,25 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .build(); SchemaTransform underlyingTransform = - ManagedSchemaTransformProvider.of(getPattern()).from(managedConfig); + ManagedSchemaTransformProvider.of(TRANSFORMS.values()).from(managedConfig); return input.apply(underlyingTransform); } } public static Write write() { - return new AutoValue_Managed_Write.Builder().setPattern(Write.PATTERN).build(); + return new AutoValue_Managed_Write.Builder().build(); } @AutoValue public abstract static class Write extends SchemaTransform { - protected static final Pattern PATTERN = - Pattern.compile("beam:schematransform:org.apache.beam:[\\w-]+_write[\\w-]*:[\\w-]+"); + 
public static Map TRANSFORMS = + ImmutableMap.builder() + .put("iceberg", "beam:schematransform:org.apache.beam:iceberg_write:v1") + .build(); abstract String getSink(); - abstract Pattern getPattern(); - abstract @Nullable String getConfig(); abstract @Nullable String getConfigUrl(); @@ -112,8 +117,6 @@ public abstract static class Write extends SchemaTransform { abstract static class Builder { abstract Builder setSink(String source); - abstract Builder setPattern(Pattern pattern); - abstract Builder setConfig(String config); abstract Builder setConfigUrl(String configUrl); @@ -121,8 +124,13 @@ abstract static class Builder { abstract Write build(); } - public Write to(String source) { - return toBuilder().setSink(source).build(); + public Write to(String sink) { + Preconditions.checkArgument( + TRANSFORMS.containsKey(sink.toLowerCase()), + "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s", + sink, + TRANSFORMS.keySet()); + return toBuilder().setSink(TRANSFORMS.get(sink.toLowerCase())).build(); } public Write withConfigUrl(String configUrl) { @@ -134,7 +142,7 @@ public Write withConfig(String config) { } public Write withConfig(Map config) { - return toBuilder().setConfig(mapToYamlString(config)).build(); + return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); } @Override @@ -147,14 +155,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .build(); SchemaTransform underlyingTransform = - ManagedSchemaTransformProvider.of(getPattern()).from(managedConfig); + ManagedSchemaTransformProvider.of(TRANSFORMS.values()).from(managedConfig); return input.apply(underlyingTransform); } } - - // TODO: implement this - private static String mapToYamlString(Map map) { - return ""; - } } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 
ccabc78a3df7..3afcfde5968d 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -22,10 +22,10 @@ import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; -import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.schemas.AutoValueSchema; @@ -35,6 +35,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; @@ -43,7 +44,6 @@ @AutoService(SchemaTransformProvider.class) class ManagedSchemaTransformProvider extends TypedSchemaTransformProvider { - private static final String MANAGED_NAMESPACE = "managed"; @Override public String identifier() { @@ -52,7 +52,7 @@ public String identifier() { private final Map schemaTransformProviders = new HashMap<>(); - private ManagedSchemaTransformProvider(Pattern pattern) { + private ManagedSchemaTransformProvider(Collection identifiers) { try { for (SchemaTransformProvider schemaTransformProvider : ServiceLoader.load(SchemaTransformProvider.class)) { @@ -67,14 +67,14 @@ private ManagedSchemaTransformProvider(Pattern pattern) { throw new RuntimeException(e.getMessage()); } - schemaTransformProviders.entrySet().removeIf(e -> !pattern.matcher(e.getKey()).matches()); + schemaTransformProviders.entrySet().removeIf(e -> !identifiers.contains(e.getKey())); } 
private static @Nullable ManagedSchemaTransformProvider managedProvider = null; - public static ManagedSchemaTransformProvider of(Pattern pattern) { + public static ManagedSchemaTransformProvider of(Collection supportedIdentifiers) { if (managedProvider == null) { - managedProvider = new ManagedSchemaTransformProvider(pattern); + managedProvider = new ManagedSchemaTransformProvider(supportedIdentifiers); } return managedProvider; } @@ -175,14 +175,6 @@ private static Row getRowConfig(ManagedConfig config, Schema transformSchema) th transformYamlConfig = config.getConfig(); } - return yamlToBeamRow(Preconditions.checkNotNull(transformYamlConfig), transformSchema); - } - - // TODO: implement this method - private static Row yamlToBeamRow(String yaml, Schema schema) throws Exception { - // parse yaml string and convert to Row - // throw an exception if there are missing required fields or if types don't match - System.out.println(yaml); - return Row.nullRow(schema); + return YamlUtils.toBeamRow(transformYamlConfig, transformSchema); } } diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java new file mode 100644 index 000000000000..af0659e4ee2e --- /dev/null +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java @@ -0,0 +1,2 @@ +package org.apache.beam.sdk.managed;public class ManagedTest { +} From 7013ce30066c7aba292db57c510af1c2eb743970 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 4 Apr 2024 14:51:57 -0400 Subject: [PATCH 10/21] address compile failures --- .../org/apache/beam/sdk/managed/Managed.java | 33 ++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index d8a8614b1069..1e017cdc71b3 100644 --- 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -18,17 +18,22 @@ package org.apache.beam.sdk.managed; import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; public class Managed { public static Read read() { - return new AutoValue_Managed_Read.Builder().build(); + return new AutoValue_Managed_Read.Builder() + .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values())) + .build(); } @AutoValue @@ -44,15 +49,19 @@ public abstract static class Read extends SchemaTransform { abstract @Nullable String getConfigUrl(); + abstract List getSupportedIdentifiers(); + abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { abstract Builder setSource(String source); - abstract Builder setConfig(String config); + abstract Builder setConfig(@Nullable String config); - abstract Builder setConfigUrl(String configUrl); + abstract Builder setConfigUrl(@Nullable String configUrl); + + abstract Builder setSupportedIdentifiers(List supportedIdentifiers); abstract Read build(); } @@ -78,6 +87,11 @@ public Read withConfig(Map config) { return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); } + @VisibleForTesting + Read withSupportedIdentifiers(List supportedIdentifiers) { + return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build(); + } + @Override public PCollectionRowTuple 
expand(PCollectionRowTuple input) { ManagedSchemaTransformProvider.ManagedConfig managedConfig = @@ -95,7 +109,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } public static Write write() { - return new AutoValue_Managed_Write.Builder().build(); + return new AutoValue_Managed_Write.Builder() + .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values())) + .build(); } @AutoValue @@ -111,6 +127,8 @@ public abstract static class Write extends SchemaTransform { abstract @Nullable String getConfigUrl(); + abstract List getSupportedIdentifiers(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -121,6 +139,8 @@ abstract static class Builder { abstract Builder setConfigUrl(String configUrl); + abstract Builder setSupportedIdentifiers(List supportedIdentifiers); + abstract Write build(); } @@ -145,6 +165,11 @@ public Write withConfig(Map config) { return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); } + @VisibleForTesting + Write withSupportedIdentifiers(List supportedIdentifiers) { + return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build(); + } + @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { ManagedSchemaTransformProvider.ManagedConfig managedConfig = From d4258cd5e598294ed58f0e62409f6847d7a67e33 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 4 Apr 2024 15:55:43 -0400 Subject: [PATCH 11/21] address compile failures; address comments --- .../beam/sdk/schemas/utils/YamlUtils.java | 26 +++++---- .../apache/beam/sdk/util/YamlUtilsTest.java | 4 +- sdks/java/managed/build.gradle | 3 ++ .../org/apache/beam/sdk/managed/Managed.java | 53 +++++++++---------- .../ManagedSchemaTransformProvider.java | 6 +-- 5 files changed, 50 insertions(+), 42 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java index 0f0aaf383046..758b75482511 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -35,7 +35,7 @@ import org.yaml.snakeyaml.Yaml; public class YamlUtils { - public static final Map> YAML_VALUE_PARSERS = + private static final Map> YAML_VALUE_PARSERS = ImmutableMap .> @@ -52,24 +52,30 @@ public class YamlUtils { .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str)) .build(); - public static Row toBeamRow(String yamlString, Schema schema) { - Yaml yaml = new Yaml(); - Object yamlMap = yaml.load(yamlString); - if (yamlMap == null) { - if (schema.getFieldCount() == 0) { - return Row.withSchema(schema).build(); + public static Row toBeamRow(@Nullable String yamlString, Schema schema) { + if (yamlString == null || yamlString.isEmpty()) { + List requiredFields = + schema.getFields().stream() + .filter(field -> !field.getType().getNullable()) + .collect(Collectors.toList()); + if (requiredFields.isEmpty()) { + return Row.nullRow(schema); } else { throw new IllegalArgumentException( - "Received an empty YAML value, but output schema is not empty"); + String.format( + "Received an empty YAML string, but output schema contains required fields: %s", + requiredFields)); } } + Yaml yaml = new Yaml(); + Object yamlMap = yaml.load(yamlString); Preconditions.checkArgument( yamlMap instanceof Map, "Expected a YAML mapping but got type '%s' instead.", - yamlMap.getClass()); + Preconditions.checkNotNull(yamlMap).getClass()); - return toBeamRow((Map) yamlMap, schema); + return toBeamRow((Map) Preconditions.checkNotNull(yamlMap), schema); } private static @Nullable Object toBeamValue(Field field, @Nullable Object yamlValue) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java index dc9d26e0d4ef..e7105d9e42ca 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java @@ -55,7 +55,9 @@ public void testInvalidEmptyYamlWithNonEmptySchema() { Schema schema = Schema.builder().addStringField("dummy").build(); thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Received an empty YAML value, but output schema is not empty"); + thrown.expectMessage( + "Received an empty YAML string, but output schema contains required fields"); + thrown.expectMessage("dummy"); YamlUtils.toBeamRow("", schema); } diff --git a/sdks/java/managed/build.gradle b/sdks/java/managed/build.gradle index 05ec19d597f7..48f51b8e94d5 100644 --- a/sdks/java/managed/build.gradle +++ b/sdks/java/managed/build.gradle @@ -28,5 +28,8 @@ ext.summary = """Library that provides managed IOs.""" dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.vendored_guava_32_1_2_jre + + testImplementation library.java.junit testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 1e017cdc71b3..c6225e903ad8 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -25,22 +25,31 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; public class Managed { - public static Read read() { + public static final String ICEBERG = "iceberg"; + + public static Read read(String source) { + return new AutoValue_Managed_Read.Builder() + .setSource( + Preconditions.checkNotNull( + Read.TRANSFORMS.get(source.toLowerCase()), + "An unsupported source was specified: '%s'. Please specify one of the following source: %s", + source, + Read.TRANSFORMS.keySet())) .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values())) .build(); } @AutoValue public abstract static class Read extends SchemaTransform { - public static Map TRANSFORMS = + public static final Map TRANSFORMS = ImmutableMap.builder() - .put("iceberg", "beam:schematransform:org.apache.beam:iceberg_read:v1") + .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1") .build(); abstract String getSource(); @@ -66,15 +75,6 @@ abstract static class Builder { abstract Read build(); } - public Read from(String source) { - Preconditions.checkArgument( - TRANSFORMS.containsKey(source.toLowerCase()), - "An unsupported source was specified: '%s'. Please specify one of the following source: %s", - source, - TRANSFORMS.keySet()); - return toBuilder().setSource(TRANSFORMS.get(source.toLowerCase())).build(); - } - public Read withConfigUrl(String configUrl) { return toBuilder().setConfigUrl(configUrl).build(); } @@ -108,17 +108,23 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - public static Write write() { + public static Write write(String sink) { return new AutoValue_Managed_Write.Builder() + .setSink( + Preconditions.checkNotNull( + Write.TRANSFORMS.get(sink.toLowerCase()), + "An unsupported sink was specified: '%s'. 
Please specify one of the following sinks: %s", + sink, + Write.TRANSFORMS.keySet())) .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values())) .build(); } @AutoValue public abstract static class Write extends SchemaTransform { - public static Map TRANSFORMS = + public static final Map TRANSFORMS = ImmutableMap.builder() - .put("iceberg", "beam:schematransform:org.apache.beam:iceberg_write:v1") + .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1") .build(); abstract String getSink(); @@ -135,24 +141,15 @@ public abstract static class Write extends SchemaTransform { abstract static class Builder { abstract Builder setSink(String source); - abstract Builder setConfig(String config); + abstract Builder setConfig(@Nullable String config); - abstract Builder setConfigUrl(String configUrl); + abstract Builder setConfigUrl(@Nullable String configUrl); abstract Builder setSupportedIdentifiers(List supportedIdentifiers); abstract Write build(); } - public Write to(String sink) { - Preconditions.checkArgument( - TRANSFORMS.containsKey(sink.toLowerCase()), - "An unsupported sink was specified: '%s'. 
Please specify one of the following sinks: %s", - sink, - TRANSFORMS.keySet()); - return toBuilder().setSink(TRANSFORMS.get(sink.toLowerCase())).build(); - } - public Write withConfigUrl(String configUrl) { return toBuilder().setConfigUrl(configUrl).build(); } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 3afcfde5968d..9c9eb8914045 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -38,7 +38,7 @@ import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; @AutoService(SchemaTransformProvider.class) @@ -99,9 +99,9 @@ public static Builder builder() { public abstract static class Builder { public abstract Builder setTransformIdentifier(String identifier); - public abstract Builder setConfigUrl(String configUrl); + public abstract Builder setConfigUrl(@Nullable String configUrl); - public abstract Builder setConfig(String config); + public abstract Builder setConfig(@Nullable String config); public abstract ManagedConfig build(); } From 02c04e607082009f7c3144d59427fae802707921 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 5 Apr 2024 10:29:08 -0400 Subject: [PATCH 12/21] add tests; actually read from config file --- sdks/java/core/build.gradle | 2 +- .../beam/sdk/schemas/utils/YamlUtils.java | 33 ++++- sdks/java/managed/build.gradle | 2 + .../org/apache/beam/sdk/managed/Managed.java | 10 +- 
.../ManagedSchemaTransformProvider.java | 40 +++--- .../ManagedSchemaTransformProviderTest.java | 103 ++++++++++++++ .../apache/beam/sdk/managed/ManagedTest.java | 127 +++++++++++++++++- .../managed/TestSchemaTransformProvider.java | 98 ++++++++++++++ .../src/test/resources/test_config.yaml | 2 + 9 files changed, 384 insertions(+), 33 deletions(-) create mode 100644 sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java create mode 100644 sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java create mode 100644 sdks/java/managed/src/test/resources/test_config.yaml diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 0658d9e0d3f3..5a47cb5237ea 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -98,7 +98,7 @@ dependencies { permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema - implementation group: 'org.yaml', name: 'snakeyaml', version: '1.28' + implementation "org.yaml:snakeyaml:2.0" shadowTest library.java.everit_json_schema provided library.java.junit testImplementation "com.github.stefanbirkner:system-rules:1.19.0" diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java index 758b75482511..9fe7b09fb311 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; @@ -53,6 +54,11 @@ public class YamlUtils { .build(); public static Row toBeamRow(@Nullable String yamlString, Schema schema) { + return toBeamRow(yamlString, schema, false); + } + + public static Row toBeamRow( + @Nullable String yamlString, Schema schema, boolean convertNamesToCamelCase) { if (yamlString == null || yamlString.isEmpty()) { List requiredFields = schema.getFields().stream() @@ -75,10 +81,12 @@ public static Row toBeamRow(@Nullable String yamlString, Schema schema) { "Expected a YAML mapping but got type '%s' instead.", Preconditions.checkNotNull(yamlMap).getClass()); - return toBeamRow((Map) Preconditions.checkNotNull(yamlMap), schema); + return toBeamRow( + (Map) Preconditions.checkNotNull(yamlMap), schema, convertNamesToCamelCase); } - private static @Nullable Object toBeamValue(Field field, @Nullable Object yamlValue) { + private static @Nullable Object toBeamValue( + Field field, @Nullable Object yamlValue, boolean convertNamesToCamelCase) { FieldType fieldType = field.getType(); if (yamlValue == null) { @@ -112,7 +120,10 @@ public static Row toBeamRow(@Nullable String yamlString, Schema schema) { fieldType); return ((List) yamlValue) .stream() - .map(v -> Preconditions.checkNotNull(toBeamValue(field.withType(innerType), v))) + .map( + v -> + Preconditions.checkNotNull( + toBeamValue(field.withType(innerType), v, convertNamesToCamelCase))) .collect(Collectors.toList()); } @@ -124,7 +135,7 @@ public static Row toBeamRow(@Nullable String yamlString, Schema schema) { "Received a YAML '%s' type, but output schema field '%s' does not define a Row Schema", yamlValue.getClass(), fieldType); - return toBeamRow((Map) yamlValue, nestedSchema); + return toBeamRow((Map) yamlValue, nestedSchema, convertNamesToCamelCase); } else if (fieldType.getTypeName() == Schema.TypeName.MAP) { return 
yamlValue; } @@ -136,13 +147,21 @@ public static Row toBeamRow(@Nullable String yamlString, Schema schema) { } @SuppressWarnings("nullness") - public static Row toBeamRow(Map yamlMap, Schema rowSchema) { - + public static Row toBeamRow(Map yamlMap, Schema rowSchema, boolean toCamelCase) { return rowSchema.getFields().stream() - .map(field -> toBeamValue(field, yamlMap.get(field.getName()))) + .map( + field -> + toBeamValue( + field, + yamlMap.get(maybeGetSnakeCase(field.getName(), toCamelCase)), + toCamelCase)) .collect(toRow(rowSchema)); } + private static String maybeGetSnakeCase(String str, boolean getSnakeCase) { + return getSnakeCase ? CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, str) : str; + } + public static String yamlStringFromMap(Map map) { return new Yaml().dumpAsMap(map); } diff --git a/sdks/java/managed/build.gradle b/sdks/java/managed/build.gradle index 48f51b8e94d5..88e537d66f8c 100644 --- a/sdks/java/managed/build.gradle +++ b/sdks/java/managed/build.gradle @@ -29,7 +29,9 @@ ext.summary = """Library that provides managed IOs.""" dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation library.java.vendored_guava_32_1_2_jre +// implementation library.java.vendored_grpc_1_60_1 testImplementation library.java.junit + testRuntimeOnly "org.yaml:snakeyaml:2.0" testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index c6225e903ad8..ce3007ef6042 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -38,7 +38,7 @@ public static Read read(String source) { .setSource( Preconditions.checkNotNull( Read.TRANSFORMS.get(source.toLowerCase()), - "An unsupported source was specified: '%s'. 
Please specify one of the following source: %s", + "An unsupported source was specified: '%s'. Please specify one of the following sources: %s", source, Read.TRANSFORMS.keySet())) .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values())) @@ -64,7 +64,7 @@ public abstract static class Read extends SchemaTransform { @AutoValue.Builder abstract static class Builder { - abstract Builder setSource(String source); + abstract Builder setSource(String sourceIdentifier); abstract Builder setConfig(@Nullable String config); @@ -102,7 +102,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .build(); SchemaTransform underlyingTransform = - ManagedSchemaTransformProvider.of(TRANSFORMS.values()).from(managedConfig); + new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig); return input.apply(underlyingTransform); } @@ -139,7 +139,7 @@ public abstract static class Write extends SchemaTransform { @AutoValue.Builder abstract static class Builder { - abstract Builder setSink(String source); + abstract Builder setSink(String sinkIdentifier); abstract Builder setConfig(@Nullable String config); @@ -177,7 +177,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .build(); SchemaTransform underlyingTransform = - ManagedSchemaTransformProvider.of(TRANSFORMS.values()).from(managedConfig); + new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig); return input.apply(underlyingTransform); } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 9c9eb8914045..5c4d19b7f85b 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -22,12 +22,15 @@ import com.google.auto.service.AutoService; 
import com.google.auto.value.AutoValue; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; import javax.annotation.Nullable; import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; @@ -38,11 +41,12 @@ import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; @AutoService(SchemaTransformProvider.class) -class ManagedSchemaTransformProvider +public class ManagedSchemaTransformProvider extends TypedSchemaTransformProvider { @Override @@ -52,7 +56,9 @@ public String identifier() { private final Map schemaTransformProviders = new HashMap<>(); - private ManagedSchemaTransformProvider(Collection identifiers) { + public ManagedSchemaTransformProvider() {} + + public ManagedSchemaTransformProvider(Collection identifiers) { try { for (SchemaTransformProvider schemaTransformProvider : ServiceLoader.load(SchemaTransformProvider.class)) { @@ -70,15 +76,6 @@ private ManagedSchemaTransformProvider(Collection identifiers) { schemaTransformProviders.entrySet().removeIf(e -> !identifiers.contains(e.getKey())); } - private static @Nullable ManagedSchemaTransformProvider managedProvider = null; - - public static ManagedSchemaTransformProvider of(Collection supportedIdentifiers) { - if (managedProvider == null) { - managedProvider = new ManagedSchemaTransformProvider(supportedIdentifiers); - } - 
return managedProvider; - } - @DefaultSchema(AutoValueSchema.class) @AutoValue public abstract static class ManagedConfig { @@ -158,16 +155,16 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - private static Row getRowConfig(ManagedConfig config, Schema transformSchema) throws Exception { + @VisibleForTesting + static Row getRowConfig(ManagedConfig config, Schema transformSchema) { String transformYamlConfig; if (!Strings.isNullOrEmpty(config.getConfigUrl())) { try { - transformYamlConfig = - FileSystems.open( - FileSystems.matchSingleFileSpec( - Preconditions.checkNotNull(config.getConfigUrl())) - .resourceId()) - .toString(); + MatchResult.Metadata fileMetaData = + FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(config.getConfigUrl())); + ByteBuffer buffer = ByteBuffer.allocate((int) fileMetaData.sizeBytes()); + FileSystems.open(fileMetaData.resourceId()).read(buffer); + transformYamlConfig = new String(buffer.array(), StandardCharsets.US_ASCII); } catch (IOException e) { throw new RuntimeException(e); } @@ -175,6 +172,11 @@ private static Row getRowConfig(ManagedConfig config, Schema transformSchema) th transformYamlConfig = config.getConfig(); } - return YamlUtils.toBeamRow(transformYamlConfig, transformSchema); + return YamlUtils.toBeamRow(transformYamlConfig, transformSchema, true); + } + + @VisibleForTesting + Map getAllProviders() { + return schemaTransformProviders; } } diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java new file mode 100644 index 000000000000..0c495d0d2c5c --- /dev/null +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.managed; + +import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedConfig; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.Arrays; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ManagedSchemaTransformProviderTest { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testFailWhenNoConfigSpecified() { + ManagedSchemaTransformProvider.ManagedConfig config = + ManagedSchemaTransformProvider.ManagedConfig.builder() + .setTransformIdentifier("some identifier") + .build(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Please specify a config or a config URL, but not both"); + config.validate(); + } + + @Test + public void testGetRowFromYamlConfig() { + String yamlString = "extra_string: abc\n" + "extra_integer: 123"; + ManagedConfig config = + ManagedConfig.builder() + 
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER) + .setConfig(yamlString) + .build(); + Schema configSchema = new TestSchemaTransformProvider().configurationSchema(); + Row expectedRow = + Row.withSchema(configSchema) + .withFieldValue("extraString", "abc") + .withFieldValue("extraInteger", 123) + .build(); + Row configRow = + ManagedSchemaTransformProvider.getRowConfig( + config, new TestSchemaTransformProvider().configurationSchema()); + + assertEquals(expectedRow, configRow); + } + + @Test + public void testGetRowFromConfigUrl() throws URISyntaxException { + String yamlConfigPath = + Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI()) + .toFile() + .getAbsolutePath(); + ManagedConfig config = + ManagedConfig.builder() + .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER) + .setConfigUrl(yamlConfigPath) + .build(); + Schema configSchema = new TestSchemaTransformProvider().configurationSchema(); + Row expectedRow = + Row.withSchema(configSchema) + .withFieldValue("extraString", "abc") + .withFieldValue("extraInteger", 123) + .build(); + Row configRow = + ManagedSchemaTransformProvider.getRowConfig( + config, new TestSchemaTransformProvider().configurationSchema()); + + assertEquals(expectedRow, configRow); + } + + @Test + public void testDiscoverTestProvider() { + ManagedSchemaTransformProvider provider = + new ManagedSchemaTransformProvider(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)); + + assertTrue(provider.getAllProviders().containsKey(TestSchemaTransformProvider.IDENTIFIER)); + } +} diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java index af0659e4ee2e..af79aed62231 100644 --- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java @@ -1,2 +1,127 @@ -package 
org.apache.beam.sdk.managed;public class ManagedTest { +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.managed; + +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ManagedTest { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testInvalidTransform() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("An unsupported source was specified"); + Managed.read("nonexistent-source"); + + thrown.expect(NullPointerException.class); + 
thrown.expectMessage("An unsupported sink was specified"); + Managed.write("nonexistent-sink"); + } + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + private static final Schema SCHEMA = + Schema.builder().addStringField("str").addInt32Field("int").build(); + private static final List ROWS = + Arrays.asList( + Row.withSchema(SCHEMA).withFieldValue("str", "a").withFieldValue("int", 1).build(), + Row.withSchema(SCHEMA).withFieldValue("str", "b").withFieldValue("int", 2).build(), + Row.withSchema(SCHEMA).withFieldValue("str", "c").withFieldValue("int", 3).build()); + + public void runTestProviderTest(Managed.Write writeOp) { + PCollection rows = + PCollectionRowTuple.of("input", pipeline.apply(Create.of(ROWS)).setRowSchema(SCHEMA)) + .apply(writeOp) + .get("output"); + + Schema outputSchema = rows.getSchema(); + PAssert.that(rows) + .containsInAnyOrder( + ROWS.stream() + .map( + row -> + Row.withSchema(outputSchema) + .addValues(row.getValues()) + .addValue("abc") + .addValue(123) + .build()) + .collect(Collectors.toList())); + pipeline.run(); + } + + @Test + public void testManagedTestProviderWithMapConfig() { + Managed.Write writeOp = + Managed.write(Managed.ICEBERG) + .toBuilder() + .setSink(TestSchemaTransformProvider.IDENTIFIER) + .build() + .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)) + .withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123)); + + runTestProviderTest(writeOp); + } + + @Test + public void testManagedTestProviderWithStringConfig() { + Managed.Write writeOp = + Managed.write(Managed.ICEBERG) + .toBuilder() + .setSink(TestSchemaTransformProvider.IDENTIFIER) + .build() + .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)) + .withConfig("extra_string: abc\nextra_integer: 123"); + + runTestProviderTest(writeOp); + } + + @Test + public void testManagedTestProviderWithConfigFile() throws Exception { + String yamlConfigPath = + 
Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI()) + .toFile() + .getAbsolutePath(); + + Managed.Write writeOp = + Managed.write(Managed.ICEBERG) + .toBuilder() + .setSink(TestSchemaTransformProvider.IDENTIFIER) + .build() + .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)) + .withConfigUrl(yamlConfigPath); + + runTestProviderTest(writeOp); + } } diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java new file mode 100644 index 000000000000..136d98d468d0 --- /dev/null +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.managed; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; + +@AutoService(SchemaTransformProvider.class) +public class TestSchemaTransformProvider + extends TypedSchemaTransformProvider { + static final String IDENTIFIER = "beam:schematransform:org.apache.beam:test_transform:v1"; + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Config { + public static Builder builder() { + return new AutoValue_TestSchemaTransformProvider_Config.Builder(); + } + + @SchemaFieldDescription("String to add to each row element.") + public abstract String getExtraString(); + + @SchemaFieldDescription("Integer to add to each row element.") + public abstract Integer getExtraInteger(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setExtraString(String extraString); + + public abstract Builder setExtraInteger(Integer extraInteger); + + public abstract Config build(); + } + } + + @Override + public SchemaTransform from(Config config) { + String extraString = config.getExtraString(); + Integer extraInteger = config.getExtraInteger(); + return new SchemaTransform() { + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Schema schema = + 
Schema.builder() + .addFields(input.get("input").getSchema().getFields()) + .addStringField("extra_string") + .addInt32Field("extra_integer") + .build(); + PCollection rows = + input + .get("input") + .apply( + MapElements.into(TypeDescriptors.rows()) + .via( + row -> + Row.withSchema(schema) + .addValues(row.getValues()) + .addValue(extraString) + .addValue(extraInteger) + .build())) + .setRowSchema(schema); + return PCollectionRowTuple.of("output", rows); + } + }; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/sdks/java/managed/src/test/resources/test_config.yaml b/sdks/java/managed/src/test/resources/test_config.yaml new file mode 100644 index 000000000000..c2d3465f62cc --- /dev/null +++ b/sdks/java/managed/src/test/resources/test_config.yaml @@ -0,0 +1,2 @@ +extra_string: "abc" +extra_integer: 123 \ No newline at end of file From 134f7d5c4bcc0dad0e64053f7586c3c23ba9ff71 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 5 Apr 2024 10:38:13 -0400 Subject: [PATCH 13/21] add license header; address some comments --- .../org/apache/beam/sdk/managed/Managed.java | 2 ++ .../ManagedSchemaTransformProvider.java | 5 +++-- .../src/test/resources/test_config.yaml | 19 +++++++++++++++++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index ce3007ef6042..cfd93f4a45b1 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -30,6 +30,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; public class Managed { + + // TODO: Dynamically generate a list of supported transforms public static final String ICEBERG = "iceberg"; public static Read read(String source) { diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 5c4d19b7f85b..6785bdc36a8f 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -78,7 +78,8 @@ public ManagedSchemaTransformProvider(Collection identifiers) { @DefaultSchema(AutoValueSchema.class) @AutoValue - public abstract static class ManagedConfig { + @VisibleForTesting + abstract static class ManagedConfig { public static Builder builder() { return new AutoValue_ManagedSchemaTransformProvider_ManagedConfig.Builder(); } @@ -137,7 +138,7 @@ protected SchemaTransform from(ManagedConfig managedConfig) { return new ManagedSchemaTransform(transformConfig, schemaTransformProvider); } - protected static class ManagedSchemaTransform extends SchemaTransform { + private static class ManagedSchemaTransform extends SchemaTransform { private final Row transformConfig; private final SchemaTransformProvider underlyingTransformProvider; diff --git a/sdks/java/managed/src/test/resources/test_config.yaml b/sdks/java/managed/src/test/resources/test_config.yaml index c2d3465f62cc..7725c32b348e 100644 --- a/sdks/java/managed/src/test/resources/test_config.yaml +++ b/sdks/java/managed/src/test/resources/test_config.yaml @@ -1,2 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + extra_string: "abc" extra_integer: 123 \ No newline at end of file From 59fce6ae136703688f4edef7e87a53dd00c700da Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 5 Apr 2024 10:54:39 -0400 Subject: [PATCH 14/21] spotless --- .../org/apache/beam/sdk/util/YamlUtilsTest.java | 14 ++++++++------ .../managed/ManagedSchemaTransformProvider.java | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java index e7105d9e42ca..6e6984dde3a6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java @@ -129,7 +129,7 @@ public void testInvalidTopLevelArray() { YamlUtils.toBeamRow(invalidYaml, schema); } - final Schema FLAT_SCHEMA = + private static final Schema FLAT_SCHEMA = Schema.builder() .addByteField("byte_field") .addInt16Field("int16_field") @@ -143,7 +143,7 @@ public void testInvalidTopLevelArray() { .addByteArrayField("bytes_field") .build(); - final Row FLAT_ROW = + private static final Row FLAT_ROW = Row.withSchema(FLAT_SCHEMA) .withFieldValue("byte_field", Byte.valueOf("123")) .withFieldValue("int16_field", Short.valueOf("16")) @@ -157,7 +157,7 @@ public void testInvalidTopLevelArray() { .withFieldValue("bytes_field", BaseEncoding.base64().decode("abc")) .build(); - String FLAT_YAML = + private static final String FLAT_YAML = "byte_field: 123\n" + "int16_field: 16\n" + "int32_field: 
32\n" @@ -190,11 +190,13 @@ public void testAllTypesNested() { assertEquals(expectedRow, YamlUtils.toBeamRow(topLevelYaml, schema)); } - String INT_ARRAY_YAML = "arr:\n" + " - 1\n" + " - 2\n" + " - 3\n" + " - 4\n" + " - 5\n"; + private static final String INT_ARRAY_YAML = + "arr:\n" + " - 1\n" + " - 2\n" + " - 3\n" + " - 4\n" + " - 5\n"; - Schema INT_ARRAY_SCHEMA = Schema.builder().addArrayField("arr", Schema.FieldType.INT32).build(); + private static final Schema INT_ARRAY_SCHEMA = + Schema.builder().addArrayField("arr", Schema.FieldType.INT32).build(); - Row INT_ARRAY_ROW = + private static final Row INT_ARRAY_ROW = Row.withSchema(INT_ARRAY_SCHEMA) .withFieldValue("arr", IntStream.range(1, 6).boxed().collect(Collectors.toList())) .build(); diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 6785bdc36a8f..643b62df556f 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -58,7 +58,7 @@ public String identifier() { public ManagedSchemaTransformProvider() {} - public ManagedSchemaTransformProvider(Collection identifiers) { + ManagedSchemaTransformProvider(Collection identifiers) { try { for (SchemaTransformProvider schemaTransformProvider : ServiceLoader.load(SchemaTransformProvider.class)) { From 5e4983ab857e165b7bfb6d286a2677762478aab0 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 5 Apr 2024 11:39:46 -0400 Subject: [PATCH 15/21] remove managed dependency from kafka --- sdks/java/io/kafka/build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index abcaff4bb7ff..269ddb3f5eb2 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -50,7 
+50,6 @@ kafkaVersions.each{k,v -> configurations.create("kafkaVersion$k")} dependencies { implementation library.java.vendored_guava_32_1_2_jre - implementation project(path: ':sdks:java:managed') provided library.java.jackson_dataformat_csv permitUnusedDeclared library.java.jackson_dataformat_csv implementation project(path: ":sdks:java:core", configuration: "shadow") From c9f08336f97334b8dfafa8984870dc1ec4be0a72 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 5 Apr 2024 17:47:36 -0400 Subject: [PATCH 16/21] add some javadoc --- .../beam/sdk/schemas/utils/YamlUtils.java | 5 +- .../org/apache/beam/sdk/managed/Managed.java | 88 +++++++++++++++---- .../apache/beam/sdk/managed/ManagedTest.java | 15 +--- 3 files changed, 78 insertions(+), 30 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java index 9fe7b09fb311..5c05b2bed396 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -162,7 +162,10 @@ private static String maybeGetSnakeCase(String str, boolean getSnakeCase) { return getSnakeCase ? 
CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, str) : str; } - public static String yamlStringFromMap(Map map) { + public static String yamlStringFromMap(@Nullable Map map) { + if (map == null || map.isEmpty()) { + return ""; + } return new Yaml().dumpAsMap(map); } } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index cfd93f4a45b1..1bb5599100fa 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -23,17 +23,60 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +/** + * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey transforms. + * + *

Available transforms

+ * + *

This API currently supports two operations: {@link Read} and {@link Write}. Each one enumerates the available + * transforms in a {@code TRANSFORMS} map. + * + *

Building a Managed turnkey transform

+ * + *

Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a defined configuration. + * A given transform can be built with a {@code Map} that specifies arguments like so: + *

{@code
+ * PCollectionRowTuple output = Managed.read(ICEBERG)
+ *      .withConfig(ImmutableMap.<String, Object>builder()
+ *          .put("foo", "abc")
+ *          .put("bar", 123)
+ *          .build());
+ * }
+ * + *

Instead of specifying configuration arguments directly in the code, one can provide the location to a YAML file + * that contains this information. Say we have the following YAML file: + * + *

{@code
+ * foo: "abc"
+ * bar: 123
+ * }
+ * + *

The file's path can be passed in to the Managed API like so: + * + *

{@code
+ * PCollectionRowTuple output = Managed.write(ICEBERG)
+ *      .withConfigUrl();
+ * }
+ */ public class Managed { // TODO: Dynamically generate a list of supported transforms public static final String ICEBERG = "iceberg"; + + /** + * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed sources are: + *
    + *
  • {@link Managed#ICEBERG} : Read from Apache Iceberg + *
+ */ public static Read read(String source) { return new AutoValue_Managed_Read.Builder() @@ -77,18 +120,22 @@ abstract static class Builder { abstract Read build(); } - public Read withConfigUrl(String configUrl) { - return toBuilder().setConfigUrl(configUrl).build(); - } - - public Read withConfig(String config) { - return toBuilder().setConfig(config).build(); - } - + /** + * Use the input Map of configuration arguments to build and instantiate the underlying transform. + * The map can ignore nullable parameters, but needs to include all required parameters. Check the + * underlying transform's schema ({@link SchemaTransformProvider#configurationSchema()}) to see which parameters are available. + */ public Read withConfig(Map config) { return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); } + /** + * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a specified YAML file location. + */ + public Read withConfigUrl(String configUrl) { + return toBuilder().setConfigUrl(configUrl).build(); + } + @VisibleForTesting Read withSupportedIdentifiers(List supportedIdentifiers) { return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build(); @@ -110,6 +157,12 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } + /** + * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed sinks are: + *
    + *
  • {@link Managed#ICEBERG} : Write to Apache Iceberg + *
+ */ public static Write write(String sink) { return new AutoValue_Managed_Write.Builder() .setSink( @@ -152,17 +205,22 @@ abstract static class Builder { abstract Write build(); } - public Write withConfigUrl(String configUrl) { - return toBuilder().setConfigUrl(configUrl).build(); + /** + * Use the input Map of configuration arguments to build and instantiate the underlying sink. + * The map can ignore nullable parameters, but needs to include all required parameters. Check the + * underlying sink's configuration schema to see which parameters are available. + */ + public Write withConfig(Map config) { + return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); } - public Write withConfig(String config) { - return toBuilder().setConfig(config).build(); + /** + * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a specified YAML file location. + */ + public Write withConfigUrl(String configUrl) { + return toBuilder().setConfigUrl(configUrl).build(); } - public Write withConfig(Map config) { - return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); - } @VisibleForTesting Write withSupportedIdentifiers(List supportedIdentifiers) { diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java index af79aed62231..449a1528c2d4 100644 --- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java @@ -82,7 +82,7 @@ public void runTestProviderTest(Managed.Write writeOp) { } @Test - public void testManagedTestProviderWithMapConfig() { + public void testManagedTestProviderWithConfigMap() { Managed.Write writeOp = Managed.write(Managed.ICEBERG) .toBuilder() @@ -94,19 +94,6 @@ public void testManagedTestProviderWithMapConfig() { runTestProviderTest(writeOp); } - @Test - public void 
testManagedTestProviderWithStringConfig() { - Managed.Write writeOp = - Managed.write(Managed.ICEBERG) - .toBuilder() - .setSink(TestSchemaTransformProvider.IDENTIFIER) - .build() - .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)) - .withConfig("extra_string: abc\nextra_integer: 123"); - - runTestProviderTest(writeOp); - } - @Test public void testManagedTestProviderWithConfigFile() throws Exception { String yamlConfigPath = From 65cc688969d7cd9d12dc63418a928abefbd796f0 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 5 Apr 2024 17:53:36 -0400 Subject: [PATCH 17/21] use utf_8 encoding when reading files --- .../apache/beam/sdk/managed/ManagedSchemaTransformProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 643b62df556f..60502ee1b7b5 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -165,7 +165,7 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) { FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(config.getConfigUrl())); ByteBuffer buffer = ByteBuffer.allocate((int) fileMetaData.sizeBytes()); FileSystems.open(fileMetaData.resourceId()).read(buffer); - transformYamlConfig = new String(buffer.array(), StandardCharsets.US_ASCII); + transformYamlConfig = new String(buffer.array(), StandardCharsets.UTF_8); } catch (IOException e) { throw new RuntimeException(e); } From 389de7b18020723a4ecfc503ecfa7dafedc4f37a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 5 Apr 2024 18:51:07 -0400 Subject: [PATCH 18/21] spotless; add tests to javaPrecommit --- build.gradle.kts | 1 + 
.../org/apache/beam/sdk/managed/Managed.java | 44 +++++++++++-------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index ded692677b53..9c42ffdc8cea 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -303,6 +303,7 @@ tasks.register("javaPreCommit") { dependsOn(":sdks:java:io:synthetic:build") dependsOn(":sdks:java:io:xml:build") dependsOn(":sdks:java:javadoc:allJavadoc") + dependsOn(":sdks:java:managed:build") dependsOn(":sdks:java:testing:expansion-service:build") dependsOn(":sdks:java:testing:jpms-tests:build") dependsOn(":sdks:java:testing:load-tests:build") diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 1bb5599100fa..856c4b8f1571 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -31,17 +31,20 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; /** - * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey transforms. + * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey + * transforms. * *

Available transforms

* - *

This API currently supports two operations: {@link Read} and {@link Write}. Each one enumerates the available - * transforms in a {@code TRANSFORMS} map. + *

This API currently supports two operations: {@link Read} and {@link Write}. Each one + * enumerates the available transforms in a {@code TRANSFORMS} map. * *

Building a Managed turnkey transform

* - *

Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a defined configuration. - * A given transform can be built with a {@code Map} that specifies arguments using like so: + *

Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a + * defined configuration. A given transform can be built with a {@code Map} that + * specifies arguments using like so: + * *

{@code
  * PCollectionRowTuple output = Managed.read(ICEBERG)
  *      .withConfig(ImmutableMap.<String, Object>builder()
@@ -50,8 +53,8 @@
  *          .build());
  * }
* - *

Instead of specifying configuration arguments directly in the code, one can provide the location to a YAML file - * that contains this information. Say we have the following YAML file: + *

Instead of specifying configuration arguments directly in the code, one can provide the + * location to a YAML file that contains this information. Say we have the following YAML file: * *

{@code
  * foo: "abc"
@@ -70,9 +73,10 @@ public class Managed {
   // TODO: Dynamically generate a list of supported transforms
   public static final String ICEBERG = "iceberg";
 
-
   /**
-   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed sources are:
+   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
+   * sources are:
+   *
    * 
    *
  • {@link Managed#ICEBERG} : Read from Apache Iceberg *
@@ -121,16 +125,18 @@ abstract static class Builder { } /** - * Use the input Map of configuration arguments to build and instantiate the underlying transform. - * The map can ignore nullable parameters, but needs to include all required parameters. Check the - * underlying transform's schema ({@link SchemaTransformProvider#configurationSchema()}) to see which parameters are available. + * Use the input Map of configuration arguments to build and instantiate the underlying + * transform. The map can ignore nullable parameters, but needs to include all required + * parameters. Check the underlying transform's schema ({@link + * SchemaTransformProvider#configurationSchema()}) to see which parameters are available. */ public Read withConfig(Map config) { return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); } /** - * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a specified YAML file location. + * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a + * specified YAML file location. */ public Read withConfigUrl(String configUrl) { return toBuilder().setConfigUrl(configUrl).build(); @@ -158,7 +164,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } /** - * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed sinks are: + * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed + * sinks are: + * *
    *
  • {@link Managed#ICEBERG} : Write to Apache Iceberg *
@@ -207,21 +215,21 @@ abstract static class Builder { /** * Use the input Map of configuration arguments to build and instantiate the underlying sink. - * The map can ignore nullable parameters, but needs to include all required parameters. Check the - * underlying sink's configuration schema to see which parameters are available. + * The map can ignore nullable parameters, but needs to include all required parameters. Check + * the underlying sink's configuration schema to see which parameters are available. */ public Write withConfig(Map config) { return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); } /** - * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a specified YAML file location. + * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a + * specified YAML file location. */ public Write withConfigUrl(String configUrl) { return toBuilder().setConfigUrl(configUrl).build(); } - @VisibleForTesting Write withSupportedIdentifiers(List supportedIdentifiers) { return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build(); From fac06bca155ad28001c752a846acd60262363147 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sat, 6 Apr 2024 17:13:04 -0400 Subject: [PATCH 19/21] unify to one builder class; fix docs --- .../org/apache/beam/sdk/managed/Managed.java | 146 +++++------------- .../ManagedSchemaTransformProvider.java | 4 +- .../apache/beam/sdk/managed/ManagedTest.java | 10 +- 3 files changed, 49 insertions(+), 111 deletions(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 856c4b8f1571..9988af373a53 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -46,11 +46,12 @@ * specifies arguments using like so: * *
{@code
- * PCollectionRowTuple output = Managed.read(ICEBERG)
- *      .withConfig(ImmutableMap..builder()
- *          .put("foo", "abc")
- *          .put("bar", 123)
- *          .build());
+ * PCollectionRowTuple output = p.apply(
+ *       Managed.read(ICEBERG)
+ *           .withConfig(ImmutableMap.<String, Object>builder()
+ *               .put("foo", "abc")
+ *               .put("bar", 123)
+ *               .build()));
  * }
* *

Instead of specifying configuration arguments directly in the code, one can provide the @@ -64,8 +65,9 @@ *

The file's path can be passed in to the Managed API like so: * *

{@code
- * PCollectionRowTuple output = Managed.write(ICEBERG)
- *      .withConfigUrl();
+ * PCollectionRowTuple output = pipeline.apply(
+ *     Managed.write(ICEBERG)
+ *         .withConfigUrl());
  * }
*/ public class Managed { @@ -73,6 +75,15 @@ public class Managed { // TODO: Dynamically generate a list of supported transforms public static final String ICEBERG = "iceberg"; + public static final Map READ_TRANSFORMS = + ImmutableMap.builder() + .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1") + .build(); + public static final Map WRITE_TRANSFORMS = + ImmutableMap.builder() + .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1") + .build(); + /** * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed * sources are: @@ -81,88 +92,19 @@ public class Managed { *
  • {@link Managed#ICEBERG} : Read from Apache Iceberg * */ - public static Read read(String source) { + public static ManagedTransform read(String source) { - return new AutoValue_Managed_Read.Builder() - .setSource( + return new AutoValue_Managed_ManagedTransform.Builder() + .setIdentifier( Preconditions.checkNotNull( - Read.TRANSFORMS.get(source.toLowerCase()), + READ_TRANSFORMS.get(source.toLowerCase()), "An unsupported source was specified: '%s'. Please specify one of the following sources: %s", source, - Read.TRANSFORMS.keySet())) - .setSupportedIdentifiers(new ArrayList<>(Read.TRANSFORMS.values())) + READ_TRANSFORMS.keySet())) + .setSupportedIdentifiers(new ArrayList<>(READ_TRANSFORMS.values())) .build(); } - @AutoValue - public abstract static class Read extends SchemaTransform { - public static final Map TRANSFORMS = - ImmutableMap.builder() - .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1") - .build(); - - abstract String getSource(); - - abstract @Nullable String getConfig(); - - abstract @Nullable String getConfigUrl(); - - abstract List getSupportedIdentifiers(); - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setSource(String sourceIdentifier); - - abstract Builder setConfig(@Nullable String config); - - abstract Builder setConfigUrl(@Nullable String configUrl); - - abstract Builder setSupportedIdentifiers(List supportedIdentifiers); - - abstract Read build(); - } - - /** - * Use the input Map of configuration arguments to build and instantiate the underlying - * transform. The map can ignore nullable parameters, but needs to include all required - * parameters. Check the underlying transform's schema ({@link - * SchemaTransformProvider#configurationSchema()}) to see which parameters are available. 
- */ - public Read withConfig(Map config) { - return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); - } - - /** - * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a - * specified YAML file location. - */ - public Read withConfigUrl(String configUrl) { - return toBuilder().setConfigUrl(configUrl).build(); - } - - @VisibleForTesting - Read withSupportedIdentifiers(List supportedIdentifiers) { - return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build(); - } - - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - ManagedSchemaTransformProvider.ManagedConfig managedConfig = - ManagedSchemaTransformProvider.ManagedConfig.builder() - .setTransformIdentifier(getSource()) - .setConfig(getConfig()) - .setConfigUrl(getConfigUrl()) - .build(); - - SchemaTransform underlyingTransform = - new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig); - - return input.apply(underlyingTransform); - } - } - /** * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed * sinks are: @@ -171,26 +113,21 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { *
  • {@link Managed#ICEBERG} : Write to Apache Iceberg * */ - public static Write write(String sink) { - return new AutoValue_Managed_Write.Builder() - .setSink( + public static ManagedTransform write(String sink) { + return new AutoValue_Managed_ManagedTransform.Builder() + .setIdentifier( Preconditions.checkNotNull( - Write.TRANSFORMS.get(sink.toLowerCase()), + WRITE_TRANSFORMS.get(sink.toLowerCase()), "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s", sink, - Write.TRANSFORMS.keySet())) - .setSupportedIdentifiers(new ArrayList<>(Write.TRANSFORMS.values())) + WRITE_TRANSFORMS.keySet())) + .setSupportedIdentifiers(new ArrayList<>(WRITE_TRANSFORMS.values())) .build(); } @AutoValue - public abstract static class Write extends SchemaTransform { - public static final Map TRANSFORMS = - ImmutableMap.builder() - .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1") - .build(); - - abstract String getSink(); + abstract static class ManagedTransform extends SchemaTransform { + abstract String getIdentifier(); abstract @Nullable String getConfig(); @@ -202,7 +139,7 @@ public abstract static class Write extends SchemaTransform { @AutoValue.Builder abstract static class Builder { - abstract Builder setSink(String sinkIdentifier); + abstract Builder setIdentifier(String identifier); abstract Builder setConfig(@Nullable String config); @@ -210,15 +147,16 @@ abstract static class Builder { abstract Builder setSupportedIdentifiers(List supportedIdentifiers); - abstract Write build(); + abstract ManagedTransform build(); } /** - * Use the input Map of configuration arguments to build and instantiate the underlying sink. - * The map can ignore nullable parameters, but needs to include all required parameters. Check - * the underlying sink's configuration schema to see which parameters are available. + * Use the input Map of configuration arguments to build and instantiate the underlying + * transform. 
The map can ignore nullable parameters, but needs to include all required + * parameters. Check the underlying transform's schema ({@link + * SchemaTransformProvider#configurationSchema()}) to see which parameters are available. */ - public Write withConfig(Map config) { + public ManagedTransform withConfig(Map config) { return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); } @@ -226,12 +164,12 @@ public Write withConfig(Map config) { * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a * specified YAML file location. */ - public Write withConfigUrl(String configUrl) { + public ManagedTransform withConfigUrl(String configUrl) { return toBuilder().setConfigUrl(configUrl).build(); } @VisibleForTesting - Write withSupportedIdentifiers(List supportedIdentifiers) { + ManagedTransform withSupportedIdentifiers(List supportedIdentifiers) { return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build(); } @@ -239,7 +177,7 @@ Write withSupportedIdentifiers(List supportedIdentifiers) { public PCollectionRowTuple expand(PCollectionRowTuple input) { ManagedSchemaTransformProvider.ManagedConfig managedConfig = ManagedSchemaTransformProvider.ManagedConfig.builder() - .setTransformIdentifier(getSink()) + .setTransformIdentifier(getIdentifier()) .setConfig(getConfig()) .setConfigUrl(getConfigUrl()) .build(); diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 60502ee1b7b5..1ee2b11a90ff 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -58,7 +58,7 @@ public String identifier() { public ManagedSchemaTransformProvider() {} - ManagedSchemaTransformProvider(Collection identifiers) { + 
ManagedSchemaTransformProvider(Collection supportedIdentifiers) { try { for (SchemaTransformProvider schemaTransformProvider : ServiceLoader.load(SchemaTransformProvider.class)) { @@ -73,7 +73,7 @@ public ManagedSchemaTransformProvider() {} throw new RuntimeException(e.getMessage()); } - schemaTransformProviders.entrySet().removeIf(e -> !identifiers.contains(e.getKey())); + schemaTransformProviders.entrySet().removeIf(e -> !supportedIdentifiers.contains(e.getKey())); } @DefaultSchema(AutoValueSchema.class) diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java index 449a1528c2d4..ceb71a06f33c 100644 --- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java @@ -60,7 +60,7 @@ public void testInvalidTransform() { Row.withSchema(SCHEMA).withFieldValue("str", "b").withFieldValue("int", 2).build(), Row.withSchema(SCHEMA).withFieldValue("str", "c").withFieldValue("int", 3).build()); - public void runTestProviderTest(Managed.Write writeOp) { + public void runTestProviderTest(Managed.ManagedTransform writeOp) { PCollection rows = PCollectionRowTuple.of("input", pipeline.apply(Create.of(ROWS)).setRowSchema(SCHEMA)) .apply(writeOp) @@ -83,10 +83,10 @@ public void runTestProviderTest(Managed.Write writeOp) { @Test public void testManagedTestProviderWithConfigMap() { - Managed.Write writeOp = + Managed.ManagedTransform writeOp = Managed.write(Managed.ICEBERG) .toBuilder() - .setSink(TestSchemaTransformProvider.IDENTIFIER) + .setIdentifier(TestSchemaTransformProvider.IDENTIFIER) .build() .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)) .withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123)); @@ -101,10 +101,10 @@ public void testManagedTestProviderWithConfigFile() throws Exception { .toFile() .getAbsolutePath(); 
- Managed.Write writeOp = + Managed.ManagedTransform writeOp = Managed.write(Managed.ICEBERG) .toBuilder() - .setSink(TestSchemaTransformProvider.IDENTIFIER) + .setIdentifier(TestSchemaTransformProvider.IDENTIFIER) .build() .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)) .withConfigUrl(yamlConfigPath); From 5ef97a907507413dc6b4be64fec4f58537af9ac1 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sun, 7 Apr 2024 00:02:02 -0400 Subject: [PATCH 20/21] update docs --- .../src/main/java/org/apache/beam/sdk/managed/Managed.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 9988af373a53..10ec985e0814 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -46,7 +46,7 @@ * specifies arguments using like so: * *
    {@code
    - * PCollectionRowTuple output = p.apply(
    + * PCollectionRowTuple output = PCollectionRowTuple.empty(pipeline).apply(
      *       Managed.read(ICEBERG)
      *           .withConfig(ImmutableMap.<String, Object>builder()
      *               .put("foo", "abc")
    @@ -65,7 +65,9 @@
      * 

    The file's path can be passed in to the Managed API like so: * *

    {@code
    - * PCollectionRowTuple output = pipeline.apply(
    + * PCollectionRowTuple input = PCollectionRowTuple.of("input", pipeline.apply(Create.of(...)))
    + *
    + * PCollectionRowTuple output = input.apply(
      *     Managed.write(ICEBERG)
      *         .withConfigUrl());
      * }
    From 7d527e266d025d16f77e2722c881331598079578 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sun, 7 Apr 2024 14:15:59 -0400 Subject: [PATCH 21/21] make ManagedTransform public --- .../src/main/java/org/apache/beam/sdk/managed/Managed.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 10ec985e0814..b2b010b1e434 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -128,13 +128,14 @@ public static ManagedTransform write(String sink) { } @AutoValue - abstract static class ManagedTransform extends SchemaTransform { + public abstract static class ManagedTransform extends SchemaTransform { abstract String getIdentifier(); abstract @Nullable String getConfig(); abstract @Nullable String getConfigUrl(); + @VisibleForTesting abstract List getSupportedIdentifiers(); abstract Builder toBuilder(); @@ -147,6 +148,7 @@ abstract static class Builder { abstract Builder setConfigUrl(@Nullable String configUrl); + @VisibleForTesting abstract Builder setSupportedIdentifiers(List supportedIdentifiers); abstract ManagedTransform build();