From f878af2df26da6f924564d2c3852f9bca5bb37d3 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 28 Jun 2019 10:34:53 +0100 Subject: [PATCH 01/20] Trace e2e --- synapse/handlers/e2e_keys.py | 39 ++++++++++++++++++++++++++++ synapse/handlers/e2e_room_keys.py | 10 +++++++ synapse/rest/client/v2_alpha/keys.py | 13 ++++++++++ synapse/storage/e2e_room_keys.py | 9 +++++++ synapse/storage/end_to_end_keys.py | 35 +++++++++++++++++++++++++ 5 files changed, 106 insertions(+) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 1f90b0d27864..ff29a39d7d09 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +from synapse.util.tracerutils import TracerUtil, trace_defered_function from six import iteritems @@ -46,6 +47,7 @@ def __init__(self, hs): "client_keys", self.on_federation_query_client_keys ) + @trace_defered_function @defer.inlineCallbacks def query_devices(self, query_body, timeout): """ Handle a device key query from a client @@ -81,6 +83,9 @@ def query_devices(self, query_body, timeout): else: remote_queries[user_id] = device_ids + TracerUtil.set_tag("local_key_query", local_query) + TracerUtil.set_tag("remote_key_query", remote_queries) + # First get local devices. failures = {} results = {} @@ -121,6 +126,7 @@ def query_devices(self, query_body, timeout): r[user_id] = remote_queries[user_id] # Now fetch any devices that we don't have in our cache + @trace_defered_function @defer.inlineCallbacks def do_remote_query(destination): """This is called when we are querying the device list of a user on @@ -185,6 +191,8 @@ def do_remote_query(destination): except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure + TracerUtil.set_tag("error", True) + TracerUtil.set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -198,6 +206,7 @@ def do_remote_query(destination): return {"device_keys": results, "failures": failures} + @trace_defered_function @defer.inlineCallbacks def query_local_devices(self, query): """Get E2E device keys for local users @@ -210,6 +219,7 @@ def query_local_devices(self, query): defer.Deferred: (resolves to dict[string, dict[string, dict]]): map from user_id -> device_id -> device details """ + TracerUtil.set_tag("local_query", query) local_query = [] result_dict = {} @@ -217,6 +227,14 @@ def query_local_devices(self, query): # we use UserID.from_string to catch invalid user ids if not self.is_mine(UserID.from_string(user_id)): logger.warning("Request for keys for non-local user %s", user_id) + TracerUtil.log_kv( + { + "message": "Requested a local key for a user which" + + " was not local to the homeserver", + "user_id": user_id, + } + ) + TracerUtil.set_tag("error", True) raise SynapseError(400, "Not a user here") if not device_ids: @@ -241,6 +259,7 @@ def query_local_devices(self, query): r["unsigned"]["device_display_name"] = display_name result_dict[user_id][device_id] = r + TracerUtil.log_kv(results) return result_dict @defer.inlineCallbacks @@ -251,6 +270,7 @@ def on_federation_query_client_keys(self, query_body): res = yield self.query_local_devices(device_keys_query) return {"device_keys": res} + @trace_defered_function @defer.inlineCallbacks def claim_one_time_keys(self, query, timeout): local_query = [] @@ -265,6 +285,9 @@ def claim_one_time_keys(self, query, timeout): domain = get_domain_from_id(user_id) remote_queries.setdefault(domain, {})[user_id] = device_keys + TracerUtil.set_tag("local_key_query", local_query) + TracerUtil.set_tag("remote_key_query", remote_queries) + results = yield self.store.claim_e2e_one_time_keys(local_query) json_result = {} @@ -276,8 +299,10 @@ def claim_one_time_keys(self, query, timeout): key_id: json.loads(json_bytes) } + @trace_defered_function @defer.inlineCallbacks def claim_client_keys(destination): + TracerUtil.set_tag("destination", destination) device_keys = remote_queries[destination] try: remote_result = yield self.federation.claim_client_keys( @@ -290,6 +315,8 @@ def claim_client_keys(destination): except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure + TracerUtil.set_tag("error", True) + TracerUtil.set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -313,15 +340,21 @@ def claim_client_keys(destination): ), ) + TracerUtil.log_kv({"one_time_keys": json_result, "failures": failures}) return {"one_time_keys": json_result, "failures": failures} + @trace_defered_function @defer.inlineCallbacks def upload_keys_for_user(self, user_id, device_id, keys): + TracerUtil.set_tag("user_id", user_id) + TracerUtil.set_tag("device_id", device_id) + TracerUtil.set_tag("keys", keys) time_now = self.clock.time_msec() # TODO: Validate the JSON to make sure it has the right keys. device_keys = keys.get("device_keys", None) + TracerUtil.set_tag("device_keys", device_keys) if device_keys: logger.info( "Updating device_keys for device %r for user %s at %d", @@ -342,6 +375,10 @@ def upload_keys_for_user(self, user_id, device_id, keys): yield self._upload_one_time_keys_for_user( user_id, device_id, time_now, one_time_keys ) + else: + TracerUtil.log_kv( + {"event": "did not upload one_time_keys", "reason": "no keys given"} + ) # the device should have been registered already, but it may have been # deleted due to a race with a DELETE request. Or we may be using an @@ -352,8 +389,10 @@ def upload_keys_for_user(self, user_id, device_id, keys): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) + TracerUtil.set_tag("one_time_key_counts", result) return {"one_time_key_counts": result} + @trace_defered_function @defer.inlineCallbacks def _upload_one_time_keys_for_user( self, user_id, device_id, time_now, one_time_keys diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 41b871fc5953..06837bb12647 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -27,6 +27,7 @@ SynapseError, ) from synapse.util.async_helpers import Linearizer +from synapse.util.tracerutils import TracerUtil, trace_defered_function logger = logging.getLogger(__name__) @@ -49,6 +50,7 @@ def __init__(self, hs): # changed. self._upload_linearizer = Linearizer("upload_room_keys_lock") + @trace_defered_function @defer.inlineCallbacks def get_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -84,8 +86,10 @@ def get_room_keys(self, user_id, version, room_id=None, session_id=None): user_id, version, room_id, session_id ) + TracerUtil.log_kv(results) return results + @trace_defered_function @defer.inlineCallbacks def delete_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -107,6 +111,7 @@ def delete_room_keys(self, user_id, version, room_id=None, session_id=None): with (yield self._upload_linearizer.queue(user_id)): yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) + @trace_defered_function @defer.inlineCallbacks def upload_room_keys(self, user_id, version, room_keys): """Bulk upload a list of room keys into a given backup version, asserting @@ -174,6 +179,7 @@ def upload_room_keys(self, user_id, version, room_keys): user_id, version, room_id, session_id, session ) + @trace_defered_function @defer.inlineCallbacks def _upload_room_key(self, user_id, version, room_id, session_id, room_key): """Upload a given room_key for a given room and session into a given @@ -236,6 +242,7 @@ def _should_replace_room_key(current_room_key, room_key): return False return True + @trace_defered_function @defer.inlineCallbacks def create_version(self, user_id, version_info): """Create a new backup version. This automatically becomes the new @@ -264,6 +271,7 @@ def create_version(self, user_id, version_info): ) return new_version + @trace_defered_function @defer.inlineCallbacks def get_version_info(self, user_id, version=None): """Get the info about a given version of the user's backup @@ -294,6 +302,7 @@ def get_version_info(self, user_id, version=None): raise return res + @trace_defered_function @defer.inlineCallbacks def delete_version(self, user_id, version=None): """Deletes a given version of the user's e2e_room_keys backup @@ -314,6 +323,7 @@ def delete_version(self, user_id, version=None): else: raise + @trace_defered_function @defer.inlineCallbacks def update_version(self, user_id, version, version_info): """Update the info about a given version of the user's backup diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 6008adec7cf3..8de9e12df493 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -25,6 +25,10 @@ parse_string, ) from synapse.types import StreamToken +from synapse.util.tracerutils import ( + TracerUtil, + trace_defered_function_using_operation_name, +) from ._base import client_patterns @@ -68,6 +72,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() + @trace_defered_function_using_operation_name("upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) @@ -78,6 +83,14 @@ def on_POST(self, request, device_id): # passing the device_id here is deprecated; however, we allow it # for now for compatibility with older clients. if requester.device_id is not None and device_id != requester.device_id: + TracerUtil.set_tag("error", True) + TracerUtil.log_kv( + { + "message": "Client uploading keys for a different device", + "logged_in_id": requester.device_id, + "key_being_uploaded": device_id, + } + ) logger.warning( "Client uploading keys for a different device " "(logged in as %s, uploading for %s)", diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index 99128f2df70e..b0e426d30ee8 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -18,11 +18,13 @@ from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.util.tracerutils import trace_defered_function, trace_function from ._base import SQLBaseStore class EndToEndRoomKeyStore(SQLBaseStore): + @trace_defered_function @defer.inlineCallbacks def get_e2e_room_key(self, user_id, version, room_id, session_id): """Get the encrypted E2E room key for a given session from a given @@ -63,6 +65,7 @@ def get_e2e_room_key(self, user_id, version, room_id, session_id): return row + @trace_defered_function @defer.inlineCallbacks def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): """Replaces or inserts the encrypted E2E room key for a given session in @@ -95,6 +98,7 @@ def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): lock=False, ) + @trace_defered_function @defer.inlineCallbacks def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -153,6 +157,7 @@ def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): return sessions + @trace_defered_function @defer.inlineCallbacks def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -194,6 +199,7 @@ def _get_current_version(txn, user_id): raise StoreError(404, "No current backup version") return row[0] + @trace_function def get_e2e_room_keys_version_info(self, user_id, version=None): """Get info metadata about a version of our room_keys backup. @@ -236,6 +242,7 @@ def _get_e2e_room_keys_version_info_txn(txn): "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn ) + @trace_function def create_e2e_room_keys_version(self, user_id, info): """Atomically creates a new version of this user's e2e_room_keys store with the given version info. @@ -276,6 +283,7 @@ def _create_e2e_room_keys_version_txn(txn): "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn ) + @trace_function def update_e2e_room_keys_version(self, user_id, version, info): """Update a given backup version @@ -292,6 +300,7 @@ def update_e2e_room_keys_version(self, user_id, version, info): desc="update_e2e_room_keys_version", ) + @trace_function def delete_e2e_room_keys_version(self, user_id, version=None): """Delete a given backup version of the user's room keys. Doesn't delete their actual key data. diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 1e07474e706a..9779cb70c987 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -22,8 +22,11 @@ from ._base import SQLBaseStore, db_to_json +from synapse.util.tracerutils import TracerUtil, trace_defered_function, trace_function + class EndToEndKeyWorkerStore(SQLBaseStore): + @trace_defered_function @defer.inlineCallbacks def get_e2e_device_keys( self, query_list, include_all_devices=False, include_deleted_devices=False @@ -40,6 +43,7 @@ def get_e2e_device_keys( Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". """ + TracerUtil.set_tag("query_list", query_list) if not query_list: return {} @@ -57,9 +61,13 @@ def get_e2e_device_keys( return results + @trace_function def _get_e2e_device_keys_txn( self, txn, query_list, include_all_devices=False, include_deleted_devices=False ): + TracerUtil.set_tag("include_all_devices", include_all_devices) + TracerUtil.set_tag("include_deleted_devices", include_deleted_devices) + query_clauses = [] query_params = [] @@ -104,8 +112,10 @@ def _get_e2e_device_keys_txn( for user_id, device_id in deleted_devices: result.setdefault(user_id, {})[device_id] = None + TracerUtil.log_kv(result) return result + @trace_defered_function @defer.inlineCallbacks def get_e2e_one_time_keys(self, user_id, device_id, key_ids): """Retrieve a number of one-time keys for a user @@ -121,6 +131,10 @@ def get_e2e_one_time_keys(self, user_id, device_id, key_ids): key_id) to json string for key """ + TracerUtil.set_tag("user_id", user_id) + TracerUtil.set_tag("device_id", device_id) + TracerUtil.set_tag("key_ids", key_ids) + rows = yield self._simple_select_many_batch( table="e2e_one_time_keys_json", column="key_id", @@ -145,7 +159,11 @@ def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): (algorithm, key_id, key json) """ + @trace_function def _add_e2e_one_time_keys(txn): + TracerUtil.set_tag("user_id", user_id) + TracerUtil.set_tag("device_id", device_id) + TracerUtil.set_tag("new_keys", new_keys) # We are protected from race between lookup and insertion due to # a unique constraint. If there is a race of two calls to # `add_e2e_one_time_keys` then they'll conflict and we will only @@ -201,7 +219,13 @@ def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): or the keys were already in the database. """ + @trace_function def _set_e2e_device_keys_txn(txn): + TracerUtil.set_tag("user_id", user_id) + TracerUtil.set_tag("device_id", device_id) + TracerUtil.set_tag("time_now", time_now) + TracerUtil.set_tag("device_keys", device_keys) + old_key_json = self._simple_select_one_onecol_txn( txn, table="e2e_device_keys_json", @@ -215,6 +239,7 @@ def _set_e2e_device_keys_txn(txn): new_key_json = encode_canonical_json(device_keys).decode("utf-8") if old_key_json == new_key_json: + TracerUtil.set_tag("error", True) return False self._simple_upsert_txn( @@ -231,6 +256,7 @@ def _set_e2e_device_keys_txn(txn): def claim_e2e_one_time_keys(self, query_list): """Take a list of one time keys out of the database""" + @trace_function def _claim_e2e_one_time_keys(txn): sql = ( "SELECT key_id, key_json FROM e2e_one_time_keys_json" @@ -252,7 +278,13 @@ def _claim_e2e_one_time_keys(txn): " AND key_id = ?" ) for user_id, device_id, algorithm, key_id in delete: + TracerUtil.log_kv( + {"message": "executing claim transaction on database"} + ) txn.execute(sql, (user_id, device_id, algorithm, key_id)) + TracerUtil.log_kv( + {"message": "finished executing and invalidating cache"} + ) self._invalidate_cache_and_stream( txn, self.count_e2e_one_time_keys, (user_id, device_id) ) @@ -261,7 +293,10 @@ def _claim_e2e_one_time_keys(txn): return self.runInteraction("claim_e2e_one_time_keys", _claim_e2e_one_time_keys) def delete_e2e_keys_by_device(self, user_id, device_id): + @trace_function def delete_e2e_keys_by_device_txn(txn): + TracerUtil.set_tag("user_id", user_id) + TracerUtil.set_tag("device_id", device_id) self._simple_delete_txn( txn, table="e2e_device_keys_json", From 62e0b78f71949415e8c5e868535c49afefd87de8 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 28 Jun 2019 15:39:18 +0100 Subject: [PATCH 02/20] Fix e2e bugs --- synapse/storage/e2e_room_keys.py | 10 +++++----- synapse/storage/end_to_end_keys.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index b0e426d30ee8..ced6dd04ded0 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError -from synapse.util.tracerutils import trace_defered_function, trace_function +from synapse.util.tracerutils import trace_defered_function from ._base import SQLBaseStore @@ -199,7 +199,7 @@ def _get_current_version(txn, user_id): raise StoreError(404, "No current backup version") return row[0] - @trace_function + @trace_defered_function def get_e2e_room_keys_version_info(self, user_id, version=None): """Get info metadata about a version of our room_keys backup. @@ -242,7 +242,7 @@ def _get_e2e_room_keys_version_info_txn(txn): "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn ) - @trace_function + @trace_defered_function def create_e2e_room_keys_version(self, user_id, info): """Atomically creates a new version of this user's e2e_room_keys store with the given version info. @@ -283,7 +283,7 @@ def _create_e2e_room_keys_version_txn(txn): "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn ) - @trace_function + @trace_defered_function def update_e2e_room_keys_version(self, user_id, version, info): """Update a given backup version @@ -300,7 +300,7 @@ def update_e2e_room_keys_version(self, user_id, version, info): desc="update_e2e_room_keys_version", ) - @trace_function + @trace_defered_function def delete_e2e_room_keys_version(self, user_id, version=None): """Delete a given backup version of the user's room keys. Doesn't delete their actual key data. diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 9779cb70c987..acfe3da0ad07 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -239,7 +239,7 @@ def _set_e2e_device_keys_txn(txn): new_key_json = encode_canonical_json(device_keys).decode("utf-8") if old_key_json == new_key_json: - TracerUtil.set_tag("error", True) + TracerUtil.log_kv({"event", "key already stored"}) return False self._simple_upsert_txn( From 84a90def15968d026198b2163fddd04f828ba0dd Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 28 Jun 2019 17:09:25 +0100 Subject: [PATCH 03/20] Trace more e2e stuff and less e2e stuff --- synapse/handlers/e2e_room_keys.py | 1 - synapse/rest/client/v2_alpha/room_keys.py | 5 +++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 06837bb12647..07724d1c4a1e 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -271,7 +271,6 @@ def create_version(self, user_id, version_info): ) return new_version - @trace_defered_function @defer.inlineCallbacks def get_version_info(self, user_id, version=None): """Get the info about a given version of the user's backup diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py index 10dec96208b6..34ab74d3a4e8 100644 --- a/synapse/rest/client/v2_alpha/room_keys.py +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -23,6 +23,10 @@ parse_json_object_from_request, parse_string, ) +from synapse.util.tracerutils import ( + TracerUtil, + trace_defered_function_using_operation_name, +) from ._base import client_patterns @@ -311,6 +315,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() + @trace_defered_function_using_operation_name("get_room_keys_version") @defer.inlineCallbacks def on_GET(self, request, version): """ From 47ba86736d241f7d8c47c723384f800a6183ca77 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Tue, 2 Jul 2019 17:34:48 +0100 Subject: [PATCH 04/20] Update to new access pattern --- synapse/handlers/e2e_keys.py | 56 +++++++++++------------ synapse/handlers/e2e_room_keys.py | 18 ++++---- synapse/rest/client/v2_alpha/keys.py | 12 ++--- synapse/rest/client/v2_alpha/room_keys.py | 7 +-- synapse/storage/end_to_end_keys.py | 54 +++++++++++----------- 5 files changed, 70 insertions(+), 77 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index ff29a39d7d09..6efa21aab848 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -from synapse.util.tracerutils import TracerUtil, trace_defered_function +import synapse.util.tracerutils as tracerutils from six import iteritems @@ -47,7 +47,7 @@ def __init__(self, hs): "client_keys", self.on_federation_query_client_keys ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def query_devices(self, query_body, timeout): """ Handle a device key query from a client @@ -83,8 +83,8 @@ def query_devices(self, query_body, timeout): else: remote_queries[user_id] = device_ids - TracerUtil.set_tag("local_key_query", local_query) - TracerUtil.set_tag("remote_key_query", remote_queries) + tracerutils.set_tag("local_key_query", local_query) + tracerutils.set_tag("remote_key_query", remote_queries) # First get local devices. failures = {} @@ -126,7 +126,7 @@ def query_devices(self, query_body, timeout): r[user_id] = remote_queries[user_id] # Now fetch any devices that we don't have in our cache - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def do_remote_query(destination): """This is called when we are querying the device list of a user on @@ -191,8 +191,8 @@ def do_remote_query(destination): except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure - TracerUtil.set_tag("error", True) - TracerUtil.set_tag("reason", failure) + tracerutils.set_tag("error", True) + tracerutils.set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -206,7 +206,7 @@ def do_remote_query(destination): return {"device_keys": results, "failures": failures} - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def query_local_devices(self, query): """Get E2E device keys for local users @@ -219,7 +219,7 @@ def query_local_devices(self, query): defer.Deferred: (resolves to dict[string, dict[string, dict]]): map from user_id -> device_id -> device details """ - TracerUtil.set_tag("local_query", query) + tracerutils.set_tag("local_query", query) local_query = [] result_dict = {} @@ -227,14 +227,14 @@ def query_local_devices(self, query): # we use UserID.from_string to catch invalid user ids if not self.is_mine(UserID.from_string(user_id)): logger.warning("Request for keys for non-local user %s", user_id) - TracerUtil.log_kv( + tracerutils.log_kv( { "message": "Requested a local key for a user which" + " was not local to the homeserver", "user_id": user_id, } ) - TracerUtil.set_tag("error", True) + tracerutils.set_tag("error", True) raise SynapseError(400, "Not a user here") if not device_ids: @@ -259,7 +259,7 @@ def query_local_devices(self, query): r["unsigned"]["device_display_name"] = display_name result_dict[user_id][device_id] = r - TracerUtil.log_kv(results) + tracerutils.log_kv(results) return result_dict @defer.inlineCallbacks @@ -270,7 +270,7 @@ def on_federation_query_client_keys(self, query_body): res = yield self.query_local_devices(device_keys_query) return {"device_keys": res} - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def claim_one_time_keys(self, query, timeout): local_query = [] @@ -285,8 +285,8 @@ def claim_one_time_keys(self, query, timeout): domain = get_domain_from_id(user_id) remote_queries.setdefault(domain, {})[user_id] = device_keys - TracerUtil.set_tag("local_key_query", local_query) - TracerUtil.set_tag("remote_key_query", remote_queries) + tracerutils.set_tag("local_key_query", local_query) + tracerutils.set_tag("remote_key_query", remote_queries) results = yield self.store.claim_e2e_one_time_keys(local_query) @@ -299,10 +299,10 @@ def claim_one_time_keys(self, query, timeout): key_id: json.loads(json_bytes) } - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def claim_client_keys(destination): - TracerUtil.set_tag("destination", destination) + tracerutils.set_tag("destination", destination) device_keys = remote_queries[destination] try: remote_result = yield self.federation.claim_client_keys( @@ -315,8 +315,8 @@ def claim_client_keys(destination): except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure - TracerUtil.set_tag("error", True) - TracerUtil.set_tag("reason", failure) + tracerutils.set_tag("error", True) + tracerutils.set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -340,21 +340,21 @@ def claim_client_keys(destination): ), ) - TracerUtil.log_kv({"one_time_keys": json_result, "failures": failures}) + tracerutils.log_kv({"one_time_keys": json_result, "failures": failures}) return {"one_time_keys": json_result, "failures": failures} - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def upload_keys_for_user(self, user_id, device_id, keys): - TracerUtil.set_tag("user_id", user_id) - TracerUtil.set_tag("device_id", device_id) - TracerUtil.set_tag("keys", keys) + tracerutils.set_tag("user_id", user_id) + tracerutils.set_tag("device_id", device_id) + tracerutils.set_tag("keys", keys) time_now = self.clock.time_msec() # TODO: Validate the JSON to make sure it has the right keys. device_keys = keys.get("device_keys", None) - TracerUtil.set_tag("device_keys", device_keys) + tracerutils.set_tag("device_keys", device_keys) if device_keys: logger.info( "Updating device_keys for device %r for user %s at %d", @@ -376,7 +376,7 @@ def upload_keys_for_user(self, user_id, device_id, keys): user_id, device_id, time_now, one_time_keys ) else: - TracerUtil.log_kv( + tracerutils.log_kv( {"event": "did not upload one_time_keys", "reason": "no keys given"} ) @@ -389,10 +389,10 @@ def upload_keys_for_user(self, user_id, device_id, keys): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - TracerUtil.set_tag("one_time_key_counts", result) + tracerutils.set_tag("one_time_key_counts", result) return {"one_time_key_counts": result} - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def _upload_one_time_keys_for_user( self, user_id, device_id, time_now, one_time_keys diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 07724d1c4a1e..a90ec2f9fcca 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -27,7 +27,7 @@ SynapseError, ) from synapse.util.async_helpers import Linearizer -from synapse.util.tracerutils import TracerUtil, trace_defered_function +import synapse.util.tracerutils as tracerutils logger = logging.getLogger(__name__) @@ -50,7 +50,7 @@ def __init__(self, hs): # changed. self._upload_linearizer = Linearizer("upload_room_keys_lock") - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -86,10 +86,10 @@ def get_room_keys(self, user_id, version, room_id=None, session_id=None): user_id, version, room_id, session_id ) - TracerUtil.log_kv(results) + tracerutils.log_kv(results) return results - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def delete_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -111,7 +111,7 @@ def delete_room_keys(self, user_id, version, room_id=None, session_id=None): with (yield self._upload_linearizer.queue(user_id)): yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def upload_room_keys(self, user_id, version, room_keys): """Bulk upload a list of room keys into a given backup version, asserting @@ -179,7 +179,7 @@ def upload_room_keys(self, user_id, version, room_keys): user_id, version, room_id, session_id, session ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def _upload_room_key(self, user_id, version, room_id, session_id, room_key): """Upload a given room_key for a given room and session into a given @@ -242,7 +242,7 @@ def _should_replace_room_key(current_room_key, room_key): return False return True - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def create_version(self, user_id, version_info): """Create a new backup version. This automatically becomes the new @@ -301,7 +301,7 @@ def get_version_info(self, user_id, version=None): raise return res - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def delete_version(self, user_id, version=None): """Deletes a given version of the user's e2e_room_keys backup @@ -322,7 +322,7 @@ def delete_version(self, user_id, version=None): else: raise - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def update_version(self, user_id, version, version_info): """Update the info about a given version of the user's backup diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 8de9e12df493..40052e7a05af 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -25,11 +25,7 @@ parse_string, ) from synapse.types import StreamToken -from synapse.util.tracerutils import ( - TracerUtil, - trace_defered_function_using_operation_name, -) - +import synapse.util.tracerutils as tracerutils from ._base import client_patterns logger = logging.getLogger(__name__) @@ -72,7 +68,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() - @trace_defered_function_using_operation_name("upload_keys") + @tracerutils.trace_defered_function_using_operation_name("upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) @@ -83,8 +79,8 @@ def on_POST(self, request, device_id): # passing the device_id here is deprecated; however, we allow it # for now for compatibility with older clients. if requester.device_id is not None and device_id != requester.device_id: - TracerUtil.set_tag("error", True) - TracerUtil.log_kv( + tracerutils.set_tag("error", True) + tracerutils.log_kv( { "message": "Client uploading keys for a different device", "logged_in_id": requester.device_id, diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py index 34ab74d3a4e8..ac03d5889977 100644 --- a/synapse/rest/client/v2_alpha/room_keys.py +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -23,10 +23,7 @@ parse_json_object_from_request, parse_string, ) -from synapse.util.tracerutils import ( - TracerUtil, - trace_defered_function_using_operation_name, -) +import synapse.util.tracerutils as tracerutils from ._base import client_patterns @@ -315,7 +312,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() - @trace_defered_function_using_operation_name("get_room_keys_version") + @tracerutils.trace_defered_function_using_operation_name("get_room_keys_version") @defer.inlineCallbacks def on_GET(self, request, version): """ diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index acfe3da0ad07..98246c347d02 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -22,11 +22,11 @@ from ._base import SQLBaseStore, db_to_json -from synapse.util.tracerutils import TracerUtil, trace_defered_function, trace_function +import synapse.util.tracerutils as tracerutils class EndToEndKeyWorkerStore(SQLBaseStore): - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_e2e_device_keys( self, query_list, include_all_devices=False, include_deleted_devices=False @@ -43,7 +43,7 @@ def get_e2e_device_keys( Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". """ - TracerUtil.set_tag("query_list", query_list) + tracerutils.set_tag("query_list", query_list) if not query_list: return {} @@ -61,12 +61,12 @@ def get_e2e_device_keys( return results - @trace_function + @tracerutils.trace_function def _get_e2e_device_keys_txn( self, txn, query_list, include_all_devices=False, include_deleted_devices=False ): - TracerUtil.set_tag("include_all_devices", include_all_devices) - TracerUtil.set_tag("include_deleted_devices", include_deleted_devices) + tracerutils.set_tag("include_all_devices", include_all_devices) + tracerutils.set_tag("include_deleted_devices", include_deleted_devices) query_clauses = [] query_params = [] @@ -112,10 +112,10 @@ def _get_e2e_device_keys_txn( for user_id, device_id in deleted_devices: result.setdefault(user_id, {})[device_id] = None - TracerUtil.log_kv(result) + tracerutils.log_kv(result) return result - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_e2e_one_time_keys(self, user_id, device_id, key_ids): """Retrieve a number of one-time keys for a user @@ -131,9 +131,9 @@ def get_e2e_one_time_keys(self, user_id, device_id, key_ids): key_id) to json string for key """ - TracerUtil.set_tag("user_id", user_id) - TracerUtil.set_tag("device_id", device_id) - TracerUtil.set_tag("key_ids", key_ids) + tracerutils.set_tag("user_id", user_id) + tracerutils.set_tag("device_id", device_id) + tracerutils.set_tag("key_ids", key_ids) rows = yield self._simple_select_many_batch( table="e2e_one_time_keys_json", @@ -159,11 +159,11 @@ def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): (algorithm, key_id, key json) """ - @trace_function + @tracerutils.strace_function def _add_e2e_one_time_keys(txn): - TracerUtil.set_tag("user_id", user_id) - TracerUtil.set_tag("device_id", device_id) - TracerUtil.set_tag("new_keys", new_keys) + tracerutils.set_tag("user_id", user_id) + tracerutils.set_tag("device_id", device_id) + tracerutils.set_tag("new_keys", new_keys) # We are protected from race between lookup and insertion due to # a unique constraint. If there is a race of two calls to # `add_e2e_one_time_keys` then they'll conflict and we will only @@ -219,12 +219,12 @@ def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): or the keys were already in the database. """ - @trace_function + @tracerutils.trace_function def _set_e2e_device_keys_txn(txn): - TracerUtil.set_tag("user_id", user_id) - TracerUtil.set_tag("device_id", device_id) - TracerUtil.set_tag("time_now", time_now) - TracerUtil.set_tag("device_keys", device_keys) + tracerutils.set_tag("user_id", user_id) + tracerutils.set_tag("device_id", device_id) + tracerutils.set_tag("time_now", time_now) + tracerutils.set_tag("device_keys", device_keys) old_key_json = self._simple_select_one_onecol_txn( txn, @@ -239,7 +239,7 @@ def _set_e2e_device_keys_txn(txn): new_key_json = encode_canonical_json(device_keys).decode("utf-8") if old_key_json == new_key_json: - TracerUtil.log_kv({"event", "key already stored"}) + tracerutils.log_kv({"event", "key already stored"}) return False self._simple_upsert_txn( @@ -256,7 +256,7 @@ def _set_e2e_device_keys_txn(txn): def claim_e2e_one_time_keys(self, query_list): """Take a list of one time keys out of the database""" - @trace_function + @tracerutils.trace_function def _claim_e2e_one_time_keys(txn): sql = ( "SELECT key_id, key_json FROM e2e_one_time_keys_json" @@ -278,11 +278,11 @@ def _claim_e2e_one_time_keys(txn): " AND key_id = ?" ) for user_id, device_id, algorithm, key_id in delete: - TracerUtil.log_kv( + tracerutils.log_kv( {"message": "executing claim transaction on database"} ) txn.execute(sql, (user_id, device_id, algorithm, key_id)) - TracerUtil.log_kv( + tracerutils.log_kv( {"message": "finished executing and invalidating cache"} ) self._invalidate_cache_and_stream( @@ -293,10 +293,10 @@ def _claim_e2e_one_time_keys(txn): return self.runInteraction("claim_e2e_one_time_keys", _claim_e2e_one_time_keys) def delete_e2e_keys_by_device(self, user_id, device_id): - @trace_function + @tracerutils.trace_function def delete_e2e_keys_by_device_txn(txn): - TracerUtil.set_tag("user_id", user_id) - TracerUtil.set_tag("device_id", device_id) + tracerutils.set_tag("user_id", user_id) + tracerutils.set_tag("device_id", device_id) self._simple_delete_txn( txn, table="e2e_device_keys_json", From 37da7849b3b5a06dffa32c373bb919a7fc2ad8b4 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 3 Jul 2019 11:09:42 +0100 Subject: [PATCH 05/20] typo --- synapse/storage/end_to_end_keys.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 98246c347d02..9fbff3ebf47c 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -159,7 +159,7 @@ def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): (algorithm, key_id, key json) """ - @tracerutils.strace_function + @tracerutils.trace_function def _add_e2e_one_time_keys(txn): tracerutils.set_tag("user_id", user_id) tracerutils.set_tag("device_id", device_id) From e7fefb15c5323d5b0d35afe393616fc2beb27c78 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Thu, 4 Jul 2019 17:11:46 +0100 Subject: [PATCH 06/20] The great logging/ migration --- synapse/handlers/e2e_keys.py | 56 +++++++++++------------ synapse/handlers/e2e_room_keys.py | 18 ++++---- synapse/rest/client/v2_alpha/keys.py | 8 ++-- synapse/rest/client/v2_alpha/room_keys.py | 4 +- synapse/storage/e2e_room_keys.py | 2 +- synapse/storage/end_to_end_keys.py | 54 +++++++++++----------- 6 files changed, 71 insertions(+), 71 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 6efa21aab848..5e8538721d35 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -import synapse.util.tracerutils as tracerutils +import synapse.logging.opentracing as opentracing from six import iteritems @@ -47,7 +47,7 @@ def __init__(self, hs): "client_keys", self.on_federation_query_client_keys ) - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def query_devices(self, query_body, timeout): """ Handle a device key query from a client @@ -83,8 +83,8 @@ def query_devices(self, query_body, timeout): else: remote_queries[user_id] = device_ids - tracerutils.set_tag("local_key_query", local_query) - tracerutils.set_tag("remote_key_query", remote_queries) + opentracing.set_tag("local_key_query", local_query) + opentracing.set_tag("remote_key_query", remote_queries) # First get local devices. failures = {} @@ -126,7 +126,7 @@ def query_devices(self, query_body, timeout): r[user_id] = remote_queries[user_id] # Now fetch any devices that we don't have in our cache - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def do_remote_query(destination): """This is called when we are querying the device list of a user on @@ -191,8 +191,8 @@ def do_remote_query(destination): except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure - tracerutils.set_tag("error", True) - tracerutils.set_tag("reason", failure) + opentracing.set_tag("error", True) + opentracing.set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -206,7 +206,7 @@ def do_remote_query(destination): return {"device_keys": results, "failures": failures} - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def query_local_devices(self, query): """Get E2E device keys for local users @@ -219,7 +219,7 @@ def query_local_devices(self, query): defer.Deferred: (resolves to dict[string, dict[string, dict]]): map from user_id -> device_id -> device details """ - tracerutils.set_tag("local_query", query) + opentracing.set_tag("local_query", query) local_query = [] result_dict = {} @@ -227,14 +227,14 @@ def query_local_devices(self, query): # we use UserID.from_string to catch invalid user ids if not self.is_mine(UserID.from_string(user_id)): logger.warning("Request for keys for non-local user %s", user_id) - tracerutils.log_kv( + opentracing.log_kv( { "message": "Requested a local key for a user which" + " was not local to the homeserver", "user_id": user_id, } ) - tracerutils.set_tag("error", True) + opentracing.set_tag("error", True) raise SynapseError(400, "Not a user here") if not device_ids: @@ -259,7 +259,7 @@ def query_local_devices(self, query): r["unsigned"]["device_display_name"] = display_name result_dict[user_id][device_id] = r - tracerutils.log_kv(results) + opentracing.log_kv(results) return result_dict @defer.inlineCallbacks @@ -270,7 +270,7 @@ def on_federation_query_client_keys(self, query_body): res = yield self.query_local_devices(device_keys_query) return {"device_keys": res} - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def claim_one_time_keys(self, query, timeout): local_query = [] @@ -285,8 +285,8 @@ def claim_one_time_keys(self, query, timeout): domain = get_domain_from_id(user_id) remote_queries.setdefault(domain, {})[user_id] = device_keys - tracerutils.set_tag("local_key_query", local_query) - tracerutils.set_tag("remote_key_query", remote_queries) + opentracing.set_tag("local_key_query", local_query) + opentracing.set_tag("remote_key_query", remote_queries) results = yield self.store.claim_e2e_one_time_keys(local_query) @@ -299,10 +299,10 @@ def claim_one_time_keys(self, query, timeout): key_id: json.loads(json_bytes) } - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def claim_client_keys(destination): - tracerutils.set_tag("destination", destination) + opentracing.set_tag("destination", destination) device_keys = remote_queries[destination] try: remote_result = yield self.federation.claim_client_keys( @@ -315,8 +315,8 @@ def claim_client_keys(destination): except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure - tracerutils.set_tag("error", True) - tracerutils.set_tag("reason", failure) + opentracing.set_tag("error", True) + opentracing.set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -340,21 +340,21 @@ def claim_client_keys(destination): ), ) - tracerutils.log_kv({"one_time_keys": json_result, "failures": failures}) + opentracing.log_kv({"one_time_keys": json_result, "failures": failures}) return {"one_time_keys": json_result, "failures": failures} - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def upload_keys_for_user(self, user_id, device_id, keys): - tracerutils.set_tag("user_id", user_id) - tracerutils.set_tag("device_id", device_id) - tracerutils.set_tag("keys", keys) + opentracing.set_tag("user_id", user_id) + opentracing.set_tag("device_id", device_id) + opentracing.set_tag("keys", keys) time_now = self.clock.time_msec() # TODO: Validate the JSON to make sure it has the right keys. device_keys = keys.get("device_keys", None) - tracerutils.set_tag("device_keys", device_keys) + opentracing.set_tag("device_keys", device_keys) if device_keys: logger.info( "Updating device_keys for device %r for user %s at %d", @@ -376,7 +376,7 @@ def upload_keys_for_user(self, user_id, device_id, keys): user_id, device_id, time_now, one_time_keys ) else: - tracerutils.log_kv( + opentracing.log_kv( {"event": "did not upload one_time_keys", "reason": "no keys given"} ) @@ -389,10 +389,10 @@ def upload_keys_for_user(self, user_id, device_id, keys): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - tracerutils.set_tag("one_time_key_counts", result) + opentracing.set_tag("one_time_key_counts", result) return {"one_time_key_counts": result} - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def _upload_one_time_keys_for_user( self, user_id, device_id, time_now, one_time_keys diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index a90ec2f9fcca..0459d52a3c81 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -27,7 +27,7 @@ SynapseError, ) from synapse.util.async_helpers import Linearizer -import synapse.util.tracerutils as tracerutils +import synapse.logging.opentracing as opentracing logger = logging.getLogger(__name__) @@ -50,7 +50,7 @@ def __init__(self, hs): # changed. self._upload_linearizer = Linearizer("upload_room_keys_lock") - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def get_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -86,10 +86,10 @@ def get_room_keys(self, user_id, version, room_id=None, session_id=None): user_id, version, room_id, session_id ) - tracerutils.log_kv(results) + opentracing.log_kv(results) return results - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def delete_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -111,7 +111,7 @@ def delete_room_keys(self, user_id, version, room_id=None, session_id=None): with (yield self._upload_linearizer.queue(user_id)): yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def upload_room_keys(self, user_id, version, room_keys): """Bulk upload a list of room keys into a given backup version, asserting @@ -179,7 +179,7 @@ def upload_room_keys(self, user_id, version, room_keys): user_id, version, room_id, session_id, session ) - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def _upload_room_key(self, user_id, version, room_id, session_id, room_key): """Upload a given room_key for a given room and session into a given @@ -242,7 +242,7 @@ def _should_replace_room_key(current_room_key, room_key): return False return True - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def create_version(self, user_id, version_info): """Create a new backup version. This automatically becomes the new @@ -301,7 +301,7 @@ def get_version_info(self, user_id, version=None): raise return res - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def delete_version(self, user_id, version=None): """Deletes a given version of the user's e2e_room_keys backup @@ -322,7 +322,7 @@ def delete_version(self, user_id, version=None): else: raise - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def update_version(self, user_id, version, version_info): """Update the info about a given version of the user's backup diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 40052e7a05af..00260cde7351 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -25,7 +25,7 @@ parse_string, ) from synapse.types import StreamToken -import synapse.util.tracerutils as tracerutils +import synapse.logging.opentracing as opentracing from ._base import client_patterns logger = logging.getLogger(__name__) @@ -68,7 +68,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() - @tracerutils.trace_defered_function_using_operation_name("upload_keys") + @opentracing.trace_defered_function_using_operation_name("upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) @@ -79,8 +79,8 @@ def on_POST(self, request, device_id): # passing the device_id here is deprecated; however, we allow it # for now for compatibility with older clients. if requester.device_id is not None and device_id != requester.device_id: - tracerutils.set_tag("error", True) - tracerutils.log_kv( + opentracing.set_tag("error", True) + opentracing.log_kv( { "message": "Client uploading keys for a different device", "logged_in_id": requester.device_id, diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py index ac03d5889977..51bd4f12fff0 100644 --- a/synapse/rest/client/v2_alpha/room_keys.py +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -23,7 +23,7 @@ parse_json_object_from_request, parse_string, ) -import synapse.util.tracerutils as tracerutils +import synapse.logging.opentracing as opentracing from ._base import client_patterns @@ -312,7 +312,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() - @tracerutils.trace_defered_function_using_operation_name("get_room_keys_version") + @opentracing.trace_defered_function_using_operation_name("get_room_keys_version") @defer.inlineCallbacks def on_GET(self, request, version): """ diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index ced6dd04ded0..f18605dfc488 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError -from synapse.util.tracerutils import trace_defered_function +from synapse.logging.opentracing import trace_defered_function from ._base import SQLBaseStore diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 9fbff3ebf47c..4fa694e06a0d 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -22,11 +22,11 @@ from ._base import SQLBaseStore, db_to_json -import synapse.util.tracerutils as tracerutils +import synapse.logging.opentracing as opentracing class EndToEndKeyWorkerStore(SQLBaseStore): - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def get_e2e_device_keys( self, query_list, include_all_devices=False, include_deleted_devices=False @@ -43,7 +43,7 @@ def get_e2e_device_keys( Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". """ - tracerutils.set_tag("query_list", query_list) + opentracing.set_tag("query_list", query_list) if not query_list: return {} @@ -61,12 +61,12 @@ def get_e2e_device_keys( return results - @tracerutils.trace_function + @opentracing.trace_function def _get_e2e_device_keys_txn( self, txn, query_list, include_all_devices=False, include_deleted_devices=False ): - tracerutils.set_tag("include_all_devices", include_all_devices) - tracerutils.set_tag("include_deleted_devices", include_deleted_devices) + opentracing.set_tag("include_all_devices", include_all_devices) + opentracing.set_tag("include_deleted_devices", include_deleted_devices) query_clauses = [] query_params = [] @@ -112,10 +112,10 @@ def _get_e2e_device_keys_txn( for user_id, device_id in deleted_devices: result.setdefault(user_id, {})[device_id] = None - tracerutils.log_kv(result) + opentracing.log_kv(result) return result - @tracerutils.trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def get_e2e_one_time_keys(self, user_id, device_id, key_ids): """Retrieve a number of one-time keys for a user @@ -131,9 +131,9 @@ def get_e2e_one_time_keys(self, user_id, device_id, key_ids): key_id) to json string for key """ - tracerutils.set_tag("user_id", user_id) - tracerutils.set_tag("device_id", device_id) - tracerutils.set_tag("key_ids", key_ids) + opentracing.set_tag("user_id", user_id) + opentracing.set_tag("device_id", device_id) + opentracing.set_tag("key_ids", key_ids) rows = yield self._simple_select_many_batch( table="e2e_one_time_keys_json", @@ -159,11 +159,11 @@ def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): (algorithm, key_id, key json) """ - @tracerutils.trace_function + @opentracing.trace_function def _add_e2e_one_time_keys(txn): - tracerutils.set_tag("user_id", user_id) - tracerutils.set_tag("device_id", device_id) - tracerutils.set_tag("new_keys", new_keys) + opentracing.set_tag("user_id", user_id) + opentracing.set_tag("device_id", device_id) + opentracing.set_tag("new_keys", new_keys) # We are protected from race between lookup and insertion due to # a unique constraint. If there is a race of two calls to # `add_e2e_one_time_keys` then they'll conflict and we will only @@ -219,12 +219,12 @@ def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): or the keys were already in the database. """ - @tracerutils.trace_function + @opentracing.trace_function def _set_e2e_device_keys_txn(txn): - tracerutils.set_tag("user_id", user_id) - tracerutils.set_tag("device_id", device_id) - tracerutils.set_tag("time_now", time_now) - tracerutils.set_tag("device_keys", device_keys) + opentracing.set_tag("user_id", user_id) + opentracing.set_tag("device_id", device_id) + opentracing.set_tag("time_now", time_now) + opentracing.set_tag("device_keys", device_keys) old_key_json = self._simple_select_one_onecol_txn( txn, @@ -239,7 +239,7 @@ def _set_e2e_device_keys_txn(txn): new_key_json = encode_canonical_json(device_keys).decode("utf-8") if old_key_json == new_key_json: - tracerutils.log_kv({"event", "key already stored"}) + opentracing.log_kv({"event", "key already stored"}) return False self._simple_upsert_txn( @@ -256,7 +256,7 @@ def _set_e2e_device_keys_txn(txn): def claim_e2e_one_time_keys(self, query_list): """Take a list of one time keys out of the database""" - @tracerutils.trace_function + @opentracing.trace_function def _claim_e2e_one_time_keys(txn): sql = ( "SELECT key_id, key_json FROM e2e_one_time_keys_json" @@ -278,11 +278,11 @@ def _claim_e2e_one_time_keys(txn): " AND key_id = ?" ) for user_id, device_id, algorithm, key_id in delete: - tracerutils.log_kv( + opentracing.log_kv( {"message": "executing claim transaction on database"} ) txn.execute(sql, (user_id, device_id, algorithm, key_id)) - tracerutils.log_kv( + opentracing.log_kv( {"message": "finished executing and invalidating cache"} ) self._invalidate_cache_and_stream( @@ -293,10 +293,10 @@ def _claim_e2e_one_time_keys(txn): return self.runInteraction("claim_e2e_one_time_keys", _claim_e2e_one_time_keys) def delete_e2e_keys_by_device(self, user_id, device_id): - @tracerutils.trace_function + @opentracing.trace_function def delete_e2e_keys_by_device_txn(txn): - tracerutils.set_tag("user_id", user_id) - tracerutils.set_tag("device_id", device_id) + opentracing.set_tag("user_id", user_id) + opentracing.set_tag("device_id", device_id) self._simple_delete_txn( txn, table="e2e_device_keys_json", From 631c4e411b815887355b4d5e2b1ecd0f536187c2 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 8 Jul 2019 14:32:00 +0100 Subject: [PATCH 07/20] Clean up room key tracing --- synapse/handlers/e2e_room_keys.py | 21 ++++++++++++++++++--- synapse/storage/e2e_room_keys.py | 23 ++++++++++++++--------- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 0459d52a3c81..1c34ff712fa0 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -179,7 +179,6 @@ def upload_room_keys(self, user_id, version, room_keys): user_id, version, room_id, session_id, session ) - @opentracing.trace_defered_function @defer.inlineCallbacks def _upload_room_key(self, user_id, version, room_id, session_id, room_key): """Upload a given room_key for a given room and session into a given @@ -192,7 +191,14 @@ def _upload_room_key(self, user_id, version, room_id, session_id, room_key): session_id(str): the session whose room_key we're setting room_key(dict): the room_key being set """ - + opentracing.log_kv( + { + "message": "Trying to upload room key", + "room_id": room_id, + "session_id": session_id, + "user_id": user_id, + } + ) # get the room_key for this particular row current_room_key = None try: @@ -201,14 +207,23 @@ def _upload_room_key(self, user_id, version, room_id, session_id, room_key): ) except StoreError as e: if e.code == 404: - pass + opentracing.log_kv( + { + "message": "Room key not found.", + "room_id": room_id, + "user_id": user_id, + } + ) else: raise if self._should_replace_room_key(current_room_key, room_key): + opentracing.log_kv({"message": "Replacing room key."}) yield self.store.set_e2e_room_key( user_id, version, room_id, session_id, room_key ) + else: + opentracing.log_kv({"message": "Not replacing room_key."}) @staticmethod def _should_replace_room_key(current_room_key, room_key): diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index f18605dfc488..bfd8b51362e4 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -18,13 +18,12 @@ from twisted.internet import defer from synapse.api.errors import StoreError -from synapse.logging.opentracing import trace_defered_function +import synapse.logging.opentracing as opentracing from ._base import SQLBaseStore class EndToEndRoomKeyStore(SQLBaseStore): - @trace_defered_function @defer.inlineCallbacks def get_e2e_room_key(self, user_id, version, room_id, session_id): """Get the encrypted E2E room key for a given session from a given @@ -65,7 +64,6 @@ def get_e2e_room_key(self, user_id, version, room_id, session_id): return row - @trace_defered_function @defer.inlineCallbacks def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): """Replaces or inserts the encrypted E2E room key for a given session in @@ -97,8 +95,16 @@ def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): }, lock=False, ) + opentracing.log_kv( + { + "message": "Set room key", + "room_id": room_id, + "session_id": session_id, + "room_key": room_key, + } + ) - @trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -157,7 +163,7 @@ def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): return sessions - @trace_defered_function + @opentracing.trace_defered_function @defer.inlineCallbacks def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -199,7 +205,6 @@ def _get_current_version(txn, user_id): raise StoreError(404, "No current backup version") return row[0] - @trace_defered_function def get_e2e_room_keys_version_info(self, user_id, version=None): """Get info metadata about a version of our room_keys backup. @@ -242,7 +247,7 @@ def _get_e2e_room_keys_version_info_txn(txn): "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn ) - @trace_defered_function + @opentracing.trace_defered_function def create_e2e_room_keys_version(self, user_id, info): """Atomically creates a new version of this user's e2e_room_keys store with the given version info. @@ -283,7 +288,7 @@ def _create_e2e_room_keys_version_txn(txn): "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn ) - @trace_defered_function + @opentracing.trace_defered_function def update_e2e_room_keys_version(self, user_id, version, info): """Update a given backup version @@ -300,7 +305,7 @@ def update_e2e_room_keys_version(self, user_id, version, info): desc="update_e2e_room_keys_version", ) - @trace_defered_function + @opentracing.trace_defered_function def delete_e2e_room_keys_version(self, user_id, version=None): """Delete a given backup version of the user's room keys. Doesn't delete their actual key data. From bdbe11dac472f508c401b0ef0fd7a75d00ebbb15 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 8 Jul 2019 14:58:02 +0100 Subject: [PATCH 08/20] Cleanup key upload tracing --- synapse/handlers/e2e_keys.py | 28 +++++++++++++++++++++++----- synapse/storage/end_to_end_keys.py | 18 +++++++----------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 5e8538721d35..47471b400e21 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -343,12 +343,10 @@ def claim_client_keys(destination): opentracing.log_kv({"one_time_keys": json_result, "failures": failures}) return {"one_time_keys": json_result, "failures": failures} - @opentracing.trace_defered_function @defer.inlineCallbacks def upload_keys_for_user(self, user_id, device_id, keys): opentracing.set_tag("user_id", user_id) opentracing.set_tag("device_id", device_id) - opentracing.set_tag("keys", keys) time_now = self.clock.time_msec() @@ -362,6 +360,13 @@ def upload_keys_for_user(self, user_id, device_id, keys): user_id, time_now, ) + opentracing.log_kv( + { + "message": "Updating device_keys for user.", + "user_id": user_id, + "device_id": device_id, + } + ) # TODO: Sign the JSON with the server key changed = yield self.store.set_e2e_device_keys( user_id, device_id, time_now, device_keys @@ -369,15 +374,26 @@ def upload_keys_for_user(self, user_id, device_id, keys): if changed: # Only notify about device updates *if* the keys actually changed yield self.device_handler.notify_device_update(user_id, [device_id]) - + else: + opentracing.log_kv( + {"message": "Not updating device_keys for user", "user_id": user_id} + ) one_time_keys = keys.get("one_time_keys", None) + opentracing.set_tag("one_time_keys", one_time_keys) if one_time_keys: + opentracing.log_kv( + { + "message": "Updating one_time_keys for device.", + "user_id": user_id, + "device_id": device_id, + } + ) yield self._upload_one_time_keys_for_user( user_id, device_id, time_now, one_time_keys ) else: opentracing.log_kv( - {"event": "did not upload one_time_keys", "reason": "no keys given"} + {"message": "Did not update one_time_keys", "reason": "no keys given"} ) # the device should have been registered already, but it may have been @@ -392,7 +408,6 @@ def upload_keys_for_user(self, user_id, device_id, keys): opentracing.set_tag("one_time_key_counts", result) return {"one_time_key_counts": result} - @opentracing.trace_defered_function @defer.inlineCallbacks def _upload_one_time_keys_for_user( self, user_id, device_id, time_now, one_time_keys @@ -434,6 +449,9 @@ def _upload_one_time_keys_for_user( (algorithm, key_id, encode_canonical_json(key).decode("ascii")) ) + opentracing.log_kv( + {"message": "Inserting new one_time_keys.", "keys": new_keys} + ) yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 4fa694e06a0d..29d91dfa6b32 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -115,7 +115,6 @@ def _get_e2e_device_keys_txn( opentracing.log_kv(result) return result - @opentracing.trace_defered_function @defer.inlineCallbacks def get_e2e_one_time_keys(self, user_id, device_id, key_ids): """Retrieve a number of one-time keys for a user @@ -131,10 +130,6 @@ def get_e2e_one_time_keys(self, user_id, device_id, key_ids): key_id) to json string for key """ - opentracing.set_tag("user_id", user_id) - opentracing.set_tag("device_id", device_id) - opentracing.set_tag("key_ids", key_ids) - rows = yield self._simple_select_many_batch( table="e2e_one_time_keys_json", column="key_id", @@ -143,8 +138,11 @@ def get_e2e_one_time_keys(self, user_id, device_id, key_ids): keyvalues={"user_id": user_id, "device_id": device_id}, desc="add_e2e_one_time_keys_check", ) - - return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows} + result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows} + opentracing.log_kv( + {"message": "Fetched one time keys for user", "one_time_keys": result} + ) + return result @defer.inlineCallbacks def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): @@ -159,7 +157,6 @@ def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): (algorithm, key_id, key json) """ - @opentracing.trace_function def _add_e2e_one_time_keys(txn): opentracing.set_tag("user_id", user_id) opentracing.set_tag("device_id", device_id) @@ -219,7 +216,6 @@ def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): or the keys were already in the database. """ - @opentracing.trace_function def _set_e2e_device_keys_txn(txn): opentracing.set_tag("user_id", user_id) opentracing.set_tag("device_id", device_id) @@ -239,7 +235,7 @@ def _set_e2e_device_keys_txn(txn): new_key_json = encode_canonical_json(device_keys).decode("utf-8") if old_key_json == new_key_json: - opentracing.log_kv({"event", "key already stored"}) + opentracing.log_kv({"Message", "Device key already stored."}) return False self._simple_upsert_txn( @@ -248,7 +244,7 @@ def _set_e2e_device_keys_txn(txn): keyvalues={"user_id": user_id, "device_id": device_id}, values={"ts_added_ms": time_now, "key_json": new_key_json}, ) - + opentracing.log_kv({"message": "Device keys stored."}) return True return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn) From 27d4d793a842311d3f743b19e9efa701e04eaf6f Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 8 Jul 2019 15:12:05 +0100 Subject: [PATCH 09/20] Trace key claiming --- synapse/federation/federation_server.py | 4 ++++ synapse/storage/end_to_end_keys.py | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d216c46dfee0..6cfe550624a0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -507,6 +507,7 @@ def on_query_client_keys(self, origin, content): def on_query_user_devices(self, origin, user_id): return self.on_query_request("user_devices", user_id) + @opentracing.trace_defered_function @defer.inlineCallbacks @log_function def on_claim_client_keys(self, origin, content): @@ -515,6 +516,9 @@ def on_claim_client_keys(self, origin, content): for device_id, algorithm in device_keys.items(): query.append((user_id, device_id, algorithm)) + opentracing.log_kv( + {"message": "Claiming one time keys.", "user, device pairs": query} + ) results = yield self.store.claim_e2e_one_time_keys(query) json_result = {} diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 29d91dfa6b32..e82c32f0225b 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -275,7 +275,9 @@ def _claim_e2e_one_time_keys(txn): ) for user_id, device_id, algorithm, key_id in delete: opentracing.log_kv( - {"message": "executing claim transaction on database"} + { + "message": "Executing claim e2e_one_time_keys transaction on database." + } ) txn.execute(sql, (user_id, device_id, algorithm, key_id)) opentracing.log_kv( From b930e19d9f22d85826c8e22bfebbbac17982dff6 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Thu, 11 Jul 2019 11:55:14 +0100 Subject: [PATCH 10/20] Nicer tracing --- synapse/rest/client/v2_alpha/keys.py | 6 +++++- synapse/storage/end_to_end_keys.py | 12 ++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 00260cde7351..381c1e6da775 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -182,20 +182,24 @@ def __init__(self, hs): self.auth = hs.get_auth() self.device_handler = hs.get_device_handler() + @opentracing.trace_defered_function @defer.inlineCallbacks def on_GET(self, request): requester = yield self.auth.get_user_by_req(request, allow_guest=True) from_token_string = parse_string(request, "from") + opentracing.set_tag("from", from_token_string) # We want to enforce they do pass us one, but we ignore it and return # changes after the "to" as well as before. - parse_string(request, "to") + opentracing.set_tag("to", parse_string(request, "to")) from_token = StreamToken.from_string(from_token_string) user_id = requester.user.to_string() + opentracing.set_tag("user_id", user_id) + results = yield self.device_handler.get_user_ids_changed(user_id, from_token) return (200, results) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index e82c32f0225b..ea5ed2737be0 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -235,7 +235,7 @@ def _set_e2e_device_keys_txn(txn): new_key_json = encode_canonical_json(device_keys).decode("utf-8") if old_key_json == new_key_json: - opentracing.log_kv({"Message", "Device key already stored."}) + opentracing.log_kv({"Message": "Device key already stored."}) return False self._simple_upsert_txn( @@ -291,10 +291,14 @@ def _claim_e2e_one_time_keys(txn): return self.runInteraction("claim_e2e_one_time_keys", _claim_e2e_one_time_keys) def delete_e2e_keys_by_device(self, user_id, device_id): - @opentracing.trace_function def delete_e2e_keys_by_device_txn(txn): - opentracing.set_tag("user_id", user_id) - opentracing.set_tag("device_id", device_id) + opentracing.log_kv( + { + "message": "Deleting keys for device", + "device_id": device_id, + "user_id": user_id, + } + ) self._simple_delete_txn( txn, table="e2e_device_keys_json", From fd06118114ac2e7a77ea21d6b32375e8156ffeac Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Tue, 16 Jul 2019 11:20:20 +0100 Subject: [PATCH 11/20] Better args wrapper --- synapse/handlers/e2e_keys.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 47471b400e21..98b30973c430 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -344,9 +344,8 @@ def claim_client_keys(destination): return {"one_time_keys": json_result, "failures": failures} @defer.inlineCallbacks + @opentracing.tag_args def upload_keys_for_user(self, user_id, device_id, keys): - opentracing.set_tag("user_id", user_id) - opentracing.set_tag("device_id", device_id) time_now = self.clock.time_msec() From 28afd58a0a70edef0d8660c58e748621e8557bee Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 17 Jul 2019 13:56:29 +0100 Subject: [PATCH 12/20] Use better decorator names. --- synapse/federation/federation_server.py | 2 +- synapse/handlers/e2e_keys.py | 10 +++++----- synapse/handlers/e2e_room_keys.py | 12 ++++++------ synapse/rest/client/v2_alpha/keys.py | 4 ++-- synapse/rest/client/v2_alpha/room_keys.py | 2 +- synapse/storage/e2e_room_keys.py | 10 +++++----- synapse/storage/end_to_end_keys.py | 6 +++--- 7 files changed, 23 insertions(+), 23 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 6cfe550624a0..8e82398e0afa 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -507,7 +507,7 @@ def on_query_client_keys(self, origin, content): def on_query_user_devices(self, origin, user_id): return self.on_query_request("user_devices", user_id) - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks @log_function def on_claim_client_keys(self, origin, content): diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 98b30973c430..ac626b2d02ea 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -47,7 +47,7 @@ def __init__(self, hs): "client_keys", self.on_federation_query_client_keys ) - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def query_devices(self, query_body, timeout): """ Handle a device key query from a client @@ -126,7 +126,7 @@ def query_devices(self, query_body, timeout): r[user_id] = remote_queries[user_id] # Now fetch any devices that we don't have in our cache - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def do_remote_query(destination): """This is called when we are querying the device list of a user on @@ -206,7 +206,7 @@ def do_remote_query(destination): return {"device_keys": results, "failures": failures} - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def query_local_devices(self, query): """Get E2E device keys for local users @@ -270,7 +270,7 @@ def on_federation_query_client_keys(self, query_body): res = yield self.query_local_devices(device_keys_query) return {"device_keys": res} - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def claim_one_time_keys(self, query, timeout): local_query = [] @@ -299,7 +299,7 @@ def claim_one_time_keys(self, query, timeout): key_id: json.loads(json_bytes) } - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def claim_client_keys(destination): opentracing.set_tag("destination", destination) diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 1c34ff712fa0..975c241cce11 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -50,7 +50,7 @@ def __init__(self, hs): # changed. self._upload_linearizer = Linearizer("upload_room_keys_lock") - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def get_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -89,7 +89,7 @@ def get_room_keys(self, user_id, version, room_id=None, session_id=None): opentracing.log_kv(results) return results - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def delete_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -111,7 +111,7 @@ def delete_room_keys(self, user_id, version, room_id=None, session_id=None): with (yield self._upload_linearizer.queue(user_id)): yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def upload_room_keys(self, user_id, version, room_keys): """Bulk upload a list of room keys into a given backup version, asserting @@ -257,7 +257,7 @@ def _should_replace_room_key(current_room_key, room_key): return False return True - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def create_version(self, user_id, version_info): """Create a new backup version. This automatically becomes the new @@ -316,7 +316,7 @@ def get_version_info(self, user_id, version=None): raise return res - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def delete_version(self, user_id, version=None): """Deletes a given version of the user's e2e_room_keys backup @@ -337,7 +337,7 @@ def delete_version(self, user_id, version=None): else: raise - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def update_version(self, user_id, version, version_info): """Update the info about a given version of the user's backup diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 381c1e6da775..ff8a5537690b 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -68,7 +68,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() - @opentracing.trace_defered_function_using_operation_name("upload_keys") + @opentracing.trace_deferred_using_operation_name("upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) @@ -182,7 +182,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.device_handler = hs.get_device_handler() - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def on_GET(self, request): requester = yield self.auth.get_user_by_req(request, allow_guest=True) diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py index 51bd4f12fff0..49c0e5b9e5a3 100644 --- a/synapse/rest/client/v2_alpha/room_keys.py +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -312,7 +312,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() - @opentracing.trace_defered_function_using_operation_name("get_room_keys_version") + @opentracing.trace_deferred_using_operation_name("get_room_keys_version") @defer.inlineCallbacks def on_GET(self, request, version): """ diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index bfd8b51362e4..d6bf34ee30ab 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -104,7 +104,7 @@ def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): } ) - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -163,7 +163,7 @@ def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): return sessions - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -247,7 +247,7 @@ def _get_e2e_room_keys_version_info_txn(txn): "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn ) - @opentracing.trace_defered_function + @opentracing.trace_deferred def create_e2e_room_keys_version(self, user_id, info): """Atomically creates a new version of this user's e2e_room_keys store with the given version info. @@ -288,7 +288,7 @@ def _create_e2e_room_keys_version_txn(txn): "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn ) - @opentracing.trace_defered_function + @opentracing.trace_deferred def update_e2e_room_keys_version(self, user_id, version, info): """Update a given backup version @@ -305,7 +305,7 @@ def update_e2e_room_keys_version(self, user_id, version, info): desc="update_e2e_room_keys_version", ) - @opentracing.trace_defered_function + @opentracing.trace_deferred def delete_e2e_room_keys_version(self, user_id, version=None): """Delete a given backup version of the user's room keys. Doesn't delete their actual key data. diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index ea5ed2737be0..93c5cff072e3 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -26,7 +26,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): - @opentracing.trace_defered_function + @opentracing.trace_deferred @defer.inlineCallbacks def get_e2e_device_keys( self, query_list, include_all_devices=False, include_deleted_devices=False @@ -61,7 +61,7 @@ def get_e2e_device_keys( return results - @opentracing.trace_function + @opentracing.trace def _get_e2e_device_keys_txn( self, txn, query_list, include_all_devices=False, include_deleted_devices=False ): @@ -252,7 +252,7 @@ def _set_e2e_device_keys_txn(txn): def claim_e2e_one_time_keys(self, query_list): """Take a list of one time keys out of the database""" - @opentracing.trace_function + @opentracing.trace def _claim_e2e_one_time_keys(txn): sql = ( "SELECT key_id, key_json FROM e2e_one_time_keys_json" From 3604759bef9d98c80c464275f31a3becdbc2a575 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 22 Jul 2019 13:38:47 +0100 Subject: [PATCH 13/20] Use unified trace method --- synapse/federation/federation_server.py | 2 +- synapse/handlers/e2e_keys.py | 10 +++++----- synapse/handlers/e2e_room_keys.py | 12 ++++++------ synapse/rest/client/v2_alpha/keys.py | 4 ++-- synapse/rest/client/v2_alpha/room_keys.py | 2 +- synapse/storage/e2e_room_keys.py | 10 +++++----- synapse/storage/end_to_end_keys.py | 2 +- 7 files changed, 21 insertions(+), 21 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8e82398e0afa..02fa3278ba35 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -507,7 +507,7 @@ def on_query_client_keys(self, origin, content): def on_query_user_devices(self, origin, user_id): return self.on_query_request("user_devices", user_id) - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks @log_function def on_claim_client_keys(self, origin, content): diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index ac626b2d02ea..7a13c0774490 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -47,7 +47,7 @@ def __init__(self, hs): "client_keys", self.on_federation_query_client_keys ) - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def query_devices(self, query_body, timeout): """ Handle a device key query from a client @@ -126,7 +126,7 @@ def query_devices(self, query_body, timeout): r[user_id] = remote_queries[user_id] # Now fetch any devices that we don't have in our cache - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def do_remote_query(destination): """This is called when we are querying the device list of a user on @@ -206,7 +206,7 @@ def do_remote_query(destination): return {"device_keys": results, "failures": failures} - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def query_local_devices(self, query): """Get E2E device keys for local users @@ -270,7 +270,7 @@ def on_federation_query_client_keys(self, query_body): res = yield self.query_local_devices(device_keys_query) return {"device_keys": res} - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def claim_one_time_keys(self, query, timeout): local_query = [] @@ -299,7 +299,7 @@ def claim_one_time_keys(self, query, timeout): key_id: json.loads(json_bytes) } - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def claim_client_keys(destination): opentracing.set_tag("destination", destination) diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 975c241cce11..8c56dd1123ae 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -50,7 +50,7 @@ def __init__(self, hs): # changed. self._upload_linearizer = Linearizer("upload_room_keys_lock") - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def get_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -89,7 +89,7 @@ def get_room_keys(self, user_id, version, room_id=None, session_id=None): opentracing.log_kv(results) return results - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def delete_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -111,7 +111,7 @@ def delete_room_keys(self, user_id, version, room_id=None, session_id=None): with (yield self._upload_linearizer.queue(user_id)): yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def upload_room_keys(self, user_id, version, room_keys): """Bulk upload a list of room keys into a given backup version, asserting @@ -257,7 +257,7 @@ def _should_replace_room_key(current_room_key, room_key): return False return True - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def create_version(self, user_id, version_info): """Create a new backup version. This automatically becomes the new @@ -316,7 +316,7 @@ def get_version_info(self, user_id, version=None): raise return res - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def delete_version(self, user_id, version=None): """Deletes a given version of the user's e2e_room_keys backup @@ -337,7 +337,7 @@ def delete_version(self, user_id, version=None): else: raise - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def update_version(self, user_id, version, version_info): """Update the info about a given version of the user's backup diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index ff8a5537690b..4ef2a060f1c1 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -68,7 +68,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() - @opentracing.trace_deferred_using_operation_name("upload_keys") + @opentracing.trace_using_operation_name("upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) @@ -182,7 +182,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.device_handler = hs.get_device_handler() - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def on_GET(self, request): requester = yield self.auth.get_user_by_req(request, allow_guest=True) diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py index 49c0e5b9e5a3..4d829270ebe0 100644 --- a/synapse/rest/client/v2_alpha/room_keys.py +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -312,7 +312,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() - @opentracing.trace_deferred_using_operation_name("get_room_keys_version") + @opentracing.trace_using_operation_name("get_room_keys_version") @defer.inlineCallbacks def on_GET(self, request, version): """ diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index d6bf34ee30ab..276343406038 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -104,7 +104,7 @@ def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): } ) - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -163,7 +163,7 @@ def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): return sessions - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -247,7 +247,7 @@ def _get_e2e_room_keys_version_info_txn(txn): "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn ) - @opentracing.trace_deferred + @opentracing.trace def create_e2e_room_keys_version(self, user_id, info): """Atomically creates a new version of this user's e2e_room_keys store with the given version info. @@ -288,7 +288,7 @@ def _create_e2e_room_keys_version_txn(txn): "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn ) - @opentracing.trace_deferred + @opentracing.trace def update_e2e_room_keys_version(self, user_id, version, info): """Update a given backup version @@ -305,7 +305,7 @@ def update_e2e_room_keys_version(self, user_id, version, info): desc="update_e2e_room_keys_version", ) - @opentracing.trace_deferred + @opentracing.trace def delete_e2e_room_keys_version(self, user_id, version=None): """Delete a given backup version of the user's room keys. Doesn't delete their actual key data. diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 93c5cff072e3..f9a45b98bd9b 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -26,7 +26,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): - @opentracing.trace_deferred + @opentracing.trace @defer.inlineCallbacks def get_e2e_device_keys( self, query_list, include_all_devices=False, include_deleted_devices=False From 1ae4666009d9cfe59b4ceed9918920cc1216aac9 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 5 Aug 2019 14:04:07 +0100 Subject: [PATCH 14/20] Remove redundent tagging. --- synapse/handlers/e2e_keys.py | 2 -- synapse/rest/client/v2_alpha/keys.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 7a13c0774490..16ce86cf0c4b 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -351,7 +351,6 @@ def upload_keys_for_user(self, user_id, device_id, keys): # TODO: Validate the JSON to make sure it has the right keys. device_keys = keys.get("device_keys", None) - opentracing.set_tag("device_keys", device_keys) if device_keys: logger.info( "Updating device_keys for device %r for user %s at %d", @@ -378,7 +377,6 @@ def upload_keys_for_user(self, user_id, device_id, keys): {"message": "Not updating device_keys for user", "user_id": user_id} ) one_time_keys = keys.get("one_time_keys", None) - opentracing.set_tag("one_time_keys", one_time_keys) if one_time_keys: opentracing.log_kv( { diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 4ef2a060f1c1..7def7472e385 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -198,8 +198,6 @@ def on_GET(self, request): user_id = requester.user.to_string() - opentracing.set_tag("user_id", user_id) - results = yield self.device_handler.get_user_ids_changed(user_id, from_token) return (200, results) From d0d8cfea403cbe61101bf0e3fb546b9bda118e06 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Mon, 5 Aug 2019 14:20:39 +0100 Subject: [PATCH 15/20] Remove redundent spans. --- synapse/rest/client/v2_alpha/keys.py | 1 - synapse/rest/client/v2_alpha/room_keys.py | 1 - 2 files changed, 2 deletions(-) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 7def7472e385..fb36a31e080b 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -182,7 +182,6 @@ def __init__(self, hs): self.auth = hs.get_auth() self.device_handler = hs.get_device_handler() - @opentracing.trace @defer.inlineCallbacks def on_GET(self, request): requester = yield self.auth.get_user_by_req(request, allow_guest=True) diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py index 4d829270ebe0..81191dd23780 100644 --- a/synapse/rest/client/v2_alpha/room_keys.py +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -312,7 +312,6 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() - @opentracing.trace_using_operation_name("get_room_keys_version") @defer.inlineCallbacks def on_GET(self, request, version): """ From 030f807108f0d26fd68670758ae5a7214235c994 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 14 Aug 2019 17:21:12 +0100 Subject: [PATCH 16/20] isort --- synapse/federation/federation_server.py | 7 +++---- synapse/handlers/e2e_keys.py | 2 +- synapse/handlers/e2e_room_keys.py | 2 +- synapse/rest/client/v2_alpha/keys.py | 3 ++- synapse/rest/client/v2_alpha/room_keys.py | 1 - synapse/storage/e2e_room_keys.py | 2 +- synapse/storage/end_to_end_keys.py | 3 +-- 7 files changed, 9 insertions(+), 11 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 02fa3278ba35..9286ca320213 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -43,6 +43,7 @@ from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name from synapse.logging.context import nested_logging_context +from synapse.logging.opentracing import log_kv, trace from synapse.logging.utils import log_function from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, @@ -507,7 +508,7 @@ def on_query_client_keys(self, origin, content): def on_query_user_devices(self, origin, user_id): return self.on_query_request("user_devices", user_id) - @opentracing.trace + @trace @defer.inlineCallbacks @log_function def on_claim_client_keys(self, origin, content): @@ -516,9 +517,7 @@ def on_claim_client_keys(self, origin, content): for device_id, algorithm in device_keys.items(): query.append((user_id, device_id, algorithm)) - opentracing.log_kv( - {"message": "Claiming one time keys.", "user, device pairs": query} - ) + log_kv({"message": "Claiming one time keys.", "user, device pairs": query}) results = yield self.store.claim_e2e_one_time_keys(query) json_result = {} diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 16ce86cf0c4b..a79723cc3e1b 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -15,7 +15,6 @@ # limitations under the License. import logging -import synapse.logging.opentracing as opentracing from six import iteritems @@ -23,6 +22,7 @@ from twisted.internet import defer +import synapse.logging.opentracing as opentracing from synapse.api.errors import CodeMessageException, SynapseError from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.types import UserID, get_domain_from_id diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 8c56dd1123ae..f6a95bce18d7 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -19,6 +19,7 @@ from twisted.internet import defer +import synapse.logging.opentracing as opentracing from synapse.api.errors import ( Codes, NotFoundError, @@ -27,7 +28,6 @@ SynapseError, ) from synapse.util.async_helpers import Linearizer -import synapse.logging.opentracing as opentracing logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index fb36a31e080b..3282b5589291 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -17,6 +17,7 @@ from twisted.internet import defer +import synapse.logging.opentracing as opentracing from synapse.api.errors import SynapseError from synapse.http.servlet import ( RestServlet, @@ -25,7 +26,7 @@ parse_string, ) from synapse.types import StreamToken -import synapse.logging.opentracing as opentracing + from ._base import client_patterns logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py index 81191dd23780..10dec96208b6 100644 --- a/synapse/rest/client/v2_alpha/room_keys.py +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -23,7 +23,6 @@ parse_json_object_from_request, parse_string, ) -import synapse.logging.opentracing as opentracing from ._base import client_patterns diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index 276343406038..4a6b2be6f617 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -17,8 +17,8 @@ from twisted.internet import defer -from synapse.api.errors import StoreError import synapse.logging.opentracing as opentracing +from synapse.api.errors import StoreError from ._base import SQLBaseStore diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index f9a45b98bd9b..7ee973a8bb3c 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -18,12 +18,11 @@ from twisted.internet import defer +import synapse.logging.opentracing as opentracing from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore, db_to_json -import synapse.logging.opentracing as opentracing - class EndToEndKeyWorkerStore(SQLBaseStore): @opentracing.trace From 138ef5f504ad0b2006c833ebaa8bfefe5b1c0338 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 14 Aug 2019 17:27:03 +0100 Subject: [PATCH 17/20] newsfile --- changelog.d/5855.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5855.misc diff --git a/changelog.d/5855.misc b/changelog.d/5855.misc new file mode 100644 index 000000000000..32db7fbe3777 --- /dev/null +++ b/changelog.d/5855.misc @@ -0,0 +1 @@ +Opentracing for room and e2e keys. From d671ab42f2581f8e397bd678fb75157ba9470228 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Wed, 14 Aug 2019 18:21:33 +0100 Subject: [PATCH 18/20] Import style --- synapse/handlers/e2e_keys.py | 58 +++++++++++++--------------- synapse/handlers/e2e_room_keys.py | 24 ++++++------ synapse/rest/client/v2_alpha/keys.py | 17 +++++--- synapse/storage/e2e_room_keys.py | 14 +++---- synapse/storage/end_to_end_keys.py | 46 ++++++++++------------ 5 files changed, 78 insertions(+), 81 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index a79723cc3e1b..2a5ae3de6a3e 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -22,9 +22,9 @@ from twisted.internet import defer -import synapse.logging.opentracing as opentracing from synapse.api.errors import CodeMessageException, SynapseError from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace from synapse.types import UserID, get_domain_from_id from synapse.util import unwrapFirstError from synapse.util.retryutils import NotRetryingDestination @@ -47,7 +47,7 @@ def __init__(self, hs): "client_keys", self.on_federation_query_client_keys ) - @opentracing.trace + @trace @defer.inlineCallbacks def query_devices(self, query_body, timeout): """ Handle a device key query from a client @@ -83,8 +83,8 @@ def query_devices(self, query_body, timeout): else: remote_queries[user_id] = device_ids - opentracing.set_tag("local_key_query", local_query) - opentracing.set_tag("remote_key_query", remote_queries) + set_tag("local_key_query", local_query) + set_tag("remote_key_query", remote_queries) # First get local devices. failures = {} @@ -126,7 +126,7 @@ def query_devices(self, query_body, timeout): r[user_id] = remote_queries[user_id] # Now fetch any devices that we don't have in our cache - @opentracing.trace + @trace @defer.inlineCallbacks def do_remote_query(destination): """This is called when we are querying the device list of a user on @@ -191,8 +191,8 @@ def do_remote_query(destination): except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure - opentracing.set_tag("error", True) - opentracing.set_tag("reason", failure) + set_tag("error", True) + set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -206,7 +206,7 @@ def do_remote_query(destination): return {"device_keys": results, "failures": failures} - @opentracing.trace + @trace @defer.inlineCallbacks def query_local_devices(self, query): """Get E2E device keys for local users @@ -219,7 +219,7 @@ def query_local_devices(self, query): defer.Deferred: (resolves to dict[string, dict[string, dict]]): map from user_id -> device_id -> device details """ - opentracing.set_tag("local_query", query) + set_tag("local_query", query) local_query = [] result_dict = {} @@ -227,14 +227,14 @@ def query_local_devices(self, query): # we use UserID.from_string to catch invalid user ids if not self.is_mine(UserID.from_string(user_id)): logger.warning("Request for keys for non-local user %s", user_id) - opentracing.log_kv( + log_kv( { "message": "Requested a local key for a user which" + " was not local to the homeserver", "user_id": user_id, } ) - opentracing.set_tag("error", True) + set_tag("error", True) raise SynapseError(400, "Not a user here") if not device_ids: @@ -259,7 +259,7 @@ def query_local_devices(self, query): r["unsigned"]["device_display_name"] = display_name result_dict[user_id][device_id] = r - opentracing.log_kv(results) + log_kv(results) return result_dict @defer.inlineCallbacks @@ -270,7 +270,7 @@ def on_federation_query_client_keys(self, query_body): res = yield self.query_local_devices(device_keys_query) return {"device_keys": res} - @opentracing.trace + @trace @defer.inlineCallbacks def claim_one_time_keys(self, query, timeout): local_query = [] @@ -285,8 +285,8 @@ def claim_one_time_keys(self, query, timeout): domain = get_domain_from_id(user_id) remote_queries.setdefault(domain, {})[user_id] = device_keys - opentracing.set_tag("local_key_query", local_query) - opentracing.set_tag("remote_key_query", remote_queries) + set_tag("local_key_query", local_query) + set_tag("remote_key_query", remote_queries) results = yield self.store.claim_e2e_one_time_keys(local_query) @@ -299,10 +299,10 @@ def claim_one_time_keys(self, query, timeout): key_id: json.loads(json_bytes) } - @opentracing.trace + @trace @defer.inlineCallbacks def claim_client_keys(destination): - opentracing.set_tag("destination", destination) + set_tag("destination", destination) device_keys = remote_queries[destination] try: remote_result = yield self.federation.claim_client_keys( @@ -315,8 +315,8 @@ def claim_client_keys(destination): except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure - opentracing.set_tag("error", True) - opentracing.set_tag("reason", failure) + set_tag("error", True) + set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -340,11 +340,11 @@ def claim_client_keys(destination): ), ) - opentracing.log_kv({"one_time_keys": json_result, "failures": failures}) + log_kv({"one_time_keys": json_result, "failures": failures}) return {"one_time_keys": json_result, "failures": failures} @defer.inlineCallbacks - @opentracing.tag_args + @tag_args def upload_keys_for_user(self, user_id, device_id, keys): time_now = self.clock.time_msec() @@ -358,7 +358,7 @@ def upload_keys_for_user(self, user_id, device_id, keys): user_id, time_now, ) - opentracing.log_kv( + log_kv( { "message": "Updating device_keys for user.", "user_id": user_id, @@ -373,12 +373,10 @@ def upload_keys_for_user(self, user_id, device_id, keys): # Only notify about device updates *if* the keys actually changed yield self.device_handler.notify_device_update(user_id, [device_id]) else: - opentracing.log_kv( - {"message": "Not updating device_keys for user", "user_id": user_id} - ) + log_kv({"message": "Not updating device_keys for user", "user_id": user_id}) one_time_keys = keys.get("one_time_keys", None) if one_time_keys: - opentracing.log_kv( + log_kv( { "message": "Updating one_time_keys for device.", "user_id": user_id, @@ -389,7 +387,7 @@ def upload_keys_for_user(self, user_id, device_id, keys): user_id, device_id, time_now, one_time_keys ) else: - opentracing.log_kv( + log_kv( {"message": "Did not update one_time_keys", "reason": "no keys given"} ) @@ -402,7 +400,7 @@ def upload_keys_for_user(self, user_id, device_id, keys): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - opentracing.set_tag("one_time_key_counts", result) + set_tag("one_time_key_counts", result) return {"one_time_key_counts": result} @defer.inlineCallbacks @@ -446,9 +444,7 @@ def _upload_one_time_keys_for_user( (algorithm, key_id, encode_canonical_json(key).decode("ascii")) ) - opentracing.log_kv( - {"message": "Inserting new one_time_keys.", "keys": new_keys} - ) + log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys}) yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys) diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index f6a95bce18d7..a9d80f708c77 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -19,7 +19,6 @@ from twisted.internet import defer -import synapse.logging.opentracing as opentracing from synapse.api.errors import ( Codes, NotFoundError, @@ -27,6 +26,7 @@ StoreError, SynapseError, ) +from synapse.logging.opentracing import log_kv, trace from synapse.util.async_helpers import Linearizer logger = logging.getLogger(__name__) @@ -50,7 +50,7 @@ def __init__(self, hs): # changed. self._upload_linearizer = Linearizer("upload_room_keys_lock") - @opentracing.trace + @trace @defer.inlineCallbacks def get_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -86,10 +86,10 @@ def get_room_keys(self, user_id, version, room_id=None, session_id=None): user_id, version, room_id, session_id ) - opentracing.log_kv(results) + log_kv(results) return results - @opentracing.trace + @trace @defer.inlineCallbacks def delete_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -111,7 +111,7 @@ def delete_room_keys(self, user_id, version, room_id=None, session_id=None): with (yield self._upload_linearizer.queue(user_id)): yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) - @opentracing.trace + @trace @defer.inlineCallbacks def upload_room_keys(self, user_id, version, room_keys): """Bulk upload a list of room keys into a given backup version, asserting @@ -191,7 +191,7 @@ def _upload_room_key(self, user_id, version, room_id, session_id, room_key): session_id(str): the session whose room_key we're setting room_key(dict): the room_key being set """ - opentracing.log_kv( + log_kv( { "message": "Trying to upload room key", "room_id": room_id, @@ -207,7 +207,7 @@ def _upload_room_key(self, user_id, version, room_id, session_id, room_key): ) except StoreError as e: if e.code == 404: - opentracing.log_kv( + log_kv( { "message": "Room key not found.", "room_id": room_id, @@ -218,12 +218,12 @@ def _upload_room_key(self, user_id, version, room_id, session_id, room_key): raise if self._should_replace_room_key(current_room_key, room_key): - opentracing.log_kv({"message": "Replacing room key."}) + log_kv({"message": "Replacing room key."}) yield self.store.set_e2e_room_key( user_id, version, room_id, session_id, room_key ) else: - opentracing.log_kv({"message": "Not replacing room_key."}) + log_kv({"message": "Not replacing room_key."}) @staticmethod def _should_replace_room_key(current_room_key, room_key): @@ -257,7 +257,7 @@ def _should_replace_room_key(current_room_key, room_key): return False return True - @opentracing.trace + @trace @defer.inlineCallbacks def create_version(self, user_id, version_info): """Create a new backup version. This automatically becomes the new @@ -316,7 +316,7 @@ def get_version_info(self, user_id, version=None): raise return res - @opentracing.trace + @trace @defer.inlineCallbacks def delete_version(self, user_id, version=None): """Deletes a given version of the user's e2e_room_keys backup @@ -337,7 +337,7 @@ def delete_version(self, user_id, version=None): else: raise - @opentracing.trace + @trace @defer.inlineCallbacks def update_version(self, user_id, version, version_info): """Update the info about a given version of the user's backup diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 3282b5589291..e4323a40e895 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -17,7 +17,6 @@ from twisted.internet import defer -import synapse.logging.opentracing as opentracing from synapse.api.errors import SynapseError from synapse.http.servlet import ( RestServlet, @@ -25,6 +24,12 @@ parse_json_object_from_request, parse_string, ) +from synapse.logging.opentracing import ( + log_kv, + set_tag, + trace, + trace_using_operation_name, +) from synapse.types import StreamToken from ._base import client_patterns @@ -69,7 +74,7 @@ def __init__(self, hs): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() - @opentracing.trace_using_operation_name("upload_keys") + @trace_using_operation_name("upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) @@ -80,8 +85,8 @@ def on_POST(self, request, device_id): # passing the device_id here is deprecated; however, we allow it # for now for compatibility with older clients. if requester.device_id is not None and device_id != requester.device_id: - opentracing.set_tag("error", True) - opentracing.log_kv( + set_tag("error", True) + log_kv( { "message": "Client uploading keys for a different device", "logged_in_id": requester.device_id, @@ -188,11 +193,11 @@ def on_GET(self, request): requester = yield self.auth.get_user_by_req(request, allow_guest=True) from_token_string = parse_string(request, "from") - opentracing.set_tag("from", from_token_string) + set_tag("from", from_token_string) # We want to enforce they do pass us one, but we ignore it and return # changes after the "to" as well as before. - opentracing.set_tag("to", parse_string(request, "to")) + set_tag("to", parse_string(request, "to")) from_token = StreamToken.from_string(from_token_string) diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index 4a6b2be6f617..2d4b273672fd 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -17,8 +17,8 @@ from twisted.internet import defer -import synapse.logging.opentracing as opentracing from synapse.api.errors import StoreError +from synapse.logging.opentracing import log_kv, trace from ._base import SQLBaseStore @@ -95,7 +95,7 @@ def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): }, lock=False, ) - opentracing.log_kv( + log_kv( { "message": "Set room key", "room_id": room_id, @@ -104,7 +104,7 @@ def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): } ) - @opentracing.trace + @trace @defer.inlineCallbacks def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -163,7 +163,7 @@ def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): return sessions - @opentracing.trace + @trace @defer.inlineCallbacks def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -247,7 +247,7 @@ def _get_e2e_room_keys_version_info_txn(txn): "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn ) - @opentracing.trace + @trace def create_e2e_room_keys_version(self, user_id, info): """Atomically creates a new version of this user's e2e_room_keys store with the given version info. @@ -288,7 +288,7 @@ def _create_e2e_room_keys_version_txn(txn): "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn ) - @opentracing.trace + @trace def update_e2e_room_keys_version(self, user_id, version, info): """Update a given backup version @@ -305,7 +305,7 @@ def update_e2e_room_keys_version(self, user_id, version, info): desc="update_e2e_room_keys_version", ) - @opentracing.trace + @trace def delete_e2e_room_keys_version(self, user_id, version=None): """Delete a given backup version of the user's room keys. Doesn't delete their actual key data. diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 7ee973a8bb3c..33e3a84933de 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -18,14 +18,14 @@ from twisted.internet import defer -import synapse.logging.opentracing as opentracing +from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore, db_to_json class EndToEndKeyWorkerStore(SQLBaseStore): - @opentracing.trace + @trace @defer.inlineCallbacks def get_e2e_device_keys( self, query_list, include_all_devices=False, include_deleted_devices=False @@ -42,7 +42,7 @@ def get_e2e_device_keys( Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". """ - opentracing.set_tag("query_list", query_list) + set_tag("query_list", query_list) if not query_list: return {} @@ -60,12 +60,12 @@ def get_e2e_device_keys( return results - @opentracing.trace + @trace def _get_e2e_device_keys_txn( self, txn, query_list, include_all_devices=False, include_deleted_devices=False ): - opentracing.set_tag("include_all_devices", include_all_devices) - opentracing.set_tag("include_deleted_devices", include_deleted_devices) + set_tag("include_all_devices", include_all_devices) + set_tag("include_deleted_devices", include_deleted_devices) query_clauses = [] query_params = [] @@ -111,7 +111,7 @@ def _get_e2e_device_keys_txn( for user_id, device_id in deleted_devices: result.setdefault(user_id, {})[device_id] = None - opentracing.log_kv(result) + log_kv(result) return result @defer.inlineCallbacks @@ -138,9 +138,7 @@ def get_e2e_one_time_keys(self, user_id, device_id, key_ids): desc="add_e2e_one_time_keys_check", ) result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows} - opentracing.log_kv( - {"message": "Fetched one time keys for user", "one_time_keys": result} - ) + log_kv({"message": "Fetched one time keys for user", "one_time_keys": result}) return result @defer.inlineCallbacks @@ -157,9 +155,9 @@ def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): """ def _add_e2e_one_time_keys(txn): - opentracing.set_tag("user_id", user_id) - opentracing.set_tag("device_id", device_id) - opentracing.set_tag("new_keys", new_keys) + set_tag("user_id", user_id) + set_tag("device_id", device_id) + set_tag("new_keys", new_keys) # We are protected from race between lookup and insertion due to # a unique constraint. If there is a race of two calls to # `add_e2e_one_time_keys` then they'll conflict and we will only @@ -216,10 +214,10 @@ def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): """ def _set_e2e_device_keys_txn(txn): - opentracing.set_tag("user_id", user_id) - opentracing.set_tag("device_id", device_id) - opentracing.set_tag("time_now", time_now) - opentracing.set_tag("device_keys", device_keys) + set_tag("user_id", user_id) + set_tag("device_id", device_id) + set_tag("time_now", time_now) + set_tag("device_keys", device_keys) old_key_json = self._simple_select_one_onecol_txn( txn, @@ -234,7 +232,7 @@ def _set_e2e_device_keys_txn(txn): new_key_json = encode_canonical_json(device_keys).decode("utf-8") if old_key_json == new_key_json: - opentracing.log_kv({"Message": "Device key already stored."}) + log_kv({"Message": "Device key already stored."}) return False self._simple_upsert_txn( @@ -243,7 +241,7 @@ def _set_e2e_device_keys_txn(txn): keyvalues={"user_id": user_id, "device_id": device_id}, values={"ts_added_ms": time_now, "key_json": new_key_json}, ) - opentracing.log_kv({"message": "Device keys stored."}) + log_kv({"message": "Device keys stored."}) return True return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn) @@ -251,7 +249,7 @@ def _set_e2e_device_keys_txn(txn): def claim_e2e_one_time_keys(self, query_list): """Take a list of one time keys out of the database""" - @opentracing.trace + @trace def _claim_e2e_one_time_keys(txn): sql = ( "SELECT key_id, key_json FROM e2e_one_time_keys_json" @@ -273,15 +271,13 @@ def _claim_e2e_one_time_keys(txn): " AND key_id = ?" ) for user_id, device_id, algorithm, key_id in delete: - opentracing.log_kv( + log_kv( { "message": "Executing claim e2e_one_time_keys transaction on database." } ) txn.execute(sql, (user_id, device_id, algorithm, key_id)) - opentracing.log_kv( - {"message": "finished executing and invalidating cache"} - ) + log_kv({"message": "finished executing and invalidating cache"}) self._invalidate_cache_and_stream( txn, self.count_e2e_one_time_keys, (user_id, device_id) ) @@ -291,7 +287,7 @@ def _claim_e2e_one_time_keys(txn): def delete_e2e_keys_by_device(self, user_id, device_id): def delete_e2e_keys_by_device_txn(txn): - opentracing.log_kv( + log_kv( { "message": "Deleting keys for device", "device_id": device_id, From eb2a4e54ac7d896363bcc5a42dc1e37873b5faac Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 16 Aug 2019 18:36:55 +0100 Subject: [PATCH 19/20] Redundent + --- synapse/handlers/e2e_keys.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 2a5ae3de6a3e..056fb97acb40 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -230,7 +230,7 @@ def query_local_devices(self, query): log_kv( { "message": "Requested a local key for a user which" - + " was not local to the homeserver", + " was not local to the homeserver", "user_id": user_id, } ) From 1892c677cb7c9c305a0dbeb1655c6fec9501979e Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Fri, 16 Aug 2019 18:37:09 +0100 Subject: [PATCH 20/20] Unused import --- synapse/rest/client/v2_alpha/keys.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index e4323a40e895..b218a3f334d4 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -24,12 +24,7 @@ parse_json_object_from_request, parse_string, ) -from synapse.logging.opentracing import ( - log_kv, - set_tag, - trace, - trace_using_operation_name, -) +from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name from synapse.types import StreamToken from ._base import client_patterns