From 7493dd8b549f8faf8725b420ad8a1ed6c956578f Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Tue, 2 Apr 2019 20:29:33 +0200 Subject: [PATCH 1/3] Remove undefined generic arguments for ExternalTransformBuilder The type arguments are very verbose and do not help the type checking. --- .../construction/expansion/ExpansionService.java | 15 +++++++-------- .../sdk/expansion/ExternalTransformRegistrar.java | 2 +- .../org/apache/beam/sdk/io/GenerateSequence.java | 2 +- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java index e3820b88647f..0f180644818a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java @@ -85,8 +85,7 @@ public interface ExpansionServiceRegistrar { * Exposes Java transforms via {@link org.apache.beam.sdk.expansion.ExternalTransformRegistrar}. 
*/ @AutoService(ExpansionService.ExpansionServiceRegistrar.class) - public static class ExternalTransformRegistrarLoader - implements ExpansionService.ExpansionServiceRegistrar { + public static class ExternalTransformRegistrarLoader implements ExpansionService.ExpansionServiceRegistrar { @Override public Map knownTransforms() { @@ -94,10 +93,10 @@ public Map knownTransforms() { ImmutableMap.builder(); for (ExternalTransformRegistrar registrar : ServiceLoader.load(ExternalTransformRegistrar.class)) { - for (Map.Entry>> entry : + for (Map.Entry> entry : registrar.knownBuilders().entrySet()) { String urn = entry.getKey(); - Class> builderClass = entry.getValue(); + Class builderClass = entry.getValue(); builder.put( urn, spec -> { @@ -117,7 +116,7 @@ public Map knownTransforms() { private static PTransform translate( ExternalTransforms.ExternalConfigurationPayload payload, - Class> builderClass) + Class builderClass) throws Exception { Preconditions.checkState( ExternalTransformBuilder.class.isAssignableFrom(builderClass), @@ -130,7 +129,7 @@ private static PTransform translate( } private static Object initConfiguration( - Class> builderClass) throws Exception { + Class builderClass) throws Exception { for (Method method : builderClass.getMethods()) { if (method.getName().equals("buildExternal")) { Preconditions.checkState( @@ -184,9 +183,9 @@ private static void populateConfiguration( } private static PTransform buildTransform( - Class> builderClass, Object configObject) + Class builderClass, Object configObject) throws Exception { - Constructor> constructor = + Constructor constructor = builderClass.getDeclaredConstructor(); constructor.setAccessible(true); ExternalTransformBuilder externalTransformBuilder = constructor.newInstance(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java index 7e057cb3af48..26ee3799973b 
100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java @@ -27,5 +27,5 @@ public interface ExternalTransformRegistrar { /** A mapping from URN to an {@link ExternalTransformBuilder} class. */ - Map>> knownBuilders(); + Map> knownBuilders(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java index 763941cded2c..6a33f388bd04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java @@ -134,7 +134,7 @@ public static class External implements ExternalTransformRegistrar { public static final String URN = "beam:external:java:generate_sequence:v1"; @Override - public Map>> knownBuilders() { + public Map> knownBuilders() { return ImmutableMap.of(URN, AutoValue_GenerateSequence.Builder.class); } From 4e3e42060b41e9dfd884723110f3c1e76c56e0bd Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Wed, 3 Apr 2019 12:30:12 +0200 Subject: [PATCH 2/3] [BEAM-6990] Use CoderTranslation for resolving coders in cross-language configuration This replaces the custom coder URN translation code in ExternalTransformRegistrarLoader with the usual CoderTranslation. This makes it possible to support all coders available across languages. 
--- .../src/main/proto/external_transforms.proto | 1 + .../expansion/ExpansionService.java | 42 ++++++++++++------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/model/pipeline/src/main/proto/external_transforms.proto b/model/pipeline/src/main/proto/external_transforms.proto index 2ed02048e518..26dd6438410c 100644 --- a/model/pipeline/src/main/proto/external_transforms.proto +++ b/model/pipeline/src/main/proto/external_transforms.proto @@ -38,5 +38,6 @@ message ConfigValue { // A configuration payload for an external transform. // Used as the payload of ExternalTransform as part of an ExpansionRequest. message ExternalConfigurationPayload { + // Configuration key => value map configuration = 1; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java index 0f180644818a..ad92c8a4ff3b 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java @@ -31,14 +31,13 @@ import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc; import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.core.construction.BeamUrns; +import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VarLongCoder; import 
org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; @@ -85,7 +84,8 @@ public interface ExpansionServiceRegistrar { * Exposes Java transforms via {@link org.apache.beam.sdk.expansion.ExternalTransformRegistrar}. */ @AutoService(ExpansionService.ExpansionServiceRegistrar.class) - public static class ExternalTransformRegistrarLoader implements ExpansionService.ExpansionServiceRegistrar { + public static class ExternalTransformRegistrarLoader + implements ExpansionService.ExpansionServiceRegistrar { @Override public Map knownTransforms() { @@ -128,8 +128,8 @@ private static PTransform translate( return buildTransform(builderClass, configObject); } - private static Object initConfiguration( - Class builderClass) throws Exception { + private static Object initConfiguration(Class builderClass) + throws Exception { for (Method method : builderClass.getMethods()) { if (method.getName().equals("buildExternal")) { Preconditions.checkState( @@ -153,18 +153,13 @@ private static void populateConfiguration( CaseFormat.LOWER_UNDERSCORE.converterTo(CaseFormat.LOWER_CAMEL); for (Map.Entry entry : payload.getConfigurationMap().entrySet()) { - String fieldName = camelCaseConverter.convert(entry.getKey()); - String coderUrn = entry.getValue().getCoderUrn(); + String key = entry.getKey(); + ExternalTransforms.ConfigValue value = entry.getValue(); + + String fieldName = camelCaseConverter.convert(key); + Coder coder = resolveCoder(value.getCoderUrn()); + Class type = coder.getEncodedTypeDescriptor().getRawType(); - final Coder coder; - final Class type; - if (BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT).equals(coderUrn)) { - coder = VarLongCoder.of(); - type = Long.class; - } else { - // TODO Use RehydratedComponents with coder ids instead - throw new RuntimeException("Unsupported coder urn " + coderUrn); - } String setterName = "set" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1); Method method; @@ -182,6 +177,21 @@ private static void populateConfiguration( } } + private static Coder resolveCoder(String coderUrn) throws Exception { + RunnerApi.Coder coder = + RunnerApi.Coder.newBuilder() + .setSpec( + RunnerApi.SdkFunctionSpec.newBuilder() + .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(coderUrn).build())) + .build(); + + // TODO This uses simple structured coders, need to support compound coders + RunnerApi.Components components = RunnerApi.Components.newBuilder().build(); + RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components); + + return CoderTranslation.fromProto(coder, rehydratedComponents); + } + private static PTransform buildTransform( Class builderClass, Object configObject) throws Exception { From 41230a97afe57b397a4d0a583468493c6c729e95 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Wed, 3 Apr 2019 17:08:25 +0200 Subject: [PATCH 3/3] [BEAM-6990] Allow compound coders for externally configured transforms --- .../src/main/proto/external_transforms.proto | 4 +- .../expansion/ExpansionService.java | 53 ++++++++--- .../expansion/ExpansionServiceTest.java | 90 ++++++++++++++++++- .../io/external/generate_sequence.py | 2 +- 4 files changed, 135 insertions(+), 14 deletions(-) diff --git a/model/pipeline/src/main/proto/external_transforms.proto b/model/pipeline/src/main/proto/external_transforms.proto index 26dd6438410c..b8529d7cc965 100644 --- a/model/pipeline/src/main/proto/external_transforms.proto +++ b/model/pipeline/src/main/proto/external_transforms.proto @@ -31,7 +31,9 @@ option java_outer_classname = "ExternalTransforms"; import "beam_runner_api.proto"; message ConfigValue { - string coder_urn = 1; + // Coder and its components (in case of a compound Coder) + repeated string coder_urn = 1; + // The Payload which is decoded using the coder_urn bytes payload = 2; } diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java index ad92c8a4ff3b..75c7aba5f995 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java @@ -22,15 +22,18 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Method; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.ServiceLoader; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.beam.model.expansion.v1.ExpansionApi; import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc; import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.BeamUrns; import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PipelineTranslation; @@ -147,7 +150,8 @@ private static Object initConfiguration(Class camelCaseConverter = CaseFormat.LOWER_UNDERSCORE.converterTo(CaseFormat.LOWER_CAMEL); @@ -157,7 +161,9 @@ private static void populateConfiguration( ExternalTransforms.ConfigValue value = entry.getValue(); String fieldName = camelCaseConverter.convert(key); - Coder coder = resolveCoder(value.getCoderUrn()); + List coderUrns = value.getCoderUrnList(); + Preconditions.checkArgument(coderUrns.size() > 0, "No Coder URN provided."); + Coder coder = resolveCoder(coderUrns); Class type = coder.getEncodedTypeDescriptor().getRawType(); String setterName = @@ -177,19 +183,46 @@ private static 
void populateConfiguration( } } - private static Coder resolveCoder(String coderUrn) throws Exception { - RunnerApi.Coder coder = + private static Coder resolveCoder(List coderUrns) throws Exception { + Preconditions.checkArgument(coderUrns.size() > 0, "No Coder URN provided."); + RunnerApi.Components.Builder componentsBuilder = RunnerApi.Components.newBuilder(); + RunnerApi.Coder coder = buildProto(0, coderUrns, componentsBuilder); + + RehydratedComponents rehydratedComponents = + RehydratedComponents.forComponents(componentsBuilder.build()); + return CoderTranslation.fromProto(coder, rehydratedComponents); + } + + private static RunnerApi.Coder buildProto( + int coderPos, List coderUrns, RunnerApi.Components.Builder componentsBuilder) { + Preconditions.checkArgument( + coderPos < coderUrns.size(), "Pointer into coderURNs is not correct."); + + final String coderUrn = coderUrns.get(coderPos); + RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder() .setSpec( RunnerApi.SdkFunctionSpec.newBuilder() - .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(coderUrn).build())) - .build(); + .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(coderUrn).build()) + .build()); - // TODO This uses simple structured coders, need to support compound coders - RunnerApi.Components components = RunnerApi.Components.newBuilder().build(); - RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components); + if (coderUrn.equals(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.ITERABLE))) { + RunnerApi.Coder elementCoder = buildProto(coderPos + 1, coderUrns, componentsBuilder); + String coderId = UUID.randomUUID().toString(); + componentsBuilder.putCoders(coderId, elementCoder); + coderBuilder.addComponentCoderIds(coderId); + } else if (coderUrn.equals(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.KV))) { + RunnerApi.Coder element1Coder = buildProto(coderPos + 1, coderUrns, componentsBuilder); + RunnerApi.Coder element2Coder = buildProto(coderPos + 
2, coderUrns, componentsBuilder); + String coderId1 = UUID.randomUUID().toString(); + String coderId2 = UUID.randomUUID().toString(); + componentsBuilder.putCoders(coderId1, element1Coder); + componentsBuilder.putCoders(coderId2, element2Coder); + coderBuilder.addComponentCoderIds(coderId1); + coderBuilder.addComponentCoderIds(coderId2); + } - return CoderTranslation.fromProto(coder, rehydratedComponents); + return coderBuilder.build(); } private static PTransform buildTransform( diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java index 8fcbdce920dc..d740296c062c 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java @@ -18,24 +18,35 @@ package org.apache.beam.runners.core.construction.expansion; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import com.google.auto.service.AutoService; +import java.io.ByteArrayOutputStream; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.beam.model.expansion.v1.ExpansionApi; import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.BeamUrns; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import 
org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; import org.hamcrest.Matchers; @@ -105,13 +116,13 @@ public void testConstructGenerateSequence() { .putConfiguration( "start", ExternalTransforms.ConfigValue.newBuilder() - .setCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT)) + .addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT)) .setPayload(ByteString.copyFrom(new byte[] {0})) .build()) .putConfiguration( "stop", ExternalTransforms.ConfigValue.newBuilder() - .setCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT)) + .addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT)) .setPayload(ByteString.copyFrom(new byte[] {1})) .build()) .build(); @@ -137,6 +148,81 @@ public void testConstructGenerateSequence() { assertThat(expandedTransform.getSubtransformsCount(), Matchers.greaterThan(0)); } + @Test + public void testCompoundCodersForExternalConfiguration() throws Exception { + ExternalTransforms.ExternalConfigurationPayload.Builder builder = + ExternalTransforms.ExternalConfigurationPayload.newBuilder(); + + builder.putConfiguration( + "config_key1", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT)) + .setPayload(ByteString.copyFrom(new byte[] {1})) + .build()); + + List byteList = + 
ImmutableList.of("testing", "compound", "coders").stream() + .map(str -> str.getBytes(Charsets.UTF_8)) + .collect(Collectors.toList()); + IterableCoder compoundCoder = IterableCoder.of(ByteArrayCoder.of()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + compoundCoder.encode(byteList, baos); + + builder.putConfiguration( + "config_key2", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.ITERABLE)) + .addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.BYTES)) + .setPayload(ByteString.copyFrom(baos.toByteArray())) + .build()); + + List> byteKvList = + ImmutableList.of("testing", "compound", "coders").stream() + .map(str -> KV.of(str.getBytes(Charsets.UTF_8), (long) str.length())) + .collect(Collectors.toList()); + IterableCoder> compoundCoder2 = + IterableCoder.of(KvCoder.of(ByteArrayCoder.of(), VarLongCoder.of())); + ByteArrayOutputStream baos2 = new ByteArrayOutputStream(); + compoundCoder2.encode(byteKvList, baos2); + + builder.putConfiguration( + "config_key3", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.ITERABLE)) + .addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.KV)) + .addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.BYTES)) + .addCoderUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT)) + .setPayload(ByteString.copyFrom(baos2.toByteArray())) + .build()); + + ExternalTransforms.ExternalConfigurationPayload externalConfig = builder.build(); + TestConfig config = new TestConfig(); + ExpansionService.ExternalTransformRegistrarLoader.populateConfiguration(config, externalConfig); + + assertThat(config.configKey1, Matchers.is(1L)); + assertArrayEquals(Iterables.toArray(config.configKey2, byte[].class), byteList.toArray()); + assertArrayEquals(Iterables.toArray(config.configKey3, KV.class), byteKvList.toArray()); + } + + private static class TestConfig { + + private Long configKey1; + private Iterable 
configKey2; + private Iterable> configKey3; + + public void setConfigKey1(Long configKey1) { + this.configKey1 = configKey1; + } + + public void setConfigKey2(Iterable configKey2) { + this.configKey2 = configKey2; + } + + public void setConfigKey3(Iterable> configKey3) { + this.configKey3 = configKey3; + } + } + public Set allIds(RunnerApi.Components components) { Set all = new HashSet<>(); all.addAll(components.getTransformsMap().keySet()); diff --git a/sdks/python/apache_beam/io/external/generate_sequence.py b/sdks/python/apache_beam/io/external/generate_sequence.py index 2918c7971e2a..f1b6a37c0058 100644 --- a/sdks/python/apache_beam/io/external/generate_sequence.py +++ b/sdks/python/apache_beam/io/external/generate_sequence.py @@ -32,7 +32,7 @@ def __init__(self, start, stop=None, elements_per_period=None, max_read_time=None, expansion_service=None): coder = VarIntCoder() - coder_urn = 'beam:coder:varint:v1' + coder_urn = ['beam:coder:varint:v1'] args = { 'start': ConfigValue(