From 84dc8b9039621a2b55829bad8cc1dfec19253edd Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Mon, 16 Jan 2023 15:06:37 +0100 Subject: [PATCH 01/19] Change tests to accomodate non blocking regular sync --- ...federation_room_join_partial_state_test.go | 106 ++++++------------ 1 file changed, 36 insertions(+), 70 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 4d12b940..a8898de9 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -119,8 +119,8 @@ func TestPartialStateJoin(t *testing.T) { defer deployment.Destroy(t) // test that a regular /sync request made during a partial-state /send_join - // request blocks until the state is correctly synced. - t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) { + // request does not return the room until the state is correctly synced. + t.Run("RegularSyncDuringPartialStateJoin", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t1alice", "secret", false) server := createTestServer(t, deployment) @@ -132,38 +132,26 @@ func TestPartialStateJoin(t *testing.T) { // Alice has now joined the room, and the server is syncing the state in the background. - // attempts to sync should now block. Fire off a goroutine to try it. - syncResponseChan := make(chan gjson.Result) - go func() { - response, _ := alice.MustSync(t, client.SyncReq{}) - syncResponseChan <- response - close(syncResponseChan) - }() + // Regular sync shouldn't include the room yet + response, nextBatch := alice.MustSync(t, client.SyncReq{}) + + syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) + if response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("Regular sync shouldn't include the joined room until resync is over") + } // wait for the state_ids request to arrive psjResult.AwaitStateIdsRequest(t) - // the client-side requests should still be waiting - select { - case <-syncResponseChan: - t.Fatalf("Sync completed before state resync complete") - default: - } - // release the federation /state response psjResult.FinishStateRequest() // the /sync request should now complete, with the new room - var syncRes gjson.Result - select { - case <-time.After(1 * time.Second): - t.Fatalf("/sync request request did not complete") - case syncRes = <-syncResponseChan: - } + response, _ = alice.MustSync(t, client.SyncReq{Since: nextBatch}) - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID)) + roomRes := response.Get(syncJoinedRoomPath) if !roomRes.Exists() { - t.Fatalf("/sync completed without join to new room\n") + t.Fatal("Regular sync should now include the joined room since resync is over") } // check that the state includes both charlie and derek. @@ -933,44 +921,35 @@ func TestPartialStateJoin(t *testing.T) { // wait for the state_ids request to arrive psjResult.AwaitStateIdsRequest(t) + // Regular sync shouldn't include the room yet + response, nextBatch := alice.MustSync(t, client.SyncReq{}) + + syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) + if response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("Regular sync shouldn't include the joined room until resync is over") + } + // restart the homeserver err := deployment.Restart(t) if err != nil { t.Errorf("Failed to restart homeserver: %s", err) } - // attempts to sync should block. Fire off a goroutine to try it. - syncResponseChan := make(chan gjson.Result) - go func() { - response, _ := alice.MustSync(t, client.SyncReq{}) - syncResponseChan <- response - close(syncResponseChan) - }() + // Regular sync still shouldn't include the room + response, nextBatch = alice.MustSync(t, client.SyncReq{Since: nextBatch}) - // we expect another state_ids request to arrive. - // we'd do another AwaitStateIdsRequest, except it's single-use. - - // the client-side requests should still be waiting - select { - case <-syncResponseChan: - t.Fatalf("Sync completed before state resync complete") - default: + if response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("Regular sync shouldn't include the joined room until resync is over") } // release the federation /state response psjResult.FinishStateRequest() // the /sync request should now complete, with the new room - var syncRes gjson.Result - select { - case <-time.After(1 * time.Second): - t.Fatalf("/sync request request did not complete") - case syncRes = <-syncResponseChan: - } + response, _ = alice.MustSync(t, client.SyncReq{Since: nextBatch}) - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID)) - if !roomRes.Exists() { - t.Fatalf("/sync completed without join to new room\n") + if !response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("Regular sync should now include the joined room since resync is over") } }) @@ -1029,34 +1008,21 @@ func TestPartialStateJoin(t *testing.T) { // wait until hs2 starts syncing state fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") - syncResponseChan := make(chan gjson.Result) - go func() { - response, _ := charlie.MustSync(t, client.SyncReq{}) - syncResponseChan <- response - close(syncResponseChan) - }() + response, nextBatch := charlie.MustSync(t, client.SyncReq{}) - // the client-side requests should still be waiting - select { - case <-syncResponseChan: - t.Fatalf("hs2 sync completed before state resync complete") - default: + // the client-side requests shouldn't report the join yet + syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(roomID) + if response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("Regular sync shouldn't include the joined room yet") } // reply to hs2 with a bogus /state_ids response fedStateIdsSendResponseWaiter.Finish() - // charlie's /sync request should now complete, with the new room - var syncRes gjson.Result - select { - case <-time.After(1 * time.Second): - t.Fatalf("hs2 /sync request request did not complete") - case syncRes = <-syncResponseChan: - } + response, _ = charlie.MustSync(t, client.SyncReq{Since: nextBatch}) - roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(roomID)) - if !roomRes.Exists() { - t.Fatalf("hs2 /sync completed without join to new room\n") + if !response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("hs2 /sync completed without join to new room") } }) @@ -1658,7 +1624,7 @@ func TestPartialStateJoin(t *testing.T) { // Alice has now joined the room, and the server is syncing the state in the background. - // attempts to sync should now block. Fire off a goroutine to try it. + // attempts to joined_members should now block. Fire off a goroutine to try it. jmResponseChan := make(chan *http.Response) go func() { response := alice.MustDoFunc(t, "GET", []string{"_matrix", "client", "v3", "rooms", serverRoom.RoomID, "joined_members"}) From 0b9cce3a2607f6e088457f5f7a128469d4ab5db1 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 17 Jan 2023 16:07:00 +0100 Subject: [PATCH 02/19] Remove or rename mentions of regular sync --- ...federation_room_join_partial_state_test.go | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index a8898de9..6fe7ab48 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -118,9 +118,9 @@ func TestPartialStateJoin(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) - // test that a regular /sync request made during a partial-state /send_join + // test that a non lazy /sync request made during a partial-state /send_join // request does not return the room until the state is correctly synced. - t.Run("RegularSyncDuringPartialStateJoin", func(t *testing.T) { + t.Run("NonLazySyncDuringPartialStateJoin", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t1alice", "secret", false) server := createTestServer(t, deployment) @@ -132,12 +132,12 @@ func TestPartialStateJoin(t *testing.T) { // Alice has now joined the room, and the server is syncing the state in the background. - // Regular sync shouldn't include the room yet + // sync shouldn't include the room yet response, nextBatch := alice.MustSync(t, client.SyncReq{}) syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) if response.Get(syncJoinedRoomPath).Exists() { - t.Fatal("Regular sync shouldn't include the joined room until resync is over") + t.Fatal("Sync shouldn't include the joined room until resync is over") } // wait for the state_ids request to arrive @@ -151,7 +151,7 @@ func TestPartialStateJoin(t *testing.T) { roomRes := response.Get(syncJoinedRoomPath) if !roomRes.Exists() { - t.Fatal("Regular sync should now include the joined room since resync is over") + t.Fatal("Sync should now include the joined room since resync is over") } // check that the state includes both charlie and derek. @@ -905,7 +905,7 @@ func TestPartialStateJoin(t *testing.T) { }) // test that a partial-state join continues syncing state after a restart - // the same as SyncBlocksDuringPartialStateJoin, with a restart in the middle + // the same as NonLazySyncDuringPartialStateJoin, with a restart in the middle t.Run("PartialStateJoinContinuesAfterRestart", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t12alice", "secret", false) @@ -921,12 +921,12 @@ func TestPartialStateJoin(t *testing.T) { // wait for the state_ids request to arrive psjResult.AwaitStateIdsRequest(t) - // Regular sync shouldn't include the room yet + // Non lazy sync shouldn't include the room yet response, nextBatch := alice.MustSync(t, client.SyncReq{}) syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) if response.Get(syncJoinedRoomPath).Exists() { - t.Fatal("Regular sync shouldn't include the joined room until resync is over") + t.Fatal("Sync shouldn't include the joined room until resync is over") } // restart the homeserver @@ -935,11 +935,11 @@ func TestPartialStateJoin(t *testing.T) { t.Errorf("Failed to restart homeserver: %s", err) } - // Regular sync still shouldn't include the room + // Sync still shouldn't include the room response, nextBatch = alice.MustSync(t, client.SyncReq{Since: nextBatch}) if response.Get(syncJoinedRoomPath).Exists() { - t.Fatal("Regular sync shouldn't include the joined room until resync is over") + t.Fatal("Sync shouldn't include the joined room until resync is over") } // release the federation /state response @@ -949,7 +949,7 @@ func TestPartialStateJoin(t *testing.T) { response, _ = alice.MustSync(t, client.SyncReq{Since: nextBatch}) if !response.Get(syncJoinedRoomPath).Exists() { - t.Fatal("Regular sync should now include the joined room since resync is over") + t.Fatal("Sync should now include the joined room since resync is over") } }) @@ -1013,7 +1013,7 @@ func TestPartialStateJoin(t *testing.T) { // the client-side requests shouldn't report the join yet syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(roomID) if response.Get(syncJoinedRoomPath).Exists() { - t.Fatal("Regular sync shouldn't include the joined room yet") + t.Fatal("Sync shouldn't include the joined room yet") } // reply to hs2 with a bogus /state_ids response @@ -1049,7 +1049,7 @@ func TestPartialStateJoin(t *testing.T) { t.Logf("Alice successfully synced") // wait for partial state to finish syncing, - // by waiting for the room to show up in a regular /sync. + // by waiting for the room to show up in /sync. psjResult.AwaitStateIdsRequest(t) psjResult.FinishStateRequest() alice.MustSyncUntil(t, @@ -1074,7 +1074,7 @@ func TestPartialStateJoin(t *testing.T) { server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) } - // wait for the events to come down a regular /sync. + // wait for the events to come down a /sync. alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(serverRoom.RoomID, lastEventID), From a8dfc9df6fa56d9714a5f7ddf49ec67bc5c194f4 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 18 Jan 2023 17:52:50 +0100 Subject: [PATCH 03/19] Add long polling test --- ...federation_room_join_partial_state_test.go | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 6fe7ab48..c2086cc6 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -169,6 +169,70 @@ func TestPartialStateJoin(t *testing.T) { } }) + t.Run("NonLazyLongPollingSyncDuringPartialStateJoin", func(t *testing.T) { + alice := deployment.RegisterUser(t, "hs1", "t99alice", "secret", false) + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy(t) + + // Alice has now joined the room, and the server is syncing the state in the background. + + // initial sync shouldn't include the room yet, but still return immediatly + response, nextBatch := alice.MustSync(t, client.SyncReq{ + TimeoutMillis: "10000", + }) + + syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) + if response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("Sync shouldn't include the joined room until resync is over") + } + + // Begin a long polling sync that shouldn't return yet since no change happened + responseChan := make(chan gjson.Result, 1) + syncStarted := make(chan struct{}) + go func() { + defer close(responseChan) + defer close(syncStarted) + + syncStarted <- struct{}{} + response, _ := alice.MustSync(t, client.SyncReq{ + TimeoutMillis: "10000", + Since: nextBatch, + }) + responseChan <- response + }() + + // Try to wait for the sync to actually start, then un-partial-state the room + select { + case <-syncStarted: + // wait for the state_ids request to arrive + psjResult.AwaitStateIdsRequest(t) + // release the federation /state response + psjResult.FinishStateRequest() + case <-time.After(time.Second * 5): + // even though this should mostly be impossible, make sure we have a timeout + t.Fatalf("goroutine didn't start") + } + + // Try to wait for the sync to return or timeout after 15 seconds, + // as the above tests are using a timeout of 10 seconds + select { + case response = <-responseChan: + case <-time.After(time.Second * 5): + t.Errorf("sync should have returned before the timeout") + } + + // the /sync request should now complete, with the new room + roomRes := response.Get(syncJoinedRoomPath) + if !roomRes.Exists() { + t.Fatal("Sync should now include the joined room since resync is over") + } + }) + // when Alice does a lazy-loading sync, she should see the room immediately t.Run("CanLazyLoadingSyncDuringPartialStateJoin", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t2alice", "secret", false) From 0af85cfe1949a1e2c965cc65640ff2dd325b098e Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 13:17:20 +0000 Subject: [PATCH 04/19] WIP: test eager syncs during partial joins --- internal/client/client.go | 14 +- ...federation_room_join_partial_state_test.go | 120 ++++++++++++++++-- 2 files changed, 117 insertions(+), 17 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 7ffacffd..243461d6 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -260,9 +260,9 @@ func (c *CSAPI) SetPushRule(t *testing.T, scope string, kind string, ruleID stri return c.MustDoFunc(t, "PUT", []string{"_matrix", "client", "v3", "pushrules", scope, kind, ruleID}, WithJSONBody(t, body), WithQueries(queryParams)) } -// SendEventSynced sends `e` into the room and waits for its event ID to come down /sync. +// SendEventUnsynced sends `e` into the room. // Returns the event ID of the sent event. -func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e b.Event) string { +func (c *CSAPI) SendEventUnsynced(t *testing.T, roomID string, e b.Event) string { t.Helper() c.txnID++ paths := []string{"_matrix", "client", "v3", "rooms", roomID, "send", e.Type, strconv.Itoa(c.txnID)} @@ -272,6 +272,14 @@ func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e b.Event) string { res := c.MustDoFunc(t, "PUT", paths, WithJSONBody(t, e.Content)) body := ParseJSON(t, res) eventID := GetJSONFieldStr(t, body, "event_id") + return eventID +} + +// SendEventSynced sends `e` into the room and waits for its event ID to come down /sync. +// Returns the event ID of the sent event. +func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e b.Event) string { + t.Helper() + eventID := c.SendEventUnsynced(t, roomID, e) t.Logf("SendEventSynced waiting for event ID %s", eventID) c.MustSyncUntil(t, SyncReq{}, SyncTimelineHas(roomID, func(r gjson.Result) bool { return r.Get("event_id").Str == eventID @@ -414,7 +422,7 @@ func (c *CSAPI) LoginUser(t *testing.T, localpart, password string) (userID, acc "user": localpart, }, "password": password, - "type": "m.login.password", + "type": "m.login.password", } res := c.MustDoFunc(t, "POST", []string{"_matrix", "client", "v3", "login"}, WithJSONBody(t, reqBody)) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index c2086cc6..3731f663 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -118,11 +118,57 @@ func TestPartialStateJoin(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) - // test that a non lazy /sync request made during a partial-state /send_join - // request does not return the room until the state is correctly synced. - t.Run("NonLazySyncDuringPartialStateJoin", func(t *testing.T) { - alice := deployment.RegisterUser(t, "hs1", "t1alice", "secret", false) + // Test that an eager (i.e. NOT lazy-loading members) /sync request made during a + // partial-state /send_join request does not return the room until the resync has + // completed. + // + // We need to test both an eager initial sync (no `since` token) and an eager + // incremental sync (has a `since` token) separately. Do this as follows + // + // 1. Partial join Alice to a remote room. + // 2. Have Alice lazy-sync until she sees (1). + // 3. Have Alice eager sync. The response should omit the remote room. + // 4. Have Alice send a message to the remote room. + // 5. Have Alice lazy-sync until she sees (4). + // 6. Have Alice eager-sync. The response should omit the remote room. + // 7. Allow the resync to complete. + // 8. Have Alice eager-sync until she sees the remote room. + // + // Alice's lazy syncs in steps 2 and 5 are incremental. + // (We have Alice lazy-sync to avoid races; we want to be sure that the server has + // deliberately chosen to omit the remote room from the lazy-sync response.) + + eagerSyncDuringPartialStateJoinTest := func(t *testing.T, usernameSuffix string, incremental bool) { + alice := deployment.RegisterUser(t, "hs1", "t1alice_" + usernameSuffix, "secret", false) + + // Maintain two sync tokens: once for the eager syncs under test, and another + // for the lazy syncs which we use to avoid races. + var eagerSyncToken, lazySyncToken string + + getEagerSyncReq := func() client.SyncReq { + // We track the `next_batch` returned by Alice's eager syncs. However we + // will only _use_ it when we are testing the behaviour of incremental syncs. + if incremental { + return client.SyncReq{Since: eagerSyncToken} + } else { + return client.SyncReq{Since: ""} + } + } + + getLazySyncReq := func() client.SyncReq { + return client.SyncReq{ + Since: lazySyncToken, + Filter: buildLazyLoadingSyncFilter(nil), + }, + } + + + t.Log("Do a one-off initial sync for Alice, so we have a next_batch token for future incremental syncs") + _, eagerSyncToken = alice.MustSync(t, client.SyncReq{}) + + + t.Log("1. Partial join Alice to a remote room.") server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() @@ -130,24 +176,63 @@ func TestPartialStateJoin(t *testing.T) { psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy(t) - // Alice has now joined the room, and the server is syncing the state in the background. - - // sync shouldn't include the room yet - response, nextBatch := alice.MustSync(t, client.SyncReq{}) + // 2. Have Alice lazy-sync until she sees (1). + lazySyncToken = alice.MustSyncUntil( + t, + getLazySyncReq(), + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) + t.Log("3. Have Alice eager sync. The response should omit the remote room.") syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) - if response.Get(syncJoinedRoomPath).Exists() { - t.Fatal("Sync shouldn't include the joined room until resync is over") - } + response, eagerSyncToken := alice.MustSync( + t, + getEagerSyncReq(), + ) + must.MatchGJSON( + t, + response, + match.JSONKeyMissing(syncJoinedRoomPath), + ) + + t.Log("4. Have Alice send a message to the remote room.") + messageId := alice.SendEventUnsynced(t, serverRoom.RoomID, b.Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "Hello world", + "msgtype": "m.text", + }, + Sender: alice.UserID, + }) + + t.Log("5. Have Alice lazy-sync until she sees (4).") + alice.MustSyncUntil( + t, + getLazySyncReq(), + client.SyncTimelineHasEventID(serverRoom.RoomID, messageId), + ) + + t.Log("6. Have Alice eager-sync. The response should omit the remote room.") + response, eagerSyncToken = alice.MustSync( + t, + getEagerSyncReq(), + ) + must.MatchGJSON( + t, + response, + match.JSONKeyMissing(syncJoinedRoomPath), + ) + + t.Log("7. Allow the resync to complete.") // wait for the state_ids request to arrive psjResult.AwaitStateIdsRequest(t) // release the federation /state response psjResult.FinishStateRequest() - // the /sync request should now complete, with the new room - response, _ = alice.MustSync(t, client.SyncReq{Since: nextBatch}) + t.Log("8. Have Alice eager-sync. She should see the remote room.") + response, eagerSyncToken = alice.MustSync(t, getEagerSyncReq()) roomRes := response.Get(syncJoinedRoomPath) if !roomRes.Exists() { @@ -165,8 +250,15 @@ func TestPartialStateJoin(t *testing.T) { ) if err := matcher([]byte(roomRes.Raw)); err != nil { t.Errorf("Did not find expected state events in /sync response: %s", err) - } + } + + t.Run("EagerInitialSyndDuringPartialStateJoin", func(t *testing.T) { + eagerSyncDuringPartialStateJoinTest(t, "initial", false) + }) + + t.Run("EagerIncrementalSyndDuringPartialStateJoin", func(t *testing.T) { + eagerSyncDuringPartialStateJoinTest(t, "initial", true) }) t.Run("NonLazyLongPollingSyncDuringPartialStateJoin", func(t *testing.T) { From 87a2f6536397cb5716b93e759c6e98a3f420b079 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 13:58:40 +0000 Subject: [PATCH 05/19] Fixxxxx --- ...federation_room_join_partial_state_test.go | 89 +++---------------- 1 file changed, 12 insertions(+), 77 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 3731f663..1527586d 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -139,14 +139,14 @@ func TestPartialStateJoin(t *testing.T) { // deliberately chosen to omit the remote room from the lazy-sync response.) eagerSyncDuringPartialStateJoinTest := func(t *testing.T, usernameSuffix string, incremental bool) { - alice := deployment.RegisterUser(t, "hs1", "t1alice_" + usernameSuffix, "secret", false) + alice := deployment.RegisterUser(t, "hs1", "t1alice_"+usernameSuffix, "secret", false) // Maintain two sync tokens: once for the eager syncs under test, and another // for the lazy syncs which we use to avoid races. var eagerSyncToken, lazySyncToken string getEagerSyncReq := func() client.SyncReq { - // We track the `next_batch` returned by Alice's eager syncs. However we + // We track the `next_batch` returned by Alice's eager syncs. However, we // will only _use_ it when we are testing the behaviour of incremental syncs. if incremental { return client.SyncReq{Since: eagerSyncToken} @@ -157,17 +157,14 @@ func TestPartialStateJoin(t *testing.T) { getLazySyncReq := func() client.SyncReq { return client.SyncReq{ - Since: lazySyncToken, - Filter: buildLazyLoadingSyncFilter(nil), - }, + Since: lazySyncToken, + Filter: buildLazyLoadingSyncFilter(nil), + } } - t.Log("Do a one-off initial sync for Alice, so we have a next_batch token for future incremental syncs") _, eagerSyncToken = alice.MustSync(t, client.SyncReq{}) - - t.Log("1. Partial join Alice to a remote room.") server := createTestServer(t, deployment) cancel := server.Listen() @@ -253,78 +250,14 @@ func TestPartialStateJoin(t *testing.T) { } } - t.Run("EagerInitialSyndDuringPartialStateJoin", func(t *testing.T) { + t.Run("EagerInitialSyncDuringPartialStateJoin", func(t *testing.T) { eagerSyncDuringPartialStateJoinTest(t, "initial", false) }) - t.Run("EagerIncrementalSyndDuringPartialStateJoin", func(t *testing.T) { + t.Run("EagerIncrementalSyncDuringPartialStateJoin", func(t *testing.T) { eagerSyncDuringPartialStateJoinTest(t, "initial", true) }) - t.Run("NonLazyLongPollingSyncDuringPartialStateJoin", func(t *testing.T) { - alice := deployment.RegisterUser(t, "hs1", "t99alice", "secret", false) - - server := createTestServer(t, deployment) - cancel := server.Listen() - defer cancel() - serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) - psjResult := beginPartialStateJoin(t, server, serverRoom, alice) - defer psjResult.Destroy(t) - - // Alice has now joined the room, and the server is syncing the state in the background. - - // initial sync shouldn't include the room yet, but still return immediatly - response, nextBatch := alice.MustSync(t, client.SyncReq{ - TimeoutMillis: "10000", - }) - - syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) - if response.Get(syncJoinedRoomPath).Exists() { - t.Fatal("Sync shouldn't include the joined room until resync is over") - } - - // Begin a long polling sync that shouldn't return yet since no change happened - responseChan := make(chan gjson.Result, 1) - syncStarted := make(chan struct{}) - go func() { - defer close(responseChan) - defer close(syncStarted) - - syncStarted <- struct{}{} - response, _ := alice.MustSync(t, client.SyncReq{ - TimeoutMillis: "10000", - Since: nextBatch, - }) - responseChan <- response - }() - - // Try to wait for the sync to actually start, then un-partial-state the room - select { - case <-syncStarted: - // wait for the state_ids request to arrive - psjResult.AwaitStateIdsRequest(t) - // release the federation /state response - psjResult.FinishStateRequest() - case <-time.After(time.Second * 5): - // even though this should mostly be impossible, make sure we have a timeout - t.Fatalf("goroutine didn't start") - } - - // Try to wait for the sync to return or timeout after 15 seconds, - // as the above tests are using a timeout of 10 seconds - select { - case response = <-responseChan: - case <-time.After(time.Second * 5): - t.Errorf("sync should have returned before the timeout") - } - - // the /sync request should now complete, with the new room - roomRes := response.Get(syncJoinedRoomPath) - if !roomRes.Exists() { - t.Fatal("Sync should now include the joined room since resync is over") - } - }) - // when Alice does a lazy-loading sync, she should see the room immediately t.Run("CanLazyLoadingSyncDuringPartialStateJoin", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t2alice", "secret", false) @@ -1060,8 +993,10 @@ func TestPartialStateJoin(t *testing.T) { } }) - // test that a partial-state join continues syncing state after a restart - // the same as NonLazySyncDuringPartialStateJoin, with a restart in the middle + // Test that a partial-state join continues syncing state after a restart + // Similar to EagerIncrementalSyncDuringPartialStateJoin, with a restart in the + // middle. (But this test is slightly simpler: it doesn't lazy-sync before + // eager-syncing, and doesn't send a message to the partial-state room.) t.Run("PartialStateJoinContinuesAfterRestart", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t12alice", "secret", false) @@ -1077,7 +1012,7 @@ func TestPartialStateJoin(t *testing.T) { // wait for the state_ids request to arrive psjResult.AwaitStateIdsRequest(t) - // Non lazy sync shouldn't include the room yet + // Eager sync shouldn't include the room yet response, nextBatch := alice.MustSync(t, client.SyncReq{}) syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) From cb46edc058e1fed503aa6ed9232119066bbecc30 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 14:47:13 +0000 Subject: [PATCH 06/19] Fix suffix dummy amend to retrigger CI --- tests/federation_room_join_partial_state_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 1527586d..b3a10ca3 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -255,7 +255,7 @@ func TestPartialStateJoin(t *testing.T) { }) t.Run("EagerIncrementalSyncDuringPartialStateJoin", func(t *testing.T) { - eagerSyncDuringPartialStateJoinTest(t, "initial", true) + eagerSyncDuringPartialStateJoinTest(t, "incremental", true) }) // when Alice does a lazy-loading sync, she should see the room immediately From 05b53bc8ab0a2054aeea7c7ac8d0a3580eb13a3a Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 18:35:20 +0000 Subject: [PATCH 07/19] Fix --- tests/federation_room_join_partial_state_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index b3a10ca3..ced999f8 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -173,7 +173,7 @@ func TestPartialStateJoin(t *testing.T) { psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy(t) - // 2. Have Alice lazy-sync until she sees (1). + t.Log("2. Have Alice lazy-sync until she sees (1).") lazySyncToken = alice.MustSyncUntil( t, getLazySyncReq(), From 84c88781482510dc68b0cd3974ce1f24fe031d4e Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 18:35:38 +0000 Subject: [PATCH 08/19] Maybe use `MustSyncUntil`? --- ...federation_room_join_partial_state_test.go | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index ced999f8..f2619005 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -228,26 +228,21 @@ func TestPartialStateJoin(t *testing.T) { // release the federation /state response psjResult.FinishStateRequest() - t.Log("8. Have Alice eager-sync. She should see the remote room.") - response, eagerSyncToken = alice.MustSync(t, getEagerSyncReq()) - - roomRes := response.Get(syncJoinedRoomPath) - if !roomRes.Exists() { - t.Fatal("Sync should now include the joined room since resync is over") + t.Log("8. Have Alice eager-sync. She should see the remote room, including" + + "Charlie and Derek.") + remoteMembershipFor := func(localpart string) func(result gjson.Result) bool { + return func(result gjson.Result) bool { + return result.Get("type").Str == "m.room.member" && result.Get("state_key").Str == server.UserID(localpart) + } } - // check that the state includes both charlie and derek. - matcher := match.JSONCheckOffAllowUnwanted("state.events", - []interface{}{ - "m.room.member|" + server.UserID("charlie"), - "m.room.member|" + server.UserID("derek"), - }, func(result gjson.Result) interface{} { - return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") - }, nil, + _ = alice.MustSyncUntil( + t, + getEagerSyncReq(), + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + client.SyncStateHas(serverRoom.RoomID, remoteMembershipFor("charlie")), + client.SyncStateHas(serverRoom.RoomID, remoteMembershipFor("derek")), ) - if err := matcher([]byte(roomRes.Raw)); err != nil { - t.Errorf("Did not find expected state events in /sync response: %s", err) - } } t.Run("EagerInitialSyncDuringPartialStateJoin", func(t *testing.T) { From a078542efe0da4a3ad827fc8052503fc08fb77b9 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 18:56:00 +0000 Subject: [PATCH 09/19] Wait for resync --- tests/federation_room_join_partial_state_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index f2619005..dc6c5696 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -227,6 +227,7 @@ func TestPartialStateJoin(t *testing.T) { // release the federation /state response psjResult.FinishStateRequest() + awaitPartialStateJoinCompletion(t, serverRoom, alice) t.Log("8. Have Alice eager-sync. She should see the remote room, including" + "Charlie and Derek.") From 5d0b5fd081ee2f8af7f654929b60eb7a0e070607 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 18:57:34 +0000 Subject: [PATCH 10/19] Revert "Maybe use `MustSyncUntil`?" This reverts commit 84c88781482510dc68b0cd3974ce1f24fe031d4e. --- ...federation_room_join_partial_state_test.go | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index dc6c5696..a8ba15e5 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -229,21 +229,26 @@ func TestPartialStateJoin(t *testing.T) { psjResult.FinishStateRequest() awaitPartialStateJoinCompletion(t, serverRoom, alice) - t.Log("8. Have Alice eager-sync. She should see the remote room, including" + - "Charlie and Derek.") - remoteMembershipFor := func(localpart string) func(result gjson.Result) bool { - return func(result gjson.Result) bool { - return result.Get("type").Str == "m.room.member" && result.Get("state_key").Str == server.UserID(localpart) - } + t.Log("8. Have Alice eager-sync. She should see the remote room.") + response, eagerSyncToken = alice.MustSync(t, getEagerSyncReq()) + + roomRes := response.Get(syncJoinedRoomPath) + if !roomRes.Exists() { + t.Fatal("Sync should now include the joined room since resync is over") } - _ = alice.MustSyncUntil( - t, - getEagerSyncReq(), - client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), - client.SyncStateHas(serverRoom.RoomID, remoteMembershipFor("charlie")), - client.SyncStateHas(serverRoom.RoomID, remoteMembershipFor("derek")), + // check that the state includes both charlie and derek. + matcher := match.JSONCheckOffAllowUnwanted("state.events", + []interface{}{ + "m.room.member|" + server.UserID("charlie"), + "m.room.member|" + server.UserID("derek"), + }, func(result gjson.Result) interface{} { + return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") + }, nil, ) + if err := matcher([]byte(roomRes.Raw)); err != nil { + t.Errorf("Did not find expected state events in /sync response: %s", err) + } } t.Run("EagerInitialSyncDuringPartialStateJoin", func(t *testing.T) { From fe2939af9ccec9ec201bc9bb85fc39b765bae2a7 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 20:24:41 +0000 Subject: [PATCH 11/19] Allow PDUs and EDUs fuck you golang --- tests/federation_room_join_partial_state_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index a8ba15e5..7e138cd7 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -166,7 +166,15 @@ func TestPartialStateJoin(t *testing.T) { _, eagerSyncToken = alice.MustSync(t, client.SyncReq{}) t.Log("1. Partial join Alice to a remote room.") - server := createTestServer(t, deployment) + server := createTestServer( + t, + deployment, + // Allow PDUs and EDUs, since Alice will send a message in the room. + federation.HandleTransactionRequests( + func(e *gomatrixserverlib.Event) {}, + func(e gomatrixserverlib.EDU) {}, + ), + ) cancel := server.Listen() defer cancel() serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) From ee661be0ae10905f9b8ff201eb59526b0ded499f Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 20 Jan 2023 11:20:41 +0000 Subject: [PATCH 12/19] Improve error message? idk why golang makes these hard to see in test output --- tests/federation_room_join_partial_state_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 7e138cd7..5597d5aa 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -242,7 +242,7 @@ func TestPartialStateJoin(t *testing.T) { roomRes := response.Get(syncJoinedRoomPath) if !roomRes.Exists() { - t.Fatal("Sync should now include the joined room since resync is over") + t.Fatal("Sync does NOT include the joined room after the resync, but should") } // check that the state includes both charlie and derek. From 394a9bf1f5196969f8ef391e76142765f847a95d Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 20 Jan 2023 11:20:58 +0000 Subject: [PATCH 13/19] Paranoia comment --- tests/federation_room_join_partial_state_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 5597d5aa..9ef76570 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -151,6 +151,9 @@ func TestPartialStateJoin(t *testing.T) { if incremental { return client.SyncReq{Since: eagerSyncToken} } else { + // TODO: making repeated initial syncs might mean Synapse gives us a + // sync result from cache. Can we prevent that somehow? Restart the + // server would do it, but seems nuclear. return client.SyncReq{Since: ""} } } From 6738a2544a86f174964f6329e53fe6e064807f38 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 20 Jan 2023 13:59:53 +0000 Subject: [PATCH 14/19] Reinstate Mathieu's test Co-authored-by: Mathieu Velten --- ...federation_room_join_partial_state_test.go | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 9ef76570..9b0d42e4 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -270,6 +270,70 @@ func TestPartialStateJoin(t *testing.T) { eagerSyncDuringPartialStateJoinTest(t, "incremental", true) }) + t.Run("NonLazyLongPollingSyncDuringPartialStateJoin", func(t *testing.T) { + alice := deployment.RegisterUser(t, "hs1", "t99alice", "secret", false) + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy(t) + + // Alice has now joined the room, and the server is syncing the state in the background. + + // initial sync shouldn't include the room yet, but still return immediatly + response, nextBatch := alice.MustSync(t, client.SyncReq{ + TimeoutMillis: "10000", + }) + + syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) + if response.Get(syncJoinedRoomPath).Exists() { + t.Fatal("Sync shouldn't include the joined room until resync is over") + } + + // Begin a long polling sync that shouldn't return yet since no change happened + responseChan := make(chan gjson.Result, 1) + syncStarted := make(chan struct{}) + go func() { + defer close(responseChan) + defer close(syncStarted) + + syncStarted <- struct{}{} + response, _ := alice.MustSync(t, client.SyncReq{ + TimeoutMillis: "10000", + Since: nextBatch, + }) + responseChan <- response + }() + + // Try to wait for the sync to actually start, then un-partial-state the room + select { + case <-syncStarted: + // wait for the state_ids request to arrive + psjResult.AwaitStateIdsRequest(t) + // release the federation /state response + psjResult.FinishStateRequest() + case <-time.After(time.Second * 5): + // even though this should mostly be impossible, make sure we have a timeout + t.Fatalf("goroutine didn't start") + } + + // Try to wait for the sync to return or timeout after 15 seconds, + // as the above tests are using a timeout of 10 seconds + select { + case response = <-responseChan: + case <-time.After(time.Second * 5): + t.Errorf("sync should have returned before the timeout") + } + + // the /sync request should now complete, with the new room + roomRes := response.Get(syncJoinedRoomPath) + if !roomRes.Exists() { + t.Fatal("Sync should now include the joined room since resync is over") + } + }) + // when Alice does a lazy-loading sync, she should see the room immediately t.Run("CanLazyLoadingSyncDuringPartialStateJoin", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t2alice", "secret", false) From e51b5c27afaa2a25681c92ef49860587d4862f72 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 20 Jan 2023 14:11:29 +0000 Subject: [PATCH 15/19] Minor comment tweaks --- .../federation_room_join_partial_state_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 9b0d42e4..7e571a87 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -270,9 +270,13 @@ func TestPartialStateJoin(t *testing.T) { eagerSyncDuringPartialStateJoinTest(t, "incremental", true) }) - t.Run("NonLazyLongPollingSyncDuringPartialStateJoin", func(t *testing.T) { - alice := deployment.RegisterUser(t, "hs1", "t99alice", "secret", false) + // The tests above use long-polling syncs with a (complement-default) timeout of + // 1s. They don't test that the long-poll returns early when the partial state + // resync completes. This test does exactly that. + t.Run("EagerLongPollingSyncWokenWhenResyncCompletes", func(t *testing.T) { + alice := deployment.RegisterUser(t, "hs1", "t1alice_long_polling", "secret", false) + t.Log("Alice partial-joins a remote room.") server := createTestServer(t, deployment) cancel := server.Listen() defer cancel() @@ -280,12 +284,8 @@ func TestPartialStateJoin(t *testing.T) { psjResult := beginPartialStateJoin(t, server, serverRoom, alice) defer psjResult.Destroy(t) - // Alice has now joined the room, and the server is syncing the state in the background. - - // initial sync shouldn't include the room yet, but still return immediatly - response, nextBatch := alice.MustSync(t, client.SyncReq{ - TimeoutMillis: "10000", - }) + t.Log("Alice eager-syncs. The response should not contain the remote room.") + response, nextBatch := alice.MustSync(t, client.SyncReq{}) syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID) if response.Get(syncJoinedRoomPath).Exists() { @@ -330,7 +330,7 @@ func TestPartialStateJoin(t *testing.T) { // the /sync request should now complete, with the new room roomRes := response.Get(syncJoinedRoomPath) if !roomRes.Exists() { - t.Fatal("Sync should now include the joined room since resync is over") + t.Fatal("Sync does NOT include the joined room after resync") } }) From 0beaabf8917bd6aba8fcaea8be0f84b8187c5d84 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Sun, 22 Jan 2023 20:35:34 +0000 Subject: [PATCH 16/19] Fix caching comment --- tests/federation_room_join_partial_state_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 7e571a87..cc168ebb 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -151,9 +151,8 @@ func TestPartialStateJoin(t *testing.T) { if incremental { return client.SyncReq{Since: eagerSyncToken} } else { - // TODO: making repeated initial syncs might mean Synapse gives us a - // sync result from cache. Can we prevent that somehow? Restart the - // server would do it, but seems nuclear. + // NB: We are assuming that the responses to repeated initial syncs + // are not cached by the homeserver. return client.SyncReq{Since: ""} } } From 76d2e5cf01d433d779211a975285ee710ce6cd64 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 23 Jan 2023 16:32:54 +0000 Subject: [PATCH 17/19] Use MustSyncUntil to avoid a race --- ...federation_room_join_partial_state_test.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index a4cb8483..066b6d62 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -1114,11 +1114,11 @@ func TestPartialStateJoin(t *testing.T) { psjResult.FinishStateRequest() // the /sync request should now complete, with the new room - response, _ = alice.MustSync(t, client.SyncReq{Since: nextBatch}) - - if !response.Get(syncJoinedRoomPath).Exists() { - t.Fatal("Sync should now include the joined room since resync is over") - } + nextBatch = alice.MustSyncUntil( + t, + client.SyncReq{Since: nextBatch}, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) }) // test that a partial-state join can fall back to other homeservers when re-syncing @@ -1187,11 +1187,11 @@ func TestPartialStateJoin(t *testing.T) { // reply to hs2 with a bogus /state_ids response fedStateIdsSendResponseWaiter.Finish() - response, _ = charlie.MustSync(t, client.SyncReq{Since: nextBatch}) - - if !response.Get(syncJoinedRoomPath).Exists() { - t.Fatal("hs2 /sync completed without join to new room") - } + nextBatch = charlie.MustSyncUntil( + t, + client.SyncReq{Since: nextBatch}, + client.SyncJoinedTo(charlie.UserID, roomID), + ) }) // test a lazy-load-members sync while re-syncing partial state, followed by completion of state syncing, From e1d3bbeda79c36dbce3be1b9156ed7e1b851551a Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 23 Jan 2023 16:52:06 +0000 Subject: [PATCH 18/19] Comment --- tests/federation_room_join_partial_state_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 066b6d62..7e3077cb 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -1187,6 +1187,8 @@ func TestPartialStateJoin(t *testing.T) { // reply to hs2 with a bogus /state_ids response fedStateIdsSendResponseWaiter.Finish() + // We expect hs2 to fall back to requesting state from hs1, in order to + // complete the partial state join nextBatch = charlie.MustSyncUntil( t, client.SyncReq{Since: nextBatch}, From 2fcb98bdf6e0d7b7254e92bb6725a172daf499aa Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 23 Jan 2023 17:08:29 +0000 Subject: [PATCH 19/19] Improve long-polling faster join test --- ...federation_room_join_partial_state_test.go | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 7e3077cb..e2fbaba1 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -292,13 +292,10 @@ func TestPartialStateJoin(t *testing.T) { } // Begin a long polling sync that shouldn't return yet since no change happened + responseChan := make(chan gjson.Result, 1) - syncStarted := make(chan struct{}) go func() { defer close(responseChan) - defer close(syncStarted) - - syncStarted <- struct{}{} response, _ := alice.MustSync(t, client.SyncReq{ TimeoutMillis: "10000", Since: nextBatch, @@ -307,23 +304,27 @@ func TestPartialStateJoin(t *testing.T) { }() // Try to wait for the sync to actually start, then un-partial-state the room + time.Sleep(2 * time.Second) + + // Sanity check that the sync hasn't completed select { - case <-syncStarted: - // wait for the state_ids request to arrive - psjResult.AwaitStateIdsRequest(t) - // release the federation /state response - psjResult.FinishStateRequest() - case <-time.After(time.Second * 5): - // even though this should mostly be impossible, make sure we have a timeout - t.Fatalf("goroutine didn't start") + case response := <-responseChan: + t.Fatalf("Recieved sync response too soon: %s", response.Raw) + default: + t.Logf("No sync response yet") } + // wait for the state_ids request to arrive + psjResult.AwaitStateIdsRequest(t) + // release the federation /state response + psjResult.FinishStateRequest() + // Try to wait for the sync to return or timeout after 15 seconds, // as the above tests are using a timeout of 10 seconds select { case response = <-responseChan: - case <-time.After(time.Second * 5): - t.Errorf("sync should have returned before the timeout") + case <-time.After(time.Second * 10): + t.Fatal("sync should have returned before the timeout") } // the /sync request should now complete, with the new room