Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/main/scala/com/pagantis/singer/flows/BatchHttp.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.pagantis.singer.flows

import akka.actor.ActorSystem
import akka.actor.{ActorSystem, Props}
import akka.event.{Logging, LoggingAdapter}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
Expand Down Expand Up @@ -34,12 +34,16 @@ object BatchHttp extends App {
val parallelism = config.getInt("flow.parallelism")
val frameLength = config.getInt("flow.frame_length")

val startTime = System.nanoTime()

// This shutdown sequence was copied from another related issue: https://github.com/akka/akka-http/issues/907#issuecomment-345288919
def shutdownSequence =
Http().shutdownAllConnectionPools
def shutdownSequence = {
for {
http <- Http().shutdownAllConnectionPools()
akka <- system.terminate()
} yield akka
}


val counter = system.actorOf(Props[CountLogger], "counter")

import Request._

Expand All @@ -61,12 +65,14 @@ object BatchHttp extends App {
.log(clazz)
.mapAsync(parallelism)(parseResponse(_))
.log(clazz)
.map {
line: String =>
counter ! 1
line
}
.runForeach(println(_))

Await.ready(flowComputation, Duration.Inf)

standardLogger.info(s"Total execution time: ${(System.nanoTime - startTime)/1000000000} seconds")

Await.ready(shutdownSequence, Duration.Inf)

flowComputation.value match {
Expand Down
39 changes: 39 additions & 0 deletions src/main/scala/com/pagantis/singer/flows/CountLogger.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.pagantis.singer.flows

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import akka.actor.Actor
import spray.json.{JsNumber, JsObject, JsString}

/**
* This is a helper actor that is instantiated on start up and receives messages
* from the main [[BatchHttp]] stream on every record processed. It maintains
* an internal counter of the current execution state and logs this information
* accordingly
*/
class CountLogger extends Actor with akka.actor.ActorLogging {
var total: Long = 0
val startTime: LocalDateTime = LocalDateTime.now()

private def logCount(count: Long): Unit = {
val currentTime = LocalDateTime.now()
val logRecord = JsObject(
Map(
"requests_processed" -> JsNumber(total),
"timestamp" -> JsString(currentTime.format(DateTimeFormatter.ISO_DATE_TIME)),
)
)
log.info(s"${logRecord.compactPrint}")
}

override def receive: Receive = {
case increment: Int =>
total = total + increment
if(total % 5000 == 0) logCount(total)

}

override def postStop(): Unit = logCount(total)

}