Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,22 +163,32 @@ internal class OperationRepo(
/**
* Waits until a new operation is enqueued, then wait an additional
* amount of time afterwards, so operations can be grouped/batched.
* NOTE: Any operations that are enqueued while waiting here causes
* the wait timer to restart over. This is intentional, we
* are basically wait for "the dust to settle" / "the water
* is calm" to ensure the app is done making updates.
* FUTURE: Highly recommend not removing this "the dust to settle"
* logic, as it ensures any app stuck in a loop can't
* cause continuous network requests. If the delay is too
* long for legitimate use-cases then allow tweaking the
* opRepoExecutionInterval value or allow commitNow()
* with a budget.
*/
private suspend fun waitForNewOperationAndExecutionInterval() {
// 1. Wait for an operation to be enqueued
var wakeMessage = waiter.waitForWake()

// 2. Wait at least the time defined in opRepoExecutionInterval
// so operations can be grouped, unless one of them used
// flush=true (AKA force)
var lastTime = _time.currentTimeMillis
// 2. Now wait opRepoExecutionInterval, restart the wait
// time everytime something new is enqueued, to ensure
// the dust has settled.
var remainingTime = _configModelStore.model.opRepoExecutionInterval - wakeMessage.previousWaitedTime
while (!wakeMessage.force && remainingTime > 0) {
withTimeoutOrNull(remainingTime) {
wakeMessage = waiter.waitForWake()
}
remainingTime -= _time.currentTimeMillis - lastTime
lastTime = _time.currentTimeMillis
while (!wakeMessage.force) {
val waitedTheFullTime =
withTimeoutOrNull(remainingTime) {
wakeMessage = waiter.waitForWake()
} == null
if (waitedTheFullTime) break
remainingTime = _configModelStore.model.opRepoExecutionInterval
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,32 @@ class OperationRepoTests : FunSpec({
)
}
}

// We want to prevent a misbehaving app stuck in a loop from continuously
// sending updates every opRepoExecutionInterval (5 seconds currently).
// By waiting for the dust to settle we ensure the app is done making
// updates.
test("ensure each time enqueue is called it restarts the delay time") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoExecutionInterval = 100

// When
mocks.operationRepo.start()
launch {
repeat(10) {
mocks.operationRepo.enqueue(mockOperation(groupComparisonType = GroupComparisonType.ALTER))
delay(50)
}
}
val result =
withTimeoutOrNull(500) {
mocks.operationRepo.enqueueAndWait(mockOperation(groupComparisonType = GroupComparisonType.ALTER))
}

// Then
result shouldBe null
}
}) {
companion object {
private fun mockOperation(
Expand Down