From cf310630b57ae39d843ac2a9a61189b1e271788e Mon Sep 17 00:00:00 2001 From: Luigi Marini Date: Thu, 11 Mar 2021 09:56:15 -0600 Subject: [PATCH 1/7] Several views were throwing errors trying to access a None value in `EventSinkService` when a user was not logged in. Replaced `get()` with `getOrElse()`. --- CHANGELOG.md | 6 ++++ app/services/EventSinkService.scala | 48 ++++++++++++++--------------- doc/src/sphinx/conf.py | 2 +- project/Build.scala | 2 +- public/swagger.yml | 2 +- 5 files changed, 33 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d97e68f03..47aa5b828 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## 1.15.1 - 2021-03-12 + +### Fixed +- Several views were throwing errors trying to access a None value in `EventSinkService` when a user was not logged in. + Replaced `get()` with `getOrElse()`. 
+ ## 1.15.0 - 2021-03-03 ### Added diff --git a/app/services/EventSinkService.scala b/app/services/EventSinkService.scala index 40e20a1e0..36dbbea2d 100644 --- a/app/services/EventSinkService.scala +++ b/app/services/EventSinkService.scala @@ -80,8 +80,8 @@ class EventSinkService { "resource_name" -> dataset.name, "author_id" -> dataset.author.id, "author_name" -> dataset.author.fullName, - "viewer_id" -> viewer.get.id, - "viewer_name" -> viewer.get.getMiniUser.fullName + "viewer_id" -> viewer.getOrElse(User.anonymous).id, + "viewer_name" -> viewer.getOrElse(User.anonymous).getMiniUser.fullName )) } @@ -94,8 +94,8 @@ class EventSinkService { "resource_name" -> file.filename, "author_id" -> file.author.id, "author_name" -> file.author.fullName, - "viewer_id" -> viewer.get.id, - "viewer_name" -> viewer.get.getMiniUser.fullName + "viewer_id" -> viewer.getOrElse(User.anonymous).id, + "viewer_name" -> viewer.getOrElse(User.anonymous).getMiniUser.fullName )) } @@ -108,8 +108,8 @@ class EventSinkService { "resource_name" -> collection.name, "author_id" -> collection.author.id, "author_name" -> collection.author.fullName, - "viewer_id" -> viewer.get.id, - "viewer_name" -> viewer.get.getMiniUser.fullName + "viewer_id" -> viewer.getOrElse(User.anonymous).id, + "viewer_name" -> viewer.getOrElse(User.anonymous).getMiniUser.fullName )) } @@ -135,8 +135,8 @@ class EventSinkService { "resource_name" -> space.name, "author_id" -> author.id, "author_name" -> author.fullName, - "viewer_id" -> "", - "viewer_name" -> "Anonymous" + "viewer_id" -> User.anonymous.id, + "viewer_name" -> User.anonymous.fullName )) } case (Some(v), None) => { @@ -159,8 +159,8 @@ class EventSinkService { "resource_name" -> space.name, "author_id" -> space.creator.stringify, "author_name" -> "", - "viewer_id" -> "", - "viewer_name" -> "Anonymous" + "viewer_id" -> User.anonymous.id, + "viewer_name" -> User.anonymous.fullName )) } } @@ -174,8 +174,8 @@ class EventSinkService { "resource_name" -> file.filename, 
"author_id" -> file.author.id, "author_name" -> file.author.fullName, - "submitter_id" -> submitter.get.id, - "submitter_name" -> submitter.get.getMiniUser.fullName + "submitter_id" -> submitter.getOrElse(User.anonymous).id, + "submitter_name" -> submitter.getOrElse(User.anonymous).getMiniUser.fullName )) } @@ -187,8 +187,8 @@ class EventSinkService { "resource_name" -> dataset.name, "author_id" -> dataset.author.id, "author_name" -> dataset.author.fullName, - "submitter_id" -> submitter.get.id, - "submitter_name" -> submitter.get.getMiniUser.fullName + "submitter_id" -> submitter.getOrElse(User.anonymous).id, + "submitter_name" -> submitter.getOrElse(User.anonymous).getMiniUser.fullName )) } @@ -201,8 +201,8 @@ class EventSinkService { "resource_name" -> dataset.name, "author_id" -> dataset.author.id, "author_name" -> dataset.author.fullName, - "submitter_id" -> submitter.get.id, - "submitter_name" -> submitter.get.getMiniUser.fullName + "submitter_id" -> submitter.getOrElse(User.anonymous).id, + "submitter_name" -> submitter.getOrElse(User.anonymous).getMiniUser.fullName )) } @@ -214,16 +214,16 @@ class EventSinkService { "dataset_name" -> d.name, "dataset_author_name" -> d.author.fullName, "dataset_author_id" -> d.author.id, - "uploader_id" -> uploader.get.id, - "uploader_name" -> uploader.get.getMiniUser.fullName, + "uploader_id" -> uploader.getOrElse(User.anonymous).id, + "uploader_name" -> uploader.getOrElse(User.anonymous).getMiniUser.fullName, "filename" -> file.filename, "length" -> file.length )) } case None => { logEvent("upload", Json.obj( - "uploader_id" -> uploader.get.id, - "uploader_name" -> uploader.get.getMiniUser.fullName, + "uploader_id" -> uploader.getOrElse(User.anonymous).id, + "uploader_name" -> uploader.getOrElse(User.anonymous).getMiniUser.fullName, "filename" -> file.filename, "length" -> file.length )) @@ -240,8 +240,8 @@ class EventSinkService { "type" -> "file", "uploader_id" -> file.author.id, "uploader_name" -> file.author.fullName, 
- "downloader_id" -> downloader.get.id, - "downloader_name" -> downloader.get.getMiniUser.fullName, + "downloader_id" -> downloader.getOrElse(User.anonymous).id, + "downloader_name" -> downloader.getOrElse(User.anonymous).getMiniUser.fullName, "filename" -> file.filename, "length" -> file.length )) @@ -254,8 +254,8 @@ class EventSinkService { "dataset_name" -> dataset.name, "dataset_author_name" -> dataset.author.fullName, "dataset_author_id" -> dataset.author.id, - "downloader_id" -> downloader.get.id, - "downloader_name" -> downloader.get.getMiniUser.fullName, + "downloader_id" -> downloader.getOrElse(User.anonymous).id, + "downloader_name" -> downloader.getOrElse(User.anonymous).getMiniUser.fullName, "files_length" -> dataset.files.length, "folder_length" -> dataset.folders.length )) diff --git a/doc/src/sphinx/conf.py b/doc/src/sphinx/conf.py index 2d856648e..24c97f9e2 100644 --- a/doc/src/sphinx/conf.py +++ b/doc/src/sphinx/conf.py @@ -22,7 +22,7 @@ author = 'Luigi Marini' # The full version, including alpha/beta/rc tags -release = '1.15.0' +release = '1.15.1' # -- General configuration --------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index ab8b9a23f..79b39ed88 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -13,7 +13,7 @@ import NativePackagerKeys._ object ApplicationBuild extends Build { val appName = "clowder" - val version = "1.15.0" + val version = "1.15.1" val jvm = "1.7" def appVersion: String = { diff --git a/public/swagger.yml b/public/swagger.yml index 18303b04d..5b0545f4e 100644 --- a/public/swagger.yml +++ b/public/swagger.yml @@ -9,7 +9,7 @@ info: Clowder is a customizable and scalable data management system to support any data format and multiple research domains. It is under active development and deployed for a variety of research projects. 
- version: 1.15.0 + version: 1.15.1 termsOfService: https://clowder.ncsa.illinois.edu/clowder/tos contact: name: Clowder From 1e304e64ee16bfa6ae5148a200a62f81f1b745b6 Mon Sep 17 00:00:00 2001 From: Mike Lambert Date: Thu, 11 Mar 2021 11:53:39 -0600 Subject: [PATCH 2/7] Reuse field names across different event types in EventSinkService (#188) * Match EventSinkService fields to spreadsheet * Inject timestamp when sending event, turn down logging --- CHANGELOG.md | 6 + app/services/EventSinkService.scala | 181 ++++++++++++++-------------- 2 files changed, 95 insertions(+), 92 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d97e68f03..195c5e5c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## Unreleased + +### Changed + +- Consolidated field names sent by the EventSinkService to maximize reuse. 
+ ## 1.15.0 - 2021-03-03 ### Added diff --git a/app/services/EventSinkService.scala b/app/services/EventSinkService.scala index 40e20a1e0..98e49ce28 100644 --- a/app/services/EventSinkService.scala +++ b/app/services/EventSinkService.scala @@ -6,7 +6,7 @@ import java.net.URI import java.time.Instant import play.api.{Logger, Play} import play.api.Play.current -import play.api.libs.json.{JsValue, Json} +import play.api.libs.json.{JsObject, JsValue, Json} object EventSinkService { val EXCHANGE_NAME_CONFIG_KEY = "eventsink.exchangename" @@ -14,12 +14,6 @@ object EventSinkService { val EXCHANGE_NAME_DEFAULT_VALUE = "clowder.metrics" val QUEUE_NAME_DEFAULT_VALUE = "event.sink" - - // TODO: Make sure these match the real config key names - val AMPLITUDE_CONFIG_KEY = "amplitude.apikey" - val GA_CONFIG_KEY = "google.analytics" - val INFLUX_AUTH_CONFIG_KEY = "influx.uri" - val MONGO_AUTH_CONFIG_KEY = "mongo.uri" } class EventSinkService { @@ -27,12 +21,6 @@ class EventSinkService { val userService: UserService = DI.injector.getInstance(classOf[UserService]) val appConfig: AppConfigurationService = DI.injector.getInstance(classOf[AppConfigurationService]) - // UNUSED: Fetch directly from config on demand - def getGoogleAnalytics(): String = Play.configuration.getString(EventSinkService.GA_CONFIG_KEY).getOrElse("") - def getAmplitudeApiKey(): String = Play.configuration.getString(EventSinkService.AMPLITUDE_CONFIG_KEY).getOrElse("") - def getMongoAuth(): String = Play.configuration.getString(EventSinkService.AMPLITUDE_CONFIG_KEY).getOrElse("") - def getInfluxAuth(): String = Play.configuration.getString(EventSinkService.INFLUX_AUTH_CONFIG_KEY).getOrElse("") - /** Event Sink exchange name in RabbitMQ */ val exchangeName = Play.configuration.getString(EventSinkService.EXCHANGE_NAME_CONFIG_KEY) .getOrElse(EventSinkService.EXCHANGE_NAME_DEFAULT_VALUE) @@ -41,20 +29,18 @@ class EventSinkService { val queueName = Play.configuration.getString(EventSinkService.QUEUE_NAME_CONFIG_KEY) 
.getOrElse(EventSinkService.QUEUE_NAME_DEFAULT_VALUE) - def logEvent(category: String, metadata: JsValue) = { - Logger.info("eventsink.exchangename=" + exchangeName) - Logger.info("eventsink.queueName=" + queueName) - - Logger.info("Submitting message to event sink exchange: " + Json.stringify(metadata)) - - //val message = EventSinkMessage(Instant.now().getEpochSecond, category, metadata) - messageService.submit(exchangeName, queueName, metadata, "fanout") + def logEvent(message: JsValue) = { + // Inject timestamp before logging the event + val event = message.as[JsObject] + ("created" -> Json.toJson(java.util.Date.from(Instant.now()))) + Logger.info("Submitting message to event sink exchange: " + Json.stringify(event)) + messageService.submit(exchangeName, queueName, event, "fanout") } /** Log an event when user signs up */ def logUserSignupEvent(user: User) = { - Logger.info("New user signed up: " + user.id.stringify) - logEvent("user_activity", Json.obj( + Logger.debug("New user signed up: " + user.id.stringify) + logEvent(Json.obj( + "category" -> "user_activity", "type" -> "signup", "user_id" -> user.id, "user_name" -> user.fullName @@ -63,8 +49,9 @@ class EventSinkService { /** Log an event when user logs in */ def logUserLoginEvent(user: User) = { - Logger.info("User logged in: " + user.id.stringify) - logEvent("user_activity", Json.obj( + Logger.debug("User logged in: " + user.id.stringify) + logEvent(Json.obj( + "category" -> "user_activity", "type" -> "login", "user_id" -> user.id, "user_name" -> user.fullName @@ -73,191 +60,201 @@ class EventSinkService { /** Log an event when user views a dataset */ def logDatasetViewEvent(dataset: Dataset, viewer: Option[User]) = { - Logger.info("User viewed a dataset: " + dataset.id.stringify) - logEvent("view_resource", Json.obj( + Logger.debug("User viewed a dataset: " + dataset.id.stringify) + logEvent(Json.obj( + "category" -> "view_resource", "type" -> "dataset", "resource_id" -> dataset.id, "resource_name" -> 
dataset.name, "author_id" -> dataset.author.id, "author_name" -> dataset.author.fullName, - "viewer_id" -> viewer.get.id, - "viewer_name" -> viewer.get.getMiniUser.fullName + "user_id" -> viewer.get.id, + "user_name" -> viewer.get.getMiniUser.fullName )) } /** Log an event when user views a file */ def logFileViewEvent(file: File, viewer: Option[User]) = { - Logger.info("User viewed a file: " + file.id.stringify) - logEvent("view_resource", Json.obj( + Logger.debug("User viewed a file: " + file.id.stringify) + logEvent(Json.obj( + "category" -> "view_resource", "type" -> "file", "resource_id" -> file.id, "resource_name" -> file.filename, "author_id" -> file.author.id, "author_name" -> file.author.fullName, - "viewer_id" -> viewer.get.id, - "viewer_name" -> viewer.get.getMiniUser.fullName + "user_id" -> viewer.get.id, + "user_name" -> viewer.get.getMiniUser.fullName )) } /** Log an event when user views a collection */ def logCollectionViewEvent(collection: Collection, viewer: Option[User]) = { - Logger.info("User viewed a collection: " + collection.id.stringify) - logEvent("view_resource", Json.obj( + Logger.debug("User viewed a collection: " + collection.id.stringify) + logEvent(Json.obj( + "category" -> "view_resource", "type" -> "collection", "resource_id" -> collection.id, "resource_name" -> collection.name, "author_id" -> collection.author.id, "author_name" -> collection.author.fullName, - "viewer_id" -> viewer.get.id, - "viewer_name" -> viewer.get.getMiniUser.fullName + "user_id" -> viewer.get.id, + "user_name" -> viewer.get.getMiniUser.fullName )) } /** Log an event when user views a space */ def logSpaceViewEvent(space: ProjectSpace, viewer: Option[User]) = { - Logger.info("User viewed a space: " + space.id.stringify) + Logger.debug("User viewed a space: " + space.id.stringify) (viewer, userService.get(space.creator)) match { case (Some(v), Some(author)) => { - logEvent("view_resource", Json.obj( + logEvent(Json.obj( + "category" -> "view_resource", "type" 
-> "space", "resource_id" -> space.id, "resource_name" -> space.name, "author_id" -> space.creator.stringify, "author_name" -> author.fullName, - "viewer_id" -> v.id, - "viewer_name" -> v.getMiniUser.fullName + "user_id" -> v.id, + "user_name" -> v.getMiniUser.fullName )) } case (None, Some(author)) => { - logEvent("view_resource", Json.obj( + logEvent(Json.obj( + "category" -> "view_resource", "type" -> "space", "resource_id" -> space.id, "resource_name" -> space.name, "author_id" -> author.id, "author_name" -> author.fullName, - "viewer_id" -> "", - "viewer_name" -> "Anonymous" + "user_id" -> "", + "user_name" -> "Anonymous" )) } case (Some(v), None) => { // TODO: Is this a real case? Is this needed? - logEvent("view_resource", Json.obj( + logEvent(Json.obj( + "category" -> "view_resource", "type" -> "space", "resource_id" -> space.id, "resource_name" -> space.name, "author_id" -> space.creator.stringify, "author_name" -> "", - "viewer_id" -> v.id, - "viewer_name" -> v.getMiniUser.fullName + "user_id" -> v.id, + "user_name" -> v.getMiniUser.fullName )) } case (None, None) => { // TODO: Is this a real case? Is this needed? 
- logEvent("view_resource", Json.obj( + logEvent(Json.obj( + "category" -> "view_resource", "type" -> "space", "resource_id" -> space.id, "resource_name" -> space.name, "author_id" -> space.creator.stringify, "author_name" -> "", - "viewer_id" -> "", - "viewer_name" -> "Anonymous" + "user_id" -> "", + "user_name" -> "Anonymous" )) } } } def logSubmitFileToExtractorEvent(file: File, extractorName: String, submitter: Option[User]) = { - logEvent("extraction", Json.obj( + logEvent(Json.obj( + "category" -> "extraction", "type" -> "file", "extractor_name" -> extractorName, "resource_id" -> file.id, "resource_name" -> file.filename, "author_id" -> file.author.id, "author_name" -> file.author.fullName, - "submitter_id" -> submitter.get.id, - "submitter_name" -> submitter.get.getMiniUser.fullName + "user_id" -> submitter.get.id, + "user_name" -> submitter.get.getMiniUser.fullName )) } def logSubmitDatasetToExtractorEvent(dataset: Dataset, extractorName: String, submitter: Option[User]) = { - logEvent("extraction", Json.obj( + logEvent(Json.obj( + "category" -> "extraction", "type" -> "dataset", "extractor_name" -> extractorName, "resource_id" -> dataset.id, "resource_name" -> dataset.name, "author_id" -> dataset.author.id, "author_name" -> dataset.author.fullName, - "submitter_id" -> submitter.get.id, - "submitter_name" -> submitter.get.getMiniUser.fullName + "user_id" -> submitter.get.id, + "user_name" -> submitter.get.getMiniUser.fullName )) } def logSubmitSelectionToExtractorEvent(dataset: Dataset, extractorName: String, submitter: Option[User]) = { // TODO: Is this a real case? Is this needed? 
- logEvent("extraction", Json.obj( + logEvent(Json.obj( + "category" -> "extraction", "type" -> "selection", "extractor_name" -> extractorName, "resource_id" -> dataset.id, "resource_name" -> dataset.name, "author_id" -> dataset.author.id, "author_name" -> dataset.author.fullName, - "submitter_id" -> submitter.get.id, - "submitter_name" -> submitter.get.getMiniUser.fullName + "user_id" -> submitter.get.id, + "user_name" -> submitter.get.getMiniUser.fullName )) } def logFileUploadEvent(file: File, dataset: Option[Dataset], uploader: Option[User]) = { dataset match { case Some(d) => { - logEvent("upload", Json.obj( + logEvent(Json.obj( + "category" -> "upload", "dataset_id" -> d.id, "dataset_name" -> d.name, - "dataset_author_name" -> d.author.fullName, - "dataset_author_id" -> d.author.id, - "uploader_id" -> uploader.get.id, - "uploader_name" -> uploader.get.getMiniUser.fullName, - "filename" -> file.filename, - "length" -> file.length + "author_name" -> d.author.fullName, + "author_id" -> d.author.id, + "user_id" -> uploader.get.id, + "user_name" -> uploader.get.getMiniUser.fullName, + "resource_name" -> file.filename, + "size" -> file.length )) } case None => { - logEvent("upload", Json.obj( - "uploader_id" -> uploader.get.id, - "uploader_name" -> uploader.get.getMiniUser.fullName, - "filename" -> file.filename, - "length" -> file.length + logEvent(Json.obj( + "category" -> "upload", + "user_id" -> uploader.get.id, + "user_name" -> uploader.get.getMiniUser.fullName, + "resource_name" -> file.filename, + "size" -> file.length )) } } } - def logFileDownloadEvent(file: File, /*dataset: Dataset,*/ downloader: Option[User]) = { - logEvent("download", Json.obj( - /*"dataset_id" -> dataset.id, - "dataset_name" -> dataset.name, - "dataset_author_name" -> dataset.author.fullName, - "dataset_author_id" -> dataset.author.id,*/ + def logFileDownloadEvent(file: File, downloader: Option[User]) = { + logEvent(Json.obj( + "category" -> "download", "type" -> "file", - 
"uploader_id" -> file.author.id, - "uploader_name" -> file.author.fullName, - "downloader_id" -> downloader.get.id, - "downloader_name" -> downloader.get.getMiniUser.fullName, - "filename" -> file.filename, - "length" -> file.length + "resource_id" -> file.id, + "resource_name" -> file.filename, + "author_id" -> file.author.id, + "author_name" -> file.author.fullName, + "user_id" -> downloader.get.id, + "user_name" -> downloader.get.getMiniUser.fullName, + "size" -> file.length )) } def logDatasetDownloadEvent(dataset: Dataset, downloader: Option[User]) = { - logEvent("download", Json.obj( + logEvent(Json.obj( + "category" -> "download", "type" -> "dataset", - "dataset_id" -> dataset.id, - "dataset_name" -> dataset.name, - "dataset_author_name" -> dataset.author.fullName, - "dataset_author_id" -> dataset.author.id, - "downloader_id" -> downloader.get.id, - "downloader_name" -> downloader.get.getMiniUser.fullName, - "files_length" -> dataset.files.length, - "folder_length" -> dataset.folders.length + "resource_id" -> dataset.id, + "resource_name" -> dataset.name, + "author_name" -> dataset.author.fullName, + "author_id" -> dataset.author.id, + "user_id" -> downloader.get.id, + "user_name" -> downloader.get.getMiniUser.fullName, + "size" -> (dataset.files.length + dataset.folders.length) )) } } From ad7e71a1a8f9448b08c76c42413afa66c202c1cf Mon Sep 17 00:00:00 2001 From: Luigi Marini Date: Thu, 11 Mar 2021 12:41:15 -0600 Subject: [PATCH 3/7] Don't automatically create eventsink queue and bind it to eventsink exchange. Let clients do that so that we don't have a queue for the eventsink filling up if there are no consumers. --- CHANGELOG.md | 2 ++ app/services/rabbitmq/RabbitMQMessageService.scala | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0a1c38c0..3afdf6a4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). 
Replaced `get()` with `getOrElse()`. - Consolidated field names sent by the EventSinkService to maximize reuse. - Changed `EventSinkService` logging to debug to minimize chatter. +- Don't automatically create eventsink queue and bind it to eventsink exchange. Let clients do that so that we don't + have a queue for the eventsink filling up if there are no consumers. ## 1.15.0 - 2021-03-03 diff --git a/app/services/rabbitmq/RabbitMQMessageService.scala b/app/services/rabbitmq/RabbitMQMessageService.scala index 8ca483d98..bf7910282 100644 --- a/app/services/rabbitmq/RabbitMQMessageService.scala +++ b/app/services/rabbitmq/RabbitMQMessageService.scala @@ -233,8 +233,6 @@ class RabbitMQMessageService extends MessageService { // This probably isn't going to extract queue (use other submit() for that) so make a new broker val tempChannel = connection.get.createChannel() tempChannel.exchangeDeclare(exchange, exchange_type, true) - tempChannel.queueDeclare(routing_key, true, false, false, null) - tempChannel.queueBind(routing_key, exchange, routing_key) tempChannel.basicPublish(exchange, routing_key, null, message.toString.getBytes) } From 7d2dd28d9eb02649fa362d62535700e37a1163d3 Mon Sep 17 00:00:00 2001 From: Luigi Marini Date: Mon, 15 Mar 2021 09:42:14 -0500 Subject: [PATCH 4/7] Removed queue from EventSinkService. 
Co-authored-by: Mike Lambert --- app/services/EventSinkService.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/services/EventSinkService.scala b/app/services/EventSinkService.scala index a75dcff7f..df63ab64a 100644 --- a/app/services/EventSinkService.scala +++ b/app/services/EventSinkService.scala @@ -13,7 +13,7 @@ object EventSinkService { val QUEUE_NAME_CONFIG_KEY = "eventsink.queuename" val EXCHANGE_NAME_DEFAULT_VALUE = "clowder.metrics" - val QUEUE_NAME_DEFAULT_VALUE = "event.sink" + val QUEUE_NAME_DEFAULT_VALUE = "" } class EventSinkService { @@ -259,4 +259,4 @@ class EventSinkService { } } -//case class EventSinkMessage(created: Long, category: String, metadata: JsValue) \ No newline at end of file +//case class EventSinkMessage(created: Long, category: String, metadata: JsValue) From d0b84db9954bd54bccce8263329cb843dd53b918 Mon Sep 17 00:00:00 2001 From: Luigi Marini Date: Mon, 15 Mar 2021 09:42:23 -0500 Subject: [PATCH 5/7] Removed queue from EventSinkService. 
Co-authored-by: Mike Lambert --- app/services/rabbitmq/RabbitMQMessageService.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/app/services/rabbitmq/RabbitMQMessageService.scala b/app/services/rabbitmq/RabbitMQMessageService.scala index bf7910282..7b50fad35 100644 --- a/app/services/rabbitmq/RabbitMQMessageService.scala +++ b/app/services/rabbitmq/RabbitMQMessageService.scala @@ -233,6 +233,12 @@ class RabbitMQMessageService extends MessageService { // This probably isn't going to extract queue (use other submit() for that) so make a new broker val tempChannel = connection.get.createChannel() tempChannel.exchangeDeclare(exchange, exchange_type, true) + + // If a routing_key (queue name) was provided, ensure that the queue exists + if (routing_key != "") { + tempChannel.queueDeclare(routing_key, true, false, false, null) + tempChannel.queueBind(routing_key, exchange, routing_key) + } tempChannel.basicPublish(exchange, routing_key, null, message.toString.getBytes) } From 27f151d56bc804a9d6e22e255135d16edb491af4 Mon Sep 17 00:00:00 2001 From: Luigi Marini Date: Tue, 16 Mar 2021 13:45:39 -0500 Subject: [PATCH 6/7] Update app/services/EventSinkService.scala Co-authored-by: Mike Lambert --- app/services/EventSinkService.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/app/services/EventSinkService.scala b/app/services/EventSinkService.scala index df63ab64a..5cd72b422 100644 --- a/app/services/EventSinkService.scala +++ b/app/services/EventSinkService.scala @@ -33,7 +33,11 @@ class EventSinkService { // Inject timestamp before logging the event val event = message.as[JsObject] + ("created" -> Json.toJson(java.util.Date.from(Instant.now()))) Logger.info("Submitting message to event sink exchange: " + Json.stringify(event)) - messageService.submit(exchangeName, queueName, event, "fanout") + try { + messageService.submit(exchangeName, queueName, event, "fanout") + } catch { + case e: Throwable => { Logger.error("Failed to submit event 
sink message", e) } + } } /** Log an event when user signs up */ From 69f47608fb0f310cb08d3cdb2f3142d78e919054 Mon Sep 17 00:00:00 2001 From: Luigi Marini Date: Tue, 16 Mar 2021 15:03:06 -0500 Subject: [PATCH 7/7] Make sure Clowder is connected to Rabbitmq when sending messages to an exchange. --- app/services/rabbitmq/RabbitMQMessageService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/services/rabbitmq/RabbitMQMessageService.scala b/app/services/rabbitmq/RabbitMQMessageService.scala index 7b50fad35..cf2166328 100644 --- a/app/services/rabbitmq/RabbitMQMessageService.scala +++ b/app/services/rabbitmq/RabbitMQMessageService.scala @@ -230,7 +230,7 @@ class RabbitMQMessageService extends MessageService { /** Submit a message to broker. */ override def submit(exchange: String, routing_key: String, message: JsValue, exchange_type: String = "topic") = { - // This probably isn't going to extract queue (use other submit() for that) so make a new broker + connect() val tempChannel = connection.get.createChannel() tempChannel.exchangeDeclare(exchange, exchange_type, true)