diff --git a/README.md b/README.md
index 9ec03a2..4a6b5e3 100644
--- a/README.md
+++ b/README.md
@@ -1,25 +1,27 @@
 # BatchHttp [![CircleCI](https://circleci.com/gh/digitalorigin/batch-http.svg?style=svg&circle-token=d196d5b828e9e0debb5c25f04e7279c1f342d675)](https://circleci.com/gh/digitalorigin/batch-http)
 
-A tool for processing data batches through a REST API. It reads the `stdin` for JSON lines, converts each line to an HTTP call and
-then it makes the call. Finally it prints both the input line and the response to the `stdout`.
-
-For example, when passed a JSON string (compacted in a single line)
+A tool for processing HTTP request batches through a REST API. It reads the `stdin` for JSON lines representing HTTP requests,
+converts each line to an HTTP request and executes it, providing both the request and the response as an output in the `stdout`.
+For example, when passed a JSON string such as
 ```json
 {
-  "query": {
-    "address": "1600 Amphitheatre Parkway, Mountain View, 9090",
-    "region":"es",
-    "language":"es"
-  }
+  "request": {
+    "method": "GET",
+    "path": "/maps/api/geocode/",
+    "query": {
+      "address": "1600 Amphitheatre Parkway, Mountain View, 9090",
+      "region":"es",
+      "language":"es"
+    }
+  }
 }
 ```
-
-it will make a request with a query parameters string
+provided the `endpoint` configuration value is set to `maps.googleapis.com`, it will make a `GET` request to the endpoint on
 ```console
-region=es&language=es&address=1600+Amphitheatre+Parkway,+Mountain+View,+CA
+https://maps.googleapis.com/maps/api/geocode/?region=es&language=es&address=1600+Amphitheatre+Parkway,+Mountain+View,+CA
 ```
-The `endpoint` and `path` of the request can defined in the `application.conf` configuration file or they can be
-overridden by the `endpoint` and `path` keys in the input JSON payload (takes precedence). The configuration
+The `endpoint` and `port` of the requests must be defined in the `application.conf` configuration file, whilst the `path`
+can be defined either in the configuration or in the `request` object (the second takes precedence). The configuration
 file also supports additional secret query parameters. So with the following configuration file we can make a
 request to the [Google Geocoding service](https://developers.google.com/maps/documentation/geocoding/intro).
 ```hocon
@@ -27,7 +29,6 @@ request to the [Google Geocoding service](https://developers.google.com/maps/doc
 flow {
 
   endpoint = "maps.googleapis.com"
-  path = "/maps/api/geocode/json"
 
   # additional parameters to be inculded in the http query if any
   extra_params {
@@ -40,6 +41,8 @@ The results is a JSON line which gets printed to the `stdout` with both the `req
 ```json
 {
   "request": {
+    "method": "GET",
+    "path": "/maps/api/geocode/",
     "query": {
       "address": "1600 Amphitheatre Parkway, Mountain View, 9090",
       "region":"es",
@@ -56,14 +59,19 @@ The results is a JSON line which gets printed to the `stdout` with both the `req
 }
 ```
 
-- If a `query` object is passed in the JSON, a request will be created with all parameters inside
-the object as URL parameters. `GET` method will be used.
+In general, to represent the HTTP request as a JSON object some rules are contemplated ([#17](https://github.com/digitalorigin/batch-http/issues/17))
+* The request must be wrapped in a `request` object in the JSON root.
+* Inside the `request`, a `method` field can be used to indicate the [HTTP method](https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods) for the request. If no method is provided and no default method is configured, `GET` will be used.
+* The following attributes can also be specified to further refine the request.
+  * A `path` string for passing the path to be used in the request. If no path is provided in the request or in the configuration, `/` will be used.
+  * A `query` object for passing a set of key-value query parameters to be used in the request.
+  * A `headers` object for passing a set of key-value headers to be used in the request.
+  * A `body` object for sending a generic JSON object in the request body.
+* A response is represented in a `response` object in the root. A `response` can contain `headers` and `body` as well. The response status is represented in a `status` field.
+* Optionally a `context` object can also be passed in the root to allow for context propagation. This allows annotating input records with metadata which will not be used in the request ([#3](https://github.com/dcereijodo/batch-http/issues/3))
 
-- If a `body` object is passed in the JSON, a request will be created with the contents of `body`
-in the request body. `POST` method will be used.
+Any object or key not specified above will be simply ignored.
 
-Whatever is passed in the input JSON in the `context` key it will be propagated unaltered to the result.
-This allows annotating input records with metadata which will not be used in the request ([#3](https://github.com/dcereijodo/batch-http/issues/3))
 ## Configuration
 You can find examples and descriptions for all configurations supported by `batch-http` in the [sample configuration file](src/main/resources/application.conf). All this properties can be overridden on invocation by providing appropriate [JVM arguments](https://github.com/lightbend/config).
diff --git a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala
index cb26b62..2eec6e9 100644
--- a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala
+++ b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala
@@ -11,7 +11,6 @@ import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.Flow
 import akka.stream.scaladsl.{Sink, Source}
-import com.pagantis.singer.flows.BatchHttp.clazz
 import com.pagantis.singer.flows.Request
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.time.{Millis, Seconds, Span}
@@ -27,18 +26,18 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S
   // init actor system, loggers and execution context
   implicit val system: ActorSystem = ActorSystem("BatchHttp")
   implicit val materializer: ActorMaterializer = ActorMaterializer()
-  implicit val standardLogger: LoggingAdapter = Logging(system, clazz)
+  implicit val standardLogger: LoggingAdapter = Logging(system, getClass.getName)
   implicit val ec: ExecutionContextExecutor = system.dispatcher
 
-  implicit val connectionPool = Http().cachedHostConnectionPoolHttps[Request]("jsonplaceholder.typicode.com")
+  private implicit val connectionPool: Flow[(HttpRequest, Request), (Try[HttpResponse], Request), HostConnectionPool] =
+    Http().cachedHostConnectionPoolHttps[Request]("jsonplaceholder.typicode.com")
 
-  implicit val defaultPatience =
+  private implicit val defaultPatience: PatienceConfig =
     PatienceConfig(timeout = Span(2, Seconds), interval = Span(5, Millis))
 
   import Request._
 
-  def makeRequestAndHandle(line: String)(implicit connectionPool: Flow[(HttpRequest, Request),(Try[HttpResponse], Request), HostConnectionPool]) = {
-
+  private def makeRequestAndHandle(line: String)(implicit connectionPool: Flow[(HttpRequest, Request),(Try[HttpResponse], Request), HostConnectionPool]) = {
     Source.single(line)
       .map(
         line => {
@@ -53,60 +52,67 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S
   }
 
   "Request" should "get comments by post_id" in {
-
-    whenReady(makeRequestAndHandle("""{"query": {"post_id": 1}, "path": "/comments"}""")) {
-      response => response.parseJson.asJsObject.fields("response") shouldBe a[JsArray]
+    whenReady(makeRequestAndHandle("""{"request": {"method": "GET", "query": {"post_id": 1}, "path": "/comments"}}""")) {
+      response => response.parseJson.asJsObject.fields("response").asJsObject.fields("body") shouldBe a[JsArray]
     }
 
-    whenReady(makeRequestAndHandle("""{"query": {"post_id": 1}, "path": "/comments", "context": "CvKL8"}""")) {
-      response => {
-        val responseAsJson = response.parseJson.asJsObject
+    whenReady(makeRequestAndHandle("""{"request": {"method": "GET", "query": {"post_id": 1}, "path": "/comments"}, "context": "CvKL8"}""")) {
+      responseRaw => {
+        val responseAsJson = responseRaw.parseJson.asJsObject
         val fields = responseAsJson.fields
+
         fields("request") shouldBe JsObject(
+          "method" -> JsString("GET"),
           "query" -> JsObject(
             "post_id" -> JsNumber(1)
-          )
+          ),
+          "path" -> JsString("/comments")
         )
-        fields("response") shouldBe a[JsArray]
+
+        val responseFields = fields("response").asJsObject.fields
+        responseFields("body") shouldBe a[JsArray]
         fields("context") shouldBe JsString("CvKL8")
-        inside (fields("extracted_at")) {
+
+        inside (responseFields("responded_at")) {
           case JsString(extractedAt) => LocalDateTime.parse(extractedAt, DateTimeFormatter.ISO_DATE_TIME) shouldBe a[LocalDateTime]
           case _ => fail
         }
       }
     }
-
   }
 
   "Request" should "post posts with user_id" in {
-
-    whenReady(makeRequestAndHandle("""{"body": {"userId": 1, "title": "foo", "body": "bar"}, "path": "/posts"}""")) {
-      response => {
-        val responseAsJson = response.parseJson.asJsObject
+    whenReady(makeRequestAndHandle("""{"request": {"method": "POST", "body": {"userId": 1, "title": "foo", "body": "bar"}, "path": "/posts"}}""")) {
+      responseRaw => {
+        val responseAsJson = responseRaw.parseJson.asJsObject
         val fields = responseAsJson.fields
+
         fields("request") shouldBe JsObject(
+          "method" -> JsString("POST"),
           "body" -> JsObject(
             "title" -> JsString("foo"),
             "body" -> JsString("bar"),
             "userId" -> JsNumber(1)
-          )
+          ),
+          "path" -> JsString("/posts")
         )
-        fields("response") shouldBe JsObject(
+
+        val response = fields("response")
+        response.asJsObject.fields("body") shouldBe JsObject(
           "id" -> JsNumber(101),
           "title" -> JsString("foo"),
           "body" -> JsString("bar"),
           "userId" -> JsNumber(1)
         )
-        inside (fields("extracted_at")) {
+
+        inside (response.asJsObject.fields("responded_at")) {
           case JsString(extractedAt) => LocalDateTime.parse(extractedAt, DateTimeFormatter.ISO_DATE_TIME) shouldBe a[LocalDateTime]
           case _ => fail
         }
       }
     }
-
   }
-
 }
diff --git a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala
index 58d084e..de2d5ae 100644
--- a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala
+++ b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala
@@ -37,7 +37,7 @@ object BatchHttp extends App {
   // This shutdown sequence was copied from another related issue: https://github.com/akka/akka-http/issues/907#issuecomment-345288919
   def shutdownSequence = {
     for {
-      http <- Http().shutdownAllConnectionPools()
+      _ <- Http().shutdownAllConnectionPools()
       akka <- system.terminate()
     } yield akka
   }
diff --git a/src/main/scala/com/pagantis/singer/flows/InvalidRequestException.scala b/src/main/scala/com/pagantis/singer/flows/InvalidRequestException.scala
new file mode 100644
index 0000000..f5cad31
--- /dev/null
+++ b/src/main/scala/com/pagantis/singer/flows/InvalidRequestException.scala
@@ -0,0 +1,3 @@
+package com.pagantis.singer.flows
+
+case class InvalidRequestException(message: String = "Invalid request representation https://github.com/digitalorigin/batch-http/issues/17") extends Exception(message)
\ No newline at end of file
diff --git a/src/main/scala/com/pagantis/singer/flows/Request.scala b/src/main/scala/com/pagantis/singer/flows/Request.scala
index 2bf3abe..2652e90 100644
--- a/src/main/scala/com/pagantis/singer/flows/Request.scala
+++ b/src/main/scala/com/pagantis/singer/flows/Request.scala
@@ -3,99 +3,87 @@ package com.pagantis.singer.flows
 import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
 
-import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest, HttpResponse, Uri}
+import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpHeader, HttpMethod, HttpMethods, HttpRequest, HttpResponse, RequestEntity, Uri}
 import akka.http.scaladsl.model.Uri.Query
+import akka.http.scaladsl.model.headers.RawHeader
 import akka.stream.Materializer
 import akka.util.ByteString
 import com.typesafe.config.ConfigFactory
 import net.ceedubs.ficus.Ficus._
 import spray.json._
 
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
 
 object Request {
 
-  val config = ConfigFactory.load()
+  private val config = ConfigFactory.load()
 
-  val extraParams = config.as[Map[String, String]]("flow.extra_params")
-  val defaultPath = config.getString("flow.path")
+  private val extraParams: Map[String, String] =
+    config.as[Option[Map[String, String]]]("flow.extra_params") match {
+      case None => Map()
+      case Some(params) => params
+    }
 
-  def fromLine(line: String) = {
+  private val defaultPath = config.getString("flow.path")
 
+  def fromLine(line: String): Request = {
     val message = line.parseJson
-    val fields = message.asJsObject.fields
-
-    (fields.get("query"), fields.get("body"), fields.get("path"), fields.get("context")) match {
-
-      case (Some(_), Some(_), _, _) =>
-        throw new Exception
-
-      case (Some(jsonQueryParams), None, optPath, optContext) =>
-
-        val asStringMap = jsonQueryParams
-          .asJsObject
-          .fields
-
-        val optStringPath = optPath collect { case JsString(value) => value }
-
-        GetRequest(asStringMap ++ extraParams.map(pair => (pair._1, JsString(pair._2))), optStringPath, optContext)
-
-      case (None, Some(jsonBody), optPath, optContext) =>
-
-        val optStringPath = optPath collect { case JsString(value) => value }
-
-        PostRequest(jsonBody.asJsObject, optStringPath, optContext)
+    val rootFields = message.asJsObject.fields
+    val request = rootFields.get("request")
+    val optContext = rootFields.get("context")
+
+    request match {
+      case Some(JsObject(requestFields)) =>
+        requestFields.get("method") match {
+          case Some(JsString(method)) => HttpMethods.getForKey(method) match {
+            case Some(httpMethod) => Request(httpMethod, requestFields, optContext)
+            case None => throw InvalidRequestException("'method' must be a valid HTTP method")
+          }
+          case Some(_) => throw InvalidRequestException("'method' must be a JSON string")
+          case _ => Request(HttpMethods.GET, requestFields, optContext)
+        }
+      case _ => throw new InvalidRequestException
     }
-
   }
 
-  def parseResponse(triedResponse: (Try[HttpResponse], Request))(implicit am: Materializer) = {
-
-    implicit val ec = am.executionContext
+  def parseResponse(triedResponse: (Try[HttpResponse], Request))(implicit am: Materializer): Future[String] = {
+    implicit val ec: ExecutionContextExecutor = am.executionContext
 
     triedResponse match {
       case (Success(response), request) => Request.fromHttpResponse(response).map(request.toLine(_))
       case (Failure(exception), _) => throw exception
     }
-
   }
 
-
-
-  def fromHttpResponse(response: HttpResponse)(implicit am: Materializer) = {
-
+  def fromHttpResponse(response: HttpResponse)(implicit am: Materializer): Future[JsValue] = {
     import spray.json._
 
-    implicit val ec = am.executionContext
+    implicit val ec: ExecutionContextExecutor = am.executionContext
 
     val responseAsJson = response.entity.dataBytes.runFold(ByteString(""))(_ ++ _) map (body => body.utf8String.parseJson)
-
     responseAsJson
-
   }
-
 }
 
+case class Request(method: HttpMethod, methodContents: Map[String, JsValue], context: Option[JsValue]) {
 
-trait Request {
-
-  val context: Option[JsValue]
-
-  def toAkkaRequest: HttpRequest
-
-  def outputRequest: JsObject
+  def outputRequest: JsObject =
+    JsObject(methodContents + ("method" -> JsString(method.value)))
 
   def toLine(response: JsValue, extractedAt: LocalDateTime = LocalDateTime.now()): String = {
-
     val request = outputRequest
 
     val requestAndResponse = Map(
       "request" -> request,
-      "response" -> response,
-      "extracted_at" -> JsString(extractedAt.format(DateTimeFormatter.ISO_DATE_TIME))
+      "response" -> JsObject(
+        "body" -> response,
+        "responded_at" -> JsString(extractedAt.format(DateTimeFormatter.ISO_DATE_TIME))
+      )
     )
 
     val outputKeys = context match {
@@ -106,49 +94,63 @@ trait Request {
     JsObject(outputKeys).compactPrint
   }
 
-}
-
-case class GetRequest(queryParams: Map[String, JsValue], path: Option[String] = None, context: Option[JsValue]) extends Request {
-
-  override def toAkkaRequest: HttpRequest = {
+  private def buildQuery(rawQuery: Option[JsValue]): Query = {
+    rawQuery match {
+      case Some(JsObject(fields)) => Query(
+        fields.mapValues {
+          case JsString(value) => value
+          case JsNumber(value) => value.toString
+          case value => throw InvalidRequestException(s"invalid query parameter $value: it must be a string or a number")
+        } ++ Request.extraParams
+      )
+      case None if Request.extraParams.nonEmpty => Query(Request.extraParams)
+      case None => Query()
+      case _ => throw InvalidRequestException("'query' member must be key-value map")
+    }
+  }
 
-    val query = Query(
-      queryParams
-        .collect {
-          case (key, JsString(value)) => (key, value)
-          case (key, JsNumber(value)) => (key, value.toString)
-        }
-    )
+  private def buildHeaders(rawHeaders: Option[JsValue]): List[HttpHeader] = {
+    rawHeaders match {
+      case Some(JsObject(fields)) => fields map {
+        case (header, JsString(value)) => RawHeader(header, value)
+        case (header, JsNumber(value)) => RawHeader(header, value.toString)
+        case header => throw InvalidRequestException(s"invalid header $header: it must be a string or a number")
+      } toList
+      case None => List()
+      case _ => throw InvalidRequestException("'headers' member must be key-value map")
+    }
+  }
 
-    HttpRequest(
-      method = HttpMethods.GET,
-      uri = path match {
-        case None => Uri(Request.defaultPath).withQuery(query)
-        case Some(path) => Uri(path).withQuery(query)
+  private def buildBody(rawBody: Option[JsValue]): RequestEntity = {
+    rawBody match {
+      case Some(body) => body match {
+        case JsObject(_) => HttpEntity(ContentTypes.`application/json`, body.compactPrint)
+        case _ => throw InvalidRequestException("'body' must be a full JSON object")
       }
-    )
+      case None => HttpEntity.Empty
+    }
   }
 
-  override def outputRequest: JsObject = {
-
-    val noSecrets = queryParams.filter(param => !Request.extraParams.contains(param._1))
+  private def buildUri(optPath: Option[JsValue], optQueryRaw: Option[JsValue]): Uri = {
+    val uri = optPath match {
+      case Some(JsString(path)) => Uri.from(path = path)
+      case Some(_) => throw InvalidRequestException("'path' must be an JSON string")
+      case None => Uri.from(path = Request.defaultPath)
+    }
 
-    JsObject("query" -> JsObject(noSecrets))
+    uri.withQuery(buildQuery(optQueryRaw))
   }
 
-}
-
-case class PostRequest(body: JsObject, path: Option[String] = None, context: Option[JsValue]) extends Request {
-
-  override def toAkkaRequest: HttpRequest = HttpRequest(
-    method = HttpMethods.POST,
-    uri = path match {
-      case None => Uri(Request.defaultPath)
-      case Some(path) => Uri(path)
-    },
-    entity = HttpEntity(ContentTypes.`application/json`, body.compactPrint)
-  )
+  def toAkkaRequest: HttpRequest = {
+    val headers = buildHeaders(methodContents.get("headers"))
+    val entity = buildBody(methodContents.get("body") )
+    val uri = buildUri(methodContents.get("path"), methodContents.get("query"))
 
-  override def outputRequest: JsObject = JsObject("body" -> body)
-
-}
+    HttpRequest(
+      method = method,
+      uri = uri,
+      headers = headers,
+      entity = entity
+    )
+  }
+}
\ No newline at end of file
diff --git a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala
index a5baab6..cb88c73 100644
--- a/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala
+++ b/src/test/scala/com/pagantis/singer/flows/test/TestRequest.scala
@@ -1,45 +1,110 @@
 package com.pagantis.singer.flows.test
 
-import com.pagantis.singer.flows.{GetRequest, PostRequest, Request}
-import org.scalatest.{FlatSpec, Matchers}
-import spray.json.{DefaultJsonProtocol, JsNumber, JsObject, JsString}
+import akka.http.scaladsl.model.headers.RawHeader
+import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri}
+import com.pagantis.singer.flows.{InvalidRequestException, Request}
+import org.scalatest.{FlatSpec, Inside, Matchers}
+import spray.json.{DefaultJsonProtocol, JsNumber, JsObject, JsString, JsValue}
 
-class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol {
+class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with Inside{
 
-  val query = JsObject(
+  val get: JsObject = JsObject(
+    "method" -> JsString("GET"),
     "query" -> JsObject(
       "parm1" -> JsString("value1"),
       "parm2" -> JsString("value2")
+    ),
+    "headers" -> JsObject(
+      "Some header key" -> JsString("Some header value")
     )
   )
 
-  val body = JsObject(
+  val post: JsObject = JsObject(
+    "method" -> JsString("POST"),
     "body" -> JsObject(
       "parm1" -> JsString("value1"),
       "parm2" -> JsString("value2")
-    )
+    ),
+    "query" -> JsObject(
+      "var" -> JsString("var_value_1")
+    ),
+    "path" -> JsString("/some_path")
   )
 
-  "Request" should "create GET request when a query map is passed" in {
+  private def wrapRequest(methodObject: JsObject, optContext: Option[JsValue] = None) = optContext match {
+    case Some(context) => JsObject("request" -> methodObject, "context" -> context)
+    case None => JsObject("request" -> methodObject)
+  }
+
+  "Request" should "create GET request" in {
+    val context = JsString("some_id")
+    val request = Request.fromLine(wrapRequest(get, Some(context)).compactPrint)
 
-    Request.fromLine(query.compactPrint) shouldBe a[GetRequest]
-    Request.fromLine(
-      JsObject(
-        query.fields + ("context" -> JsString("some_id"))
-      ) compactPrint
-    ) shouldBe a[GetRequest]
+    inside(request.toAkkaRequest) {
+      case HttpRequest(HttpMethods.GET, _, headers, _, _) =>
+        headers should contain (RawHeader("Some header key", "Some header value"))
+      case _ => fail
+    }
 
+    inside(request) {
+      case Request(method, _, Some(requestContext)) =>
+        method shouldBe HttpMethods.GET
+        requestContext shouldBe context
+      case _ => fail
+    }
   }
 
-  "Request" should "create POST request when a body is passed" in {
+  "Request" should "create POST request" in {
+    val context = JsObject("context" -> JsObject(Map("type" -> JsString("order"), "id" -> JsNumber(746))))
+    val request = Request.fromLine(wrapRequest(post, Some(context)).compactPrint)
 
-    Request.fromLine(body.compactPrint) shouldBe a[PostRequest]
-    Request.fromLine(
-      JsObject(
-        body.fields + ("context" -> JsObject(Map("type" -> JsString("order"), "id" -> JsNumber(746))))
-      ) compactPrint
-    ) shouldBe a[PostRequest]
+    inside(request.toAkkaRequest) {
+      case HttpRequest(HttpMethods.POST, Uri(_, _, path, rawQueryString, _), _, _, _) =>
+        rawQueryString shouldBe Some("var=var_value_1")
+        path.toString shouldBe "/some_path"
+      case _ => fail
+    }
 
+    inside(request) {
+      case Request(_, _, Some(requestContext)) =>
+        requestContext shouldBe context
+      case _ => fail
+    }
   }
 
+  "Request" should "fail invalid representations" in {
+    assertThrows[InvalidRequestException] { Request.fromLine("""{}""") }
+
+    (intercept[InvalidRequestException] {
+      Request.fromLine("""{"request": {"method": "INVALID_METHOD"}}""")
+    } getMessage) shouldBe "'method' must be a valid HTTP method"
+
+    (intercept[InvalidRequestException] {
+      Request.fromLine("""{"request": {"method": 200}}""")
+    } getMessage) shouldBe "'method' must be a JSON string"
+
+    (intercept[InvalidRequestException] {
+      Request.fromLine("""{"request": {"method": "GET", "query": {"param": {}}}}""").toAkkaRequest
+    } getMessage) shouldBe "invalid query parameter {}: it must be a string or a number"
+
+    (intercept[InvalidRequestException] {
+      Request.fromLine("""{"request": {"method": "GET", "query": "query_val"}}""").toAkkaRequest
+    } getMessage) shouldBe "'query' member must be key-value map"
+
+    (intercept[InvalidRequestException] {
+      Request.fromLine("""{"request": {"method": "GET", "headers": {"header_1": {}}}}""").toAkkaRequest
+    } getMessage) shouldBe "invalid header (header_1,{}): it must be a string or a number"
+
+    (intercept[InvalidRequestException] {
+      Request.fromLine("""{"request": {"method": "GET", "headers": "header_val"}}""").toAkkaRequest
+    } getMessage) shouldBe "'headers' member must be key-value map"
+
+    (intercept[InvalidRequestException] {
+      Request.fromLine("""{"request": {"method": "POST", "body": "body_val"}}""").toAkkaRequest
+    } getMessage) shouldBe "'body' must be a full JSON object"
+
+    (intercept[InvalidRequestException] {
+      Request.fromLine("""{"request": {"method": "POST", "body": {}, "path": {}}}""").toAkkaRequest
+    } getMessage) shouldBe "'path' must be an JSON string"
+  }
 }
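
The `method` and `query` validation rules exercised by the tests above can be approximated without akka-http or spray-json. The snippet below is a minimal standalone sketch, not the code from this change. `Json`, `JStr`, `JNum`, `JObj`, `resolveMethod` and `resolveQuery` are hypothetical names, the method set is an illustrative subset standing in for akka-http's `HttpMethods.getForKey`, errors are returned as `Either` instead of thrown, and the per-parameter message names the offending key rather than the value.

```scala
// Standalone sketch of the request validation rules; all names are hypothetical.
object RequestRulesSketch {

  // Minimal JSON model (assumption: the real code uses spray.json.JsValue).
  sealed trait Json
  final case class JStr(value: String) extends Json
  final case class JNum(value: BigDecimal) extends Json
  final case class JObj(fields: Map[String, Json]) extends Json

  // Illustrative subset of HTTP methods, not the full akka-http registry.
  private val knownMethods = Set("GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS")

  // Absent method falls back to GET; a non-string or unknown method is rejected.
  def resolveMethod(request: JObj): Either[String, String] =
    request.fields.get("method") match {
      case Some(JStr(m)) if knownMethods(m) => Right(m)
      case Some(JStr(_))                    => Left("'method' must be a valid HTTP method")
      case Some(_)                          => Left("'method' must be a JSON string")
      case None                             => Right("GET")
    }

  // Query values must be strings or numbers; anything else is rejected.
  def resolveQuery(request: JObj): Either[String, Map[String, String]] =
    request.fields.get("query") match {
      case None => Right(Map.empty)
      case Some(JObj(fields)) =>
        fields.foldLeft[Either[String, Map[String, String]]](Right(Map.empty)) {
          case (Right(acc), (k, JStr(v))) => Right(acc + (k -> v))
          case (Right(acc), (k, JNum(v))) => Right(acc + (k -> v.toString))
          case (Right(_), (k, _))         => Left(s"invalid query parameter '$k': it must be a string or a number")
          case (failed, _)                => failed // keep the first error
        }
      case Some(_) => Left("'query' member must be key-value map")
    }
}
```

Returning `Either` keeps the sketch easy to check in isolation; the change itself throws `InvalidRequestException`, which suits a stream stage that should fail on a malformed line.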