Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ dependencies {

compile 'io.reactivex:rxscala_2.12:0.26.5'
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
compile ('com.microsoft.azure:azure-cosmosdb:2.6.1')
compile ('com.microsoft.azure:azure-cosmosdb:2.6.2')

compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.12:1.0.1') {
exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,20 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
.map(l => if (limit > 0) l.take(limit) else l)

val g = f.andThen {
case Success(out) =>
case Success(queryResult) =>
if (queryMetrics.nonEmpty) {
val combinedMetrics = QueryMetrics.ZERO.add(queryMetrics: _*)
logging.debug(
this,
s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]")
}
transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}", InfoLevel)
val stats = viewMapper.recordQueryStats(ddoc, viewName, descending, querySpec.getParameters, queryResult)
val statsToLog = stats.map(s => " " + s).getOrElse("")
transid.finished(
this,
start,
s"[QUERY] '$collName' completed: matched ${queryResult.size}$statsToLog",
InfoLevel)
}
reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import java.util.Collections
import com.microsoft.azure.cosmosdb.DataType.{Number, String}
import com.microsoft.azure.cosmosdb.IndexKind.Range
import com.microsoft.azure.cosmosdb.{PartitionKeyDefinition, SqlParameter, SqlParameterCollection, SqlQuerySpec}
import kamon.metric.MeasurementUnit
import org.apache.openwhisk.common.{LogMarkerToken, TransactionId, WhiskInstants}
import org.apache.openwhisk.core.database.ActivationHandler.NS_PATH
import org.apache.openwhisk.core.database.WhisksHandler.ROOT_NS
import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants.{alias, computed, deleted}
Expand All @@ -34,6 +36,10 @@ import org.apache.openwhisk.core.database.{
WhisksHandler
}
import org.apache.openwhisk.core.entity.WhiskQueries.TOP
import org.apache.openwhisk.utils.JsHelpers
import spray.json.{JsNumber, JsObject}

import scala.collection.JavaConverters._

private[cosmosdb] trait CosmosDBViewMapper {
protected val NOTHING = ""
Expand Down Expand Up @@ -71,6 +77,17 @@ private[cosmosdb] trait CosmosDBViewMapper {

new SqlQuerySpec(query, paramColl)
}

/**
* Records query related stats based on result returned and arguments passed
*
* @return an optional string representation of stats for logging purpose
*/
def recordQueryStats(ddoc: String,
viewName: String,
descending: Boolean,
queryParams: SqlParameterCollection,
result: List[JsObject]): Option[String] = None
}

private[cosmosdb] abstract class SimpleMapper extends CosmosDBViewMapper {
Expand Down Expand Up @@ -183,7 +200,8 @@ private[cosmosdb] object WhisksViewMapper extends SimpleMapper {
}

}
private[cosmosdb] object ActivationViewMapper extends SimpleMapper {
private[cosmosdb] object ActivationViewMapper extends SimpleMapper with WhiskInstants {
import CosmosDBViewMapper._
private val NS = "namespace"
private val NS_WITH_PATH = s"$computed.$NS_PATH"
private val START = "start"
Expand Down Expand Up @@ -233,6 +251,41 @@ private[cosmosdb] object ActivationViewMapper extends SimpleMapper {
case "activations" if ddoc.startsWith("whisks") => s"r.$START"
case _ => throw UnsupportedView(s"$ddoc/$view")
}

private val resultDeltaToken = createStatsToken("activations", "resultDelta", "activations")
private val sinceDeltaToken = createStatsToken("activations", "sinceDelta", "activations")

override def recordQueryStats(ddoc: String,
viewName: String,
descending: Boolean,
queryParams: SqlParameterCollection,
result: List[JsObject]): Option[String] = {
val stat = if (viewName == "activations" && descending) {
// Collect stats for the delta between
// 1. now and start time of last activation
// 2. now and start time as specific in query for `since` parameter
// These stats would help in determining how much old activations are being queried for list query (used in activation poll)
val uptoOpt = paramValue(queryParams, "upto", classOf[Number])
val startOpt = paramValue(queryParams, "start", classOf[Number])

// Result json has structure { id: "", "key": [], "value": {activation}}
// So fetch value of start via `value.start` path
val lastOpt = result.lastOption.flatMap(js => JsHelpers.getFieldPath(js, "value", "start"))

(uptoOpt, startOpt, lastOpt) match {
//Go for case which does not specify upto as that would be the case with poll based query
case (None, Some(startFromQuery), Some(JsNumber(start))) =>
val now = nowInMillis().toEpochMilli
val resultStartDelta = (now - start.longValue()).max(0)
val queryStartDelta = (now - startFromQuery.longValue()).max(0)
resultDeltaToken.histogram.record(resultStartDelta)
sinceDeltaToken.histogram.record(queryStartDelta)
Some(s"resultDelta=$resultStartDelta, sinceDelta=$queryStartDelta")
case _ => None
}
} else None
stat
}
}
private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper {
private val UUID = "uuid"
Expand Down Expand Up @@ -318,3 +371,18 @@ private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper {

private def selectClause(count: Boolean) = if (count) "TOP 1 VALUE COUNT(r)" else "r"
}

object CosmosDBViewMapper {

def paramValue[T](params: SqlParameterCollection, key: String, clazz: Class[T]): Option[T] = {
val name = "@" + key
params.iterator().asScala.find(_.getName == name).map(_.getValue(clazz).asInstanceOf[T])
}

def createStatsToken(viewName: String, statName: String, collName: String): LogMarkerToken = {
val unit = MeasurementUnit.time.milliseconds
val tags = Map("view" -> viewName, "collection" -> collName)
if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "query", statName, tags = tags)(unit)
else LogMarkerToken("cosmosdb", "query", collName, Some(statName))(unit)
}
}