diff --git a/common/scala/build.gradle b/common/scala/build.gradle index 2f457fc621e..f5f1b279a43 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -83,7 +83,7 @@ dependencies { compile 'io.reactivex:rxscala_2.12:0.26.5' compile 'io.reactivex:rxjava-reactive-streams:1.2.1' - compile ('com.microsoft.azure:azure-cosmosdb:2.4.2') + compile ('com.microsoft.azure:azure-cosmosdb:2.4.5') compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.12:1.0.1') { exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 44f714c71a4..b7b3f00f25f 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -281,6 +281,14 @@ whisk { # } # } # } + + # Optional queued writes mode + # write-queue-config = { + # # Size of in memory queue. If queue gets full then elements would be dropped upon addition + # queue-size = 10000 + # # Number of concurrent connections to use to perform writes to db + # concurrency = 500 + # } } # transaction ID related configuration diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala index c635092e63a..3edda51571a 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala @@ -38,7 +38,7 @@ import org.apache.openwhisk.http.Messages import spray.json._ import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.Success @@ -60,9 +60,10 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected with CosmosDBSupport with AttachmentSupport[DocumentAbstraction] { + override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher + private val cosmosScheme = "cosmos" val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(cosmosScheme) - protected val client: AsyncDocumentClient = clientRef.get.client private[cosmosdb] val (database, collection) = initialize() @@ -79,13 +80,17 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected private val softDeleteTTL = config.softDeleteTTL.map(_.toSeconds.toInt) private val clusterIdValue = config.clusterId.map(JsString(_)) + private val docPersister: DocumentPersister = + config.writeQueueConfig + .map(new QueuedPersister(this, _, collName, Some(queueSizeToken.gauge))) + .getOrElse(new SimplePersister(this)) logging.info( this, s"Initializing CosmosDBArtifactStore for collection [$collName]. Service endpoint [${client.getServiceEndpoint}], " + s"Read endpoint [${client.getReadEndpoint}], Write endpoint [${client.getWriteEndpoint}], Connection Policy [${client.getConnectionPolicy}], " + s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId [${config.clusterId}], soft delete TTL [${config.softDeleteTTL}], " + - s"Consistency Level [${config.consistencyLevel}], Usage Metric Frequency [${config.recordUsageFrequency}]") + s"Consistency Level [${config.consistencyLevel}], Usage Metric Frequency [${config.recordUsageFrequency}], Write Queue: [${config.writeQueueConfig}]") private val usageMetricRecorder = config.recordUsageFrequency.map { f => Scheduler.scheduleWaitAtLeast(f, 10.seconds)(() => recordResourceUsage()) @@ -94,12 +99,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected //Clone the returned instance as these are mutable def documentCollection(): DocumentCollection = new DocumentCollection(collection.toJson) - override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher - override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = { - val asJson = d.toDocumentRecord + val json = d.toDocumentRecord + docPersister.put(json) + } - val doc = toCosmosDoc(asJson) + protected[database] def putJsonDoc(json: JsObject)(implicit transid: TransactionId): Future[DocInfo] = { + val doc = toCosmosDoc(json) val id = doc.getId val docinfoStr = s"id: $id, rev: ${doc.getETag}" val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName' saving document: '$docinfoStr'") @@ -114,7 +120,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected .head() .recoverWith { case e: DocumentClientException if isConflict(e) && isNewDocument(doc) => - val docId = DocId(asJson.fields(_id).convertTo[String]) + val docId = DocId(json.fields(_id).convertTo[String]) //Fetch existing document and check if its deleted getRaw(docId).flatMap { case Some(js) => @@ -429,6 +435,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected override def shutdown(): Unit = { //Its async so a chance exist for next scheduled job to still trigger + Await.ready(docPersister.close(), 10.seconds) usageMetricRecorder.foreach(system.stop) attachmentStore.foreach(_.shutdown()) clientRef.close() @@ -552,6 +559,14 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected else LogMarkerToken("cosmosdb", name, collName)(unit) } + private def queueSizeToken: LogMarkerToken = { + val unit = MeasurementUnit.none + val name = "writeQueue" + if (TransactionId.metricsKamonTags) + LogMarkerToken("cosmosdb", name, "size", tags = Map("collection" -> collName))(unit) + else LogMarkerToken("cosmosdb", name, collName)(unit) + } + private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true private def isSoftDeleted(js: JsObject) = js.fields.get(deleted).contains(JsTrue) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala index 2dba5f829bd..d9a2bdf64a1 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala @@ -31,6 +31,8 @@ import pureconfig._ import scala.collection.JavaConverters._ import scala.concurrent.duration._ +case class WriteQueueConfig(queueSize: Int, concurrency: Int) + case class CosmosDBConfig(endpoint: String, key: String, db: String, @@ -40,7 +42,8 @@ case class CosmosDBConfig(endpoint: String, timeToLive: Option[Duration], clusterId: Option[String], softDeleteTTL: Option[FiniteDuration], - recordUsageFrequency: Option[FiniteDuration]) { + recordUsageFrequency: Option[FiniteDuration], + writeQueueConfig: Option[WriteQueueConfig]) { def createClient(): AsyncDocumentClient = { new AsyncDocumentClient.Builder() diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala new file mode 100644 index 00000000000..d7a41e111f8 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/DocumentPersister.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.cosmosdb + +import akka.Done +import akka.stream.ActorMaterializer +import kamon.metric.{Gauge, MeasurementUnit} +import org.apache.openwhisk.common.LoggingMarkers.start +import org.apache.openwhisk.common.{LogMarkerToken, Logging, TransactionId} +import org.apache.openwhisk.core.database.StoreUtils.reportFailure +import org.apache.openwhisk.core.database.cosmosdb.CosmosDBUtil.{_id, _rev} +import org.apache.openwhisk.core.entity.DocInfo +import spray.json.{DefaultJsonProtocol, JsObject} + +import scala.concurrent.{ExecutionContext, Future} + +trait DocumentPersister { + def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] + def close(): Future[Done] = Future.successful(Done) +} + +class SimplePersister(store: CosmosDBArtifactStore[_]) extends DocumentPersister { + override def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] = store.putJsonDoc(js) +} + +class QueuedPersister(store: CosmosDBArtifactStore[_], config: WriteQueueConfig, collName: String, gauge: Option[Gauge])( + implicit materializer: ActorMaterializer, + ec: ExecutionContext, + logging: Logging) + extends DocumentPersister + with DefaultJsonProtocol { + private val enqueueDoc = + LogMarkerToken("database", "enqueueDocument", start)(MeasurementUnit.time.milliseconds) + private val queuedExecutor = + new QueuedExecutor[(JsObject, TransactionId), DocInfo](config.queueSize, config.concurrency, gauge)({ + case (js, tid) => store.putJsonDoc(js)(tid) + }) + + override def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] = { + val id = js.fields(_id).convertTo[String] + val rev = js.fields.get(_rev).map(_.convertTo[String]).getOrElse("") + val docinfoStr = s"id: $id, rev: $rev" + val start = transid.started(this, enqueueDoc, s"[PUT] '$collName' saving document: '$docinfoStr'") + val f = queuedExecutor.put((js, transid)) + reportFailure( + f, + start, + failure => s"[PUT] '$collName' internal error for $docinfoStr, failure: '${failure.getMessage}'") + f + } + + override def close(): Future[Done] = queuedExecutor.close() +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala new file mode 100644 index 00000000000..1f228ff4991 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutor.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.cosmosdb + +import akka.Done +import akka.stream.scaladsl.{Keep, Sink, Source} +import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult} +import kamon.metric.Gauge + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success} + +case class QueuedExecutorFullException(message: String) extends Exception(message) + +class QueuedExecutor[T, R](queueSize: Int, concurrency: Int, gauge: Option[Gauge])(operation: T => Future[R])( + implicit materializer: ActorMaterializer, + ec: ExecutionContext) { + private val queueFullException = QueuedExecutorFullException(s"Queue of size [$queueSize] is full. Entry dropped") + private val (queue, queueFinish) = Source + .queue[(T, Promise[R])](queueSize, OverflowStrategy.dropNew) + .mapAsyncUnordered(concurrency) { + case (d, p) => + val f = operation(d) + f.onComplete { + case Success(result) => + elementRemoved() + p.success(result) + case Failure(e) => + elementRemoved() + p.failure(e) + } + // Recover Future to not abort stream in case of a failure + f.recover { case _ => () } + } + .toMat(Sink.ignore)(Keep.both) + .run() + + /** + * Queues an element to be written later + * + * @param el the element to process + * @return a future containing the response of the database for this specific element + */ + def put(el: T): Future[R] = { + val promise = Promise[R]() + queue.offer(el -> promise).flatMap { + case QueueOfferResult.Enqueued => + elementAdded() + promise.future + case QueueOfferResult.Dropped => Future.failed(queueFullException) + case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed.")) + case QueueOfferResult.Failure(f) => Future.failed(f) + } + } + + def close(): Future[Done] = { + queue.complete() + queue.watchCompletion().flatMap(_ => queueFinish) + } + + private def elementAdded(): Unit = { + gauge.foreach(_.increment()) + } + + private def elementRemoved(): Unit = { + gauge.foreach(_.decrement()) + } +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBQueuedWriteTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBQueuedWriteTests.scala new file mode 100644 index 00000000000..138b0ef71c5 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBQueuedWriteTests.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.cosmosdb + +import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.database.DocumentSerializer +import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.junit.JUnitRunner + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.reflect.ClassTag + +@RunWith(classOf[JUnitRunner]) +class CosmosDBQueuedWriteTests extends FlatSpec with CosmosDBStoreBehaviorBase { + override def storeType = "CosmosDB_QueuedWrites" + + override protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]() = + Some(MemoryAttachmentStoreProvider.makeStore[D]()) + + override def adaptCosmosDBConfig(config: CosmosDBConfig): CosmosDBConfig = + config.copy(writeQueueConfig = Some(WriteQueueConfig(20, 1))) + + it should "write multiple documents" in { + implicit val tid: TransactionId = transid() + + val insertCount = 10 + val ns = newNS() + val activations = (1 to insertCount).map(newActivation(ns.asString, "testact", _)) + val f = Future.sequence(activations.map(activationStore.put(_))) + + implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 2.minutes) + f.futureValue.size shouldBe insertCount + } + + it should "log error when queue size is exceeded" in { + implicit val tid: TransactionId = transid() + + val insertCount = 50 + val ns = newNS() + val activations = (1 to insertCount).map(newActivation(ns.asString, "testact", _)) + val f = Future.sequence(activations.map(activationStore.put(_))) + + implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 2.minutes) + f.failed.futureValue shouldBe an[QueuedExecutorFullException] + stream.toString should include("QueuedExecutorFullException") + } +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala new file mode 100644 index 00000000000..affb5e20e97 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/QueuedExecutorTests.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.cosmosdb + +import akka.Done +import akka.stream.ActorMaterializer +import common.{LoggedFunction, WskActorSystem} +import kamon.metric.{AtomicLongGauge, MeasurementUnit} +import org.apache.openwhisk.utils.retry +import org.junit.runner.RunWith +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.junit.JUnitRunner +import org.scalatest.{FlatSpec, Matchers} + +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.concurrent.{Future, Promise} + +@RunWith(classOf[JUnitRunner]) +class QueuedExecutorTests extends FlatSpec with Matchers with WskActorSystem with ScalaFutures { + implicit val materializer: ActorMaterializer = ActorMaterializer() + + val promiseDelay = 100.milliseconds + def resolveDelayed(p: Promise[Unit], delay: FiniteDuration = promiseDelay) = + akka.pattern.after(delay, actorSystem.scheduler) { + p.success(()) + Future.successful(()) + } + + behavior of "QueuedExecutor" + + it should "complete queued values with the thrown exception" in { + val executor = new QueuedExecutor[Int, Int](2, 1, None)(_ => Future.failed(new Exception)) + + val r1 = executor.put(1) + val r2 = executor.put(2) + + r1.failed.futureValue shouldBe an[Exception] + r2.failed.futureValue shouldBe an[Exception] + + // the executor is still intact + val r3 = executor.put(3) + val r4 = executor.put(4) + + r3.failed.futureValue shouldBe an[Exception] + r4.failed.futureValue shouldBe an[Exception] + } + + it should "wait for executions to finish on close" in { + val count = 5 + + val ps = Seq.fill(count)(Promise[Unit]()) + val queuedPromises = mutable.Queue(ps: _*) + + val queuedOperation = LoggedFunction((i: Int) => { + queuedPromises.dequeue().future.map(_ => i + 1) + }) + + val gauge = new AtomicLongGauge("", Map.empty, MeasurementUnit.none) + val executor = new QueuedExecutor[Int, Int](100, 1, Some(gauge))(queuedOperation) + + val values = 1 to count + val results = values.map(executor.put) + + // First entry + retry(queuedOperation.calls should have size 1) + gauge.snapshot().value shouldBe count + + ps.head.success(()) + retry(queuedOperation.calls should have size 2) + + //Trigger close + val closeF = executor.close() + + //Let each operation complete now + ps.foreach(_.trySuccess(())) + + //Wait for executor to close + closeF.futureValue shouldBe Done + + queuedOperation.calls should have size count + Future.sequence(results).futureValue.sum shouldBe values.map(_ + 1).sum + gauge.snapshot().value shouldBe 0 + } +}