diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index d025fa71368f..acf514775779 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -193,10 +193,12 @@ def load(self): if self._leaser is None: return 0 - messages_percent = self._leaser.message_count / self._flow_control.max_messages - bytes_percent = self._leaser.bytes / self._flow_control.max_bytes - print(f"{messages_percent}, {bytes_percent}") - return max(messages_percent, bytes_percent) + return max( + [ + self._leaser.message_count / self._flow_control.max_messages, + self._leaser.bytes / self._flow_control.max_bytes, + ] + ) def add_close_callback(self, callback): """Schedules a callable when the manager closes. @@ -208,12 +210,10 @@ def add_close_callback(self, callback): def maybe_pause_consumer(self): """Check the current load and pause the consumer if needed.""" - print(self.load) if self.load >= 1.0: if self._consumer is not None and not self._consumer.is_paused: _LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load) self._consumer.pause() - print('paused') def maybe_resume_consumer(self): """Check the current load and resume the consumer if needed.""" @@ -227,7 +227,6 @@ def maybe_resume_consumer(self): return if self.load < self.flow_control.resume_threshold: - print('resuming') self._consumer.resume() else: _LOGGER.debug("Did not resume, current load is %s", self.load) diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 80349240fabb..e6001f8e7801 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -65,6 +65,7 @@ def cleanup(): for to_call, argument in registry: to_call(argument) + def test_publish_messages(publisher, topic_path, cleanup): futures = [] # Make sure the topic gets deleted.