From de45c3ecbb35516a0b5ad27531378e266984857e Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 22 Oct 2019 15:35:40 +0300 Subject: [PATCH 1/4] fix(pubsub): lease-manage all received messages This is to prevent the messages that are put on hold from unnecessarily timing out too soon, causing the backend to re-send them. --- .../_protocol/streaming_pull_manager.py | 27 ++++++++++--------- .../subscriber/test_streaming_pull_manager.py | 12 ++++++--- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index d3b1d6f51eb6..97dc27aae059 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -545,6 +545,15 @@ def _on_response(self, response): self._messages_on_hold.qsize(), ) + # Immediately (i.e. without waiting for the auto lease management) + # modack the messages we received, as this tells the server that we've + # received them. + items = [ + requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99)) + for message in response.received_messages + ] + self._dispatcher.modify_ack_deadline(items) + invoke_callbacks_for = [] for received_message in response.received_messages: @@ -552,23 +561,15 @@ def _on_response(self, response): received_message.message, received_message.ack_id, self._scheduler.queue ) if self.load < _MAX_LOAD: - req = requests.LeaseRequest( - ack_id=message.ack_id, byte_size=message.size - ) - self.leaser.add([req]) invoke_callbacks_for.append(message) - self.maybe_pause_consumer() else: self._messages_on_hold.put(message) - # Immediately (i.e. without waiting for the auto lease management) - # modack the messages we received and not put on hold, as this tells - # the server that we've received them. - items = [ - requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99)) - for message in invoke_callbacks_for - ] - self._dispatcher.modify_ack_deadline(items) + req = requests.LeaseRequest( + ack_id=message.ack_id, byte_size=message.size + ) + self.leaser.add([req]) + self.maybe_pause_consumer() _LOGGER.debug( "Scheduling callbacks for %s new messages, new total on hold %s.", diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 114663e7b8e2..08dcf32cc7b1 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -668,13 +668,17 @@ def test__on_response_with_leaser_overload(): # are called in the expected way. 
    manager._on_response(response)

-    # only the messages that are added to the lease management and dispatched to
-    # callbacks should have their ACK deadline extended
+    # all messages should be added to the lease management and have their ACK
+    # deadline extended, even those not dispatched to callbacks
     dispatcher.modify_ack_deadline.assert_called_once_with(
-        [requests.ModAckRequest("fack", 10)]
+        [
+            requests.ModAckRequest("fack", 10),
+            requests.ModAckRequest("back", 10),
+            requests.ModAckRequest("zack", 10),
+        ]
     )

-    # one message should be scheduled, the leaser capacity allows for it
+    # one message should be scheduled, the flow control limits allow for it
     schedule_calls = scheduler.schedule.mock_calls
     assert len(schedule_calls) == 1
     call_args = schedule_calls[0][1]

From b65873b3221da32deac4262a8c1d3084f0fbe105 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Wed, 23 Oct 2019 11:44:10 +0300
Subject: [PATCH 2/4] Exclude on hold messages from load calculation

Even received messages that exceed the maximum load (as defined by flow
control) must be lease-managed to avoid unnecessary ACK deadline
expirations, but since they are not yet dispatched to user callbacks,
they should not contribute to the overall load.

Without this change, the total load could be overestimated, resulting
in an indefinitely paused message stream and in messages not being
dispatched to callbacks when they should be.
---
 .../_protocol/streaming_pull_manager.py       | 54 ++++++++++++-------
 .../subscriber/test_streaming_pull_manager.py | 31 +++++------
 2 files changed, 50 insertions(+), 35 deletions(-)

diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index 97dc27aae059..da54b4ec4840 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -135,9 +135,15 @@ def __init__(
         # because the FlowControl limits have been hit.
         self._messages_on_hold = queue.Queue()
 
+        # the total number of bytes consumed by the messages currently on hold
+        self._on_hold_bytes = 0
+
         # A lock ensuring that pausing / resuming the consumer are both atomic
         # operations that cannot be executed concurrently. Needed for properly
-        # syncing these operations with the current leaser load.
+        # syncing these operations with the current leaser load. Additionally,
+        # the lock is used to protect modifications of internal data that
+        # affects the load computation, i.e. the count and size of the messages
+        # currently on hold.
         self._pause_resume_lock = threading.Lock()
 
         # The threads created in ``.open()``.
@@ -218,10 +224,18 @@ def load(self):
         if self._leaser is None:
             return 0.0
 
+        # Messages that are temporarily put on hold are not being delivered to
+        # user's callbacks, thus they should not contribute to the flow control
+        # load calculation.
+        # However, since these messages must still be lease-managed to avoid
+        # unnecessary ACK deadline expirations, their count and total size must
+        # be subtracted from the leaser's values.
return max( [ - self._leaser.message_count / self._flow_control.max_messages, - self._leaser.bytes / self._flow_control.max_bytes, + (self._leaser.message_count - self._messages_on_hold.qsize()) + / self._flow_control.max_messages, + (self._leaser.bytes - self._on_hold_bytes) + / self._flow_control.max_bytes, ] ) @@ -292,13 +306,12 @@ def _maybe_release_messages(self): except queue.Empty: break - self.leaser.add( - [requests.LeaseRequest(ack_id=msg.ack_id, byte_size=msg.size)] - ) + self._on_hold_bytes -= msg.size _LOGGER.debug( - "Released held message to leaser, scheduling callback for it, " - "still on hold %s.", + "Released held message, scheduling callback for it, " + "still on hold %s (bytes %s).", self._messages_on_hold.qsize(), + self._on_hold_bytes, ) self._scheduler.schedule(self._callback, msg) @@ -540,9 +553,10 @@ def _on_response(self, response): the callback for each message using the executor. """ _LOGGER.debug( - "Processing %s received message(s), currenty on hold %s.", + "Processing %s received message(s), currenty on hold %s (bytes %s).", len(response.received_messages), self._messages_on_hold.qsize(), + self._on_hold_bytes, ) # Immediately (i.e. without waiting for the auto lease management) @@ -560,21 +574,25 @@ def _on_response(self, response): message = google.cloud.pubsub_v1.subscriber.message.Message( received_message.message, received_message.ack_id, self._scheduler.queue ) - if self.load < _MAX_LOAD: - invoke_callbacks_for.append(message) - else: - self._messages_on_hold.put(message) - - req = requests.LeaseRequest( - ack_id=message.ack_id, byte_size=message.size - ) + # Making a decision based on the load, and modifying the data that + # affects the load -> needs a lock, as that state can be modified + # by different threads. 
+ with self._pause_resume_lock: + if self.load < _MAX_LOAD: + invoke_callbacks_for.append(message) + else: + self._messages_on_hold.put(message) + self._on_hold_bytes += message.size + + req = requests.LeaseRequest(ack_id=message.ack_id, byte_size=message.size) self.leaser.add([req]) self.maybe_pause_consumer() _LOGGER.debug( - "Scheduling callbacks for %s new messages, new total on hold %s.", + "Scheduling callbacks for %s new messages, new total on hold %s (bytes %s).", len(invoke_callbacks_for), self._messages_on_hold.qsize(), + self._on_hold_bytes, ) for msg in invoke_callbacks_for: self._scheduler.schedule(self._callback, msg) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 08dcf32cc7b1..3d8ebb99be4d 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -233,13 +233,15 @@ def test__maybe_release_messages_on_overload(): manager = make_manager( flow_control=types.FlowControl(max_messages=10, max_bytes=1000) ) - # Ensure load is exactly 1.0 (to verify that >= condition is used) - _leaser = manager._leaser = mock.create_autospec(leaser.Leaser) - _leaser.message_count = 10 - _leaser.bytes = 1000 msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=11) manager._messages_on_hold.put(msg) + manager._on_hold_bytes = msg.size + + # Ensure load is exactly 1.0 (to verify that >= condition is used) + _leaser = manager._leaser = mock.create_autospec(leaser.Leaser) + _leaser.message_count = 10 + _leaser.bytes = 1000 + msg.size manager._maybe_release_messages() @@ -254,18 +256,20 @@ def test__maybe_release_messages_below_overload(): ) manager._callback = mock.sentinel.callback - # init leaser message count to 8 to leave room for 2 more messages + # Init leaser message count to 11, so that when subtracting the 3 messages + # that are on hold, there is still room for another 2 messages before the + # max load is hit. 
_leaser = manager._leaser = mock.create_autospec(leaser.Leaser) - fake_leaser_add(_leaser, init_msg_count=8, assumed_msg_size=25) - _leaser.add = mock.Mock(wraps=_leaser.add) # to spy on calls + fake_leaser_add(_leaser, init_msg_count=11, assumed_msg_size=10) messages = [ - mock.create_autospec(message.Message, instance=True, ack_id="ack_foo", size=11), - mock.create_autospec(message.Message, instance=True, ack_id="ack_bar", size=22), - mock.create_autospec(message.Message, instance=True, ack_id="ack_baz", size=33), + mock.create_autospec(message.Message, instance=True, ack_id="ack_foo", size=10), + mock.create_autospec(message.Message, instance=True, ack_id="ack_bar", size=10), + mock.create_autospec(message.Message, instance=True, ack_id="ack_baz", size=10), ] for msg in messages: manager._messages_on_hold.put(msg) + manager._on_hold_bytes = 3 * 10 # the actual call of MUT manager._maybe_release_messages() @@ -274,13 +278,6 @@ def test__maybe_release_messages_below_overload(): msg = manager._messages_on_hold.get_nowait() assert msg.ack_id == "ack_baz" - assert len(_leaser.add.mock_calls) == 2 - expected_calls = [ - mock.call([requests.LeaseRequest(ack_id="ack_foo", byte_size=11)]), - mock.call([requests.LeaseRequest(ack_id="ack_bar", byte_size=22)]), - ] - _leaser.add.assert_has_calls(expected_calls) - schedule_calls = manager._scheduler.schedule.mock_calls assert len(schedule_calls) == 2 for _, call_args, _ in schedule_calls: From a0cf284ecb9f895a6a45b6800c6685677a3c638d Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 23 Oct 2019 12:46:06 +0300 Subject: [PATCH 3/4] Add warning if internal bytes count is negative This should not happen, but if it does, it is a bug in the StreamingPullManager logic, and we should know about it. --- .../_protocol/streaming_pull_manager.py | 7 +++++ .../subscriber/test_streaming_pull_manager.py | 28 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index da54b4ec4840..bc3059cb69af 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -307,6 +307,13 @@ def _maybe_release_messages(self): break self._on_hold_bytes -= msg.size + + if self._on_hold_bytes < 0: + _LOGGER.warning( + "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes + ) + self._on_hold_bytes = 0 + _LOGGER.debug( "Released held message, scheduling callback for it, " "still on hold %s (bytes %s).", diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 3d8ebb99be4d..761ac073083c 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -286,6 +286,34 @@ def test__maybe_release_messages_below_overload(): assert call_args[1].ack_id in ("ack_foo", "ack_bar") +def test__maybe_release_messages_negative_on_hold_bytes_warning(caplog): + manager = make_manager( + flow_control=types.FlowControl(max_messages=10, max_bytes=1000) + ) + + msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=17) + manager._messages_on_hold.put(msg) + manager._on_hold_bytes = 5 # too low for some reason + + _leaser = manager._leaser = mock.create_autospec(leaser.Leaser) + 
_leaser.message_count = 3 + _leaser.bytes = 150 + + with caplog.at_level(logging.WARNING): + manager._maybe_release_messages() + + expected_warnings = [ + record.message.lower() + for record in caplog.records + if "unexpectedly negative" in record.message + ] + assert len(expected_warnings) == 1 + assert "on hold bytes" in expected_warnings[0] + assert "-12" in expected_warnings[0] + + assert manager._on_hold_bytes == 0 # should be auto-corrected + + def test_send_unary(): manager = make_manager() manager._UNARY_REQUESTS = True From 79adfa4d9d05e6955b403b3afedb65a0e4c097be Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 23 Oct 2019 12:22:35 +0300 Subject: [PATCH 4/4] Use histogram to set default stream ACK deadline With all the messages lease-managed (even those on hold), there is no need to have a fixed default value. --- .../_protocol/streaming_pull_manager.py | 19 +------------------ pubsub/tests/system.py | 16 ++++++---------- .../subscriber/test_streaming_pull_manager.py | 4 +--- 3 files changed, 8 insertions(+), 31 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index bc3059cb69af..f3798c05610e 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -51,13 +51,6 @@ _RESUME_THRESHOLD = 0.8 """The load threshold below which to resume the incoming message stream.""" -_DEFAULT_STREAM_ACK_DEADLINE = 60 -"""The default message acknowledge deadline in seconds for incoming message stream. - -This default deadline is dynamically modified for the messages that are added -to the lease management. -""" - def _maybe_wrap_exception(exception): """Wraps a gRPC exception class, if needed.""" @@ -412,17 +405,7 @@ def open(self, callback, on_callback_error): ) # Create the RPC - - # We must use a fixed value for the ACK deadline, as we cannot read it - # from the subscription. The latter would require `pubsub.subscriptions.get` - # permission, which is not granted to the default subscriber role - # `roles/pubsub.subscriber`. - # See also https://github.com/googleapis/google-cloud-python/issues/9339 - # - # When dynamic lease management is enabled for the "on hold" messages, - # the default stream ACK deadline should again be set based on the - # historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`. - stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE + stream_ack_deadline_seconds = self.ack_histogram.percentile(99) get_initial_request = functools.partial( self._get_initial_request, stream_ack_deadline_seconds diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index fd7473e1e53b..59e5e3fe83a4 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -382,10 +382,6 @@ class CallbackError(Exception): with pytest.raises(CallbackError): future.result(timeout=30) - @pytest.mark.xfail( - reason="The default stream ACK deadline is static and received messages " - "exceeding FlowControl.max_messages are currently not lease managed." - ) def test_streaming_pull_ack_deadline( self, publisher, subscriber, project, topic_path, subscription_path, cleanup ): @@ -400,7 +396,7 @@ def test_streaming_pull_ack_deadline( # Subscribe to the topic. This must happen before the messages # are published. 
subscriber.create_subscription( - subscription_path, topic_path, ack_deadline_seconds=240 + subscription_path, topic_path, ack_deadline_seconds=45 ) # publish some messages and wait for completion @@ -408,7 +404,7 @@ def test_streaming_pull_ack_deadline( # subscribe to the topic callback = StreamingPullCallback( - processing_time=70, # more than the default stream ACK deadline (60s) + processing_time=13, # more than the default stream ACK deadline (10s) resolve_at_msg_count=3, # one more than the published messages count ) flow_control = types.FlowControl(max_messages=1) @@ -416,13 +412,13 @@ def test_streaming_pull_ack_deadline( subscription_path, callback, flow_control=flow_control ) - # We expect to process the first two messages in 2 * 70 seconds, and + # We expect to process the first two messages in 2 * 13 seconds, and # any duplicate message that is re-sent by the backend in additional - # 70 seconds, totalling 210 seconds (+ overhead) --> if there have been - # no duplicates in 240 seconds, we can reasonably assume that there + # 13 seconds, totalling 39 seconds (+ overhead) --> if there have been + # no duplicates in 60 seconds, we can reasonably assume that there # won't be any. try: - callback.done_future.result(timeout=240) + callback.done_future.result(timeout=60) except exceptions.TimeoutError: # future timed out, because we received no excessive messages assert sorted(callback.seen_message_ids) == [1, 2] diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 761ac073083c..1732ec6cd4b3 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -429,8 +429,6 @@ def test_heartbeat_inactive(): "google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True ) def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc): - stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE - manager = make_manager() manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error) @@ -460,7 +458,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi ) initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"] assert initial_request_arg.func == manager._get_initial_request - assert initial_request_arg.args[0] == stream_ack_deadline + assert initial_request_arg.args[0] == 10 # the default stream ACK timeout assert not manager._client.api.get_subscription.called resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(