From 236c4ed936305db6f9427170415ef9b8a10383d8 Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Wed, 8 Jun 2022 17:00:36 -0700 Subject: [PATCH] add withResources to External --- .../runners/core/construction/External.java | 93 ++++++++++++++----- .../python/PythonExternalTransform.java | 11 +++ 2 files changed, 80 insertions(+), 24 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java index 720f1ee8b516..aef45b885c64 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java @@ -94,7 +94,13 @@ SingleOutputExpandableTransform of( Endpoints.ApiServiceDescriptor apiDesc = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(endpoint).build(); return new SingleOutputExpandableTransform<>( - urn, payload, apiDesc, DEFAULT, getFreshNamespaceIndex(), ImmutableMap.of()); + urn, + payload, + apiDesc, + DEFAULT, + getFreshNamespaceIndex(), + ImmutableMap.of(), + ImmutableMap.of()); } @VisibleForTesting @@ -103,7 +109,13 @@ static SingleOutputExpandableTransform( - urn, payload, apiDesc, clientFactory, getFreshNamespaceIndex(), ImmutableMap.of()); + urn, + payload, + apiDesc, + clientFactory, + getFreshNamespaceIndex(), + ImmutableMap.of(), + ImmutableMap.of()); } /** Expandable transform for output type of PCollection. */ @@ -115,8 +127,9 @@ public static class SingleOutputExpandableTransform> outputCoders) { - super(urn, payload, endpoint, clientFactory, namespaceIndex, outputCoders); + Map> outputCoders, + Map resources) { + super(urn, payload, endpoint, clientFactory, namespaceIndex, outputCoders, resources); } @Override @@ -125,6 +138,18 @@ PCollection toOutputCollection(Map, PCollection> output) { return Iterables.getOnlyElement(output.values()); } + public SingleOutputExpandableTransform withResources( + Map resources) { + return new SingleOutputExpandableTransform<>( + getUrn(), + getPayload(), + getEndpoint(), + getClientFactory(), + getNamespaceIndex(), + getOutputCoders(), + resources); + } + public MultiOutputExpandableTransform withMultiOutputs() { return new MultiOutputExpandableTransform<>( getUrn(), @@ -132,7 +157,8 @@ public MultiOutputExpandableTransform withMultiOutputs() { getEndpoint(), getClientFactory(), getNamespaceIndex(), - getOutputCoders()); + getOutputCoders(), + getResources()); } public SingleOutputExpandableTransform withOutputCoder(Coder outputCoder) { @@ -142,7 +168,8 @@ public SingleOutputExpandableTransform withOutputCoder(Coder getEndpoint(), getClientFactory(), getNamespaceIndex(), - ImmutableMap.of("0", outputCoder)); + ImmutableMap.of("0", outputCoder), + getResources()); } } @@ -155,8 +182,9 @@ public static class MultiOutputExpandableTransform Endpoints.ApiServiceDescriptor endpoint, ExpansionServiceClientFactory clientFactory, Integer namespaceIndex, - Map> outputCoders) { - super(urn, payload, endpoint, clientFactory, namespaceIndex, outputCoders); + Map> outputCoders, + Map resources) { + super(urn, payload, endpoint, clientFactory, namespaceIndex, outputCoders, resources); } @Override @@ -178,7 +206,8 @@ public MultiOutputExpandableTransform withOutputCoder( getEndpoint(), getClientFactory(), getNamespaceIndex(), - outputCoders); + outputCoders, + getResources()); } } @@ -191,6 +220,7 @@ public abstract static class ExpandableTransform> outputCoders; + private final Map resources; private transient RunnerApi.@Nullable Components expandedComponents; private transient RunnerApi.@Nullable PTransform expandedTransform; @@ -204,13 +234,15 @@ public abstract static class ExpandableTransform> outputCoders) { + Map> outputCoders, + Map resources) { this.urn = urn; this.payload = payload; this.endpoint = endpoint; this.clientFactory = clientFactory; this.namespaceIndex = namespaceIndex; this.outputCoders = outputCoders; + this.resources = resources; } @Override @@ -281,20 +313,29 @@ public OutputT expand(InputT input) { String.format("expansion service error: %s", response.getError())); } - Map newEnvironmentsWithDependencies = - response.getComponents().getEnvironmentsMap().entrySet().stream() - .filter( - kv -> - !originalComponents.getEnvironmentsMap().containsKey(kv.getKey()) - && kv.getValue().getDependenciesCount() != 0) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - expandedComponents = - response - .getComponents() - .toBuilder() - .putAllEnvironments(resolveArtifacts(newEnvironmentsWithDependencies)) - .build(); + RunnerApi.Components.Builder componentsBuilder = response.getComponents().toBuilder(); + componentsBuilder.putAllEnvironments( + resolveArtifacts( + componentsBuilder.getEnvironmentsMap().entrySet().stream() + .filter( + kv -> + !originalComponents.getEnvironmentsMap().containsKey(kv.getKey()) + && kv.getValue().getDependenciesCount() != 0) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))); + List artifacts = + Environments.getArtifacts( + resources.entrySet().stream() + .map(e -> String.format("%s=%s", e.getValue(), e.getKey())) + .collect(Collectors.toList())); + componentsBuilder.putAllEnvironments( + componentsBuilder.getEnvironmentsMap().entrySet().stream() + .filter(kv -> !originalComponents.getEnvironmentsMap().containsKey(kv.getKey())) + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().toBuilder().addAllDependencies(artifacts).build()))); + + expandedComponents = componentsBuilder.build(); expandedTransform = response.getTransform(); expandedRequirements = response.getRequirementsList(); @@ -478,5 +519,9 @@ Integer getNamespaceIndex() { Map> getOutputCoders() { return outputCoders; } + + Map getResources() { + return resources; + } } } diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java index 7ae434d89dc4..2add63aafc30 100644 --- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java +++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java @@ -74,6 +74,7 @@ public class PythonExternalTransform> outputCoders; + Map resources; private PythonExternalTransform(String fullyQualifiedName, String expansionService) { this.fullyQualifiedName = fullyQualifiedName; @@ -86,6 +87,7 @@ private PythonExternalTransform(String fullyQualifiedName, String expansionServi PythonCallableSource.class, Schema.FieldType.logicalType(new PythonCallable())); argsArray = new Object[] {}; this.outputCoders = new HashMap<>(); + this.resources = new HashMap<>(); } /** @@ -228,6 +230,14 @@ public PythonExternalTransform withOutputCoder(Coder outputC return this; } + public PythonExternalTransform withResources(Map resources) { + if (this.resources.size() > 0) { + throw new IllegalArgumentException("resources were already specified"); + } + this.resources = resources; + return this; + } + @VisibleForTesting Row buildOrGetKwargsRow() { if (providedKwargsRow != null) { @@ -406,6 +416,7 @@ private OutputT apply( "beam:transforms:python:fully_qualified_named", payload.toByteArray(), expansionService) + .withResources(this.resources) .withMultiOutputs() .withOutputCoder(this.outputCoders); PCollectionTuple outputs;