diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 3a939996..47d1e9fe 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -35,38 +35,76 @@ import ( func TestPartialStateJoin(t *testing.T) { // createTestServer spins up a federation server suitable for the tests in this file - createTestServer := func(t *testing.T, deployment *docker.Deployment) *federation.Server { + createTestServer := func(t *testing.T, deployment *docker.Deployment, opts ...func(*federation.Server)) *federation.Server { + t.Helper() + return federation.NewServer(t, deployment, - federation.HandleKeyRequests(), - federation.HandlePartialStateMakeSendJoinRequests(), - federation.HandleEventRequests(), - federation.HandleTransactionRequests( - func(e *gomatrixserverlib.Event) { - t.Fatalf("Received unexpected PDU: %s", string(e.JSON())) - }, - // the homeserver under test may send us presence when the joining user syncs - nil, - ), + append( + opts, // `opts` goes first so that it can override any of the following handlers + federation.HandleKeyRequests(), + federation.HandlePartialStateMakeSendJoinRequests(), + federation.HandleEventRequests(), + federation.HandleTransactionRequests( + func(e *gomatrixserverlib.Event) { + t.Fatalf("Received unexpected PDU: %s", string(e.JSON())) + }, + // the homeserver under test may send us presence when the joining user syncs + nil, + ), + )..., ) } + // createMemberEvent creates a membership event for the given user + createMembershipEvent := func( + t *testing.T, signingServer *federation.Server, room *federation.ServerRoom, userId string, + membership string, + ) *gomatrixserverlib.Event { + t.Helper() + + return signingServer.MustCreateEvent(t, room, b.Event{ + Type: "m.room.member", + StateKey: b.Ptr(userId), + Sender: userId, + Content: map[string]interface{}{ + "membership": membership, + }, + }) + } + + // createJoinEvent creates a join event for the given user + createJoinEvent := func( + t *testing.T, signingServer *federation.Server, room *federation.ServerRoom, userId string, + ) *gomatrixserverlib.Event { + t.Helper() + + return createMembershipEvent(t, signingServer, room, userId, "join") + } + + // createLeaveEvent creates a leave event for the given user + createLeaveEvent := func( + t *testing.T, signingServer *federation.Server, room *federation.ServerRoom, userId string, + ) *gomatrixserverlib.Event { + t.Helper() + + return createMembershipEvent(t, signingServer, room, userId, "leave") + } + // createTestRoom creates a room on the complement server suitable for many of the tests in this file + // The room starts with @charlie and @derek in it createTestRoom := func(t *testing.T, server *federation.Server, roomVer gomatrixserverlib.RoomVersion) *federation.ServerRoom { + t.Helper() + // create the room on the complement server, with charlie and derek as members serverRoom := server.MustMakeRoom(t, roomVer, federation.InitialRoomEvents(roomVer, server.UserID("charlie"))) - serverRoom.AddEvent(server.MustCreateEvent(t, serverRoom, b.Event{ - Type: "m.room.member", - StateKey: b.Ptr(server.UserID("derek")), - Sender: server.UserID("derek"), - Content: map[string]interface{}{ - "membership": "join", - }, - })) + serverRoom.AddEvent(createJoinEvent(t, server, serverRoom, server.UserID("derek"))) return serverRoom } // getSyncToken gets the latest sync token getSyncToken := func(t *testing.T, alice *client.CSAPI) string { + t.Helper() + _, syncToken := alice.MustSync(t, client.SyncReq{ Filter: buildLazyLoadingSyncFilter(nil), @@ -76,6 +114,16 @@ func TestPartialStateJoin(t *testing.T) { return syncToken } + // awaitPartialStateJoinCompletion waits until the joined room is no longer partial-stated + awaitPartialStateJoinCompletion := func( + t *testing.T, room *federation.ServerRoom, user *client.CSAPI, + ) { + t.Helper() + + user.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(user.UserID, room.RoomID)) + t.Logf("%s's partial state join to %s completed.", user.UserID, room.RoomID) + } + deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) @@ -985,25 +1033,11 @@ func TestPartialStateJoin(t *testing.T) { serverRoom := server.MustMakeRoom(t, roomVer, initialRoomEvents) // derek joins - derekJoinEvent := server.MustCreateEvent(t, serverRoom, b.Event{ - Type: "m.room.member", - StateKey: &derek, - Sender: derek, - Content: map[string]interface{}{ - "membership": "join", - }, - }) + derekJoinEvent := createJoinEvent(t, server, serverRoom, derek) serverRoom.AddEvent(derekJoinEvent) // ... and leaves again - derekLeaveEvent := server.MustCreateEvent(t, serverRoom, b.Event{ - Type: "m.room.member", - StateKey: &derek, - Sender: derek, - Content: map[string]interface{}{ - "membership": "leave", - }, - }) + derekLeaveEvent := createLeaveEvent(t, server, serverRoom, derek) serverRoom.AddEvent(derekLeaveEvent) psjResult := beginPartialStateJoin(t, server, serverRoom, alice) @@ -1078,30 +1112,15 @@ func TestPartialStateJoin(t *testing.T) { serverRoom := server.MustMakeRoom(t, roomVer, initialRoomEvents) // derek joins - derekJoinEvent := server.MustCreateEvent(t, serverRoom, b.Event{ - Type: "m.room.member", - StateKey: &derek, - Sender: derek, - Content: map[string]interface{}{"membership": "join"}, - }) + derekJoinEvent := createJoinEvent(t, server, serverRoom, derek) serverRoom.AddEvent(derekJoinEvent) // ... and leaves again - derekLeaveEvent := server.MustCreateEvent(t, serverRoom, b.Event{ - Type: "m.room.member", - StateKey: &derek, - Sender: derek, - Content: map[string]interface{}{"membership": "leave"}, - }) + derekLeaveEvent := createLeaveEvent(t, server, serverRoom, derek) serverRoom.AddEvent(derekLeaveEvent) // Elsie joins - elsieJoinEvent := server.MustCreateEvent(t, serverRoom, b.Event{ - Type: "m.room.member", - StateKey: &elsie, - Sender: elsie, - Content: map[string]interface{}{"membership": "join"}, - }) + elsieJoinEvent := createJoinEvent(t, server, serverRoom, elsie) serverRoom.AddEvent(elsieJoinEvent) psjResult := beginPartialStateJoin(t, server, serverRoom, alice) @@ -1470,6 +1489,237 @@ func TestPartialStateJoin(t *testing.T) { t.Errorf("SendKnock: non-HTTPError: %v", err) } }) + + t.Run("Outgoing device list updates", func(t *testing.T) { + // setupOutgoingDeviceListUpdateTest sets up two complement homeservers. + // A room is created on the first complement server, containing only local users. + // Returns channels for device list updates arriving at the complement homeservers, which + // can be used with `mustReceiveDeviceListUpdate` and `mustNotReceiveDeviceListUpdate`. + setupOutgoingDeviceListUpdateTest := func( + t *testing.T, deployment *docker.Deployment, aliceLocalpart string, + opts ...func(*federation.Server), + ) ( + alice *client.CSAPI, server1 *federation.Server, server2 *federation.Server, + deviceListUpdateChannel1 chan gomatrixserverlib.DeviceListUpdateEvent, + deviceListUpdateChannel2 chan gomatrixserverlib.DeviceListUpdateEvent, + room *federation.ServerRoom, cleanup func(), + ) { + alice = deployment.RegisterUser(t, "hs1", aliceLocalpart, "secret", false) + + deviceListUpdateChannel1 = make(chan gomatrixserverlib.DeviceListUpdateEvent) + deviceListUpdateChannel2 = make(chan gomatrixserverlib.DeviceListUpdateEvent) + + createDeviceListUpdateTestServer := func( + t *testing.T, deployment *docker.Deployment, + deviceListUpdateChannel chan gomatrixserverlib.DeviceListUpdateEvent, + opts ...func(*federation.Server), + ) *federation.Server { + return createTestServer(t, deployment, + append( + opts, // `opts` goes first so that it can override any of the following handlers + federation.HandleEventAuthRequests(), + federation.HandleTransactionRequests( + func(e *gomatrixserverlib.Event) { + t.Fatalf("Received unexpected PDU: %s", string(e.JSON())) + }, + func(e gomatrixserverlib.EDU) { + if e.Type == "m.presence" { + return + } + if e.Type != "m.device_list_update" { + t.Fatalf("Received unexpected EDU: %s", e) + } + + var deviceListUpdate gomatrixserverlib.DeviceListUpdateEvent + json.Unmarshal(e.Content, &deviceListUpdate) + deviceListUpdateChannel <- deviceListUpdate + }, + ), + )..., + ) + } + + server1 = createDeviceListUpdateTestServer(t, deployment, deviceListUpdateChannel1, opts...) + server2 = createDeviceListUpdateTestServer(t, deployment, deviceListUpdateChannel2, opts...) + cancel1 := server1.Listen() + cancel2 := server2.Listen() + + room = createTestRoom(t, server1, alice.GetDefaultRoomVersion(t)) + + cleanup = func() { + cancel1() + cancel2() + close(deviceListUpdateChannel1) + close(deviceListUpdateChannel2) + } + return + } + + // renameDevice triggers an outgoing device list update + // We may want to rewrite this to update keys instead in the future. + renameDevice := func(t *testing.T, user *client.CSAPI, displayName string) { + t.Helper() + + user.MustDoFunc( + t, + "PUT", + []string{"_matrix", "client", "v3", "devices", user.DeviceID}, + client.WithJSONBody( + t, + map[string]interface{}{ + "display_name": displayName, + }, + ), + ) + + t.Logf("%s sent device list update.", user.UserID) + } + + // mustReceiveDeviceListUpdate checks that a complement homeserver has received a device + // list update since the last call. Only consumes a single device list update. + mustReceiveDeviceListUpdate := func( + t *testing.T, channel chan gomatrixserverlib.DeviceListUpdateEvent, errFormat string, + args ...interface{}, + ) { + t.Helper() + + select { + case <-time.After(1 * time.Second): + t.Fatalf(errFormat, args...) + case <-channel: + } + } + + // mustNotReceiveDeviceListUpdate checks that a complement homeserver has not received a + // device list update since the last call. + mustNotReceiveDeviceListUpdate := func( + t *testing.T, channel chan gomatrixserverlib.DeviceListUpdateEvent, errFormat string, + args ...interface{}, + ) { + t.Helper() + + select { + case <-time.After(1 * time.Second): + case <-channel: + t.Fatalf(errFormat, args...) + } + } + + // test that device list updates are sent to the remote homeservers listed in the + // `/send_join` response in a room with partial state. + t.Run("Device list updates reach all servers in partial state rooms", func(t *testing.T) { + alice, server1, server2, deviceListUpdateChannel1, deviceListUpdateChannel2, room, cleanup := setupOutgoingDeviceListUpdateTest(t, deployment, "t23alice") + defer cleanup() + + // The room starts with @charlie:server1 and @derek:server1 in it. + // @elsie:server2 joins the room before @t23alice:hs1. + room.AddEvent(createJoinEvent(t, server2, room, server2.UserID("elsie"))) + + // @t23alice:hs1 joins the room. + psjResult := beginPartialStateJoin(t, server1, room, alice) + defer psjResult.Destroy() + + // Both homeservers should receive device list updates. + renameDevice(t, alice, "A new device name 1") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel1, "@charlie and @derek did not receive device list update.") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel2, "@elsie did not receive device list update.") + t.Log("@charlie, @derek and @elsie received device list update.") + + // Finish the partial state join. + psjResult.FinishStateRequest() + awaitPartialStateJoinCompletion(t, room, alice) + + // Both homeservers should still receive device list updates. + renameDevice(t, alice, "A new device name 2") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel1, "@charlie and @derek did not receive device list update.") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel2, "@elsie did not receive device list update.") + t.Log("@charlie, @derek and @elsie received device list update.") + }) + + // test that device list updates are additionally sent to remote homeservers that join after + // the local homeserver. + t.Run("Device list updates reach newly joined servers in partial state rooms", func(t *testing.T) { + alice, server1, server2, deviceListUpdateChannel1, deviceListUpdateChannel2, room, cleanup := setupOutgoingDeviceListUpdateTest(t, deployment, "t24alice") + defer cleanup() + + // The room starts with @charlie:server1 and @derek:server1 in it. + // @t24alice:hs1 joins the room. + psjResult := beginPartialStateJoin(t, server1, room, alice) + defer psjResult.Destroy() + + // Only server1 should receive device list updates. + renameDevice(t, alice, "A new device name 1") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel1, "@charlie and @derek did not receive device list update.") + mustNotReceiveDeviceListUpdate(t, deviceListUpdateChannel2, "@elsie received device list update unexpectedly.") + t.Log("@charlie and @derek received device list update.") + + // @elsie:server2 joins the room. + // Make server1 send the event to the homeserver, since server2's rooms list isn't set + // up right and it can't answer queries about events in the room. + joinEvent := createJoinEvent(t, server2, room, server2.UserID("elsie")) + room.AddEvent(joinEvent) + server1.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{joinEvent.JSON()}, nil) + awaitEventViaSync(t, alice, room.RoomID, joinEvent.EventID(), "") + + // Both servers should receive device list updates now. + renameDevice(t, alice, "A new device name 2") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel1, "@charlie and @derek did not receive device list update.") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel2, "@elsie did not receive device list update.") + t.Log("@charlie, @derek and @elsie received device list update.") + + // Finish the partial state join. + psjResult.FinishStateRequest() + awaitPartialStateJoinCompletion(t, room, alice) + + // Both homeservers should still receive device list updates. + renameDevice(t, alice, "A new device name 3") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel1, "@charlie and @derek did not receive device list update.") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel2, "@elsie did not receive device list update.") + t.Log("@charlie, @derek and @elsie received device list update.") + }) + + // test that device list updates are sent to the remote homeservers listed in the + // `/send_join` response in a room with partial state, even after they leave. The homeserver + // under test must do so, as it has no way of knowing that a remote homeserver has no more + // users in the room. + t.Run("Device list updates no longer reach departed servers after partial state join completes", func(t *testing.T) { + alice, server1, server2, deviceListUpdateChannel1, deviceListUpdateChannel2, room, cleanup := setupOutgoingDeviceListUpdateTest(t, deployment, "t25alice") + defer cleanup() + + // The room starts with @charlie:server1 and @derek:server1 in it. + // @elsie:server2 joins the room before @t25alice:hs1. + room.AddEvent(createJoinEvent(t, server2, room, server2.UserID("elsie"))) + + // @t25alice:hs1 joins the room. + psjResult := beginPartialStateJoin(t, server1, room, alice) + defer psjResult.Destroy() + + // @elsie:server2 leaves the room. + // Make server1 send the event to the homeserver, since server2's rooms list isn't set + // up right and it can't answer queries about events in the room. + leaveEvent := createLeaveEvent(t, server2, room, server2.UserID("elsie")) + room.AddEvent(leaveEvent) + server1.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{leaveEvent.JSON()}, nil) + awaitEventViaSync(t, alice, room.RoomID, leaveEvent.EventID(), "") + + // Both homeservers should receive device list updates, since hs1 cannot know that + // @elsie was the last user from server2 in the room. + renameDevice(t, alice, "A new device name 1") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel1, "@charlie and @derek did not receive device list update.") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel2, "@elsie did not receive device list update.") + t.Log("@charlie, @derek and @elsie received device list update.") + + // Finish the partial state join. + psjResult.FinishStateRequest() + awaitPartialStateJoinCompletion(t, room, alice) + + // @elsie:server2 should no longer receive device list updates. + renameDevice(t, alice, "A new device name 2") + mustReceiveDeviceListUpdate(t, deviceListUpdateChannel1, "@charlie and @derek did not receive device list update.") + mustNotReceiveDeviceListUpdate(t, deviceListUpdateChannel2, "@elsie received device list update unexpectedly.") + t.Log("@charlie and @derek received device list update.") + }) + }) } // test reception of an event over federation during a resync @@ -1561,6 +1811,8 @@ func testReceiveEventDuringPartialStateJoin( // awaitEventViaSync waits for alice to be able to see a given event via an incremental lazy-loading // /sync and returns the new sync token after func awaitEventViaSync(t *testing.T, alice *client.CSAPI, roomID string, eventID string, syncToken string) string { + t.Helper() + // check that a lazy-loading sync can see the event syncToken = alice.MustSyncUntil(t, client.SyncReq{ @@ -1577,6 +1829,8 @@ func awaitEventViaSync(t *testing.T, alice *client.CSAPI, roomID string, eventID // awaitEventArrival waits for alice to be able to see a given event via /event func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI, roomID string, eventID string) { + t.Helper() + // Alice should be able to see the event with an /event request. We might have to try it a few times. alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "event", eventID}, client.WithRetryUntil(timeout, func(res *http.Response) bool {