Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions cmd/assaf-http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"errors"
"flag"
"fmt"
"github.com/assaf/assaf-http-server/conf"
"github.com/assaf/assaf-http-server/models"
"github.com/opentracing-contrib/go-amqp/amqptracer"
"github.com/opentracing/opentracing-go"
"github.com/streadway/amqp"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
"io"
"io/ioutil"
"log"
"math"
"math/rand"
Expand Down Expand Up @@ -269,10 +271,12 @@ func returnResponse(c_msg amqp.Delivery, requests *Reqs, tracer opentracing.Trac
}

sp, e := getOrCreateSpan(c_msg, tracer)
defer sp.Finish()
if e != nil {
log.Printf("Span was not created %v", e)
} else {
defer sp.Finish()
}

c_msg.Ack(false)
log.Printf("got request id: %d", consumeRequest.ReqId)

Expand All @@ -286,7 +290,7 @@ func returnResponse(c_msg amqp.Delivery, requests *Reqs, tracer opentracing.Trac
case <-consumeRequest.ReqChannel:
// This is done in order to no send on a closed channel which leads to panics
return errors.New("request closed before response")
case consumeRequest.ReqChannel <- models.Message{Body: c_msg.Body, RoutingKey: c_msg.RoutingKey, SpanCtx: sp.Context(), HttpStatus: http_status}:
case consumeRequest.ReqChannel <- models.Message{Body: c_msg.Body, RoutingKey: c_msg.RoutingKey, HttpStatus: http_status}:
}
return nil
}
Expand Down Expand Up @@ -314,22 +318,42 @@ func getOrCreateSpan(c_msg amqp.Delivery, tracer opentracing.Tracer) (opentracin
return sp, nil
}

func buildQuery(u *url.URL) (CleanQuery, error) {
func buildQuery(u *url.URL, body []byte) (CleanQuery, error) {

p, err := url.ParseRequestURI(u.RequestURI())
blank := CleanQuery{"", "", p.Query()}
values := make(map[string]interface{})
for key, _ := range p.Query() {
values[key] = p.Query().Get(key)
}
blank := CleanQuery{"", "", values}

fmt.Printf("%v", p)
if err != nil {
return blank, err
}

log.Printf("got query %s", p)

cleanQuery := strings.Split(p.Path, "/")
path := strings.SplitN(p.Path, "/"+conf.Assaf.Ns, 2)
cleanQuery := strings.Split(path[1], "/")

if len(body) > 0 {
// A bit of a dirty hack to keep error handling
cleanQuery = append(cleanQuery, "")

err := json.Unmarshal([]byte(body), &values)

if err != nil {
log.Println("couldn't unmarshal body")
}
}

if len(cleanQuery) < 3 {
return blank, url.EscapeError("bad request, not enough arguments")
}
return CleanQuery{routing: cleanQuery[1], p: cleanQuery[2], values: p.Query()}, nil
return CleanQuery{routing: cleanQuery[1], p: cleanQuery[2], values: values}, nil
}

func cleanRequest(p string) (string, error) {

cleanPath, err := url.QueryUnescape(p)
Expand All @@ -344,7 +368,7 @@ func cleanRequest(p string) (string, error) {
type CleanQuery struct {
routing string
p string
values url.Values
values map[string]interface{}
}

func requestHandler(msgQueue chan<- models.Message, tracer opentracing.Tracer, requests *Reqs) http.Handler {
Expand All @@ -353,7 +377,9 @@ func requestHandler(msgQueue chan<- models.Message, tracer opentracing.Tracer, r
span := tracer.StartSpan("call-http")
defer span.Finish()
requestId := rand.Intn(math.MaxInt32)
cq, err := buildQuery(r.URL)
bodyBytes, err := ioutil.ReadAll(r.Body)
cq, err := buildQuery(r.URL, bodyBytes)

fmt.Printf("%v", cq)
if err != nil {
fmt.Fprintf(w, "error: %s", err)
Expand Down Expand Up @@ -482,7 +508,9 @@ func main() {
defer done()

routerHttpHandler := Logger(requestHandler(toQueue, tracer, requests), "ds-proxy")
log.Fatal(http.ListenAndServe(":8080", routerHttpHandler))
http.Handle(conf.AssafBase, routerHttpHandler)

log.Fatal(http.ListenAndServe(":8080", nil))

//<-ctx.Done()
}
7 changes: 3 additions & 4 deletions models/Requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ package models
import (
"github.com/opentracing/opentracing-go"
"github.com/streadway/amqp"
"net/url"
"time"
)

// A Message going over the wire
type DataRequest struct {
Body string `json:"body"`
RequestID int `json:"request-id"`
Options url.Values `json:"options"`
Body string `json:"body"`
RequestID int `json:"request-id"`
Options map[string]interface{} `json:"options"`
}

// Close tears the connection down, taking the channel with it.
Expand Down