From 51aabb929ac9f0f5821b00c8f66f7db796208b77 Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Wed, 12 May 2021 22:23:21 +0300 Subject: [PATCH 01/16] Adjusted logic --- proxy.go | 5 +++-- scope.go | 28 +++++++++++++++++++++++----- utils.go | 13 +++++++++++++ 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/proxy.go b/proxy.go index 9ce26dbf..eb24ce71 100644 --- a/proxy.go +++ b/proxy.go @@ -435,7 +435,7 @@ func (rp *reverseProxy) applyConfig(cfg *config.Config) error { return nil } -// refreshCacheMetrics refresehs cacheSize and cacheItems metrics. +// refreshCacheMetrics refreshes cacheSize and cacheItems metrics. func (rp *reverseProxy) refreshCacheMetrics() { rp.lock.RLock() defer rp.lock.RUnlock() @@ -452,7 +452,7 @@ func (rp *reverseProxy) refreshCacheMetrics() { func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) { name, password := getAuth(req) - + sessionId := getSessionId(req) var ( u *user c *cluster @@ -467,6 +467,7 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) { // Fix applyConfig if c or cu equal to nil. c = rp.clusters[u.toCluster] cu = c.users[u.toUser] + c.sessionId = sessionId } rp.lock.RUnlock() diff --git a/scope.go b/scope.go index c5701dba..a750ebfa 100644 --- a/scope.go +++ b/scope.go @@ -19,6 +19,8 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// var route = make(map[int]*host) + type scopeID uint64 func (sid scopeID) String() string { @@ -305,6 +307,8 @@ var allowedParams = []string{ "extremes", // what to do if the volume of the result exceeds one of the limits "result_overflow_mode", + // session stickiness + "session_id", } // This regexp must match params needed to describe a way to use external data @@ -718,6 +722,8 @@ type cluster struct { killQueryUserName string killQueryUserPassword string + sessionId string + heartBeat *heartBeat } @@ -829,18 +835,30 @@ func (r *replica) getHost() *host { reqs = ^uint32(0) } - if reqs == 0 { - return h - } - // Scan all the hosts for the least loaded host. for i := uint32(1); i < n; i++ { tmpIdx := (idx + i) % n tmpH := r.hosts[tmpIdx] + + // handling sticky session + if r.cluster.sessionId != "" { + sessionId := hash(r.cluster.sessionId) + tmpIdx = (sessionId) % n + tmpHSticky := r.hosts[tmpIdx] + if !tmpHSticky.isActive() { + log.Debugf("Sticky Session Server has been picked up but not available") + continue + } + log.Debugf("Sticky Session Server is: %s, session: %d, idx mod: %d - %d", tmpH.addr, sessionId, tmpIdx, n) + return tmpH + } + + // continue as usual + tmpReqs := tmpH.load() + if !tmpH.isActive() { continue } - tmpReqs := tmpH.load() if tmpReqs == 0 { return tmpH } diff --git a/utils.go b/utils.go index b14c0d62..58474fa0 100644 --- a/utils.go +++ b/utils.go @@ -4,6 +4,7 @@ import ( "bytes" "compress/gzip" "fmt" + "hash/fnv" "io" "io/ioutil" "net/http" @@ -43,6 +44,12 @@ func getAuth(req *http.Request) (string, string) { return "default", "" } +// getSessionId retrieves session id +func getSessionId(req *http.Request) string { + params := req.URL.Query() + return params.Get("session_id") +} + // getQuerySnippet returns query snippet. // // getQuerySnippet must be called only for error reporting. @@ -57,6 +64,12 @@ func getQuerySnippet(req *http.Request) string { return query + body } +func hash(s string) uint32 { + h := fnv.New32a() + h.Write([]byte(s)) + return h.Sum32() +} + func getQuerySnippetFromBody(req *http.Request) string { if req.Body == nil { return "" From 8d905da242b8cd951a8981b9221bc17539647a45 Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Wed, 12 May 2021 23:54:49 +0300 Subject: [PATCH 02/16] Error handling --- scope.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scope.go b/scope.go index a750ebfa..fa94b4c5 100644 --- a/scope.go +++ b/scope.go @@ -846,14 +846,14 @@ func (r *replica) getHost() *host { tmpIdx = (sessionId) % n tmpHSticky := r.hosts[tmpIdx] if !tmpHSticky.isActive() { - log.Debugf("Sticky Session Server has been picked up but not available") + log.Debugf("Sticky Session Server has been picked up, but it is not available") continue } log.Debugf("Sticky Session Server is: %s, session: %d, idx mod: %d - %d", tmpH.addr, sessionId, tmpIdx, n) return tmpH } - // continue as usual + // handling least loaded host flow tmpReqs := tmpH.load() if !tmpH.isActive() { From 669e5a7959eca3e5323cae9be86d0a52948c119a Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Thu, 13 May 2021 01:03:15 +0300 Subject: [PATCH 03/16] tests --- main_test.go | 22 ++++++++++++++++++++++ proxy.go | 4 ++-- testdata/http-session-id.yml | 15 +++++++++++++++ utils.go | 4 +++- 4 files changed, 42 insertions(+), 3 deletions(-) create mode 100644 testdata/http-session-id.yml diff --git a/main_test.go b/main_test.go index 6492585e..0e626461 100644 --- a/main_test.go +++ b/main_test.go @@ -284,6 +284,28 @@ func TestServe(t *testing.T) { }, startHTTP, }, + { + "http POST request with session id", + "testdata/http-session-id.yml", + func(t *testing.T) { + // buf := bytes.NewBufferString("SELECT * FROM system.numbers LIMIT 10") + data := url.Values{} + data.Set("query", `SELECT * FROM system.numbers LIMIT 10`) + data.Add("session_id", "1234") + data.Add("query_id", "1234") + req, err := http.NewRequest("POST", "http://127.0.0.1:9090/", bytes.NewBufferString(data.Encode())) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value") // This makes it work + + checkErr(t, err) + resp, err := http.DefaultClient.Do(req) + checkErr(t, err) + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK) + } + resp.Body.Close() + }, + startHTTP, + }, { "http request", "testdata/http.yml", diff --git a/proxy.go b/proxy.go index eb24ce71..a893f8ed 100644 --- a/proxy.go +++ b/proxy.go @@ -110,9 +110,9 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { q := getQuerySnippet(req) if srw.statusCode == http.StatusOK { requestSuccess.With(s.labels).Inc() - log.Debugf("%s: request success; query: %q; URL: %q", s, q, req.URL.String()) + log.Debugf("%s: request success; query: %q; Method: %s; URL: %q", s, q, req.Method, req.URL.String()) } else { - log.Debugf("%s: request failure: non-200 status code %d; query: %q; URL: %q", s, srw.statusCode, q, req.URL.String()) + log.Debugf("%s: request failure: non-200 status code %d; query: %q; Method: %s; URL: %q", s, srw.statusCode, q, req.Method, req.URL.String()) } statusCodes.With( diff --git a/testdata/http-session-id.yml b/testdata/http-session-id.yml new file mode 100644 index 00000000..2c00146d --- /dev/null +++ b/testdata/http-session-id.yml @@ -0,0 +1,15 @@ +log_debug: true +server: + http: + listen_addr: ":9090" + allowed_networks: ["127.0.0.1/24"] + +users: + - name: "default" + to_cluster: "default" + to_user: "default" + +clusters: + - name: "default" + nodes: + - 127.0.0.1:8124 diff --git a/utils.go b/utils.go index 58474fa0..55290631 100644 --- a/utils.go +++ b/utils.go @@ -47,7 +47,9 @@ func getAuth(req *http.Request) (string, string) { // getSessionId retrieves session id func getSessionId(req *http.Request) string { params := req.URL.Query() - return params.Get("session_id") + sessionId := params.Get("session_id") + req.Header.Set("X-ClickHouse-Session-ID", sessionId) + return sessionId } // getQuerySnippet returns query snippet. From 881b17a499260c38428e1ac935b68ed48ad45e84 Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Thu, 13 May 2021 12:55:20 +0300 Subject: [PATCH 04/16] Adjust --- proxy.go | 7 ++--- scope.go | 78 +++++++++++++++++++++++++++++++++++++++++--------------- utils.go | 1 - 3 files changed, 62 insertions(+), 24 deletions(-) diff --git a/proxy.go b/proxy.go index a893f8ed..2c18f61c 100644 --- a/proxy.go +++ b/proxy.go @@ -51,8 +51,10 @@ func newReverseProxy() *reverseProxy { func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { startTime := time.Now() - s, status, err := rp.getScope(req) + if s.sessionId != "" { + rw.Header().Set("X-ClickHouse-Session-Id", s.sessionId) + } if err != nil { q := getQuerySnippet(req) err = fmt.Errorf("%q: %s; query: %q", req.RemoteAddr, err, q) @@ -467,7 +469,6 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) { // Fix applyConfig if c or cu equal to nil. c = rp.clusters[u.toCluster] cu = c.users[u.toUser] - c.sessionId = sessionId } rp.lock.RUnlock() @@ -490,6 +491,6 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) { return nil, http.StatusForbidden, fmt.Errorf("cluster user %q is not allowed to access", cu.name) } - s := newScope(req, u, c, cu) + s := newScope(req, u, c, cu, sessionId) return s, 0, nil } diff --git a/scope.go b/scope.go index fa94b4c5..75a377d5 100644 --- a/scope.go +++ b/scope.go @@ -42,6 +42,8 @@ type scope struct { user *user clusterUser *clusterUser + sessionId string + remoteAddr string localAddr string @@ -51,9 +53,12 @@ type scope struct { labels prometheus.Labels } -func newScope(req *http.Request, u *user, c *cluster, cu *clusterUser) *scope { - h := c.getHost() +func newScope(req *http.Request, u *user, c *cluster, cu *clusterUser, sessionId string) *scope { + h := c.getHost() + if sessionId != "" { + h = c.getHostSticky(sessionId) + } var localAddr string if addr, ok := req.Context().Value(http.LocalAddrContextKey).(net.Addr); ok { localAddr = addr.String() @@ -65,6 +70,7 @@ func newScope(req *http.Request, u *user, c *cluster, cu *clusterUser) *scope { cluster: c, user: u, clusterUser: cu, + sessionId: sessionId, remoteAddr: req.RemoteAddr, localAddr: localAddr, @@ -722,8 +728,6 @@ type cluster struct { killQueryUserName string killQueryUserPassword string - sessionId string - heartBeat *heartBeat } @@ -816,10 +820,10 @@ func (c *cluster) getReplica() *replica { return r } -// getHost returns least loaded + round-robin host from replica. +// getHostSticky returns host by stickiness from replica. // // Always returns non-nil. -func (r *replica) getHost() *host { +func (r *replica) getHostSticky(sessionId string) *host { idx := atomic.AddUint32(&r.nextHostIdx, 1) n := uint32(len(r.hosts)) if n == 1 { @@ -828,37 +832,63 @@ func (r *replica) getHost() *host { idx %= n h := r.hosts[idx] - reqs := h.load() - - // Set least priority to inactive host. - if !h.isActive() { - reqs = ^uint32(0) - } // Scan all the hosts for the least loaded host. for i := uint32(1); i < n; i++ { tmpIdx := (idx + i) % n - tmpH := r.hosts[tmpIdx] // handling sticky session - if r.cluster.sessionId != "" { - sessionId := hash(r.cluster.sessionId) + if sessionId != "" { + sessionId := hash(sessionId) tmpIdx = (sessionId) % n tmpHSticky := r.hosts[tmpIdx] + log.Debugf("Sticky server candidate is: %s", tmpHSticky.addr) if !tmpHSticky.isActive() { - log.Debugf("Sticky Session Server has been picked up, but it is not available") + log.Debugf("Sticky session server has been picked up, but it is not available") continue } - log.Debugf("Sticky Session Server is: %s, session: %d, idx mod: %d - %d", tmpH.addr, sessionId, tmpIdx, n) - return tmpH + log.Debugf("Sticky session server is: %s, session_id: %d, server_idx: %d, max nodes in pool: %d", tmpHSticky.addr, sessionId, tmpIdx, n) + return tmpHSticky } + } - // handling least loaded host flow - tmpReqs := tmpH.load() + // The returned host may be inactive. This is OK, + // since this means all the hosts are inactive, + // so let's try proxying the request to any host. + return h +} + +// getHost returns least loaded + round-robin host from replica. +// +// Always returns non-nil. +func (r *replica) getHost() *host { + idx := atomic.AddUint32(&r.nextHostIdx, 1) + n := uint32(len(r.hosts)) + if n == 1 { + return r.hosts[0] + } + + idx %= n + h := r.hosts[idx] + reqs := h.load() + + // Set least priority to inactive host. + if !h.isActive() { + reqs = ^uint32(0) + } + + if reqs == 0 { + return h + } + // Scan all the hosts for the least loaded host. + for i := uint32(1); i < n; i++ { + tmpIdx := (idx + i) % n + tmpH := r.hosts[tmpIdx] if !tmpH.isActive() { continue } + tmpReqs := tmpH.load() if tmpReqs == 0 { return tmpH } @@ -874,6 +904,14 @@ func (r *replica) getHost() *host { return h } +// getHostSticky returns host based on stickiness from cluster. +// +// Always returns non-nil. +func (c *cluster) getHostSticky(sessionId string) *host { + r := c.getReplica() + return r.getHostSticky(sessionId) +} + // getHost returns least loaded + round-robin host from cluster. // // Always returns non-nil. diff --git a/utils.go b/utils.go index 55290631..15c8d5b8 100644 --- a/utils.go +++ b/utils.go @@ -48,7 +48,6 @@ func getAuth(req *http.Request) (string, string) { func getSessionId(req *http.Request) string { params := req.URL.Query() sessionId := params.Get("session_id") - req.Header.Set("X-ClickHouse-Session-ID", sessionId) return sessionId } From fc7db12d1d8d73cdfb486d58bcdbb380bd628278 Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Thu, 13 May 2021 13:02:24 +0300 Subject: [PATCH 05/16] session_id --- proxy.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/proxy.go b/proxy.go index 2c18f61c..72f7fb6f 100644 --- a/proxy.go +++ b/proxy.go @@ -52,9 +52,6 @@ func newReverseProxy() *reverseProxy { func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { startTime := time.Now() s, status, err := rp.getScope(req) - if s.sessionId != "" { - rw.Header().Set("X-ClickHouse-Session-Id", s.sessionId) - } if err != nil { q := getQuerySnippet(req) err = fmt.Errorf("%q: %s; query: %q", req.RemoteAddr, err, q) @@ -117,6 +114,11 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { log.Debugf("%s: request failure: non-200 status code %d; query: %q; Method: %s; URL: %q", s, srw.statusCode, q, req.Method, req.URL.String()) } + // publish session_id if needed + if s.sessionId != "" { + rw.Header().Set("X-ClickHouse-Session-Id", s.sessionId) + } + statusCodes.With( prometheus.Labels{ "user": s.user.name, From 444a437746b6a86d535dba65af0212647be81247 Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Thu, 13 May 2021 15:05:10 +0300 Subject: [PATCH 06/16] test adjustment --- main_test.go | 12 ++++-------- proxy.go | 10 +++++----- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/main_test.go b/main_test.go index 0e626461..5dfc72f1 100644 --- a/main_test.go +++ b/main_test.go @@ -288,18 +288,14 @@ func TestServe(t *testing.T) { "http POST request with session id", "testdata/http-session-id.yml", func(t *testing.T) { - // buf := bytes.NewBufferString("SELECT * FROM system.numbers LIMIT 10") - data := url.Values{} - data.Set("query", `SELECT * FROM system.numbers LIMIT 10`) - data.Add("session_id", "1234") - data.Add("query_id", "1234") - req, err := http.NewRequest("POST", "http://127.0.0.1:9090/", bytes.NewBufferString(data.Encode())) - req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value") // This makes it work + req, err := http.NewRequest("POST", "http://127.0.0.1:9090/?query_id=45395792-a432-4b92-8cc9-536c14e1e1a9&extremes=0&session_id=default-session-id233", bytes.NewBufferString("SELECT * FROM system.numbers LIMIT 10")) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded;") // This makes it work checkErr(t, err) resp, err := http.DefaultClient.Do(req) checkErr(t, err) - if resp.StatusCode != http.StatusOK { + + if resp.StatusCode != http.StatusOK || resp.Header.Get("X-Clickhouse-Server-Session-Id") == "" { t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK) } resp.Body.Close() diff --git a/proxy.go b/proxy.go index 72f7fb6f..cd9d54f1 100644 --- a/proxy.go +++ b/proxy.go @@ -98,6 +98,11 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { ReadCloser: req.Body, } + // publish session_id if needed + if s.sessionId != "" { + rw.Header().Set("X-ClickHouse-Server-Session-Id", s.sessionId) + } + if s.user.cache == nil { rp.proxyRequest(s, srw, srw, req) } else { @@ -114,11 +119,6 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { log.Debugf("%s: request failure: non-200 status code %d; query: %q; Method: %s; URL: %q", s, srw.statusCode, q, req.Method, req.URL.String()) } - // publish session_id if needed - if s.sessionId != "" { - rw.Header().Set("X-ClickHouse-Session-Id", s.sessionId) - } - statusCodes.With( prometheus.Labels{ "user": s.user.name, From 4f7a4e84a82ae1e80eb005af2b9b5aa9b4ff00df Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Fri, 21 May 2021 20:03:37 +0300 Subject: [PATCH 07/16] session_timeout logic --- proxy.go | 3 ++- scope.go | 26 ++++++++++++++++---------- utils.go | 11 +++++++++++ 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/proxy.go b/proxy.go index cd9d54f1..11684b6d 100644 --- a/proxy.go +++ b/proxy.go @@ -457,6 +457,7 @@ func (rp *reverseProxy) refreshCacheMetrics() { func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) { name, password := getAuth(req) sessionId := getSessionId(req) + sessionTimeout := getSessionTimeout(req) var ( u *user c *cluster @@ -493,6 +494,6 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) { return nil, http.StatusForbidden, fmt.Errorf("cluster user %q is not allowed to access", cu.name) } - s := newScope(req, u, c, cu, sessionId) + s := newScope(req, u, c, cu, sessionId, sessionTimeout) return s, 0, nil } diff --git a/scope.go b/scope.go index 75a377d5..fec32513 100644 --- a/scope.go +++ b/scope.go @@ -9,6 +9,7 @@ import ( "net/http" "net/url" "regexp" + "strconv" "strings" "sync/atomic" "time" @@ -42,7 +43,8 @@ type scope struct { user *user clusterUser *clusterUser - sessionId string + sessionId string + sessionTimeout int remoteAddr string localAddr string @@ -53,8 +55,7 @@ type scope struct { labels prometheus.Labels } -func newScope(req *http.Request, u *user, c *cluster, cu *clusterUser, sessionId string) *scope { - +func newScope(req *http.Request, u *user, c *cluster, cu *clusterUser, sessionId string, sessionTimeout int) *scope { h := c.getHost() if sessionId != "" { h = c.getHostSticky(sessionId) @@ -64,13 +65,14 @@ func newScope(req *http.Request, u *user, c *cluster, cu *clusterUser, sessionId localAddr = addr.String() } s := &scope{ - startTime: time.Now(), - id: newScopeID(), - host: h, - cluster: c, - user: u, - clusterUser: cu, - sessionId: sessionId, + startTime: time.Now(), + id: newScopeID(), + host: h, + cluster: c, + user: u, + clusterUser: cu, + sessionId: sessionId, + sessionTimeout: sessionTimeout, remoteAddr: req.RemoteAddr, localAddr: localAddr, @@ -315,6 +317,8 @@ var allowedParams = []string{ "result_overflow_mode", // session stickiness "session_id", + // session timeout + "session_timeout", } // This regexp must match params needed to describe a way to use external data @@ -359,6 +363,8 @@ func (s *scope) decorateRequest(req *http.Request) (*http.Request, url.Values) { // Set query_id as scope_id to have possibility to kill query if needed. params.Set("query_id", s.id.String()) + // Set session_timeout an idle timeout for session + params.Set("session_timeout", strconv.Itoa(s.sessionTimeout)) req.URL.RawQuery = params.Encode() diff --git a/utils.go b/utils.go index 15c8d5b8..c3de05fb 100644 --- a/utils.go +++ b/utils.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "net/http" "sort" + "strconv" "strings" "github.com/Vertamedia/chproxy/chdecompressor" @@ -51,6 +52,16 @@ func getSessionId(req *http.Request) string { return sessionId } +// getSessionId retrieves session id +func getSessionTimeout(req *http.Request) int { + params := req.URL.Query() + sessionTimeout, err := strconv.Atoi(params.Get("session_timeout")) + if err != nil && sessionTimeout > 0 { + return sessionTimeout + } + return 60 +} + // getQuerySnippet returns query snippet. // // getQuerySnippet must be called only for error reporting. From 2ebaa8805c17e08fdf9506ca3782e1686bd93bee Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Fri, 21 May 2021 20:11:05 +0300 Subject: [PATCH 08/16] session_timeout tests --- main_test.go | 4 +++- scope_test.go | 12 ++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/main_test.go b/main_test.go index 5dfc72f1..63d1e1b1 100644 --- a/main_test.go +++ b/main_test.go @@ -288,7 +288,9 @@ func TestServe(t *testing.T) { "http POST request with session id", "testdata/http-session-id.yml", func(t *testing.T) { - req, err := http.NewRequest("POST", "http://127.0.0.1:9090/?query_id=45395792-a432-4b92-8cc9-536c14e1e1a9&extremes=0&session_id=default-session-id233", bytes.NewBufferString("SELECT * FROM system.numbers LIMIT 10")) + req, err := http.NewRequest("POST", + "http://127.0.0.1:9090/?query_id=45395792-a432-4b92-8cc9-536c14e1e1a9&extremes=0&session_id=default-session-id233", + bytes.NewBufferString("SELECT * FROM system.numbers LIMIT 10")) req.Header.Set("Content-Type", "application/x-www-form-urlencoded;") // This makes it work checkErr(t, err) diff --git a/scope_test.go b/scope_test.go index af3180ac..284c4458 100644 --- a/scope_test.go +++ b/scope_test.go @@ -330,14 +330,14 @@ func TestDecorateRequest(t *testing.T) { "text/plain", "GET", nil, - []string{"query_id", "query"}, + []string{"query_id", "session_timeout", "query"}, }, { "http://127.0.0.1?user=default&password=default&query=SELECT&database=default&wait_end_of_query=1", "text/plain", "GET", nil, - []string{"query_id", "query", "database"}, + []string{"query_id", "session_timeout", "query", "database"}, }, { "http://127.0.0.1?user=default&password=default&query=SELECT&testdata_structure=id+UInt32&testdata_format=TSV", @@ -352,7 +352,7 @@ func TestDecorateRequest(t *testing.T) { }, }, }, - []string{"query_id", "query", "max_threads"}, + []string{"query_id", "session_timeout", "query", "max_threads"}, }, { "http://127.0.0.1?user=default&password=default&query=SELECT&testdata_structure=id+UInt32&testdata_format=TSV", @@ -367,7 +367,7 @@ func TestDecorateRequest(t *testing.T) { }, }, }, - []string{"query_id", "query"}, + []string{"query_id", "session_timeout", "query"}, }, { "http://127.0.0.1?user=default&password=default&query=SELECT&testdata_type_buzz=1&testdata_structure_foo=id+UInt32&testdata_format-bar=TSV", @@ -386,14 +386,14 @@ func TestDecorateRequest(t *testing.T) { }, }, }, - []string{"query_id", "query", "max_threads", "background_pool_size"}, + []string{"query_id", "session_timeout", "query", "max_threads", "background_pool_size"}, }, { "http://127.0.0.1?user=default&password=default&query=SELECT&testdata_structure=id+UInt32&testdata_format=TSV", "multipart/form-data; boundary=foobar", "POST", nil, - []string{"query_id", "testdata_structure", "testdata_format", "query"}, + []string{"query_id", "session_timeout", "testdata_structure", "testdata_format", "query"}, }, } From 14fc89fdc5ccb7d0bf0917ccc1cd053d0e9d24fe Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Fri, 21 May 2021 20:13:04 +0300 Subject: [PATCH 09/16] header test --- main_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main_test.go b/main_test.go index 63d1e1b1..d1c63b1b 100644 --- a/main_test.go +++ b/main_test.go @@ -297,7 +297,7 @@ func TestServe(t *testing.T) { resp, err := http.DefaultClient.Do(req) checkErr(t, err) - if resp.StatusCode != http.StatusOK || resp.Header.Get("X-Clickhouse-Server-Session-Id") == "" { + if resp.StatusCode != http.StatusOK || resp.StatusCode != http.StatusOK && resp.Header.Get("X-Clickhouse-Server-Session-Id") == "" { t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK) } resp.Body.Close() From 26ca1f58de1e5dea02aa65dbc2fba6eda869765e Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Fri, 21 May 2021 20:14:20 +0300 Subject: [PATCH 10/16] cleaning unused code --- scope.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/scope.go b/scope.go index fec32513..9b85ef8f 100644 --- a/scope.go +++ b/scope.go @@ -20,8 +20,6 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -// var route = make(map[int]*host) - type scopeID uint64 func (sid scopeID) String() string { From dce7872666145bde90bd43ec145d3a2739dd2a63 Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Fri, 21 May 2021 21:18:03 +0300 Subject: [PATCH 11/16] handling session_id verification --- scope.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/scope.go b/scope.go index 9b85ef8f..951f927f 100644 --- a/scope.go +++ b/scope.go @@ -842,18 +842,16 @@ func (r *replica) getHostSticky(sessionId string) *host { tmpIdx := (idx + i) % n // handling sticky session - if sessionId != "" { - sessionId := hash(sessionId) - tmpIdx = (sessionId) % n - tmpHSticky := r.hosts[tmpIdx] - log.Debugf("Sticky server candidate is: %s", tmpHSticky.addr) - if !tmpHSticky.isActive() { - log.Debugf("Sticky session server has been picked up, but it is not available") - continue - } - log.Debugf("Sticky session server is: %s, session_id: %d, server_idx: %d, max nodes in pool: %d", tmpHSticky.addr, sessionId, tmpIdx, n) - return tmpHSticky + sessionId := hash(sessionId) + tmpIdx = (sessionId) % n + tmpHSticky := r.hosts[tmpIdx] + log.Debugf("Sticky server candidate is: %s", tmpHSticky.addr) + if !tmpHSticky.isActive() { + log.Debugf("Sticky session server has been picked up, but it is not available") + continue } + log.Debugf("Sticky session server is: %s, session_id: %d, server_idx: %d, max nodes in pool: %d", tmpHSticky.addr, sessionId, tmpIdx, n) + return tmpHSticky } // The returned host may be inactive. This is OK, From 456d694cbc71c1fa974684ed6a30d462a1d85216 Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Sun, 23 May 2021 22:59:16 +0300 Subject: [PATCH 12/16] Ability to build version under Docker (Native go build doesn't work due to Datadog lib) --- Dockerfile | 2 ++ Makefile | 12 +++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index d36cb9c8..36e45588 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,6 +5,8 @@ RUN go get golang.org/x/lint/golint RUN mkdir -p /go/src/github.com/Vertamedia/chproxy WORKDIR /go/src/github.com/Vertamedia/chproxy COPY . ./ +ARG EXT_BUILD_TAG +ENV EXT_BUILD_TAG ${EXT_BUILD_TAG} RUN make release-build FROM alpine diff --git a/Makefile b/Makefile index e5f137d8..9617aace 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,8 @@ +current_dir = $(pwd) pkgs = $(shell go list ./...) gofiles := $(shell find . -name "*.go" -type f -not -path "./vendor/*") -BUILD_TAG = $(shell git tag --points-at HEAD) +BUILD_TAG = $(or $(shell git tag --points-at HEAD), $(EXT_BUILD_TAG), latest) BUILD_CONSTS = \ -X main.buildTime=`date -u '+%Y-%m-%d_%H:%M:%S'` \ @@ -36,7 +37,16 @@ clean: rm -f chproxy release-build: + @echo "Ver: $(BUILD_TAG), OPTS: $(BUILD_OPTS)" GOOS=linux GOARCH=amd64 go build $(BUILD_OPTS) + rm chproxy-linux-amd64-*.tar.gz + tar czf chproxy-linux-amd64-$(BUILD_TAG).tar.gz chproxy release: format lint test clean release-build + @echo "Ver: $(BUILD_TAG), OPTS: $(BUILD_OPTS)" tar czf chproxy-linux-amd64-$(BUILD_TAG).tar.gz chproxy + +release-build-docker: + @echo "Ver: $(BUILD_TAG)" + @DOCKER_BUILDKIT=1 docker build --target build --build-arg EXT_BUILD_TAG=$(BUILD_TAG) --progress plain -t chproxy-build . + @docker run --rm --entrypoint "/bin/sh" -v $(CURDIR):/host chproxy-build -c "/bin/cp /go/src/github.com/Vertamedia/chproxy/*.tar.gz /host" From eca0b6c15ba9704850d4a5a1141b4eaf548708c1 Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Mon, 24 May 2021 12:19:21 +0300 Subject: [PATCH 13/16] Error handling in Makefile --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 9617aace..87b74a9e 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ clean: release-build: @echo "Ver: $(BUILD_TAG), OPTS: $(BUILD_OPTS)" GOOS=linux GOARCH=amd64 go build $(BUILD_OPTS) - rm chproxy-linux-amd64-*.tar.gz + @if [ -f chproxy-linux-amd64-*.tar.gz ]; then rm chproxy-linux-amd64-*.tar.gz; fi tar czf chproxy-linux-amd64-$(BUILD_TAG).tar.gz chproxy release: format lint test clean release-build From 4304395eededb678a179adc1414d5c39913ea166 Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Mon, 24 May 2021 14:41:49 +0300 Subject: [PATCH 14/16] Bump to newer go + make sure static linking works correctly --- Dockerfile | 4 ++-- Makefile | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index 36e45588..efa4a22f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ -FROM golang:1.13-alpine AS build +FROM golang:1.16-alpine AS build -RUN apk add --update zstd-static zstd-dev make gcc musl-dev git +RUN apk add --no-cache --update zstd-static zstd-dev make gcc musl-dev git libc6-compat RUN go get golang.org/x/lint/golint RUN mkdir -p /go/src/github.com/Vertamedia/chproxy WORKDIR /go/src/github.com/Vertamedia/chproxy diff --git a/Makefile b/Makefile index 87b74a9e..a66b71fc 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,7 @@ clean: release-build: @echo "Ver: $(BUILD_TAG), OPTS: $(BUILD_OPTS)" - GOOS=linux GOARCH=amd64 go build $(BUILD_OPTS) + @GOOS=linux GOARCH=amd64 go build $(BUILD_OPTS) @if [ -f chproxy-linux-amd64-*.tar.gz ]; then rm chproxy-linux-amd64-*.tar.gz; fi tar czf chproxy-linux-amd64-$(BUILD_TAG).tar.gz chproxy From e5d9dcaf23f852e6777f89da62370d45085468da Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Fri, 4 Jun 2021 01:37:29 +0300 Subject: [PATCH 15/16] Session_id typo fix --- utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils.go b/utils.go index c3de05fb..cd40e976 100644 --- a/utils.go +++ b/utils.go @@ -56,7 +56,7 @@ func getSessionId(req *http.Request) string { func getSessionTimeout(req *http.Request) int { params := req.URL.Query() sessionTimeout, err := strconv.Atoi(params.Get("session_timeout")) - if err != nil && sessionTimeout > 0 { + if err == nil && sessionTimeout > 0 { return sessionTimeout } return 60 From 15a760920166656286be53a960887edd6942a411 Mon Sep 17 00:00:00 2001 From: Pavel Nemirovsky Date: Fri, 4 Jun 2021 20:58:12 +0300 Subject: [PATCH 16/16] Adjust tests for session-name and session-timeout --- main_test.go | 22 +++++++++++++++++++--- proxy.go | 5 +++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/main_test.go b/main_test.go index d1c63b1b..77414cab 100644 --- a/main_test.go +++ b/main_test.go @@ -13,6 +13,7 @@ import ( "net/http/httptest" "net/url" "os" + "strconv" "strings" "sync" "testing" @@ -285,11 +286,13 @@ func TestServe(t *testing.T) { startHTTP, }, { - "http POST request with session id", + "http POST request with session_id and session_timeout", "testdata/http-session-id.yml", func(t *testing.T) { + sessionName := "name" + sessionTimeout := 900 req, err := http.NewRequest("POST", - "http://127.0.0.1:9090/?query_id=45395792-a432-4b92-8cc9-536c14e1e1a9&extremes=0&session_id=default-session-id233", + "http://127.0.0.1:9090/?query_id=45395792-a432-4b92-8cc9-536c14e1e1a9&extremes=0&session_id="+sessionName+"&session_timeout="+strconv.Itoa(sessionTimeout), bytes.NewBufferString("SELECT * FROM system.numbers LIMIT 10")) req.Header.Set("Content-Type", "application/x-www-form-urlencoded;") // This makes it work @@ -297,9 +300,22 @@ func TestServe(t *testing.T) { resp, err := http.DefaultClient.Do(req) checkErr(t, err) - if resp.StatusCode != http.StatusOK || resp.StatusCode != http.StatusOK && resp.Header.Get("X-Clickhouse-Server-Session-Id") == "" { + if resp.StatusCode != http.StatusOK { t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK) } + + // verify correctness of session_id + _sessionName := resp.Header.Get("X-Clickhouse-Server-Session-Id") + if _sessionName != sessionName { + t.Fatalf("unexpected value of X-Clickhouse-Server-Session-Id: %s; expected: %s", _sessionName, sessionName) + } + + // verify correctness of session_id + _sessionTimeout, _ := strconv.Atoi(resp.Header.Get("X-Clickhouse-Server-Session-Timeout")) + if _sessionTimeout != sessionTimeout { + t.Fatalf("unexpected value of X-Clickhouse-Server-Session-Timeout: %d; expected: %d", _sessionTimeout, sessionTimeout) + } + resp.Body.Close() }, startHTTP, diff --git a/proxy.go b/proxy.go index 11684b6d..134dfe71 100644 --- a/proxy.go +++ b/proxy.go @@ -103,6 +103,11 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("X-ClickHouse-Server-Session-Id", s.sessionId) } + // publish session_timeout if needed + if s.sessionId != "" { + rw.Header().Set("X-ClickHouse-Server-Session-Timeout", strconv.Itoa(s.sessionTimeout)) + } + if s.user.cache == nil { rp.proxyRequest(s, srw, srw, req) } else {