diff --git a/apps/console/internal/domain/app.go b/apps/console/internal/domain/app.go index 3eb584603..4b8ea6f7a 100644 --- a/apps/console/internal/domain/app.go +++ b/apps/console/internal/domain/app.go @@ -1,8 +1,6 @@ package domain import ( - "fmt" - "github.com/kloudlite/api/apps/console/internal/entities" fc "github.com/kloudlite/api/apps/console/internal/entities/field-constants" "github.com/kloudlite/api/common" @@ -202,7 +200,6 @@ func (d *domain) RestartApp(ctx ResourceContext, appName string) error { } func (d *domain) OnAppUpdateMessage(ctx ResourceContext, app entities.App, status types.ResourceStatus, opts UpdateAndDeleteOpts) error { - fmt.Printf("OnAppUpdateMessage: %v\n", app) xApp, err := d.findApp(ctx, app.Name) if err != nil { return errors.NewE(err) diff --git a/apps/container-registry/internal/app/adapter-resource-update-publish.go b/apps/container-registry/internal/app/adapter-resource-update-publish.go index 37d2e4e6e..2cba84c4a 100644 --- a/apps/container-registry/internal/app/adapter-resource-update-publish.go +++ b/apps/container-registry/internal/app/adapter-resource-update-publish.go @@ -35,11 +35,11 @@ func NewResourceEventPublisher(cli *nats.Client, logger logging.Logger) domain.R } func clusterBuildRunUpdateSubject(buildRun *entities.BuildRun) string { - return fmt.Sprintf("res-updates.account.%s.cluster.%s.repo.%s.build-config.%s.build-run.%s", + return fmt.Sprintf("res-updates.account.%s.repo.%s.build-run.%s", buildRun.AccountName, - buildRun.ClusterName, buildRun.Spec.Registry.Repo.Name, - buildRun.Spec.BuildOptions) + buildRun.Id, + ) } func clusterBuildCacheUpdateSubject(buildCache *entities.BuildCacheKey) string { diff --git a/apps/container-registry/internal/app/git-webhook-consumer.go b/apps/container-registry/internal/app/git-webhook-consumer.go index 0043d82c1..d7b9e5951 100644 --- a/apps/container-registry/internal/app/git-webhook-consumer.go +++ b/apps/container-registry/internal/app/git-webhook-consumer.go @@ -127,7 +127,7 @@ func processGitWebhooks(ctx context.Context, d domain.Domain, consumer GitWebhoo AccountName: build.Spec.AccountName, } - err := d.CreateBuildRun(dctx, build, hook, pullToken) + err := d.CreateBuildRun(dctx, build, hook, pullToken, "") if err != nil { logger.Errorf(err, "could not create build run") } diff --git a/apps/container-registry/internal/app/graph/buildrun.resolvers.go b/apps/container-registry/internal/app/graph/buildrun.resolvers.go index 76efd3377..4d1a14fc0 100644 --- a/apps/container-registry/internal/app/graph/buildrun.resolvers.go +++ b/apps/container-registry/internal/app/graph/buildrun.resolvers.go @@ -33,29 +33,17 @@ func (r *buildRunResolver) ID(ctx context.Context, obj *entities.BuildRun) (stri // Spec is the resolver for the spec field. func (r *buildRunResolver) Spec(ctx context.Context, obj *entities.BuildRun) (*model.GithubComKloudliteOperatorApisDistributionV1BuildRunSpec, error) { - var m model.GithubComKloudliteOperatorApisDistributionV1BuildRunSpec - if err := fn.JsonConversion(obj.Spec, &m); err != nil { - return nil, errors.NewE(err) - } - return &m, nil + return fn.JsonConvert[*model.GithubComKloudliteOperatorApisDistributionV1BuildRunSpec](obj.Spec) } // Status is the resolver for the status field. func (r *buildRunResolver) Status(ctx context.Context, obj *entities.BuildRun) (*model.GithubComKloudliteOperatorPkgOperatorStatus, error) { - var m model.GithubComKloudliteOperatorPkgOperatorStatus - if err := fn.JsonConversion(obj.Spec, &m); err != nil { - return nil, errors.NewE(err) - } - return &m, nil + return fn.JsonConvert[*model.GithubComKloudliteOperatorPkgOperatorStatus](obj.Status) } // SyncStatus is the resolver for the syncStatus field. func (r *buildRunResolver) SyncStatus(ctx context.Context, obj *entities.BuildRun) (*model.GithubComKloudliteAPIPkgTypesSyncStatus, error) { - var m model.GithubComKloudliteAPIPkgTypesSyncStatus - if err := fn.JsonConversion(obj.Spec, &m); err != nil { - return nil, errors.NewE(err) - } - return &m, nil + return fn.JsonConvert[*model.GithubComKloudliteAPIPkgTypesSyncStatus](obj.SyncStatus) } // UpdateTime is the resolver for the updateTime field. diff --git a/apps/container-registry/internal/app/process-resource-updates.go b/apps/container-registry/internal/app/process-resource-updates.go index e22ab19aa..eae637ad9 100644 --- a/apps/container-registry/internal/app/process-resource-updates.go +++ b/apps/container-registry/internal/app/process-resource-updates.go @@ -10,6 +10,7 @@ import ( "github.com/kloudlite/api/apps/container-registry/internal/domain" "github.com/kloudlite/api/apps/container-registry/internal/domain/entities" + "github.com/kloudlite/api/pkg/errors" fn "github.com/kloudlite/api/pkg/functions" "github.com/kloudlite/api/pkg/logging" "github.com/kloudlite/operator/operators/resource-watcher/types" @@ -55,6 +56,24 @@ func processResourceUpdates(consumer ReceiveResourceUpdatesConsumer, d domain.Do "accountName/clusterName", fmt.Sprintf("%s/%s", su.AccountName, su.ClusterName), ) + resStatus, err := func() (types.ResourceStatus, error) { + v, ok := su.Object[types.ResourceStatusKey] + if !ok { + return "", errors.NewE(fmt.Errorf("field %s not found in object", types.ResourceStatusKey)) + } + s, ok := v.(string) + if !ok { + return "", errors.NewE(fmt.Errorf("field value %v is not a string", v)) + } + + return types.ResourceStatus(s), nil + }() + if err != nil { + return err + } + + opts := domain.UpdateAndDeleteOpts{MessageTimestamp: msg.Timestamp} + mLogger.Infof("received message") defer func() { mLogger.Infof("processed message") @@ -71,10 +90,15 @@ func processResourceUpdates(consumer ReceiveResourceUpdatesConsumer, d domain.Do if err := fn.JsonConversion(su.Object, &buildRun); err != nil { return err } + + buildRun.AccountName = su.AccountName + buildRun.ClusterName = su.ClusterName + if obj.GetDeletionTimestamp() != nil { return d.OnBuildRunDeleteMessage(dctx, buildRun) } - return d.OnBuildRunUpdateMessage(dctx, buildRun) + + return d.OnBuildRunUpdateMessage(dctx, buildRun, resStatus, opts) } default: diff --git a/apps/container-registry/internal/domain/api.go b/apps/container-registry/internal/domain/api.go index a8b7ca6a0..acf29f909 100644 --- a/apps/container-registry/internal/domain/api.go +++ b/apps/container-registry/internal/domain/api.go @@ -2,11 +2,13 @@ package domain import ( "context" + "time" "github.com/kloudlite/api/apps/container-registry/internal/domain/entities" "github.com/kloudlite/api/pkg/logging" "github.com/kloudlite/api/pkg/repos" "github.com/kloudlite/api/pkg/types" + t "github.com/kloudlite/operator/operators/resource-watcher/types" ) func NewRegistryContext(parent context.Context, userId repos.ID, accountName string) RegistryContext { @@ -22,6 +24,10 @@ type CheckNameAvailabilityOutput struct { SuggestedNames []string `json:"suggestedNames,omitempty"` } +type UpdateAndDeleteOpts struct { + MessageTimestamp time.Time +} + type Domain interface { ProcessRegistryEvents(ctx context.Context, events []entities.Event, logger logging.Logger) error @@ -81,9 +87,10 @@ type Domain interface { ListBuildRuns(ctx RegistryContext, repoName string, search map[string]repos.MatchFilter, pagination repos.CursorPagination) (*repos.PaginatedRecord[*entities.BuildRun], error) GetBuildRun(ctx RegistryContext, repoName string, runName string) (*entities.BuildRun, error) - OnBuildRunUpdateMessage(ctx RegistryContext, buildRun entities.BuildRun) error + OnBuildRunUpdateMessage(ctx RegistryContext, buildRun entities.BuildRun, status t.ResourceStatus, opts UpdateAndDeleteOpts) error + OnBuildRunDeleteMessage(ctx RegistryContext, buildRun entities.BuildRun) error OnBuildRunApplyErrorMessage(ctx RegistryContext, clusterName string, name string, errorMsg string) error ListBuildsByCache(ctx RegistryContext, cacheId repos.ID, pagination repos.CursorPagination) (*repos.PaginatedRecord[*entities.Build], error) - CreateBuildRun(ctx RegistryContext, build *entities.Build, hook *GitWebhookPayload, pullToken string) error + CreateBuildRun(ctx RegistryContext, build *entities.Build, hook *GitWebhookPayload, pullToken string, seed string) error } diff --git a/apps/container-registry/internal/domain/build-run.go b/apps/container-registry/internal/domain/build-run.go index a692420f0..a6e47a352 100644 --- a/apps/container-registry/internal/domain/build-run.go +++ b/apps/container-registry/internal/domain/build-run.go @@ -3,6 +3,7 @@ package domain import ( "crypto/md5" "fmt" + "strconv" "strings" "time" @@ -22,6 +23,8 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/yaml" + + t2 "github.com/kloudlite/operator/operators/resource-watcher/types" ) func (d *Impl) ListBuildRuns(ctx RegistryContext, repoName string, matchFilters map[string]repos.MatchFilter, pagination repos.CursorPagination) (*repos.PaginatedRecord[*entities.BuildRun], error) { @@ -49,15 +52,62 @@ func (d *Impl) GetBuildRun(ctx RegistryContext, repoName string, buildRunName st return brun, nil } -func (d *Impl) OnBuildRunUpdateMessage(ctx RegistryContext, buildRun entities.BuildRun) error { - if _, err := d.buildRunRepo.Upsert(ctx, repos.Filter{ +func (d *Impl) parseRecordVersionFromAnnotations(annotations map[string]string) (int, error) { + annotatedVersion, ok := annotations[constants.RecordVersionKey] + if !ok { + return 0, errors.Newf("no annotation with record version key (%s), found on the resource", constants.RecordVersionKey) + } + + annVersion, err := strconv.ParseInt(annotatedVersion, 10, 32) + if err != nil { + return 0, errors.NewE(err) + } + + return int(annVersion), nil +} + +func (d *Impl) MatchRecordVersion(annotations map[string]string, rv int) (int, error) { + annVersion, err := d.parseRecordVersionFromAnnotations(annotations) + if err != nil { + return -1, errors.NewE(err) + } + + if annVersion != rv { + return -1, errors.Newf("record version mismatch, expected %d, got %d", rv, annVersion) + } + + return annVersion, nil +} + +func (d *Impl) OnBuildRunUpdateMessage(ctx RegistryContext, buildRun entities.BuildRun, status t2.ResourceStatus, opts UpdateAndDeleteOpts) error { + + xBr, err := d.buildRunRepo.FindOne(ctx, repos.Filter{ + fields.AccountName: ctx.AccountName, fields.MetadataName: buildRun.Name, fields.MetadataNamespace: buildRun.Namespace, - fields.AccountName: ctx.AccountName, fields.ClusterName: buildRun.ClusterName, - }, &buildRun); err != nil { + }) + if err != nil { return errors.NewE(err) } + if xBr == nil { + return errors.Newf("build run with name %q not found", buildRun.Name) + } + + recordVersion, err := d.MatchRecordVersion(xBr.Annotations, xBr.RecordVersion) + if err != nil { + return errors.NewE(err) + } + + if _, err = d.buildRunRepo.PatchById( + ctx, + xBr.Id, + common.PatchForSyncFromAgent(&buildRun, recordVersion, status, common.PatchOpts{ + MessageTimestamp: opts.MessageTimestamp, + })); err != nil { + return errors.NewE(err) + } + d.resourceEventPublisher.PublishBuildRunEvent(&buildRun, PublishAdd) return nil @@ -96,13 +146,13 @@ func (d *Impl) OnBuildRunApplyErrorMessage(ctx RegistryContext, clusterName stri return errors.NewE(err) } -func getUniqueKey(build *entities.Build, hook *GitWebhookPayload) string { - uid := fmt.Sprint(build.Id, hook.CommitHash) +func getUniqueKey(build *entities.Build, hook *GitWebhookPayload, seed string) string { + uid := fmt.Sprint(build.Id, hook.CommitHash, seed) return fmt.Sprintf("%x", md5.Sum([]byte(uid))) } -func (d *Impl) CreateBuildRun(ctx RegistryContext, build *entities.Build, hook *GitWebhookPayload, pullToken string) error { - uniqueKey := getUniqueKey(build, hook) +func (d *Impl) CreateBuildRun(ctx RegistryContext, build *entities.Build, hook *GitWebhookPayload, pullToken string, seed string) error { + uniqueKey := getUniqueKey(build, hook, seed) i, err := admin.GetExpirationTime(fmt.Sprintf("%d%s", 1, "d")) if err != nil { return errors.NewE(err) @@ -179,7 +229,7 @@ func (d *Impl) CreateBuildRun(ctx RegistryContext, build *entities.Build, hook * br := entities.BuildRun{ BuildRun: brRaw, BuildName: build.Name, - SyncStatus: t.SyncStatus{}, + SyncStatus: t.GenSyncStatus(t.SyncActionApply, build.RecordVersion), } br.AccountName = build.Spec.AccountName br.ClusterName = build.BuildClusterName diff --git a/apps/container-registry/internal/domain/build.go b/apps/container-registry/internal/domain/build.go index ee87cfcae..9fc3abc8f 100644 --- a/apps/container-registry/internal/domain/build.go +++ b/apps/container-registry/internal/domain/build.go @@ -4,6 +4,7 @@ import ( "context" "slices" + "github.com/google/uuid" "github.com/kloudlite/api/apps/container-registry/internal/domain/entities" fc "github.com/kloudlite/api/apps/container-registry/internal/domain/entities/field-constants" iamT "github.com/kloudlite/api/apps/iam/types" @@ -236,7 +237,6 @@ func (d *Impl) DeleteBuild(ctx RegistryContext, buildId repos.ID) error { } func (d *Impl) TriggerBuild(ctx RegistryContext, buildId repos.ID) error { - co, err := d.iamClient.Can(ctx, &iam.CanIn{ UserId: string(ctx.UserId), ResourceRefs: []string{ @@ -300,12 +300,16 @@ func (d *Impl) TriggerBuild(ctx RegistryContext, buildId repos.ID) error { return errors.Newf("provider %s not supported", b.Source.Provider) } + b.Name = string(buildId) + + uid := uuid.NewString() + if err := d.CreateBuildRun(ctx, b, &GitWebhookPayload{ GitProvider: string(b.Source.Provider), RepoUrl: b.Source.Repository, GitBranch: b.Source.Branch, CommitHash: commitHash, - }, pullToken); err != nil { + }, pullToken, uid); err != nil { return errors.NewE(err) } diff --git a/apps/container-registry/internal/domain/entities/build-run.go b/apps/container-registry/internal/domain/entities/build-run.go index f4c34b89e..0c6ee6aa8 100644 --- a/apps/container-registry/internal/domain/entities/build-run.go +++ b/apps/container-registry/internal/domain/entities/build-run.go @@ -1,9 +1,11 @@ package entities import ( + "github.com/kloudlite/api/common" "github.com/kloudlite/api/pkg/repos" t "github.com/kloudlite/api/pkg/types" distributionv1 "github.com/kloudlite/operator/apis/distribution/v1" + "github.com/kloudlite/operator/pkg/operator" ) type BuildRun struct { @@ -13,6 +15,19 @@ type BuildRun struct { AccountName string `json:"accountName" graphql:"noinput"` ClusterName string `json:"clusterName" graphql:"noinput"` SyncStatus t.SyncStatus `json:"syncStatus" graphql:"noinput"` + common.ResourceMetadata `json:",inline"` +} + +func (a *BuildRun) GetDisplayName() string { + return a.ResourceMetadata.DisplayName +} + +func (a *BuildRun) GetGeneration() int64 { + return a.ObjectMeta.Generation +} + +func (a *BuildRun) GetStatus() operator.Status { + return a.BuildRun.Status } var BuildRunIndices = []repos.IndexField{