Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 52 additions & 10 deletions tests/csapi/device_lists_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,51 @@ func TestDeviceListUpdates(t *testing.T) {
}
}

// makeBarrier returns a function which tries to act as a barrier for `device_lists.changed`
// updates.
//
// When a remote user has a device list update, an entry is expected to appear in
// `device_lists.changed` in the `/sync` response. The local homeserver may then query the
// remote homeserver for the update. Some homeservers (Synapse) may emit extra
// `device_lists.changed` updates in `/sync` responses after querying keys.
//
// The barrier tries to ensure that `device_lists.changed` entries resulting from device list
// updates and queries before the barrier do not appear in `/sync` responses after the barrier.
Comment on lines +109 to +110
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have I understood this correctly?

We make something new happen in the device lists stream for barry and wait to see the result happen. Because

  • barry's change happens after any "no-op" device_list.changed entries, and
  • the device list stream is sequential/linear(?)

by the time we've seen device_lists.changed for barry we will have already seen the "no-op" `device_list.changed" (if it was generated at all).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the idea, yes.

makeBarrier := func(
t *testing.T,
deployment *docker.Deployment,
observingUser *client.CSAPI,
otherHSName string,
) func(t *testing.T, nextBatch string) string {
t.Helper()

barry := deployment.RegisterUser(t, otherHSName, generateLocalpart("barry"), "password", false)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excellent choice of username :)


// The observing user must share a room with the dummy barrier user.
roomID := barry.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
observingUser.JoinRoom(t, roomID, []string{otherHSName})
observingUser.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(observingUser.UserID, roomID))

return func(t *testing.T, nextBatch string) string {
// Publish a device list update from the barrier user and wait until the observing user
// sees it.
uploadNewKeys(t, barry)
return observingUser.MustSyncUntil(
t,
client.SyncReq{Since: nextBatch},
syncDeviceListsHas("changed", barry.UserID),
)
}
}

// In all of these test scenarios, there are two users: Alice and Bob.
// We only care about what Alice sees.

// testOtherUserJoin tests another user joining a room Alice is already in.
testOtherUserJoin := func(t *testing.T, deployment *docker.Deployment, hsName string, otherHSName string) {
alice := deployment.RegisterUser(t, hsName, generateLocalpart("alice"), "password", false)
bob := deployment.RegisterUser(t, otherHSName, generateLocalpart("bob"), "password", false)
barrier := makeBarrier(t, deployment, alice, otherHSName)
checkBobKeys := uploadNewKeys(t, bob)

roomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
Expand All @@ -117,14 +155,14 @@ func TestDeviceListUpdates(t *testing.T) {
bob.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))

// Check that Alice receives a device list update from Bob
alice.MustSyncUntil(
aliceNextBatch = alice.MustSyncUntil(
t,
client.SyncReq{Since: aliceNextBatch},
syncDeviceListsHas("changed", bob.UserID),
)
mustQueryKeys(t, alice, bob.UserID, checkBobKeys)
// Some homeservers (Synapse) may emit another `changed` update after querying keys.
_, aliceNextBatch = alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
aliceNextBatch = barrier(t, aliceNextBatch)

// Both homeservers think Bob has joined now
// Bob then updates their device list
Expand All @@ -145,6 +183,7 @@ func TestDeviceListUpdates(t *testing.T) {
) {
alice := deployment.RegisterUser(t, hsName, generateLocalpart("alice"), "password", false)
bob := deployment.RegisterUser(t, otherHSName, generateLocalpart("bob"), "password", false)
barrier := makeBarrier(t, deployment, alice, otherHSName)
checkBobKeys := uploadNewKeys(t, bob)

roomID := bob.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
Expand All @@ -157,14 +196,14 @@ func TestDeviceListUpdates(t *testing.T) {
bob.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))

// Check that Alice receives a device list update from Bob
alice.MustSyncUntil(
aliceNextBatch = alice.MustSyncUntil(
t,
client.SyncReq{Since: aliceNextBatch},
syncDeviceListsHas("changed", bob.UserID),
)
mustQueryKeys(t, alice, bob.UserID, checkBobKeys)
// Some homeservers (Synapse) may emit another `changed` update after querying keys.
_, aliceNextBatch = alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
aliceNextBatch = barrier(t, aliceNextBatch)

// Both homeservers think Alice has joined now
// Bob then updates their device list
Expand All @@ -183,6 +222,7 @@ func TestDeviceListUpdates(t *testing.T) {
testOtherUserLeave := func(t *testing.T, deployment *docker.Deployment, hsName string, otherHSName string) {
alice := deployment.RegisterUser(t, hsName, generateLocalpart("alice"), "password", false)
bob := deployment.RegisterUser(t, otherHSName, generateLocalpart("bob"), "password", false)
barrier := makeBarrier(t, deployment, alice, otherHSName)
checkBobKeys := uploadNewKeys(t, bob)

roomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
Expand All @@ -192,10 +232,10 @@ func TestDeviceListUpdates(t *testing.T) {
bobNextBatch := bob.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))

// Alice performs an initial sync
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))
aliceNextBatch := alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))
mustQueryKeys(t, alice, bob.UserID, checkBobKeys)
// Some homeservers (Synapse) may emit another `changed` update after querying keys.
_, aliceNextBatch := alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
aliceNextBatch = barrier(t, aliceNextBatch)

// Bob leaves the room
bob.LeaveRoom(t, roomID)
Expand Down Expand Up @@ -226,6 +266,7 @@ func TestDeviceListUpdates(t *testing.T) {
testLeave := func(t *testing.T, deployment *docker.Deployment, hsName string, otherHSName string) {
alice := deployment.RegisterUser(t, hsName, generateLocalpart("alice"), "password", false)
bob := deployment.RegisterUser(t, otherHSName, generateLocalpart("bob"), "password", false)
barrier := makeBarrier(t, deployment, alice, otherHSName)
checkBobKeys := uploadNewKeys(t, bob)

roomID := bob.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
Expand All @@ -235,10 +276,10 @@ func TestDeviceListUpdates(t *testing.T) {
bobNextBatch := bob.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))

// Alice performs an initial sync
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
aliceNextBatch := alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
mustQueryKeys(t, alice, bob.UserID, checkBobKeys)
// Some homeservers (Synapse) may emit another `changed` update after querying keys.
_, aliceNextBatch := alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
aliceNextBatch = barrier(t, aliceNextBatch)

// Alice leaves the room
alice.LeaveRoom(t, roomID)
Expand Down Expand Up @@ -269,6 +310,7 @@ func TestDeviceListUpdates(t *testing.T) {
testOtherUserRejoin := func(t *testing.T, deployment *docker.Deployment, hsName string, otherHSName string) {
alice := deployment.RegisterUser(t, hsName, generateLocalpart("alice"), "password", false)
bob := deployment.RegisterUser(t, otherHSName, generateLocalpart("bob"), "password", false)
barrier := makeBarrier(t, deployment, alice, otherHSName)
checkBobKeys := uploadNewKeys(t, bob)

roomID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
Expand All @@ -278,10 +320,10 @@ func TestDeviceListUpdates(t *testing.T) {
bobNextBatch := bob.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))

// Alice performs an initial sync
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))
aliceNextBatch := alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))
mustQueryKeys(t, alice, bob.UserID, checkBobKeys)
// Some homeservers (Synapse) may emit another `changed` update after querying keys.
_, aliceNextBatch := alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
aliceNextBatch = barrier(t, aliceNextBatch)

// Both homeservers think Bob has joined now
// Bob leaves the room
Expand Down