diff --git a/internal/client/client.go b/internal/client/client.go index 90f68dd5..83d7772d 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -260,9 +260,9 @@ func (c *CSAPI) SetPushRule(t *testing.T, scope string, kind string, ruleID stri return c.MustDoFunc(t, "PUT", []string{"_matrix", "client", "v3", "pushrules", scope, kind, ruleID}, WithJSONBody(t, body), WithQueries(queryParams)) } -// SendEventSynced sends `e` into the room and waits for its event ID to come down /sync. +// SendEventUnsynced sends `e` into the room. // Returns the event ID of the sent event. -func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e b.Event) string { +func (c *CSAPI) SendEventUnsynced(t *testing.T, roomID string, e b.Event) string { t.Helper() c.txnID++ paths := []string{"_matrix", "client", "v3", "rooms", roomID, "send", e.Type, strconv.Itoa(c.txnID)} @@ -272,6 +272,14 @@ func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e b.Event) string { res := c.MustDoFunc(t, "PUT", paths, WithJSONBody(t, e.Content)) body := ParseJSON(t, res) eventID := GetJSONFieldStr(t, body, "event_id") + return eventID +} + +// SendEventSynced sends `e` into the room and waits for its event ID to come down /sync. +// Returns the event ID of the sent event. +func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e b.Event) string { + t.Helper() + eventID := c.SendEventUnsynced(t, roomID, e) t.Logf("SendEventSynced waiting for event ID %s", eventID) c.MustSyncUntil(t, SyncReq{}, SyncTimelineHas(roomID, func(r gjson.Result) bool { return r.Get("event_id").Str == eventID diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index d2c505f3..e2fbaba1 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -118,52 +118,133 @@ func TestPartialStateJoin(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) - // test that a regular /sync request made during a partial-state /send_join - // request blocks until the state is correctly synced. - t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) { - alice := deployment.RegisterUser(t, "hs1", "t1alice", "secret", false) + // Test that an eager (i.e. NOT lazy-loading members) /sync request made during a + // partial-state /send_join request does not return the room until the resync has + // completed. + // + // We need to test both an eager initial sync (no `since` token) and an eager + // incremental sync (has a `since` token) separately. Do this as follows + // + // 1. Partial join Alice to a remote room. + // 2. Have Alice lazy-sync until she sees (1). + // 3. Have Alice eager sync. The response should omit the remote room. + // 4. Have Alice send a message to the remote room. + // 5. Have Alice lazy-sync until she sees (4). + // 6. Have Alice eager-sync. The response should omit the remote room. + // 7. Allow the resync to complete. + // 8. Have Alice eager-sync until she sees the remote room. + // + // Alice's lazy syncs in steps 2 and 5 are incremental. + // (We have Alice lazy-sync to avoid races; we want to be sure that the server has + // deliberately chosen to omit the remote room from the lazy-sync response.) + + eagerSyncDuringPartialStateJoinTest := func(t *testing.T, usernameSuffix string, incremental bool) { + alice := deployment.RegisterUser(t, "hs1", "t1alice_"+usernameSuffix, "secret", false) + + // Maintain two sync tokens: once for the eager syncs under test, and another + // for the lazy syncs which we use to avoid races. + var eagerSyncToken, lazySyncToken string + + getEagerSyncReq := func() client.SyncReq { + // We track the `next_batch` returned by Alice's eager syncs. However, we + // will only _use_ it when we are testing the behaviour of incremental syncs. + if incremental { + return client.SyncReq{Since: eagerSyncToken} + } else { + // NB: We are assuming that the responses to repeated initial syncs + // are not cached by the homeserver. + return client.SyncReq{Since: ""} + } + } - server := createTestServer(t, deployment) + getLazySyncReq := func() client.SyncReq { + return client.SyncReq{ + Since: lazySyncToken, + Filter: buildLazyLoadingSyncFilter(nil), + } + } + + t.Log("Do a one-off initial sync for Alice, so we have a next_batch token for future incremental syncs") + _, eagerSyncToken = alice.MustSync(t, client.SyncReq{}) + + t.Log("1. Partial join Alice to a remote room.") + server := createTestServer( + t, + deployment, + // Allow PDUs and EDUs, since Alice will send a message in the room. + federation.HandleTransactionRequests( + func(e *gomatrixserverlib.Event) {}, + func(e gomatrixserverlib.EDU) {}, + ), + ) cancel := server.Listen() defer cancel() serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy(t) - // Alice has now joined the room, and the server is syncing the state in the background. + t.Log("2. Have Alice lazy-sync until she sees (1).") + lazySyncToken = alice.MustSyncUntil( + t, + getLazySyncReq(), + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) - // attempts to sync should now block. Fire off a goroutine to try it. - syncResponseChan := make(chan gjson.Result) - go func() { - response, _ := alice.MustSync(t, client.SyncReq{}) - syncResponseChan <- response - close(syncResponseChan) - }() + t.Log("3. Have Alice eager sync. The response should omit the remote room.") + syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) + response, eagerSyncToken := alice.MustSync( + t, + getEagerSyncReq(), + ) + must.MatchGJSON( + t, + response, + match.JSONKeyMissing(syncJoinedRoomPath), + ) + + t.Log("4. Have Alice send a message to the remote room.") + messageId := alice.SendEventUnsynced(t, serverRoom.RoomID, b.Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "Hello world", + "msgtype": "m.text", + }, + Sender: alice.UserID, + }) + + t.Log("5. Have Alice lazy-sync until she sees (4).") + alice.MustSyncUntil( + t, + getLazySyncReq(), + client.SyncTimelineHasEventID(serverRoom.RoomID, messageId), + ) + + t.Log("6. Have Alice eager-sync. The response should omit the remote room.") + response, eagerSyncToken = alice.MustSync( + t, + getEagerSyncReq(), + ) + must.MatchGJSON( + t, + response, + match.JSONKeyMissing(syncJoinedRoomPath), + ) + + t.Log("7. Allow the resync to complete.") // wait for the state_ids request to arrive psjResult.AwaitStateIdsRequest(t) - // the client-side requests should still be waiting - select { - case <-syncResponseChan: - t.Fatalf("Sync completed before state resync complete") - default: - } - // release the federation /state response psjResult.FinishStateRequest() + awaitPartialStateJoinCompletion(t, serverRoom, alice) - // the /sync request should now complete, with the new room - var syncRes gjson.Result - select { - case <-time.After(1 * time.Second): - t.Fatalf("/sync request request did not complete") - case syncRes = <-syncResponseChan: - } + t.Log("8. Have Alice eager-sync. She should see the remote room.") + response, eagerSyncToken = alice.MustSync(t, getEagerSyncReq()) - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID)) + roomRes := response.Get(syncJoinedRoomPath) if !roomRes.Exists() { - t.Fatalf("/sync completed without join to new room\n") + t.Fatal("Sync does NOT include the joined room after the resync, but should") } // check that the state includes both charlie and derek. @@ -177,7 +258,79 @@ func TestPartialStateJoin(t *testing.T) { ) if err := matcher([]byte(roomRes.Raw)); err != nil { t.Errorf("Did not find expected state events in /sync response: %s", err) + } + } + + t.Run("EagerInitialSyncDuringPartialStateJoin", func(t *testing.T) { + eagerSyncDuringPartialStateJoinTest(t, "initial", false) + }) + + t.Run("EagerIncrementalSyncDuringPartialStateJoin", func(t *testing.T) { + eagerSyncDuringPartialStateJoinTest(t, "incremental", true) + }) + + // The tests above use long-polling syncs with a (complement-default) timeout of + // 1s. They don't test that the long-poll returns early when the partial state + // resync completes. This test does exactly that. + t.Run("EagerLongPollingSyncWokenWhenResyncCompletes", func(t *testing.T) { + alice := deployment.RegisterUser(t, "hs1", "t1alice_long_polling", "secret", false) + + t.Log("Alice partial-joins a remote room.") + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy(t) + + t.Log("Alice eager-syncs. The response should not contain the remote room.") + response, nextBatch := alice.MustSync(t, client.SyncReq{}) + + syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) + if response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("Sync shouldn't include the joined room until resync is over") + } + // Begin a long polling sync that shouldn't return yet since no change happened + + responseChan := make(chan gjson.Result, 1) + go func() { + defer close(responseChan) + response, _ := alice.MustSync(t, client.SyncReq{ + TimeoutMillis: "10000", + Since: nextBatch, + }) + responseChan <- response + }() + + // Try to wait for the sync to actually start, then un-partial-state the room + time.Sleep(2 * time.Second) + + // Sanity check that the sync hasn't completed + select { + case response := <-responseChan: + t.Fatalf("Recieved sync response too soon: %s", response.Raw) + default: + t.Logf("No sync response yet") + } + + // wait for the state_ids request to arrive + psjResult.AwaitStateIdsRequest(t) + // release the federation /state response + psjResult.FinishStateRequest() + + // Try to wait for the sync to return or timeout after 15 seconds, + // as the above tests are using a timeout of 10 seconds + select { + case response = <-responseChan: + case <-time.After(time.Second * 10): + t.Fatal("sync should have returned before the timeout") + } + + // the /sync request should now complete, with the new room + roomRes := response.Get(syncJoinedRoomPath) + if !roomRes.Exists() { + t.Fatal("Sync does NOT include the joined room after resync") } }) @@ -918,8 +1071,10 @@ func TestPartialStateJoin(t *testing.T) { } }) - // test that a partial-state join continues syncing state after a restart - // the same as SyncBlocksDuringPartialStateJoin, with a restart in the middle + // Test that a partial-state join continues syncing state after a restart + // Similar to EagerIncrementalSyncDuringPartialStateJoin, with a restart in the + // middle. (But this test is slightly simpler: it doesn't lazy-sync before + // eager-syncing, and doesn't send a message to the partial-state room.) t.Run("PartialStateJoinContinuesAfterRestart", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t12alice", "secret", false) @@ -935,45 +1090,36 @@ func TestPartialStateJoin(t *testing.T) { // wait for the state_ids request to arrive psjResult.AwaitStateIdsRequest(t) + // Eager sync shouldn't include the room yet + response, nextBatch := alice.MustSync(t, client.SyncReq{}) + + syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) + if response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("Sync shouldn't include the joined room until resync is over") + } + // restart the homeserver err := deployment.Restart(t) if err != nil { t.Errorf("Failed to restart homeserver: %s", err) } - // attempts to sync should block. Fire off a goroutine to try it. - syncResponseChan := make(chan gjson.Result) - go func() { - response, _ := alice.MustSync(t, client.SyncReq{}) - syncResponseChan <- response - close(syncResponseChan) - }() + // Sync still shouldn't include the room + response, nextBatch = alice.MustSync(t, client.SyncReq{Since: nextBatch}) - // we expect another state_ids request to arrive. - // we'd do another AwaitStateIdsRequest, except it's single-use. - - // the client-side requests should still be waiting - select { - case <-syncResponseChan: - t.Fatalf("Sync completed before state resync complete") - default: + if response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("Sync shouldn't include the joined room until resync is over") } // release the federation /state response psjResult.FinishStateRequest() // the /sync request should now complete, with the new room - var syncRes gjson.Result - select { - case <-time.After(1 * time.Second): - t.Fatalf("/sync request request did not complete") - case syncRes = <-syncResponseChan: - } - - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID)) - if !roomRes.Exists() { - t.Fatalf("/sync completed without join to new room\n") - } + nextBatch = alice.MustSyncUntil( + t, + client.SyncReq{Since: nextBatch}, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) }) // test that a partial-state join can fall back to other homeservers when re-syncing @@ -1031,35 +1177,24 @@ func TestPartialStateJoin(t *testing.T) { // wait until hs2 starts syncing state fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") - syncResponseChan := make(chan gjson.Result) - go func() { - response, _ := charlie.MustSync(t, client.SyncReq{}) - syncResponseChan <- response - close(syncResponseChan) - }() + response, nextBatch := charlie.MustSync(t, client.SyncReq{}) - // the client-side requests should still be waiting - select { - case <-syncResponseChan: - t.Fatalf("hs2 sync completed before state resync complete") - default: + // the client-side requests shouldn't report the join yet + syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(roomID) + if response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("Sync shouldn't include the joined room yet") } // reply to hs2 with a bogus /state_ids response fedStateIdsSendResponseWaiter.Finish() - // charlie's /sync request should now complete, with the new room - var syncRes gjson.Result - select { - case <-time.After(1 * time.Second): - t.Fatalf("hs2 /sync request request did not complete") - case syncRes = <-syncResponseChan: - } - - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(roomID)) - if !roomRes.Exists() { - t.Fatalf("hs2 /sync completed without join to new room\n") - } + // We expect hs2 to fall back to requesting state from hs1, in order to + // complete the partial state join + nextBatch = charlie.MustSyncUntil( + t, + client.SyncReq{Since: nextBatch}, + client.SyncJoinedTo(charlie.UserID, roomID), + ) }) // test a lazy-load-members sync while re-syncing partial state, followed by completion of state syncing, @@ -1085,7 +1220,7 @@ func TestPartialStateJoin(t *testing.T) { t.Logf("Alice successfully synced") // wait for partial state to finish syncing, - // by waiting for the room to show up in a regular /sync. + // by waiting for the room to show up in /sync. psjResult.AwaitStateIdsRequest(t) psjResult.FinishStateRequest() alice.MustSyncUntil(t, @@ -1110,7 +1245,7 @@ func TestPartialStateJoin(t *testing.T) { server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) } - // wait for the events to come down a regular /sync. + // wait for the events to come down a /sync. alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(serverRoom.RoomID, lastEventID), @@ -1660,7 +1795,7 @@ func TestPartialStateJoin(t *testing.T) { // Alice has now joined the room, and the server is syncing the state in the background. - // attempts to sync should now block. Fire off a goroutine to try it. + // attempts to joined_members should now block. Fire off a goroutine to try it. jmResponseChan := make(chan *http.Response) go func() { response := alice.MustDoFunc(t, "GET", []string{"_matrix", "client", "v3", "rooms", serverRoom.RoomID, "joined_members"})