Skip to content

Commit 8fc14f8

Browse files
committed
IGNITE-15103 Implement debug and error logging of connections and queries - Fixes #45.
1 parent dd3b280 commit 8fc14f8

File tree

7 files changed

+208
-64
lines changed

7 files changed

+208
-64
lines changed

pyignite/connection/aio_connection.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from typing import Union
3434

3535
from pyignite.constants import PROTOCOLS, PROTOCOL_BYTE_ORDER
36-
from pyignite.exceptions import HandshakeError, SocketError, connection_errors
36+
from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError
3737
from .bitmask_feature import BitmaskFeature
3838
from .connection import BaseConnection
3939

@@ -68,7 +68,7 @@ def data_received(self, data: bytes) -> None:
6868
hs_response = self.__parse_handshake(packet, self._conn.client)
6969
self._handshake_fut.set_result(hs_response)
7070
else:
71-
self._conn.on_message(packet)
71+
self._conn.process_message(packet)
7272
self._buffer = self._buffer[packet_sz:len(self._buffer)]
7373

7474
def __has_full_response(self):
@@ -84,7 +84,7 @@ def __process_connection_error(self, exc):
8484
connected = self._handshake_fut.done()
8585
if not connected:
8686
self._handshake_fut.set_exception(exc)
87-
self._conn.on_connection_lost(exc, connected)
87+
self._conn.process_connection_lost(exc, connected)
8888

8989
@staticmethod
9090
def __send_handshake(transport, conn):
@@ -177,38 +177,41 @@ async def _connect(self):
177177
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
178178

179179
try:
180+
self._on_handshake_start()
180181
result = await self._connect_version()
181182
except HandshakeError as e:
182183
if e.expected_version in PROTOCOLS:
183184
self.client.protocol_context.version = e.expected_version
184185
result = await self._connect_version()
185186
else:
187+
self._on_handshake_fail(e)
186188
raise e
187-
except connection_errors:
189+
except AuthenticationError as e:
190+
self._on_handshake_fail(e)
191+
raise e
192+
except Exception as e:
188193
# restore undefined protocol version
189194
if detecting_protocol:
190195
self.client.protocol_context = None
191-
raise
196+
self._on_handshake_fail(e)
197+
raise e
192198

193-
# connection is ready for end user
194-
features = BitmaskFeature.from_array(result.get('features', None))
195-
self.client.protocol_context.features = features
196-
self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
197-
self.failed = False
199+
self._on_handshake_success(result)
198200

199-
def on_connection_lost(self, error, reconnect=False):
201+
def process_connection_lost(self, err, reconnect=False):
200202
self.failed = True
201203
for _, fut in self._pending_reqs.items():
202-
fut.set_exception(error)
204+
fut.set_exception(err)
203205
self._pending_reqs.clear()
204206

205207
if self._transport_closed_fut and not self._transport_closed_fut.done():
206208
self._transport_closed_fut.set_result(None)
207209

208210
if reconnect and not self._closed:
211+
self._on_connection_lost(err)
209212
self._loop.create_task(self._reconnect())
210213

211-
def on_message(self, data):
214+
def process_message(self, data):
212215
req_id = int.from_bytes(data[4:12], byteorder=PROTOCOL_BYTE_ORDER, signed=True)
213216
if req_id in self._pending_reqs:
214217
self._pending_reqs[req_id].set_result(data)
@@ -227,7 +230,7 @@ async def _connect_version(self) -> Union[dict, OrderedDict]:
227230
hs_response = await handshake_fut
228231

229232
if hs_response.op_code == 0:
230-
await self._close_transport()
233+
await self.close()
231234
self._process_handshake_error(hs_response)
232235

233236
return hs_response
@@ -281,4 +284,5 @@ async def _close_transport(self):
281284
except asyncio.TimeoutError:
282285
pass
283286
finally:
287+
self._on_connection_lost(expected=True)
284288
self._transport_closed_fut = None

pyignite/connection/connection.py

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
import logging
1617
from collections import OrderedDict
1718
import socket
1819
from typing import Union
@@ -28,6 +29,8 @@
2829

2930
CLIENT_STATUS_AUTH_FAILURE = 2000
3031

32+
logger = logging.getLogger('.'.join(__name__.split('.')[:-1]))
33+
3134

3235
class BaseConnection:
3336
def __init__(self, client, host: str = None, port: int = None, username: str = None, password: str = None,
@@ -78,21 +81,53 @@ def protocol_context(self):
7881
return self.client.protocol_context
7982

8083
def _process_handshake_error(self, response):
81-
error_text = f'Handshake error: {response.message}'
8284
# if handshake fails for any reason other than protocol mismatch
8385
# (i.e. authentication error), server version is 0.0.0
86+
if response.client_status == CLIENT_STATUS_AUTH_FAILURE:
87+
raise AuthenticationError(response.message)
88+
8489
protocol_version = self.client.protocol_context.version
8590
server_version = (response.version_major, response.version_minor, response.version_patch)
86-
91+
error_text = f'Handshake error: {response.message}'
8792
if any(server_version):
8893
error_text += f' Server expects binary protocol version ' \
8994
f'{server_version[0]}.{server_version[1]}.{server_version[2]}. ' \
9095
f'Client provides ' \
9196
f'{protocol_version[0]}.{protocol_version[1]}.{protocol_version[2]}.'
92-
elif response.client_status == CLIENT_STATUS_AUTH_FAILURE:
93-
raise AuthenticationError(error_text)
9497
raise HandshakeError(server_version, error_text)
9598

99+
def _on_handshake_start(self):
100+
if logger.isEnabledFor(logging.DEBUG):
101+
logger.debug("Connecting to node(address=%s, port=%d) with protocol context %s",
102+
self.host, self.port, self.client.protocol_context)
103+
104+
def _on_handshake_success(self, result):
105+
features = BitmaskFeature.from_array(result.get('features', None))
106+
self.client.protocol_context.features = features
107+
self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
108+
self.failed = False
109+
110+
if logger.isEnabledFor(logging.DEBUG):
111+
logger.debug("Connected to node(address=%s, port=%d, node_uuid=%s) with protocol context %s",
112+
self.host, self.port, self.uuid, self.client.protocol_context)
113+
114+
def _on_handshake_fail(self, err):
115+
if isinstance(err, AuthenticationError):
116+
logger.error("Authentication failed while connecting to node(address=%s, port=%d): %s",
117+
self.host, self.port, err)
118+
else:
119+
logger.error("Failed to perform handshake, connection to node(address=%s, port=%d) "
120+
"with protocol context %s failed: %s",
121+
self.host, self.port, self.client.protocol_context, err, exc_info=True)
122+
123+
def _on_connection_lost(self, err=None, expected=False):
124+
if expected and logger.isEnabledFor(logging.DEBUG):
125+
logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
126+
self.host, self.port, self.uuid)
127+
else:
128+
logger.info("Connection lost to node(address=%s, port=%d, node_uuid=%s): %s",
129+
self.host, self.port, self.uuid, err)
130+
96131

97132
class Connection(BaseConnection):
98133
"""
@@ -168,24 +203,26 @@ def connect(self):
168203
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
169204

170205
try:
206+
self._on_handshake_start()
171207
result = self._connect_version()
172208
except HandshakeError as e:
173209
if e.expected_version in PROTOCOLS:
174210
self.client.protocol_context.version = e.expected_version
175211
result = self._connect_version()
176212
else:
213+
self._on_handshake_fail(e)
177214
raise e
178-
except connection_errors:
215+
except AuthenticationError as e:
216+
self._on_handshake_fail(e)
217+
raise e
218+
except Exception as e:
179219
# restore undefined protocol version
180220
if detecting_protocol:
181221
self.client.protocol_context = None
182-
raise
222+
self._on_handshake_fail(e)
223+
raise e
183224

184-
# connection is ready for end user
185-
features = BitmaskFeature.from_array(result.get('features', None))
186-
self.client.protocol_context.features = features
187-
self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
188-
self.failed = False
225+
self._on_handshake_success(result)
189226

190227
def _connect_version(self) -> Union[dict, OrderedDict]:
191228
"""
@@ -258,11 +295,12 @@ def send(self, data: Union[bytes, bytearray], flags=None, reconnect=True):
258295

259296
try:
260297
self._socket.sendall(data, **kwargs)
261-
except connection_errors:
298+
except connection_errors as e:
262299
self.failed = True
263300
if reconnect:
301+
self._on_connection_lost(e)
264302
self.reconnect()
265-
raise
303+
raise e
266304

267305
def recv(self, flags=None, reconnect=True) -> bytearray:
268306
"""
@@ -287,11 +325,12 @@ def recv(self, flags=None, reconnect=True) -> bytearray:
287325
if bytes_received == 0:
288326
raise SocketError('Connection broken.')
289327
bytes_total_received += bytes_received
290-
except connection_errors:
328+
except connection_errors as e:
291329
self.failed = True
292330
if reconnect:
331+
self._on_connection_lost(e)
293332
self.reconnect()
294-
raise
333+
raise e
295334

296335
if bytes_total_received < 4:
297336
continue
@@ -325,5 +364,5 @@ def close(self):
325364
self._socket.close()
326365
except connection_errors:
327366
pass
328-
367+
self._on_connection_lost(expected=True)
329368
self._socket = None

pyignite/connection/protocol_context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ def __eq__(self, other):
3737
self.version == other.version and \
3838
self.features == other.features
3939

40+
def __str__(self):
41+
return f'ProtocolContext(version={self._version}, features={self._features})'
42+
4043
def _ensure_consistency(self):
4144
if not self.is_feature_flags_supported():
4245
self._features = None

0 commit comments

Comments
 (0)