Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkg/cloudscale_ccm/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,29 @@ const (
// This can not be changed once the service is created.
LoadBalancerZone = "k8s.cloudscale.ch/loadbalancer-zone"

// LoadBalancerVIPAddresses defines the virtual IP addresses through which
// incoming traffic is received. this defaults to an automatically assigned
// public IPv4 and IPv6 address.
//
// If you want to use a specific private subnet instead, to load balance
// inside your cluster, you have to specify the subnet the loadbalancer
// should bind to, and optionally what IP address it should use (if you
// don't want an automatically assigned one).
//
// The value of this option is a list of JSON objects, as documented here:
//
// https://www.cloudscale.ch/en/api/v1#vip_addresses-attribute-specification
//
// By default, an empty list is set (to get a public address pair).
//
// This can currently not be changed and will cause an error if attempted,
// as the loadbalancer would have to be recreated, causing potential
// downtime, and a release of any address it held.
//
// To change the address it is recommended to create a new service
// resources instead.
LoadBalancerVIPAddresses = "k8s.cloudscale.ch/loadbalancer-vip-addresses"

// LoadBalancerPoolAlgorithm defines the load balancing algorithm used
// by the loadbalancer. See the API documentation for more information:
//
Expand Down
41 changes: 39 additions & 2 deletions pkg/cloudscale_ccm/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,27 @@ func desiredLbState(
}
}

// Parse the loadbalancer VIP addresses
var vip []cloudscale.VIPAddressRequest

err := serviceInfo.annotationMarshal(LoadBalancerVIPAddresses, &vip)
if err != nil {
return nil, err
}

addrs := make([]cloudscale.VIPAddress, 0, len(vip))
for _, v := range vip {
addrs = append(addrs, cloudscale.VIPAddress{
Address: v.Address,
Subnet: cloudscale.SubnetStub{
UUID: v.Subnet,
},
})
}

s := newLbState(&cloudscale.LoadBalancer{
Name: serviceInfo.annotation(LoadBalancerName),
VIPAddresses: []cloudscale.VIPAddress{},
VIPAddresses: addrs,
Flavor: cloudscale.LoadBalancerFlavorStub{
Slug: serviceInfo.annotation(LoadBalancerFlavor),
},
Expand Down Expand Up @@ -359,7 +377,26 @@ func nextLbActions(
// If the lb requires other changes, inform the user that they need to
// recreate the service themselves.
if len(desired.lb.VIPAddresses) > 0 {
if !slices.Equal(desired.lb.VIPAddresses, actual.lb.VIPAddresses) {
equal := slices.EqualFunc(
desired.lb.VIPAddresses,
actual.lb.VIPAddresses,
func(d cloudscale.VIPAddress, a cloudscale.VIPAddress) bool {

// The desired address may be missing, the actual address
// is always given.
if d.Address != "" && d.Address != a.Address {
return false
}

if d.Subnet.UUID != a.Subnet.UUID {
return false
}

return true
},
)

if !equal {
return nil, fmt.Errorf(
"VIP addresses for %s changed, please re-create the service",
actual.lb.HREF,
Expand Down
11 changes: 11 additions & 0 deletions pkg/cloudscale_ccm/service_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (s serviceInfo) annotation(key string) string {
return s.annotationOrDefault(key, "tcp")
case LoadBalancerFlavor:
return s.annotationOrDefault(key, "lb-standard")
case LoadBalancerVIPAddresses:
return s.annotationOrDefault(key, "[]")
case LoadBalancerPoolAlgorithm:
return s.annotationOrDefault(key, "round_robin")
case LoadBalancerHealthMonitorDelayS:
Expand Down Expand Up @@ -155,6 +157,15 @@ func (s serviceInfo) annotationList(key string) ([]string, error) {
return list, nil
}

// annotationMarshal marshals the annotation to the given type.
func (s serviceInfo) annotationMarshal(key string, target any) error {
err := json.Unmarshal([]byte(s.annotation(key)), target)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", key, err)
}
return nil
}

// annotationOrElase returns the annotation with the given key, or returns the
// result of the fallback function if the key does not exist.
func (s serviceInfo) annotationOrElse(key string, fn func() string) string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (a *CreateLbAction) Run(
for _, addr := range a.lb.VIPAddresses {
addrs = append(addrs, cloudscale.VIPAddressRequest{
Address: addr.Address,
Subnet: addr.Subnet.CIDR,
Subnet: addr.Subnet.UUID,
})
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/internal/integration/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type IntegrationTestSuite struct {
api *cloudscale.Client
ns string
clusterPrefix string
resources []string
sshkey string
}

func (s *IntegrationTestSuite) SetupSuite() {
Expand All @@ -43,6 +45,7 @@ func (s *IntegrationTestSuite) SetupSuite() {
if !ok {
log.Fatalf("could not find K8TEST_PATH environment variable\n")
}
s.sshkey = fmt.Sprintf("%s/cluster/ssh", k8test)

if prefix, ok := os.LookupEnv("CLUSTER_PREFIX"); ok {
s.clusterPrefix = prefix
Expand Down
47 changes: 46 additions & 1 deletion pkg/internal/integration/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/netip"
"os/exec"
"strings"
"time"

Expand Down Expand Up @@ -153,7 +154,7 @@ func (s *IntegrationTestSuite) AwaitServiceReady(
service = s.ServiceNamed(name)
s.Require().NotNil(service)

if service.Annotations != nil {
if len(service.Status.LoadBalancer.Ingress) >= 1 {
return service
}
time.Sleep(1 * time.Second)
Expand Down Expand Up @@ -226,6 +227,50 @@ func (s *IntegrationTestSuite) TestServiceEndToEnd() {
s.Assert().NotContains(lines, "Warn")
}

func (s *IntegrationTestSuite) TestServiceVIPAddresses() {

// Get the private subnet used by the nodes
var subnet string
var public string

servers := s.Servers()
s.Require().NotEmpty(servers)
for _, iface := range servers[0].Interfaces {
if iface.Type == "public" {
public = iface.Addresses[0].Address
continue
}

subnet = iface.Addresses[0].Subnet.UUID
break
}

// Deploy a TCP server that returns something
s.T().Log("Creating foo deployment")
s.CreateDeployment("nginx", "nginxdemos/hello:plain-text", 2, 80)

// Expose the deployment using a LoadBalancer service
s.ExposeDeployment("nginx", 80, 80, map[string]string{
"k8s.cloudscale.ch/loadbalancer-vip-addresses": fmt.Sprintf(
`[{"subnet": "%s"}]`, subnet),
})

s.T().Log("Waiting for nginx service to be ready")
service := s.AwaitServiceReady("nginx", 180*time.Second)
s.Require().NotNil(service)

// Use a worker as a jumphost to check if we get "foo"
addr := service.Status.LoadBalancer.Ingress[0].IP

cmd := exec.Command(
"ssh", fmt.Sprintf("ubuntu@%s", public), "-i", s.sshkey,
fmt.Sprintf("curl -s http://%s", addr),
)
out, err := cmd.Output()
s.Require().NoError(err)
s.Require().Contains(string(out), "Server name:")
}

func (s *IntegrationTestSuite) TestServiceTrafficPolicyLocal() {

// Traffic received via default "Cluster" policy is snatted via node.
Expand Down