Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7565.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix exception `'GenericWorkerReplicationHandler' object has no attribute 'send_federation_ack'`, introduced in v1.13.0.
68 changes: 44 additions & 24 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import contextlib
import logging
import sys
from typing import Dict, Iterable
from typing import Dict, Iterable, Optional, Set

from typing_extensions import ContextManager

Expand Down Expand Up @@ -677,10 +677,9 @@ def __init__(self, hs):
self.notify_pushers = hs.config.start_pushers
self.pusher_pool = hs.get_pusherpool()

self.send_handler = None # type: Optional[FederationSenderHandler]
if hs.config.send_federation:
self.send_handler = FederationSenderHandler(hs, self)
else:
self.send_handler = None
self.send_handler = FederationSenderHandler(hs)

async def on_rdata(self, stream_name, instance_name, token, rows):
await super().on_rdata(stream_name, instance_name, token, rows)
Expand Down Expand Up @@ -718,7 +717,7 @@ async def _process_and_notify(self, stream_name, instance_name, token, rows):
if entities:
self.notifier.on_new_event("to_device_key", token, users=entities)
elif stream_name == DeviceListsStream.NAME:
all_room_ids = set()
all_room_ids = set() # type: Set[str]
for row in rows:
if row.entity.startswith("@"):
room_ids = await self.store.get_rooms_for_user(row.entity)
Expand Down Expand Up @@ -769,24 +768,33 @@ def on_remote_server_up(self, server: str):


class FederationSenderHandler(object):
"""Processes the replication stream and forwards the appropriate entries
to the federation sender.
"""Processes the fedration replication stream

This class is only instantiate on the worker responsible for sending outbound
federation transactions. It receives rows from the replication stream and forwards
the appropriate entries to the FederationSender class.
"""

def __init__(self, hs: GenericWorkerServer, replication_client):
def __init__(self, hs: GenericWorkerServer):
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
self.replication_client = replication_client

self._hs = hs

# if the worker is restarted, we want to pick up where we left off in
# the replication stream, so load the position from the database.
#
# XXX is this actually worthwhile? Whenever the master is restarted, we'll
# drop some rows anyway (which is mostly fine because we're only dropping
# typing and presence notifications). If the replication stream is
# unreliable, why do we do all this hoop-jumping to store the position in the
# database? See also https://github.com/matrix-org/synapse/issues/7535.
#
self.federation_position = self.store.federation_out_pos_startup
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")

self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
self._last_ack = self.federation_position

self._room_serials = {}
self._room_typing = {}

def on_start(self):
# There may be some events that are persisted but haven't been sent,
# so send them now.
Expand Down Expand Up @@ -849,22 +857,34 @@ async def _on_new_receipts(self, rows):
await self.federation_sender.send_read_receipt(receipt_info)

async def update_token(self, token):
"""Update the record of where we have processed to in the federation stream.

Called after we have processed a an update received over replication. Sends
a FEDERATION_ACK back to the master, and stores the token that we have processed
in `federation_stream_position` so that we can restart where we left off.
"""
try:
self.federation_position = token

# We linearize here to ensure we don't have races updating the token
#
# XXX this appears to be redundant, since the ReplicationCommandHandler
# has a linearizer which ensures that we only process one line of
# replication data at a time. Should we remove it, or is it doing useful
# service for robustness? Or could we replace it with an assertion that
# we're not being re-entered?

with (await self._fed_position_linearizer.queue(None)):
if self._last_ack < self.federation_position:
await self.store.update_federation_out_pos(
"federation", self.federation_position
)
await self.store.update_federation_out_pos(
"federation", self.federation_position
)

# We ACK this token over replication so that the master can drop
# its in memory queues
self.replication_client.send_federation_ack(
self.federation_position
)
self._last_ack = self.federation_position
# We ACK this token over replication so that the master can drop
# its in memory queues
self._hs.get_tcp_replication().send_federation_ack(
self.federation_position
)
self._last_ack = self.federation_position
except Exception:
logger.exception("Error updating federation stream position")

Expand Down
71 changes: 71 additions & 0 deletions tests/replication/test_federation_ack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock

from synapse.app.generic_worker import GenericWorkerServer
from synapse.replication.tcp.commands import FederationAckCommand
from synapse.replication.tcp.protocol import AbstractConnection
from synapse.replication.tcp.streams.federation import FederationStream

from tests.unittest import HomeserverTestCase


class FederationAckTestCase(HomeserverTestCase):
def default_config(self) -> dict:
config = super().default_config()
config["worker_app"] = "synapse.app.federation_sender"
config["send_federation"] = True
return config

def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver(homeserverToUse=GenericWorkerServer)
return hs

def test_federation_ack_sent(self):
"""A FEDERATION_ACK should be sent back after each RDATA federation

This test checks that the federation sender is correctly sending back
FEDERATION_ACK messages. The test works by spinning up a federation_sender
worker server, and then fishing out its ReplicationCommandHandler. We wire
the RCH up to a mock connection (so that we can observe the command being sent)
and then poke in an RDATA row.

XXX: it might be nice to do this by pretending to be a synapse master worker
(or a redis server), and having the worker connect to us via a mocked-up TCP
transport, rather than assuming that the implementation has a
ReplicationCommandHandler.
"""
rch = self.hs.get_tcp_replication()

# wire up the ReplicationCommandHandler to a mock connection
mock_connection = mock.Mock(spec=AbstractConnection)
rch.new_connection(mock_connection)

# tell it it received an RDATA row
self.get_success(
rch.on_rdata(
"federation",
"master",
token=10,
rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
)
)

# now check that the FEDERATION_ACK was sent
mock_connection.send_command.assert_called_once()
cmd = mock_connection.send_command.call_args[0][0]
assert isinstance(cmd, FederationAckCommand)
self.assertEqual(cmd.token, 10)