From 8ab331087a1ef43540ae6c250f6f85d229621192 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Sat, 3 Apr 2021 20:20:08 +0200 Subject: [PATCH 1/4] add initial work --- synapse/config/experimental.py | 5 + synapse/handlers/sync.py | 14 +- synapse/util/caches/sync_response_cache.py | 101 ++++++++++++++ tests/util/caches/test_sync_responsecache.py | 134 +++++++++++++++++++ 4 files changed, 249 insertions(+), 5 deletions(-) create mode 100644 synapse/util/caches/sync_response_cache.py create mode 100644 tests/util/caches/test_sync_responsecache.py diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index eb96ecda74ea..0f793d901e25 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -36,3 +36,8 @@ def read_config(self, config: JsonDict, **kwargs): # MSC3026 (busy presence state) self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool + + # /sync ResponseCache timeout (in ms) + self.sync_cache_timeout = experimental.get( + "sync_cache_timeout_ms", 0 + ) # type: int diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7b356ba7e5a8..da7326ad4f83 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -41,7 +41,7 @@ from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache -from synapse.util.caches.response_cache import ResponseCache +from synapse.util.caches.sync_response_cache import SyncResponseCache from synapse.util.metrics import Measure, measure_func from synapse.visibility import filter_events_for_client @@ -244,9 +244,11 @@ def __init__(self, hs: "HomeServer"): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache( - hs.get_clock(), "sync" - ) # type: ResponseCache[Tuple[Any, ...]] + self.response_cache = SyncResponseCache( + 
hs.get_clock(), + "sync", + timeout_ms=self.hs_config.experimental.sync_cache_timeout, + ) # type: SyncResponseCache[Tuple[Any, ...]] self.state = hs.get_state_handler() self.auth = hs.get_auth() self.storage = hs.get_storage() @@ -278,8 +280,10 @@ async def wait_for_sync_for_user( user_id = sync_config.user.to_string() await self.auth.check_auth_blocking(requester=requester) - res = await self.response_cache.wrap( + res = await self.response_cache.wrap_conditional( sync_config.request_key, + # Evict cache if next_batch would refer to this cached result + lambda result: since_token != result.next_batch, self._wait_for_sync_for_user, sync_config, since_token, diff --git a/synapse/util/caches/sync_response_cache.py b/synapse/util/caches/sync_response_cache.py new file mode 100644 index 000000000000..45ed075b09c8 --- /dev/null +++ b/synapse/util/caches/sync_response_cache.py @@ -0,0 +1,101 @@ +import logging +from typing import Any, Callable, Dict + +from twisted.internet import defer + +from synapse.util import Clock +from synapse.util.async_helpers import ObservableDeferred +from synapse.util.caches.response_cache import ResponseCache, T + +logger = logging.getLogger(__name__) + + +# A special class for /sync responses, to conditionally cache these. 
+class SyncResponseCache(ResponseCache[T]): + def __init__(self, clock: Clock, name: str, timeout_ms: float = 0): + super().__init__(clock, name, timeout_ms) + + self.conditionals = {} # type: Dict[T, Callable[[Any], bool]] + + def run_conditional(self, key: T, result: Any) -> bool: + """Runs a conditional set on key T, defaults to True""" + cond = self.conditionals.get(key, None) + if cond is None: + return True + else: + try: + # Below type annotation is needed for mypy to shush about some statements being unreachable, + # we essentially have to not trust other functions to be able to correctly recover from any fallout + # (and log it) + res = cond(result) # type: Any + except Exception: + logger.exception( + "[%s]:Executing conditional %r on %s raised an exception.", + self._name, + cond, + key, + ) + # Evict cache out of caution. + return False + else: + if not isinstance(res, bool): + logger.warning( + "[%s]:Conditional %r returned non-bool value %r (for key %r)", + self._name, + cond, + res, + key, + ) + # Return concrete boolean value based on falsy or truthiness. + # If this raises, then so be it: this value was never supposed to be "true" or "false" + # anyway, so let it act as a scream test. 
+ return bool(res) + else: + return res + + # Copy this method wholesale from ResponseCache to be able to alter the inner `remove` function + def set(self, key: T, deferred: defer.Deferred) -> defer.Deferred: + """Same as ResponseCache.set, but is conditional-aware""" + result = ObservableDeferred(deferred, consumeErrors=True) + self.pending_result_cache[key] = result + + def remove(r): + if self.timeout_sec and ( + not isinstance(r, BaseException) and self.run_conditional(key, r) + ): + self.clock.call_later( + self.timeout_sec, self.pending_result_cache.pop, key, None + ) + else: + self.pending_result_cache.pop(key, None) + + self.conditionals.pop(key, None) + + return r + + result.addBoth(remove) + return result.observe() + + def wrap_conditional( + self, + key: T, + conditional: "Callable[[Any], bool]", + callback: "Callable[..., Any]", + *args: Any, + **kwargs: Any + ) -> defer.Deferred: + """Same as wrap(), but adds a conditional to be executed on completion. + + Only the very first caller with this key, between both wrap() and wrap_conditional(), will set the + conditional function, otherwise the 'conditional' argument will be ignored.""" + + if self.get(key) is None: # we are the first caller + logger.debug( + "[%s]: We are the very first caller for [%s], setting conditional %r...", + self._name, + key, + conditional, + ) + self.conditionals[key] = conditional + + return self.wrap(key, callback, *args, **kwargs) diff --git a/tests/util/caches/test_sync_responsecache.py b/tests/util/caches/test_sync_responsecache.py new file mode 100644 index 000000000000..2e587ce5ccbf --- /dev/null +++ b/tests/util/caches/test_sync_responsecache.py @@ -0,0 +1,134 @@ +from synapse.util.caches.sync_response_cache import SyncResponseCache + +from tests.server import get_clock +from tests.unittest import TestCase + +# A random callback that returns an object +CALLBACK = lambda: OBJ + +# An object, can be equalized to itself +OBJ = {0} + +# The key used on the caches throughout 
this file +KEY = 0 + +# Easy conditionals +YES = lambda _: True +NO = lambda _: False + + +class SyncResponseCacheTestCase(TestCase): + """ + A TestCase class for SyncResponseCache. + + The test-case function naming has some logic to it in its parts; here are some notes about it: + first: Denotes tests that test wrap_conditional as a "first caller" + later: Denotes tests that test wrap_conditional as a non-"first caller" + multi: Denotes tests that have multiple consecutive calls to wrap* + approve: Denotes tests where the conditional approves of the results (letting cache). + disapprove: Denotes tests where the conditional disapproves of the result (expiring it). + hit: Denotes tests whose expected outcome is a cache hit. + miss: Denotes tests whose expected outcome is a cache miss. + """ + + def setUp(self): + self.reactor, self.clock = get_clock() + self.cache = SyncResponseCache(self.clock, "keeping_cache", timeout_ms=1000) + + # Extra helper functions + + def is_hit(self): + self.assertEqual( + OBJ, + self.successResultOf(self.cache.get(KEY)), + "cache should not be expired", + ) + + def is_miss(self): + self.assertIsNone(self.cache.get(KEY), "cache should be expired") + + def pump(self): + self.reactor.pump((1,)) + + # Like CALLBACK, but waits a second, and is async + async def delayed_callback(self): + await self.clock.sleep(1) + return OBJ + + # Actual tests + + def test_cache_first_approve_hit(self): + self.cache.wrap_conditional(KEY, YES, CALLBACK) + + self.is_hit() + + def test_cache_first_disapprove_miss(self): + self.cache.wrap_conditional(KEY, NO, CALLBACK) + + self.is_miss() + + def test_cache_later_approve_hit(self): + # first + self.cache.wrap(KEY, CALLBACK) + + # second + self.cache.wrap_conditional(KEY, YES, CALLBACK) + + self.is_hit() + + def test_cache_later_disapprove_hit(self): + # first + self.cache.wrap(KEY, CALLBACK) + + # second + self.cache.wrap_conditional(KEY, NO, CALLBACK) + + self.is_hit() + + # Show how later calls to wrap_conditional 
don't change its conditional outcome + # These need self.delayed_callback, because otherwise the first wrap* (by logic of run_in_background) + # will also run the function *and* its callbacks, including (Sync)ResponseCache.set::{{remove}} + + def test_cache_multi_first_approve_later_approve_hit(self): + # first + self.cache.wrap_conditional(KEY, YES, self.delayed_callback) + + # second + self.cache.wrap_conditional(KEY, YES, self.delayed_callback) + + self.pump() + + self.is_hit() + + def test_cache_multi_first_approve_later_disapprove_hit(self): + # first + self.cache.wrap_conditional(KEY, YES, self.delayed_callback) + + # second + self.cache.wrap_conditional(KEY, NO, self.delayed_callback) + + self.pump() + + self.is_hit() + + def test_cache_multi_first_disapprove_later_approve_miss(self): + # first + self.cache.wrap_conditional(KEY, NO, self.delayed_callback) + + # second + self.cache.wrap_conditional(KEY, YES, self.delayed_callback) + + self.pump() + + self.is_miss() + + def test_cache_multi_first_disapprove_later_disapprove_miss(self): + # first + self.cache.wrap_conditional(KEY, NO, self.delayed_callback) + + # second + self.cache.wrap_conditional(KEY, NO, self.delayed_callback) + + self.pump() + + self.is_miss() From 9a36ecbd625fc48fd2ab947cdde1e9cb9dc9c9d4 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Sat, 3 Apr 2021 20:30:19 +0200 Subject: [PATCH 2/4] news --- changelog.d/9739.bugfix | 1 + changelog.d/9739.feature | 1 + 2 files changed, 2 insertions(+) create mode 100644 changelog.d/9739.bugfix create mode 100644 changelog.d/9739.feature diff --git a/changelog.d/9739.bugfix b/changelog.d/9739.bugfix new file mode 100644 index 000000000000..08e7dbb8607e --- /dev/null +++ b/changelog.d/9739.bugfix @@ -0,0 +1 @@ +Fix #8518. 
\ No newline at end of file diff --git a/changelog.d/9739.feature b/changelog.d/9739.feature new file mode 100644 index 000000000000..895082605e5c --- /dev/null +++ b/changelog.d/9739.feature @@ -0,0 +1 @@ +Added experimental support to cache `/sync` responses with config key `experimental_features.sync_cache_timeout_ms` (in milliseconds). \ No newline at end of file From b96580051e345b3e750c021adc6010dd9b47e29d Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Sat, 3 Apr 2021 20:32:55 +0200 Subject: [PATCH 3/4] more news --- changelog.d/9739.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/9739.bugfix b/changelog.d/9739.bugfix index 08e7dbb8607e..6c5f4b673114 100644 --- a/changelog.d/9739.bugfix +++ b/changelog.d/9739.bugfix @@ -1 +1 @@ -Fix #8518. \ No newline at end of file +Fix #8518 and #3880. \ No newline at end of file From e8b487b70a3ae42be7253a577af38301f01c3ae9 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Sat, 3 Apr 2021 20:46:37 +0200 Subject: [PATCH 4/4] spacing --- synapse/util/caches/sync_response_cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/util/caches/sync_response_cache.py b/synapse/util/caches/sync_response_cache.py index 45ed075b09c8..44d9ae5b19fe 100644 --- a/synapse/util/caches/sync_response_cache.py +++ b/synapse/util/caches/sync_response_cache.py @@ -30,7 +30,7 @@ def run_conditional(self, key: T, result: Any) -> bool: res = cond(result) # type: Any except Exception: logger.exception( - "[%s]:Executing conditional %r on %s raised an exception.", + "[%s]: Executing conditional %r on %s raised an exception.", self._name, cond, key, @@ -40,7 +40,7 @@ def run_conditional(self, key: T, result: Any) -> bool: else: if not isinstance(res, bool): logger.warning( - "[%s]:Conditional %r returned non-bool value %r (for key %r)", + "[%s]: Conditional %r returned non-bool value %r (for key %r)", self._name, cond, res,