Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3995.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug in event persistence logic which caused 'NoneType is not iterable'
102 changes: 69 additions & 33 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import batch_iter
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
Expand Down Expand Up @@ -386,12 +387,10 @@ def _persist_events(self, events_and_contexts, backfilled=False,
)

for room_id, ev_ctx_rm in iteritems(events_by_room):
# Work out new extremities by recursively adding and removing
# the new events.
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
new_latest_event_ids = yield self._calculate_new_extremeties(
new_latest_event_ids = yield self._calculate_new_extremities(
room_id, ev_ctx_rm, latest_event_ids
)

Expand All @@ -400,6 +399,12 @@ def _persist_events(self, events_and_contexts, backfilled=False,
# No change in extremities, so no change in state
continue

# there should always be at least one forward extremity.
# (except during the initial persistence of the send_join
# results, in which case there will be no existing
# extremities, so we'll `continue` above and skip this bit.)
assert new_latest_event_ids, "No forward extremities left!"

new_forward_extremeties[room_id] = new_latest_event_ids

len_1 = (
Expand Down Expand Up @@ -517,44 +522,79 @@ def _persist_events(self, events_and_contexts, backfilled=False,
)

@defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to
def _calculate_new_extremities(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremities for a room given events to
persist.

Assumes that we are only persisting events for one room at a time.
"""
new_latest_event_ids = set(latest_event_ids)
# First, add all the new events to the list
new_latest_event_ids.update(
event.event_id for event, ctx in event_contexts

# we're only interested in new events which aren't outliers and which aren't
# being rejected.
new_events = [
event for event, ctx in event_contexts
if not event.internal_metadata.is_outlier() and not ctx.rejected
]

# start with the existing forward extremities
result = set(latest_event_ids)

# add all the new events to the list
result.update(
event.event_id for event in new_events
)
# Now remove all events that are referenced by the to-be-added events
new_latest_event_ids.difference_update(

# Now remove all events which are prev_events of any of the new events
result.difference_update(
e_id
for event, ctx in event_contexts
for event in new_events
for e_id, _ in event.prev_events
if not event.internal_metadata.is_outlier() and not ctx.rejected
)

# And finally remove any events that are referenced by previously added
# events.
rows = yield self._simple_select_many_batch(
table="event_edges",
column="prev_event_id",
iterable=list(new_latest_event_ids),
retcols=["prev_event_id"],
keyvalues={
"is_state": False,
},
desc="_calculate_new_extremeties",
)
# Finally, remove any events which are prev_events of any existing events.
existing_prevs = yield self._get_events_which_are_prevs(result)
result.difference_update(existing_prevs)

new_latest_event_ids.difference_update(
row["prev_event_id"] for row in rows
)
defer.returnValue(result)

defer.returnValue(new_latest_event_ids)
@defer.inlineCallbacks
def _get_events_which_are_prevs(self, event_ids):
"""Filter the supplied list of event_ids to get those which are prev_events of
existing (non-outlier/rejected) events.

Args:
event_ids (Iterable[str]): event ids to filter

Returns:
Deferred[List[str]]: filtered event ids
"""
results = []

def _get_events(txn, batch):
sql = """
SELECT prev_event_id
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN rejections USING (event_id)
WHERE
prev_event_id IN (%s)
AND NOT events.outlier
AND rejections.event_id IS NULL
""" % (
",".join("?" for _ in batch),
)

txn.execute(sql, batch)
results.extend(r[0] for r in txn)

for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
"_get_events_which_are_prevs",
_get_events,
chunk,
)

defer.returnValue(results)

@defer.inlineCallbacks
def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
Expand Down Expand Up @@ -586,10 +626,6 @@ def _get_new_state_after_events(self, room_id, events_context, old_latest_event_
the new current state is only returned if we've already calculated
it.
"""

if not new_latest_event_ids:
return

# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}

Expand Down