diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index b22432e7..edadd606 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -34,6 +34,37 @@ import ( ) func TestPartialStateJoin(t *testing.T) { + // createTestServer spins up a federation server suitable for the tests in this file + createTestServer := func(t *testing.T, deployment *docker.Deployment) *federation.Server { + return federation.NewServer(t, deployment, + federation.HandleKeyRequests(), + federation.HandlePartialStateMakeSendJoinRequests(), + federation.HandleEventRequests(), + federation.HandleTransactionRequests( + func(e *gomatrixserverlib.Event) { + t.Fatalf("Received unexpected PDU: %s", string(e.JSON())) + }, + // the homeserver under test may send us presence when the joining user syncs + nil, + ), + ) + } + + // createTestRoom creates a room on the complement server suitable for many of the tests in this file + createTestRoom := func(t *testing.T, server *federation.Server, roomVer gomatrixserverlib.RoomVersion) *federation.ServerRoom { + // create the room on the complement server, with charlie and derek as members + serverRoom := server.MustMakeRoom(t, roomVer, federation.InitialRoomEvents(roomVer, server.UserID("charlie"))) + serverRoom.AddEvent(server.MustCreateEvent(t, serverRoom, b.Event{ + Type: "m.room.member", + StateKey: b.Ptr(server.UserID("derek")), + Sender: server.UserID("derek"), + Content: map[string]interface{}{ + "membership": "join", + }, + })) + return serverRoom + } + // test that a regular /sync request made during a partial-state /send_join // request blocks until the state is correctly synced. t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) { @@ -41,7 +72,11 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy() // Alice has now joined the room, and the server is syncing the state in the background. @@ -75,7 +110,7 @@ func TestPartialStateJoin(t *testing.T) { case syncRes = <-syncResponseChan: } - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(psjResult.ServerRoom.RoomID)) + roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID)) if !roomRes.Exists() { t.Fatalf("/sync completed without join to new room\n") } @@ -83,8 +118,8 @@ func TestPartialStateJoin(t *testing.T) { // check that the state includes both charlie and derek. matcher := match.JSONCheckOffAllowUnwanted("state.events", []interface{}{ - "m.room.member|" + psjResult.Server.UserID("charlie"), - "m.room.member|" + psjResult.Server.UserID("derek"), + "m.room.member|" + server.UserID("charlie"), + "m.room.member|" + server.UserID("derek"), }, func(result gjson.Result) interface{} { return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") }, nil, @@ -101,14 +136,18 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy() alice.MustSyncUntil(t, client.SyncReq{ Filter: buildLazyLoadingSyncFilter(nil), }, - client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), ) t.Logf("Alice successfully synced") }) @@ -120,11 +159,15 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy() alice.Client.Timeout = 2 * time.Second - paths := []string{"_matrix", "client", "v3", "rooms", psjResult.ServerRoom.RoomID, "send", "m.room.message", "0"} + paths := []string{"_matrix", "client", "v3", "rooms", serverRoom.RoomID, "send", "m.room.message", "0"} res := alice.MustDoFunc(t, "PUT", paths, client.WithJSONBody(t, map[string]interface{}{ "msgtype": "m.text", "body": "Hello world!", @@ -140,11 +183,15 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy() // the HS will make an /event_auth request for the event - federation.HandleEventAuthRequests()(psjResult.Server) + federation.HandleEventAuthRequests()(server) event := psjResult.CreateMessageEvent(t, "derek", nil) t.Logf("Derek created event with ID %s", event.EventID()) @@ -159,7 +206,11 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy() // we construct the following event graph: @@ -170,7 +221,7 @@ func TestPartialStateJoin(t *testing.T) { // // initially, hs1 only knows about event M. // we send only event B to hs1. - eventM := psjResult.ServerRoom.CurrentState("m.room.member", alice.UserID) + eventM := serverRoom.CurrentState("m.room.member", alice.UserID) eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()}) eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID()}) t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID()) @@ -178,10 +229,10 @@ func TestPartialStateJoin(t *testing.T) { t.Logf("Derek created event B with ID %s", eventB.EventID()) // the HS will make an /event_auth request for event A - federation.HandleEventAuthRequests()(psjResult.Server) + federation.HandleEventAuthRequests()(server) // the HS will make a /get_missing_events request for the missing prev events of event B - handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, + handleGetMissingEventsRequests(t, server, serverRoom, []string{eventB.EventID()}, []*gomatrixserverlib.Event{eventA}) // send event B to hs1 @@ -194,7 +245,11 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy() // we construct the following event graph: @@ -207,7 +262,7 @@ func TestPartialStateJoin(t *testing.T) { // // initially, hs1 only knows about event M. // we send only event B to hs1. - eventM := psjResult.ServerRoom.CurrentState("m.room.member", alice.UserID) + eventM := serverRoom.CurrentState("m.room.member", alice.UserID) eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()}) eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID(), eventM.EventID()}) t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID()) @@ -215,10 +270,10 @@ func TestPartialStateJoin(t *testing.T) { t.Logf("Derek created event B with ID %s", eventB.EventID()) // the HS will make an /event_auth request for event A - federation.HandleEventAuthRequests()(psjResult.Server) + federation.HandleEventAuthRequests()(server) // the HS will make a /get_missing_events request for the missing prev event of event B - handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, + handleGetMissingEventsRequests(t, server, serverRoom, []string{eventB.EventID()}, []*gomatrixserverlib.Event{eventA}) // send event B to hs1 @@ -232,7 +287,11 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy() // we construct the following event graph: @@ -245,7 +304,7 @@ func TestPartialStateJoin(t *testing.T) { // // initially, hs1 only knows about event M. // we send only event C to hs1. - eventM := psjResult.ServerRoom.CurrentState("m.room.member", alice.UserID) + eventM := serverRoom.CurrentState("m.room.member", alice.UserID) eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()}) eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID(), eventM.EventID()}) eventC := psjResult.CreateMessageEvent(t, "derek", []string{eventB.EventID()}) @@ -256,12 +315,12 @@ func TestPartialStateJoin(t *testing.T) { // the HS will make a /get_missing_events request for the missing prev event of event C, // to which we respond with event B only. - handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, + handleGetMissingEventsRequests(t, server, serverRoom, []string{eventC.EventID()}, []*gomatrixserverlib.Event{eventB}) // dedicated state_ids and state handlers for event A - handleStateIdsRequests(t, psjResult.Server, psjResult.ServerRoom, eventA.EventID(), psjResult.ServerRoom.AllCurrentState(), nil, nil) - handleStateRequests(t, psjResult.Server, psjResult.ServerRoom, eventA.EventID(), psjResult.ServerRoom.AllCurrentState(), nil, nil) + handleStateIdsRequests(t, server, serverRoom, eventA.EventID(), serverRoom.AllCurrentState(), nil, nil) + handleStateRequests(t, server, serverRoom, eventA.EventID(), serverRoom.AllCurrentState(), nil, nil) // send event C to hs1 testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC) @@ -274,7 +333,11 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy() // we need a sync token to pass to the `at` param. @@ -282,7 +345,7 @@ func TestPartialStateJoin(t *testing.T) { client.SyncReq{ Filter: buildLazyLoadingSyncFilter(nil), }, - client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), ) t.Logf("Alice successfully synced") @@ -295,7 +358,7 @@ func TestPartialStateJoin(t *testing.T) { clientMembersRequestResponseChan <- alice.MustDoFunc( t, "GET", - []string{"_matrix", "client", "v3", "rooms", psjResult.ServerRoom.RoomID, "members"}, + []string{"_matrix", "client", "v3", "rooms", serverRoom.RoomID, "members"}, client.WithQueries(queryParams), ) }() @@ -313,8 +376,8 @@ func TestPartialStateJoin(t *testing.T) { match.JSONCheckOff("chunk", []interface{}{ "m.room.member|" + alice.UserID, - "m.room.member|" + psjResult.Server.UserID("charlie"), - "m.room.member|" + psjResult.Server.UserID("derek"), + "m.room.member|" + server.UserID("charlie"), + "m.room.member|" + server.UserID("derek"), }, func(result gjson.Result) interface{} { return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") }, nil), @@ -330,7 +393,11 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy() // Alice has now joined the room, and the server is syncing the state in the background. @@ -373,7 +440,7 @@ func TestPartialStateJoin(t *testing.T) { case syncRes = <-syncResponseChan: } - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(psjResult.ServerRoom.RoomID)) + roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID)) if !roomRes.Exists() { t.Fatalf("/sync completed without join to new room\n") } @@ -394,18 +461,7 @@ func TestPartialStateJoin(t *testing.T) { }) // create the complement homeserver - server := federation.NewServer(t, deployment, - federation.HandleKeyRequests(), - federation.HandlePartialStateMakeSendJoinRequests(), - federation.HandleEventRequests(), - federation.HandleTransactionRequests( - func(e *gomatrixserverlib.Event) { - t.Fatalf("Received unexpected PDU: %s", string(e.JSON())) - }, - // hs1 may send us presence when alice syncs - nil, - ), - ) + server := createTestServer(t, deployment) cancelListener := server.Listen() defer cancelListener() @@ -484,7 +540,11 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy() // get a sync token before state syncing finishes. @@ -492,7 +552,7 @@ func TestPartialStateJoin(t *testing.T) { client.SyncReq{ Filter: buildLazyLoadingSyncFilter(nil), }, - client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), ) t.Logf("Alice successfully synced") @@ -502,30 +562,30 @@ func TestPartialStateJoin(t *testing.T) { psjResult.FinishStateRequest() alice.MustSyncUntil(t, client.SyncReq{}, - client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), ) // make derek send two messages into the room. // we will do a gappy sync after, which will only pick up the last message. var lastEventID string for i := 0; i < 2; i++ { - event := psjResult.Server.MustCreateEvent(t, psjResult.ServerRoom, b.Event{ + event := server.MustCreateEvent(t, serverRoom, b.Event{ Type: "m.room.message", - Sender: psjResult.Server.UserID("derek"), + Sender: server.UserID("derek"), Content: map[string]interface{}{ "msgtype": "m.text", "body": "Message " + strconv.Itoa(i), }, }) lastEventID = event.EventID() - psjResult.ServerRoom.AddEvent(event) - psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) + serverRoom.AddEvent(event) + server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) } // wait for the events to come down a regular /sync. alice.MustSyncUntil(t, client.SyncReq{}, - client.SyncTimelineHasEventID(psjResult.ServerRoom.RoomID, lastEventID), + client.SyncTimelineHasEventID(serverRoom.RoomID, lastEventID), ) // now do a gappy sync using the sync token from before. @@ -539,11 +599,11 @@ func TestPartialStateJoin(t *testing.T) { ) // check that the state includes derek. - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(psjResult.ServerRoom.RoomID)) + roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID)) if !roomRes.Exists() { t.Fatalf("/sync completed without join to new room\n") } - t.Logf("gappy /sync response for %s: %s", psjResult.ServerRoom.RoomID, roomRes) + t.Logf("gappy /sync response for %s: %s", serverRoom.RoomID, roomRes) timelineMatcher := match.JSONCheckOff("timeline.events", []interface{}{lastEventID}, @@ -553,7 +613,7 @@ func TestPartialStateJoin(t *testing.T) { ) stateMatcher := match.JSONCheckOffAllowUnwanted("state.events", []interface{}{ - "m.room.member|" + psjResult.Server.UserID("derek"), + "m.room.member|" + server.UserID("derek"), }, func(result gjson.Result) interface{} { return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") }, nil, @@ -576,7 +636,11 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy() // Alice has now joined the room, and the server is syncing the state in the background. @@ -594,13 +658,13 @@ func TestPartialStateJoin(t *testing.T) { outlierEventIDs := make([]string, len(outliers)) for i := range outliers { body := fmt.Sprintf("outlier event %d", i) - outliers[i] = psjResult.Server.MustCreateEvent(t, psjResult.ServerRoom, b.Event{ + outliers[i] = server.MustCreateEvent(t, serverRoom, b.Event{ Type: "outlier_state", - Sender: psjResult.Server.UserID("charlie"), + Sender: server.UserID("charlie"), StateKey: b.Ptr(fmt.Sprintf("state_%d", i)), Content: map[string]interface{}{"body": body}, }) - psjResult.ServerRoom.AddEvent(outliers[i]) + serverRoom.AddEvent(outliers[i]) outlierEventIDs[i] = outliers[i].EventID() } t.Logf("Created outliers: %s ... %s", outliers[0].EventID(), outliers[len(outliers)-1].EventID()) @@ -612,26 +676,26 @@ func TestPartialStateJoin(t *testing.T) { // dedicated get_missing_event handler for timelineEvent2. // we grudgingly return a single event. - handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, + handleGetMissingEventsRequests(t, server, serverRoom, []string{timelineEvent2.EventID()}, []*gomatrixserverlib.Event{timelineEvent1}, ) // dedicated state_ids and state handlers for timelineEvent1's prev event (ie, the last outlier event) - handleStateIdsRequests(t, psjResult.Server, psjResult.ServerRoom, outliers[len(outliers)-1].EventID(), - psjResult.ServerRoom.AllCurrentState(), nil, nil) - handleStateRequests(t, psjResult.Server, psjResult.ServerRoom, outliers[len(outliers)-1].EventID(), - psjResult.ServerRoom.AllCurrentState(), nil, nil) + handleStateIdsRequests(t, server, serverRoom, outliers[len(outliers)-1].EventID(), + serverRoom.AllCurrentState(), nil, nil) + handleStateRequests(t, server, serverRoom, outliers[len(outliers)-1].EventID(), + serverRoom.AllCurrentState(), nil, nil) // now, send over the most recent event, which will make the server get_missing_events // (we will send timelineEvent1), and then request state (we will send all the outliers). - psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{timelineEvent2.JSON()}, nil) + server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{timelineEvent2.JSON()}, nil) t.Logf("Charlie sent timeline event 2") // wait for it to become visible, which implies that all the outliers have been pulled in. - awaitEventArrival(t, time.Second, alice, psjResult.ServerRoom.RoomID, timelineEvent2.EventID()) + awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, timelineEvent2.EventID()) // now we send over all the other events in the gap. - psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{lateEvent.JSON()}, nil) + server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{lateEvent.JSON()}, nil) t.Logf("Charlie sent late event") for i := 0; i < len(outliers); { @@ -640,13 +704,13 @@ func TestPartialStateJoin(t *testing.T) { for j := i; j < i+50 && j < len(outliers); j++ { transactionEvents = append(transactionEvents, outliers[j].JSON()) } - psjResult.Server.MustSendTransaction(t, deployment, "hs1", transactionEvents, nil) + server.MustSendTransaction(t, deployment, "hs1", transactionEvents, nil) t.Logf("Charlie sent %d ex-outliers", len(transactionEvents)) i += len(transactionEvents) } // wait for the last outlier to arrive - awaitEventArrival(t, 10*time.Second, alice, psjResult.ServerRoom.RoomID, outliers[len(outliers)-1].EventID()) + awaitEventArrival(t, 10*time.Second, alice, serverRoom.RoomID, outliers[len(outliers)-1].EventID()) // release the federation /state response psjResult.FinishStateRequest() @@ -657,7 +721,7 @@ func TestPartialStateJoin(t *testing.T) { alice.MustSyncUntil(t, client.SyncReq{}, func(clientUserID string, topLevelSyncJSON gjson.Result) error { - key := "rooms.join." + client.GjsonEscape(psjResult.ServerRoom.RoomID) + ".timeline.events" + key := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) + ".timeline.events" array := topLevelSyncJSON.Get(key) if !array.Exists() { return fmt.Errorf("Key %s does not exist", key) @@ -798,22 +862,25 @@ func buildLazyLoadingSyncFilter(timelineOptions map[string]interface{}) string { // partialStateJoinResult is the result of beginPartialStateJoin type partialStateJoinResult struct { - cancelListener func() Server *federation.Server ServerRoom *federation.ServerRoom fedStateIdsRequestReceivedWaiter *Waiter fedStateIdsSendResponseWaiter *Waiter } -// beginPartialStateJoin spins up a room on a complement server, -// then has a test user join it. It returns a partialStateJoinResult, -// which must be Destroy'd on completion. +// beginPartialStateJoin has a test user attempt to join the given room. +// +// It returns a partialStateJoinResult, which must be Destroy'd on completion. // // When this method completes, the /join request will have completed, but the // state has not yet been re-synced. To allow the re-sync to proceed, call // partialStateJoinResult.FinishStateRequest. -func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningUser *client.CSAPI) partialStateJoinResult { - result := partialStateJoinResult{} +func beginPartialStateJoin(t *testing.T, server *federation.Server, serverRoom *federation.ServerRoom, joiningUser *client.CSAPI) partialStateJoinResult { + // we store the Server and ServerRoom for the benefit of utilities like testReceiveEventDuringPartialStateJoin + result := partialStateJoinResult{ + Server: server, + ServerRoom: serverRoom, + } success := false defer func() { if !success { @@ -821,56 +888,30 @@ func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningU } }() - result.Server = federation.NewServer(t, deployment, - federation.HandleKeyRequests(), - federation.HandlePartialStateMakeSendJoinRequests(), - federation.HandleEventRequests(), - federation.HandleTransactionRequests( - func(e *gomatrixserverlib.Event) { - t.Fatalf("Received unexpected PDU: %s", string(e.JSON())) - }, - // the homeserver under test may send us presence when the joining user syncs - nil, - ), - ) - result.cancelListener = result.Server.Listen() - // some things for orchestration result.fedStateIdsRequestReceivedWaiter = NewWaiter() result.fedStateIdsSendResponseWaiter = NewWaiter() - // create the room on the complement server, with charlie and derek as members - roomVer := joiningUser.GetDefaultRoomVersion(t) - result.ServerRoom = result.Server.MustMakeRoom(t, roomVer, federation.InitialRoomEvents(roomVer, result.Server.UserID("charlie"))) - result.ServerRoom.AddEvent(result.Server.MustCreateEvent(t, result.ServerRoom, b.Event{ - Type: "m.room.member", - StateKey: b.Ptr(result.Server.UserID("derek")), - Sender: result.Server.UserID("derek"), - Content: map[string]interface{}{ - "membership": "join", - }, - })) - // register a handler for /state_ids requests for the most recent event, // which finishes fedStateIdsRequestReceivedWaiter, then // waits for fedStateIdsSendResponseWaiter and sends a reply - lastEvent := result.ServerRoom.Timeline[len(result.ServerRoom.Timeline)-1] - currentState := result.ServerRoom.AllCurrentState() + lastEvent := serverRoom.Timeline[len(serverRoom.Timeline)-1] + currentState := serverRoom.AllCurrentState() handleStateIdsRequests( - t, result.Server, result.ServerRoom, + t, server, serverRoom, lastEvent.EventID(), currentState, result.fedStateIdsRequestReceivedWaiter, result.fedStateIdsSendResponseWaiter, ) // a handler for /state requests, which sends a sensible response handleStateRequests( - t, result.Server, result.ServerRoom, + t, server, serverRoom, lastEvent.EventID(), currentState, nil, nil, ) // have joiningUser join the room by room ID. - joiningUser.JoinRoom(t, result.ServerRoom.RoomID, []string{result.Server.ServerName()}) + joiningUser.JoinRoom(t, serverRoom.RoomID, []string{server.ServerName()}) t.Logf("/join request completed") success = true @@ -887,10 +928,6 @@ func (psj *partialStateJoinResult) Destroy() { if psj.fedStateIdsRequestReceivedWaiter != nil { psj.fedStateIdsRequestReceivedWaiter.Finish() } - - if psj.cancelListener != nil { - psj.cancelListener() - } } // send a message into the room without letting the homeserver under test know about it.