Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ flow {
extra_params {
key = ${?API_KEY}
}
# maximum size of framing buffer (input json lines cannot be longer than this)
frame_length = 2048
}
4 changes: 3 additions & 1 deletion src/main/scala/com/pagantis/singer/flows/BatchHttp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.StreamConverters
import akka.stream.scaladsl.{JsonFraming, StreamConverters}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration.Duration
Expand All @@ -31,6 +31,7 @@ object BatchHttp extends App {

val endpoint = config.getString("flow.endpoint")
val parallelism = config.getInt("flow.parallelism")
val frameLength = config.getInt("flow.frame_length")

val startTime = System.nanoTime()

Expand All @@ -44,6 +45,7 @@ object BatchHttp extends App {
val flowComputation =
StreamConverters
.fromInputStream(() => inputStream)
.via(JsonFraming.objectScanner(frameLength))
.log(clazz)
.mapConcat(_.utf8String.split("\n").toList)
.log(clazz)
Expand Down