From 2d942b15ee7984366773fbc6147737c70c9d5633 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Fri, 13 Dec 2019 20:27:16 +0100 Subject: [PATCH 01/13] use a natural request representation --- .../singer/flows/it/TestRequest.scala | 39 ++--- .../com/pagantis/singer/flows/BatchHttp.scala | 2 +- .../com/pagantis/singer/flows/Request.scala | 157 ++++++++---------- .../singer/flows/test/TestRequest.scala | 72 ++++---- 4 files changed, 134 insertions(+), 136 deletions(-) diff --git a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala index cb26b62..c662f98 100644 --- a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala +++ b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala @@ -27,18 +27,18 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S // init actor system, loggers and execution context implicit val system: ActorSystem = ActorSystem("BatchHttp") implicit val materializer: ActorMaterializer = ActorMaterializer() - implicit val standardLogger: LoggingAdapter = Logging(system, clazz) + implicit val standardLogger: LoggingAdapter = Logging(system, getClass.getName) implicit val ec: ExecutionContextExecutor = system.dispatcher - implicit val connectionPool = Http().cachedHostConnectionPoolHttps[Request]("jsonplaceholder.typicode.com") + private implicit val connectionPool: Flow[(HttpRequest, Request), (Try[HttpResponse], Request), HostConnectionPool] = + Http().cachedHostConnectionPoolHttps[Request]("jsonplaceholder.typicode.com") - implicit val defaultPatience = + private implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = Span(2, Seconds), interval = Span(5, Millis)) import Request._ - def makeRequestAndHandle(line: String)(implicit connectionPool: Flow[(HttpRequest, Request),(Try[HttpResponse], Request), HostConnectionPool]) = { - + private def makeRequestAndHandle(line: String)(implicit connectionPool: Flow[(HttpRequest, Request),(Try[HttpResponse], Request), HostConnectionPool]) = { Source.single(line) .map( line => { @@ -53,18 +53,20 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S } "Request" should "get comments by post_id" in { - - whenReady(makeRequestAndHandle("""{"query": {"post_id": 1}, "path": "/comments"}""")) { + whenReady(makeRequestAndHandle("""{"request": {"get": {"query": {"post_id": 1}, "path": "/comments"}}}""")) { response => response.parseJson.asJsObject.fields("response") shouldBe a[JsArray] } - whenReady(makeRequestAndHandle("""{"query": {"post_id": 1}, "path": "/comments", "context": "CvKL8"}""")) { + whenReady(makeRequestAndHandle("""{"request": {"get": {"query": {"post_id": 1}, "path": "/comments"}}, "context": "CvKL8"}""")) { response => { val responseAsJson = response.parseJson.asJsObject val fields = responseAsJson.fields fields("request") shouldBe JsObject( - "query" -> JsObject( - "post_id" -> JsNumber(1) + "get" -> JsObject( + "query" -> JsObject( + "post_id" -> JsNumber(1) + ), + "path" -> JsString("/comments") ) ) fields("response") shouldBe a[JsArray] @@ -76,20 +78,21 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S } } } - } "Request" should "post posts with user_id" in { - - whenReady(makeRequestAndHandle("""{"body": {"userId": 1, "title": "foo", "body": "bar"}, "path": "/posts"}""")) { + whenReady(makeRequestAndHandle("""{"request": {"post": {"body": {"userId": 1, "title": "foo", "body": "bar"}, "path": "/posts"}}}""")) { response => { val responseAsJson = response.parseJson.asJsObject val fields = responseAsJson.fields fields("request") shouldBe JsObject( - "body" -> JsObject( - "title" -> JsString("foo"), - "body" -> JsString("bar"), - "userId" -> JsNumber(1) + "post" -> JsObject( + "body" -> JsObject( + "title" -> JsString("foo"), + "body" -> JsString("bar"), + "userId" -> JsNumber(1) + ), + "path" -> JsString("/posts") ) ) fields("response") shouldBe JsObject( @@ -105,8 +108,6 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S } } } - } - } diff --git a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala index 58d084e..de2d5ae 100644 --- a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala +++ b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala @@ -37,7 +37,7 @@ object BatchHttp extends App { // This shutdown sequence was copied from another related issue: https://github.com/akka/akka-http/issues/907#issuecomment-345288919 def shutdownSequence = { for { - http <- Http().shutdownAllConnectionPools() + _ <- Http().shutdownAllConnectionPools() akka <- system.terminate() } yield akka } diff --git a/src/main/scala/com/pagantis/singer/flows/Request.scala b/src/main/scala/com/pagantis/singer/flows/Request.scala index 2bf3abe..312f378 100644 --- a/src/main/scala/com/pagantis/singer/flows/Request.scala +++ b/src/main/scala/com/pagantis/singer/flows/Request.scala @@ -3,92 +3,86 @@ package com.pagantis.singer.flows import java.time.LocalDateTime import java.time.format.DateTimeFormatter -import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest, HttpResponse, Uri} -import akka.http.scaladsl.model.Uri.Query +import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpHeader, HttpMethods, HttpRequest, HttpResponse, Uri} +import akka.http.scaladsl.model.Uri.{Path, Query} +import akka.http.scaladsl.model.headers.RawHeader import akka.stream.Materializer import akka.util.ByteString import com.typesafe.config.ConfigFactory import net.ceedubs.ficus.Ficus._ import spray.json._ +import scala.concurrent.{ExecutionContextExecutor, Future} import scala.util.{Failure, Success, Try} object Request { - val config = ConfigFactory.load() + private val config = ConfigFactory.load() - val extraParams = config.as[Map[String, String]]("flow.extra_params") - val defaultPath = config.getString("flow.path") - - def fromLine(line: String) = { - - val message = line.parseJson - val fields = message.asJsObject.fields - - (fields.get("query"), fields.get("body"), fields.get("path"), fields.get("context")) match { - - case (Some(_), Some(_), _, _) => - throw new Exception - - case (Some(jsonQueryParams), None, optPath, optContext) => - - val asStringMap = jsonQueryParams - .asJsObject - .fields - - val optStringPath = optPath collect { case JsString(value) => value } - - GetRequest(asStringMap ++ extraParams.map(pair => (pair._1, JsString(pair._2))), optStringPath, optContext) + private val extraParams: Map[String, String] = + config.as[Option[Map[String, String]]]("flow.extra_params") match { + case None => Map() + case Some(params) => params + } - case (None, Some(jsonBody), optPath, optContext) => + private val defaultPath = config.getString("flow.path") - val optStringPath = optPath collect { case JsString(value) => value } + def fromLine(line: String): Request = { - PostRequest(jsonBody.asJsObject, optStringPath, optContext) + val message = line.parseJson + val rootFields = message.asJsObject.fields + val request = rootFields.get("request") + val optContext = rootFields.get("context") + + request match { + case Some(JsObject(requestFields)) => + (requestFields.get("get"), requestFields.get("post")) match { + case (Some(_), Some(_)) => throw InvalidRequestException("'get' and 'post' methods are mutually exclusive") + case (Some(JsObject(methodContents)), None) => Request(GET, methodContents, optContext) + case (None, Some(JsObject(methodContents))) => Request(POST, methodContents, optContext) + case _ => throw new InvalidRequestException + } + case _ => throw new InvalidRequestException } - } - def parseResponse(triedResponse: (Try[HttpResponse], Request))(implicit am: Materializer) = { + } - implicit val ec = am.executionContext + def parseResponse(triedResponse: (Try[HttpResponse], Request))(implicit am: Materializer): Future[String] = { + implicit val ec: ExecutionContextExecutor = am.executionContext triedResponse match { case (Success(response), request) => Request.fromHttpResponse(response).map(request.toLine(_)) case (Failure(exception), _) => throw exception } - } - - - def fromHttpResponse(response: HttpResponse)(implicit am: Materializer) = { - + def fromHttpResponse(response: HttpResponse)(implicit am: Materializer): Future[JsValue] = { import spray.json._ - implicit val ec = am.executionContext + implicit val ec: ExecutionContextExecutor = am.executionContext val responseAsJson = response.entity.dataBytes.runFold(ByteString(""))(_ ++ _) map (body => body.utf8String.parseJson) - responseAsJson - } - } +sealed trait RequestMethod +case object GET extends RequestMethod +case object POST extends RequestMethod -trait Request { - - val context: Option[JsValue] - - def toAkkaRequest: HttpRequest +case class Request(method: RequestMethod, methodContents: Map[String, JsValue], context: Option[JsValue]) { - def outputRequest: JsObject + def outputRequest: JsObject = { + method match { + case GET => JsObject("get" -> JsObject(methodContents)) + case POST => JsObject("post" -> JsObject(methodContents)) + } + } def toLine(response: JsValue, extractedAt: LocalDateTime = LocalDateTime.now()): String = { - val request = outputRequest val requestAndResponse = @@ -106,49 +100,42 @@ trait Request { JsObject(outputKeys).compactPrint } -} - -case class GetRequest(queryParams: Map[String, JsValue], path: Option[String] = None, context: Option[JsValue]) extends Request { + private def buildQueryString(query: Option[JsValue]) = { + query match { + case Some(JsObject(fields)) => Some(fields.mapValues(_.toString) ++ Request.extraParams) + case None if Request.extraParams.nonEmpty => Some(Request.extraParams) + case None => None + case _ => throw InvalidRequestException("'query' member must be key-value map") + } + } - override def toAkkaRequest: HttpRequest = { + def buildHeaders(headers: Option[JsValue]): collection.immutable.Seq[HttpHeader] = { + headers match { + case Some(JsObject(fields)) => fields map { case (header, value) => RawHeader(header, value.toString) } toList + case None => List() + case _ => throw InvalidRequestException("'headers' member must be key-value map") + } + } - val query = Query( - queryParams - .collect { - case (key, JsString(value)) => (key, value) - case (key, JsNumber(value)) => (key, value.toString) - } - ) + def baseRequest: HttpRequest = { + val queryString = buildQueryString(methodContents.get("query")) + val headers = buildHeaders(methodContents.get("header")) HttpRequest( - method = HttpMethods.GET, - uri = path match { - case None => Uri(Request.defaultPath).withQuery(query) - case Some(path) => Uri(path).withQuery(query) - } + uri = methodContents.get("path") match { + case Some(JsString(path)) => Uri(path = Path(path), queryString = queryString.map(Query(_).toString)) + case _ => Uri(path = Path(Request.defaultPath), queryString = queryString.map(Query(_).toString)) + }, + headers = headers ) } - override def outputRequest: JsObject = { - - val noSecrets = queryParams.filter(param => !Request.extraParams.contains(param._1)) - - JsObject("query" -> JsObject(noSecrets)) + def toAkkaRequest: HttpRequest = { + method match { + case GET => baseRequest.withMethod(HttpMethods.GET) + case POST => baseRequest.withMethod(HttpMethods.POST).withEntity( + HttpEntity(ContentTypes.`application/json`, methodContents.get("body").map(_.compactPrint).getOrElse("")) + ) + } } - -} - -case class PostRequest(body: JsObject, path: Option[String] = None, context: Option[JsValue]) extends Request { - - override def toAkkaRequest: HttpRequest = HttpRequest( - method = HttpMethods.POST, - uri = path match { - case None => Uri(Request.defaultPath) - case Some(path) => Uri(path) - }, - entity = HttpEntity(ContentTypes.`application/json`, body.compactPrint) - ) - - override def outputRequest: JsObject = JsObject("body" -> body) - -} +} \ No newline at end of file diff --git a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala index a5baab6..c352fec 100644 --- a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala +++ b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala @@ -1,45 +1,55 @@ package com.pagantis.singer.flows.test -import com.pagantis.singer.flows.{GetRequest, PostRequest, Request} -import org.scalatest.{FlatSpec, Matchers} -import spray.json.{DefaultJsonProtocol, JsNumber, JsObject, JsString} - -class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol { - - val query = JsObject( - "query" -> JsObject( - "parm1" -> JsString("value1"), - "parm2" -> JsString("value2") +import com.pagantis.singer.flows.{GET, POST, Request} +import org.scalatest.{FlatSpec, Inside, Matchers} +import spray.json.{DefaultJsonProtocol, JsNumber, JsObject, JsString, JsValue} + +class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with Inside{ + + val get: JsObject = JsObject( + "get" -> JsObject( + "query" -> JsObject( + "parm1" -> JsString("value1"), + "parm2" -> JsString("value2") + ) ) ) - val body = JsObject( - "body" -> JsObject( - "parm1" -> JsString("value1"), - "parm2" -> JsString("value2") + val post: JsObject = JsObject( + "post" -> JsObject( + "body" -> JsObject( + "parm1" -> JsString("value1"), + "parm2" -> JsString("value2") + ) ) ) - "Request" should "create GET request when a query map is passed" in { - - Request.fromLine(query.compactPrint) shouldBe a[GetRequest] - Request.fromLine( - JsObject( - query.fields + ("context" -> JsString("some_id")) - ) compactPrint - ) shouldBe a[GetRequest] + private def wrapRequest(methodObject: JsObject, optContext: Option[JsValue] = None) = optContext match { + case Some(context) => JsObject("request" -> methodObject, "context" -> context) + case None => JsObject("request" -> methodObject) + } + "Request" should "create GET request when a get method object is passed" in { + inside(Request.fromLine(wrapRequest(get).compactPrint)) { + case Request(method, _, _) => method shouldBe GET + } + inside(Request.fromLine(wrapRequest(get, Some(JsString("some_id"))).compactPrint)) { + case Request(method, _, _) => method shouldBe GET + } } "Request" should "create POST request when a body is passed" in { - - Request.fromLine(body.compactPrint) shouldBe a[PostRequest] - Request.fromLine( - JsObject( - body.fields + ("context" -> JsObject(Map("type" -> JsString("order"), "id" -> JsNumber(746)))) - ) compactPrint - ) shouldBe a[PostRequest] - + inside(Request.fromLine(wrapRequest(post).compactPrint)) { + case Request(method, _, _) => method shouldBe POST + } + val request = + Request + .fromLine(wrapRequest( + post, + Some(JsObject("context" -> JsObject(Map("type" -> JsString("order"), "id" -> JsNumber(746))))) + ).compactPrint) + inside(request) { + case Request(method, _, _) => method shouldBe POST + } } - } From 6b998190caa0647af21dccab7e24dd005793c847 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Fri, 13 Dec 2019 21:22:30 +0100 Subject: [PATCH 02/13] add missing exception --- .../com/pagantis/singer/flows/InvalidRequestException.scala | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 src/main/scala/com/pagantis/singer/flows/InvalidRequestException.scala diff --git a/src/main/scala/com/pagantis/singer/flows/InvalidRequestException.scala b/src/main/scala/com/pagantis/singer/flows/InvalidRequestException.scala new file mode 100644 index 0000000..f5cad31 --- /dev/null +++ b/src/main/scala/com/pagantis/singer/flows/InvalidRequestException.scala @@ -0,0 +1,3 @@ +package com.pagantis.singer.flows + +case class InvalidRequestException(message: String = "Invalid request representation https://github.com/digitalorigin/batch-http/issues/17") extends Exception(message) \ No newline at end of file From 0ae627d62d77d040b7780febdb9ccaca3dfd1398 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Sat, 14 Dec 2019 11:32:22 +0100 Subject: [PATCH 03/13] the request method is a field instead of a full JSON object --- .../singer/flows/it/TestRequest.scala | 33 +++++------ .../com/pagantis/singer/flows/Request.scala | 57 +++++++++---------- .../singer/flows/test/TestRequest.scala | 41 +++++++------ 3 files changed, 67 insertions(+), 64 deletions(-) diff --git a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala index c662f98..1354cda 100644 --- a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala +++ b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala @@ -11,7 +11,6 @@ import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.stream.ActorMaterializer import akka.stream.scaladsl.Flow import akka.stream.scaladsl.{Sink, Source} -import com.pagantis.singer.flows.BatchHttp.clazz import com.pagantis.singer.flows.Request import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Millis, Seconds, Span} @@ -53,21 +52,20 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S } "Request" should "get comments by post_id" in { - whenReady(makeRequestAndHandle("""{"request": {"get": {"query": {"post_id": 1}, "path": "/comments"}}}""")) { + whenReady(makeRequestAndHandle("""{"request": {"method": "GET", "query": {"post_id": 1}, "path": "/comments"}}""")) { response => response.parseJson.asJsObject.fields("response") shouldBe a[JsArray] } - whenReady(makeRequestAndHandle("""{"request": {"get": {"query": {"post_id": 1}, "path": "/comments"}}, "context": "CvKL8"}""")) { + whenReady(makeRequestAndHandle("""{"request": {"method": "GET", "query": {"post_id": 1}, "path": "/comments"}, "context": "CvKL8"}""")) { response => { val responseAsJson = response.parseJson.asJsObject val fields = responseAsJson.fields fields("request") shouldBe JsObject( - "get" -> JsObject( - "query" -> JsObject( - "post_id" -> JsNumber(1) - ), - "path" -> JsString("/comments") - ) + "method" -> JsString("GET"), + "query" -> JsObject( + "post_id" -> JsNumber(1) + ), + "path" -> JsString("/comments") ) fields("response") shouldBe a[JsArray] fields("context") shouldBe JsString("CvKL8") @@ -81,19 +79,18 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S } "Request" should "post posts with user_id" in { - whenReady(makeRequestAndHandle("""{"request": {"post": {"body": {"userId": 1, "title": "foo", "body": "bar"}, "path": "/posts"}}}""")) { + whenReady(makeRequestAndHandle("""{"request": {"method": "POST", "body": {"userId": 1, "title": "foo", "body": "bar"}, "path": "/posts"}}""")) { response => { val responseAsJson = response.parseJson.asJsObject val fields = responseAsJson.fields fields("request") shouldBe JsObject( - "post" -> JsObject( - "body" -> JsObject( - "title" -> JsString("foo"), - "body" -> JsString("bar"), - "userId" -> JsNumber(1) - ), - "path" -> JsString("/posts") - ) + "method" -> JsString("POST"), + "body" -> JsObject( + "title" -> JsString("foo"), + "body" -> JsString("bar"), + "userId" -> JsNumber(1) + ), + "path" -> JsString("/posts") ) fields("response") shouldBe JsObject( "id" -> JsNumber(101), diff --git a/src/main/scala/com/pagantis/singer/flows/Request.scala b/src/main/scala/com/pagantis/singer/flows/Request.scala index 312f378..61e4160 100644 --- a/src/main/scala/com/pagantis/singer/flows/Request.scala +++ b/src/main/scala/com/pagantis/singer/flows/Request.scala @@ -3,7 +3,7 @@ package com.pagantis.singer.flows import java.time.LocalDateTime import java.time.format.DateTimeFormatter -import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpHeader, HttpMethods, HttpRequest, HttpResponse, Uri} +import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpHeader, HttpMethod, HttpMethods, HttpRequest, HttpResponse, RequestEntity, Uri} import akka.http.scaladsl.model.Uri.{Path, Query} import akka.http.scaladsl.model.headers.RawHeader import akka.stream.Materializer @@ -13,6 +13,7 @@ import net.ceedubs.ficus.Ficus._ import spray.json._ import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.language.postfixOps import scala.util.{Failure, Success, Try} object Request { @@ -36,11 +37,13 @@ object Request { request match { case Some(JsObject(requestFields)) => - (requestFields.get("get"), requestFields.get("post")) match { - case (Some(_), Some(_)) => throw InvalidRequestException("'get' and 'post' methods are mutually exclusive") - case (Some(JsObject(methodContents)), None) => Request(GET, methodContents, optContext) - case (None, Some(JsObject(methodContents))) => Request(POST, methodContents, optContext) - case _ => throw new InvalidRequestException + requestFields.get("method") match { + case Some(JsString(method)) => HttpMethods.getForKey(method) match { + case Some(httpMethod) => Request(httpMethod, requestFields, optContext) + case None => throw InvalidRequestException("'method' must be a valid HTTP method") + } + case Some(_) => throw InvalidRequestException("'method' must be a JSON string") + case _ => Request(HttpMethods.GET, requestFields, optContext) } case _ => throw new InvalidRequestException } @@ -69,18 +72,10 @@ object Request { } } -sealed trait RequestMethod -case object GET extends RequestMethod -case object POST extends RequestMethod +case class Request(method: HttpMethod, methodContents: Map[String, JsValue], context: Option[JsValue]) { -case class Request(method: RequestMethod, methodContents: Map[String, JsValue], context: Option[JsValue]) { - - def outputRequest: JsObject = { - method match { - case GET => JsObject("get" -> JsObject(methodContents)) - case POST => JsObject("post" -> JsObject(methodContents)) - } - } + def outputRequest: JsObject = + JsObject(methodContents + ("method" -> JsString(method.value))) def toLine(response: JsValue, extractedAt: LocalDateTime = LocalDateTime.now()): String = { val request = outputRequest @@ -109,7 +104,7 @@ case class Request(method: RequestMethod, methodContents: Map[String, JsValue], } } - def buildHeaders(headers: Option[JsValue]): collection.immutable.Seq[HttpHeader] = { + private def buildHeaders(headers: Option[JsValue]): List[HttpHeader] = { headers match { case Some(JsObject(fields)) => fields map { case (header, value) => RawHeader(header, value.toString) } toList case None => List() @@ -117,25 +112,29 @@ case class Request(method: RequestMethod, methodContents: Map[String, JsValue], } } - def baseRequest: HttpRequest = { + private def buildBody(body: Option[JsValue]): RequestEntity = { + body match { + case Some(body) => body match { + case JsObject(_) => HttpEntity(ContentTypes.`application/json`, body.compactPrint) + case _ => throw InvalidRequestException("'body' must be a full JSON object") + } + case None => HttpEntity.Empty + } + } + + def toAkkaRequest: HttpRequest = { val queryString = buildQueryString(methodContents.get("query")) val headers = buildHeaders(methodContents.get("header")) + val entity = buildBody(methodContents.get("body") ) HttpRequest( + method = method, uri = methodContents.get("path") match { case Some(JsString(path)) => Uri(path = Path(path), queryString = queryString.map(Query(_).toString)) case _ => Uri(path = Path(Request.defaultPath), queryString = queryString.map(Query(_).toString)) }, - headers = headers + headers = headers, + entity = entity ) } - - def toAkkaRequest: HttpRequest = { - method match { - case GET => baseRequest.withMethod(HttpMethods.GET) - case POST => baseRequest.withMethod(HttpMethods.POST).withEntity( - HttpEntity(ContentTypes.`application/json`, methodContents.get("body").map(_.compactPrint).getOrElse("")) - ) - } - } } \ No newline at end of file diff --git a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala index c352fec..596c906 100644 --- a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala +++ b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala @@ -1,26 +1,25 @@ package com.pagantis.singer.flows.test -import com.pagantis.singer.flows.{GET, POST, Request} +import akka.http.scaladsl.model.HttpMethods +import com.pagantis.singer.flows.Request import org.scalatest.{FlatSpec, Inside, Matchers} import spray.json.{DefaultJsonProtocol, JsNumber, JsObject, JsString, JsValue} class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with Inside{ val get: JsObject = JsObject( - "get" -> JsObject( - "query" -> JsObject( - "parm1" -> JsString("value1"), - "parm2" -> JsString("value2") - ) + "method" -> JsString("GET"), + "query" -> JsObject( + "parm1" -> JsString("value1"), + "parm2" -> JsString("value2") ) ) val post: JsObject = JsObject( - "post" -> JsObject( - "body" -> JsObject( - "parm1" -> JsString("value1"), - "parm2" -> JsString("value2") - ) + "method" -> JsString("POST"), + "body" -> JsObject( + "parm1" -> JsString("value1"), + "parm2" -> JsString("value2") ) ) @@ -31,25 +30,33 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with I "Request" should "create GET request when a get method object is passed" in { inside(Request.fromLine(wrapRequest(get).compactPrint)) { - case Request(method, _, _) => method shouldBe GET + case Request(method, _, _) => method shouldBe HttpMethods.GET } - inside(Request.fromLine(wrapRequest(get, Some(JsString("some_id"))).compactPrint)) { - case Request(method, _, _) => method shouldBe GET + val context = JsString("some_id") + inside(Request.fromLine(wrapRequest(get, Some(context)).compactPrint)) { + case Request(method, _, Some(requestContext)) => + method shouldBe HttpMethods.GET + requestContext shouldBe context + case _ => fail } } "Request" should "create POST request when a body is passed" in { inside(Request.fromLine(wrapRequest(post).compactPrint)) { - case Request(method, _, _) => method shouldBe POST + case Request(method, _, _) => method shouldBe HttpMethods.POST } + val context = JsObject("context" -> JsObject(Map("type" -> JsString("order"), "id" -> JsNumber(746)))) val request = Request .fromLine(wrapRequest( post, - Some(JsObject("context" -> JsObject(Map("type" -> JsString("order"), "id" -> JsNumber(746))))) + Some(context) ).compactPrint) inside(request) { - case Request(method, _, _) => method shouldBe POST + case Request(method, _, Some(requestContext)) => + method shouldBe HttpMethods.POST + requestContext shouldBe context + case _ => fail } } } From 2057b7da3954ea9b58c11fd947d3aefc1a2d9b3d Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Sat, 14 Dec 2019 12:55:08 +0100 Subject: [PATCH 04/13] extract uri built into his own method --- .../com/pagantis/singer/flows/Request.scala | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/src/main/scala/com/pagantis/singer/flows/Request.scala b/src/main/scala/com/pagantis/singer/flows/Request.scala index 61e4160..b45d338 100644 --- a/src/main/scala/com/pagantis/singer/flows/Request.scala +++ b/src/main/scala/com/pagantis/singer/flows/Request.scala @@ -29,7 +29,6 @@ object Request { private val defaultPath = config.getString("flow.path") def fromLine(line: String): Request = { - val message = line.parseJson val rootFields = message.asJsObject.fields val request = rootFields.get("request") @@ -47,8 +46,6 @@ object Request { } case _ => throw new InvalidRequestException } - - } def parseResponse(triedResponse: (Try[HttpResponse], Request))(implicit am: Materializer): Future[String] = { @@ -95,25 +92,25 @@ case class Request(method: HttpMethod, methodContents: Map[String, JsValue], con JsObject(outputKeys).compactPrint } - private def buildQueryString(query: Option[JsValue]) = { - query match { - case Some(JsObject(fields)) => Some(fields.mapValues(_.toString) ++ Request.extraParams) - case None if Request.extraParams.nonEmpty => Some(Request.extraParams) - case None => None + private def buildQuery(rawQuery: Option[JsValue]): Query = { + rawQuery match { + case Some(JsObject(fields)) => Query(fields.mapValues(_.toString) ++ Request.extraParams) + case None if Request.extraParams.nonEmpty => Query(Request.extraParams) + case None => Query() case _ => throw InvalidRequestException("'query' member must be key-value map") } } - private def buildHeaders(headers: Option[JsValue]): List[HttpHeader] = { - headers match { + private def buildHeaders(rawHeaders: Option[JsValue]): List[HttpHeader] = { + rawHeaders match { case Some(JsObject(fields)) => fields map { case (header, value) => RawHeader(header, value.toString) } toList case None => List() case _ => throw InvalidRequestException("'headers' member must be key-value map") } } - private def buildBody(body: Option[JsValue]): RequestEntity = { - body match { + private def buildBody(rawBody: Option[JsValue]): RequestEntity = { + rawBody match { case Some(body) => body match { case JsObject(_) => HttpEntity(ContentTypes.`application/json`, body.compactPrint) case _ => throw InvalidRequestException("'body' must be a full JSON object") @@ -122,17 +119,30 @@ case class Request(method: HttpMethod, methodContents: Map[String, JsValue], con } } + private def buildUri(optDomain: Option[JsValue], optPath: Option[JsValue], optQueryRaw: Option[JsValue]): Uri = { + val path = optPath match { + case Some(JsString(path)) => path + case Some(_) => throw InvalidRequestException("'path' must be an JSON string") + case None => Request.defaultPath + } + + val uri = optDomain match { + case Some(JsString(domain)) => Uri.from(host = domain, path = path) + case Some(_) => throw InvalidRequestException("'domain' must be an JSON string") + case None => Uri.from(path = path) + } + + uri.withQuery(buildQuery(optQueryRaw)) + } + def toAkkaRequest: HttpRequest = { - val queryString = buildQueryString(methodContents.get("query")) val headers = buildHeaders(methodContents.get("header")) val entity = buildBody(methodContents.get("body") ) + val uri = buildUri(methodContents.get("domain"), methodContents.get("path"), methodContents.get("query")) HttpRequest( method = method, - uri = methodContents.get("path") match { - case Some(JsString(path)) => Uri(path = Path(path), queryString = queryString.map(Query(_).toString)) - case _ => Uri(path = Path(Request.defaultPath), queryString = queryString.map(Query(_).toString)) - }, + uri = uri, headers = headers, entity = entity ) From fc72eb08b222a2ff7dd31d1b7cd7cf7e50036635 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Sat, 14 Dec 2019 14:34:37 +0100 Subject: [PATCH 05/13] wrap response body into his own object --- .../singer/flows/it/TestRequest.scala | 26 ++++++++++++------- .../com/pagantis/singer/flows/Request.scala | 6 +++-- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala index 1354cda..2eec6e9 100644 --- a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala +++ b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala @@ -53,13 +53,14 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S "Request" should "get comments by post_id" in { whenReady(makeRequestAndHandle("""{"request": {"method": "GET", "query": {"post_id": 1}, "path": "/comments"}}""")) { - response => response.parseJson.asJsObject.fields("response") shouldBe a[JsArray] + response => response.parseJson.asJsObject.fields("response").asJsObject.fields("body") shouldBe a[JsArray] } whenReady(makeRequestAndHandle("""{"request": {"method": "GET", "query": {"post_id": 1}, "path": "/comments"}, "context": "CvKL8"}""")) { - response => { - val responseAsJson = response.parseJson.asJsObject + responseRaw => { + val responseAsJson = responseRaw.parseJson.asJsObject val fields = responseAsJson.fields + fields("request") shouldBe JsObject( "method" -> JsString("GET"), "query" -> JsObject( @@ -67,9 +68,12 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S ), "path" -> JsString("/comments") ) - fields("response") shouldBe a[JsArray] + + val responseFields = fields("response").asJsObject.fields + responseFields("body") shouldBe a[JsArray] fields("context") shouldBe JsString("CvKL8") - inside (fields("extracted_at")) { + + inside (responseFields("responded_at")) { case JsString(extractedAt) => LocalDateTime.parse(extractedAt, DateTimeFormatter.ISO_DATE_TIME) shouldBe a[LocalDateTime] case _ => fail @@ -80,9 +84,10 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S "Request" should "post posts with user_id" in { whenReady(makeRequestAndHandle("""{"request": {"method": "POST", "body": {"userId": 1, "title": "foo", "body": "bar"}, "path": "/posts"}}""")) { - response => { - val responseAsJson = response.parseJson.asJsObject + responseRaw => { + val responseAsJson = responseRaw.parseJson.asJsObject val fields = responseAsJson.fields + fields("request") shouldBe JsObject( "method" -> JsString("POST"), "body" -> JsObject( @@ -92,13 +97,16 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S ), "path" -> JsString("/posts") ) - fields("response") shouldBe JsObject( + + val response = fields("response") + response.asJsObject.fields("body") shouldBe JsObject( "id" -> JsNumber(101), "title" -> JsString("foo"), "body" -> JsString("bar"), "userId" -> JsNumber(1) ) - inside (fields("extracted_at")) { + + inside (response.asJsObject.fields("responded_at")) { case JsString(extractedAt) => LocalDateTime.parse(extractedAt, DateTimeFormatter.ISO_DATE_TIME) shouldBe a[LocalDateTime] case _ => fail diff --git a/src/main/scala/com/pagantis/singer/flows/Request.scala b/src/main/scala/com/pagantis/singer/flows/Request.scala index b45d338..3b20c42 100644 --- a/src/main/scala/com/pagantis/singer/flows/Request.scala +++ b/src/main/scala/com/pagantis/singer/flows/Request.scala @@ -80,8 +80,10 @@ case class Request(method: HttpMethod, methodContents: Map[String, JsValue], con val requestAndResponse = Map( "request" -> request, - "response" -> response, - "extracted_at" -> JsString(extractedAt.format(DateTimeFormatter.ISO_DATE_TIME)) + "response" -> JsObject( + "body" -> response, + "responded_at" -> JsString(extractedAt.format(DateTimeFormatter.ISO_DATE_TIME)) + ) ) val outputKeys = context match { From 2cfd1e9e0683963c5f7f39091c8cf5d40580feb1 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Sat, 14 Dec 2019 14:42:30 +0100 Subject: [PATCH 06/13] remove 'domain' parsin --- .../com/pagantis/singer/flows/Request.scala | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/main/scala/com/pagantis/singer/flows/Request.scala b/src/main/scala/com/pagantis/singer/flows/Request.scala index 3b20c42..d55573c 100644 --- a/src/main/scala/com/pagantis/singer/flows/Request.scala +++ b/src/main/scala/com/pagantis/singer/flows/Request.scala @@ -121,17 +121,11 @@ case class Request(method: HttpMethod, methodContents: Map[String, JsValue], con } } - private def buildUri(optDomain: Option[JsValue], optPath: Option[JsValue], optQueryRaw: Option[JsValue]): Uri = { - val path = optPath match { - case Some(JsString(path)) => path + private def buildUri(optPath: Option[JsValue], optQueryRaw: Option[JsValue]): Uri = { + val uri = optPath match { + case Some(JsString(path)) => Uri.from(path = path) case Some(_) => throw InvalidRequestException("'path' must be an JSON string") - case None => Request.defaultPath - } - - val uri = optDomain match { - case Some(JsString(domain)) => Uri.from(host = domain, path = path) - case Some(_) => throw InvalidRequestException("'domain' must be an JSON string") - case None => Uri.from(path = path) + case None => Uri.from(path = Request.defaultPath) } uri.withQuery(buildQuery(optQueryRaw)) @@ -140,7 +134,7 @@ case class Request(method: HttpMethod, methodContents: Map[String, JsValue], con def toAkkaRequest: HttpRequest = { val headers = buildHeaders(methodContents.get("header")) val entity = buildBody(methodContents.get("body") ) - val uri = buildUri(methodContents.get("domain"), methodContents.get("path"), methodContents.get("query")) + val uri = buildUri(methodContents.get("path"), methodContents.get("query")) HttpRequest( method = method, From 878cf7a171660bad2046128c502ede3b05da1abc Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Sat, 14 Dec 2019 15:56:23 +0100 Subject: [PATCH 07/13] test 'headers' and 'query' options --- .../com/pagantis/singer/flows/Request.scala | 16 ++++++--- .../singer/flows/test/TestRequest.scala | 36 ++++++++++++++++--- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/pagantis/singer/flows/Request.scala b/src/main/scala/com/pagantis/singer/flows/Request.scala index d55573c..3f74148 100644 --- a/src/main/scala/com/pagantis/singer/flows/Request.scala +++ b/src/main/scala/com/pagantis/singer/flows/Request.scala @@ -4,7 +4,7 @@ import java.time.LocalDateTime import java.time.format.DateTimeFormatter import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpHeader, HttpMethod, HttpMethods, HttpRequest, HttpResponse, RequestEntity, Uri} -import akka.http.scaladsl.model.Uri.{Path, Query} +import akka.http.scaladsl.model.Uri.Query import akka.http.scaladsl.model.headers.RawHeader import akka.stream.Materializer import akka.util.ByteString @@ -96,7 +96,12 @@ case class Request(method: HttpMethod, methodContents: Map[String, JsValue], con private def buildQuery(rawQuery: Option[JsValue]): Query = { rawQuery match { - case Some(JsObject(fields)) => Query(fields.mapValues(_.toString) ++ Request.extraParams) + case Some(JsObject(fields)) => Query( + fields.mapValues { + case JsString(value) => value + case value => throw InvalidRequestException(s"invalid query parameter $value: it must be a string") + } ++ Request.extraParams + ) case None if Request.extraParams.nonEmpty => Query(Request.extraParams) case None => Query() case _ => throw InvalidRequestException("'query' member must be key-value map") @@ -105,7 +110,10 @@ case class Request(method: HttpMethod, methodContents: Map[String, JsValue], con private def buildHeaders(rawHeaders: Option[JsValue]): List[HttpHeader] = { rawHeaders match { - case Some(JsObject(fields)) => fields map { case (header, value) => RawHeader(header, value.toString) } toList + case Some(JsObject(fields)) => fields map { + case (header, JsString(value)) => RawHeader(header, value) + case header => throw InvalidRequestException(s"invalid header $header: it must be a string") + } toList case None => List() case _ => throw InvalidRequestException("'headers' member must be key-value map") } @@ -132,7 +140,7 @@ case class Request(method: HttpMethod, methodContents: Map[String, JsValue], con } def toAkkaRequest: HttpRequest = { - val headers = buildHeaders(methodContents.get("header")) + val headers = buildHeaders(methodContents.get("headers")) val entity = buildBody(methodContents.get("body") ) val uri = buildUri(methodContents.get("path"), methodContents.get("query")) diff --git a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala index 596c906..f002b6d 100644 --- a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala +++ b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala @@ -1,6 +1,7 @@ package com.pagantis.singer.flows.test -import akka.http.scaladsl.model.HttpMethods +import akka.http.scaladsl.model.headers.RawHeader +import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri} import com.pagantis.singer.flows.Request import org.scalatest.{FlatSpec, Inside, Matchers} import spray.json.{DefaultJsonProtocol, JsNumber, JsObject, JsString, JsValue} @@ -12,6 +13,9 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with I "query" -> JsObject( "parm1" -> JsString("value1"), "parm2" -> JsString("value2") + ), + "headers" -> JsObject( + "Some header key" -> JsString("Some header value") ) ) @@ -20,7 +24,11 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with I "body" -> JsObject( "parm1" -> JsString("value1"), "parm2" -> JsString("value2") - ) + ), + "query" -> JsObject( + "var" -> JsString("var_value_1") + ), + "path" -> JsString("/some_path") ) private def wrapRequest(methodObject: JsObject, optContext: Option[JsValue] = None) = optContext match { @@ -28,10 +36,13 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with I case None => JsObject("request" -> methodObject) } - "Request" should "create GET request when a get method object is passed" in { - inside(Request.fromLine(wrapRequest(get).compactPrint)) { + "Request" should "create GET request" in { + val request = Request.fromLine(wrapRequest(get).compactPrint) + + inside(request) { case Request(method, _, _) => method shouldBe HttpMethods.GET } + val context = JsString("some_id") inside(Request.fromLine(wrapRequest(get, Some(context)).compactPrint)) { case Request(method, _, Some(requestContext)) => @@ -39,12 +50,19 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with I requestContext shouldBe context case _ => fail } + + inside(request.toAkkaRequest) { + case HttpRequest(_, _, headers, _, _) => + headers should contain (RawHeader("Some header key", "Some header value")) + case _ => fail + } } - "Request" should "create POST request when a body is passed" in { + "Request" should "create POST request" in { inside(Request.fromLine(wrapRequest(post).compactPrint)) { case Request(method, _, _) => method shouldBe HttpMethods.POST } + val context = JsObject("context" -> JsObject(Map("type" -> JsString("order"), "id" -> JsNumber(746)))) val request = Request @@ -52,11 +70,19 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with I post, Some(context) ).compactPrint) + inside(request) { case Request(method, _, Some(requestContext)) => method shouldBe HttpMethods.POST requestContext shouldBe context case _ => fail } + + inside(request.toAkkaRequest) { + case HttpRequest(_, Uri(_, _, path, rawQueryString, _), _, _, _) => + rawQueryString shouldBe Some("var=var_value_1") + path.toString shouldBe "/some_path" + case _ => fail + } } } From 09b9692625511e6bcf707a7daf3fd80d79f8bb27 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Sat, 14 Dec 2019 16:28:54 +0100 Subject: [PATCH 08/13] allow numbers as valid query parameter and header values --- src/main/scala/com/pagantis/singer/flows/Request.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/pagantis/singer/flows/Request.scala b/src/main/scala/com/pagantis/singer/flows/Request.scala index 3f74148..2652e90 100644 --- a/src/main/scala/com/pagantis/singer/flows/Request.scala +++ b/src/main/scala/com/pagantis/singer/flows/Request.scala @@ -99,7 +99,8 @@ case class Request(method: HttpMethod, methodContents: Map[String, JsValue], con case Some(JsObject(fields)) => Query( fields.mapValues { case JsString(value) => value - case value => throw InvalidRequestException(s"invalid query parameter $value: it must be a string") + case JsNumber(value) => value.toString + case value => throw InvalidRequestException(s"invalid query parameter $value: it must be a string or a number") } ++ Request.extraParams ) case None if Request.extraParams.nonEmpty => Query(Request.extraParams) @@ -112,7 +113,8 @@ case class Request(method: HttpMethod, methodContents: Map[String, JsValue], con rawHeaders match { case Some(JsObject(fields)) => fields map { case (header, JsString(value)) => RawHeader(header, value) - case header => throw InvalidRequestException(s"invalid header $header: it must be a string") + case (header, JsNumber(value)) => RawHeader(header, value.toString) + case header => throw InvalidRequestException(s"invalid header $header: it must be a string or a number") } toList case None => List() case _ => throw InvalidRequestException("'headers' member must be key-value map") From a1a81b44a50fa2c0f738626f500edc5b73be25c7 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Sat, 14 Dec 2019 17:27:18 +0100 Subject: [PATCH 09/13] add tests for request construction exceptions --- .../singer/flows/test/TestRequest.scala | 78 ++++++++++++------- 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala index f002b6d..6090c78 100644 --- a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala +++ b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala @@ -2,7 +2,7 @@ package com.pagantis.singer.flows.test import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri} -import com.pagantis.singer.flows.Request +import com.pagantis.singer.flows.{InvalidRequestException, Request} import org.scalatest.{FlatSpec, Inside, Matchers} import spray.json.{DefaultJsonProtocol, JsNumber, JsObject, JsString, JsValue} @@ -39,50 +39,72 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with I "Request" should "create GET request" in { val request = Request.fromLine(wrapRequest(get).compactPrint) - inside(request) { - case Request(method, _, _) => method shouldBe HttpMethods.GET + inside(request.toAkkaRequest) { + case HttpRequest(HttpMethods.GET, _, headers, _, _) => + headers should contain (RawHeader("Some header key", "Some header value")) + case _ => fail } val context = JsString("some_id") - inside(Request.fromLine(wrapRequest(get, Some(context)).compactPrint)) { + inside(request) { case Request(method, _, Some(requestContext)) => method shouldBe HttpMethods.GET requestContext shouldBe context case _ => fail } - - inside(request.toAkkaRequest) { - case HttpRequest(_, _, headers, _, _) => - headers should contain (RawHeader("Some header key", "Some header value")) - case _ => fail - } } "Request" should "create POST request" in { - inside(Request.fromLine(wrapRequest(post).compactPrint)) { - case Request(method, _, _) => method shouldBe HttpMethods.POST - } - val context = JsObject("context" -> JsObject(Map("type" -> JsString("order"), "id" -> JsNumber(746)))) - val request = - Request - .fromLine(wrapRequest( - post, - Some(context) - ).compactPrint) - - inside(request) { - case Request(method, _, Some(requestContext)) => - method shouldBe HttpMethods.POST - requestContext shouldBe context - case _ => fail - } + val request = Request.fromLine(wrapRequest(post, Some(context)).compactPrint) inside(request.toAkkaRequest) { - case HttpRequest(_, Uri(_, _, path, rawQueryString, _), _, _, _) => + case HttpRequest(HttpMethods.POST, Uri(_, _, path, rawQueryString, _), _, _, _) => rawQueryString shouldBe Some("var=var_value_1") path.toString shouldBe "/some_path" case _ => fail } + + inside(request) { + case Request(_, _, Some(requestContext)) => + requestContext shouldBe context + case _ => fail + } + } + + "Request" should "fail invalid representations" in { + assertThrows[InvalidRequestException] { Request.fromLine("""{}""") } + + (intercept[InvalidRequestException] { + Request.fromLine("""{"request": {"method": "INVALID_METHOD"}}""") + } getMessage) shouldBe "'method' must be a valid HTTP method" + + (intercept[InvalidRequestException] { + Request.fromLine("""{"request": {"method": 200}}""") + } getMessage) shouldBe "'method' must be a JSON string" + + (intercept[InvalidRequestException] { + Request.fromLine("""{"request": {"method": "GET", "query": {"param": {}}}}""").toAkkaRequest + } getMessage) shouldBe "invalid query parameter {}: it must be a string or a number" + + (intercept[InvalidRequestException] { + Request.fromLine("""{"request": {"method": "GET", "query": "query_val"}}""").toAkkaRequest + } getMessage) shouldBe "'query' member must be key-value map" + + (intercept[InvalidRequestException] { + Request.fromLine("""{"request": {"method": "GET", "headers": {"header_1": {}}}}""").toAkkaRequest + } getMessage) shouldBe "invalid header (header_1,{}): it must be a string or a number" + + (intercept[InvalidRequestException] { + Request.fromLine("""{"request": {"method": "GET", "headers": "header_val"}}""").toAkkaRequest + } getMessage) shouldBe "'headers' member must be key-value map" + + (intercept[InvalidRequestException] { + Request.fromLine("""{"request": {"method": "POST", "body": "body_val"}}""").toAkkaRequest + } getMessage) shouldBe "'body' must be a full JSON object" + + (intercept[InvalidRequestException] { + Request.fromLine("""{"request": {"method": "POST", "body": {}, "path": {}}}""").toAkkaRequest + } getMessage) shouldBe "'path' must be an JSON string" } } From 8f567bf1f3b5c5be66b958b5d634ace4856ba576 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Mon, 16 Dec 2019 11:50:49 +0100 Subject: [PATCH 10/13] fix broken test --- .../scala/com/pagantis/singer/flows/test/TestRequest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala index 6090c78..cb88c73 100644 --- a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala +++ b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala @@ -37,7 +37,8 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with I } "Request" should "create GET request" in { - val request = Request.fromLine(wrapRequest(get).compactPrint) + val context = JsString("some_id") + val request = Request.fromLine(wrapRequest(get, Some(context)).compactPrint) inside(request.toAkkaRequest) { case HttpRequest(HttpMethods.GET, _, headers, _, _) => @@ -45,7 +46,6 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with I case _ => fail } - val context = JsString("some_id") inside(request) { case Request(method, _, Some(requestContext)) => method shouldBe HttpMethods.GET From d5bda09f2f75c84afd03ee73178d833ed23f3521 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Mon, 16 Dec 2019 14:48:43 +0100 Subject: [PATCH 11/13] update readme --- README.md | 51 +++++++++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 9ec03a2..2ce8eb4 100644 --- a/README.md +++ b/README.md @@ -1,25 +1,27 @@ # BatchHttp [![CircleCI](https://circleci.com/gh/digitalorigin/batch-http.svg?style=svg&circle-token=d196d5b828e9e0debb5c25f04e7279c1f342d675)](https://circleci.com/gh/digitalorigin/batch-http) -A tool for processing data batches through a REST API. It reads the `stdin` for JSON lines, converts each line to an HTTP call and -then it makes the call. Finally it prints both the input line and the response to the `stdout`. - -For example, when passed a JSON string (compacted in a single line) +A tool for processing HTTP request batches through a REST API. It reads the `stdin` for JSON lines representing HTTP requests, +converts each line to an HTTP and executes it, providing both the request and the response as an output in the `stdout`. +For example, when passed a JSON string such as ```json { - "query": { - "address": "1600 Amphitheatre Parkway, Mountain View, 9090", - "region":"es", - "language":"es" - } + "request": { + "method": "GET", + "path": "/maps/api/geocode/", + "query": { + "address": "1600 Amphitheatre Parkway, Mountain View, 9090", + "region":"es", + "language":"es" + } + } } ``` - -it will make a request with a query parameters string +provided the `endpoint` configuration value is set to `maps.googleapis.com`, it will make a `GET` request to the endpoint on ```console -region=es&language=es&address=1600+Amphitheatre+Parkway,+Mountain+View,+CA +https://maps.googleapis.com/maps/api/geocode/region=es&language=es&address=1600+Amphitheatre+Parkway,+Mountain+View,+CA ``` -The `endpoint` and `path` of the request can defined in the `application.conf` configuration file or they can be -overridden by the `endpoint` and `path` keys in the input JSON payload (takes precedence). The configuration +The `endpoint` and `port` of the requests must be defined in the `application.conf` configuration file, whilst the `path` +can be defined both in the configuration or in the `request` object takes precedence). The configuration file also supports additional secret query parameters. So with the following configuration file we can make a request to the [Google Geocoding service](https://developers.google.com/maps/documentation/geocoding/intro). ```hocon @@ -27,7 +29,6 @@ request to the [Google Geocoding service](https://developers.google.com/maps/doc flow { endpoint = "maps.googleapis.com" - path = "/maps/api/geocode/json" # additional parameters to be inculded in the http query if any extra_params { @@ -40,6 +41,8 @@ The results is a JSON line which gets printed to the `stdout` with both the `req ```json { "request": { + "method": "GET", + "path": "/maps/api/geocode/", "query": { "address": "1600 Amphitheatre Parkway, Mountain View, 9090", "region":"es", @@ -56,14 +59,18 @@ The results is a JSON line which gets printed to the `stdout` with both the `req } ``` -- If a `query` object is passed in the JSON, a request will be created with all parameters inside -the object as URL parameters. `GET` method will be used. - -- If a `body` object is passed in the JSON, a request will be created with the contents of `body` -in the request body. `POST` method will be used. +In general, to represent the HTTP request as a JSON object you should consider the following rules: +* Wrap your request in a `request` object in the root. This will keep alignment with the response representation, which will contain the `request` and the `response`. +* Inside your `request`, use a `method` field to indicate the [HTTP method](https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods) to be used in the request. If no method is provided and no default method is configured, `GET` will be used. +* The following attributes can also be specified to further refine the request. + * A `path` string for passing the path to be used in the request. If no path is provided in the request or in the configuration, `/` will be used as default. + * A `query` object for passing a set of key-value query parameters to be used in the request + * A `headers` object for passing a set of key-value headers to be used in the request + * A `body` object (for `POST` method only) for sending a generic JSON object in the request body +* A response is represented in `response` object in the root. A `response` can also contain `headers` and `body`. The response status is represented in a `status` field. +* Optionally a `context` object can also be passed in the root to allow for context propagation. This allows annotating input records with metadata which will not be used in the request ([#3](https://github.com/dcereijodo/batch-http/issues/3)) +* Any object or key not specified above will be simply ignored. -Whatever is passed in the input JSON in the `context` key it will be propagated unaltered to the result. -This allows annotating input records with metadata which will not be used in the request ([#3](https://github.com/dcereijodo/batch-http/issues/3)) ## Configuration You can find examples and descriptions for all configurations supported by `batch-http` in the [sample configuration file](src/main/resources/application.conf). All this properties can be overridden on invocation by providing appropriate [JVM arguments](https://github.com/lightbend/config). From dfead291836598c1e542a1ef456699aa144fe246 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Mon, 16 Dec 2019 14:50:49 +0100 Subject: [PATCH 12/13] update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2ce8eb4..48853ab 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ provided the `endpoint` configuration value is set to `maps.googleapis.com`, it https://maps.googleapis.com/maps/api/geocode/region=es&language=es&address=1600+Amphitheatre+Parkway,+Mountain+View,+CA ``` The `endpoint` and `port` of the requests must be defined in the `application.conf` configuration file, whilst the `path` -can be defined both in the configuration or in the `request` object takes precedence). The configuration +can be defined both in the configuration or in the `request` object (the second takes precedence). The configuration file also supports additional secret query parameters. So with the following configuration file we can make a request to the [Google Geocoding service](https://developers.google.com/maps/documentation/geocoding/intro). ```hocon From d30bec572b1e1db51de77580bed4063edf131828 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Mon, 16 Dec 2019 14:59:57 +0100 Subject: [PATCH 13/13] update readme --- README.md | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 48853ab..4a6b5e3 100644 --- a/README.md +++ b/README.md @@ -59,17 +59,18 @@ The results is a JSON line which gets printed to the `stdout` with both the `req } ``` -In general, to represent the HTTP request as a JSON object you should consider the following rules: -* Wrap your request in a `request` object in the root. This will keep alignment with the response representation, which will contain the `request` and the `response`. -* Inside your `request`, use a `method` field to indicate the [HTTP method](https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods) to be used in the request. If no method is provided and no default method is configured, `GET` will be used. +In general, to represent the HTTP request as a JSON object some rules are contemplated ([#17](https://github.com/digitalorigin/batch-http/issues/17)) +* The request must be wrapped in a `request` object in the JSON root. +* Inside the `request`, a `method` field can be used to indicate the [HTTP method](https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods) for the request. If no method is provided and no default method is configured, `GET` will be used. * The following attributes can also be specified to further refine the request. - * A `path` string for passing the path to be used in the request. If no path is provided in the request or in the configuration, `/` will be used as default. - * A `query` object for passing a set of key-value query parameters to be used in the request - * A `headers` object for passing a set of key-value headers to be used in the request - * A `body` object (for `POST` method only) for sending a generic JSON object in the request body -* A response is represented in `response` object in the root. A `response` can also contain `headers` and `body`. The response status is represented in a `status` field. + * A `path` string for passing the path to be used in the request. If no path is provided in the request or in the configuration, `/` will be used. + * A `query` object for passing a set of key-value query parameters to be used in the request. + * A `headers` object for passing a set of key-value headers to be used in the request. + * A `body` object for sending a generic JSON object in the request body. +* A response is represented in a `response` object in the root. A `response` can contain `headers` and `body` as well. The response status is represented in a `status` field. * Optionally a `context` object can also be passed in the root to allow for context propagation. This allows annotating input records with metadata which will not be used in the request ([#3](https://github.com/dcereijodo/batch-http/issues/3)) -* Any object or key not specified above will be simply ignored. + +Any object or key not specified above will be simply ignored. ## Configuration