From f480209e97cf3291dd3548ad1e027d9cd2ad4a26 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 23 Mar 2021 13:31:17 -0400 Subject: [PATCH 1/7] Add type hints to FederationRemoteSendQueue. --- synapse/federation/send_queue.py | 64 ++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 3e993b428b71..097cb1b02fb4 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -31,25 +31,31 @@ import logging from collections import namedtuple -from typing import Dict, List, Tuple, Type +from typing import TYPE_CHECKING, Dict, List, Optional, Sized, Tuple, Type from sortedcontainers import SortedDict from twisted.internet import defer from synapse.api.presence import UserPresenceState +from synapse.federation.sender import FederationSender from synapse.metrics import LaterGauge +from synapse.replication.tcp.streams.federation import FederationStream +from synapse.types import JsonDict, ReadReceipt from synapse.util.metrics import Measure from .units import Edu +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class FederationRemoteSendQueue: """A drop in replacement for FederationSender""" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() @@ -58,7 +64,7 @@ def __init__(self, hs): # We may have multiple federation sender instances, so we need to track # their positions separately. self._sender_instances = hs.config.worker.federation_shard_config.instances - self._sender_positions = {} + self._sender_positions = {} # type: Dict[str, int] # Pending presence map user_id -> UserPresenceState self.presence_map = {} # type: Dict[str, UserPresenceState] @@ -94,7 +100,7 @@ def __init__(self, hs): # we make a new function, so we need to make a new function so the inner # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. - def register(name, queue): + def register(name: str, queue: Sized) -> None: LaterGauge( "synapse_federation_send_queue_%s_size" % (queue_name,), "", @@ -115,13 +121,13 @@ def register(name, queue): self.clock.looping_call(self._clear_queue, 30 * 1000) - def _next_pos(self): + def _next_pos(self) -> int: pos = self.pos self.pos += 1 self.pos_time[self.clock.time_msec()] = pos return pos - def _clear_queue(self): + def _clear_queue(self) -> None: """Clear the queues for anything older than N minutes""" FIVE_MINUTES_AGO = 5 * 60 * 1000 @@ -138,7 +144,7 @@ def _clear_queue(self): self._clear_queue_before_pos(position_to_delete) - def _clear_queue_before_pos(self, position_to_delete): + def _clear_queue_before_pos(self, position_to_delete: int) -> None: """Clear all the queues from before a given position""" with Measure(self.clock, "send_queue._clear"): # Delete things out of presence maps @@ -188,13 +194,18 @@ def _clear_queue_before_pos(self, position_to_delete): for key in keys[:i]: del self.edus[key] - def notify_new_events(self, max_token): + def notify_new_events(self, max_token: int) -> None: """As per FederationSender""" # We don't need to replicate this as it gets sent down a different # stream. - pass - def build_and_send_edu(self, destination, edu_type, content, key=None): + def build_and_send_edu( + self, + destination: str, + edu_type: str, + content: JsonDict, + key: Optional[tuple] = None, + ) -> None: """As per FederationSender""" if destination == self.server_name: logger.info("Not sending EDU to ourselves") @@ -218,38 +229,40 @@ def build_and_send_edu(self, destination, edu_type, content, key=None): self.notifier.on_new_replication_data() - def send_read_receipt(self, receipt): + def send_read_receipt(self, receipt: ReadReceipt) -> defer.Deferred: """As per FederationSender Args: - receipt (synapse.types.ReadReceipt): + receipt: """ # nothing to do here: the replication listener will handle it. return defer.succeed(None) - def send_presence(self, states): + def send_presence(self, states: List[UserPresenceState]) -> None: """As per FederationSender Args: - states (list(UserPresenceState)) + states """ pos = self._next_pos() # We only want to send presence for our own users, so lets always just # filter here just in case. - local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states)) + local_states = [s for s in states if self.is_mine_id(s.user_id)] self.presence_map.update({state.user_id: state for state in local_states}) self.presence_changed[pos] = [state.user_id for state in local_states] self.notifier.on_new_replication_data() - def send_presence_to_destinations(self, states, destinations): + def send_presence_to_destinations( + self, states: List[UserPresenceState], destinations: List[str] + ) -> None: """As per FederationSender Args: - states (list[UserPresenceState]) - destinations (list[str]) + states + destinations """ for state in states: pos = self._next_pos() @@ -258,15 +271,15 @@ def send_presence_to_destinations(self, states, destinations): self.notifier.on_new_replication_data() - def send_device_messages(self, destination): + def send_device_messages(self, destination: str) -> None: """As per FederationSender""" # We don't need to replicate this as it gets sent down a different # stream. - def get_current_token(self): + def get_current_token(self) -> int: return self.pos - 1 - def federation_ack(self, instance_name, token): + def federation_ack(self, instance_name: str, token: int) -> None: if self._sender_instances: # If we have configured multiple federation sender instances we need # to track their positions separately, and only clear the queue up @@ -504,13 +517,16 @@ def add_to_buffer(self, buff): ) -def process_rows_for_federation(transaction_queue, rows): +def process_rows_for_federation( + transaction_queue: FederationSender, + rows: List[FederationStream.FederationStreamRow], +) -> None: """Parse a list of rows from the federation stream and put them in the transaction queue ready for sending to the relevant homeservers. Args: - transaction_queue (FederationSender) - rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow)) + transaction_queue + rows """ # The federation stream contains a bunch of different types of From aab99601153fa0dcbe2f3926c17aa3e09e8074db Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 23 Mar 2021 13:51:47 -0400 Subject: [PATCH 2/7] Add an abstract base class for the FederationSender. --- synapse/federation/send_queue.py | 30 +++++--- synapse/federation/sender/__init__.py | 106 ++++++++++++++++++++++---- synapse/replication/tcp/commands.py | 6 +- 3 files changed, 116 insertions(+), 26 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 097cb1b02fb4..ee9b6b0fd834 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -31,14 +31,22 @@ import logging from collections import namedtuple -from typing import TYPE_CHECKING, Dict, List, Optional, Sized, Tuple, Type +from typing import ( + TYPE_CHECKING, + Dict, + Hashable, + Iterable, + List, + Optional, + Sized, + Tuple, + Type, +) from sortedcontainers import SortedDict -from twisted.internet import defer - from synapse.api.presence import UserPresenceState -from synapse.federation.sender import FederationSender +from synapse.federation.sender import AbstractFederationSender, FederationSender from synapse.metrics import LaterGauge from synapse.replication.tcp.streams.federation import FederationStream from synapse.types import JsonDict, ReadReceipt @@ -52,7 +60,7 @@ logger = logging.getLogger(__name__) -class FederationRemoteSendQueue: +class FederationRemoteSendQueue(AbstractFederationSender): """A drop in replacement for FederationSender""" def __init__(self, hs: "HomeServer"): @@ -77,7 +85,7 @@ def __init__(self, hs: "HomeServer"): # Stream position -> (user_id, destinations) self.presence_destinations = ( SortedDict() - ) # type: SortedDict[int, Tuple[str, List[str]]] + ) # type: SortedDict[int, Tuple[str, Iterable[str]]] # (destination, key) -> EDU self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu] @@ -204,7 +212,7 @@ def build_and_send_edu( destination: str, edu_type: str, content: JsonDict, - key: Optional[tuple] = None, + key: Optional[Hashable] = None, ) -> None: """As per FederationSender""" if destination == self.server_name: @@ -229,14 +237,13 @@ def build_and_send_edu( self.notifier.on_new_replication_data() - def send_read_receipt(self, receipt: ReadReceipt) -> defer.Deferred: + async def send_read_receipt(self, receipt: ReadReceipt) -> None: """As per FederationSender Args: receipt: """ # nothing to do here: the replication listener will handle it. - return defer.succeed(None) def send_presence(self, states: List[UserPresenceState]) -> None: """As per FederationSender @@ -256,7 +263,7 @@ def send_presence(self, states: List[UserPresenceState]) -> None: self.notifier.on_new_replication_data() def send_presence_to_destinations( - self, states: List[UserPresenceState], destinations: List[str] + self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """As per FederationSender @@ -276,6 +283,9 @@ def send_device_messages(self, destination: str) -> None: # We don't need to replicate this as it gets sent down a different # stream. + def wake_destination(self, server: str) -> None: + pass + def get_current_token(self) -> int: return self.pos - 1 diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 24ebc4b8031f..6b7b9ae3b651 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -13,14 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc import logging -from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple from prometheus_client import Counter from twisted.internet import defer -import synapse import synapse.metrics from synapse.api.presence import UserPresenceState from synapse.events import EventBase @@ -40,9 +40,12 @@ events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import ReadReceipt, RoomStreamToken +from synapse.types import JsonDict, ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure, measure_func +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) sent_pdus_destination_dist_count = Counter( @@ -65,8 +68,85 @@ CATCH_UP_STARTUP_INTERVAL_SEC = 5 -class FederationSender: - def __init__(self, hs: "synapse.server.HomeServer"): +class AbstractFederationSender(metaclass=abc.ABCMeta): + @abc.abstractmethod + def notify_new_events(self, max_token: RoomStreamToken) -> None: + """This gets called when we have some new events we might want to + send out to other servers. + """ + + @abc.abstractmethod + async def send_read_receipt(self, receipt: ReadReceipt) -> None: + """Send a RR to any other servers in the room + + Args: + receipt: receipt to be sent + """ + + @abc.abstractmethod + def send_presence(self, states: List[UserPresenceState]) -> None: + """Send the new presence states to the appropriate destinations. + + This actually queues up the presence states ready for sending and + triggers a background task to process them and send out the transactions. + """ + + @abc.abstractmethod + def send_presence_to_destinations( + self, states: Iterable[UserPresenceState], destinations: Iterable[str] + ) -> None: + """Send the given presence states to the given destinations. + + Args: + destinations: + """ + + @abc.abstractmethod + def build_and_send_edu( + self, + destination: str, + edu_type: str, + content: JsonDict, + key: Optional[Hashable] = None, + ) -> None: + """Construct an Edu object, and queue it for sending + + Args: + destination: name of server to send to + edu_type: type of EDU to send + content: content of EDU + key: clobbering key for this edu + """ + + @abc.abstractmethod + def send_device_messages(self, destination: str) -> None: + pass + + @abc.abstractmethod + def wake_destination(self, destination: str) -> None: + """Called when we want to retry sending transactions to a remote. + + This is mainly useful if the remote server has been down and we think it + might have come back. + """ + + @abc.abstractmethod + def get_current_token(self) -> int: + # Dummy implementation for case where federation sender isn't offloaded + # to a worker. + return 0 + + @abc.abstractmethod + async def get_replication_rows( + self, instance_name: str, from_token: int, to_token: int, target_row_count: int + ) -> Tuple[List[Tuple[int, Tuple]], int, bool]: + # Dummy implementation for case where federation sender isn't offloaded + # to a worker. + return [], 0, False + + +class FederationSender(AbstractFederationSender): + def __init__(self, hs: "HomeServer"): self.hs = hs self.server_name = hs.hostname @@ -432,7 +512,7 @@ def _flush_rrs_for_room(self, room_id: str) -> None: queue.flush_read_receipts_for_room(room_id) @preserve_fn # the caller should not yield on this - async def send_presence(self, states: List[UserPresenceState]): + async def send_presence(self, states: List[UserPresenceState]) -> None: """Send the new presence states to the appropriate destinations. This actually queues up the presence states ready for sending and @@ -494,7 +574,7 @@ def send_presence_to_destinations( self._get_per_destination_queue(destination).send_presence(states) @measure_func("txnqueue._process_presence") - async def _process_presence_inner(self, states: List[UserPresenceState]): + async def _process_presence_inner(self, states: List[UserPresenceState]) -> None: """Given a list of states populate self.pending_presence_by_dest and poke to send a new transaction to each destination """ @@ -516,9 +596,9 @@ def build_and_send_edu( self, destination: str, edu_type: str, - content: dict, + content: JsonDict, key: Optional[Hashable] = None, - ): + ) -> None: """Construct an Edu object, and queue it for sending Args: @@ -545,7 +625,7 @@ def build_and_send_edu( self.send_edu(edu, key) - def send_edu(self, edu: Edu, key: Optional[Hashable]): + def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None: """Queue an EDU for sending Args: @@ -563,7 +643,7 @@ def send_edu(self, edu: Edu, key: Optional[Hashable]): else: queue.send_edu(edu) - def send_device_messages(self, destination: str): + def send_device_messages(self, destination: str) -> None: if destination == self.server_name: logger.warning("Not sending device update to ourselves") return @@ -575,7 +655,7 @@ def send_device_messages(self, destination: str): self._get_per_destination_queue(destination).attempt_new_transaction() - def wake_destination(self, destination: str): + def wake_destination(self, destination: str) -> None: """Called when we want to retry sending transactions to a remote. This is mainly useful if the remote server has been down and we think it @@ -607,7 +687,7 @@ async def get_replication_rows( # to a worker. return [], 0, False - async def _wake_destinations_needing_catchup(self): + async def _wake_destinations_needing_catchup(self) -> None: """ Wakes up destinations that need catch-up and are not currently being backed off from. diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index bb447f75b4d2..8abed1f52d3e 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -312,16 +312,16 @@ class FederationAckCommand(Command): NAME = "FEDERATION_ACK" - def __init__(self, instance_name, token): + def __init__(self, instance_name: str, token: int): self.instance_name = instance_name self.token = token @classmethod - def from_line(cls, line): + def from_line(cls, line: str) -> "FederationAckCommand": instance_name, token = line.split(" ") return cls(instance_name, int(token)) - def to_line(self): + def to_line(self) -> str: return "%s %s" % (self.instance_name, self.token) From c1a6a59137e88a316e860d3f0a4d2980510d1304 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 23 Mar 2021 13:57:59 -0400 Subject: [PATCH 3/7] Add type hint to HomeServer. --- synapse/replication/tcp/handler.py | 5 ++++- synapse/replication/tcp/streams/federation.py | 16 +++++++++++++--- synapse/server.py | 4 ++-- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index a8894beadfd1..8b6bf420584a 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -34,6 +34,7 @@ from twisted.internet.protocol import ReconnectingClientFactory +from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.client import DirectTcpReplicationClientFactory @@ -213,7 +214,9 @@ def __init__(self, hs: "HomeServer"): self._federation_sender = None if self._is_master and not hs.config.send_federation: - self._federation_sender = hs.get_federation_sender() + federation_sender = hs.get_federation_sender() + assert isinstance(federation_sender, FederationRemoteSendQueue) + self._federation_sender = federation_sender self._server_notices_sender = None if self._is_master: diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py index 9bcd13b00989..c00d053cbf03 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py @@ -14,13 +14,18 @@ # See the License for the specific language governing permissions and # limitations under the License. from collections import namedtuple +from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple +from synapse.federation.sender import FederationSender from synapse.replication.tcp.streams._base import ( Stream, current_token_without_instance, make_http_update_function, ) +if TYPE_CHECKING: + from synapse.server import HomeServer + class FederationStream(Stream): """Data to be sent over federation. Only available when master has federation @@ -38,17 +43,20 @@ class FederationStream(Stream): NAME = "federation" ROW_TYPE = FederationStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): if hs.config.worker_app is None: # master process: get updates from the FederationRemoteSendQueue. # (if the master is configured to send federation itself, federation_sender # will be a real FederationSender, which has stubs for current_token and # get_replication_rows.) federation_sender = hs.get_federation_sender() + assert isinstance(federation_sender, FederationSender) current_token = current_token_without_instance( federation_sender.get_current_token ) - update_function = federation_sender.get_replication_rows + update_function = ( + federation_sender.get_replication_rows + ) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]] elif hs.should_send_federation(): # federation sender: Query master process @@ -69,5 +77,7 @@ def _stub_current_token(instance_name: str) -> int: return 0 @staticmethod - async def _stub_update_function(instance_name, from_token, upto_token, limit): + async def _stub_update_function( + instance_name: str, from_token: int, upto_token: int, limit: int + ) -> Tuple[list, int, bool]: return [], upto_token, False diff --git a/synapse/server.py b/synapse/server.py index 5e787e2281a8..e85b9391faae 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -60,7 +60,7 @@ FederationServer, ) from synapse.federation.send_queue import FederationRemoteSendQueue -from synapse.federation.sender import FederationSender +from synapse.federation.sender import AbstractFederationSender, FederationSender from synapse.federation.transport.client import TransportLayerClient from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler @@ -571,7 +571,7 @@ def get_federation_transport_client(self) -> TransportLayerClient: return TransportLayerClient(self) @cache_in_self - def get_federation_sender(self): + def get_federation_sender(self) -> AbstractFederationSender: if self.should_send_federation(): return FederationSender(self) elif not self.config.worker_app: From 48b3a1172892ac8ddf56703ef17f8628b14f30f4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 24 Mar 2021 12:43:12 -0400 Subject: [PATCH 4/7] Remove unused code. --- synapse/app/generic_worker.py | 7 ------- synapse/federation/send_queue.py | 10 ++++++---- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index caef394e1d4d..500725a4eb7b 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -792,13 +792,6 @@ def __init__(self, hs: GenericWorkerServer): self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") - def on_start(self): - # There may be some events that are persisted but haven't been sent, - # so send them now. - self.federation_sender.notify_new_events( - self.store.get_room_max_stream_ordering() - ) - def wake_destination(self, server: str): self.federation_sender.wake_destination(server) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index ee9b6b0fd834..79c712eac34c 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -49,7 +49,7 @@ from synapse.federation.sender import AbstractFederationSender, FederationSender from synapse.metrics import LaterGauge from synapse.replication.tcp.streams.federation import FederationStream -from synapse.types import JsonDict, ReadReceipt +from synapse.types import JsonDict, ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure from .units import Edu @@ -202,10 +202,12 @@ def _clear_queue_before_pos(self, position_to_delete: int) -> None: for key in keys[:i]: del self.edus[key] - def notify_new_events(self, max_token: int) -> None: + def notify_new_events(self, max_token: RoomStreamToken) -> None: """As per FederationSender""" - # We don't need to replicate this as it gets sent down a different - # stream. + # This should never get called. + raise NotImplementedError( + "FederationRemoteSendQueue unexpectedly received a call to notify_new_events." + ) def build_and_send_edu( self, From 35a6a641559d4f1e0f20c82adcc36c5155859609 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 24 Mar 2021 12:56:30 -0400 Subject: [PATCH 5/7] Do not cause import cycles. --- synapse/federation/send_queue.py | 4 +--- synapse/federation/sender/__init__.py | 16 ++++++++++------ synapse/replication/tcp/handler.py | 5 +---- synapse/replication/tcp/streams/federation.py | 2 -- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 79c712eac34c..0c18c49abb70 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -205,9 +205,7 @@ def _clear_queue_before_pos(self, position_to_delete: int) -> None: def notify_new_events(self, max_token: RoomStreamToken) -> None: """As per FederationSender""" # This should never get called. - raise NotImplementedError( - "FederationRemoteSendQueue unexpectedly received a call to notify_new_events." - ) + raise NotImplementedError() def build_and_send_edu( self, diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 6b7b9ae3b651..c93106132ad8 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -132,17 +132,17 @@ def wake_destination(self, destination: str) -> None: @abc.abstractmethod def get_current_token(self) -> int: - # Dummy implementation for case where federation sender isn't offloaded - # to a worker. - return 0 + pass + + @abc.abstractmethod + def federation_ack(self, instance_name: str, token: int) -> None: + pass @abc.abstractmethod async def get_replication_rows( self, instance_name: str, from_token: int, to_token: int, target_row_count: int ) -> Tuple[List[Tuple[int, Tuple]], int, bool]: - # Dummy implementation for case where federation sender isn't offloaded - # to a worker. - return [], 0, False + pass class FederationSender(AbstractFederationSender): @@ -679,6 +679,10 @@ def get_current_token() -> int: # to a worker. return 0 + def federation_ack(self, instance_name: str, token: int) -> None: + # It is not expected that this gets called on FederationSender. + raise NotImplementedError() + @staticmethod async def get_replication_rows( instance_name: str, from_token: int, to_token: int, target_row_count: int diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 8b6bf420584a..a8894beadfd1 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -34,7 +34,6 @@ from twisted.internet.protocol import ReconnectingClientFactory -from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.client import DirectTcpReplicationClientFactory @@ -214,9 +213,7 @@ def __init__(self, hs: "HomeServer"): self._federation_sender = None if self._is_master and not hs.config.send_federation: - federation_sender = hs.get_federation_sender() - assert isinstance(federation_sender, FederationRemoteSendQueue) - self._federation_sender = federation_sender + self._federation_sender = hs.get_federation_sender() self._server_notices_sender = None if self._is_master: diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py index c00d053cbf03..9bb8e9e17731 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py @@ -16,7 +16,6 @@ from collections import namedtuple from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple -from synapse.federation.sender import FederationSender from synapse.replication.tcp.streams._base import ( Stream, current_token_without_instance, @@ -50,7 +49,6 @@ def __init__(self, hs: "HomeServer"): # will be a real FederationSender, which has stubs for current_token and # get_replication_rows.) federation_sender = hs.get_federation_sender() - assert isinstance(federation_sender, FederationSender) current_token = current_token_without_instance( federation_sender.get_current_token ) From 7da7f8f76486c9c5fe74bfc57b375b2b973f57a6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 24 Mar 2021 13:01:07 -0400 Subject: [PATCH 6/7] Newsfragment --- changelog.d/9681.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9681.misc diff --git a/changelog.d/9681.misc b/changelog.d/9681.misc new file mode 100644 index 000000000000..35338cd33263 --- /dev/null +++ b/changelog.d/9681.misc @@ -0,0 +1 @@ +Add additional type hints to the Homeserver object. From 0b70f94254d069c7252ba3852dbe80c2f4a48969 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 29 Mar 2021 10:49:28 -0400 Subject: [PATCH 7/7] Raise NotImplementedError. --- synapse/federation/sender/__init__.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index c93106132ad8..8babb1ebbe4e 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -74,6 +74,7 @@ def notify_new_events(self, max_token: RoomStreamToken) -> None: """This gets called when we have some new events we might want to send out to other servers. """ + raise NotImplementedError() @abc.abstractmethod async def send_read_receipt(self, receipt: ReadReceipt) -> None: @@ -82,6 +83,7 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None: Args: receipt: receipt to be sent """ + raise NotImplementedError() @abc.abstractmethod def send_presence(self, states: List[UserPresenceState]) -> None: @@ -90,6 +92,7 @@ def send_presence(self, states: List[UserPresenceState]) -> None: This actually queues up the presence states ready for sending and triggers a background task to process them and send out the transactions. """ + raise NotImplementedError() @abc.abstractmethod def send_presence_to_destinations( @@ -100,6 +103,7 @@ def send_presence_to_destinations( Args: destinations: """ + raise NotImplementedError() @abc.abstractmethod def build_and_send_edu( @@ -117,10 +121,11 @@ def build_and_send_edu( content: content of EDU key: clobbering key for this edu """ + raise NotImplementedError() @abc.abstractmethod def send_device_messages(self, destination: str) -> None: - pass + raise NotImplementedError() @abc.abstractmethod def wake_destination(self, destination: str) -> None: @@ -129,20 +134,21 @@ def wake_destination(self, destination: str) -> None: This is mainly useful if the remote server has been down and we think it might have come back. """ + raise NotImplementedError() @abc.abstractmethod def get_current_token(self) -> int: - pass + raise NotImplementedError() @abc.abstractmethod def federation_ack(self, instance_name: str, token: int) -> None: - pass + raise NotImplementedError() @abc.abstractmethod async def get_replication_rows( self, instance_name: str, from_token: int, to_token: int, target_row_count: int ) -> Tuple[List[Tuple[int, Tuple]], int, bool]: - pass + raise NotImplementedError() class FederationSender(AbstractFederationSender):