Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ dependencies {

compile 'io.reactivex:rxscala_2.12:0.26.5'
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
compile ('com.microsoft.azure:azure-cosmosdb:2.4.2')
compile ('com.microsoft.azure:azure-cosmosdb:2.4.5')

compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.12:1.0.1') {
exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
Expand Down
8 changes: 8 additions & 0 deletions common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ whisk {
# }
# }
# }

# Optional queued writes mode
# write-queue-config = {
# # Size of the in-memory queue. If the queue is full, new elements are dropped upon addition
# queue-size = 10000
# # Number of concurrent connections used to perform writes to the db
# concurrency = 500
# }
}

# transaction ID related configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.openwhisk.http.Messages
import spray.json._

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Success

Expand All @@ -60,9 +60,10 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
with CosmosDBSupport
with AttachmentSupport[DocumentAbstraction] {

override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher

private val cosmosScheme = "cosmos"
val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(cosmosScheme)

protected val client: AsyncDocumentClient = clientRef.get.client
private[cosmosdb] val (database, collection) = initialize()

Expand All @@ -79,13 +80,17 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
private val softDeleteTTL = config.softDeleteTTL.map(_.toSeconds.toInt)

private val clusterIdValue = config.clusterId.map(JsString(_))
private val docPersister: DocumentPersister =
config.writeQueueConfig
.map(new QueuedPersister(this, _, collName, Some(queueSizeToken.gauge)))
.getOrElse(new SimplePersister(this))

logging.info(
this,
s"Initializing CosmosDBArtifactStore for collection [$collName]. Service endpoint [${client.getServiceEndpoint}], " +
s"Read endpoint [${client.getReadEndpoint}], Write endpoint [${client.getWriteEndpoint}], Connection Policy [${client.getConnectionPolicy}], " +
s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId [${config.clusterId}], soft delete TTL [${config.softDeleteTTL}], " +
s"Consistency Level [${config.consistencyLevel}], Usage Metric Frequency [${config.recordUsageFrequency}]")
s"Consistency Level [${config.consistencyLevel}], Usage Metric Frequency [${config.recordUsageFrequency}], Write Queue: [${config.writeQueueConfig}]")

private val usageMetricRecorder = config.recordUsageFrequency.map { f =>
Scheduler.scheduleWaitAtLeast(f, 10.seconds)(() => recordResourceUsage())
Expand All @@ -94,12 +99,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
//Clone the returned instance as these are mutable
def documentCollection(): DocumentCollection = new DocumentCollection(collection.toJson)

override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher

override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = {
val asJson = d.toDocumentRecord
val json = d.toDocumentRecord
docPersister.put(json)
}

val doc = toCosmosDoc(asJson)
protected[database] def putJsonDoc(json: JsObject)(implicit transid: TransactionId): Future[DocInfo] = {
val doc = toCosmosDoc(json)
val id = doc.getId
val docinfoStr = s"id: $id, rev: ${doc.getETag}"
val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName' saving document: '$docinfoStr'")
Expand All @@ -114,7 +120,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
.head()
.recoverWith {
case e: DocumentClientException if isConflict(e) && isNewDocument(doc) =>
val docId = DocId(asJson.fields(_id).convertTo[String])
val docId = DocId(json.fields(_id).convertTo[String])
//Fetch existing document and check if its deleted
getRaw(docId).flatMap {
case Some(js) =>
Expand Down Expand Up @@ -429,6 +435,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected

override def shutdown(): Unit = {
//Its async so a chance exist for next scheduled job to still trigger
Await.ready(docPersister.close(), 10.seconds)
usageMetricRecorder.foreach(system.stop)
attachmentStore.foreach(_.shutdown())
clientRef.close()
Expand Down Expand Up @@ -552,6 +559,14 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
else LogMarkerToken("cosmosdb", name, collName)(unit)
}

// Marker token used to report the current size of the CosmosDB write queue as a gauge.
// With Kamon tags enabled the collection name is attached as a tag; otherwise it is
// embedded as the token's third component (matching the sibling token helper in this store).
private def queueSizeToken: LogMarkerToken = {
val unit = MeasurementUnit.none
val name = "writeQueue"
if (TransactionId.metricsKamonTags)
LogMarkerToken("cosmosdb", name, "size", tags = Map("collection" -> collName))(unit)
else LogMarkerToken("cosmosdb", name, collName)(unit)
}

private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true

private def isSoftDeleted(js: JsObject) = js.fields.get(deleted).contains(JsTrue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import pureconfig._
import scala.collection.JavaConverters._
import scala.concurrent.duration._

/**
 * Configuration for the optional queued-writes mode.
 *
 * @param queueSize maximum number of pending writes held in the in-memory queue;
 *                  writes offered while the queue is full are rejected
 * @param concurrency number of writes performed against the database in parallel
 */
case class WriteQueueConfig(queueSize: Int, concurrency: Int)

case class CosmosDBConfig(endpoint: String,
key: String,
db: String,
Expand All @@ -40,7 +42,8 @@ case class CosmosDBConfig(endpoint: String,
timeToLive: Option[Duration],
clusterId: Option[String],
softDeleteTTL: Option[FiniteDuration],
recordUsageFrequency: Option[FiniteDuration]) {
recordUsageFrequency: Option[FiniteDuration],
writeQueueConfig: Option[WriteQueueConfig]) {

def createClient(): AsyncDocumentClient = {
new AsyncDocumentClient.Builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.database.cosmosdb

import akka.Done
import akka.stream.ActorMaterializer
import kamon.metric.{Gauge, MeasurementUnit}
import org.apache.openwhisk.common.LoggingMarkers.start
import org.apache.openwhisk.common.{LogMarkerToken, Logging, TransactionId}
import org.apache.openwhisk.core.database.StoreUtils.reportFailure
import org.apache.openwhisk.core.database.cosmosdb.CosmosDBUtil.{_id, _rev}
import org.apache.openwhisk.core.entity.DocInfo
import spray.json.{DefaultJsonProtocol, JsObject}

import scala.concurrent.{ExecutionContext, Future}

/**
 * Strategy for persisting a document's JSON representation to the store.
 */
trait DocumentPersister {
/** Persists the given document, returning the resulting [[DocInfo]]. */
def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo]
/** Releases any resources held by this persister. Defaults to a no-op. */
def close(): Future[Done] = Future.successful(Done)
}

/**
 * Persister that writes each document directly via the store's `putJsonDoc`,
 * without any intermediate buffering.
 */
class SimplePersister(store: CosmosDBArtifactStore[_]) extends DocumentPersister {
override def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] = store.putJsonDoc(js)
}

/**
 * Persister that buffers writes in an in-memory queue (see [[QueuedExecutor]]) and
 * performs them with the bounded concurrency configured via [[WriteQueueConfig]].
 * The optional gauge, when present, tracks the number of queued elements.
 */
class QueuedPersister(store: CosmosDBArtifactStore[_], config: WriteQueueConfig, collName: String, gauge: Option[Gauge])(
implicit materializer: ActorMaterializer,
ec: ExecutionContext,
logging: Logging)
extends DocumentPersister
with DefaultJsonProtocol {
// Marker used to log/time the enqueue of a document (not the eventual db write itself)
private val enqueueDoc =
LogMarkerToken("database", "enqueueDocument", start)(MeasurementUnit.time.milliseconds)
// Each queued element carries its TransactionId so the eventual putJsonDoc call
// is attributed to the originating transaction
private val queuedExecutor =
new QueuedExecutor[(JsObject, TransactionId), DocInfo](config.queueSize, config.concurrency, gauge)({
case (js, tid) => store.putJsonDoc(js)(tid)
})

override def put(js: JsObject)(implicit transid: TransactionId): Future[DocInfo] = {
// _id is assumed present; _rev may be absent for new documents (empty string then)
val id = js.fields(_id).convertTo[String]
val rev = js.fields.get(_rev).map(_.convertTo[String]).getOrElse("")
val docinfoStr = s"id: $id, rev: $rev"
val start = transid.started(this, enqueueDoc, s"[PUT] '$collName' saving document: '$docinfoStr'")
val f = queuedExecutor.put((js, transid))
// Log failures (including queue-full rejections); the returned future still carries the failure
reportFailure(
f,
start,
failure => s"[PUT] '$collName' internal error for $docinfoStr, failure: '${failure.getMessage}'")
f
}

override def close(): Future[Done] = queuedExecutor.close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.database.cosmosdb

import akka.Done
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import kamon.metric.Gauge

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

/** Failure returned when an element is rejected because the in-memory queue is at capacity. */
case class QueuedExecutorFullException(message: String) extends Exception(message)

/**
 * Bounded in-memory queue which executes the given `operation` for each queued element
 * with at most `concurrency` operations in flight at once.
 *
 * Elements offered while the queue is full are rejected with [[QueuedExecutorFullException]]
 * (`OverflowStrategy.dropNew`). The optional `gauge`, when present, tracks the number of
 * elements currently queued or being processed.
 */
class QueuedExecutor[T, R](queueSize: Int, concurrency: Int, gauge: Option[Gauge])(operation: T => Future[R])(
  implicit materializer: ActorMaterializer,
  ec: ExecutionContext) {
  // Shared instance: one exception per drop would only differ by stack trace
  private val queueFullException = QueuedExecutorFullException(s"Queue of size [$queueSize] is full. Entry dropped")

  // Each element carries a Promise completed with the outcome of `operation`,
  // so callers of `put` observe their own element's result.
  private val (queue, queueFinish) = Source
    .queue[(T, Promise[R])](queueSize, OverflowStrategy.dropNew)
    .mapAsyncUnordered(concurrency) {
      case (d, p) =>
        val f = operation(d)
        f.onComplete {
          case Success(result) =>
            elementRemoved()
            p.success(result)
          case Failure(e) =>
            elementRemoved()
            p.failure(e)
        }
        // Recover Future to not abort stream in case of a failure
        f.recover { case _ => () }
    }
    .toMat(Sink.ignore)(Keep.both)
    .run()

  /**
   * Queues an element to be written later
   *
   * @param el the element to process
   * @return a future containing the response of the database for this specific element
   */
  def put(el: T): Future[R] = {
    val promise = Promise[R]()
    // Increment the gauge *before* offering: the stream may accept, process and decrement
    // this element before the offer future's continuation runs, which previously allowed
    // the gauge to go transiently negative. Non-enqueued outcomes undo the increment below.
    elementAdded()
    val offered = queue.offer(el -> promise)
    // Undo the increment if the offer itself fails (e.g. materializer already shut down)
    offered.failed.foreach(_ => elementRemoved())
    offered.flatMap {
      case QueueOfferResult.Enqueued => promise.future
      case QueueOfferResult.Dropped =>
        elementRemoved()
        Future.failed(queueFullException)
      case QueueOfferResult.QueueClosed =>
        elementRemoved()
        Future.failed(new Exception("DB request queue was closed."))
      case QueueOfferResult.Failure(f) =>
        elementRemoved()
        Future.failed(f)
    }
  }

  /**
   * Completes the queue and returns a future that is done once all already-queued
   * elements have been processed by the stream.
   */
  def close(): Future[Done] = {
    queue.complete()
    queue.watchCompletion().flatMap(_ => queueFinish)
  }

  // Gauge bookkeeping; no-ops when no gauge is configured
  private def elementAdded(): Unit = {
    gauge.foreach(_.increment())
  }

  private def elementRemoved(): Unit = {
    gauge.foreach(_.decrement())
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.database.cosmosdb

import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.database.DocumentSerializer
import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.reflect.ClassTag

/**
 * Tests the CosmosDB artifact store with the optional queued-writes mode
 * enabled via [[WriteQueueConfig]].
 */
@RunWith(classOf[JUnitRunner])
class CosmosDBQueuedWriteTests extends FlatSpec with CosmosDBStoreBehaviorBase {
override def storeType = "CosmosDB_QueuedWrites"

override protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]() =
Some(MemoryAttachmentStoreProvider.makeStore[D]())

// Small queue (20) with single-writer concurrency so the overflow case below can be triggered
override def adaptCosmosDBConfig(config: CosmosDBConfig): CosmosDBConfig =
config.copy(writeQueueConfig = Some(WriteQueueConfig(20, 1)))

it should "write multiple documents" in {
implicit val tid: TransactionId = transid()

// 10 concurrent puts fit within the queue size of 20, so all should succeed
val insertCount = 10
val ns = newNS()
val activations = (1 to insertCount).map(newActivation(ns.asString, "testact", _))
val f = Future.sequence(activations.map(activationStore.put(_)))

implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 2.minutes)
f.futureValue.size shouldBe insertCount
}

it should "log error when queue size is exceeded" in {
implicit val tid: TransactionId = transid()

// 50 concurrent puts against a queue of 20 with concurrency 1 must overflow,
// so at least one put is expected to be rejected
val insertCount = 50
val ns = newNS()
val activations = (1 to insertCount).map(newActivation(ns.asString, "testact", _))
val f = Future.sequence(activations.map(activationStore.put(_)))

implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 2.minutes)
f.failed.futureValue shouldBe an[QueuedExecutorFullException]
// stream presumably captures log output via the test base trait — verify the rejection was logged
stream.toString should include("QueuedExecutorFullException")
}
}
Loading