From 54b32ef44272dc749d8fe1204489e4ea1a908b4b Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Mon, 14 Oct 2019 11:47:32 +0530 Subject: [PATCH 1/2] Update cosmosdb to v2.6.2 --- common/scala/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/scala/build.gradle b/common/scala/build.gradle index 4a6bced7f4d..f2f7a4b82b8 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -83,7 +83,7 @@ dependencies { compile 'io.reactivex:rxscala_2.12:0.26.5' compile 'io.reactivex:rxjava-reactive-streams:1.2.1' - compile ('com.microsoft.azure:azure-cosmosdb:2.6.1') + compile ('com.microsoft.azure:azure-cosmosdb:2.6.2') compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.12:1.0.1') { exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http From e56df687c733c49df05da06eeff46385478287b1 Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Mon, 14 Oct 2019 14:06:26 +0530 Subject: [PATCH 2/2] Collect query stats for activation poll query --- .../cosmosdb/CosmosDBArtifactStore.scala | 10 ++- .../cosmosdb/CosmosDBViewMapper.scala | 70 ++++++++++++++++++- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala index 1ff638f130c..57cb6ea1539 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala @@ -363,14 +363,20 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected .map(l => if (limit > 0) l.take(limit) else l) val g = f.andThen { - case Success(out) => + case Success(queryResult) => if (queryMetrics.nonEmpty) { val combinedMetrics = QueryMetrics.ZERO.add(queryMetrics: _*) logging.debug( this, s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]") } - transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}", InfoLevel) + val stats = viewMapper.recordQueryStats(ddoc, viewName, descending, querySpec.getParameters, queryResult) + val statsToLog = stats.map(s => " " + s).getOrElse("") + transid.finished( + this, + start, + s"[QUERY] '$collName' completed: matched ${queryResult.size}$statsToLog", + InfoLevel) } reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'") } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala index 6707e343724..22aa98ee8ff 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala @@ -22,6 +22,8 @@ import java.util.Collections import com.microsoft.azure.cosmosdb.DataType.{Number, String} import com.microsoft.azure.cosmosdb.IndexKind.Range import com.microsoft.azure.cosmosdb.{PartitionKeyDefinition, SqlParameter, SqlParameterCollection, SqlQuerySpec} +import kamon.metric.MeasurementUnit +import org.apache.openwhisk.common.{LogMarkerToken, TransactionId, WhiskInstants} import org.apache.openwhisk.core.database.ActivationHandler.NS_PATH import org.apache.openwhisk.core.database.WhisksHandler.ROOT_NS import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants.{alias, computed, deleted} @@ -34,6 +36,10 @@ import org.apache.openwhisk.core.database.{ WhisksHandler } import org.apache.openwhisk.core.entity.WhiskQueries.TOP +import org.apache.openwhisk.utils.JsHelpers +import spray.json.{JsNumber, JsObject} + +import scala.collection.JavaConverters._ private[cosmosdb] trait CosmosDBViewMapper { protected val NOTHING = "" @@ -71,6 +77,17 @@ private[cosmosdb] trait CosmosDBViewMapper { new SqlQuerySpec(query, paramColl) } + + /** + * Records query related stats based on result returned and arguments passed + * + * @return an optional string representation of stats for logging purpose + */ + def recordQueryStats(ddoc: String, + viewName: String, + descending: Boolean, + queryParams: SqlParameterCollection, + result: List[JsObject]): Option[String] = None } private[cosmosdb] abstract class SimpleMapper extends CosmosDBViewMapper { @@ -183,7 +200,8 @@ private[cosmosdb] object WhisksViewMapper extends SimpleMapper { } } -private[cosmosdb] object ActivationViewMapper extends SimpleMapper { +private[cosmosdb] object ActivationViewMapper extends SimpleMapper with WhiskInstants { + import CosmosDBViewMapper._ private val NS = "namespace" private val NS_WITH_PATH = s"$computed.$NS_PATH" private val START = "start" @@ -233,6 +251,41 @@ private[cosmosdb] object ActivationViewMapper extends SimpleMapper { case "activations" if ddoc.startsWith("whisks") => s"r.$START" case _ => throw UnsupportedView(s"$ddoc/$view") } + + private val resultDeltaToken = createStatsToken("activations", "resultDelta", "activations") + private val sinceDeltaToken = createStatsToken("activations", "sinceDelta", "activations") + + override def recordQueryStats(ddoc: String, + viewName: String, + descending: Boolean, + queryParams: SqlParameterCollection, + result: List[JsObject]): Option[String] = { + val stat = if (viewName == "activations" && descending) { + // Collect stats for the delta between + // 1. now and start time of last activation + // 2. now and start time as specific in query for `since` parameter + // These stats would help in determining how much old activations are being queried for list query (used in activation poll) + val uptoOpt = paramValue(queryParams, "upto", classOf[Number]) + val startOpt = paramValue(queryParams, "start", classOf[Number]) + + // Result json has structure { id: "", "key": [], "value": {activation}} + // So fetch value of start via `value.start` path + val lastOpt = result.lastOption.flatMap(js => JsHelpers.getFieldPath(js, "value", "start")) + + (uptoOpt, startOpt, lastOpt) match { + //Go for case which does not specify upto as that would be the case with poll based query + case (None, Some(startFromQuery), Some(JsNumber(start))) => + val now = nowInMillis().toEpochMilli + val resultStartDelta = (now - start.longValue()).max(0) + val queryStartDelta = (now - startFromQuery.longValue()).max(0) + resultDeltaToken.histogram.record(resultStartDelta) + sinceDeltaToken.histogram.record(queryStartDelta) + Some(s"resultDelta=$resultStartDelta, sinceDelta=$queryStartDelta") + case _ => None + } + } else None + stat + } } private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper { private val UUID = "uuid" @@ -318,3 +371,18 @@ private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper { private def selectClause(count: Boolean) = if (count) "TOP 1 VALUE COUNT(r)" else "r" } + +object CosmosDBViewMapper { + + def paramValue[T](params: SqlParameterCollection, key: String, clazz: Class[T]): Option[T] = { + val name = "@" + key + params.iterator().asScala.find(_.getName == name).map(_.getValue(clazz).asInstanceOf[T]) + } + + def createStatsToken(viewName: String, statName: String, collName: String): LogMarkerToken = { + val unit = MeasurementUnit.time.milliseconds + val tags = Map("view" -> viewName, "collection" -> collName) + if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "query", statName, tags = tags)(unit) + else LogMarkerToken("cosmosdb", "query", collName, Some(statName))(unit) + } +}