diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java index 88018965bff7..3ed7df850225 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java @@ -33,7 +33,6 @@ import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder; import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder; -import org.apache.beam.runners.dataflow.internal.PubsubIOTranslator; import org.apache.beam.runners.dataflow.internal.ReadTranslator; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -63,6 +62,8 @@ import org.apache.beam.sdk.io.BigQueryIO; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.PubsubIO; +import org.apache.beam.sdk.io.PubsubUnboundedSink; +import org.apache.beam.sdk.io.PubsubUnboundedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.ShardNameTemplate; import org.apache.beam.sdk.io.TextIO; @@ -107,6 +108,7 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionList; @@ -177,6 +179,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import javax.annotation.Nullable; /** * A {@link PipelineRunner} that executes the operations in the @@ -338,33 +341,46 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) { this.pcollectionsRequiringIndexedFormat = new HashSet<>(); this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>(); + ImmutableMap.Builder, Class> builder = ImmutableMap., Class>builder(); if (options.isStreaming()) { - overrides = ImmutableMap., Class>builder() - .put(Combine.GloballyAsSingletonView.class, StreamingCombineGloballyAsSingletonView.class) - .put(Create.Values.class, StreamingCreate.class) - .put(View.AsMap.class, StreamingViewAsMap.class) - .put(View.AsMultimap.class, StreamingViewAsMultimap.class) - .put(View.AsSingleton.class, StreamingViewAsSingleton.class) - .put(View.AsList.class, StreamingViewAsList.class) - .put(View.AsIterable.class, StreamingViewAsIterable.class) - .put(Write.Bound.class, StreamingWrite.class) - .put(PubsubIO.Write.Bound.class, StreamingPubsubIOWrite.class) - .put(Read.Unbounded.class, StreamingUnboundedRead.class) - .put(Read.Bounded.class, UnsupportedIO.class) - .put(AvroIO.Read.Bound.class, UnsupportedIO.class) - .put(AvroIO.Write.Bound.class, UnsupportedIO.class) - .put(BigQueryIO.Read.Bound.class, UnsupportedIO.class) - .put(TextIO.Read.Bound.class, UnsupportedIO.class) - .put(TextIO.Write.Bound.class, UnsupportedIO.class) - .put(Window.Bound.class, AssignWindows.class) - .build(); + builder.put(Combine.GloballyAsSingletonView.class, + StreamingCombineGloballyAsSingletonView.class); + builder.put(Create.Values.class, StreamingCreate.class); + builder.put(View.AsMap.class, StreamingViewAsMap.class); + builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class); + builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class); + builder.put(View.AsList.class, StreamingViewAsList.class); + builder.put(View.AsIterable.class, StreamingViewAsIterable.class); + builder.put(Write.Bound.class, StreamingWrite.class); + builder.put(Read.Unbounded.class, StreamingUnboundedRead.class); + builder.put(Read.Bounded.class, UnsupportedIO.class); + builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class); + builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class); + builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class); + builder.put(TextIO.Read.Bound.class, UnsupportedIO.class); + builder.put(TextIO.Write.Bound.class, UnsupportedIO.class); + builder.put(Window.Bound.class, AssignWindows.class); + // In streaming mode must use either the custom Pubsub unbounded source/sink or + // defer to Windmill's built-in implementation. + builder.put(PubsubIO.Read.Bound.PubsubBoundedReader.class, UnsupportedIO.class); + builder.put(PubsubIO.Write.Bound.PubsubBoundedWriter.class, UnsupportedIO.class); + if (options.getExperiments() == null + || !options.getExperiments().contains("enable_custom_pubsub_source")) { + builder.put(PubsubUnboundedSource.class, StreamingPubsubIORead.class); + } + if (options.getExperiments() == null + || !options.getExperiments().contains("enable_custom_pubsub_sink")) { + builder.put(PubsubUnboundedSink.class, StreamingPubsubIOWrite.class); + } } else { - ImmutableMap.Builder, Class> builder = ImmutableMap., Class>builder(); builder.put(Read.Unbounded.class, UnsupportedIO.class); builder.put(Window.Bound.class, AssignWindows.class); builder.put(Write.Bound.class, BatchWrite.class); builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class); builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class); + // In batch mode must use the custom Pubsub bounded source/sink. + builder.put(PubsubUnboundedSource.class, UnsupportedIO.class); + builder.put(PubsubUnboundedSink.class, UnsupportedIO.class); if (options.getExperiments() == null || !options.getExperiments().contains("disable_ism_side_input")) { builder.put(View.AsMap.class, BatchViewAsMap.class); @@ -373,8 +389,8 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) { builder.put(View.AsList.class, BatchViewAsList.class); builder.put(View.AsIterable.class, BatchViewAsIterable.class); } - overrides = builder.build(); } + overrides = builder.build(); } /** @@ -2336,27 +2352,104 @@ protected String getKindString() { } } + // ================================================================================ + // PubsubIO translations + // ================================================================================ + /** - * Specialized implementation for - * {@link org.apache.beam.sdk.io.PubsubIO.Write PubsubIO.Write} for the - * Dataflow runner in streaming mode. - * - *

For internal use only. Subject to change at any time. - * - *

Public so the {@link PubsubIOTranslator} can access. + * Suppress application of {@link PubsubUnboundedSource#apply} in streaming mode so that we + * can instead defer to Windmill's implementation. */ - public static class StreamingPubsubIOWrite extends PTransform, PDone> { - private final PubsubIO.Write.Bound transform; + private static class StreamingPubsubIORead extends PTransform> { + private final PubsubUnboundedSource transform; + + /** + * Builds an instance of this class from the overridden transform. + */ + public StreamingPubsubIORead( + DataflowPipelineRunner runner, PubsubUnboundedSource transform) { + this.transform = transform; + } + + PubsubUnboundedSource getOverriddenTransform() { + return transform; + } + + @Override + public PCollection apply(PBegin input) { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) + .setCoder(transform.getElementCoder()); + } + + @Override + protected String getKindString() { + return "StreamingPubsubIORead"; + } + + static { + DataflowPipelineTranslator.registerTransformTranslator( + StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator()); + } + } + + /** + * Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. + */ + private static class StreamingPubsubIOReadTranslator implements + TransformTranslator { + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + public void translate( + StreamingPubsubIORead transform, + TranslationContext context) { + translateTyped(transform, context); + } + + private void translateTyped( + StreamingPubsubIORead transform, + TranslationContext context) { + checkArgument(context.getPipelineOptions().isStreaming(), + "StreamingPubsubIORead is only for streaming pipelines."); + PubsubUnboundedSource overriddenTransform = transform.getOverriddenTransform(); + context.addStep(transform, "ParallelRead"); + context.addInput(PropertyNames.FORMAT, "pubsub"); + if (overriddenTransform.getTopic() != null) { + context.addInput(PropertyNames.PUBSUB_TOPIC, + overriddenTransform.getTopic().getV1Beta1Path()); + } + if (overriddenTransform.getSubscription() != null) { + context.addInput( + PropertyNames.PUBSUB_SUBSCRIPTION, + overriddenTransform.getSubscription().getV1Beta1Path()); + } + if (overriddenTransform.getTimestampLabel() != null) { + context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, + overriddenTransform.getTimestampLabel()); + } + if (overriddenTransform.getIdLabel() != null) { + context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); + } + context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform)); + } + } + + /** + * Suppress application of {@link PubsubUnboundedSink#apply} in streaming mode so that we + * can instead defer to Windmill's implementation. + */ + private static class StreamingPubsubIOWrite extends PTransform, PDone> { + private final PubsubUnboundedSink transform; /** * Builds an instance of this class from the overridden transform. */ public StreamingPubsubIOWrite( - DataflowPipelineRunner runner, PubsubIO.Write.Bound transform) { + DataflowPipelineRunner runner, PubsubUnboundedSink transform) { this.transform = transform; } - public PubsubIO.Write.Bound getOverriddenTransform() { + PubsubUnboundedSink getOverriddenTransform() { return transform; } @@ -2369,8 +2462,51 @@ public PDone apply(PCollection input) { protected String getKindString() { return "StreamingPubsubIOWrite"; } + + static { + DataflowPipelineTranslator.registerTransformTranslator( + StreamingPubsubIOWrite.class, new StreamingPubsubIOWriteTranslator()); + } + } + + /** + * Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal node. + */ + private static class StreamingPubsubIOWriteTranslator implements + TransformTranslator { + + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + public void translate( + StreamingPubsubIOWrite transform, + TranslationContext context) { + translateTyped(transform, context); + } + + private void translateTyped( + StreamingPubsubIOWrite transform, + TranslationContext context) { + checkArgument(context.getPipelineOptions().isStreaming(), + "StreamingPubsubIOWrite is only for streaming pipelines."); + PubsubUnboundedSink overriddenTransform = transform.getOverriddenTransform(); + context.addStep(transform, "ParallelWrite"); + context.addInput(PropertyNames.FORMAT, "pubsub"); + context.addInput(PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path()); + if (overriddenTransform.getTimestampLabel() != null) { + context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, + overriddenTransform.getTimestampLabel()); + } + if (overriddenTransform.getIdLabel() != null) { + context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); + } + context.addEncodingInput( + WindowedValue.getValueOnlyCoder(overriddenTransform.getElementCoder())); + context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + } } + // ================================================================================ + /** * Specialized implementation for * {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the @@ -2912,11 +3048,14 @@ public Coder> getDefaultOutputCoder(CoderRegistry registry, Coder inp } /** - * Specialized expansion for unsupported IO transforms that throws an error. + * Specialized expansion for unsupported IO transforms and DoFns that throws an error. */ private static class UnsupportedIO extends PTransform { + @Nullable private PTransform transform; + @Nullable + private DoFn doFn; /** * Builds an instance of this class from the overridden transform. @@ -2974,13 +3113,50 @@ public UnsupportedIO(DataflowPipelineRunner runner, TextIO.Write.Bound transf this.transform = transform; } + /** + * Builds an instance of this class from the overridden doFn. + */ + @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply() + public UnsupportedIO(DataflowPipelineRunner runner, + PubsubIO.Read.Bound.PubsubBoundedReader doFn) { + this.doFn = doFn; + } + + /** + * Builds an instance of this class from the overridden doFn. + */ + @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply() + public UnsupportedIO(DataflowPipelineRunner runner, + PubsubIO.Write.Bound.PubsubBoundedWriter doFn) { + this.doFn = doFn; + } + + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply() + public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSource transform) { + this.transform = transform; + } + + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply() + public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSink transform) { + this.transform = transform; + } + + @Override public OutputT apply(InputT input) { String mode = input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming() ? "streaming" : "batch"; + String name = + transform == null ? approximateSimpleName(doFn.getClass()) : + approximatePTransformName(transform.getClass()); throw new UnsupportedOperationException( - String.format("The DataflowPipelineRunner in %s mode does not support %s.", - mode, approximatePTransformName(transform.getClass()))); + String.format("The DataflowPipelineRunner in %s mode does not support %s.", mode, name)); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index d82280393910..7f673932f33d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -32,7 +32,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly; -import org.apache.beam.runners.dataflow.internal.PubsubIOTranslator; import org.apache.beam.runners.dataflow.internal.ReadTranslator; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.DoFnInfo; @@ -41,7 +40,6 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.io.PubsubIO; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.TransformTreeNode; @@ -1009,12 +1007,6 @@ private void translateHelper( /////////////////////////////////////////////////////////////////////////// // IO Translation. - registerTransformTranslator( - PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator()); - registerTransformTranslator( - DataflowPipelineRunner.StreamingPubsubIOWrite.class, - new PubsubIOTranslator.WriteTranslator()); - registerTransformTranslator(Read.Bounded.class, new ReadTranslator()); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java deleted file mode 100755 index 976f948dd1c2..000000000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.internal; - -import org.apache.beam.runners.dataflow.DataflowPipelineRunner; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; -import org.apache.beam.sdk.io.PubsubIO; -import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * Pubsub transform support code for the Dataflow backend. - */ -public class PubsubIOTranslator { - - /** - * Implements PubsubIO Read translation for the Dataflow backend. - */ - public static class ReadTranslator implements TransformTranslator> { - @Override - @SuppressWarnings({"rawtypes", "unchecked"}) - public void translate( - PubsubIO.Read.Bound transform, - TranslationContext context) { - translateReadHelper(transform, context); - } - - private void translateReadHelper( - PubsubIO.Read.Bound transform, - TranslationContext context) { - if (!context.getPipelineOptions().isStreaming()) { - throw new IllegalArgumentException( - "PubsubIO.Read can only be used with the Dataflow streaming runner."); - } - - context.addStep(transform, "ParallelRead"); - context.addInput(PropertyNames.FORMAT, "pubsub"); - if (transform.getTopic() != null) { - context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path()); - } - if (transform.getSubscription() != null) { - context.addInput( - PropertyNames.PUBSUB_SUBSCRIPTION, transform.getSubscription().asV1Beta1Path()); - } - if (transform.getTimestampLabel() != null) { - context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel()); - } - if (transform.getIdLabel() != null) { - context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel()); - } - context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - } - } - - /** - * Implements PubsubIO Write translation for the Dataflow backend. - */ - public static class WriteTranslator - implements TransformTranslator> { - - @Override - @SuppressWarnings({"rawtypes", "unchecked"}) - public void translate( - DataflowPipelineRunner.StreamingPubsubIOWrite transform, - TranslationContext context) { - translateWriteHelper(transform, context); - } - - private void translateWriteHelper( - DataflowPipelineRunner.StreamingPubsubIOWrite customTransform, - TranslationContext context) { - if (!context.getPipelineOptions().isStreaming()) { - throw new IllegalArgumentException( - "PubsubIO.Write is non-primitive for the Dataflow batch runner."); - } - - PubsubIO.Write.Bound transform = customTransform.getOverriddenTransform(); - - context.addStep(customTransform, "ParallelWrite"); - context.addInput(PropertyNames.FORMAT, "pubsub"); - context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path()); - if (transform.getTimestampLabel() != null) { - context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel()); - } - if (transform.getIdLabel() != null) { - context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel()); - } - context.addEncodingInput(WindowedValue.getValueOnlyCoder(transform.getCoder())); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(customTransform)); - } - } -} diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java index 4874877d73fd..3df9cdb9d4e7 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java @@ -42,21 +42,22 @@ public class DataflowPubsubIOTest { @Test public void testPrimitiveWriteDisplayData() { DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - PubsubIO.Write.Bound write = PubsubIO.Write - .topic("projects/project/topics/topic"); + PubsubIO.Write.Bound write = PubsubIO.Write.topic("projects/project/topics/topic"); Set displayData = evaluator.displayDataForPrimitiveTransforms(write); assertThat("PubsubIO.Write should include the topic in its primitive display data", - displayData, hasItem(hasDisplayItem("topic"))); + displayData, hasItem(hasDisplayItem("topic"))); } @Test public void testPrimitiveReadDisplayData() { DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - PubsubIO.Read.Bound read = PubsubIO.Read.topic("projects/project/topics/topic"); + PubsubIO.Read.Bound read = + PubsubIO.Read.subscription("projects/project/subscriptions/subscription") + .maxNumRecords(1); Set displayData = evaluator.displayDataForPrimitiveTransforms(read); - assertThat("PubsubIO.Read should include the topic in its primitive display data", - displayData, hasItem(hasDisplayItem("topic"))); + assertThat("PubsubIO.Read should include the subscription in its primitive display data", + displayData, hasItem(hasDisplayItem("subscription"))); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 78fec852c666..dc08ddcdb42c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -36,10 +36,10 @@ import org.apache.beam.sdk.util.PubsubClient; import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.ProjectPath; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; @@ -54,7 +54,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -634,12 +633,12 @@ public Bound maxReadTime(Duration maxReadTime) { @Override public PCollection apply(PInput input) { if (topic == null && subscription == null) { - throw new IllegalStateException("need to set either the topic or the subscription for " + throw new IllegalStateException("Need to set either the topic or the subscription for " + "a PubsubIO.Read transform"); } if (topic != null && subscription != null) { - throw new IllegalStateException("Can't set both the topic and the subscription for a " - + "PubsubIO.Read transform"); + throw new IllegalStateException("Can't set both the topic and the subscription for " + + "a PubsubIO.Read transform"); } boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null; @@ -649,9 +648,20 @@ public PCollection apply(PInput input) { .apply(Create.of((Void) null)).setCoder(VoidCoder.of()) .apply(ParDo.of(new PubsubBoundedReader())).setCoder(coder); } else { - return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) - .setCoder(coder); + @Nullable ProjectPath projectPath = + topic == null ? null : + PubsubClient.projectPathFromId(topic.project); + @Nullable TopicPath topicPath = + topic == null ? null : + PubsubClient.topicPathFromName(topic.project, topic.topic); + @Nullable SubscriptionPath subscriptionPath = + subscription == null ? null : + PubsubClient.subscriptionPathFromName(subscription.project, + subscription.subscription); + return input.getPipeline().begin() + .apply(new PubsubUnboundedSource( + FACTORY, projectPath, topicPath, subscriptionPath, + coder, timestampLabel, idLabel)); } } @@ -707,12 +717,16 @@ public Duration getMaxReadTime() { /** * Default reader when Pubsub subscription has some form of upper bound. - *

TODO: Consider replacing with BoundedReadFromUnboundedSource on top of upcoming - * PubsubUnboundedSource. - *

NOTE: This is not the implementation used when running on the Google Dataflow hosted - * service. + * + *

TODO: Consider replacing with BoundedReadFromUnboundedSource on top + * of PubsubUnboundedSource. + * + *

NOTE: This is not the implementation used when running on the Google Cloud Dataflow + * service in streaming mode. + * + *

Public so can be suppressed by runners. */ - private class PubsubBoundedReader extends DoFn { + public class PubsubBoundedReader extends DoFn { private static final int DEFAULT_PULL_SIZE = 100; private static final int ACK_TIMEOUT_SEC = 60; @@ -724,20 +738,20 @@ public void processElement(ProcessContext c) throws IOException { PubsubClient.SubscriptionPath subscriptionPath; if (getSubscription() == null) { - // Create a randomized subscription derived from the topic name. - String subscription = getTopic().topic + "_dataflow_" + new Random().nextLong(); + TopicPath topicPath = + PubsubClient.topicPathFromName(getTopic().project, getTopic().topic); // The subscription will be registered under this pipeline's project if we know it. // Otherwise we'll fall back to the topic's project. // Note that they don't need to be the same. - String project = c.getPipelineOptions().as(PubsubOptions.class).getProject(); - if (Strings.isNullOrEmpty(project)) { - project = getTopic().project; + String projectId = + c.getPipelineOptions().as(PubsubOptions.class).getProject(); + if (Strings.isNullOrEmpty(projectId)) { + projectId = getTopic().project; } - subscriptionPath = PubsubClient.subscriptionPathFromName(project, subscription); - TopicPath topicPath = - PubsubClient.topicPathFromName(getTopic().project, getTopic().topic); + ProjectPath projectPath = PubsubClient.projectPathFromId(projectId); try { - pubsubClient.createSubscription(topicPath, subscriptionPath, ACK_TIMEOUT_SEC); + subscriptionPath = + pubsubClient.createRandomSubscription(projectPath, topicPath, ACK_TIMEOUT_SEC); } catch (Exception e) { throw new RuntimeException("Failed to create subscription: ", e); } @@ -795,6 +809,12 @@ public void processElement(ProcessContext c) throws IOException { } } } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + Bound.this.populateDisplayData(builder); + } } } @@ -961,8 +981,20 @@ public PDone apply(PCollection input) { if (topic == null) { throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform"); } - input.apply(ParDo.of(new PubsubWriter())); - return PDone.in(input.getPipeline()); + switch (input.isBounded()) { + case BOUNDED: + input.apply(ParDo.of(new PubsubBoundedWriter())); + return PDone.in(input.getPipeline()); + case UNBOUNDED: + return input.apply(new PubsubUnboundedSink( + FACTORY, + PubsubClient.topicPathFromName(topic.project, topic.topic), + coder, + timestampLabel, + idLabel, + 100 /* numShards */)); + } + throw new RuntimeException(); // cases are exhaustive. } @Override @@ -993,11 +1025,14 @@ public Coder getCoder() { } /** - * Writer to Pubsub which batches messages. - *

NOTE: This is not the implementation used when running on the Google Dataflow hosted - * service. + * Writer to Pubsub which batches messages from bounded collections. + * + *

NOTE: This is not the implementation used when running on the Google Cloud Dataflow + * service in streaming mode. + * + *

Public so can be suppressed by runners. */ - private class PubsubWriter extends DoFn { + public class PubsubBoundedWriter extends DoFn { private static final int MAX_PUBLISH_BATCH_SIZE = 100; private transient List output; private transient PubsubClient pubsubClient; @@ -1005,15 +1040,18 @@ private class PubsubWriter extends DoFn { @Override public void startBundle(Context c) throws IOException { this.output = new ArrayList<>(); - this.pubsubClient = FACTORY.newClient(timestampLabel, idLabel, - c.getPipelineOptions().as(PubsubOptions.class)); + // NOTE: idLabel is ignored. + this.pubsubClient = + FACTORY.newClient(timestampLabel, null, + c.getPipelineOptions().as(PubsubOptions.class)); } @Override public void processElement(ProcessContext c) throws IOException { + // NOTE: The record id is always null. OutgoingMessage message = new OutgoingMessage(CoderUtils.encodeToByteArray(getCoder(), c.element()), - c.timestamp().getMillis()); + c.timestamp().getMillis(), null); output.add(message); if (output.size() >= MAX_PUBLISH_BATCH_SIZE) { @@ -1041,6 +1079,7 @@ private void publish() throws IOException { @Override public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); Bound.this.populateDisplayData(builder); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 7ca2b57bc4cf..6ff9b40d39d0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -26,6 +26,8 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.transforms.Aggregator; @@ -52,6 +54,7 @@ import org.apache.beam.sdk.values.PDone; import com.google.common.annotations.VisibleForTesting; +import com.google.common.hash.Hashing; import org.joda.time.Duration; import org.slf4j.Logger; @@ -62,6 +65,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; @@ -81,6 +85,8 @@ *

  • A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer * to dedup messages. * + * + *

    NOTE: This is not the implementation used when running on the Google Cloud Dataflow service. */ public class PubsubUnboundedSink extends PTransform, PDone> { private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class); @@ -104,12 +110,16 @@ public class PubsubUnboundedSink extends PTransform, PDone> { * Coder for conveying outgoing messages between internal stages. */ private static class OutgoingMessageCoder extends CustomCoder { + private static final NullableCoder RECORD_ID_CODER = + NullableCoder.of(StringUtf8Coder.of()); + @Override public void encode( OutgoingMessage value, OutputStream outStream, Context context) throws CoderException, IOException { ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED); BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED); + RECORD_ID_CODER.encode(value.recordId, outStream, Context.NESTED); } @Override @@ -117,13 +127,31 @@ public OutgoingMessage decode( InputStream inStream, Context context) throws CoderException, IOException { byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED); long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED); - return new OutgoingMessage(elementBytes, timestampMsSinceEpoch); + @Nullable String recordId = RECORD_ID_CODER.decode(inStream, Context.NESTED); + return new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId); } } @VisibleForTesting static final Coder CODER = new OutgoingMessageCoder(); + // ================================================================================ + // RecordIdMethod + // ================================================================================ + + /** + * Specify how record ids are to be generated. + */ + @VisibleForTesting + enum RecordIdMethod { + /** Leave null. */ + NONE, + /** Generate randomly. */ + RANDOM, + /** Generate deterministically. For testing only. */ + DETERMINISTIC + } + // ================================================================================ // ShardFn // ================================================================================ @@ -136,10 +164,12 @@ private static class ShardFn extends DoFn> { createAggregator("elements", new Sum.SumLongFn()); private final Coder elementCoder; private final int numShards; + private final RecordIdMethod recordIdMethod; - ShardFn(Coder elementCoder, int numShards) { + ShardFn(Coder elementCoder, int numShards, RecordIdMethod recordIdMethod) { this.elementCoder = elementCoder; this.numShards = numShards; + this.recordIdMethod = recordIdMethod; } @Override @@ -147,9 +177,23 @@ public void processElement(ProcessContext c) throws Exception { elementCounter.addValue(1L); byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element()); long timestampMsSinceEpoch = c.timestamp().getMillis(); - // TODO: A random record id should be assigned here. + @Nullable String recordId = null; + switch (recordIdMethod) { + case NONE: + break; + case DETERMINISTIC: + recordId = Hashing.murmur3_128().hashBytes(elementBytes).toString(); + break; + case RANDOM: + // Since these elements go through a GroupByKey, any failures while sending to + // Pubsub will be retried without falling back and generating a new record id. + // Thus even though we may send the same message to Pubsub twice, it is guaranteed + // to have the same record id. + recordId = UUID.randomUUID().toString(); + break; + } c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), - new OutgoingMessage(elementBytes, timestampMsSinceEpoch))); + new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId))); } @Override @@ -319,6 +363,12 @@ public void populateDisplayData(Builder builder) { */ private final Duration maxLatency; + /** + * How record ids should be generated for each record (if {@link #idLabel} is non-{@literal + * null}). + */ + private final RecordIdMethod recordIdMethod; + @VisibleForTesting PubsubUnboundedSink( PubsubClientFactory pubsubFactory, @@ -329,7 +379,8 @@ public void populateDisplayData(Builder builder) { int numShards, int publishBatchSize, int publishBatchBytes, - Duration maxLatency) { + Duration maxLatency, + RecordIdMethod recordIdMethod) { this.pubsubFactory = pubsubFactory; this.topic = topic; this.elementCoder = elementCoder; @@ -339,6 +390,7 @@ public void populateDisplayData(Builder builder) { this.publishBatchSize = publishBatchSize; this.publishBatchBytes = publishBatchBytes; this.maxLatency = maxLatency; + this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : recordIdMethod; } public PubsubUnboundedSink( @@ -349,7 +401,8 @@ public PubsubUnboundedSink( String idLabel, int numShards) { this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards, - DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY); + DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY, + RecordIdMethod.RANDOM); } public TopicPath getTopic() { @@ -382,7 +435,7 @@ public PDone apply(PCollection input) { .plusDelayOf(maxLatency)))) .discardingFiredPanes()) .apply(ParDo.named("PubsubUnboundedSink.Shard") - .of(new ShardFn(elementCoder, numShards))) + .of(new ShardFn(elementCoder, numShards, recordIdMethod))) .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) .apply(GroupByKey.create()) .apply(ParDo.named("PubsubUnboundedSink.Writer") diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index d635a8a3860b..0492c7623677 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -42,13 +43,16 @@ import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.MovingFunction; import org.apache.beam.sdk.util.PubsubClient; +import org.apache.beam.sdk.util.PubsubClient.ProjectPath; import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import com.google.api.client.util.Clock; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -102,10 +106,17 @@ * are blocking. We rely on the underlying runner to allow multiple * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency. * + * + *

    NOTE: This is not the implementation used when running on the Google Cloud Dataflow service. */ public class PubsubUnboundedSource extends PTransform> { private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class); + /** + * Default ACK timeout for created subscriptions. + */ + private static final int DEAULT_ACK_TIMEOUT_SEC = 60; + /** * Coder for checkpoints. */ @@ -291,6 +302,17 @@ public void finalizeCheckpoint() throws IOException { } } + /** + * Return current time according to {@code reader}. + */ + private static long now(PubsubReader reader) { + if (reader.outer.outer.clock == null) { + return System.currentTimeMillis(); + } else { + return reader.outer.outer.clock.currentTimeMillis(); + } + } + /** * BLOCKING * NACK all messages which have been read from Pubsub but not passed downstream. @@ -303,13 +325,13 @@ public void nackAll(PubsubReader reader) throws IOException { for (String ackId : notYetReadIds) { batchYetToAckIds.add(ackId); if (batchYetToAckIds.size() >= ACK_BATCH_SIZE) { - long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis(); + long nowMsSinceEpoch = now(reader); reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds); batchYetToAckIds.clear(); } } if (!batchYetToAckIds.isEmpty()) { - long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis(); + long nowMsSinceEpoch = now(reader); reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds); } } @@ -614,7 +636,11 @@ private void extendBatch(long nowMsSinceEpoch, List ackIds) throws IOExc * Return the current time, in ms since epoch. */ private long now() { - return outer.outer.clock.currentTimeMillis(); + if (outer.outer.clock == null) { + return System.currentTimeMillis(); + } else { + return outer.outer.clock.currentTimeMillis(); + } } /** @@ -928,7 +954,7 @@ public byte[] getCurrentRecordId() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); } - return current.recordId; + return current.recordId.getBytes(Charsets.UTF_8); } @Override @@ -1124,8 +1150,9 @@ public void populateDisplayData(Builder builder) { // ================================================================================ /** - * Clock to use for all timekeeping. + * For testing only: Clock to use for all timekeeping. If {@literal null} use system clock. */ + @Nullable private Clock clock; /** @@ -1134,9 +1161,28 @@ public void populateDisplayData(Builder builder) { private final PubsubClientFactory pubsubFactory; /** - * Subscription to read from. + * Project under which to create a subscription if only the {@link #topic} was given. + */ + @Nullable + private final ProjectPath project; + + /** + * Topic to read from. If {@literal null}, then {@link #subscription} must be given. + * Otherwise {@link #subscription} must be null. */ - private final SubscriptionPath subscription; + @Nullable + private final TopicPath topic; + + /** + * Subscription to read from. If {@literal null} then {@link #topic} must be given. + * Otherwise {@link #topic} must be null. + * + *

    If no subscription is given a random one will be created when the transorm is + * applied. This field will be update with that subscription's path. The created + * subscription is never deleted. + */ + @Nullable + private SubscriptionPath subscription; /** * Coder for elements. Elements are effectively double-encoded: first to a byte array @@ -1159,25 +1205,60 @@ public void populateDisplayData(Builder builder) { @Nullable private final String idLabel; - /** - * Construct an unbounded source to consume from the Pubsub {@code subscription}. - */ - public PubsubUnboundedSource( + @VisibleForTesting + PubsubUnboundedSource( Clock clock, PubsubClientFactory pubsubFactory, - SubscriptionPath subscription, + @Nullable ProjectPath project, + @Nullable TopicPath topic, + @Nullable SubscriptionPath subscription, Coder elementCoder, @Nullable String timestampLabel, @Nullable String idLabel) { + checkArgument((topic == null) != (subscription == null), + "Exactly one of topic and subscription must be given"); + checkArgument((topic == null) == (project == null), + "Project must be given if topic is given"); this.clock = clock; this.pubsubFactory = checkNotNull(pubsubFactory); - this.subscription = checkNotNull(subscription); + this.project = project; + this.topic = topic; + this.subscription = subscription; this.elementCoder = checkNotNull(elementCoder); this.timestampLabel = timestampLabel; this.idLabel = idLabel; } - public PubsubClient.SubscriptionPath getSubscription() { + /** + * Construct an unbounded source to consume from the Pubsub {@code subscription}. + */ + public PubsubUnboundedSource( + PubsubClientFactory pubsubFactory, + @Nullable ProjectPath project, + @Nullable TopicPath topic, + @Nullable SubscriptionPath subscription, + Coder elementCoder, + @Nullable String timestampLabel, + @Nullable String idLabel) { + this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel); + } + + public Coder getElementCoder() { + return elementCoder; + } + + @Nullable + public ProjectPath getProject() { + return project; + } + + @Nullable + public TopicPath getTopic() { + return topic; + } + + @Nullable + public SubscriptionPath getSubscription() { return subscription; } @@ -1191,12 +1272,26 @@ public String getIdLabel() { return idLabel; } - public Coder getElementCoder() { - return elementCoder; - } - @Override public PCollection apply(PBegin input) { + if (subscription == null) { + try { + try (PubsubClient pubsubClient = + pubsubFactory.newClient(timestampLabel, idLabel, + input.getPipeline() + .getOptions() + .as(PubsubOptions.class))) { + subscription = + pubsubClient.createRandomSubscription(project, topic, DEAULT_ACK_TIMEOUT_SEC); + LOG.warn("Created subscription {} to topic {}." + + " Note this subscription WILL NOT be deleted when the pipeline terminates", + subscription, topic); + } + } catch (Exception e) { + throw new RuntimeException("Failed to create subscription: ", e); + } + } + return input.getPipeline().begin() .apply(Read.from(new PubsubSource(this))) .apply(ParDo.named("PubsubUnboundedSource.Stats") diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java index aa73d421b695..08981d01212e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java @@ -40,7 +40,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; -import com.google.common.hash.Hashing; import java.io.IOException; import java.util.ArrayList; @@ -135,11 +134,8 @@ public int publish(TopicPath topic, List outgoingMessages) attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); } - if (idLabel != null) { - // TODO: The id should be associated with the OutgoingMessage so that it is stable - // across retried bundles - attributes.put(idLabel, - Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString()); + if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { + attributes.put(idLabel, outgoingMessage.recordId); } pubsubMessages.add(pubsubMessage); @@ -185,15 +181,13 @@ public List pull( checkState(!Strings.isNullOrEmpty(ackId)); // Record id, if any. - @Nullable byte[] recordId = null; + @Nullable String recordId = null; if (idLabel != null && attributes != null) { - String recordIdString = attributes.get(idLabel); - if (!Strings.isNullOrEmpty(recordIdString)) { - recordId = recordIdString.getBytes(); - } + recordId = attributes.get(idLabel); } - if (recordId == null) { - recordId = pubsubMessage.getMessageId().getBytes(); + if (Strings.isNullOrEmpty(recordId)) { + // Fall back to the Pubsub provided message id. + recordId = pubsubMessage.getMessageId(); } incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java index dc4858e2014f..07ce97df13bd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; /** @@ -132,6 +133,12 @@ public String getPath() { return path; } + public String getId() { + String[] splits = path.split("/"); + checkState(splits.length == 1, "Malformed project path %s", path); + return splits[1]; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -180,6 +187,12 @@ public String getPath() { return path; } + public String getName() { + String[] splits = path.split("/"); + checkState(splits.length == 4, "Malformed subscription path %s", path); + return splits[3]; + } + public String getV1Beta1Path() { String[] splits = path.split("/"); checkState(splits.length == 4, "Malformed subscription path %s", path); @@ -233,6 +246,12 @@ public String getPath() { return path; } + public String getName() { + String[] splits = path.split("/"); + checkState(splits.length == 4, "Malformed topic path %s", path); + return splits[3]; + } + public String getV1Beta1Path() { String[] splits = path.split("/"); checkState(splits.length == 4, "Malformed topic path %s", path); @@ -286,11 +305,18 @@ public static class OutgoingMessage implements Serializable { */ public final long timestampMsSinceEpoch; - // TODO: Support a record id. + /** + * If using an id label, the record id to associate with this record's metadata so the receiver + * can reject duplicates. Otherwise {@literal null}. + */ + @Nullable + public final String recordId; - public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) { + public OutgoingMessage( + byte[] elementBytes, long timestampMsSinceEpoch, @Nullable String recordId) { this.elementBytes = elementBytes; this.timestampMsSinceEpoch = timestampMsSinceEpoch; + this.recordId = recordId; } @Override @@ -310,16 +336,14 @@ public boolean equals(Object o) { OutgoingMessage that = (OutgoingMessage) o; - if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) { - return false; - } - return Arrays.equals(elementBytes, that.elementBytes); - + return timestampMsSinceEpoch == that.timestampMsSinceEpoch + && Arrays.equals(elementBytes, that.elementBytes) + && Objects.equal(recordId, that.recordId); } @Override public int hashCode() { - return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch); + return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch, recordId); } } @@ -353,14 +377,14 @@ public static class IncomingMessage implements Serializable { /** * Id to pass to the runner to distinguish this message from all others. */ - public final byte[] recordId; + public final String recordId; public IncomingMessage( byte[] elementBytes, long timestampMsSinceEpoch, long requestTimeMsSinceEpoch, String ackId, - byte[] recordId) { + String recordId) { this.elementBytes = elementBytes; this.timestampMsSinceEpoch = timestampMsSinceEpoch; this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; @@ -390,26 +414,18 @@ public boolean equals(Object o) { IncomingMessage that = (IncomingMessage) o; - if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) { - return false; - } - if (requestTimeMsSinceEpoch != that.requestTimeMsSinceEpoch) { - return false; - } - if (!Arrays.equals(elementBytes, that.elementBytes)) { - return false; - } - if (!ackId.equals(that.ackId)) { - return false; - } - return Arrays.equals(recordId, that.recordId); + return timestampMsSinceEpoch == that.timestampMsSinceEpoch + && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch + && ackId.equals(that.ackId) + && recordId.equals(that.recordId) + && Arrays.equals(elementBytes, that.elementBytes); } @Override public int hashCode() { return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch, requestTimeMsSinceEpoch, - ackId, Arrays.hashCode(recordId)); + ackId, recordId); } } @@ -484,6 +500,22 @@ public abstract void modifyAckDeadline( public abstract void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException; + /** + * Create a random subscription for {@code topic}. Return the {@link SubscriptionPath}. It + * is the responsibility of the caller to later delete the subscription. + * + * @throws IOException + */ + public SubscriptionPath createRandomSubscription( + ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException { + // Create a randomized subscription derived from the topic name. + String subscriptionName = topic.getName() + "_beam_" + ThreadLocalRandom.current().nextLong(); + SubscriptionPath subscription = + PubsubClient.subscriptionPathFromName(project.getId(), subscriptionName); + createSubscription(topic, subscription, ackDeadlineSeconds); + return subscription; + } + /** * Delete {@code subscription}. * @@ -507,7 +539,7 @@ public abstract List listSubscriptions(ProjectPath project, To public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException; /** - * Return {@literal true} if {@link pull} will always return empty list. Actual clients + * Return {@literal true} if {@link #pull} will always return empty list. Actual clients * will return {@literal false}. Test clients may return {@literal true} to signal that all * expected messages have been pulled and the test may complete. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java index e759513efb25..ac157fb80309 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java @@ -27,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; -import com.google.common.hash.Hashing; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import com.google.pubsub.v1.AcknowledgeRequest; @@ -257,10 +256,8 @@ public int publish(TopicPath topic, List outgoingMessages) .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); } - if (idLabel != null) { - message.getMutableAttributes() - .put(idLabel, - Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString()); + if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { + message.getMutableAttributes().put(idLabel, outgoingMessage.recordId); } request.addMessages(message); @@ -308,15 +305,13 @@ public List pull( checkState(!Strings.isNullOrEmpty(ackId)); // Record id, if any. - @Nullable byte[] recordId = null; + @Nullable String recordId = null; if (idLabel != null && attributes != null) { - String recordIdString = attributes.get(idLabel); - if (recordIdString != null && !recordIdString.isEmpty()) { - recordId = recordIdString.getBytes(); - } + recordId = attributes.get(idLabel); } - if (recordId == null) { - recordId = pubsubMessage.getMessageId().getBytes(); + if (Strings.isNullOrEmpty(recordId)) { + // Fall back to the Pubsub provided message id. + recordId = pubsubMessage.getMessageId(); } incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java index c1dfa060cc05..9fa03803836b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java @@ -46,10 +46,9 @@ public class PubsubTestClient extends PubsubClient { * Mimic the state of the simulated Pubsub 'service'. * * Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running - * test - * pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created from - * the same client factory and run in parallel. Thus we can't enforce aliasing of the following - * data structures over all clients and must resort to a static. + * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created + * from the same client factory and run in parallel. Thus we can't enforce aliasing of the + * following data structures over all clients and must resort to a static. */ private static class State { /** @@ -69,6 +68,13 @@ private static class State { @Nullable Set remainingExpectedOutgoingMessages; + /** + * Publish mode only: Messages which should throw when first sent to simulate transient publish + * failure. + */ + @Nullable + Set remainingFailingOutgoingMessages; + /** * Pull mode only: Clock from which to get current time. */ @@ -119,11 +125,13 @@ public interface PubsubTestClientFactory extends PubsubClientFactory, Closeable */ public static PubsubTestClientFactory createFactoryForPublish( final TopicPath expectedTopic, - final Iterable expectedOutgoingMessages) { + final Iterable expectedOutgoingMessages, + final Iterable failingOutgoingMessages) { synchronized (STATE) { checkState(!STATE.isActive, "Test still in flight"); STATE.expectedTopic = expectedTopic; STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages); + STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages); STATE.isActive = true; } return new PubsubTestClientFactory() { @@ -257,6 +265,9 @@ public int publish( checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic, STATE.expectedTopic); for (OutgoingMessage outgoingMessage : outgoingMessages) { + if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) { + throw new RuntimeException("Simulating failure for " + outgoingMessage); + } checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage), "Unexpected outgoing message %s", outgoingMessage); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java index b4ef785a5b04..bf70e474d918 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.PubsubUnboundedSink.RecordIdMethod; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -31,7 +32,7 @@ import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; import org.joda.time.Duration; import org.joda.time.Instant; @@ -41,9 +42,7 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; /** * Test PubsubUnboundedSink. @@ -55,6 +54,7 @@ public class PubsubUnboundedSinkTest { private static final long TIMESTAMP = 1234L; private static final String TIMESTAMP_LABEL = "timestamp"; private static final String ID_LABEL = "id"; + private static final int NUM_SHARDS = 10; private static class Stamp extends DoFn { @Override @@ -63,22 +63,30 @@ public void processElement(ProcessContext c) { } } + private String getRecordId(String data) { + return Hashing.murmur3_128().hashBytes(data.getBytes()).toString(); + } + @Test public void saneCoder() throws Exception { - OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP); + OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP, getRecordId(DATA)); CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message); CoderProperties.coderSerializable(PubsubUnboundedSink.CODER); } @Test public void sendOneMessage() throws IOException { - Set outgoing = - Sets.newHashSet(new OutgoingMessage(DATA.getBytes(), TIMESTAMP)); + List outgoing = + ImmutableList.of(new OutgoingMessage(DATA.getBytes(), TIMESTAMP, getRecordId(DATA))); + int batchSize = 1; + int batchBytes = 1; try (PubsubTestClientFactory factory = - PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) { + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, + ImmutableList.of())) { PubsubUnboundedSink sink = new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - 10); + NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC); TestPipeline p = TestPipeline.create(); p.apply(Create.of(ImmutableList.of(DATA))) .apply(ParDo.of(new Stamp())) @@ -91,20 +99,22 @@ public void sendOneMessage() throws IOException { @Test public void sendMoreThanOneBatchByNumMessages() throws IOException { - Set outgoing = new HashSet<>(); + List outgoing = new ArrayList<>(); List data = new ArrayList<>(); int batchSize = 2; int batchBytes = 1000; for (int i = 0; i < batchSize * 10; i++) { String str = String.valueOf(i); - outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP)); + outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP, getRecordId(str))); data.add(str); } try (PubsubTestClientFactory factory = - PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) { + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, + ImmutableList.of())) { PubsubUnboundedSink sink = new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - 10, batchSize, batchBytes, Duration.standardSeconds(2)); + NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC); TestPipeline p = TestPipeline.create(); p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) @@ -117,7 +127,7 @@ public void sendMoreThanOneBatchByNumMessages() throws IOException { @Test public void sendMoreThanOneBatchByByteSize() throws IOException { - Set outgoing = new HashSet<>(); + List outgoing = new ArrayList<>(); List data = new ArrayList<>(); int batchSize = 100; int batchBytes = 10; @@ -128,15 +138,17 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { sb.append(String.valueOf(n)); } String str = sb.toString(); - outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP)); + outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP, getRecordId(str))); data.add(str); n += str.length(); } try (PubsubTestClientFactory factory = - PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) { + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, + ImmutableList.of())) { PubsubUnboundedSink sink = new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - 10, batchSize, batchBytes, Duration.standardSeconds(2)); + NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC); TestPipeline p = TestPipeline.create(); p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) @@ -146,4 +158,8 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { // The PubsubTestClientFactory will assert fail on close if the actual published // message does not match the expected publish message. } + + // TODO: We would like to test that failed Pubsub publish calls cause the already assigned + // (and random) record ids to be reused. However that can't be done without the test runnner + // supporting retrying bundles. } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java index b265d18dee70..3b0a1c8c00f2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java @@ -86,14 +86,14 @@ public long currentTimeMillis() { }; factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming); PubsubUnboundedSource source = - new PubsubUnboundedSource<>(clock, factory, SUBSCRIPTION, StringUtf8Coder.of(), + new PubsubUnboundedSource<>(clock, factory, null, null, SUBSCRIPTION, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL); primSource = new PubsubSource<>(source); } private void setupOneMessage() { setupOneMessage(ImmutableList.of( - new IncomingMessage(DATA.getBytes(), TIMESTAMP, 0, ACK_ID, RECORD_ID.getBytes()))); + new IncomingMessage(DATA.getBytes(), TIMESTAMP, 0, ACK_ID, RECORD_ID))); } @After @@ -211,7 +211,7 @@ public void multipleReaders() throws IOException { for (int i = 0; i < 2; i++) { String data = String.format("data_%d", i); String ackid = String.format("ackid_%d", i); - incoming.add(new IncomingMessage(data.getBytes(), TIMESTAMP, 0, ackid, RECORD_ID.getBytes())); + incoming.add(new IncomingMessage(data.getBytes(), TIMESTAMP, 0, ackid, RECORD_ID)); } setupOneMessage(incoming); TestPipeline p = TestPipeline.create(); @@ -272,7 +272,7 @@ public void readManyMessages() throws IOException { String recid = String.format("recordid_%d", messageNum); String ackId = String.format("ackid_%d", messageNum); incoming.add(new IncomingMessage(data.getBytes(), messageNumToTimestamp(messageNum), 0, - ackId, recid.getBytes())); + ackId, recid)); } setupOneMessage(incoming); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java index 40c31fb5ac03..0f3a7bb506dc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java @@ -34,7 +34,6 @@ import com.google.api.services.pubsub.model.ReceivedMessage; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.hash.Hashing; import org.junit.After; import org.junit.Before; @@ -61,8 +60,7 @@ public class PubsubApiaryClientTest { private static final String ID_LABEL = "id"; private static final String MESSAGE_ID = "testMessageId"; private static final String DATA = "testData"; - private static final String CUSTOM_ID = - Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString(); + private static final String RECORD_ID = "testRecordId"; private static final String ACK_ID = "testAckId"; @Before @@ -89,7 +87,7 @@ public void pullOneMessage() throws IOException { .setPublishTime(String.valueOf(PUB_TIME)) .setAttributes( ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), - ID_LABEL, CUSTOM_ID)); + ID_LABEL, RECORD_ID)); ReceivedMessage expectedReceivedMessage = new ReceivedMessage().setMessage(expectedPubsubMessage) .setAckId(ACK_ID); @@ -105,7 +103,7 @@ public void pullOneMessage() throws IOException { IncomingMessage actualMessage = acutalMessages.get(0); assertEquals(ACK_ID, actualMessage.ackId); assertEquals(DATA, new String(actualMessage.elementBytes)); - assertEquals(CUSTOM_ID, new String(actualMessage.recordId)); + assertEquals(RECORD_ID, actualMessage.recordId); assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); } @@ -117,7 +115,7 @@ public void publishOneMessage() throws IOException { .encodeData(DATA.getBytes()) .setAttributes( ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), - ID_LABEL, CUSTOM_ID)); + ID_LABEL, RECORD_ID)); PublishRequest expectedRequest = new PublishRequest() .setMessages(ImmutableList.of(expectedPubsubMessage)); PublishResponse expectedResponse = new PublishResponse() @@ -127,7 +125,7 @@ public void publishOneMessage() throws IOException { .publish(expectedTopic, expectedRequest) .execute()) .thenReturn(expectedResponse); - OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME); + OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java index 189049c07ea4..71ee27c86aae 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java @@ -28,7 +28,6 @@ import com.google.auth.oauth2.GoogleCredentials; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.hash.Hashing; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import com.google.pubsub.v1.PublishRequest; @@ -70,8 +69,7 @@ public class PubsubGrpcClientTest { private static final String ID_LABEL = "id"; private static final String MESSAGE_ID = "testMessageId"; private static final String DATA = "testData"; - private static final String CUSTOM_ID = - Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString(); + private static final String RECORD_ID = "testRecordId"; private static final String ACK_ID = "testAckId"; @Before @@ -118,7 +116,7 @@ public void pullOneMessage() throws IOException { .putAllAttributes( ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), - ID_LABEL, CUSTOM_ID)) + ID_LABEL, RECORD_ID)) .build(); ReceivedMessage expectedReceivedMessage = ReceivedMessage.newBuilder() @@ -136,7 +134,7 @@ public void pullOneMessage() throws IOException { IncomingMessage actualMessage = acutalMessages.get(0); assertEquals(ACK_ID, actualMessage.ackId); assertEquals(DATA, new String(actualMessage.elementBytes)); - assertEquals(CUSTOM_ID, new String(actualMessage.recordId)); + assertEquals(RECORD_ID, actualMessage.recordId); assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); } @@ -149,7 +147,7 @@ public void publishOneMessage() throws IOException { .setData(ByteString.copyFrom(DATA.getBytes())) .putAllAttributes( ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), - ID_LABEL, CUSTOM_ID)) + ID_LABEL, RECORD_ID)) .build(); PublishRequest expectedRequest = PublishRequest.newBuilder() @@ -163,7 +161,7 @@ public void publishOneMessage() throws IOException { .build(); Mockito.when(mockPublisherStub.publish(expectedRequest)) .thenReturn(expectedResponse); - OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME); + OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java index fedc8bf57ac8..d788f1070cec 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java @@ -61,7 +61,7 @@ public long currentTimeMillis() { } }; IncomingMessage expectedIncomingMessage = - new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID.getBytes()); + new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID); try (PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, Lists.newArrayList(expectedIncomingMessage))) { @@ -99,9 +99,13 @@ public long currentTimeMillis() { @Test public void publishOneMessage() throws IOException { - OutgoingMessage expectedOutgoingMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME); - try (PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish(TOPIC, Sets - .newHashSet(expectedOutgoingMessage))) { + OutgoingMessage expectedOutgoingMessage = + new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, MESSAGE_ID); + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish( + TOPIC, + Sets.newHashSet(expectedOutgoingMessage), + ImmutableList.of())) { try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) { client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage)); }