diff --git a/apps/infra/internal/app/app.go b/apps/infra/internal/app/app.go index 7504e5142..8172fd3ae 100644 --- a/apps/infra/internal/app/app.go +++ b/apps/infra/internal/app/app.go @@ -44,6 +44,8 @@ type ( var Module = fx.Module( "app", repos.NewFxMongoRepo[*entities.Cluster]("clusters", "clus", entities.ClusterIndices), + repos.NewFxMongoRepo[*entities.ClusterConnection]("cluster-connections", "clus-cn", entities.ClusterConnIndices), + repos.NewFxMongoRepo[*entities.ClusterGroup]("cluster-groups", "clus-grp", entities.ClusterGroupIndices), // repos.NewFxMongoRepo[*entities.BYOKCluster]("byok_clusters", "byok", entities.BYOKClusterIndices), repos.NewFxMongoRepo[*entities.BYOKCluster]("clusters", "byok", entities.BYOKClusterIndices), repos.NewFxMongoRepo[*entities.ClusterManagedService]("cmsvcs", "cmsvcs", entities.ClusterManagedServiceIndices), diff --git a/apps/infra/internal/app/process-error-on-apply.go b/apps/infra/internal/app/process-error-on-apply.go index 3c54e5c35..6c8d3c48a 100644 --- a/apps/infra/internal/app/process-error-on-apply.go +++ b/apps/infra/internal/app/process-error-on-apply.go @@ -52,6 +52,18 @@ func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, logger logging.Logger, d gvkstr := obj.GroupVersionKind().String() switch gvkstr { + case clusterConnGVK.String(): + { + cc, err := fn.JsonConvert[entities.ClusterConnection](obj.Object) + if err != nil { + return err + } + + if errObj.Action == t.ActionApply { + return d.OnClusterConnApplyError(dctx, errObj.ClusterName, obj.GetName(), errObj.Error, opts) + } + return d.OnClusterConnDeleteMessage(dctx, errObj.ClusterName, cc) + } case nodepoolGVK.String(): { nodepool, err := fn.JsonConvert[entities.NodePool](obj.Object) diff --git a/apps/infra/internal/app/process-resource-updates.go b/apps/infra/internal/app/process-resource-updates.go index 28c9d0ac1..f64e8ef53 100644 --- a/apps/infra/internal/app/process-resource-updates.go +++ b/apps/infra/internal/app/process-resource-updates.go @@ -33,6 +33,7 @@ func gvk(obj client.Object) string { var ( clusterGVK = fn.GVK("clusters.kloudlite.io/v1", "Cluster") + clusterConnGVK = fn.GVK("wireguard.kloudlite.io/v1", "ClusterConnection") nodepoolGVK = fn.GVK("clusters.kloudlite.io/v1", "NodePool") helmreleaseGVK = fn.GVK("crds.kloudlite.io/v1", "HelmChart") pvcGVK = fn.GVK("v1", "PersistentVolumeClaim") @@ -109,6 +110,18 @@ func processResourceUpdates(consumer ReceiveResourceUpdatesConsumer, d domain.Do } return d.OnClusterUpdateMessage(dctx, clus, resStatus, domain.UpdateAndDeleteOpts{MessageTimestamp: msg.Timestamp}) } + case clusterConnGVK.String(): + { + var np entities.ClusterConnection + if err := fn.JsonConversion(su.Object, &np); err != nil { + return errors.NewE(err) + } + + if resStatus == types.ResourceStatusDeleted { + return d.OnClusterConnDeleteMessage(dctx, su.ClusterName, np) + } + return d.OnClusterConnUpdateMessage(dctx, su.ClusterName, np, resStatus, domain.UpdateAndDeleteOpts{MessageTimestamp: msg.Timestamp}) + } case nodepoolGVK.String(): { var np entities.NodePool diff --git a/apps/infra/internal/domain/api.go b/apps/infra/internal/domain/api.go index 88072f83b..92eaf621e 100644 --- a/apps/infra/internal/domain/api.go +++ b/apps/infra/internal/domain/api.go @@ -43,6 +43,7 @@ const ( ResourceTypeDomainEntries ResourceType = "domain_entries" ResourceTypeHelmRelease ResourceType = "helm_release" ResourceTypeNodePool ResourceType = "nodepool" + ResourceTypeClusterConnection ResourceType = "cluster_connection" ResourceTypePVC ResourceType = "persistance_volume_claim" ResourceTypePV ResourceType = "persistance_volume" ResourceTypeVolumeAttachment ResourceType = "volume_attachment" @@ -94,6 +95,10 @@ type Domain interface { OnNodePoolUpdateMessage(ctx InfraContext, clusterName string, nodePool entities.NodePool, status types.ResourceStatus, opts UpdateAndDeleteOpts) error OnNodepoolApplyError(ctx InfraContext, clusterName string, name string, errMsg string, opts UpdateAndDeleteOpts) error + OnClusterConnDeleteMessage(ctx InfraContext, clusterName string, clusterConn entities.ClusterConnection) error + OnClusterConnUpdateMessage(ctx InfraContext, clusterName string, clusterConn entities.ClusterConnection, status types.ResourceStatus, opts UpdateAndDeleteOpts) error + OnClusterConnApplyError(ctx InfraContext, clusterName string, name string, errMsg string, opts UpdateAndDeleteOpts) error + ListNodes(ctx InfraContext, clusterName string, search map[string]repos.MatchFilter, pagination repos.CursorPagination) (*repos.PaginatedRecord[*entities.Node], error) GetNode(ctx InfraContext, clusterName string, nodeName string) (*entities.Node, error) diff --git a/apps/infra/internal/domain/cluster-conn.go b/apps/infra/internal/domain/cluster-conn.go new file mode 100644 index 000000000..52fcbf470 --- /dev/null +++ b/apps/infra/internal/domain/cluster-conn.go @@ -0,0 +1,177 @@ +package domain + +import ( + "fmt" + + "github.com/kloudlite/api/apps/infra/internal/entities" + fc "github.com/kloudlite/api/apps/infra/internal/entities/field-constants" + "github.com/kloudlite/api/common" + "github.com/kloudlite/api/common/fields" + "github.com/kloudlite/api/pkg/errors" + "github.com/kloudlite/api/pkg/repos" + wgv1 "github.com/kloudlite/operator/apis/wireguard/v1" + "github.com/kloudlite/operator/operators/resource-watcher/types" +) + +func (d *domain) reconClusterConns(ctx InfraContext, clusterGroup string) error { + conns, err := d.clusterConnRepo.Find(ctx, repos.Query{ + Filter: repos.Filter{fields.AccountName: ctx.AccountName, fc.ClusterClusterGroupName: clusterGroup}, + }) + + if err != nil { + return errors.NewE(err) + } + + peers := make([]wgv1.Peer, 0) + + for _, c := range conns { + if c.Spec.PublicKey == nil { + continue + } + + peers = append(peers, wgv1.Peer{ + PublicKey: *c.Spec.PublicKey, + Endpoint: c.Endpoint, + Id: c.Spec.Id, + AllowedIPs: []string{c.CIDR}, + }) + } + + for _, xcc := range conns { + if fmt.Sprintf("%#v", xcc.Spec.Peers) == fmt.Sprintf("%#v", peers) { + continue + } + + xcc.Spec.Peers = peers + unp, err := d.clusterConnRepo.Patch( + ctx, + repos.Filter{ + fields.AccountName: ctx.AccountName, + fields.ClusterName: xcc.ClusterName, + fields.MetadataName: xcc.Name, + }, + common.PatchForUpdate(ctx, xcc, common.PatchOpts{XPatch: map[string]any{fc.ClusterConnectionSpecPeers: peers}}), + ) + + if err != nil { + return errors.NewE(err) + } + + if err := d.resDispatcher.ApplyToTargetCluster(ctx, + unp.ClusterName, + &unp.ClusterConnection, + unp.RecordVersion, + ); err != nil { + return errors.NewE(err) + } + } + + return nil +} + +func (d *domain) findClusterConn(ctx InfraContext, clusterName string, connName string) (*entities.ClusterConnection, error) { + cc, err := d.clusterConnRepo.FindOne(ctx, repos.Filter{ + fields.AccountName: ctx.AccountName, + fields.ClusterName: clusterName, + fields.MetadataName: connName, + }) + if err != nil { + return nil, errors.NewE(err) + } + if cc == nil { + return nil, errors.Newf("cluster connection with name %q not found", clusterName) + } + return cc, nil +} + +func (d *domain) findClusterConns(ctx InfraContext, clusterGroup string) ([]*entities.ClusterConnection, error) { + cc, err := d.clusterConnRepo.Find(ctx, repos.Query{ + Filter: repos.Filter{ + fields.AccountName: ctx.AccountName, + fc.ClusterClusterGroupName: clusterGroup, + }, + }) + if err != nil { + return nil, errors.NewE(err) + } + + return cc, nil +} + +func (d *domain) OnClusterConnDeleteMessage(ctx InfraContext, clusterName string, clusterConn entities.ClusterConnection) error { + err := d.clusterConnRepo.DeleteOne( + ctx, + repos.Filter{ + fields.AccountName: ctx.AccountName, + fields.ClusterName: clusterName, + fields.MetadataName: clusterConn.Name, + }, + ) + if err != nil { + return errors.NewE(err) + } + + d.resourceEventPublisher.PublishResourceEvent(ctx, clusterName, ResourceTypeClusterConnection, clusterConn.Name, PublishDelete) + return err +} + +func (d *domain) OnClusterConnUpdateMessage(ctx InfraContext, clusterName string, clusterConn entities.ClusterConnection, status types.ResourceStatus, opts UpdateAndDeleteOpts) error { + xconn, err := d.findClusterConn(ctx, clusterName, clusterConn.Name) + if err != nil { + return errors.NewE(err) + } + + if xconn == nil { + return errors.Newf("no cluster connection found") + } + + if _, err := d.matchRecordVersion(clusterConn.Annotations, xconn.RecordVersion); err != nil { + return d.resyncToTargetCluster(ctx, xconn.SyncStatus.Action, clusterName, &xconn.ClusterConnection, xconn.RecordVersion) + } + + recordVersion, err := d.matchRecordVersion(clusterConn.Annotations, xconn.RecordVersion) + if err != nil { + return errors.NewE(err) + } + + unp, err := d.clusterConnRepo.PatchById( + ctx, + xconn.Id, + common.PatchForSyncFromAgent(&clusterConn, + recordVersion, status, + common.PatchOpts{ + MessageTimestamp: opts.MessageTimestamp, + })) + if err != nil { + return errors.NewE(err) + } + + if err := d.reconClusterConns(ctx, xconn.ClusterGroupName); err != nil { + return errors.NewE(err) + } + + d.resourceEventPublisher.PublishResourceEvent(ctx, clusterName, ResourceTypeClusterConnection, unp.Name, PublishUpdate) + return nil +} + +func (d *domain) OnClusterConnApplyError(ctx InfraContext, clusterName string, name string, errMsg string, opts UpdateAndDeleteOpts) error { + unp, err := d.clusterConnRepo.Patch( + ctx, + repos.Filter{ + fields.AccountName: ctx.AccountName, + fields.ClusterName: clusterName, + fields.MetadataName: name, + }, + common.PatchForErrorFromAgent( + errMsg, + common.PatchOpts{ + MessageTimestamp: opts.MessageTimestamp, + }, + ), + ) + if err != nil { + return errors.NewE(err) + } + d.resourceEventPublisher.PublishResourceEvent(ctx, clusterName, ResourceTypeClusterConnection, unp.Name, PublishUpdate) + return errors.NewE(err) +} diff --git a/apps/infra/internal/domain/cluster-group.go b/apps/infra/internal/domain/cluster-group.go new file mode 100644 index 000000000..4188b5afd --- /dev/null +++ b/apps/infra/internal/domain/cluster-group.go @@ -0,0 +1 @@ +package domain diff --git a/apps/infra/internal/domain/clusters.go b/apps/infra/internal/domain/clusters.go index b1b87183b..6100c0d9b 100644 --- a/apps/infra/internal/domain/clusters.go +++ b/apps/infra/internal/domain/clusters.go @@ -99,6 +99,8 @@ func (d *domain) GetClusterAdminKubeconfig(ctx InfraContext, clusterName string) func (d *domain) applyCluster(ctx InfraContext, cluster *entities.Cluster) error { addTrackingId(&cluster.Cluster, cluster.Id) return d.applyK8sResource(ctx, &cluster.Cluster, cluster.RecordVersion) + + // TODO: create cluster connection and apply to target cluster } func (d *domain) CreateCluster(ctx InfraContext, cluster entities.Cluster) (*entities.Cluster, error) { diff --git a/apps/infra/internal/domain/domain.go b/apps/infra/internal/domain/domain.go index ddd08ccf0..86ca6d92e 100644 --- a/apps/infra/internal/domain/domain.go +++ b/apps/infra/internal/domain/domain.go @@ -37,6 +37,8 @@ type domain struct { helmReleaseRepo repos.DbRepo[*entities.HelmRelease] nodeRepo repos.DbRepo[*entities.Node] nodePoolRepo repos.DbRepo[*entities.NodePool] + clusterConnRepo repos.DbRepo[*entities.ClusterConnection] + clusterGroupRepo repos.DbRepo[*entities.ClusterGroup] domainEntryRepo repos.DbRepo[*entities.DomainEntry] secretRepo repos.DbRepo[*entities.CloudProviderSecret] pvcRepo repos.DbRepo[*entities.PersistentVolumeClaim] @@ -166,6 +168,9 @@ var Module = fx.Module("domain", resourceDispatcher ResourceDispatcher, helmReleaseRepo repos.DbRepo[*entities.HelmRelease], + clusterConnRepo repos.DbRepo[*entities.ClusterConnection], + clusterGroupRepo repos.DbRepo[*entities.ClusterGroup], + pvcRepo repos.DbRepo[*entities.PersistentVolumeClaim], pvRepo repos.DbRepo[*entities.PersistentVolume], namespaceRepo repos.DbRepo[*entities.Namespace], @@ -212,6 +217,8 @@ var Module = fx.Module("domain", logger: logger, env: env, clusterRepo: clusterRepo, + clusterConnRepo: clusterConnRepo, + clusterGroupRepo: clusterGroupRepo, byokClusterRepo: byokClusterRepo, clusterManagedServiceRepo: clustermanagedserviceRepo, nodeRepo: nodeRepo, diff --git a/apps/infra/internal/entities/cluster-connection.go b/apps/infra/internal/entities/cluster-connection.go new file mode 100644 index 000000000..e58d6e890 --- /dev/null +++ b/apps/infra/internal/entities/cluster-connection.go @@ -0,0 +1,63 @@ +package entities + +import ( + "github.com/kloudlite/api/common" + "github.com/kloudlite/api/pkg/repos" + t "github.com/kloudlite/api/pkg/types" + wgv1 "github.com/kloudlite/operator/apis/wireguard/v1" + "github.com/kloudlite/operator/pkg/operator" +) + +type ClusterConnection struct { + repos.BaseEntity `json:",inline" graphql:"noinput"` + + wgv1.ClusterConnection `json:",inline"` + + common.ResourceMetadata `json:",inline"` + + AccountName string `json:"accountName" graphql:"noinput"` + ClusterName string `json:"clusterName" graphql:"noinput"` + ClusterGroupName string `json:"clusterGroupName" graphql:"noinput"` + + CIDR string `json:"cidr" graphql:"noinput"` + Endpoint string `json:"endpoint" graphql:"noinput"` + + SyncStatus t.SyncStatus `json:"syncStatus" graphql:"noinput"` +} + +func (c *ClusterConnection) GetDisplayName() string { + return c.ResourceMetadata.DisplayName +} + +func (c *ClusterConnection) GetStatus() operator.Status { + return c.ClusterConnection.Status +} + +var ClusterConnIndices = []repos.IndexField{ + { + Field: []repos.IndexKey{ + {Key: "id", Value: repos.IndexAsc}, + }, + Unique: true, + }, + { + Field: []repos.IndexKey{ + {Key: "metadata.name", Value: repos.IndexAsc}, + {Key: "accountName", Value: repos.IndexAsc}, + {Key: "clusterName", Value: repos.IndexAsc}, + }, + Unique: true, + }, + { + Field: []repos.IndexKey{ + {Key: "accountName", Value: repos.IndexAsc}, + {Key: "spec.id", Value: repos.IndexAsc}, + }, + Unique: true, + }, + { + Field: []repos.IndexKey{ + {Key: "accountName", Value: repos.IndexAsc}, + }, + }, +} diff --git a/apps/infra/internal/entities/cluster-group.go b/apps/infra/internal/entities/cluster-group.go new file mode 100644 index 000000000..0d154cbc5 --- /dev/null +++ b/apps/infra/internal/entities/cluster-group.go @@ -0,0 +1,87 @@ +package entities + +import ( + "fmt" + + "github.com/kloudlite/api/common" + "github.com/kloudlite/api/pkg/functions" + "github.com/kloudlite/api/pkg/repos" + t "github.com/kloudlite/api/pkg/types" + "github.com/kloudlite/operator/pkg/operator" +) + +const ( + wgIpIndex = 16 + clusterPodIndex = 13 +) + +func GetCidrRanges(index int) (*string, error) { + switch index { + case wgIpIndex, clusterPodIndex: + return nil, fmt.Errorf("it can't be %d or %d", wgIpIndex, clusterPodIndex) + } + + if index < 0 || index > 255 { + return nil, fmt.Errorf("ip range can only be between 0 and 255") + } + + return functions.New(fmt.Sprintf("10.%d.0.0/16", index)), nil +} + +type Peer struct { + Id int `json:"id" graphql:"noinput"` + PubKey string `json:"pubKey" graphql:"noinput"` + AllowedIps []string `json:"allowedIps" graphql:"noinput"` +} + +type ClusterGroup struct { + repos.BaseEntity `json:",inline" graphql:"noinput"` + + common.ResourceMetadata `json:",inline"` + + // Peers []Peer `json:"peers" graphql:"noinput"` + + AccountName string `json:"accountName" graphql:"noinput"` + ClusterName string `json:"clusterName" graphql:"noinput"` + + SyncStatus t.SyncStatus `json:"syncStatus" graphql:"noinput"` +} + +func (c *ClusterGroup) GetDisplayName() string { + return c.ResourceMetadata.DisplayName +} + +func (c *ClusterGroup) GetStatus() operator.Status { + return operator.Status{ + IsReady: true, + } +} + +var ClusterGroupIndices = []repos.IndexField{ + { + Field: []repos.IndexKey{ + {Key: "id", Value: repos.IndexAsc}, + }, + Unique: true, + }, + { + Field: []repos.IndexKey{ + {Key: "metadata.name", Value: repos.IndexAsc}, + {Key: "accountName", Value: repos.IndexAsc}, + {Key: "clusterName", Value: repos.IndexAsc}, + }, + Unique: true, + }, + { + Field: []repos.IndexKey{ + {Key: "accountName", Value: repos.IndexAsc}, + {Key: "spec.id", Value: repos.IndexAsc}, + }, + Unique: true, + }, + { + Field: []repos.IndexKey{ + {Key: "accountName", Value: repos.IndexAsc}, + }, + }, +} diff --git a/apps/infra/internal/entities/cluster.go b/apps/infra/internal/entities/cluster.go index 87f61ed88..0781f2acd 100644 --- a/apps/infra/internal/entities/cluster.go +++ b/apps/infra/internal/entities/cluster.go @@ -15,8 +15,9 @@ type Cluster struct { common.ResourceMetadata `json:",inline"` - AccountName string `json:"accountName" graphql:"noinput"` - SyncStatus t.SyncStatus `json:"syncStatus" graphql:"noinput"` + ClusterGroupName *string `json:"clusterGroupName"` + AccountName string `json:"accountName" graphql:"noinput"` + SyncStatus t.SyncStatus `json:"syncStatus" graphql:"noinput"` } func (c *Cluster) GetDisplayName() string { diff --git a/apps/infra/internal/entities/field-constants/generated_constants.go b/apps/infra/internal/entities/field-constants/generated_constants.go index d3acf1b3c..2b1e84517 100644 --- a/apps/infra/internal/entities/field-constants/generated_constants.go +++ b/apps/infra/internal/entities/field-constants/generated_constants.go @@ -113,6 +113,7 @@ const ( // constant vars generated for struct Cluster const ( + ClusterClusterGroupName = "clusterGroupName" ClusterSpec = "spec" ClusterSpecAccountId = "spec.accountId" ClusterSpecAccountName = "spec.accountName" @@ -169,6 +170,32 @@ const ( ClusterSpecTaintMasterNodes = "spec.taintMasterNodes" ) +// constant vars generated for struct ClusterConnection +const ( + ClusterConnectionClusterGroupName = "clusterGroupName" + ClusterConnectionSpec = "spec" + ClusterConnectionSpecAgentsResources = "spec.agentsResources" + ClusterConnectionSpecAgentsResourcesClaims = "spec.agentsResources.claims" + ClusterConnectionSpecAgentsResourcesLimits = "spec.agentsResources.limits" + ClusterConnectionSpecAgentsResourcesRequests = "spec.agentsResources.requests" + ClusterConnectionSpecDnsServer = "spec.dnsServer" + ClusterConnectionSpecGatewayResources = "spec.gatewayResources" + ClusterConnectionSpecGatewayResourcesClaims = "spec.gatewayResources.claims" + ClusterConnectionSpecGatewayResourcesLimits = "spec.gatewayResources.limits" + ClusterConnectionSpecGatewayResourcesRequests = "spec.gatewayResources.requests" + ClusterConnectionSpecId = "spec.id" + ClusterConnectionSpecInterface = "spec.interface" + ClusterConnectionSpecIpAddress = "spec.ipAddress" + ClusterConnectionSpecNodeport = "spec.nodeport" + ClusterConnectionSpecPeers = "spec.peers" + ClusterConnectionSpecPublicKey = "spec.publicKey" +) + +// constant vars generated for struct ClusterGroup +const ( + ClusterGroupPeers = "peers" +) + // constant vars generated for struct ClusterManagedService const ( ClusterManagedServiceOutput = "output" @@ -351,6 +378,14 @@ const ( OutputFieldName = "name" ) +// constant vars generated for struct Peers +const ( + PeersAllowedIps = "allowedIps" + PeersName = "name" + PeersNamespace = "namespace" + PeersPubKey = "pubKey" +) + // constant vars generated for struct PersistentVolume const ( PersistentVolumeSpec = "spec"