diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index 10d3b4dfa5..4439b688e3 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -1,7 +1,6 @@ package com.onesignal.core.internal.operations.impl import com.onesignal.common.threading.WaiterWithValue -import com.onesignal.common.threading.suspendifyOnIO import com.onesignal.core.internal.config.ConfigModelStore import com.onesignal.core.internal.operations.ExecutionResult import com.onesignal.core.internal.operations.GroupComparisonType @@ -14,7 +13,10 @@ import com.onesignal.debug.LogLevel import com.onesignal.debug.internal.logging.Logging import com.onesignal.user.internal.operations.impl.states.NewRecordsState import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.newSingleThreadContext import kotlinx.coroutines.withTimeoutOrNull import java.util.UUID import kotlin.math.max @@ -43,6 +45,14 @@ internal class OperationRepo( val previousWaitedTime: Long = 0, ) + // The order of operation execution is critical to this OperationRepo + // logic, all processing must be done on same thread to ensure this. + // - This result of not following this is flaky tests, which inturn could + // result in bugs in production. + private val scope by lazy { + CoroutineScope(newSingleThreadContext(name = "OSOperationRepoScope")) + } + private val executorsMap: Map internal val queue = mutableListOf() private val waiter = WaiterWithValue() @@ -92,7 +102,7 @@ internal class OperationRepo( override fun start() { paused = false - suspendifyOnIO { + scope.launch { // load saved operations first then start processing the queue to ensure correct operation order loadSavedOperations() processQueueForever() @@ -113,8 +123,7 @@ internal class OperationRepo( Logging.log(LogLevel.DEBUG, "OperationRepo.enqueue(operation: $operation, flush: $flush)") operation.id = UUID.randomUUID().toString() - // Use suspendifyOnIO to ensure non-blocking behavior for main thread - suspendifyOnIO { + scope.launch { internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush, true) } } @@ -127,7 +136,9 @@ internal class OperationRepo( operation.id = UUID.randomUUID().toString() val waiter = WaiterWithValue() - internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true) + scope.launch { + internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true) + } return waiter.waitForWake() }