Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/infra/internal/domain/build-run.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (d *domain) OnBuildRunUpdateMessage(ctx InfraContext, clusterName string, b
}, &buildRun); err != nil {
return errors.NewE(err)
}

return nil
}

Expand All @@ -51,5 +52,6 @@ func (d *domain) OnBuildRunDeleteMessage(ctx InfraContext, clusterName string, b
}); err != nil {
return errors.NewE(err)
}
//d.natCli.Conn.Publish(fmt.Scan(buildRun.BuildRun, ))
return nil
}
31 changes: 26 additions & 5 deletions apps/infra/internal/domain/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ func (d *domain) CreateCluster(ctx InfraContext, cluster entities.Cluster) (*ent
return nil, errors.NewE(err)
}

if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(nCluster), []byte("Added")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId)
}

return nCluster, nil
}

Expand Down Expand Up @@ -361,7 +365,7 @@ func (d *domain) UpdateCluster(ctx InfraContext, cluster entities.Cluster) (*ent
UserEmail: ctx.UserEmail,
}

// FIXME: no update for cluster spec
// FIXME: no update for cluster spec
// clus.Spec = cluster.Spec

clus.Labels = cluster.Labels
Expand All @@ -377,9 +381,13 @@ func (d *domain) UpdateCluster(ctx InfraContext, cluster entities.Cluster) (*ent
return nil, errors.NewE(err)
}

if err:=d.natCli.Conn.Publish(d.clusterResUpdateSubject(&cluster), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId)
}
return uCluster, nil
}


func (d *domain) readClusterK8sResource(ctx InfraContext, namespace string, name string) (cluster *clustersv1.Cluster, found bool, err error) {
var clus entities.Cluster
if err := d.k8sClient.Get(ctx, fn.NN(namespace, name), &clus.Cluster); err != nil {
Expand Down Expand Up @@ -408,23 +416,33 @@ func (d *domain) DeleteCluster(ctx InfraContext, name string) error {
return errors.NewE(err)
}

return d.deleteK8sResource(ctx, &upC.Cluster)
deletedCluster := d.deleteK8sResource(ctx, &upC.Cluster)

if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(c), []byte("Update")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", c.Cluster.Spec.AccountId)
}

return deletedCluster
}

return nil
}

}
func (d *domain) OnDeleteClusterMessage(ctx InfraContext, cluster entities.Cluster) error {
accNs, err := d.getAccNamespace(ctx, ctx.AccountName)
if err != nil {
return errors.NewE(err)
}

return d.clusterRepo.DeleteOne(ctx, repos.Filter{
onDeletedClusterMessage := d.clusterRepo.DeleteOne(ctx, repos.Filter{
"accountName": ctx.AccountName,
"metadata.name": cluster.Name,
"metadata.namespace": accNs,
})
if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(&cluster), []byte("Delete")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId)
}

return onDeletedClusterMessage
}

func (d *domain) OnUpdateClusterMessage(ctx InfraContext, cluster entities.Cluster) error {
Expand All @@ -449,6 +467,9 @@ func (d *domain) OnUpdateClusterMessage(ctx InfraContext, cluster entities.Clust
c.Status = cluster.Status

_, err = d.clusterRepo.UpdateById(ctx, c.Id, c)
if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(c) , []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId)
}
return errors.NewE(err)
}

Expand Down
14 changes: 13 additions & 1 deletion apps/infra/internal/domain/domain-entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func (d *domain) CreateDomainEntry(ctx InfraContext, de entities.DomainEntry) (*
if err != nil {
return nil, errors.NewE(err)
}
if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(nde), []byte("Added")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(nde))
}

return nde, nil
}
Expand All @@ -68,6 +71,9 @@ func (d *domain) UpdateDomainEntry(ctx InfraContext, de entities.DomainEntry) (*
if err != nil {
return nil, errors.NewE(err)
}
if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(newDe), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(newDe))
}
return newDe, nil
}

Expand All @@ -80,7 +86,13 @@ func (d *domain) DeleteDomainEntry(ctx InfraContext, domainName string) error {
return errors.NewE(err)
}

return d.domainEntryRepo.DeleteById(ctx, entry.Id)
if err = d.domainEntryRepo.DeleteById(ctx, entry.Id); err != nil {
return err
}
if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(entry), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(entry))
}
return err
}

func (d *domain) findDomainEntry(ctx context.Context, accountName string, domainName string) (*entities.DomainEntry, error) {
Expand Down
5 changes: 4 additions & 1 deletion apps/infra/internal/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package domain

import (
"fmt"
"github.com/kloudlite/api/pkg/nats"
"strconv"

"github.com/kloudlite/api/pkg/errors"
Expand Down Expand Up @@ -40,6 +41,7 @@ type domain struct {
messageOfficeInternalClient message_office_internal.MessageOfficeInternalClient
resDispatcher ResourceDispatcher
k8sClient k8s.Client
natCli *nats.Client
}

func (d *domain) resyncToTargetCluster(ctx InfraContext, action types.SyncAction, clusterName string, obj client.Object, recordVersion int) error {
Expand Down Expand Up @@ -143,11 +145,12 @@ var Module = fx.Module("domain",
iamClient iam.IAMClient,
accountsSvc AccountsSvc,
msgOfficeInternalClient message_office_internal.MessageOfficeInternalClient,

natCli *nats.Client,
logger logging.Logger,
) Domain {
return &domain{
logger: logger,
natCli: natCli,
env: env,
clusterRepo: clusterRepo,
nodeRepo: nodeRepo,
Expand Down
27 changes: 26 additions & 1 deletion apps/infra/internal/domain/nodepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,15 @@ func (d *domain) CreateNodePool(ctx InfraContext, clusterName string, nodepool e
}
return nil, errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(&nodepool), []byte("Added")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(&nodepool), []byte("Added"))
}

if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &np.NodePool, np.RecordVersion); err != nil {
return nil, errors.NewE(err)
}


return np, nil
}

Expand Down Expand Up @@ -194,6 +198,14 @@ func (d *domain) UpdateNodePool(ctx InfraContext, clusterName string, nodePool e
if err != nil {
return nil, errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(unp), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(unp), []byte("Added"))
}


if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(unp), []byte("Added")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(unp))
}

if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &unp.NodePool, unp.RecordVersion); err != nil {
return nil, errors.NewE(err)
Expand Down Expand Up @@ -221,6 +233,9 @@ func (d *domain) DeleteNodePool(ctx InfraContext, clusterName string, poolName s
if err != nil {
return errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(upC), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(upC), []byte("Added"))
}
return d.resDispatcher.DeleteFromTargetCluster(ctx, clusterName, &upC.NodePool)
}

Expand Down Expand Up @@ -291,7 +306,11 @@ func (d *domain) OnDeleteNodePoolMessage(ctx InfraContext, clusterName string, n
return d.resyncToTargetCluster(ctx, np.SyncStatus.Action, clusterName, &np.NodePool, np.RecordVersion)
}

return d.nodePoolRepo.DeleteById(ctx, np.Id)
err := d.nodePoolRepo.DeleteById(ctx, np.Id)
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Deleted")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np))
}
return err
}

func (d *domain) OnUpdateNodePoolMessage(ctx InfraContext, clusterName string, nodePool entities.NodePool) error {
Expand All @@ -314,6 +333,9 @@ func (d *domain) OnUpdateNodePoolMessage(ctx InfraContext, clusterName string, n
if _, err := d.nodePoolRepo.UpdateById(ctx, np.Id, np); err != nil {
return errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np))
}
return nil
}

Expand All @@ -329,5 +351,8 @@ func (d *domain) OnNodepoolApplyError(ctx InfraContext, clusterName string, name
np.SyncStatus.Error = &errMsg

_, err = d.nodePoolRepo.UpdateById(ctx, np.Id, np)
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np))
}
return errors.NewE(err)
}
6 changes: 6 additions & 0 deletions apps/infra/internal/domain/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func (d *domain) OnPVCUpdateMessage(ctx InfraContext, clusterName string, pvc en
}, &pvc); err != nil {
return errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.pvcResUpdateSubject(&pvc), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.pvcResUpdateSubject(&pvc))
}
return nil
}

Expand All @@ -51,5 +54,8 @@ func (d *domain) OnPVCDeleteMessage(ctx InfraContext, clusterName string, pvc en
}); err != nil {
return errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.pvcResUpdateSubject(&pvc), []byte("Deleted")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.pvcResUpdateSubject(&pvc))
}
return nil
}
62 changes: 62 additions & 0 deletions apps/infra/internal/domain/update-subjects.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package domain

import (
"fmt"
"github.com/kloudlite/api/apps/infra/internal/entities"
)

func (d *domain) clusterResUpdateSubject(cluster *entities.Cluster) string {
return fmt.Sprint(
"res-updates.",
"account.",
cluster.Cluster.Spec.AccountName, ".",
"cluster.",
cluster.Cluster.Name)
}


func (d *domain) nodePoolResUpdateSubject(nodePool *entities.NodePool) string {
return fmt.Sprint(
"res-updates.",
"account.",
nodePool.AccountName, ".",
"cluster.",
nodePool.ClusterName, ".",
"node-pool.", nodePool.Name,
)
}

func (d *domain) domainResUpdateSubject(domainEntry *entities.DomainEntry) string {
return fmt.Sprint(
"res-updates.",
"account.",
domainEntry.AccountName, ".",
"cluster.",
domainEntry.ClusterName, ".",
"domain.", domainEntry.DomainName,
)
}


func (d *domain) vpnDeviceResUpdateSubject(device *entities.VPNDevice) string {
return fmt.Sprint(
"res-updates.",
"account.",
device.AccountName, ".",
"cluster.",
device.ClusterName, ".",
"vpn-device.", device.Name,
)
}


func (d *domain) pvcResUpdateSubject(pvc *entities.PersistentVolumeClaim) string {
return fmt.Sprint(
"res-updates.",
"account.",
pvc.AccountName, ".",
"cluster.",
pvc.ClusterName, ".",
"vpn-device.", pvc.Name,
)
}
24 changes: 22 additions & 2 deletions apps/infra/internal/domain/vpn-device.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func (d *domain) ListVPNDevices(ctx context.Context, accountName string, cluster
if clusterName != nil {
filter["clusterName"] = *clusterName
}

return d.vpnDeviceRepo.FindPaginated(ctx, d.vpnDeviceRepo.MergeMatchFilters(filter, search), pagination)
}

Expand Down Expand Up @@ -50,6 +49,9 @@ func (d *domain) CreateVPNDevice(ctx InfraContext, clusterName string, device en
}
return nil, errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(nDevice), []byte("Deleted")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(nDevice))
}

if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &nDevice.Device, nDevice.RecordVersion); err != nil {
return nil, errors.NewE(err)
Expand Down Expand Up @@ -87,6 +89,9 @@ func (d *domain) UpdateVPNDevice(ctx InfraContext, clusterName string, device en
if err != nil {
return nil, errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(nDevice), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(nDevice))
}

if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &nDevice.Device, nDevice.RecordVersion); err != nil {
return nil, errors.NewE(err)
Expand Down Expand Up @@ -126,6 +131,9 @@ func (d *domain) DeleteVPNDevice(ctx InfraContext, clusterName string, name stri
if _, err := d.vpnDeviceRepo.UpdateById(ctx, device.Id, device); err != nil {
return errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(device), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(device))
}
return d.resDispatcher.DeleteFromTargetCluster(ctx, clusterName, &device.Device)
}

Expand All @@ -140,6 +148,9 @@ func (d *domain) OnVPNDeviceApplyError(ctx InfraContext, clusterName string, nam
currDevice.SyncStatus.Error = &errMsg

_, err = d.vpnDeviceRepo.UpdateById(ctx, currDevice.Id, currDevice)
if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(currDevice), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(currDevice))
}
return errors.NewE(err)
}

Expand Down Expand Up @@ -168,6 +179,9 @@ func (d *domain) OnVPNDeviceUpdateMessage(ctx InfraContext, clusterName string,
currDevice.SyncStatus.LastSyncedAt = time.Now()

_, err = d.vpnDeviceRepo.UpdateById(ctx, currDevice.Id, currDevice)
if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(currDevice), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(currDevice))
}
return errors.NewE(err)
}

Expand All @@ -181,5 +195,11 @@ func (d *domain) OnVPNDeviceDeleteMessage(ctx InfraContext, clusterName string,
return errors.NewE(err)
}

return d.vpnDeviceRepo.DeleteById(ctx, currDevice.Id)
if err = d.vpnDeviceRepo.DeleteById(ctx, currDevice.Id); err != nil {
return errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(currDevice), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(currDevice))
}
return nil
}
Loading