From 95bd4b2853f5571f12845df83824b044720052a3 Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Tue, 6 Aug 2019 16:11:21 -0700 Subject: [PATCH 1/6] [BEAM-7738] Add external transform support to PubsubIO --- .../flink/job-server/flink_job_server.gradle | 1 + .../io/google-cloud-platform/build.gradle | 4 + .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 181 ++++++++++++- .../io/gcp/pubsub/PubsubIOExternalTest.java | 250 ++++++++++++++++++ .../apache_beam/io/external/gcp/__init__.py | 18 ++ .../apache_beam/io/external/gcp/pubsub.py | 183 +++++++++++++ 6 files changed, 635 insertions(+), 2 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java create mode 100644 sdks/python/apache_beam/io/external/gcp/__init__.py create mode 100644 sdks/python/apache_beam/io/external/gcp/pubsub.py diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index ee856e791ffe..b93b950dc024 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -89,6 +89,7 @@ dependencies { // For resolving external transform requests runtime project(":sdks:java:io:kafka") runtime library.java.kafka_clients + runtime project(":sdks:java:io:google-cloud-platform") } // NOTE: runShadow must be used in order to run the job server. The standard run diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 0a9b8a9195cc..0c1befd311c9 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -69,6 +69,10 @@ dependencies { testCompile project(path: ":sdks:java:core", configuration: "shadowTest") testCompile project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntime") testCompile project(path: ":runners:direct-java", configuration: "shadow") + testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime") + testCompile project(path: ":sdks:java:testing:test-utils", configuration: "testRuntime") + // For testing Cross-language transforms + testCompile project(":runners:core-construction-java") testCompile library.java.hamcrest_core testCompile library.java.hamcrest_library testCompile library.java.junit diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 4f745ea9a4bb..a919f44ff48f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -20,7 +20,10 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.api.client.util.Clock; +import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -39,9 +42,11 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; @@ -53,6 +58,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.utils.AvroUtils; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -66,6 +72,7 @@ import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.joda.time.Instant; @@ -705,7 +712,8 @@ public abstract static class Read extends PTransform> abstract Builder toBuilder(); @AutoValue.Builder - abstract static class Builder { + abstract static class Builder + implements ExternalTransformBuilder> { abstract Builder setTopicProvider(ValueProvider topic); abstract Builder setPubsubClientFactory(PubsubClient.PubsubClientFactory clientFactory); @@ -733,6 +741,86 @@ abstract static class Builder { abstract Builder setClock(@Nullable Clock clock); abstract Read build(); + + @Override + public PTransform> buildExternal(External.Configuration config) { + if (config.topic != null) { + StaticValueProvider topic = StaticValueProvider.of(utf8String(config.topic)); + setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())); + } + if (config.subscription != null) { + StaticValueProvider subscription = + StaticValueProvider.of(utf8String(config.subscription)); + setSubscriptionProvider( + NestedValueProvider.of(subscription, new SubscriptionTranslator())); + } + if (config.idAttribute != null) { + String idAttribute = utf8String(config.idAttribute); + setIdAttribute(idAttribute); + } + if (config.timestampAttribute != null) { + String timestampAttribute = utf8String(config.timestampAttribute); + setTimestampAttribute(timestampAttribute); + } + setPubsubClientFactory(FACTORY); + setNeedsAttributes(config.needsAttributes); + Coder coder = ByteArrayCoder.of(); + if (config.needsAttributes) { + SimpleFunction parseFn = + (SimpleFunction) new ParsePayloadAsPubsubMessageProto(); + setParseFn(parseFn); + setCoder(coder); + } else { + setParseFn(new ParsePayloadUsingCoder<>(coder)); + setCoder(coder); + } + setNeedsMessageId(false); + return build(); + } + } + + /** Exposes {@link PubSubIO.Read} as an external transform for cross-language usage. */ + @Experimental + @AutoService(ExternalTransformRegistrar.class) + public static class External implements ExternalTransformRegistrar { + + public static final String URN = "beam:external:java:pubsub:read:v1"; + + @Override + public Map> knownBuilders() { + return ImmutableMap.of(URN, AutoValue_PubsubIO_Read.Builder.class); + } + + /** Parameters class to expose the transform to an external SDK. */ + public static class Configuration { + + // All byte arrays are UTF-8 encoded strings + @Nullable private byte[] topic; + @Nullable private byte[] subscription; + @Nullable private byte[] idAttribute; + @Nullable private byte[] timestampAttribute; + private boolean needsAttributes; + + public void setTopic(@Nullable byte[] topic) { + this.topic = topic; + } + + public void setSubscription(@Nullable byte[] subscription) { + this.subscription = subscription; + } + + public void setIdLabel(@Nullable byte[] idAttribute) { + this.idAttribute = idAttribute; + } + + public void setTimestampAttribute(@Nullable byte[] timestampAttribute) { + this.timestampAttribute = timestampAttribute; + } + + public void setWithAttributes(Long needsAttributes) { + this.needsAttributes = needsAttributes >= 1; + } + } } /** @@ -955,7 +1043,8 @@ public abstract static class Write extends PTransform, PDone> abstract Builder toBuilder(); @AutoValue.Builder - abstract static class Builder { + abstract static class Builder + implements ExternalTransformBuilder, PDone> { abstract Builder setTopicProvider(ValueProvider topicProvider); abstract Builder setPubsubClientFactory(PubsubClient.PubsubClientFactory factory); @@ -971,6 +1060,60 @@ abstract static class Builder { abstract Builder setFormatFn(SimpleFunction formatFn); abstract Write build(); + + @Override + public PTransform, PDone> buildExternal(External.Configuration config) { + if (config.topic != null) { + StaticValueProvider topic = StaticValueProvider.of(utf8String(config.topic)); + setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())); + } + if (config.idAttribute != null) { + String idAttribute = utf8String(config.idAttribute); + setIdAttribute(idAttribute); + } + if (config.timestampAttribute != null) { + String timestampAttribute = utf8String(config.timestampAttribute); + setTimestampAttribute(timestampAttribute); + } + SimpleFunction parseFn = + (SimpleFunction) new FormatPayloadFromPubsubMessageProto(); + setFormatFn(parseFn); + return build(); + } + } + + /** Exposes {@link PubSubIO.Write} as an external transform for cross-language usage. */ + @Experimental + @AutoService(ExternalTransformRegistrar.class) + public static class External implements ExternalTransformRegistrar { + + public static final String URN = "beam:external:java:pubsub:write:v1"; + + @Override + public Map> knownBuilders() { + return ImmutableMap.of(URN, AutoValue_PubsubIO_Write.Builder.class); + } + + /** Parameters class to expose the transform to an external SDK. */ + public static class Configuration { + + // All byte arrays are UTF-8 encoded strings + private byte[] topic; + @Nullable private byte[] idAttribute; + @Nullable private byte[] timestampAttribute; + + public void setTopic(byte[] topic) { + this.topic = topic; + } + + public void setIdLabel(@Nullable byte[] idAttribute) { + this.idAttribute = idAttribute; + } + + public void setTimestampAttribute(@Nullable byte[] timestampAttribute) { + this.timestampAttribute = timestampAttribute; + } + } } /** @@ -1213,6 +1356,22 @@ public T apply(PubsubMessage input) { } } + private static class ParsePayloadAsPubsubMessageProto + extends SimpleFunction { + @Override + public byte[] apply(PubsubMessage input) { + Map attributes = input.getAttributeMap(); + com.google.pubsub.v1.PubsubMessage.Builder message = + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(input.getPayload())); + // TODO(BEAM-8085) this should not be null + if (attributes != null) { + message.putAllAttributes(attributes); + } + return message.build().toByteArray(); + } + } + private static class FormatPayloadAsUtf8 extends SimpleFunction { @Override public PubsubMessage apply(String input) { @@ -1237,10 +1396,28 @@ public PubsubMessage apply(T input) { } } + private static class FormatPayloadFromPubsubMessageProto + extends SimpleFunction { + @Override + public PubsubMessage apply(byte[] input) { + try { + com.google.pubsub.v1.PubsubMessage message = + com.google.pubsub.v1.PubsubMessage.parseFrom(input); + return new PubsubMessage(message.getData().toByteArray(), message.getAttributesMap()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Could not decode Pubsub message", e); + } + } + } + private static class IdentityMessageFn extends SimpleFunction { @Override public PubsubMessage apply(PubsubMessage input) { return input; } } + + private static String utf8String(byte[] bytes) { + return new String(bytes, Charsets.UTF_8); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java new file mode 100644 index 000000000000..00ede35482c9 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.MatcherAssert.assertThat; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.model.expansion.v1.ExpansionApi; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.ParDoTranslation; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.core.construction.ReadTranslation; +import org.apache.beam.runners.core.construction.expansion.ExpansionService; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.powermock.reflect.Whitebox; + +/** Tests for building {@link PubsubIO} externally via the ExpansionService. */ +@RunWith(JUnit4.class) +public class PubsubIOExternalTest { + @Test + public void testConstructPubsubRead() throws Exception { + String topic = "projects/project-1234/topics/topic_name"; + String idAttribute = "id_foo"; + Long needsAttributes = 1L; + + ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .putConfiguration( + "topic", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn("beam:coder:bytes:v1") + .setPayload(ByteString.copyFrom(encodeString(topic))) + .build()) + .putConfiguration( + "id_label", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn("beam:coder:bytes:v1") + .setPayload(ByteString.copyFrom(encodeString(idAttribute))) + .build()) + .putConfiguration( + "with_attributes", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn("beam:coder:varint:v1") + .setPayload(ByteString.copyFrom(encodeVarLong(needsAttributes))) + .build()) + .build(); + + RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance(); + ExpansionApi.ExpansionRequest request = + ExpansionApi.ExpansionRequest.newBuilder() + .setComponents(defaultInstance) + .setTransform( + RunnerApi.PTransform.newBuilder() + .setUniqueName("test") + .setSpec( + RunnerApi.FunctionSpec.newBuilder() + .setUrn("beam:external:java:pubsub:read:v1") + .setPayload(payload.toByteString()))) + .setNamespace("test_namespace") + .build(); + + ExpansionService expansionService = new ExpansionService(); + TestStreamObserver observer = new TestStreamObserver<>(); + expansionService.expand(request, observer); + + ExpansionApi.ExpansionResponse result = observer.result; + RunnerApi.PTransform transform = result.getTransform(); + assertThat( + transform.getSubtransformsList(), + Matchers.contains( + "test_namespacetest/PubsubUnboundedSource", "test_namespacetest/MapElements")); + assertThat(transform.getInputsCount(), Matchers.is(0)); + assertThat(transform.getOutputsCount(), Matchers.is(1)); + + RunnerApi.PTransform pubsubComposite = + result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0)); + RunnerApi.PTransform pubsubRead = + result.getComponents().getTransformsOrThrow(pubsubComposite.getSubtransforms(0)); + RunnerApi.ReadPayload readPayload = + RunnerApi.ReadPayload.parseFrom(pubsubRead.getSpec().getPayload()); + PubsubUnboundedSource.PubsubSource source = + (PubsubUnboundedSource.PubsubSource) ReadTranslation.unboundedSourceFromProto(readPayload); + PubsubUnboundedSource spec = source.outer; + + assertThat( + spec.getTopicProvider() == null ? null : String.valueOf(spec.getTopicProvider()), + Matchers.is(topic)); + assertThat(spec.getIdAttribute(), Matchers.is(idAttribute)); + assertThat(spec.getNeedsAttributes(), Matchers.is(true)); + } + + @Test + public void testConstructPubsubWrite() throws Exception { + String topic = "projects/project-1234/topics/topic_name"; + String idAttribute = "id_foo"; + + ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .putConfiguration( + "topic", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn("beam:coder:bytes:v1") + .setPayload(ByteString.copyFrom(encodeString(topic))) + .build()) + .putConfiguration( + "id_label", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn("beam:coder:bytes:v1") + .setPayload(ByteString.copyFrom(encodeString(idAttribute))) + .build()) + .build(); + + Pipeline p = Pipeline.create(); + p.apply("unbounded", Create.of(1, 2, 3)).setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + String inputPCollection = + Iterables.getOnlyElement( + Iterables.getLast(pipelineProto.getComponents().getTransformsMap().values()) + .getOutputsMap() + .values()); + + ExpansionApi.ExpansionRequest request = + ExpansionApi.ExpansionRequest.newBuilder() + .setComponents(pipelineProto.getComponents()) + .setTransform( + RunnerApi.PTransform.newBuilder() + .setUniqueName("test") + .putInputs("input", inputPCollection) + .setSpec( + RunnerApi.FunctionSpec.newBuilder() + .setUrn("beam:external:java:pubsub:write:v1") + .setPayload(payload.toByteString()))) + .setNamespace("test_namespace") + .build(); + + ExpansionService expansionService = new ExpansionService(); + TestStreamObserver observer = new TestStreamObserver<>(); + expansionService.expand(request, observer); + + ExpansionApi.ExpansionResponse result = observer.result; + + RunnerApi.PTransform transform = result.getTransform(); + assertThat( + transform.getSubtransformsList(), + Matchers.contains( + "test_namespacetest/MapElements", "test_namespacetest/PubsubUnboundedSink")); + assertThat(transform.getInputsCount(), Matchers.is(1)); + assertThat(transform.getOutputsCount(), Matchers.is(0)); + + // test_namespacetest/PubsubUnboundedSink + RunnerApi.PTransform writeComposite = + result.getComponents().getTransformsOrThrow(transform.getSubtransforms(1)); + + // test_namespacetest/PubsubUnboundedSink/PubsubUnboundedSink.Writer + RunnerApi.PTransform writeComposite2 = + result.getComponents().getTransformsOrThrow(writeComposite.getSubtransforms(3)); + + // test_namespacetest/PubsubUnboundedSink/PubsubUnboundedSink.Writer/ParMultiDo(Writer) + RunnerApi.PTransform writeParDo = + result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(0)); + + RunnerApi.ParDoPayload parDoPayload = + RunnerApi.ParDoPayload.parseFrom(writeParDo.getSpec().getPayload()); + DoFn pubsubWriter = ParDoTranslation.getDoFn(parDoPayload); + + String idAttributeActual = (String) Whitebox.getInternalState(pubsubWriter, "idAttribute"); + + ValueProvider topicActual = + (ValueProvider) Whitebox.getInternalState(pubsubWriter, "topic"); + + assertThat(topicActual == null ? null : String.valueOf(topicActual), Matchers.is(topic)); + assertThat(idAttributeActual, Matchers.is(idAttribute)); + } + + private static byte[] encodeString(String str) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ByteArrayCoder.of().encode(utf8Bytes(str), baos); + return baos.toByteArray(); + } + + private static byte[] encodeVarLong(Long value) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + VarLongCoder.of().encode(value, baos); + return baos.toByteArray(); + } + + private static @Nullable String getTopic(@Nullable ValueProvider value) { + if (value == null) { + return null; + } + return String.valueOf(value); + } + + private static byte[] utf8Bytes(String str) { + Preconditions.checkNotNull(str, "String must not be null."); + return str.getBytes(Charsets.UTF_8); + } + + private static class TestStreamObserver implements StreamObserver { + + private T result; + + @Override + public void onNext(T t) { + result = t; + } + + @Override + public void onError(Throwable throwable) { + throw new RuntimeException("Should not happen", throwable); + } + + @Override + public void onCompleted() {} + } +} diff --git a/sdks/python/apache_beam/io/external/gcp/__init__.py b/sdks/python/apache_beam/io/external/gcp/__init__.py new file mode 100644 index 000000000000..6569e3fe5de4 --- /dev/null +++ b/sdks/python/apache_beam/io/external/gcp/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py new file mode 100644 index 000000000000..4ff954419b2e --- /dev/null +++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py @@ -0,0 +1,183 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import + +from apache_beam import ExternalTransform +from apache_beam import pvalue +from apache_beam.coders import BytesCoder +from apache_beam.coders import VarIntCoder +from apache_beam.coders.coders import LengthPrefixCoder +from apache_beam.io.gcp import pubsub +from apache_beam.portability.api.external_transforms_pb2 import ConfigValue +from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload +from apache_beam.transforms import Map +from apache_beam.transforms import ptransform + + +class ReadFromPubSub(ptransform.PTransform): + """An external ``PTransform`` for reading from Cloud Pub/Sub.""" + + _urn = 'beam:external:java:pubsub:read:v1' + + def __init__(self, topic=None, subscription=None, id_label=None, + with_attributes=False, timestamp_attribute=None, + expansion_service='localhost:8097'): + """Initializes ``ReadFromPubSub``. + + Args: + topic: Cloud Pub/Sub topic in the form + "projects//topics/". If provided, subscription must be + None. + subscription: Existing Cloud Pub/Sub subscription to use in the + form "projects//subscriptions/". If not + specified, a temporary subscription will be created from the specified + topic. If provided, topic must be None. + id_label: The attribute on incoming Pub/Sub messages to use as a unique + record identifier. When specified, the value of this attribute (which + can be any string that uniquely identifies the record) will be used for + deduplication of messages. If not provided, we cannot guarantee + that no duplicate data will be delivered on the Pub/Sub stream. In this + case, deduplication of the stream will be strictly best effort. + with_attributes: + True - output elements will be :class:`~PubsubMessage` objects. + False - output elements will be of type ``bytes`` (message + data only). + timestamp_attribute: Message value to use as element timestamp. If None, + uses message publishing time as the timestamp. + + Timestamp values should be in one of two formats: + + - A numerical value representing the number of milliseconds since the + Unix epoch. + - A string in RFC 3339 format, UTC timezone. Example: + ``2015-10-29T23:41:41.123Z``. The sub-second component of the + timestamp is optional, and digits beyond the first three (i.e., time + units smaller than milliseconds) may be ignored. + """ + super(ReadFromPubSub, self).__init__() + self.topic = topic + self.subscription = subscription + self.id_label = id_label + self.with_attributes = with_attributes + self.timestamp_attribute = timestamp_attribute + self.expansion_service = expansion_service + + def expand(self, pbegin): + if not isinstance(pbegin, pvalue.PBegin): + raise Exception("ReadFromPubSub must be a root transform") + + args = { + 'with_attributes': _encode_bool(self.with_attributes), + } + + if self.topic is not None: + args['topic'] = _encode_str(self.topic) + + if self.subscription is not None: + args['subscription'] = _encode_str(self.subscription) + + if self.id_label is not None: + args['id_label'] = _encode_str(self.id_label) + + if self.timestamp_attribute is not None: + args['timestamp_attribute'] = _encode_str(self.timestamp_attribute) + + payload = ExternalConfigurationPayload(configuration=args) + pcoll = pbegin.apply( + ExternalTransform( + self._urn, + payload.SerializeToString(), + self.expansion_service)) + if self.with_attributes: + pcoll = pcoll | Map(pubsub.PubsubMessage._from_proto_str) + pcoll.element_type = pubsub.PubsubMessage + else: + pcoll.element_type = bytes + return pcoll + + +class WriteToPubSub(ptransform.PTransform): + """An external ``PTransform`` for writing messages to Cloud Pub/Sub.""" + + _urn = 'beam:external:java:pubsub:write:v1' + + def __init__(self, topic, with_attributes=False, id_label=None, + timestamp_attribute=None, expansion_service='localhost:8097'): + """Initializes ``WriteToPubSub``. + + Args: + topic: Cloud Pub/Sub topic in the form "/topics//". + with_attributes: + True - input elements will be :class:`~PubsubMessage` objects. + False - input elements will be of type ``bytes`` (message + data only). + id_label: If set, will set an attribute for each Cloud Pub/Sub message + with the given name and a unique value. This attribute can then be used + in a ReadFromPubSub PTransform to deduplicate messages. + timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub + message with the given name and the message's publish time as the value. + """ + super(WriteToPubSub, self).__init__() + self.topic = topic + self.with_attributes = with_attributes + self.id_label = id_label + self.timestamp_attribute = timestamp_attribute + self.expansion_service = expansion_service + + def expand(self, pvalue): + + if self.with_attributes: + pcoll = pvalue | 'ToProtobuf' >> Map(pubsub.WriteToPubSub.to_proto_str) + else: + pcoll = pvalue | 'ToProtobuf' >> Map( + lambda x: pubsub.PubsubMessage(x, {})._to_proto_str()) + pcoll.element_type = bytes + + args = { + 'topic': _encode_str(self.topic), + } + + if self.id_label is not None: + args['id_label'] = _encode_str(self.id_label) + + if self.timestamp_attribute is not None: + args['timestamp_attribute'] = _encode_str(self.timestamp_attribute) + + payload = ExternalConfigurationPayload(configuration=args) + return pcoll.apply( + ExternalTransform( + self._urn, + payload.SerializeToString(), + self.expansion_service)) + + +def _encode_str(str_obj): + encoded_str = str_obj.encode('utf-8') + coder = LengthPrefixCoder(BytesCoder()) + coder_urns = ['beam:coder:bytes:v1'] + return ConfigValue( + coder_urn=coder_urns, + payload=coder.encode(encoded_str)) + + +def _encode_bool(bool_obj): + coder = VarIntCoder() + coder_urns = ['beam:coder:varint:v1'] + return ConfigValue( + coder_urn=coder_urns, + payload=coder.encode(int(bool_obj))) From 61814dd90bd9c01157802d2d7ff527bee15058fd Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Thu, 26 Sep 2019 16:57:55 -0700 Subject: [PATCH 2/6] Adapt io.external.pubsub reader and writer to new API --- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 52 +++--- .../io/gcp/pubsub/PubsubIOExternalTest.java | 21 +-- .../apache_beam/io/external/gcp/pubsub.py | 157 ++++++++---------- .../apache_beam/transforms/external_test.py | 12 ++ .../transforms/external_test_py3.py | 4 + .../transforms/external_test_py37.py | 2 + 6 files changed, 113 insertions(+), 135 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index a919f44ff48f..5fddeb145920 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -72,7 +72,6 @@ import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.joda.time.Instant; @@ -745,22 +744,19 @@ abstract static class Builder @Override public PTransform> buildExternal(External.Configuration config) { if (config.topic != null) { - StaticValueProvider topic = StaticValueProvider.of(utf8String(config.topic)); + StaticValueProvider topic = StaticValueProvider.of(config.topic); setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())); } if (config.subscription != null) { - StaticValueProvider subscription = - StaticValueProvider.of(utf8String(config.subscription)); + StaticValueProvider subscription = StaticValueProvider.of(config.subscription); setSubscriptionProvider( NestedValueProvider.of(subscription, new SubscriptionTranslator())); } if (config.idAttribute != null) { - String idAttribute = utf8String(config.idAttribute); - setIdAttribute(idAttribute); + setIdAttribute(config.idAttribute); } if (config.timestampAttribute != null) { - String timestampAttribute = utf8String(config.timestampAttribute); - setTimestampAttribute(timestampAttribute); + setTimestampAttribute(config.timestampAttribute); } setPubsubClientFactory(FACTORY); setNeedsAttributes(config.needsAttributes); @@ -795,25 +791,25 @@ public Map> knownBuilders() { public static class Configuration { // All byte arrays are UTF-8 encoded strings - @Nullable private byte[] topic; - @Nullable private byte[] subscription; - @Nullable private byte[] idAttribute; - @Nullable private byte[] timestampAttribute; + @Nullable private String topic; + @Nullable private String subscription; + @Nullable private String idAttribute; + @Nullable private String timestampAttribute; private boolean needsAttributes; - public void setTopic(@Nullable byte[] topic) { + public void setTopic(@Nullable String topic) { this.topic = topic; } - public void setSubscription(@Nullable byte[] subscription) { + public void setSubscription(@Nullable String subscription) { this.subscription = subscription; } - public void setIdLabel(@Nullable byte[] idAttribute) { + public void setIdLabel(@Nullable String idAttribute) { this.idAttribute = idAttribute; } - public void setTimestampAttribute(@Nullable byte[] timestampAttribute) { + public void setTimestampAttribute(@Nullable String timestampAttribute) { this.timestampAttribute = timestampAttribute; } @@ -1064,16 +1060,14 @@ abstract static class Builder @Override public PTransform, PDone> buildExternal(External.Configuration config) { if (config.topic != null) { - StaticValueProvider topic = StaticValueProvider.of(utf8String(config.topic)); + StaticValueProvider topic = StaticValueProvider.of(config.topic); setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())); } if (config.idAttribute != null) { - String idAttribute = utf8String(config.idAttribute); - setIdAttribute(idAttribute); + setIdAttribute(config.idAttribute); } if (config.timestampAttribute != null) { - String timestampAttribute = utf8String(config.timestampAttribute); - setTimestampAttribute(timestampAttribute); + setTimestampAttribute(config.timestampAttribute); } SimpleFunction parseFn = (SimpleFunction) new FormatPayloadFromPubsubMessageProto(); @@ -1098,19 +1092,19 @@ public Map> knownBuilders() { public static class Configuration { // All byte arrays are UTF-8 encoded strings - private byte[] topic; - @Nullable private byte[] idAttribute; - @Nullable private byte[] timestampAttribute; + private String topic; + @Nullable private String idAttribute; + @Nullable private String timestampAttribute; - public void setTopic(byte[] topic) { + public void setTopic(String topic) { this.topic = topic; } - public void setIdLabel(@Nullable byte[] idAttribute) { + public void setIdLabel(@Nullable String idAttribute) { this.idAttribute = idAttribute; } - public void setTimestampAttribute(@Nullable byte[] timestampAttribute) { + public void setTimestampAttribute(@Nullable String timestampAttribute) { this.timestampAttribute = timestampAttribute; } } @@ -1416,8 +1410,4 @@ public PubsubMessage apply(PubsubMessage input) { return input; } } - - private static String utf8String(byte[] bytes) { - return new String(bytes, Charsets.UTF_8); - } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java index 00ede35482c9..ea43ec0522ef 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java @@ -30,7 +30,7 @@ import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.runners.core.construction.expansion.ExpansionService; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; @@ -38,8 +38,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.hamcrest.Matchers; import org.junit.Test; @@ -61,19 +59,19 @@ public void testConstructPubsubRead() throws Exception { .putConfiguration( "topic", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(encodeString(topic))) .build()) .putConfiguration( "id_label", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(encodeString(idAttribute))) .build()) .putConfiguration( "with_attributes", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:varint:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(encodeVarLong(needsAttributes))) .build()) .build(); @@ -132,13 +130,13 @@ public void testConstructPubsubWrite() throws Exception { .putConfiguration( "topic", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(encodeString(topic))) .build()) .putConfiguration( "id_label", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:string_utf8:v1") .setPayload(ByteString.copyFrom(encodeString(idAttribute))) .build()) .build(); @@ -208,7 +206,7 @@ public void testConstructPubsubWrite() throws Exception { private static byte[] encodeString(String str) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ByteArrayCoder.of().encode(utf8Bytes(str), baos); + StringUtf8Coder.of().encode(str, baos); return baos.toByteArray(); } @@ -225,11 +223,6 @@ private static byte[] encodeVarLong(Long value) throws IOException { return String.valueOf(value); } - private static byte[] utf8Bytes(String str) { - Preconditions.checkNotNull(str, "String must not be null."); - return str.getBytes(Charsets.UTF_8); - } - private static class TestStreamObserver implements StreamObserver { private T result; diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py index 4ff954419b2e..964a646b623e 100644 --- a/sdks/python/apache_beam/io/external/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py @@ -17,26 +17,35 @@ from __future__ import absolute_import -from apache_beam import ExternalTransform -from apache_beam import pvalue -from apache_beam.coders import BytesCoder -from apache_beam.coders import VarIntCoder -from apache_beam.coders.coders import LengthPrefixCoder -from apache_beam.io.gcp import pubsub -from apache_beam.portability.api.external_transforms_pb2 import ConfigValue -from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload -from apache_beam.transforms import Map -from apache_beam.transforms import ptransform +import typing +from past.builtins import unicode -class ReadFromPubSub(ptransform.PTransform): +from apache_beam.io.gcp import pubsub +from apache_beam.transforms import Map +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + +ReadFromPubsubSchema = typing.NamedTuple( + 'ReadFromPubsubSchema', + [ + ('topic', typing.Optional[unicode]), + ('subscription', typing.Optional[unicode]), + ('id_label', typing.Optional[unicode]), + ('with_attributes', bool), + ('timestamp_attribute', typing.Optional[unicode]), + ] +) + + +class ReadFromPubSub(ExternalTransform): """An external ``PTransform`` for reading from Cloud Pub/Sub.""" - _urn = 'beam:external:java:pubsub:read:v1' + URN = 'beam:external:java:pubsub:read:v1' def __init__(self, topic=None, subscription=None, id_label=None, with_attributes=False, timestamp_attribute=None, - expansion_service='localhost:8097'): + expansion_service=None): """Initializes ``ReadFromPubSub``. Args: @@ -69,40 +78,23 @@ def __init__(self, topic=None, subscription=None, id_label=None, timestamp is optional, and digits beyond the first three (i.e., time units smaller than milliseconds) may be ignored. """ - super(ReadFromPubSub, self).__init__() - self.topic = topic - self.subscription = subscription - self.id_label = id_label + super(ReadFromPubSub, self).__init__( + self.URN, + NamedTupleBasedPayloadBuilder( + ReadFromPubsubSchema( + topic=topic, + subscription=subscription, + id_label=id_label, + with_attributes=with_attributes, + timestamp_attribute=timestamp_attribute, + ) + ), + expansion_service + ) self.with_attributes = with_attributes - self.timestamp_attribute = timestamp_attribute - self.expansion_service = expansion_service - - def expand(self, pbegin): - if not isinstance(pbegin, pvalue.PBegin): - raise Exception("ReadFromPubSub must be a root transform") - - args = { - 'with_attributes': _encode_bool(self.with_attributes), - } - if self.topic is not None: - args['topic'] = _encode_str(self.topic) - - if self.subscription is not None: - args['subscription'] = _encode_str(self.subscription) - - if self.id_label is not None: - args['id_label'] = _encode_str(self.id_label) - - if self.timestamp_attribute is not None: - args['timestamp_attribute'] = _encode_str(self.timestamp_attribute) - - payload = ExternalConfigurationPayload(configuration=args) - pcoll = pbegin.apply( - ExternalTransform( - self._urn, - payload.SerializeToString(), - self.expansion_service)) + def expand(self, pcoll): + pcoll = super(ReadFromPubSub, self).expand(pcoll) if self.with_attributes: pcoll = pcoll | Map(pubsub.PubsubMessage._from_proto_str) pcoll.element_type = pubsub.PubsubMessage @@ -111,13 +103,25 @@ def expand(self, pbegin): return pcoll -class WriteToPubSub(ptransform.PTransform): +WriteToPubsubSchema = typing.NamedTuple( + 'WriteToPubsubSchema', + [ + ('topic', unicode), + ('id_label', typing.Optional[unicode]), + # this is not implemented yet on the Java side: + # ('with_attributes', bool), + ('timestamp_attribute', typing.Optional[unicode]), + ] +) + + +class WriteToPubSub(beam.PTransform): """An external ``PTransform`` for writing messages to Cloud Pub/Sub.""" - _urn = 'beam:external:java:pubsub:write:v1' + URN = 'beam:external:java:pubsub:write:v1' def __init__(self, topic, with_attributes=False, id_label=None, - timestamp_attribute=None, expansion_service='localhost:8097'): + timestamp_attribute=None, expansion_service=None): """Initializes ``WriteToPubSub``. Args: @@ -132,52 +136,25 @@ def __init__(self, topic, with_attributes=False, id_label=None, timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub message with the given name and the message's publish time as the value. """ - super(WriteToPubSub, self).__init__() - self.topic = topic + super(WriteToPubSub, self).__init__( + self.URN, + NamedTupleBasedPayloadBuilder( + WriteToPubsubSchema( + topic=topic, + id_label=id_label, + # with_attributes=with_attributes, + timestamp_attribute=timestamp_attribute, + ) + ), + expansion_service + ) self.with_attributes = with_attributes - self.id_label = id_label - self.timestamp_attribute = timestamp_attribute - self.expansion_service = expansion_service def expand(self, pvalue): - if self.with_attributes: - pcoll = pvalue | 'ToProtobuf' >> Map(pubsub.WriteToPubSub.to_proto_str) + pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str) else: - pcoll = pvalue | 'ToProtobuf' >> Map( + pcoll = pvalue | 'ToProto' >> Map( lambda x: pubsub.PubsubMessage(x, {})._to_proto_str()) pcoll.element_type = bytes - - args = { - 'topic': _encode_str(self.topic), - } - - if self.id_label is not None: - args['id_label'] = _encode_str(self.id_label) - - if self.timestamp_attribute is not None: - args['timestamp_attribute'] = _encode_str(self.timestamp_attribute) - - payload = ExternalConfigurationPayload(configuration=args) - return pcoll.apply( - ExternalTransform( - self._urn, - payload.SerializeToString(), - self.expansion_service)) - - -def _encode_str(str_obj): - encoded_str = str_obj.encode('utf-8') - coder = LengthPrefixCoder(BytesCoder()) - coder_urns = ['beam:coder:bytes:v1'] - return ConfigValue( - coder_urn=coder_urns, - payload=coder.encode(encoded_str)) - - -def _encode_bool(bool_obj): - coder = VarIntCoder() - coder_urns = ['beam:coder:varint:v1'] - return ConfigValue( - coder_urn=coder_urns, - payload=coder.encode(int(bool_obj))) + return super(WriteToPubSub, self).expand(pcoll) diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py index ba315f98dfb5..fe269770c376 100644 --- a/sdks/python/apache_beam/transforms/external_test.py +++ b/sdks/python/apache_beam/transforms/external_test.py @@ -34,6 +34,7 @@ import apache_beam as beam from apache_beam import Pipeline +from apache_beam.coders import BooleanCoder from apache_beam.coders import FloatCoder from apache_beam.coders import IterableCoder from apache_beam.coders import StrUtf8Coder @@ -66,6 +67,7 @@ def get_payload(args): class PayloadBase(object): values = { 'integer_example': 1, + 'boolean': True, 'string_example': u'thing', 'list_of_strings': [u'foo', u'bar'], 'optional_kv': (u'key', 1.1), @@ -74,6 +76,7 @@ class PayloadBase(object): bytes_values = { 'integer_example': 1, + 'boolean': True, 'string_example': 'thing', 'list_of_strings': ['foo', 'bar'], 'optional_kv': ('key', 1.1), @@ -85,6 +88,10 @@ class PayloadBase(object): coder_urn=['beam:coder:varint:v1'], payload=VarIntCoder() .get_impl().encode_nested(values['integer_example'])), + 'boolean': ConfigValue( + coder_urn=['beam:coder:bool:v1'], + payload=BooleanCoder() + .get_impl().encode_nested(values['boolean'])), 'string_example': ConfigValue( coder_urn=['beam:coder:string_utf8:v1'], payload=StrUtf8Coder() @@ -151,6 +158,7 @@ def get_payload_from_typing_hints(self, values): 'TestSchema', [ ('integer_example', int), + ('boolean', bool), ('string_example', unicode), ('list_of_strings', typing.List[unicode]), ('optional_kv', typing.Optional[typing.Tuple[unicode, float]]), @@ -188,6 +196,10 @@ def test_implicit_payload_builder_with_bytes(self): coder_urn=['beam:coder:varint:v1'], payload=VarIntCoder() .get_impl().encode_nested(values['integer_example'])), + 'boolean': ConfigValue( + coder_urn=['beam:coder:bool:v1'], + payload=BooleanCoder() + .get_impl().encode_nested(values['boolean'])), 'string_example': ConfigValue( coder_urn=['beam:coder:bytes:v1'], payload=StrUtf8Coder() diff --git a/sdks/python/apache_beam/transforms/external_test_py3.py b/sdks/python/apache_beam/transforms/external_test_py3.py index 88fa870f17b5..980ad8027e60 100644 --- a/sdks/python/apache_beam/transforms/external_test_py3.py +++ b/sdks/python/apache_beam/transforms/external_test_py3.py @@ -43,6 +43,7 @@ class AnnotatedTransform(beam.ExternalTransform): def __init__(self, integer_example: int, + boolean: bool, string_example: str, list_of_strings: typing.List[str], optional_kv: typing.Optional[typing.Tuple[str, float]] = None, @@ -53,6 +54,7 @@ def __init__(self, AnnotationBasedPayloadBuilder( self, integer_example=integer_example, + boolean=boolean, string_example=string_example, list_of_strings=list_of_strings, optional_kv=optional_kv, @@ -69,6 +71,7 @@ class AnnotatedTransform(beam.ExternalTransform): def __init__(self, integer_example: int, + boolean: bool, string_example: str, list_of_strings: typehints.List[str], optional_kv: typehints.Optional[typehints.KV[str, float]] = None, @@ -79,6 +82,7 @@ def __init__(self, AnnotationBasedPayloadBuilder( self, integer_example=integer_example, + boolean=boolean, string_example=string_example, list_of_strings=list_of_strings, optional_kv=optional_kv, diff --git a/sdks/python/apache_beam/transforms/external_test_py37.py b/sdks/python/apache_beam/transforms/external_test_py37.py index ad1ff72f0cfe..1a3cc2dfa7ee 100644 --- a/sdks/python/apache_beam/transforms/external_test_py37.py +++ b/sdks/python/apache_beam/transforms/external_test_py37.py @@ -44,6 +44,7 @@ class DataclassTransform(beam.ExternalTransform): URN = 'beam:external:fakeurn:v1' integer_example: int + boolean: bool string_example: str list_of_strings: typing.List[str] optional_kv: typing.Optional[typing.Tuple[str, float]] = None @@ -59,6 +60,7 @@ class DataclassTransform(beam.ExternalTransform): URN = 'beam:external:fakeurn:v1' integer_example: int + boolean: bool string_example: str list_of_strings: typehints.List[str] optional_kv: typehints.Optional[typehints.KV[str, float]] = None From 6dff94d8de9021e3c221da09482ed5d606ec4b52 Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Tue, 1 Oct 2019 14:43:47 -0700 Subject: [PATCH 3/6] Adjust the way that the new API is used to handle the encoded data Additional transforms must be added to convert to and from PubsubMessage proto objects --- .../apache_beam/io/external/gcp/pubsub.py | 64 +++++++++---------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py index 964a646b623e..5d0683b243b8 100644 --- a/sdks/python/apache_beam/io/external/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py @@ -21,6 +21,7 @@ from past.builtins import unicode +import apache_beam as beam from apache_beam.io.gcp import pubsub from apache_beam.transforms import Map from apache_beam.transforms.external import ExternalTransform @@ -38,7 +39,7 @@ ) -class ReadFromPubSub(ExternalTransform): +class ReadFromPubSub(beam.PTransform): """An external ``PTransform`` for reading from Cloud Pub/Sub.""" URN = 'beam:external:java:pubsub:read:v1' @@ -78,25 +79,22 @@ def __init__(self, topic=None, subscription=None, id_label=None, timestamp is optional, and digits beyond the first three (i.e., time units smaller than milliseconds) may be ignored. """ - super(ReadFromPubSub, self).__init__( - self.URN, - NamedTupleBasedPayloadBuilder( - ReadFromPubsubSchema( - topic=topic, - subscription=subscription, - id_label=id_label, - with_attributes=with_attributes, - timestamp_attribute=timestamp_attribute, - ) - ), - expansion_service - ) - self.with_attributes = with_attributes - - def expand(self, pcoll): - pcoll = super(ReadFromPubSub, self).expand(pcoll) - if self.with_attributes: - pcoll = pcoll | Map(pubsub.PubsubMessage._from_proto_str) + self.params = ReadFromPubsubSchema( + topic=topic, + subscription=subscription, + id_label=id_label, + with_attributes=with_attributes, + timestamp_attribute=timestamp_attribute) + self.expansion_service = expansion_service + + def expand(self, pbegin): + pcoll = pbegin.apply( + ExternalTransform( + self.URN, NamedTupleBasedPayloadBuilder(self.params), + self.expansion_service)) + + if self.params.with_attributes: + pcoll = pcoll | 'FromProto' >> Map(pubsub.PubsubMessage._from_proto_str) pcoll.element_type = pubsub.PubsubMessage else: pcoll.element_type = bytes @@ -136,18 +134,12 @@ def __init__(self, topic, with_attributes=False, id_label=None, timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub message with the given name and the message's publish time as the value. """ - super(WriteToPubSub, self).__init__( - self.URN, - NamedTupleBasedPayloadBuilder( - WriteToPubsubSchema( - topic=topic, - id_label=id_label, - # with_attributes=with_attributes, - timestamp_attribute=timestamp_attribute, - ) - ), - expansion_service - ) + self.params = WriteToPubsubSchema( + topic=topic, + id_label=id_label, + # with_attributes=with_attributes, + timestamp_attribute=timestamp_attribute) + self.expansion_service = expansion_service self.with_attributes = with_attributes def expand(self, pvalue): @@ -157,4 +149,10 @@ def expand(self, pvalue): pcoll = pvalue | 'ToProto' >> Map( lambda x: pubsub.PubsubMessage(x, {})._to_proto_str()) pcoll.element_type = bytes - return super(WriteToPubSub, self).expand(pcoll) + + return pcoll.apply( + ExternalTransform( + self.URN, + NamedTupleBasedPayloadBuilder(self.params), + self.expansion_service) + ) From 63cabe7849fc3b046e5fcfe8ab66b9bc58534dd5 Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Thu, 17 Oct 2019 18:13:13 -0700 Subject: [PATCH 4/6] Must use Boolean arg to make use of the BooleanCoder in Java --- .../org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 6 ++++-- .../beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java | 12 ++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 5fddeb145920..3212b3b0bbf4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -813,8 +813,10 @@ public void setTimestampAttribute(@Nullable String timestampAttribute) { this.timestampAttribute = timestampAttribute; } - public void setWithAttributes(Long needsAttributes) { - this.needsAttributes = needsAttributes >= 1; + public void setWithAttributes(Boolean needsAttributes) { + // we must use Boolean instead of boolean because the external payload system + // inspects the native type of each coder urn, and BooleanCoder wants Boolean. + this.needsAttributes = needsAttributes; } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java index ea43ec0522ef..50f75289122c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java @@ -30,8 +30,8 @@ import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.runners.core.construction.expansion.ExpansionService; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BooleanCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -52,7 +52,7 @@ public class PubsubIOExternalTest { public void testConstructPubsubRead() throws Exception { String topic = "projects/project-1234/topics/topic_name"; String idAttribute = "id_foo"; - Long needsAttributes = 1L; + Boolean needsAttributes = true; ExternalTransforms.ExternalConfigurationPayload payload = ExternalTransforms.ExternalConfigurationPayload.newBuilder() @@ -71,8 +71,8 @@ public void testConstructPubsubRead() throws Exception { .putConfiguration( "with_attributes", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:string_utf8:v1") - .setPayload(ByteString.copyFrom(encodeVarLong(needsAttributes))) + .addCoderUrn("beam:coder:bool:v1") + .setPayload(ByteString.copyFrom(encodeBoolean(needsAttributes))) .build()) .build(); @@ -210,9 +210,9 @@ private static byte[] encodeString(String str) throws IOException { return baos.toByteArray(); } - private static byte[] encodeVarLong(Long value) throws IOException { + private static byte[] encodeBoolean(Boolean value) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - VarLongCoder.of().encode(value, baos); + BooleanCoder.of().encode(value, baos); return baos.toByteArray(); } From b1e3fd1da059cfeb42cb8a08468f8841e6ec5b32 Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Sun, 20 Oct 2019 13:25:00 -0700 Subject: [PATCH 5/6] Fix docs --- sdks/python/apache_beam/io/external/gcp/pubsub.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py index 5d0683b243b8..7d66819fdb89 100644 --- a/sdks/python/apache_beam/io/external/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py @@ -64,7 +64,8 @@ def __init__(self, topic=None, subscription=None, id_label=None, that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort. with_attributes: - True - output elements will be :class:`~PubsubMessage` objects. + True - output elements will be + :class:`~apache_beam.io.gcp.pubsub.PubsubMessage` objects. False - output elements will be of type ``bytes`` (message data only). timestamp_attribute: Message value to use as element timestamp. If None, @@ -125,7 +126,8 @@ def __init__(self, topic, with_attributes=False, id_label=None, Args: topic: Cloud Pub/Sub topic in the form "/topics//". with_attributes: - True - input elements will be :class:`~PubsubMessage` objects. + True - input elements will be + :class:`~apache_beam.io.gcp.pubsub.PubsubMessage` objects. False - input elements will be of type ``bytes`` (message data only). id_label: If set, will set an attribute for each Cloud Pub/Sub message From 7b3ce81c0e5af627bf7bdfd3849a0be297d0de28 Mon Sep 17 00:00:00 2001 From: Chad Dombrova Date: Thu, 24 Oct 2019 08:37:59 -0700 Subject: [PATCH 6/6] Mark kafka and pubsub external transforms as experimental --- sdks/python/apache_beam/io/external/gcp/pubsub.py | 12 ++++++++++-- sdks/python/apache_beam/io/external/kafka.py | 6 ++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py index 7d66819fdb89..f0988ed75962 100644 --- a/sdks/python/apache_beam/io/external/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py @@ -40,7 +40,11 @@ class ReadFromPubSub(beam.PTransform): - """An external ``PTransform`` for reading from Cloud Pub/Sub.""" + """An external ``PTransform`` for reading from Cloud Pub/Sub. + + Experimental; no backwards compatibility guarantees. It requires special + preparation of the Java SDK. See BEAM-7870. + """ URN = 'beam:external:java:pubsub:read:v1' @@ -115,7 +119,11 @@ def expand(self, pbegin): class WriteToPubSub(beam.PTransform): - """An external ``PTransform`` for writing messages to Cloud Pub/Sub.""" + """An external ``PTransform`` for writing messages to Cloud Pub/Sub. + + Experimental; no backwards compatibility guarantees. It requires special + preparation of the Java SDK. See BEAM-7870. + """ URN = 'beam:external:java:pubsub:write:v1' diff --git a/sdks/python/apache_beam/io/external/kafka.py b/sdks/python/apache_beam/io/external/kafka.py index f824515b1aa4..04d91a79a26d 100644 --- a/sdks/python/apache_beam/io/external/kafka.py +++ b/sdks/python/apache_beam/io/external/kafka.py @@ -64,7 +64,8 @@ class ReadFromKafka(ExternalTransform): Note: Runners need to support translating Read operations in order to use this source. At the moment only the Flink Runner supports this. - Experimental; no backwards compatibility guarantees. + Experimental; no backwards compatibility guarantees. It requires special + preparation of the Java SDK. See BEAM-7870. """ # Returns the key/value data as raw byte arrays @@ -128,7 +129,8 @@ class WriteToKafka(ExternalTransform): If no Kafka Serializer for key/value is provided, then key/value are assumed to be byte arrays. - Experimental; no backwards compatibility guarantees. + Experimental; no backwards compatibility guarantees. It requires special + preparation of the Java SDK. See BEAM-7870. """ # Default serializer which passes raw bytes to Kafka