Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9413.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of fetching events from the database.
5 changes: 5 additions & 0 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ class SQLBaseStore(metaclass=ABCMeta):
per data store (and not one per physical database).
"""

# if set to True, we will query the `event_json`, `rejections` and `state_events`
# tables when fetching event data. When False, we rely on it all being in the
# `events` table.
USE_EVENT_JSON = False

def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
self.hs = hs
self._clock = hs.get_clock()
Expand Down
7 changes: 7 additions & 0 deletions synapse/storage/databases/main/censor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ def _censor_event_txn(self, txn, event_id, pruned_json):
updatevalues={"json": pruned_json},
)

self.db_pool.simple_update_one_txn(
txn,
table="events",
keyvalues={"event_id": event_id},
updatevalues={"json": pruned_json},
)

async def expire_event(self, event_id: str) -> None:
"""Retrieve and expire an event that has expired, and delete its associated
expiry timestamp. If the event can't be retrieved, delete its associated
Expand Down
123 changes: 90 additions & 33 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,28 @@ async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[st
results: List[str] = []

def _get_events_which_are_prevs_txn(txn, batch):
sql = """
SELECT prev_event_id, internal_metadata
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN rejections USING (event_id)
LEFT JOIN event_json USING (event_id)
WHERE
NOT events.outlier
AND rejections.event_id IS NULL
AND
"""
if self.store.USE_EVENT_JSON:
sql = """
SELECT prev_event_id, ej.internal_metadata
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN rejections USING (event_id)
LEFT JOIN event_json ej USING (event_id)
WHERE
NOT events.outlier
AND rejections.event_id IS NULL
AND
"""
else:
sql = """
SELECT prev_event_id, events.internal_metadata
FROM event_edges
INNER JOIN events USING (event_id)
WHERE
NOT events.outlier
AND events.rejection_reason IS NULL
AND
"""

clause, args = make_in_list_sql_clause(
self.database_engine, "prev_event_id", batch
Expand Down Expand Up @@ -274,20 +285,32 @@ async def _get_prevs_before_rejected(self, event_ids: Iterable[str]) -> Set[str]
def _get_prevs_before_rejected_txn(txn, batch):
to_recursively_check = batch

while to_recursively_check:
if self.store.USE_EVENT_JSON:
sql = """
SELECT
event_id, prev_event_id, internal_metadata,
event_id, prev_event_id, ej.internal_metadata,
rejections.event_id IS NOT NULL
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN rejections USING (event_id)
LEFT JOIN event_json USING (event_id)
LEFT JOIN event_json ej USING (event_id)
WHERE
NOT events.outlier
AND
"""
else:
sql = """
SELECT
event_id, prev_event_id, events.internal_metadata,
events.rejection_reason IS NOT NULL
FROM event_edges
INNER JOIN events USING (event_id)
WHERE
NOT events.outlier
AND
"""

while to_recursively_check:
clause, args = make_in_list_sql_clause(
self.database_engine, "event_id", to_recursively_check
)
Expand Down Expand Up @@ -478,6 +501,7 @@ def _persist_event_auth_chain_txn(
event_to_room_id = {e.event_id: e.room_id for e in state_events.values()}

self._add_chain_cover_index(
self.store.USE_EVENT_JSON,
txn,
self.db_pool,
self.store.event_chain_id_gen,
Expand All @@ -489,6 +513,7 @@ def _persist_event_auth_chain_txn(
@classmethod
def _add_chain_cover_index(
cls,
use_event_json: bool,
txn,
db_pool: DatabasePool,
event_chain_id_gen: SequenceGenerator,
Expand Down Expand Up @@ -558,14 +583,25 @@ def _add_chain_cover_index(

# We loop here in case we find an out of band membership and need to
# fetch their auth event info.
while missing_auth_chains:
if use_event_json:
sql = """
SELECT event_id, events.type, state_key, chain_id, sequence_number
SELECT event_id, events.type, se.state_key, chain_id, sequence_number
FROM events
INNER JOIN state_events USING (event_id)
INNER JOIN state_events se USING (event_id)
LEFT JOIN event_auth_chains USING (event_id)
WHERE
"""
else:
sql = """
SELECT event_id, events.type, events.state_key, chain_id, sequence_number
FROM events
LEFT JOIN event_auth_chains USING (event_id)
WHERE
events.state_key IS NOT NULL AND
"""

while missing_auth_chains:

clause, args = make_in_list_sql_clause(
txn.database_engine,
"event_id",
Expand Down Expand Up @@ -1277,6 +1313,9 @@ def _update_outliers_txn(self, txn, events_and_contexts):
sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, event.event_id))

sql = "UPDATE events SET internal_metadata = ?, outlier = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, False, event.event_id))

# Add an entry to the ex_outlier_stream table to replicate the
# change in outlier status to our workers.
stream_order = event.internal_metadata.stream_ordering
Expand All @@ -1292,23 +1331,23 @@ def _update_outliers_txn(self, txn, events_and_contexts):
},
)

sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
txn.execute(sql, (False, event.event_id))

# Update the event_backward_extremities table now that this
# event isn't an outlier any more.
self._update_backward_extremeties(txn, [event])

return [ec for ec in events_and_contexts if ec[0] not in to_remove]

def _store_event_txn(self, txn, events_and_contexts):
def _store_event_txn(
self,
txn: LoggingTransaction,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
):
"""Insert new events into the event, event_json, redaction and
state_events tables.

Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
txn: db connection
events_and_contexts: events we are persisting
"""

if not events_and_contexts:
Expand Down Expand Up @@ -1371,8 +1410,15 @@ def get_internal_metadata(event):
"contains_url": (
"url" in event.content and isinstance(event.content["url"], str)
),
"internal_metadata": json_encoder.encode(
event.internal_metadata.get_dict()
),
"json": json_encoder.encode(event_dict(event)),
"format_version": event.format_version,
"state_key": event.get("state_key"),
"rejection_reason": ctx.rejected or None,
}
for event, _ in events_and_contexts
for event, ctx in events_and_contexts
],
)

Expand Down Expand Up @@ -1406,8 +1452,9 @@ def get_internal_metadata(event):
}

# TODO: How does this work with backfilling?
# richvdh: pretty sure it's never used.
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
vals["prev_state"] = event.replaces_state # type: ignore

state_values.append(vals)

Expand Down Expand Up @@ -1540,13 +1587,7 @@ def _update_metadata_tables_txn(
def _add_to_cache(self, txn, events_and_contexts):
to_prefill = []

rows = []
N = 200
for i in range(0, len(events_and_contexts), N):
ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]}
if not ev_map:
break

if self.store.USE_EVENT_JSON:
sql = (
"SELECT "
" e.event_id as event_id, "
Expand All @@ -1557,6 +1598,22 @@ def _add_to_cache(self, txn, events_and_contexts):
" LEFT JOIN redactions as r ON e.event_id = r.redacts"
" WHERE "
)
else:
sql = (
"SELECT "
" e.event_id as event_id, "
" r.redacts as redacts,"
" e.rejection_reason as rejects "
" FROM events as e"
" LEFT JOIN redactions as r ON e.event_id = r.redacts"
" WHERE "
)

N = 200
for i in range(0, len(events_and_contexts), N):
ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]}
if not ev_map:
break

clause, args = make_in_list_sql_clause(
self.database_engine, "e.event_id", list(ev_map)
Expand Down
Loading