Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ internal class OperationRepo(
internal class OperationQueueItem(
val operation: Operation,
val waiter: WaiterWithValue<Boolean>? = null,
val bucket: Int,
var retries: Int = 0,
) {
override fun toString(): String {
return Pair(operation.toString(), retries).toString() + "\n"
return "bucket:$bucket, retries:$retries, operation:$operation\n"
}
}

Expand All @@ -38,6 +39,27 @@ internal class OperationRepo(
private val waiter = WaiterWithValue<Boolean>()
private var paused = false

/** *** Buckets ***
* Purpose: Bucketing is a pattern we are using to help save network
* calls. It works together with opRepoExecutionInterval to define
* a time window operations can be added to the bucket.
*
* When enqueue() is called it creates a new OperationQueueItem with it's
* bucket = enqueueIntoBucket. Just before we start processing a bucket we
* enqueueIntoBucket++, this ensures anything new that comes in while
* executing doesn't cause it to skip the opRepoExecutionInterval delay.
*
* NOTE: Bucketing only effects the starting operation we grab.
* The reason is we still want getGroupableOperations() to find
* other operations it can execute in one go (same network call).
* It's more efficient overall, as it lowers the total number of
* network calls.
*/
private var enqueueIntoBucket = 0
private val executeBucket: Int get() {
return if (enqueueIntoBucket == 0) 0 else enqueueIntoBucket - 1
}

init {
val executorsMap: MutableMap<String, IOperationExecutor> = mutableMapOf()

Expand All @@ -49,7 +71,7 @@ internal class OperationRepo(
this.executorsMap = executorsMap

for (operation in _operationModelStore.list()) {
internalEnqueue(OperationQueueItem(operation), flush = false, addToStore = false)
internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush = false, addToStore = false)
}
}

Expand All @@ -73,7 +95,7 @@ internal class OperationRepo(
Logging.log(LogLevel.DEBUG, "OperationRepo.enqueue(operation: $operation, flush: $flush)")

operation.id = UUID.randomUUID().toString()
internalEnqueue(OperationQueueItem(operation), flush, true)
internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush, true)
}

override suspend fun enqueueAndWait(
Expand All @@ -84,7 +106,7 @@ internal class OperationRepo(

operation.id = UUID.randomUUID().toString()
val waiter = WaiterWithValue<Boolean>()
internalEnqueue(OperationQueueItem(operation, waiter), flush, true)
internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true)
return waiter.waitForWake()
}

Expand All @@ -109,13 +131,14 @@ internal class OperationRepo(
*/
private suspend fun processQueueForever() {
waitForNewOperationAndExecutionInterval()
enqueueIntoBucket++
while (true) {
if (paused) {
Logging.debug("OperationRepo is paused")
return
}

val ops = getNextOps()
val ops = getNextOps(executeBucket)
Logging.debug("processQueueForever:ops:\n$ops")

if (ops != null) {
Expand All @@ -125,6 +148,7 @@ internal class OperationRepo(
delay(_configModelStore.model.opRepoPostWakeDelay)
} else {
waitForNewOperationAndExecutionInterval()
enqueueIntoBucket++
}
}
}
Expand All @@ -151,7 +175,7 @@ internal class OperationRepo(
}
}

private suspend fun executeOperations(ops: List<OperationQueueItem>) {
internal suspend fun executeOperations(ops: List<OperationQueueItem>) {
try {
val startingOp = ops.first()
val executor =
Expand Down Expand Up @@ -227,7 +251,7 @@ internal class OperationRepo(
synchronized(queue) {
for (op in response.operations.reversed()) {
op.id = UUID.randomUUID().toString()
val queueItem = OperationQueueItem(op)
val queueItem = OperationQueueItem(op, bucket = 0)
queue.add(0, queueItem)
_operationModelStore.add(0, queueItem.operation)
}
Expand All @@ -249,9 +273,12 @@ internal class OperationRepo(
delay(delayFor)
}

internal fun getNextOps(): List<OperationQueueItem>? {
internal fun getNextOps(bucketFilter: Int): List<OperationQueueItem>? {
return synchronized(queue) {
val startingOp = queue.firstOrNull { it.operation.canStartExecute }
val startingOp =
queue.firstOrNull {
it.operation.canStartExecute && it.bucket <= bucketFilter
}

if (startingOp != null) {
queue.remove(startingOp)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.onesignal.core.internal.operations

import com.onesignal.common.threading.Waiter
import com.onesignal.common.threading.WaiterWithValue
import com.onesignal.core.internal.operations.impl.OperationModelStore
import com.onesignal.core.internal.operations.impl.OperationRepo
import com.onesignal.core.internal.operations.impl.OperationRepo.OperationQueueItem
import com.onesignal.core.internal.time.impl.Time
import com.onesignal.debug.LogLevel
import com.onesignal.debug.internal.logging.Logging
Expand All @@ -11,6 +13,7 @@ import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import io.mockk.CapturingSlot
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.coVerifyOrder
import io.mockk.every
import io.mockk.just
Expand Down Expand Up @@ -43,17 +46,16 @@ private class Mocks {
mockExecutor
}

val operationRepo: OperationRepo =
run {
spyk(
OperationRepo(
listOf(executor),
operationModelStore,
configModelStore,
Time(),
),
)
}
val operationRepo: OperationRepo by lazy {
spyk(
OperationRepo(
listOf(executor),
operationModelStore,
configModelStore,
Time(),
),
)
}
}

class OperationRepoTests : FunSpec({
Expand Down Expand Up @@ -102,7 +104,7 @@ class OperationRepoTests : FunSpec({
// 1st: gets the operation
// 2nd: will be empty
// 3rd: shouldn't be called, loop should be waiting on next operation
mocks.operationRepo.getNextOps()
mocks.operationRepo.getNextOps(withArg { Any() })
}
}

Expand Down Expand Up @@ -135,7 +137,8 @@ class OperationRepoTests : FunSpec({
test("enqueue operation executes and is removed when executed after retry") {
// Given
val mocks = Mocks()
coEvery { mocks.operationRepo.delayBeforeRetry(any()) } just runs
val opRepo = mocks.operationRepo
coEvery { opRepo.delayBeforeRetry(any()) } just runs
coEvery {
mocks.executor.execute(any())
} returns ExecutionResponse(ExecutionResult.FAIL_RETRY) andThen ExecutionResponse(ExecutionResult.SUCCESS)
Expand All @@ -144,8 +147,8 @@ class OperationRepoTests : FunSpec({
val operation = mockOperation(operationIdSlot = operationIdSlot)

// When
mocks.operationRepo.start()
val response = mocks.operationRepo.enqueueAndWait(operation)
opRepo.start()
val response = opRepo.enqueueAndWait(operation)

// Then
response shouldBe true
Expand All @@ -158,7 +161,7 @@ class OperationRepoTests : FunSpec({
it[0] shouldBe operation
},
)
mocks.operationRepo.delayBeforeRetry(1)
opRepo.delayBeforeRetry(1)
mocks.executor.execute(
withArg {
it.count() shouldBe 1
Expand Down Expand Up @@ -370,6 +373,118 @@ class OperationRepoTests : FunSpec({
}
response shouldBe true
}

// This ensures a misbehaving app can't add operations (such as addTag())
// in a tight loop and cause a number of back-to-back operations without delay.
test("operations enqueued while repo is executing should be executed only after the next opRepoExecutionInterval") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoExecutionInterval = 100
val enqueueAndWaitMaxTime = mocks.configModelStore.model.opRepoExecutionInterval / 2
val opRepo = mocks.operationRepo

val executeOperationsCall = mockExecuteOperations(opRepo)

// When
opRepo.start()
opRepo.enqueue(mockOperationNonGroupable())
executeOperationsCall.waitForWake()
val secondEnqueueResult =
withTimeoutOrNull(enqueueAndWaitMaxTime) {
opRepo.enqueueAndWait(mockOperationNonGroupable())
}

// Then
secondEnqueueResult shouldBe null
coVerify(exactly = 1) {
opRepo.executeOperations(any())
}
}

// This ensures there are no off-by-one errors with the same scenario as above, but on a 2nd
// pass of OperationRepo
test("operations enqueued while repo is executing should be executed only after the next opRepoExecutionInterval, 2nd pass") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoExecutionInterval = 100
val enqueueAndWaitMaxTime = mocks.configModelStore.model.opRepoExecutionInterval / 2
val opRepo = mocks.operationRepo

val executeOperationsCall = mockExecuteOperations(opRepo)

// When
opRepo.start()
opRepo.enqueue(mockOperationNonGroupable())
executeOperationsCall.waitForWake()
opRepo.enqueueAndWait(mockOperationNonGroupable())
val thirdEnqueueResult =
withTimeoutOrNull(enqueueAndWaitMaxTime) {
opRepo.enqueueAndWait(mockOperationNonGroupable())
}

// Then
thirdEnqueueResult shouldBe null
coVerify(exactly = 2) {
opRepo.executeOperations(any())
}
}

// Starting operations are operations we didn't process the last time the app was running.
// We want to ensure we process them, but only after the standard batching delay to be as
// optional as possible with network calls.
test("starting OperationModelStore should be processed, following normal delay rules") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoExecutionInterval = 100
every { mocks.operationModelStore.list() } returns listOf(mockOperation())
val executeOperationsCall = mockExecuteOperations(mocks.operationRepo)

// When
mocks.operationRepo.start()
val immediateResult =
withTimeoutOrNull(100) {
executeOperationsCall.waitForWake()
}
val delayedResult =
withTimeoutOrNull(200) {
executeOperationsCall.waitForWake()
}

// Then
immediateResult shouldBe null
delayedResult shouldBe true
}

test("ensure results from executeOperations are added to beginning of the queue") {
// Given
val mocks = Mocks()
val executor = mocks.executor
val opWithResult = mockOperationNonGroupable()
val opFromResult = mockOperationNonGroupable()
coEvery {
executor.execute(listOf(opWithResult))
} coAnswers {
ExecutionResponse(ExecutionResult.SUCCESS, operations = listOf(opFromResult))
}
val firstOp = mockOperationNonGroupable()
val secondOp = mockOperationNonGroupable()

// When
mocks.operationRepo.start()
mocks.operationRepo.enqueue(firstOp)
mocks.operationRepo.executeOperations(
listOf(OperationQueueItem(opWithResult, bucket = 0)),
)
mocks.operationRepo.enqueueAndWait(secondOp)

// Then
coVerifyOrder {
executor.execute(withArg { it[0] shouldBe opWithResult })
executor.execute(withArg { it[0] shouldBe opFromResult })
executor.execute(withArg { it[0] shouldBe firstOp })
executor.execute(withArg { it[0] shouldBe secondOp })
}
}
}) {
companion object {
private fun mockOperation(
Expand All @@ -395,5 +510,17 @@ class OperationRepoTests : FunSpec({

return operation
}

private fun mockOperationNonGroupable() = mockOperation(groupComparisonType = GroupComparisonType.NONE)

private fun mockExecuteOperations(opRepo: OperationRepo): WaiterWithValue<Boolean> {
val executeWaiter = WaiterWithValue<Boolean>()
coEvery { opRepo.executeOperations(any()) } coAnswers {
executeWaiter.wake(true)
delay(10)
firstArg<List<OperationRepo.OperationQueueItem>>().forEach { it.waiter?.wake(true) }
}
return executeWaiter
}
}
}