Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion model/pipeline/src/main/proto/external_transforms.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ option java_outer_classname = "ExternalTransforms";
import "beam_runner_api.proto";

message ConfigValue {
string coder_urn = 1;
// Coder and its components (in case of a compound Coder)
repeated string coder_urn = 1;
// The Payload which is decoded using the coder_urn
bytes payload = 2;
}

// A configuration payload for an external transform.
// Used as the payload of ExternalTransform as part of an ExpansionRequest.
message ExternalConfigurationPayload {
// Configuration key => value
map<string, ConfigValue> configuration = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,25 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -85,7 +87,7 @@ public interface ExpansionServiceRegistrar {
* Exposes Java transforms via {@link org.apache.beam.sdk.expansion.ExternalTransformRegistrar}.
*/
@AutoService(ExpansionService.ExpansionServiceRegistrar.class)
public static class ExternalTransformRegistrarLoader<ConfigT>
public static class ExternalTransformRegistrarLoader
implements ExpansionService.ExpansionServiceRegistrar {

@Override
Expand All @@ -94,10 +96,10 @@ public Map<String, ExpansionService.TransformProvider> knownTransforms() {
ImmutableMap.builder();
for (ExternalTransformRegistrar registrar :
ServiceLoader.load(ExternalTransformRegistrar.class)) {
for (Map.Entry<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> entry :
for (Map.Entry<String, Class<? extends ExternalTransformBuilder>> entry :
registrar.knownBuilders().entrySet()) {
String urn = entry.getKey();
Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass = entry.getValue();
Class<? extends ExternalTransformBuilder> builderClass = entry.getValue();
builder.put(
urn,
spec -> {
Expand All @@ -117,7 +119,7 @@ public Map<String, ExpansionService.TransformProvider> knownTransforms() {

private static PTransform translate(
ExternalTransforms.ExternalConfigurationPayload payload,
Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass)
Class<? extends ExternalTransformBuilder> builderClass)
throws Exception {
Preconditions.checkState(
ExternalTransformBuilder.class.isAssignableFrom(builderClass),
Expand All @@ -129,8 +131,8 @@ private static PTransform translate(
return buildTransform(builderClass, configObject);
}

private static Object initConfiguration(
Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass) throws Exception {
private static Object initConfiguration(Class<? extends ExternalTransformBuilder> builderClass)
throws Exception {
for (Method method : builderClass.getMethods()) {
if (method.getName().equals("buildExternal")) {
Preconditions.checkState(
Expand All @@ -148,24 +150,22 @@ private static Object initConfiguration(
throw new RuntimeException("Couldn't find build method on ExternalTransformBuilder.");
}

private static void populateConfiguration(
@VisibleForTesting
static void populateConfiguration(
Object config, ExternalTransforms.ExternalConfigurationPayload payload) throws Exception {
Converter<String, String> camelCaseConverter =
CaseFormat.LOWER_UNDERSCORE.converterTo(CaseFormat.LOWER_CAMEL);
for (Map.Entry<String, ExternalTransforms.ConfigValue> entry :
payload.getConfigurationMap().entrySet()) {
String fieldName = camelCaseConverter.convert(entry.getKey());
String coderUrn = entry.getValue().getCoderUrn();

final Coder coder;
final Class type;
if (BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT).equals(coderUrn)) {
coder = VarLongCoder.of();
type = Long.class;
} else {
// TODO Use RehydratedComponents with coder ids instead
throw new RuntimeException("Unsupported coder urn " + coderUrn);
}
String key = entry.getKey();
ExternalTransforms.ConfigValue value = entry.getValue();

String fieldName = camelCaseConverter.convert(key);
List<String> coderUrns = value.getCoderUrnList();
Preconditions.checkArgument(coderUrns.size() > 0, "No Coder URN provided.");
Coder coder = resolveCoder(coderUrns);
Class type = coder.getEncodedTypeDescriptor().getRawType();

String setterName =
"set" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
Method method;
Expand All @@ -183,10 +183,52 @@ private static void populateConfiguration(
}
}

private static Coder resolveCoder(List<String> coderUrns) throws Exception {
Preconditions.checkArgument(coderUrns.size() > 0, "No Coder URN provided.");
RunnerApi.Components.Builder componentsBuilder = RunnerApi.Components.newBuilder();
RunnerApi.Coder coder = buildProto(0, coderUrns, componentsBuilder);

RehydratedComponents rehydratedComponents =
RehydratedComponents.forComponents(componentsBuilder.build());
return CoderTranslation.fromProto(coder, rehydratedComponents);
}

private static RunnerApi.Coder buildProto(
int coderPos, List<String> coderUrns, RunnerApi.Components.Builder componentsBuilder) {
Preconditions.checkArgument(
coderPos < coderUrns.size(), "Pointer into coderURNs is not correct.");

final String coderUrn = coderUrns.get(coderPos);
RunnerApi.Coder.Builder coderBuilder =
RunnerApi.Coder.newBuilder()
.setSpec(
RunnerApi.SdkFunctionSpec.newBuilder()
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(coderUrn).build())
.build());

if (coderUrn.equals(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.ITERABLE))) {
RunnerApi.Coder elementCoder = buildProto(coderPos + 1, coderUrns, componentsBuilder);
String coderId = UUID.randomUUID().toString();
componentsBuilder.putCoders(coderId, elementCoder);
coderBuilder.addComponentCoderIds(coderId);
} else if (coderUrn.equals(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.KV))) {
RunnerApi.Coder element1Coder = buildProto(coderPos + 1, coderUrns, componentsBuilder);
RunnerApi.Coder element2Coder = buildProto(coderPos + 2, coderUrns, componentsBuilder);
String coderId1 = UUID.randomUUID().toString();
String coderId2 = UUID.randomUUID().toString();
componentsBuilder.putCoders(coderId1, element1Coder);
componentsBuilder.putCoders(coderId2, element2Coder);
coderBuilder.addComponentCoderIds(coderId1);
coderBuilder.addComponentCoderIds(coderId2);
}

return coderBuilder.build();
}

private static PTransform buildTransform(
Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass, Object configObject)
Class<? extends ExternalTransformBuilder> builderClass, Object configObject)
throws Exception {
Constructor<? extends ExternalTransformBuilder<?, ?, ?>> constructor =
Constructor<? extends ExternalTransformBuilder> constructor =
builderClass.getDeclaredConstructor();
constructor.setAccessible(true);
ExternalTransformBuilder<?, ?, ?> externalTransformBuilder = constructor.newInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,35 @@
package org.apache.beam.runners.core.construction.expansion;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import com.google.auto.service.AutoService;
import java.io.ByteArrayOutputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -105,13 +116,13 @@ public void testConstructGenerateSequence() {
.putConfiguration(
"start",
ExternalTransforms.ConfigValue.newBuilder()
.setCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT))
.addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT))
.setPayload(ByteString.copyFrom(new byte[] {0}))
.build())
.putConfiguration(
"stop",
ExternalTransforms.ConfigValue.newBuilder()
.setCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT))
.addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT))
.setPayload(ByteString.copyFrom(new byte[] {1}))
.build())
.build();
Expand All @@ -137,6 +148,81 @@ public void testConstructGenerateSequence() {
assertThat(expandedTransform.getSubtransformsCount(), Matchers.greaterThan(0));
}

@Test
public void testCompoundCodersForExternalConfiguration() throws Exception {
ExternalTransforms.ExternalConfigurationPayload.Builder builder =
ExternalTransforms.ExternalConfigurationPayload.newBuilder();

builder.putConfiguration(
"config_key1",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT))
.setPayload(ByteString.copyFrom(new byte[] {1}))
.build());

List<byte[]> byteList =
ImmutableList.of("testing", "compound", "coders").stream()
.map(str -> str.getBytes(Charsets.UTF_8))
.collect(Collectors.toList());
IterableCoder<byte[]> compoundCoder = IterableCoder.of(ByteArrayCoder.of());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
compoundCoder.encode(byteList, baos);

builder.putConfiguration(
"config_key2",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.ITERABLE))
.addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.BYTES))
.setPayload(ByteString.copyFrom(baos.toByteArray()))
.build());

List<KV<byte[], Long>> byteKvList =
ImmutableList.of("testing", "compound", "coders").stream()
.map(str -> KV.of(str.getBytes(Charsets.UTF_8), (long) str.length()))
.collect(Collectors.toList());
IterableCoder<KV<byte[], Long>> compoundCoder2 =
IterableCoder.of(KvCoder.of(ByteArrayCoder.of(), VarLongCoder.of()));
ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
compoundCoder2.encode(byteKvList, baos2);

builder.putConfiguration(
"config_key3",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.ITERABLE))
.addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.KV))
.addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.BYTES))
.addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT))
.setPayload(ByteString.copyFrom(baos2.toByteArray()))
.build());

ExternalTransforms.ExternalConfigurationPayload externalConfig = builder.build();
TestConfig config = new TestConfig();
ExpansionService.ExternalTransformRegistrarLoader.populateConfiguration(config, externalConfig);

assertThat(config.configKey1, Matchers.is(1L));
assertArrayEquals(Iterables.toArray(config.configKey2, byte[].class), byteList.toArray());
assertArrayEquals(Iterables.toArray(config.configKey3, KV.class), byteKvList.toArray());
}

private static class TestConfig {

private Long configKey1;
private Iterable<byte[]> configKey2;
private Iterable<KV<byte[], Long>> configKey3;

public void setConfigKey1(Long configKey1) {
this.configKey1 = configKey1;
}

public void setConfigKey2(Iterable<byte[]> configKey2) {
this.configKey2 = configKey2;
}

public void setConfigKey3(Iterable<KV<byte[], Long>> configKey3) {
this.configKey3 = configKey3;
}
}

public Set<String> allIds(RunnerApi.Components components) {
Set<String> all = new HashSet<>();
all.addAll(components.getTransformsMap().keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@
public interface ExternalTransformRegistrar {

/** A mapping from URN to an {@link ExternalTransformBuilder} class. */
Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders();
Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public static class External implements ExternalTransformRegistrar {
public static final String URN = "beam:external:java:generate_sequence:v1";

@Override
public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
return ImmutableMap.of(URN, AutoValue_GenerateSequence.Builder.class);
}

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/external/generate_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, start, stop=None,
elements_per_period=None, max_read_time=None,
expansion_service=None):
coder = VarIntCoder()
coder_urn = 'beam:coder:varint:v1'
coder_urn = ['beam:coder:varint:v1']
args = {
'start':
ConfigValue(
Expand Down