Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ class BeamModulePlugin implements Plugin<Project> {
def sbe_tool_version = "1.25.1"
def singlestore_jdbc_version = "1.1.4"
def slf4j_version = "1.7.30"
def snakeyaml_version = "2.2"
def spark2_version = "2.4.8"
def spark3_version = "3.2.2"
def spotbugs_version = "4.0.6"
Expand Down Expand Up @@ -866,6 +867,7 @@ class BeamModulePlugin implements Plugin<Project> {
sbe_tool : "uk.co.real-logic:sbe-tool:$sbe_tool_version",
singlestore_jdbc : "com.singlestore:singlestore-jdbc-client:$singlestore_jdbc_version",
slf4j_api : "org.slf4j:slf4j-api:$slf4j_version",
snake_yaml : "org.yaml:snakeyaml:$snakeyaml_version",
slf4j_android : "org.slf4j:slf4j-android:$slf4j_version",
slf4j_ext : "org.slf4j:slf4j-ext:$slf4j_version",
slf4j_jdk14 : "org.slf4j:slf4j-jdk14:$slf4j_version",
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ dependencies {
permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
provided library.java.json_org
implementation library.java.everit_json_schema
implementation "org.yaml:snakeyaml:2.0"
implementation library.java.snake_yaml
shadowTest library.java.everit_json_schema
provided library.java.junit
testImplementation "com.github.stefanbirkner:system-rules:1.19.0"
Expand Down
4 changes: 1 addition & 3 deletions sdks/java/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":runners:java-fn-execution")
implementation project(path: ":sdks:java:harness")
implementation library.java.snake_yaml
permitUnusedDeclared project(path: ":model:fn-execution")
implementation library.java.jackson_annotations
implementation library.java.jackson_databind
implementation library.java.jackson_dataformat_yaml
implementation library.java.vendored_grpc_1_60_1
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.slf4j_api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.expansion.service;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;

//// TODO(https://github.com/apache/beam/issues/26527): generalize to support other types of
Expand All @@ -27,8 +25,7 @@
public abstract class Dependency {
abstract String getPath();

@JsonCreator
static Dependency create(@JsonProperty("path") String path) {
static Dependency create(String path) {
return new AutoValue_Dependency(path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@
*/
package org.apache.beam.sdk.expansion.service;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.yaml.snakeyaml.Yaml;

@SuppressWarnings("nullness")
@AutoValue
public abstract class ExpansionServiceConfig {

public abstract List<String> getAllowlist();

public abstract Map<String, List<Dependency>> getDependencies();
Expand All @@ -36,14 +38,52 @@ public static ExpansionServiceConfig empty() {
return create(new ArrayList<>(), new HashMap<>());
}

@JsonCreator
static ExpansionServiceConfig create(
@JsonProperty("allowlist") List<String> allowlist,
@JsonProperty("dependencies") Map<String, List<Dependency>> dependencies) {
public static ExpansionServiceConfig create(
List<String> allowlist, Map<String, List<Dependency>> dependencies) {
if (allowlist == null) {
allowlist = new ArrayList<>();
}
System.out.println("Dependencies list: " + dependencies);
return new AutoValue_ExpansionServiceConfig(allowlist, dependencies);
}

static ExpansionServiceConfig parseFromYamlStream(InputStream inputStream) {
Yaml yaml = new Yaml();
Map<Object, Object> config = yaml.load(inputStream);

if (config == null) {
throw new IllegalArgumentException(
"Could not parse the provided YAML stream into a non-trivial ExpansionServiceConfig");
}

List<String> allowList = new ArrayList<>();
Map<String, List<Dependency>> dependencies = new HashMap<>();
if (config.get("allowlist") != null) {
allowList = (List<String>) config.get("allowlist");
}

if (config.get("dependencies") != null) {
Map<String, List<Object>> dependenciesFromConfig =
(Map<String, List<Object>>) config.get("dependencies");
dependenciesFromConfig.forEach(
(k, v) -> {
if (v != null) {
List<Dependency> dependenciesForTransform =
v.stream()
.map(
val -> {
Map<String, Object> depProperties = (Map<String, Object>) val;
String path = (String) depProperties.get("path");
if (path == null) {
throw new IllegalArgumentException(
"Expected the path to be not null");
}
return Dependency.create(path);
})
.collect(Collectors.toList());
dependencies.put(k, dependenciesForTransform);
}
});
}
return ExpansionServiceConfig.create(allowList, dependencies);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package org.apache.beam.sdk.expansion.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.AllowList;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
Expand Down Expand Up @@ -78,17 +79,19 @@ public AllowList create(PipelineOptions options) {
if (allowListFile.equals("*")) {
return AllowList.everything();
}
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
File allowListFileObj = new File(allowListFile);
if (!allowListFileObj.exists()) {
throw new IllegalArgumentException(
"Allow list file " + allowListFile + " does not exist");
}
try {
return mapper.readValue(allowListFileObj, AllowList.class);
try (InputStream stream = new FileInputStream(allowListFileObj)) {
return AllowList.parseFromYamlStream(stream);
} catch (FileNotFoundException e) {
throw new RuntimeException(
"Could not parse the provided allowlist file " + allowListFile, e);
} catch (IOException e) {
throw new IllegalArgumentException(
"Could not load the provided allowlist file " + allowListFile, e);
throw new RuntimeException(
"Could not parse the provided allowlist file " + allowListFile, e);
}
}

Expand All @@ -104,16 +107,18 @@ class ExpansionServiceConfigFactory implements DefaultValueFactory<ExpansionServ
public ExpansionServiceConfig create(PipelineOptions options) {
String configFile = options.as(ExpansionServiceOptions.class).getExpansionServiceConfigFile();
if (configFile != null) {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
File configFileObj = new File(configFile);
if (!configFileObj.exists()) {
throw new IllegalArgumentException("Config file " + configFile + " does not exist");
}
try {
return mapper.readValue(configFileObj, ExpansionServiceConfig.class);
try (InputStream stream = new FileInputStream(configFileObj)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too, would be nice to add a message to these RuntimeException wrappers, like "when opening file to parse as ExpansionServiceConfig"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return ExpansionServiceConfig.parseFromYamlStream(stream);
} catch (FileNotFoundException e) {
throw new RuntimeException(
"Could not parse the provided Expansion Service config file" + configFile, e);
} catch (IOException e) {
throw new IllegalArgumentException(
"Could not load the provided config file " + configFile, e);
throw new RuntimeException(
"Could not parse the provided Expansion Service config file" + configFile, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
Expand All @@ -36,6 +35,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.ExternalTransforms.BuilderMethod;
Expand All @@ -62,6 +62,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.yaml.snakeyaml.Yaml;

/**
* A transform provider that can be used to directly instantiate a transform using Java class name
Expand Down Expand Up @@ -488,6 +489,46 @@ public static AllowList everything() {
AllowedClass.create("*", AllowedClass.WILDCARD, AllowedClass.WILDCARD)));
}

static AllowList parseFromYamlStream(InputStream inputStream) {
Yaml yaml = new Yaml();
Map<Object, Object> config = yaml.load(inputStream);

if (config == null) {
throw new IllegalArgumentException(
"Could not parse the provided YAML stream into a non-trivial AllowList");
}

String version = config.get("version") != null ? (String) config.get("version") : "";
List<AllowedClass> allowedClasses = new ArrayList<>();
if (config.get("allowedClasses") != null) {
allowedClasses =
((List<Map<Object, Object>>) config.get("allowedClasses"))
.stream()
.map(
data -> {
String className = (String) data.get("className");
if (className == null) {
throw new IllegalArgumentException(
"Expected each entry in the allowlist to include the 'className'");
}
List<String> allowedBuilderMethods =
(List<String>) data.get("allowedBuilderMethods");
List<String> allowedConstructorMethods =
(List<String>) data.get("allowedConstructorMethods");
if (allowedBuilderMethods == null) {
allowedBuilderMethods = new ArrayList<>();
}
if (allowedConstructorMethods == null) {
allowedConstructorMethods = new ArrayList<>();
}
return AllowedClass.create(
className, allowedBuilderMethods, allowedConstructorMethods);
})
.collect(Collectors.toList());
}
return AllowList.create(version, allowedClasses);
}

public abstract String getVersion();

public abstract List<AllowedClass> getAllowedClasses();
Expand All @@ -512,11 +553,7 @@ public AllowedClass getAllowedClass(String className) {
return allowlistClass;
}

@JsonCreator
static AllowList create(
@JsonProperty("version") String version,
@JsonProperty("allowedClasses") @javax.annotation.Nullable
List<AllowedClass> allowedClasses) {
static AllowList create(String version, List<AllowedClass> allowedClasses) {
if (allowedClasses == null) {
allowedClasses = new ArrayList<>();
}
Expand Down Expand Up @@ -553,13 +590,10 @@ public boolean isAllowedConstructorMethod(String methodName) {
|| getAllowedConstructorMethods().equals(WILDCARD);
}

@JsonCreator
static AllowedClass create(
@JsonProperty("className") String className,
@JsonProperty("allowedBuilderMethods") @javax.annotation.Nullable
List<String> allowedBuilderMethods,
@JsonProperty("allowedConstructorMethods") @javax.annotation.Nullable
List<String> allowedConstructorMethods) {
String className,
List<String> allowedBuilderMethods,
List<String> allowedConstructorMethods) {
if (allowedBuilderMethods == null) {
allowedBuilderMethods = new ArrayList<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -59,6 +64,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Resources;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
import org.junit.Test;
Expand Down Expand Up @@ -338,6 +344,40 @@ public void testExternalConfiguration_simpleSchema() throws Exception {
assertThat(config.getList(), Matchers.is(ImmutableList.of("abc", "123")));
}

@Test
public void testExpansionServiceConfig() throws Exception {
URL expansionServiceConfigFile = Resources.getResource("./test_expansion_service_config.yaml");
ExpansionServiceConfig config =
ExpansionServiceConfig.parseFromYamlStream(
Files.newInputStream(Paths.get(expansionServiceConfigFile.getPath())));
assertEquals(3, config.getAllowlist().size());
assertTrue(config.getAllowlist().contains("beam:transform:my_dummy_transform_1"));
assertTrue(config.getAllowlist().contains("beam:transform:my_dummy_transform_2"));
assertTrue(config.getAllowlist().contains("beam:transform:my_dummy_transform_3"));

assertEquals(2, config.getDependencies().size());
assertTrue(config.getDependencies().containsKey("beam:transform:my_dummy_transform_2"));
assertTrue(config.getDependencies().containsKey("beam:transform:my_dummy_transform_3"));

assertEquals(1, config.getDependencies().get("beam:transform:my_dummy_transform_2").size());
assertEquals(
"jars/my_dummy_transform_2_dep1.jar",
config.getDependencies().get("beam:transform:my_dummy_transform_2").get(0).getPath());
assertEquals(2, config.getDependencies().get("beam:transform:my_dummy_transform_3").size());

ArrayList<String> expectedDepsOfTransform3 =
new ArrayList<>(
Arrays.asList(
"jars/my_dummy_transform_3_dep1.jar", "jars/my_dummy_transform_3_dep2.jar"));

assertTrue(
expectedDepsOfTransform3.contains(
config.getDependencies().get("beam:transform:my_dummy_transform_3").get(0).getPath()));
assertTrue(
expectedDepsOfTransform3.contains(
config.getDependencies().get("beam:transform:my_dummy_transform_3").get(1).getPath()));
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract static class TestConfigSimpleSchema {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
Expand Down Expand Up @@ -88,7 +87,6 @@ public class JavaClassLookupTransformProviderTest {
public static void setupExpansionService() {
PipelineOptionsFactory.register(ExpansionServiceOptions.class);
URL allowListFile = Resources.getResource("./test_allowlist.yaml");
System.out.println("Exists: " + new File(allowListFile.getPath()).exists());
expansionService =
new ExpansionService(
new String[] {"--javaClassLookupAllowlistFile=" + allowListFile.getPath()});
Expand Down
Loading