From 467aa4d1beb3ec1541ca29d1059e411101062013 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 20 Jan 2020 15:09:18 +0000 Subject: [PATCH 1/2] Propagate cache invalidates from workers to other workers. --- synapse/replication/tcp/protocol.py | 2 +- synapse/replication/tcp/resource.py | 9 ++++++--- synapse/storage/data_stores/main/cache.py | 21 +++++++++++++++++++++ 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 131e5acb09ca..bc1482a9bbf2 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -459,7 +459,7 @@ async def on_REMOVE_PUSHER(self, cmd): await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id) async def on_INVALIDATE_CACHE(self, cmd): - self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + await self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): self.streamer.on_remote_server_up(cmd.data) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 6ebf944f66a4..ce60ae2e07f1 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -17,7 +17,7 @@ import logging import random -from typing import List +from typing import Any, List from six import itervalues @@ -271,11 +271,14 @@ async def on_remove_pusher(self, app_id, push_key, user_id): self.notifier.on_new_replication_data() @measure_func("repl.on_invalidate_cache") - def on_invalidate_cache(self, cache_func, keys): + async def on_invalidate_cache(self, cache_func: str, keys: List[Any]): """The client has asked us to invalidate a cache """ invalidate_cache_counter.inc() - getattr(self.store, cache_func).invalidate(tuple(keys)) + + # We invalidate the cache locally, but then also stream that to other + # workers. + await self.store.invalidate_cache_and_stream(cache_func, tuple(keys)) @measure_func("repl.on_user_ip") async def on_user_ip( diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py index bf91512daf99..a5baf538914e 100644 --- a/synapse/storage/data_stores/main/cache.py +++ b/synapse/storage/data_stores/main/cache.py @@ -16,6 +16,7 @@ import itertools import logging +from typing import Any, Tuple from twisted.internet import defer @@ -32,6 +33,26 @@ class CacheInvalidationStore(SQLBaseStore): + async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]): + """Invalidates the cache and adds it to the cache stream so slaves + will know to invalidate their caches. + + This should only be used to invalidate caches where slaves won't + otherwise know from other replication streams that the cache should + be invalidated. + """ + cache_func = getattr(self, cache_name, None) + if not cache_func: + return + + cache_func.invalidate(keys) + await self.runInteraction( + "invalidate_cache_and_stream", + self._send_invalidation_to_replication, + cache_func.__name__, + keys, + ) + def _invalidate_cache_and_stream(self, txn, cache_func, keys): """Invalidates the cache and adds it to the cache stream so slaves will know to invalidate their caches. From 66d48f55a5bef37b55b0b5ecfb749e40a5199130 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 20 Jan 2020 15:11:38 +0000 Subject: [PATCH 2/2] Newsfile --- changelog.d/6748.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6748.misc diff --git a/changelog.d/6748.misc b/changelog.d/6748.misc new file mode 100644 index 000000000000..de320d4cd9a7 --- /dev/null +++ b/changelog.d/6748.misc @@ -0,0 +1 @@ +Propagate cache invalidates from workers to other workers.