Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/infra/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type (
var Module = fx.Module(
"app",
repos.NewFxMongoRepo[*entities.Cluster]("clusters", "clus", entities.ClusterIndices),
repos.NewFxMongoRepo[*entities.ClusterConnection]("cluster-connections", "clus-cn", entities.ClusterConnIndices),
repos.NewFxMongoRepo[*entities.ClusterGroup]("cluster-groups", "clus-grp", entities.ClusterGroupIndices),
// repos.NewFxMongoRepo[*entities.BYOKCluster]("byok_clusters", "byok", entities.BYOKClusterIndices),
repos.NewFxMongoRepo[*entities.BYOKCluster]("clusters", "byok", entities.BYOKClusterIndices),
repos.NewFxMongoRepo[*entities.ClusterManagedService]("cmsvcs", "cmsvcs", entities.ClusterManagedServiceIndices),
Expand Down
12 changes: 12 additions & 0 deletions apps/infra/internal/app/process-error-on-apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, logger logging.Logger, d

gvkstr := obj.GroupVersionKind().String()
switch gvkstr {
case clusterConnGVK.String():
{
cc, err := fn.JsonConvert[entities.ClusterConnection](obj.Object)
if err != nil {
return err
}

if errObj.Action == t.ActionApply {
return d.OnClusterConnApplyError(dctx, errObj.ClusterName, obj.GetName(), errObj.Error, opts)
}
return d.OnClusterConnDeleteMessage(dctx, errObj.ClusterName, cc)
}
case nodepoolGVK.String():
{
nodepool, err := fn.JsonConvert[entities.NodePool](obj.Object)
Expand Down
13 changes: 13 additions & 0 deletions apps/infra/internal/app/process-resource-updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func gvk(obj client.Object) string {

var (
clusterGVK = fn.GVK("clusters.kloudlite.io/v1", "Cluster")
clusterConnGVK = fn.GVK("wireguard.kloudlite.io/v1", "ClusterConnection")
nodepoolGVK = fn.GVK("clusters.kloudlite.io/v1", "NodePool")
helmreleaseGVK = fn.GVK("crds.kloudlite.io/v1", "HelmChart")
pvcGVK = fn.GVK("v1", "PersistentVolumeClaim")
Expand Down Expand Up @@ -109,6 +110,18 @@ func processResourceUpdates(consumer ReceiveResourceUpdatesConsumer, d domain.Do
}
return d.OnClusterUpdateMessage(dctx, clus, resStatus, domain.UpdateAndDeleteOpts{MessageTimestamp: msg.Timestamp})
}
case clusterConnGVK.String():
{
var np entities.ClusterConnection
if err := fn.JsonConversion(su.Object, &np); err != nil {
return errors.NewE(err)
}

if resStatus == types.ResourceStatusDeleted {
return d.OnClusterConnDeleteMessage(dctx, su.ClusterName, np)
}
return d.OnClusterConnUpdateMessage(dctx, su.ClusterName, np, resStatus, domain.UpdateAndDeleteOpts{MessageTimestamp: msg.Timestamp})
}
case nodepoolGVK.String():
{
var np entities.NodePool
Expand Down
5 changes: 5 additions & 0 deletions apps/infra/internal/domain/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
ResourceTypeDomainEntries ResourceType = "domain_entries"
ResourceTypeHelmRelease ResourceType = "helm_release"
ResourceTypeNodePool ResourceType = "nodepool"
ResourceTypeClusterConnection ResourceType = "cluster_connection"
ResourceTypePVC ResourceType = "persistance_volume_claim"
ResourceTypePV ResourceType = "persistance_volume"
ResourceTypeVolumeAttachment ResourceType = "volume_attachment"
Expand Down Expand Up @@ -94,6 +95,10 @@ type Domain interface {
OnNodePoolUpdateMessage(ctx InfraContext, clusterName string, nodePool entities.NodePool, status types.ResourceStatus, opts UpdateAndDeleteOpts) error
OnNodepoolApplyError(ctx InfraContext, clusterName string, name string, errMsg string, opts UpdateAndDeleteOpts) error

OnClusterConnDeleteMessage(ctx InfraContext, clusterName string, clusterConn entities.ClusterConnection) error
OnClusterConnUpdateMessage(ctx InfraContext, clusterName string, clusterConn entities.ClusterConnection, status types.ResourceStatus, opts UpdateAndDeleteOpts) error
OnClusterConnApplyError(ctx InfraContext, clusterName string, name string, errMsg string, opts UpdateAndDeleteOpts) error

ListNodes(ctx InfraContext, clusterName string, search map[string]repos.MatchFilter, pagination repos.CursorPagination) (*repos.PaginatedRecord[*entities.Node], error)
GetNode(ctx InfraContext, clusterName string, nodeName string) (*entities.Node, error)

Expand Down
177 changes: 177 additions & 0 deletions apps/infra/internal/domain/cluster-conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package domain

import (
"fmt"

"github.com/kloudlite/api/apps/infra/internal/entities"
fc "github.com/kloudlite/api/apps/infra/internal/entities/field-constants"
"github.com/kloudlite/api/common"
"github.com/kloudlite/api/common/fields"
"github.com/kloudlite/api/pkg/errors"
"github.com/kloudlite/api/pkg/repos"
wgv1 "github.com/kloudlite/operator/apis/wireguard/v1"
"github.com/kloudlite/operator/operators/resource-watcher/types"
)

func (d *domain) reconClusterConns(ctx InfraContext, clusterGroup string) error {
conns, err := d.clusterConnRepo.Find(ctx, repos.Query{
Filter: repos.Filter{fields.AccountName: ctx.AccountName, fc.ClusterClusterGroupName: clusterGroup},
})

if err != nil {
return errors.NewE(err)
}

peers := make([]wgv1.Peer, 0)

for _, c := range conns {
if c.Spec.PublicKey == nil {
continue
}

peers = append(peers, wgv1.Peer{
PublicKey: *c.Spec.PublicKey,
Endpoint: c.Endpoint,
Id: c.Spec.Id,
AllowedIPs: []string{c.CIDR},
})
}

for _, xcc := range conns {
if fmt.Sprintf("%#v", xcc.Spec.Peers) == fmt.Sprintf("%#v", peers) {
continue
}

xcc.Spec.Peers = peers
unp, err := d.clusterConnRepo.Patch(
ctx,
repos.Filter{
fields.AccountName: ctx.AccountName,
fields.ClusterName: xcc.ClusterName,
fields.MetadataName: xcc.Name,
},
common.PatchForUpdate(ctx, xcc, common.PatchOpts{XPatch: map[string]any{fc.ClusterConnectionSpecPeers: peers}}),
)

if err != nil {
return errors.NewE(err)
}

if err := d.resDispatcher.ApplyToTargetCluster(ctx,
unp.ClusterName,
&unp.ClusterConnection,
unp.RecordVersion,
); err != nil {
return errors.NewE(err)
}
}

return nil
}

func (d *domain) findClusterConn(ctx InfraContext, clusterName string, connName string) (*entities.ClusterConnection, error) {
cc, err := d.clusterConnRepo.FindOne(ctx, repos.Filter{
fields.AccountName: ctx.AccountName,
fields.ClusterName: clusterName,
fields.MetadataName: connName,
})
if err != nil {
return nil, errors.NewE(err)
}
if cc == nil {
return nil, errors.Newf("cluster connection with name %q not found", clusterName)
}
return cc, nil
}

func (d *domain) findClusterConns(ctx InfraContext, clusterGroup string) ([]*entities.ClusterConnection, error) {
cc, err := d.clusterConnRepo.Find(ctx, repos.Query{
Filter: repos.Filter{
fields.AccountName: ctx.AccountName,
fc.ClusterClusterGroupName: clusterGroup,
},
})
if err != nil {
return nil, errors.NewE(err)
}

return cc, nil
}

func (d *domain) OnClusterConnDeleteMessage(ctx InfraContext, clusterName string, clusterConn entities.ClusterConnection) error {
err := d.clusterConnRepo.DeleteOne(
ctx,
repos.Filter{
fields.AccountName: ctx.AccountName,
fields.ClusterName: clusterName,
fields.MetadataName: clusterConn.Name,
},
)
if err != nil {
return errors.NewE(err)
}

d.resourceEventPublisher.PublishResourceEvent(ctx, clusterName, ResourceTypeClusterConnection, clusterConn.Name, PublishDelete)
return err
}

func (d *domain) OnClusterConnUpdateMessage(ctx InfraContext, clusterName string, clusterConn entities.ClusterConnection, status types.ResourceStatus, opts UpdateAndDeleteOpts) error {
xconn, err := d.findClusterConn(ctx, clusterName, clusterConn.Name)
if err != nil {
return errors.NewE(err)
}

if xconn == nil {
return errors.Newf("no cluster connection found")
}

if _, err := d.matchRecordVersion(clusterConn.Annotations, xconn.RecordVersion); err != nil {
return d.resyncToTargetCluster(ctx, xconn.SyncStatus.Action, clusterName, &xconn.ClusterConnection, xconn.RecordVersion)
}

recordVersion, err := d.matchRecordVersion(clusterConn.Annotations, xconn.RecordVersion)
if err != nil {
return errors.NewE(err)
}

unp, err := d.clusterConnRepo.PatchById(
ctx,
xconn.Id,
common.PatchForSyncFromAgent(&clusterConn,
recordVersion, status,
common.PatchOpts{
MessageTimestamp: opts.MessageTimestamp,
}))
if err != nil {
return errors.NewE(err)
}

if err := d.reconClusterConns(ctx, xconn.ClusterGroupName); err != nil {
return errors.NewE(err)
}

d.resourceEventPublisher.PublishResourceEvent(ctx, clusterName, ResourceTypeClusterConnection, unp.Name, PublishUpdate)
return nil
}

func (d *domain) OnClusterConnApplyError(ctx InfraContext, clusterName string, name string, errMsg string, opts UpdateAndDeleteOpts) error {
unp, err := d.clusterConnRepo.Patch(
ctx,
repos.Filter{
fields.AccountName: ctx.AccountName,
fields.ClusterName: clusterName,
fields.MetadataName: name,
},
common.PatchForErrorFromAgent(
errMsg,
common.PatchOpts{
MessageTimestamp: opts.MessageTimestamp,
},
),
)
if err != nil {
return errors.NewE(err)
}
d.resourceEventPublisher.PublishResourceEvent(ctx, clusterName, ResourceTypeClusterConnection, unp.Name, PublishUpdate)
return errors.NewE(err)
}
1 change: 1 addition & 0 deletions apps/infra/internal/domain/cluster-group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package domain
2 changes: 2 additions & 0 deletions apps/infra/internal/domain/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func (d *domain) GetClusterAdminKubeconfig(ctx InfraContext, clusterName string)
func (d *domain) applyCluster(ctx InfraContext, cluster *entities.Cluster) error {
addTrackingId(&cluster.Cluster, cluster.Id)
return d.applyK8sResource(ctx, &cluster.Cluster, cluster.RecordVersion)

// TODO: create cluster connection and apply to target cluster
}

func (d *domain) CreateCluster(ctx InfraContext, cluster entities.Cluster) (*entities.Cluster, error) {
Expand Down
7 changes: 7 additions & 0 deletions apps/infra/internal/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type domain struct {
helmReleaseRepo repos.DbRepo[*entities.HelmRelease]
nodeRepo repos.DbRepo[*entities.Node]
nodePoolRepo repos.DbRepo[*entities.NodePool]
clusterConnRepo repos.DbRepo[*entities.ClusterConnection]
clusterGroupRepo repos.DbRepo[*entities.ClusterGroup]
domainEntryRepo repos.DbRepo[*entities.DomainEntry]
secretRepo repos.DbRepo[*entities.CloudProviderSecret]
pvcRepo repos.DbRepo[*entities.PersistentVolumeClaim]
Expand Down Expand Up @@ -166,6 +168,9 @@ var Module = fx.Module("domain",
resourceDispatcher ResourceDispatcher,
helmReleaseRepo repos.DbRepo[*entities.HelmRelease],

clusterConnRepo repos.DbRepo[*entities.ClusterConnection],
clusterGroupRepo repos.DbRepo[*entities.ClusterGroup],

pvcRepo repos.DbRepo[*entities.PersistentVolumeClaim],
pvRepo repos.DbRepo[*entities.PersistentVolume],
namespaceRepo repos.DbRepo[*entities.Namespace],
Expand Down Expand Up @@ -212,6 +217,8 @@ var Module = fx.Module("domain",
logger: logger,
env: env,
clusterRepo: clusterRepo,
clusterConnRepo: clusterConnRepo,
clusterGroupRepo: clusterGroupRepo,
byokClusterRepo: byokClusterRepo,
clusterManagedServiceRepo: clustermanagedserviceRepo,
nodeRepo: nodeRepo,
Expand Down
63 changes: 63 additions & 0 deletions apps/infra/internal/entities/cluster-connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package entities

import (
"github.com/kloudlite/api/common"
"github.com/kloudlite/api/pkg/repos"
t "github.com/kloudlite/api/pkg/types"
wgv1 "github.com/kloudlite/operator/apis/wireguard/v1"
"github.com/kloudlite/operator/pkg/operator"
)

type ClusterConnection struct {
repos.BaseEntity `json:",inline" graphql:"noinput"`

wgv1.ClusterConnection `json:",inline"`

common.ResourceMetadata `json:",inline"`

AccountName string `json:"accountName" graphql:"noinput"`
ClusterName string `json:"clusterName" graphql:"noinput"`
ClusterGroupName string `json:"clusterGroupName" graphql:"noinput"`

CIDR string `json:"cidr" graphql:"noinput"`
Endpoint string `json:"endpoint" graphql:"noinput"`

SyncStatus t.SyncStatus `json:"syncStatus" graphql:"noinput"`
}

func (c *ClusterConnection) GetDisplayName() string {
return c.ResourceMetadata.DisplayName
}

func (c *ClusterConnection) GetStatus() operator.Status {
return c.ClusterConnection.Status
}

var ClusterConnIndices = []repos.IndexField{
{
Field: []repos.IndexKey{
{Key: "id", Value: repos.IndexAsc},
},
Unique: true,
},
{
Field: []repos.IndexKey{
{Key: "metadata.name", Value: repos.IndexAsc},
{Key: "accountName", Value: repos.IndexAsc},
{Key: "clusterName", Value: repos.IndexAsc},
},
Unique: true,
},
{
Field: []repos.IndexKey{
{Key: "accountName", Value: repos.IndexAsc},
{Key: "spec.id", Value: repos.IndexAsc},
},
Unique: true,
},
{
Field: []repos.IndexKey{
{Key: "accountName", Value: repos.IndexAsc},
},
},
}
Loading