diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 74a90dbfd..0ae15c491 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -19,4 +19,4 @@ jobs: distribution: 'temurin' - name: Set up Gradle uses: gradle/actions/setup-gradle@v3 - - run: ./gradlew checkWithCodenarc checkstyleMain checkstyleTest runUnitTests + - run: ./gradlew checkWithCodenarc checkstyleMain checkstyleTest runUnitTests runLiveObjectUnitTests diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 8ec98e980..89368a8a8 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -90,3 +90,21 @@ jobs: uses: gradle/actions/setup-gradle@v3 - run: ./gradlew :java:testRealtimeSuite -Pokhttp + + check-liveobjects: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: 'recursive' + + - name: Set up the JDK + uses: actions/setup-java@v4 + with: + java-version: '17' + distribution: 'temurin' + + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v3 + + - run: ./gradlew runLiveObjectIntegrationTests diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f1e77a7c5..62b9b1f02 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,6 +1,6 @@ [versions] agp = "8.5.2" -junit = "4.12" +junit = "4.13.2" gson = "2.9.0" msgpack = "0.8.11" java-websocket = "1.5.3" @@ -21,7 +21,9 @@ okhttp = "4.12.0" test-retry = "1.6.0" kotlin = "2.1.10" coroutine = "1.9.0" +mockk = "1.14.2" turbine = "1.2.0" +ktor = "3.1.3" jetbrains-annoations = "26.0.2" [libraries] @@ -47,12 +49,16 @@ android-retrostreams = { group = "net.sourceforge.streamsupport", name = "androi okhttp = { group = "com.squareup.okhttp3", name = "okhttp", version.ref = "okhttp" } coroutine-core = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version.ref = "coroutine" } coroutine-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "coroutine" } +mockk = { group = "io.mockk", name = "mockk", version.ref = "mockk" } turbine = { group = "app.cash.turbine", name = "turbine", version.ref = "turbine" } +ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" } +ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor" } jetbrains = { group = "org.jetbrains", name = "annotations", version.ref = "jetbrains-annoations" } [bundles] common = ["msgpack", "vcdiff-core"] tests = ["junit", "hamcrest-all", "nanohttpd", "nanohttpd-nanolets", "nanohttpd-websocket", "mockito-core", "concurrentunit", "slf4j-simple"] +kotlin-tests = ["junit", "mockk", "coroutine-test", "nanohttpd", "turbine", "ktor-client-cio", "ktor-client-core"] instrumental-android = ["android-test-runner", "android-test-rules", "dexmaker", "dexmaker-dx", "dexmaker-mockito", "android-retrostreams"] [plugins] diff --git a/lib/src/main/java/io/ably/lib/objects/Adapter.java b/lib/src/main/java/io/ably/lib/objects/Adapter.java index a9e00beeb..926795c83 100644 --- a/lib/src/main/java/io/ably/lib/objects/Adapter.java +++ b/lib/src/main/java/io/ably/lib/objects/Adapter.java @@ -25,7 +25,7 @@ public void setChannelSerial(@NotNull String channelName, @NotNull String channe } @Override - public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException { + public void send(@NotNull ProtocolMessage msg, @NotNull CompletionListener listener) throws AblyException { // Always queue LiveObjects messages to ensure reliable state synchronization and proper acknowledgment ably.connection.connectionManager.send(msg, true, listener); } diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java index c6040c1b0..1050a1511 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java @@ -15,7 +15,7 @@ public interface LiveObjectsAdapter { * @param listener a listener to be notified of the success or failure of the send operation. * @throws AblyException if an error occurs during the send operation. */ - void send(ProtocolMessage msg, CompletionListener listener) throws AblyException; + void send(@NotNull ProtocolMessage msg, @NotNull CompletionListener listener) throws AblyException; /** * Sets the channel serial for a specific channel. diff --git a/live-objects/build.gradle.kts b/live-objects/build.gradle.kts index 745a9a47c..2adb88fff 100644 --- a/live-objects/build.gradle.kts +++ b/live-objects/build.gradle.kts @@ -1,3 +1,5 @@ +import org.gradle.api.tasks.testing.logging.TestExceptionFormat + plugins { `java-library` alias(libs.plugins.kotlin.jvm) @@ -9,14 +11,34 @@ repositories { dependencies { implementation(project(":java")) - testImplementation(kotlin("test")) implementation(libs.coroutine.core) - testImplementation(libs.coroutine.test) + testImplementation(kotlin("test")) + testImplementation(libs.bundles.kotlin.tests) +} + +tasks.withType().configureEach { + testLogging { + exceptionFormat = TestExceptionFormat.FULL + } + jvmArgs("--add-opens", "java.base/java.time=ALL-UNNAMED") + jvmArgs("--add-opens", "java.base/java.lang=ALL-UNNAMED") + beforeTest(closureOf { logger.lifecycle("-> $this") }) + outputs.upToDateWhen { false } +} + +tasks.register("runLiveObjectUnitTests") { + filter { + includeTestsMatching("io.ably.lib.objects.unit.*") + } } -tasks.test { - useJUnitPlatform() +tasks.register("runLiveObjectIntegrationTests") { + filter { + includeTestsMatching("io.ably.lib.objects.integration.*") + // Exclude the base integration test class + excludeTestsMatching("io.ably.lib.objects.integration.setup.IntegrationTest") + } } kotlin { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt new file mode 100644 index 000000000..148b8abf4 --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt @@ -0,0 +1,11 @@ +package io.ably.lib.objects + +internal enum class ErrorCode(public val code: Int) { + BadRequest(40_000), + InternalError(50_000), +} + +internal enum class HttpStatusCode(public val code: Int) { + BadRequest(400), + InternalServerError(500), +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 9a6606f5c..e60ed3565 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -1,7 +1,6 @@ package io.ably.lib.objects import io.ably.lib.realtime.CompletionListener -import io.ably.lib.types.AblyException import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage import kotlinx.coroutines.suspendCancellableCoroutine @@ -16,7 +15,7 @@ internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = su } override fun onError(reason: ErrorInfo) { - continuation.resumeWithException(AblyException.fromErrorInfo(reason)) + continuation.resumeWithException(ablyException(reason)) } }) } catch (e: Exception) { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt new file mode 100644 index 000000000..088028e5b --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt @@ -0,0 +1,35 @@ +package io.ably.lib.objects + +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo + +internal fun ablyException( + errorMessage: String, + errorCode: ErrorCode, + statusCode: HttpStatusCode = HttpStatusCode.BadRequest, + cause: Throwable? = null, +): AblyException { + val errorInfo = createErrorInfo(errorMessage, errorCode, statusCode) + return createAblyException(errorInfo, cause) +} + +internal fun ablyException( + errorInfo: ErrorInfo, + cause: Throwable? = null, +): AblyException = createAblyException(errorInfo, cause) + +private fun createErrorInfo( + errorMessage: String, + errorCode: ErrorCode, + statusCode: HttpStatusCode, +) = ErrorInfo(errorMessage, statusCode.code, errorCode.code) + +private fun createAblyException( + errorInfo: ErrorInfo, + cause: Throwable?, +) = cause?.let { AblyException.fromErrorInfo(it, errorInfo) } + ?: AblyException.fromErrorInfo(errorInfo) + +internal fun clientError(errorMessage: String) = ablyException(errorMessage, ErrorCode.BadRequest, HttpStatusCode.BadRequest) + +internal fun serverError(errorMessage: String) = ablyException(errorMessage, ErrorCode.InternalError, HttpStatusCode.InternalServerError) diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/TestUtils.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/TestUtils.kt new file mode 100644 index 000000000..17719b961 --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/TestUtils.kt @@ -0,0 +1,60 @@ +package io.ably.lib.objects + +import java.lang.reflect.Field +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.withContext +import kotlinx.coroutines.withTimeout + +suspend fun assertWaiter(timeoutInMs: Long = 10_000, block: suspend () -> Boolean) { + withContext(Dispatchers.Default) { + withTimeout(timeoutInMs) { + do { + val success = block() + delay(100) + } while (!success) + } + } +} + +fun Any.setPrivateField(name: String, value: Any?) { + val valueField = javaClass.findField(name) + valueField.isAccessible = true + valueField.set(this, value) +} + +fun Any.getPrivateField(name: String): T { + val valueField = javaClass.findField(name) + valueField.isAccessible = true + @Suppress("UNCHECKED_CAST") + return valueField.get(this) as T +} + +private fun Class<*>.findField(name: String): Field { + var result = kotlin.runCatching { getDeclaredField(name) } + var currentClass = this + while (result.isFailure && currentClass.superclass != null) // stop when we got field or reached top of class hierarchy + { + currentClass = currentClass.superclass!! + result = kotlin.runCatching { currentClass.getDeclaredField(name) } + } + if (result.isFailure) { + throw result.exceptionOrNull() as Exception + } + return result.getOrNull() as Field +} + +suspend fun Any.invokePrivateSuspendMethod(methodName: String, vararg args: Any?): T = suspendCancellableCoroutine { cont -> + val suspendMethod = javaClass.declaredMethods.find { it.name == methodName } + ?: error("Method '$methodName' not found") + suspendMethod.isAccessible = true + suspendMethod.invoke(this, *args, cont) +} + +fun Any.invokePrivateMethod(methodName: String, vararg args: Any?): T { + val method = javaClass.declaredMethods.find { it.name == methodName } ?: error("Method '$methodName' not found") + method.isAccessible = true + @Suppress("UNCHECKED_CAST") + return method.invoke(this, *args) as T +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/LiveObjectTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/LiveObjectTest.kt new file mode 100644 index 000000000..7e672e178 --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/LiveObjectTest.kt @@ -0,0 +1,17 @@ +package io.ably.lib.objects.integration + +import io.ably.lib.objects.integration.setup.IntegrationTest +import kotlinx.coroutines.test.runTest +import org.junit.Test +import kotlin.test.assertNotNull + +class LiveObjectTest : IntegrationTest() { + + @Test + fun testChannelObjectGetterTest() = runTest { + val channelName = generateChannelName() + val channel = getRealtimeChannel(channelName) + val objects = channel.objects + assertNotNull(objects) + } +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt new file mode 100644 index 000000000..ea323124b --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt @@ -0,0 +1,98 @@ +package io.ably.lib.objects.integration.setup + +import io.ably.lib.realtime.AblyRealtime +import io.ably.lib.realtime.Channel +import io.ably.lib.types.ChannelMode +import io.ably.lib.types.ChannelOptions +import kotlinx.coroutines.runBlocking +import org.junit.After +import org.junit.AfterClass +import org.junit.BeforeClass +import org.junit.Rule +import org.junit.rules.Timeout +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import java.util.UUID + +@RunWith(Parameterized::class) +abstract class IntegrationTest { + @Parameterized.Parameter + lateinit var testParams: String + + @JvmField + @Rule + val timeout: Timeout = Timeout.seconds(10) + + private val realtimeClients = mutableMapOf() + + /** + * Retrieves a realtime channel for the specified channel name and client ID + * If a client with the given clientID does not exist, a new client is created using the provided options. + * The channel is attached and ensured to be in the attached state before returning. + * + * @param channelName Name of the channel + * @param clientId The ID of the client to use or create. Defaults to "client1". + * @return The attached realtime channel. + * @throws Exception If the channel fails to attach or the client fails to connect. + */ + internal suspend fun getRealtimeChannel(channelName: String, clientId: String = "client1"): Channel { + val client = realtimeClients.getOrPut(clientId) { + sandbox.createRealtimeClient { + this.clientId = clientId + useBinaryProtocol = testParams == "msgpack_protocol" + }. apply { ensureConnected() } + } + val channelOpts = ChannelOptions().apply { + modes = arrayOf(ChannelMode.object_publish, ChannelMode.object_subscribe) + } + return client.channels.get(channelName, channelOpts).apply { + attach() + ensureAttached() + } + } + + /** + * Generates a unique channel name for testing purposes. + * This is mainly to avoid channel name/state/history collisions across tests in same file. + */ + internal fun generateChannelName(): String { + return "test-channel-${UUID.randomUUID()}" + } + + @After + fun afterEach() { + for (ablyRealtime in realtimeClients.values) { + for ((channelName, channel) in ablyRealtime.channels.entrySet()) { + channel.off() + ablyRealtime.channels.release(channelName) + } + ablyRealtime.close() + } + realtimeClients.clear() + } + + companion object { + private lateinit var sandbox: Sandbox + + @JvmStatic + @Parameterized.Parameters(name = "{0}") + fun data(): Iterable { + return listOf("msgpack_protocol", "json_protocol") + } + + @JvmStatic + @BeforeClass + @Throws(Exception::class) + fun setUpBeforeClass() { + runBlocking { + sandbox = Sandbox.createInstance() + } + } + + @JvmStatic + @AfterClass + @Throws(Exception::class) + fun tearDownAfterClass() { + } + } +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/Sandbox.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/Sandbox.kt new file mode 100644 index 000000000..7d2b05586 --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/Sandbox.kt @@ -0,0 +1,105 @@ +package io.ably.lib.objects.integration.setup + +import com.google.gson.JsonElement +import com.google.gson.JsonParser +import io.ably.lib.objects.ablyException +import io.ably.lib.realtime.* +import io.ably.lib.types.ClientOptions +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.network.sockets.ConnectTimeoutException +import io.ktor.client.network.sockets.SocketTimeoutException +import io.ktor.client.plugins.HttpRequestRetry +import io.ktor.client.plugins.HttpRequestTimeoutException +import io.ktor.client.request.* +import io.ktor.client.statement.HttpResponse +import io.ktor.client.statement.bodyAsText +import io.ktor.http.ContentType +import io.ktor.http.contentType +import io.ktor.http.isSuccess +import kotlinx.coroutines.CompletableDeferred + +private val client = HttpClient(CIO) { + install(HttpRequestRetry) { + maxRetries = 5 + retryIf { _, response -> + !response.status.isSuccess() + } + retryOnExceptionIf { _, cause -> + cause is ConnectTimeoutException || + cause is HttpRequestTimeoutException || + cause is SocketTimeoutException + } + exponentialDelay() + } +} + +class Sandbox private constructor(val appId: String, val apiKey: String) { + companion object { + private suspend fun loadAppCreationJson(): JsonElement = + JsonParser.parseString( + client.get("https://raw.githubusercontent.com/ably/ably-common/refs/heads/main/test-resources/test-app-setup.json") { + contentType(ContentType.Application.Json) + }.bodyAsText(), + ).asJsonObject.get("post_apps") + + internal suspend fun createInstance(): Sandbox { + val response: HttpResponse = client.post("https://sandbox.realtime.ably-nonprod.net/apps") { + contentType(ContentType.Application.Json) + setBody(loadAppCreationJson().toString()) + } + val body = JsonParser.parseString(response.bodyAsText()) + + return Sandbox( + appId = body.asJsonObject["appId"].asString, + // From JS chat repo at 7985ab7 — "The key we need to use is the one at index 5, which gives enough permissions to interact with Chat and Channels" + apiKey = body.asJsonObject["keys"].asJsonArray[0].asJsonObject["keyStr"].asString, + ) + } + } +} + + +internal fun Sandbox.createRealtimeClient(options: ClientOptions.() -> Unit): AblyRealtime { + val clientOptions = ClientOptions().apply { + apply(options) + key = apiKey + environment = "sandbox" + } + return AblyRealtime(clientOptions) +} + +internal suspend fun AblyRealtime.ensureConnected() { + if (this.connection.state == ConnectionState.connected) { + return + } + val connectedDeferred = CompletableDeferred() + this.connection.on { + if (it.event == ConnectionEvent.connected) { + connectedDeferred.complete(Unit) + this.connection.off() + } else if (it.event != ConnectionEvent.connecting) { + connectedDeferred.completeExceptionally(ablyException(it.reason)) + this.connection.off() + this.close() + } + } + connectedDeferred.await() +} + +internal suspend fun Channel.ensureAttached() { + if (this.state == ChannelState.attached) { + return + } + val attachedDeferred = CompletableDeferred() + this.on { + if (it.event == ChannelEvent.attached) { + attachedDeferred.complete(Unit) + this.off() + } else if (it.event != ChannelEvent.attaching) { + attachedDeferred.completeExceptionally(ablyException(it.reason)) + this.off() + } + } + attachedDeferred.await() +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/LiveObjectTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/LiveObjectTest.kt new file mode 100644 index 000000000..4c4294877 --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/LiveObjectTest.kt @@ -0,0 +1,14 @@ +package io.ably.lib.objects.unit + +import kotlinx.coroutines.test.runTest +import org.junit.Test +import kotlin.test.assertNotNull + +class LiveObjectTest { + @Test + fun testChannelObjectGetterTest() = runTest { + val channel = getMockRealtimeChannel("test-channel") + val objects = channel.objects + assertNotNull(objects) + } +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt new file mode 100644 index 000000000..5946e6320 --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt @@ -0,0 +1,37 @@ +package io.ably.lib.objects.unit + +import io.ably.lib.realtime.AblyRealtime +import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.ChannelState +import io.ably.lib.types.ChannelMode +import io.ably.lib.types.ChannelOptions +import io.ably.lib.types.ClientOptions +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk + +internal fun getMockRealtimeChannel( + channelName: String, + clientId: String = "client1", + channelModes: Array = arrayOf(ChannelMode.object_publish, ChannelMode.object_subscribe)): Channel { + val client = AblyRealtime(ClientOptions().apply { + autoConnect = false + key = "keyName:Value" + this.clientId = clientId + }) + val channelOpts = ChannelOptions().apply { modes = channelModes } + val channel = client.channels.get(channelName, channelOpts) + return spyk(channel) { + every { attach() } answers { + state = ChannelState.attached + } + every { detach() } answers { + state = ChannelState.detached + } + every { subscribe(any(), any()) } returns mockk(relaxUnitFun = true) + every { subscribe(any>(), any()) } returns mockk(relaxUnitFun = true) + every { subscribe(any()) } returns mockk(relaxUnitFun = true) + }.apply { + state = ChannelState.attached + } +}