Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/infra/internal/domain/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (d *domain) syncKloudliteDeviceOnCluster(ctx InfraContext, gvpnName string)
}

// 2. Grab wireguard config from that device
wgConfig, err := d.getGlobalVPNDeviceWgConfig(ctx, gv.Name, gv.KloudliteDevice.Name)
wgConfig, err := d.getGlobalVPNDeviceWgConfig(ctx, gv.Name, gv.KloudliteDevice.Name, nil)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
)

const (
gvpnConnectionDeviceMethod = "gvpn-connection"
gvpnConnectionDeviceMethod = "gvpn-connection"
kloudliteGlobalVPNDeviceMethod = "kloudlite-global-vpn-device"
)

func (d *domain) getGlobalVPNConnectionPeers(vpns []*entities.GlobalVPNConnection) ([]wgv1.Peer, error) {
Expand Down
44 changes: 31 additions & 13 deletions apps/infra/internal/domain/global-vpn-devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,19 @@ func (d *domain) DeleteGlobalVPNDevice(ctx InfraContext, gvpn string, deviceName
}

func (d *domain) ListGlobalVPNDevice(ctx InfraContext, gvpn string, search map[string]repos.MatchFilter, pagination repos.CursorPagination) (*repos.PaginatedRecord[*entities.GlobalVPNDevice], error) {
filter := d.gvpnDevicesRepo.MergeMatchFilters(repos.Filter{
fc.AccountName: ctx.AccountName,
fc.GlobalVPNDeviceGlobalVPNName: gvpn,
fc.GlobalVPNDeviceCreationMethod: map[string]any{"$ne": gvpnConnectionDeviceMethod},
}, search)
filter := d.gvpnDevicesRepo.MergeMatchFilters(
repos.Filter{
fc.AccountName: ctx.AccountName,
fc.GlobalVPNDeviceGlobalVPNName: gvpn,
},
map[string]repos.MatchFilter{
fc.GlobalVPNDeviceCreationMethod: {
MatchType: repos.MatchTypeNotInArray,
NotInArray: []any{gvpnConnectionDeviceMethod, kloudliteGlobalVPNDevice},
},
},
search,
)
return d.gvpnDevicesRepo.FindPaginated(ctx, filter, pagination)
}

Expand Down Expand Up @@ -167,9 +175,11 @@ func (d *domain) createGlobalVPNDevice(ctx InfraContext, gvpnDevice entities.Glo
func (d *domain) buildPeersFromGlobalVPNDevices(ctx InfraContext, gvpn string) (publicPeers []wgv1.Peer, privatePeers []wgv1.Peer, err error) {
devices, err := d.gvpnDevicesRepo.Find(ctx, repos.Query{
Filter: map[string]any{
fc.AccountName: ctx.AccountName,
fc.GlobalVPNDeviceGlobalVPNName: gvpn,
fc.GlobalVPNDeviceCreationMethod: map[string]any{"$ne": gvpnConnectionDeviceMethod},
fc.AccountName: ctx.AccountName,
fc.GlobalVPNDeviceGlobalVPNName: gvpn,
fc.GlobalVPNDeviceCreationMethod: map[string]any{
"$ne": gvpnConnectionDeviceMethod,
},
},
})
if err != nil {
Expand Down Expand Up @@ -202,14 +212,18 @@ func (d *domain) buildPeersFromGlobalVPNDevices(ctx InfraContext, gvpn string) (
}

func (d *domain) GetGlobalVPNDevice(ctx InfraContext, gvpn string, gvpnDevice string) (*entities.GlobalVPNDevice, error) {
if gvpn == "" || gvpnDevice == "" {
return nil, errors.New("invalid global vpn or device")
}

return d.findGlobalVPNDevice(ctx, gvpn, gvpnDevice)
}

func (d *domain) GetGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnDevice string) (string, error) {
return d.getGlobalVPNDeviceWgConfig(ctx, gvpn, gvpnDevice)
return d.getGlobalVPNDeviceWgConfig(ctx, gvpn, gvpnDevice, nil)
}

func (d *domain) getGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnDevice string) (string, error) {
func (d *domain) getGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnDevice string, postUp []string) (string, error) {
device, err := d.findGlobalVPNDevice(ctx, gvpn, gvpnDevice)
if err != nil {
return "", err
Expand Down Expand Up @@ -262,9 +276,13 @@ func (d *domain) getGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnD
}

config, err := wgutils.GenerateWireguardConfig(wgutils.WgConfigParams{
IPAddr: device.IPAddr,
PrivateKey: device.PrivateKey,
DNS: dnsServer,
IPAddr: device.IPAddr,
PrivateKey: device.PrivateKey,
DNS: dnsServer,
PostUp: postUp,
// PostUp: []string{
// "sudo iptables -A INPUT -i wg0 -j DROP",
// },
PublicPeers: publicPeers,
PrivatePeers: privatePeers,
})
Expand Down
4 changes: 2 additions & 2 deletions apps/infra/internal/domain/global-vpn.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func (d *domain) createGlobalVPN(ctx InfraContext, gvpn entities.GlobalVPN) (*en
Name: kloudliteGlobalVPNDevice,
},
ResourceMetadata: common.ResourceMetadata{
DisplayName: "kloudlite-platform-device",
DisplayName: kloudliteGlobalVPNDevice,
CreatedBy: common.CreatedOrUpdatedByKloudlite,
LastUpdatedBy: common.CreatedOrUpdatedByKloudlite,
},
AccountName: ctx.AccountName,
GlobalVPNName: gv.Name,
PublicEndpoint: nil,
CreationMethod: gvpnConnectionDeviceMethod,
CreationMethod: kloudliteGlobalVPNDevice,
})
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ spec:
capabilities:
add:
- NET_ADMIN
- SYS_MODULE
privileged: true
volumeMounts:
- mountPath: /config/wg_confs/wg0.conf
Expand Down Expand Up @@ -114,7 +115,6 @@ spec:
memory: 100Mi

dnsPolicy: ClusterFirst

volumes:
- name: wg-config
secret:
Expand Down
97 changes: 56 additions & 41 deletions apps/observability/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ var Module = fx.Module(
return infra.NewInfraClient(conn)
}),

fx.Provide(func(cfg *rest.Config) (k8s.Client, error) {
return k8s.NewClient(cfg, nil)
}),

fx.Invoke(func(infraCli infra.InfraClient, kcli k8s.Client, iamCli iam.IAMClient, mux *http.ServeMux, sessStore SessionStore, ev *env.Env, logger logging.Logger) {
fx.Invoke(func(infraCli infra.InfraClient, kcfg *rest.Config, iamCli iam.IAMClient, mux *http.ServeMux, sessStore SessionStore, ev *env.Env, logger logging.Logger) {
sessionMiddleware := httpServer.NewReadSessionMiddlewareHandler(sessStore, constants.CookieName, constants.CacheSessionPrefix)

loggingMiddleware := httpServer.NewLoggingMiddleware(logger)
Expand Down Expand Up @@ -116,10 +112,41 @@ var Module = fx.Module(
step = "15s"
}

if err := queryProm(ev.PromHttpAddr, PromMetricsType(metricsType), map[string]string{
"kl_account_name": accountName,
"kl_cluster_name": clusterName,
"kl_tracking_id": trackingId,
k8sCli, err := func() (k8s.Client, error) {
if strings.HasPrefix(trackingId, "clus-") {
return k8s.NewClient(kcfg, nil)
}

return k8s.NewClient(&rest.Config{
Host: fmt.Sprintf("http://kloudlite-device-proxy-%s.kl-account-%s.svc.cluster.local:8080/clusters/%s", "default", accountName, clusterName),
WrapTransport: func(rt http.RoundTripper) http.RoundTripper {
return httpServer.NewRoundTripperWithHeaders(rt, map[string][]string{
"X-Kloudlite-Authz": {fmt.Sprintf("Bearer %s", ev.GlobalVPNAuthzSecret)},
})
},
}, nil)
}()
if err != nil {
http.Error(w, fmt.Sprintf("failed to create k8s client: %v", err), http.StatusInternalServerError)
return
}

pods, err := ListPods(r.Context(), k8sCli, map[string]string{constants.ObservabilityTrackingKey: trackingId})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

podNames := make([]string, 0, len(pods))
for _, pod := range pods {
podNames = append(podNames, pod.Name)
}

if err := queryProm(ev.PromHttpAddr, PromMetricsType(metricsType), map[string]PromValue{
"kl_account_name": {Operator: PromOperatorEqual, Value: accountName},
"kl_cluster_name": {Operator: PromOperatorEqual, Value: clusterName},
"kl_tracking_id": {Operator: PromOperatorEqual, Value: trackingId},
"pod_name": {Operator: PromOperatorMatchRegex, Value: fmt.Sprintf("^(%s)$", strings.Join(podNames, ","))},
}, st, et, step, w); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -147,40 +174,26 @@ var Module = fx.Module(
clusterName := r.URL.Query().Get("cluster_name")
trackingId := r.URL.Query().Get("tracking_id")

if !strings.HasPrefix(trackingId, "clus-") {
cfg := &rest.Config{
Host: fmt.Sprintf("http://kloudlite-device-proxy-%s.kl-account-%s.svc.cluster.local:8080/clusters/%s", "default", accountName, clusterName),
// Host: fmt.Sprintf("http://kube-access.test-kube-access-globalvpn.svc.cluster.local:8080/clusters/%s", clusterName),
// Host: fmt.Sprintf("http://kloudlite-device-proxy-default.kl-%s.svc.cluster.local:8080/clusters/%s", accountName, clusterName),
k8sCli, err := func() (k8s.Client, error) {
if strings.HasPrefix(trackingId, "clus-") {
return k8s.NewClient(kcfg, nil)
}

//out, err := infraCli.GetClusterKubeconfig(r.Context(), &infra.GetClusterIn{
// UserId: string(sess.UserId),
// UserName: sess.UserName,
// UserEmail: sess.UserEmail,
// AccountName: accountName,
// ClusterName: clusterName,
//})
//if err != nil {
// http.Error(w, err.Error(), 500)
// return
//}
//
//cfg, err := k8s.RestConfigFromKubeConfig(out.Kubeconfig)
//if err != nil {
// http.Error(w, err.Error(), 500)
// return
//}

var err error
kcli, err = k8s.NewClient(cfg, nil)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
return k8s.NewClient(&rest.Config{
Host: fmt.Sprintf("http://kloudlite-device-proxy-%s.kl-account-%s.svc.cluster.local:8080/clusters/%s", "default", accountName, clusterName),
WrapTransport: func(rt http.RoundTripper) http.RoundTripper {
return httpServer.NewRoundTripperWithHeaders(rt, map[string][]string{
"X-Kloudlite-Authz": {fmt.Sprintf("Bearer %s", ev.GlobalVPNAuthzSecret)},
})
},
}, nil)
}()
if err != nil {
http.Error(w, fmt.Sprintf("failed to create k8s client: %v", err), http.StatusInternalServerError)
return
}

pods, err := ListPods(r.Context(), kcli, map[string]string{constants.ObservabilityTrackingKey: trackingId})
pods, err := ListPods(r.Context(), k8sCli, map[string]string{constants.ObservabilityTrackingKey: trackingId})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -212,7 +225,9 @@ var Module = fx.Module(
msg, err := b.ReadBytes('\n')
if err != nil {
if !errors.Is(err, io.EOF) {
http.Error(w, err.Error(), 500)
if !closed {
http.Error(w, err.Error(), 500)
}
}
return
}
Expand All @@ -221,7 +236,7 @@ var Module = fx.Module(
}
}()

if err := StreamLogs(r.Context(), kcli, pods, pw, logger); err != nil {
if err := StreamLogs(r.Context(), k8sCli, pods, pw, logger); err != nil {
http.Error(w, err.Error(), 500)
}
})))
Expand Down
23 changes: 18 additions & 5 deletions apps/observability/internal/app/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,26 @@ const (
WorkspaceTargetNs ObservabilityLabel = "kl_workspace_target_ns"
)

func buildPromQuery(resType PromMetricsType, filters map[string]string) (string, error) {
type PromOperator string

const (
PromOperatorEqual = PromOperator("=")
PromOperatorNotEqual = PromOperator("!=")
PromOperatorMatchRegex = PromOperator("=~")
PromOperatorNotMatchRegex = PromOperator("!~")
)

type PromValue struct {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (testing): Missing test for PromValue struct and its usage

Unit tests should be added to verify the correct usage of the PromValue struct, especially in the buildPromQuery and queryProm functions.

Operator PromOperator
// Value must be a VALID prometheus value suitable for the specified PromOperator
Value any
}

func buildPromQuery(resType PromMetricsType, filters map[string]PromValue) (string, error) {
tags := make([]string, 0, len(filters))

for k, v := range filters {
if v != "" {
tags = append(tags, fmt.Sprintf(`%s=%q`, k, v))
}
tags = append(tags, fmt.Sprintf(`%s%s%q`, k, v.Operator, v.Value))
}

switch resType {
Expand All @@ -115,7 +128,7 @@ func buildPromQuery(resType PromMetricsType, filters map[string]string) (string,
}
}

func queryProm(promAddr string, resType PromMetricsType, filters map[string]string, startTime string, endTime string, step string, writer io.Writer) error {
func queryProm(promAddr string, resType PromMetricsType, filters map[string]PromValue, startTime string, endTime string, step string, writer io.Writer) error {
promQuery, err := buildPromQuery(resType, filters)
if err != nil {
return errors.NewE(err)
Expand Down
2 changes: 2 additions & 0 deletions apps/observability/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Env struct {
IsDev bool

KubernetesApiProxy string `env:"KUBERNETES_API_PROXY"`

GlobalVPNAuthzSecret string `env:"GLOBAL_VPN_AUTHZ_SECRET" required:"true"`
}

func LoadEnv() (*Env, error) {
Expand Down
17 changes: 17 additions & 0 deletions cmd/global-vpn-kube-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,34 @@ func main() {
var addr string
var proxyAddr string
var debug bool
var authz string

flag.BoolVar(&debug, "debug", false, "--debug")
flag.StringVar(&addr, "addr", ":8080", "--addr <host:port>")
flag.StringVar(&proxyAddr, "proxy-addr", "", "--proxy-addr <host:port>")
flag.StringVar(&authz, "authz", "", "--authz <authz-token>")
flag.Parse()

if authz == "" {
log.Fatal("authz token, must be provided")
}

reverseProxyMap := make(map[string]*httputil.ReverseProxy)

mux := http.NewServeMux()

kloudliteAuthzHeader := "X-Kloudlite-Authz"

counter := 1
mux.HandleFunc("/clusters/", func(w http.ResponseWriter, req *http.Request) {
token := strings.TrimPrefix(req.Header.Get(kloudliteAuthzHeader), "Bearer ")
fmt.Println("HERE", token)

if len(token) != len(authz) || token != authz {
http.Error(w, "UnAuthorized", http.StatusUnauthorized)
return
}

sp := strings.Split(strings.TrimPrefix(req.URL.Path, "/clusters/"), "/")
if len(sp) <= 1 {
http.Error(w, "invalid request", http.StatusForbidden)
Expand Down Expand Up @@ -55,6 +71,7 @@ func main() {
req.URL.Scheme = "http"
req.URL.Host = strings.ReplaceAll(proxyAddr, "{{.CLUSTER_NAME}}", clusterName)
req.URL.Path = fmt.Sprintf("/%s", strings.Join(sp[1:], "/"))
req.Header.Del(kloudliteAuthzHeader)
},
}
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/functions/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package functions

func Reduce[T any, V any](items []T, reducerFn func(V, T), value V) V {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Consider renaming reducerFn to reducer for consistency.

The parameter name reducerFn could be simplified to reducer for consistency with common naming conventions.

Suggested change
func Reduce[T any, V any](items []T, reducerFn func(V, T), value V) V {
func Reduce[T any, V any](items []T, reducer func(V, T), value V) V {

for i := range items {
item := items[i]
reducerFn(value, item)
}

return value
}
6 changes: 2 additions & 4 deletions pkg/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func New(options *Options) (Logger, error) {
return cfg
}
pcfg := zap.NewProductionEncoderConfig()
pcfg.EncodeLevel = zapcore.CapitalColorLevelEncoder
pcfg.TimeKey = ""
pcfg.LineEnding = "\n"
return pcfg
Expand Down Expand Up @@ -100,9 +101,6 @@ func New(options *Options) (Logger, error) {
}

lgr := zap.New(zapcore.NewCore(zapcore.NewConsoleEncoder(cfg), os.Stdout, loglevel), zapOpts...)

cLogger := &logger{
zapLogger: lgr.Sugar(),
}
cLogger := &logger{zapLogger: lgr.Sugar()}
return cLogger, nil
}
Loading