Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 1.15.1 - 2021-03-12

### Fixed
- Several views were throwing errors trying to access a None value in `EventSinkService` when a user was not logged in.
Replaced `get()` with `getOrElse()`.
- Consolidated field names sent by the EventSinkService to maximize reuse.
- Changed `EventSinkService` logging to debug to minimize chatter.
- Don't automatically create eventsink queue and bind it to eventsink exchange. Let clients do that so that we don't
have a queue for the eventsink filling up if there are no consumers.

## 1.15.0 - 2021-03-03

### Added
Expand Down
189 changes: 95 additions & 94 deletions app/services/EventSinkService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,21 @@ import java.net.URI
import java.time.Instant
import play.api.{Logger, Play}
import play.api.Play.current
import play.api.libs.json.{JsValue, Json}
import play.api.libs.json.{JsObject, JsValue, Json}

object EventSinkService {
val EXCHANGE_NAME_CONFIG_KEY = "eventsink.exchangename"
val QUEUE_NAME_CONFIG_KEY = "eventsink.queuename"

val EXCHANGE_NAME_DEFAULT_VALUE = "clowder.metrics"
val QUEUE_NAME_DEFAULT_VALUE = "event.sink"

// TODO: Make sure these match the real config key names
val AMPLITUDE_CONFIG_KEY = "amplitude.apikey"
val GA_CONFIG_KEY = "google.analytics"
val INFLUX_AUTH_CONFIG_KEY = "influx.uri"
val MONGO_AUTH_CONFIG_KEY = "mongo.uri"
val QUEUE_NAME_DEFAULT_VALUE = ""
}

class EventSinkService {
val messageService: MessageService = DI.injector.getInstance(classOf[MessageService])
val userService: UserService = DI.injector.getInstance(classOf[UserService])
val appConfig: AppConfigurationService = DI.injector.getInstance(classOf[AppConfigurationService])

// UNUSED: Fetch directly from config on demand
def getGoogleAnalytics(): String = Play.configuration.getString(EventSinkService.GA_CONFIG_KEY).getOrElse("")
def getAmplitudeApiKey(): String = Play.configuration.getString(EventSinkService.AMPLITUDE_CONFIG_KEY).getOrElse("")
def getMongoAuth(): String = Play.configuration.getString(EventSinkService.AMPLITUDE_CONFIG_KEY).getOrElse("")
def getInfluxAuth(): String = Play.configuration.getString(EventSinkService.INFLUX_AUTH_CONFIG_KEY).getOrElse("")

/** Event Sink exchange name in RabbitMQ */
val exchangeName = Play.configuration.getString(EventSinkService.EXCHANGE_NAME_CONFIG_KEY)
.getOrElse(EventSinkService.EXCHANGE_NAME_DEFAULT_VALUE)
Expand All @@ -41,20 +29,22 @@ class EventSinkService {
val queueName = Play.configuration.getString(EventSinkService.QUEUE_NAME_CONFIG_KEY)
.getOrElse(EventSinkService.QUEUE_NAME_DEFAULT_VALUE)

def logEvent(category: String, metadata: JsValue) = {
Logger.info("eventsink.exchangename=" + exchangeName)
Logger.info("eventsink.queueName=" + queueName)

Logger.info("Submitting message to event sink exchange: " + Json.stringify(metadata))

//val message = EventSinkMessage(Instant.now().getEpochSecond, category, metadata)
messageService.submit(exchangeName, queueName, metadata, "fanout")
def logEvent(message: JsValue) = {
// Inject timestamp before logging the event
val event = message.as[JsObject] + ("created" -> Json.toJson(java.util.Date.from(Instant.now())))
Logger.info("Submitting message to event sink exchange: " + Json.stringify(event))
try {
messageService.submit(exchangeName, queueName, event, "fanout")
} catch {
case e: Throwable => { Logger.error("Failed to submit event sink message", e) }
}
}

/** Log an event when user signs up */
def logUserSignupEvent(user: User) = {
Logger.info("New user signed up: " + user.id.stringify)
logEvent("user_activity", Json.obj(
Logger.debug("New user signed up: " + user.id.stringify)
logEvent(Json.obj(
"category" -> "user_activity",
"type" -> "signup",
"user_id" -> user.id,
"user_name" -> user.fullName
Expand All @@ -63,8 +53,9 @@ class EventSinkService {

/** Log an event when user logs in */
def logUserLoginEvent(user: User) = {
Logger.info("User logged in: " + user.id.stringify)
logEvent("user_activity", Json.obj(
Logger.debug("User logged in: " + user.id.stringify)
logEvent(Json.obj(
"category" -> "user_activity",
"type" -> "login",
"user_id" -> user.id,
"user_name" -> user.fullName
Expand All @@ -73,193 +64,203 @@ class EventSinkService {

/** Log an event when user views a dataset */
def logDatasetViewEvent(dataset: Dataset, viewer: Option[User]) = {
Logger.info("User viewed a dataset: " + dataset.id.stringify)
logEvent("view_resource", Json.obj(
Logger.debug("User viewed a dataset: " + dataset.id.stringify)
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "dataset",
"resource_id" -> dataset.id,
"resource_name" -> dataset.name,
"author_id" -> dataset.author.id,
"author_name" -> dataset.author.fullName,
"viewer_id" -> viewer.get.id,
"viewer_name" -> viewer.get.getMiniUser.fullName
"user_id" -> viewer.getOrElse(User.anonymous).id,
"user_name" -> viewer.getOrElse(User.anonymous).getMiniUser.fullName
))
}

/** Log an event when user views a file */
def logFileViewEvent(file: File, viewer: Option[User]) = {
Logger.info("User viewed a file: " + file.id.stringify)
logEvent("view_resource", Json.obj(
Logger.debug("User viewed a file: " + file.id.stringify)
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "file",
"resource_id" -> file.id,
"resource_name" -> file.filename,
"author_id" -> file.author.id,
"author_name" -> file.author.fullName,
"viewer_id" -> viewer.get.id,
"viewer_name" -> viewer.get.getMiniUser.fullName
"user_id" -> viewer.getOrElse(User.anonymous).id,
"user_name" -> viewer.getOrElse(User.anonymous).getMiniUser.fullName
))
}

/** Log an event when user views a collection */
def logCollectionViewEvent(collection: Collection, viewer: Option[User]) = {
Logger.info("User viewed a collection: " + collection.id.stringify)
logEvent("view_resource", Json.obj(
Logger.debug("User viewed a collection: " + collection.id.stringify)
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "collection",
"resource_id" -> collection.id,
"resource_name" -> collection.name,
"author_id" -> collection.author.id,
"author_name" -> collection.author.fullName,
"viewer_id" -> viewer.get.id,
"viewer_name" -> viewer.get.getMiniUser.fullName
"user_id" -> viewer.getOrElse(User.anonymous).id,
"user_name" -> viewer.getOrElse(User.anonymous).getMiniUser.fullName
))
}

/** Log an event when user views a space */
def logSpaceViewEvent(space: ProjectSpace, viewer: Option[User]) = {
Logger.info("User viewed a space: " + space.id.stringify)
Logger.debug("User viewed a space: " + space.id.stringify)
(viewer, userService.get(space.creator)) match {
case (Some(v), Some(author)) => {
logEvent("view_resource", Json.obj(
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "space",
"resource_id" -> space.id,
"resource_name" -> space.name,
"author_id" -> space.creator.stringify,
"author_name" -> author.fullName,
"viewer_id" -> v.id,
"viewer_name" -> v.getMiniUser.fullName
"user_id" -> v.id,
"user_name" -> v.getMiniUser.fullName
))
}
case (None, Some(author)) => {
logEvent("view_resource", Json.obj(
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "space",
"resource_id" -> space.id,
"resource_name" -> space.name,
"author_id" -> author.id,
"author_name" -> author.fullName,
"viewer_id" -> "",
"viewer_name" -> "Anonymous"
"user_id" -> User.anonymous.id,
"user_name" -> User.anonymous.fullName
))
}
case (Some(v), None) => {
// TODO: Is this a real case? Is this needed?
logEvent("view_resource", Json.obj(
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "space",
"resource_id" -> space.id,
"resource_name" -> space.name,
"author_id" -> space.creator.stringify,
"author_name" -> "",
"viewer_id" -> v.id,
"viewer_name" -> v.getMiniUser.fullName
"user_id" -> v.id,
"user_name" -> v.getMiniUser.fullName
))
}
case (None, None) => {
// TODO: Is this a real case? Is this needed?
logEvent("view_resource", Json.obj(
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "space",
"resource_id" -> space.id,
"resource_name" -> space.name,
"author_id" -> space.creator.stringify,
"author_name" -> "",
"viewer_id" -> "",
"viewer_name" -> "Anonymous"
"user_id" -> User.anonymous.id,
"user_name" -> User.anonymous.fullName
))
}
}
}

def logSubmitFileToExtractorEvent(file: File, extractorName: String, submitter: Option[User]) = {
logEvent("extraction", Json.obj(
logEvent(Json.obj(
"category" -> "extraction",
"type" -> "file",
"extractor_name" -> extractorName,
"resource_id" -> file.id,
"resource_name" -> file.filename,
"author_id" -> file.author.id,
"author_name" -> file.author.fullName,
"submitter_id" -> submitter.get.id,
"submitter_name" -> submitter.get.getMiniUser.fullName
"user_id" -> submitter.getOrElse(User.anonymous).id,
"user_name" -> submitter.getOrElse(User.anonymous).getMiniUser.fullName
))
}

def logSubmitDatasetToExtractorEvent(dataset: Dataset, extractorName: String, submitter: Option[User]) = {
logEvent("extraction", Json.obj(
logEvent(Json.obj(
"category" -> "extraction",
"type" -> "dataset",
"extractor_name" -> extractorName,
"resource_id" -> dataset.id,
"resource_name" -> dataset.name,
"author_id" -> dataset.author.id,
"author_name" -> dataset.author.fullName,
"submitter_id" -> submitter.get.id,
"submitter_name" -> submitter.get.getMiniUser.fullName
"user_id" -> submitter.getOrElse(User.anonymous).id,
"user_name" -> submitter.getOrElse(User.anonymous).getMiniUser.fullName
))
}

def logSubmitSelectionToExtractorEvent(dataset: Dataset, extractorName: String, submitter: Option[User]) = {
// TODO: Is this a real case? Is this needed?
logEvent("extraction", Json.obj(
logEvent(Json.obj(
"category" -> "extraction",
"type" -> "selection",
"extractor_name" -> extractorName,
"resource_id" -> dataset.id,
"resource_name" -> dataset.name,
"author_id" -> dataset.author.id,
"author_name" -> dataset.author.fullName,
"submitter_id" -> submitter.get.id,
"submitter_name" -> submitter.get.getMiniUser.fullName
"user_id" -> submitter.getOrElse(User.anonymous).id,
"user_name" -> submitter.getOrElse(User.anonymous).getMiniUser.fullName
))
}

def logFileUploadEvent(file: File, dataset: Option[Dataset], uploader: Option[User]) = {
dataset match {
case Some(d) => {
logEvent("upload", Json.obj(
logEvent(Json.obj(
"category" -> "upload",
"dataset_id" -> d.id,
"dataset_name" -> d.name,
"dataset_author_name" -> d.author.fullName,
"dataset_author_id" -> d.author.id,
"uploader_id" -> uploader.get.id,
"uploader_name" -> uploader.get.getMiniUser.fullName,
"filename" -> file.filename,
"length" -> file.length
"author_name" -> d.author.fullName,
"author_id" -> d.author.id,
"user_id" -> uploader.getOrElse(User.anonymous).id,
"user_name" -> uploader.getOrElse(User.anonymous).getMiniUser.fullName,
"resource_name" -> file.filename,
"size" -> file.length
))
}
case None => {
logEvent("upload", Json.obj(
"uploader_id" -> uploader.get.id,
"uploader_name" -> uploader.get.getMiniUser.fullName,
"filename" -> file.filename,
"length" -> file.length
logEvent(Json.obj(
"category" -> "upload",
"user_id" -> uploader.getOrElse(User.anonymous).id,
"user_name" -> uploader.getOrElse(User.anonymous).getMiniUser.fullName,
"resource_name" -> file.filename,
"size" -> file.length
))
}
}
}

def logFileDownloadEvent(file: File, /*dataset: Dataset,*/ downloader: Option[User]) = {
logEvent("download", Json.obj(
/*"dataset_id" -> dataset.id,
"dataset_name" -> dataset.name,
"dataset_author_name" -> dataset.author.fullName,
"dataset_author_id" -> dataset.author.id,*/
def logFileDownloadEvent(file: File, downloader: Option[User]) = {
logEvent(Json.obj(
"category" -> "download",
"type" -> "file",
"uploader_id" -> file.author.id,
"uploader_name" -> file.author.fullName,
"downloader_id" -> downloader.get.id,
"downloader_name" -> downloader.get.getMiniUser.fullName,
"filename" -> file.filename,
"length" -> file.length
"resource_id" -> file.id,
"resource_name" -> file.filename,
"author_id" -> file.author.id,
"author_name" -> file.author.fullName,
"user_id" -> downloader.getOrElse(User.anonymous).id,
"user_name" -> downloader.getOrElse(User.anonymous).getMiniUser.fullName,
"size" -> file.length
))
}

def logDatasetDownloadEvent(dataset: Dataset, downloader: Option[User]) = {
logEvent("download", Json.obj(
logEvent(Json.obj(
"category" -> "download",
"type" -> "dataset",
"dataset_id" -> dataset.id,
"dataset_name" -> dataset.name,
"dataset_author_name" -> dataset.author.fullName,
"dataset_author_id" -> dataset.author.id,
"downloader_id" -> downloader.get.id,
"downloader_name" -> downloader.get.getMiniUser.fullName,
"files_length" -> dataset.files.length,
"folder_length" -> dataset.folders.length
"resource_id" -> dataset.id,
"resource_name" -> dataset.name,
"author_name" -> dataset.author.fullName,
"author_id" -> dataset.author.id,
"user_id" -> downloader.getOrElse(User.anonymous).id,
"user_name" -> downloader.getOrElse(User.anonymous).getMiniUser.fullName,
"size" -> (dataset.files.length + dataset.folders.length)
))
}
}

//case class EventSinkMessage(created: Long, category: String, metadata: JsValue)
//case class EventSinkMessage(created: Long, category: String, metadata: JsValue)
10 changes: 7 additions & 3 deletions app/services/rabbitmq/RabbitMQMessageService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,15 @@ class RabbitMQMessageService extends MessageService {

/** Submit a message to broker. */
override def submit(exchange: String, routing_key: String, message: JsValue, exchange_type: String = "topic") = {
// This probably isn't going to extract queue (use other submit() for that) so make a new broker
connect()
val tempChannel = connection.get.createChannel()
tempChannel.exchangeDeclare(exchange, exchange_type, true)
tempChannel.queueDeclare(routing_key, true, false, false, null)
tempChannel.queueBind(routing_key, exchange, routing_key)

// If a routing_key (queue name) was provided, ensure that the queue exists
if (routing_key != "") {
tempChannel.queueDeclare(routing_key, true, false, false, null)
tempChannel.queueBind(routing_key, exchange, routing_key)
}
tempChannel.basicPublish(exchange, routing_key, null, message.toString.getBytes)
}

Expand Down
2 changes: 1 addition & 1 deletion doc/src/sphinx/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
author = 'Luigi Marini'

# The full version, including alpha/beta/rc tags
release = '1.15.0'
release = '1.15.1'


# -- General configuration ---------------------------------------------------
Expand Down
Loading