diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 7c692fcb3f4e..e54946456d4e 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -635,7 +635,6 @@ class BeamModulePlugin implements Plugin { def sbe_tool_version = "1.25.1" def singlestore_jdbc_version = "1.1.4" def slf4j_version = "1.7.30" - def snakeyaml_version = "2.2" def spark2_version = "2.4.8" def spark3_version = "3.2.2" def spotbugs_version = "4.0.6" @@ -867,7 +866,6 @@ class BeamModulePlugin implements Plugin { sbe_tool : "uk.co.real-logic:sbe-tool:$sbe_tool_version", singlestore_jdbc : "com.singlestore:singlestore-jdbc-client:$singlestore_jdbc_version", slf4j_api : "org.slf4j:slf4j-api:$slf4j_version", - snake_yaml : "org.yaml:snakeyaml:$snakeyaml_version", slf4j_android : "org.slf4j:slf4j-android:$slf4j_version", slf4j_ext : "org.slf4j:slf4j-ext:$slf4j_version", slf4j_jdk14 : "org.slf4j:slf4j-jdk14:$slf4j_version", diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index e150c22de62d..5f32b64893df 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -91,6 +91,7 @@ dependencies { shadow library.java.jackson_core shadow library.java.jackson_annotations shadow library.java.jackson_databind + shadow library.java.jackson_dataformat_yaml shadow library.java.slf4j_api shadow library.java.snappy_java shadow library.java.joda_time @@ -98,7 +99,6 @@ dependencies { permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema - implementation library.java.snake_yaml shadowTest library.java.everit_json_schema provided library.java.junit testImplementation "com.github.stefanbirkner:system-rules:1.19.0" @@ -107,7 +107,6 @@ dependencies { provided 'com.facebook.presto.hadoop:hadoop-apache2:3.2.0-1' provided library.java.zstd_jni permitUnusedDeclared 'com.facebook.presto.hadoop:hadoop-apache2:3.2.0-1' - shadowTest library.java.jackson_dataformat_yaml shadowTest library.java.guava_testlib shadowTest library.java.mockito_core shadowTest library.java.hamcrest diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java index e631e166e8be..e004225ef226 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -19,6 +19,11 @@ import static org.apache.beam.sdk.values.Row.toRow; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import java.io.IOException; +import java.io.InputStream; import java.math.BigDecimal; import java.util.Collections; import java.util.List; @@ -34,9 +39,10 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; import org.checkerframework.checker.nullness.qual.Nullable; -import org.yaml.snakeyaml.Yaml; public class YamlUtils { + private static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory()); + private static final Map> YAML_VALUE_PARSERS = ImmutableMap .) Preconditions.checkNotNull(yamlMap), schema, convertNamesToCamelCase); + return toBeamRow(yamlStringToMap(yamlString), schema, convertNamesToCamelCase); } private static @Nullable Object toBeamValue( @@ -180,13 +177,29 @@ public static String yamlStringFromMap(@Nullable Map map) { if (map == null || map.isEmpty()) { return ""; } - return new Yaml().dumpAsMap(map); + try { + return MAPPER.writeValueAsString(map); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } } - public static Map yamlStringToMap(@Nullable String yaml) { - if (yaml == null || yaml.isEmpty()) { + public static Map yamlStringToMap(@Nullable String yamlString) { + if (yamlString == null || yamlString.isEmpty()) { return Collections.emptyMap(); } - return new Yaml().load(yaml); + try { + return MAPPER.readValue(yamlString, Map.class); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + public static Map inputStreamToMap(InputStream inputStream) { + try { + return MAPPER.readValue(inputStream, Map.class); + } catch (IOException e) { + throw new RuntimeException(e); + } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java index bf032aed7b5c..4866b5fb74c7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java @@ -123,11 +123,11 @@ public void testExtraFieldsAreIgnored() { @Test public void testInvalidTopLevelArray() { - String invalidYaml = "- top_level_list" + "- another_list"; + String invalidYaml = "- top_level_list\n" + "- another_list"; Schema schema = Schema.builder().build(); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Expected a YAML mapping"); + thrown.expect(RuntimeException.class); + thrown.expectMessage("Array value"); YamlUtils.toBeamRow(invalidYaml, schema); } diff --git a/sdks/java/expansion-service/build.gradle b/sdks/java/expansion-service/build.gradle index 2926bfad633f..fe00c8e50350 100644 --- a/sdks/java/expansion-service/build.gradle +++ b/sdks/java/expansion-service/build.gradle @@ -41,7 +41,6 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":runners:java-fn-execution") implementation project(path: ":sdks:java:harness") - implementation library.java.snake_yaml permitUnusedDeclared project(path: ":model:fn-execution") implementation library.java.vendored_grpc_1_60_1 implementation library.java.vendored_guava_32_1_2_jre diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceConfig.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceConfig.java index 7d8d1266c57a..dd4343502a99 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceConfig.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceConfig.java @@ -24,7 +24,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.yaml.snakeyaml.Yaml; +import org.apache.beam.sdk.schemas.utils.YamlUtils; @SuppressWarnings("nullness") @AutoValue @@ -47,8 +47,7 @@ public static ExpansionServiceConfig create( } static ExpansionServiceConfig parseFromYamlStream(InputStream inputStream) { - Yaml yaml = new Yaml(); - Map config = yaml.load(inputStream); + Map config = YamlUtils.inputStreamToMap(inputStream); if (config == null) { throw new IllegalArgumentException( diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java index 5e0820da468d..9303dc45cfca 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.common.ReflectHelpers; @@ -62,7 +63,6 @@ import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; import org.checkerframework.checker.nullness.qual.Nullable; -import org.yaml.snakeyaml.Yaml; /** * A transform provider that can be used to directly instantiate a transform using Java class name @@ -490,8 +490,7 @@ public static AllowList everything() { } static AllowList parseFromYamlStream(InputStream inputStream) { - Yaml yaml = new Yaml(); - Map config = yaml.load(inputStream); + Map config = YamlUtils.inputStreamToMap(inputStream); if (config == null) { throw new IllegalArgumentException( diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java index 46168a487dda..3957efc76c8b 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java @@ -28,6 +28,7 @@ import java.util.stream.Stream; import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -40,7 +41,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.yaml.snakeyaml.Yaml; public class IcebergReadSchemaTransformProviderTest { @@ -163,7 +163,7 @@ public void testReadUsingManagedTransform() throws Exception { + " catalog_type: %s\n" + " warehouse_location: %s", identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location); - Map configMap = new Yaml().load(yamlConfig); + Map configMap = YamlUtils.yamlStringToMap(yamlConfig); PCollection output = testPipeline diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 9ef3e9945ec9..046930d8f815 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.UUID; import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -47,7 +48,6 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.yaml.snakeyaml.Yaml; @RunWith(JUnit4.class) public class IcebergWriteSchemaTransformProviderTest { @@ -132,7 +132,7 @@ public void testWriteUsingManagedTransform() { + " catalog_type: %s\n" + " warehouse_location: %s", identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location); - Map configMap = new Yaml().load(yamlConfig); + Map configMap = YamlUtils.yamlStringToMap(yamlConfig); PCollection inputRows = testPipeline diff --git a/sdks/java/transform-service/build.gradle b/sdks/java/transform-service/build.gradle index 91e6185152da..df6e2281b5c4 100644 --- a/sdks/java/transform-service/build.gradle +++ b/sdks/java/transform-service/build.gradle @@ -43,7 +43,6 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation library.java.vendored_grpc_1_60_1 implementation library.java.vendored_guava_32_1_2_jre - implementation library.java.snake_yaml testImplementation library.java.junit testImplementation library.java.mockito_core testImplementation project(path: ":runners:java-fn-execution") diff --git a/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceConfig.java b/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceConfig.java index bbada3205463..166d94ef55cc 100644 --- a/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceConfig.java +++ b/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceConfig.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.yaml.snakeyaml.Yaml; +import org.apache.beam.sdk.schemas.utils.YamlUtils; @AutoValue public abstract class TransformServiceConfig { @@ -41,8 +41,7 @@ static TransformServiceConfig create(List expansionservices) { } static TransformServiceConfig parseFromYamlStream(InputStream inputStream) { - Yaml yaml = new Yaml(); - Map config = yaml.load(inputStream); + Map config = YamlUtils.inputStreamToMap(inputStream); if (config == null) { throw new IllegalArgumentException( "Could not parse the provided YAML stream into a non-trivial TransformServiceConfig");