diff --git a/build.gradle.kts b/build.gradle.kts index 965f79b6..08be8283 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ val sonatypeUsername: String? = System.getenv("sonatypeUsername") val sonatypePassword: String? = System.getenv("sonatypePassword") plugins { - id("com.github.ben-manes.versions") version "0.39.0" + id("com.github.ben-manes.versions") version "0.44.0" id("io.codearte.nexus-staging") version "0.30.0" id("de.marcphilipp.nexus-publish") version "0.4.0" jacoco diff --git a/gradle.properties b/gradle.properties index 5ef27d4b..8852cf3c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,18 +3,18 @@ version=0.18.1 # Dependencies coroutine_version=1.6.4 -jackson_version=2.13.4 -caffeine_version=3.1.1 -serialization_version=1.4.0 +jackson_version=2.14.1 +caffeine_version=3.1.2 +serialization_version=1.4.1 kDataLoader_version=0.5.1 deferredJsonBuilder_version=1.0.0 -ktor_version=2.1.2 +ktor_version=2.2.2 # Test-Dependencies kotlin_html_version=0.7.5 netty_version=4.1.82.Final -junit_version=5.9.0 -kluent_version=1.68 +junit_version=5.9.2 +kluent_version=1.72 hamcrest_version=2.2 @@ -25,7 +25,7 @@ coverallsGradlePlugin_version=2.8.4 jacoco_version=0.8.5 # Example Dependencies -logback_version=1.2.1 +logback_version=1.4.5 exposed_version=0.32.1 h2_version=1.4.200 hikari_version=4.0.3 diff --git a/kgraphql-example/build.gradle.kts b/kgraphql-example/build.gradle.kts index 0ca5f7fe..214b7864 100644 --- a/kgraphql-example/build.gradle.kts +++ b/kgraphql-example/build.gradle.kts @@ -1,8 +1,8 @@ plugins { base application - kotlin("jvm") version "1.7.10" - id("org.jetbrains.dokka") version "1.7.10" + kotlin("jvm") version "1.8.0" + id("org.jetbrains.dokka") version "1.7.20" signing } diff --git a/kgraphql-ktor/build.gradle.kts b/kgraphql-ktor/build.gradle.kts index 13b2b282..4ad4bbb4 100644 --- a/kgraphql-ktor/build.gradle.kts +++ b/kgraphql-ktor/build.gradle.kts @@ -1,8 +1,8 @@ plugins { base - kotlin("jvm") version "1.6.20" - kotlin("plugin.serialization") version "1.6.20" - id("org.jetbrains.dokka") version "1.4.32" + kotlin("jvm") version "1.8.0" + kotlin("plugin.serialization") version "1.8.0" + id("org.jetbrains.dokka") version "1.7.20" signing } diff --git a/kgraphql/build.gradle.kts b/kgraphql/build.gradle.kts index 4ac86575..1afee05c 100644 --- a/kgraphql/build.gradle.kts +++ b/kgraphql/build.gradle.kts @@ -1,8 +1,8 @@ plugins { base - kotlin("jvm") version "1.7.10" - id("org.jetbrains.dokka") version "1.7.10" + kotlin("jvm") version "1.8.0" + id("org.jetbrains.dokka") version "1.7.20" signing } @@ -26,6 +26,7 @@ dependencies { implementation(kotlin("reflect")) implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutine_version") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$coroutine_version") implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:$serialization_version") // JVM dependency implementation("com.fasterxml.jackson.core:jackson-databind:$jackson_version") @@ -33,7 +34,8 @@ dependencies { implementation("com.github.ben-manes.caffeine:caffeine:$caffeine_version") implementation("com.apurebase:DeferredJsonBuilder:$deferredJsonBuilder_version") - api("de.nidomiro:KDataLoader:$kDataLoader_version") + +// api("de.nidomiro:KDataLoader:$kDataLoader_version") testImplementation("io.netty:netty-all:$netty_version") diff --git a/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/dsl/DataLoaderPropertyDSL.kt b/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/dsl/DataLoaderPropertyDSL.kt index a735ccc7..7b9b43ee 100644 --- a/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/dsl/DataLoaderPropertyDSL.kt +++ b/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/dsl/DataLoaderPropertyDSL.kt @@ -87,7 +87,6 @@ class DataLoaderPropertyDSL( { TimedAutoDispatcherDataLoaderOptions() }, mapOf(), dataLoader!!, - null, ) ) } diff --git a/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/DataLoaderPreparedRequestExecutor.kt b/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/DataLoaderPreparedRequestExecutor.kt index c4542dbc..69276275 100644 --- a/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/DataLoaderPreparedRequestExecutor.kt +++ b/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/DataLoaderPreparedRequestExecutor.kt @@ -16,9 +16,7 @@ import com.apurebase.kgraphql.schema.scalar.serializeScalar import com.apurebase.kgraphql.schema.structure.Field import com.apurebase.kgraphql.schema.structure.InputValue import com.apurebase.kgraphql.schema.structure.Type -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.* import kotlinx.serialization.json.* import nidomiro.kdataloader.DataLoader import kotlin.reflect.KProperty1 @@ -34,25 +32,24 @@ class DataLoaderPreparedRequestExecutor(val schema: DefaultSchema) : RequestExec val loaders: Map, DataLoader> ) - private suspend fun ExecutionPlan.constructLoaders(): Map, DataLoader> { + private suspend fun ExecutionPlan.constructLoaders(): Map, DataLoader> = coroutineScope { val loaders = mutableMapOf, DataLoader>() suspend fun Collection.look() { forEach { ex -> - ex.selectionNode when (ex) { is Execution.Fragment -> ex.elements.look() is Execution.Node -> { ex.children.look() if (ex.field is Field.DataLoader<*, *, *>) { - loaders[ex.field] = ex.field.loader.constructNew() as DataLoader + loaders[ex.field] = ex.field.loader.constructNew(coroutineContext.job) as DataLoader } } } } } operations.look() - return loaders + loaders } private suspend fun DeferredJsonMap.writeOperation( diff --git a/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/ParallelRequestExecutor.kt b/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/ParallelRequestExecutor.kt index c57ac5b1..5c11a48a 100644 --- a/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/ParallelRequestExecutor.kt +++ b/kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/ParallelRequestExecutor.kt @@ -292,7 +292,7 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor { ) // as this isn't the DataLoaderPreparedRequestExecutor. We'll use this instant workaround instead. - val loader = field.loader.constructNew() as DataLoader + val loader = field.loader.constructNew(null) as DataLoader val value = loader.loadAsync(preparedValue) loader.dispatch() diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/BatchMode.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/BatchMode.kt new file mode 100644 index 00000000..311110cb --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/BatchMode.kt @@ -0,0 +1,13 @@ +package nidomiro.kdataloader + +sealed class BatchMode { + /** + * Load data in batches of [batchSize] + */ + data class LoadInBatch(val batchSize: Int? = null) : BatchMode() + + /** + * Load everything immediately + */ + object LoadImmediately : BatchMode() +} diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/Cache.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/Cache.kt new file mode 100644 index 00000000..e28bde7f --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/Cache.kt @@ -0,0 +1,30 @@ +package nidomiro.kdataloader + +import kotlinx.coroutines.CompletableDeferred + +/** + * A "Threadsafe" Cache + * (Coroutine-Save) + */ +interface Cache { + + suspend fun store(key: K, value: CompletableDeferred): CompletableDeferred + + suspend fun get(key: K): CompletableDeferred? + + suspend fun getOrCreate( + key: K, + generator: suspend (key: K) -> CompletableDeferred, + callOnCacheHit: suspend () -> Unit + ): CompletableDeferred + + suspend fun clear(key: K): CompletableDeferred? + + suspend fun clear() +} + +suspend fun Cache.getOrCreate( + key: K, + generator: suspend (key: K) -> CompletableDeferred +): CompletableDeferred = + getOrCreate(key, generator, {}) diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/CoroutineMapCache.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/CoroutineMapCache.kt new file mode 100644 index 00000000..0d5b5b2d --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/CoroutineMapCache.kt @@ -0,0 +1,54 @@ +package nidomiro.kdataloader + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +class CoroutineMapCache( + private val cacheMap: MutableMap> = mutableMapOf() +) : Cache { + private val mutex = Mutex() + + + override suspend fun store(key: K, value: CompletableDeferred): CompletableDeferred { + mutex.withLock { + cacheMap[key] = value + } + return value + } + + override suspend fun get(key: K): CompletableDeferred? = + mutex.withLock { + cacheMap[key] + } + + override suspend fun getOrCreate( + key: K, + generator: suspend (key: K) -> CompletableDeferred, + callOnCacheHit: suspend () -> Unit + ): CompletableDeferred = + mutex.withLock { + val currentVal = cacheMap[key] + if (currentVal == null) { + val generated = generator(key) + cacheMap[key] = generated + return@withLock generated + } else { + callOnCacheHit() + return@withLock currentVal + } + } + + + override suspend fun clear(key: K): CompletableDeferred? = + mutex.withLock { + cacheMap.remove(key) + } + + override suspend fun clear() = + mutex.withLock { + cacheMap.clear() + } + + +} diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/DataLoader.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/DataLoader.kt new file mode 100644 index 00000000..2fc8def5 --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/DataLoader.kt @@ -0,0 +1,83 @@ +package nidomiro.kdataloader + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.Job +import kotlinx.coroutines.joinAll +import kotlin.jvm.JvmName +import nidomiro.kdataloader.statistics.DataLoaderStatistics + +typealias BatchLoader = suspend (ids: List) -> List> + +interface DataLoader { + + val options: DataLoaderOptions + + /** + * Loads the value for the given Key. + * The returned [Deferred] completes with the finish of [dispatch] in case of [DataLoaderOptions.batchLoadEnabled] = true. + * If [DataLoaderOptions.batchLoadEnabled] = false it calls the BatchLoader immediately and returns the retrieved value. + */ + suspend fun loadAsync(key: K): Deferred + + /** + * The same as [loadAsync] but for multiple Keys at once. + */ + suspend fun loadManyAsync(vararg keys: K): Deferred> + + /** + * Executes all stored requests via the given batchLoader. + * After this function finishes all [Deferred] created before are completed. + */ + suspend fun dispatch() + + /** + * Removes the value of the given Key from the cache + */ + suspend fun clear(key: K) + + /** + * Removes all values from the cache + */ + suspend fun clearAll() + + /** + * Primes the cache with the given values. + * After priming the [BatchLoader] will not be called with this key. + */ + suspend fun prime(key: K, value: R) + + /** + * Primes the cache with the given [Throwable]. + * After priming the [BatchLoader] will not be called with this key, if [DataLoaderOptions.cacheExceptions] = true. + */ + suspend fun prime(key: K, value: Throwable) + + + /** + * Returns a snapshot of the statistics at the point of calling + */ + suspend fun createStatisticsSnapshot(): DataLoaderStatistics + +} + +/** + * @see DataLoader.prime(K, R) + */ +suspend fun SimpleDataLoaderImpl.prime(cacheEntry: Pair) { + prime(cacheEntry.first, cacheEntry.second) +} + +/** + * @see DataLoader.prime(K, Throwable) + */ +@JvmName("primeFailure") +suspend fun SimpleDataLoaderImpl.prime(cacheEntry: Pair) { + prime(cacheEntry.first, cacheEntry.second) +} + +internal suspend fun DataLoader.prime(key: K, value: ExecutionResult) = + when (value) { + is ExecutionResult.Success -> prime(key, value.value) + is ExecutionResult.Failure -> prime(key, value.throwable) + } diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/DataLoaderOptions.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/DataLoaderOptions.kt new file mode 100644 index 00000000..365c9574 --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/DataLoaderOptions.kt @@ -0,0 +1,18 @@ +package nidomiro.kdataloader + +open class DataLoaderOptions( + /** + * The cache implementation + */ + val cache: Cache? = CoroutineMapCache(), + + /** + * Cache Exceptional States? + */ + val cacheExceptions: Boolean = true, + + /** + * The batch-mode + */ + val batchMode: BatchMode = BatchMode.LoadInBatch() +) diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/DefaultLoaderQueueImpl.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/DefaultLoaderQueueImpl.kt new file mode 100644 index 00000000..7b533965 --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/DefaultLoaderQueueImpl.kt @@ -0,0 +1,27 @@ +package nidomiro.kdataloader + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +class DefaultLoaderQueueImpl : LoaderQueue { + + private val mutex = Mutex() + private var queue: MutableList>> = mutableListOf() + + + override suspend fun enqueue(key: K, deferred: CompletableDeferred) { + mutex.withLock { + queue.add(LoaderQueueEntry(key, deferred)) + } + } + + override suspend fun getAllItemsAsList(): List>> = + mutex.withLock { + val currentQueue = queue + queue = mutableListOf() + return@withLock currentQueue + } + + +} diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/ExecutionResult.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/ExecutionResult.kt new file mode 100644 index 00000000..032748e4 --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/ExecutionResult.kt @@ -0,0 +1,6 @@ +package nidomiro.kdataloader + +sealed class ExecutionResult { + data class Success(val value: T) : ExecutionResult() + data class Failure(val throwable: Throwable) : ExecutionResult() +} diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/LoaderQueue.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/LoaderQueue.kt new file mode 100644 index 00000000..f46cecc8 --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/LoaderQueue.kt @@ -0,0 +1,21 @@ +package nidomiro.kdataloader + +import kotlinx.coroutines.CompletableDeferred + +interface LoaderQueue { + + suspend fun enqueue(key: K, deferred: CompletableDeferred) + + /** + * returns all stored Items as List and clears the queue + * (Coroutine-Save) + */ + suspend fun getAllItemsAsList(): List>> + +} + + +data class LoaderQueueEntry( + val key: K, + val value: V +) diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/SimpleDataLoaderImpl.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/SimpleDataLoaderImpl.kt new file mode 100644 index 00000000..29ba76b5 --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/SimpleDataLoaderImpl.kt @@ -0,0 +1,170 @@ +package nidomiro.kdataloader + +import kotlinx.coroutines.* +import nidomiro.kdataloader.statistics.SimpleStatisticsCollector +import nidomiro.kdataloader.statistics.StatisticsCollector + +open class SimpleDataLoaderImpl( + override val options: DataLoaderOptions, + private val statisticsCollector: StatisticsCollector, + private val batchLoader: BatchLoader +) : DataLoader { + constructor(options: DataLoaderOptions, batchLoader: BatchLoader) : this( + options, + SimpleStatisticsCollector(), + batchLoader + ) + + constructor(batchLoader: BatchLoader) : this(DataLoaderOptions(), batchLoader) + + private val dataLoaderScope = CoroutineScope(Dispatchers.Default) + + private val queue: LoaderQueue = DefaultLoaderQueueImpl() + + @Suppress("DeferredResultUnused") + override suspend fun loadAsync(key: K): Deferred { + statisticsCollector.incLoadAsyncMethodCalledAsync() + statisticsCollector.incObjectsRequestedAsync() + + return internalLoadAsync(key) + } + + @Suppress("DeferredResultUnused") + private suspend fun internalLoadAsync(key: K): Deferred { + val block: suspend (key: K) -> CompletableDeferred = { + val newDeferred = CompletableDeferred() + queue.enqueue(key, newDeferred) + if (options.batchMode == BatchMode.LoadImmediately) { + dispatch() + } + newDeferred + } + + return if (options.cache != null) { + options.cache!!.getOrCreate(key, block, { statisticsCollector.incCacheHitCountAsync() }) + } else { + block(key) + } + } + + @Suppress("DeferredResultUnused") + override suspend fun loadManyAsync(vararg keys: K): Deferred> { + statisticsCollector.incLoadManyAsyncMethodCalledAsync() + statisticsCollector.incObjectsRequestedAsync(keys.size.toLong()) + + val deferreds = keys.map { internalLoadAsync(it) } + + return dataLoaderScope.async(Dispatchers.Default) { + return@async deferreds.map { it.await() } + } + } + + @Suppress("DeferredResultUnused") + override suspend fun dispatch() { + dataLoaderScope.launch { + statisticsCollector.incDispatchMethodCalledAsync() + + val queueEntries = if (options.cache != null) { + queue.getAllItemsAsList().distinctBy { it.key } + } else { + queue.getAllItemsAsList() + } + + queueEntries + .batchIfNeeded(options.batchMode) + .forEach { + executeDispatchOnQueueEntries(it) + } + } + } + + private fun List>>.batchIfNeeded(batchMode: BatchMode) = + if (batchMode is BatchMode.LoadInBatch && batchMode.batchSize != null) { + this.chunked(batchMode.batchSize) + } else { + listOf(this) + } + + private suspend fun executeDispatchOnQueueEntries(queueEntries: List>>) { + val keys = queueEntries.map { it.key } + if (keys.isNotEmpty()) { + executeBatchLoader(keys, queueEntries) + } + } + + @Suppress("DeferredResultUnused") + private suspend fun executeBatchLoader( + keys: List, + queueEntries: List>> + ) { + statisticsCollector.incBatchCallsExecutedAsync() + try { + batchLoader(keys).forEachIndexed { i, result -> + val queueEntry = queueEntries[i] + handleSingleBatchLoaderResult(result, queueEntry) + } + } catch (e: Throwable) { + handleCompleteBatchLoaderFailure(queueEntries, e) + } + } + + private suspend fun handleSingleBatchLoaderResult( + result: ExecutionResult, + queueEntry: LoaderQueueEntry> + ) { + when (result) { + is ExecutionResult.Success -> queueEntry.value.complete(result.value) + is ExecutionResult.Failure -> { + queueEntry.value.completeExceptionally(result.throwable) + if (!options.cacheExceptions) { + clear(queueEntry.key) + } + } + + } + } + + private suspend fun handleCompleteBatchLoaderFailure( + queueEntries: List>>, + e: Throwable + ) { + queueEntries.forEach { + clear(it.key) + it.value.completeExceptionally(e) + } + } + + @Suppress("DeferredResultUnused") + override suspend fun clear(key: K) { + statisticsCollector.incClearMethodCalledAsync() + + options.cache?.clear(key) + } + + @Suppress("DeferredResultUnused") + override suspend fun clearAll() { + statisticsCollector.incClearAllMethodCalledAsync() + options.cache?.clear() + } + + @Suppress("DeferredResultUnused") + override suspend fun prime(key: K, value: R) { + statisticsCollector.incPrimeMethodCalledAsync() + options.cache?.getOrCreate(key) { + CompletableDeferred(value) + } + } + + @Suppress("DeferredResultUnused") + override suspend fun prime(key: K, value: Throwable) { + statisticsCollector.incPrimeMethodCalledAsync() + options.cache?.getOrCreate(key) { + CompletableDeferred().apply { + completeExceptionally(value) + } + } + } + + override suspend fun createStatisticsSnapshot() = statisticsCollector.createStatisticsSnapshot() + +} diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/TimedAutoDispatcherDataLoaderOptions.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/TimedAutoDispatcherDataLoaderOptions.kt new file mode 100644 index 00000000..c4f632e6 --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/TimedAutoDispatcherDataLoaderOptions.kt @@ -0,0 +1,8 @@ +package nidomiro.kdataloader + +class TimedAutoDispatcherDataLoaderOptions( + val waitInterval: Long = 100, + cache: Cache? = CoroutineMapCache(), + cacheExceptions: Boolean = true, + batchMode: BatchMode = BatchMode.LoadInBatch() +): DataLoaderOptions(cache, cacheExceptions, batchMode) diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/TimedAutoDispatcherImpl.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/TimedAutoDispatcherImpl.kt new file mode 100644 index 00000000..eb4ae7a6 --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/TimedAutoDispatcherImpl.kt @@ -0,0 +1,65 @@ +package nidomiro.kdataloader + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import nidomiro.kdataloader.statistics.SimpleStatisticsCollector + +class TimedAutoDispatcherImpl( + options: TimedAutoDispatcherDataLoaderOptions, + batchLoader: BatchLoader, + parent: Job? = null, +) : SimpleDataLoaderImpl(options, SimpleStatisticsCollector(), batchLoader), CoroutineScope { + + private val autoChannel = Channel() + override val coroutineContext = Job(parent) + + val dataLoaderDispatcher = newSingleThreadContext("CounterContext") + + init { + launch(CoroutineName("TimedAutoDispatcherImpl:init")) { + var job: Job? = null + while (true) { + autoChannel.receive() +// println("TimedAutoDispatcherImpl:message") + if (job?.isActive == true) job.cancelAndJoin() + job = launch(CoroutineName("TimedAutoDispatcherImpl:autoChannel") + coroutineContext) { + delay(options.waitInterval) + if (isActive) launch { + dispatch() + } + } + } + } + } + + suspend fun cancel() { + coroutineContext.cancel() + autoChannel.close() + dispatch() + } + + override suspend fun loadAsync(key: K): Deferred { + return super.loadAsync(key).also { autoChannel.send(Unit) } + } + + override suspend fun loadManyAsync(vararg keys: K): Deferred> { + return super.loadManyAsync(*keys).also { autoChannel.send(Unit) } + } + + override suspend fun clear(key: K) { + super.clear(key).also { autoChannel.send(Unit) } + } + + override suspend fun clearAll() { + super.clearAll().also { autoChannel.send(Unit) } + } + + override suspend fun prime(key: K, value: R) { + super.prime(key, value).also { autoChannel.send(Unit) } + } + + override suspend fun prime(key: K, value: Throwable){ + super.prime(key, value).also { autoChannel.send(Unit) } + } + +} diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/factories/DataLoaderFactory.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/factories/DataLoaderFactory.kt new file mode 100644 index 00000000..665db000 --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/factories/DataLoaderFactory.kt @@ -0,0 +1,24 @@ +package nidomiro.kdataloader.factories + +import kotlinx.coroutines.Job +import nidomiro.kdataloader.* +import kotlin.coroutines.CoroutineContext + +typealias DataLoaderFactoryMethod = (options: DataLoaderOptions, batchLoader: BatchLoader, parent: Job?) -> DataLoader + +open class DataLoaderFactory( + @Suppress("MemberVisibilityCanBePrivate") + protected val optionsFactory: () -> DataLoaderOptions, + @Suppress("MemberVisibilityCanBePrivate") + protected val batchLoader: BatchLoader, + @Suppress("MemberVisibilityCanBePrivate") + protected val cachePrimes: Map>, + protected val factoryMethod: DataLoaderFactoryMethod +) { + + suspend fun constructNew(parent: Job?): DataLoader { + val dataLoader = factoryMethod(optionsFactory(), batchLoader, parent) + cachePrimes.forEach { (key, value) -> dataLoader.prime(key, value) } + return dataLoader + } +} diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/factories/TimedAutoDispatcherDataLoaderFactory.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/factories/TimedAutoDispatcherDataLoaderFactory.kt new file mode 100644 index 00000000..6b410454 --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/factories/TimedAutoDispatcherDataLoaderFactory.kt @@ -0,0 +1,18 @@ +package nidomiro.kdataloader.factories + +import kotlinx.coroutines.Job +import kotlinx.coroutines.job +import nidomiro.kdataloader.BatchLoader +import nidomiro.kdataloader.DataLoaderOptions +import nidomiro.kdataloader.ExecutionResult +import nidomiro.kdataloader.TimedAutoDispatcherImpl +import nidomiro.kdataloader.TimedAutoDispatcherDataLoaderOptions +import kotlin.coroutines.CoroutineContext + +class TimedAutoDispatcherDataLoaderFactory( + optionsFactory: () -> TimedAutoDispatcherDataLoaderOptions, + cachePrimes: Map>, + batchLoader: BatchLoader, +) : DataLoaderFactory(optionsFactory, batchLoader, cachePrimes, { _: DataLoaderOptions, bl: BatchLoader, parent -> + TimedAutoDispatcherImpl(optionsFactory(), bl, null) +}) diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/statistics/DataLoaderStatistics.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/statistics/DataLoaderStatistics.kt new file mode 100644 index 00000000..2661d97d --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/statistics/DataLoaderStatistics.kt @@ -0,0 +1,18 @@ +package nidomiro.kdataloader.statistics + +data class DataLoaderStatistics( + val loadAsyncMethodCalled: Long = 0, + val loadManyAsyncMethodCalled: Long = 0, + val dispatchMethodCalled: Long = 0, + val clearMethodCalled: Long = 0, + val clearAllMethodCalled: Long = 0, + val primeMethodCalled: Long = 0, + + /** + * Contains the count of all Objects requested via loadAsync or loadAsyncMany + */ + val objectsRequested: Long = 0, + val batchCallsExecuted: Long = 0, + + val cacheHitCount: Long = 0 +) diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/statistics/SimpleStatisticsCollector.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/statistics/SimpleStatisticsCollector.kt new file mode 100644 index 00000000..cba1a39a --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/statistics/SimpleStatisticsCollector.kt @@ -0,0 +1,59 @@ +package nidomiro.kdataloader.statistics + +import kotlinx.coroutines.* + +class SimpleStatisticsCollector : StatisticsCollector { + + private var loadAsyncMethodCalled: Long = 0 + private var loadAsyncManyMethodCalled: Long = 0 + private var dispatchMethodCalled: Long = 0 + private var clearMethodCalled: Long = 0 + private var clearAllMethodCalled: Long = 0 + private var primeMethodCalled: Long = 0 + private var objectsRequested: Long = 0 + private var batchCallsExecuted: Long = 0 + private var cacheHitCount: Long = 0 + + override suspend fun incLoadAsyncMethodCalledAsync() = + CompletableDeferred(++loadAsyncMethodCalled) + + override suspend fun incLoadManyAsyncMethodCalledAsync() = + CompletableDeferred(++loadAsyncManyMethodCalled) + + override suspend fun incDispatchMethodCalledAsync() = + CompletableDeferred(++dispatchMethodCalled) + + override suspend fun incClearMethodCalledAsync() = + CompletableDeferred(++clearMethodCalled) + + override suspend fun incClearAllMethodCalledAsync() = + CompletableDeferred(++clearAllMethodCalled) + + override suspend fun incPrimeMethodCalledAsync() = + CompletableDeferred(++primeMethodCalled) + + override suspend fun incObjectsRequestedAsync(objectCount: Long): Deferred { + objectsRequested += objectCount + return CompletableDeferred(objectsRequested) + } + + + override suspend fun incBatchCallsExecutedAsync() = + CompletableDeferred(++batchCallsExecuted) + + override suspend fun incCacheHitCountAsync() = + CompletableDeferred(++cacheHitCount) + + override suspend fun createStatisticsSnapshot(): DataLoaderStatistics = + DataLoaderStatistics( + loadAsyncMethodCalled = this.loadAsyncMethodCalled, + loadManyAsyncMethodCalled = this.loadAsyncManyMethodCalled, + dispatchMethodCalled = this.dispatchMethodCalled, + clearMethodCalled = this.clearMethodCalled, + clearAllMethodCalled = this.clearAllMethodCalled, + primeMethodCalled = this.primeMethodCalled, + objectsRequested = this.objectsRequested, + batchCallsExecuted = this.batchCallsExecuted, + cacheHitCount = this.cacheHitCount + ) +} diff --git a/kgraphql/src/main/kotlin/nidomiro/kdataloader/statistics/StatisticsCollector.kt b/kgraphql/src/main/kotlin/nidomiro/kdataloader/statistics/StatisticsCollector.kt new file mode 100644 index 00000000..78d8a1a0 --- /dev/null +++ b/kgraphql/src/main/kotlin/nidomiro/kdataloader/statistics/StatisticsCollector.kt @@ -0,0 +1,27 @@ +package nidomiro.kdataloader.statistics + +import kotlinx.coroutines.Deferred + +interface StatisticsCollector { + + suspend fun incLoadAsyncMethodCalledAsync(): Deferred + suspend fun incLoadManyAsyncMethodCalledAsync(): Deferred + suspend fun incDispatchMethodCalledAsync(): Deferred + suspend fun incClearMethodCalledAsync(): Deferred + suspend fun incClearAllMethodCalledAsync(): Deferred + suspend fun incPrimeMethodCalledAsync(): Deferred + + /** + * Increment by the number of requested objects + */ + suspend fun incObjectsRequestedAsync(objectCount: Long = 1): Deferred + + suspend fun incBatchCallsExecutedAsync(): Deferred + + suspend fun incCacheHitCountAsync(): Deferred + + /** + * returns a immutable copy of the statistics at the point of calling this method + */ + suspend fun createStatisticsSnapshot(): DataLoaderStatistics +} diff --git a/kgraphql/src/test/kotlin/com/apurebase/kgraphql/integration/DataLoaderExecutionTest.kt b/kgraphql/src/test/kotlin/com/apurebase/kgraphql/integration/DataLoaderExecutionTest.kt new file mode 100644 index 00000000..448755a8 --- /dev/null +++ b/kgraphql/src/test/kotlin/com/apurebase/kgraphql/integration/DataLoaderExecutionTest.kt @@ -0,0 +1,110 @@ +package com.apurebase.kgraphql.integration + +import com.apurebase.kgraphql.defaultSchema +import com.apurebase.kgraphql.schema.execution.Executor +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.debug.DebugProbes +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout +import nidomiro.kdataloader.ExecutionResult +import org.junit.jupiter.api.RepeatedTest +import kotlin.random.Random +import kotlin.random.nextInt +import kotlin.random.nextLong + +class DataLoaderExecutionTest { + + private val fake = FakeComplicatedDataLoad() + + data class Item(val id: Int) + data class ItemValue(val itemId: Int, val value: String) + + val schema = defaultSchema { + configure { + executor = Executor.DataLoaderPrepared + timeout = null + } + + query("items") { + resolver { amount: Int? -> + println("query:items") + delay(Random.nextLong(50..250L)) + (1..(amount ?: 1_000)).map(::Item) + } + } + + type { + dataProperty("name") { + prepare { it.id } + loader { ids -> + println("loader:name ${ids.size}") + coroutineScope { + delay(Random.nextLong(1..15L)) + ids.map { ExecutionResult.Success("Name-$it")} + } + } + } + dataProperty>("values") { + prepare { it.id } + loader { ids -> + coroutineScope { + val start = Random.nextInt(1..1_000_000_000) + val delay = Random.nextLong(1..3L) + println("loader:values [size: ${ids.size}, delay: $delay]") + ids.map { id -> + ExecutionResult.Success((start..(start+3)).map { + ItemValue( + itemId = id, + value = fake.loadValue("delay:$delay,start:$start", delay), + ) + }) + } + } + } + } + } + type { + dataProperty("parent") { + prepare { it.itemId } + loader { ids -> + println("loader:parent ${ids.size}") + delay(5) + ids.map { + ExecutionResult.Success(Item(it)) + } + } + } + } + } + + @RepeatedTest(50) + fun Stress_test_with_dataloaders_and_custom_superviser_jobs() { +// DebugProbes.install() +// withTimeout(60_000) { + val result = schema.executeBlocking(""" + { + data1: items(amount: 250) { ...Fields } + data2: items(amount: 200) { ...Fields } + data3: items { ...Fields } + } + + fragment Fields on Item { + id + name + values { + parent { + values1: values { itemId, value } + values2: values { itemId, value } + values3: values { itemId, value } + values4: values { itemId, value } + } + } + } + """.trimIndent()) + + println(result) +// } + } + +} diff --git a/kgraphql/src/test/kotlin/com/apurebase/kgraphql/integration/FakeComplicatedDataLoad.kt b/kgraphql/src/test/kotlin/com/apurebase/kgraphql/integration/FakeComplicatedDataLoad.kt new file mode 100644 index 00000000..f35bd324 --- /dev/null +++ b/kgraphql/src/test/kotlin/com/apurebase/kgraphql/integration/FakeComplicatedDataLoad.kt @@ -0,0 +1,25 @@ +package com.apurebase.kgraphql.integration + +import kotlinx.coroutines.* +import kotlin.random.Random +import kotlin.random.nextLong + +class FakeComplicatedDataLoad: CoroutineScope { + override val coroutineContext = SupervisorJob() + + private val cache1 = SuspendCache.suspendCache, String> { (wait, key) -> + delay(wait) + "$wait-$key" + } + private val cache2 = SuspendCache.suspendCache, String> { (wait, key) -> + delay(wait + Random.nextLong(1..3L)) + "$key-$wait" + } + + suspend fun loadValue(returnValue: String, delay: Long = 50) = coroutineScope { + async(CoroutineName("FakeComplicatedDataLoad:loadValue:$returnValue:$delay")) { + "${cache1.get(delay to returnValue)}:${cache2.get(delay to returnValue)}" + }.await() + } + +} diff --git a/kgraphql/src/test/kotlin/com/apurebase/kgraphql/integration/SuspendableCache.kt b/kgraphql/src/test/kotlin/com/apurebase/kgraphql/integration/SuspendableCache.kt new file mode 100644 index 00000000..066f70d6 --- /dev/null +++ b/kgraphql/src/test/kotlin/com/apurebase/kgraphql/integration/SuspendableCache.kt @@ -0,0 +1,53 @@ +package com.apurebase.kgraphql.integration + +import com.github.benmanes.caffeine.cache.AsyncCache +import com.github.benmanes.caffeine.cache.Caffeine +import kotlinx.coroutines.* +import kotlinx.coroutines.future.await +import kotlinx.coroutines.future.future +import java.util.concurrent.CompletionException +import java.util.concurrent.TimeUnit +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes + +open class SuspendCache( + private val cache: AsyncCache, + private val onGet: suspend (K) -> V, +): CoroutineScope, AsyncCache by cache { + override val coroutineContext = SupervisorJob() + suspend fun get(key: K): V = supervisorScope { + try { + getAsync(key).await() + } catch (e: CompletionException) { + if (e.cause != null) throw e.cause!! + else throw e + } + } + + fun put(key: K, value: V) { + put(key, future { value }) + } + + + private fun CoroutineScope.getAsync(key: K) = get(key) { k, executor -> + future(executor.asCoroutineDispatcher()) { + onGet(k) + } + } + + companion object { + fun suspendCache( + timeoutDuration: Duration = 2.minutes, + block: suspend (K) -> V, + ) = runBlocking { suspendCache(timeoutDuration, block) } + fun CoroutineScope.suspendCache( + timeoutDuration: Duration = 2.minutes, + block: suspend (K) -> V, + ) = Caffeine.newBuilder() + .expireAfterWrite(timeoutDuration.inWholeMilliseconds, TimeUnit.SECONDS) + .buildAsync { k, _-> future { block(k) } } + .let { SuspendCache(it, block)} + } +} + +