Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions apps/infra/internal/app/adapter-resource-update-publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package app

import (
"fmt"
"github.com/kloudlite/api/apps/infra/internal/domain"
"github.com/kloudlite/api/apps/infra/internal/entities"
"github.com/kloudlite/api/pkg/logging"
"github.com/kloudlite/api/pkg/nats"
)

type ResourceEventPublisherImpl struct {
cli *nats.Client
logger logging.Logger
}

func (r *ResourceEventPublisherImpl) PublishClusterEvent(cluster *entities.Cluster, msg domain.PublishMsg) {
subject := clusterResUpdateSubject(cluster)
if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil {
r.logger.Errorf(err, "failed to publish message to subject %q", subject)
}
}

func (r *ResourceEventPublisherImpl) PublishNodePoolEvent(np *entities.NodePool, msg domain.PublishMsg) {
subject := nodePoolResUpdateSubject(np)
if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil {
r.logger.Errorf(err, "failed to publish message to subject %q", subject)
}
}

func (r *ResourceEventPublisherImpl) PublishVpnDeviceEvent(dev *entities.VPNDevice, msg domain.PublishMsg) {
subject := vpnDeviceResUpdateSubject(dev)
if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil {
r.logger.Errorf(err, "failed to publish message to subject %q", subject)
}
}

func (r *ResourceEventPublisherImpl) PublishDomainResEvent(domain *entities.DomainEntry, msg domain.PublishMsg) {
subject := domainResUpdateSubject(domain)
if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil {
r.logger.Errorf(err, "failed to publish message to subject %q", subject)
}
}

func (r *ResourceEventPublisherImpl) PublishPvcResEvent(pvc *entities.PersistentVolumeClaim, msg domain.PublishMsg) {
subject := pvcResUpdateSubject(pvc)
if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil {
r.logger.Errorf(err, "failed to publish message to subject %q", subject)
}
}

func (r *ResourceEventPublisherImpl) PublishCMSEvent(cms *entities.ClusterManagedService, msg domain.PublishMsg) {
subject := clusterManagedServiceUpdateSubject(cms)
if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil {
r.logger.Errorf(err, "failed to publish message to subject %q", subject)
}
}

func NewResourceEventPublisher(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher {
return &ResourceEventPublisherImpl{
cli,
logger,
}
}

func clusterResUpdateSubject(cluster *entities.Cluster) string {
return fmt.Sprint(
"res-updates.",
"account.",
cluster.Cluster.Spec.AccountName, ".",
"cluster.",
cluster.Cluster.Name)
}

func nodePoolResUpdateSubject(nodePool *entities.NodePool) string {
return fmt.Sprint(
"res-updates.",
"account.",
nodePool.AccountName, ".",
"cluster.",
nodePool.ClusterName, ".",
"node-pool.", nodePool.Name,
)
}

func domainResUpdateSubject(domainEntry *entities.DomainEntry) string {
return fmt.Sprint(
"res-updates.",
"account.",
domainEntry.AccountName, ".",
"cluster.",
domainEntry.ClusterName, ".",
"domain.", domainEntry.DomainName,
)
}

func vpnDeviceResUpdateSubject(device *entities.VPNDevice) string {
return fmt.Sprint(
"res-updates.",
"account.",
device.AccountName, ".",
"cluster.",
device.ClusterName, ".",
"vpn-device.", device.Name,
)
}

func pvcResUpdateSubject(pvc *entities.PersistentVolumeClaim) string {
return fmt.Sprint(
"res-updates.",
"account.",
pvc.AccountName, ".",
"cluster.",
pvc.ClusterName, ".",
"vpn-device.", pvc.Name,
)
}

func clusterManagedServiceUpdateSubject(cmsvc *entities.ClusterManagedService) string {
return fmt.Sprint(
"res-updates.",
"account.",
cmsvc.AccountName, ".",
"cluster.",
cmsvc.ClusterName, ".",
"cluster-managed-service.", cmsvc.Name,
)
}
4 changes: 4 additions & 0 deletions apps/infra/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ var Module = fx.Module(
})
}),

fx.Provide(func(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher {
return NewResourceEventPublisher(cli, logger)
}),

domain.Module,

fx.Provide(func(d domain.Domain) infra.InfraServer {
Expand Down
24 changes: 6 additions & 18 deletions apps/infra/internal/domain/cluster-managed-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ func (d *domain) CreateClusterManagedService(ctx InfraContext, clusterName strin
if cms, err := d.clusterManagedServiceRepo.Create(ctx, &service); err != nil {
return nil, errors.NewE(err)
} else {
if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(&service), []byte("Added")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(&service), []byte("Added"))
}
d.resourceEventPublisher.PublishCMSEvent(&service, PublishAdd)

return cms, nil
}
Expand Down Expand Up @@ -161,9 +159,7 @@ func (d *domain) UpdateClusterManagedService(ctx InfraContext, clusterName strin
return nil, errors.NewE(err)
}

if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(unp), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(unp), []byte("Updated"))
}
d.resourceEventPublisher.PublishCMSEvent(unp, PublishUpdate)

if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, unp, unp.RecordVersion); err != nil {
return nil, errors.NewE(err)
Expand Down Expand Up @@ -193,9 +189,7 @@ func (d *domain) DeleteClusterManagedService(ctx InfraContext, clusterName strin
return errors.NewE(err)
}

if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(upC), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(upC), []byte("Added"))
}
d.resourceEventPublisher.PublishCMSEvent(upC, PublishUpdate)

return d.resDispatcher.DeleteFromTargetCluster(ctx, clusterName, &upC.ClusterManagedService)
}
Expand All @@ -211,9 +205,7 @@ func (d *domain) OnClusterManagedServiceApplyError(ctx InfraContext, clusterName
svc.SyncStatus.Error = &errMsg

_, err = d.clusterManagedServiceRepo.UpdateById(ctx, svc.Id, svc)
if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(svc), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(svc))
}
d.resourceEventPublisher.PublishCMSEvent(svc, PublishUpdate)
return errors.NewE(err)
}

Expand All @@ -229,9 +221,7 @@ func (d *domain) OnClusterManagedServiceDeleteMessage(ctx InfraContext, clusterN
}

err := d.clusterManagedServiceRepo.DeleteById(ctx, svc.Id)
if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(svc), []byte("Deleted")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(svc))
}
d.resourceEventPublisher.PublishCMSEvent(svc, PublishDelete)
return err
}
func (d *domain) OnClusterManagedServiceUpdateMessage(ctx InfraContext, clusterName string, service entities.ClusterManagedService) error {
Expand All @@ -254,8 +244,6 @@ func (d *domain) OnClusterManagedServiceUpdateMessage(ctx InfraContext, clusterN
if _, err := d.clusterManagedServiceRepo.UpdateById(ctx, svc.Id, svc); err != nil {
return errors.NewE(err)
}
if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(svc), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(svc))
}
d.resourceEventPublisher.PublishCMSEvent(svc, PublishUpdate)
return nil
}
20 changes: 5 additions & 15 deletions apps/infra/internal/domain/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,7 @@ func (d *domain) CreateCluster(ctx InfraContext, cluster entities.Cluster) (*ent
return nil, errors.NewE(err)
}

if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(nCluster), []byte("Added")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId)
}
d.resourceEventPublisher.PublishClusterEvent(&cluster, PublishAdd)

return nCluster, nil
}
Expand Down Expand Up @@ -335,9 +333,7 @@ func (d *domain) UpdateCluster(ctx InfraContext, cluster entities.Cluster) (*ent
return nil, errors.NewE(err)
}

if err := d.natCli.Conn.Publish(d.clusterResUpdateSubject(&cluster), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId)
}
d.resourceEventPublisher.PublishClusterEvent(&cluster, PublishUpdate)
return uCluster, nil
}

Expand Down Expand Up @@ -371,9 +367,7 @@ func (d *domain) DeleteCluster(ctx InfraContext, name string) error {

deletedCluster := d.deleteK8sResource(ctx, &upC.Cluster)

if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(c), []byte("Update")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", c.Cluster.Spec.AccountId)
}
d.resourceEventPublisher.PublishClusterEvent(c, PublishUpdate)

return deletedCluster
}
Expand All @@ -392,9 +386,7 @@ func (d *domain) OnDeleteClusterMessage(ctx InfraContext, cluster entities.Clust
"metadata.name": cluster.Name,
"metadata.namespace": accNs,
})
if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(&cluster), []byte("Delete")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId)
}
d.resourceEventPublisher.PublishClusterEvent(&cluster, PublishDelete)

return onDeletedClusterMessage
}
Expand All @@ -421,9 +413,7 @@ func (d *domain) OnUpdateClusterMessage(ctx InfraContext, cluster entities.Clust
c.Status = cluster.Status

_, err = d.clusterRepo.UpdateById(ctx, c.Id, c)
if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(c), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId)
}
d.resourceEventPublisher.PublishClusterEvent(&cluster, PublishUpdate)
return errors.NewE(err)
}

Expand Down
12 changes: 3 additions & 9 deletions apps/infra/internal/domain/domain-entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ func (d *domain) CreateDomainEntry(ctx InfraContext, de entities.DomainEntry) (*
if err != nil {
return nil, errors.NewE(err)
}
if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(nde), []byte("Added")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(nde))
}
d.resourceEventPublisher.PublishDomainResEvent(nde, PublishAdd)

return nde, nil
}
Expand All @@ -71,9 +69,7 @@ func (d *domain) UpdateDomainEntry(ctx InfraContext, de entities.DomainEntry) (*
if err != nil {
return nil, errors.NewE(err)
}
if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(newDe), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(newDe))
}
d.resourceEventPublisher.PublishDomainResEvent(newDe, PublishUpdate)
return newDe, nil
}

Expand All @@ -89,9 +85,7 @@ func (d *domain) DeleteDomainEntry(ctx InfraContext, domainName string) error {
if err = d.domainEntryRepo.DeleteById(ctx, entry.Id); err != nil {
return err
}
if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(entry), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(entry))
}
d.resourceEventPublisher.PublishDomainResEvent(entry, PublishUpdate)
return err
}

Expand Down
3 changes: 3 additions & 0 deletions apps/infra/internal/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type domain struct {
resDispatcher ResourceDispatcher
k8sClient k8s.Client
natCli *nats.Client
resourceEventPublisher ResourceEventPublisher
}

func (d *domain) resyncToTargetCluster(ctx InfraContext, action types.SyncAction, clusterName string, obj client.Object, recordVersion int) error {
Expand Down Expand Up @@ -149,6 +150,7 @@ var Module = fx.Module("domain",
msgOfficeInternalClient message_office_internal.MessageOfficeInternalClient,
natCli *nats.Client,
logger logging.Logger,
resourceEventPublisher ResourceEventPublisher,
) Domain {
return &domain{
logger: logger,
Expand All @@ -168,6 +170,7 @@ var Module = fx.Module("domain",
iamClient: iamClient,
accountsSvc: accountsSvc,
messageOfficeInternalClient: msgOfficeInternalClient,
resourceEventPublisher: resourceEventPublisher,
}
}),
)
30 changes: 7 additions & 23 deletions apps/infra/internal/domain/nodepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,12 @@ func (d *domain) CreateNodePool(ctx InfraContext, clusterName string, nodepool e
}
return nil, errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(&nodepool), []byte("Added")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(&nodepool), []byte("Added"))
}
d.resourceEventPublisher.PublishNodePoolEvent(&nodepool, PublishAdd)

if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &np.NodePool, np.RecordVersion); err != nil {
return nil, errors.NewE(err)
}


return np, nil
}

Expand Down Expand Up @@ -198,14 +195,9 @@ func (d *domain) UpdateNodePool(ctx InfraContext, clusterName string, nodePool e
if err != nil {
return nil, errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(unp), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(unp), []byte("Added"))
}


if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(unp), []byte("Added")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(unp))
}
d.resourceEventPublisher.PublishNodePoolEvent(unp, PublishUpdate)
d.resourceEventPublisher.PublishNodePoolEvent(unp, PublishDelete)

if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &unp.NodePool, unp.RecordVersion); err != nil {
return nil, errors.NewE(err)
Expand Down Expand Up @@ -233,9 +225,7 @@ func (d *domain) DeleteNodePool(ctx InfraContext, clusterName string, poolName s
if err != nil {
return errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(upC), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(upC), []byte("Added"))
}
d.resourceEventPublisher.PublishNodePoolEvent(upC, PublishUpdate)
return d.resDispatcher.DeleteFromTargetCluster(ctx, clusterName, &upC.NodePool)
}

Expand Down Expand Up @@ -307,9 +297,7 @@ func (d *domain) OnDeleteNodePoolMessage(ctx InfraContext, clusterName string, n
}

err := d.nodePoolRepo.DeleteById(ctx, np.Id)
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Deleted")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np))
}
d.resourceEventPublisher.PublishNodePoolEvent(np, PublishDelete)
return err
}

Expand All @@ -333,9 +321,7 @@ func (d *domain) OnUpdateNodePoolMessage(ctx InfraContext, clusterName string, n
if _, err := d.nodePoolRepo.UpdateById(ctx, np.Id, np); err != nil {
return errors.NewE(err)
}
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np))
}
d.resourceEventPublisher.PublishNodePoolEvent(np, PublishUpdate)
return nil
}

Expand All @@ -351,8 +337,6 @@ func (d *domain) OnNodepoolApplyError(ctx InfraContext, clusterName string, name
np.SyncStatus.Error = &errMsg

_, err = d.nodePoolRepo.UpdateById(ctx, np.Id, np)
if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Updated")); err != nil {
d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np))
}
d.resourceEventPublisher.PublishNodePoolEvent(np, PublishUpdate)
return errors.NewE(err)
}
Loading