Skip to content

Commit d016e8e

Browse files
committed
fix(pubsub): split large (mod)ACK requests into smaller ones
There is a server-side limit on the maximum size of ACK and modACK requests, which can be hit if the leaser tries to manage too many messages in a single request. This commit ensures that such large requests are split into multiple smaller requests.
1 parent 89eaedb commit d016e8e

File tree

2 files changed

+91
-10
lines changed

2 files changed

+91
-10
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313
# limitations under the License.
1414

1515
from __future__ import absolute_import
16+
from __future__ import division
1617

1718
import collections
19+
import itertools
1820
import logging
21+
import math
1922
import threading
2023

2124
from google.cloud.pubsub_v1 import types
@@ -34,6 +37,18 @@
3437
"""The maximum amount of time in seconds to wait for additional request items
3538
before processing the next batch of requests."""
3639

40+
_ACK_IDS_BATCH_SIZE = 3000
41+
"""The maximum number of ACK IDs to send in a single StreamingPullRequest.
42+
43+
The backend imposes a maximum request size limit of 524288 bytes (512 KiB) per
44+
acknowledge / modifyAckDeadline request. ACK IDs have a maximum size of 164
45+
bytes, thus we cannot send more than 524288/164 ~= 3197 ACK IDs in a single
46+
StreamingPullRequest message.
47+
48+
Accounting for some overhead, we should thus only send a maximum of 3000 ACK
49+
IDs at a time.
50+
"""
51+
3752

3853
class Dispatcher(object):
3954
def __init__(self, manager, queue):
@@ -119,9 +134,16 @@ def ack(self, items):
119134
if time_to_ack is not None:
120135
self._manager.ack_histogram.add(time_to_ack)
121136

122-
ack_ids = [item.ack_id for item in items]
123-
request = types.StreamingPullRequest(ack_ids=ack_ids)
124-
self._manager.send(request)
137+
# We must potentially split the request into multiple smaller requests
138+
# to avoid the server-side max request size limit.
139+
ack_ids = (item.ack_id for item in items)
140+
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))
141+
142+
for _ in range(total_chunks):
143+
request = types.StreamingPullRequest(
144+
ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE),
145+
)
146+
self._manager.send(request)
125147

126148
# Remove the message from lease management.
127149
self.drop(items)
@@ -150,13 +172,18 @@ def modify_ack_deadline(self, items):
150172
Args:
151173
items(Sequence[ModAckRequest]): The items to modify.
152174
"""
153-
ack_ids = [item.ack_id for item in items]
154-
seconds = [item.seconds for item in items]
155-
156-
request = types.StreamingPullRequest(
157-
modify_deadline_ack_ids=ack_ids, modify_deadline_seconds=seconds
158-
)
159-
self._manager.send(request)
175+
# We must potentially split the request into multiple smaller requests
176+
# to avoid the server-side max request size limit.
177+
ack_ids = (item.ack_id for item in items)
178+
seconds = (item.seconds for item in items)
179+
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))
180+
181+
for _ in range(total_chunks):
182+
request = types.StreamingPullRequest(
183+
modify_deadline_ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE),
184+
modify_deadline_seconds=itertools.islice(seconds, _ACK_IDS_BATCH_SIZE),
185+
)
186+
self._manager.send(request)
160187

161188
def nack(self, items):
162189
"""Explicitly deny receipt of messages.

pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,33 @@ def test_ack_no_time():
9595
manager.ack_histogram.add.assert_not_called()
9696

9797

98+
def test_ack_splitting_large_payload():
99+
manager = mock.create_autospec(
100+
streaming_pull_manager.StreamingPullManager, instance=True
101+
)
102+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
103+
104+
items = [
105+
# use realistic lengths for ACK IDs (max 164 bytes)
106+
requests.AckRequest(ack_id=str(i).zfill(164), byte_size=0, time_to_ack=20)
107+
for i in range(6001)
108+
]
109+
dispatcher_.ack(items)
110+
111+
calls = manager.send.call_args_list
112+
assert len(calls) == 3
113+
114+
all_ack_ids = {item.ack_id for item in items}
115+
sent_ack_ids = set()
116+
117+
for call in calls:
118+
message = call.args[0]
119+
assert message.ByteSize() <= 524288 # server-side limit (2**19)
120+
sent_ack_ids.update(message.ack_ids)
121+
122+
assert sent_ack_ids == all_ack_ids # all messages should have been ACK-ed
123+
124+
98125
def test_lease():
99126
manager = mock.create_autospec(
100127
streaming_pull_manager.StreamingPullManager, instance=True
@@ -153,6 +180,33 @@ def test_modify_ack_deadline():
153180
)
154181

155182

183+
def test_modify_ack_deadline_splitting_large_payload():
184+
manager = mock.create_autospec(
185+
streaming_pull_manager.StreamingPullManager, instance=True
186+
)
187+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
188+
189+
items = [
190+
# use realistic lengths for ACK IDs (max 164 bytes)
191+
requests.ModAckRequest(ack_id=str(i).zfill(164), seconds=60)
192+
for i in range(6001)
193+
]
194+
dispatcher_.modify_ack_deadline(items)
195+
196+
calls = manager.send.call_args_list
197+
assert len(calls) == 3
198+
199+
all_ack_ids = {item.ack_id for item in items}
200+
sent_ack_ids = set()
201+
202+
for call in calls:
203+
message = call.args[0]
204+
assert message.ByteSize() <= 524288 # server-side limit (2**19)
205+
sent_ack_ids.update(message.modify_deadline_ack_ids)
206+
207+
assert sent_ack_ids == all_ack_ids # all messages should have been ACK-ed
208+
209+
156210
@mock.patch("threading.Thread", autospec=True)
157211
def test_start(thread):
158212
manager = mock.create_autospec(

0 commit comments

Comments
 (0)