diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index ee856e791ffe..b93b950dc024 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -89,6 +89,7 @@ dependencies { // For resolving external transform requests runtime project(":sdks:java:io:kafka") runtime library.java.kafka_clients + runtime project(":sdks:java:io:google-cloud-platform") } // NOTE: runShadow must be used in order to run the job server. The standard run diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 0a9b8a9195cc..0c1befd311c9 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -69,6 +69,10 @@ dependencies { testCompile project(path: ":sdks:java:core", configuration: "shadowTest") testCompile project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntime") testCompile project(path: ":runners:direct-java", configuration: "shadow") + testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime") + testCompile project(path: ":sdks:java:testing:test-utils", configuration: "testRuntime") + // For testing Cross-language transforms + testCompile project(":runners:core-construction-java") testCompile library.java.hamcrest_core testCompile library.java.hamcrest_library testCompile library.java.junit diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 4f745ea9a4bb..3212b3b0bbf4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -20,7 +20,10 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.api.client.util.Clock; +import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -39,9 +42,11 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; @@ -53,6 +58,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.utils.AvroUtils; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -705,7 +711,8 @@ public abstract static class Read extends PTransform> abstract Builder toBuilder(); @AutoValue.Builder - abstract static class Builder { + abstract static class Builder + implements ExternalTransformBuilder> { abstract Builder setTopicProvider(ValueProvider topic); abstract Builder setPubsubClientFactory(PubsubClient.PubsubClientFactory clientFactory); @@ -733,6 +740,85 @@ abstract static class Builder { abstract Builder setClock(@Nullable Clock clock); abstract Read build(); + + @Override + public PTransform> buildExternal(External.Configuration config) { + if (config.topic != null) { + StaticValueProvider topic = StaticValueProvider.of(config.topic); + setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())); + } + if (config.subscription != null) { + StaticValueProvider subscription = StaticValueProvider.of(config.subscription); + setSubscriptionProvider( + NestedValueProvider.of(subscription, new SubscriptionTranslator())); + } + if (config.idAttribute != null) { + setIdAttribute(config.idAttribute); + } + if (config.timestampAttribute != null) { + setTimestampAttribute(config.timestampAttribute); + } + setPubsubClientFactory(FACTORY); + setNeedsAttributes(config.needsAttributes); + Coder coder = ByteArrayCoder.of(); + if (config.needsAttributes) { + SimpleFunction parseFn = + (SimpleFunction) new ParsePayloadAsPubsubMessageProto(); + setParseFn(parseFn); + setCoder(coder); + } else { + setParseFn(new ParsePayloadUsingCoder<>(coder)); + setCoder(coder); + } + setNeedsMessageId(false); + return build(); + } + } + + /** Exposes {@link PubSubIO.Read} as an external transform for cross-language usage. */ + @Experimental + @AutoService(ExternalTransformRegistrar.class) + public static class External implements ExternalTransformRegistrar { + + public static final String URN = "beam:external:java:pubsub:read:v1"; + + @Override + public Map> knownBuilders() { + return ImmutableMap.of(URN, AutoValue_PubsubIO_Read.Builder.class); + } + + /** Parameters class to expose the transform to an external SDK. */ + public static class Configuration { + + // All byte arrays are UTF-8 encoded strings + @Nullable private String topic; + @Nullable private String subscription; + @Nullable private String idAttribute; + @Nullable private String timestampAttribute; + private boolean needsAttributes; + + public void setTopic(@Nullable String topic) { + this.topic = topic; + } + + public void setSubscription(@Nullable String subscription) { + this.subscription = subscription; + } + + public void setIdLabel(@Nullable String idAttribute) { + this.idAttribute = idAttribute; + } + + public void setTimestampAttribute(@Nullable String timestampAttribute) { + this.timestampAttribute = timestampAttribute; + } + + public void setWithAttributes(Boolean needsAttributes) { + // we must use Boolean instead of boolean because the external payload system + // inspects the native type of each coder urn, and BooleanCoder wants Boolean. + this.needsAttributes = needsAttributes; + } + } } /** @@ -955,7 +1041,8 @@ public abstract static class Write extends PTransform, PDone> abstract Builder toBuilder(); @AutoValue.Builder - abstract static class Builder { + abstract static class Builder + implements ExternalTransformBuilder, PDone> { abstract Builder setTopicProvider(ValueProvider topicProvider); abstract Builder setPubsubClientFactory(PubsubClient.PubsubClientFactory factory); @@ -971,6 +1058,58 @@ abstract static class Builder { abstract Builder setFormatFn(SimpleFunction formatFn); abstract Write build(); + + @Override + public PTransform, PDone> buildExternal(External.Configuration config) { + if (config.topic != null) { + StaticValueProvider topic = StaticValueProvider.of(config.topic); + setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())); + } + if (config.idAttribute != null) { + setIdAttribute(config.idAttribute); + } + if (config.timestampAttribute != null) { + setTimestampAttribute(config.timestampAttribute); + } + SimpleFunction parseFn = + (SimpleFunction) new FormatPayloadFromPubsubMessageProto(); + setFormatFn(parseFn); + return build(); + } + } + + /** Exposes {@link PubSubIO.Write} as an external transform for cross-language usage. */ + @Experimental + @AutoService(ExternalTransformRegistrar.class) + public static class External implements ExternalTransformRegistrar { + + public static final String URN = "beam:external:java:pubsub:write:v1"; + + @Override + public Map> knownBuilders() { + return ImmutableMap.of(URN, AutoValue_PubsubIO_Write.Builder.class); + } + + /** Parameters class to expose the transform to an external SDK. */ + public static class Configuration { + + // All byte arrays are UTF-8 encoded strings + private String topic; + @Nullable private String idAttribute; + @Nullable private String timestampAttribute; + + public void setTopic(String topic) { + this.topic = topic; + } + + public void setIdLabel(@Nullable String idAttribute) { + this.idAttribute = idAttribute; + } + + public void setTimestampAttribute(@Nullable String timestampAttribute) { + this.timestampAttribute = timestampAttribute; + } + } } /** @@ -1213,6 +1352,22 @@ public T apply(PubsubMessage input) { } } + private static class ParsePayloadAsPubsubMessageProto + extends SimpleFunction { + @Override + public byte[] apply(PubsubMessage input) { + Map attributes = input.getAttributeMap(); + com.google.pubsub.v1.PubsubMessage.Builder message = + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(input.getPayload())); + // TODO(BEAM-8085) this should not be null + if (attributes != null) { + message.putAllAttributes(attributes); + } + return message.build().toByteArray(); + } + } + private static class FormatPayloadAsUtf8 extends SimpleFunction { @Override public PubsubMessage apply(String input) { @@ -1237,6 +1392,20 @@ public PubsubMessage apply(T input) { } } + private static class FormatPayloadFromPubsubMessageProto + extends SimpleFunction { + @Override + public PubsubMessage apply(byte[] input) { + try { + com.google.pubsub.v1.PubsubMessage message = + com.google.pubsub.v1.PubsubMessage.parseFrom(input); + return new PubsubMessage(message.getData().toByteArray(), message.getAttributesMap()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Could not decode Pubsub message", e); + } + } + } + private static class IdentityMessageFn extends SimpleFunction { @Override public PubsubMessage apply(PubsubMessage input) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java new file mode 100644 index 000000000000..50f75289122c --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.MatcherAssert.assertThat; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.model.expansion.v1.ExpansionApi; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.ParDoTranslation; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.core.construction.ReadTranslation; +import org.apache.beam.runners.core.construction.expansion.ExpansionService; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.powermock.reflect.Whitebox; + +/** Tests for building {@link PubsubIO} externally via the ExpansionService. */ +@RunWith(JUnit4.class) +public class PubsubIOExternalTest { + @Test + public void testConstructPubsubRead() throws Exception { + String topic = "projects/project-1234/topics/topic_name"; + String idAttribute = "id_foo"; + Boolean needsAttributes = true; + + ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .putConfiguration( + "topic", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn("beam:coder:string_utf8:v1") + .setPayload(ByteString.copyFrom(encodeString(topic))) + .build()) + .putConfiguration( + "id_label", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn("beam:coder:string_utf8:v1") + .setPayload(ByteString.copyFrom(encodeString(idAttribute))) + .build()) + .putConfiguration( + "with_attributes", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn("beam:coder:bool:v1") + .setPayload(ByteString.copyFrom(encodeBoolean(needsAttributes))) + .build()) + .build(); + + RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance(); + ExpansionApi.ExpansionRequest request = + ExpansionApi.ExpansionRequest.newBuilder() + .setComponents(defaultInstance) + .setTransform( + RunnerApi.PTransform.newBuilder() + .setUniqueName("test") + .setSpec( + RunnerApi.FunctionSpec.newBuilder() + .setUrn("beam:external:java:pubsub:read:v1") + .setPayload(payload.toByteString()))) + .setNamespace("test_namespace") + .build(); + + ExpansionService expansionService = new ExpansionService(); + TestStreamObserver observer = new TestStreamObserver<>(); + expansionService.expand(request, observer); + + ExpansionApi.ExpansionResponse result = observer.result; + RunnerApi.PTransform transform = result.getTransform(); + assertThat( + transform.getSubtransformsList(), + Matchers.contains( + "test_namespacetest/PubsubUnboundedSource", "test_namespacetest/MapElements")); + assertThat(transform.getInputsCount(), Matchers.is(0)); + assertThat(transform.getOutputsCount(), Matchers.is(1)); + + RunnerApi.PTransform pubsubComposite = + result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0)); + RunnerApi.PTransform pubsubRead = + result.getComponents().getTransformsOrThrow(pubsubComposite.getSubtransforms(0)); + RunnerApi.ReadPayload readPayload = + RunnerApi.ReadPayload.parseFrom(pubsubRead.getSpec().getPayload()); + PubsubUnboundedSource.PubsubSource source = + (PubsubUnboundedSource.PubsubSource) ReadTranslation.unboundedSourceFromProto(readPayload); + PubsubUnboundedSource spec = source.outer; + + assertThat( + spec.getTopicProvider() == null ? null : String.valueOf(spec.getTopicProvider()), + Matchers.is(topic)); + assertThat(spec.getIdAttribute(), Matchers.is(idAttribute)); + assertThat(spec.getNeedsAttributes(), Matchers.is(true)); + } + + @Test + public void testConstructPubsubWrite() throws Exception { + String topic = "projects/project-1234/topics/topic_name"; + String idAttribute = "id_foo"; + + ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .putConfiguration( + "topic", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn("beam:coder:string_utf8:v1") + .setPayload(ByteString.copyFrom(encodeString(topic))) + .build()) + .putConfiguration( + "id_label", + ExternalTransforms.ConfigValue.newBuilder() + .addCoderUrn("beam:coder:string_utf8:v1") + .setPayload(ByteString.copyFrom(encodeString(idAttribute))) + .build()) + .build(); + + Pipeline p = Pipeline.create(); + p.apply("unbounded", Create.of(1, 2, 3)).setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + String inputPCollection = + Iterables.getOnlyElement( + Iterables.getLast(pipelineProto.getComponents().getTransformsMap().values()) + .getOutputsMap() + .values()); + + ExpansionApi.ExpansionRequest request = + ExpansionApi.ExpansionRequest.newBuilder() + .setComponents(pipelineProto.getComponents()) + .setTransform( + RunnerApi.PTransform.newBuilder() + .setUniqueName("test") + .putInputs("input", inputPCollection) + .setSpec( + RunnerApi.FunctionSpec.newBuilder() + .setUrn("beam:external:java:pubsub:write:v1") + .setPayload(payload.toByteString()))) + .setNamespace("test_namespace") + .build(); + + ExpansionService expansionService = new ExpansionService(); + TestStreamObserver observer = new TestStreamObserver<>(); + expansionService.expand(request, observer); + + ExpansionApi.ExpansionResponse result = observer.result; + + RunnerApi.PTransform transform = result.getTransform(); + assertThat( + transform.getSubtransformsList(), + Matchers.contains( + "test_namespacetest/MapElements", "test_namespacetest/PubsubUnboundedSink")); + assertThat(transform.getInputsCount(), Matchers.is(1)); + assertThat(transform.getOutputsCount(), Matchers.is(0)); + + // test_namespacetest/PubsubUnboundedSink + RunnerApi.PTransform writeComposite = + result.getComponents().getTransformsOrThrow(transform.getSubtransforms(1)); + + // test_namespacetest/PubsubUnboundedSink/PubsubUnboundedSink.Writer + RunnerApi.PTransform writeComposite2 = + result.getComponents().getTransformsOrThrow(writeComposite.getSubtransforms(3)); + + // test_namespacetest/PubsubUnboundedSink/PubsubUnboundedSink.Writer/ParMultiDo(Writer) + RunnerApi.PTransform writeParDo = + result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(0)); + + RunnerApi.ParDoPayload parDoPayload = + RunnerApi.ParDoPayload.parseFrom(writeParDo.getSpec().getPayload()); + DoFn pubsubWriter = ParDoTranslation.getDoFn(parDoPayload); + + String idAttributeActual = (String) Whitebox.getInternalState(pubsubWriter, "idAttribute"); + + ValueProvider topicActual = + (ValueProvider) Whitebox.getInternalState(pubsubWriter, "topic"); + + assertThat(topicActual == null ? null : String.valueOf(topicActual), Matchers.is(topic)); + assertThat(idAttributeActual, Matchers.is(idAttribute)); + } + + private static byte[] encodeString(String str) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + StringUtf8Coder.of().encode(str, baos); + return baos.toByteArray(); + } + + private static byte[] encodeBoolean(Boolean value) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BooleanCoder.of().encode(value, baos); + return baos.toByteArray(); + } + + private static @Nullable String getTopic(@Nullable ValueProvider value) { + if (value == null) { + return null; + } + return String.valueOf(value); + } + + private static class TestStreamObserver implements StreamObserver { + + private T result; + + @Override + public void onNext(T t) { + result = t; + } + + @Override + public void onError(Throwable throwable) { + throw new RuntimeException("Should not happen", throwable); + } + + @Override + public void onCompleted() {} + } +} diff --git a/sdks/python/apache_beam/io/external/gcp/__init__.py b/sdks/python/apache_beam/io/external/gcp/__init__.py new file mode 100644 index 000000000000..6569e3fe5de4 --- /dev/null +++ b/sdks/python/apache_beam/io/external/gcp/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py new file mode 100644 index 000000000000..f0988ed75962 --- /dev/null +++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py @@ -0,0 +1,168 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import + +import typing + +from past.builtins import unicode + +import apache_beam as beam +from apache_beam.io.gcp import pubsub +from apache_beam.transforms import Map +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + +ReadFromPubsubSchema = typing.NamedTuple( + 'ReadFromPubsubSchema', + [ + ('topic', typing.Optional[unicode]), + ('subscription', typing.Optional[unicode]), + ('id_label', typing.Optional[unicode]), + ('with_attributes', bool), + ('timestamp_attribute', typing.Optional[unicode]), + ] +) + + +class ReadFromPubSub(beam.PTransform): + """An external ``PTransform`` for reading from Cloud Pub/Sub. + + Experimental; no backwards compatibility guarantees. It requires special + preparation of the Java SDK. See BEAM-7870. + """ + + URN = 'beam:external:java:pubsub:read:v1' + + def __init__(self, topic=None, subscription=None, id_label=None, + with_attributes=False, timestamp_attribute=None, + expansion_service=None): + """Initializes ``ReadFromPubSub``. + + Args: + topic: Cloud Pub/Sub topic in the form + "projects//topics/". If provided, subscription must be + None. + subscription: Existing Cloud Pub/Sub subscription to use in the + form "projects//subscriptions/". If not + specified, a temporary subscription will be created from the specified + topic. If provided, topic must be None. + id_label: The attribute on incoming Pub/Sub messages to use as a unique + record identifier. When specified, the value of this attribute (which + can be any string that uniquely identifies the record) will be used for + deduplication of messages. If not provided, we cannot guarantee + that no duplicate data will be delivered on the Pub/Sub stream. In this + case, deduplication of the stream will be strictly best effort. + with_attributes: + True - output elements will be + :class:`~apache_beam.io.gcp.pubsub.PubsubMessage` objects. + False - output elements will be of type ``bytes`` (message + data only). + timestamp_attribute: Message value to use as element timestamp. If None, + uses message publishing time as the timestamp. + + Timestamp values should be in one of two formats: + + - A numerical value representing the number of milliseconds since the + Unix epoch. + - A string in RFC 3339 format, UTC timezone. Example: + ``2015-10-29T23:41:41.123Z``. The sub-second component of the + timestamp is optional, and digits beyond the first three (i.e., time + units smaller than milliseconds) may be ignored. + """ + self.params = ReadFromPubsubSchema( + topic=topic, + subscription=subscription, + id_label=id_label, + with_attributes=with_attributes, + timestamp_attribute=timestamp_attribute) + self.expansion_service = expansion_service + + def expand(self, pbegin): + pcoll = pbegin.apply( + ExternalTransform( + self.URN, NamedTupleBasedPayloadBuilder(self.params), + self.expansion_service)) + + if self.params.with_attributes: + pcoll = pcoll | 'FromProto' >> Map(pubsub.PubsubMessage._from_proto_str) + pcoll.element_type = pubsub.PubsubMessage + else: + pcoll.element_type = bytes + return pcoll + + +WriteToPubsubSchema = typing.NamedTuple( + 'WriteToPubsubSchema', + [ + ('topic', unicode), + ('id_label', typing.Optional[unicode]), + # this is not implemented yet on the Java side: + # ('with_attributes', bool), + ('timestamp_attribute', typing.Optional[unicode]), + ] +) + + +class WriteToPubSub(beam.PTransform): + """An external ``PTransform`` for writing messages to Cloud Pub/Sub. + + Experimental; no backwards compatibility guarantees. It requires special + preparation of the Java SDK. See BEAM-7870. + """ + + URN = 'beam:external:java:pubsub:write:v1' + + def __init__(self, topic, with_attributes=False, id_label=None, + timestamp_attribute=None, expansion_service=None): + """Initializes ``WriteToPubSub``. + + Args: + topic: Cloud Pub/Sub topic in the form "/topics//". + with_attributes: + True - input elements will be + :class:`~apache_beam.io.gcp.pubsub.PubsubMessage` objects. + False - input elements will be of type ``bytes`` (message + data only). + id_label: If set, will set an attribute for each Cloud Pub/Sub message + with the given name and a unique value. This attribute can then be used + in a ReadFromPubSub PTransform to deduplicate messages. + timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub + message with the given name and the message's publish time as the value. + """ + self.params = WriteToPubsubSchema( + topic=topic, + id_label=id_label, + # with_attributes=with_attributes, + timestamp_attribute=timestamp_attribute) + self.expansion_service = expansion_service + self.with_attributes = with_attributes + + def expand(self, pvalue): + if self.with_attributes: + pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str) + else: + pcoll = pvalue | 'ToProto' >> Map( + lambda x: pubsub.PubsubMessage(x, {})._to_proto_str()) + pcoll.element_type = bytes + + return pcoll.apply( + ExternalTransform( + self.URN, + NamedTupleBasedPayloadBuilder(self.params), + self.expansion_service) + ) diff --git a/sdks/python/apache_beam/io/external/kafka.py b/sdks/python/apache_beam/io/external/kafka.py index f824515b1aa4..04d91a79a26d 100644 --- a/sdks/python/apache_beam/io/external/kafka.py +++ b/sdks/python/apache_beam/io/external/kafka.py @@ -64,7 +64,8 @@ class ReadFromKafka(ExternalTransform): Note: Runners need to support translating Read operations in order to use this source. At the moment only the Flink Runner supports this. - Experimental; no backwards compatibility guarantees. + Experimental; no backwards compatibility guarantees. It requires special + preparation of the Java SDK. See BEAM-7870. """ # Returns the key/value data as raw byte arrays @@ -128,7 +129,8 @@ class WriteToKafka(ExternalTransform): If no Kafka Serializer for key/value is provided, then key/value are assumed to be byte arrays. - Experimental; no backwards compatibility guarantees. + Experimental; no backwards compatibility guarantees. It requires special + preparation of the Java SDK. See BEAM-7870. """ # Default serializer which passes raw bytes to Kafka diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py index ba315f98dfb5..fe269770c376 100644 --- a/sdks/python/apache_beam/transforms/external_test.py +++ b/sdks/python/apache_beam/transforms/external_test.py @@ -34,6 +34,7 @@ import apache_beam as beam from apache_beam import Pipeline +from apache_beam.coders import BooleanCoder from apache_beam.coders import FloatCoder from apache_beam.coders import IterableCoder from apache_beam.coders import StrUtf8Coder @@ -66,6 +67,7 @@ def get_payload(args): class PayloadBase(object): values = { 'integer_example': 1, + 'boolean': True, 'string_example': u'thing', 'list_of_strings': [u'foo', u'bar'], 'optional_kv': (u'key', 1.1), @@ -74,6 +76,7 @@ class PayloadBase(object): bytes_values = { 'integer_example': 1, + 'boolean': True, 'string_example': 'thing', 'list_of_strings': ['foo', 'bar'], 'optional_kv': ('key', 1.1), @@ -85,6 +88,10 @@ class PayloadBase(object): coder_urn=['beam:coder:varint:v1'], payload=VarIntCoder() .get_impl().encode_nested(values['integer_example'])), + 'boolean': ConfigValue( + coder_urn=['beam:coder:bool:v1'], + payload=BooleanCoder() + .get_impl().encode_nested(values['boolean'])), 'string_example': ConfigValue( coder_urn=['beam:coder:string_utf8:v1'], payload=StrUtf8Coder() @@ -151,6 +158,7 @@ def get_payload_from_typing_hints(self, values): 'TestSchema', [ ('integer_example', int), + ('boolean', bool), ('string_example', unicode), ('list_of_strings', typing.List[unicode]), ('optional_kv', typing.Optional[typing.Tuple[unicode, float]]), @@ -188,6 +196,10 @@ def test_implicit_payload_builder_with_bytes(self): coder_urn=['beam:coder:varint:v1'], payload=VarIntCoder() .get_impl().encode_nested(values['integer_example'])), + 'boolean': ConfigValue( + coder_urn=['beam:coder:bool:v1'], + payload=BooleanCoder() + .get_impl().encode_nested(values['boolean'])), 'string_example': ConfigValue( coder_urn=['beam:coder:bytes:v1'], payload=StrUtf8Coder() diff --git a/sdks/python/apache_beam/transforms/external_test_py3.py b/sdks/python/apache_beam/transforms/external_test_py3.py index 88fa870f17b5..980ad8027e60 100644 --- a/sdks/python/apache_beam/transforms/external_test_py3.py +++ b/sdks/python/apache_beam/transforms/external_test_py3.py @@ -43,6 +43,7 @@ class AnnotatedTransform(beam.ExternalTransform): def __init__(self, integer_example: int, + boolean: bool, string_example: str, list_of_strings: typing.List[str], optional_kv: typing.Optional[typing.Tuple[str, float]] = None, @@ -53,6 +54,7 @@ def __init__(self, AnnotationBasedPayloadBuilder( self, integer_example=integer_example, + boolean=boolean, string_example=string_example, list_of_strings=list_of_strings, optional_kv=optional_kv, @@ -69,6 +71,7 @@ class AnnotatedTransform(beam.ExternalTransform): def __init__(self, integer_example: int, + boolean: bool, string_example: str, list_of_strings: typehints.List[str], optional_kv: typehints.Optional[typehints.KV[str, float]] = None, @@ -79,6 +82,7 @@ def __init__(self, AnnotationBasedPayloadBuilder( self, integer_example=integer_example, + boolean=boolean, string_example=string_example, list_of_strings=list_of_strings, optional_kv=optional_kv, diff --git a/sdks/python/apache_beam/transforms/external_test_py37.py b/sdks/python/apache_beam/transforms/external_test_py37.py index ad1ff72f0cfe..1a3cc2dfa7ee 100644 --- a/sdks/python/apache_beam/transforms/external_test_py37.py +++ b/sdks/python/apache_beam/transforms/external_test_py37.py @@ -44,6 +44,7 @@ class DataclassTransform(beam.ExternalTransform): URN = 'beam:external:fakeurn:v1' integer_example: int + boolean: bool string_example: str list_of_strings: typing.List[str] optional_kv: typing.Optional[typing.Tuple[str, float]] = None @@ -59,6 +60,7 @@ class DataclassTransform(beam.ExternalTransform): URN = 'beam:external:fakeurn:v1' integer_example: int + boolean: bool string_example: str list_of_strings: typehints.List[str] optional_kv: typehints.Optional[typehints.KV[str, float]] = None