diff --git a/bin/goaround b/bin/goaround index 1535496..0d228cb 100755 Binary files a/bin/goaround and b/bin/goaround differ diff --git a/bin/shasum b/bin/shasum index d410f85..98c00f4 100644 --- a/bin/shasum +++ b/bin/shasum @@ -1 +1 @@ -f875002d695a5726444d6a86644f35449f844d44 ./bin/goaround +71711d3d03e824ec9a51ac734c94648e9874292f ./bin/goaround diff --git a/go.mod b/go.mod index 943b3e2..e31282c 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,8 @@ module github.com/CoderCookE/goaround go 1.12 -require github.com/dgraph-io/ristretto v0.0.0-20191004195602-f823dc4a5031 +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgraph-io/ristretto v0.0.1 + github.com/stretchr/testify v1.3.0 // indirect +) diff --git a/go.sum b/go.sum index e1536b3..91c12c1 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,18 @@ -github.com/codercooke/goaround v0.0.0-20190726005525-216236838df3 h1:SH0qcVMNgZgsuUM8/xtmBtigM4UBvoCfuCQqsWBMo58= -github.com/codercooke/goaround v0.0.0-20190726005525-216236838df3/go.mod h1:l53nwp2tMBWyIWF0fHoVLBj5tKcovj0yC4LNp3CrvAY= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/ristretto v0.0.0-20191004195602-f823dc4a5031 h1:puinfjvl1jM3YaODRJ63nURwCuY+7r2gaOxE4ON+68E= github.com/dgraph-io/ristretto v0.0.0-20191004195602-f823dc4a5031/go.mod h1:jg4yDfbNNmxP2Nq5Z7MbyQrXyl/syIRF2LnbbxqViho= +github.com/dgraph-io/ristretto v0.0.1 h1:cJwdnj42uV8Jg4+KLrYovLiCgIfz9wtWm6E6KA+1tLs= +github.com/dgraph-io/ristretto v0.0.1/go.mod h1:T40EBc7CJke8TkpiYfGGKAeFjSaxuFXhuXRyumBd6RE= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/internal/connectionpool/connection.go b/internal/connectionpool/connection.go index 0b019b9..ea6dca9 100644 --- a/internal/connectionpool/connection.go +++ b/internal/connectionpool/connection.go @@ -8,9 +8,15 @@ import ( "sync" ) +type message struct { + health bool + backend string + proxy *httputil.ReverseProxy +} + type connection struct { healthy bool - messages chan bool + messages chan message backend string sync.RWMutex proxy *httputil.ReverseProxy @@ -20,7 +26,7 @@ type connection struct { func newConnection(proxy *httputil.ReverseProxy, backend string, cache *ristretto.Cache) (*connection, error) { conn := &connection{ backend: backend, - messages: make(chan bool), + messages: make(chan message), proxy: proxy, cache: cache, } @@ -56,9 +62,19 @@ func (c *connection) get(w http.ResponseWriter, r *http.Request) error { func (c *connection) healthCheck() { for { - healthy := <-c.messages + msg := <-c.messages + c.Lock() - c.healthy = healthy + + backend := msg.backend + c.healthy = msg.health + proxy := msg.proxy + + if proxy != nil && c.backend != backend { + c.backend = backend + c.proxy = proxy + } + c.Unlock() } } diff --git a/internal/connectionpool/connection_test.go b/internal/connectionpool/connection_test.go index f88775e..031c818 100644 --- a/internal/connectionpool/connection_test.go +++ b/internal/connectionpool/connection_test.go @@ -39,7 +39,7 @@ func TestHealthCheck(t *testing.T) { assertion.Equal(err, nil) assertion.False(conn.healthy) - conn.messages <- true + conn.messages <- message{health: true} time.Sleep(200 * time.Millisecond) conn.Lock() @@ -47,7 +47,7 @@ func TestHealthCheck(t *testing.T) { conn.Unlock() assertion.True(health) - conn.messages <- false + conn.messages <- message{health: false} time.Sleep(200 * time.Millisecond) conn.Lock() diff --git a/internal/connectionpool/health-checker.go b/internal/connectionpool/health-checker.go index d66c3cb..3823483 100644 --- a/internal/connectionpool/health-checker.go +++ b/internal/connectionpool/health-checker.go @@ -7,6 +7,8 @@ import ( "io/ioutil" "log" "net/http" + "net/http/httputil" + "sync" "time" ) @@ -16,7 +18,8 @@ type healthCheckReponse struct { } type healthChecker struct { - subscribers []chan bool + sync.Mutex + subscribers []chan message currentHealth bool client *http.Client backend string @@ -27,14 +30,19 @@ func (hc *healthChecker) Start() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + hc.Lock() hc.check(ctx) + hc.Unlock() + ticker := time.NewTicker(1000 * time.Millisecond) for { select { case <-ticker.C: cancel() + hc.Lock() ctx, cancel = context.WithCancel(context.Background()) hc.check(ctx) + hc.Unlock() case <-hc.done: ticker.Stop() return @@ -42,12 +50,24 @@ func (hc *healthChecker) Start() { } } +func (hc *healthChecker) Reuse(newBackend string, proxy *httputil.ReverseProxy) *healthChecker { + hc.Lock() + hc.backend = newBackend + hc.notifySubscribers(false, hc.backend, proxy) + hc.Unlock() + + return hc +} + func (hc *healthChecker) check(ctx context.Context) { url := fmt.Sprintf("%s%s", hc.backend, "/health") healthy := hc.currentHealth - req, _ := http.NewRequest(http.MethodGet, url, nil) - if resp, err := http.DefaultClient.Do(req.WithContext(ctx)); err != nil { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + log.Printf("Error creating request: %s, error %s", hc.backend, err.Error()) + healthy = false + } else if resp, err := http.DefaultClient.Do(req.WithContext(ctx)); err != nil { log.Printf("Error with health check, backend: %s, error %s", hc.backend, err.Error()) healthy = false } else { @@ -67,9 +87,14 @@ func (hc *healthChecker) check(ctx context.Context) { if healthy != hc.currentHealth { hc.currentHealth = healthy - for _, c := range hc.subscribers { - c <- healthy - } + hc.notifySubscribers(healthy, hc.backend, nil) + } +} + +func (hc *healthChecker) notifySubscribers(healthy bool, backend string, proxy *httputil.ReverseProxy) { + message := message{health: healthy, backend: backend, proxy: proxy} + for _, c := range hc.subscribers { + c <- message } } diff --git a/internal/connectionpool/health-checker_test.go b/internal/connectionpool/health-checker_test.go index f056eb6..c43456a 100644 --- a/internal/connectionpool/health-checker_test.go +++ b/internal/connectionpool/health-checker_test.go @@ -20,7 +20,7 @@ func TestHealthChecker(t *testing.T) { assertion := &assert.Asserter{T: t} t.Run("backend returns a healthy state", func(t *testing.T) { - resChan := make(chan bool, 1) + resChan := make(chan message, 1) availableHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { healthReponse := &healthCheckReponse{State: "healthy", Message: ""} @@ -33,7 +33,7 @@ func TestHealthChecker(t *testing.T) { hc := healthChecker{ client: client, - subscribers: []chan bool{resChan}, + subscribers: []chan message{resChan}, backend: availableServer.URL, done: make(chan bool), currentHealth: false, @@ -44,11 +44,11 @@ func TestHealthChecker(t *testing.T) { health := <-resChan - assertion.True(health) + assertion.True(health.health) }) t.Run("backend returns a degraded state", func(t *testing.T) { - resChan := make(chan bool, 1) + resChan := make(chan message, 1) degradedHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { healthReponse := &healthCheckReponse{State: "degraded", Message: ""} @@ -61,7 +61,7 @@ func TestHealthChecker(t *testing.T) { hc := healthChecker{ client: client, - subscribers: []chan bool{resChan}, + subscribers: []chan message{resChan}, backend: degradedServer.URL, done: make(chan bool), currentHealth: true, @@ -72,6 +72,6 @@ func TestHealthChecker(t *testing.T) { health := <-resChan - assertion.False(health) + assertion.False(health.health) }) } diff --git a/internal/connectionpool/pool.go b/internal/connectionpool/pool.go index ad29558..a288663 100644 --- a/internal/connectionpool/pool.go +++ b/internal/connectionpool/pool.go @@ -1,6 +1,7 @@ package connectionpool import ( + "bufio" "bytes" "github.com/dgraph-io/ristretto" "io/ioutil" @@ -11,12 +12,18 @@ import ( "net/http" "net/http/httputil" "net/url" + "os" + "strings" "time" ) type pool struct { - connections chan *connection - healthChecks []*healthChecker + connections chan *connection + healthChecks map[string]*healthChecker + client *http.Client + connsPerBackend int + cacheEnabled bool + cache *ristretto.Cache } //Exported method for creation of a connection-pool takes []string @@ -30,7 +37,7 @@ func New(c *Config) *pool { backendCount := int(math.Max(float64(len(backends)), float64(1))) if backendCount > 0 { - maxRequests = connsPerBackend * backendCount + maxRequests = connsPerBackend * backendCount * 2 } tr := &http.Transport{ @@ -51,12 +58,6 @@ func New(c *Config) *pool { Transport: tr, } - connectionPool := &pool{ - connections: make(chan *connection, maxRequests), - } - - poolConnections := make([]*connection, 0) - var cache *ristretto.Cache var err error if cache, err = buildCache(); err != nil { @@ -64,57 +65,22 @@ func New(c *Config) *pool { cacheEnabled = false } - for _, backend := range backends { - url, err := url.ParseRequestURI(backend) - - if err != nil { - log.Printf("error parsing backend url: %s", backend) - } else { - proxy := httputil.NewSingleHostReverseProxy(url) - proxy.Transport = client.Transport - - if cacheEnabled { - cacheResponse := func(r *http.Response) error { - body, err := ioutil.ReadAll(r.Body) - cacheable := string(body) - r.Body = ioutil.NopCloser(bytes.NewBuffer(body)) - - path := r.Request.URL.Path - if err == nil { - cache.Set(path, cacheable, 1) - } - - return nil - } - - proxy.ModifyResponse = cacheResponse - } - - newConnection, err := newConnection(proxy, backend, cache) - if err != nil { - log.Printf("Error adding connection for: %s", backend) - } else { - backendConnections := make([]chan bool, connsPerBackend) - - for i := 0; i < connsPerBackend; i++ { - poolConnections = append(poolConnections, newConnection) - backendConnections[i] = newConnection.messages - } - - hc := &healthChecker{ - client: client, - subscribers: backendConnections, - backend: newConnection.backend, - done: make(chan bool, 1), - } + connectionPool := &pool{ + connections: make(chan *connection, maxRequests), + healthChecks: make(map[string]*healthChecker), + client: client, + connsPerBackend: connsPerBackend, + cache: cache, + cacheEnabled: cacheEnabled, + } - connectionPool.healthChecks = append(connectionPool.healthChecks, hc) - go hc.Start() - } - } + poolConnections := []*connection{} + for _, backend := range backends { + poolConnections = connectionPool.addBackend(poolConnections, backend) } shuffle(poolConnections, connectionPool.connections) + go connectionPool.ListenForBackendChanges() return connectionPool } @@ -164,3 +130,159 @@ func (p *pool) Shutdown() { hc.Shutdown() } } + +func (p *pool) ListenForBackendChanges() { + const SockAddr = "/tmp/goaround.sock" + + if err := os.RemoveAll(SockAddr); err != nil { + log.Fatal(err) + } + + l, err := net.Listen("unix", SockAddr) + if err != nil { + log.Fatal("listen error:", err) + } + defer l.Close() + + for { + conn, err := l.Accept() + if err != nil { + log.Fatal("accept error:", err) + } + + scanner := bufio.NewScanner(conn) + for scanner.Scan() { + updated := strings.Split(scanner.Text(), ",") + + var currentBackends []string + for k := range p.healthChecks { + currentBackends = append(currentBackends, k) + } + + added, removed := difference(currentBackends, updated) + log.Printf("Adding: %s", added) + log.Printf("Removing: %s", removed) + + for _, removedBackend := range removed { + if len(added) > 0 { + var new string + new, added = added[0], added[1:] + url, err := url.ParseRequestURI(new) + if err != nil { + log.Printf("Error adding backend, %s", new) + } else { + proxy := httputil.NewSingleHostReverseProxy(url) + proxy.Transport = p.client.Transport + + if p.cacheEnabled { + cacheResponse := func(r *http.Response) error { + body, err := ioutil.ReadAll(r.Body) + cacheable := string(body) + r.Body = ioutil.NopCloser(bytes.NewBuffer(body)) + + path := r.Request.URL.Path + if err == nil { + p.cache.Set(path, cacheable, 1) + } + + return nil + } + proxy.ModifyResponse = cacheResponse + } + + newHC := p.healthChecks[removedBackend].Reuse(new, proxy) + p.healthChecks[new] = newHC + } + } else { + p.healthChecks[removedBackend].Shutdown() + } + + delete(p.healthChecks, removedBackend) + } + + poolConnections := []*connection{} + for _, addedBackend := range added { + poolConnections = p.addBackend(poolConnections, addedBackend) + } + + shuffle(poolConnections, p.connections) + } + } +} + +func difference(original []string, updated []string) (added []string, removed []string) { + oldBackends := make(map[string]bool) + for _, i := range original { + oldBackends[i] = true + } + + newBackends := make(map[string]bool) + for _, i := range updated { + newBackends[i] = true + } + + for _, i := range updated { + if _, ok := oldBackends[i]; !ok { + added = append(added, i) + } + } + + for _, i := range original { + if _, ok := newBackends[i]; !ok { + removed = append(removed, i) + } + } + + return +} + +func (p *pool) addBackend(connections []*connection, backend string) []*connection { + url, err := url.ParseRequestURI(backend) + if err != nil { + log.Printf("error parsing backend url: %s", backend) + } else { + proxy := httputil.NewSingleHostReverseProxy(url) + proxy.Transport = p.client.Transport + + if p.cacheEnabled { + cacheResponse := func(r *http.Response) error { + body, err := ioutil.ReadAll(r.Body) + cacheable := string(body) + r.Body = ioutil.NopCloser(bytes.NewBuffer(body)) + + path := r.Request.URL.Path + if err == nil { + p.cache.Set(path, cacheable, 1) + } + + return nil + } + + proxy.ModifyResponse = cacheResponse + } + + configuredConn, err := newConnection(proxy, backend, p.cache) + if err != nil { + log.Printf("Error adding connection for: %s", backend) + } else { + backendConnections := make([]chan message, p.connsPerBackend) + + for i := 0; i < p.connsPerBackend; i++ { + connections = append(connections, configuredConn) + backendConnections[i] = configuredConn.messages + } + + hc := &healthChecker{ + client: p.client, + subscribers: backendConnections, + backend: configuredConn.backend, + done: make(chan bool, 1), + } + + p.healthChecks[backend] = hc + go hc.Start() + } + } + + return connections +} diff --git a/internal/connectionpool/pool_test.go b/internal/connectionpool/pool_test.go index 399a685..40e7e2e 100644 --- a/internal/connectionpool/pool_test.go +++ b/internal/connectionpool/pool_test.go @@ -2,14 +2,15 @@ package connectionpool import ( "encoding/json" + "fmt" "io/ioutil" + "net" "net/http" "net/http/httptest" - "net/http/httputil" - "net/url" "strings" "sync" "testing" + "time" "github.com/CoderCookE/goaround/internal/assert" ) @@ -19,7 +20,6 @@ func TestFetch(t *testing.T) { t.Run("With cache", func(t *testing.T) { t.Run("Fetches from cache", func(t *testing.T) { callCount := 0 - availableResChan := make(chan bool, 1) wg := &sync.WaitGroup{} availableHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var message []byte @@ -27,8 +27,6 @@ func TestFetch(t *testing.T) { if r.URL.Path == "/health" { healthReponse := &healthCheckReponse{State: "healthy", Message: ""} message, _ = json.Marshal(healthReponse) - - availableResChan <- true } if r.URL.Path == "/foo" { @@ -51,73 +49,104 @@ func TestFetch(t *testing.T) { } connectionPool := New(config) - defer connectionPool.Shutdown() - <-availableResChan + connectionPool.Shutdown() + + connectionPool.healthChecks[availableServer.URL].notifySubscribers(true, availableServer.URL, nil) + time.Sleep(200) + wg.Add(1) + for i := 0; i < 5; i++ { reader := strings.NewReader("This is a test") request := httptest.NewRequest("GET", "http://www.test.com/foo", reader) recorder := httptest.NewRecorder() connectionPool.Fetch(recorder, request) + + wg.Wait() + result, err := ioutil.ReadAll(recorder.Result().Body) assertion.Equal(err, nil) assertion.Equal(recorder.Code, http.StatusOK) assertion.Equal(string(result), "hello") assertion.Equal(callCount, 1) - wg.Wait() } }) }) t.Run("No cache", func(t *testing.T) { - t.Run("No backends available, returns 503", func(t *testing.T) { - config := &Config{ - Backends: []string{}, - NumConns: 1, - } - connectionPool := New(config) - defer connectionPool.Shutdown() + t.Run("fetches each request from server", func(t *testing.T) { + callCount := 0 + wg := &sync.WaitGroup{} - reader := strings.NewReader("This is a test") - request := httptest.NewRequest("GET", "http://www.test.com/hello", reader) - recorder := httptest.NewRecorder() - connectionPool.Fetch(recorder, request) + availableHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var message []byte - assertion.Equal(recorder.Code, http.StatusServiceUnavailable) - }) + if r.URL.Path == "/health" { + healthReponse := &healthCheckReponse{State: "healthy", Message: ""} + message, _ = json.Marshal(healthReponse) + } - t.Run("none of the instances are healthy, should return an HTTP 503 response code", func(t *testing.T) { - unavailableHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) + if r.URL.Path == "/foo" { + wg.Done() + callCount += 1 + message = []byte("hello") + } + + w.Write(message) }) - unavailableServer := httptest.NewServer(unavailableHandler) - defer unavailableServer.Close() + availableServer := httptest.NewServer(availableHandler) + defer availableServer.Close() - backends := []string{unavailableServer.URL} + backends := []string{availableServer.URL} config := &Config{ Backends: backends, NumConns: 1, } connectionPool := New(config) - defer connectionPool.Shutdown() + connectionPool.Shutdown() - reader := strings.NewReader("This is a test") - request := httptest.NewRequest("GET", "http://www.test.com/hello", reader) - recorder := httptest.NewRecorder() - connectionPool.Fetch(recorder, request) + connectionPool.healthChecks[availableServer.URL].notifySubscribers(true, availableServer.URL, nil) + time.Sleep(200) + + for i := 0; i < 5; i++ { + wg.Add(1) + reader := strings.NewReader("This is a test") + request := httptest.NewRequest("GET", "http://www.test.com/foo", reader) + recorder := httptest.NewRecorder() + connectionPool.Fetch(recorder, request) - assertion.Equal(recorder.Code, http.StatusServiceUnavailable) + wg.Wait() + + result, err := ioutil.ReadAll(recorder.Result().Body) + assertion.Equal(err, nil) + assertion.Equal(recorder.Code, http.StatusOK) + assertion.Equal(string(result), "hello") + } + + assertion.Equal(callCount, 5) }) t.Run("First connection tried is degraded, Uses next connections", func(t *testing.T) { - availableResChan := make(chan bool, 1) + callCount := 0 + wg := &sync.WaitGroup{} + availableHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - healthReponse := &healthCheckReponse{State: "healthy", Message: ""} - healthMessage, _ := json.Marshal(healthReponse) - availableResChan <- true - w.Write(healthMessage) + var message []byte + + if r.URL.Path == "/health" { + healthReponse := &healthCheckReponse{State: "healthy", Message: ""} + message, _ = json.Marshal(healthReponse) + } + + if r.URL.Path == "/foo" { + wg.Done() + callCount += 1 + message = []byte("hello") + } + + w.Write(message) }) availableServer := httptest.NewServer(availableHandler) @@ -130,44 +159,87 @@ func TestFetch(t *testing.T) { unavailableServer := httptest.NewServer(unavailableHandler) defer unavailableServer.Close() - healthyBackend, _ := url.ParseRequestURI(availableServer.URL) - healthyConnection := &connection{ - backend: availableServer.URL, - healthy: true, - messages: make(chan bool), - proxy: httputil.NewSingleHostReverseProxy(healthyBackend), - } - - unhealthyBackend, _ := url.ParseRequestURI(unavailableServer.URL) - unhealthyConnection := &connection{ - backend: unavailableServer.URL, - healthy: false, - messages: make(chan bool), - proxy: httputil.NewSingleHostReverseProxy(unhealthyBackend), - } - config := &Config{ - Backends: []string{}, + Backends: []string{unavailableServer.URL, availableServer.URL}, NumConns: 10, } connectionPool := New(config) - connectionPool.connections <- unhealthyConnection - connectionPool.connections <- healthyConnection + connectionPool.Shutdown() - defer connectionPool.Shutdown() + connectionPool.healthChecks[availableServer.URL].notifySubscribers(true, availableServer.URL, nil) + time.Sleep(200) reader := strings.NewReader("This is a test") - request := httptest.NewRequest("GET", "http://www.test.com/hello", reader) + request := httptest.NewRequest("GET", "http://www.test.com/foo", reader) recorder := httptest.NewRecorder() - connectionPool.Fetch(recorder, request) - <-availableResChan + wg.Add(1) + connectionPool.Fetch(recorder, request) result, err := ioutil.ReadAll(recorder.Result().Body) - assertion.Equal(err, nil) + assertion.Equal(recorder.Code, http.StatusOK) - assertion.Equal(string(result), `{"state":"healthy","message":""}`) + assertion.Equal(string(result), `hello`) }) }) + + t.Run("Listens on unix socket for updates to backends", func(t *testing.T) { + callCount := 0 + blocker := make(chan bool) + wg := &sync.WaitGroup{} + + availableHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var message []byte + if r.URL.Path == "/health" { + healthReponse := &healthCheckReponse{State: "healthy", Message: ""} + message, _ = json.Marshal(healthReponse) + } + + if r.URL.Path == "/foo" { + wg.Done() + callCount += 1 + message = []byte("bar") + } + + w.Write(message) + go func() { + time.Sleep(1 * time.Second) + blocker <- true + }() + }) + + availableServer := httptest.NewServer(availableHandler) + defer availableServer.Close() + + config := &Config{ + Backends: []string{}, + NumConns: 10, + } + connectionPool := New(config) + time.Sleep(1 * time.Second) + + const SockAddr = "/tmp/goaround.sock" + c, err := net.Dial("unix", SockAddr) + assertion.Equal(err, nil) + + defer c.Close() + post := fmt.Sprintf("%s\n", availableServer.URL) + _, err = c.Write([]byte(post)) + assertion.Equal(err, nil) + <-blocker + + reader := strings.NewReader("This is a test") + request := httptest.NewRequest("GET", "http://www.test.com/foo", reader) + recorder := httptest.NewRecorder() + + wg.Add(1) + connectionPool.Fetch(recorder, request) + wg.Wait() + + result, err := ioutil.ReadAll(recorder.Result().Body) + assertion.Equal(err, nil) + assertion.Equal(recorder.Code, http.StatusOK) + assertion.Equal(string(result), `bar`) + }) } diff --git a/main.go b/main.go index 2c28361..ed4c2d6 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,7 @@ func main() { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() + defer r.Body.Close() connectionPool.Fetch(w, r) log.Printf("Request completed in %v seconds", time.Since(start).Seconds()) }) diff --git a/readme.md b/readme.md index 1f310b4..559ba42 100644 --- a/readme.md +++ b/readme.md @@ -38,6 +38,14 @@ sudo ./bin/goaround -p 443 -b http://127.0.0.1:2702 -cacert /Users/ecook/cacert. -privkey location of private key ``` +## Updating backends via unix socket +Pass a comma separated list of all backends; +``` +echo "http:/?localhost:3000,http://localhost:3001" | nc -U /tmp/goaround.sock + +``` +The backends previously configured will be removed and replaced with only the ones passed in the updated list. + ## Detailed Implemnation This service starts a web server on a user defined port, passed via `-p` flag, if no flag is passed the service will default to port 3000.