Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 42 additions & 20 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2033,6 +2033,8 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
for event_id, state_key in event_rows:
txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))

logger.debug("[purge] Finding new backward extremities")

# We calculate the new entries for the backward extremities by finding
# all events that point to events that are to be purged
txn.execute(
Expand All @@ -2045,6 +2047,8 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
)
new_backwards_extrems = txn.fetchall()

logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)

txn.execute(
"DELETE FROM event_backward_extremities WHERE room_id = ?",
(room_id,)
Expand All @@ -2059,6 +2063,8 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
]
)

logger.debug("[purge] finding redundant state groups")

# Get all state groups that are only referenced by events that are
# to be deleted.
txn.execute(
Expand All @@ -2074,15 +2080,19 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
)

state_rows = txn.fetchall()
state_groups_to_delete = [sg for sg, in state_rows]

# make a set of the redundant state groups, so that we can look them up
# efficiently
state_groups_to_delete = set([sg for sg, in state_rows])

# Now we get all the state groups that rely on these state groups
new_state_edges = []
chunks = [
state_groups_to_delete[i:i + 100]
for i in xrange(0, len(state_groups_to_delete), 100)
]
for chunk in chunks:
logger.debug("[purge] finding state groups which depend on redundant"
" state groups")
remaining_state_groups = []
for i in xrange(0, len(state_rows), 100):
chunk = [sg for sg, in state_rows[i:i + 100]]
# look for state groups whose prev_state_group is one we are about
# to delete
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
Expand All @@ -2091,29 +2101,36 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
retcols=["state_group"],
keyvalues={},
)
new_state_edges.extend(row["state_group"] for row in rows)
remaining_state_groups.extend(
row["state_group"] for row in rows

# exclude state groups we are about to delete: no point in
# updating them
if row["state_group"] not in state_groups_to_delete
)

# Now we turn the state groups that reference to-be-deleted state groups
# to non delta versions.
for new_state_edge in new_state_edges:
# Now we convert the state groups that reference to-be-deleted state
# groups into non-delta versions.
for sg in remaining_state_groups:
logger.debug("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(
txn, [new_state_edge], types=None
txn, [sg], types=None
)
curr_state = curr_state[new_state_edge]
curr_state = curr_state[sg]

self._simple_delete_txn(
txn,
table="state_groups_state",
keyvalues={
"state_group": new_state_edge,
"state_group": sg,
}
)

self._simple_delete_txn(
txn,
table="state_group_edges",
keyvalues={
"state_group": new_state_edge,
"state_group": sg,
}
)

Expand All @@ -2122,7 +2139,7 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
table="state_groups_state",
values=[
{
"state_group": new_state_edge,
"state_group": sg,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
Expand All @@ -2132,6 +2149,7 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
],
)

logger.debug("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
Expand All @@ -2140,12 +2158,15 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
"DELETE FROM state_groups WHERE id = ?",
state_rows
)

# Delete all non-state
logger.debug("[purge] removing events from event_to_state_groups")
txn.executemany(
"DELETE FROM event_to_state_groups WHERE event_id = ?",
[(event_id,) for event_id, _ in event_rows]
)

logger.debug("[purge] updating room_depth")
txn.execute(
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
(topological_ordering, room_id,)
Expand All @@ -2171,16 +2192,15 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
"event_signatures",
"rejections",
):
logger.debug("[purge] removing non-state events from %s", table)

txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
to_delete
)

txn.executemany(
"DELETE FROM events WHERE event_id = ?",
to_delete
)
# Mark all state and own events as outliers
logger.debug("[purge] marking events as outliers")
txn.executemany(
"UPDATE events SET outlier = ?"
" WHERE event_id = ?",
Expand All @@ -2190,6 +2210,8 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
]
)

logger.debug("[purge] done")

@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
Expand Down