From 6ac54da0109ad243ad91a1ef9529b3c5eeb65a84 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Jul 2018 16:58:58 +0100 Subject: [PATCH 1/7] Add concept of StatelessContext This allows us to lazily load state on the master process as required (when using workers). The advantage of this is that in the common case the master doesn't need the full state of the events to persist them. We keep the concept of an EventContext that has the current_state_ids and prev_state_ids attributes, to avoid having to rewrite a lot of code. Instead, we shift the necessary functions to accept StatelessContext. --- synapse/events/snapshot.py | 162 +++++++++++++++++++------ synapse/handlers/_base.py | 8 +- synapse/handlers/message.py | 26 ++-- synapse/replication/http/send_event.py | 6 +- synapse/storage/events.py | 34 +++--- 5 files changed, 174 insertions(+), 62 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index bcd9bb594670..aaa3aad29ded 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -15,29 +15,20 @@ from frozendict import frozendict +import abc + from twisted.internet import defer -class EventContext(object): +class StatelessContext(object): """ Attributes: - current_state_ids (dict[(str, str), str]): - The current state map including the current event. - (type, state_key) -> event_id - - prev_state_ids (dict[(str, str), str]): - The current state map excluding the current event. - (type, state_key) -> event_id - state_group (int|None): state group id, if the state has been stored as a state group. This is usually only None if e.g. the event is an outlier. rejected (bool|str): A rejection reason if the event was rejected, else False - push_actions (list[(str, list[object])]): list of (user_id, actions) - tuples - prev_group (int): Previously persisted state group. ``None`` for an outlier. delta_ids (dict[(str, str), str]): Delta from ``prev_group``. @@ -45,24 +36,31 @@ class EventContext(object): prev_state_events (?): XXX: is this ever set to anything other than the empty list? + + current_state_ids (dict[(str, str), str]|None): + The current state map including the current event. + (type, state_key) -> event_id + + prev_state_ids (dict[(str, str), str]|None): + The current state map excluding the current event. + (type, state_key) -> event_id """ + __metaclass__ = abc.ABCMeta + __slots__ = [ - "current_state_ids", - "prev_state_ids", "state_group", "rejected", "prev_group", "delta_ids", "prev_state_events", "app_service", + + "current_state_ids", + "prev_state_ids", ] def __init__(self): - # The current state including the current event - self.current_state_ids = None - # The current state excluding the current event - self.prev_state_ids = None self.state_group = None self.rejected = False @@ -76,9 +74,39 @@ def __init__(self): self.app_service = None + # The current state including the current event + self.current_state_ids = None + # The current state excluding the current event + self.prev_state_ids = None + + @abc.abstractmethod + def get_current_state_ids(self, store): + """Gets the current state IDs + + Returns: + Deferred[dict[(str, str), str]|None] + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_prev_state_ids(self, store): + """Gets the prev state IDs + + Returns: + Deferred[dict[(str, str), str]|None] + """ + raise NotImplementedError() + + +class EventContext(StatelessContext): + """This is the same as StatelessContext, except guarantees that + current_state_ids and prev_state_ids are set. + """ + __slots__ = [] + def serialize(self, event): """Converts self to a type that can be serialized as JSON, and then - deserialized by `deserialize` + deserialized by `DeserializedContext.deserialize` Args: event (FrozenEvent): The event that this context relates to @@ -108,20 +136,52 @@ def serialize(self, event): "app_service_id": self.app_service.id if self.app_service else None } + def get_current_state_ids(self, store): + """Implements StatelessContext""" + return defer.succeed(self.current_state_ids) + + def get_prev_state_ids(self, store): + """Implements StatelessContext""" + return defer.succeed(self.prev_state_ids) + + +class DeserializedContext(StatelessContext): + """A context that comes from a serialized version of a StatelessContext. + + It does not necessarily have current_state_ids and prev_state_ids filled + out (unlike EventContext), but does cache the results of + `get_current_state_ids` and `get_prev_state_ids`. + + Attributes: + _have_fetched_state (bool): Whether we attempted to fill out + current_state_ids + _prev_state_id (str|None): If set then the event associated with the + context overrode the _prev_state_id + _event_type (str): The type of the event the context is associated with + _event_state_key (str|None): The state_key of the event the context is + associated with + """ + + __slots__ = [ + "_have_fetched_state", + "_prev_state_id", + "_event_type", + "_event_state_key", + ] + @staticmethod - @defer.inlineCallbacks def deserialize(store, input): """Converts a dict that was produced by `serialize` back into a - EventContext. + StatelessContext. Args: store (DataStore): Used to convert AS ID to AS object input (dict): A dict produced by `serialize` Returns: - EventContext + StatelessContext """ - context = EventContext() + context = DeserializedContext() context.state_group = input["state_group"] context.rejected = input["rejected"] context.prev_group = input["prev_group"] @@ -130,24 +190,56 @@ def deserialize(store, input): # We use the state_group and prev_state_id stuff to pull the # current_state_ids out of the DB and construct prev_state_ids. - prev_state_id = input["prev_state_id"] - event_type = input["event_type"] - event_state_key = input["event_state_key"] + context._prev_state_id = input["prev_state_id"] + context._event_type = input["event_type"] + context._event_state_key = input["event_state_key"] - context.current_state_ids = yield store.get_state_ids_for_group( - context.state_group, - ) - if prev_state_id and event_state_key: - context.prev_state_ids = dict(context.current_state_ids) - context.prev_state_ids[(event_type, event_state_key)] = prev_state_id - else: - context.prev_state_ids = context.current_state_ids + context._have_fetched_state = False app_service_id = input["app_service_id"] if app_service_id: context.app_service = store.get_app_service_by_id(app_service_id) - defer.returnValue(context) + return context + + @defer.inlineCallbacks + def get_current_state_ids(self, store): + """Implements StatelessContext""" + + if not self._have_fetched_state: + yield self._fill_out_state(store) + + defer.returnValue(self.current_state_ids) + + @defer.inlineCallbacks + def get_prev_state_ids(self, store): + """Implements StatelessContext""" + + if not self._have_fetched_state: + yield self._fill_out_state(store) + + defer.returnValue(self.current_state_ids) + + @defer.inlineCallbacks + def _fill_out_state(self, store): + """Called to populate the current_state_ids and prev_state_ids + attributes by loading from the database. + """ + self._have_fetched_state = True + + if self.state_group is None: + return + + self.current_state_ids = yield store.get_state_ids_for_group( + self.state_group, + ) + if self._prev_state_id: + self.prev_state_ids = dict(self.current_state_ids) + + key = (self._event_type, self._event_state_key) + self.prev_state_ids[key] = self._prev_state_id + else: + self.prev_state_ids = self.current_state_ids def _encode_state_dict(state_dict): diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index b6a8b3aa3b31..50ad1d329ae8 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -106,14 +106,20 @@ def ratelimit(self, requester, update=True): @defer.inlineCallbacks def maybe_kick_guest_users(self, event, context=None): + """ + Args: + event (FrozenEvent) + context (StatelessContext) + """ # Technically this function invalidates current_state by changing it. # Hopefully this isn't that important to the caller. if event.type == EventTypes.GuestAccess: guest_access = event.content.get("guest_access", "forbidden") if guest_access != "can_join": if context: + current_state_ids = yield context.get_current_state_ids(self.store) current_state = yield self.store.get_events( - list(context.current_state_ids.values()) + list(current_state_ids.values()) ) else: current_state = yield self.state_handler.get_current_state( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a39b852cebfc..dcfb9c9d54d8 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -850,6 +850,13 @@ def persist_and_notify_client_event( calculated the push actions for the event, and checked auth. This should only be run on master. + + Args: + requester (Requester) + event (FrozenEvent) + context (StatelessContext) + ratelimit(bool) + extra_users (list[UserID]) """ assert not self.config.worker_app @@ -884,9 +891,11 @@ def is_inviter_member_event(e): e.sender == event.sender ) + current_state_ids = yield context.get_current_state_ids(self.store) + state_to_include_ids = [ e_id - for k, e_id in iteritems(context.current_state_ids) + for k, e_id in current_state_ids.iteritems() if k[0] in self.hs.config.room_invite_state_types or k == (EventTypes.Member, event.sender) ] @@ -922,8 +931,9 @@ def is_inviter_member_event(e): ) if event.type == EventTypes.Redaction: + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -943,11 +953,13 @@ def is_inviter_member_event(e): "You don't have permission to redact events" ) - if event.type == EventTypes.Create and context.prev_state_ids: - raise AuthError( - 403, - "Changing the room create event is forbidden", - ) + if event.type == EventTypes.Create: + prev_state_ids = yield context.get_prev_state_ids(self.store) + if prev_state_ids: + raise AuthError( + 403, + "Changing the room create event is forbidden", + ) (event_stream_id, max_stream_id) = yield self.store.persist_event( event, context=context diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 2eede547921e..a054030b443e 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -24,7 +24,7 @@ SynapseError, ) from synapse.events import FrozenEvent -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import DeserializedContext from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import Requester, UserID from synapse.util.caches.response_cache import ResponseCache @@ -136,7 +136,9 @@ def _handle_request(self, request): event = FrozenEvent(event_dict, internal_metadata, rejected_reason) requester = Requester.deserialize(self.store, content["requester"]) - context = yield EventContext.deserialize(self.store, content["context"]) + context = yield DeserializedContext.deserialize( + self.store, content["context"], + ) ratelimit = content["ratelimit"] extra_users = [UserID.from_string(u) for u in content["extra_users"]] diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2aaab0d02c7f..27559bc2bda3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -32,7 +32,7 @@ from synapse.api.errors import SynapseError # these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 -from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.events.snapshot import StatelessContext # noqa: F401 from synapse.storage.events_worker import EventsWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util.async import ObservableDeferred @@ -89,7 +89,7 @@ def add_to_queue(self, room_id, events_and_contexts, backfilled): Args: room_id (str): - events_and_contexts (list[(EventBase, EventContext)]): + events_and_contexts (list[(EventBase, StatelessContext)]): backfilled (bool): Returns: @@ -266,7 +266,7 @@ def persist_event(self, event, context, backfilled=False): Args: event (EventBase): - context (EventContext): + context (StatelessContext): backfilled (bool): Returns: @@ -303,7 +303,7 @@ def _persist_events(self, events_and_contexts, backfilled=False, """Persist events to db Args: - events_and_contexts (list[(EventBase, EventContext)]): + events_and_contexts (list[(EventBase, StatelessContext)]): backfilled (bool): delete_existing (bool): @@ -520,7 +520,7 @@ def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ room_id (str): room to which the events are being added. Used for logging etc - events_context (list[(EventBase, EventContext)]): + events_context (list[(EventBase, StatelessContext)]): events and contexts which are being added to the room old_latest_event_ids (iterable[str]): @@ -551,7 +551,7 @@ def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ if ctx.state_group in state_groups_map: continue - state_groups_map[ctx.state_group] = ctx.current_state_ids + state_groups_map[ctx.state_group] = yield ctx.get_current_state_ids(self) # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can @@ -672,7 +672,7 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled, Args: txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): + events_and_contexts (list[(EventBase, StatelessContext)]): events to persist backfilled (bool): True if the events were backfilled delete_existing (bool): True to purge existing table rows for the @@ -884,9 +884,9 @@ def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts): Pick the earliest non-outlier if there is one, else the earliest one. Args: - events_and_contexts (list[(EventBase, EventContext)]): + events_and_contexts (list[(EventBase, StatelessContext)]): Returns: - list[(EventBase, EventContext)]: filtered list + list[(EventBase, StatelessContext)]: filtered list """ new_events_and_contexts = OrderedDict() for event, context in events_and_contexts: @@ -907,7 +907,7 @@ def _update_room_depths_txn(self, txn, events_and_contexts, backfilled): Args: txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events + events_and_contexts (list[(EventBase, StatelessContext)]): events we are persisting backfilled (bool): True if the events were backfilled """ @@ -937,11 +937,11 @@ def _update_outliers_txn(self, txn, events_and_contexts): Args: txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events + events_and_contexts (list[(EventBase, StatelessContext)]): events we are persisting Returns: - list[(EventBase, EventContext)] new list, without events which + list[(EventBase, StatelessContext)] new list, without events which are already in the events table. """ txn.execute( @@ -1074,7 +1074,7 @@ def _store_event_txn(self, txn, events_and_contexts): Args: txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events + events_and_contexts (list[(EventBase, StatelessContext)]): events we are persisting """ @@ -1136,11 +1136,11 @@ def _store_rejected_events_txn(self, txn, events_and_contexts): Args: txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events + events_and_contexts (list[(EventBase, StatelessContext)]): events we are persisting Returns: - list[(EventBase, EventContext)] new list, without the rejected + list[(EventBase, StatelessContext)] new list, without the rejected events. """ # Remove the rejected events from the list now that we've added them @@ -1164,9 +1164,9 @@ def _update_metadata_tables_txn(self, txn, events_and_contexts, Args: txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events + events_and_contexts (list[(EventBase, StatelessContext)]): events we are persisting - all_events_and_contexts (list[(EventBase, EventContext)]): all + all_events_and_contexts (list[(EventBase, StatelessContext)]): all events that we were going to persist. This includes events we've already persisted, etc, that wouldn't appear in events_and_context. From 57d3facfb86d08e2f60af0880161a14be7a12f2d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Jul 2018 17:34:55 +0100 Subject: [PATCH 2/7] Newsfile --- changelog.d/3550.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3550.misc diff --git a/changelog.d/3550.misc b/changelog.d/3550.misc new file mode 100644 index 000000000000..2374dc0c44d5 --- /dev/null +++ b/changelog.d/3550.misc @@ -0,0 +1 @@ +Lazily load state on master process when using workers to reduce DB consumption From a20c8a906aca78013118044165ee689beeb68794 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Jul 2018 09:48:51 +0100 Subject: [PATCH 3/7] isort --- synapse/events/snapshot.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index aaa3aad29ded..696a2734d34b 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from frozendict import frozendict - import abc +from frozendict import frozendict + from twisted.internet import defer From 311950e71420a7993fb37740c6e8d917395964a1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Jul 2018 15:20:16 +0100 Subject: [PATCH 4/7] Shuffle things around and clarify doc comments --- synapse/events/snapshot.py | 83 ++++++++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 696a2734d34b..20054dd98c76 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -19,6 +19,8 @@ from twisted.internet import defer +from synapse.util.logcontext import make_deferred_yieldable + class StatelessContext(object): """ @@ -36,14 +38,6 @@ class StatelessContext(object): prev_state_events (?): XXX: is this ever set to anything other than the empty list? - - current_state_ids (dict[(str, str), str]|None): - The current state map including the current event. - (type, state_key) -> event_id - - prev_state_ids (dict[(str, str), str]|None): - The current state map excluding the current event. - (type, state_key) -> event_id """ __metaclass__ = abc.ABCMeta @@ -55,9 +49,6 @@ class StatelessContext(object): "delta_ids", "prev_state_events", "app_service", - - "current_state_ids", - "prev_state_ids", ] def __init__(self): @@ -84,7 +75,8 @@ def get_current_state_ids(self, store): """Gets the current state IDs Returns: - Deferred[dict[(str, str), str]|None] + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. """ raise NotImplementedError() @@ -93,16 +85,37 @@ def get_prev_state_ids(self, store): """Gets the prev state IDs Returns: - Deferred[dict[(str, str), str]|None] + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. """ raise NotImplementedError() class EventContext(StatelessContext): - """This is the same as StatelessContext, except guarantees that - current_state_ids and prev_state_ids are set. + """This is the same as StatelessContext, except that + current_state_ids and prev_state_ids are already calculated. + + Attributes: + current_state_ids (dict[(str, str), str]|None): + The current state map including the current event. + (type, state_key) -> event_id + Is None if event is an outlier + + prev_state_ids (dict[(str, str), str]|None): + The current state map excluding the current event. + (type, state_key) -> event_id` + Is None if event is an outlier """ - __slots__ = [] + __slots__ = [ + "current_state_ids", + "prev_state_ids", + ] + + def __init__(self): + super(EventContext, self).__init__() + + self.current_state_ids = None + self.prev_state_ids = None def serialize(self, event): """Converts self to a type that can be serialized as JSON, and then @@ -148,21 +161,29 @@ def get_prev_state_ids(self, store): class DeserializedContext(StatelessContext): """A context that comes from a serialized version of a StatelessContext. - It does not necessarily have current_state_ids and prev_state_ids filled - out (unlike EventContext), but does cache the results of + It does not necessarily have current_state_ids and prev_state_ids precomputed + (unlike EventContext), but does cache the results of `get_current_state_ids` and `get_prev_state_ids`. Attributes: - _have_fetched_state (bool): Whether we attempted to fill out - current_state_ids + _have_fetched_state (Deferred|None): Resolves when *_state_ids have + been calculated. None if we haven't started calculating yet _prev_state_id (str|None): If set then the event associated with the context overrode the _prev_state_id _event_type (str): The type of the event the context is associated with _event_state_key (str|None): The state_key of the event the context is associated with + _current_state_ids (dict[(str, str), str]|None): + The current state map including the current event. + (type, state_key) -> event_id + _prev_state_ids (dict[(str, str), str]|None): + The current state map excluding the current event. + (type, state_key) -> event_id` """ __slots__ = [ + "_current_state_ids", + "_prev_state_ids", "_have_fetched_state", "_prev_state_id", "_event_type", @@ -179,7 +200,7 @@ def deserialize(store, input): input (dict): A dict produced by `serialize` Returns: - StatelessContext + DeserializedContext """ context = DeserializedContext() context.state_group = input["state_group"] @@ -188,13 +209,13 @@ def deserialize(store, input): context.delta_ids = _decode_state_dict(input["delta_ids"]) context.prev_state_events = input["prev_state_events"] - # We use the state_group and prev_state_id stuff to pull the - # current_state_ids out of the DB and construct prev_state_ids. context._prev_state_id = input["prev_state_id"] context._event_type = input["event_type"] context._event_state_key = input["event_state_key"] - context._have_fetched_state = False + context._have_fetched_state = None + context._current_state_ids = None + context._prev_state_ids = None app_service_id = input["app_service_id"] if app_service_id: @@ -207,7 +228,9 @@ def get_current_state_ids(self, store): """Implements StatelessContext""" if not self._have_fetched_state: - yield self._fill_out_state(store) + self._have_fetched_state = self._fill_out_state(store) + + yield make_deferred_yieldable(self._have_fetched_state) defer.returnValue(self.current_state_ids) @@ -216,24 +239,24 @@ def get_prev_state_ids(self, store): """Implements StatelessContext""" if not self._have_fetched_state: - yield self._fill_out_state(store) + self._have_fetched_state = self._fill_out_state(store) + + yield make_deferred_yieldable(self._have_fetched_state) defer.returnValue(self.current_state_ids) @defer.inlineCallbacks def _fill_out_state(self, store): - """Called to populate the current_state_ids and prev_state_ids + """Called to populate the _current_state_ids and _prev_state_ids attributes by loading from the database. """ - self._have_fetched_state = True - if self.state_group is None: return self.current_state_ids = yield store.get_state_ids_for_group( self.state_group, ) - if self._prev_state_id: + if self._prev_state_id and self._event_state_key is not None: self.prev_state_ids = dict(self.current_state_ids) key = (self._event_type, self._event_state_key) From 5b7da74e80e08024bf1b40aaa5ff890aab362949 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 10:15:29 +0100 Subject: [PATCH 5/7] Rename _have_fetched_state --- synapse/events/snapshot.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 20054dd98c76..751a3d447b93 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -166,7 +166,7 @@ class DeserializedContext(StatelessContext): `get_current_state_ids` and `get_prev_state_ids`. Attributes: - _have_fetched_state (Deferred|None): Resolves when *_state_ids have + _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have been calculated. None if we haven't started calculating yet _prev_state_id (str|None): If set then the event associated with the context overrode the _prev_state_id @@ -184,7 +184,7 @@ class DeserializedContext(StatelessContext): __slots__ = [ "_current_state_ids", "_prev_state_ids", - "_have_fetched_state", + "_fetching_state_deferred", "_prev_state_id", "_event_type", "_event_state_key", @@ -213,7 +213,7 @@ def deserialize(store, input): context._event_type = input["event_type"] context._event_state_key = input["event_state_key"] - context._have_fetched_state = None + context._fetching_state_deferred = None context._current_state_ids = None context._prev_state_ids = None @@ -227,10 +227,10 @@ def deserialize(store, input): def get_current_state_ids(self, store): """Implements StatelessContext""" - if not self._have_fetched_state: - self._have_fetched_state = self._fill_out_state(store) + if not self._fetching_state_deferred: + self._fetching_state_deferred = self._fill_out_state(store) - yield make_deferred_yieldable(self._have_fetched_state) + yield make_deferred_yieldable(self._fetching_state_deferred) defer.returnValue(self.current_state_ids) @@ -238,10 +238,10 @@ def get_current_state_ids(self, store): def get_prev_state_ids(self, store): """Implements StatelessContext""" - if not self._have_fetched_state: - self._have_fetched_state = self._fill_out_state(store) + if not self._fetching_state_deferred: + self._fetching_state_deferred = self._fill_out_state(store) - yield make_deferred_yieldable(self._have_fetched_state) + yield make_deferred_yieldable(self._fetching_state_deferred) defer.returnValue(self.current_state_ids) From cdfd92d1afd149848d9f6ce40388cff5690cfc73 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 10:17:41 +0100 Subject: [PATCH 6/7] Add missing run_in_background --- synapse/events/snapshot.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 751a3d447b93..cc3309b2abae 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -19,7 +19,7 @@ from twisted.internet import defer -from synapse.util.logcontext import make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background class StatelessContext(object): @@ -228,7 +228,9 @@ def get_current_state_ids(self, store): """Implements StatelessContext""" if not self._fetching_state_deferred: - self._fetching_state_deferred = self._fill_out_state(store) + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) yield make_deferred_yieldable(self._fetching_state_deferred) @@ -239,7 +241,9 @@ def get_prev_state_ids(self, store): """Implements StatelessContext""" if not self._fetching_state_deferred: - self._fetching_state_deferred = self._fill_out_state(store) + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) yield make_deferred_yieldable(self._fetching_state_deferred) From c451ce913e3bcc3541392df6d83b9e35e822fe00 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 10:18:43 +0100 Subject: [PATCH 7/7] Fixup and remove spurious attributes --- synapse/events/snapshot.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index cc3309b2abae..1b10f0a76d1d 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -65,11 +65,6 @@ def __init__(self): self.app_service = None - # The current state including the current event - self.current_state_ids = None - # The current state excluding the current event - self.prev_state_ids = None - @abc.abstractmethod def get_current_state_ids(self, store): """Gets the current state IDs @@ -234,7 +229,7 @@ def get_current_state_ids(self, store): yield make_deferred_yieldable(self._fetching_state_deferred) - defer.returnValue(self.current_state_ids) + defer.returnValue(self._current_state_ids) @defer.inlineCallbacks def get_prev_state_ids(self, store): @@ -247,7 +242,7 @@ def get_prev_state_ids(self, store): yield make_deferred_yieldable(self._fetching_state_deferred) - defer.returnValue(self.current_state_ids) + defer.returnValue(self._prev_state_ids) @defer.inlineCallbacks def _fill_out_state(self, store): @@ -257,16 +252,16 @@ def _fill_out_state(self, store): if self.state_group is None: return - self.current_state_ids = yield store.get_state_ids_for_group( + self._current_state_ids = yield store.get_state_ids_for_group( self.state_group, ) if self._prev_state_id and self._event_state_key is not None: - self.prev_state_ids = dict(self.current_state_ids) + self._prev_state_ids = dict(self._current_state_ids) key = (self._event_type, self._event_state_key) - self.prev_state_ids[key] = self._prev_state_id + self._prev_state_ids[key] = self._prev_state_id else: - self.prev_state_ids = self.current_state_ids + self._prev_state_ids = self._current_state_ids def _encode_state_dict(state_dict):