Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,6 @@ class BeamModulePlugin implements Plugin<Project> {
def sbe_tool_version = "1.25.1"
def singlestore_jdbc_version = "1.1.4"
def slf4j_version = "1.7.30"
def snakeyaml_version = "2.2"
def spark2_version = "2.4.8"
def spark3_version = "3.2.2"
def spotbugs_version = "4.0.6"
Expand Down Expand Up @@ -867,7 +866,6 @@ class BeamModulePlugin implements Plugin<Project> {
sbe_tool : "uk.co.real-logic:sbe-tool:$sbe_tool_version",
singlestore_jdbc : "com.singlestore:singlestore-jdbc-client:$singlestore_jdbc_version",
slf4j_api : "org.slf4j:slf4j-api:$slf4j_version",
snake_yaml : "org.yaml:snakeyaml:$snakeyaml_version",
slf4j_android : "org.slf4j:slf4j-android:$slf4j_version",
slf4j_ext : "org.slf4j:slf4j-ext:$slf4j_version",
slf4j_jdk14 : "org.slf4j:slf4j-jdk14:$slf4j_version",
Expand Down
3 changes: 1 addition & 2 deletions sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ dependencies {
shadow library.java.jackson_core
shadow library.java.jackson_annotations
shadow library.java.jackson_databind
shadow library.java.jackson_dataformat_yaml
shadow library.java.slf4j_api
shadow library.java.snappy_java
shadow library.java.joda_time
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
provided library.java.json_org
implementation library.java.everit_json_schema
implementation library.java.snake_yaml
shadowTest library.java.everit_json_schema
provided library.java.junit
testImplementation "com.github.stefanbirkner:system-rules:1.19.0"
Expand All @@ -107,7 +107,6 @@ dependencies {
provided 'com.facebook.presto.hadoop:hadoop-apache2:3.2.0-1'
provided library.java.zstd_jni
permitUnusedDeclared 'com.facebook.presto.hadoop:hadoop-apache2:3.2.0-1'
shadowTest library.java.jackson_dataformat_yaml
shadowTest library.java.guava_testlib
shadowTest library.java.mockito_core
shadowTest library.java.hamcrest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

import static org.apache.beam.sdk.values.Row.toRow;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;
Expand All @@ -34,9 +39,10 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.yaml.snakeyaml.Yaml;

public class YamlUtils {
private static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory());

private static final Map<Schema.TypeName, Function<String, @Nullable Object>> YAML_VALUE_PARSERS =
ImmutableMap
.<Schema.TypeName,
Expand Down Expand Up @@ -74,16 +80,7 @@ public static Row toBeamRow(
requiredFields));
}
}
Yaml yaml = new Yaml();
Object yamlMap = yaml.load(yamlString);

Preconditions.checkArgument(
yamlMap instanceof Map,
"Expected a YAML mapping but got type '%s' instead.",
Preconditions.checkNotNull(yamlMap).getClass());

return toBeamRow(
(Map<String, Object>) Preconditions.checkNotNull(yamlMap), schema, convertNamesToCamelCase);
return toBeamRow(yamlStringToMap(yamlString), schema, convertNamesToCamelCase);
}

private static @Nullable Object toBeamValue(
Expand Down Expand Up @@ -180,13 +177,29 @@ public static String yamlStringFromMap(@Nullable Map<String, Object> map) {
if (map == null || map.isEmpty()) {
return "";
}
return new Yaml().dumpAsMap(map);
try {
return MAPPER.writeValueAsString(map);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public static Map<String, Object> yamlStringToMap(@Nullable String yaml) {
if (yaml == null || yaml.isEmpty()) {
public static Map<String, Object> yamlStringToMap(@Nullable String yamlString) {
if (yamlString == null || yamlString.isEmpty()) {
return Collections.emptyMap();
}
return new Yaml().load(yaml);
try {
return MAPPER.readValue(yamlString, Map.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public static Map<String, Object> inputStreamToMap(InputStream inputStream) {
try {
return MAPPER.readValue(inputStream, Map.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ public void testExtraFieldsAreIgnored() {

@Test
public void testInvalidTopLevelArray() {
String invalidYaml = "- top_level_list" + "- another_list";
String invalidYaml = "- top_level_list\n" + "- another_list";
Schema schema = Schema.builder().build();

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Expected a YAML mapping");
thrown.expect(RuntimeException.class);
thrown.expectMessage("Array value");
YamlUtils.toBeamRow(invalidYaml, schema);
}

Expand Down
1 change: 0 additions & 1 deletion sdks/java/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":runners:java-fn-execution")
implementation project(path: ":sdks:java:harness")
implementation library.java.snake_yaml
permitUnusedDeclared project(path: ":model:fn-execution")
implementation library.java.vendored_grpc_1_60_1
implementation library.java.vendored_guava_32_1_2_jre
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.yaml.snakeyaml.Yaml;
import org.apache.beam.sdk.schemas.utils.YamlUtils;

@SuppressWarnings("nullness")
@AutoValue
Expand All @@ -47,8 +47,7 @@ public static ExpansionServiceConfig create(
}

static ExpansionServiceConfig parseFromYamlStream(InputStream inputStream) {
Yaml yaml = new Yaml();
Map<Object, Object> config = yaml.load(inputStream);
Map<String, Object> config = YamlUtils.inputStreamToMap(inputStream);

if (config == null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.schemas.utils.YamlUtils;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.common.ReflectHelpers;
Expand All @@ -62,7 +63,6 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.yaml.snakeyaml.Yaml;

/**
* A transform provider that can be used to directly instantiate a transform using Java class name
Expand Down Expand Up @@ -490,8 +490,7 @@ public static AllowList everything() {
}

static AllowList parseFromYamlStream(InputStream inputStream) {
Yaml yaml = new Yaml();
Map<Object, Object> config = yaml.load(inputStream);
Map<String, Object> config = YamlUtils.inputStreamToMap(inputStream);

if (config == null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.stream.Stream;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.YamlUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -40,7 +41,6 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.yaml.snakeyaml.Yaml;

public class IcebergReadSchemaTransformProviderTest {

Expand Down Expand Up @@ -163,7 +163,7 @@ public void testReadUsingManagedTransform() throws Exception {
+ " catalog_type: %s\n"
+ " warehouse_location: %s",
identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location);
Map<String, Object> configMap = new Yaml().load(yamlConfig);
Map<String, Object> configMap = YamlUtils.yamlStringToMap(yamlConfig);

PCollection<Row> output =
testPipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.utils.YamlUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
Expand All @@ -47,7 +48,6 @@
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.yaml.snakeyaml.Yaml;

@RunWith(JUnit4.class)
public class IcebergWriteSchemaTransformProviderTest {
Expand Down Expand Up @@ -132,7 +132,7 @@ public void testWriteUsingManagedTransform() {
+ " catalog_type: %s\n"
+ " warehouse_location: %s",
identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location);
Map<String, Object> configMap = new Yaml().load(yamlConfig);
Map<String, Object> configMap = YamlUtils.yamlStringToMap(yamlConfig);

PCollection<Row> inputRows =
testPipeline
Expand Down
1 change: 0 additions & 1 deletion sdks/java/transform-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.vendored_grpc_1_60_1
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.snake_yaml
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation project(path: ":runners:java-fn-execution")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.yaml.snakeyaml.Yaml;
import org.apache.beam.sdk.schemas.utils.YamlUtils;

@AutoValue
public abstract class TransformServiceConfig {
Expand All @@ -41,8 +41,7 @@ static TransformServiceConfig create(List<String> expansionservices) {
}

static TransformServiceConfig parseFromYamlStream(InputStream inputStream) {
Yaml yaml = new Yaml();
Map<Object, Object> config = yaml.load(inputStream);
Map<String, Object> config = YamlUtils.inputStreamToMap(inputStream);
if (config == null) {
throw new IllegalArgumentException(
"Could not parse the provided YAML stream into a non-trivial TransformServiceConfig");
Expand Down