Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions faktory/_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def is_supported_server_version(self, v: int) -> bool:

def get_message(self) -> Iterator[str]:
socket = self.socket
buffer = self.select_data(self.buffer_size)
buffer = self.select_data()
while self.is_connected or self.is_connecting:
buffering = True
while buffering:
Expand Down Expand Up @@ -271,32 +271,31 @@ def get_message(self) -> Iterator[str]:
self.log.debug("> {}".format("nil"))
yield ""
else:
if len(buffer) >= number_of_bytes:
# we've already got enough bytes in the buffer
data = buffer[:number_of_bytes]
buffer = buffer[number_of_bytes:]
else:
data = buffer
while len(data) != number_of_bytes:
bytes_required = number_of_bytes - len(data)
data += self.select_data(bytes_required)
buffer = []
while len(buffer) < number_of_bytes:
buffer += self.select_data()
data = buffer[:number_of_bytes]
buffer = buffer[number_of_bytes:]
resp = data.decode().strip("\r\n ")
if self.debug:
self.log.debug("> {}".format(resp))
yield resp
else:
more = self.select_data(self.buffer_size)
more = self.select_data()
if not more:
buffering = False
else:
buffer += more

def select_data(self, buffer_size: int):
def select_data(self):
s = self.socket
ready = select.select([s], [], [], self.timeout)
if ready[0]:
buffer = s.recv(buffer_size)
buffer = s.recv(self.buffer_size)
if self.use_tls:
unread = s.pending()
while unread:
buffer += s.recv(unread)
unread = s.pending()
if len(buffer) > 0:
return buffer
self.disconnect()
Expand Down