Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
43d4458
Use http.DefaultTransport dialer settings in h2c.DefaultTransport
tanzeeb Nov 22, 2018
0e2e7c0
Support streaming in activator/util.Rewinder
tanzeeb Nov 22, 2018
3fdab18
Use custom timeout handler in queue-proxy
tanzeeb Nov 22, 2018
b2254d9
Use http2 instead of http as the k8s service port names
tanzeeb Nov 22, 2018
79278f3
Enforce max content length for streaming requests in activator
tanzeeb Nov 23, 2018
bee2aa4
Don't read original reader again after rewinder is closed
tanzeeb Nov 24, 2018
a1ca7b1
Use chan struct{} instead of chan bool in queue-proxy timeoutHandler
tanzeeb Nov 26, 2018
5239099
Move LimitReadCloser from activator handler to activator util and add…
tanzeeb Nov 26, 2018
4d50862
Add/fix godoc comments for activator/util
tanzeeb Nov 27, 2018
816afa8
Explictly set Dialer config defaults in h2c.DefaultTransport
tanzeeb Nov 27, 2018
183ca2b
Remove timeoutHandler from queue-proxy
tanzeeb Jan 21, 2019
9258466
Revert k8s Service port name from http2 to http
tanzeeb Jan 21, 2019
a25b30a
Run activator on two ports, to support activating both http1 and h2c …
tanzeeb Jan 22, 2019
e8200a5
Add RevisionProtocolType to API
tanzeeb Jan 24, 2019
2987952
Dynamically select 'http' or 'http2' for k8s service port name
tanzeeb Jan 24, 2019
7ce391f
Dynamically route to the http1 or h2c activator service port based on…
tanzeeb Jan 24, 2019
c7f6743
Add test coverage for activator.ServicePort
tanzeeb Jan 24, 2019
1700fbc
Print errors in EnforceLengthHandler tests
tanzeeb Jan 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,22 @@ func main() {
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
}

srv := h2c.NewServer(":8080", ah)
http1Srv := h2c.NewServer(":8080", ah)
go func() {
if err := srv.ListenAndServe(); err != nil {
if err := http1Srv.ListenAndServe(); err != nil {
logger.Errorw("Error running HTTP server", zap.Error(err))
}
}()

h2cSrv := h2c.NewServer(":8081", ah)
go func() {
if err := h2cSrv.ListenAndServe(); err != nil {
logger.Errorw("Error running HTTP server", zap.Error(err))
}
}()

<-stopCh
a.Shutdown()
srv.Shutdown(context.TODO())
http1Srv.Shutdown(context.TODO())
h2cSrv.Shutdown(context.TODO())
}
1 change: 1 addition & 0 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func handler(w http.ResponseWriter, r *http.Request) {
} else {
proxy.ServeHTTP(w, r)
}

}

// Sets up /health and /quitquitquit endpoints.
Expand Down
4 changes: 4 additions & 0 deletions config/400-activator-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ spec:
port: 80
targetPort: 8080
nodePort: # empty removing existing value to avoid upgrade error # TODO delete after v0.4
- name: http2
protocol: TCP
port: 81
targetPort: 8081
- name: metrics
protocol: TCP
port: 9090
Expand Down
4 changes: 3 additions & 1 deletion config/activator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ spec:
# and substituted here.
image: github.com/knative/serving/cmd/activator
ports:
- name: activator-port
- name: http1-port
containerPort: 8080
- name: h2c-port
containerPort: 8081
- name: metrics-port
containerPort: 9090
args:
Expand Down
17 changes: 17 additions & 0 deletions pkg/activator/activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.

package activator

import "github.com/knative/serving/pkg/apis/serving/v1alpha1"

const (
// K8sServiceName is the name of the activator service
K8sServiceName = "activator-service"
Expand All @@ -25,6 +27,11 @@ const (
RevisionHeaderName string = "knative-serving-revision"
// RevisionHeaderNamespace is the header key for revision's namespace
RevisionHeaderNamespace string = "knative-serving-namespace"

// ServicePortHTTP1 is the port number for activating HTTP1 revisions
ServicePortHTTP1 int32 = 80
// ServicePortHTTP1 is the port number for activating H2C revisions
ServicePortH2C int32 = 81
)

// Activator provides an active endpoint for a revision or an error and
Expand Down Expand Up @@ -53,3 +60,13 @@ type ActivationResult struct {
ConfigurationName string
Error error
}

// ServicePort returns the activator service port for the given Revision protocol.
// Default is `ServicePortHTTP1`.
func ServicePort(protocol v1alpha1.RevisionProtocolType) int32 {
if protocol == v1alpha1.RevisionProtocolH2C {
return ServicePortH2C
}

return ServicePortHTTP1
}
49 changes: 49 additions & 0 deletions pkg/activator/activator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package activator

import (
"testing"

"github.com/knative/serving/pkg/apis/serving/v1alpha1"
)

func TestServicePort(t *testing.T) {
tests := []struct {
name string
protocol v1alpha1.RevisionProtocolType
port int32
}{{
name: "h2c",
protocol: v1alpha1.RevisionProtocolH2C,
port: ServicePortH2C,
}, {
name: "http1",
protocol: v1alpha1.RevisionProtocolHTTP1,
port: ServicePortHTTP1,
}}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
want := tt.port
Comment thread
tanzeeb marked this conversation as resolved.
got := ServicePort(tt.protocol)

if want != got {
t.Errorf("unexpected port. want %d, got %d", want, got)
}
})
}
}
6 changes: 6 additions & 0 deletions pkg/activator/handler/enforce_length_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package handler

import (
"net/http"

"github.com/knative/serving/pkg/activator/util"
)

// EnforceMaxContentLengthHandler prevents uploads larger than `MaxContentLengthBytes`
Expand All @@ -24,10 +26,14 @@ type EnforceMaxContentLengthHandler struct {
}

func (h *EnforceMaxContentLengthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// If we have a ContentLength, we can fail early
if r.ContentLength > h.MaxContentLengthBytes {
w.WriteHeader(http.StatusRequestEntityTooLarge)
return
}

// Enforce MaxContentLengthBytes
r.Body = util.LimitReadCloser(r.Body, h.MaxContentLengthBytes)
Comment thread
tanzeeb marked this conversation as resolved.

h.NextHandler.ServeHTTP(w, r)
}
65 changes: 59 additions & 6 deletions pkg/activator/handler/enforce_length_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,51 +13,104 @@ limitations under the License.
package handler

import (
"bytes"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
)

func TestEnforceMaxContentLengthHandler(t *testing.T) {
payload := "SAMPLE PAYLOAD"
Comment thread
tanzeeb marked this conversation as resolved.

// httptest.NewRequest will set ContentLength for strings.NewReader
fixed := func(p string) io.Reader { return strings.NewReader(p) }
stream := func(p string) io.Reader { return ioutil.NopCloser(strings.NewReader(p)) }

examples := []struct {
label string
maxUpload int
request io.Reader
response string
status int
}{
{
label: "under",
label: "under with ContentLength",
maxUpload: len(payload) + 1,
request: fixed(payload),
status: http.StatusOK,
response: payload,
},
{
label: "equal",
label: "equal with ContentLength",
maxUpload: len(payload),
request: fixed(payload),
status: http.StatusOK,
response: payload,
},
{
label: "over",
label: "over with ContentLength",
maxUpload: len(payload) - 1,
request: fixed(payload),
status: http.StatusRequestEntityTooLarge,
response: "",
},
{
label: "under without ContentLength",
maxUpload: len(payload) + 1,
request: stream(payload),
status: http.StatusOK,
response: payload,
},
{
label: "equal without ContentLength",
maxUpload: len(payload),
request: stream(payload),
status: http.StatusOK,
response: payload,
},
{
label: "over without ContentLength",
maxUpload: len(payload) - 1,
request: stream(payload),
status: http.StatusOK,
response: payload[0 : len(payload)-1],
},
}

for _, e := range examples {
t.Run(e.label, func(t *testing.T) {
// Return 200 and request body
baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Comment thread
tanzeeb marked this conversation as resolved.
w.WriteHeader(http.StatusOK)

if r.Body != nil {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatalf("Error reading request body: %v", err)
}
w.Write(body)
}
})
handler := EnforceMaxContentLengthHandler{NextHandler: baseHandler, MaxContentLengthBytes: int64(e.maxUpload)}

resp := httptest.NewRecorder()
req := httptest.NewRequest("POST", "http://example.com", bytes.NewBufferString(payload))
req := httptest.NewRequest("POST", "http://example.com", e.request)

handler.ServeHTTP(resp, req)

gotBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error reading response body: %v", err)
}

if e.response != string(gotBody) {
Comment thread
tanzeeb marked this conversation as resolved.
t.Errorf("Unexpected response body. Want %q, got %q", e.response, gotBody)
}

if resp.Code != e.status {
t.Errorf("Unexpected response status for payload %q. Want %d, got %d", payload, e.status, resp.Code)
t.Errorf("Unexpected response status. Want %d, got %d", e.status, resp.Code)
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/activator/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *revisionActivator) revisionEndpoint(revision *v1alpha1.Revision) (end E
// Search for the correct port in all the service ports.
port := int32(-1)
for _, p := range svc.Spec.Ports {
if p.Name == revisionresources.ServicePortName {
if p.Name == revisionresources.ServicePortName(revision) {
port = p.Port
break
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/activator/revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func newRevisionBuilder(labels map[string]string) *revisionBuilder {
Spec: v1alpha1.RevisionSpec{
Container: corev1.Container{
Image: "gcr.io/repo/image",
Ports: []corev1.ContainerPort{{Name: "h2c"}},
},
},
Status: v1alpha1.RevisionStatus{
Expand Down Expand Up @@ -335,7 +336,7 @@ func newServiceBuilder() *serviceBuilder {
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{
Name: revisionresources.ServicePortName,
Name: revisionresources.ServicePortNameH2C,
Port: v1alpha1.DefaultUserPort,
}, {
Name: "anotherport",
Expand Down
34 changes: 34 additions & 0 deletions pkg/activator/util/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import "io"

// LimitReadCloser returns a ReadCloser wrapped with an `io.LimitReader`
func LimitReadCloser(rc io.ReadCloser, l int64) io.ReadCloser {
return &readCloser{io.LimitReader(rc, l), rc}
}

type readCloser struct {
Reader io.Reader
Closer io.Closer
}

func (lrc *readCloser) Read(b []byte) (int, error) {
return lrc.Reader.Read(b)
}

func (lrc *readCloser) Close() error {
return lrc.Closer.Close()
}
36 changes: 32 additions & 4 deletions pkg/activator/util/io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ limitations under the License.
*/
package util

import "io"
import (
"io"
"io/ioutil"
"strings"
"testing"
)

type spyReadCloser struct {
io.Reader
io.ReadCloser
Closed bool
ReadAfterClose bool
}
Expand All @@ -25,11 +30,34 @@ func (s *spyReadCloser) Read(b []byte) (n int, err error) {
s.ReadAfterClose = true
}

return s.Reader.Read(b)
return s.ReadCloser.Read(b)
}

func (s *spyReadCloser) Close() error {
s.Closed = true

return nil
return s.ReadCloser.Close()
}

func TestLimitReadCloser(t *testing.T) {
want := "test"
r := strings.NewReader(want + " foo")
rc := &spyReadCloser{ReadCloser: ioutil.NopCloser(r)}

lrc := LimitReadCloser(rc, int64(len(want)))

got, err := ioutil.ReadAll(lrc)
lrc.Close()

if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if string(got) != want {
t.Fatalf("Unexpected body. Want %q, got %q", want, got)
}

if !rc.Closed {
t.Fatalf("Expected ReadCloser to be closed.")
}
}
Loading