diff --git a/apps/infra/internal/domain/build-run.go b/apps/infra/internal/domain/build-run.go index 2446c00bd..df0b58135 100644 --- a/apps/infra/internal/domain/build-run.go +++ b/apps/infra/internal/domain/build-run.go @@ -39,6 +39,7 @@ func (d *domain) OnBuildRunUpdateMessage(ctx InfraContext, clusterName string, b }, &buildRun); err != nil { return errors.NewE(err) } + return nil } @@ -51,5 +52,6 @@ func (d *domain) OnBuildRunDeleteMessage(ctx InfraContext, clusterName string, b }); err != nil { return errors.NewE(err) } + //d.natCli.Conn.Publish(fmt.Scan(buildRun.BuildRun, )) return nil } diff --git a/apps/infra/internal/domain/clusters.go b/apps/infra/internal/domain/clusters.go index e53b9a85f..1371cb632 100644 --- a/apps/infra/internal/domain/clusters.go +++ b/apps/infra/internal/domain/clusters.go @@ -244,6 +244,10 @@ func (d *domain) CreateCluster(ctx InfraContext, cluster entities.Cluster) (*ent return nil, errors.NewE(err) } + if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(nCluster), []byte("Added")); err != nil { + d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId) + } + return nCluster, nil } @@ -361,7 +365,7 @@ func (d *domain) UpdateCluster(ctx InfraContext, cluster entities.Cluster) (*ent UserEmail: ctx.UserEmail, } - // FIXME: no update for cluster spec + // FIXME: no update for cluster spec // clus.Spec = cluster.Spec clus.Labels = cluster.Labels @@ -377,9 +381,13 @@ func (d *domain) UpdateCluster(ctx InfraContext, cluster entities.Cluster) (*ent return nil, errors.NewE(err) } + if err:=d.natCli.Conn.Publish(d.clusterResUpdateSubject(&cluster), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId) + } return uCluster, nil } + func (d *domain) readClusterK8sResource(ctx InfraContext, namespace string, name string) (cluster *clustersv1.Cluster, found bool, err error) { var clus entities.Cluster if err := d.k8sClient.Get(ctx, fn.NN(namespace, name), &clus.Cluster); err != nil { @@ -408,23 +416,33 @@ func (d *domain) DeleteCluster(ctx InfraContext, name string) error { return errors.NewE(err) } - return d.deleteK8sResource(ctx, &upC.Cluster) + deletedCluster := d.deleteK8sResource(ctx, &upC.Cluster) + + if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(c), []byte("Update")); err != nil { + d.logger.Errorf(err, "failed to publish message to account %q", c.Cluster.Spec.AccountId) + } + + return deletedCluster } return nil -} +} func (d *domain) OnDeleteClusterMessage(ctx InfraContext, cluster entities.Cluster) error { accNs, err := d.getAccNamespace(ctx, ctx.AccountName) if err != nil { return errors.NewE(err) } - - return d.clusterRepo.DeleteOne(ctx, repos.Filter{ + onDeletedClusterMessage := d.clusterRepo.DeleteOne(ctx, repos.Filter{ "accountName": ctx.AccountName, "metadata.name": cluster.Name, "metadata.namespace": accNs, }) + if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(&cluster), []byte("Delete")); err != nil { + d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId) + } + + return onDeletedClusterMessage } func (d *domain) OnUpdateClusterMessage(ctx InfraContext, cluster entities.Cluster) error { @@ -449,6 +467,9 @@ func (d *domain) OnUpdateClusterMessage(ctx InfraContext, cluster entities.Clust c.Status = cluster.Status _, err = d.clusterRepo.UpdateById(ctx, c.Id, c) + if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(c) , []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId) + } return errors.NewE(err) } diff --git a/apps/infra/internal/domain/domain-entries.go b/apps/infra/internal/domain/domain-entries.go index 9ad47f61d..a6ba598cd 100644 --- a/apps/infra/internal/domain/domain-entries.go +++ b/apps/infra/internal/domain/domain-entries.go @@ -43,6 +43,9 @@ func (d *domain) CreateDomainEntry(ctx InfraContext, de entities.DomainEntry) (* if err != nil { return nil, errors.NewE(err) } + if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(nde), []byte("Added")); err != nil { + d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(nde)) + } return nde, nil } @@ -68,6 +71,9 @@ func (d *domain) UpdateDomainEntry(ctx InfraContext, de entities.DomainEntry) (* if err != nil { return nil, errors.NewE(err) } + if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(newDe), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(newDe)) + } return newDe, nil } @@ -80,7 +86,13 @@ func (d *domain) DeleteDomainEntry(ctx InfraContext, domainName string) error { return errors.NewE(err) } - return d.domainEntryRepo.DeleteById(ctx, entry.Id) + if err = d.domainEntryRepo.DeleteById(ctx, entry.Id); err != nil { + return err + } + if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(entry), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(entry)) + } + return err } func (d *domain) findDomainEntry(ctx context.Context, accountName string, domainName string) (*entities.DomainEntry, error) { diff --git a/apps/infra/internal/domain/domain.go b/apps/infra/internal/domain/domain.go index b3992a946..546f18d84 100644 --- a/apps/infra/internal/domain/domain.go +++ b/apps/infra/internal/domain/domain.go @@ -2,6 +2,7 @@ package domain import ( "fmt" + "github.com/kloudlite/api/pkg/nats" "strconv" "github.com/kloudlite/api/pkg/errors" @@ -40,6 +41,7 @@ type domain struct { messageOfficeInternalClient message_office_internal.MessageOfficeInternalClient resDispatcher ResourceDispatcher k8sClient k8s.Client + natCli *nats.Client } func (d *domain) resyncToTargetCluster(ctx InfraContext, action types.SyncAction, clusterName string, obj client.Object, recordVersion int) error { @@ -143,11 +145,12 @@ var Module = fx.Module("domain", iamClient iam.IAMClient, accountsSvc AccountsSvc, msgOfficeInternalClient message_office_internal.MessageOfficeInternalClient, - + natCli *nats.Client, logger logging.Logger, ) Domain { return &domain{ logger: logger, + natCli: natCli, env: env, clusterRepo: clusterRepo, nodeRepo: nodeRepo, diff --git a/apps/infra/internal/domain/nodepool.go b/apps/infra/internal/domain/nodepool.go index 3231f6fcf..af9632aef 100644 --- a/apps/infra/internal/domain/nodepool.go +++ b/apps/infra/internal/domain/nodepool.go @@ -144,11 +144,15 @@ func (d *domain) CreateNodePool(ctx InfraContext, clusterName string, nodepool e } return nil, errors.NewE(err) } + if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(&nodepool), []byte("Added")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(&nodepool), []byte("Added")) + } if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &np.NodePool, np.RecordVersion); err != nil { return nil, errors.NewE(err) } + return np, nil } @@ -194,6 +198,14 @@ func (d *domain) UpdateNodePool(ctx InfraContext, clusterName string, nodePool e if err != nil { return nil, errors.NewE(err) } + if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(unp), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(unp), []byte("Added")) + } + + + if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(unp), []byte("Added")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(unp)) + } if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &unp.NodePool, unp.RecordVersion); err != nil { return nil, errors.NewE(err) @@ -221,6 +233,9 @@ func (d *domain) DeleteNodePool(ctx InfraContext, clusterName string, poolName s if err != nil { return errors.NewE(err) } + if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(upC), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(upC), []byte("Added")) + } return d.resDispatcher.DeleteFromTargetCluster(ctx, clusterName, &upC.NodePool) } @@ -291,7 +306,11 @@ func (d *domain) OnDeleteNodePoolMessage(ctx InfraContext, clusterName string, n return d.resyncToTargetCluster(ctx, np.SyncStatus.Action, clusterName, &np.NodePool, np.RecordVersion) } - return d.nodePoolRepo.DeleteById(ctx, np.Id) + err := d.nodePoolRepo.DeleteById(ctx, np.Id) + if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Deleted")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np)) + } + return err } func (d *domain) OnUpdateNodePoolMessage(ctx InfraContext, clusterName string, nodePool entities.NodePool) error { @@ -314,6 +333,9 @@ func (d *domain) OnUpdateNodePoolMessage(ctx InfraContext, clusterName string, n if _, err := d.nodePoolRepo.UpdateById(ctx, np.Id, np); err != nil { return errors.NewE(err) } + if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np)) + } return nil } @@ -329,5 +351,8 @@ func (d *domain) OnNodepoolApplyError(ctx InfraContext, clusterName string, name np.SyncStatus.Error = &errMsg _, err = d.nodePoolRepo.UpdateById(ctx, np.Id, np) + if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np)) + } return errors.NewE(err) } diff --git a/apps/infra/internal/domain/pvc.go b/apps/infra/internal/domain/pvc.go index 7bac06372..7874420a9 100644 --- a/apps/infra/internal/domain/pvc.go +++ b/apps/infra/internal/domain/pvc.go @@ -39,6 +39,9 @@ func (d *domain) OnPVCUpdateMessage(ctx InfraContext, clusterName string, pvc en }, &pvc); err != nil { return errors.NewE(err) } + if err:=d.natCli.Conn.Publish(d.pvcResUpdateSubject(&pvc), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.pvcResUpdateSubject(&pvc)) + } return nil } @@ -51,5 +54,8 @@ func (d *domain) OnPVCDeleteMessage(ctx InfraContext, clusterName string, pvc en }); err != nil { return errors.NewE(err) } + if err:=d.natCli.Conn.Publish(d.pvcResUpdateSubject(&pvc), []byte("Deleted")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.pvcResUpdateSubject(&pvc)) + } return nil } diff --git a/apps/infra/internal/domain/update-subjects.go b/apps/infra/internal/domain/update-subjects.go new file mode 100644 index 000000000..8d1adc400 --- /dev/null +++ b/apps/infra/internal/domain/update-subjects.go @@ -0,0 +1,62 @@ +package domain + +import ( + "fmt" + "github.com/kloudlite/api/apps/infra/internal/entities" +) + +func (d *domain) clusterResUpdateSubject(cluster *entities.Cluster) string { + return fmt.Sprint( + "res-updates.", + "account.", + cluster.Cluster.Spec.AccountName, ".", + "cluster.", + cluster.Cluster.Name) +} + + +func (d *domain) nodePoolResUpdateSubject(nodePool *entities.NodePool) string { + return fmt.Sprint( + "res-updates.", + "account.", + nodePool.AccountName, ".", + "cluster.", + nodePool.ClusterName, ".", + "node-pool.", nodePool.Name, + ) +} + +func (d *domain) domainResUpdateSubject(domainEntry *entities.DomainEntry) string { + return fmt.Sprint( + "res-updates.", + "account.", + domainEntry.AccountName, ".", + "cluster.", + domainEntry.ClusterName, ".", + "domain.", domainEntry.DomainName, + ) +} + + +func (d *domain) vpnDeviceResUpdateSubject(device *entities.VPNDevice) string { + return fmt.Sprint( + "res-updates.", + "account.", + device.AccountName, ".", + "cluster.", + device.ClusterName, ".", + "vpn-device.", device.Name, + ) +} + + +func (d *domain) pvcResUpdateSubject(pvc *entities.PersistentVolumeClaim) string { + return fmt.Sprint( + "res-updates.", + "account.", + pvc.AccountName, ".", + "cluster.", + pvc.ClusterName, ".", + "vpn-device.", pvc.Name, + ) +} diff --git a/apps/infra/internal/domain/vpn-device.go b/apps/infra/internal/domain/vpn-device.go index 2d777e049..8ff87fa6a 100644 --- a/apps/infra/internal/domain/vpn-device.go +++ b/apps/infra/internal/domain/vpn-device.go @@ -16,7 +16,6 @@ func (d *domain) ListVPNDevices(ctx context.Context, accountName string, cluster if clusterName != nil { filter["clusterName"] = *clusterName } - return d.vpnDeviceRepo.FindPaginated(ctx, d.vpnDeviceRepo.MergeMatchFilters(filter, search), pagination) } @@ -50,6 +49,9 @@ func (d *domain) CreateVPNDevice(ctx InfraContext, clusterName string, device en } return nil, errors.NewE(err) } + if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(nDevice), []byte("Deleted")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(nDevice)) + } if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &nDevice.Device, nDevice.RecordVersion); err != nil { return nil, errors.NewE(err) @@ -87,6 +89,9 @@ func (d *domain) UpdateVPNDevice(ctx InfraContext, clusterName string, device en if err != nil { return nil, errors.NewE(err) } + if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(nDevice), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(nDevice)) + } if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &nDevice.Device, nDevice.RecordVersion); err != nil { return nil, errors.NewE(err) @@ -126,6 +131,9 @@ func (d *domain) DeleteVPNDevice(ctx InfraContext, clusterName string, name stri if _, err := d.vpnDeviceRepo.UpdateById(ctx, device.Id, device); err != nil { return errors.NewE(err) } + if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(device), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(device)) + } return d.resDispatcher.DeleteFromTargetCluster(ctx, clusterName, &device.Device) } @@ -140,6 +148,9 @@ func (d *domain) OnVPNDeviceApplyError(ctx InfraContext, clusterName string, nam currDevice.SyncStatus.Error = &errMsg _, err = d.vpnDeviceRepo.UpdateById(ctx, currDevice.Id, currDevice) + if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(currDevice), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(currDevice)) + } return errors.NewE(err) } @@ -168,6 +179,9 @@ func (d *domain) OnVPNDeviceUpdateMessage(ctx InfraContext, clusterName string, currDevice.SyncStatus.LastSyncedAt = time.Now() _, err = d.vpnDeviceRepo.UpdateById(ctx, currDevice.Id, currDevice) + if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(currDevice), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(currDevice)) + } return errors.NewE(err) } @@ -181,5 +195,11 @@ func (d *domain) OnVPNDeviceDeleteMessage(ctx InfraContext, clusterName string, return errors.NewE(err) } - return d.vpnDeviceRepo.DeleteById(ctx, currDevice.Id) + if err = d.vpnDeviceRepo.DeleteById(ctx, currDevice.Id); err != nil { + return errors.NewE(err) + } + if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(currDevice), []byte("Updated")); err != nil { + d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(currDevice)) + } + return nil } diff --git a/apps/infra/internal/framework/framework.go b/apps/infra/internal/framework/framework.go index 5b3232d1b..c63c0e0a0 100644 --- a/apps/infra/internal/framework/framework.go +++ b/apps/infra/internal/framework/framework.go @@ -13,6 +13,7 @@ import ( httpServer "github.com/kloudlite/api/pkg/http-server" "github.com/kloudlite/api/pkg/logging" "github.com/kloudlite/api/pkg/nats" + mongoRepo "github.com/kloudlite/api/pkg/repos" "go.uber.org/fx" ) @@ -40,14 +41,14 @@ var Module = fx.Module("framework", mongoRepo.NewMongoClientFx[*framework](), - fx.Provide(func(ev *env.Env, logger logging.Logger) (*nats.JetstreamClient, error) { - c, err := nats.NewClient(ev.NatsURL, nats.ClientOpts{ + fx.Provide(func(ev *env.Env, logger logging.Logger) (*nats.Client, error) { + return nats.NewClient(ev.NatsURL, nats.ClientOpts{ Name: "infra", Logger: logger, }) - if err != nil { - return nil, errors.NewE(err) - } + }), + + fx.Provide(func(c *nats.Client) (*nats.JetstreamClient, error) { return nats.NewJetstreamClient(c) }),