diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 3a003fa..81b5cb4 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -14,4 +14,6 @@ flow { extra_params { key = ${?API_KEY} } + # maximum size of framing buffer (input json lines cannot be longer than this) + frame_length = 2048 } \ No newline at end of file diff --git a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala index 486dcd8..58989ef 100644 --- a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala +++ b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala @@ -4,7 +4,7 @@ import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.http.scaladsl.Http import akka.stream.ActorMaterializer -import akka.stream.scaladsl.StreamConverters +import akka.stream.scaladsl.{JsonFraming, StreamConverters} import com.typesafe.config.ConfigFactory import scala.concurrent.duration.Duration @@ -31,6 +31,7 @@ object BatchHttp extends App { val endpoint = config.getString("flow.endpoint") val parallelism = config.getInt("flow.parallelism") + val frameLength = config.getInt("flow.frame_length") val startTime = System.nanoTime() @@ -44,6 +45,7 @@ object BatchHttp extends App { val flowComputation = StreamConverters .fromInputStream(() => inputStream) + .via(JsonFraming.objectScanner(frameLength)) .log(clazz) .mapConcat(_.utf8String.split("\n").toList) .log(clazz)