From 6fc496bbb728540c38f06cea0caa13ac5a2f7d50 Mon Sep 17 00:00:00 2001 From: Arjun Singh Date: Tue, 18 Jul 2023 16:25:15 -0700 Subject: [PATCH 1/4] Clarify where buffer_size is used by using self.buffer_size in select_data --- faktory/_proto.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/faktory/_proto.py b/faktory/_proto.py index 85d5050..a203f36 100644 --- a/faktory/_proto.py +++ b/faktory/_proto.py @@ -243,7 +243,7 @@ def is_supported_server_version(self, v: int) -> bool: def get_message(self) -> Iterator[str]: socket = self.socket - buffer = self.select_data(self.buffer_size) + buffer = self.select_data() while self.is_connected or self.is_connecting: buffering = True while buffering: @@ -286,17 +286,17 @@ def get_message(self) -> Iterator[str]: self.log.debug("> {}".format(resp)) yield resp else: - more = self.select_data(self.buffer_size) + more = self.select_data() if not more: buffering = False else: buffer += more - def select_data(self, buffer_size: int): + def select_data(self): s = self.socket ready = select.select([s], [], [], self.timeout) if ready[0]: - buffer = s.recv(buffer_size) + buffer = s.recv(self.buffer_size) if len(buffer) > 0: return buffer self.disconnect() From d3d710f806cf99ba741ae3d0e8f05fa34901c3cd Mon Sep 17 00:00:00 2001 From: Arjun Singh Date: Tue, 18 Jul 2023 16:26:51 -0700 Subject: [PATCH 2/4] Fix issue with secure socket having unread data, but returning as not ready --- faktory/_proto.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/faktory/_proto.py b/faktory/_proto.py index a203f36..d4c14b1 100644 --- a/faktory/_proto.py +++ b/faktory/_proto.py @@ -297,6 +297,10 @@ def select_data(self): ready = select.select([s], [], [], self.timeout) if ready[0]: buffer = s.recv(self.buffer_size) + unread = s.pending() + while unread: + buffer += s.recv(unread) + unread = s.pending() if len(buffer) > 0: return buffer self.disconnect() From 248ee04df2dc933f7e0636aa7e20421f5fc52c25 Mon Sep 17 00:00:00 2001 From: Arjun Singh Date: Tue, 18 Jul 2023 23:51:51 -0700 Subject: [PATCH 3/4] Clean up get_message & don't rely on receiving a specific number of bytes from the socket --- faktory/_proto.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/faktory/_proto.py b/faktory/_proto.py index d4c14b1..91f3a5d 100644 --- a/faktory/_proto.py +++ b/faktory/_proto.py @@ -271,16 +271,10 @@ def get_message(self) -> Iterator[str]: self.log.debug("> {}".format("nil")) yield "" else: - if len(buffer) >= number_of_bytes: - # we've already got enough bytes in the buffer - data = buffer[:number_of_bytes] - buffer = buffer[number_of_bytes:] - else: - data = buffer - while len(data) != number_of_bytes: - bytes_required = number_of_bytes - len(data) - data += self.select_data(bytes_required) - buffer = [] + while len(buffer) < number_of_bytes: + buffer += self.select_data() + data = buffer[:number_of_bytes] + buffer = buffer[number_of_bytes:] resp = data.decode().strip("\r\n ") if self.debug: self.log.debug("> {}".format(resp)) From cc172526bd8e1290e1ed4ec8ee1560ef47edb106 Mon Sep 17 00:00:00 2001 From: Arjun Singh Date: Wed, 19 Jul 2023 14:26:54 -0700 Subject: [PATCH 4/4] Don't check socket.pending unless using TLS --- faktory/_proto.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/faktory/_proto.py b/faktory/_proto.py index 91f3a5d..786d32f 100644 --- a/faktory/_proto.py +++ b/faktory/_proto.py @@ -291,10 +291,11 @@ def select_data(self): ready = select.select([s], [], [], self.timeout) if ready[0]: buffer = s.recv(self.buffer_size) - unread = s.pending() - while unread: - buffer += s.recv(unread) + if self.use_tls: unread = s.pending() + while unread: + buffer += s.recv(unread) + unread = s.pending() if len(buffer) > 0: return buffer self.disconnect()