Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6749.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow streaming cache 'invalidate all' to workers.
5 changes: 5 additions & 0 deletions docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ and they key to invalidate. For example:

> RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]

Alternatively, an entire cache can be invalidated by sending down a `null`
instead of the key. For example:

> RDATA caches 550953772 ["get_user_by_id", null, 1550574873252]

However, there are times when a number of caches need to be invalidated
at the same time with the same key. To reduce traffic we batch those
invalidations into a single poke by defining a special cache name that
Expand Down
7 changes: 6 additions & 1 deletion synapse/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,16 @@ def process_replication_rows(self, stream_name, token, rows):
self._cache_id_gen.advance(token)
for row in rows:
if row.cache_func == CURRENT_STATE_CACHE_NAME:
if row.keys is None:
raise Exception(
"Can't send an 'invalidate all' for current state cache"
)

room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
else:
self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys))
self._attempt_to_invalidate_cache(row.cache_func, row.keys)

def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)
Expand Down
26 changes: 21 additions & 5 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import itertools
import logging
from collections import namedtuple
from typing import Any
from typing import Any, List, Optional

import attr

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -65,10 +67,24 @@
"PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
)
CachesStreamRow = namedtuple(
"CachesStreamRow",
("cache_func", "keys", "invalidation_ts"), # str # list(str) # int
)


@attr.s
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.

Attributes:
cache_func: Name of the cached function.
keys: The entry in the cache to invalidate. If None then will
invalidate all.
invalidation_ts: Timestamp of when the invalidation took place.
"""

cache_func = attr.ib(type=str)
keys = attr.ib(type=Optional[List[Any]])
invalidation_ts = attr.ib(type=int)


PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(
Expand Down
18 changes: 13 additions & 5 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import logging
import random
from abc import ABCMeta
from typing import Any, Optional

from six import PY2
from six.moves import builtins
Expand All @@ -26,7 +27,7 @@
from synapse.storage.database import LoggingTransaction # noqa: F401
from synapse.storage.database import make_in_list_sql_clause # noqa: F401
from synapse.storage.database import Database
from synapse.types import get_domain_from_id
from synapse.types import Collection, get_domain_from_id

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -63,17 +64,24 @@ def _invalidate_state_caches(self, room_id, members_changed):
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))

def _attempt_to_invalidate_cache(self, cache_name, key):
def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]
):
"""Attempts to invalidate the cache of the given name, ignoring if the
cache doesn't exist. Mainly used for invalidating caches on workers,
where they may not have the cache.

Args:
cache_name (str)
key (tuple)
cache_name
key: Entry to invalidate. If None then invalidates the entire
cache.
"""

try:
getattr(self, cache_name).invalidate(key)
if key is None:
getattr(self, cache_name).invalidate_all()
else:
getattr(self, cache_name).invalidate(tuple(key))
except AttributeError:
# We probably haven't pulled in the cache in this worker,
# which is fine.
Expand Down
27 changes: 23 additions & 4 deletions synapse/storage/data_stores/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import itertools
import logging
from typing import Any, Iterable, Optional

from twisted.internet import defer

Expand Down Expand Up @@ -43,6 +44,14 @@ def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)
self._send_invalidation_to_replication(txn, cache_func.__name__, keys)

def _invalidate_all_cache_and_stream(self, txn, cache_func):
"""Invalidates the entire cache and adds it to the cache stream so slaves
will know to invalidate their caches.
"""

txn.call_after(cache_func.invalidate_all)
self._send_invalidation_to_replication(txn, cache_func.__name__, None)

def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
"""Special case invalidation of caches based on current state.

Expand Down Expand Up @@ -73,17 +82,24 @@ def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
txn, CURRENT_STATE_CACHE_NAME, [room_id]
)

def _send_invalidation_to_replication(self, txn, cache_name, keys):
def _send_invalidation_to_replication(
self, txn, cache_name: str, keys: Optional[Iterable[Any]]
):
"""Notifies replication that given cache has been invalidated.

Note that this does *not* invalidate the cache locally.

Args:
txn
cache_name (str)
keys (iterable[str])
cache_name
keys: Entry to invalidate. If None will invalidate all.
"""

if cache_name == CURRENT_STATE_CACHE_NAME and keys is None:
raise Exception(
"Can't stream invalidate all with magic current state cache"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As someone who hasn't completely wrapped his head around our caching yet, what does "magic" mean in this context?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)

if isinstance(self.database_engine, PostgresEngine):
# get_next() returns a context manager which is designed to wrap
# the transaction. However, we want to only get an ID when we want
Expand All @@ -95,13 +111,16 @@ def _send_invalidation_to_replication(self, txn, cache_name, keys):
txn.call_after(ctx.__exit__, None, None, None)
txn.call_after(self.hs.get_notifier().on_new_replication_data)

if keys is not None:
keys = list(keys)

self.db.simple_insert_txn(
txn,
table="cache_invalidation_stream",
values={
"stream_id": stream_id,
"cache_func": cache_name,
"keys": list(keys),
"keys": keys,
"invalidation_ts": self.clock.time_msec(),
},
)
Expand Down