diff --git a/apps/infra/internal/app/adapter-resource-update-publish.go b/apps/infra/internal/app/adapter-resource-update-publish.go new file mode 100644 index 000000000..f44c4cf4c --- /dev/null +++ b/apps/infra/internal/app/adapter-resource-update-publish.go @@ -0,0 +1,127 @@ +package app + +import ( + "fmt" + "github.com/kloudlite/api/apps/infra/internal/domain" + "github.com/kloudlite/api/apps/infra/internal/entities" + "github.com/kloudlite/api/pkg/logging" + "github.com/kloudlite/api/pkg/nats" +) + +type ResourceEventPublisherImpl struct { + cli *nats.Client + logger logging.Logger +} + +func (r *ResourceEventPublisherImpl) PublishClusterEvent(cluster *entities.Cluster, msg domain.PublishMsg) { + subject := clusterResUpdateSubject(cluster) + if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil { + r.logger.Errorf(err, "failed to publish message to subject %q", subject) + } +} + +func (r *ResourceEventPublisherImpl) PublishNodePoolEvent(np *entities.NodePool, msg domain.PublishMsg) { + subject := nodePoolResUpdateSubject(np) + if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil { + r.logger.Errorf(err, "failed to publish message to subject %q", subject) + } +} + +func (r *ResourceEventPublisherImpl) PublishVpnDeviceEvent(dev *entities.VPNDevice, msg domain.PublishMsg) { + subject := vpnDeviceResUpdateSubject(dev) + if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil { + r.logger.Errorf(err, "failed to publish message to subject %q", subject) + } +} + +func (r *ResourceEventPublisherImpl) PublishDomainResEvent(domain *entities.DomainEntry, msg domain.PublishMsg) { + subject := domainResUpdateSubject(domain) + if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil { + r.logger.Errorf(err, "failed to publish message to subject %q", subject) + } +} + +func (r *ResourceEventPublisherImpl) PublishPvcResEvent(pvc *entities.PersistentVolumeClaim, msg domain.PublishMsg) { + subject := pvcResUpdateSubject(pvc) + if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil { + r.logger.Errorf(err, "failed to publish message to subject %q", subject) + } +} + +func (r *ResourceEventPublisherImpl) PublishCMSEvent(cms *entities.ClusterManagedService, msg domain.PublishMsg) { + subject := clusterManagedServiceUpdateSubject(cms) + if err := r.cli.Conn.Publish(subject, []byte(msg)); err != nil { + r.logger.Errorf(err, "failed to publish message to subject %q", subject) + } +} + +func NewResourceEventPublisher(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher { + return &ResourceEventPublisherImpl{ + cli, + logger, + } +} + +func clusterResUpdateSubject(cluster *entities.Cluster) string { + return fmt.Sprint( + "res-updates.", + "account.", + cluster.Cluster.Spec.AccountName, ".", + "cluster.", + cluster.Cluster.Name) +} + +func nodePoolResUpdateSubject(nodePool *entities.NodePool) string { + return fmt.Sprint( + "res-updates.", + "account.", + nodePool.AccountName, ".", + "cluster.", + nodePool.ClusterName, ".", + "node-pool.", nodePool.Name, + ) +} + +func domainResUpdateSubject(domainEntry *entities.DomainEntry) string { + return fmt.Sprint( + "res-updates.", + "account.", + domainEntry.AccountName, ".", + "cluster.", + domainEntry.ClusterName, ".", + "domain.", domainEntry.DomainName, + ) +} + +func vpnDeviceResUpdateSubject(device *entities.VPNDevice) string { + return fmt.Sprint( + "res-updates.", + "account.", + device.AccountName, ".", + "cluster.", + device.ClusterName, ".", + "vpn-device.", device.Name, + ) +} + +func pvcResUpdateSubject(pvc *entities.PersistentVolumeClaim) string { + return fmt.Sprint( + "res-updates.", + "account.", + pvc.AccountName, ".", + "cluster.", + pvc.ClusterName, ".", + "vpn-device.", pvc.Name, + ) +} + +func clusterManagedServiceUpdateSubject(cmsvc *entities.ClusterManagedService) string { + return fmt.Sprint( + "res-updates.", + "account.", + cmsvc.AccountName, ".", + "cluster.", + cmsvc.ClusterName, ".", + "cluster-managed-service.", cmsvc.Name, + ) +} diff --git a/apps/infra/internal/app/app.go b/apps/infra/internal/app/app.go index de059daff..8f64c240e 100644 --- a/apps/infra/internal/app/app.go +++ b/apps/infra/internal/app/app.go @@ -82,6 +82,10 @@ var Module = fx.Module( }) }), + fx.Provide(func(cli *nats.Client, logger logging.Logger) domain.ResourceEventPublisher { + return NewResourceEventPublisher(cli, logger) + }), + domain.Module, fx.Provide(func(d domain.Domain) infra.InfraServer { diff --git a/apps/infra/internal/domain/cluster-managed-service.go b/apps/infra/internal/domain/cluster-managed-service.go index fa2e5dd88..4a83e0410 100644 --- a/apps/infra/internal/domain/cluster-managed-service.go +++ b/apps/infra/internal/domain/cluster-managed-service.go @@ -116,9 +116,7 @@ func (d *domain) CreateClusterManagedService(ctx InfraContext, clusterName strin if cms, err := d.clusterManagedServiceRepo.Create(ctx, &service); err != nil { return nil, errors.NewE(err) } else { - if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(&service), []byte("Added")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(&service), []byte("Added")) - } + d.resourceEventPublisher.PublishCMSEvent(&service, PublishAdd) return cms, nil } @@ -161,9 +159,7 @@ func (d *domain) UpdateClusterManagedService(ctx InfraContext, clusterName strin return nil, errors.NewE(err) } - if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(unp), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(unp), []byte("Updated")) - } + d.resourceEventPublisher.PublishCMSEvent(unp, PublishUpdate) if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, unp, unp.RecordVersion); err != nil { return nil, errors.NewE(err) @@ -193,9 +189,7 @@ func (d *domain) DeleteClusterManagedService(ctx InfraContext, clusterName strin return errors.NewE(err) } - if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(upC), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(upC), []byte("Added")) - } + d.resourceEventPublisher.PublishCMSEvent(upC, PublishUpdate) return d.resDispatcher.DeleteFromTargetCluster(ctx, clusterName, &upC.ClusterManagedService) } @@ -211,9 +205,7 @@ func (d *domain) OnClusterManagedServiceApplyError(ctx InfraContext, clusterName svc.SyncStatus.Error = &errMsg _, err = d.clusterManagedServiceRepo.UpdateById(ctx, svc.Id, svc) - if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(svc), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(svc)) - } + d.resourceEventPublisher.PublishCMSEvent(svc, PublishUpdate) return errors.NewE(err) } @@ -229,9 +221,7 @@ func (d *domain) OnClusterManagedServiceDeleteMessage(ctx InfraContext, clusterN } err := d.clusterManagedServiceRepo.DeleteById(ctx, svc.Id) - if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(svc), []byte("Deleted")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(svc)) - } + d.resourceEventPublisher.PublishCMSEvent(svc, PublishDelete) return err } func (d *domain) OnClusterManagedServiceUpdateMessage(ctx InfraContext, clusterName string, service entities.ClusterManagedService) error { @@ -254,8 +244,6 @@ func (d *domain) OnClusterManagedServiceUpdateMessage(ctx InfraContext, clusterN if _, err := d.clusterManagedServiceRepo.UpdateById(ctx, svc.Id, svc); err != nil { return errors.NewE(err) } - if err := d.natCli.Conn.Publish(d.clusterManagedServiceUpdateSubject(svc), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.clusterManagedServiceUpdateSubject(svc)) - } + d.resourceEventPublisher.PublishCMSEvent(svc, PublishUpdate) return nil } diff --git a/apps/infra/internal/domain/clusters.go b/apps/infra/internal/domain/clusters.go index 6fc85bf9f..5e51f93b4 100644 --- a/apps/infra/internal/domain/clusters.go +++ b/apps/infra/internal/domain/clusters.go @@ -244,9 +244,7 @@ func (d *domain) CreateCluster(ctx InfraContext, cluster entities.Cluster) (*ent return nil, errors.NewE(err) } - if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(nCluster), []byte("Added")); err != nil { - d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId) - } + d.resourceEventPublisher.PublishClusterEvent(&cluster, PublishAdd) return nCluster, nil } @@ -335,9 +333,7 @@ func (d *domain) UpdateCluster(ctx InfraContext, cluster entities.Cluster) (*ent return nil, errors.NewE(err) } - if err := d.natCli.Conn.Publish(d.clusterResUpdateSubject(&cluster), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId) - } + d.resourceEventPublisher.PublishClusterEvent(&cluster, PublishUpdate) return uCluster, nil } @@ -371,9 +367,7 @@ func (d *domain) DeleteCluster(ctx InfraContext, name string) error { deletedCluster := d.deleteK8sResource(ctx, &upC.Cluster) - if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(c), []byte("Update")); err != nil { - d.logger.Errorf(err, "failed to publish message to account %q", c.Cluster.Spec.AccountId) - } + d.resourceEventPublisher.PublishClusterEvent(c, PublishUpdate) return deletedCluster } @@ -392,9 +386,7 @@ func (d *domain) OnDeleteClusterMessage(ctx InfraContext, cluster entities.Clust "metadata.name": cluster.Name, "metadata.namespace": accNs, }) - if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(&cluster), []byte("Delete")); err != nil { - d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId) - } + d.resourceEventPublisher.PublishClusterEvent(&cluster, PublishDelete) return onDeletedClusterMessage } @@ -421,9 +413,7 @@ func (d *domain) OnUpdateClusterMessage(ctx InfraContext, cluster entities.Clust c.Status = cluster.Status _, err = d.clusterRepo.UpdateById(ctx, c.Id, c) - if err = d.natCli.Conn.Publish(d.clusterResUpdateSubject(c), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to account %q", cluster.Cluster.Spec.AccountId) - } + d.resourceEventPublisher.PublishClusterEvent(&cluster, PublishUpdate) return errors.NewE(err) } diff --git a/apps/infra/internal/domain/domain-entries.go b/apps/infra/internal/domain/domain-entries.go index a6ba598cd..9b2d5d487 100644 --- a/apps/infra/internal/domain/domain-entries.go +++ b/apps/infra/internal/domain/domain-entries.go @@ -43,9 +43,7 @@ func (d *domain) CreateDomainEntry(ctx InfraContext, de entities.DomainEntry) (* if err != nil { return nil, errors.NewE(err) } - if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(nde), []byte("Added")); err != nil { - d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(nde)) - } + d.resourceEventPublisher.PublishDomainResEvent(nde, PublishAdd) return nde, nil } @@ -71,9 +69,7 @@ func (d *domain) UpdateDomainEntry(ctx InfraContext, de entities.DomainEntry) (* if err != nil { return nil, errors.NewE(err) } - if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(newDe), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(newDe)) - } + d.resourceEventPublisher.PublishDomainResEvent(newDe, PublishUpdate) return newDe, nil } @@ -89,9 +85,7 @@ func (d *domain) DeleteDomainEntry(ctx InfraContext, domainName string) error { if err = d.domainEntryRepo.DeleteById(ctx, entry.Id); err != nil { return err } - if err = d.natCli.Conn.Publish(d.domainResUpdateSubject(entry), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to account %q", d.domainResUpdateSubject(entry)) - } + d.resourceEventPublisher.PublishDomainResEvent(entry, PublishUpdate) return err } diff --git a/apps/infra/internal/domain/domain.go b/apps/infra/internal/domain/domain.go index 18b3bdab5..9cf39a8a3 100644 --- a/apps/infra/internal/domain/domain.go +++ b/apps/infra/internal/domain/domain.go @@ -43,6 +43,7 @@ type domain struct { resDispatcher ResourceDispatcher k8sClient k8s.Client natCli *nats.Client + resourceEventPublisher ResourceEventPublisher } func (d *domain) resyncToTargetCluster(ctx InfraContext, action types.SyncAction, clusterName string, obj client.Object, recordVersion int) error { @@ -149,6 +150,7 @@ var Module = fx.Module("domain", msgOfficeInternalClient message_office_internal.MessageOfficeInternalClient, natCli *nats.Client, logger logging.Logger, + resourceEventPublisher ResourceEventPublisher, ) Domain { return &domain{ logger: logger, @@ -168,6 +170,7 @@ var Module = fx.Module("domain", iamClient: iamClient, accountsSvc: accountsSvc, messageOfficeInternalClient: msgOfficeInternalClient, + resourceEventPublisher: resourceEventPublisher, } }), ) diff --git a/apps/infra/internal/domain/nodepool.go b/apps/infra/internal/domain/nodepool.go index af9632aef..965fcccc1 100644 --- a/apps/infra/internal/domain/nodepool.go +++ b/apps/infra/internal/domain/nodepool.go @@ -144,15 +144,12 @@ func (d *domain) CreateNodePool(ctx InfraContext, clusterName string, nodepool e } return nil, errors.NewE(err) } - if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(&nodepool), []byte("Added")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(&nodepool), []byte("Added")) - } + d.resourceEventPublisher.PublishNodePoolEvent(&nodepool, PublishAdd) if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &np.NodePool, np.RecordVersion); err != nil { return nil, errors.NewE(err) } - return np, nil } @@ -198,14 +195,9 @@ func (d *domain) UpdateNodePool(ctx InfraContext, clusterName string, nodePool e if err != nil { return nil, errors.NewE(err) } - if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(unp), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(unp), []byte("Added")) - } - - if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(unp), []byte("Added")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(unp)) - } + d.resourceEventPublisher.PublishNodePoolEvent(unp, PublishUpdate) + d.resourceEventPublisher.PublishNodePoolEvent(unp, PublishDelete) if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &unp.NodePool, unp.RecordVersion); err != nil { return nil, errors.NewE(err) @@ -233,9 +225,7 @@ func (d *domain) DeleteNodePool(ctx InfraContext, clusterName string, poolName s if err != nil { return errors.NewE(err) } - if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(upC), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(upC), []byte("Added")) - } + d.resourceEventPublisher.PublishNodePoolEvent(upC, PublishUpdate) return d.resDispatcher.DeleteFromTargetCluster(ctx, clusterName, &upC.NodePool) } @@ -307,9 +297,7 @@ func (d *domain) OnDeleteNodePoolMessage(ctx InfraContext, clusterName string, n } err := d.nodePoolRepo.DeleteById(ctx, np.Id) - if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Deleted")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np)) - } + d.resourceEventPublisher.PublishNodePoolEvent(np, PublishDelete) return err } @@ -333,9 +321,7 @@ func (d *domain) OnUpdateNodePoolMessage(ctx InfraContext, clusterName string, n if _, err := d.nodePoolRepo.UpdateById(ctx, np.Id, np); err != nil { return errors.NewE(err) } - if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np)) - } + d.resourceEventPublisher.PublishNodePoolEvent(np, PublishUpdate) return nil } @@ -351,8 +337,6 @@ func (d *domain) OnNodepoolApplyError(ctx InfraContext, clusterName string, name np.SyncStatus.Error = &errMsg _, err = d.nodePoolRepo.UpdateById(ctx, np.Id, np) - if err:=d.natCli.Conn.Publish(d.nodePoolResUpdateSubject(np), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.nodePoolResUpdateSubject(np)) - } + d.resourceEventPublisher.PublishNodePoolEvent(np, PublishUpdate) return errors.NewE(err) } diff --git a/apps/infra/internal/domain/ports.go b/apps/infra/internal/domain/ports.go index 095741a2c..ba6521c71 100644 --- a/apps/infra/internal/domain/ports.go +++ b/apps/infra/internal/domain/ports.go @@ -2,6 +2,7 @@ package domain import ( "context" + "github.com/kloudlite/api/apps/infra/internal/entities" "github.com/kloudlite/api/grpc-interfaces/kloudlite.io/rpc/accounts" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -10,10 +11,24 @@ type AccountsSvc interface { GetAccount(ctx context.Context, userId string, accountName string) (*accounts.GetAccountOut, error) } - - - type ResourceDispatcher interface { ApplyToTargetCluster(ctx InfraContext, clusterName string, obj client.Object, recordVersion int) error DeleteFromTargetCluster(ctx InfraContext, clusterName string, obj client.Object) error -} \ No newline at end of file +} + +type PublishMsg string + +const ( + PublishAdd PublishMsg = "added" + PublishDelete PublishMsg = "deleted" + PublishUpdate PublishMsg = "updated" +) + +type ResourceEventPublisher interface { + PublishClusterEvent(cluster *entities.Cluster, msg PublishMsg) + PublishNodePoolEvent(np *entities.NodePool, msg PublishMsg) + PublishVpnDeviceEvent(dev *entities.VPNDevice, msg PublishMsg) + PublishDomainResEvent(domain *entities.DomainEntry, msg PublishMsg) + PublishPvcResEvent(pvc *entities.PersistentVolumeClaim, msg PublishMsg) + PublishCMSEvent(pvc *entities.ClusterManagedService, msg PublishMsg) +} diff --git a/apps/infra/internal/domain/pvc.go b/apps/infra/internal/domain/pvc.go index 7874420a9..bf94f97af 100644 --- a/apps/infra/internal/domain/pvc.go +++ b/apps/infra/internal/domain/pvc.go @@ -39,9 +39,7 @@ func (d *domain) OnPVCUpdateMessage(ctx InfraContext, clusterName string, pvc en }, &pvc); err != nil { return errors.NewE(err) } - if err:=d.natCli.Conn.Publish(d.pvcResUpdateSubject(&pvc), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.pvcResUpdateSubject(&pvc)) - } + d.resourceEventPublisher.PublishPvcResEvent(&pvc, PublishUpdate) return nil } @@ -54,8 +52,6 @@ func (d *domain) OnPVCDeleteMessage(ctx InfraContext, clusterName string, pvc en }); err != nil { return errors.NewE(err) } - if err:=d.natCli.Conn.Publish(d.pvcResUpdateSubject(&pvc), []byte("Deleted")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.pvcResUpdateSubject(&pvc)) - } + d.resourceEventPublisher.PublishPvcResEvent(&pvc, PublishDelete) return nil } diff --git a/apps/infra/internal/domain/update-subjects.go b/apps/infra/internal/domain/update-subjects.go deleted file mode 100644 index 82a321b11..000000000 --- a/apps/infra/internal/domain/update-subjects.go +++ /dev/null @@ -1,70 +0,0 @@ -package domain - -import ( - "fmt" - "github.com/kloudlite/api/apps/infra/internal/entities" -) - -func (d *domain) clusterResUpdateSubject(cluster *entities.Cluster) string { - return fmt.Sprint( - "res-updates.", - "account.", - cluster.Cluster.Spec.AccountName, ".", - "cluster.", - cluster.Cluster.Name) -} - -func (d *domain) nodePoolResUpdateSubject(nodePool *entities.NodePool) string { - return fmt.Sprint( - "res-updates.", - "account.", - nodePool.AccountName, ".", - "cluster.", - nodePool.ClusterName, ".", - "node-pool.", nodePool.Name, - ) -} - -func (d *domain) clusterManagedServiceUpdateSubject(cmsvc *entities.ClusterManagedService) string { - return fmt.Sprint( - "res-updates.", - "account.", - cmsvc.AccountName, ".", - "cluster.", - cmsvc.ClusterName, ".", - "cluster-managed-service.", cmsvc.Name, - ) -} - -func (d *domain) domainResUpdateSubject(domainEntry *entities.DomainEntry) string { - return fmt.Sprint( - "res-updates.", - "account.", - domainEntry.AccountName, ".", - "cluster.", - domainEntry.ClusterName, ".", - "domain.", domainEntry.DomainName, - ) -} - -func (d *domain) vpnDeviceResUpdateSubject(device *entities.VPNDevice) string { - return fmt.Sprint( - "res-updates.", - "account.", - device.AccountName, ".", - "cluster.", - device.ClusterName, ".", - "vpn-device.", device.Name, - ) -} - -func (d *domain) pvcResUpdateSubject(pvc *entities.PersistentVolumeClaim) string { - return fmt.Sprint( - "res-updates.", - "account.", - pvc.AccountName, ".", - "cluster.", - pvc.ClusterName, ".", - "vpn-device.", pvc.Name, - ) -} diff --git a/apps/infra/internal/domain/vpn-device.go b/apps/infra/internal/domain/vpn-device.go index 8ff87fa6a..107c904dc 100644 --- a/apps/infra/internal/domain/vpn-device.go +++ b/apps/infra/internal/domain/vpn-device.go @@ -49,9 +49,7 @@ func (d *domain) CreateVPNDevice(ctx InfraContext, clusterName string, device en } return nil, errors.NewE(err) } - if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(nDevice), []byte("Deleted")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(nDevice)) - } + d.resourceEventPublisher.PublishVpnDeviceEvent(&device, PublishAdd) if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &nDevice.Device, nDevice.RecordVersion); err != nil { return nil, errors.NewE(err) @@ -89,9 +87,7 @@ func (d *domain) UpdateVPNDevice(ctx InfraContext, clusterName string, device en if err != nil { return nil, errors.NewE(err) } - if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(nDevice), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(nDevice)) - } + d.resourceEventPublisher.PublishVpnDeviceEvent(nDevice, PublishUpdate) if err := d.resDispatcher.ApplyToTargetCluster(ctx, clusterName, &nDevice.Device, nDevice.RecordVersion); err != nil { return nil, errors.NewE(err) @@ -131,9 +127,7 @@ func (d *domain) DeleteVPNDevice(ctx InfraContext, clusterName string, name stri if _, err := d.vpnDeviceRepo.UpdateById(ctx, device.Id, device); err != nil { return errors.NewE(err) } - if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(device), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(device)) - } + d.resourceEventPublisher.PublishVpnDeviceEvent(device, PublishUpdate) return d.resDispatcher.DeleteFromTargetCluster(ctx, clusterName, &device.Device) } @@ -148,9 +142,7 @@ func (d *domain) OnVPNDeviceApplyError(ctx InfraContext, clusterName string, nam currDevice.SyncStatus.Error = &errMsg _, err = d.vpnDeviceRepo.UpdateById(ctx, currDevice.Id, currDevice) - if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(currDevice), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(currDevice)) - } + d.resourceEventPublisher.PublishVpnDeviceEvent(currDevice, PublishUpdate) return errors.NewE(err) } @@ -179,9 +171,7 @@ func (d *domain) OnVPNDeviceUpdateMessage(ctx InfraContext, clusterName string, currDevice.SyncStatus.LastSyncedAt = time.Now() _, err = d.vpnDeviceRepo.UpdateById(ctx, currDevice.Id, currDevice) - if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(currDevice), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(currDevice)) - } + d.resourceEventPublisher.PublishVpnDeviceEvent(currDevice, PublishUpdate) return errors.NewE(err) } @@ -198,8 +188,6 @@ func (d *domain) OnVPNDeviceDeleteMessage(ctx InfraContext, clusterName string, if err = d.vpnDeviceRepo.DeleteById(ctx, currDevice.Id); err != nil { return errors.NewE(err) } - if err:=d.natCli.Conn.Publish(d.vpnDeviceResUpdateSubject(currDevice), []byte("Updated")); err != nil { - d.logger.Errorf(err, "failed to publish message to subject %q", d.vpnDeviceResUpdateSubject(currDevice)) - } + d.resourceEventPublisher.PublishVpnDeviceEvent(currDevice, PublishUpdate) return nil }