From 04f0e1d3be6f505277c1a1fe053177a6fef64f4e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Nov 2023 16:09:37 +0000 Subject: [PATCH 1/5] More efficiently handle no-op POSITION We may receive `POSITION` commands where we already know that worker has advanced past that position, so there is no point in handling it. --- synapse/replication/tcp/handler.py | 12 ++++++++++++ synapse/replication/tcp/streams/_base.py | 17 +++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index afd03137f0fd..56a5e95fbbe5 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -588,6 +588,13 @@ def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand) -> Non logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line()) + # Check if we early discard this position. + stream = self._streams[cmd.stream_name] + if stream.can_discard_position( + cmd.instance_name, cmd.prev_token, cmd.new_token + ): + return + self._add_command_to_stream_queue(conn, cmd) async def _process_position( @@ -599,6 +606,11 @@ async def _process_position( """ stream = self._streams[stream_name] + if stream.can_discard_position( + cmd.instance_name, cmd.prev_token, cmd.new_token + ): + return + # We're about to go and catch up with the stream, so remove from set # of connected streams. for streams in self._streams_by_connection.values(): diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 58a44029aa75..a240b337c2fa 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -144,6 +144,16 @@ def minimal_local_current_token(self) -> Token: """ raise NotImplementedError() + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + """Whether or not a position command for this stream can be discarded. + + Useful for streams that can never go backwards and where we already know + the stream ID for the instance has advanced. + """ + return False + def discard_updates_and_advance(self) -> None: """Called when the stream should advance but the updates would be discarded, e.g. when there are no currently connected workers. @@ -221,6 +231,13 @@ def current_token(self, instance_name: str) -> Token: def minimal_local_current_token(self) -> Token: return self._stream_id_gen.get_minimal_local_current_token() + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + # These streams can't go backwards, so we know we can ignore any + # positions where the tokens are from before the current token. + return new_token <= self.current_token(instance_name) + def current_token_without_instance( current_token: Callable[[], int] From 6dc12037aad89a500bf0a61c9b0e916a874e4264 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Nov 2023 16:18:05 +0000 Subject: [PATCH 2/5] Newsfile --- changelog.d/16640.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16640.misc diff --git a/changelog.d/16640.misc b/changelog.d/16640.misc new file mode 100644 index 000000000000..3b1cc2185d20 --- /dev/null +++ b/changelog.d/16640.misc @@ -0,0 +1 @@ +More efficiently handle no-op `POSITION` over replication. From c4fb5f77f5e718089819055aa39821bba5226179 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Nov 2023 16:47:13 +0000 Subject: [PATCH 3/5] Correctly handle disconnected streams --- synapse/replication/tcp/handler.py | 24 +++++++++++++++++++++--- synapse/replication/tcp/streams/_base.py | 9 +++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 56a5e95fbbe5..aa43b5ee1798 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -588,11 +588,17 @@ def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand) -> Non logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line()) - # Check if we early discard this position. + # Check if we can early discard this position. We can only do so for + # connected streams. stream = self._streams[cmd.stream_name] if stream.can_discard_position( cmd.instance_name, cmd.prev_token, cmd.new_token - ): + ) and self.is_stream_connected(conn, cmd.stream_name): + logger.debug( + "Discarding redundant POSITION %s/%s", + cmd.instance_name, + cmd.stream_name, + ) return self._add_command_to_stream_queue(conn, cmd) @@ -608,7 +614,12 @@ async def _process_position( if stream.can_discard_position( cmd.instance_name, cmd.prev_token, cmd.new_token - ): + ) and self.is_stream_connected(conn, cmd.stream_name): + logger.debug( + "Discarding redundant POSITION %s/%s", + cmd.instance_name, + cmd.stream_name, + ) return # We're about to go and catch up with the stream, so remove from set @@ -669,6 +680,13 @@ async def _process_position( self._streams_by_connection.setdefault(conn, set()).add(stream_name) + def is_stream_connected( + self, conn: IReplicationConnection, stream_name: str + ) -> bool: + """Return if stream has been successfully connected and is ready to + receive updates""" + return stream_name in self._streams_by_connection.get(conn, ()) + def on_REMOTE_SERVER_UP( self, conn: IReplicationConnection, cmd: RemoteServerUpCommand ) -> None: diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index a240b337c2fa..51b6b73f38c4 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -236,6 +236,15 @@ def can_discard_position( ) -> bool: # These streams can't go backwards, so we know we can ignore any # positions where the tokens are from before the current token. + + if new_token <= self.current_token(instance_name): + logger.info( + "Discarding POSITION %s/%s: %s vs %s", + self.NAME, + instance_name, + new_token, + self.current_token(instance_name), + ) return new_token <= self.current_token(instance_name) From 232916120e9d52830ac07730d9d8d93909f69b70 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 09:53:00 +0000 Subject: [PATCH 4/5] Remove debug logging --- synapse/replication/tcp/streams/_base.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 51b6b73f38c4..cc34dfb322f9 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -237,14 +237,6 @@ def can_discard_position( # These streams can't go backwards, so we know we can ignore any # positions where the tokens are from before the current token. - if new_token <= self.current_token(instance_name): - logger.info( - "Discarding POSITION %s/%s: %s vs %s", - self.NAME, - instance_name, - new_token, - self.current_token(instance_name), - ) return new_token <= self.current_token(instance_name) From 607ae536d42439adcb5dadd271c0566d11d768cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 09:53:38 +0000 Subject: [PATCH 5/5] Updated logging --- synapse/replication/tcp/handler.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index aa43b5ee1798..1748182663f4 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -595,9 +595,11 @@ def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand) -> Non cmd.instance_name, cmd.prev_token, cmd.new_token ) and self.is_stream_connected(conn, cmd.stream_name): logger.debug( - "Discarding redundant POSITION %s/%s", + "Discarding redundant POSITION %s/%s %s %s", cmd.instance_name, cmd.stream_name, + cmd.prev_token, + cmd.new_token, ) return @@ -616,9 +618,11 @@ async def _process_position( cmd.instance_name, cmd.prev_token, cmd.new_token ) and self.is_stream_connected(conn, cmd.stream_name): logger.debug( - "Discarding redundant POSITION %s/%s", + "Discarding redundant POSITION %s/%s %s %s", cmd.instance_name, cmd.stream_name, + cmd.prev_token, + cmd.new_token, ) return