Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions tests/federation_room_join_partial_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ func TestPartialStateJoin(t *testing.T) {
return serverRoom
}

// getSyncToken gets the latest sync token
getSyncToken := func(t *testing.T, alice *client.CSAPI) string {
_, syncToken := alice.MustSync(t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(nil),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you probably don't need a Filter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't that make the function block if we decide to use it midway through a partial join?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but I didn't think this was used in the middle of a partial-state join?

(indeed, I thought initialsyncs in the middle of partial-state joins blocked irrespective of the lazy-loading flag. I'm probably wrong though)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't currently used in the middle of a partial-state join, but I would like to keep the option open.

lazy-loading initial syncs don't (or no longer?) block during a partial-state join, otherwise one of the new lazy-loading tests in #442 would be getting stuck.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all makes sense, thanks!

TimeoutMillis: "0",
},
)
return syncToken
}

// test that a regular /sync request made during a partial-state /send_join
// request blocks until the state is correctly synced.
t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) {
Expand Down Expand Up @@ -182,6 +193,7 @@ func TestPartialStateJoin(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand All @@ -197,14 +209,15 @@ func TestPartialStateJoin(t *testing.T) {
t.Logf("Derek created event with ID %s", event.EventID())

// derek sends an event in the room
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event)
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event, syncToken)
})

// we should be able to receive events with a missing prev event over federation during the resync
t.Run("CanReceiveEventsWithMissingParentsDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand Down Expand Up @@ -236,14 +249,15 @@ func TestPartialStateJoin(t *testing.T) {
[]string{eventB.EventID()}, []*gomatrixserverlib.Event{eventA})

// send event B to hs1
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB)
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB, syncToken)
})

// we should be able to receive events with partially missing prev events over federation during the resync
t.Run("CanReceiveEventsWithHalfMissingParentsDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand Down Expand Up @@ -277,7 +291,7 @@ func TestPartialStateJoin(t *testing.T) {
[]string{eventB.EventID()}, []*gomatrixserverlib.Event{eventA})

// send event B to hs1
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB)
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB, syncToken)
})

// we should be able to receive events with a missing prev event, with half missing prev events,
Expand All @@ -286,6 +300,7 @@ func TestPartialStateJoin(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand Down Expand Up @@ -323,7 +338,7 @@ func TestPartialStateJoin(t *testing.T) {
handleStateRequests(t, server, serverRoom, eventA.EventID(), serverRoom.AllCurrentState(), nil, nil)

// send event C to hs1
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC)
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC, syncToken)
})

// a request to (client-side) /members?at= should block until the (federation) /state request completes
Expand Down Expand Up @@ -635,6 +650,7 @@ func TestPartialStateJoin(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand Down Expand Up @@ -692,7 +708,7 @@ func TestPartialStateJoin(t *testing.T) {

t.Logf("Charlie sent timeline event 2")
// wait for it to become visible, which implies that all the outliers have been pulled in.
awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, timelineEvent2.EventID())
awaitEventViaSync(t, alice, serverRoom.RoomID, timelineEvent2.EventID(), syncToken)

// now we send over all the other events in the gap.
server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{lateEvent.JSON()}, nil)
Expand Down Expand Up @@ -741,6 +757,7 @@ func TestPartialStateJoin(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
Expand Down Expand Up @@ -777,7 +794,7 @@ func TestPartialStateJoin(t *testing.T) {
[]json.RawMessage{badStateEvent.JSON(), sentinelEvent.JSON()}, nil)

// wait for the sentinel event to be visible
awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, sentinelEvent.EventID())
syncToken = awaitEventViaSync(t, alice, serverRoom.RoomID, sentinelEvent.EventID(), syncToken)

// ... and check that the bad state event is *not* visible
must.MatchResponse(t,
Expand All @@ -793,7 +810,7 @@ func TestPartialStateJoin(t *testing.T) {
// one more (non-state) event, for testReceiveEventDuringPartialStateJoin
event := psjResult.CreateMessageEvent(t, "charlie", nil)
t.Logf("charlie created regular timeline event %s", event.EventID())
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event)
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event, syncToken)

// check that the bad state event is *still* not visible
must.MatchResponse(t,
Expand Down Expand Up @@ -929,14 +946,14 @@ func TestPartialStateJoin(t *testing.T) {

// test reception of an event over federation during a resync
// sends the given event to the homeserver under test, checks that a client can see it and checks
// the state at the event
// the state at the event. returns the new sync token after the event.
func testReceiveEventDuringPartialStateJoin(
t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, psjResult partialStateJoinResult, event *gomatrixserverlib.Event,
) {
t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, psjResult partialStateJoinResult, event *gomatrixserverlib.Event, syncToken string,
) string {
// send the event to the homeserver
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil)

awaitEventArrival(t, time.Second, alice, psjResult.ServerRoom.RoomID, event.EventID())
syncToken = awaitEventViaSync(t, alice, psjResult.ServerRoom.RoomID, event.EventID(), syncToken)

// fire off a /state_ids request for the last event.
// it must either:
Expand Down Expand Up @@ -989,7 +1006,7 @@ func testReceiveEventDuringPartialStateJoin(
)
if err := psjResult.Server.SendFederationRequest(context.Background(), deployment, stateReq, &respStateIDs); err != nil {
t.Errorf("/state_ids request returned non-200: %s", err)
return
return syncToken
}
var gotState, expectedState []interface{}
for _, ev := range respStateIDs.StateEventIDs {
Expand All @@ -999,21 +1016,30 @@ func testReceiveEventDuringPartialStateJoin(
expectedState = append(expectedState, ev.EventID())
}
must.CheckOffAll(t, gotState, expectedState)

return syncToken
}

// awaitEventArrival waits for alice to be able to see a given event
func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI, roomID string, eventID string) {
/* TODO: check that a lazy-loading sync can see the event. Currently this doesn't work, because /sync blocks.
* https://github.com/matrix-org/synapse/issues/13146
alice.MustSyncUntil(t,
// awaitEventViaSync waits for alice to be able to see a given event via an incremental lazy-loading
// /sync and returns the new sync token after
func awaitEventViaSync(t *testing.T, alice *client.CSAPI, roomID string, eventID string, syncToken string) string {
// check that a lazy-loading sync can see the event
syncToken = alice.MustSyncUntil(t,
client.SyncReq{
Since: syncToken,
Filter: buildLazyLoadingSyncFilter(nil),
},
client.SyncTimelineHasEventID(roomID, eventID),
)
*/

// still, Alice should be able to see the event with an /event request. We might have to try it a few times.
t.Logf("Alice successfully received event %s via /sync", eventID)

return syncToken
}

// awaitEventArrival waits for alice to be able to see a given event via /event
func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI, roomID string, eventID string) {
// Alice should be able to see the event with an /event request. We might have to try it a few times.
alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "event", eventID},
client.WithRetryUntil(timeout, func(res *http.Response) bool {
if res.StatusCode == 200 {
Expand All @@ -1027,7 +1053,7 @@ func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI,
return false
}),
)
t.Logf("Alice successfully received event %s", eventID)
t.Logf("Alice successfully observed event %s via /event", eventID)
}

// buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq
Expand Down