From 275d0f0a17d53adca5dd77dc86ec26196d861423 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Tue, 26 Apr 2016 18:42:00 -0700 Subject: [PATCH 1/6] Add PubsubUnboundedSink and tests --- .../beam/sdk/io/PubsubUnboundedSink.java | 309 ++++++++++++++++++ .../beam/sdk/io/PubsubUnboundedSinkTest.java | 75 +++++ 2 files changed, 384 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java new file mode 100644 index 000000000000..98ffeff6ca4e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.PubsubClient; +import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +import com.google.common.base.Preconditions; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import javax.annotation.Nullable; + +/** + * A PTransform which streams messages to pub/sub. 
+ * + */ +public class PubsubUnboundedSink extends PTransform, PDone> { + private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class); + + /** + * Maximum number of messages per publish. + */ + private static final int PUBLISH_BATCH_SIZE = 1000; + + /** + * Maximum size of a publish batch, in bytes. + */ + private static final long PUBLISH_BATCH_BYTES = 400000; + + /** + * Longest delay between receiving a message and pushing it to pub/sub. + */ + private static final Duration MAX_LATENCY = Duration.standardSeconds(2); + + /** + * Coder for conveying outgoing messages between internal stages. + */ + private static final Coder CODER = new + AtomicCoder() { + @Override + public void encode( + PubsubClient.OutgoingMessage value, OutputStream outStream, Context context) + throws CoderException, IOException { + ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED); + BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED); + } + + @Override + public PubsubClient.OutgoingMessage decode( + InputStream inStream, Context context) throws CoderException, IOException { + byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED); + long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED); + return new PubsubClient.OutgoingMessage(elementBytes, timestampMsSinceEpoch); + } + }; + + // ================================================================================ + // ShardFv + // ================================================================================ + + /** + * Convert elements to messages and shard them. 
+ */ + private class ShardFn extends DoFn> { + private final Aggregator elementCounter = + createAggregator("elements", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) throws Exception { + elementCounter.addValue(1L); + byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element()); + long timestampMsSinceEpoch = c.timestamp().getMillis(); + c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), + new PubsubClient.OutgoingMessage(elementBytes, timestampMsSinceEpoch))); + } + } + + // ================================================================================ + // WriterFn + // ================================================================================ + + /** + * Publish messages to pub/sub in batches. + */ + private class WriterFn + extends DoFn>, Void> { + + /** + * Client on which to talk to pub/sub. Null until created by {@link #startBundle}. + */ + @Nullable + private transient PubsubClient pubsubClient; + + private final Aggregator batchCounter = + createAggregator("batches", new Sum.SumLongFn()); + private final Aggregator elementCounter = + createAggregator("elements", new Sum.SumLongFn()); + private final Aggregator byteCounter = + createAggregator("bytes", new Sum.SumLongFn()); + + /** + * BLOCKING + * Send {@code messages} as a batch to pub/sub. 
+ */ + private void publishBatch(List messages, int bytes) + throws IOException { + long nowMsSinceEpoch = System.currentTimeMillis(); + int n = pubsubClient.publish(topic, messages); + Preconditions.checkState(n == messages.size()); + batchCounter.addValue(1L); + elementCounter.addValue((long) messages.size()); + byteCounter.addValue((long) bytes); + } + + @Override + public void startBundle(Context c) throws Exception { + Preconditions.checkState(pubsubClient == null); + pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel, + c.getPipelineOptions().as(PubsubOptions.class)); + super.startBundle(c); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + List pubsubMessages = new ArrayList<>(PUBLISH_BATCH_SIZE); + int bytes = 0; + for (PubsubClient.OutgoingMessage message : c.element().getValue()) { + if (!pubsubMessages.isEmpty() + && bytes + message.elementBytes.length > PUBLISH_BATCH_BYTES) { + // Break large (in bytes) batches into smaller. + // (We've already broken by batch size using the trigger below, though that may + // run slightly over the actual PUBLISH_BATCH_SIZE. There is currently no way to + // trigger on accumulated message size.) + // BLOCKS until published. + publishBatch(pubsubMessages, bytes); + pubsubMessages.clear(); + bytes = 0; + } + pubsubMessages.add(message); + bytes += message.elementBytes.length; + } + if (!pubsubMessages.isEmpty()) { + // BLOCKS until published. + publishBatch(pubsubMessages, bytes); + } + } + + @Override + public void finishBundle(Context c) throws Exception { + pubsubClient.close(); + pubsubClient = null; + super.finishBundle(c); + } + } + + + // ================================================================================ + // PubsubUnboundedSink + // ================================================================================ + + /** + * Which factor to use for creating Pubsub transport. 
+ */ + private final PubsubClientFactory pubsubFactory; + + /** + * Pub/sub topic to publish to. + */ + private final PubsubClient.TopicPath topic; + + /** + * Coder for elements. Elements are effectively double-encoded: first to a byte array + * using this checkpointCoder, then to a base-64 string to conform to pub/sub's payload + * conventions. + */ + private final Coder elementCoder; + + /** + * Pub/sub metadata field holding timestamp of each element, or {@literal null} if should use + * pub/sub message publish timestamp instead. + */ + @Nullable + private final String timestampLabel; + + /** + * Pub/sub metadata field holding id for each element, or {@literal null} if need to generate + * a unique id ourselves. + * CAUTION: Currently ignored. + */ + @Nullable + private final String idLabel; + + /** + * Number of 'shards' to use so that latency in Pubsub publish can be hidden. Generally this + * should be a small multiple of the number of available cores. Too small a number results + * in too much time lost to blocking Pubsub calls. Too large a number results in too many + * single-element batches being sent to Pubsub with high per-batch overhead. 
+ */ + private final int numShards; + + public PubsubUnboundedSink( + PubsubClientFactory pubsubFactory, + PubsubClient.TopicPath topic, + Coder elementCoder, + String timestampLabel, + String idLabel, + int numShards) { + this.pubsubFactory = pubsubFactory; + this.topic = topic; + this.elementCoder = elementCoder; + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.numShards = numShards; + } + + public PubsubClient.TopicPath getTopic() { + return topic; + } + + @Nullable + public String getTimestampLabel() { + return timestampLabel; + } + + @Nullable + public String getIdLabel() { + return idLabel; + } + + public Coder getElementCoder() { + return elementCoder; + } + + @Override + public PDone apply(PCollection input) { + String label = "PubsubSink(" + topic.getPath().replace("/", ".") + ")"; + input.apply( + Window.into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterFirst.of(AfterPane.elementCountAtLeast(PUBLISH_BATCH_SIZE), + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(MAX_LATENCY)))) + .discardingFiredPanes() + .withAllowedLateness(Duration.ZERO)) + .apply(ParDo.named(label + ".Shard").of(new ShardFn())) + .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) + .apply(GroupByKey.create()) + .apply(ParDo.named(label + ".Writer").of(new WriterFn())); + return PDone.in(input.getPipeline()); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java new file mode 100644 index 000000000000..34ff52902634 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io; + +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.PubsubClient; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; +import org.apache.beam.sdk.util.PubsubTestClient; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; + +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Set; + +/** + * Test PubsubUnboundedSink. 
+ */ +@RunWith(JUnit4.class) +public class PubsubUnboundedSinkTest { + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final String DATA = "testData"; + private static final long TIMESTAMP = 1234L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + + private static class Stamp extends DoFn { + @Override + public void processElement(ProcessContext c) { + c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP)); + } + } + + @Test + public void sendOneMessage() { + Set outgoing = + Sets.newHashSet(new OutgoingMessage(DATA.getBytes(), TIMESTAMP)); + PubsubClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing); + PubsubUnboundedSink sink = + new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + 10); + TestPipeline p = TestPipeline.create(); + p.apply(Create.of(ImmutableList.of(DATA))) + .apply(ParDo.of(new Stamp())) + .apply(sink); + p.run(); + } +} From 915813a9588a44511571f53207056871bcc3c609 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Wed, 4 May 2016 10:47:22 -0700 Subject: [PATCH 2/6] Quick polish --- .../beam/sdk/io/PubsubUnboundedSink.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 98ffeff6ca4e..71c930649619 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -61,14 +61,19 @@ import javax.annotation.Nullable; /** - * A PTransform which streams messages to pub/sub. + * A PTransform which streams messages to Pubsub. *
    - *
  • The underlying implementation is just a {@link DoFn} which publishes as a side effect. + *
  • The underlying implementation is just a {@link GroupByKey} followed by a {@link ParDo} which + * publishes as a side effect. (In the future we want to design and switch to a custom + * {@code UnboundedSink} implementation so as to gain access to system watermark and + * end-of-pipeline cleanup.) *
  • We try to send messages in batches while also limiting send latency. *
  • No stats are logged. Rather some counters are used to keep track of elements and batches. - *
  • Though some background threads are used by the underlying netty system all actual pub/sub + *
  • Though some background threads are used by the underlying netty system all actual Pubsub * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances * to execute concurrently and hide latency. + *
  • A failed work item will cause messages to be resent. Thus we rely on the Pubsub consumer + * to dedup messages. *
*/ public class PubsubUnboundedSink extends PTransform, PDone> { @@ -85,7 +90,7 @@ public class PubsubUnboundedSink extends PTransform, PDone> { private static final long PUBLISH_BATCH_BYTES = 400000; /** - * Longest delay between receiving a message and pushing it to pub/sub. + * Longest delay between receiving a message and pushing it to Pubsub. */ private static final Duration MAX_LATENCY = Duration.standardSeconds(2); @@ -137,13 +142,13 @@ public void processElement(ProcessContext c) throws Exception { // ================================================================================ /** - * Publish messages to pub/sub in batches. + * Publish messages to Pubsub in batches. */ private class WriterFn extends DoFn>, Void> { /** - * Client on which to talk to pub/sub. Null until created by {@link #startBundle}. + * Client on which to talk to Pubsub. Null until created by {@link #startBundle}. */ @Nullable private transient PubsubClient pubsubClient; @@ -157,7 +162,7 @@ private class WriterFn /** * BLOCKING - * Send {@code messages} as a batch to pub/sub. + * Send {@code messages} as a batch to Pubsub. */ private void publishBatch(List messages, int bytes) throws IOException { @@ -221,28 +226,26 @@ public void finishBundle(Context c) throws Exception { private final PubsubClientFactory pubsubFactory; /** - * Pub/sub topic to publish to. + * Pubsub topic to publish to. */ private final PubsubClient.TopicPath topic; /** - * Coder for elements. Elements are effectively double-encoded: first to a byte array - * using this checkpointCoder, then to a base-64 string to conform to pub/sub's payload - * conventions. + * Coder for elements. It is the responsibility of the underlying Pubsub transport to + * re-encode element bytes if necessary, eg as Base64 strings. */ private final Coder elementCoder; /** - * Pub/sub metadata field holding timestamp of each element, or {@literal null} if should use - * pub/sub message publish timestamp instead. 
+ * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use + * Pubsub message publish timestamp instead. */ @Nullable private final String timestampLabel; /** - * Pub/sub metadata field holding id for each element, or {@literal null} if need to generate + * Pubsub metadata field holding id for each element, or {@literal null} if need to generate * a unique id ourselves. - * CAUTION: Currently ignored. */ @Nullable private final String idLabel; @@ -290,9 +293,11 @@ public Coder getElementCoder() { @Override public PDone apply(PCollection input) { - String label = "PubsubSink(" + topic.getPath().replace("/", ".") + ")"; + // TODO: Include topic.getPath() in transform metadata when it is available. + String label = "PubsubSink"; input.apply( - Window.into(new GlobalWindows()) + Window.named(label + ".Window") + .into(new GlobalWindows()) .triggering( Repeatedly.forever( AfterFirst.of(AfterPane.elementCountAtLeast(PUBLISH_BATCH_SIZE), From ef37b4b91cb1a0ab015d592d3a01caffcf39da79 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Tue, 10 May 2016 12:01:32 -0700 Subject: [PATCH 3/6] Make the factories static classes. 
--- .../beam/sdk/util/PubsubApiaryClient.java | 11 ++-- .../beam/sdk/util/PubsubGrpcClient.java | 54 ++++++++++--------- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java index f0a90968f510..1f4fa3a59141 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java @@ -55,11 +55,11 @@ */ public class PubsubApiaryClient extends PubsubClient { - public static final PubsubClientFactory FACTORY = new PubsubClientFactory() { + private static class PubsubApiaryClientFactory implements PubsubClientFactory { @Override public PubsubClient newClient( @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { + throws IOException { Pubsub pubsub = new Builder( Transport.getTransport(), Transport.getJsonFactory(), @@ -73,7 +73,12 @@ public PubsubClient newClient( .build(); return new PubsubApiaryClient(timestampLabel, idLabel, pubsub); } - }; + } + + /** + * Factory for creating Pubsub clients using Apiary transport. 
+ */ + public static final PubsubClientFactory FACTORY = new PubsubApiaryClientFactory(); /** * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java index b3c1b8f525bc..da58480b73ce 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java @@ -75,36 +75,42 @@ public class PubsubGrpcClient extends PubsubClient { private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com"; private static final int PUBSUB_PORT = 443; + // Will be needed when credentials are correctly constructed and scoped. + @SuppressWarnings("unused") private static final List PUBSUB_SCOPES = Collections.singletonList("https://www.googleapis.com/auth/pubsub"); private static final int LIST_BATCH_SIZE = 1000; private static final int DEFAULT_TIMEOUT_S = 15; - public static final PubsubClientFactory FACTORY = - new PubsubClientFactory() { - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - ManagedChannel channel = NettyChannelBuilder - .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT) - .negotiationType(NegotiationType.TLS) - .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) - .build(); - // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the - // various command line options. It currently only supports the older - // com.google.api.client.auth.oauth2.Credentials. 
- GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); - return new PubsubGrpcClient(timestampLabel, - idLabel, - DEFAULT_TIMEOUT_S, - channel, - credentials, - null /* publisher stub */, - null /* subscriber stub */); - } - }; + private static class PubsubGrpcClientFactory implements PubsubClientFactory { + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + ManagedChannel channel = NettyChannelBuilder + .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT) + .negotiationType(NegotiationType.TLS) + .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) + .build(); + // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the + // various command line options. It currently only supports the older + // com.google.api.client.auth.oauth2.Credentials. + GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); + return new PubsubGrpcClient(timestampLabel, + idLabel, + DEFAULT_TIMEOUT_S, + channel, + credentials, + null /* publisher stub */, + null /* subscriber stub */); + } + } + + /** + * Factory for creating Pubsub clients using gRPC transport. + */ + public static final PubsubClientFactory FACTORY = new PubsubGrpcClientFactory(); /** * Timeout for grpc calls (in s). From 12cb6fefbc19af300510e0954229c4ad2198f5fa Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Tue, 10 May 2016 12:01:55 -0700 Subject: [PATCH 4/6] Dan and Ken's comments. Make sure all classes are static. 
--- .../beam/sdk/io/PubsubUnboundedSink.java | 77 ++++++++++++------- .../beam/sdk/io/PubsubUnboundedSinkTest.java | 8 ++ 2 files changed, 57 insertions(+), 28 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 71c930649619..726088be0a2f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -18,11 +18,11 @@ package org.apache.beam.sdk.io; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PubsubOptions; @@ -40,11 +40,13 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.PubsubClient; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.joda.time.Duration; @@ -97,24 +99,26 @@ public class PubsubUnboundedSink extends PTransform, PDone> { /** * Coder for conveying outgoing messages between internal stages. 
*/ - private static final Coder CODER = new - AtomicCoder() { - @Override - public void encode( - PubsubClient.OutgoingMessage value, OutputStream outStream, Context context) - throws CoderException, IOException { - ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED); - BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED); - } + private static class OutgoingMessageCoder extends CustomCoder { + @Override + public void encode( + PubsubClient.OutgoingMessage value, OutputStream outStream, Context context) + throws CoderException, IOException { + ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED); + BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED); + } - @Override - public PubsubClient.OutgoingMessage decode( - InputStream inStream, Context context) throws CoderException, IOException { - byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED); - long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED); - return new PubsubClient.OutgoingMessage(elementBytes, timestampMsSinceEpoch); - } - }; + @Override + public PubsubClient.OutgoingMessage decode( + InputStream inStream, Context context) throws CoderException, IOException { + byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED); + long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED); + return new PubsubClient.OutgoingMessage(elementBytes, timestampMsSinceEpoch); + } + } + + @VisibleForTesting + static final Coder CODER = new OutgoingMessageCoder(); // ================================================================================ // ShardFv @@ -123,9 +127,16 @@ public PubsubClient.OutgoingMessage decode( /** * Convert elements to messages and shard them. 
*/ - private class ShardFn extends DoFn> { + private static class ShardFn extends DoFn> { private final Aggregator elementCounter = createAggregator("elements", new Sum.SumLongFn()); + private final Coder elementCoder; + private final int numShards; + + ShardFn(Coder elementCoder, int numShards) { + this.elementCoder = elementCoder; + this.numShards = numShards; + } @Override public void processElement(ProcessContext c) throws Exception { @@ -144,8 +155,12 @@ public void processElement(ProcessContext c) throws Exception { /** * Publish messages to Pubsub in batches. */ - private class WriterFn + private static class WriterFn extends DoFn>, Void> { + private final PubsubClientFactory pubsubFactory; + private final PubsubClient.TopicPath topic; + private final String timestampLabel; + private final String idLabel; /** * Client on which to talk to Pubsub. Null until created by {@link #startBundle}. @@ -160,6 +175,14 @@ private class WriterFn private final Aggregator byteCounter = createAggregator("bytes", new Sum.SumLongFn()); + WriterFn(PubsubClientFactory pubsubFactory, PubsubClient.TopicPath topic, + String timestampLabel, String idLabel) { + this.pubsubFactory = pubsubFactory; + this.topic = topic; + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + } + /** * BLOCKING * Send {@code messages} as a batch to Pubsub. @@ -215,13 +238,12 @@ public void finishBundle(Context c) throws Exception { } } - // ================================================================================ // PubsubUnboundedSink // ================================================================================ /** - * Which factor to use for creating Pubsub transport. + * Which factory to use for creating Pubsub transport. */ private final PubsubClientFactory pubsubFactory; @@ -294,21 +316,20 @@ public Coder getElementCoder() { @Override public PDone apply(PCollection input) { // TODO: Include topic.getPath() in transform metadata when it is available. 
- String label = "PubsubSink"; input.apply( - Window.named(label + ".Window") + Window.named("PubsubSink.Window") .into(new GlobalWindows()) .triggering( Repeatedly.forever( AfterFirst.of(AfterPane.elementCountAtLeast(PUBLISH_BATCH_SIZE), AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(MAX_LATENCY)))) - .discardingFiredPanes() - .withAllowedLateness(Duration.ZERO)) - .apply(ParDo.named(label + ".Shard").of(new ShardFn())) + .discardingFiredPanes()) + .apply(ParDo.named("PubsubSink.Shard").of(new ShardFn(elementCoder, numShards))) .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) .apply(GroupByKey.create()) - .apply(ParDo.named(label + ".Writer").of(new WriterFn())); + .apply(ParDo.named("PubsubSink.Writer").of(new WriterFn(pubsubFactory, topic, + timestampLabel, idLabel))); return PDone.in(input.getPipeline()); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java index 34ff52902634..493667ca5baf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -57,6 +58,13 @@ public void processElement(ProcessContext c) { } } + @Test + public void saneCoder() throws Exception { + OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP); + CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message); + CoderProperties.coderSerializable(PubsubUnboundedSink.CODER); + } + @Test public void sendOneMessage() { Set outgoing = From e47ebeb7b9888aee4250f787db638746962551d7 Mon Sep 17 00:00:00 2001 From: Mark 
Shields Date: Wed, 11 May 2016 15:17:41 -0700 Subject: [PATCH 5/6] Dan's comments. Unit test batching by number and byte size --- .../beam/sdk/io/PubsubUnboundedSink.java | 141 ++++++++++++------ .../beam/sdk/util/PubsubApiaryClient.java | 7 +- .../apache/beam/sdk/util/PubsubClient.java | 20 ++- .../beam/sdk/util/PubsubGrpcClient.java | 7 +- .../beam/sdk/util/PubsubTestClient.java | 10 ++ .../beam/sdk/io/PubsubUnboundedSinkTest.java | 62 ++++++++ 6 files changed, 202 insertions(+), 45 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 726088be0a2f..9744850d080c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.io; +import static com.google.common.base.Preconditions.checkState; + import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; @@ -32,6 +34,8 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.windowing.AfterFirst; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; @@ -42,12 +46,12 @@ import org.apache.beam.sdk.util.PubsubClient; import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import 
com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import org.joda.time.Duration; import org.slf4j.Logger; @@ -74,7 +78,7 @@ *
  • Though some background threads are used by the underlying netty system all actual Pubsub * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances * to execute concurrently and hide latency. - *
  • A failed work item will cause messages to be resent. Thus we rely on the Pubsub consumer + *
  • A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer * to dedup messages. * */ @@ -82,19 +86,19 @@ public class PubsubUnboundedSink extends PTransform, PDone> { private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class); /** - * Maximum number of messages per publish. + * Default maximum number of messages per publish. */ - private static final int PUBLISH_BATCH_SIZE = 1000; + private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000; /** - * Maximum size of a publish batch, in bytes. + * Default maximum size of a publish batch, in bytes. */ - private static final long PUBLISH_BATCH_BYTES = 400000; + private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000; /** - * Longest delay between receiving a message and pushing it to Pubsub. + * Default longest delay between receiving a message and pushing it to Pubsub. */ - private static final Duration MAX_LATENCY = Duration.standardSeconds(2); + private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2); /** * Coder for conveying outgoing messages between internal stages. 
@@ -102,32 +106,32 @@ public class PubsubUnboundedSink extends PTransform, PDone> { private static class OutgoingMessageCoder extends CustomCoder { @Override public void encode( - PubsubClient.OutgoingMessage value, OutputStream outStream, Context context) + OutgoingMessage value, OutputStream outStream, Context context) throws CoderException, IOException { ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED); BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED); } @Override - public PubsubClient.OutgoingMessage decode( + public OutgoingMessage decode( InputStream inStream, Context context) throws CoderException, IOException { byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED); long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED); - return new PubsubClient.OutgoingMessage(elementBytes, timestampMsSinceEpoch); + return new OutgoingMessage(elementBytes, timestampMsSinceEpoch); } } @VisibleForTesting - static final Coder CODER = new OutgoingMessageCoder(); + static final Coder CODER = new OutgoingMessageCoder(); // ================================================================================ - // ShardFv + // ShardFn // ================================================================================ /** * Convert elements to messages and shard them. 
*/ - private static class ShardFn extends DoFn> { + private static class ShardFn extends DoFn> { private final Aggregator elementCounter = createAggregator("elements", new Sum.SumLongFn()); private final Coder elementCoder; @@ -144,7 +148,13 @@ public void processElement(ProcessContext c) throws Exception { byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element()); long timestampMsSinceEpoch = c.timestamp().getMillis(); c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), - new PubsubClient.OutgoingMessage(elementBytes, timestampMsSinceEpoch))); + new OutgoingMessage(elementBytes, timestampMsSinceEpoch))); + } + + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("numShards", numShards)); } } @@ -156,11 +166,13 @@ public void processElement(ProcessContext c) throws Exception { * Publish messages to Pubsub in batches. */ private static class WriterFn - extends DoFn>, Void> { + extends DoFn>, Void> { private final PubsubClientFactory pubsubFactory; - private final PubsubClient.TopicPath topic; + private final TopicPath topic; private final String timestampLabel; private final String idLabel; + private final int publishBatchSize; + private final int publishBatchBytes; /** * Client on which to talk to Pubsub. Null until created by {@link #startBundle}. 
@@ -175,23 +187,27 @@ private static class WriterFn private final Aggregator byteCounter = createAggregator("bytes", new Sum.SumLongFn()); - WriterFn(PubsubClientFactory pubsubFactory, PubsubClient.TopicPath topic, - String timestampLabel, String idLabel) { + WriterFn( + PubsubClientFactory pubsubFactory, TopicPath topic, String timestampLabel, + String idLabel, int publishBatchSize, int publishBatchBytes) { this.pubsubFactory = pubsubFactory; this.topic = topic; this.timestampLabel = timestampLabel; this.idLabel = idLabel; + this.publishBatchSize = publishBatchSize; + this.publishBatchBytes = publishBatchBytes; } /** * BLOCKING * Send {@code messages} as a batch to Pubsub. */ - private void publishBatch(List messages, int bytes) + private void publishBatch(List messages, int bytes) throws IOException { long nowMsSinceEpoch = System.currentTimeMillis(); int n = pubsubClient.publish(topic, messages); - Preconditions.checkState(n == messages.size()); + checkState(n == messages.size(), "Attempted to publish %d messaged but %d were successful", + messages.size(), n); batchCounter.addValue(1L); elementCounter.addValue((long) messages.size()); byteCounter.addValue((long) bytes); @@ -199,23 +215,22 @@ private void publishBatch(List messages, int bytes @Override public void startBundle(Context c) throws Exception { - Preconditions.checkState(pubsubClient == null); + checkState(pubsubClient == null, "startBundle invoked without prior finishBundle"); pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel, c.getPipelineOptions().as(PubsubOptions.class)); - super.startBundle(c); } @Override public void processElement(ProcessContext c) throws Exception { - List pubsubMessages = new ArrayList<>(PUBLISH_BATCH_SIZE); + List pubsubMessages = new ArrayList<>(publishBatchSize); int bytes = 0; - for (PubsubClient.OutgoingMessage message : c.element().getValue()) { + for (OutgoingMessage message : c.element().getValue()) { if (!pubsubMessages.isEmpty() - && bytes + 
message.elementBytes.length > PUBLISH_BATCH_BYTES) { + && bytes + message.elementBytes.length > publishBatchBytes) { // Break large (in bytes) batches into smaller. // (We've already broken by batch size using the trigger below, though that may - // run slightly over the actual PUBLISH_BATCH_SIZE. There is currently no way to - // trigger on accumulated message size.) + // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since + // the hard limit from Pubsub is by bytes rather than number of messages.) // BLOCKS until published. publishBatch(pubsubMessages, bytes); pubsubMessages.clear(); @@ -234,7 +249,15 @@ public void processElement(ProcessContext c) throws Exception { public void finishBundle(Context c) throws Exception { pubsubClient.close(); pubsubClient = null; - super.finishBundle(c); + } + + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("topic", topic.getPath())); + builder.add(DisplayData.item("transport", pubsubFactory.toString())); + builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); + builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); } } @@ -250,7 +273,7 @@ public void finishBundle(Context c) throws Exception { /** * Pubsub topic to publish to. */ - private final PubsubClient.TopicPath topic; + private final TopicPath topic; /** * Coder for elements. It is the responsibility of the underlying Pubsub transport to @@ -280,22 +303,55 @@ public void finishBundle(Context c) throws Exception { */ private final int numShards; - public PubsubUnboundedSink( + /** + * Maximum number of messages per publish. + */ + private final int publishBatchSize; + + /** + * Maximum size of a publish batch, in bytes. + */ + private final int publishBatchBytes; + + /** + * Longest delay between receiving a message and pushing it to Pubsub. 
+ */ + private final Duration maxLatency; + + @VisibleForTesting + PubsubUnboundedSink( PubsubClientFactory pubsubFactory, - PubsubClient.TopicPath topic, + TopicPath topic, Coder elementCoder, String timestampLabel, String idLabel, - int numShards) { + int numShards, + int publishBatchSize, + int publishBatchBytes, + Duration maxLatency) { this.pubsubFactory = pubsubFactory; this.topic = topic; this.elementCoder = elementCoder; this.timestampLabel = timestampLabel; this.idLabel = idLabel; this.numShards = numShards; + this.publishBatchSize = publishBatchSize; + this.publishBatchBytes = publishBatchBytes; + this.maxLatency = maxLatency; + } + + public PubsubUnboundedSink( + PubsubClientFactory pubsubFactory, + TopicPath topic, + Coder elementCoder, + String timestampLabel, + String idLabel, + int numShards) { + this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards, + DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY); } - public PubsubClient.TopicPath getTopic() { + public TopicPath getTopic() { return topic; } @@ -315,21 +371,22 @@ public Coder getElementCoder() { @Override public PDone apply(PCollection input) { - // TODO: Include topic.getPath() in transform metadata when it is available. 
input.apply( - Window.named("PubsubSink.Window") + Window.named("PubsubUnboundedSink.Window") .into(new GlobalWindows()) .triggering( Repeatedly.forever( - AfterFirst.of(AfterPane.elementCountAtLeast(PUBLISH_BATCH_SIZE), + AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize), AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(MAX_LATENCY)))) + .plusDelayOf(maxLatency)))) .discardingFiredPanes()) - .apply(ParDo.named("PubsubSink.Shard").of(new ShardFn(elementCoder, numShards))) + .apply(ParDo.named("PubsubUnboundedSink.Shard") + .of(new ShardFn(elementCoder, numShards))) .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) - .apply(GroupByKey.create()) - .apply(ParDo.named("PubsubSink.Writer").of(new WriterFn(pubsubFactory, topic, - timestampLabel, idLabel))); + .apply(GroupByKey.create()) + .apply(ParDo.named("PubsubUnboundedSink.Writer") + .of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel, + publishBatchSize, publishBatchBytes))); return PDone.in(input.getPipeline()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java index 1f4fa3a59141..5679ac01f4b4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java @@ -59,7 +59,7 @@ private static class PubsubApiaryClientFactory implements PubsubClientFactory { @Override public PubsubClient newClient( @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { + throws IOException { Pubsub pubsub = new Builder( Transport.getTransport(), Transport.getJsonFactory(), @@ -73,6 +73,11 @@ public PubsubClient newClient( .build(); return new PubsubApiaryClient(timestampLabel, idLabel, pubsub); } + + @Override + public String toString() { + return "Apiary"; + } } /** diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java index a44329d9f1c6..62fbc927cd31 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java @@ -54,6 +54,12 @@ PubsubClient newClient( @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) throws IOException; + + /** + * Return the display name for this factory. + */ + @Override + String toString(); } /** @@ -205,7 +211,7 @@ public String toString() { } public static SubscriptionPath subscriptionPathFromPath(String path) { - return new SubscriptionPath(path); + return new SubscriptionPath(path); } public static SubscriptionPath subscriptionPathFromName( @@ -286,6 +292,12 @@ public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) { this.timestampMsSinceEpoch = timestampMsSinceEpoch; } + @Override + public String toString() { + return String.format("OutgoingMessage(%db, %dms)", + elementBytes.length, timestampMsSinceEpoch); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -360,6 +372,12 @@ public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) { ackId, recordId); } + @Override + public String toString() { + return String.format("IncomingMessage(%db, %dms)", + elementBytes.length, timestampMsSinceEpoch); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java index da58480b73ce..07dbf375d963 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java @@ -87,7 +87,7 @@ private static class PubsubGrpcClientFactory implements PubsubClientFactory { @Override 
public PubsubClient newClient( @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { + throws IOException { ManagedChannel channel = NettyChannelBuilder .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT) .negotiationType(NegotiationType.TLS) @@ -105,6 +105,11 @@ public PubsubClient newClient( null /* publisher stub */, null /* subscriber stub */); } + + @Override + public String toString() { + return "Grpc"; + } } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java index 4a47c3070776..afed1e518c35 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java @@ -51,6 +51,11 @@ public PubsubClient newClient( throws IOException { return new PubsubTestClient(expectedTopic, null, 0, expectedOutgoingMessages, null); } + + @Override + public String toString() { + return "PublishTestClient"; + } }; } @@ -66,6 +71,11 @@ public PubsubClient newClient( return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec, null, expectedIncomingMessages); } + + @Override + public String toString() { + return "PullTestClient"; + } }; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java index 493667ca5baf..2cb9a6530ba7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java @@ -33,11 +33,15 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.util.ArrayList; +import 
java.util.HashSet; +import java.util.List; import java.util.Set; /** @@ -78,6 +82,64 @@ public void sendOneMessage() { p.apply(Create.of(ImmutableList.of(DATA))) .apply(ParDo.of(new Stamp())) .apply(sink); + // Run the pipeline. The PubsubTestClient will assert fail if the actual published + // message does not match the expected publish message. + p.run(); + } + + @Test + public void sendMoreThanOneBatchByNumMessages() { + Set outgoing = new HashSet<>(); + List data = new ArrayList<>(); + int batchSize = 2; + int batchBytes = 1000; + for (int i = 0; i < batchSize * 10; i++) { + String str = String.valueOf(i); + outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP)); + data.add(str); + } + PubsubClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing); + PubsubUnboundedSink sink = + new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + 10, batchSize, batchBytes, Duration.standardSeconds(2)); + TestPipeline p = TestPipeline.create(); + p.apply(Create.of(data)) + .apply(ParDo.of(new Stamp())) + .apply(sink); + // Run the pipeline. The PubsubTestClient will assert fail if the actual published + // message does not match the expected publish message. 
+ p.run(); + } + + @Test + public void sendMoreThanOneBatchByByteSize() { + Set outgoing = new HashSet<>(); + List data = new ArrayList<>(); + int batchSize = 100; + int batchBytes = 10; + int n = 0; + while (n < batchBytes * 10) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < batchBytes; i++) { + sb.append(String.valueOf(n)); + } + String str = sb.toString(); + outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP)); + data.add(str); + n += str.length(); + } + PubsubClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing); + PubsubUnboundedSink sink = + new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + 10, batchSize, batchBytes, Duration.standardSeconds(2)); + TestPipeline p = TestPipeline.create(); + p.apply(Create.of(data)) + .apply(ParDo.of(new Stamp())) + .apply(sink); + // Run the pipeline. The PubsubTestClient will assert fail if the actual published + // message does not match the expected publish message. 
p.run(); } } From 128b7e877c515cab4a9b6da16fbf7b91a39a9486 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Wed, 11 May 2016 17:02:11 -0700 Subject: [PATCH 6/6] More Dan comments --- .../java/org/apache/beam/sdk/io/PubsubUnboundedSink.java | 4 ++-- .../java/org/apache/beam/sdk/util/PubsubApiaryClient.java | 2 +- .../main/java/org/apache/beam/sdk/util/PubsubClient.java | 5 ++--- .../java/org/apache/beam/sdk/util/PubsubGrpcClient.java | 2 +- .../java/org/apache/beam/sdk/util/PubsubTestClient.java | 8 ++++---- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 9744850d080c..6d08a7072a97 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -206,7 +206,7 @@ private void publishBatch(List messages, int bytes) throws IOException { long nowMsSinceEpoch = System.currentTimeMillis(); int n = pubsubClient.publish(topic, messages); - checkState(n == messages.size(), "Attempted to publish %d messaged but %d were successful", + checkState(n == messages.size(), "Attempted to publish %d messages but %d were successful", messages.size(), n); batchCounter.addValue(1L); elementCounter.addValue((long) messages.size()); @@ -255,7 +255,7 @@ public void finishBundle(Context c) throws Exception { public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); builder.add(DisplayData.item("topic", topic.getPath())); - builder.add(DisplayData.item("transport", pubsubFactory.toString())); + builder.add(DisplayData.item("transport", pubsubFactory.getKind())); builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); } diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java index 5679ac01f4b4..29d0fd567df3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java @@ -75,7 +75,7 @@ public PubsubClient newClient( } @Override - public String toString() { + public String getKind() { return "Apiary"; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java index 62fbc927cd31..9c75003f4bbf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java @@ -56,10 +56,9 @@ PubsubClient newClient( PubsubOptions options) throws IOException; /** - * Return the display name for this factory. + * Return the display name for this factory. Eg "Apiary", "gRPC". 
*/ - @Override - String toString(); + String getKind(); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java index 07dbf375d963..bb535aa4d857 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java @@ -107,7 +107,7 @@ public PubsubClient newClient( } @Override - public String toString() { + public String getKind() { return "Grpc"; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java index afed1e518c35..9c3dd851616f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java @@ -53,8 +53,8 @@ public PubsubClient newClient( } @Override - public String toString() { - return "PublishTestClient"; + public String getKind() { + return "PublishTest"; } }; } @@ -73,8 +73,8 @@ public PubsubClient newClient( } @Override - public String toString() { - return "PullTestClient"; + public String getKind() { + return "PullTest"; } }; }