From 80c7640eb57578f75cf11c6db37c8f969c5da1ec Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 1 May 2019 15:48:04 -0700 Subject: [PATCH 1/2] remove prints --- .../pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | 5 ++--- pubsub/tests/system.py | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index d025fa71368f..ed51b8724eac 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -195,7 +195,7 @@ def load(self): messages_percent = self._leaser.message_count / self._flow_control.max_messages bytes_percent = self._leaser.bytes / self._flow_control.max_bytes - print(f"{messages_percent}, {bytes_percent}") + return max(messages_percent, bytes_percent) def add_close_callback(self, callback): @@ -213,7 +213,7 @@ def maybe_pause_consumer(self): if self._consumer is not None and not self._consumer.is_paused: _LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load) self._consumer.pause() - print('paused') + def maybe_resume_consumer(self): """Check the current load and resume the consumer if needed.""" @@ -227,7 +227,6 @@ def maybe_resume_consumer(self): return if self.load < self.flow_control.resume_threshold: - print('resuming') self._consumer.resume() else: _LOGGER.debug("Did not resume, current load is %s", self.load) diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 80349240fabb..e6001f8e7801 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -65,6 +65,7 @@ def cleanup(): for to_call, argument in registry: to_call(argument) + def test_publish_messages(publisher, topic_path, cleanup): futures = [] # Make sure the topic gets deleted. From 98c2b80c76516906bbc9b819e143145b8fcf1517 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 1 May 2019 16:27:33 -0700 Subject: [PATCH 2/2] black formatter --- .../subscriber/_protocol/streaming_pull_manager.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index ed51b8724eac..acf514775779 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -193,10 +193,12 @@ def load(self): if self._leaser is None: return 0 - messages_percent = self._leaser.message_count / self._flow_control.max_messages - bytes_percent = self._leaser.bytes / self._flow_control.max_bytes - - return max(messages_percent, bytes_percent) + return max( + [ + self._leaser.message_count / self._flow_control.max_messages, + self._leaser.bytes / self._flow_control.max_bytes, + ] + ) def add_close_callback(self, callback): """Schedules a callable when the manager closes. @@ -208,12 +210,10 @@ def add_close_callback(self, callback): def maybe_pause_consumer(self): """Check the current load and pause the consumer if needed.""" - print(self.load) if self.load >= 1.0: if self._consumer is not None and not self._consumer.is_paused: _LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load) self._consumer.pause() - def maybe_resume_consumer(self): """Check the current load and resume the consumer if needed."""