Skip to content
Merged
50 changes: 29 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,33 +1,34 @@
# BatchHttp [![CircleCI](https://circleci.com/gh/digitalorigin/batch-http.svg?style=svg&circle-token=d196d5b828e9e0debb5c25f04e7279c1f342d675)](https://circleci.com/gh/digitalorigin/batch-http)
A tool for processing data batches through a REST API. It reads the `stdin` for JSON lines, converts each line to an HTTP call and
then it makes the call. Finally it prints both the input line and the response to the `stdout`.

For example, when passed a JSON string (compacted in a single line)
A tool for processing HTTP request batches through a REST API. It reads the `stdin` for JSON lines representing HTTP requests,
converts each line to an HTTP and executes it, providing both the request and the response as an output in the `stdout`.

For example, when passed a JSON string such as
```json
{
"query": {
"address": "1600 Amphitheatre Parkway, Mountain View, 9090",
"region":"es",
"language":"es"
}
"request": {
"method": "GET",
"path": "/maps/api/geocode/",
"query": {
"address": "1600 Amphitheatre Parkway, Mountain View, 9090",
"region":"es",
"language":"es"
}
}
}
```

it will make a request with a query parameters string
provided the `endpoint` configuration value is set to `maps.googleapis.com`, it will make a `GET` request to the endpoint on
```console
region=es&language=es&address=1600+Amphitheatre+Parkway,+Mountain+View,+CA
https://maps.googleapis.com/maps/api/geocode/region=es&language=es&address=1600+Amphitheatre+Parkway,+Mountain+View,+CA
```
The `endpoint` and `path` of the request can defined in the `application.conf` configuration file or they can be
overridden by the `endpoint` and `path` keys in the input JSON payload (takes precedence). The configuration
The `endpoint` and `port` of the requests must be defined in the `application.conf` configuration file, whilst the `path`
can be defined both in the configuration or in the `request` object (the second takes precedence). The configuration
file also supports additional secret query parameters. So with the following configuration file we can make a
request to the [Google Geocoding service](https://developers.google.com/maps/documentation/geocoding/intro).
```hocon
# application.conf
flow {

endpoint = "maps.googleapis.com"
path = "/maps/api/geocode/json"

# additional parameters to be inculded in the http query if any
extra_params {
Expand All @@ -40,6 +41,8 @@ The results is a JSON line which gets printed to the `stdout` with both the `req
```json
{
"request": {
"method": "GET",
"path": "/maps/api/geocode/",
"query": {
"address": "1600 Amphitheatre Parkway, Mountain View, 9090",
"region":"es",
Expand All @@ -56,14 +59,19 @@ The results is a JSON line which gets printed to the `stdout` with both the `req
}
```

- If a `query` object is passed in the JSON, a request will be created with all parameters inside
the object as URL parameters. `GET` method will be used.
In general, to represent the HTTP request as a JSON object some rules are contemplated ([#17](https://github.com/digitalorigin/batch-http/issues/17))
* The request must be wrapped in a `request` object in the JSON root.
* Inside the `request`, a `method` field can be used to indicate the [HTTP method](https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods) for the request. If no method is provided and no default method is configured, `GET` will be used.
* The following attributes can also be specified to further refine the request.
* A `path` string for passing the path to be used in the request. If no path is provided in the request or in the configuration, `/` will be used.
* A `query` object for passing a set of key-value query parameters to be used in the request.
* A `headers` object for passing a set of key-value headers to be used in the request.
* A `body` object for sending a generic JSON object in the request body.
* A response is represented in a `response` object in the root. A `response` can contain `headers` and `body` as well. The response status is represented in a `status` field.
* Optionally a `context` object can also be passed in the root to allow for context propagation. This allows annotating input records with metadata which will not be used in the request ([#3](https://github.com/dcereijodo/batch-http/issues/3))

- If a `body` object is passed in the JSON, a request will be created with the contents of `body`
in the request body. `POST` method will be used.
Any object or key not specified above will be simply ignored.

Whatever is passed in the input JSON in the `context` key it will be propagated unaltered to the result.
This allows annotating input records with metadata which will not be used in the request ([#3](https://github.com/dcereijodo/batch-http/issues/3))

## Configuration
You can find examples and descriptions for all configurations supported by `batch-http` in the [sample configuration file](src/main/resources/application.conf). All this properties can be overridden on invocation by providing appropriate [JVM arguments](https://github.com/lightbend/config).
Expand Down
56 changes: 31 additions & 25 deletions src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.{Sink, Source}
import com.pagantis.singer.flows.BatchHttp.clazz
import com.pagantis.singer.flows.Request
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}
Expand All @@ -27,18 +26,18 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S
// init actor system, loggers and execution context
implicit val system: ActorSystem = ActorSystem("BatchHttp")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val standardLogger: LoggingAdapter = Logging(system, clazz)
implicit val standardLogger: LoggingAdapter = Logging(system, getClass.getName)
implicit val ec: ExecutionContextExecutor = system.dispatcher

implicit val connectionPool = Http().cachedHostConnectionPoolHttps[Request]("jsonplaceholder.typicode.com")
private implicit val connectionPool: Flow[(HttpRequest, Request), (Try[HttpResponse], Request), HostConnectionPool] =
Http().cachedHostConnectionPoolHttps[Request]("jsonplaceholder.typicode.com")

implicit val defaultPatience =
private implicit val defaultPatience: PatienceConfig =
PatienceConfig(timeout = Span(2, Seconds), interval = Span(5, Millis))

import Request._

def makeRequestAndHandle(line: String)(implicit connectionPool: Flow[(HttpRequest, Request),(Try[HttpResponse], Request), HostConnectionPool]) = {

private def makeRequestAndHandle(line: String)(implicit connectionPool: Flow[(HttpRequest, Request),(Try[HttpResponse], Request), HostConnectionPool]) = {
Source.single(line)
.map(
line => {
Expand All @@ -53,60 +52,67 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S
}

"Request" should "get comments by post_id" in {

whenReady(makeRequestAndHandle("""{"query": {"post_id": 1}, "path": "/comments"}""")) {
response => response.parseJson.asJsObject.fields("response") shouldBe a[JsArray]
whenReady(makeRequestAndHandle("""{"request": {"method": "GET", "query": {"post_id": 1}, "path": "/comments"}}""")) {
response => response.parseJson.asJsObject.fields("response").asJsObject.fields("body") shouldBe a[JsArray]
}

whenReady(makeRequestAndHandle("""{"query": {"post_id": 1}, "path": "/comments", "context": "CvKL8"}""")) {
response => {
val responseAsJson = response.parseJson.asJsObject
whenReady(makeRequestAndHandle("""{"request": {"method": "GET", "query": {"post_id": 1}, "path": "/comments"}, "context": "CvKL8"}""")) {
responseRaw => {
val responseAsJson = responseRaw.parseJson.asJsObject
val fields = responseAsJson.fields

fields("request") shouldBe JsObject(
"method" -> JsString("GET"),
"query" -> JsObject(
"post_id" -> JsNumber(1)
)
),
"path" -> JsString("/comments")
)
fields("response") shouldBe a[JsArray]

val responseFields = fields("response").asJsObject.fields
responseFields("body") shouldBe a[JsArray]
fields("context") shouldBe JsString("CvKL8")
inside (fields("extracted_at")) {

inside (responseFields("responded_at")) {
case JsString(extractedAt) =>
LocalDateTime.parse(extractedAt, DateTimeFormatter.ISO_DATE_TIME) shouldBe a[LocalDateTime]
case _ => fail
}
}
}

}

"Request" should "post posts with user_id" in {

whenReady(makeRequestAndHandle("""{"body": {"userId": 1, "title": "foo", "body": "bar"}, "path": "/posts"}""")) {
response => {
val responseAsJson = response.parseJson.asJsObject
whenReady(makeRequestAndHandle("""{"request": {"method": "POST", "body": {"userId": 1, "title": "foo", "body": "bar"}, "path": "/posts"}}""")) {
responseRaw => {
val responseAsJson = responseRaw.parseJson.asJsObject
val fields = responseAsJson.fields

fields("request") shouldBe JsObject(
"method" -> JsString("POST"),
"body" -> JsObject(
"title" -> JsString("foo"),
"body" -> JsString("bar"),
"userId" -> JsNumber(1)
)
),
"path" -> JsString("/posts")
)
fields("response") shouldBe JsObject(

val response = fields("response")
response.asJsObject.fields("body") shouldBe JsObject(
"id" -> JsNumber(101),
"title" -> JsString("foo"),
"body" -> JsString("bar"),
"userId" -> JsNumber(1)
)
inside (fields("extracted_at")) {

inside (response.asJsObject.fields("responded_at")) {
case JsString(extractedAt) =>
LocalDateTime.parse(extractedAt, DateTimeFormatter.ISO_DATE_TIME) shouldBe a[LocalDateTime]
case _ => fail
}
}
}

}

}

2 changes: 1 addition & 1 deletion src/main/scala/com/pagantis/singer/flows/BatchHttp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object BatchHttp extends App {
// This shutdown sequence was copied from another related issue: https://github.com/akka/akka-http/issues/907#issuecomment-345288919
def shutdownSequence = {
for {
http <- Http().shutdownAllConnectionPools()
_ <- Http().shutdownAllConnectionPools()
akka <- system.terminate()
} yield akka
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.pagantis.singer.flows

case class InvalidRequestException(message: String = "Invalid request representation https://github.com/digitalorigin/batch-http/issues/17") extends Exception(message)
Loading