Skip to content
/ beam Public
forked from apache/beam
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SINK;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SOURCE;
import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
Expand Down Expand Up @@ -2017,6 +2018,16 @@ private static void translate(
PubsubUnboundedSink overriddenTransform,
StepTranslationContext stepContext,
PCollection input) {
if (overriddenTransform.getPublishBatchWithOrderingKey()) {
throw new UnsupportedOperationException(
String.format(
"%s does not currently support publishing with ordering keys. "
+ "%s is required to support publishing with ordering keys. "
+ "Set the pipeline option --experiments=%s to use this PTransform.",
StreamingPubsubIOWrite.class.getSimpleName(),
PubsubUnboundedSink.class.getSimpleName(),
ENABLE_CUSTOM_PUBSUB_SINK));
}
stepContext.addInput(PropertyNames.FORMAT, "pubsub");
if (overriddenTransform.getTopicProvider() != null) {
if (overriddenTransform.getTopicProvider().isAccessible()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES = 1024;
// The number of bytes that each attribute entry adds to the request
private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 6;
private boolean allowOrderingKey;
private int maxPublishBatchSize;

private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction;
Expand All @@ -63,6 +64,20 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS
}
int totalSize = payloadSize;

@Nullable String orderingKey = message.getOrderingKey();
if (orderingKey != null) {
int orderingKeySize = orderingKey.getBytes(StandardCharsets.UTF_8).length;
if (orderingKeySize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES) {
throw new SizeLimitExceededException(
"Pubsub message ordering key of length "
+ orderingKeySize
+ " exceeds maximum of "
+ PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES
+ " bytes. See https://cloud.google.com/pubsub/quotas#resource_limits");
}
totalSize += orderingKeySize;
}

@Nullable Map<String, String> attributes = message.getAttributeMap();
if (attributes != null) {
if (attributes.size() > PUBSUB_MESSAGE_MAX_ATTRIBUTES) {
Expand Down Expand Up @@ -122,12 +137,14 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS
SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction,
@Nullable
SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction,
boolean allowOrderingKey,
int maxPublishBatchSize,
BadRecordRouter badRecordRouter,
Coder<InputT> inputCoder,
TupleTag<PubsubMessage> outputTag) {
this.formatFunction = formatFunction;
this.topicFunction = topicFunction;
this.allowOrderingKey = allowOrderingKey;
this.maxPublishBatchSize = maxPublishBatchSize;
this.badRecordRouter = badRecordRouter;
this.inputCoder = inputCoder;
Expand Down Expand Up @@ -165,6 +182,14 @@ public void process(
return;
}
}
// A message that carries an ordering key cannot be published when the transform was not
// configured for ordering keys. Route the element to the bad-record handler and stop
// processing it here, mirroring the other error branches of this method, instead of
// falling through to the size validation below.
if (!allowOrderingKey && message.getOrderingKey() != null) {
  badRecordRouter.route(
      o,
      element,
      inputCoder,
      null,
      "The transform was not configured to publish messages with ordering keys");
  return;
}
try {
validatePubsubMessageSize(message, maxPublishBatchSize);
} catch (SizeLimitExceededException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,20 +724,6 @@ public static Write<PubsubMessage> writeMessages() {
.build();
}

/**
* Returns a {@link PTransform} that writes {@link PubsubMessage}s, along with the {@link
* PubsubMessage#getMessageId() messageId} and {@link PubsubMessage#getOrderingKey()}, to a Google
* Cloud Pub/Sub stream.
*
* <p>The returned transform is built with no topic provider and no topic function, with dynamic
* destinations disabled, and with the needs-ordering-key flag set to {@code true}.
*/
public static Write<PubsubMessage> writeMessagesWithOrderingKey() {
return Write.newBuilder()
.setTopicProvider(null)
.setTopicFunction(null)
.setDynamicDestinations(false)
.setNeedsOrderingKey(true)
.build();
}

/**
* Enables dynamic destination topics. The {@link PubsubMessage} elements are each expected to
* contain a destination topic, which can be set using {@link PubsubMessage#withTopic}. If {@link
Expand Down Expand Up @@ -1413,6 +1399,16 @@ public Write<T> withMaxBatchBytesSize(int maxBatchBytesSize) {
return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build();
}

/**
* Writes to Pub/Sub with each record's ordering key. A subscription with message ordering
* enabled will receive messages published in the same region with the same ordering key in the
* order in which they were received by the service. Note that the order in which Beam publishes
* records to the service remains unspecified.
*
* @return a {@link Write} built from this transform's builder with the needs-ordering-key flag
*     set to {@code true}
*/
public Write<T> withOrderingKey() {
return toBuilder().setNeedsOrderingKey(true).build();
}

/**
* Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute
* with the specified name. The value of the attribute will be a number representing the number
Expand Down Expand Up @@ -1484,6 +1480,7 @@ public PDone expand(PCollection<T> input) {
new PreparePubsubWriteDoFn<>(
getFormatFn(),
topicFunction,
getNeedsOrderingKey(),
maxMessageSize,
getBadRecordRouter(),
input.getCoder(),
Expand Down Expand Up @@ -1520,6 +1517,7 @@ public PDone expand(PCollection<T> input) {
getTimestampAttribute(),
getIdAttribute(),
100 /* numShards */,
getNeedsOrderingKey(),
MoreObjects.firstNonNull(
getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
MoreObjects.firstNonNull(
Expand Down Expand Up @@ -1552,7 +1550,7 @@ private class OutgoingData {
}
}

private transient Map<PubsubTopic, OutgoingData> output;
private transient Map<KV<PubsubTopic, String>, OutgoingData> output;

private transient PubsubClient pubsubClient;

Expand Down Expand Up @@ -1594,19 +1592,21 @@ public void processElement(@Element PubsubMessage message, @Timestamp Instant ti
pubsubTopic =
PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(message.getTopic()));
}
String orderingKey = message.getOrderingKey();

// Checking before adding the message stops us from violating max batch size or bytes
OutgoingData currentTopicOutput =
output.computeIfAbsent(pubsubTopic, t -> new OutgoingData());
if (currentTopicOutput.messages.size() >= maxPublishBatchSize
|| (!currentTopicOutput.messages.isEmpty()
&& (currentTopicOutput.bytes + messageSize) >= maxPublishBatchByteSize)) {
publish(pubsubTopic, currentTopicOutput.messages);
currentTopicOutput.messages.clear();
currentTopicOutput.bytes = 0;
OutgoingData currentTopicAndOrderingKeyOutput =
output.computeIfAbsent(KV.of(pubsubTopic, orderingKey), t -> new OutgoingData());
if (currentTopicAndOrderingKeyOutput.messages.size() >= maxPublishBatchSize
|| (!currentTopicAndOrderingKeyOutput.messages.isEmpty()
&& (currentTopicAndOrderingKeyOutput.bytes + messageSize)
>= maxPublishBatchByteSize)) {
publish(pubsubTopic, currentTopicAndOrderingKeyOutput.messages);
currentTopicAndOrderingKeyOutput.messages.clear();
currentTopicAndOrderingKeyOutput.bytes = 0;
}

Map<String, String> attributes = message.getAttributeMap();
String orderingKey = message.getOrderingKey();

com.google.pubsub.v1.PubsubMessage.Builder msgBuilder =
com.google.pubsub.v1.PubsubMessage.newBuilder()
Expand All @@ -1618,16 +1618,16 @@ public void processElement(@Element PubsubMessage message, @Timestamp Instant ti
}

// NOTE: The record id is always null.
currentTopicOutput.messages.add(
currentTopicAndOrderingKeyOutput.messages.add(
OutgoingMessage.of(
msgBuilder.build(), timestamp.getMillis(), null, message.getTopic()));
currentTopicOutput.bytes += messageSize;
currentTopicAndOrderingKeyOutput.bytes += messageSize;
}

@FinishBundle
public void finishBundle() throws IOException {
for (Map.Entry<PubsubTopic, OutgoingData> entry : output.entrySet()) {
publish(entry.getKey(), entry.getValue().messages);
for (Map.Entry<KV<PubsubTopic, @Nullable String>, OutgoingData> entry : output.entrySet()) {
publish(entry.getKey().getKey(), entry.getValue().messages);
}
output = null;
pubsubClient.close();
Expand Down
Loading