diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 9ed3d78b..f66a56ae 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -35,31 +35,134 @@ import ( "github.com/matrix-org/complement/internal/must" ) -func TestPartialStateJoin(t *testing.T) { - // createTestServer spins up a federation server suitable for the tests in this file - createTestServer := func(t *testing.T, deployment *docker.Deployment, opts ...func(*federation.Server)) *federation.Server { - t.Helper() +type server struct { + *federation.Server - return federation.NewServer(t, deployment, - append( - opts, // `opts` goes first so that it can override any of the following handlers - federation.HandleKeyRequests(), - federation.HandlePartialStateMakeSendJoinRequests(), - federation.HandleEventRequests(), - federation.HandleTransactionRequests( - func(e *gomatrixserverlib.Event) { - t.Fatalf("Received unexpected PDU: %s", string(e.JSON())) - }, - // the homeserver under test may send us presence when the joining user syncs - nil, - ), - )..., - ) + pduHandlers map[int]func(*gomatrixserverlib.Event) bool + eduHandlers map[int]func(gomatrixserverlib.EDU) bool + + nextPDUHandlerKey int + nextEDUHandlerKey int +} + +// createTestServer spins up a federation server suitable for the tests in this file +// +// The `federation.HandleTransactionRequests` handler must not be used. +// Instead, `AddPDUHandler` and `AddEDUHandler` should be used. +func createTestServer(t *testing.T, deployment *docker.Deployment, opts ...func(*federation.Server)) *server { + t.Helper() + + server := &server{ + pduHandlers: map[int]func(*gomatrixserverlib.Event) bool{}, + eduHandlers: map[int]func(gomatrixserverlib.EDU) bool{}, + nextPDUHandlerKey: 0, + nextEDUHandlerKey: 0, } + server.Server = federation.NewServer(t, deployment, + append( + opts, + federation.HandleKeyRequests(), + federation.HandlePartialStateMakeSendJoinRequests(), + federation.HandleEventRequests(), + federation.HandleTransactionRequests( + func(e *gomatrixserverlib.Event) { + expected := false + for _, pduHandler := range server.pduHandlers { + expected = pduHandler(e) || expected + } + if !expected { + t.Errorf("Received unexpected PDU: %s", string(e.JSON())) + } + }, + func(edu gomatrixserverlib.EDU) { + expected := false + for _, eduHandler := range server.eduHandlers { + expected = eduHandler(edu) || expected + } + + if !expected { + t.Errorf("Received unexpected EDU: %s: %s", edu.Type, string(edu.Content)) + } + }, + ), + )..., + ) + + // the homeserver under test may send us presence when the joining user syncs + server.AddEDUHandler(func(edu gomatrixserverlib.EDU) bool { return edu.Type == "m.presence" }) + + return server +} + +// AddPDUHandler adds a PDU callback that returns `true` if it expected the given PDU. +// When a PDU is received which is not expected by any PDU callback, the ongoing test is failed. +// Returns a function to remove the PDU callback. +func (s *server) AddPDUHandler(pduHandler func(*gomatrixserverlib.Event) bool) func() { + pduHandlerKey := s.nextPDUHandlerKey + s.nextPDUHandlerKey++ + s.pduHandlers[pduHandlerKey] = pduHandler + + return func() { + delete(s.pduHandlers, pduHandlerKey) + } +} + +// AddEDUHandler adds an EDU callback that returns `true` if it expected the given EDU. +// When an EDU is received which is not expected by any EDU callback, the ongoing test is failed. +// Returns a function to remove the EDU callback. +func (s *server) AddEDUHandler(eduHandler func(gomatrixserverlib.EDU) bool) func() { + eduHandlerKey := s.nextEDUHandlerKey + s.nextEDUHandlerKey++ + s.eduHandlers[eduHandlerKey] = eduHandler + + return func() { + delete(s.eduHandlers, eduHandlerKey) + } +} + +// WithWaitForLeave runs the given action and waits for the user to leave the room. +func (s *server) WithWaitForLeave( + t *testing.T, room *federation.ServerRoom, userID string, leaveAction func(), +) { + leaveChannel := make(chan *gomatrixserverlib.Event, 10) + removePDUHandler := s.AddPDUHandler( + func(e *gomatrixserverlib.Event) bool { + if membership, _ := e.Membership(); e.Type() == "m.room.member" && + *e.StateKey() == userID && + membership == "leave" { + leaveChannel <- e + return true + } + return false + }, + ) + defer removePDUHandler() + + leaveAction() + + memberEvent := room.CurrentState("m.room.member", userID) + membership := "" + if memberEvent != nil { + membership, _ = memberEvent.Membership() + } + if membership == "leave" { + t.Logf("%s has already seen %s leave test room %s.", s.ServerName(), userID, room.RoomID) + } else { + select { + case <-leaveChannel: + t.Logf("%s saw %s leave test room %s.", s.ServerName(), userID, room.RoomID) + break + case <-time.After(1 * time.Second): + t.Errorf("%s timed out waiting for %s to leave test room %s.", s.ServerName(), userID, room.RoomID) + } + } +} + +func TestPartialStateJoin(t *testing.T) { // createMemberEvent creates a membership event for the given user createMembershipEvent := func( - t *testing.T, signingServer *federation.Server, room *federation.ServerRoom, userId string, + t *testing.T, signingServer *server, room *federation.ServerRoom, userId string, membership string, ) *gomatrixserverlib.Event { t.Helper() @@ -76,7 +179,7 @@ func TestPartialStateJoin(t *testing.T) { // createJoinEvent creates a join event for the given user createJoinEvent := func( - t *testing.T, signingServer *federation.Server, room *federation.ServerRoom, userId string, + t *testing.T, signingServer *server, room *federation.ServerRoom, userId string, ) *gomatrixserverlib.Event { t.Helper() @@ -85,7 +188,7 @@ func TestPartialStateJoin(t *testing.T) { // createLeaveEvent creates a leave event for the given user createLeaveEvent := func( - t *testing.T, signingServer *federation.Server, room *federation.ServerRoom, userId string, + t *testing.T, signingServer *server, room *federation.ServerRoom, userId string, ) *gomatrixserverlib.Event { t.Helper() @@ -94,7 +197,7 @@ func TestPartialStateJoin(t *testing.T) { // createTestRoom creates a room on the complement server suitable for many of the tests in this file // The room starts with @charlie and @derek in it - createTestRoom := func(t *testing.T, server *federation.Server, roomVer gomatrixserverlib.RoomVersion) *federation.ServerRoom { + createTestRoom := func(t *testing.T, server *server, roomVer gomatrixserverlib.RoomVersion) *federation.ServerRoom { t.Helper() // create the room on the complement server, with charlie and derek as members @@ -169,15 +272,7 @@ func TestPartialStateJoin(t *testing.T) { _, eagerSyncToken = alice.MustSync(t, client.SyncReq{}) t.Log("1. Partial join Alice to a remote room.") - server := createTestServer( - t, - deployment, - // Allow PDUs and EDUs, since Alice will send a message in the room. - federation.HandleTransactionRequests( - func(e *gomatrixserverlib.Event) {}, - func(e gomatrixserverlib.EDU) {}, - ), - ) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) @@ -205,6 +300,8 @@ func TestPartialStateJoin(t *testing.T) { ) t.Log("4. Have Alice send a message to the remote room.") + removePDUHandler := server.AddPDUHandler(func(*gomatrixserverlib.Event) bool { return true }) + defer removePDUHandler() messageId := alice.SendEventUnsynced(t, serverRoom.RoomID, b.Event{ Type: "m.room.message", Content: map[string]interface{}{ @@ -359,28 +456,23 @@ func TestPartialStateJoin(t *testing.T) { t.Run("CanSendEventsDuringPartialStateJoin", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t3alice", "secret", false) - pdusChannel := make(chan *gomatrixserverlib.Event) - server := createTestServer( - t, - deployment, - federation.HandleTransactionRequests( - func(e *gomatrixserverlib.Event) { - pdusChannel <- e - }, - // we don't expect EDUs - func(e gomatrixserverlib.EDU) { - if e.Type != "m.presence" { - t.Fatalf("Received unexpected EDU: %s", e.Content) - } - }, - ), - ) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy(t) + pdusChannel := make(chan *gomatrixserverlib.Event) + removePDUHandler := server.AddPDUHandler( + func(e *gomatrixserverlib.Event) bool { + pdusChannel <- e + + return true + }, + ) + defer removePDUHandler() + alice.Client.Timeout = 2 * time.Second paths := []string{"_matrix", "client", "v3", "rooms", serverRoom.RoomID, "send", "m.room.message", "0"} res := alice.MustDoFunc(t, "PUT", paths, client.WithJSONBody(t, map[string]interface{}{ @@ -725,7 +817,7 @@ func TestPartialStateJoin(t *testing.T) { defer psjResult.Destroy(t) // the HS will make an /event_auth request for the event - federation.HandleEventAuthRequests()(server) + federation.HandleEventAuthRequests()(server.Server) event := psjResult.CreateMessageEvent(t, "derek", nil) t.Logf("Derek created event with ID %s", event.EventID()) @@ -762,7 +854,7 @@ func TestPartialStateJoin(t *testing.T) { t.Logf("Derek created event B with ID %s", eventB.EventID()) // the HS will make an /event_auth request for event A - federation.HandleEventAuthRequests()(server) + federation.HandleEventAuthRequests()(server.Server) // the HS will make a /get_missing_events request for the missing prev events of event B handleGetMissingEventsRequests(t, server, serverRoom, @@ -802,7 +894,7 @@ func TestPartialStateJoin(t *testing.T) { t.Logf("Derek created event B with ID %s", eventB.EventID()) // the HS will make an /event_auth request for event A - federation.HandleEventAuthRequests()(server) + federation.HandleEventAuthRequests()(server.Server) // the HS will make a /get_missing_events request for the missing prev event of event B handleGetMissingEventsRequests(t, server, serverRoom, @@ -870,7 +962,7 @@ func TestPartialStateJoin(t *testing.T) { defer psjResult.Destroy(t) // the HS will make an /event_auth request for the event - federation.HandleEventAuthRequests()(server) + federation.HandleEventAuthRequests()(server.Server) // derek sends a message into the room. event := psjResult.CreateMessageEvent(t, "derek", nil) @@ -918,7 +1010,7 @@ func TestPartialStateJoin(t *testing.T) { ) // the HS will make an /event_auth request for the event - federation.HandleEventAuthRequests()(server) + federation.HandleEventAuthRequests()(server.Server) // derek sends two messages into the room. event1 := psjResult.CreateMessageEvent(t, "derek", nil) @@ -988,7 +1080,7 @@ func TestPartialStateJoin(t *testing.T) { ) // the HS will make an /event_auth request for the event - federation.HandleEventAuthRequests()(server) + federation.HandleEventAuthRequests()(server.Server) // derek sends a message into the room. event := psjResult.CreateMessageEvent(t, "derek", nil) @@ -1419,7 +1511,7 @@ func TestPartialStateJoin(t *testing.T) { defer psjResult.Destroy(t) // the HS will make an /event_auth request for the event - federation.HandleEventAuthRequests()(server) + federation.HandleEventAuthRequests()(server.Server) // derek sends a state event, despite not having permission to send state. This should be rejected. badStateEvent := server.MustCreateEvent(t, serverRoom, b.Event{ @@ -1484,7 +1576,7 @@ func TestPartialStateJoin(t *testing.T) { defer cancel() // the HS will make an /event_auth request for the event - federation.HandleEventAuthRequests()(server) + federation.HandleEventAuthRequests()(server.Server) // create the room on the complement server, with charlie as the founder, and derek as a user with permission // to send state. He later leaves. @@ -1562,7 +1654,7 @@ func TestPartialStateJoin(t *testing.T) { defer cancel() // the HS will make an /event_auth request for the event - federation.HandleEventAuthRequests()(server) + federation.HandleEventAuthRequests()(server.Server) // create the room on the complement server, with charlie as the founder, derek as a user with permission // to kick users, and elsie as a bystander who has permission to send state. @@ -1757,7 +1849,7 @@ func TestPartialStateJoin(t *testing.T) { // This is permissible because testServer1 is fully joined to the room. // We can't actually use /make_join because host.docker.internal doesn't resolve, // so compute it without making any requests: - makeJoinResp, err := federation.MakeRespMakeJoin(testServer1, serverRoom, testServer2.UserID("daniel")) + makeJoinResp, err := federation.MakeRespMakeJoin(testServer1.Server, serverRoom, testServer2.UserID("daniel")) if err != nil { t.Fatalf("MakeRespMakeJoin failed : %s", err) } @@ -1930,7 +2022,7 @@ func TestPartialStateJoin(t *testing.T) { // This is permissible because testServer1 is fully joined to the room. // We can't actually use /make_knock because host.docker.internal doesn't resolve, // so compute it without making any requests: - makeKnockResp, err := federation.MakeRespMakeKnock(testServer1, serverRoom, testServer2.UserID("daniel")) + makeKnockResp, err := federation.MakeRespMakeKnock(testServer1.Server, serverRoom, testServer2.UserID("daniel")) if err != nil { t.Fatalf("MakeRespMakeKnock failed : %s", err) } @@ -1966,7 +2058,7 @@ func TestPartialStateJoin(t *testing.T) { t *testing.T, deployment *docker.Deployment, aliceLocalpart string, opts ...func(*federation.Server), ) ( - alice *client.CSAPI, server1 *federation.Server, server2 *federation.Server, + alice *client.CSAPI, server1 *server, server2 *server, deviceListUpdateChannel1 chan gomatrixserverlib.DeviceListUpdateEvent, deviceListUpdateChannel2 chan gomatrixserverlib.DeviceListUpdateEvent, room *federation.ServerRoom, cleanup func(), @@ -1980,31 +2072,28 @@ func TestPartialStateJoin(t *testing.T) { t *testing.T, deployment *docker.Deployment, deviceListUpdateChannel chan gomatrixserverlib.DeviceListUpdateEvent, opts ...func(*federation.Server), - ) *federation.Server { - return createTestServer(t, deployment, + ) *server { + server := createTestServer(t, deployment, append( opts, // `opts` goes first so that it can override any of the following handlers federation.HandleEventAuthRequests(), - federation.HandleTransactionRequests( - func(e *gomatrixserverlib.Event) { - t.Fatalf("Received unexpected PDU: %s", string(e.JSON())) - }, - func(e gomatrixserverlib.EDU) { - if e.Type == "m.presence" { - return - } - if e.Type != "m.device_list_update" { - t.Fatalf("Received unexpected EDU: %s", e) - } - - t.Logf("Complement server received m.device_list_update: %v", string(e.Content)) - var deviceListUpdate gomatrixserverlib.DeviceListUpdateEvent - json.Unmarshal(e.Content, &deviceListUpdate) - deviceListUpdateChannel <- deviceListUpdate - }, - ), )..., ) + + server.AddEDUHandler(func(edu gomatrixserverlib.EDU) bool { + if edu.Type != "m.device_list_update" { + return false + } + + t.Logf("Complement server received m.device_list_update: %v", string(edu.Content)) + var deviceListUpdate gomatrixserverlib.DeviceListUpdateEvent + json.Unmarshal(edu.Content, &deviceListUpdate) + deviceListUpdateChannel <- deviceListUpdate + + return true + }) + + return server } server1 = createDeviceListUpdateTestServer(t, deployment, deviceListUpdateChannel1, opts...) @@ -2194,7 +2283,7 @@ func TestPartialStateJoin(t *testing.T) { // under test joins. setupIncorrectlyAcceptedKick := func( t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, - server1 *federation.Server, server2 *federation.Server, + server1 *server, server2 *server, deviceListUpdateChannel1 chan gomatrixserverlib.DeviceListUpdateEvent, deviceListUpdateChannel2 chan gomatrixserverlib.DeviceListUpdateEvent, room *federation.ServerRoom, @@ -2262,7 +2351,7 @@ func TestPartialStateJoin(t *testing.T) { // Returns @alice:hs1's sync token after @elsie:server2 has left the partial state room. setupAnotherSharedRoomThenLeave := func( t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, - server1 *federation.Server, server2 *federation.Server, + server1 *server, server2 *server, partialStateRoom *federation.ServerRoom, syncToken string, ) string { elsie := server2.UserID("elsie") @@ -2296,7 +2385,7 @@ func TestPartialStateJoin(t *testing.T) { // list updates once hs1's partial state join has completed. testMissedDeviceListUpdateSentOncePartialJoinCompletes := func( t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, - server1 *federation.Server, server2 *federation.Server, + server1 *server, server2 *server, deviceListUpdateChannel1 chan gomatrixserverlib.DeviceListUpdateEvent, deviceListUpdateChannel2 chan gomatrixserverlib.DeviceListUpdateEvent, room *federation.ServerRoom, psjResult partialStateJoinResult, syncToken string, @@ -2445,7 +2534,7 @@ func TestPartialStateJoin(t *testing.T) { setupDeviceListCachingTest := func( t *testing.T, deployment *docker.Deployment, aliceLocalpart string, ) ( - alice *client.CSAPI, server *federation.Server, userDevicesQueryChannel chan string, + alice *client.CSAPI, server *server, userDevicesQueryChannel chan string, room *federation.ServerRoom, sendDeviceListUpdate func(string), cleanup func(), ) { alice = deployment.RegisterUser(t, "hs1", aliceLocalpart, "secret", false) @@ -2976,7 +3065,7 @@ func TestPartialStateJoin(t *testing.T) { // @elsie will be discovered to be no longer in the room. setupUserIncorrectlyInRoom := func( t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, - server *federation.Server, room *federation.ServerRoom, + server *server, room *federation.ServerRoom, ) (syncToken string, psjResult partialStateJoinResult) { charlie := server.UserID("charlie") derek := server.UserID("derek") @@ -3327,11 +3416,7 @@ func TestPartialStateJoin(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t44alice", "secret", false) bob := deployment.RegisterUser(t, "hs1", "t44bob", "secret", false) - server := createTestServer( - t, - deployment, - federation.HandleTransactionRequests(nil, nil), - ) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) @@ -3339,6 +3424,7 @@ func TestPartialStateJoin(t *testing.T) { psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy(t) + server.AddPDUHandler(func(e *gomatrixserverlib.Event) bool { return true }) bob.JoinRoom(t, serverRoom.RoomID, []string{server.ServerName()}) alice.MustSyncUntil(t, client.SyncReq{ @@ -3362,22 +3448,7 @@ func TestPartialStateJoin(t *testing.T) { t.Run("Can change display name during partial state join", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t45alice", "secret", false) - pdusChannel := make(chan *gomatrixserverlib.Event) - server := createTestServer( - t, - deployment, - federation.HandleTransactionRequests( - func(e *gomatrixserverlib.Event) { - pdusChannel <- e - }, - // we don't expect EDUs - func(e gomatrixserverlib.EDU) { - if e.Type != "m.presence" { - t.Fatalf("Received unexpected EDU: %s", e.Content) - } - }, - ), - ) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) @@ -3385,6 +3456,16 @@ func TestPartialStateJoin(t *testing.T) { psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy(t) + pdusChannel := make(chan *gomatrixserverlib.Event) + removePDUHandler := server.AddPDUHandler( + func(e *gomatrixserverlib.Event) bool { + pdusChannel <- e + + return true + }, + ) + defer removePDUHandler() + alice.MustDoFunc(t, "PUT", []string{"_matrix", "client", "v3", "profile", alice.UserID, "displayname"}, @@ -3414,12 +3495,7 @@ func TestPartialStateJoin(t *testing.T) { // Before testing that leaves during resyncs are seen during resyncs, sanity // check that leaves during resyncs appear after the resync. alice := deployment.RegisterUser(t, "hs1", "t42alice", "secret", false) - handleTransactions := federation.HandleTransactionRequests( - // Accept all PDUs and EDUs - func(e *gomatrixserverlib.Event) {}, - func(e gomatrixserverlib.EDU) {}, - ) - server := createTestServer(t, deployment, handleTransactions) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() @@ -3437,6 +3513,7 @@ func TestPartialStateJoin(t *testing.T) { leaveCompleted := NewWaiter() t.Log("Alice starts a leave request") + server.AddPDUHandler(func(e *gomatrixserverlib.Event) bool { return true }) go func() { alice.LeaveRoom(t, serverRoom.RoomID) t.Log("Alice's leave request completed") @@ -3458,33 +3535,8 @@ func TestPartialStateJoin(t *testing.T) { }) t.Run("does not wait for resync", func(t *testing.T) { - // Prepare to listen for leave events from the HS under test. - // We're only expecting one leave event, but give the channel extra capacity - // to avoid deadlock if the HS does something silly. - leavesChannel := make(chan *gomatrixserverlib.Event, 10) - handleTransactions := federation.HandleTransactionRequests( - func(e *gomatrixserverlib.Event) { - if e.Type() == "m.room.member" { - if ok := gjson.ValidBytes(e.Content()); !ok { - t.Fatalf("Received event %s with invalid content: %v", e.EventID(), e.Content()) - } - content := gjson.ParseBytes(e.Content()) - membership := content.Get("membership") - if membership.Exists() && membership.Str == "leave" { - leavesChannel <- e - } - } - }, - // we don't care about EDUs - func(e gomatrixserverlib.EDU) {}, - ) - alice := deployment.RegisterUser(t, "hs1", "t43alice", "secret", false) - server := createTestServer( - t, - deployment, - handleTransactions, - ) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() @@ -3509,6 +3561,26 @@ func TestPartialStateJoin(t *testing.T) { ) t.Logf("Alice's leave is received by the resident server") + // Prepare to listen for leave events from the HS under test. + // We're only expecting one leave event, but give the channel extra capacity + // to avoid deadlock if the HS does something silly. + leavesChannel := make(chan *gomatrixserverlib.Event, 10) + server.AddPDUHandler( + func(e *gomatrixserverlib.Event) bool { + if e.Type() == "m.room.member" { + if ok := gjson.ValidBytes(e.Content()); !ok { + t.Fatalf("Received event %s with invalid content: %v", e.EventID(), e.Content()) + } + content := gjson.ParseBytes(e.Content()) + membership := content.Get("membership") + if membership.Exists() && membership.Str == "leave" { + leavesChannel <- e + } + } + + return true + }, + ) select { case <-time.After(1 * time.Second): t.Fatal("Resident server did not receive Alice's leave") @@ -3523,12 +3595,7 @@ func TestPartialStateJoin(t *testing.T) { t.Run("works after a second partial join", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t47alice", "secret", false) bob := deployment.RegisterUser(t, "hs1", "t47bob", "secret", false) - handleTransactions := federation.HandleTransactionRequests( - // Accept all PDUs and EDUs - func(e *gomatrixserverlib.Event) {}, - func(e gomatrixserverlib.EDU) {}, - ) - server := createTestServer(t, deployment, handleTransactions) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() @@ -3558,6 +3625,7 @@ func TestPartialStateJoin(t *testing.T) { ) t.Log("Alice leaves the room") + server.AddPDUHandler(func(e *gomatrixserverlib.Event) bool { return true }) alice.LeaveRoom(t, serverRoom.RoomID) t.Log("Alice sees Alice's leave") @@ -3577,12 +3645,7 @@ func TestPartialStateJoin(t *testing.T) { t.Run("succeeds, then rejoin succeeds without resync completing", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t48alice", "secret", false) - handleTransactions := federation.HandleTransactionRequests( - // Accept all PDUs and EDUs - func(e *gomatrixserverlib.Event) {}, - func(e gomatrixserverlib.EDU) {}, - ) - server := createTestServer(t, deployment, handleTransactions) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() @@ -3599,6 +3662,7 @@ func TestPartialStateJoin(t *testing.T) { ) t.Log("Alice leaves the room") + server.AddPDUHandler(func(e *gomatrixserverlib.Event) bool { return true }) alice.LeaveRoom(t, serverRoom.RoomID) t.Log("Alice sees Alice's leave") @@ -3622,12 +3686,7 @@ func TestPartialStateJoin(t *testing.T) { t.Run("succeeds, then another user can join without resync completing", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t49alice", "secret", false) bob := deployment.RegisterUser(t, "hs1", "t49bob", "secret", false) - handleTransactions := federation.HandleTransactionRequests( - // Accept all PDUs and EDUs - func(e *gomatrixserverlib.Event) {}, - func(e gomatrixserverlib.EDU) {}, - ) - server := createTestServer(t, deployment, handleTransactions) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() @@ -3647,6 +3706,7 @@ func TestPartialStateJoin(t *testing.T) { ) t.Log("Alice leaves the room") + server.AddPDUHandler(func(e *gomatrixserverlib.Event) bool { return true }) alice.LeaveRoom(t, serverRoom.RoomID) t.Log("Alice sees Alice's leave") @@ -3669,12 +3729,7 @@ func TestPartialStateJoin(t *testing.T) { t.Run("can be triggered by remote kick", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t50alice", "secret", false) - handleTransactions := federation.HandleTransactionRequests( - // Accept all PDUs and EDUs - func(e *gomatrixserverlib.Event) {}, - func(e gomatrixserverlib.EDU) {}, - ) - server := createTestServer(t, deployment, handleTransactions) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() @@ -3722,12 +3777,7 @@ func TestPartialStateJoin(t *testing.T) { t.Run("can be triggered by remote ban", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t51alice", "secret", false) - handleTransactions := federation.HandleTransactionRequests( - // Accept all PDUs and EDUs - func(e *gomatrixserverlib.Event) {}, - func(e gomatrixserverlib.EDU) {}, - ) - server := createTestServer(t, deployment, handleTransactions) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() @@ -3901,12 +3951,7 @@ func TestPartialStateJoin(t *testing.T) { } t.Log("Alice begins a partial join to a room") alice := deployment.RegisterUser(t, "hs1", "t46alice", "secret", true) - // Ignore PDUs (leaves from shutting down the room) and EDUs (presence). - server := createTestServer( - t, - deployment, - federation.HandleTransactionRequests(nil, nil), - ) + server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() @@ -3935,6 +3980,8 @@ func TestPartialStateJoin(t *testing.T) { psjResult.AwaitStateIdsRequest(t) t.Log("Alice purges that room") + // Ignore PDUs (leaves from shutting down the room). + server.AddPDUHandler(func(e *gomatrixserverlib.Event) bool { return true }) alice.MustDoFunc(t, "DELETE", []string{"_synapse", "admin", "v1", "rooms", serverRoom.RoomID}, client.WithJSONBody(t, map[string]interface{}{})) // Note: clients don't get told about purged rooms. No leave event for you! @@ -4135,7 +4182,7 @@ func buildLazyLoadingSyncFilter(timelineOptions map[string]interface{}) string { // partialStateJoinResult is the result of beginPartialStateJoin type partialStateJoinResult struct { - Server *federation.Server + Server *server ServerRoom *federation.ServerRoom User *client.CSAPI fedStateIdsRequestReceivedWaiter *Waiter @@ -4149,7 +4196,7 @@ type partialStateJoinResult struct { // When this method completes, the /join request will have completed, but the // state has not yet been re-synced. To allow the re-sync to proceed, call // partialStateJoinResult.FinishStateRequest. -func beginPartialStateJoin(t *testing.T, server *federation.Server, serverRoom *federation.ServerRoom, joiningUser *client.CSAPI) partialStateJoinResult { +func beginPartialStateJoin(t *testing.T, server *server, serverRoom *federation.ServerRoom, joiningUser *client.CSAPI) partialStateJoinResult { // we store the Server and ServerRoom for the benefit of utilities like testReceiveEventDuringPartialStateJoin result := partialStateJoinResult{ Server: server, @@ -4193,8 +4240,10 @@ func beginPartialStateJoin(t *testing.T, server *federation.Server, serverRoom * return result } -// Destroy cleans up the resources associated with the join attempt. It must -// be called once the test is finished +// Destroy cleans up the resources associated with the join attempt. +// It is idempotent and must be called once the test is finished. +// Specifically, it ensures that the partial state join completes and makes the joining user leave +// the room. func (psj *partialStateJoinResult) Destroy(t *testing.T) { if psj.fedStateIdsSendResponseWaiter != nil { psj.fedStateIdsSendResponseWaiter.Finish() @@ -4208,7 +4257,18 @@ func (psj *partialStateJoinResult) Destroy(t *testing.T) { // has finished all federation activity before tearing down the Complement server. // Otherwise the homeserver at the Complement's hostname:port combination may be // considered offline and interfere with subsequent tests. + t.Log("Cleaning up after test...") + awaitPartialStateJoinCompletion(t, psj.ServerRoom, psj.User) + + // The caller is about to tear down the Complement homeserver. Leave the room, so + // that the homeserver under test stops sending it presence updates. + psj.Server.WithWaitForLeave( + t, + psj.ServerRoom, + psj.User.UserID, + func() { psj.User.LeaveRoom(t, psj.ServerRoom.RoomID) }, + ) } // send a message into the room without letting the homeserver under test know about it. @@ -4250,7 +4310,7 @@ func (psj *partialStateJoinResult) FinishStateRequest() { // if requestReceivedWaiter is not nil, it will be Finish()ed when the request arrives. // if sendResponseWaiter is not nil, we will Wait() for it to finish before sending the response. func handleStateIdsRequests( - t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom, + t *testing.T, srv *server, serverRoom *federation.ServerRoom, eventID string, roomState []*gomatrixserverlib.Event, requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, ) { @@ -4290,7 +4350,7 @@ func handleStateIdsRequests( // if requestReceivedWaiter is not nil, it will be Finish()ed when the request arrives. // if sendResponseWaiter is not nil, we will Wait() for it to finish before sending the response. func handleStateRequests( - t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom, + t *testing.T, srv *server, serverRoom *federation.ServerRoom, eventID string, roomState []*gomatrixserverlib.Event, requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, ) { @@ -4329,7 +4389,7 @@ func handleStateRequests( // This can (currently) only handle a single `/get_missing_events` request, and the "latest_events" in the request // must match those listed in "expectedLatestEvents" (otherwise the test is failed). func handleGetMissingEventsRequests( - t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom, + t *testing.T, srv *server, serverRoom *federation.ServerRoom, expectedLatestEvents []string, eventsToReturn []*gomatrixserverlib.Event, ) { srv.Mux().HandleFunc(fmt.Sprintf("/_matrix/federation/v1/get_missing_events/%s", serverRoom.RoomID), func(w http.ResponseWriter, req *http.Request) {