From e94e1db9d9cef859c05c80728ff8fdbf992ebe0b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Denis=20Krienb=C3=BChl?=
Date: Wed, 21 Aug 2024 11:25:16 +0200
Subject: [PATCH 1/2] Add a helper command to inspect PROXY connections

The server can be run as follows:

  cd cmd/http-echo
  go run main.go

Then in a separate shell:

  curl http://127.0.0.1:8080/proxy-protocol/used
  false

  curl --haproxy-protocol http://127.0.0.1:8080/proxy-protocol/used
  true

This tool could live outside this repository, and may one day do so,
once it has more features and can replace the nginx hello server used
in other places.
---
 cmd/http-echo/go.mod  |  5 +++
 cmd/http-echo/go.sum  |  2 ++
 cmd/http-echo/main.go | 73 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 80 insertions(+)
 create mode 100644 cmd/http-echo/go.mod
 create mode 100644 cmd/http-echo/go.sum
 create mode 100644 cmd/http-echo/main.go

diff --git a/cmd/http-echo/go.mod b/cmd/http-echo/go.mod
new file mode 100644
index 0000000..bc48f99
--- /dev/null
+++ b/cmd/http-echo/go.mod
@@ -0,0 +1,5 @@
+module github.com/cloudscale-ch/cloudscale-cloud-controller-manager/cmd/http-echo
+
+go 1.23.0
+
+require github.com/pires/go-proxyproto v0.7.0
diff --git a/cmd/http-echo/go.sum b/cmd/http-echo/go.sum
new file mode 100644
index 0000000..0633665
--- /dev/null
+++ b/cmd/http-echo/go.sum
@@ -0,0 +1,2 @@
+github.com/pires/go-proxyproto v0.7.0 h1:IukmRewDQFWC7kfnb66CSomk2q/seBuilHBYFwyq0Hs=
+github.com/pires/go-proxyproto v0.7.0/go.mod h1:Vz/1JPY/OACxWGQNIRY2BeyDmpoaWmEP40O9LbuiFR4=
diff --git a/cmd/http-echo/main.go b/cmd/http-echo/main.go
new file mode 100644
index 0000000..1436eb7
--- /dev/null
+++ b/cmd/http-echo/main.go
@@ -0,0 +1,73 @@
+// An HTTP echo server that reports information about connections made to it.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"net"
+	"net/http"
+	"time"
+
+	proxyproto "github.com/pires/go-proxyproto"
+)
+
+func main() {
+	host := flag.String("host", "127.0.0.1", "Host to listen on")
+	port := flag.Int("port", 8080, "Port to listen on")
+
+	flag.Parse()
+
+	serve(*host, *port)
+}
+
+// log wraps a handler to log each HTTP request in a basic fashion
+func log(h http.Handler) http.Handler {
+	return http.HandlerFunc(
+		func(w http.ResponseWriter, r *http.Request) {
+			h.ServeHTTP(w, r)
+			fmt.Printf("%s %s (%s)\n", r.Method, r.RequestURI, r.RemoteAddr)
+		})
+}
+
+// serve runs the HTTP API on the given host and port
+func serve(host string, port int) {
+	router := http.NewServeMux()
+
+	// Returns 'true' if the PROXY protocol was used for the given connection
+	router.HandleFunc("GET /proxy-protocol/used",
+		func(w http.ResponseWriter, r *http.Request) {
+			fmt.Fprintln(w, r.Context().Value("HasProxyHeader"))
+		})
+
+	addr := fmt.Sprintf("%s:%d", host, port)
+
+	server := http.Server{
+		Addr:    addr,
+		Handler: log(router),
+		// Record for each connection whether a PROXY header was parsed,
+		// so the handler above can report it per request.
+		ConnContext: func(ctx context.Context, c net.Conn) context.Context {
+			hasProxyHeader := false
+
+			if c, ok := c.(*proxyproto.Conn); ok {
+				hasProxyHeader = c.ProxyHeader() != nil
+			}
+
+			return context.WithValue(ctx, "HasProxyHeader", hasProxyHeader)
+		},
+	}
+
+	listener, err := net.Listen("tcp", server.Addr)
+	if err != nil {
+		panic(err)
+	}
+
+	fmt.Printf("Listening on %s\n", addr)
+
+	// Accept PROXY headers when present, without requiring them
+	proxyListener := &proxyproto.Listener{
+		Listener:          listener,
+		ReadHeaderTimeout: 10 * time.Second,
+	}
+	defer proxyListener.Close()
+
+	server.Serve(proxyListener)
+}
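As an aside, the same check can be exercised without curl, by speaking the
PROXY protocol directly. The following sketch (not part of the patches, all
addresses illustrative) uses the same go-proxyproto dependency to prepend a
v1 header — which reads as `PROXY TCP4 203.0.113.7 127.0.0.1 54321 8080` on
the wire — before issuing a plain HTTP request:

    package main

    import (
    	"bufio"
    	"fmt"
    	"io"
    	"net"
    	"net/http"

    	proxyproto "github.com/pires/go-proxyproto"
    )

    func main() {
    	conn, err := net.Dial("tcp", "127.0.0.1:8080")
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()

    	// Claim an (illustrative) source address via a PROXY v1 header
    	header := proxyproto.HeaderProxyFromAddrs(1,
    		&net.TCPAddr{IP: net.ParseIP("203.0.113.7"), Port: 54321},
    		&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8080})

    	if _, err := header.WriteTo(conn); err != nil {
    		panic(err)
    	}

    	// Then speak plain HTTP over the same connection
    	req, err := http.NewRequest(
    		"GET", "http://127.0.0.1:8080/proxy-protocol/used", nil)
    	if err != nil {
    		panic(err)
    	}
    	if err := req.Write(conn); err != nil {
    		panic(err)
    	}

    	resp, err := http.ReadResponse(bufio.NewReader(conn), req)
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()

    	body, err := io.ReadAll(resp.Body)
    	if err != nil {
    		panic(err)
    	}

    	fmt.Print(string(body)) // prints "true"
    }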
From 0c0afd8bf323cf6033cc853c699a7a75d1231106 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Denis=20Krienb=C3=BChl?=
Date: Thu, 22 Aug 2024 17:27:20 +0200
Subject: [PATCH 2/2] Add option to prevent cluster-traffic from bypassing
 loadbalancers

This is accomplished with two new annotations:

- `k8s.cloudscale.ch/loadbalancer-force-hostname`
- `k8s.cloudscale.ch/loadbalancer-ip-mode`

The former forces a hostname to be reported for loadbalancer ingress; the
latter adds support for the new IPMode config, available by default on
Kubernetes 1.30, and feature-gated on 1.29.

This is required for clusters that use the `proxy` or `proxyv2` protocol
for any of their loadbalancers, and send traffic from inside the cluster
to the loadbalancers. In such a setup, traffic may bypass the loadbalancer,
unless the hostname is set (for older clusters). For newer clusters, the
default "IP Mode" used is "Proxy", as that is the least surprising setting.

References:

- https://kubernetes.io/blog/2023/12/18/kubernetes-1-29-feature-loadbalancer-ip-mode-alpha/
- https://github.com/cloudscale-ch/cloudscale-cloud-controller-manager/issues/15
---
 .github/workflows/ccm-integration-tests.yml |   1 +
 pkg/cloudscale_ccm/loadbalancer.go          | 132 ++++++++++--
 pkg/cloudscale_ccm/service_info.go          |   4 +
 pkg/internal/integration/service_test.go    | 213 +++++++++++++++++++-
 pkg/internal/kubeutil/annotate.go           |  28 +++
 5 files changed, 352 insertions(+), 26 deletions(-)

diff --git a/.github/workflows/ccm-integration-tests.yml b/.github/workflows/ccm-integration-tests.yml
index 7413053..857b1e2 100644
--- a/.github/workflows/ccm-integration-tests.yml
+++ b/.github/workflows/ccm-integration-tests.yml
@@ -129,6 +129,7 @@ jobs:
         env:
           CLOUDSCALE_API_TOKEN: ${{ secrets.CLOUDSCALE_API_TOKEN }}
+          HTTP_ECHO_BRANCH: ${{ vars.HTTP_ECHO_BRANCH }}
           KUBERNETES: '${{ matrix.kubernetes }}'
           SUBNET: '${{ matrix.subnet }}'
           CLUSTER_PREFIX: '${{ matrix.cluster_prefix }}'
diff --git a/pkg/cloudscale_ccm/loadbalancer.go b/pkg/cloudscale_ccm/loadbalancer.go
index 20bb182..e2048d3 100644
--- a/pkg/cloudscale_ccm/loadbalancer.go
+++ b/pkg/cloudscale_ccm/loadbalancer.go
@@ -12,6 +12,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
+	"k8s.io/utils/ptr"
 )
 
 // Annotations used by the loadbalancer integration of cloudscale_ccm. Those
@@ -133,6 +134,60 @@
 	// as all pools have to be recreated.
 	LoadBalancerPoolProtocol = "k8s.cloudscale.ch/loadbalancer-pool-protocol"
 
+	// LoadBalancerForceHostname forces the CCM to report a specific hostname
+	// to Kubernetes when returning the loadbalancer status, instead of
+	// reporting the IP address(es).
+	//
+	// The hostname used should point to the same IP address that would
+	// otherwise be reported. This is used as a workaround for clusters that
+	// do not support status.loadBalancer.ingress.ipMode, and use the `proxy`
+	// or `proxyv2` protocol.
+	//
+	// For newer clusters, .status.loadBalancer.ingress.ipMode is
+	// automatically set to "Proxy", unless LoadBalancerIPMode is set to
+	// "VIP".
+	//
+	// For more information about this workaround see
+	// https://kubernetes.io/blog/2023/12/18/kubernetes-1-29-feature-loadbalancer-ip-mode-alpha/
+	//
+	// To illustrate, here's an example of a load balancer status shown on
+	// a Kubernetes 1.29 service with default settings:
+	//
+	//   apiVersion: v1
+	//   kind: Service
+	//   ...
+	//   status:
+	//     loadBalancer:
+	//       ingress:
+	//         - ip: 45.81.71.1
+	//         - ip: 2a06:c00::1
+	//
+	// Using the annotation causes the status to use the given value instead:
+	//
+	//   apiVersion: v1
+	//   kind: Service
+	//   metadata:
+	//     annotations:
+	//       k8s.cloudscale.ch/loadbalancer-force-hostname: example.org
+	//   status:
+	//     loadBalancer:
+	//       ingress:
+	//         - hostname: example.org
+	//
+	// If you are not using the `proxy` or `proxyv2` protocol, or if you are
+	// on Kubernetes 1.30 or newer, you probably do not need this setting.
+	//
+	// See `LoadBalancerIPMode` below.
+	LoadBalancerForceHostname = "k8s.cloudscale.ch/loadbalancer-force-hostname"
+
+	// LoadBalancerIPMode defines the IP mode reported to Kubernetes for the
+	// loadbalancers managed by this CCM. It defaults to "Proxy", where all
+	// traffic destined to the load balancer is sent through the load balancer,
+	// even if coming from inside the cluster.
+	//
+	// The older behavior, where traffic inside the cluster is directly
+	// sent to the backend service, can be activated by using "VIP" instead.
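+	//
+	// For example (illustrative manifest, mirroring the example above):
+	//
+	//   apiVersion: v1
+	//   kind: Service
+	//   metadata:
+	//     annotations:
+	//       k8s.cloudscale.ch/loadbalancer-ip-mode: "VIP"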
+	LoadBalancerIPMode = "k8s.cloudscale.ch/loadbalancer-ip-mode"
+
 	// LoadBalancerHealthMonitorDelayS is the delay between two successive
 	// checks, in seconds. Defaults to 2.
 	//
@@ -269,7 +324,13 @@ func (l *loadbalancer) GetLoadBalancer(
 		return nil, false, nil
 	}
 
-	return loadBalancerStatus(instance), true, nil
+	result, err := l.loadBalancerStatus(serviceInfo, instance)
+	if err != nil {
+		return nil, true, fmt.Errorf(
+			"unable to get load balancer state for %s: %w", service.Name, err)
+	}
+
+	return result, true, nil
 }
 
 // GetLoadBalancerName returns the name of the load balancer. Implementations
@@ -361,7 +422,13 @@ func (l *loadbalancer) EnsureLoadBalancer(
 			"unable to annotate service %s: %w", service.Name, err)
 	}
 
-	return loadBalancerStatus(actual.lb), nil
+	result, err := l.loadBalancerStatus(serviceInfo, actual.lb)
+	if err != nil {
+		return nil, fmt.Errorf(
+			"unable to get load balancer state for %s: %w", service.Name, err)
+	}
+
+	return result, nil
 }
 
 // UpdateLoadBalancer updates hosts under the specified load balancer.
@@ -432,6 +499,53 @@ func (l *loadbalancer) EnsureLoadBalancerDeleted(
 	})
 }
 
+// loadBalancerStatus generates the v1.LoadBalancerStatus for the given
+// loadbalancer, as required by Kubernetes.
+func (l *loadbalancer) loadBalancerStatus(
+	serviceInfo *serviceInfo,
+	lb *cloudscale.LoadBalancer,
+) (*v1.LoadBalancerStatus, error) {
+
+	status := v1.LoadBalancerStatus{}
+
+	// When forcing the use of a hostname, there's exactly one ingress item
+	hostname := serviceInfo.annotation(LoadBalancerForceHostname)
+	if len(hostname) > 0 {
+		status.Ingress = []v1.LoadBalancerIngress{{Hostname: hostname}}
+		return &status, nil
+	}
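+
+	// Reporting a hostname rather than an IP matters here: kube-proxy
+	// short-circuits traffic addressed to known ingress IPs, while a
+	// hostname is resolved by the client, so the traffic actually passes
+	// through the load balancer (see the blog post linked above).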
+
+	// Otherwise there are as many items as there are addresses
+	status.Ingress = make([]v1.LoadBalancerIngress, len(lb.VIPAddresses))
+
+	var ipmode *v1.LoadBalancerIPMode
+	switch mode := serviceInfo.annotation(LoadBalancerIPMode); mode {
+	case "Proxy":
+		ipmode = ptr.To(v1.LoadBalancerIPModeProxy)
+	case "VIP":
+		ipmode = ptr.To(v1.LoadBalancerIPModeVIP)
+	default:
+		return nil, fmt.Errorf(
+			"unsupported IP mode: '%s', must be 'Proxy' or 'VIP'", mode)
+	}
+
+	// On newer releases, we explicitly configure the IP mode
+	supportsIPMode, err := kubeutil.IsKubernetesReleaseOrNewer(l.k8s, 1, 30)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get load balancer status: %w", err)
+	}
+
+	for i, address := range lb.VIPAddresses {
+		status.Ingress[i].IP = address.Address
+
+		if supportsIPMode {
+			status.Ingress[i].IPMode = ipmode
+		}
+	}
+
+	return &status, nil
+}
+
 // ensureValidConfig ensures that the configuration can be applied at all,
 // acting as a gate that ensures certain invariants before any changes are
 // made.
@@ -545,17 +659,3 @@ func (l *loadbalancer) findIPsAssignedElsewhere(
 
 	return conflicts, nil
 }
-
-// loadBalancerStatus generates the v1.LoadBalancerStatus for the given
-// loadbalancer, as required by Kubernetes.
-func loadBalancerStatus(lb *cloudscale.LoadBalancer) *v1.LoadBalancerStatus {
-
-	status := v1.LoadBalancerStatus{}
-	status.Ingress = make([]v1.LoadBalancerIngress, len(lb.VIPAddresses))
-
-	for i, address := range lb.VIPAddresses {
-		status.Ingress[i].IP = address.Address
-	}
-
-	return &status
-}
diff --git a/pkg/cloudscale_ccm/service_info.go b/pkg/cloudscale_ccm/service_info.go
index 10f0603..9379294 100644
--- a/pkg/cloudscale_ccm/service_info.go
+++ b/pkg/cloudscale_ccm/service_info.go
@@ -82,6 +82,10 @@ func (s serviceInfo) annotation(key string) string {
 		return s.annotationOrDefault(key, "")
 	case LoadBalancerPoolProtocol:
 		return s.annotationOrDefault(key, "tcp")
+	case LoadBalancerForceHostname:
+		return s.annotationOrDefault(key, "")
+	case LoadBalancerIPMode:
+		return s.annotationOrDefault(key, "Proxy")
 	case LoadBalancerFlavor:
 		return s.annotationOrDefault(key, "lb-standard")
 	case LoadBalancerVIPAddresses:
diff --git a/pkg/internal/integration/service_test.go b/pkg/internal/integration/service_test.go
index aecfaa9..1dc67dd 100644
--- a/pkg/internal/integration/service_test.go
+++ b/pkg/internal/integration/service_test.go
@@ -6,7 +6,9 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"math/rand"
 	"net/netip"
+	"os"
 	"os/exec"
 	"strings"
 	"time"
 
+	"github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/kubeutil"
 	"github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/testkit"
 	cloudscale "github.com/cloudscale-ch/cloudscale-go-sdk/v4"
 	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/wait"
 )
 
 func (s *IntegrationTestSuite) CreateDeployment(
 	name string, image string, replicas int32, port int32, args ...string) {
+
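+	// Split the variadic args into the container entrypoint (Command) and
+	// its arguments, so that callers can run e.g. "bash", "-c", <script>.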
+	var command []string
+
+	if len(args) > 0 {
+		command = args[:1]
+		args = args[1:]
+	}
+
 	spec := appsv1.DeploymentSpec{
 		Replicas: &replicas,
 		Selector: &metav1.LabelSelector{
@@ -41,9 +52,10 @@ func (s *IntegrationTestSuite) CreateDeployment(
 			Spec: v1.PodSpec{
 				Containers: []v1.Container{
 					{
-						Name:  name,
-						Image: image,
-						Args:  args,
+						Name:    name,
+						Image:   image,
+						Command: command,
+						Args:    args,
 						Ports: []v1.ContainerPort{
 							{ContainerPort: port},
 						},
@@ -82,19 +94,105 @@ func (s *IntegrationTestSuite) ExposeDeployment(
 		},
 	}
 
-	_, err := s.k8s.CoreV1().Services(s.ns).Create(
-		context.Background(),
-		&v1.Service{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:        name,
-				Annotations: annotations,
+	service, err := s.k8s.CoreV1().Services(s.ns).Get(
+		context.Background(), name, metav1.GetOptions{},
+	)
+
+	if err != nil {
+		_, err = s.k8s.CoreV1().Services(s.ns).Create(
+			context.Background(),
+			&v1.Service{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:        name,
+					Annotations: annotations,
+				},
+				Spec: spec,
 			},
-			Spec: spec,
+			metav1.CreateOptions{},
+		)
+		s.Require().NoError(err)
+	} else {
+		service.Spec = spec
+		service.ObjectMeta.Annotations = annotations
+
+		_, err = s.k8s.CoreV1().Services(s.ns).Update(
+			context.Background(),
+			service,
+			metav1.UpdateOptions{},
+		)
+		s.Require().NoError(err)
+	}
+}
+
+// RunJob starts a single job, awaits the result, and returns it as a string
+func (s *IntegrationTestSuite) RunJob(
+	image string, timeout time.Duration, cmd ...string) string {
+
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	name := fmt.Sprintf("job-%08x", rand.Uint32())
+
+	// Specify the job
+	spec := batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
 		},
+		Spec: batchv1.JobSpec{
+			Template: v1.PodTemplateSpec{
+				Spec: v1.PodSpec{
+					RestartPolicy: v1.RestartPolicyNever,
+					Containers: []v1.Container{
+						{
+							Name:    name,
+							Image:   image,
+							Command: cmd,
+						},
+					},
+				},
+			},
+		},
+	}
+
+	// Start it
+	_, err := s.k8s.BatchV1().Jobs(s.ns).Create(
+		ctx,
+		&spec,
 		metav1.CreateOptions{},
 	)
 	s.Require().NoError(err)
+
+	// Wait for completion
+	var job *batchv1.Job
+	err = wait.PollUntilContextTimeout(ctx, 1*time.Second, timeout, true,
+		func(ctx context.Context) (bool, error) {
+			job, err = s.k8s.BatchV1().Jobs(s.ns).Get(
+				ctx, name, metav1.GetOptions{})
+
+			if err != nil {
+				return false, err
+			}
+			return job.Status.Succeeded > 0, nil
+		},
+	)
+
+	s.Require().NoError(err)
+
+	// Get the pod created by the job
+	pods, err := s.k8s.CoreV1().Pods(s.ns).List(
+		ctx, metav1.ListOptions{
+			LabelSelector: fmt.Sprintf("job-name=%s", name),
+		},
+	)
+
+	s.Require().NoError(err)
+	s.Require().Len(pods.Items, 1)
+
+	logs, err := s.k8s.CoreV1().Pods(s.ns).GetLogs(
+		pods.Items[0].Name, &v1.PodLogOptions{}).Do(ctx).Raw()
+
+	s.Require().NoError(err)
+
+	return string(logs)
 }
 
 // CCMLogs returns all the logs of the CCM since the given time.
@@ -493,3 +591,98 @@ func (s *IntegrationTestSuite) TestFloatingIPConflicts() {
 	lines := s.CCMLogs(start)
 	s.Assert().Contains(lines, "assigned to another service")
 }
+
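+// TestServiceProxyProtocol deploys the http-echo helper behind a load
+// balancer in `proxy` mode and verifies when requests arrive wrapped in
+// the PROXY protocol: always from outside the cluster, and from inside
+// only with the hostname workaround or the "Proxy" IP mode.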
+func (s *IntegrationTestSuite) TestServiceProxyProtocol() {
+
+	// Get the branch to run http-echo with (in the future, we might
+	// offer this in a separate container).
+	branch := os.Getenv("HTTP_ECHO_BRANCH")
+	if len(branch) == 0 {
+		branch = "main"
+	}
+
+	// Deploy our http-echo server to check for proxy connections
+	s.T().Log("Creating http-echo deployment", "branch", branch)
+	s.CreateDeployment("http-echo", "golang", 2, 80, "bash", "-c", fmt.Sprintf(`
+		git clone https://github.com/cloudscale-ch/cloudscale-cloud-controller-manager ccm;
+		cd ccm;
+		git checkout %s || exit 1;
+		cd cmd/http-echo;
+		go run main.go -host 0.0.0.0 -port 80
+	`, branch))
+
+	// Expose the deployment using a LoadBalancer service
+	s.ExposeDeployment("http-echo", 80, 80, map[string]string{
+		"k8s.cloudscale.ch/loadbalancer-pool-protocol": "proxy",
+
+		// Make sure to get the default behavior of older Kubernetes
+		// releases, even on newer releases.
+		"k8s.cloudscale.ch/loadbalancer-ip-mode": "VIP",
+	})
+
+	// Wait for the service to be ready
+	s.T().Log("Waiting for http-echo service to be ready")
+	service := s.AwaitServiceReady("http-echo", 180*time.Second)
+	s.Require().NotNil(service)
+
+	addr := service.Status.LoadBalancer.Ingress[0].IP
+	url := fmt.Sprintf("http://%s/proxy-protocol/used", addr)
+
+	// Wait for responses to work
+	s.T().Log("Waiting for http-echo responses")
+	failures := 0
+
+	for i := 0; i < 100; i++ {
+		_, err := testkit.HTTPRead(url)
+
+		if err == nil {
+			break
+		} else {
+			s.T().Logf("Request %d failed: %s", i, err)
+			failures++
+		}
+
+		time.Sleep(5 * time.Millisecond)
+	}
+
+	// Make sure our HTTP requests get wrapped in the PROXY protocol
+	s.T().Log("Testing PROXY protocol from outside")
+
+	used, err := testkit.HTTPRead(url)
+	s.Assert().NoError(err)
+	s.Assert().Equal("true\n", used)
+
+	// Sending a request from inside the cluster does not work, unless we
+	// use a workaround.
+	s.T().Log("Testing PROXY protocol from inside")
+	used = s.RunJob("curlimages/curl", 90*time.Second, "curl", "-s", url)
+	s.Assert().Equal("false\n", used)
+
+	// The workaround works by reporting a hostname that resolves to the
+	// same IP, instead of reporting the IP itself.
+	s.ExposeDeployment("http-echo", 80, 80, map[string]string{
+		"k8s.cloudscale.ch/loadbalancer-pool-protocol": "proxy",
+		"k8s.cloudscale.ch/loadbalancer-ip-mode":       "VIP",
+		"k8s.cloudscale.ch/loadbalancer-force-hostname": fmt.Sprintf(
+			"%s.cust.cloudscale.ch",
+			strings.ReplaceAll(addr, ".", "-"),
+		),
+	})
+
+	s.T().Log("Testing PROXY protocol from inside with workaround")
+	used = s.RunJob("curlimages/curl", 90*time.Second, "curl", "-s", url)
+	s.Assert().Equal("true\n", used)
+
+	// On newer Kubernetes releases, the defaults just work
+	newer, err := kubeutil.IsKubernetesReleaseOrNewer(s.k8s, 1, 30)
+	s.Assert().NoError(err)
+
+	if newer {
+		s.ExposeDeployment("http-echo", 80, 80, map[string]string{
+			"k8s.cloudscale.ch/loadbalancer-pool-protocol": "proxy",
+		})
+
+		s.T().Log("Testing PROXY protocol on newer Kubernetes releases")
+		used = s.RunJob("curlimages/curl", 90*time.Second, "curl", "-s", url)
+		s.Assert().Equal("true\n", used)
+	}
+}
diff --git a/pkg/internal/kubeutil/annotate.go b/pkg/internal/kubeutil/annotate.go
index 3e9b8f7..3ecc220 100644
--- a/pkg/internal/kubeutil/annotate.go
+++ b/pkg/internal/kubeutil/annotate.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"strconv"
 	"strings"
 
 	v1 "k8s.io/api/core/v1"
@@ -129,3 +130,30 @@ func PatchServiceExternalTrafficPolicy(
 
 	return PatchService(ctx, client, service, operations)
 }
+
+// IsKubernetesReleaseOrNewer fetches the Kubernetes release and returns
+// true if it matches the given major.minor release, or is newer.
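+//
+// A typical use, as in the loadbalancer integration above, gates IP mode
+// support on the server version:
+//
+//	supportsIPMode, err := kubeutil.IsKubernetesReleaseOrNewer(l.k8s, 1, 30)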
+func IsKubernetesReleaseOrNewer(
+	client kubernetes.Interface,
+	major int,
+	minor int,
+) (bool, error) {
+	release, err := client.Discovery().ServerVersion()
+	if err != nil {
+		return false, fmt.Errorf("failed to read kubernetes version: %w", err)
+	}
+
+	k8sMajor, err := strconv.Atoi(release.Major)
+	if err != nil {
+		return false, fmt.Errorf(
+			"failed to convert '%s' to int: %w", release.Major, err)
+	}
+
+	// Some distributions report a suffixed minor version (e.g. "30+")
+	k8sMinor, err := strconv.Atoi(strings.TrimSuffix(release.Minor, "+"))
+	if err != nil {
+		return false, fmt.Errorf(
+			"failed to convert '%s' to int: %w", release.Minor, err)
+	}
+
+	return k8sMajor > major || (k8sMajor == major && k8sMinor >= minor), nil
+}
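
The version check lends itself to a table-driven test against the fake
clientset. A sketch, not part of the patches, assuming it lives next to
annotate.go in pkg/internal/kubeutil:

    package kubeutil

    import (
    	"testing"

    	"k8s.io/apimachinery/pkg/version"
    	discoveryfake "k8s.io/client-go/discovery/fake"
    	"k8s.io/client-go/kubernetes/fake"
    )

    func TestIsKubernetesReleaseOrNewer(t *testing.T) {
    	// Fake a cluster that reports version 1.30
    	client := fake.NewSimpleClientset()
    	client.Discovery().(*discoveryfake.FakeDiscovery).FakedServerVersion =
    		&version.Info{Major: "1", Minor: "30"}

    	for _, tt := range []struct {
    		major, minor int
    		want         bool
    	}{
    		{1, 29, true},
    		{1, 30, true},
    		{1, 31, false},
    		{2, 0, false},
    	} {
    		got, err := IsKubernetesReleaseOrNewer(client, tt.major, tt.minor)
    		if err != nil {
    			t.Fatal(err)
    		}
    		if got != tt.want {
    			t.Errorf("IsKubernetesReleaseOrNewer(%d, %d) = %v, want %v",
    				tt.major, tt.minor, got, tt.want)
    		}
    	}
    }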