From 879909bfd4aaa5e31b6c16c710c28c77848c60e4 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Fri, 8 Feb 2019 16:05:39 -0500 Subject: [PATCH 1/5] Add ordering key fields to proto. We are locally committing the proto with ordering keys for now. Once we are prepared to release this feature, we can use the default, released proto. --- .../com/google/pubsub/v1/PubsubMessage.java | 105 +++++ .../pubsub/v1/PubsubMessageOrBuilder.java | 5 + .../com/google/pubsub/v1/PubsubProto.java | 364 +++++++++--------- .../com/google/pubsub/v1/Subscription.java | 48 +++ .../pubsub/v1/SubscriptionOrBuilder.java | 3 + .../main/proto/google/pubsub/v1/pubsub.proto | 3 + 6 files changed, 347 insertions(+), 181 deletions(-) diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java index 21ae0f027cbe..8b09b93c5033 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java @@ -32,6 +32,7 @@ private PubsubMessage(com.google.protobuf.GeneratedMessageV3.Builder builder) private PubsubMessage() { data_ = com.google.protobuf.ByteString.EMPTY; messageId_ = ""; + orderingKey_ = ""; } @java.lang.Override @@ -98,6 +99,13 @@ private PubsubMessage( publishTime_ = subBuilder.buildPartial(); } + break; + } + case 42: + { + java.lang.String s = input.readStringRequireUtf8(); + + orderingKey_ = s; break; } default: @@ -350,6 +358,33 @@ public com.google.protobuf.TimestampOrBuilder getPublishTimeOrBuilder() { return getPublishTime(); } + public static final int ORDERING_KEY_FIELD_NUMBER = 5; + private volatile java.lang.Object orderingKey_; + /** string ordering_key = 5; */ + public java.lang.String getOrderingKey() { + java.lang.Object ref = orderingKey_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + orderingKey_ = s; + return s; + } + } + /** string ordering_key = 5; */ + public com.google.protobuf.ByteString getOrderingKeyBytes() { + java.lang.Object ref = orderingKey_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref); + orderingKey_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private byte memoizedIsInitialized = -1; @java.lang.Override @@ -375,6 +410,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io if (publishTime_ != null) { output.writeMessage(4, getPublishTime()); } + if (!getOrderingKeyBytes().isEmpty()) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 5, orderingKey_); + } unknownFields.writeTo(output); } @@ -403,6 +441,9 @@ public int getSerializedSize() { if (publishTime_ != null) { size += com.google.protobuf.CodedOutputStream.computeMessageSize(4, getPublishTime()); } + if (!getOrderingKeyBytes().isEmpty()) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(5, orderingKey_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -426,6 +467,7 @@ public boolean equals(final java.lang.Object obj) { if (hasPublishTime()) { result = result && getPublishTime().equals(other.getPublishTime()); } + result = result && getOrderingKey().equals(other.getOrderingKey()); result = result && unknownFields.equals(other.unknownFields); return result; } @@ -449,6 +491,8 @@ public int hashCode() { hash = (37 * hash) + PUBLISH_TIME_FIELD_NUMBER; hash = (53 * hash) + getPublishTime().hashCode(); } + hash = (37 * hash) + ORDERING_KEY_FIELD_NUMBER; + hash = (53 * hash) + getOrderingKey().hashCode(); hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -631,6 +675,8 @@ public Builder clear() { publishTime_ = null; publishTimeBuilder_ = null; } + orderingKey_ = ""; + return this; } @@ -668,6 +714,7 @@ public com.google.pubsub.v1.PubsubMessage buildPartial() { } else { result.publishTime_ = publishTimeBuilder_.build(); } + result.orderingKey_ = orderingKey_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -729,6 +776,10 @@ public Builder mergeFrom(com.google.pubsub.v1.PubsubMessage other) { if (other.hasPublishTime()) { mergePublishTime(other.getPublishTime()); } + if (!other.getOrderingKey().isEmpty()) { + orderingKey_ = other.orderingKey_; + onChanged(); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -1273,6 +1324,60 @@ public com.google.protobuf.TimestampOrBuilder getPublishTimeOrBuilder() { return publishTimeBuilder_; } + private java.lang.Object orderingKey_ = ""; + /** string ordering_key = 5; */ + public java.lang.String getOrderingKey() { + java.lang.Object ref = orderingKey_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + orderingKey_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** string ordering_key = 5; */ + public com.google.protobuf.ByteString getOrderingKeyBytes() { + java.lang.Object ref = orderingKey_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref); + orderingKey_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** string ordering_key = 5; */ + public Builder setOrderingKey(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + + orderingKey_ = value; + onChanged(); + return this; + } + /** string ordering_key = 5; */ + public Builder clearOrderingKey() { + + orderingKey_ = getDefaultInstance().getOrderingKey(); + onChanged(); + return this; + } + /** string ordering_key = 5; */ + public Builder setOrderingKeyBytes(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + checkByteStringIsUtf8(value); + + orderingKey_ = value; + onChanged(); + return this; + } + @java.lang.Override public final Builder setUnknownFields(final com.google.protobuf.UnknownFieldSet unknownFields) { return super.setUnknownFieldsProto3(unknownFields); diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java index e1db91aa9512..a278851db728 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java @@ -137,4 +137,9 @@ public interface PubsubMessageOrBuilder * .google.protobuf.Timestamp publish_time = 4; */ com.google.protobuf.TimestampOrBuilder getPublishTimeOrBuilder(); + + /** string ordering_key = 5; */ + java.lang.String getOrderingKey(); + /** string ordering_key = 5; */ + com.google.protobuf.ByteString getOrderingKeyBytes(); } diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubProto.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubProto.java index cdc7b4ce4539..b53e18f24357 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubProto.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubProto.java @@ -212,189 +212,190 @@ public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { + "bsub.v1.Topic.LabelsEntry\022F\n\026message_sto" + "rage_policy\030\003 \001(\0132&.google.pubsub.v1.Mes" + "sageStoragePolicy\032-\n\013LabelsEntry\022\013\n\003key\030" - + "\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"\333\001\n\rPubsubMessa" + + "\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"\361\001\n\rPubsubMessa" + "ge\022\014\n\004data\030\001 \001(\014\022C\n\nattributes\030\002 \003(\0132/.g" + "oogle.pubsub.v1.PubsubMessage.Attributes" + "Entry\022\022\n\nmessage_id\030\003 \001(\t\0220\n\014publish_tim" - + "e\030\004 \001(\0132\032.google.protobuf.Timestamp\0321\n\017A" - + "ttributesEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001" - + "(\t:\0028\001\" \n\017GetTopicRequest\022\r\n\005topic\030\001 \001(\t" - + "\"m\n\022UpdateTopicRequest\022&\n\005topic\030\001 \001(\0132\027." - + "google.pubsub.v1.Topic\022/\n\013update_mask\030\002 " - + "\001(\0132\032.google.protobuf.FieldMask\"R\n\016Publi" - + "shRequest\022\r\n\005topic\030\001 \001(\t\0221\n\010messages\030\002 \003" - + "(\0132\037.google.pubsub.v1.PubsubMessage\"&\n\017P" - + "ublishResponse\022\023\n\013message_ids\030\001 \003(\t\"K\n\021L" - + "istTopicsRequest\022\017\n\007project\030\001 \001(\t\022\021\n\tpag" - + "e_size\030\002 \001(\005\022\022\n\npage_token\030\003 \001(\t\"V\n\022List" - + "TopicsResponse\022\'\n\006topics\030\001 \003(\0132\027.google." - + "pubsub.v1.Topic\022\027\n\017next_page_token\030\002 \001(\t" - + "\"U\n\035ListTopicSubscriptionsRequest\022\r\n\005top" - + "ic\030\001 \001(\t\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npage_toke" - + "n\030\003 \001(\t\"P\n\036ListTopicSubscriptionsRespons" - + "e\022\025\n\rsubscriptions\030\001 \003(\t\022\027\n\017next_page_to" - + "ken\030\002 \001(\t\"Q\n\031ListTopicSnapshotsRequest\022\r" - + "\n\005topic\030\001 \001(\t\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npage" - + "_token\030\003 \001(\t\"H\n\032ListTopicSnapshotsRespon" - + "se\022\021\n\tsnapshots\030\001 \003(\t\022\027\n\017next_page_token" - + "\030\002 \001(\t\"#\n\022DeleteTopicRequest\022\r\n\005topic\030\001 " - + "\001(\t\"\204\003\n\014Subscription\022\014\n\004name\030\001 \001(\t\022\r\n\005to" - + "pic\030\002 \001(\t\0221\n\013push_config\030\004 \001(\0132\034.google." - + "pubsub.v1.PushConfig\022\034\n\024ack_deadline_sec" - + "onds\030\005 \001(\005\022\035\n\025retain_acked_messages\030\007 \001(" - + "\010\022=\n\032message_retention_duration\030\010 \001(\0132\031." - + "google.protobuf.Duration\022:\n\006labels\030\t \003(\013" - + "2*.google.pubsub.v1.Subscription.LabelsE" - + "ntry\022=\n\021expiration_policy\030\013 \001(\0132\".google" - + ".pubsub.v1.ExpirationPolicy\032-\n\013LabelsEnt" - + "ry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\":\n\020Ex" - + "pirationPolicy\022&\n\003ttl\030\001 \001(\0132\031.google.pro" - + "tobuf.Duration\"\230\001\n\nPushConfig\022\025\n\rpush_en" - + "dpoint\030\001 \001(\t\022@\n\nattributes\030\002 \003(\0132,.googl" - + "e.pubsub.v1.PushConfig.AttributesEntry\0321" - + "\n\017AttributesEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030" - + "\002 \001(\t:\0028\001\"S\n\017ReceivedMessage\022\016\n\006ack_id\030\001" - + " \001(\t\0220\n\007message\030\002 \001(\0132\037.google.pubsub.v1" - + ".PubsubMessage\".\n\026GetSubscriptionRequest" - + "\022\024\n\014subscription\030\001 \001(\t\"\202\001\n\031UpdateSubscri" - + "ptionRequest\0224\n\014subscription\030\001 \001(\0132\036.goo" - + "gle.pubsub.v1.Subscription\022/\n\013update_mas" - + "k\030\002 \001(\0132\032.google.protobuf.FieldMask\"R\n\030L" - + "istSubscriptionsRequest\022\017\n\007project\030\001 \001(\t" - + "\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npage_token\030\003 \001(\t\"" - + "k\n\031ListSubscriptionsResponse\0225\n\rsubscrip" - + "tions\030\001 \003(\0132\036.google.pubsub.v1.Subscript" - + "ion\022\027\n\017next_page_token\030\002 \001(\t\"1\n\031DeleteSu" - + "bscriptionRequest\022\024\n\014subscription\030\001 \001(\t\"" - + "b\n\027ModifyPushConfigRequest\022\024\n\014subscripti" - + "on\030\001 \001(\t\0221\n\013push_config\030\002 \001(\0132\034.google.p" - + "ubsub.v1.PushConfig\"U\n\013PullRequest\022\024\n\014su" - + "bscription\030\001 \001(\t\022\032\n\022return_immediately\030\002" - + " \001(\010\022\024\n\014max_messages\030\003 \001(\005\"L\n\014PullRespon" - + "se\022<\n\021received_messages\030\001 \003(\0132!.google.p" - + "ubsub.v1.ReceivedMessage\"_\n\030ModifyAckDea" - + "dlineRequest\022\024\n\014subscription\030\001 \001(\t\022\017\n\007ac" - + "k_ids\030\004 \003(\t\022\034\n\024ack_deadline_seconds\030\003 \001(" - + "\005\";\n\022AcknowledgeRequest\022\024\n\014subscription\030" - + "\001 \001(\t\022\017\n\007ack_ids\030\002 \003(\t\"\244\001\n\024StreamingPull" - + "Request\022\024\n\014subscription\030\001 \001(\t\022\017\n\007ack_ids" - + "\030\002 \003(\t\022\037\n\027modify_deadline_seconds\030\003 \003(\005\022" - + "\037\n\027modify_deadline_ack_ids\030\004 \003(\t\022#\n\033stre" - + "am_ack_deadline_seconds\030\005 \001(\005\"U\n\025Streami" - + "ngPullResponse\022<\n\021received_messages\030\001 \003(" - + "\0132!.google.pubsub.v1.ReceivedMessage\"\257\001\n" - + "\025CreateSnapshotRequest\022\014\n\004name\030\001 \001(\t\022\024\n\014" - + "subscription\030\002 \001(\t\022C\n\006labels\030\003 \003(\01323.goo" - + "gle.pubsub.v1.CreateSnapshotRequest.Labe" - + "lsEntry\032-\n\013LabelsEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005v" - + "alue\030\002 \001(\t:\0028\001\"v\n\025UpdateSnapshotRequest\022" - + ",\n\010snapshot\030\001 \001(\0132\032.google.pubsub.v1.Sna" - + "pshot\022/\n\013update_mask\030\002 \001(\0132\032.google.prot" - + "obuf.FieldMask\"\277\001\n\010Snapshot\022\014\n\004name\030\001 \001(" - + "\t\022\r\n\005topic\030\002 \001(\t\022/\n\013expire_time\030\003 \001(\0132\032." - + "google.protobuf.Timestamp\0226\n\006labels\030\004 \003(" - + "\0132&.google.pubsub.v1.Snapshot.LabelsEntr" - + "y\032-\n\013LabelsEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002" - + " \001(\t:\0028\001\"&\n\022GetSnapshotRequest\022\020\n\010snapsh" - + "ot\030\001 \001(\t\"N\n\024ListSnapshotsRequest\022\017\n\007proj" - + "ect\030\001 \001(\t\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npage_tok" - + "en\030\003 \001(\t\"_\n\025ListSnapshotsResponse\022-\n\tsna" - + "pshots\030\001 \003(\0132\032.google.pubsub.v1.Snapshot" - + "\022\027\n\017next_page_token\030\002 \001(\t\")\n\025DeleteSnaps" - + "hotRequest\022\020\n\010snapshot\030\001 \001(\t\"m\n\013SeekRequ" - + "est\022\024\n\014subscription\030\001 \001(\t\022*\n\004time\030\002 \001(\0132" - + "\032.google.protobuf.TimestampH\000\022\022\n\010snapsho" - + "t\030\003 \001(\tH\000B\010\n\006target\"\016\n\014SeekResponse2\277\010\n\t" - + "Publisher\022j\n\013CreateTopic\022\027.google.pubsub" - + ".v1.Topic\032\027.google.pubsub.v1.Topic\")\202\323\344\223" - + "\002#\032\036/v1/{name=projects/*/topics/*}:\001*\022}\n" - + "\013UpdateTopic\022$.google.pubsub.v1.UpdateTo" - + "picRequest\032\027.google.pubsub.v1.Topic\"/\202\323\344" - + "\223\002)2$/v1/{topic.name=projects/*/topics/*" - + "}:\001*\022\202\001\n\007Publish\022 .google.pubsub.v1.Publ" - + "ishRequest\032!.google.pubsub.v1.PublishRes" - + "ponse\"2\202\323\344\223\002,\"\'/v1/{topic=projects/*/top" - + "ics/*}:publish:\001*\022o\n\010GetTopic\022!.google.p" - + "ubsub.v1.GetTopicRequest\032\027.google.pubsub" - + ".v1.Topic\"\'\202\323\344\223\002!\022\037/v1/{topic=projects/*" - + "/topics/*}\022\200\001\n\nListTopics\022#.google.pubsu" - + "b.v1.ListTopicsRequest\032$.google.pubsub.v" - + "1.ListTopicsResponse\"\'\202\323\344\223\002!\022\037/v1/{proje" - + "ct=projects/*}/topics\022\262\001\n\026ListTopicSubsc" - + "riptions\022/.google.pubsub.v1.ListTopicSub" - + "scriptionsRequest\0320.google.pubsub.v1.Lis" - + "tTopicSubscriptionsResponse\"5\202\323\344\223\002/\022-/v1" - + "/{topic=projects/*/topics/*}/subscriptio" - + "ns\022\242\001\n\022ListTopicSnapshots\022+.google.pubsu" - + "b.v1.ListTopicSnapshotsRequest\032,.google." - + "pubsub.v1.ListTopicSnapshotsResponse\"1\202\323" - + "\344\223\002+\022)/v1/{topic=projects/*/topics/*}/sn" - + "apshots\022t\n\013DeleteTopic\022$.google.pubsub.v" - + "1.DeleteTopicRequest\032\026.google.protobuf.E" - + "mpty\"\'\202\323\344\223\002!*\037/v1/{topic=projects/*/topi" - + "cs/*}2\371\021\n\nSubscriber\022\206\001\n\022CreateSubscript" - + "ion\022\036.google.pubsub.v1.Subscription\032\036.go" - + "ogle.pubsub.v1.Subscription\"0\202\323\344\223\002*\032%/v1" - + "/{name=projects/*/subscriptions/*}:\001*\022\222\001" - + "\n\017GetSubscription\022(.google.pubsub.v1.Get" - + "SubscriptionRequest\032\036.google.pubsub.v1.S" - + "ubscription\"5\202\323\344\223\002/\022-/v1/{subscription=p" - + "rojects/*/subscriptions/*}\022\240\001\n\022UpdateSub" - + "scription\022+.google.pubsub.v1.UpdateSubsc" - + "riptionRequest\032\036.google.pubsub.v1.Subscr" - + "iption\"=\202\323\344\223\002722/v1/{subscription.name=p" - + "rojects/*/subscriptions/*}:\001*\022\234\001\n\021ListSu" - + "bscriptions\022*.google.pubsub.v1.ListSubsc" - + "riptionsRequest\032+.google.pubsub.v1.ListS" - + "ubscriptionsResponse\".\202\323\344\223\002(\022&/v1/{proje" - + "ct=projects/*}/subscriptions\022\220\001\n\022DeleteS" - + "ubscription\022+.google.pubsub.v1.DeleteSub" - + "scriptionRequest\032\026.google.protobuf.Empty" - + "\"5\202\323\344\223\002/*-/v1/{subscription=projects/*/s" - + "ubscriptions/*}\022\243\001\n\021ModifyAckDeadline\022*." - + "google.pubsub.v1.ModifyAckDeadlineReques" - + "t\032\026.google.protobuf.Empty\"J\202\323\344\223\002D\"?/v1/{" - + "subscription=projects/*/subscriptions/*}" - + ":modifyAckDeadline:\001*\022\221\001\n\013Acknowledge\022$." - + "google.pubsub.v1.AcknowledgeRequest\032\026.go" - + "ogle.protobuf.Empty\"D\202\323\344\223\002>\"9/v1/{subscr" - + "iption=projects/*/subscriptions/*}:ackno" - + "wledge:\001*\022\204\001\n\004Pull\022\035.google.pubsub.v1.Pu" - + "llRequest\032\036.google.pubsub.v1.PullRespons" - + "e\"=\202\323\344\223\0027\"2/v1/{subscription=projects/*/" - + "subscriptions/*}:pull:\001*\022f\n\rStreamingPul" - + "l\022&.google.pubsub.v1.StreamingPullReques" - + "t\032\'.google.pubsub.v1.StreamingPullRespon" - + "se\"\000(\0010\001\022\240\001\n\020ModifyPushConfig\022).google.p" - + "ubsub.v1.ModifyPushConfigRequest\032\026.googl" - + "e.protobuf.Empty\"I\202\323\344\223\002C\">/v1/{subscript" - + "ion=projects/*/subscriptions/*}:modifyPu" - + "shConfig:\001*\022~\n\013GetSnapshot\022$.google.pubs" - + "ub.v1.GetSnapshotRequest\032\032.google.pubsub" - + ".v1.Snapshot\"-\202\323\344\223\002\'\022%/v1/{snapshot=proj" - + "ects/*/snapshots/*}\022\214\001\n\rListSnapshots\022&." - + "google.pubsub.v1.ListSnapshotsRequest\032\'." - + "google.pubsub.v1.ListSnapshotsResponse\"*" - + "\202\323\344\223\002$\022\"/v1/{project=projects/*}/snapsho" - + "ts\022\203\001\n\016CreateSnapshot\022\'.google.pubsub.v1" - + ".CreateSnapshotRequest\032\032.google.pubsub.v" - + "1.Snapshot\",\202\323\344\223\002&\032!/v1/{name=projects/*" - + "/snapshots/*}:\001*\022\214\001\n\016UpdateSnapshot\022\'.go" - + "ogle.pubsub.v1.UpdateSnapshotRequest\032\032.g" - + "oogle.pubsub.v1.Snapshot\"5\202\323\344\223\002/2*/v1/{s" - + "napshot.name=projects/*/snapshots/*}:\001*\022" - + "\200\001\n\016DeleteSnapshot\022\'.google.pubsub.v1.De" - + "leteSnapshotRequest\032\026.google.protobuf.Em" - + "pty\"-\202\323\344\223\002\'*%/v1/{snapshot=projects/*/sn" - + "apshots/*}\022\204\001\n\004Seek\022\035.google.pubsub.v1.S" - + "eekRequest\032\036.google.pubsub.v1.SeekRespon" - + "se\"=\202\323\344\223\0027\"2/v1/{subscription=projects/*" - + "/subscriptions/*}:seek:\001*B\256\001\n\024com.google" - + ".pubsub.v1B\013PubsubProtoP\001Z6google.golang" - + ".org/genproto/googleapis/pubsub/v1;pubsu" - + "b\370\001\001\252\002\026Google.Cloud.PubSub.V1\312\002\026Google\\C" - + "loud\\PubSub\\V1\352\002\031Google::Cloud::PubSub::" - + "V1b\006proto3" + + "e\030\004 \001(\0132\032.google.protobuf.Timestamp\022\024\n\014o" + + "rdering_key\030\005 \001(\t\0321\n\017AttributesEntry\022\013\n\003" + + "key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\" \n\017GetTopic" + + "Request\022\r\n\005topic\030\001 \001(\t\"m\n\022UpdateTopicReq" + + "uest\022&\n\005topic\030\001 \001(\0132\027.google.pubsub.v1.T" + + "opic\022/\n\013update_mask\030\002 \001(\0132\032.google.proto" + + "buf.FieldMask\"R\n\016PublishRequest\022\r\n\005topic" + + "\030\001 \001(\t\0221\n\010messages\030\002 \003(\0132\037.google.pubsub" + + ".v1.PubsubMessage\"&\n\017PublishResponse\022\023\n\013" + + "message_ids\030\001 \003(\t\"K\n\021ListTopicsRequest\022\017" + + "\n\007project\030\001 \001(\t\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npa" + + "ge_token\030\003 \001(\t\"V\n\022ListTopicsResponse\022\'\n\006" + + "topics\030\001 \003(\0132\027.google.pubsub.v1.Topic\022\027\n" + + "\017next_page_token\030\002 \001(\t\"U\n\035ListTopicSubsc" + + "riptionsRequest\022\r\n\005topic\030\001 \001(\t\022\021\n\tpage_s" + + "ize\030\002 \001(\005\022\022\n\npage_token\030\003 \001(\t\"P\n\036ListTop" + + "icSubscriptionsResponse\022\025\n\rsubscriptions" + + "\030\001 \003(\t\022\027\n\017next_page_token\030\002 \001(\t\"Q\n\031ListT" + + "opicSnapshotsRequest\022\r\n\005topic\030\001 \001(\t\022\021\n\tp" + + "age_size\030\002 \001(\005\022\022\n\npage_token\030\003 \001(\t\"H\n\032Li" + + "stTopicSnapshotsResponse\022\021\n\tsnapshots\030\001 " + + "\003(\t\022\027\n\017next_page_token\030\002 \001(\t\"#\n\022DeleteTo" + + "picRequest\022\r\n\005topic\030\001 \001(\t\"\245\003\n\014Subscripti" + + "on\022\014\n\004name\030\001 \001(\t\022\r\n\005topic\030\002 \001(\t\0221\n\013push_" + + "config\030\004 \001(\0132\034.google.pubsub.v1.PushConf" + + "ig\022\034\n\024ack_deadline_seconds\030\005 \001(\005\022\035\n\025reta" + + "in_acked_messages\030\007 \001(\010\022=\n\032message_reten" + + "tion_duration\030\010 \001(\0132\031.google.protobuf.Du" + + "ration\022:\n\006labels\030\t \003(\0132*.google.pubsub.v" + + "1.Subscription.LabelsEntry\022\037\n\027enable_mes" + + "sage_ordering\030\n \001(\010\022=\n\021expiration_policy" + + "\030\013 \001(\0132\".google.pubsub.v1.ExpirationPoli" + + "cy\032-\n\013LabelsEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030" + + "\002 \001(\t:\0028\001\":\n\020ExpirationPolicy\022&\n\003ttl\030\001 \001" + + "(\0132\031.google.protobuf.Duration\"\230\001\n\nPushCo" + + "nfig\022\025\n\rpush_endpoint\030\001 \001(\t\022@\n\nattribute" + + "s\030\002 \003(\0132,.google.pubsub.v1.PushConfig.At" + + "tributesEntry\0321\n\017AttributesEntry\022\013\n\003key\030" + + "\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"S\n\017ReceivedMess" + + "age\022\016\n\006ack_id\030\001 \001(\t\0220\n\007message\030\002 \001(\0132\037.g" + + "oogle.pubsub.v1.PubsubMessage\".\n\026GetSubs" + + "criptionRequest\022\024\n\014subscription\030\001 \001(\t\"\202\001" + + "\n\031UpdateSubscriptionRequest\0224\n\014subscript" + + "ion\030\001 \001(\0132\036.google.pubsub.v1.Subscriptio" + + "n\022/\n\013update_mask\030\002 \001(\0132\032.google.protobuf" + + ".FieldMask\"R\n\030ListSubscriptionsRequest\022\017" + + "\n\007project\030\001 \001(\t\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npa" + + "ge_token\030\003 \001(\t\"k\n\031ListSubscriptionsRespo" + + "nse\0225\n\rsubscriptions\030\001 \003(\0132\036.google.pubs" + + "ub.v1.Subscription\022\027\n\017next_page_token\030\002 " + + "\001(\t\"1\n\031DeleteSubscriptionRequest\022\024\n\014subs" + + "cription\030\001 \001(\t\"b\n\027ModifyPushConfigReques" + + "t\022\024\n\014subscription\030\001 \001(\t\0221\n\013push_config\030\002" + + " \001(\0132\034.google.pubsub.v1.PushConfig\"U\n\013Pu" + + "llRequest\022\024\n\014subscription\030\001 \001(\t\022\032\n\022retur" + + "n_immediately\030\002 \001(\010\022\024\n\014max_messages\030\003 \001(" + + "\005\"L\n\014PullResponse\022<\n\021received_messages\030\001" + + " \003(\0132!.google.pubsub.v1.ReceivedMessage\"" + + "_\n\030ModifyAckDeadlineRequest\022\024\n\014subscript" + + "ion\030\001 \001(\t\022\017\n\007ack_ids\030\004 \003(\t\022\034\n\024ack_deadli" + + "ne_seconds\030\003 \001(\005\";\n\022AcknowledgeRequest\022\024" + + "\n\014subscription\030\001 \001(\t\022\017\n\007ack_ids\030\002 \003(\t\"\244\001" + + "\n\024StreamingPullRequest\022\024\n\014subscription\030\001" + + " \001(\t\022\017\n\007ack_ids\030\002 \003(\t\022\037\n\027modify_deadline" + + "_seconds\030\003 \003(\005\022\037\n\027modify_deadline_ack_id" + + "s\030\004 \003(\t\022#\n\033stream_ack_deadline_seconds\030\005" + + " \001(\005\"U\n\025StreamingPullResponse\022<\n\021receive" + + "d_messages\030\001 \003(\0132!.google.pubsub.v1.Rece" + + "ivedMessage\"\257\001\n\025CreateSnapshotRequest\022\014\n" + + "\004name\030\001 \001(\t\022\024\n\014subscription\030\002 \001(\t\022C\n\006lab" + + "els\030\003 \003(\01323.google.pubsub.v1.CreateSnaps" + + "hotRequest.LabelsEntry\032-\n\013LabelsEntry\022\013\n" + + "\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"v\n\025UpdateS" + + "napshotRequest\022,\n\010snapshot\030\001 \001(\0132\032.googl" + + "e.pubsub.v1.Snapshot\022/\n\013update_mask\030\002 \001(" + + "\0132\032.google.protobuf.FieldMask\"\277\001\n\010Snapsh" + + "ot\022\014\n\004name\030\001 \001(\t\022\r\n\005topic\030\002 \001(\t\022/\n\013expir" + + "e_time\030\003 \001(\0132\032.google.protobuf.Timestamp" + + "\0226\n\006labels\030\004 \003(\0132&.google.pubsub.v1.Snap" + + "shot.LabelsEntry\032-\n\013LabelsEntry\022\013\n\003key\030\001" + + " \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"&\n\022GetSnapshotRe" + + "quest\022\020\n\010snapshot\030\001 \001(\t\"N\n\024ListSnapshots" + + "Request\022\017\n\007project\030\001 \001(\t\022\021\n\tpage_size\030\002 " + + "\001(\005\022\022\n\npage_token\030\003 \001(\t\"_\n\025ListSnapshots" + + "Response\022-\n\tsnapshots\030\001 \003(\0132\032.google.pub" + + "sub.v1.Snapshot\022\027\n\017next_page_token\030\002 \001(\t" + + "\")\n\025DeleteSnapshotRequest\022\020\n\010snapshot\030\001 " + + "\001(\t\"m\n\013SeekRequest\022\024\n\014subscription\030\001 \001(\t" + + "\022*\n\004time\030\002 \001(\0132\032.google.protobuf.Timesta" + + "mpH\000\022\022\n\010snapshot\030\003 \001(\tH\000B\010\n\006target\"\016\n\014Se" + + "ekResponse2\277\010\n\tPublisher\022j\n\013CreateTopic\022" + + "\027.google.pubsub.v1.Topic\032\027.google.pubsub" + + ".v1.Topic\")\202\323\344\223\002#\032\036/v1/{name=projects/*/" + + "topics/*}:\001*\022}\n\013UpdateTopic\022$.google.pub" + + "sub.v1.UpdateTopicRequest\032\027.google.pubsu" + + "b.v1.Topic\"/\202\323\344\223\002)2$/v1/{topic.name=proj" + + "ects/*/topics/*}:\001*\022\202\001\n\007Publish\022 .google" + + ".pubsub.v1.PublishRequest\032!.google.pubsu" + + "b.v1.PublishResponse\"2\202\323\344\223\002,\"\'/v1/{topic" + + "=projects/*/topics/*}:publish:\001*\022o\n\010GetT" + + "opic\022!.google.pubsub.v1.GetTopicRequest\032" + + "\027.google.pubsub.v1.Topic\"\'\202\323\344\223\002!\022\037/v1/{t" + + "opic=projects/*/topics/*}\022\200\001\n\nListTopics" + + "\022#.google.pubsub.v1.ListTopicsRequest\032$." + + "google.pubsub.v1.ListTopicsResponse\"\'\202\323\344" + + "\223\002!\022\037/v1/{project=projects/*}/topics\022\262\001\n" + + "\026ListTopicSubscriptions\022/.google.pubsub." + + "v1.ListTopicSubscriptionsRequest\0320.googl" + + "e.pubsub.v1.ListTopicSubscriptionsRespon" + + "se\"5\202\323\344\223\002/\022-/v1/{topic=projects/*/topics" + + "/*}/subscriptions\022\242\001\n\022ListTopicSnapshots" + + "\022+.google.pubsub.v1.ListTopicSnapshotsRe" + + "quest\032,.google.pubsub.v1.ListTopicSnapsh" + + "otsResponse\"1\202\323\344\223\002+\022)/v1/{topic=projects" + + "/*/topics/*}/snapshots\022t\n\013DeleteTopic\022$." + + "google.pubsub.v1.DeleteTopicRequest\032\026.go" + + "ogle.protobuf.Empty\"\'\202\323\344\223\002!*\037/v1/{topic=" + + "projects/*/topics/*}2\371\021\n\nSubscriber\022\206\001\n\022" + + "CreateSubscription\022\036.google.pubsub.v1.Su" + + "bscription\032\036.google.pubsub.v1.Subscripti" + + "on\"0\202\323\344\223\002*\032%/v1/{name=projects/*/subscri" + + "ptions/*}:\001*\022\222\001\n\017GetSubscription\022(.googl" + + "e.pubsub.v1.GetSubscriptionRequest\032\036.goo" + + "gle.pubsub.v1.Subscription\"5\202\323\344\223\002/\022-/v1/" + + "{subscription=projects/*/subscriptions/*" + + "}\022\240\001\n\022UpdateSubscription\022+.google.pubsub" + + ".v1.UpdateSubscriptionRequest\032\036.google.p" + + "ubsub.v1.Subscription\"=\202\323\344\223\002722/v1/{subs" + + "cription.name=projects/*/subscriptions/*" + + "}:\001*\022\234\001\n\021ListSubscriptions\022*.google.pubs" + + "ub.v1.ListSubscriptionsRequest\032+.google." + + "pubsub.v1.ListSubscriptionsResponse\".\202\323\344" + + "\223\002(\022&/v1/{project=projects/*}/subscripti" + + "ons\022\220\001\n\022DeleteSubscription\022+.google.pubs" + + "ub.v1.DeleteSubscriptionRequest\032\026.google" + + ".protobuf.Empty\"5\202\323\344\223\002/*-/v1/{subscripti" + + "on=projects/*/subscriptions/*}\022\243\001\n\021Modif" + + "yAckDeadline\022*.google.pubsub.v1.ModifyAc" + + "kDeadlineRequest\032\026.google.protobuf.Empty" + + "\"J\202\323\344\223\002D\"?/v1/{subscription=projects/*/s" + + "ubscriptions/*}:modifyAckDeadline:\001*\022\221\001\n" + + "\013Acknowledge\022$.google.pubsub.v1.Acknowle" + + "dgeRequest\032\026.google.protobuf.Empty\"D\202\323\344\223" + + "\002>\"9/v1/{subscription=projects/*/subscri" + + "ptions/*}:acknowledge:\001*\022\204\001\n\004Pull\022\035.goog" + + "le.pubsub.v1.PullRequest\032\036.google.pubsub" + + ".v1.PullResponse\"=\202\323\344\223\0027\"2/v1/{subscript" + + "ion=projects/*/subscriptions/*}:pull:\001*\022" + + "f\n\rStreamingPull\022&.google.pubsub.v1.Stre" + + "amingPullRequest\032\'.google.pubsub.v1.Stre" + + "amingPullResponse\"\000(\0010\001\022\240\001\n\020ModifyPushCo" + + "nfig\022).google.pubsub.v1.ModifyPushConfig" + + "Request\032\026.google.protobuf.Empty\"I\202\323\344\223\002C\"" + + ">/v1/{subscription=projects/*/subscripti" + + "ons/*}:modifyPushConfig:\001*\022~\n\013GetSnapsho" + + "t\022$.google.pubsub.v1.GetSnapshotRequest\032" + + "\032.google.pubsub.v1.Snapshot\"-\202\323\344\223\002\'\022%/v1" + + "/{snapshot=projects/*/snapshots/*}\022\214\001\n\rL" + + "istSnapshots\022&.google.pubsub.v1.ListSnap" + + "shotsRequest\032\'.google.pubsub.v1.ListSnap" + + "shotsResponse\"*\202\323\344\223\002$\022\"/v1/{project=proj" + + "ects/*}/snapshots\022\203\001\n\016CreateSnapshot\022\'.g" + + "oogle.pubsub.v1.CreateSnapshotRequest\032\032." + + "google.pubsub.v1.Snapshot\",\202\323\344\223\002&\032!/v1/{" + + "name=projects/*/snapshots/*}:\001*\022\214\001\n\016Upda" + + "teSnapshot\022\'.google.pubsub.v1.UpdateSnap" + + "shotRequest\032\032.google.pubsub.v1.Snapshot\"" + + "5\202\323\344\223\002/2*/v1/{snapshot.name=projects/*/s" + + "napshots/*}:\001*\022\200\001\n\016DeleteSnapshot\022\'.goog" + + "le.pubsub.v1.DeleteSnapshotRequest\032\026.goo" + + "gle.protobuf.Empty\"-\202\323\344\223\002\'*%/v1/{snapsho" + + "t=projects/*/snapshots/*}\022\204\001\n\004Seek\022\035.goo" + + "gle.pubsub.v1.SeekRequest\032\036.google.pubsu" + + "b.v1.SeekResponse\"=\202\323\344\223\0027\"2/v1/{subscrip" + + "tion=projects/*/subscriptions/*}:seek:\001*" + + "B\256\001\n\024com.google.pubsub.v1B\013PubsubProtoP\001" + + "Z6google.golang.org/genproto/googleapis/" + + "pubsub/v1;pubsub\370\001\001\252\002\026Google.Cloud.PubSu" + + "b.V1\312\002\026Google\\Cloud\\PubSub\\V1\352\002\031Google::" + + "Cloud::PubSub::V1b\006proto3" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -443,7 +444,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_google_pubsub_v1_PubsubMessage_descriptor, new java.lang.String[] { - "Data", "Attributes", "MessageId", "PublishTime", + "Data", "Attributes", "MessageId", "PublishTime", "OrderingKey", }); internal_static_google_pubsub_v1_PubsubMessage_AttributesEntry_descriptor = internal_static_google_pubsub_v1_PubsubMessage_descriptor.getNestedTypes().get(0); @@ -554,6 +555,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( "RetainAckedMessages", "MessageRetentionDuration", "Labels", + "EnableMessageOrdering", "ExpirationPolicy", }); internal_static_google_pubsub_v1_Subscription_LabelsEntry_descriptor = diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java index efb2168d1aa7..3ddf8870e33c 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java @@ -27,6 +27,7 @@ private Subscription() { topic_ = ""; ackDeadlineSeconds_ = 0; retainAckedMessages_ = false; + enableMessageOrdering_ = false; } @java.lang.Override @@ -120,6 +121,11 @@ private Subscription( labels_.getMutableMap().put(labels__.getKey(), labels__.getValue()); break; } + case 80: + { + enableMessageOrdering_ = input.readBool(); + break; + } case 90: { com.google.pubsub.v1.ExpirationPolicy.Builder subBuilder = null; @@ -546,6 +552,13 @@ public java.lang.String getLabelsOrThrow(java.lang.String key) { return map.get(key); } + public static final int ENABLE_MESSAGE_ORDERING_FIELD_NUMBER = 10; + private boolean enableMessageOrdering_; + /** bool enable_message_ordering = 10; */ + public boolean getEnableMessageOrdering() { + return enableMessageOrdering_; + } + public static final int EXPIRATION_POLICY_FIELD_NUMBER = 11; private com.google.pubsub.v1.ExpirationPolicy expirationPolicy_; /** @@ -645,6 +658,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io } com.google.protobuf.GeneratedMessageV3.serializeStringMapTo( output, internalGetLabels(), LabelsDefaultEntryHolder.defaultEntry, 9); + if (enableMessageOrdering_ != false) { + output.writeBool(10, enableMessageOrdering_); + } if (expirationPolicy_ != null) { output.writeMessage(11, getExpirationPolicy()); } @@ -687,6 +703,9 @@ public int getSerializedSize() { .build(); size += com.google.protobuf.CodedOutputStream.computeMessageSize(9, labels__); } + if (enableMessageOrdering_ != false) { + size += com.google.protobuf.CodedOutputStream.computeBoolSize(10, enableMessageOrdering_); + } if (expirationPolicy_ != null) { size += com.google.protobuf.CodedOutputStream.computeMessageSize(11, getExpirationPolicy()); } @@ -719,6 +738,7 @@ public boolean equals(final java.lang.Object obj) { result = result && getMessageRetentionDuration().equals(other.getMessageRetentionDuration()); } result = result && internalGetLabels().equals(other.internalGetLabels()); + result = result && (getEnableMessageOrdering() == other.getEnableMessageOrdering()); result = result && (hasExpirationPolicy() == other.hasExpirationPolicy()); if (hasExpirationPolicy()) { result = result && getExpirationPolicy().equals(other.getExpirationPolicy()); @@ -754,6 +774,8 @@ public int hashCode() { hash = (37 * hash) + LABELS_FIELD_NUMBER; hash = (53 * hash) + internalGetLabels().hashCode(); } + hash = (37 * hash) + ENABLE_MESSAGE_ORDERING_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashBoolean(getEnableMessageOrdering()); if (hasExpirationPolicy()) { hash = (37 * hash) + EXPIRATION_POLICY_FIELD_NUMBER; hash = (53 * hash) + getExpirationPolicy().hashCode(); @@ -943,6 +965,8 @@ public Builder clear() { messageRetentionDurationBuilder_ = null; } internalGetMutableLabels().clear(); + enableMessageOrdering_ = false; + if (expirationPolicyBuilder_ == null) { expirationPolicy_ = null; } else { @@ -993,6 +1017,7 @@ public com.google.pubsub.v1.Subscription buildPartial() { } result.labels_ = internalGetLabels(); result.labels_.makeImmutable(); + result.enableMessageOrdering_ = enableMessageOrdering_; if (expirationPolicyBuilder_ == null) { result.expirationPolicy_ = expirationPolicy_; } else { @@ -1069,6 +1094,9 @@ public Builder mergeFrom(com.google.pubsub.v1.Subscription other) { mergeMessageRetentionDuration(other.getMessageRetentionDuration()); } internalGetMutableLabels().mergeFrom(other.internalGetLabels()); + if (other.getEnableMessageOrdering() != false) { + setEnableMessageOrdering(other.getEnableMessageOrdering()); + } if (other.hasExpirationPolicy()) { mergeExpirationPolicy(other.getExpirationPolicy()); } @@ -2106,6 +2134,26 @@ public Builder putAllLabels(java.util.Map va return this; } + private boolean enableMessageOrdering_; + /** bool enable_message_ordering = 10; */ + public boolean getEnableMessageOrdering() { + return enableMessageOrdering_; + } + /** bool enable_message_ordering = 10; */ + public Builder setEnableMessageOrdering(boolean value) { + + enableMessageOrdering_ = value; + onChanged(); + return this; + } + /** bool enable_message_ordering = 10; */ + public Builder clearEnableMessageOrdering() { + + enableMessageOrdering_ = false; + onChanged(); + return this; + } + private com.google.pubsub.v1.ExpirationPolicy expirationPolicy_ = null; private com.google.protobuf.SingleFieldBuilderV3< com.google.pubsub.v1.ExpirationPolicy, diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java index dc06e7fb432b..1d340dd65a49 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java @@ -264,6 +264,9 @@ public interface SubscriptionOrBuilder */ java.lang.String getLabelsOrThrow(java.lang.String key); + /** bool enable_message_ordering = 10; */ + boolean getEnableMessageOrdering(); + /** * * diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto index 02d0bf34b3ee..cd7ca090c3f2 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto @@ -399,6 +399,8 @@ message PubsubMessage { // it receives the `Publish` call. It must not be populated by the // publisher in a `Publish` call. google.protobuf.Timestamp publish_time = 4; + + string ordering_key = 5; } // Request for the GetTopic method. @@ -599,6 +601,7 @@ message Subscription { // managing labels. map labels = 9; + bool enable_message_ordering = 10; // A policy that specifies the conditions for this subscription's expiration. // A subscription is considered active as long as any connected subscriber is // successfully consuming messages from the subscription or is issuing From bf1c63ffd4323598229ebe1b7589d33e94054381 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Fri, 8 Feb 2019 16:06:18 -0500 Subject: [PATCH 2/5] Add support for ordered publishing --- .../com/google/cloud/pubsub/v1/Publisher.java | 181 +++++++++++++----- .../pubsub/v1/FakePublisherServiceImpl.java | 22 ++- .../cloud/pubsub/v1/PublisherImplTest.java | 173 +++++++++++++++++ 3 files changed, 324 insertions(+), 52 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 557a483073de..b5f39c1f7fcc 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -45,9 +45,12 @@ import com.google.pubsub.v1.TopicNames; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -81,16 +84,16 @@ public class Publisher { private final String topicName; private final BatchingSettings batchingSettings; + private final boolean enableMessageOrdering; private final Lock messagesBatchLock; - private List messagesBatch; - private int batchedBytes; - + private final Map messagesBatches; private final AtomicBoolean activeAlarm; private final PublisherStub publisherStub; private final ScheduledExecutorService executor; + private final SequentialExecutorService sequentialExecutor; private final AtomicBoolean shutdown; private final List closeables; private final MessageWaiter messagesWaiter; @@ -110,11 +113,13 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; this.batchingSettings = builder.batchingSettings; + this.enableMessageOrdering = builder.enableMessageOrdering; - messagesBatch = new LinkedList<>(); + messagesBatches = new HashMap<>(); messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); + sequentialExecutor = new SequentialExecutorService<>(executor); if (builder.executorProvider.shouldAutoClose()) { closeables = Collections.singletonList(new ExecutorAsBackgroundResource(executor)); @@ -124,8 +129,10 @@ private Publisher(Builder builder) throws IOException { // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry. // We post-process this here to keep backward-compatibility. + // Also, if "message ordering" is enabled, the publisher should retry sending the failed + // message infinitely rather than sending the next one. RetrySettings retrySettings = builder.retrySettings; - if (retrySettings.getMaxAttempts() == 0) { + if (retrySettings.getMaxAttempts() == 0 || builder.enableMessageOrdering) { retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build(); } @@ -192,6 +199,12 @@ public ApiFuture publish(PubsubMessage message) { throw new IllegalStateException("Cannot publish on a shut-down publisher."); } + final String orderingKey = message.getOrderingKey(); + if (!orderingKey.isEmpty() && !enableMessageOrdering) { + throw new IllegalStateException( + "Cannot publish a message with an ordering key when message ordeirng is not enabled."); + } + final int messageSize = message.getSerializedSize(); OutstandingBatch batchToSend = null; SettableApiFuture publishResult = SettableApiFuture.create(); @@ -199,35 +212,38 @@ public ApiFuture publish(PubsubMessage message) { messagesBatchLock.lock(); try { // Check if the next message makes the current batch exceed the max batch byte size. - if (!messagesBatch.isEmpty() + MessagesBatch batch = messagesBatches.get(orderingKey); + if (batch == null) { + batch = new MessagesBatch(orderingKey); + messagesBatches.put(orderingKey, batch); + } + if (!batch.isEmpty() && hasBatchingBytes() - && batchedBytes + messageSize >= getMaxBatchBytes()) { - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + && batch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) { + batchToSend = batch.popOutstandingBatch(); } // Border case if the message to send is greater or equals to the max batch size then can't // be included in the current batch and instead sent immediately. if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) { - batchedBytes += messageSize; - messagesBatch.add(outstandingPublish); - + batch.addMessage(outstandingPublish, messageSize); // If after adding the message we have reached the batch max messages then we have a batch // to send. - if (messagesBatch.size() == getBatchingSettings().getElementCountThreshold()) { - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + if (batch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) { + batchToSend = batch.popOutstandingBatch(); } } + // Setup the next duration based delivery alarm if there are messages batched. - if (!messagesBatch.isEmpty()) { + if (!batch.isEmpty()) { setupDurationBasedPublishAlarm(); - } else if (currentAlarmFuture != null) { - logger.log(Level.FINER, "Cancelling alarm, no more messages"); - if (activeAlarm.getAndSet(false)) { - currentAlarmFuture.cancel(false); + } else { + messagesBatches.remove(orderingKey); + if (currentAlarmFuture != null) { + logger.log(Level.FINER, "Cancelling alarm, no more messages"); + if (activeAlarm.getAndSet(false)) { + currentAlarmFuture.cancel(false); + } } } } finally { @@ -238,14 +254,8 @@ && hasBatchingBytes() if (batchToSend != null) { logger.log(Level.FINER, "Scheduling a batch for immediate sending."); - final OutstandingBatch finalBatchToSend = batchToSend; - executor.execute( - new Runnable() { - @Override - public void run() { - publishOutstandingBatch(finalBatchToSend); - } - }); + publishOutstandingBatch(batchToSend); + publishAllOutstanding(); } // If the message is over the size limit, it was not added to the pending messages and it will @@ -253,14 +263,9 @@ public void run() { if (hasBatchingBytes() && messageSize >= getMaxBatchBytes()) { logger.log( Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send."); - executor.execute( - new Runnable() { - @Override - public void run() { - publishOutstandingBatch( - new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize)); - } - }); + publishOutstandingBatch( + new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize, orderingKey)); + publishAllOutstanding(); } return publishResult; @@ -292,29 +297,33 @@ public void run() { */ public void publishAllOutstanding() { messagesBatchLock.lock(); - OutstandingBatch batchToSend; try { - if (messagesBatch.isEmpty()) { - return; + for (MessagesBatch batch : messagesBatches.values()) { + if (!batch.isEmpty()) { + // TODO(kimkyung-goog): Do not release `messageBatchLock` when publishing a batch. If it's + // released, the order of publishing cannot be guaranteed if `publish()` is called while + // this function is running. This locking mechanism needs to be improved if it causes any + // performance degradation. + publishOutstandingBatch(batch.popOutstandingBatch()); + } } - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + messagesBatches.clear(); } finally { messagesBatchLock.unlock(); } - publishOutstandingBatch(batchToSend); } - private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { + private ApiFuture publishCall(final OutstandingBatch outstandingBatch) { PublishRequest.Builder publishRequest = PublishRequest.newBuilder(); publishRequest.setTopic(topicName); for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) { publishRequest.addMessages(outstandingPublish.message); } + return publisherStub.publishCallable().futureCall(publishRequest.build()); + } - ApiFutures.addCallback( - publisherStub.publishCallable().futureCall(publishRequest.build()), + private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { + final ApiFutureCallback futureCallback = new ApiFutureCallback() { @Override public void onSuccess(PublishResponse result) { @@ -353,7 +362,28 @@ public void onFailure(Throwable t) { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); } } - }); + }; + + if (outstandingBatch.orderingKey.isEmpty()) { + // If ordering key is empty, publish the batch using the normal executor. + Runnable task = + new Runnable() { + public void run() { + ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback); + } + }; + executor.execute(task); + } else { + // If ordering key is specified, publish the batch using the sequential executor. + Callable func = + new Callable() { + public ApiFuture call() { + return publishCall(outstandingBatch); + } + }; + ApiFutures.addCallback( + sequentialExecutor.submit(outstandingBatch.orderingKey, func), futureCallback); + } } private static final class OutstandingBatch { @@ -361,12 +391,15 @@ private static final class OutstandingBatch { final long creationTime; int attempt; int batchSizeBytes; + final String orderingKey; - OutstandingBatch(List outstandingPublishes, int batchSizeBytes) { + OutstandingBatch( + List outstandingPublishes, int batchSizeBytes, String orderingKey) { this.outstandingPublishes = outstandingPublishes; attempt = 1; creationTime = System.currentTimeMillis(); this.batchSizeBytes = batchSizeBytes; + this.orderingKey = orderingKey; } public int getAttempt() { @@ -504,7 +537,7 @@ public static final class Builder { .setRpcTimeoutMultiplier(2) .setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT) .build(); - + static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false; private static final int THREADS_PER_CPU = 5; static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() @@ -518,6 +551,8 @@ public static final class Builder { RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; + boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING; + TransportChannelProvider channelProvider = TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); @@ -604,6 +639,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) { return this; } + /** Sets the message ordering option. */ + public Builder setEnableMessageOrdering(boolean enableMessageOrdering) { + this.enableMessageOrdering = enableMessageOrdering; + return this; + } + /** Gives the ability to set a custom executor to be used by the library. */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); @@ -614,4 +655,42 @@ public Publisher build() throws IOException { return new Publisher(this); } } + + private static class MessagesBatch { + private List messages = new LinkedList<>(); + private int batchedBytes; + private String orderingKey; + + public MessagesBatch(String orderingKey) { + this.orderingKey = orderingKey; + } + + public OutstandingBatch popOutstandingBatch() { + OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey); + reset(); + return batch; + } + + private void reset() { + messages = new LinkedList<>(); + batchedBytes = 0; + } + + public boolean isEmpty() { + return messages.isEmpty(); + } + + public int getBatchedBytes() { + return batchedBytes; + } + + public void addMessage(OutstandingPublish message, int messageSize) { + messages.add(message); + batchedBytes += messageSize; + } + + public int getMessagesCount() { + return messages.size(); + } + }; } diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java index 396b5d05bd5f..620a09ac98bc 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; /** * A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud @@ -33,6 +34,8 @@ class FakePublisherServiceImpl extends PublisherImplBase { private final LinkedBlockingQueue requests = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue publishResponses = new LinkedBlockingQueue<>(); + private final AtomicInteger nextMessageId = new AtomicInteger(1); + private boolean autoPublishResponse; /** Class used to save the state of a possible response. */ private static class Response { @@ -75,7 +78,15 @@ public void publish(PublishRequest request, StreamObserver resp requests.add(request); Response response; try { - response = publishResponses.take(); + if (autoPublishResponse) { + PublishResponse.Builder builder = PublishResponse.newBuilder(); + for (int i = 0; i < request.getMessagesCount(); i++) { + builder.addMessageIds(Integer.toString(nextMessageId.getAndIncrement())); + } + response = new Response(builder.build()); + } else { + response = publishResponses.take(); + } } catch (InterruptedException e) { throw new IllegalArgumentException(e); } @@ -87,6 +98,15 @@ public void publish(PublishRequest request, StreamObserver resp } } + /** + * If enabled, PublishResponse is generated with a unique message id automatically when publish() + * is called. + */ + public FakePublisherServiceImpl setAutoPublishResponse(boolean autoPublishResponse) { + this.autoPublishResponse = autoPublishResponse; + return this; + } + public FakePublisherServiceImpl addPublishResponse(PublishResponse publishResponse) { publishResponses.add(new Response(publishResponse)); return this; diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 2901ae4b0a9d..720c6b601822 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -35,12 +35,14 @@ import com.google.cloud.pubsub.v1.Publisher.Builder; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PubsubMessage; import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.inprocess.InProcessServerBuilder; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -241,6 +243,177 @@ private ApiFuture sendTestMessage(Publisher publisher, String data) { PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build()); } + @Test + public void testBatchedMessagesWithOrderingKeyByNum() throws Exception { + // Limit the number of maximum elements in a single batch to 3. + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(3L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true) + .build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + + // Publish two messages with ordering key, "OrderA", and other two messages with "OrderB". + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "m3", "OrderA"); + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "m4", "OrderB"); + + // Verify that none of them were published since the batching size is 3. + assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + assertFalse(publishFuture3.isDone()); + assertFalse(publishFuture4.isDone()); + + // One of the batches reaches the limit. + ApiFuture publishFuture5 = sendTestMessageWithOrderingKey(publisher, "m5", "OrderA"); + // Verify that they were delivered in order per ordering key. + assertTrue(Integer.parseInt(publishFuture1.get()) < Integer.parseInt(publishFuture3.get())); + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture4.get())); + assertTrue(Integer.parseInt(publishFuture3.get()) < Integer.parseInt(publishFuture5.get())); + + // Verify that every message within the same batch has the same ordering key. + List requests = testPublisherServiceImpl.getCapturedRequests(); + for (PublishRequest request : requests) { + if (request.getMessagesCount() > 1) { + String orderingKey = request.getMessages(0).getOrderingKey(); + for (PubsubMessage message : request.getMessagesList()) { + assertEquals(message.getOrderingKey(), orderingKey); + } + } + } + publisher.shutdown(); + } + + @Test + public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { + // Limit the batching timeout to 100 seconds. + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(10L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true) + .build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + + // Publish two messages with ordering key, "OrderA", and other two messages with "OrderB". + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "m3", "OrderA"); + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "m4", "OrderB"); + + // Verify that none of them were published since the batching size is 10 and timeout has not + // been expired. + assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + assertFalse(publishFuture3.isDone()); + assertFalse(publishFuture4.isDone()); + + // The timeout expires. + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + + // Verify that they were delivered in order per ordering key. + assertTrue(Integer.parseInt(publishFuture1.get()) < Integer.parseInt(publishFuture3.get())); + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture4.get())); + + // Verify that every message within the same batch has the same ordering key. + List requests = testPublisherServiceImpl.getCapturedRequests(); + for (PublishRequest request : requests) { + if (request.getMessagesCount() > 1) { + String orderingKey = request.getMessages(0).getOrderingKey(); + for (PubsubMessage message : request.getMessagesList()) { + assertEquals(message.getOrderingKey(), orderingKey); + } + } + } + publisher.shutdown(); + } + + @Test + public void testOrderingKeyWhenDisabled_throwsException() throws Exception { + // Message ordering is disabled by default. + Publisher publisher = getTestPublisherBuilder().build(); + try { + ApiFuture publishFuture = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + fail("Should have thrown an IllegalStateException"); + } catch (IllegalStateException expected) { + // expected + } + publisher.shutdown(); + } + + @Test + public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { + // Set maxAttempts to 1 and enableMessageOrdering to true at the same time. + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Duration.ofSeconds(10)) + .setMaxAttempts(1) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Although maxAttempts is 1, the publisher will retry until it succeeds since + // enableMessageOrdering is true. + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); + + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + assertEquals("1", publishFuture1.get()); + + assertEquals(4, testPublisherServiceImpl.getCapturedRequests().size()); + publisher.shutdown(); + } + + @Test(expected = ExecutionException.class) + public void testEnableMessageOrdering_failsImmediatelyForNonRetryableError() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Duration.ofSeconds(10)) + .setMaxAttempts(2) + .build()) + .setEnableMessageOrdering(true) + .build(); + + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + + try { + publishFuture1.get(); + } finally { + assertEquals(1, testPublisherServiceImpl.getCapturedRequests().size()); + publisher.shutdown(); + } + } + + private ApiFuture sendTestMessageWithOrderingKey( + Publisher publisher, String data, String orderingKey) { + return publisher.publish( + PubsubMessage.newBuilder() + .setOrderingKey(orderingKey) + .setData(ByteString.copyFromUtf8(data)) + .build()); + } + @Test public void testErrorPropagation() throws Exception { Publisher publisher = From 98b5072ab4fee0e024c65cc66ef8b3b786254655 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Mon, 11 Feb 2019 14:37:51 -0500 Subject: [PATCH 3/5] Add comments about ordering key properties. --- .../com/google/pubsub/v1/PubsubMessage.java | 105 ++++++++++++++++-- .../pubsub/v1/PubsubMessageOrBuilder.java | 30 ++++- .../com/google/pubsub/v1/Subscription.java | 60 +++++++++- .../pubsub/v1/SubscriptionOrBuilder.java | 15 ++- .../main/proto/google/pubsub/v1/pubsub.proto | 15 +++ 5 files changed, 211 insertions(+), 14 deletions(-) diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java index 8b09b93c5033..103856373e62 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java @@ -360,7 +360,20 @@ public com.google.protobuf.TimestampOrBuilder getPublishTimeOrBuilder() { public static final int ORDERING_KEY_FIELD_NUMBER = 5; private volatile java.lang.Object orderingKey_; - /** string ordering_key = 5; */ + /** + * + * + *
+   * WARNING: `ordering_key` is an experimental field not yet
+   * supported by the service.
+   * Identifies related messages for which publish order should be respected.
+   * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+   * published with the same `ordering_key` value will be delivered to
+   * subscribers in the order in which they are received by the Pub/Sub system.
+   * 
+ * + * string ordering_key = 5; + */ public java.lang.String getOrderingKey() { java.lang.Object ref = orderingKey_; if (ref instanceof java.lang.String) { @@ -372,7 +385,20 @@ public java.lang.String getOrderingKey() { return s; } } - /** string ordering_key = 5; */ + /** + * + * + *
+   * WARNING: `ordering_key` is an experimental field not yet
+   * supported by the service.
+   * Identifies related messages for which publish order should be respected.
+   * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+   * published with the same `ordering_key` value will be delivered to
+   * subscribers in the order in which they are received by the Pub/Sub system.
+   * 
+ * + * string ordering_key = 5; + */ public com.google.protobuf.ByteString getOrderingKeyBytes() { java.lang.Object ref = orderingKey_; if (ref instanceof java.lang.String) { @@ -1325,7 +1351,20 @@ public com.google.protobuf.TimestampOrBuilder getPublishTimeOrBuilder() { } private java.lang.Object orderingKey_ = ""; - /** string ordering_key = 5; */ + /** + * + * + *
+     * WARNING: `ordering_key` is an experimental field not yet
+     * supported by the service.
+     * Identifies related messages for which publish order should be respected.
+     * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+     * published with the same `ordering_key` value will be delivered to
+     * subscribers in the order in which they are received by the Pub/Sub system.
+     * 
+ * + * string ordering_key = 5; + */ public java.lang.String getOrderingKey() { java.lang.Object ref = orderingKey_; if (!(ref instanceof java.lang.String)) { @@ -1337,7 +1376,20 @@ public java.lang.String getOrderingKey() { return (java.lang.String) ref; } } - /** string ordering_key = 5; */ + /** + * + * + *
+     * WARNING: `ordering_key` is an experimental field not yet
+     * supported by the service.
+     * Identifies related messages for which publish order should be respected.
+     * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+     * published with the same `ordering_key` value will be delivered to
+     * subscribers in the order in which they are received by the Pub/Sub system.
+     * 
+ * + * string ordering_key = 5; + */ public com.google.protobuf.ByteString getOrderingKeyBytes() { java.lang.Object ref = orderingKey_; if (ref instanceof String) { @@ -1349,7 +1401,20 @@ public com.google.protobuf.ByteString getOrderingKeyBytes() { return (com.google.protobuf.ByteString) ref; } } - /** string ordering_key = 5; */ + /** + * + * + *
+     * WARNING: `ordering_key` is an experimental field not yet
+     * supported by the service.
+     * Identifies related messages for which publish order should be respected.
+     * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+     * published with the same `ordering_key` value will be delivered to
+     * subscribers in the order in which they are received by the Pub/Sub system.
+     * 
+ * + * string ordering_key = 5; + */ public Builder setOrderingKey(java.lang.String value) { if (value == null) { throw new NullPointerException(); @@ -1359,14 +1424,40 @@ public Builder setOrderingKey(java.lang.String value) { onChanged(); return this; } - /** string ordering_key = 5; */ + /** + * + * + *
+     * WARNING: `ordering_key` is an experimental field not yet
+     * supported by the service.
+     * Identifies related messages for which publish order should be respected.
+     * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+     * published with the same `ordering_key` value will be delivered to
+     * subscribers in the order in which they are received by the Pub/Sub system.
+     * 
+ * + * string ordering_key = 5; + */ public Builder clearOrderingKey() { orderingKey_ = getDefaultInstance().getOrderingKey(); onChanged(); return this; } - /** string ordering_key = 5; */ + /** + * + * + *
+     * WARNING: `ordering_key` is an experimental field not yet
+     * supported by the service.
+     * Identifies related messages for which publish order should be respected.
+     * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+     * published with the same `ordering_key` value will be delivered to
+     * subscribers in the order in which they are received by the Pub/Sub system.
+     * 
+ * + * string ordering_key = 5; + */ public Builder setOrderingKeyBytes(com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java index a278851db728..93fb2ce06b9c 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessageOrBuilder.java @@ -138,8 +138,34 @@ public interface PubsubMessageOrBuilder */ com.google.protobuf.TimestampOrBuilder getPublishTimeOrBuilder(); - /** string ordering_key = 5; */ + /** + * + * + *
+   * WARNING: `ordering_key` is an experimental field not yet
+   * supported by the service.
+   * Identifies related messages for which publish order should be respected.
+   * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+   * published with the same `ordering_key` value will be delivered to
+   * subscribers in the order in which they are received by the Pub/Sub system.
+   * 
+ * + * string ordering_key = 5; + */ java.lang.String getOrderingKey(); - /** string ordering_key = 5; */ + /** + * + * + *
+   * WARNING: `ordering_key` is an experimental field not yet
+   * supported by the service.
+   * Identifies related messages for which publish order should be respected.
+   * If a `Subscription` has `enable_message_ordering` set to `true`, messages
+   * published with the same `ordering_key` value will be delivered to
+   * subscribers in the order in which they are received by the Pub/Sub system.
+   * 
+ * + * string ordering_key = 5; + */ com.google.protobuf.ByteString getOrderingKeyBytes(); } diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java index 3ddf8870e33c..9b73353c4e84 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java @@ -554,7 +554,20 @@ public java.lang.String getLabelsOrThrow(java.lang.String key) { public static final int ENABLE_MESSAGE_ORDERING_FIELD_NUMBER = 10; private boolean enableMessageOrdering_; - /** bool enable_message_ordering = 10; */ + /** + * + * + *
+   * WARNING: `enable_message_ordering` is an experimental field not yet
+   * supported by the service.
+   * If true, messages published with the same `ordering_key` in `PubsubMessage`
+   * will be delivered to the subscribers in the order in which they
+   * are received by the Pub/Sub system. Otherwise, they may be delivered in
+   * any order.
+   * 
+ * + * bool enable_message_ordering = 10; + */ public boolean getEnableMessageOrdering() { return enableMessageOrdering_; } @@ -2135,18 +2148,57 @@ public Builder putAllLabels(java.util.Map va } private boolean enableMessageOrdering_; - /** bool enable_message_ordering = 10; */ + /** + * + * + *
+     * WARNING: `enable_message_ordering` is an experimental field not yet
+     * supported by the service.
+     * If true, messages published with the same `ordering_key` in `PubsubMessage`
+     * will be delivered to the subscribers in the order in which they
+     * are received by the Pub/Sub system. Otherwise, they may be delivered in
+     * any order.
+     * 
+ * + * bool enable_message_ordering = 10; + */ public boolean getEnableMessageOrdering() { return enableMessageOrdering_; } - /** bool enable_message_ordering = 10; */ + /** + * + * + *
+     * WARNING: `enable_message_ordering` is an experimental field not yet
+     * supported by the service.
+     * If true, messages published with the same `ordering_key` in `PubsubMessage`
+     * will be delivered to the subscribers in the order in which they
+     * are received by the Pub/Sub system. Otherwise, they may be delivered in
+     * any order.
+     * 
+ * + * bool enable_message_ordering = 10; + */ public Builder setEnableMessageOrdering(boolean value) { enableMessageOrdering_ = value; onChanged(); return this; } - /** bool enable_message_ordering = 10; */ + /** + * + * + *
+     * WARNING: `enable_message_ordering` is an experimental field not yet
+     * supported by the service.
+     * If true, messages published with the same `ordering_key` in `PubsubMessage`
+     * will be delivered to the subscribers in the order in which they
+     * are received by the Pub/Sub system. Otherwise, they may be delivered in
+     * any order.
+     * 
+ * + * bool enable_message_ordering = 10; + */ public Builder clearEnableMessageOrdering() { enableMessageOrdering_ = false; diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java index 1d340dd65a49..8a97ea0bd8a7 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java @@ -264,7 +264,20 @@ public interface SubscriptionOrBuilder */ java.lang.String getLabelsOrThrow(java.lang.String key); - /** bool enable_message_ordering = 10; */ + /** + * + * + *
+   * WARNING: `enable_message_ordering` is an experimental field not yet
+   * supported by the service.
+   * If true, messages published with the same `ordering_key` in `PubsubMessage`
+   * will be delivered to the subscribers in the order in which they
+   * are received by the Pub/Sub system. Otherwise, they may be delivered in
+   * any order.
+   * 
+ * + * bool enable_message_ordering = 10; + */ boolean getEnableMessageOrdering(); /** diff --git a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto index cd7ca090c3f2..1dc8cd301508 100644 --- a/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto +++ b/google-api-grpc/proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto @@ -400,6 +400,13 @@ message PubsubMessage { // publisher in a `Publish` call. google.protobuf.Timestamp publish_time = 4; + // WARNING: `ordering_key` is an experimental field not yet + // supported by the service. + // + // Identifies related messages for which publish order should be respected. + // If a `Subscription` has `enable_message_ordering` set to `true`, messages + // published with the same `ordering_key` value will be delivered to + // subscribers in the order in which they are received by the Pub/Sub system. string ordering_key = 5; } @@ -601,7 +608,15 @@ message Subscription { // managing labels. map labels = 9; + // WARNING: `enable_message_ordering` is an experimental field not yet + // supported by the service. + // + // If true, messages published with the same `ordering_key` in `PubsubMessage` + // will be delivered to the subscribers in the order in which they + // are received by the Pub/Sub system. Otherwise, they may be delivered in + // any order. bool enable_message_ordering = 10; + // A policy that specifies the conditions for this subscription's expiration. // A subscription is considered active as long as any connected subscriber is // successfully consuming messages from the subscription or is issuing From abd8e4f3cc38b867fb135171721053422c56550e Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Mon, 11 Feb 2019 16:21:33 -0500 Subject: [PATCH 4/5] Change retry codes and add checks for a null ordering key. --- .../com/google/cloud/pubsub/v1/Publisher.java | 51 ++++++++++++------- .../cloud/pubsub/v1/PublisherImplTest.java | 25 --------- 2 files changed, 33 insertions(+), 43 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index b5f39c1f7fcc..3673efd6a593 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -45,11 +45,13 @@ import com.google.pubsub.v1.TopicNames; import java.io.IOException; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -131,9 +133,29 @@ private Publisher(Builder builder) throws IOException { // We post-process this here to keep backward-compatibility. // Also, if "message ordering" is enabled, the publisher should retry sending the failed // message infinitely rather than sending the next one. - RetrySettings retrySettings = builder.retrySettings; - if (retrySettings.getMaxAttempts() == 0 || builder.enableMessageOrdering) { - retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build(); + RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder(); + if (retrySettingsBuilder.getMaxAttempts() == 0) { + retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE); + } + if (enableMessageOrdering) { + retrySettingsBuilder + .setMaxAttempts(Integer.MAX_VALUE) + .setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE)); + } + + Set retryCodes; + if (enableMessageOrdering) { + retryCodes = EnumSet.allOf(StatusCode.Code.class); + } else { + retryCodes = + EnumSet.of( + StatusCode.Code.ABORTED, + StatusCode.Code.CANCELLED, + StatusCode.Code.DEADLINE_EXCEEDED, + StatusCode.Code.INTERNAL, + StatusCode.Code.RESOURCE_EXHAUSTED, + StatusCode.Code.UNKNOWN, + StatusCode.Code.UNAVAILABLE); } PublisherStubSettings.Builder stubSettings = @@ -143,15 +165,8 @@ private Publisher(Builder builder) throws IOException { .setTransportChannelProvider(builder.channelProvider); stubSettings .publishSettings() - .setRetryableCodes( - StatusCode.Code.ABORTED, - StatusCode.Code.CANCELLED, - StatusCode.Code.DEADLINE_EXCEEDED, - StatusCode.Code.INTERNAL, - StatusCode.Code.RESOURCE_EXHAUSTED, - StatusCode.Code.UNKNOWN, - StatusCode.Code.UNAVAILABLE) - .setRetrySettings(retrySettings) + .setRetryableCodes(retryCodes) + .setRetrySettings(retrySettingsBuilder.build()) .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()); this.publisherStub = GrpcPublisherStub.create(stubSettings.build()); @@ -200,7 +215,7 @@ public ApiFuture publish(PubsubMessage message) { } final String orderingKey = message.getOrderingKey(); - if (!orderingKey.isEmpty() && !enableMessageOrdering) { + if (orderingKey != null && !orderingKey.isEmpty() && !enableMessageOrdering) { throw new IllegalStateException( "Cannot publish a message with an ordering key when message ordeirng is not enabled."); } @@ -300,10 +315,10 @@ public void publishAllOutstanding() { try { for (MessagesBatch batch : messagesBatches.values()) { if (!batch.isEmpty()) { - // TODO(kimkyung-goog): Do not release `messageBatchLock` when publishing a batch. If it's - // released, the order of publishing cannot be guaranteed if `publish()` is called while - // this function is running. This locking mechanism needs to be improved if it causes any - // performance degradation. + // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If + // it's released, the order of publishing cannot be guaranteed if `publish()` is called + // while this function is running. This locking mechanism needs to be improved if it + // causes any performance degradation. publishOutstandingBatch(batch.popOutstandingBatch()); } } @@ -364,7 +379,7 @@ public void onFailure(Throwable t) { } }; - if (outstandingBatch.orderingKey.isEmpty()) { + if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) { // If ordering key is empty, publish the batch using the normal executor. Runnable task = new Runnable() { diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 720c6b601822..3d0557e97719 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -380,31 +380,6 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { publisher.shutdown(); } - @Test(expected = ExecutionException.class) - public void testEnableMessageOrdering_failsImmediatelyForNonRetryableError() throws Exception { - Publisher publisher = - getTestPublisherBuilder() - .setExecutorProvider(SINGLE_THREAD_EXECUTOR) - .setRetrySettings( - Publisher.Builder.DEFAULT_RETRY_SETTINGS - .toBuilder() - .setTotalTimeout(Duration.ofSeconds(10)) - .setMaxAttempts(2) - .build()) - .setEnableMessageOrdering(true) - .build(); - - testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); - ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); - - try { - publishFuture1.get(); - } finally { - assertEquals(1, testPublisherServiceImpl.getCapturedRequests().size()); - publisher.shutdown(); - } - } - private ApiFuture sendTestMessageWithOrderingKey( Publisher publisher, String data, String orderingKey) { return publisher.publish( From f2c734848b6c526074a9ca3895f2568b617cdb8e Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 12 Feb 2019 15:53:00 -0500 Subject: [PATCH 5/5] Add support for ordered delivery to subscribers --- .../cloud/pubsub/v1/MessageDispatcher.java | 81 ++++++++++--------- .../pubsub/v1/MessageDispatcherTest.java | 53 ++++++++++++ 2 files changed, 97 insertions(+), 37 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 5f5ebbaee204..bf79813f74e7 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -65,6 +65,7 @@ class MessageDispatcher { @InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100); private final Executor executor; + private final SequentialExecutorService sequentialExecutor; private final ScheduledExecutorService systemExecutor; private final ApiClock clock; @@ -217,6 +218,7 @@ void sendAckOperations( jobLock = new ReentrantLock(); messagesWaiter = new MessageWaiter(); this.clock = clock; + this.sequentialExecutor = new SequentialExecutorService(executor); } public void start() { @@ -401,46 +403,51 @@ public void processOutstandingBatches() { outstandingMessageBatches.poll(); batchCallback = nextBatch.doneCallback; } - } - final PubsubMessage message = outstandingMessage.receivedMessage().getMessage(); - final AckHandler ackHandler = outstandingMessage.ackHandler(); - final SettableApiFuture response = SettableApiFuture.create(); - final AckReplyConsumer consumer = - new AckReplyConsumer() { - @Override - public void ack() { - response.set(AckReply.ACK); - } - - @Override - public void nack() { - response.set(AckReply.NACK); - } - }; - ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); - executor.execute( - new Runnable() { - @Override - public void run() { - try { - if (ackHandler - .totalExpiration - .plusSeconds(messageDeadlineSeconds.get()) - .isBefore(now())) { - // Message expired while waiting. We don't extend these messages anymore, - // so it was probably sent to someone else. Don't work on it. - // Don't nack it either, because we'd be nacking someone else's message. - ackHandler.forget(); - return; - } + final PubsubMessage message = outstandingMessage.receivedMessage().getMessage(); + final AckHandler ackHandler = outstandingMessage.ackHandler(); + final SettableApiFuture response = SettableApiFuture.create(); + final AckReplyConsumer consumer = + new AckReplyConsumer() { + @Override + public void ack() { + response.set(AckReply.ACK); + } - receiver.receiveMessage(message, consumer); - } catch (Exception e) { - response.setException(e); + @Override + public void nack() { + response.set(AckReply.NACK); } - } - }); + }; + ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); + Runnable deliverMessageTask = + new Runnable() { + @Override + public void run() { + try { + if (ackHandler + .totalExpiration + .plusSeconds(messageDeadlineSeconds.get()) + .isBefore(now())) { + // Message expired while waiting. We don't extend these messages anymore, + // so it was probably sent to someone else. Don't work on it. + // Don't nack it either, because we'd be nacking someone else's message. + ackHandler.forget(); + return; + } + + receiver.receiveMessage(message, consumer); + } catch (Exception e) { + response.setException(e); + } + } + }; + if (message.getOrderingKey().isEmpty()) { + executor.execute(deliverMessageTask); + } else { + sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask); + } + } if (batchDone) { batchCallback.run(); } diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 785368bb13cb..0f5fb93664b1 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -28,8 +28,10 @@ import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -53,6 +55,7 @@ public void run() { private MessageDispatcher dispatcher; private LinkedBlockingQueue consumers; + private Map> messagesByOrderingKey; private List sentAcks; private List sentModAcks; private FakeClock clock; @@ -72,6 +75,7 @@ static ModAckItem of(String ackId, int seconds) { @Before public void setUp() { consumers = new LinkedBlockingQueue<>(); + messagesByOrderingKey = new HashMap<>(); sentAcks = new ArrayList<>(); sentModAcks = new ArrayList<>(); @@ -79,6 +83,12 @@ public void setUp() { new MessageReceiver() { @Override public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { + List messages = messagesByOrderingKey.get(message.getOrderingKey()); + if (messages == null) { + messages = new ArrayList<>(); + messagesByOrderingKey.put(message.getOrderingKey(), messages); + } + messages.add(message.getData()); consumers.add(consumer); } }; @@ -205,4 +215,47 @@ public void testDeadlineAdjustment() throws Exception { assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(42); } + + private ReceivedMessage newReceivedMessage(String ackId, String orderingKey, String data) { + return ReceivedMessage.newBuilder() + .setAckId(ackId) + .setMessage( + PubsubMessage.newBuilder() + .setOrderingKey(orderingKey) + .setData(ByteString.copyFromUtf8(data)) + .build()) + .build(); + } + + @Test + public void testOrderingKey() throws Exception { + // Create messages with "orderA". + ReceivedMessage message1 = newReceivedMessage("ackId1", "orderA", "m1"); + ReceivedMessage message2 = newReceivedMessage("ackId2", "orderA", "m2"); + // Create messages with "orderB". + ReceivedMessage message3 = newReceivedMessage("ackId3", "orderB", "m3"); + ReceivedMessage message4 = newReceivedMessage("ackId4", "orderB", "m4"); + ReceivedMessage message5 = newReceivedMessage("ackId5", "orderB", "m5"); + + dispatcher.processReceivedMessages(Collections.singletonList(message1), NOOP_RUNNABLE); + consumers.take().ack(); + dispatcher.processReceivedMessages(Collections.singletonList(message2), NOOP_RUNNABLE); + consumers.take().ack(); + dispatcher.processReceivedMessages(Collections.singletonList(message3), NOOP_RUNNABLE); + consumers.take().ack(); + dispatcher.processReceivedMessages(Collections.singletonList(message4), NOOP_RUNNABLE); + consumers.take().ack(); + dispatcher.processReceivedMessages(Collections.singletonList(message5), NOOP_RUNNABLE); + consumers.take().ack(); + + assertThat(messagesByOrderingKey.get("orderA")) + .containsExactly(ByteString.copyFromUtf8("m1"), ByteString.copyFromUtf8("m2")) + .inOrder(); + assertThat(messagesByOrderingKey.get("orderB")) + .containsExactly( + ByteString.copyFromUtf8("m3"), + ByteString.copyFromUtf8("m4"), + ByteString.copyFromUtf8("m5")) + .inOrder(); + } }