diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java new file mode 100644 index 000000000000..6d08a7072a97 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.PubsubClient; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +import com.google.common.annotations.VisibleForTesting; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import 
java.util.concurrent.ThreadLocalRandom; + +import javax.annotation.Nullable; + +/** + * A PTransform which streams messages to Pubsub. + * + */ +public class PubsubUnboundedSink extends PTransform, PDone> { + private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class); + + /** + * Default maximum number of messages per publish. + */ + private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000; + + /** + * Default maximum size of a publish batch, in bytes. + */ + private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000; + + /** + * Default longest delay between receiving a message and pushing it to Pubsub. + */ + private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2); + + /** + * Coder for conveying outgoing messages between internal stages. + */ + private static class OutgoingMessageCoder extends CustomCoder { + @Override + public void encode( + OutgoingMessage value, OutputStream outStream, Context context) + throws CoderException, IOException { + ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED); + BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED); + } + + @Override + public OutgoingMessage decode( + InputStream inStream, Context context) throws CoderException, IOException { + byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED); + long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED); + return new OutgoingMessage(elementBytes, timestampMsSinceEpoch); + } + } + + @VisibleForTesting + static final Coder CODER = new OutgoingMessageCoder(); + + // ================================================================================ + // ShardFn + // ================================================================================ + + /** + * Convert elements to messages and shard them. 
+ */ + private static class ShardFn extends DoFn> { + private final Aggregator elementCounter = + createAggregator("elements", new Sum.SumLongFn()); + private final Coder elementCoder; + private final int numShards; + + ShardFn(Coder elementCoder, int numShards) { + this.elementCoder = elementCoder; + this.numShards = numShards; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + elementCounter.addValue(1L); + byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element()); + long timestampMsSinceEpoch = c.timestamp().getMillis(); + c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), + new OutgoingMessage(elementBytes, timestampMsSinceEpoch))); + } + + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("numShards", numShards)); + } + } + + // ================================================================================ + // WriterFn + // ================================================================================ + + /** + * Publish messages to Pubsub in batches. + */ + private static class WriterFn + extends DoFn>, Void> { + private final PubsubClientFactory pubsubFactory; + private final TopicPath topic; + private final String timestampLabel; + private final String idLabel; + private final int publishBatchSize; + private final int publishBatchBytes; + + /** + * Client on which to talk to Pubsub. Null until created by {@link #startBundle}. 
+ */ + @Nullable + private transient PubsubClient pubsubClient; + + private final Aggregator batchCounter = + createAggregator("batches", new Sum.SumLongFn()); + private final Aggregator elementCounter = + createAggregator("elements", new Sum.SumLongFn()); + private final Aggregator byteCounter = + createAggregator("bytes", new Sum.SumLongFn()); + + WriterFn( + PubsubClientFactory pubsubFactory, TopicPath topic, String timestampLabel, + String idLabel, int publishBatchSize, int publishBatchBytes) { + this.pubsubFactory = pubsubFactory; + this.topic = topic; + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.publishBatchSize = publishBatchSize; + this.publishBatchBytes = publishBatchBytes; + } + + /** + * BLOCKING + * Send {@code messages} as a batch to Pubsub. + */ + private void publishBatch(List messages, int bytes) + throws IOException { + long nowMsSinceEpoch = System.currentTimeMillis(); + int n = pubsubClient.publish(topic, messages); + checkState(n == messages.size(), "Attempted to publish %d messages but %d were successful", + messages.size(), n); + batchCounter.addValue(1L); + elementCounter.addValue((long) messages.size()); + byteCounter.addValue((long) bytes); + } + + @Override + public void startBundle(Context c) throws Exception { + checkState(pubsubClient == null, "startBundle invoked without prior finishBundle"); + pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel, + c.getPipelineOptions().as(PubsubOptions.class)); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + List pubsubMessages = new ArrayList<>(publishBatchSize); + int bytes = 0; + for (OutgoingMessage message : c.element().getValue()) { + if (!pubsubMessages.isEmpty() + && bytes + message.elementBytes.length > publishBatchBytes) { + // Break large (in bytes) batches into smaller. + // (We've already broken by batch size using the trigger below, though that may + // run slightly over the actual PUBLISH_BATCH_SIZE. 
We'll consider that ok since + // the hard limit from Pubsub is by bytes rather than number of messages.) + // BLOCKS until published. + publishBatch(pubsubMessages, bytes); + pubsubMessages.clear(); + bytes = 0; + } + pubsubMessages.add(message); + bytes += message.elementBytes.length; + } + if (!pubsubMessages.isEmpty()) { + // BLOCKS until published. + publishBatch(pubsubMessages, bytes); + } + } + + @Override + public void finishBundle(Context c) throws Exception { + pubsubClient.close(); + pubsubClient = null; + } + + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("topic", topic.getPath())); + builder.add(DisplayData.item("transport", pubsubFactory.getKind())); + builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); + builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); + } + } + + // ================================================================================ + // PubsubUnboundedSink + // ================================================================================ + + /** + * Which factory to use for creating Pubsub transport. + */ + private final PubsubClientFactory pubsubFactory; + + /** + * Pubsub topic to publish to. + */ + private final TopicPath topic; + + /** + * Coder for elements. It is the responsibility of the underlying Pubsub transport to + * re-encode element bytes if necessary, eg as Base64 strings. + */ + private final Coder elementCoder; + + /** + * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use + * Pubsub message publish timestamp instead. + */ + @Nullable + private final String timestampLabel; + + /** + * Pubsub metadata field holding id for each element, or {@literal null} if need to generate + * a unique id ourselves. + */ + @Nullable + private final String idLabel; + + /** + * Number of 'shards' to use so that latency in Pubsub publish can be hidden. 
Generally this + * should be a small multiple of the number of available cores. Too small a number results + * in too much time lost to blocking Pubsub calls. Too large a number results in too many + * single-element batches being sent to Pubsub with high per-batch overhead. + */ + private final int numShards; + + /** + * Maximum number of messages per publish. + */ + private final int publishBatchSize; + + /** + * Maximum size of a publish batch, in bytes. + */ + private final int publishBatchBytes; + + /** + * Longest delay between receiving a message and pushing it to Pubsub. + */ + private final Duration maxLatency; + + @VisibleForTesting + PubsubUnboundedSink( + PubsubClientFactory pubsubFactory, + TopicPath topic, + Coder elementCoder, + String timestampLabel, + String idLabel, + int numShards, + int publishBatchSize, + int publishBatchBytes, + Duration maxLatency) { + this.pubsubFactory = pubsubFactory; + this.topic = topic; + this.elementCoder = elementCoder; + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.numShards = numShards; + this.publishBatchSize = publishBatchSize; + this.publishBatchBytes = publishBatchBytes; + this.maxLatency = maxLatency; + } + + public PubsubUnboundedSink( + PubsubClientFactory pubsubFactory, + TopicPath topic, + Coder elementCoder, + String timestampLabel, + String idLabel, + int numShards) { + this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards, + DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY); + } + + public TopicPath getTopic() { + return topic; + } + + @Nullable + public String getTimestampLabel() { + return timestampLabel; + } + + @Nullable + public String getIdLabel() { + return idLabel; + } + + public Coder getElementCoder() { + return elementCoder; + } + + @Override + public PDone apply(PCollection input) { + input.apply( + Window.named("PubsubUnboundedSink.Window") + .into(new GlobalWindows()) + .triggering( + Repeatedly.forever( +
AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize), + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(maxLatency)))) + .discardingFiredPanes()) + .apply(ParDo.named("PubsubUnboundedSink.Shard") + .of(new ShardFn(elementCoder, numShards))) + .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) + .apply(GroupByKey.create()) + .apply(ParDo.named("PubsubUnboundedSink.Writer") + .of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel, + publishBatchSize, publishBatchBytes))); + return PDone.in(input.getPipeline()); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java index f0a90968f510..29d0fd567df3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java @@ -55,7 +55,7 @@ */ public class PubsubApiaryClient extends PubsubClient { - public static final PubsubClientFactory FACTORY = new PubsubClientFactory() { + private static class PubsubApiaryClientFactory implements PubsubClientFactory { @Override public PubsubClient newClient( @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) @@ -73,7 +73,17 @@ public PubsubClient newClient( .build(); return new PubsubApiaryClient(timestampLabel, idLabel, pubsub); } - }; + + @Override + public String getKind() { + return "Apiary"; + } + } + + /** + * Factory for creating Pubsub clients using Apiary transport. 
+ */ + public static final PubsubClientFactory FACTORY = new PubsubApiaryClientFactory(); /** * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java index a44329d9f1c6..9c75003f4bbf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java @@ -54,6 +54,11 @@ PubsubClient newClient( @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) throws IOException; + + /** + * Return the display name for this factory. Eg "Apiary", "gRPC". + */ + String getKind(); } /** @@ -205,7 +210,7 @@ public String toString() { } public static SubscriptionPath subscriptionPathFromPath(String path) { - return new SubscriptionPath(path); + return new SubscriptionPath(path); } public static SubscriptionPath subscriptionPathFromName( @@ -286,6 +291,12 @@ public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) { this.timestampMsSinceEpoch = timestampMsSinceEpoch; } + @Override + public String toString() { + return String.format("OutgoingMessage(%db, %dms)", + elementBytes.length, timestampMsSinceEpoch); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -360,6 +371,12 @@ public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) { ackId, recordId); } + @Override + public String toString() { + return String.format("IncomingMessage(%db, %dms)", + elementBytes.length, timestampMsSinceEpoch); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java index b3c1b8f525bc..bb535aa4d857 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java @@ -75,36 +75,47 @@ public class PubsubGrpcClient extends PubsubClient { private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com"; private static final int PUBSUB_PORT = 443; + // Will be needed when credentials are correctly constructed and scoped. + @SuppressWarnings("unused") private static final List PUBSUB_SCOPES = Collections.singletonList("https://www.googleapis.com/auth/pubsub"); private static final int LIST_BATCH_SIZE = 1000; private static final int DEFAULT_TIMEOUT_S = 15; - public static final PubsubClientFactory FACTORY = - new PubsubClientFactory() { - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - ManagedChannel channel = NettyChannelBuilder - .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT) - .negotiationType(NegotiationType.TLS) - .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) - .build(); - // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the - // various command line options. It currently only supports the older - // com.google.api.client.auth.oauth2.Credentials. 
- GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); - return new PubsubGrpcClient(timestampLabel, - idLabel, - DEFAULT_TIMEOUT_S, - channel, - credentials, - null /* publisher stub */, - null /* subscriber stub */); - } - }; + private static class PubsubGrpcClientFactory implements PubsubClientFactory { + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + ManagedChannel channel = NettyChannelBuilder + .forAddress(PUBSUB_ADDRESS, PUBSUB_PORT) + .negotiationType(NegotiationType.TLS) + .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) + .build(); + // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the + // various command line options. It currently only supports the older + // com.google.api.client.auth.oauth2.Credentials. + GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); + return new PubsubGrpcClient(timestampLabel, + idLabel, + DEFAULT_TIMEOUT_S, + channel, + credentials, + null /* publisher stub */, + null /* subscriber stub */); + } + + @Override + public String getKind() { + return "Grpc"; + } + } + + /** + * Factory for creating Pubsub clients using gRPC transport. + */ + public static final PubsubClientFactory FACTORY = new PubsubGrpcClientFactory(); /** * Timeout for grpc calls (in s).
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java index 4a47c3070776..9c3dd851616f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java @@ -51,6 +51,11 @@ public PubsubClient newClient( throws IOException { return new PubsubTestClient(expectedTopic, null, 0, expectedOutgoingMessages, null); } + + @Override + public String getKind() { + return "PublishTest"; + } }; } @@ -66,6 +71,11 @@ public PubsubClient newClient( return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec, null, expectedIncomingMessages); } + + @Override + public String getKind() { + return "PullTest"; + } }; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java new file mode 100644 index 000000000000..2cb9a6530ba7 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io; + +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.PubsubClient; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; +import org.apache.beam.sdk.util.PubsubTestClient; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Test PubsubUnboundedSink. 
+ */ +@RunWith(JUnit4.class) +public class PubsubUnboundedSinkTest { + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final String DATA = "testData"; + private static final long TIMESTAMP = 1234L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + + private static class Stamp extends DoFn { + @Override + public void processElement(ProcessContext c) { + c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP)); + } + } + + @Test + public void saneCoder() throws Exception { + OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP); + CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message); + CoderProperties.coderSerializable(PubsubUnboundedSink.CODER); + } + + @Test + public void sendOneMessage() { + Set outgoing = + Sets.newHashSet(new OutgoingMessage(DATA.getBytes(), TIMESTAMP)); + PubsubClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing); + PubsubUnboundedSink sink = + new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + 10); + TestPipeline p = TestPipeline.create(); + p.apply(Create.of(ImmutableList.of(DATA))) + .apply(ParDo.of(new Stamp())) + .apply(sink); + // Run the pipeline. The PubsubTestClient will assert fail if the actual published + // message does not match the expected publish message. 
+ p.run(); + } + + @Test + public void sendMoreThanOneBatchByNumMessages() { + Set outgoing = new HashSet<>(); + List data = new ArrayList<>(); + int batchSize = 2; + int batchBytes = 1000; + for (int i = 0; i < batchSize * 10; i++) { + String str = String.valueOf(i); + outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP)); + data.add(str); + } + PubsubClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing); + PubsubUnboundedSink sink = + new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + 10, batchSize, batchBytes, Duration.standardSeconds(2)); + TestPipeline p = TestPipeline.create(); + p.apply(Create.of(data)) + .apply(ParDo.of(new Stamp())) + .apply(sink); + // Run the pipeline. The PubsubTestClient will assert fail if the actual published + // message does not match the expected publish message. + p.run(); + } + + @Test + public void sendMoreThanOneBatchByByteSize() { + Set outgoing = new HashSet<>(); + List data = new ArrayList<>(); + int batchSize = 100; + int batchBytes = 10; + int n = 0; + while (n < batchBytes * 10) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < batchBytes; i++) { + sb.append(String.valueOf(n)); + } + String str = sb.toString(); + outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP)); + data.add(str); + n += str.length(); + } + PubsubClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing); + PubsubUnboundedSink sink = + new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + 10, batchSize, batchBytes, Duration.standardSeconds(2)); + TestPipeline p = TestPipeline.create(); + p.apply(Create.of(data)) + .apply(ParDo.of(new Stamp())) + .apply(sink); + // Run the pipeline. The PubsubTestClient will assert fail if the actual published + // message does not match the expected publish message. + p.run(); + } +}