Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,22 @@ func SyncTimelineHasEventID(roomID string, eventID string) SyncCheckOpt {
})
}

// Check that the state section for `roomID` has an event which passes the check function.
// Note that the state section of a sync response only contains the change in state up to the start
// of the timeline and will not contain the entire state of the room for incremental or
// `lazy_load_members` syncs.
func SyncStateHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt {
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
err := loopArray(
topLevelSyncJSON, "rooms.join."+GjsonEscape(roomID)+".state.events", check,
)
if err == nil {
return nil
}
return fmt.Errorf("SyncStateHas(%s): %s", roomID, err)
}
}

func SyncEphemeralHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt {
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
err := loopArray(
Expand Down
174 changes: 174 additions & 0 deletions tests/federation_room_join_partial_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ func TestPartialStateJoin(t *testing.T) {
return serverRoom
}

// getSyncToken gets the latest sync token
getSyncToken := func(t *testing.T, alice *client.CSAPI) string {
_, syncToken := alice.MustSync(t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(nil),
TimeoutMillis: "0",
},
)
return syncToken
}

// test that a regular /sync request made during a partial-state /send_join
// request blocks until the state is correctly synced.
t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) {
Expand Down Expand Up @@ -326,6 +337,169 @@ func TestPartialStateJoin(t *testing.T) {
testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC)
})

// initial sync must return memberships of event senders even when they aren't present in the
// partial room state.
t.Run("Lazy-loading initial sync includes remote memberships during partial state join", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

server := createTestServer(t, deployment)
cancel := server.Listen()
defer cancel()
serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t))
psjResult := beginPartialStateJoin(t, server, serverRoom, alice)
defer psjResult.Destroy()

// the HS will make an /event_auth request for the event
federation.HandleEventAuthRequests()(server)

// derek sends a message into the room.
event := psjResult.CreateMessageEvent(t, "derek", nil)
t.Logf("Derek created event with ID %s", event.EventID())
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil)

// wait for the homeserver to persist the event.
awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, event.EventID())

// do a lazy-loading initial sync.
syncRes, _ := alice.MustSync(t,
client.SyncReq{
Since: "",
Filter: buildLazyLoadingSyncFilter(nil),
},
)

err := client.SyncStateHas(serverRoom.RoomID, func(ev gjson.Result) bool {
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == event.Sender()
})(alice.UserID, syncRes)
if err != nil {
t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event.Sender(), err)
}
})

// gappy sync must return memberships of event senders even when they aren't present in the
// partial room state.
t.Run("Lazy-loading gappy sync includes remote memberships during partial state join", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
defer cancel()
serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t))
psjResult := beginPartialStateJoin(t, server, serverRoom, alice)
defer psjResult.Destroy()

syncToken = alice.MustSyncUntil(t,
client.SyncReq{
Since: syncToken,
Filter: buildLazyLoadingSyncFilter(nil),
},
client.SyncJoinedTo(alice.UserID, serverRoom.RoomID),
)

// the HS will make an /event_auth request for the event
federation.HandleEventAuthRequests()(server)

// derek sends two messages into the room.
event1 := psjResult.CreateMessageEvent(t, "derek", nil)
event2 := psjResult.CreateMessageEvent(t, "derek", nil)
t.Logf("Derek created event 1 with ID %s", event1.EventID())
t.Logf("Derek created event 2 with ID %s", event2.EventID())
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event1.JSON(), event2.JSON()}, nil)

// wait for the homeserver to persist the event.
awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, event2.EventID())

// do a gappy sync which only picks up the second message.
syncRes, _ := alice.MustSync(t,
client.SyncReq{
Since: syncToken,
Filter: buildLazyLoadingSyncFilter(map[string]interface{}{
"limit": 1,
}),
},
)

if !syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID) + ".timeline.limited").Bool() {
t.Errorf("/sync response was not gappy")
}

err := client.SyncTimelineHas(serverRoom.RoomID, func(ev gjson.Result) bool {
return ev.Get("event_id").Str == event1.EventID()
})(alice.UserID, syncRes)
if err == nil {
t.Errorf("gappy /sync returned the first event unexpectedly")
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should also assert that the limited flag is set? I don't think it matters much for this test, but in general I think it might avoid potential pitfalls to explicitly assert that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a check for the limited flag.

err = client.SyncTimelineHas(serverRoom.RoomID, func(ev gjson.Result) bool {
return ev.Get("event_id").Str == event2.EventID()
})(alice.UserID, syncRes)
if err != nil {
t.Errorf("Did not find event 2 in lazy-loading /sync response: %s", err)
}

err = client.SyncStateHas(serverRoom.RoomID, func(ev gjson.Result) bool {
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == event2.Sender()
})(alice.UserID, syncRes)
if err != nil {
t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event2.Sender(), err)
}
})

// incremental sync must return memberships of event senders even when they aren't present in
// the partial room state.
t.Run("Lazy-loading incremental sync includes remote memberships during partial state join", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")
syncToken := getSyncToken(t, alice)

server := createTestServer(t, deployment)
cancel := server.Listen()
defer cancel()
serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t))
psjResult := beginPartialStateJoin(t, server, serverRoom, alice)
defer psjResult.Destroy()

syncToken = alice.MustSyncUntil(t,
client.SyncReq{
Since: syncToken,
Filter: buildLazyLoadingSyncFilter(nil),
},
client.SyncJoinedTo(alice.UserID, serverRoom.RoomID),
)

// the HS will make an /event_auth request for the event
federation.HandleEventAuthRequests()(server)

// derek sends a message into the room.
event := psjResult.CreateMessageEvent(t, "derek", nil)
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil)
t.Logf("Derek created event with ID %s", event.EventID())

// wait for the homeserver to persist the event.
awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, event.EventID())

// do an incremental sync.
syncRes, _ := alice.MustSync(t,
client.SyncReq{
Since: syncToken,
Filter: buildLazyLoadingSyncFilter(nil),
},
)

err := client.SyncStateHas(serverRoom.RoomID, func(ev gjson.Result) bool {
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == event.Sender()
})(alice.UserID, syncRes)
if err != nil {
t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event.Sender(), err)
}
})

// a request to (client-side) /members?at= should block until the (federation) /state request completes
// TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path
t.Run("MembersRequestBlocksDuringPartialStateJoin", func(t *testing.T) {
Expand Down