From 0c61849d86999e1d3743cded6b2ce10f87be054b Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 4 Nov 2019 14:16:35 +0200 Subject: [PATCH 1/3] fix(pubsub): split large (mod)ACK requests into smaller ones There is a server-side limit on the maximum size of ACK and modACK requests, which can be hit if the leaser tries to manage too many messages in a single requests. This commit assures that such large requests are split into multiple smaller requests. --- .../subscriber/_protocol/dispatcher.py | 47 ++++++++++++---- .../pubsub_v1/subscriber/test_dispatcher.py | 54 +++++++++++++++++++ 2 files changed, 91 insertions(+), 10 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 2b2574829306..372d8e84e563 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -13,9 +13,12 @@ # limitations under the License. from __future__ import absolute_import +from __future__ import division import collections +import itertools import logging +import math import threading from google.cloud.pubsub_v1 import types @@ -34,6 +37,18 @@ """The maximum amount of time in seconds to wait for additional request items before processing the next batch of requests.""" +_ACK_IDS_BATCH_SIZE = 3000 +"""The maximum number of ACK IDs to send in a single StreamingPullRequest. + +The backend imposes a maximum request size limit of 524288 bytes (512 KiB) per +acknowledge / modifyAckDeadline request. ACK IDs have a maximum size of 164 +bytes, thus we cannot send more than o 524288/164 ~= 3197 ACK IDs in a single +StreamingPullRequest message. + +Accounting for some overhead, we should thus only send a maximum of 3000 ACK +IDs at a time. +""" + class Dispatcher(object): def __init__(self, manager, queue): @@ -119,9 +134,16 @@ def ack(self, items): if time_to_ack is not None: self._manager.ack_histogram.add(time_to_ack) - ack_ids = [item.ack_id for item in items] - request = types.StreamingPullRequest(ack_ids=ack_ids) - self._manager.send(request) + # We must potentially split the request into multiple smaller requests + # to avoid the server-side max request size limit. + ack_ids = (item.ack_id for item in items) + total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) + + for _ in range(total_chunks): + request = types.StreamingPullRequest( + ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE) + ) + self._manager.send(request) # Remove the message from lease management. self.drop(items) @@ -150,13 +172,18 @@ def modify_ack_deadline(self, items): Args: items(Sequence[ModAckRequest]): The items to modify. """ - ack_ids = [item.ack_id for item in items] - seconds = [item.seconds for item in items] - - request = types.StreamingPullRequest( - modify_deadline_ack_ids=ack_ids, modify_deadline_seconds=seconds - ) - self._manager.send(request) + # We must potentially split the request into multiple smaller requests + # to avoid the server-side max request size limit. + ack_ids = (item.ack_id for item in items) + seconds = (item.seconds for item in items) + total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) + + for _ in range(total_chunks): + request = types.StreamingPullRequest( + modify_deadline_ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE), + modify_deadline_seconds=itertools.islice(seconds, _ACK_IDS_BATCH_SIZE), + ) + self._manager.send(request) def nack(self, items): """Explicitly deny receipt of messages. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index 0e1e9744f6d9..33f1de16499a 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -95,6 +95,33 @@ def test_ack_no_time(): manager.ack_histogram.add.assert_not_called() +def test_ack_splitting_large_payload(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [ + # use realistic lengths for ACK IDs (max 164 bytes) + requests.AckRequest(ack_id=str(i).zfill(164), byte_size=0, time_to_ack=20) + for i in range(6001) + ] + dispatcher_.ack(items) + + calls = manager.send.call_args_list + assert len(calls) == 3 + + all_ack_ids = {item.ack_id for item in items} + sent_ack_ids = set() + + for call in calls: + message = call.args[0] + assert message.ByteSize() <= 524288 # server-side limit (2**19) + sent_ack_ids.update(message.ack_ids) + + assert sent_ack_ids == all_ack_ids # all messages should have been ACK-ed + + def test_lease(): manager = mock.create_autospec( streaming_pull_manager.StreamingPullManager, instance=True @@ -153,6 +180,33 @@ def test_modify_ack_deadline(): ) +def test_modify_ack_deadline_splitting_large_payload(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [ + # use realistic lengths for ACK IDs (max 164 bytes) + requests.ModAckRequest(ack_id=str(i).zfill(164), seconds=60) + for i in range(6001) + ] + dispatcher_.modify_ack_deadline(items) + + calls = manager.send.call_args_list + assert len(calls) == 3 + + all_ack_ids = {item.ack_id for item in items} + sent_ack_ids = set() + + for call in calls: + message = call.args[0] + assert message.ByteSize() <= 524288 # server-side limit (2**19) + sent_ack_ids.update(message.modify_deadline_ack_ids) + + assert sent_ack_ids == all_ack_ids # all messages should have been ACK-ed + + @mock.patch("threading.Thread", autospec=True) def test_start(thread): manager = mock.create_autospec( From 86f1d7fbb98151ccb3a3bcbf82cd1418513673f2 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 4 Nov 2019 15:22:19 +0200 Subject: [PATCH 2/3] Decrease max ACK batch size to 2500 The previous limit of 3000 seems to be too optimistic, and the request size limit is still hit. Reducing the batch size to 2500 fixes the problem. --- .../pubsub_v1/subscriber/_protocol/dispatcher.py | 6 +++--- .../unit/pubsub_v1/subscriber/test_dispatcher.py | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 372d8e84e563..b1d8429cba58 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -37,15 +37,15 @@ """The maximum amount of time in seconds to wait for additional request items before processing the next batch of requests.""" -_ACK_IDS_BATCH_SIZE = 3000 +_ACK_IDS_BATCH_SIZE = 2500 """The maximum number of ACK IDs to send in a single StreamingPullRequest. The backend imposes a maximum request size limit of 524288 bytes (512 KiB) per acknowledge / modifyAckDeadline request. ACK IDs have a maximum size of 164 -bytes, thus we cannot send more than o 524288/164 ~= 3197 ACK IDs in a single +bytes, thus we cannot send more than o 524288/176 ~= 2979 ACK IDs in a single StreamingPullRequest message. -Accounting for some overhead, we should thus only send a maximum of 3000 ACK +Accounting for some overhead, we should thus only send a maximum of 2500 ACK IDs at a time. """ diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index 33f1de16499a..559dd0573480 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -102,9 +102,9 @@ def test_ack_splitting_large_payload(): dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) items = [ - # use realistic lengths for ACK IDs (max 164 bytes) - requests.AckRequest(ack_id=str(i).zfill(164), byte_size=0, time_to_ack=20) - for i in range(6001) + # use realistic lengths for ACK IDs (max 176 bytes) + requests.AckRequest(ack_id=str(i).zfill(176), byte_size=0, time_to_ack=20) + for i in range(5001) ] dispatcher_.ack(items) @@ -187,9 +187,9 @@ def test_modify_ack_deadline_splitting_large_payload(): dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) items = [ - # use realistic lengths for ACK IDs (max 164 bytes) - requests.ModAckRequest(ack_id=str(i).zfill(164), seconds=60) - for i in range(6001) + # use realistic lengths for ACK IDs (max 176 bytes) + requests.ModAckRequest(ack_id=str(i).zfill(176), seconds=60) + for i in range(5001) ] dispatcher_.modify_ack_deadline(items) From 62f9d798d0c938bb35089e436b8be8a41430e3f5 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 22 Nov 2019 19:01:49 +0100 Subject: [PATCH 3/3] Add additional test assertions about sent ACK IDs The tests should also check that each message is (MOD)ACK-ed exactly once. --- .../unit/pubsub_v1/subscriber/test_dispatcher.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index 559dd0573480..592a03c6422c 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import threading from google.cloud.pubsub_v1 import types @@ -112,14 +113,15 @@ def test_ack_splitting_large_payload(): assert len(calls) == 3 all_ack_ids = {item.ack_id for item in items} - sent_ack_ids = set() + sent_ack_ids = collections.Counter() for call in calls: message = call.args[0] assert message.ByteSize() <= 524288 # server-side limit (2**19) sent_ack_ids.update(message.ack_ids) - assert sent_ack_ids == all_ack_ids # all messages should have been ACK-ed + assert set(sent_ack_ids) == all_ack_ids # all messages should have been ACK-ed + assert sent_ack_ids.most_common(1)[0][1] == 1 # each message ACK-ed exactly once def test_lease(): @@ -197,14 +199,15 @@ def test_modify_ack_deadline_splitting_large_payload(): assert len(calls) == 3 all_ack_ids = {item.ack_id for item in items} - sent_ack_ids = set() + sent_ack_ids = collections.Counter() for call in calls: message = call.args[0] assert message.ByteSize() <= 524288 # server-side limit (2**19) sent_ack_ids.update(message.modify_deadline_ack_ids) - assert sent_ack_ids == all_ack_ids # all messages should have been ACK-ed + assert set(sent_ack_ids) == all_ack_ids # all messages should have been MODACK-ed + assert sent_ack_ids.most_common(1)[0][1] == 1 # each message MODACK-ed exactly once @mock.patch("threading.Thread", autospec=True)