From 9b3f334b713f02c92b1f4e695757a6f6dc7d1d76 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 18 May 2022 16:39:12 +0100 Subject: [PATCH 1/4] Factor out a function to set up the partial-state-join We're going to need to do this several times for different tests, so we factor out a separate function, and some utility methods to use the result. --- ...federation_room_join_partial_state_test.go | 147 +++++++++++------- 1 file changed, 95 insertions(+), 52 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 793c61d3..e43d8c2f 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -19,6 +19,7 @@ import ( "github.com/matrix-org/complement/internal/b" "github.com/matrix-org/complement/internal/client" + "github.com/matrix-org/complement/internal/docker" "github.com/matrix-org/complement/internal/federation" "github.com/matrix-org/complement/internal/match" ) @@ -27,43 +28,12 @@ import ( // made during a partial-state /send_join request blocks until the state is // correctly synced. func TestSyncBlocksDuringPartialStateJoin(t *testing.T) { - // We make a room on the Complement server, then have @alice:hs1 join it, - // and make a sync request while the resync is in flight - deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) - alice := deployment.Client(t, "hs1", "@alice:hs1") - srv := federation.NewServer(t, deployment, - federation.HandleKeyRequests(), - federation.HandlePartialStateMakeSendJoinRequests(), - federation.HandleEventRequests(), - ) - cancel := srv.Listen() - defer cancel() - - // some things for orchestration - fedStateIdsRequestReceivedWaiter := NewWaiter() - defer fedStateIdsRequestReceivedWaiter.Finish() - fedStateIdsSendResponseWaiter := NewWaiter() - defer fedStateIdsSendResponseWaiter.Finish() - - // create the room on the complement server, with charlie and derek as members - charlie := srv.UserID("charlie") - derek := srv.UserID("derek") - serverRoom := makeTestRoom(t, srv, alice.GetDefaultRoomVersion(t), charlie, derek) - - // register a handler for /state_ids requests, which finishes fedStateIdsRequestReceivedWaiter, then - // waits for fedStateIdsSendResponseWaiter and sends a reply - handleStateIdsRequests(t, srv, serverRoom, fedStateIdsRequestReceivedWaiter, fedStateIdsSendResponseWaiter) - - // a handler for /state requests, which sends a sensible response - handleStateRequests(t, srv, serverRoom, nil, nil) - - // have alice join the room by room ID. - alice.JoinRoom(t, serverRoom.RoomID, []string{srv.ServerName()}) - t.Logf("Join completed") + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() // Alice has now joined the room, and the server is syncing the state in the background. @@ -76,7 +46,7 @@ func TestSyncBlocksDuringPartialStateJoin(t *testing.T) { }() // wait for the state_ids request to arrive - fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") + psjResult.AwaitStateIdsRequest(t) // the client-side requests should still be waiting select { @@ -86,7 +56,7 @@ func TestSyncBlocksDuringPartialStateJoin(t *testing.T) { } // release the federation /state response - fedStateIdsSendResponseWaiter.Finish() + psjResult.FinishStateRequest() // the /sync request should now complete, with the new room var syncRes gjson.Result @@ -96,7 +66,7 @@ func TestSyncBlocksDuringPartialStateJoin(t *testing.T) { case syncRes = <-syncResponseChan: } - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID)) + roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(psjResult.ServerRoom.RoomID)) if !roomRes.Exists() { t.Fatalf("/sync completed without join to new room\n") } @@ -104,32 +74,105 @@ func TestSyncBlocksDuringPartialStateJoin(t *testing.T) { // check that the state includes both charlie and derek. matcher := match.JSONCheckOffAllowUnwanted("state.events", []interface{}{ - "m.room.member|" + charlie, - "m.room.member|" + derek, + "m.room.member|" + psjResult.Server.UserID("charlie"), + "m.room.member|" + psjResult.Server.UserID("derek"), }, func(result gjson.Result) interface{} { return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") }, nil, ) if err := matcher([]byte(roomRes.Raw)); err != nil { t.Errorf("Did not find expected state events in /sync response: %s", err) + } } -// makeTestRoom constructs a test room on the Complement server, and adds the given extra members -func makeTestRoom(t *testing.T, srv *federation.Server, roomVer gomatrixserverlib.RoomVersion, creator string, members ...string) *federation.ServerRoom { - serverRoom := srv.MustMakeRoom(t, roomVer, federation.InitialRoomEvents(roomVer, creator)) - for _, m := range members { - serverRoom.AddEvent(srv.MustCreateEvent(t, serverRoom, b.Event{ - Type: "m.room.member", - StateKey: b.Ptr(m), - Sender: m, - Content: map[string]interface{}{ - "membership": "join", - }, - }), - ) +// partialStateJoinResult is the result of beginPartialStateJoin +type partialStateJoinResult struct { + cancelListener func() + Server *federation.Server + ServerRoom *federation.ServerRoom + fedStateIdsRequestReceivedWaiter *Waiter + fedStateIdsSendResponseWaiter *Waiter +} + +// beginPartialStateJoin spins up a room on a complement server, +// then has a test user join it. It returns a partialStateJoinResult, +// which must be Destroy'd on completion. +// +// When this method completes, the /join request will have completed, but the +// state has not yet been re-synced. To allow the re-sync to proceed, call +// partialStateJoinResult.FinishStateRequest. +func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningUser *client.CSAPI) partialStateJoinResult { + result := partialStateJoinResult{} + success := false + defer func() { + if !success { + result.Destroy() + } + }() + + result.Server = federation.NewServer(t, deployment, + federation.HandleKeyRequests(), + federation.HandlePartialStateMakeSendJoinRequests(), + federation.HandleEventRequests(), + ) + result.cancelListener = result.Server.Listen() + + // some things for orchestration + result.fedStateIdsRequestReceivedWaiter = NewWaiter() + result.fedStateIdsSendResponseWaiter = NewWaiter() + + // create the room on the complement server, with charlie and derek as members + roomVer := joiningUser.GetDefaultRoomVersion(t) + result.ServerRoom = result.Server.MustMakeRoom(t, roomVer, federation.InitialRoomEvents(roomVer, result.Server.UserID("charlie"))) + result.ServerRoom.AddEvent(result.Server.MustCreateEvent(t, result.ServerRoom, b.Event{ + Type: "m.room.member", + StateKey: b.Ptr(result.Server.UserID("derek")), + Sender: result.Server.UserID("derek"), + Content: map[string]interface{}{ + "membership": "join", + }, + })) + + // register a handler for /state_ids requests, which finishes fedStateIdsRequestReceivedWaiter, then + // waits for fedStateIdsSendResponseWaiter and sends a reply + handleStateIdsRequests(t, result.Server, result.ServerRoom, result.fedStateIdsRequestReceivedWaiter, result.fedStateIdsSendResponseWaiter) + + // a handler for /state requests, which sends a sensible response + handleStateRequests(t, result.Server, result.ServerRoom, nil, nil) + + // have joiningUser join the room by room ID. + joiningUser.JoinRoom(t, result.ServerRoom.RoomID, []string{result.Server.ServerName()}) + t.Logf("/join request completed") + + success = true + return result +} + +// Destroy cleans up the resources associated with the join attempt. It must +// be called once the test is finished +func (psj *partialStateJoinResult) Destroy() { + if psj.fedStateIdsSendResponseWaiter != nil { + psj.fedStateIdsSendResponseWaiter.Finish() + } + + if psj.fedStateIdsRequestReceivedWaiter != nil { + psj.fedStateIdsRequestReceivedWaiter.Finish() } - return serverRoom + + if psj.cancelListener != nil { + psj.cancelListener() + } +} + +// wait for a /state_ids request for the test room to arrive +func (psj *partialStateJoinResult) AwaitStateIdsRequest(t *testing.T) { + psj.fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") +} + +// allow the /state_ids request to complete, thus allowing the state re-sync to complete +func (psj *partialStateJoinResult) FinishStateRequest() { + psj.fedStateIdsSendResponseWaiter.Finish() } // handleStateIdsRequests registers a handler for /state_ids requests for serverRoom. From 6f51a509d7a062c924ca8bae0d3c1dc450510a60 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 18 May 2022 16:45:54 +0100 Subject: [PATCH 2/4] Add tests for operations during partial-state sync --- ...federation_room_join_partial_state_test.go | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index e43d8c2f..62041c26 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -9,6 +9,7 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "strings" "testing" "time" @@ -22,6 +23,7 @@ import ( "github.com/matrix-org/complement/internal/docker" "github.com/matrix-org/complement/internal/federation" "github.com/matrix-org/complement/internal/match" + "github.com/matrix-org/complement/internal/must" ) // TestSyncBlocksDuringPartialStateJoin tests that a regular /sync request @@ -86,6 +88,116 @@ func TestSyncBlocksDuringPartialStateJoin(t *testing.T) { } } +// when Alice does a lazy-loading sync, she should see the room immediately +func TestCanLazyLoadingSyncDuringPartialStateJoin(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(), + }, + client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + ) + t.Logf("Alice successfully synced") +} + +// we should be able to send events in the room, during the resync +func TestCanSendEventsDuringPartialStateJoin(t *testing.T) { + t.Skip("Cannot yet send events during resync") + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + alice.Client.Timeout = 2 * time.Second + paths := []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "send", "m.room.message", "0"} + res := alice.MustDoFunc(t, "PUT", paths, client.WithJSONBody(t, map[string]interface{}{ + "msgtype": "m.text", + "body": "Hello world!", + })) + body := gjson.ParseBytes(client.ParseJSON(t, res)) + eventID := body.Get("event_id").Str + t.Logf("Alice sent event event ID %s", eventID) +} + +// a request to (client-side) /members?at= should block until the (federation) /state request completes +// TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path +func TestMembersRequestBlocksDuringPartialStateJoin(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + // we need a sync token to pass to the `at` param. + syncToken := alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(), + }, + client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + ) + t.Logf("Alice successfully synced") + + // Fire off a goroutine to send the request, and write the response back to a channel. + clientMembersRequestResponseChan := make(chan *http.Response) + defer close(clientMembersRequestResponseChan) + go func() { + queryParams := url.Values{} + queryParams.Set("at", syncToken) + clientMembersRequestResponseChan <- alice.MustDoFunc( + t, + "GET", + []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "members"}, + client.WithQueries(queryParams), + ) + }() + + // release the federation /state response + psjResult.FinishStateRequest() + + // the client-side /members request should now complete, with a response that includes charlie and derek. + select { + case <-time.After(1 * time.Second): + t.Fatalf("client-side /members request did not complete") + case res := <-clientMembersRequestResponseChan: + must.MatchResponse(t, res, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOff("chunk", + []interface{}{ + "m.room.member|" + alice.UserID, + "m.room.member|" + psjResult.Server.UserID("charlie"), + "m.room.member|" + psjResult.Server.UserID("derek"), + }, func(result gjson.Result) interface{} { + return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") + }, nil), + }, + }) + } +} + +// buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq +func buildLazyLoadingSyncFilter() string { + j, _ := json.Marshal(map[string]interface{}{ + "room": map[string]interface{}{ + "timeline": map[string]interface{}{ + "lazy_load_members": true, + }, + "state": map[string]interface{}{ + "lazy_load_members": true, + }, + }, + }) + return string(j) +} + // partialStateJoinResult is the result of beginPartialStateJoin type partialStateJoinResult struct { cancelListener func() From 0d88c77b072dd5f83d2ef691f0690ff08f4163d9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 18 May 2022 17:13:45 +0100 Subject: [PATCH 3/4] Enable faster-joins tests in complement CI --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b4b1bfed..a024138f 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -30,7 +30,7 @@ jobs: matrix: include: - homeserver: Synapse - tags: synapse_blacklist,msc3083 + tags: synapse_blacklist msc3083 faster_joins - homeserver: Dendrite tags: msc2836 dendrite_blacklist From c293cec94d0680733035f447ca6a647b51e8cb0d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 18 May 2022 17:54:49 +0100 Subject: [PATCH 4/4] Group tests under a single parent test Mostly, this is nice because it groups all the output together. --- ...federation_room_join_partial_state_test.go | 285 +++++++++--------- 1 file changed, 143 insertions(+), 142 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 62041c26..45ec6a74 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -26,161 +26,162 @@ import ( "github.com/matrix-org/complement/internal/must" ) -// TestSyncBlocksDuringPartialStateJoin tests that a regular /sync request -// made during a partial-state /send_join request blocks until the state is -// correctly synced. -func TestSyncBlocksDuringPartialStateJoin(t *testing.T) { - deployment := Deploy(t, b.BlueprintAlice) - defer deployment.Destroy(t) - alice := deployment.Client(t, "hs1", "@alice:hs1") - - psjResult := beginPartialStateJoin(t, deployment, alice) - defer psjResult.Destroy() - - // Alice has now joined the room, and the server is syncing the state in the background. - - // attempts to sync should now block. Fire off a goroutine to try it. - syncResponseChan := make(chan gjson.Result) - defer close(syncResponseChan) - go func() { - response, _ := alice.MustSync(t, client.SyncReq{}) - syncResponseChan <- response - }() - - // wait for the state_ids request to arrive - psjResult.AwaitStateIdsRequest(t) - - // the client-side requests should still be waiting - select { - case <-syncResponseChan: - t.Fatalf("Sync completed before state resync complete") - default: - } +func TestPartialStateJoin(t *testing.T) { + // test that a regular /sync request made during a partial-state /send_join + // request blocks until the state is correctly synced. + t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + // Alice has now joined the room, and the server is syncing the state in the background. + + // attempts to sync should now block. Fire off a goroutine to try it. + syncResponseChan := make(chan gjson.Result) + defer close(syncResponseChan) + go func() { + response, _ := alice.MustSync(t, client.SyncReq{}) + syncResponseChan <- response + }() + + // wait for the state_ids request to arrive + psjResult.AwaitStateIdsRequest(t) + + // the client-side requests should still be waiting + select { + case <-syncResponseChan: + t.Fatalf("Sync completed before state resync complete") + default: + } - // release the federation /state response - psjResult.FinishStateRequest() + // release the federation /state response + psjResult.FinishStateRequest() - // the /sync request should now complete, with the new room - var syncRes gjson.Result - select { - case <-time.After(1 * time.Second): - t.Fatalf("/sync request request did not complete") - case syncRes = <-syncResponseChan: - } + // the /sync request should now complete, with the new room + var syncRes gjson.Result + select { + case <-time.After(1 * time.Second): + t.Fatalf("/sync request request did not complete") + case syncRes = <-syncResponseChan: + } - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(psjResult.ServerRoom.RoomID)) - if !roomRes.Exists() { - t.Fatalf("/sync completed without join to new room\n") - } + roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(psjResult.ServerRoom.RoomID)) + if !roomRes.Exists() { + t.Fatalf("/sync completed without join to new room\n") + } - // check that the state includes both charlie and derek. - matcher := match.JSONCheckOffAllowUnwanted("state.events", - []interface{}{ - "m.room.member|" + psjResult.Server.UserID("charlie"), - "m.room.member|" + psjResult.Server.UserID("derek"), - }, func(result gjson.Result) interface{} { - return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") - }, nil, - ) - if err := matcher([]byte(roomRes.Raw)); err != nil { - t.Errorf("Did not find expected state events in /sync response: %s", err) + // check that the state includes both charlie and derek. + matcher := match.JSONCheckOffAllowUnwanted("state.events", + []interface{}{ + "m.room.member|" + psjResult.Server.UserID("charlie"), + "m.room.member|" + psjResult.Server.UserID("derek"), + }, func(result gjson.Result) interface{} { + return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") + }, nil, + ) + if err := matcher([]byte(roomRes.Raw)); err != nil { + t.Errorf("Did not find expected state events in /sync response: %s", err) - } -} + } + }) -// when Alice does a lazy-loading sync, she should see the room immediately -func TestCanLazyLoadingSyncDuringPartialStateJoin(t *testing.T) { - deployment := Deploy(t, b.BlueprintAlice) - defer deployment.Destroy(t) - alice := deployment.Client(t, "hs1", "@alice:hs1") + // when Alice does a lazy-loading sync, she should see the room immediately + t.Run("CanLazyLoadingSyncDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") - psjResult := beginPartialStateJoin(t, deployment, alice) - defer psjResult.Destroy() + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() - alice.MustSyncUntil(t, - client.SyncReq{ - Filter: buildLazyLoadingSyncFilter(), - }, - client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), - ) - t.Logf("Alice successfully synced") -} - -// we should be able to send events in the room, during the resync -func TestCanSendEventsDuringPartialStateJoin(t *testing.T) { - t.Skip("Cannot yet send events during resync") - deployment := Deploy(t, b.BlueprintAlice) - defer deployment.Destroy(t) - alice := deployment.Client(t, "hs1", "@alice:hs1") - - psjResult := beginPartialStateJoin(t, deployment, alice) - defer psjResult.Destroy() - - alice.Client.Timeout = 2 * time.Second - paths := []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "send", "m.room.message", "0"} - res := alice.MustDoFunc(t, "PUT", paths, client.WithJSONBody(t, map[string]interface{}{ - "msgtype": "m.text", - "body": "Hello world!", - })) - body := gjson.ParseBytes(client.ParseJSON(t, res)) - eventID := body.Get("event_id").Str - t.Logf("Alice sent event event ID %s", eventID) -} + alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(), + }, + client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + ) + t.Logf("Alice successfully synced") + }) -// a request to (client-side) /members?at= should block until the (federation) /state request completes -// TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path -func TestMembersRequestBlocksDuringPartialStateJoin(t *testing.T) { - deployment := Deploy(t, b.BlueprintAlice) - defer deployment.Destroy(t) - alice := deployment.Client(t, "hs1", "@alice:hs1") + // we should be able to send events in the room, during the resync + t.Run("CanSendEventsDuringPartialStateJoin", func(t *testing.T) { + t.Skip("Cannot yet send events during resync") + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + alice.Client.Timeout = 2 * time.Second + paths := []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "send", "m.room.message", "0"} + res := alice.MustDoFunc(t, "PUT", paths, client.WithJSONBody(t, map[string]interface{}{ + "msgtype": "m.text", + "body": "Hello world!", + })) + body := gjson.ParseBytes(client.ParseJSON(t, res)) + eventID := body.Get("event_id").Str + t.Logf("Alice sent event event ID %s", eventID) + }) - psjResult := beginPartialStateJoin(t, deployment, alice) - defer psjResult.Destroy() + // a request to (client-side) /members?at= should block until the (federation) /state request completes + // TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path + t.Run("MembersRequestBlocksDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") - // we need a sync token to pass to the `at` param. - syncToken := alice.MustSyncUntil(t, - client.SyncReq{ - Filter: buildLazyLoadingSyncFilter(), - }, - client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), - ) - t.Logf("Alice successfully synced") - - // Fire off a goroutine to send the request, and write the response back to a channel. - clientMembersRequestResponseChan := make(chan *http.Response) - defer close(clientMembersRequestResponseChan) - go func() { - queryParams := url.Values{} - queryParams.Set("at", syncToken) - clientMembersRequestResponseChan <- alice.MustDoFunc( - t, - "GET", - []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "members"}, - client.WithQueries(queryParams), - ) - }() + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() - // release the federation /state response - psjResult.FinishStateRequest() - - // the client-side /members request should now complete, with a response that includes charlie and derek. - select { - case <-time.After(1 * time.Second): - t.Fatalf("client-side /members request did not complete") - case res := <-clientMembersRequestResponseChan: - must.MatchResponse(t, res, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONCheckOff("chunk", - []interface{}{ - "m.room.member|" + alice.UserID, - "m.room.member|" + psjResult.Server.UserID("charlie"), - "m.room.member|" + psjResult.Server.UserID("derek"), - }, func(result gjson.Result) interface{} { - return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") - }, nil), + // we need a sync token to pass to the `at` param. + syncToken := alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(), }, - }) - } + client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + ) + t.Logf("Alice successfully synced") + + // Fire off a goroutine to send the request, and write the response back to a channel. + clientMembersRequestResponseChan := make(chan *http.Response) + defer close(clientMembersRequestResponseChan) + go func() { + queryParams := url.Values{} + queryParams.Set("at", syncToken) + clientMembersRequestResponseChan <- alice.MustDoFunc( + t, + "GET", + []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "members"}, + client.WithQueries(queryParams), + ) + }() + + // release the federation /state response + psjResult.FinishStateRequest() + + // the client-side /members request should now complete, with a response that includes charlie and derek. + select { + case <-time.After(1 * time.Second): + t.Fatalf("client-side /members request did not complete") + case res := <-clientMembersRequestResponseChan: + must.MatchResponse(t, res, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOff("chunk", + []interface{}{ + "m.room.member|" + alice.UserID, + "m.room.member|" + psjResult.Server.UserID("charlie"), + "m.room.member|" + psjResult.Server.UserID("derek"), + }, func(result gjson.Result) interface{} { + return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") + }, nil), + }, + }) + } + }) } // buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq