diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index fd82c22ac6..c0f646af6a 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -26,10 +26,11 @@ internal class OperationRepo( internal class OperationQueueItem( val operation: Operation, val waiter: WaiterWithValue? = null, + val bucket: Int, var retries: Int = 0, ) { override fun toString(): String { - return Pair(operation.toString(), retries).toString() + "\n" + return "bucket:$bucket, retries:$retries, operation:$operation\n" } } @@ -38,6 +39,27 @@ internal class OperationRepo( private val waiter = WaiterWithValue() private var paused = false + /** *** Buckets *** + * Purpose: Bucketing is a pattern we are using to help save network + * calls. It works together with opRepoExecutionInterval to define + * a time window operations can be added to the bucket. + * + * When enqueue() is called it creates a new OperationQueueItem with it's + * bucket = enqueueIntoBucket. Just before we start processing a bucket we + * enqueueIntoBucket++, this ensures anything new that comes in while + * executing doesn't cause it to skip the opRepoExecutionInterval delay. + * + * NOTE: Bucketing only effects the starting operation we grab. + * The reason is we still want getGroupableOperations() to find + * other operations it can execute in one go (same network call). + * It's more efficient overall, as it lowers the total number of + * network calls. + */ + private var enqueueIntoBucket = 0 + private val executeBucket: Int get() { + return if (enqueueIntoBucket == 0) 0 else enqueueIntoBucket - 1 + } + init { val executorsMap: MutableMap = mutableMapOf() @@ -49,7 +71,7 @@ internal class OperationRepo( this.executorsMap = executorsMap for (operation in _operationModelStore.list()) { - internalEnqueue(OperationQueueItem(operation), flush = false, addToStore = false) + internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush = false, addToStore = false) } } @@ -73,7 +95,7 @@ internal class OperationRepo( Logging.log(LogLevel.DEBUG, "OperationRepo.enqueue(operation: $operation, flush: $flush)") operation.id = UUID.randomUUID().toString() - internalEnqueue(OperationQueueItem(operation), flush, true) + internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush, true) } override suspend fun enqueueAndWait( @@ -84,7 +106,7 @@ internal class OperationRepo( operation.id = UUID.randomUUID().toString() val waiter = WaiterWithValue() - internalEnqueue(OperationQueueItem(operation, waiter), flush, true) + internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true) return waiter.waitForWake() } @@ -109,13 +131,14 @@ internal class OperationRepo( */ private suspend fun processQueueForever() { waitForNewOperationAndExecutionInterval() + enqueueIntoBucket++ while (true) { if (paused) { Logging.debug("OperationRepo is paused") return } - val ops = getNextOps() + val ops = getNextOps(executeBucket) Logging.debug("processQueueForever:ops:\n$ops") if (ops != null) { @@ -125,6 +148,7 @@ internal class OperationRepo( delay(_configModelStore.model.opRepoPostWakeDelay) } else { waitForNewOperationAndExecutionInterval() + enqueueIntoBucket++ } } } @@ -151,7 +175,7 @@ internal class OperationRepo( } } - private suspend fun executeOperations(ops: List) { + internal suspend fun executeOperations(ops: List) { try { val startingOp = ops.first() val executor = @@ -227,7 +251,7 @@ internal class OperationRepo( synchronized(queue) { for (op in response.operations.reversed()) { op.id = UUID.randomUUID().toString() - val queueItem = OperationQueueItem(op) + val queueItem = OperationQueueItem(op, bucket = 0) queue.add(0, queueItem) _operationModelStore.add(0, queueItem.operation) } @@ -249,9 +273,12 @@ internal class OperationRepo( delay(delayFor) } - internal fun getNextOps(): List? { + internal fun getNextOps(bucketFilter: Int): List? { return synchronized(queue) { - val startingOp = queue.firstOrNull { it.operation.canStartExecute } + val startingOp = + queue.firstOrNull { + it.operation.canStartExecute && it.bucket <= bucketFilter + } if (startingOp != null) { queue.remove(startingOp) diff --git a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt index f456d34795..243e539622 100644 --- a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt +++ b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt @@ -1,8 +1,10 @@ package com.onesignal.core.internal.operations import com.onesignal.common.threading.Waiter +import com.onesignal.common.threading.WaiterWithValue import com.onesignal.core.internal.operations.impl.OperationModelStore import com.onesignal.core.internal.operations.impl.OperationRepo +import com.onesignal.core.internal.operations.impl.OperationRepo.OperationQueueItem import com.onesignal.core.internal.time.impl.Time import com.onesignal.debug.LogLevel import com.onesignal.debug.internal.logging.Logging @@ -11,6 +13,7 @@ import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.shouldBe import io.mockk.CapturingSlot import io.mockk.coEvery +import io.mockk.coVerify import io.mockk.coVerifyOrder import io.mockk.every import io.mockk.just @@ -43,17 +46,16 @@ private class Mocks { mockExecutor } - val operationRepo: OperationRepo = - run { - spyk( - OperationRepo( - listOf(executor), - operationModelStore, - configModelStore, - Time(), - ), - ) - } + val operationRepo: OperationRepo by lazy { + spyk( + OperationRepo( + listOf(executor), + operationModelStore, + configModelStore, + Time(), + ), + ) + } } class OperationRepoTests : FunSpec({ @@ -102,7 +104,7 @@ class OperationRepoTests : FunSpec({ // 1st: gets the operation // 2nd: will be empty // 3rd: shouldn't be called, loop should be waiting on next operation - mocks.operationRepo.getNextOps() + mocks.operationRepo.getNextOps(withArg { Any() }) } } @@ -135,7 +137,8 @@ class OperationRepoTests : FunSpec({ test("enqueue operation executes and is removed when executed after retry") { // Given val mocks = Mocks() - coEvery { mocks.operationRepo.delayBeforeRetry(any()) } just runs + val opRepo = mocks.operationRepo + coEvery { opRepo.delayBeforeRetry(any()) } just runs coEvery { mocks.executor.execute(any()) } returns ExecutionResponse(ExecutionResult.FAIL_RETRY) andThen ExecutionResponse(ExecutionResult.SUCCESS) @@ -144,8 +147,8 @@ class OperationRepoTests : FunSpec({ val operation = mockOperation(operationIdSlot = operationIdSlot) // When - mocks.operationRepo.start() - val response = mocks.operationRepo.enqueueAndWait(operation) + opRepo.start() + val response = opRepo.enqueueAndWait(operation) // Then response shouldBe true @@ -158,7 +161,7 @@ class OperationRepoTests : FunSpec({ it[0] shouldBe operation }, ) - mocks.operationRepo.delayBeforeRetry(1) + opRepo.delayBeforeRetry(1) mocks.executor.execute( withArg { it.count() shouldBe 1 @@ -370,6 +373,118 @@ class OperationRepoTests : FunSpec({ } response shouldBe true } + + // This ensures a misbehaving app can't add operations (such as addTag()) + // in a tight loop and cause a number of back-to-back operations without delay. + test("operations enqueued while repo is executing should be executed only after the next opRepoExecutionInterval") { + // Given + val mocks = Mocks() + mocks.configModelStore.model.opRepoExecutionInterval = 100 + val enqueueAndWaitMaxTime = mocks.configModelStore.model.opRepoExecutionInterval / 2 + val opRepo = mocks.operationRepo + + val executeOperationsCall = mockExecuteOperations(opRepo) + + // When + opRepo.start() + opRepo.enqueue(mockOperationNonGroupable()) + executeOperationsCall.waitForWake() + val secondEnqueueResult = + withTimeoutOrNull(enqueueAndWaitMaxTime) { + opRepo.enqueueAndWait(mockOperationNonGroupable()) + } + + // Then + secondEnqueueResult shouldBe null + coVerify(exactly = 1) { + opRepo.executeOperations(any()) + } + } + + // This ensures there are no off-by-one errors with the same scenario as above, but on a 2nd + // pass of OperationRepo + test("operations enqueued while repo is executing should be executed only after the next opRepoExecutionInterval, 2nd pass") { + // Given + val mocks = Mocks() + mocks.configModelStore.model.opRepoExecutionInterval = 100 + val enqueueAndWaitMaxTime = mocks.configModelStore.model.opRepoExecutionInterval / 2 + val opRepo = mocks.operationRepo + + val executeOperationsCall = mockExecuteOperations(opRepo) + + // When + opRepo.start() + opRepo.enqueue(mockOperationNonGroupable()) + executeOperationsCall.waitForWake() + opRepo.enqueueAndWait(mockOperationNonGroupable()) + val thirdEnqueueResult = + withTimeoutOrNull(enqueueAndWaitMaxTime) { + opRepo.enqueueAndWait(mockOperationNonGroupable()) + } + + // Then + thirdEnqueueResult shouldBe null + coVerify(exactly = 2) { + opRepo.executeOperations(any()) + } + } + + // Starting operations are operations we didn't process the last time the app was running. + // We want to ensure we process them, but only after the standard batching delay to be as + // optional as possible with network calls. + test("starting OperationModelStore should be processed, following normal delay rules") { + // Given + val mocks = Mocks() + mocks.configModelStore.model.opRepoExecutionInterval = 100 + every { mocks.operationModelStore.list() } returns listOf(mockOperation()) + val executeOperationsCall = mockExecuteOperations(mocks.operationRepo) + + // When + mocks.operationRepo.start() + val immediateResult = + withTimeoutOrNull(100) { + executeOperationsCall.waitForWake() + } + val delayedResult = + withTimeoutOrNull(200) { + executeOperationsCall.waitForWake() + } + + // Then + immediateResult shouldBe null + delayedResult shouldBe true + } + + test("ensure results from executeOperations are added to beginning of the queue") { + // Given + val mocks = Mocks() + val executor = mocks.executor + val opWithResult = mockOperationNonGroupable() + val opFromResult = mockOperationNonGroupable() + coEvery { + executor.execute(listOf(opWithResult)) + } coAnswers { + ExecutionResponse(ExecutionResult.SUCCESS, operations = listOf(opFromResult)) + } + val firstOp = mockOperationNonGroupable() + val secondOp = mockOperationNonGroupable() + + // When + mocks.operationRepo.start() + mocks.operationRepo.enqueue(firstOp) + mocks.operationRepo.executeOperations( + listOf(OperationQueueItem(opWithResult, bucket = 0)), + ) + mocks.operationRepo.enqueueAndWait(secondOp) + + // Then + coVerifyOrder { + executor.execute(withArg { it[0] shouldBe opWithResult }) + executor.execute(withArg { it[0] shouldBe opFromResult }) + executor.execute(withArg { it[0] shouldBe firstOp }) + executor.execute(withArg { it[0] shouldBe secondOp }) + } + } }) { companion object { private fun mockOperation( @@ -395,5 +510,17 @@ class OperationRepoTests : FunSpec({ return operation } + + private fun mockOperationNonGroupable() = mockOperation(groupComparisonType = GroupComparisonType.NONE) + + private fun mockExecuteOperations(opRepo: OperationRepo): WaiterWithValue { + val executeWaiter = WaiterWithValue() + coEvery { opRepo.executeOperations(any()) } coAnswers { + executeWaiter.wake(true) + delay(10) + firstArg>().forEach { it.waiter?.wake(true) } + } + return executeWaiter + } } }