From 0988977c07b1af558242b9359f92a15500e6ee33 Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Fri, 12 Apr 2024 18:22:04 -0400 Subject: [PATCH 1/5] create test to prove operOp's enqueue doesn't wait This commit simply adds a test to prove the following issue with the current implementation of OperationRepo. Once processQueueForever starts processing operations it will execute them back-to-back until it can't process any more. This is done intently as the idea is sync all changes to the backend quickly, after waiting a set amount of time for batching. However nothing is in place to account for something continually adding new operations to the repo while it's in this mode. Some misbehaving app could be adding operations in a tight loop as fast or faster than the queue could exclude them. --- .../internal/operations/impl/OperationRepo.kt | 2 +- .../internal/operations/OperationRepoTests.kt | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index fd82c22ac6..b8d19075d1 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -151,7 +151,7 @@ internal class OperationRepo( } } - private suspend fun executeOperations(ops: List) { + internal suspend fun executeOperations(ops: List) { try { val startingOp = ops.first() val executor = diff --git a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt index f456d34795..7b76ac31ca 100644 --- a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt +++ b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt @@ -11,6 +11,7 @@ import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.shouldBe import io.mockk.CapturingSlot import io.mockk.coEvery +import io.mockk.coVerify import io.mockk.coVerifyOrder import io.mockk.every import io.mockk.just @@ -370,6 +371,37 @@ class OperationRepoTests : FunSpec({ } response shouldBe true } + + // This ensures a misbehaving app can't add operations (such as addTag()) + // in a tight loop and cause a number of back-to-back operations without delay. + test("operations enqueued while repo is executing should be executed only after the next opRepoExecutionInterval") { + // Given + val mocks = Mocks() + mocks.configModelStore.model.opRepoExecutionInterval = 100 + val enqueueAndWaitMaxTime = mocks.configModelStore.model.opRepoExecutionInterval / 2 + val opRepo = mocks.operationRepo + + val executeOperationsCall = Waiter() + coEvery { opRepo.executeOperations(any()) } coAnswers { + executeOperationsCall.wake() + delay(10) + } + + // When + opRepo.start() + opRepo.enqueue(mockOperation(groupComparisonType = GroupComparisonType.NONE)) + executeOperationsCall.waitForWake() + val secondEnqueueResult = + withTimeoutOrNull(enqueueAndWaitMaxTime) { + opRepo.enqueueAndWait(mockOperation(groupComparisonType = GroupComparisonType.NONE)) + } + + // Then + secondEnqueueResult shouldBe null + coVerify(exactly = 1) { + opRepo.executeOperations(any()) + } + } }) { companion object { private fun mockOperation( From 0f718b70669f75d0a7b5dde88b3d39d5b1a67108 Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Fri, 12 Apr 2024 19:04:14 -0400 Subject: [PATCH 2/5] save network calls on OpRepo by adding bucketing Buckets Purpose: Bucketing is a pattern we are using to help save network calls. It works together with opRepoExecutionInterval to define a time window operations can be added to the bucket. When enqueue() is called it creates a new OperationQueueItem with it's bucket = enqueueIntoBucket. Just before we start processing a bucket we enqueueIntoBucket++, this ensures anything new that comes in while executing doesn't cause it to skip the opRepoExecutionInterval delay. NOTE: Bucketing only effects the starting operation we grab. The reason is we still want getGroupableOperations() to find other operations it can execute in one go (same network call). It's more efficient overall, as it lowers the total number of network calls. This address the failing test "operations enqueued while repo is executing should be executed only after the next opRepoExecutionInterval" we added in commit 0988977c07b1af558242b9359f92a15500e6ee33 --- .../internal/operations/impl/OperationRepo.kt | 43 +++++++++++++++---- .../internal/operations/OperationRepoTests.kt | 2 +- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index b8d19075d1..c0f646af6a 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -26,10 +26,11 @@ internal class OperationRepo( internal class OperationQueueItem( val operation: Operation, val waiter: WaiterWithValue? = null, + val bucket: Int, var retries: Int = 0, ) { override fun toString(): String { - return Pair(operation.toString(), retries).toString() + "\n" + return "bucket:$bucket, retries:$retries, operation:$operation\n" } } @@ -38,6 +39,27 @@ internal class OperationRepo( private val waiter = WaiterWithValue() private var paused = false + /** *** Buckets *** + * Purpose: Bucketing is a pattern we are using to help save network + * calls. It works together with opRepoExecutionInterval to define + * a time window operations can be added to the bucket. + * + * When enqueue() is called it creates a new OperationQueueItem with it's + * bucket = enqueueIntoBucket. Just before we start processing a bucket we + * enqueueIntoBucket++, this ensures anything new that comes in while + * executing doesn't cause it to skip the opRepoExecutionInterval delay. + * + * NOTE: Bucketing only effects the starting operation we grab. + * The reason is we still want getGroupableOperations() to find + * other operations it can execute in one go (same network call). + * It's more efficient overall, as it lowers the total number of + * network calls. + */ + private var enqueueIntoBucket = 0 + private val executeBucket: Int get() { + return if (enqueueIntoBucket == 0) 0 else enqueueIntoBucket - 1 + } + init { val executorsMap: MutableMap = mutableMapOf() @@ -49,7 +71,7 @@ internal class OperationRepo( this.executorsMap = executorsMap for (operation in _operationModelStore.list()) { - internalEnqueue(OperationQueueItem(operation), flush = false, addToStore = false) + internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush = false, addToStore = false) } } @@ -73,7 +95,7 @@ internal class OperationRepo( Logging.log(LogLevel.DEBUG, "OperationRepo.enqueue(operation: $operation, flush: $flush)") operation.id = UUID.randomUUID().toString() - internalEnqueue(OperationQueueItem(operation), flush, true) + internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush, true) } override suspend fun enqueueAndWait( @@ -84,7 +106,7 @@ internal class OperationRepo( operation.id = UUID.randomUUID().toString() val waiter = WaiterWithValue() - internalEnqueue(OperationQueueItem(operation, waiter), flush, true) + internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true) return waiter.waitForWake() } @@ -109,13 +131,14 @@ internal class OperationRepo( */ private suspend fun processQueueForever() { waitForNewOperationAndExecutionInterval() + enqueueIntoBucket++ while (true) { if (paused) { Logging.debug("OperationRepo is paused") return } - val ops = getNextOps() + val ops = getNextOps(executeBucket) Logging.debug("processQueueForever:ops:\n$ops") if (ops != null) { @@ -125,6 +148,7 @@ internal class OperationRepo( delay(_configModelStore.model.opRepoPostWakeDelay) } else { waitForNewOperationAndExecutionInterval() + enqueueIntoBucket++ } } } @@ -227,7 +251,7 @@ internal class OperationRepo( synchronized(queue) { for (op in response.operations.reversed()) { op.id = UUID.randomUUID().toString() - val queueItem = OperationQueueItem(op) + val queueItem = OperationQueueItem(op, bucket = 0) queue.add(0, queueItem) _operationModelStore.add(0, queueItem.operation) } @@ -249,9 +273,12 @@ internal class OperationRepo( delay(delayFor) } - internal fun getNextOps(): List? { + internal fun getNextOps(bucketFilter: Int): List? { return synchronized(queue) { - val startingOp = queue.firstOrNull { it.operation.canStartExecute } + val startingOp = + queue.firstOrNull { + it.operation.canStartExecute && it.bucket <= bucketFilter + } if (startingOp != null) { queue.remove(startingOp) diff --git a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt index 7b76ac31ca..6fc60f9210 100644 --- a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt +++ b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt @@ -103,7 +103,7 @@ class OperationRepoTests : FunSpec({ // 1st: gets the operation // 2nd: will be empty // 3rd: shouldn't be called, loop should be waiting on next operation - mocks.operationRepo.getNextOps() + mocks.operationRepo.getNextOps(withArg { Any() }) } } From eba7081f233d600c71f78a3e79d7c5a727e3a9d7 Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Fri, 12 Apr 2024 20:27:42 -0400 Subject: [PATCH 3/5] add test to ensure a 2nd pass behaves correctly This ensures we don't have a off-by-one errors --- .../internal/operations/OperationRepoTests.kt | 50 ++++++++++++++++--- 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt index 6fc60f9210..f3373ab9bc 100644 --- a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt +++ b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt @@ -381,19 +381,15 @@ class OperationRepoTests : FunSpec({ val enqueueAndWaitMaxTime = mocks.configModelStore.model.opRepoExecutionInterval / 2 val opRepo = mocks.operationRepo - val executeOperationsCall = Waiter() - coEvery { opRepo.executeOperations(any()) } coAnswers { - executeOperationsCall.wake() - delay(10) - } + val executeOperationsCall = mockExecuteOperations(opRepo) // When opRepo.start() - opRepo.enqueue(mockOperation(groupComparisonType = GroupComparisonType.NONE)) + opRepo.enqueue(mockOperationNonGroupable()) executeOperationsCall.waitForWake() val secondEnqueueResult = withTimeoutOrNull(enqueueAndWaitMaxTime) { - opRepo.enqueueAndWait(mockOperation(groupComparisonType = GroupComparisonType.NONE)) + opRepo.enqueueAndWait(mockOperationNonGroupable()) } // Then @@ -402,6 +398,34 @@ class OperationRepoTests : FunSpec({ opRepo.executeOperations(any()) } } + + // This ensures there are no off-by-one errors with the same scenario as above, but on a 2nd + // pass of OperationRepo + test("operations enqueued while repo is executing should be executed only after the next opRepoExecutionInterval, 2nd pass") { + // Given + val mocks = Mocks() + mocks.configModelStore.model.opRepoExecutionInterval = 100 + val enqueueAndWaitMaxTime = mocks.configModelStore.model.opRepoExecutionInterval / 2 + val opRepo = mocks.operationRepo + + val executeOperationsCall = mockExecuteOperations(opRepo) + + // When + opRepo.start() + opRepo.enqueue(mockOperationNonGroupable()) + executeOperationsCall.waitForWake() + opRepo.enqueueAndWait(mockOperationNonGroupable()) + val thirdEnqueueResult = + withTimeoutOrNull(enqueueAndWaitMaxTime) { + opRepo.enqueueAndWait(mockOperationNonGroupable()) + } + + // Then + thirdEnqueueResult shouldBe null + coVerify(exactly = 2) { + opRepo.executeOperations(any()) + } + } }) { companion object { private fun mockOperation( @@ -427,5 +451,17 @@ class OperationRepoTests : FunSpec({ return operation } + + private fun mockOperationNonGroupable() = mockOperation(groupComparisonType = GroupComparisonType.NONE) + + private fun mockExecuteOperations(opRepo: OperationRepo): Waiter { + val executeWaiter = Waiter() + coEvery { opRepo.executeOperations(any()) } coAnswers { + executeWaiter.wake() + delay(10) + firstArg>().forEach { it.waiter?.wake(true) } + } + return executeWaiter + } } } From c5fde006fbba94311f51c3d1dff7065aa870bcef Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Fri, 12 Apr 2024 21:17:29 -0400 Subject: [PATCH 4/5] test to ensure we are processing cold start ops We want to ensure we are processing operations starting operations we didn't process the last time the app was running and in an efficient way. --- .../internal/operations/OperationRepoTests.kt | 63 +++++++++++++------ 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt index f3373ab9bc..84b42d5e9f 100644 --- a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt +++ b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt @@ -1,6 +1,7 @@ package com.onesignal.core.internal.operations import com.onesignal.common.threading.Waiter +import com.onesignal.common.threading.WaiterWithValue import com.onesignal.core.internal.operations.impl.OperationModelStore import com.onesignal.core.internal.operations.impl.OperationRepo import com.onesignal.core.internal.time.impl.Time @@ -44,17 +45,16 @@ private class Mocks { mockExecutor } - val operationRepo: OperationRepo = - run { - spyk( - OperationRepo( - listOf(executor), - operationModelStore, - configModelStore, - Time(), - ), - ) - } + val operationRepo: OperationRepo by lazy { + spyk( + OperationRepo( + listOf(executor), + operationModelStore, + configModelStore, + Time(), + ), + ) + } } class OperationRepoTests : FunSpec({ @@ -136,7 +136,8 @@ class OperationRepoTests : FunSpec({ test("enqueue operation executes and is removed when executed after retry") { // Given val mocks = Mocks() - coEvery { mocks.operationRepo.delayBeforeRetry(any()) } just runs + val opRepo = mocks.operationRepo + coEvery { opRepo.delayBeforeRetry(any()) } just runs coEvery { mocks.executor.execute(any()) } returns ExecutionResponse(ExecutionResult.FAIL_RETRY) andThen ExecutionResponse(ExecutionResult.SUCCESS) @@ -145,8 +146,8 @@ class OperationRepoTests : FunSpec({ val operation = mockOperation(operationIdSlot = operationIdSlot) // When - mocks.operationRepo.start() - val response = mocks.operationRepo.enqueueAndWait(operation) + opRepo.start() + val response = opRepo.enqueueAndWait(operation) // Then response shouldBe true @@ -159,7 +160,7 @@ class OperationRepoTests : FunSpec({ it[0] shouldBe operation }, ) - mocks.operationRepo.delayBeforeRetry(1) + opRepo.delayBeforeRetry(1) mocks.executor.execute( withArg { it.count() shouldBe 1 @@ -426,6 +427,32 @@ class OperationRepoTests : FunSpec({ opRepo.executeOperations(any()) } } + + // Starting operations are operations we didn't process the last time the app was running. + // We want to ensure we process them, but only after the standard batching delay to be as + // optional as possible with network calls. + test("starting OperationModelStore should be processed, following normal delay rules") { + // Given + val mocks = Mocks() + mocks.configModelStore.model.opRepoExecutionInterval = 100 + every { mocks.operationModelStore.list() } returns listOf(mockOperation()) + val executeOperationsCall = mockExecuteOperations(mocks.operationRepo) + + // When + mocks.operationRepo.start() + val immediateResult = + withTimeoutOrNull(100) { + executeOperationsCall.waitForWake() + } + val delayedResult = + withTimeoutOrNull(200) { + executeOperationsCall.waitForWake() + } + + // Then + immediateResult shouldBe null + delayedResult shouldBe true + } }) { companion object { private fun mockOperation( @@ -454,10 +481,10 @@ class OperationRepoTests : FunSpec({ private fun mockOperationNonGroupable() = mockOperation(groupComparisonType = GroupComparisonType.NONE) - private fun mockExecuteOperations(opRepo: OperationRepo): Waiter { - val executeWaiter = Waiter() + private fun mockExecuteOperations(opRepo: OperationRepo): WaiterWithValue { + val executeWaiter = WaiterWithValue() coEvery { opRepo.executeOperations(any()) } coAnswers { - executeWaiter.wake() + executeWaiter.wake(true) delay(10) firstArg>().forEach { it.waiter?.wake(true) } } From 8001c6e940c1f3f2a99d4dbc7134b415d0d21e12 Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Sat, 13 Apr 2024 16:24:45 -0400 Subject: [PATCH 5/5] add results from executeOperations test --- .../internal/operations/OperationRepoTests.kt | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt index 84b42d5e9f..243e539622 100644 --- a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt +++ b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt @@ -4,6 +4,7 @@ import com.onesignal.common.threading.Waiter import com.onesignal.common.threading.WaiterWithValue import com.onesignal.core.internal.operations.impl.OperationModelStore import com.onesignal.core.internal.operations.impl.OperationRepo +import com.onesignal.core.internal.operations.impl.OperationRepo.OperationQueueItem import com.onesignal.core.internal.time.impl.Time import com.onesignal.debug.LogLevel import com.onesignal.debug.internal.logging.Logging @@ -453,6 +454,37 @@ class OperationRepoTests : FunSpec({ immediateResult shouldBe null delayedResult shouldBe true } + + test("ensure results from executeOperations are added to beginning of the queue") { + // Given + val mocks = Mocks() + val executor = mocks.executor + val opWithResult = mockOperationNonGroupable() + val opFromResult = mockOperationNonGroupable() + coEvery { + executor.execute(listOf(opWithResult)) + } coAnswers { + ExecutionResponse(ExecutionResult.SUCCESS, operations = listOf(opFromResult)) + } + val firstOp = mockOperationNonGroupable() + val secondOp = mockOperationNonGroupable() + + // When + mocks.operationRepo.start() + mocks.operationRepo.enqueue(firstOp) + mocks.operationRepo.executeOperations( + listOf(OperationQueueItem(opWithResult, bucket = 0)), + ) + mocks.operationRepo.enqueueAndWait(secondOp) + + // Then + coVerifyOrder { + executor.execute(withArg { it[0] shouldBe opWithResult }) + executor.execute(withArg { it[0] shouldBe opFromResult }) + executor.execute(withArg { it[0] shouldBe firstOp }) + executor.execute(withArg { it[0] shouldBe secondOp }) + } + } }) { companion object { private fun mockOperation(