diff --git a/changelog.d/9739.bugfix b/changelog.d/9739.bugfix new file mode 100644 index 000000000000..6c5f4b673114 --- /dev/null +++ b/changelog.d/9739.bugfix @@ -0,0 +1 @@ +Fix #8518 and #3880. \ No newline at end of file diff --git a/changelog.d/9739.feature b/changelog.d/9739.feature new file mode 100644 index 000000000000..895082605e5c --- /dev/null +++ b/changelog.d/9739.feature @@ -0,0 +1 @@ +Added experimental support to cache `/sync` responses with config key `experimental_features.sync_cache_timeout_ms` (in milliseconds). \ No newline at end of file diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index eb96ecda74ea..0f793d901e25 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -36,3 +36,8 @@ def read_config(self, config: JsonDict, **kwargs): # MSC3026 (busy presence state) self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool + + # /sync ResponseCache timeout (in ms) + self.sync_cache_timeout = experimental.get( + "sync_cache_timeout_ms", 0 + ) # type: int diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7b356ba7e5a8..da7326ad4f83 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -41,7 +41,7 @@ from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache -from synapse.util.caches.response_cache import ResponseCache +from synapse.util.caches.sync_response_cache import SyncResponseCache from synapse.util.metrics import Measure, measure_func from synapse.visibility import filter_events_for_client @@ -244,9 +244,11 @@ def __init__(self, hs: "HomeServer"): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache( - hs.get_clock(), "sync" - ) # type: ResponseCache[Tuple[Any, ...]] + self.response_cache = 
SyncResponseCache( + hs.get_clock(), + "sync", + timeout_ms=self.hs_config.experimental.sync_cache_timeout, + ) # type: SyncResponseCache[Tuple[Any, ...]] self.state = hs.get_state_handler() self.auth = hs.get_auth() self.storage = hs.get_storage() @@ -278,8 +280,10 @@ async def wait_for_sync_for_user( user_id = sync_config.user.to_string() await self.auth.check_auth_blocking(requester=requester) - res = await self.response_cache.wrap( + res = await self.response_cache.wrap_conditional( sync_config.request_key, + # Evict cache if next_batch would refer to this cached result + lambda result: since_token != result.next_batch, self._wait_for_sync_for_user, sync_config, since_token, diff --git a/synapse/util/caches/sync_response_cache.py b/synapse/util/caches/sync_response_cache.py new file mode 100644 index 000000000000..44d9ae5b19fe --- /dev/null +++ b/synapse/util/caches/sync_response_cache.py @@ -0,0 +1,101 @@ +import logging +from typing import Any, Callable, Dict + +from twisted.internet import defer + +from synapse.util import Clock +from synapse.util.async_helpers import ObservableDeferred +from synapse.util.caches.response_cache import ResponseCache, T + +logger = logging.getLogger(__name__) + + +# A special class for /sync responses, to conditionally cache these. 
+class SyncResponseCache(ResponseCache[T]): + def __init__(self, clock: Clock, name: str, timeout_ms: float = 0): + super().__init__(clock, name, timeout_ms) + + self.conditionals = {} # type: Dict[T, Callable[[Any], bool]] + + def run_conditional(self, key: T, result: Any) -> bool: + """Runs a conditional set on key T, defaults to True""" + cond = self.conditionals.get(key, None) + if cond is None: + return True + else: + try: + # Below type annotation is needed for mypy to shush about some statements being unreachable, + # we essentially have to not trust other functions to be able to correctly recover from any fallout + # (and log it) + res = cond(result) # type: Any + except Exception: + logger.exception( + "[%s]: Executing conditional %r on %s raised an exception.", + self._name, + cond, + key, + ) + # Evict cache out of caution. + return False + else: + if not isinstance(res, bool): + logger.warning( + "[%s]: Conditional %r returned non-bool value %r (for key %r)", + self._name, + cond, + res, + key, + ) + # Return concrete boolean value based on falsy or truthiness. + # If this raises, then so be it, then this value wasn't ever supposed to be "true" or "false" + anyway, then have it be a scream test. 
+ return bool(res) + else: + return res + + # Copy this method wholesale from ResponseCache to be able to alter the inner `remove` function + def set(self, key: T, deferred: defer.Deferred) -> defer.Deferred: + """Same as ResponseCache.set, but is conditional-aware""" + result = ObservableDeferred(deferred, consumeErrors=True) + self.pending_result_cache[key] = result + + def remove(r): + if self.timeout_sec and ( + not isinstance(r, BaseException) and self.run_conditional(key, r) + ): + self.clock.call_later( + self.timeout_sec, self.pending_result_cache.pop, key, None + ) + else: + self.pending_result_cache.pop(key, None) + + self.conditionals.pop(key, None) + + return r + + result.addBoth(remove) + return result.observe() + + def wrap_conditional( + self, + key: T, + conditional: "Callable[[Any], bool]", + callback: "Callable[..., Any]", + *args: Any, + **kwargs: Any + ) -> defer.Deferred: + """Same as wrap(), but adds a conditional to be executed on completion. + + Only the very first caller with this key, between both wrap() and wrap_conditional(), will set the + conditional function, otherwise the 'conditional' argument will be ignored.""" + + if self.get(key) is None: # we are the first caller + logger.debug( + "[%s]: We are the very first caller for [%s], setting conditional %r...", + self._name, + key, + conditional, + ) + self.conditionals[key] = conditional + + return self.wrap(key, callback, *args, **kwargs) diff --git a/tests/util/caches/test_sync_responsecache.py b/tests/util/caches/test_sync_responsecache.py new file mode 100644 index 000000000000..2e587ce5ccbf --- /dev/null +++ b/tests/util/caches/test_sync_responsecache.py @@ -0,0 +1,134 @@ +from synapse.util.caches.sync_response_cache import SyncResponseCache + +from tests.server import get_clock +from tests.unittest import TestCase + +# A random callback that returns an object +CALLBACK = lambda: OBJ + +# An object, can be equalized to itself +OBJ = {0} + +# The key used on the caches throughout 
 this file +KEY = 0 + +# Easy conditionals +YES = lambda _: True +NO = lambda _: False + + +class SyncResponseCacheTestCase(TestCase): + """ + A TestCase class for SyncResponseCache. + + The test-case function naming has some logic to it in its parts, here are some notes about it: + first: Denotes tests that test wrap_conditional as a "first caller" + later: Denotes tests that test wrap_conditional as a non-"first caller" + multi: Denotes tests that have multiple subsequent calls to wrap* + approve: Denotes tests where the conditional approves of the results (letting cache). + disapprove: Denotes tests where the conditional disapproves of the result (expiring it). + hit: Denotes tests whose expected outcome is a cache hit. + miss: Denotes tests whose expected outcome is a cache miss. + """ + + def setUp(self): + self.reactor, self.clock = get_clock() + self.cache = SyncResponseCache(self.clock, "keeping_cache", timeout_ms=1000) + + # Extra helper functions + + def is_hit(self): + self.assertEqual( + OBJ, + self.successResultOf(self.cache.get(KEY)), + "cache should not be expired", + ) + + def is_miss(self): + self.assertIsNone(self.cache.get(KEY), "cache should be expired") + + def pump(self): + self.reactor.pump((1,)) + + # Like CALLBACK, but waits a second, and is async + async def delayed_callback(self): + await self.clock.sleep(1) + return OBJ + + # Actual tests + + def test_cache_first_approve_hit(self): + self.cache.wrap_conditional(KEY, YES, CALLBACK) + + self.is_hit() + + def test_cache_first_disapprove_miss(self): + self.cache.wrap_conditional(KEY, NO, CALLBACK) + + self.is_miss() + + def test_cache_later_approve_hit(self): + # first + self.cache.wrap(KEY, CALLBACK) + + # second + self.cache.wrap_conditional(KEY, YES, CALLBACK) + + self.is_hit() + + def test_cache_later_disapprove_hit(self): + # first + self.cache.wrap(KEY, CALLBACK) + + # second + self.cache.wrap_conditional(KEY, NO, CALLBACK) + + self.is_hit() + + # Show how later calls to wrap_conditional
 don't change its conditional outcome + # These need self.delayed_callback, because otherwise the first wrap* (by logic of run_in_background) + # will also run the function *and* its callbacks, including (Sync)ResponseCache.set::{{remove}} + + def test_cache_multi_first_approve_later_approve_hit(self): + # first + self.cache.wrap_conditional(KEY, YES, self.delayed_callback) + + # second + self.cache.wrap_conditional(KEY, YES, self.delayed_callback) + + self.pump() + + self.is_hit() + + def test_cache_multi_first_approve_later_disapprove_hit(self): + # first + self.cache.wrap_conditional(KEY, YES, self.delayed_callback) + + # second + self.cache.wrap_conditional(KEY, NO, self.delayed_callback) + + self.pump() + + self.is_hit() + + def test_cache_multi_first_disapprove_later_approve_miss(self): + # first + self.cache.wrap_conditional(KEY, NO, self.delayed_callback) + + # second + self.cache.wrap_conditional(KEY, YES, self.delayed_callback) + + self.pump() + + self.is_miss() + + def test_cache_multi_first_disapprove_later_disapprove_miss(self): + # first + self.cache.wrap_conditional(KEY, NO, self.delayed_callback) + + # second + self.cache.wrap_conditional(KEY, NO, self.delayed_callback) + + self.pump() + + self.is_miss()