diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index e54946456d4e..7c692fcb3f4e 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -635,6 +635,7 @@ class BeamModulePlugin implements Plugin { def sbe_tool_version = "1.25.1" def singlestore_jdbc_version = "1.1.4" def slf4j_version = "1.7.30" + def snakeyaml_version = "2.2" def spark2_version = "2.4.8" def spark3_version = "3.2.2" def spotbugs_version = "4.0.6" @@ -866,6 +867,7 @@ class BeamModulePlugin implements Plugin { sbe_tool : "uk.co.real-logic:sbe-tool:$sbe_tool_version", singlestore_jdbc : "com.singlestore:singlestore-jdbc-client:$singlestore_jdbc_version", slf4j_api : "org.slf4j:slf4j-api:$slf4j_version", + snake_yaml : "org.yaml:snakeyaml:$snakeyaml_version", slf4j_android : "org.slf4j:slf4j-android:$slf4j_version", slf4j_ext : "org.slf4j:slf4j-ext:$slf4j_version", slf4j_jdk14 : "org.slf4j:slf4j-jdk14:$slf4j_version", diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 5a47cb5237ea..e150c22de62d 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -98,7 +98,7 @@ dependencies { permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema - implementation "org.yaml:snakeyaml:2.0" + implementation library.java.snake_yaml shadowTest library.java.everit_json_schema provided library.java.junit testImplementation "com.github.stefanbirkner:system-rules:1.19.0" diff --git a/sdks/java/expansion-service/build.gradle b/sdks/java/expansion-service/build.gradle index 206b2fe1bc46..2926bfad633f 100644 --- a/sdks/java/expansion-service/build.gradle +++ b/sdks/java/expansion-service/build.gradle @@ -41,10 +41,8 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":runners:java-fn-execution") implementation project(path: ":sdks:java:harness") + implementation library.java.snake_yaml permitUnusedDeclared project(path: ":model:fn-execution") - implementation library.java.jackson_annotations - implementation library.java.jackson_databind - implementation library.java.jackson_dataformat_yaml implementation library.java.vendored_grpc_1_60_1 implementation library.java.vendored_guava_32_1_2_jre implementation library.java.slf4j_api diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/Dependency.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/Dependency.java index 4b701e923dc9..579e135a65b8 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/Dependency.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/Dependency.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.expansion.service; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.auto.value.AutoValue; //// TODO(https://github.com/apache/beam/issues/26527): generalize to support other types of @@ -27,8 +25,7 @@ public abstract class Dependency { abstract String getPath(); - @JsonCreator - static Dependency create(@JsonProperty("path") String path) { + static Dependency create(String path) { return new AutoValue_Dependency(path); } } diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceConfig.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceConfig.java index 868416141305..7d8d1266c57a 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceConfig.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceConfig.java @@ -17,17 +17,19 @@ */ package org.apache.beam.sdk.expansion.service; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.auto.value.AutoValue; +import java.io.InputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import org.yaml.snakeyaml.Yaml; @SuppressWarnings("nullness") @AutoValue public abstract class ExpansionServiceConfig { + public abstract List getAllowlist(); public abstract Map> getDependencies(); @@ -36,14 +38,52 @@ public static ExpansionServiceConfig empty() { return create(new ArrayList<>(), new HashMap<>()); } - @JsonCreator - static ExpansionServiceConfig create( - @JsonProperty("allowlist") List allowlist, - @JsonProperty("dependencies") Map> dependencies) { + public static ExpansionServiceConfig create( + List allowlist, Map> dependencies) { if (allowlist == null) { allowlist = new ArrayList<>(); } - System.out.println("Dependencies list: " + dependencies); return new AutoValue_ExpansionServiceConfig(allowlist, dependencies); } + + static ExpansionServiceConfig parseFromYamlStream(InputStream inputStream) { + Yaml yaml = new Yaml(); + Map config = yaml.load(inputStream); + + if (config == null) { + throw new IllegalArgumentException( + "Could not parse the provided YAML stream into a non-trivial ExpansionServiceConfig"); + } + + List allowList = new ArrayList<>(); + Map> dependencies = new HashMap<>(); + if (config.get("allowlist") != null) { + allowList = (List) config.get("allowlist"); + } + + if (config.get("dependencies") != null) { + Map> dependenciesFromConfig = + (Map>) config.get("dependencies"); + dependenciesFromConfig.forEach( + (k, v) -> { + if (v != null) { + List dependenciesForTransform = + v.stream() + .map( + val -> { + Map depProperties = (Map) val; + String path = (String) depProperties.get("path"); + if (path == null) { + throw new IllegalArgumentException( + "Expected the path to be not null"); + } + return Dependency.create(path); + }) + .collect(Collectors.toList()); + dependencies.put(k, dependenciesForTransform); + } + }); + } + return ExpansionServiceConfig.create(allowList, dependencies); + } } diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceOptions.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceOptions.java index bc2dc95066c0..8862feac36c6 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceOptions.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceOptions.java @@ -17,10 +17,11 @@ */ package org.apache.beam.sdk.expansion.service; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.AllowList; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; @@ -78,17 +79,19 @@ public AllowList create(PipelineOptions options) { if (allowListFile.equals("*")) { return AllowList.everything(); } - ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); File allowListFileObj = new File(allowListFile); if (!allowListFileObj.exists()) { throw new IllegalArgumentException( "Allow list file " + allowListFile + " does not exist"); } - try { - return mapper.readValue(allowListFileObj, AllowList.class); + try (InputStream stream = new FileInputStream(allowListFileObj)) { + return AllowList.parseFromYamlStream(stream); + } catch (FileNotFoundException e) { + throw new RuntimeException( + "Could not parse the provided allowlist file " + allowListFile, e); } catch (IOException e) { - throw new IllegalArgumentException( - "Could not load the provided allowlist file " + allowListFile, e); + throw new RuntimeException( + "Could not parse the provided allowlist file " + allowListFile, e); } } @@ -104,16 +107,18 @@ class ExpansionServiceConfigFactory implements DefaultValueFactory config = yaml.load(inputStream); + + if (config == null) { + throw new IllegalArgumentException( + "Could not parse the provided YAML stream into a non-trivial AllowList"); + } + + String version = config.get("version") != null ? (String) config.get("version") : ""; + List allowedClasses = new ArrayList<>(); + if (config.get("allowedClasses") != null) { + allowedClasses = + ((List>) config.get("allowedClasses")) + .stream() + .map( + data -> { + String className = (String) data.get("className"); + if (className == null) { + throw new IllegalArgumentException( + "Expected each entry in the allowlist to include the 'className'"); + } + List allowedBuilderMethods = + (List) data.get("allowedBuilderMethods"); + List allowedConstructorMethods = + (List) data.get("allowedConstructorMethods"); + if (allowedBuilderMethods == null) { + allowedBuilderMethods = new ArrayList<>(); + } + if (allowedConstructorMethods == null) { + allowedConstructorMethods = new ArrayList<>(); + } + return AllowedClass.create( + className, allowedBuilderMethods, allowedConstructorMethods); + }) + .collect(Collectors.toList()); + } + return AllowList.create(version, allowedClasses); + } + public abstract String getVersion(); public abstract List getAllowedClasses(); @@ -512,11 +553,7 @@ public AllowedClass getAllowedClass(String className) { return allowlistClass; } - @JsonCreator - static AllowList create( - @JsonProperty("version") String version, - @JsonProperty("allowedClasses") @javax.annotation.Nullable - List allowedClasses) { + static AllowList create(String version, List allowedClasses) { if (allowedClasses == null) { allowedClasses = new ArrayList<>(); } @@ -553,13 +590,10 @@ public boolean isAllowedConstructorMethod(String methodName) { || getAllowedConstructorMethods().equals(WILDCARD); } - @JsonCreator static AllowedClass create( - @JsonProperty("className") String className, - @JsonProperty("allowedBuilderMethods") @javax.annotation.Nullable - List allowedBuilderMethods, - @JsonProperty("allowedConstructorMethods") @javax.annotation.Nullable - List allowedConstructorMethods) { + String className, + List allowedBuilderMethods, + List allowedConstructorMethods) { if (allowedBuilderMethods == null) { allowedBuilderMethods = new ArrayList<>(); } diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java index eb277917588d..3bd87c2ae5c7 100644 --- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java @@ -32,6 +32,11 @@ import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -59,6 +64,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Resources; import org.checkerframework.checker.nullness.qual.Nullable; import org.hamcrest.Matchers; import org.junit.Test; @@ -338,6 +344,40 @@ public void testExternalConfiguration_simpleSchema() throws Exception { assertThat(config.getList(), Matchers.is(ImmutableList.of("abc", "123"))); } + @Test + public void testExpansionServiceConfig() throws Exception { + URL expansionServiceConfigFile = Resources.getResource("./test_expansion_service_config.yaml"); + ExpansionServiceConfig config = + ExpansionServiceConfig.parseFromYamlStream( + Files.newInputStream(Paths.get(expansionServiceConfigFile.getPath()))); + assertEquals(3, config.getAllowlist().size()); + assertTrue(config.getAllowlist().contains("beam:transform:my_dummy_transform_1")); + assertTrue(config.getAllowlist().contains("beam:transform:my_dummy_transform_2")); + assertTrue(config.getAllowlist().contains("beam:transform:my_dummy_transform_3")); + + assertEquals(2, config.getDependencies().size()); + assertTrue(config.getDependencies().containsKey("beam:transform:my_dummy_transform_2")); + assertTrue(config.getDependencies().containsKey("beam:transform:my_dummy_transform_3")); + + assertEquals(1, config.getDependencies().get("beam:transform:my_dummy_transform_2").size()); + assertEquals( + "jars/my_dummy_transform_2_dep1.jar", + config.getDependencies().get("beam:transform:my_dummy_transform_2").get(0).getPath()); + assertEquals(2, config.getDependencies().get("beam:transform:my_dummy_transform_3").size()); + + ArrayList expectedDepsOfTransform3 = + new ArrayList<>( + Arrays.asList( + "jars/my_dummy_transform_3_dep1.jar", "jars/my_dummy_transform_3_dep2.jar")); + + assertTrue( + expectedDepsOfTransform3.contains( + config.getDependencies().get("beam:transform:my_dummy_transform_3").get(0).getPath())); + assertTrue( + expectedDepsOfTransform3.contains( + config.getDependencies().get("beam:transform:my_dummy_transform_3").get(1).getPath())); + } + @DefaultSchema(AutoValueSchema.class) @AutoValue abstract static class TestConfigSimpleSchema { diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java index 34e22bc24122..cefd1c346390 100644 --- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java @@ -31,7 +31,6 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import java.io.File; import java.io.IOException; import java.io.Serializable; import java.net.URL; @@ -88,7 +87,6 @@ public class JavaClassLookupTransformProviderTest { public static void setupExpansionService() { PipelineOptionsFactory.register(ExpansionServiceOptions.class); URL allowListFile = Resources.getResource("./test_allowlist.yaml"); - System.out.println("Exists: " + new File(allowListFile.getPath()).exists()); expansionService = new ExpansionService( new String[] {"--javaClassLookupAllowlistFile=" + allowListFile.getPath()}); diff --git a/sdks/java/expansion-service/src/test/resources/test_expansion_service_config.yaml b/sdks/java/expansion-service/src/test/resources/test_expansion_service_config.yaml new file mode 100644 index 000000000000..c0fa37cd0ab4 --- /dev/null +++ b/sdks/java/expansion-service/src/test/resources/test_expansion_service_config.yaml @@ -0,0 +1,23 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +allowlist: +- "beam:transform:my_dummy_transform_1" +- "beam:transform:my_dummy_transform_2" +- "beam:transform:my_dummy_transform_3" +dependencies: + "beam:transform:my_dummy_transform_1": + "beam:transform:my_dummy_transform_2": + - path: "jars/my_dummy_transform_2_dep1.jar" + "beam:transform:my_dummy_transform_3": + - path: "jars/my_dummy_transform_3_dep1.jar" + - path: "jars/my_dummy_transform_3_dep2.jar" diff --git a/sdks/java/transform-service/build.gradle b/sdks/java/transform-service/build.gradle index 231f955cbe72..91e6185152da 100644 --- a/sdks/java/transform-service/build.gradle +++ b/sdks/java/transform-service/build.gradle @@ -43,9 +43,7 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation library.java.vendored_grpc_1_60_1 implementation library.java.vendored_guava_32_1_2_jre - implementation library.java.jackson_annotations - implementation library.java.jackson_databind - implementation library.java.jackson_dataformat_yaml + implementation library.java.snake_yaml testImplementation library.java.junit testImplementation library.java.mockito_core testImplementation project(path: ":runners:java-fn-execution") diff --git a/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceConfig.java b/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceConfig.java index 07ea5a2c3f48..bbada3205463 100644 --- a/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceConfig.java +++ b/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceConfig.java @@ -17,11 +17,12 @@ */ package org.apache.beam.sdk.transformservice; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.auto.value.AutoValue; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import org.yaml.snakeyaml.Yaml; @AutoValue public abstract class TransformServiceConfig { @@ -31,13 +32,32 @@ public static TransformServiceConfig empty() { return create(new ArrayList<>()); } - @JsonCreator - static TransformServiceConfig create( - @JsonProperty("expansionservices") List expansionservices) { + static TransformServiceConfig create(List expansionservices) { if (expansionservices == null) { expansionservices = new ArrayList<>(); } return new AutoValue_TransformServiceConfig(expansionservices); } + + static TransformServiceConfig parseFromYamlStream(InputStream inputStream) { + Yaml yaml = new Yaml(); + Map config = yaml.load(inputStream); + if (config == null) { + throw new IllegalArgumentException( + "Could not parse the provided YAML stream into a non-trivial TransformServiceConfig"); + } + + List expansionservices = null; + if (config.get("expansionservices") != null) { + expansionservices = (List) config.get("expansionservices"); + } + + if (expansionservices == null) { + throw new IllegalArgumentException( + "Expected the Transform Service config to contain at least one Expansion Service."); + } + + return TransformServiceConfig.create(expansionservices); + } } diff --git a/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceOptions.java b/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceOptions.java index 558dffc5bcfb..721ff33d8e5a 100644 --- a/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceOptions.java +++ b/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/TransformServiceOptions.java @@ -17,10 +17,11 @@ */ package org.apache.beam.sdk.transformservice; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; @@ -51,16 +52,18 @@ class TransformServiceConfigFactory implements DefaultValueFactory