diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index fc73429ee150..6800553480ef 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SINK; import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SOURCE; +import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; @@ -2017,6 +2018,16 @@ private static void translate( PubsubUnboundedSink overriddenTransform, StepTranslationContext stepContext, PCollection input) { + if (overriddenTransform.getPublishBatchWithOrderingKey()) { + throw new UnsupportedOperationException( + String.format( + "%s does not currently support publishing with ordering keys. " + + "%s is required to support publishing with ordering keys. " + + "Set the pipeline option --experiments=%s to use this PTransform.", + StreamingPubsubIOWrite.class.getSimpleName(), + PubsubUnboundedSink.class.getSimpleName(), + ENABLE_CUSTOM_PUBSUB_SINK)); + } stepContext.addInput(PropertyNames.FORMAT, "pubsub"); if (overriddenTransform.getTopicProvider() != null) { if (overriddenTransform.getTopicProvider().isAccessible()) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java index 47033451ab89..563ea2f47f5c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java @@ -39,6 +39,7 @@ public class PreparePubsubWriteDoFn extends DoFn private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES = 1024; // The amount of bytes that each attribute entry adds up to the request private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 6; + private boolean allowOrderingKey; private int maxPublishBatchSize; private SerializableFunction, PubsubMessage> formatFunction; @@ -63,6 +64,20 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS } int totalSize = payloadSize; + @Nullable String orderingKey = message.getOrderingKey(); + if (orderingKey != null) { + int orderingKeySize = orderingKey.getBytes(StandardCharsets.UTF_8).length; + if (orderingKeySize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES) { + throw new SizeLimitExceededException( + "Pubsub message ordering key of length " + + orderingKeySize + + " exceeds maximum of " + + PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES + + " bytes. See https://cloud.google.com/pubsub/quotas#resource_limits"); + } + totalSize += orderingKeySize; + } + @Nullable Map attributes = message.getAttributeMap(); if (attributes != null) { if (attributes.size() > PUBSUB_MESSAGE_MAX_ATTRIBUTES) { @@ -122,12 +137,14 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS SerializableFunction, PubsubMessage> formatFunction, @Nullable SerializableFunction, PubsubIO.PubsubTopic> topicFunction, + boolean allowOrderingKey, int maxPublishBatchSize, BadRecordRouter badRecordRouter, Coder inputCoder, TupleTag outputTag) { this.formatFunction = formatFunction; this.topicFunction = topicFunction; + this.allowOrderingKey = allowOrderingKey; this.maxPublishBatchSize = maxPublishBatchSize; this.badRecordRouter = badRecordRouter; this.inputCoder = inputCoder; @@ -165,6 +182,14 @@ public void process( return; } } + if (!allowOrderingKey && message.getOrderingKey() != null) { + badRecordRouter.route( + o, + element, + inputCoder, + null, + "The transform was not configured to publish messages with ordering keys"); + } try { validatePubsubMessageSize(message, maxPublishBatchSize); } catch (SizeLimitExceededException e) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 17070711a871..b78e3878140a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -724,20 +724,6 @@ public static Write writeMessages() { .build(); } - /** - * Returns A {@link PTransform} that writes {@link PubsubMessage}s, along with the {@link - * PubsubMessage#getMessageId() messageId} and {@link PubsubMessage#getOrderingKey()}, to a Google - * Cloud Pub/Sub stream. - */ - public static Write writeMessagesWithOrderingKey() { - return Write.newBuilder() - .setTopicProvider(null) - .setTopicFunction(null) - .setDynamicDestinations(false) - .setNeedsOrderingKey(true) - .build(); - } - /** * Enables dynamic destination topics. The {@link PubsubMessage} elements are each expected to * contain a destination topic, which can be set using {@link PubsubMessage#withTopic}. If {@link @@ -1413,6 +1399,16 @@ public Write withMaxBatchBytesSize(int maxBatchBytesSize) { return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build(); } + /** + * Writes to Pub/Sub with each record's ordering key. A subscription with message ordering + * enabled will receive messages published in the same region with the same ordering key in the + * order in which they were received by the service. Note that the order in which Beam publishes + * records to the service remains unspecified. + */ + public Write withOrderingKey() { + return toBuilder().setNeedsOrderingKey(true).build(); + } + /** * Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute * with the specified name. The value of the attribute will be a number representing the number @@ -1484,6 +1480,7 @@ public PDone expand(PCollection input) { new PreparePubsubWriteDoFn<>( getFormatFn(), topicFunction, + getNeedsOrderingKey(), maxMessageSize, getBadRecordRouter(), input.getCoder(), @@ -1520,6 +1517,7 @@ public PDone expand(PCollection input) { getTimestampAttribute(), getIdAttribute(), 100 /* numShards */, + getNeedsOrderingKey(), MoreObjects.firstNonNull( getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE), MoreObjects.firstNonNull( @@ -1552,7 +1550,7 @@ private class OutgoingData { } } - private transient Map output; + private transient Map, OutgoingData> output; private transient PubsubClient pubsubClient; @@ -1594,19 +1592,21 @@ public void processElement(@Element PubsubMessage message, @Timestamp Instant ti pubsubTopic = PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(message.getTopic())); } + String orderingKey = message.getOrderingKey(); + // Checking before adding the message stops us from violating max batch size or bytes - OutgoingData currentTopicOutput = - output.computeIfAbsent(pubsubTopic, t -> new OutgoingData()); - if (currentTopicOutput.messages.size() >= maxPublishBatchSize - || (!currentTopicOutput.messages.isEmpty() - && (currentTopicOutput.bytes + messageSize) >= maxPublishBatchByteSize)) { - publish(pubsubTopic, currentTopicOutput.messages); - currentTopicOutput.messages.clear(); - currentTopicOutput.bytes = 0; + OutgoingData currentTopicAndOrderingKeyOutput = + output.computeIfAbsent(KV.of(pubsubTopic, orderingKey), t -> new OutgoingData()); + if (currentTopicAndOrderingKeyOutput.messages.size() >= maxPublishBatchSize + || (!currentTopicAndOrderingKeyOutput.messages.isEmpty() + && (currentTopicAndOrderingKeyOutput.bytes + messageSize) + >= maxPublishBatchByteSize)) { + publish(pubsubTopic, currentTopicAndOrderingKeyOutput.messages); + currentTopicAndOrderingKeyOutput.messages.clear(); + currentTopicAndOrderingKeyOutput.bytes = 0; } Map attributes = message.getAttributeMap(); - String orderingKey = message.getOrderingKey(); com.google.pubsub.v1.PubsubMessage.Builder msgBuilder = com.google.pubsub.v1.PubsubMessage.newBuilder() @@ -1618,16 +1618,16 @@ public void processElement(@Element PubsubMessage message, @Timestamp Instant ti } // NOTE: The record id is always null. - currentTopicOutput.messages.add( + currentTopicAndOrderingKeyOutput.messages.add( OutgoingMessage.of( msgBuilder.build(), timestamp.getMillis(), null, message.getTopic())); - currentTopicOutput.bytes += messageSize; + currentTopicAndOrderingKeyOutput.bytes += messageSize; } @FinishBundle public void finishBundle() throws IOException { - for (Map.Entry entry : output.entrySet()) { - publish(entry.getKey(), entry.getValue().messages); + for (Map.Entry, OutgoingData> entry : output.entrySet()) { + publish(entry.getKey().getKey(), entry.getValue().messages); } output = null; pubsubClient.close(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index aa8e3a411486..4105f3e79a00 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -23,8 +23,12 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.coders.AtomicCoder; @@ -202,7 +206,12 @@ public void processElement( } @Nullable String topic = dynamicTopicFn.apply(element); - K key = keyFunction.apply(ThreadLocalRandom.current().nextInt(numShards), topic); + @Nullable String orderingKey = message.getOrderingKey(); + int shard = + orderingKey == null + ? ThreadLocalRandom.current().nextInt(numShards) + : Hashing.murmur3_32_fixed().hashString(orderingKey, StandardCharsets.UTF_8).asInt(); + K key = keyFunction.apply(shard, topic); o.output(KV.of(key, OutgoingMessage.of(message, timestampMsSinceEpoch, recordId, topic))); } @@ -219,6 +228,16 @@ public void populateDisplayData(DisplayData.Builder builder) { /** Publish messages to Pubsub in batches. */ private static class WriterFn extends DoFn, Void> { + private class OutgoingData { + int bytes; + List messages; + + OutgoingData() { + this.bytes = 0; + this.messages = new ArrayList<>(publishBatchSize); + } + } + private final PubsubClientFactory pubsubFactory; private final @Nullable ValueProvider topic; private final String timestampAttribute; @@ -305,27 +324,53 @@ public void startBundle(StartBundleContext c) throws Exception { } @ProcessElement + @SuppressWarnings("ReferenceEquality") public void processElement(ProcessContext c) throws Exception { - List pubsubMessages = new ArrayList<>(publishBatchSize); - int bytes = 0; + // TODO(sjvanrossum): Refactor the write transform so this map can be indexed with topic + + // ordering key and have bundle scoped lifetime. + // NB: A larger, breaking refactor could make this irrelevant with a GBK on topic + ordering + // key (or GroupIntoBatches with configurable shard for ShardedKey?) and unify + // bounded/unbounded writes and static/dynamic destinations. + Map<@Nullable String, OutgoingData> orderingKeyBatches = new HashMap<>(); + @Nullable String currentOrderingKey = null; + @Nullable OutgoingData currentBatch = null; for (OutgoingMessage message : c.element()) { - if (!pubsubMessages.isEmpty() - && bytes + message.getMessage().getData().size() > publishBatchBytes) { - // Break large (in bytes) batches into smaller. - // (We've already broken by batch size using the trigger below, though that may - // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since - // the hard limit from Pubsub is by bytes rather than number of messages.) - // BLOCKS until published. - publishBatch(pubsubMessages, bytes); - pubsubMessages.clear(); - bytes = 0; + // If currentBatch is null set currentOrderingKey before entering the then clause. If + // currentBatch is not null and currentOrderingKey does not equal messageOrderingKey set + // currentOrderingKey and currentBatch before entering the then or else clause and only + // enter the then clause if currentBatch is null. This ensures currentBatch is initialized + // and contains at least one element before entering the else clause if currentOrderingKey + // is equal to messageOrderingKey. + @Nullable String messageOrderingKey = message.getMessage().getOrderingKey(); + if ((currentBatch == null + && (currentOrderingKey = messageOrderingKey) == messageOrderingKey) + || (!Objects.equals(currentOrderingKey, messageOrderingKey) + && (currentBatch = orderingKeyBatches.get(currentOrderingKey = messageOrderingKey)) + == null)) { + currentBatch = new OutgoingData(); + currentBatch.messages.add(message); + currentBatch.bytes += message.getMessage().getData().size(); + orderingKeyBatches.put(currentOrderingKey, currentBatch); + } else { + if (currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) { + // Break large (in bytes) batches into smaller. + // (We've already broken by batch size using the trigger below, though that may + // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since + // the hard limit from Pubsub is by bytes rather than number of messages.) + // BLOCKS until published. + publishBatch(currentBatch.messages, currentBatch.bytes); + currentBatch.messages.clear(); + currentBatch.bytes = 0; + } + currentBatch.messages.add(message); + currentBatch.bytes += message.getMessage().getData().size(); } - pubsubMessages.add(message); - bytes += message.getMessage().getData().size(); } - if (!pubsubMessages.isEmpty()) { - // BLOCKS until published. - publishBatch(pubsubMessages, bytes); + for (OutgoingData batch : orderingKeyBatches.values()) { + if (!batch.messages.isEmpty()) { + // BLOCKS until published. + publishBatch(batch.messages, batch.bytes); + } } } @@ -378,6 +423,9 @@ public void populateDisplayData(DisplayData.Builder builder) { */ private final int numShards; + /** Publish messages with an ordering key. Currently unsupported in DataflowRunner. */ + private final boolean publishBatchWithOrderingKey; + /** Maximum number of messages per publish. */ private final int publishBatchSize; @@ -402,6 +450,7 @@ public void populateDisplayData(DisplayData.Builder builder) { String timestampAttribute, String idAttribute, int numShards, + boolean publishBatchWithOrderingKey, int publishBatchSize, int publishBatchBytes, Duration maxLatency, @@ -412,6 +461,7 @@ public void populateDisplayData(DisplayData.Builder builder) { this.timestampAttribute = timestampAttribute; this.idAttribute = idAttribute; this.numShards = numShards; + this.publishBatchWithOrderingKey = publishBatchWithOrderingKey; this.publishBatchSize = publishBatchSize; this.publishBatchBytes = publishBatchBytes; this.maxLatency = maxLatency; @@ -424,13 +474,15 @@ public PubsubUnboundedSink( ValueProvider topic, String timestampAttribute, String idAttribute, - int numShards) { + int numShards, + boolean publishBatchWithOrderingKey) { this( pubsubFactory, topic, timestampAttribute, idAttribute, numShards, + publishBatchWithOrderingKey, DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY, @@ -444,6 +496,7 @@ public PubsubUnboundedSink( String timestampAttribute, String idAttribute, int numShards, + boolean publishBatchWithOrderingKey, String pubsubRootUrl) { this( pubsubFactory, @@ -451,6 +504,7 @@ public PubsubUnboundedSink( timestampAttribute, idAttribute, numShards, + publishBatchWithOrderingKey, DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY, @@ -464,6 +518,7 @@ public PubsubUnboundedSink( String timestampAttribute, String idAttribute, int numShards, + boolean publishBatchWithOrderingKey, int publishBatchSize, int publishBatchBytes) { this( @@ -472,6 +527,7 @@ public PubsubUnboundedSink( timestampAttribute, idAttribute, numShards, + publishBatchWithOrderingKey, publishBatchSize, publishBatchBytes, DEFAULT_MAX_LATENCY, @@ -485,6 +541,7 @@ public PubsubUnboundedSink( String timestampAttribute, String idAttribute, int numShards, + boolean publishBatchWithOrderingKey, int publishBatchSize, int publishBatchBytes, String pubsubRootUrl) { @@ -494,6 +551,7 @@ public PubsubUnboundedSink( timestampAttribute, idAttribute, numShards, + publishBatchWithOrderingKey, publishBatchSize, publishBatchBytes, DEFAULT_MAX_LATENCY, @@ -520,6 +578,10 @@ public PubsubUnboundedSink( return idAttribute; } + public boolean getPublishBatchWithOrderingKey() { + return publishBatchWithOrderingKey; + } + @Override public PDone expand(PCollection input) { if (topic != null) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java index 494189d43f36..a125a7b67e69 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java @@ -44,6 +44,19 @@ public void testValidatePubsubMessageSizeOnlyPayload() throws SizeLimitExceededE assertEquals(data.length, messageSize); } + @Test + public void testValidatePubsubMessageSizePayloadAndOrderingKey() + throws SizeLimitExceededException { + byte[] data = new byte[1024]; + String orderingKey = "key"; + PubsubMessage message = new PubsubMessage(data, null, null, orderingKey); + + int messageSize = + PreparePubsubWriteDoFn.validatePubsubMessageSize(message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE); + + assertEquals(data.length + orderingKey.getBytes(StandardCharsets.UTF_8).length, messageSize); + } + @Test public void testValidatePubsubMessageSizePayloadAndAttributes() throws SizeLimitExceededException { @@ -76,6 +89,19 @@ public void testValidatePubsubMessageSizePayloadTooLarge() { message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); } + @Test + public void testValidatePubsubMessageSizePayloadPlusOrderingKeyTooLarge() { + byte[] data = new byte[(10 << 20)]; + String orderingKey = "key"; + PubsubMessage message = new PubsubMessage(data, null, null, orderingKey); + + assertThrows( + SizeLimitExceededException.class, + () -> + PreparePubsubWriteDoFn.validatePubsubMessageSize( + message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); + } + @Test public void testValidatePubsubMessageSizePayloadPlusAttributesTooLarge() { byte[] data = new byte[(10 << 20)]; @@ -121,6 +147,19 @@ public void testValidatePubsubMessageSizeAttributeValueTooLarge() { message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); } + @Test + public void testValidatePubsubMessageSizeOrderingKeyTooLarge() { + byte[] data = new byte[1024]; + String orderingKey = RandomStringUtils.randomAscii(1025); + PubsubMessage message = new PubsubMessage(data, null, null, orderingKey); + + assertThrows( + SizeLimitExceededException.class, + () -> + PreparePubsubWriteDoFn.validatePubsubMessageSize( + message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); + } + @Test public void testValidatePubsubMessagePayloadTooLarge() { byte[] data = new byte[(10 << 20) + 1]; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java index 45fbab0576fb..75a6173591a1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java @@ -67,6 +67,7 @@ public void testTranslateSinkWithTopic() throws Exception { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 0, + false, 0, 0, Duration.ZERO, @@ -104,6 +105,7 @@ public void testTranslateDynamicSink() throws Exception { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 0, + false, 0, 0, Duration.ZERO, @@ -143,6 +145,7 @@ public void testTranslateSinkWithTopicOverridden() throws Exception { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 0, + false, 0, 0, Duration.ZERO, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index c9b6bae45b98..8460b8f7be05 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -120,6 +120,7 @@ public void sendOneMessage() throws IOException { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, + false, batchSize, batchBytes, Duration.standardSeconds(2), @@ -152,6 +153,7 @@ public void sendOneMessageWithoutAttributes() throws IOException { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, + false, 1 /* batchSize */, 1 /* batchBytes */, Duration.standardSeconds(2), @@ -207,6 +209,7 @@ public void testDynamicTopics() throws IOException { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, + false, 1 /* batchSize */, 1 /* batchBytes */, Duration.standardSeconds(2), @@ -258,6 +261,7 @@ public void sendMoreThanOneBatchByNumMessages() throws IOException { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, + false, batchSize, batchBytes, Duration.standardSeconds(2), @@ -303,6 +307,7 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, + false, batchSize, batchBytes, Duration.standardSeconds(2), diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteIT.java index 02e37b633525..fe36e31e69bb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteIT.java @@ -132,7 +132,7 @@ public void testBoundedWriteMessageWithAttributesAndMessageIdAndOrderingKey() th pipeline .apply(Create.of(outgoingMessage).withCoder(PubsubMessageSchemaCoder.getSchemaCoder())) - .apply(PubsubIO.writeMessagesWithOrderingKey().to(testTopicPath.getPath())); + .apply(PubsubIO.writeMessages().withOrderingKey().to(testTopicPath.getPath())); pipeline.run().waitUntilFinish(); List incomingMessages =