diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b4b1bfed..a024138f 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -30,7 +30,7 @@ jobs: matrix: include: - homeserver: Synapse - tags: synapse_blacklist,msc3083 + tags: synapse_blacklist msc3083 faster_joins - homeserver: Dendrite tags: msc2836 dendrite_blacklist diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 793c61d3..45ec6a74 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -9,6 +9,7 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "strings" "testing" "time" @@ -19,117 +20,272 @@ import ( "github.com/matrix-org/complement/internal/b" "github.com/matrix-org/complement/internal/client" + "github.com/matrix-org/complement/internal/docker" "github.com/matrix-org/complement/internal/federation" "github.com/matrix-org/complement/internal/match" + "github.com/matrix-org/complement/internal/must" ) -// TestSyncBlocksDuringPartialStateJoin tests that a regular /sync request -// made during a partial-state /send_join request blocks until the state is -// correctly synced. -func TestSyncBlocksDuringPartialStateJoin(t *testing.T) { - // We make a room on the Complement server, then have @alice:hs1 join it, - // and make a sync request while the resync is in flight +func TestPartialStateJoin(t *testing.T) { + // test that a regular /sync request made during a partial-state /send_join + // request blocks until the state is correctly synced. + t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") - deployment := Deploy(t, b.BlueprintAlice) - defer deployment.Destroy(t) + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() - alice := deployment.Client(t, "hs1", "@alice:hs1") + // Alice has now joined the room, and the server is syncing the state in the background. - srv := federation.NewServer(t, deployment, + // attempts to sync should now block. Fire off a goroutine to try it. + syncResponseChan := make(chan gjson.Result) + defer close(syncResponseChan) + go func() { + response, _ := alice.MustSync(t, client.SyncReq{}) + syncResponseChan <- response + }() + + // wait for the state_ids request to arrive + psjResult.AwaitStateIdsRequest(t) + + // the client-side requests should still be waiting + select { + case <-syncResponseChan: + t.Fatalf("Sync completed before state resync complete") + default: + } + + // release the federation /state response + psjResult.FinishStateRequest() + + // the /sync request should now complete, with the new room + var syncRes gjson.Result + select { + case <-time.After(1 * time.Second): + t.Fatalf("/sync request request did not complete") + case syncRes = <-syncResponseChan: + } + + roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(psjResult.ServerRoom.RoomID)) + if !roomRes.Exists() { + t.Fatalf("/sync completed without join to new room\n") + } + + // check that the state includes both charlie and derek. + matcher := match.JSONCheckOffAllowUnwanted("state.events", + []interface{}{ + "m.room.member|" + psjResult.Server.UserID("charlie"), + "m.room.member|" + psjResult.Server.UserID("derek"), + }, func(result gjson.Result) interface{} { + return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") + }, nil, + ) + if err := matcher([]byte(roomRes.Raw)); err != nil { + t.Errorf("Did not find expected state events in /sync response: %s", err) + + } + }) + + // when Alice does a lazy-loading sync, she should see the room immediately + t.Run("CanLazyLoadingSyncDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(), + }, + client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + ) + t.Logf("Alice successfully synced") + }) + + // we should be able to send events in the room, during the resync + t.Run("CanSendEventsDuringPartialStateJoin", func(t *testing.T) { + t.Skip("Cannot yet send events during resync") + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + alice.Client.Timeout = 2 * time.Second + paths := []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "send", "m.room.message", "0"} + res := alice.MustDoFunc(t, "PUT", paths, client.WithJSONBody(t, map[string]interface{}{ + "msgtype": "m.text", + "body": "Hello world!", + })) + body := gjson.ParseBytes(client.ParseJSON(t, res)) + eventID := body.Get("event_id").Str + t.Logf("Alice sent event event ID %s", eventID) + }) + + // a request to (client-side) /members?at= should block until the (federation) /state request completes + // TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path + t.Run("MembersRequestBlocksDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + // we need a sync token to pass to the `at` param. + syncToken := alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(), + }, + client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + ) + t.Logf("Alice successfully synced") + + // Fire off a goroutine to send the request, and write the response back to a channel. + clientMembersRequestResponseChan := make(chan *http.Response) + defer close(clientMembersRequestResponseChan) + go func() { + queryParams := url.Values{} + queryParams.Set("at", syncToken) + clientMembersRequestResponseChan <- alice.MustDoFunc( + t, + "GET", + []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "members"}, + client.WithQueries(queryParams), + ) + }() + + // release the federation /state response + psjResult.FinishStateRequest() + + // the client-side /members request should now complete, with a response that includes charlie and derek. + select { + case <-time.After(1 * time.Second): + t.Fatalf("client-side /members request did not complete") + case res := <-clientMembersRequestResponseChan: + must.MatchResponse(t, res, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOff("chunk", + []interface{}{ + "m.room.member|" + alice.UserID, + "m.room.member|" + psjResult.Server.UserID("charlie"), + "m.room.member|" + psjResult.Server.UserID("derek"), + }, func(result gjson.Result) interface{} { + return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") + }, nil), + }, + }) + } + }) +} + +// buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq +func buildLazyLoadingSyncFilter() string { + j, _ := json.Marshal(map[string]interface{}{ + "room": map[string]interface{}{ + "timeline": map[string]interface{}{ + "lazy_load_members": true, + }, + "state": map[string]interface{}{ + "lazy_load_members": true, + }, + }, + }) + return string(j) +} + +// partialStateJoinResult is the result of beginPartialStateJoin +type partialStateJoinResult struct { + cancelListener func() + Server *federation.Server + ServerRoom *federation.ServerRoom + fedStateIdsRequestReceivedWaiter *Waiter + fedStateIdsSendResponseWaiter *Waiter +} + +// beginPartialStateJoin spins up a room on a complement server, +// then has a test user join it. It returns a partialStateJoinResult, +// which must be Destroy'd on completion. +// +// When this method completes, the /join request will have completed, but the +// state has not yet been re-synced. To allow the re-sync to proceed, call +// partialStateJoinResult.FinishStateRequest. +func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningUser *client.CSAPI) partialStateJoinResult { + result := partialStateJoinResult{} + success := false + defer func() { + if !success { + result.Destroy() + } + }() + + result.Server = federation.NewServer(t, deployment, federation.HandleKeyRequests(), federation.HandlePartialStateMakeSendJoinRequests(), federation.HandleEventRequests(), ) - cancel := srv.Listen() - defer cancel() + result.cancelListener = result.Server.Listen() // some things for orchestration - fedStateIdsRequestReceivedWaiter := NewWaiter() - defer fedStateIdsRequestReceivedWaiter.Finish() - fedStateIdsSendResponseWaiter := NewWaiter() - defer fedStateIdsSendResponseWaiter.Finish() + result.fedStateIdsRequestReceivedWaiter = NewWaiter() + result.fedStateIdsSendResponseWaiter = NewWaiter() // create the room on the complement server, with charlie and derek as members - charlie := srv.UserID("charlie") - derek := srv.UserID("derek") - serverRoom := makeTestRoom(t, srv, alice.GetDefaultRoomVersion(t), charlie, derek) + roomVer := joiningUser.GetDefaultRoomVersion(t) + result.ServerRoom = result.Server.MustMakeRoom(t, roomVer, federation.InitialRoomEvents(roomVer, result.Server.UserID("charlie"))) + result.ServerRoom.AddEvent(result.Server.MustCreateEvent(t, result.ServerRoom, b.Event{ + Type: "m.room.member", + StateKey: b.Ptr(result.Server.UserID("derek")), + Sender: result.Server.UserID("derek"), + Content: map[string]interface{}{ + "membership": "join", + }, + })) // register a handler for /state_ids requests, which finishes fedStateIdsRequestReceivedWaiter, then // waits for fedStateIdsSendResponseWaiter and sends a reply - handleStateIdsRequests(t, srv, serverRoom, fedStateIdsRequestReceivedWaiter, fedStateIdsSendResponseWaiter) + handleStateIdsRequests(t, result.Server, result.ServerRoom, result.fedStateIdsRequestReceivedWaiter, result.fedStateIdsSendResponseWaiter) // a handler for /state requests, which sends a sensible response - handleStateRequests(t, srv, serverRoom, nil, nil) - - // have alice join the room by room ID. - alice.JoinRoom(t, serverRoom.RoomID, []string{srv.ServerName()}) - t.Logf("Join completed") + handleStateRequests(t, result.Server, result.ServerRoom, nil, nil) - // Alice has now joined the room, and the server is syncing the state in the background. + // have joiningUser join the room by room ID. + joiningUser.JoinRoom(t, result.ServerRoom.RoomID, []string{result.Server.ServerName()}) + t.Logf("/join request completed") - // attempts to sync should now block. Fire off a goroutine to try it. - syncResponseChan := make(chan gjson.Result) - defer close(syncResponseChan) - go func() { - response, _ := alice.MustSync(t, client.SyncReq{}) - syncResponseChan <- response - }() - - // wait for the state_ids request to arrive - fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") + success = true + return result +} - // the client-side requests should still be waiting - select { - case <-syncResponseChan: - t.Fatalf("Sync completed before state resync complete") - default: +// Destroy cleans up the resources associated with the join attempt. It must +// be called once the test is finished +func (psj *partialStateJoinResult) Destroy() { + if psj.fedStateIdsSendResponseWaiter != nil { + psj.fedStateIdsSendResponseWaiter.Finish() } - // release the federation /state response - fedStateIdsSendResponseWaiter.Finish() - - // the /sync request should now complete, with the new room - var syncRes gjson.Result - select { - case <-time.After(1 * time.Second): - t.Fatalf("/sync request request did not complete") - case syncRes = <-syncResponseChan: + if psj.fedStateIdsRequestReceivedWaiter != nil { + psj.fedStateIdsRequestReceivedWaiter.Finish() } - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID)) - if !roomRes.Exists() { - t.Fatalf("/sync completed without join to new room\n") + if psj.cancelListener != nil { + psj.cancelListener() } +} - // check that the state includes both charlie and derek. - matcher := match.JSONCheckOffAllowUnwanted("state.events", - []interface{}{ - "m.room.member|" + charlie, - "m.room.member|" + derek, - }, func(result gjson.Result) interface{} { - return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") - }, nil, - ) - if err := matcher([]byte(roomRes.Raw)); err != nil { - t.Errorf("Did not find expected state events in /sync response: %s", err) - } +// wait for a /state_ids request for the test room to arrive +func (psj *partialStateJoinResult) AwaitStateIdsRequest(t *testing.T) { + psj.fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") } -// makeTestRoom constructs a test room on the Complement server, and adds the given extra members -func makeTestRoom(t *testing.T, srv *federation.Server, roomVer gomatrixserverlib.RoomVersion, creator string, members ...string) *federation.ServerRoom { - serverRoom := srv.MustMakeRoom(t, roomVer, federation.InitialRoomEvents(roomVer, creator)) - for _, m := range members { - serverRoom.AddEvent(srv.MustCreateEvent(t, serverRoom, b.Event{ - Type: "m.room.member", - StateKey: b.Ptr(m), - Sender: m, - Content: map[string]interface{}{ - "membership": "join", - }, - }), - ) - } - return serverRoom +// allow the /state_ids request to complete, thus allowing the state re-sync to complete +func (psj *partialStateJoinResult) FinishStateRequest() { + psj.fedStateIdsSendResponseWaiter.Finish() } // handleStateIdsRequests registers a handler for /state_ids requests for serverRoom.