From 3f99981145ae4c850a5bee8d21b4f54f28216325 Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Thu, 13 Jun 2019 10:01:55 +0530 Subject: [PATCH 01/10] Update to CosmosDB SDK v2.4.5 --- common/scala/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/scala/build.gradle b/common/scala/build.gradle index 2f457fc621e..f5f1b279a43 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -83,7 +83,7 @@ dependencies { compile 'io.reactivex:rxscala_2.12:0.26.5' compile 'io.reactivex:rxjava-reactive-streams:1.2.1' - compile ('com.microsoft.azure:azure-cosmosdb:2.4.2') + compile ('com.microsoft.azure:azure-cosmosdb:2.4.5') compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.12:1.0.1') { exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http From aed3700a17a04f5177bdc7c2d50bcae14029b2c5 Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Fri, 14 Jun 2019 10:06:06 +0530 Subject: [PATCH 02/10] QueuedExecutor to enable controlled writes to db --- .../cosmosdb/CosmosDBArtifactStore.scala | 22 ++++-- .../database/cosmosdb/CosmosDBConfig.scala | 5 +- .../database/cosmosdb/QueuedExecutor.scala | 67 +++++++++++++++++++ 3 files changed, 89 insertions(+), 5 deletions(-) create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala index c635092e63a..0aa80de332c 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala @@ -38,7 +38,7 @@ import org.apache.openwhisk.http.Messages import spray.json._ import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.Success @@ -79,6 +79,11 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected private val softDeleteTTL = config.softDeleteTTL.map(_.toSeconds.toInt) private val clusterIdValue = config.clusterId.map(JsString(_)) + private val docWriter = config.writeQueueConfig + .map(qc => + new QueuedExecutor[(JsObject, TransactionId), DocInfo](qc.queueSize, qc.concurrency)({ + case (js, tid) => putJsonDoc(js)(tid) + })) logging.info( this, @@ -97,9 +102,15 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = { - val asJson = d.toDocumentRecord + val json = d.toDocumentRecord + docWriter match { + case Some(w) => w.put((json, transid)) + case None => putJsonDoc(json) + } + } - val doc = toCosmosDoc(asJson) + protected[database] def putJsonDoc(json: JsObject)(implicit transid: TransactionId): Future[DocInfo] = { + val doc = toCosmosDoc(json) val id = doc.getId val docinfoStr = s"id: $id, rev: ${doc.getETag}" val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName' saving document: '$docinfoStr'") @@ -114,7 +125,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected .head() .recoverWith { case e: DocumentClientException if isConflict(e) && isNewDocument(doc) => - val docId = DocId(asJson.fields(_id).convertTo[String]) + val docId = DocId(json.fields(_id).convertTo[String]) //Fetch existing document and check if its deleted getRaw(docId).flatMap { case Some(js) => @@ -357,6 +368,8 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected logging.debug( this, s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]") + println( + s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]") } transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}") } @@ -429,6 +442,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected override def shutdown(): Unit = { //Its async so a chance exist for next scheduled job to still trigger + docWriter.foreach(Await.ready(_, 10.seconds)) usageMetricRecorder.foreach(system.stop) attachmentStore.foreach(_.shutdown()) clientRef.close() diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala index 2dba5f829bd..d9a2bdf64a1 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala @@ -31,6 +31,8 @@ import pureconfig._ import scala.collection.JavaConverters._ import scala.concurrent.duration._ +case class WriteQueueConfig(queueSize: Int, concurrency: Int) + case class CosmosDBConfig(endpoint: String, key: String, db: String, @@ -40,7 +42,8 @@ case class CosmosDBConfig(endpoint: String, timeToLive: Option[Duration], clusterId: Option[String], softDeleteTTL: Option[FiniteDuration], - recordUsageFrequency: Option[FiniteDuration]) { + recordUsageFrequency: Option[FiniteDuration], + writeQueueConfig: Option[WriteQueueConfig]) { def createClient(): AsyncDocumentClient = { new AsyncDocumentClient.Builder() diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala new file mode 100644 index 00000000000..9fd250b31a9 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.cosmosdb + +import akka.Done +import akka.stream.scaladsl.{Keep, Sink, Source} +import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult} + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success} + +class QueuedExecutor[T, R](queueSize: Int, concurrency: Int)(operation: T => Future[R])( + implicit materializer: ActorMaterializer, + ec: ExecutionContext) { + //TODO Track queue size + private val (docQueue, queueFinish) = Source + .queue[(T, Promise[R])](queueSize, OverflowStrategy.dropNew) + .mapAsyncUnordered(concurrency) { + case (d, p) => + val f = operation(d) + f.onComplete { + case Success(result) => p.success(result) + case Failure(e) => p.failure(e) + } + // Recover Future to not abort stream in case of a failure + f.recover { case _ => () } + } + .toMat(Sink.ignore)(Keep.both) + .run() + + /** + * Queues an element to be written later + * + * @param el the element to process + * @return a future containing the response of the database for this specific element + */ + def put(el: T): Future[R] = { + val promise = Promise[R]() + docQueue.offer(el -> promise).flatMap { + case QueueOfferResult.Enqueued => promise.future + case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full.")) + case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed.")) + case QueueOfferResult.Failure(f) => Future.failed(f) + } + promise.future + } + + def close(): Future[Done] = { + docQueue.complete() + docQueue.watchCompletion().flatMap(_ => queueFinish) + } +} From caccf17d637f31552884e36ca71e30efc21d185b Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Fri, 14 Jun 2019 11:28:22 +0530 Subject: [PATCH 03/10] Add test case for queued writes --- .../cosmosdb/CosmosDBArtifactStore.scala | 21 +++----- .../database/cosmosdb/DocumentPersister.scala | 49 +++++++++++++++++ .../database/cosmosdb/QueuedExecutor.scala | 8 +-- .../cosmosdb/CosmosDBQueuedWriteTests.scala | 53 +++++++++++++++++++ 4 files changed, 112 insertions(+), 19 deletions(-) create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala create mode 100644 tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBQueuedWriteTests.scala diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala index 0aa80de332c..eb3e484f816 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala @@ -60,9 +60,10 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected with CosmosDBSupport with AttachmentSupport[DocumentAbstraction] { + override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher + private val cosmosScheme = "cosmos" val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(cosmosScheme) - protected val client: AsyncDocumentClient = clientRef.get.client private[cosmosdb] val (database, collection) = initialize() @@ -79,11 +80,8 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected private val softDeleteTTL = config.softDeleteTTL.map(_.toSeconds.toInt) private val clusterIdValue = config.clusterId.map(JsString(_)) - private val docWriter = config.writeQueueConfig - .map(qc => - new QueuedExecutor[(JsObject, TransactionId), DocInfo](qc.queueSize, qc.concurrency)({ - case (js, tid) => putJsonDoc(js)(tid) - })) + private val docPersister: DocumentPersister = + config.writeQueueConfig.map(new QueuedPersister(this, _)).getOrElse(new SimplePersister(this)) logging.info( this, @@ -99,14 +97,9 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected //Clone the returned instance as these are mutable def documentCollection(): DocumentCollection = new DocumentCollection(collection.toJson) - override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher - override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = { val json = d.toDocumentRecord - docWriter match { - case Some(w) => w.put((json, transid)) - case None => putJsonDoc(json) - } + docPersister.put(json) } protected[database] def putJsonDoc(json: JsObject)(implicit transid: TransactionId): Future[DocInfo] = { @@ -368,8 +361,6 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected logging.debug( this, s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]") - println( - s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]") } transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}") } @@ -442,7 +433,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected override def shutdown(): Unit = { //Its async so a chance exist for next scheduled job to still trigger - docWriter.foreach(Await.ready(_, 10.seconds)) + Await.ready(docPersister.close(), 10.seconds) usageMetricRecorder.foreach(system.stop) attachmentStore.foreach(_.shutdown()) clientRef.close() diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala new file mode 100644 index 00000000000..3504fb1d1d4 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.cosmosdb + +import akka.Done +import akka.stream.ActorMaterializer +import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.entity.DocInfo +import spray.json.JsObject + +import scala.concurrent.{ExecutionContext, Future} + +trait DocumentPersister { + def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] + def close(): Future[Done] = Future.successful(Done) +} + +class SimplePersister(store: CosmosDBArtifactStore[_]) extends DocumentPersister { + override def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] = store.putJsonDoc(js) +} + +class QueuedPersister(store: CosmosDBArtifactStore[_], config: WriteQueueConfig)( + implicit materializer: ActorMaterializer, + ec: ExecutionContext) + extends DocumentPersister { + private val queuedExecutor = + new QueuedExecutor[(JsObject, TransactionId), DocInfo](config.queueSize, config.concurrency)({ + case (js, tid) => store.putJsonDoc(js)(tid) + }) + + override def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] = queuedExecutor.put((js, transid)) + + override def close(): Future[Done] = queuedExecutor.close() +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala index 9fd250b31a9..3bc7aa2cad0 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala @@ -28,7 +28,7 @@ class QueuedExecutor[T, R](queueSize: Int, concurrency: Int)(operation: T => Fut implicit materializer: ActorMaterializer, ec: ExecutionContext) { //TODO Track queue size - private val (docQueue, queueFinish) = Source + private val (queue, queueFinish) = Source .queue[(T, Promise[R])](queueSize, OverflowStrategy.dropNew) .mapAsyncUnordered(concurrency) { case (d, p) => @@ -51,7 +51,7 @@ class QueuedExecutor[T, R](queueSize: Int, concurrency: Int)(operation: T => Fut */ def put(el: T): Future[R] = { val promise = Promise[R]() - docQueue.offer(el -> promise).flatMap { + queue.offer(el -> promise).flatMap { case QueueOfferResult.Enqueued => promise.future case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full.")) case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed.")) @@ -61,7 +61,7 @@ class QueuedExecutor[T, R](queueSize: Int, concurrency: Int)(operation: T => Fut } def close(): Future[Done] = { - docQueue.complete() - docQueue.watchCompletion().flatMap(_ => queueFinish) + queue.complete() + queue.watchCompletion().flatMap(_ => queueFinish) } } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBQueuedWriteTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBQueuedWriteTests.scala new file mode 100644 index 00000000000..5cae8fa9f5b --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBQueuedWriteTests.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.cosmosdb + +import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.database.DocumentSerializer +import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.junit.JUnitRunner + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.reflect.ClassTag + +@RunWith(classOf[JUnitRunner]) +class CosmosDBQueuedWriteTests extends FlatSpec with CosmosDBStoreBehaviorBase { + override def storeType = "CosmosDB_QueuedWrites" + + override protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]() = + Some(MemoryAttachmentStoreProvider.makeStore[D]()) + + override def adaptCosmosDBConfig(config: CosmosDBConfig): CosmosDBConfig = + config.copy(writeQueueConfig = Some(WriteQueueConfig(1000, 2))) + + it should "write multiple documents" in { + implicit val tid: TransactionId = transid() + + val insertCount = 10 + val ns = newNS() + val activations = (1 to insertCount).map(newActivation(ns.asString, "testact", _)) + val f = Future.sequence(activations.map(activationStore.put(_))) + + implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 2.minutes) + f.futureValue.size shouldBe insertCount + + } +} From 48e54ebefa71db80c4339fd8b33ad9aaa4e2e7ca Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Fri, 14 Jun 2019 13:38:16 +0530 Subject: [PATCH 04/10] Track the queue size --- .../database/cosmosdb/QueuedExecutor.scala | 30 +++++- .../cosmosdb/QueuedExecutorTests.scala | 98 +++++++++++++++++++ 2 files changed, 123 insertions(+), 5 deletions(-) create mode 100644 tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala index 3bc7aa2cad0..fd90974ae97 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala @@ -20,22 +20,28 @@ package org.apache.openwhisk.core.database.cosmosdb import akka.Done import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult} +import kamon.metric.Gauge +import org.apache.openwhisk.common.Counter import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} -class QueuedExecutor[T, R](queueSize: Int, concurrency: Int)(operation: T => Future[R])( +class QueuedExecutor[T, R](queueSize: Int, concurrency: Int, gauge: Option[Gauge] = None)(operation: T => Future[R])( implicit materializer: ActorMaterializer, ec: ExecutionContext) { - //TODO Track queue size + private val counter = new Counter private val (queue, queueFinish) = Source .queue[(T, Promise[R])](queueSize, OverflowStrategy.dropNew) .mapAsyncUnordered(concurrency) { case (d, p) => val f = operation(d) f.onComplete { - case Success(result) => p.success(result) - case Failure(e) => p.failure(e) + case Success(result) => + elementRemoved() + p.success(result) + case Failure(e) => + elementRemoved() + p.failure(e) } // Recover Future to not abort stream in case of a failure f.recover { case _ => () } @@ -52,7 +58,9 @@ class QueuedExecutor[T, R](queueSize: Int, concurrency: Int)(operation: T => Fut def put(el: T): Future[R] = { val promise = Promise[R]() queue.offer(el -> promise).flatMap { - case QueueOfferResult.Enqueued => promise.future + case QueueOfferResult.Enqueued => + elementAdded() + promise.future case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full.")) case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed.")) case QueueOfferResult.Failure(f) => Future.failed(f) @@ -60,8 +68,20 @@ class QueuedExecutor[T, R](queueSize: Int, concurrency: Int)(operation: T => Fut promise.future } + def size: Long = counter.cur + def close(): Future[Done] = { queue.complete() queue.watchCompletion().flatMap(_ => queueFinish) } + + private def elementAdded() = { + gauge.foreach(_.increment()) + counter.next() + } + + private def elementRemoved() = { + gauge.foreach(_.decrement()) + counter.prev() + } } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala new file mode 100644 index 00000000000..7e152acef2d --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.cosmosdb + +import akka.Done +import akka.stream.ActorMaterializer +import common.{LoggedFunction, WskActorSystem} +import org.apache.openwhisk.utils.retry +import org.junit.runner.RunWith +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.junit.JUnitRunner +import org.scalatest.{FlatSpec, Matchers} + +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.concurrent.{Future, Promise} + +@RunWith(classOf[JUnitRunner]) +class QueuedExecutorTests extends FlatSpec with Matchers with WskActorSystem with ScalaFutures { + implicit val materializer: ActorMaterializer = ActorMaterializer() + + val promiseDelay = 100.milliseconds + def resolveDelayed(p: Promise[Unit], delay: FiniteDuration = promiseDelay) = + akka.pattern.after(delay, actorSystem.scheduler) { + p.success(()) + Future.successful(()) + } + + behavior of "QueuedExecutor" + + it should "complete queued values with the thrown exception" in { + val executor = new QueuedExecutor[Int, Int](2, 1)(_ => Future.failed(new Exception)) + + val r1 = executor.put(1) + val r2 = executor.put(2) + + r1.failed.futureValue shouldBe an[Exception] + r2.failed.futureValue shouldBe an[Exception] + + // the executor is still intact + val r3 = executor.put(3) + val r4 = executor.put(4) + + r3.failed.futureValue shouldBe an[Exception] + r4.failed.futureValue shouldBe an[Exception] + } + + it should "wait for executions to finish on close" in { + val count = 5 + + val ps = Seq.fill(count)(Promise[Unit]()) + val queuedPromises = mutable.Queue(ps: _*) + + val queuedOperation = LoggedFunction((i: Int) => { + queuedPromises.dequeue().future.map(_ => i + 1) + }) + + val executor = new QueuedExecutor[Int, Int](100, 1)(queuedOperation) + + val values = 1 to count + val results = values.map(executor.put) + + // First entry + retry(queuedOperation.calls should have size 1) + executor.size shouldBe count + + ps.head.success(()) + retry(queuedOperation.calls should have size 2) + + //Trigger close + val closeF = executor.close() + + //Let each operation complete now + ps.foreach(_.trySuccess(())) + + //Wait for executor to close + closeF.futureValue shouldBe Done + + queuedOperation.calls should have size count + Future.sequence(results).futureValue.sum shouldBe values.map(_ + 1).sum + executor.size shouldBe 0 + } +} From 6a8477570358b55be9408c8fe8db8b2dd4e0faf7 Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Fri, 14 Jun 2019 13:41:48 +0530 Subject: [PATCH 05/10] Remove the extra counter --- .../core/database/cosmosdb/QueuedExecutor.scala | 10 ++-------- .../core/database/cosmosdb/QueuedExecutorTests.scala | 8 +++++--- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala index fd90974ae97..20a6fdc2358 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala @@ -21,7 +21,6 @@ import akka.Done import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult} import kamon.metric.Gauge -import org.apache.openwhisk.common.Counter import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} @@ -29,7 +28,6 @@ import scala.util.{Failure, Success} class QueuedExecutor[T, R](queueSize: Int, concurrency: Int, gauge: Option[Gauge] = None)(operation: T => Future[R])( implicit materializer: ActorMaterializer, ec: ExecutionContext) { - private val counter = new Counter private val (queue, queueFinish) = Source .queue[(T, Promise[R])](queueSize, OverflowStrategy.dropNew) .mapAsyncUnordered(concurrency) { @@ -68,20 +66,16 @@ class QueuedExecutor[T, R](queueSize: Int, concurrency: Int, gauge: Option[Gauge promise.future } - def size: Long = counter.cur - def close(): Future[Done] = { queue.complete() queue.watchCompletion().flatMap(_ => queueFinish) } - private def elementAdded() = { + private def elementAdded(): Unit = { gauge.foreach(_.increment()) - counter.next() } - private def elementRemoved() = { + private def elementRemoved(): Unit = { gauge.foreach(_.decrement()) - counter.prev() } } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala index 7e152acef2d..939d1ac1cbe 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala @@ -20,6 +20,7 @@ package org.apache.openwhisk.core.database.cosmosdb import akka.Done import akka.stream.ActorMaterializer import common.{LoggedFunction, WskActorSystem} +import kamon.metric.{AtomicLongGauge, MeasurementUnit} import org.apache.openwhisk.utils.retry import org.junit.runner.RunWith import org.scalatest.concurrent.ScalaFutures @@ -70,14 +71,15 @@ class QueuedExecutorTests extends FlatSpec with Matchers with WskActorSystem wit queuedPromises.dequeue().future.map(_ => i + 1) }) - val executor = new QueuedExecutor[Int, Int](100, 1)(queuedOperation) + val gauge = new AtomicLongGauge("", Map.empty, MeasurementUnit.none) + val executor = new QueuedExecutor[Int, Int](100, 1, Some(gauge))(queuedOperation) val values = 1 to count val results = values.map(executor.put) // First entry retry(queuedOperation.calls should have size 1) - executor.size shouldBe count + gauge.snapshot().value shouldBe count ps.head.success(()) retry(queuedOperation.calls should have size 2) @@ -93,6 +95,6 @@ class QueuedExecutorTests extends FlatSpec with Matchers with WskActorSystem wit queuedOperation.calls should have size count Future.sequence(results).futureValue.sum shouldBe values.map(_ + 1).sum - executor.size shouldBe 0 + gauge.snapshot().value shouldBe 0 } } From 92fa9ea10d19319f98ab66829bfc1c4d46397c65 Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Fri, 14 Jun 2019 14:02:11 +0530 Subject: [PATCH 06/10] Wire up Kamon gauge to track queue size --- .../database/cosmosdb/CosmosDBArtifactStore.scala | 14 ++++++++++++-- .../core/database/cosmosdb/DocumentPersister.scala | 3 ++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala index eb3e484f816..15c6c65dc60 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala @@ -81,14 +81,16 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected private val clusterIdValue = config.clusterId.map(JsString(_)) private val docPersister: DocumentPersister = - config.writeQueueConfig.map(new QueuedPersister(this, _)).getOrElse(new SimplePersister(this)) + config.writeQueueConfig + .map(new QueuedPersister(this, _, Some(queueSizeToken.gauge))) + .getOrElse(new SimplePersister(this)) logging.info( this, s"Initializing CosmosDBArtifactStore for collection [$collName]. Service endpoint [${client.getServiceEndpoint}], " + s"Read endpoint [${client.getReadEndpoint}], Write endpoint [${client.getWriteEndpoint}], Connection Policy [${client.getConnectionPolicy}], " + s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId [${config.clusterId}], soft delete TTL [${config.softDeleteTTL}], " + - s"Consistency Level [${config.consistencyLevel}], Usage Metric Frequency [${config.recordUsageFrequency}]") + s"Consistency Level [${config.consistencyLevel}], Usage Metric Frequency [${config.recordUsageFrequency}], Write Queue: [${config.writeQueueConfig}]") private val usageMetricRecorder = config.recordUsageFrequency.map { f => Scheduler.scheduleWaitAtLeast(f, 10.seconds)(() => recordResourceUsage()) @@ -557,6 +559,14 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected else LogMarkerToken("cosmosdb", name, collName)(unit) } + private def queueSizeToken: LogMarkerToken = { + val unit = MeasurementUnit.none + val name = "writeQueue" + if (TransactionId.metricsKamonTags) + LogMarkerToken("cosmosdb", name, "size", tags = Map("collection" -> collName))(unit) + else LogMarkerToken("cosmosdb", name, collName)(unit) + } + private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true private def isSoftDeleted(js: JsObject) = js.fields.get(deleted).contains(JsTrue) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala index 3504fb1d1d4..b3ce53d759e 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala @@ -19,6 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb import akka.Done import akka.stream.ActorMaterializer +import kamon.metric.Gauge import org.apache.openwhisk.common.TransactionId import org.apache.openwhisk.core.entity.DocInfo import spray.json.JsObject @@ -34,7 +35,7 @@ class SimplePersister(store: CosmosDBArtifactStore[_]) extends DocumentPersister override def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] = store.putJsonDoc(js) } -class QueuedPersister(store: CosmosDBArtifactStore[_], config: WriteQueueConfig)( +class QueuedPersister(store: CosmosDBArtifactStore[_], config: WriteQueueConfig, gauge: Option[Gauge])( implicit materializer: ActorMaterializer, ec: ExecutionContext) extends DocumentPersister { From da594a6fab663c03006c354955aeea9a0af8b38a Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Fri, 14 Jun 2019 15:47:24 +0530 Subject: [PATCH 07/10] Document the config --- common/scala/src/main/resources/application.conf | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 44f714c71a4..b7b3f00f25f 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -281,6 +281,14 @@ whisk { # } # } # } + + # Optional queued writes mode + # write-queue-config = { + # # Size of in memory queue. If queue gets full then elements would be dropped upon addition + # queue-size = 10000 + # # Number of concurrent connections to use to perform writes to db + # concurrency = 500 + # } } # transaction ID related configuration From c1da10252d5ab44afdef35d02f0e1bc65e415307 Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Wed, 19 Jun 2019 09:55:13 +0530 Subject: [PATCH 08/10] Ensure that Gauge to measure queue size is actually passed --- .../openwhisk/core/database/cosmosdb/DocumentPersister.scala | 2 +- .../openwhisk/core/database/cosmosdb/QueuedExecutor.scala | 2 +- .../openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala index b3ce53d759e..e9661b6ef59 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala @@ -40,7 +40,7 @@ class QueuedPersister(store: CosmosDBArtifactStore[_], config: WriteQueueConfig, ec: ExecutionContext) extends DocumentPersister { private val queuedExecutor = - new QueuedExecutor[(JsObject, TransactionId), DocInfo](config.queueSize, config.concurrency)({ + new QueuedExecutor[(JsObject, TransactionId), DocInfo](config.queueSize, config.concurrency, gauge)({ case (js, tid) => store.putJsonDoc(js)(tid) }) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala index 20a6fdc2358..e1c59684dab 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala @@ -25,7 +25,7 @@ import kamon.metric.Gauge import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} -class QueuedExecutor[T, R](queueSize: Int, concurrency: Int, gauge: Option[Gauge] = None)(operation: T => Future[R])( +class QueuedExecutor[T, R](queueSize: Int, concurrency: Int, gauge: Option[Gauge])(operation: T => Future[R])( implicit materializer: ActorMaterializer, ec: ExecutionContext) { private val (queue, queueFinish) = Source diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala index 939d1ac1cbe..affb5e20e97 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala @@ -45,7 +45,7 @@ class QueuedExecutorTests extends FlatSpec with Matchers with WskActorSystem wit behavior of "QueuedExecutor" it should "complete queued values with the thrown exception" in { - val executor = new QueuedExecutor[Int, Int](2, 1)(_ => Future.failed(new Exception)) + val executor = new QueuedExecutor[Int, Int](2, 1, None)(_ => Future.failed(new Exception)) val r1 = executor.put(1) val r2 = executor.put(2) From 7caefeb135d877eb57a2ad9e243cb496512b7316 Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Thu, 20 Jun 2019 10:54:47 +0530 Subject: [PATCH 09/10] Ensure that offer result future gets returned and not an empty promise --- .../apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala index e1c59684dab..12595d74455 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala @@ -63,7 +63,6 @@ class QueuedExecutor[T, R](queueSize: Int, concurrency: Int, gauge: Option[Gauge case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed.")) case QueueOfferResult.Failure(f) => Future.failed(f) } - promise.future } def close(): Future[Done] = { From fe64e452a43056ea42adfb7ee3560e1ae36bf12b Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Thu, 20 Jun 2019 11:19:30 +0530 Subject: [PATCH 10/10] Handle and test queue full scenario --- .../cosmosdb/CosmosDBArtifactStore.scala | 2 +- .../database/cosmosdb/DocumentPersister.scala | 32 +++++++++++++++---- .../database/cosmosdb/QueuedExecutor.scala | 5 ++- .../cosmosdb/CosmosDBQueuedWriteTests.scala | 14 +++++++- 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala index 15c6c65dc60..3edda51571a 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala @@ -82,7 +82,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected private val clusterIdValue = config.clusterId.map(JsString(_)) private val docPersister: DocumentPersister = config.writeQueueConfig - .map(new QueuedPersister(this, _, Some(queueSizeToken.gauge))) + .map(new QueuedPersister(this, _, collName, Some(queueSizeToken.gauge))) .getOrElse(new SimplePersister(this)) logging.info( diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala index e9661b6ef59..d7a41e111f8 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala @@ -19,10 +19,13 @@ package org.apache.openwhisk.core.database.cosmosdb import akka.Done import akka.stream.ActorMaterializer -import kamon.metric.Gauge -import org.apache.openwhisk.common.TransactionId +import kamon.metric.{Gauge, MeasurementUnit} +import org.apache.openwhisk.common.LoggingMarkers.start +import org.apache.openwhisk.common.{LogMarkerToken, Logging, TransactionId} +import org.apache.openwhisk.core.database.StoreUtils.reportFailure +import org.apache.openwhisk.core.database.cosmosdb.CosmosDBUtil.{_id, _rev} import org.apache.openwhisk.core.entity.DocInfo -import spray.json.JsObject +import spray.json.{DefaultJsonProtocol, JsObject} import scala.concurrent.{ExecutionContext, Future} @@ -35,16 +38,31 @@ class SimplePersister(store: CosmosDBArtifactStore[_]) extends DocumentPersister override def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] = store.putJsonDoc(js) } -class QueuedPersister(store: CosmosDBArtifactStore[_], config: WriteQueueConfig, gauge: Option[Gauge])( +class QueuedPersister(store: CosmosDBArtifactStore[_], config: WriteQueueConfig, collName: String, gauge: Option[Gauge])( implicit materializer: ActorMaterializer, - ec: ExecutionContext) - extends DocumentPersister { + ec: ExecutionContext, + logging: Logging) + extends DocumentPersister + with DefaultJsonProtocol { + private val enqueueDoc = + LogMarkerToken("database", "enqueueDocument", start)(MeasurementUnit.time.milliseconds) private val queuedExecutor = new QueuedExecutor[(JsObject, TransactionId), DocInfo](config.queueSize, config.concurrency, gauge)({ case (js, tid) => store.putJsonDoc(js)(tid) }) - override def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] = queuedExecutor.put((js, transid)) + override def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] = { + val id = js.fields(_id).convertTo[String] + val rev = js.fields.get(_rev).map(_.convertTo[String]).getOrElse("") + val docinfoStr = s"id: $id, rev: $rev" + val start = transid.started(this, enqueueDoc, s"[PUT] '$collName' saving document: '$docinfoStr'") + val f = queuedExecutor.put((js, transid)) + reportFailure( + f, + start, + failure => s"[PUT] '$collName' internal error for $docinfoStr, failure: '${failure.getMessage}'") + f + } override def close(): Future[Done] = queuedExecutor.close() } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala index 12595d74455..1f228ff4991 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala @@ -25,9 +25,12 @@ import kamon.metric.Gauge import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} +case class QueuedExecutorFullException(message: String) extends Exception(message) + class QueuedExecutor[T, R](queueSize: Int, concurrency: Int, gauge: Option[Gauge])(operation: T => Future[R])( implicit materializer: ActorMaterializer, ec: ExecutionContext) { + private val queueFullException = QueuedExecutorFullException(s"Queue of size [$queueSize] is full. Entry dropped") private val (queue, queueFinish) = Source .queue[(T, Promise[R])](queueSize, OverflowStrategy.dropNew) .mapAsyncUnordered(concurrency) { @@ -59,7 +62,7 @@ class QueuedExecutor[T, R](queueSize: Int, concurrency: Int, gauge: Option[Gauge case QueueOfferResult.Enqueued => elementAdded() promise.future - case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full.")) + case QueueOfferResult.Dropped => Future.failed(queueFullException) case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed.")) case QueueOfferResult.Failure(f) => Future.failed(f) } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBQueuedWriteTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBQueuedWriteTests.scala index 5cae8fa9f5b..138b0ef71c5 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBQueuedWriteTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBQueuedWriteTests.scala @@ -36,7 +36,7 @@ class CosmosDBQueuedWriteTests extends FlatSpec with CosmosDBStoreBehaviorBase { Some(MemoryAttachmentStoreProvider.makeStore[D]()) override def adaptCosmosDBConfig(config: CosmosDBConfig): CosmosDBConfig = - config.copy(writeQueueConfig = Some(WriteQueueConfig(1000, 2))) + config.copy(writeQueueConfig = Some(WriteQueueConfig(20, 1))) it should "write multiple documents" in { implicit val tid: TransactionId = transid() @@ -48,6 +48,18 @@ class CosmosDBQueuedWriteTests extends FlatSpec with CosmosDBStoreBehaviorBase { implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 2.minutes) f.futureValue.size shouldBe insertCount + } + + it should "log error when queue size is exceeded" in { + implicit val tid: TransactionId = transid() + + val insertCount = 50 + val ns = newNS() + val activations = (1 to insertCount).map(newActivation(ns.asString, "testact", _)) + val f = Future.sequence(activations.map(activationStore.put(_))) + implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 2.minutes) + f.failed.futureValue shouldBe an[QueuedExecutorFullException] + stream.toString should include("QueuedExecutorFullException") } }