From dca06a1fe6ac457a23c2401fd4151de71a0a0e2a Mon Sep 17 00:00:00 2001 From: VelikovPetar Date: Thu, 30 Apr 2026 20:11:36 +0200 Subject: [PATCH] Add MessageBufferConfig. --- .../api/stream-chat-android-state.api | 27 ++- .../internal/EventHandlerSequential.kt | 81 +++++-- .../state/plugin/config/StatePluginConfig.kt | 57 +++++ .../factory/StreamStatePluginFactory.kt | 4 + .../state/event/TotalUnreadCountTest.kt | 2 + .../internal/EventHandlerSequentialTest.kt | 212 +++++++++++++++++- ...andlerSequentialUserMessagesDeletedTest.kt | 2 + 7 files changed, 359 insertions(+), 26 deletions(-) diff --git a/stream-chat-android-state/api/stream-chat-android-state.api b/stream-chat-android-state/api/stream-chat-android-state.api index d745c2150b5..664907cf929 100644 --- a/stream-chat-android-state/api/stream-chat-android-state.api +++ b/stream-chat-android-state/api/stream-chat-android-state.api @@ -109,15 +109,34 @@ public final class io/getstream/chat/android/state/plugin/config/ChannelMessageL public fun toString ()Ljava/lang/String; } +public final class io/getstream/chat/android/state/plugin/config/MessageBufferConfig { + public fun ()V + public fun (Ljava/util/Set;ILkotlinx/coroutines/channels/BufferOverflow;)V + public synthetic fun (Ljava/util/Set;ILkotlinx/coroutines/channels/BufferOverflow;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun component1 ()Ljava/util/Set; + public final fun component2 ()I + public final fun component3 ()Lkotlinx/coroutines/channels/BufferOverflow; + public final fun copy (Ljava/util/Set;ILkotlinx/coroutines/channels/BufferOverflow;)Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig; + public static synthetic fun copy$default (Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;Ljava/util/Set;ILkotlinx/coroutines/channels/BufferOverflow;ILjava/lang/Object;)Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig; + public fun equals (Ljava/lang/Object;)Z + public final fun getCapacity ()I + public final fun getChannelTypes ()Ljava/util/Set; + public final fun getOverflow ()Lkotlinx/coroutines/channels/BufferOverflow; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + public final class io/getstream/chat/android/state/plugin/config/MessageLimitConfig { public fun ()V - public fun (Ljava/util/Set;)V - public synthetic fun (Ljava/util/Set;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;)V + public synthetic fun (Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Ljava/util/Set; - public final fun copy (Ljava/util/Set;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig; - public static synthetic fun copy$default (Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;Ljava/util/Set;ILjava/lang/Object;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig; + public final fun component2 ()Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig; + public final fun copy (Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig; + public static synthetic fun copy$default (Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;ILjava/lang/Object;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig; public fun equals (Ljava/lang/Object;)Z public final fun getChannelMessageLimits ()Ljava/util/Set; + public final fun getMessageBufferConfig ()Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig; public fun hashCode ()I public fun toString ()Ljava/lang/String; } diff --git a/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequential.kt b/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequential.kt index dfb15804818..82c72fa39a5 100644 --- a/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequential.kt +++ b/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequential.kt @@ -114,6 +114,7 @@ import io.getstream.chat.android.state.event.handler.internal.batch.BatchEvent import io.getstream.chat.android.state.event.handler.internal.batch.SocketEventCollector import io.getstream.chat.android.state.event.handler.internal.utils.realType import io.getstream.chat.android.state.event.handler.internal.utils.toChannelUserRead +import io.getstream.chat.android.state.plugin.config.MessageBufferConfig import io.getstream.chat.android.state.plugin.logic.channel.internal.ChannelLogic import io.getstream.chat.android.state.plugin.logic.internal.LogicRegistry import io.getstream.chat.android.state.plugin.logic.querychannels.internal.QueryChannelsLogic @@ -157,6 +158,7 @@ internal class EventHandlerSequential( private val repos: RepositoryFacade, private val sideEffect: suspend () -> Unit, private val syncedEvents: Flow>, + private val bufferConfig: MessageBufferConfig, scope: CoroutineScope, ) : EventHandler { @@ -167,12 +169,58 @@ internal class EventHandlerSequential( private val mutex = Mutex() private val socketEvents = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) + + /** + * Secondary flow used only when [bufferConfig] opts specific channel types into a bounded buffer. + * Allocated lazily so the default configuration pays no cost for it. + */ + private val bufferedNewMessageEvents: MutableSharedFlow by lazy { + MutableSharedFlow( + extraBufferCapacity = bufferConfig.capacity, + onBufferOverflow = bufferConfig.overflow, + ) + } private val socketEventCollector = SocketEventCollector(scope) { batchEvent -> handleBatchEvent(batchEvent) } private var eventsDisposable: Disposable = EMPTY_DISPOSABLE + /** + * Default listener — emits every event into the unbuffered [socketEvents] flow without + * inspecting [bufferConfig]. Used whenever no channel types are opted in for buffering. + */ + private val defaultSocketEventListener: ChatEventListener = ChatEventListener { event -> + logEmitOutcome(event, socketEvents.tryEmit(event)) + } + + /** + * Listener used only when [bufferConfig] opts specific channel types into a bounded buffer. + * Routes matching [NewMessageEvent]s to [bufferedNewMessageEvents] and everything else to + * [socketEvents]. + */ + private val bufferedSocketEventListener: ChatEventListener = ChatEventListener { event -> + val target = if (event is NewMessageEvent && event.channelType in bufferConfig.channelTypes) { + bufferedNewMessageEvents + } else { + socketEvents + } + logEmitOutcome(event, target.tryEmit(event)) + } + + private fun logEmitOutcome(event: ChatEvent, emitted: Boolean) { + if (emitted) { + val cCount = collectedCount.get() + val eCount = emittedCount.incrementAndGet() + val ratio = eCount.toDouble() / cCount.toDouble() + StreamLog.v(TAG_SOCKET) { + "[onSocketEventReceived] event.type: ${event.realType}; $eCount => $cCount ($ratio)" + } + } else { + StreamLog.e(TAG_SOCKET) { "[onSocketEventReceived] failed to emit socket event: $event" } + } + } + init { logger.d { " no args" } } @@ -199,26 +247,23 @@ internal class EventHandlerSequential( ) } } - scope.launch { - socketEvents.collect { event -> - collectedCount.incrementAndGet() - initJob.join() - sideEffect() - socketEventCollector.collect(event) - } + val collectSocketEvent: suspend (ChatEvent) -> Unit = { event -> + collectedCount.incrementAndGet() + initJob.join() + sideEffect() + socketEventCollector.collect(event) } - eventsDisposable = subscribeForEvents { event -> - if (socketEvents.tryEmit(event)) { - val cCount = collectedCount.get() - val eCount = emittedCount.incrementAndGet() - val ratio = eCount.toDouble() / cCount.toDouble() - StreamLog.v(TAG_SOCKET) { - "[onSocketEventReceived] event.type: ${event.realType}; $eCount => $cCount ($ratio)" - } - } else { - StreamLog.e(TAG_SOCKET) { "[onSocketEventReceived] failed to emit socket event: $event" } - } + scope.launch { socketEvents.collect(collectSocketEvent) } + val isBufferingEnabled = bufferConfig.channelTypes.isNotEmpty() + if (isBufferingEnabled) { + scope.launch { bufferedNewMessageEvents.collect(collectSocketEvent) } + } + val activeListener = if (isBufferingEnabled) { + bufferedSocketEventListener + } else { + defaultSocketEventListener } + eventsDisposable = subscribeForEvents(activeListener) } } diff --git a/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/config/StatePluginConfig.kt b/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/config/StatePluginConfig.kt index bcf77a113a1..b3f0d840f59 100644 --- a/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/config/StatePluginConfig.kt +++ b/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/config/StatePluginConfig.kt @@ -22,6 +22,7 @@ import io.getstream.chat.android.models.TimeDuration import io.getstream.chat.android.state.extensions.queryChannelsAsState import io.getstream.chat.android.state.extensions.watchChannelAsState import io.getstream.chat.android.state.plugin.internal.StatePlugin +import kotlinx.coroutines.channels.BufferOverflow /** * Provides a configuration for [io.getstream.chat.android.state.plugin.internal.StatePlugin]. @@ -128,9 +129,14 @@ public data class StatePluginConfig @JvmOverloads constructor( * @param channelMessageLimits A set of [ChannelMessageLimit] defining the maximum number of messages to keep in * memory for different channel types. By default, this is an empty set, meaning no limits are applied and all * messages are kept in memory. Each channel type can have its own limit configured independently. + * + * @param messageBufferConfig Configuration for bounding the inbound `NewMessageEvent` buffer on selected channel + * types. By default, no buffering is applied — events flow through the unbuffered path. See [MessageBufferConfig] + * for details and trade-offs. */ public data class MessageLimitConfig( public val channelMessageLimits: Set = setOf(), + public val messageBufferConfig: MessageBufferConfig = MessageBufferConfig(), ) /** @@ -161,3 +167,54 @@ public data class ChannelMessageLimit( public val channelType: String, public val baseLimit: Int, ) + +/** + * Configuration for buffering inbound `NewMessageEvent`s for specific channel types before they + * are dispatched to the sequential event-handling pipeline. + * + * High-traffic channel types (e.g. livestreams) can produce a flood of new-message events that + * arrive faster than they can be processed sequentially. This configuration applies a bounded + * buffer with a configurable overflow strategy (e.g. drop oldest) for `NewMessageEvent`s on the + * configured channel types only. Events for other channel types — and all non-`NewMessageEvent` + * events — continue to flow through the default unbuffered path with `Int.MAX_VALUE` capacity, + * so signal-critical events such as reads, bans or member updates are never dropped. + * + * By default this is a no-op: no channel types are configured, so the buffered code path is not + * active and the SDK behaves exactly as if this configuration did not exist. + * + * Example — drop the oldest pending `NewMessageEvent` for `messaging` channels when more than + * 100 are queued: + * ```kotlin + * StatePluginConfig( + * messageLimitConfig = MessageLimitConfig( + * messageBufferConfig = MessageBufferConfig( + * channelTypes = setOf("messaging"), + * capacity = 100, + * overflow = BufferOverflow.DROP_OLDEST, + * ), + * ), + * ) + * ``` + * + * @param channelTypes The set of channel types whose `NewMessageEvent`s should be routed through + * the bounded buffer. Channel types not in this set continue to use the unbuffered path. When + * this set is empty (the default), buffering is disabled entirely and the per-event channel-type + * check is skipped. + * + * @param capacity The maximum number of `NewMessageEvent`s that can be queued in the buffer + * while the consumer is busy. Once exceeded, [overflow] decides which event to drop or whether + * to suspend. Defaults to `Int.MAX_VALUE`, which effectively disables overflow. + * + * @param overflow The strategy applied when the buffer is full: + * - [BufferOverflow.SUSPEND] (default): the producer suspends until space is available. Note + * that the underlying socket listener uses non-suspending `tryEmit`, so with `SUSPEND` an + * overflowing emission is simply reported as a failed emit rather than blocking the socket. + * - [BufferOverflow.DROP_OLDEST]: the oldest queued event is evicted to make room for the new + * one. Useful for live channels where freshness matters more than completeness. + * - [BufferOverflow.DROP_LATEST]: the newest event is discarded and the queued events are kept. + */ +public data class MessageBufferConfig( + public val channelTypes: Set = emptySet(), + public val capacity: Int = Int.MAX_VALUE, + public val overflow: BufferOverflow = BufferOverflow.SUSPEND, +) diff --git a/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/factory/StreamStatePluginFactory.kt b/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/factory/StreamStatePluginFactory.kt index 1c89d54749a..125695192cc 100644 --- a/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/factory/StreamStatePluginFactory.kt +++ b/stream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/factory/StreamStatePluginFactory.kt @@ -29,6 +29,7 @@ import io.getstream.chat.android.models.User import io.getstream.chat.android.state.errorhandler.StateErrorHandlerFactory import io.getstream.chat.android.state.event.handler.internal.EventHandler import io.getstream.chat.android.state.event.handler.internal.EventHandlerSequential +import io.getstream.chat.android.state.plugin.config.MessageBufferConfig import io.getstream.chat.android.state.plugin.config.StatePluginConfig import io.getstream.chat.android.state.plugin.internal.StatePlugin import io.getstream.chat.android.state.plugin.logic.internal.LogicRegistry @@ -151,6 +152,7 @@ public class StreamStatePluginFactory( repos = repositoryFacade, syncedEvents = syncManager.syncedEvents, sideEffect = syncManager::awaitSyncing, + bufferConfig = config.messageLimitConfig.messageBufferConfig, ) if (config.backgroundSyncEnabled) { @@ -192,6 +194,7 @@ public class StreamStatePluginFactory( repos: RepositoryFacade, sideEffect: suspend () -> Unit, syncedEvents: Flow>, + bufferConfig: MessageBufferConfig, ): EventHandler { return EventHandlerSequential( scope = scope, @@ -204,6 +207,7 @@ public class StreamStatePluginFactory( repos = repos, syncedEvents = syncedEvents, sideEffect = sideEffect, + bufferConfig = bufferConfig, ) } } diff --git a/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/TotalUnreadCountTest.kt b/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/TotalUnreadCountTest.kt index ee68498da38..e947006ea3b 100644 --- a/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/TotalUnreadCountTest.kt +++ b/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/TotalUnreadCountTest.kt @@ -25,6 +25,7 @@ import io.getstream.chat.android.models.ChannelCapabilities import io.getstream.chat.android.models.User import io.getstream.chat.android.state.event.handler.internal.EventHandler import io.getstream.chat.android.state.event.handler.internal.EventHandlerSequential +import io.getstream.chat.android.state.plugin.config.MessageBufferConfig import io.getstream.chat.android.state.plugin.state.global.internal.MutableGlobalState import io.getstream.chat.android.test.TestCoroutineExtension import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -146,6 +147,7 @@ internal class TotalUnreadCountTest { repos = repos, sideEffect = sideEffect, syncedEvents = syncedEvents, + bufferConfig = MessageBufferConfig(), ) fun givenMockedRepositories(): Fixture { diff --git a/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequentialTest.kt b/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequentialTest.kt index 3d3fac45a5a..67881101fef 100644 --- a/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequentialTest.kt +++ b/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequentialTest.kt @@ -53,27 +53,38 @@ import io.getstream.chat.android.randomMute import io.getstream.chat.android.randomPoll import io.getstream.chat.android.randomString import io.getstream.chat.android.randomUser +import io.getstream.chat.android.state.event.handler.internal.batch.BatchEvent +import io.getstream.chat.android.state.plugin.config.MessageBufferConfig import io.getstream.chat.android.state.plugin.logic.internal.LogicRegistry import io.getstream.chat.android.state.plugin.state.StateRegistry import io.getstream.chat.android.state.plugin.state.global.internal.MutableGlobalState +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest import org.amshove.kluent.`should be equal to` import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.atLeast import org.mockito.kotlin.doReturn import org.mockito.kotlin.mock import org.mockito.kotlin.stub import org.mockito.kotlin.verify import org.mockito.kotlin.whenever +import java.util.concurrent.atomic.AtomicReference +@kotlinx.coroutines.ExperimentalCoroutinesApi internal class EventHandlerSequentialTest { @ParameterizedTest @@ -171,19 +182,188 @@ internal class EventHandlerSequentialTest { } } + @Test + fun `When buffer overflows with DROP_OLDEST, the oldest queued NewMessageEvent is dropped`() = runTest { + val fixture = Fixture() + .withBufferConfig( + MessageBufferConfig( + channelTypes = setOf(BUFFERED_CHANNEL_TYPE), + capacity = 1, + overflow = BufferOverflow.DROP_OLDEST, + ), + ) + .withReadEventsCapabilityForAny() + .pauseSideEffect() + val handler = fixture.get(this) + val first = newMessageEventOnBufferedType() + val second = newMessageEventOnBufferedType() + val third = newMessageEventOnBufferedType() + val fourth = newMessageEventOnBufferedType() + + handler.startListening() + advanceUntilIdle() + val listener = fixture.listener() + + listener.onEvent(first) + advanceUntilIdle() // collector picks up first, suspends on sideEffect + listener.onEvent(second) + listener.onEvent(third) + listener.onEvent(fourth) + advanceUntilIdle() // give the buffer a chance to apply DROP_OLDEST + + fixture.releaseSideEffect() + advanceUntilIdle() + + val processedEvents = capturedBatchEvents(fixture).flatMap { it.sortedEvents } + // The first event was already in the collector when overflow happened, and the latest + // emitted event survives in the buffer. The events in between are dropped. + assertTrue(first in processedEvents) { "Expected the first emitted event to survive" } + assertTrue(fourth in processedEvents) { "Expected the latest emitted event to survive" } + assertFalse(second in processedEvents) { "Expected the second event to be dropped (oldest queued)" } + assertFalse(third in processedEvents) { "Expected the third event to be dropped (oldest queued)" } + } + + @Test + fun `When buffer would overflow with DROP_LATEST, the latest NewMessageEvent is dropped`() = runTest { + val fixture = Fixture() + .withBufferConfig( + MessageBufferConfig( + channelTypes = setOf(BUFFERED_CHANNEL_TYPE), + capacity = 1, + overflow = BufferOverflow.DROP_LATEST, + ), + ) + .withReadEventsCapabilityForAny() + .pauseSideEffect() + val handler = fixture.get(this) + val first = newMessageEventOnBufferedType() + val second = newMessageEventOnBufferedType() + val third = newMessageEventOnBufferedType() + val fourth = newMessageEventOnBufferedType() + + handler.startListening() + advanceUntilIdle() + val listener = fixture.listener() + + listener.onEvent(first) + advanceUntilIdle() + listener.onEvent(second) + listener.onEvent(third) + listener.onEvent(fourth) + advanceUntilIdle() + + fixture.releaseSideEffect() + advanceUntilIdle() + + val processedEvents = capturedBatchEvents(fixture).flatMap { it.sortedEvents } + // With DROP_LATEST the in-flight first event survives, the second event takes the buffer + // slot and is processed once the gate releases; later emissions are dropped. + assertTrue(first in processedEvents) { "Expected the first emitted event to survive" } + assertTrue(second in processedEvents) { "Expected the second event to fit the buffer slot" } + assertFalse(third in processedEvents) { "Expected the third event to be dropped (latest)" } + assertFalse(fourth in processedEvents) { "Expected the fourth event to be dropped (latest)" } + } + + @Test + fun `When NewMessageEvent channelType is not buffered, no event is dropped`() = runTest { + val fixture = Fixture() + .withBufferConfig( + MessageBufferConfig( + channelTypes = setOf("livestream"), + capacity = 1, + overflow = BufferOverflow.DROP_OLDEST, + ), + ) + .withReadEventsCapabilityForAny() + .pauseSideEffect() + val handler = fixture.get(this) + val events = List(4) { newMessageEventOnBufferedType() } + + handler.startListening() + advanceUntilIdle() + val listener = fixture.listener() + + events.forEach { listener.onEvent(it) } + advanceUntilIdle() + + fixture.releaseSideEffect() + advanceUntilIdle() + + val processedEvents = capturedBatchEvents(fixture).flatMap { it.sortedEvents } + events.forEach { event -> + assertTrue(event in processedEvents) { + "Expected non-buffered channelType event to be processed (no drop)" + } + } + } + + @Test + fun `When listener is bombarded with thousands of NewMessageEvents, the buffer drops old ones`() = runTest { + val capacity = 100 + val totalEmissions = 5_000 + val fixture = Fixture() + .withBufferConfig( + MessageBufferConfig( + channelTypes = setOf(BUFFERED_CHANNEL_TYPE), + capacity = capacity, + overflow = BufferOverflow.DROP_OLDEST, + ), + ) + .withReadEventsCapabilityForAny() + val handler = fixture.get(this) + + handler.startListening() + advanceUntilIdle() + val listener = fixture.listener() + + // Bombard the listener synchronously. Because the test dispatcher won't run the + // collector between tryEmit calls, the buffer fills past `capacity` and DROP_OLDEST + // evicts the oldest events. + val emitted = List(totalEmissions) { newMessageEventOnBufferedType() } + emitted.forEach { listener.onEvent(it) } + advanceUntilIdle() + + val processed = capturedBatchEvents(fixture).flatMap { it.sortedEvents }.toSet() + assertTrue(processed.size < totalEmissions) { + "Expected drops under load: processed=${processed.size}, emitted=$totalEmissions" + } + assertTrue(emitted.last() in processed) { + "Expected the latest emitted event to survive DROP_OLDEST overflow" + } + assertFalse(emitted.first() in processed) { + "Expected the oldest emitted event to be dropped under load" + } + } + + private fun capturedBatchEvents(fixture: Fixture): List { + val captor = argumentCaptor() + verify(fixture.stateRegistry, atLeast(0)).handleBatchEvent(captor.capture()) + return captor.allValues + } + + private fun newMessageEventOnBufferedType() = randomNewMessageEvent( + cid = "$BUFFERED_CHANNEL_TYPE:${randomString()}", + channelType = BUFFERED_CHANNEL_TYPE, + ) + internal class Fixture { private var currentUser = randomUser() - private val subscribeForEvents: (ChatEventListener) -> Disposable = - { _ -> EventHandlerSequential.EMPTY_DISPOSABLE } + private val capturedListener = AtomicReference>() + private val subscribeForEvents: (ChatEventListener) -> Disposable = { listener -> + capturedListener.set(listener) + EventHandlerSequential.EMPTY_DISPOSABLE + } private val logicRegistry: LogicRegistry = mock() - private val stateRegistry: StateRegistry = mock() + val stateRegistry: StateRegistry = mock() private val clientState: ClientState = mock { on(it.user) doReturn MutableStateFlow(currentUser) } private var mutableGlobalState: MutableGlobalState? = null private var repos: RepositoryFacade = mock() - private val sideEffect: suspend () -> Unit = {} + private var sideEffectGate: CompletableDeferred = CompletableDeferred(Unit) + private val sideEffect: suspend () -> Unit = { sideEffectGate.await() } private val syncedEvents: Flow> = emptyFlow() + private var bufferConfig: MessageBufferConfig = MessageBufferConfig() fun withReadEventsCapability(cid: String) = apply { repos.stub { @@ -193,6 +373,14 @@ internal class EventHandlerSequentialTest { } } + fun withReadEventsCapabilityForAny() = apply { + repos.stub { + onBlocking { + selectChannel(any()) + } doReturn randomChannel(ownCapabilities = setOf(ChannelCapabilities.READ_EVENTS)) + } + } + fun withCurrentUser(user: User) = apply { currentUser = user } @@ -205,6 +393,20 @@ internal class EventHandlerSequentialTest { this.repos = repos } + fun withBufferConfig(config: MessageBufferConfig) = apply { + this.bufferConfig = config + } + + fun pauseSideEffect() = apply { + sideEffectGate = CompletableDeferred() + } + + fun releaseSideEffect() { + sideEffectGate.complete(Unit) + } + + fun listener(): ChatEventListener = capturedListener.get() + fun get(scope: CoroutineScope) = EventHandlerSequential( currentUserId = currentUser.id, subscribeForEvents = subscribeForEvents, @@ -215,11 +417,13 @@ internal class EventHandlerSequentialTest { repos = repos, sideEffect = sideEffect, syncedEvents = syncedEvents, + bufferConfig = bufferConfig, scope = scope, ) } companion object { + private const val BUFFERED_CHANNEL_TYPE = "messaging" private val initialTotalunreadCount: Int = positiveRandomInt() private val initialChannelUnreadCount: Int = positiveRandomInt() private val totalUnreadCount = positiveRandomInt() diff --git a/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.kt b/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.kt index e2053eefb7c..7b48caa3b33 100644 --- a/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.kt +++ b/stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.kt @@ -26,6 +26,7 @@ import io.getstream.chat.android.client.utils.observable.Disposable import io.getstream.chat.android.randomDate import io.getstream.chat.android.randomMessage import io.getstream.chat.android.randomUser +import io.getstream.chat.android.state.plugin.config.MessageBufferConfig import io.getstream.chat.android.state.plugin.logic.channel.internal.ChannelLogic import io.getstream.chat.android.state.plugin.logic.internal.LogicRegistry import io.getstream.chat.android.state.plugin.state.StateRegistry @@ -345,6 +346,7 @@ internal class EventHandlerSequentialUserMessagesDeletedTest { repos = repos, sideEffect = sideEffect, syncedEvents = syncedEvents, + bufferConfig = MessageBufferConfig(), scope = scope, ) }