From d0669487ce35d26b59d9b5f0e4f18c36c71d8996 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 15 Feb 2021 12:34:18 +0000 Subject: [PATCH 1/5] Disambiguate queries --- synapse/storage/databases/main/events.py | 12 ++++++------ .../storage/databases/main/events_bg_updates.py | 13 +++++++------ synapse/storage/databases/main/events_worker.py | 16 ++++++++-------- synapse/storage/databases/main/purge_events.py | 2 +- synapse/storage/databases/main/room.py | 4 ++-- synapse/storage/databases/main/roommember.py | 4 ++-- 6 files changed, 26 insertions(+), 25 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 86baf397fbda..f8c312e97e55 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -223,11 +223,11 @@ async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[st def _get_events_which_are_prevs_txn(txn, batch): sql = """ - SELECT prev_event_id, internal_metadata + SELECT prev_event_id, ej.internal_metadata FROM event_edges INNER JOIN events USING (event_id) LEFT JOIN rejections USING (event_id) - LEFT JOIN event_json USING (event_id) + LEFT JOIN event_json ej USING (event_id) WHERE NOT events.outlier AND rejections.event_id IS NULL @@ -277,12 +277,12 @@ def _get_prevs_before_rejected_txn(txn, batch): while to_recursively_check: sql = """ SELECT - event_id, prev_event_id, internal_metadata, + event_id, prev_event_id, ej.internal_metadata, rejections.event_id IS NOT NULL FROM event_edges INNER JOIN events USING (event_id) LEFT JOIN rejections USING (event_id) - LEFT JOIN event_json USING (event_id) + LEFT JOIN event_json ej USING (event_id) WHERE NOT events.outlier AND @@ -560,9 +560,9 @@ def _add_chain_cover_index( # fetch their auth event info. while missing_auth_chains: sql = """ - SELECT event_id, events.type, state_key, chain_id, sequence_number + SELECT event_id, events.type, se.state_key, chain_id, sequence_number FROM events - INNER JOIN state_events USING (event_id) + INNER JOIN state_events se USING (event_id) LEFT JOIN event_auth_chains USING (event_id) WHERE """ diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 6fcb2b8353aa..aec8cdce21f4 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -399,8 +399,9 @@ def _cleanup_extremities_bg_update_txn(txn): # First, we get `batch_size` events from the table, pulling out # their successor events, if any, and the successor events' # rejection status. 
+ txn.execute( - """SELECT prev_event_id, event_id, internal_metadata, + """SELECT prev_event_id, event_id, ej.internal_metadata, rejections.event_id IS NOT NULL, events.outlier FROM ( SELECT event_id AS prev_event_id @@ -409,7 +410,7 @@ def _cleanup_extremities_bg_update_txn(txn): ) AS f LEFT JOIN event_edges USING (prev_event_id) LEFT JOIN events USING (event_id) - LEFT JOIN event_json USING (event_id) + LEFT JOIN event_json ej USING (event_id) LEFT JOIN rejections USING (event_id) """, (batch_size,), @@ -444,11 +445,11 @@ def _cleanup_extremities_bg_update_txn(txn): to_check, to_defer = batch[:100], batch[100:] soft_failed_events_to_lookup = set(to_defer) - sql = """SELECT prev_event_id, event_id, internal_metadata, + sql = """SELECT prev_event_id, event_id, ej.internal_metadata, rejections.event_id IS NOT NULL FROM event_edges INNER JOIN events USING (event_id) - INNER JOIN event_json USING (event_id) + INNER JOIN event_json ej USING (event_id) LEFT JOIN rejections USING (event_id) WHERE NOT events.outlier @@ -710,11 +711,11 @@ def get_rejected_events( SELECT DISTINCT event_id, COALESCE(room_version, '1'), - json, + ej.json, state_events.event_id IS NOT NULL, event_auth.event_id IS NOT NULL FROM rejections - INNER JOIN event_json USING (event_id) + INNER JOIN event_json ej USING (event_id) LEFT JOIN rooms USING (room_id) LEFT JOIN state_events USING (event_id) LEFT JOIN event_auth USING (event_id) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 3c86adab5650..dce951c013ae 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1183,10 +1183,10 @@ async def get_all_new_forward_event_rows( def get_all_new_forward_event_rows(txn): sql = ( "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" " FROM events AS e" " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN state_events se USING (event_id)" " LEFT JOIN event_relations USING (event_id)" " LEFT JOIN room_memberships USING (event_id)" " LEFT JOIN rejections USING (event_id)" @@ -1220,11 +1220,11 @@ async def get_ex_outlier_stream_rows( def get_ex_outlier_stream_rows_txn(txn): sql = ( "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" " FROM events AS e" " INNER JOIN ex_outlier_stream AS out USING (event_id)" " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN state_events se USING (event_id)" " LEFT JOIN event_relations USING (event_id)" " LEFT JOIN room_memberships USING (event_id)" " LEFT JOIN rejections USING (event_id)" @@ -1274,10 +1274,10 @@ async def get_all_new_backfill_event_rows( def get_all_new_backfill_event_rows(txn): sql = ( "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" + " se.state_key, redacts, relates_to_id" " FROM events AS e" " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN state_events se USING (event_id)" " LEFT JOIN event_relations USING (event_id)" " WHERE ? > stream_ordering AND stream_ordering >= ?" " AND instance_name = ?" 
@@ -1296,11 +1296,11 @@ def get_all_new_backfill_event_rows(txn): sql = ( "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" + " se.state_key, redacts, relates_to_id" " FROM events AS e" " INNER JOIN ex_outlier_stream AS out USING (event_id)" " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN state_events se USING (event_id)" " LEFT JOIN event_relations USING (event_id)" " WHERE ? > event_stream_ordering" " AND event_stream_ordering >= ?" diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 664c65dac5a6..81679b23d5ff 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -114,7 +114,7 @@ def _purge_history_txn( logger.info("[purge] looking for events to delete") - should_delete_expr = "state_key IS NULL" + should_delete_expr = "state_events.state_key IS NULL" should_delete_params: Tuple[Any, ...] = () if not delete_local_events: should_delete_expr += " AND event_id NOT LIKE ?" diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 443e5f331545..5dd464b15623 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -720,8 +720,8 @@ def _get_media_mxcs_in_room_txn(self, txn, room_id): the hostname and the value is the media ID. """ sql = """ - SELECT stream_ordering, json FROM events - JOIN event_json USING (room_id, event_id) + SELECT stream_ordering, ej.json FROM events + JOIN event_json ej USING (room_id, event_id) WHERE room_id = ? %(where_clause)s AND contains_url = ? AND outlier = ? diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 68f1b40ea693..6dd9a55443af 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -466,7 +466,7 @@ def _get_rooms_for_user_with_stream_ordering_txn( INNER JOIN events AS e USING (room_id, event_id) WHERE c.type = 'm.room.member' - AND state_key = ? + AND c.state_key = ? AND c.membership = ? """ else: @@ -477,7 +477,7 @@ def _get_rooms_for_user_with_stream_ordering_txn( INNER JOIN events AS e USING (room_id, event_id) WHERE c.type = 'm.room.member' - AND state_key = ? + AND c.state_key = ? AND m.membership = ? """ From f6c5f1a6585024245fba4870ef2fa19651260577 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 15 Feb 2021 12:34:48 +0000 Subject: [PATCH 2/5] Store additional data in `events` Add some new columns to the `events` table, and begin populating them for new events. 
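
The new columns (`internal_metadata`, `json`, `format_version`, `state_key`
and `rejection_reason`) duplicate data which currently lives in `event_json`,
`state_events` and `rejections`, so that a later commit can fetch an event's
JSON, metadata and rejection status from `events` alone. Existing rows are
left at their defaults for now; on Postgres, a one-shot backfill might look
something like the following (an unbatched sketch of the operator script
included as a comment in the new delta file; the batched version there is
kinder to a busy database):

    UPDATE events AS e
    SET json = ej.json,
        internal_metadata = ej.internal_metadata,
        format_version = ej.format_version,
        state_key = se.state_key,
        rejection_reason = rej.reason
    FROM event_json AS ej
        LEFT JOIN state_events AS se USING (event_id)
        LEFT JOIN rejections AS rej USING (event_id)
    WHERE ej.event_id = e.event_id;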
--- .../storage/databases/main/censor_events.py | 7 +++++ synapse/storage/databases/main/events.py | 29 +++++++++++++------ .../main/delta/59/08new_events_columns.sql | 29 +++++++++++++++++++ 3 files changed, 56 insertions(+), 9 deletions(-) create mode 100644 synapse/storage/schema/main/delta/59/08new_events_columns.sql diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index f22c1f241b65..ec2d17f14993 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -144,6 +144,13 @@ def _censor_event_txn(self, txn, event_id, pruned_json): updatevalues={"json": pruned_json}, ) + self.db_pool.simple_update_one_txn( + txn, + table="events", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + async def expire_event(self, event_id: str) -> None: """Retrieve and expire an event that has expired, and delete its associated expiry timestamp. If the event can't be retrieved, delete its associated diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index f8c312e97e55..daafb6cbe490 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1277,6 +1277,9 @@ def _update_outliers_txn(self, txn, events_and_contexts): sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?" txn.execute(sql, (metadata_json, event.event_id)) + sql = "UPDATE events SET internal_metadata = ?, outlier = ? WHERE event_id = ?" + txn.execute(sql, (metadata_json, False, event.event_id)) + # Add an entry to the ex_outlier_stream table to replicate the # change in outlier status to our workers. stream_order = event.internal_metadata.stream_ordering @@ -1292,23 +1295,23 @@ def _update_outliers_txn(self, txn, events_and_contexts): }, ) - sql = "UPDATE events SET outlier = ? WHERE event_id = ?" - txn.execute(sql, (False, event.event_id)) - # Update the event_backward_extremities table now that this # event isn't an outlier any more. self._update_backward_extremeties(txn, [event]) return [ec for ec in events_and_contexts if ec[0] not in to_remove] - def _store_event_txn(self, txn, events_and_contexts): + def _store_event_txn( + self, + txn: LoggingTransaction, + events_and_contexts: Iterable[Tuple[EventBase, EventContext]], + ): """Insert new events into the event, event_json, redaction and state_events tables. Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting + txn: db connection + events_and_contexts: events we are persisting """ if not events_and_contexts: @@ -1371,8 +1374,15 @@ def get_internal_metadata(event): "contains_url": ( "url" in event.content and isinstance(event.content["url"], str) ), + "internal_metadata": json_encoder.encode( + event.internal_metadata.get_dict() + ), + "json": json_encoder.encode(event_dict(event)), + "format_version": event.format_version, + "state_key": event.get("state_key"), + "rejection_reason": ctx.rejected or None, } - for event, _ in events_and_contexts + for event, ctx in events_and_contexts ], ) @@ -1406,8 +1416,9 @@ def get_internal_metadata(event): } # TODO: How does this work with backfilling? + # richvdh: pretty sure it's never used. 
             if hasattr(event, "replaces_state"):
-                vals["prev_state"] = event.replaces_state
+                vals["prev_state"] = event.replaces_state  # type: ignore
 
             state_values.append(vals)
 
diff --git a/synapse/storage/schema/main/delta/59/08new_events_columns.sql b/synapse/storage/schema/main/delta/59/08new_events_columns.sql
new file mode 100644
index 000000000000..a690609f7ce2
--- /dev/null
+++ b/synapse/storage/schema/main/delta/59/08new_events_columns.sql
@@ -0,0 +1,29 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN internal_metadata TEXT NOT NULL DEFAULT '';
+ALTER TABLE events ADD COLUMN json TEXT NOT NULL DEFAULT '';
+ALTER TABLE events ADD COLUMN format_version INTEGER;
+ALTER TABLE events ADD COLUMN state_key TEXT;
+ALTER TABLE events ADD COLUMN rejection_reason TEXT;
+
+/*
+for s in `seq -240000 10000 7530000`; do
+    date; echo $s;
+    psql synapse -c "update events e set json=ej.json, internal_metadata=ej.internal_metadata, format_version=ej.format_version, state_key = se.state_key, rejection_reason=rej.reason
+    from event_json ej left join state_events se using (event_id) left join rejections rej using (event_id) where ej.event_id=e.event_id and e.stream_ordering between $s and ($s+9999)";
+done
+*/
+

From ce4fdb6c39c482dbeed00f7d573255ccef737d90 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Mon, 15 Feb 2021 14:03:27 +0000
Subject: [PATCH 3/5] Drop reads of state_events, event_json and rejections

---
 synapse/storage/_base.py                        |   5 +
 synapse/storage/databases/main/events.py        |  86 ++++++--
 .../databases/main/events_bg_updates.py         | 100 +++++++---
 .../storage/databases/main/events_worker.py     | 183 ++++++++++++------
 .../storage/databases/main/purge_events.py      |  27 ++-
 synapse/storage/databases/main/rejections.py    |  23 ++-
 synapse/storage/databases/main/room.py          |  29 ++-
 7 files changed, 329 insertions(+), 124 deletions(-)

diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 0623da9aa196..0dfdf90c728e 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -38,6 +38,11 @@ class SQLBaseStore(metaclass=ABCMeta):
     per data store (and not one per physical database).
     """
 
+    # if set to True, we will query the `event_json`, `rejections` and `state_events`
+    # tables when fetching event data. When False, we rely on it all being in the
+    # `events` table.
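+    #
+    # NB: leaving this as False on an existing database assumes that the new
+    # `events` columns from delta 59/08 have already been backfilled (for
+    # example with the script included as a comment in that delta); rows which
+    # have not been backfilled will be read as empty.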
+ USE_EVENT_JSON = False + def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"): self.hs = hs self._clock = hs.get_clock() diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index daafb6cbe490..cce6a13e7d3d 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -222,17 +222,28 @@ async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[st results: List[str] = [] def _get_events_which_are_prevs_txn(txn, batch): - sql = """ - SELECT prev_event_id, ej.internal_metadata - FROM event_edges - INNER JOIN events USING (event_id) - LEFT JOIN rejections USING (event_id) - LEFT JOIN event_json ej USING (event_id) - WHERE - NOT events.outlier - AND rejections.event_id IS NULL - AND - """ + if self.store.USE_EVENT_JSON: + sql = """ + SELECT prev_event_id, ej.internal_metadata + FROM event_edges + INNER JOIN events USING (event_id) + LEFT JOIN rejections USING (event_id) + LEFT JOIN event_json ej USING (event_id) + WHERE + NOT events.outlier + AND rejections.event_id IS NULL + AND + """ + else: + sql = """ + SELECT prev_event_id, events.internal_metadata + FROM event_edges + INNER JOIN events USING (event_id) + WHERE + NOT events.outlier + AND events.rejection_reason IS NULL + AND + """ clause, args = make_in_list_sql_clause( self.database_engine, "prev_event_id", batch @@ -274,7 +285,7 @@ async def _get_prevs_before_rejected(self, event_ids: Iterable[str]) -> Set[str] def _get_prevs_before_rejected_txn(txn, batch): to_recursively_check = batch - while to_recursively_check: + if self.store.USE_EVENT_JSON: sql = """ SELECT event_id, prev_event_id, ej.internal_metadata, @@ -287,7 +298,19 @@ def _get_prevs_before_rejected_txn(txn, batch): NOT events.outlier AND """ + else: + sql = """ + SELECT + event_id, prev_event_id, events.internal_metadata, + events.rejection_reason IS NOT NULL + FROM event_edges + INNER JOIN events USING (event_id) + WHERE + NOT events.outlier + AND + """ + while to_recursively_check: clause, args = make_in_list_sql_clause( self.database_engine, "event_id", to_recursively_check ) @@ -478,6 +501,7 @@ def _persist_event_auth_chain_txn( event_to_room_id = {e.event_id: e.room_id for e in state_events.values()} self._add_chain_cover_index( + self.store.USE_EVENT_JSON, txn, self.db_pool, self.store.event_chain_id_gen, @@ -489,6 +513,7 @@ def _persist_event_auth_chain_txn( @classmethod def _add_chain_cover_index( cls, + use_event_json: bool, txn, db_pool: DatabasePool, event_chain_id_gen: SequenceGenerator, @@ -558,7 +583,7 @@ def _add_chain_cover_index( # We loop here in case we find an out of band membership and need to # fetch their auth event info. 
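+        # The query is the same for every batch, so build it once, outside the
+        # loop.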
- while missing_auth_chains: + if use_event_json: sql = """ SELECT event_id, events.type, se.state_key, chain_id, sequence_number FROM events @@ -566,6 +591,17 @@ def _add_chain_cover_index( LEFT JOIN event_auth_chains USING (event_id) WHERE """ + else: + sql = """ + SELECT event_id, events.type, events.state_key, chain_id, sequence_number + FROM events + LEFT JOIN event_auth_chains USING (event_id) + WHERE + events.state_key IS NOT NULL AND + """ + + while missing_auth_chains: + clause, args = make_in_list_sql_clause( txn.database_engine, "event_id", @@ -1551,13 +1587,7 @@ def _update_metadata_tables_txn( def _add_to_cache(self, txn, events_and_contexts): to_prefill = [] - rows = [] - N = 200 - for i in range(0, len(events_and_contexts), N): - ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]} - if not ev_map: - break - + if self.store.USE_EVENT_JSON: sql = ( "SELECT " " e.event_id as event_id, " @@ -1568,6 +1598,22 @@ def _add_to_cache(self, txn, events_and_contexts): " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE " ) + else: + sql = ( + "SELECT " + " e.event_id as event_id, " + " r.redacts as redacts," + " e.rejection_reason as rejects " + " FROM events as e" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE " + ) + + N = 200 + for i in range(0, len(events_and_contexts), N): + ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]} + if not ev_map: + break clause, args = make_in_list_sql_clause( self.database_engine, "e.event_id", list(ev_map) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index aec8cdce21f4..3040f7467504 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -400,8 +400,8 @@ def _cleanup_extremities_bg_update_txn(txn): # their successor events, if any, and the successor events' # rejection status. - txn.execute( - """SELECT prev_event_id, event_id, ej.internal_metadata, + if self.USE_EVENT_JSON: + sql = """SELECT prev_event_id, event_id, ej.internal_metadata, rejections.event_id IS NOT NULL, events.outlier FROM ( SELECT event_id AS prev_event_id @@ -412,8 +412,21 @@ def _cleanup_extremities_bg_update_txn(txn): LEFT JOIN events USING (event_id) LEFT JOIN event_json ej USING (event_id) LEFT JOIN rejections USING (event_id) - """, - (batch_size,), + """ + else: + sql = """SELECT prev_event_id, event_id, events.internal_metadata, + events.rejection_reason IS NOT NULL, events.outlier + FROM ( + SELECT event_id AS prev_event_id + FROM _extremities_to_check + LIMIT ? + ) AS f + LEFT JOIN event_edges USING (prev_event_id) + LEFT JOIN events USING (event_id) + """ + + txn.execute( + sql, (batch_size,), ) for prev_event_id, event_id, metadata, rejected, outlier in txn: @@ -438,13 +451,8 @@ def _cleanup_extremities_bg_update_txn(txn): # Now we recursively check all the soft-failed descendants we # found above in the same way, until we have nothing left to # check. - while soft_failed_events_to_lookup: - # We only want to do 100 at a time, so we split given list - # into two. 
- batch = list(soft_failed_events_to_lookup) - to_check, to_defer = batch[:100], batch[100:] - soft_failed_events_to_lookup = set(to_defer) + if self.USE_EVENT_JSON: sql = """SELECT prev_event_id, event_id, ej.internal_metadata, rejections.event_id IS NOT NULL FROM event_edges @@ -455,6 +463,23 @@ def _cleanup_extremities_bg_update_txn(txn): NOT events.outlier AND """ + else: + sql = """SELECT prev_event_id, event_id, events.internal_metadata, + events.rejection_reason IS NOT NULL + FROM event_edges + INNER JOIN events USING (event_id) + WHERE + NOT events.outlier + AND + """ + + while soft_failed_events_to_lookup: + # We only want to do 100 at a time, so we split given list + # into two. + batch = list(soft_failed_events_to_lookup) + to_check, to_defer = batch[:100], batch[100:] + soft_failed_events_to_lookup = set(to_defer) + clause, args = make_in_list_sql_clause( self.database_engine, "prev_event_id", to_check ) @@ -928,22 +953,42 @@ def _calculate_chain_cover_txn( extra_clause = "AND events.room_id = ?" tuple_args.append(last_room_id) - sql = """ - SELECT - event_id, state_events.type, state_events.state_key, - topological_ordering, stream_ordering, - events.room_id - FROM events - INNER JOIN state_events USING (event_id) - LEFT JOIN event_auth_chains USING (event_id) - LEFT JOIN event_auth_chain_to_calculate USING (event_id) - WHERE event_auth_chains.event_id IS NULL - AND event_auth_chain_to_calculate.event_id IS NULL - AND %(tuple_cmp)s - %(extra)s - ORDER BY events.room_id, topological_ordering, stream_ordering - %(limit)s - """ % { + if self.USE_EVENT_JSON: + sqlf = """ + SELECT + event_id, state_events.type, state_events.state_key, + topological_ordering, stream_ordering, + events.room_id + FROM events + INNER JOIN state_events USING (event_id) + LEFT JOIN event_auth_chains USING (event_id) + LEFT JOIN event_auth_chain_to_calculate USING (event_id) + WHERE event_auth_chains.event_id IS NULL + AND event_auth_chain_to_calculate.event_id IS NULL + AND %(tuple_cmp)s + %(extra)s + ORDER BY events.room_id, topological_ordering, stream_ordering + %(limit)s + """ + else: + sqlf = """ + SELECT + event_id, events.type, events.state_key, + topological_ordering, stream_ordering, + events.room_id + FROM events + LEFT JOIN event_auth_chains USING (event_id) + LEFT JOIN event_auth_chain_to_calculate USING (event_id) + WHERE events.state_key IS NOT NULL + AND event_auth_chains.event_id IS NULL + AND event_auth_chain_to_calculate.event_id IS NULL + AND %(tuple_cmp)s + %(extra)s + ORDER BY events.room_id, topological_ordering, stream_ordering + %(limit)s + """ + + sql = sqlf % { "tuple_cmp": tuple_clause, "limit": "LIMIT ?" if batch_size is not None else "", "extra": extra_clause, @@ -996,9 +1041,10 @@ def _calculate_chain_cover_txn( # Calculate and persist the chain cover index for this set of events. # - # Annoyingly we need to gut wrench into the persit event store so that + # Annoyingly we need to gut wrench into the persist event store so that # we can reuse the function to calculate the chain cover for rooms. PersistEventsStore._add_chain_cover_index( + self.USE_EVENT_JSON, txn, self.db_pool, self.event_chain_id_gen, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index dce951c013ae..f389ec3f3cd4 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -905,7 +905,8 @@ def _fetch_event_rows(self, txn, event_ids): Dict[str, Dict]: a map from event id to event info. 
""" event_dict = {} - for evs in batch_iter(event_ids, 200): + + if self.USE_EVENT_JSON: sql = """\ SELECT e.event_id, @@ -921,7 +922,22 @@ def _fetch_event_rows(self, txn, event_ids): LEFT JOIN rooms r ON r.room_id = e.room_id LEFT JOIN rejections as rej USING (event_id) WHERE """ + else: + sql = """\ + SELECT + e.event_id, + e.stream_ordering, + e.internal_metadata, + e.json, + e.format_version, + r.room_version, + e.rejection_reason, + e.outlier + FROM events AS e + LEFT JOIN rooms r ON r.room_id = e.room_id + WHERE """ + for evs in batch_iter(event_ids, 200): clause, args = make_in_list_sql_clause( txn.database_engine, "e.event_id", evs ) @@ -1181,20 +1197,35 @@ async def get_all_new_forward_event_rows( """ def get_all_new_forward_event_rows(txn): - sql = ( - "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" - " FROM events AS e" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events se USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " LEFT JOIN room_memberships USING (event_id)" - " LEFT JOIN rejections USING (event_id)" - " WHERE ? < stream_ordering AND stream_ordering <= ?" - " AND instance_name = ?" - " ORDER BY stream_ordering ASC" - " LIMIT ?" - ) + if self.USE_EVENT_JSON: + sql = ( + "SELECT e.stream_ordering, e.event_id, e.room_id, e.type, se.state_key," + " redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events se USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" + " LEFT JOIN rejections USING (event_id)" + " WHERE ? < stream_ordering AND stream_ordering <= ?" + " AND instance_name = ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + else: + sql = ( + "SELECT e.stream_ordering, e.event_id, e.room_id, e.type, e.state_key," + " redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" + " WHERE ? < stream_ordering AND stream_ordering <= ?" + " AND instance_name = ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, instance_name, limit)) return txn.fetchall() @@ -1218,21 +1249,36 @@ async def get_ex_outlier_stream_rows( """ def get_ex_outlier_stream_rows_txn(txn): - sql = ( - "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" - " FROM events AS e" - " INNER JOIN ex_outlier_stream AS out USING (event_id)" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events se USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " LEFT JOIN room_memberships USING (event_id)" - " LEFT JOIN rejections USING (event_id)" - " WHERE ? < event_stream_ordering" - " AND event_stream_ordering <= ?" - " AND out.instance_name = ?" 
- " ORDER BY event_stream_ordering ASC" - ) + if self.USE_EVENT_JSON: + sql = ( + "SELECT event_stream_ordering, e.event_id, e.room_id, e.type, se.state_key," + " redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " FROM events AS e" + " INNER JOIN ex_outlier_stream AS out USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events se USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" + " LEFT JOIN rejections USING (event_id)" + " WHERE ? < event_stream_ordering" + " AND event_stream_ordering <= ?" + " AND out.instance_name = ?" + " ORDER BY event_stream_ordering ASC" + ) + else: + sql = ( + "SELECT event_stream_ordering, e.event_id, e.room_id, e.type, e.state_key," + " redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL" + " FROM events AS e" + " INNER JOIN ex_outlier_stream AS out USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" + " WHERE ? < event_stream_ordering" + " AND event_stream_ordering <= ?" + " AND out.instance_name = ?" + " ORDER BY event_stream_ordering ASC" + ) txn.execute(sql, (last_id, current_id, instance_name)) return txn.fetchall() @@ -1272,18 +1318,31 @@ async def get_all_new_backfill_event_rows( return [], current_id, False def get_all_new_backfill_event_rows(txn): - sql = ( - "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id" - " FROM events AS e" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events se USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " WHERE ? > stream_ordering AND stream_ordering >= ?" - " AND instance_name = ?" - " ORDER BY stream_ordering ASC" - " LIMIT ?" - ) + if self.USE_EVENT_JSON: + sql = ( + "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type, se.state_key," + " redacts, relates_to_id" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events se USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? > stream_ordering AND stream_ordering >= ?" + " AND instance_name = ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + else: + sql = ( + "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type, e.state_key," + " redacts, relates_to_id" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? > stream_ordering AND stream_ordering >= ?" + " AND instance_name = ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) txn.execute(sql, (-last_id, -current_id, instance_name, limit)) new_event_updates = [(row[0], row[1:]) for row in txn] @@ -1294,19 +1353,33 @@ def get_all_new_backfill_event_rows(txn): else: upper_bound = current_id - sql = ( - "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id" - " FROM events AS e" - " INNER JOIN ex_outlier_stream AS out USING (event_id)" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events se USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " WHERE ? > event_stream_ordering" - " AND event_stream_ordering >= ?" - " AND out.instance_name = ?" 
- " ORDER BY event_stream_ordering DESC" - ) + if self.USE_EVENT_JSON: + sql = ( + "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type, se.state_key," + + " redacts, relates_to_id" + " FROM events AS e" + " INNER JOIN ex_outlier_stream AS out USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events se USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " AND out.instance_name = ?" + " ORDER BY event_stream_ordering DESC" + ) + else: + sql = ( + "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type, e.state_key," + " redacts, relates_to_id" + " FROM events AS e" + " INNER JOIN ex_outlier_stream AS out USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " AND out.instance_name = ?" + " ORDER BY event_stream_ordering DESC" + ) txn.execute(sql, (-last_id, -upper_bound, instance_name)) new_event_updates.extend((row[0], row[1:]) for row in txn) diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 81679b23d5ff..56c1c19f0127 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -114,7 +114,11 @@ def _purge_history_txn( logger.info("[purge] looking for events to delete") - should_delete_expr = "state_events.state_key IS NULL" + should_delete_expr = ( + "state_events.state_key IS NULL" + if self.USE_EVENT_JSON + else "e.state_key IS NULL" + ) should_delete_params: Tuple[Any, ...] = () if not delete_local_events: should_delete_expr += " AND event_id NOT LIKE ?" @@ -126,12 +130,23 @@ def _purge_history_txn( # Note that we insert events that are outliers and aren't going to be # deleted, as nothing will happen to them. + if self.USE_EVENT_JSON: + sqlf = """ + INSERT INTO events_to_purge + SELECT event_id, %s + FROM events AS e LEFT JOIN state_events USING (event_id) + WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ? + """ + else: + sqlf = """ + INSERT INTO events_to_purge + SELECT event_id, %s + FROM events AS e + WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ? + """ + txn.execute( - "INSERT INTO events_to_purge" - " SELECT event_id, %s" - " FROM events AS e LEFT JOIN state_events USING (event_id)" - " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?" 
- % (should_delete_expr, should_delete_expr), + sqlf % (should_delete_expr, should_delete_expr), should_delete_params, ) diff --git a/synapse/storage/databases/main/rejections.py b/synapse/storage/databases/main/rejections.py index 167318b314ae..7f1120e6ca87 100644 --- a/synapse/storage/databases/main/rejections.py +++ b/synapse/storage/databases/main/rejections.py @@ -22,10 +22,19 @@ class RejectionsStore(SQLBaseStore): async def get_rejection_reason(self, event_id: str) -> Optional[str]: - return await self.db_pool.simple_select_one_onecol( - table="rejections", - retcol="reason", - keyvalues={"event_id": event_id}, - allow_none=True, - desc="get_rejection_reason", - ) + if self.USE_EVENT_JSON: + return await self.db_pool.simple_select_one_onecol( + table="rejections", + retcol="reason", + keyvalues={"event_id": event_id}, + allow_none=True, + desc="get_rejection_reason", + ) + else: + return await self.db_pool.simple_select_one_onecol( + table="events", + retcol="rejection_reason", + keyvalues={"event_id": event_id}, + allow_none=True, + desc="get_rejection_reason", + ) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 5dd464b15623..9d6d5d56a259 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -719,15 +719,26 @@ def _get_media_mxcs_in_room_txn(self, txn, room_id): The local and remote media as a lists of tuples where the key is the hostname and the value is the media ID. """ - sql = """ - SELECT stream_ordering, ej.json FROM events - JOIN event_json ej USING (room_id, event_id) - WHERE room_id = ? - %(where_clause)s - AND contains_url = ? AND outlier = ? - ORDER BY stream_ordering DESC - LIMIT ? - """ + if self.USE_EVENT_JSON: + sql = """ + SELECT stream_ordering, ej.json FROM events + JOIN event_json ej USING (room_id, event_id) + WHERE room_id = ? + %(where_clause)s + AND contains_url = ? AND outlier = ? + ORDER BY stream_ordering DESC + LIMIT ? + """ + else: + sql = """ + SELECT stream_ordering, json FROM events + WHERE room_id = ? + %(where_clause)s + AND contains_url = ? AND outlier = ? + ORDER BY stream_ordering DESC + LIMIT ? + """ + txn.execute(sql % {"where_clause": ""}, (room_id, True, False, 100)) local_media_mxcs = [] From 60812cabc12183955c915ef1f9a58301bf46e6fd Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 15 Feb 2021 22:35:45 +0000 Subject: [PATCH 4/5] fix broken test --- tests/storage/test_event_chain.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index d87f124c2638..0ec1f5bde1ad 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -19,6 +19,7 @@ from synapse.api.constants import EventTypes from synapse.api.room_versions import RoomVersions from synapse.events import EventBase +from synapse.events.snapshot import EventContext from synapse.rest import admin from synapse.rest.client.v1 import login, room from synapse.storage.databases.main.events import _LinkMap @@ -391,7 +392,9 @@ def persist( def _persist(txn): # We need to persist the events to the events and state_events # tables. - persist_events_store._store_event_txn(txn, [(e, {}) for e in events]) + persist_events_store._store_event_txn( + txn, [(e, EventContext()) for e in events] + ) # Actually call the function that calculates the auth chain stuff. 
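+            # (which queries it runs depends on the store's USE_EVENT_JSON
+            # flag, via _add_chain_cover_index)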
persist_events_store._persist_event_auth_chain_txn(txn, events) From 219f3cbb19d72196f6e2856a36339558f7eaa488 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 15 Feb 2021 22:36:38 +0000 Subject: [PATCH 5/5] changelog --- changelog.d/9413.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9413.misc diff --git a/changelog.d/9413.misc b/changelog.d/9413.misc new file mode 100644 index 000000000000..8c00d069797e --- /dev/null +++ b/changelog.d/9413.misc @@ -0,0 +1 @@ +Improve performance of fetching events from the database.
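
A rough way to check how far the backfill of the new `events` columns has got
on an existing database (a sketch: it assumes the 59/08 delta has been applied,
and relies on the empty-string default of `events.json` to mark rows which have
not yet been copied over):

    SELECT count(*) AS not_yet_backfilled
    FROM events
    INNER JOIN event_json USING (event_id)
    WHERE events.json = '';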