From 844c9b0aaedc7092ee24ff0100de70fb141ab5fe Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Wed, 17 Mar 2021 14:28:40 +0100 Subject: [PATCH 1/6] fix federation stall --- synapse/federation/sender/__init__.py | 41 ++++++++++++------- .../storage/databases/main/transactions.py | 35 ++++++++++++---- 2 files changed, 54 insertions(+), 22 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 24ebc4b8031f..211a82e75c26 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple +from typing import Dict, Hashable, Iterable, List, Optional, Sequence, Set, Tuple from prometheus_client import Counter @@ -189,7 +189,9 @@ async def _process_event_queue_loop(self) -> None: if not events and next_token >= self._last_poked_id: break - async def handle_event(event: EventBase) -> None: + async def handle_event( + event: EventBase, flush_destination_rooms: bool = True + ) -> None: # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.sender) @@ -251,7 +253,9 @@ async def handle_event(event: EventBase) -> None: logger.debug("Sending %s to %r", event, destinations) if destinations: - await self._send_pdu(event, destinations) + await self._send_pdu( + event, destinations, flush_destination_rooms + ) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -260,10 +264,11 @@ async def handle_event(event: EventBase) -> None: "federation_sender" ).observe((now - ts) / 1000) - async def handle_room_events(events: Iterable[EventBase]) -> None: + async def handle_room_events(events: Sequence[EventBase]) -> None: with Measure(self.clock, "handle_room_events"): - for event in events: - await handle_event(event) + evs_len = len(events) + for pos, event in enumerate(events, start=1): + await handle_event(event, pos == evs_len) events_by_room = {} # type: Dict[str, List[EventBase]] for event in events: @@ -307,7 +312,12 @@ async def handle_room_events(events: Iterable[EventBase]) -> None: finally: self._is_processing = False - async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: + async def _send_pdu( + self, + pdu: EventBase, + destinations: Iterable[str], + flush_destination_rooms: bool = True, + ) -> None: # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. @@ -324,14 +334,15 @@ async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: assert pdu.internal_metadata.stream_ordering - # track the fact that we have a PDU for these destinations, - # to allow us to perform catch-up later on if the remote is unreachable - # for a while. - await self.store.store_destination_rooms_entries( - destinations, - pdu.room_id, - pdu.internal_metadata.stream_ordering, - ) + if flush_destination_rooms: + # track the fact that we have a PDU for these destinations, + # to allow us to perform catch-up later on if the remote is unreachable + # for a while. + await self.store.store_destination_rooms_entries( + destinations, + pdu.room_id, + pdu.internal_metadata.stream_ordering, + ) for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 030966184140..7b8f1b4af9e9 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -26,6 +26,14 @@ from synapse.types import JsonDict from synapse.util.caches.expiringcache import ExpiringCache +try: + from psycopg2.errors import SerializationFailure +except ImportError: + # No postgres, no harm in making it a dummy class. + class SerializationFailure(Exception): + ... + + db_binary_type = memoryview logger = logging.getLogger(__name__) @@ -312,13 +320,26 @@ async def store_destination_rooms_entries( stream_ordering: the stream_ordering of the event """ - return await self.db_pool.runInteraction( - "store_destination_rooms_entries", - self._store_destination_rooms_entries_txn, - destinations, - room_id, - stream_ordering, - ) + while True: + try: + return await self.db_pool.runInteraction( + "store_destination_rooms_entries", + self._store_destination_rooms_entries_txn, + destinations, + room_id, + stream_ordering, + ) + except SerializationFailure as e: + logger.debug( + "Could not finish _store_destination_rooms_entries_txn due to SerializationFailure, retrying...", + exc_info=e, + ) + + # This is fine, as SerializationFailure cannot be caused by anything else than postgres rolling back + # the transaction because another transaction (concurrently) has updated the view of the table, + # there "can be only one" transaction that wins, in that regard, and so we try again (because the other + # transaction has succeeded). + continue def _store_destination_rooms_entries_txn( self, From 1c60f31f674d66aeb3837cd8d1cc51e8268bd1a1 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Wed, 17 Mar 2021 14:40:13 +0100 Subject: [PATCH 2/6] news --- changelog.d/9639.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9639.bugfix diff --git a/changelog.d/9639.bugfix b/changelog.d/9639.bugfix new file mode 100644 index 000000000000..ac4d8b198928 --- /dev/null +++ b/changelog.d/9639.bugfix @@ -0,0 +1 @@ +Fixes #9635, and optimizes federation sending. \ No newline at end of file From 86b91a03cc7cf2925232d0ad976766c7891f8b97 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Fri, 19 Mar 2021 13:32:01 +0100 Subject: [PATCH 3/6] apply feedback and simplify store_destination_rooms_entries --- .../storage/databases/main/transactions.py | 59 ++++--------------- 1 file changed, 10 insertions(+), 49 deletions(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 7b8f1b4af9e9..4c73aec90a4b 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -320,62 +320,23 @@ async def store_destination_rooms_entries( stream_ordering: the stream_ordering of the event """ - while True: - try: - return await self.db_pool.runInteraction( - "store_destination_rooms_entries", - self._store_destination_rooms_entries_txn, - destinations, - room_id, - stream_ordering, - ) - except SerializationFailure as e: - logger.debug( - "Could not finish _store_destination_rooms_entries_txn due to SerializationFailure, retrying...", - exc_info=e, - ) - - # This is fine, as SerializationFailure cannot be caused by anything else than postgres rolling back - # the transaction because another transaction (concurrently) has updated the view of the table, - # there "can be only one" transaction that wins, in that regard, and so we try again (because the other - # transaction has succeeded). - continue - - def _store_destination_rooms_entries_txn( - self, - txn: LoggingTransaction, - destinations: Iterable[str], - room_id: str, - stream_ordering: int, - ) -> None: - - # ensure we have a `destinations` row for this destination, as there is - # a foreign key constraint. - if isinstance(self.database_engine, PostgresEngine): - q = """ - INSERT INTO destinations (destination) - VALUES (?) - ON CONFLICT DO NOTHING; - """ - elif isinstance(self.database_engine, Sqlite3Engine): - q = """ - INSERT OR IGNORE INTO destinations (destination) - VALUES (?); - """ - else: - raise RuntimeError("Unknown database engine") - - txn.execute_batch(q, ((destination,) for destination in destinations)) + await self.db_pool.simple_upsert_many( + table="destinations", + key_names=("destination",), + key_values=[(d,) for d in destinations], + value_names=[], + value_values=[], + desc="store_destination_rooms_entries_dests", + ) rows = [(destination, room_id) for destination in destinations] - - self.db_pool.simple_upsert_many_txn( - txn, + await self.db_pool.simple_upsert_many( table="destination_rooms", key_names=("destination", "room_id"), key_values=rows, value_names=["stream_ordering"], value_values=[(stream_ordering,)] * len(rows), + desc="store_destination_rooms_entries_rooms", ) async def get_destination_last_successful_stream_ordering( From fc86c73e2eb08879dbd29c24c170780ba8839668 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Fri, 19 Mar 2021 13:37:46 +0100 Subject: [PATCH 4/6] softly pet the linter --- synapse/storage/databases/main/transactions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 4c73aec90a4b..3e0ba4671959 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -22,7 +22,6 @@ from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool, LoggingTransaction -from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import JsonDict from synapse.util.caches.expiringcache import ExpiringCache From 294320776641d237b32151112ea0e89309d34d52 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Sat, 20 Mar 2021 18:45:12 +0100 Subject: [PATCH 5/6] revert optimizations --- changelog.d/9639.bugfix | 2 +- synapse/federation/sender/__init__.py | 41 +++++++------------ .../storage/databases/main/transactions.py | 8 ---- 3 files changed, 16 insertions(+), 35 deletions(-) diff --git a/changelog.d/9639.bugfix b/changelog.d/9639.bugfix index ac4d8b198928..5882b89ace86 100644 --- a/changelog.d/9639.bugfix +++ b/changelog.d/9639.bugfix @@ -1 +1 @@ -Fixes #9635, and optimizes federation sending. \ No newline at end of file +Fixes #9635. \ No newline at end of file diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 211a82e75c26..24ebc4b8031f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import Dict, Hashable, Iterable, List, Optional, Sequence, Set, Tuple +from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple from prometheus_client import Counter @@ -189,9 +189,7 @@ async def _process_event_queue_loop(self) -> None: if not events and next_token >= self._last_poked_id: break - async def handle_event( - event: EventBase, flush_destination_rooms: bool = True - ) -> None: + async def handle_event(event: EventBase) -> None: # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.sender) @@ -253,9 +251,7 @@ async def handle_event( logger.debug("Sending %s to %r", event, destinations) if destinations: - await self._send_pdu( - event, destinations, flush_destination_rooms - ) + await self._send_pdu(event, destinations) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -264,11 +260,10 @@ async def handle_event( "federation_sender" ).observe((now - ts) / 1000) - async def handle_room_events(events: Sequence[EventBase]) -> None: + async def handle_room_events(events: Iterable[EventBase]) -> None: with Measure(self.clock, "handle_room_events"): - evs_len = len(events) - for pos, event in enumerate(events, start=1): - await handle_event(event, pos == evs_len) + for event in events: + await handle_event(event) events_by_room = {} # type: Dict[str, List[EventBase]] for event in events: @@ -312,12 +307,7 @@ async def handle_room_events(events: Sequence[EventBase]) -> None: finally: self._is_processing = False - async def _send_pdu( - self, - pdu: EventBase, - destinations: Iterable[str], - flush_destination_rooms: bool = True, - ) -> None: + async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. @@ -334,15 +324,14 @@ async def _send_pdu( assert pdu.internal_metadata.stream_ordering - if flush_destination_rooms: - # track the fact that we have a PDU for these destinations, - # to allow us to perform catch-up later on if the remote is unreachable - # for a while. - await self.store.store_destination_rooms_entries( - destinations, - pdu.room_id, - pdu.internal_metadata.stream_ordering, - ) + # track the fact that we have a PDU for these destinations, + # to allow us to perform catch-up later on if the remote is unreachable + # for a while. + await self.store.store_destination_rooms_entries( + destinations, + pdu.room_id, + pdu.internal_metadata.stream_ordering, + ) for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 3e0ba4671959..b7072f1f5ef2 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -25,14 +25,6 @@ from synapse.types import JsonDict from synapse.util.caches.expiringcache import ExpiringCache -try: - from psycopg2.errors import SerializationFailure -except ImportError: - # No postgres, no harm in making it a dummy class. - class SerializationFailure(Exception): - ... - - db_binary_type = memoryview logger = logging.getLogger(__name__) From 35da66fbac16eedbe9c43811037710a0ebd95ba8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 Mar 2021 10:01:13 +0000 Subject: [PATCH 6/6] Update changelog.d/9639.bugfix --- changelog.d/9639.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/9639.bugfix b/changelog.d/9639.bugfix index 5882b89ace86..51b37467075a 100644 --- a/changelog.d/9639.bugfix +++ b/changelog.d/9639.bugfix @@ -1 +1 @@ -Fixes #9635. \ No newline at end of file +Fix bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind.