From 7b7f878ce8bafea4451aa435957dff66b2468be3 Mon Sep 17 00:00:00 2001 From: Ken'ichiro Oyama Date: Mon, 22 Sep 2025 14:41:11 +0900 Subject: [PATCH 1/2] feat: add parallel fetching for services --- go.mod | 1 + go.sum | 2 + tailor/resource.go | 605 +++++++++++++++++++++++++++++---------------- 3 files changed, 391 insertions(+), 217 deletions(-) diff --git a/go.mod b/go.mod index de80c6f..c6059a4 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/olekukonko/tablewriter v1.0.9 github.com/spf13/cobra v1.10.1 github.com/vektah/gqlparser/v2 v2.5.30 + golang.org/x/sync v0.17.0 google.golang.org/protobuf v1.36.9 ) diff --git a/go.sum b/go.sum index 17e9289..2296117 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,8 @@ github.com/vektah/gqlparser/v2 v2.5.30 h1:EqLwGAFLIzt1wpx1IPpY67DwUujF1OfzgEyDsL github.com/vektah/gqlparser/v2 v2.5.30/go.mod h1:D1/VCZtV3LPnQrcPBeR/q5jkSQIPti0uYCP/RI0gIeo= golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= diff --git a/tailor/resource.go b/tailor/resource.go index 8bfd94d..4ab8ea6 100644 --- a/tailor/resource.go +++ b/tailor/resource.go @@ -2,10 +2,12 @@ package tailor import ( "context" + "sync" "time" tailorv1 "buf.build/gen/go/tailor-inc/tailor/protocolbuffers/go/tailor/v1" "connectrpc.com/connect" + "golang.org/x/sync/errgroup" ) type Resources struct { @@ -20,6 +22,8 @@ type Resources struct { withoutPipeline bool withoutStateFlow bool executionResultsSince *time.Time + + mu sync.Mutex } type Application struct { @@ -171,246 +175,413 @@ func (c *Client) Resources(ctx context.Context, opts ...ResourceOption) (*Resour return nil, err } } + + // Create errgroup for top-level parallel execution + g, ctx := errgroup.WithContext(ctx) + // Pipeline Services if !resources.withoutPipeline { - pageToken := "" - for { - res, err := c.client.ListPipelineServices(ctx, connect.NewRequest(&tailorv1.ListPipelineServicesRequest{ - WorkspaceId: c.cfg.WorkspaceID, - PageSize: pageSize, - PageToken: pageToken, - })) - if err != nil { - return nil, err - } - for _, p := range res.Msg.GetPipelineServices() { + g.Go(func() error { + return c.fetchPipelineServices(ctx, resources) + }) + } + + // TailorDB Services + if !resources.withoutTailorDB { + g.Go(func() error { + return c.fetchTailorDBServices(ctx, resources) + }) + } + + // StateFlow Services + if !resources.withoutStateFlow { + g.Go(func() error { + return c.fetchStateFlowServices(ctx, resources) + }) + } + + // Wait for all services to complete + if err := g.Wait(); err != nil { + return nil, err + } + + return resources, nil +} + +// fetchPipelineServices fetches pipeline services in parallel +func (c *Client) fetchPipelineServices(ctx context.Context, resources *Resources) error { + pageToken := "" + for { + res, err := c.client.ListPipelineServices(ctx, connect.NewRequest(&tailorv1.ListPipelineServicesRequest{ + WorkspaceId: c.cfg.WorkspaceID, + PageSize: pageSize, + PageToken: pageToken, + })) + if err != nil { + return err + } + + // Process pipelines in parallel + g, ctx := errgroup.WithContext(ctx) + var pipelines []*Pipeline + var mu sync.Mutex + + for _, p := range res.Msg.GetPipelineServices() { + g.Go(func() error { pipeline := &Pipeline{ NamespaceName: p.GetNamespace().GetName(), CommonSDL: p.GetCommonSdl(), } - // Pipeline Resolvers - { - pageToken := "" - for { - res, err := c.client.ListPipelineResolvers(ctx, connect.NewRequest(&tailorv1.ListPipelineResolversRequest{ - WorkspaceId: c.cfg.WorkspaceID, - NamespaceName: p.GetNamespace().GetName(), - PageSize: pageSize, - PageToken: pageToken, - })) - if err != nil { - return nil, err - } - for _, r := range res.Msg.GetPipelineResolvers() { - res, err := c.client.GetPipelineResolver(ctx, connect.NewRequest(&tailorv1.GetPipelineResolverRequest{ - WorkspaceId: c.cfg.WorkspaceID, - NamespaceName: p.GetNamespace().GetName(), - ResolverName: r.GetName(), - })) - if err != nil { - return nil, err - } - rr := res.Msg.GetPipelineResolver() - resolver := &PipelineResolver{ - Name: rr.GetName(), - Description: rr.GetDescription(), - Authorization: rr.GetAuthorization(), - SDL: rr.GetSdl(), - PreHook: rr.GetPreHook().GetExpr(), - PreScript: rr.GetPreScript(), - PostScript: rr.GetPostScript(), - PostHook: rr.GetPostHook().GetExpr(), - } - hasTest := false - for _, p := range rr.GetPipelines() { - step := &PipelineStep{ - Name: p.GetName(), - Description: p.GetDescription(), - PreValidation: p.GetPreValidation(), - PreScript: p.GetPreScript(), - PreHook: p.GetPreHook().GetExpr(), - PostScript: p.GetPostScript(), - PostValidation: p.GetPostValidation(), - PostHook: p.GetPostHook().GetExpr(), - Operation: PipelineStepOperation{ - Type: p.GetOperationType(), - Name: p.GetOperationName(), - Invoker: p.GetInvoker(), - Source: p.GetOperationSource(), - Test: p.GetTest(), - }, - } - if p.GetTest() != "" { - hasTest = true - } - resolver.Steps = append(resolver.Steps, step) - } - - // Pipeline Resolvers Execution Results - if resources.executionResultsSince != nil { - pageToken := "" - view := tailorv1.PipelineResolverExecutionResultView_PIPELINE_RESOLVER_EXECUTION_RESULT_VIEW_BASIC - if hasTest { - // Because branching occurs, context information is required. - view = tailorv1.PipelineResolverExecutionResultView_PIPELINE_RESOLVER_EXECUTION_RESULT_VIEW_FULL - } - L: - for { - res, err := c.client.ListPipelineResolverExecutionResults(ctx, connect.NewRequest(&tailorv1.ListPipelineResolverExecutionResultsRequest{ - WorkspaceId: c.cfg.WorkspaceID, - NamespaceName: p.GetNamespace().GetName(), - ResolverName: r.GetName(), - View: view, - PageSize: pageSize, - PageToken: pageToken, - })) - if err != nil { - return nil, err - } - for _, r := range res.Msg.GetResults() { - if r.GetCreatedAt().AsTime().Before(*resources.executionResultsSince) { - // Since the results are ordered by CreatedAt descending, - // we can stop fetching more results once we reach an older entry. - pageToken = "" - break L - } - resolver.ExecutionResults = append(resolver.ExecutionResults, r) - } - if res.Msg.GetNextPageToken() == "" { - break - } - pageToken = res.Msg.GetNextPageToken() - } - } - - pipeline.Resolvers = append(pipeline.Resolvers, resolver) - } - if res.Msg.GetNextPageToken() == "" { - break - } - pageToken = res.Msg.GetNextPageToken() - } + + if err := c.fetchPipelineResolvers(ctx, pipeline, p, resources); err != nil { + return err } - resources.Pipelines = append(resources.Pipelines, pipeline) - } - if res.Msg.GetNextPageToken() == "" { - break - } - pageToken = res.Msg.GetNextPageToken() + + mu.Lock() + pipelines = append(pipelines, pipeline) + mu.Unlock() + return nil + }) + } + + if err := g.Wait(); err != nil { + return err + } + + // Thread-safe append to resources + resources.mu.Lock() + resources.Pipelines = append(resources.Pipelines, pipelines...) + resources.mu.Unlock() + + if res.Msg.GetNextPageToken() == "" { + break } + pageToken = res.Msg.GetNextPageToken() } + return nil +} - // TailorDB Services - if !resources.withoutTailorDB { - pageToken := "" - for { - res, err := c.client.ListTailorDBServices(ctx, connect.NewRequest(&tailorv1.ListTailorDBServicesRequest{ - WorkspaceId: c.cfg.WorkspaceID, - PageSize: pageSize, - PageToken: pageToken, - })) - if err != nil { - return nil, err +// fetchPipelineResolvers fetches pipeline resolvers in parallel +func (c *Client) fetchPipelineResolvers(ctx context.Context, pipeline *Pipeline, p *tailorv1.PipelineService, resources *Resources) error { + pageToken := "" + for { + res, err := c.client.ListPipelineResolvers(ctx, connect.NewRequest(&tailorv1.ListPipelineResolversRequest{ + WorkspaceId: c.cfg.WorkspaceID, + NamespaceName: p.GetNamespace().GetName(), + PageSize: pageSize, + PageToken: pageToken, + })) + if err != nil { + return err + } + + // Process resolvers in parallel + g, ctx := errgroup.WithContext(ctx) + var resolvers []*PipelineResolver + var mu sync.Mutex + + for _, r := range res.Msg.GetPipelineResolvers() { + g.Go(func() error { + resolver, err := c.fetchPipelineResolverDetails(ctx, p, r, resources) + if err != nil { + return err + } + + mu.Lock() + resolvers = append(resolvers, resolver) + mu.Unlock() + return nil + }) + } + + if err := g.Wait(); err != nil { + return err + } + + pipeline.Resolvers = append(pipeline.Resolvers, resolvers...) + + if res.Msg.GetNextPageToken() == "" { + break + } + pageToken = res.Msg.GetNextPageToken() + } + return nil +} + +// fetchPipelineResolverDetails fetches pipeline resolver details +func (c *Client) fetchPipelineResolverDetails(ctx context.Context, p *tailorv1.PipelineService, r *tailorv1.PipelineResolver, resources *Resources) (*PipelineResolver, error) { + res, err := c.client.GetPipelineResolver(ctx, connect.NewRequest(&tailorv1.GetPipelineResolverRequest{ + WorkspaceId: c.cfg.WorkspaceID, + NamespaceName: p.GetNamespace().GetName(), + ResolverName: r.GetName(), + })) + if err != nil { + return nil, err + } + + rr := res.Msg.GetPipelineResolver() + resolver := &PipelineResolver{ + Name: rr.GetName(), + Description: rr.GetDescription(), + Authorization: rr.GetAuthorization(), + SDL: rr.GetSdl(), + PreHook: rr.GetPreHook().GetExpr(), + PreScript: rr.GetPreScript(), + PostScript: rr.GetPostScript(), + PostHook: rr.GetPostHook().GetExpr(), + } + + hasTest := false + for _, p := range rr.GetPipelines() { + step := &PipelineStep{ + Name: p.GetName(), + Description: p.GetDescription(), + PreValidation: p.GetPreValidation(), + PreScript: p.GetPreScript(), + PreHook: p.GetPreHook().GetExpr(), + PostScript: p.GetPostScript(), + PostValidation: p.GetPostValidation(), + PostHook: p.GetPostHook().GetExpr(), + Operation: PipelineStepOperation{ + Type: p.GetOperationType(), + Name: p.GetOperationName(), + Invoker: p.GetInvoker(), + Source: p.GetOperationSource(), + Test: p.GetTest(), + }, + } + if p.GetTest() != "" { + hasTest = true + } + resolver.Steps = append(resolver.Steps, step) + } + + // Pipeline Resolvers Execution Results + if resources.executionResultsSince != nil { + if err := c.fetchExecutionResults(ctx, resolver, p, r, hasTest, resources); err != nil { + return nil, err + } + } + + return resolver, nil +} + +// fetchExecutionResults fetches execution results for a resolver +func (c *Client) fetchExecutionResults(ctx context.Context, resolver *PipelineResolver, p *tailorv1.PipelineService, r *tailorv1.PipelineResolver, hasTest bool, resources *Resources) error { + pageToken := "" + view := tailorv1.PipelineResolverExecutionResultView_PIPELINE_RESOLVER_EXECUTION_RESULT_VIEW_BASIC + if hasTest { + // Because branching occurs, context information is required. + view = tailorv1.PipelineResolverExecutionResultView_PIPELINE_RESOLVER_EXECUTION_RESULT_VIEW_FULL + } + +L: + for { + res, err := c.client.ListPipelineResolverExecutionResults(ctx, connect.NewRequest(&tailorv1.ListPipelineResolverExecutionResultsRequest{ + WorkspaceId: c.cfg.WorkspaceID, + NamespaceName: p.GetNamespace().GetName(), + ResolverName: r.GetName(), + View: view, + PageSize: pageSize, + PageToken: pageToken, + })) + if err != nil { + return err + } + for _, r := range res.Msg.GetResults() { + if r.GetCreatedAt().AsTime().Before(*resources.executionResultsSince) { + // Since the results are ordered by CreatedAt descending, + // we can stop fetching more results once we reach an older entry. + pageToken = "" + break L } - for _, t := range res.Msg.GetTailordbServices() { + resolver.ExecutionResults = append(resolver.ExecutionResults, r) + } + if res.Msg.GetNextPageToken() == "" { + break + } + pageToken = res.Msg.GetNextPageToken() + } + return nil +} + +// fetchTailorDBServices fetches TailorDB services in parallel +func (c *Client) fetchTailorDBServices(ctx context.Context, resources *Resources) error { + pageToken := "" + for { + res, err := c.client.ListTailorDBServices(ctx, connect.NewRequest(&tailorv1.ListTailorDBServicesRequest{ + WorkspaceId: c.cfg.WorkspaceID, + PageSize: pageSize, + PageToken: pageToken, + })) + if err != nil { + return err + } + + // Process TailorDB services in parallel + g, ctx := errgroup.WithContext(ctx) + var tailordbs []*TailorDB + var mu sync.Mutex + + for _, t := range res.Msg.GetTailordbServices() { + g.Go(func() error { tailordb := &TailorDB{ NamespaceName: t.GetNamespace().GetName(), } - // TailorDB Types - { - pageToken := "" - for { - res, err := c.client.ListTailorDBTypes(ctx, connect.NewRequest(&tailorv1.ListTailorDBTypesRequest{ - WorkspaceId: c.cfg.WorkspaceID, - NamespaceName: t.GetNamespace().GetName(), - PageSize: pageSize, - PageToken: pageToken, - })) - if err != nil { - return nil, err - } - for _, tt := range res.Msg.GetTailordbTypes() { - res, err := c.client.GetTailorDBType(ctx, connect.NewRequest(&tailorv1.GetTailorDBTypeRequest{ - WorkspaceId: c.cfg.WorkspaceID, - NamespaceName: t.GetNamespace().GetName(), - TailordbTypeName: tt.GetName(), - })) - if err != nil { - return nil, err - } - ttt := res.Msg.GetTailordbType() - tailordbType := &TailorDBType{ - Name: ttt.GetName(), - Description: ttt.GetSchema().GetDescription(), - Draft: ttt.GetSchema().GetSettings().GetDraft(), - } - tailordbType.Fields = convertTailorDBFields(ttt.GetSchema().GetFields()) - tailordb.Types = append(tailordb.Types, tailordbType) - if ttt.GetSchema().GetPermission() != nil { - tailordbType.Permission = &TailorDBPermission{} - } - if ttt.GetSchema().GetTypePermission() != nil { - tailordbType.TypePermission = &TailorDBTypePermission{} - } - if ttt.GetSchema().GetRecordPermission() != nil { - tailordbType.RecordPermission = &TailorDBRecordPermission{} - } - if _, err := c.client.GetTailorDBGQLPermission(ctx, connect.NewRequest(&tailorv1.GetTailorDBGQLPermissionRequest{ - WorkspaceId: c.cfg.WorkspaceID, - NamespaceName: t.GetNamespace().GetName(), - TypeName: tt.GetName(), - })); err == nil { - tailordbType.GQLPermission = &TailorDBGQLPermission{} - } - } - if res.Msg.GetNextPageToken() == "" { - break - } - pageToken = res.Msg.GetNextPageToken() - } + + if err := c.fetchTailorDBTypes(ctx, tailordb, t); err != nil { + return err } - resources.TailorDBs = append(resources.TailorDBs, tailordb) - } - if res.Msg.GetNextPageToken() == "" { - break - } - pageToken = res.Msg.GetNextPageToken() + + mu.Lock() + tailordbs = append(tailordbs, tailordb) + mu.Unlock() + return nil + }) + } + + if err := g.Wait(); err != nil { + return err + } + + // Thread-safe append to resources + resources.mu.Lock() + resources.TailorDBs = append(resources.TailorDBs, tailordbs...) + resources.mu.Unlock() + + if res.Msg.GetNextPageToken() == "" { + break } + pageToken = res.Msg.GetNextPageToken() } + return nil +} - // StateFlow Services - if !resources.withoutStateFlow { - pageToken := "" - for { - res, err := c.client.ListStateflowServices(ctx, connect.NewRequest(&tailorv1.ListStateflowServicesRequest{ - WorkspaceId: c.cfg.WorkspaceID, - PageSize: pageSize, - PageToken: pageToken, - })) - if err != nil { - return nil, err - } - for _, s := range res.Msg.GetStateflowServices() { - stateflow := &StateFlow{ - NamespaceName: s.GetNamespace().GetName(), - } - // StateFlow Admin Users - for _, admin := range s.GetAdminUsers() { - adminUser := &StateFlowAdminUser{ - UserID: admin.GetUserId(), - } - stateflow.AdminUsers = append(stateflow.AdminUsers, adminUser) +// fetchTailorDBTypes fetches TailorDB types in parallel +func (c *Client) fetchTailorDBTypes(ctx context.Context, tailordb *TailorDB, t *tailorv1.TailorDBService) error { + pageToken := "" + for { + res, err := c.client.ListTailorDBTypes(ctx, connect.NewRequest(&tailorv1.ListTailorDBTypesRequest{ + WorkspaceId: c.cfg.WorkspaceID, + NamespaceName: t.GetNamespace().GetName(), + PageSize: pageSize, + PageToken: pageToken, + })) + if err != nil { + return err + } + + // Process types in parallel + g, ctx := errgroup.WithContext(ctx) + var types []*TailorDBType + var mu sync.Mutex + + for _, tt := range res.Msg.GetTailordbTypes() { + g.Go(func() error { + tailordbType, err := c.fetchTailorDBTypeDetails(ctx, t, tt) + if err != nil { + return err } - resources.StateFlows = append(resources.StateFlows, stateflow) + + mu.Lock() + types = append(types, tailordbType) + mu.Unlock() + return nil + }) + } + + if err := g.Wait(); err != nil { + return err + } + + tailordb.Types = append(tailordb.Types, types...) + + if res.Msg.GetNextPageToken() == "" { + break + } + pageToken = res.Msg.GetNextPageToken() + } + return nil +} + +// fetchTailorDBTypeDetails fetches TailorDB type details +func (c *Client) fetchTailorDBTypeDetails(ctx context.Context, t *tailorv1.TailorDBService, tt *tailorv1.TailorDBType) (*TailorDBType, error) { + res, err := c.client.GetTailorDBType(ctx, connect.NewRequest(&tailorv1.GetTailorDBTypeRequest{ + WorkspaceId: c.cfg.WorkspaceID, + NamespaceName: t.GetNamespace().GetName(), + TailordbTypeName: tt.GetName(), + })) + if err != nil { + return nil, err + } + + ttt := res.Msg.GetTailordbType() + tailordbType := &TailorDBType{ + Name: ttt.GetName(), + Description: ttt.GetSchema().GetDescription(), + Draft: ttt.GetSchema().GetSettings().GetDraft(), + } + tailordbType.Fields = convertTailorDBFields(ttt.GetSchema().GetFields()) + + if ttt.GetSchema().GetPermission() != nil { + tailordbType.Permission = &TailorDBPermission{} + } + if ttt.GetSchema().GetTypePermission() != nil { + tailordbType.TypePermission = &TailorDBTypePermission{} + } + if ttt.GetSchema().GetRecordPermission() != nil { + tailordbType.RecordPermission = &TailorDBRecordPermission{} + } + if _, err := c.client.GetTailorDBGQLPermission(ctx, connect.NewRequest(&tailorv1.GetTailorDBGQLPermissionRequest{ + WorkspaceId: c.cfg.WorkspaceID, + NamespaceName: t.GetNamespace().GetName(), + TypeName: tt.GetName(), + })); err == nil { + tailordbType.GQLPermission = &TailorDBGQLPermission{} + } + + return tailordbType, nil +} + +// fetchStateFlowServices fetches StateFlow services +func (c *Client) fetchStateFlowServices(ctx context.Context, resources *Resources) error { + pageToken := "" + for { + res, err := c.client.ListStateflowServices(ctx, connect.NewRequest(&tailorv1.ListStateflowServicesRequest{ + WorkspaceId: c.cfg.WorkspaceID, + PageSize: pageSize, + PageToken: pageToken, + })) + if err != nil { + return err + } + + var stateflows []*StateFlow + for _, s := range res.Msg.GetStateflowServices() { + stateflow := &StateFlow{ + NamespaceName: s.GetNamespace().GetName(), } - if res.Msg.GetNextPageToken() == "" { - break + // StateFlow Admin Users + for _, admin := range s.GetAdminUsers() { + adminUser := &StateFlowAdminUser{ + UserID: admin.GetUserId(), + } + stateflow.AdminUsers = append(stateflow.AdminUsers, adminUser) } - pageToken = res.Msg.GetNextPageToken() + stateflows = append(stateflows, stateflow) } - } - return resources, nil + // Thread-safe append to resources + resources.mu.Lock() + resources.StateFlows = append(resources.StateFlows, stateflows...) + resources.mu.Unlock() + + if res.Msg.GetNextPageToken() == "" { + break + } + pageToken = res.Msg.GetNextPageToken() + } + return nil } // convertTailorDBFields converts proto FieldConfig map to TailorDBField slice. From b5f2621a42c525c71b2b91856c3443045ffb3ad4 Mon Sep 17 00:00:00 2001 From: Ken'ichiro Oyama Date: Mon, 22 Sep 2025 14:44:46 +0900 Subject: [PATCH 2/2] chore: fix lint warn --- tailor/resource.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tailor/resource.go b/tailor/resource.go index 4ab8ea6..d8ce772 100644 --- a/tailor/resource.go +++ b/tailor/resource.go @@ -208,7 +208,7 @@ func (c *Client) Resources(ctx context.Context, opts ...ResourceOption) (*Resour return resources, nil } -// fetchPipelineServices fetches pipeline services in parallel +// fetchPipelineServices fetches pipeline services in parallel. func (c *Client) fetchPipelineServices(ctx context.Context, resources *Resources) error { pageToken := "" for { @@ -261,7 +261,7 @@ func (c *Client) fetchPipelineServices(ctx context.Context, resources *Resources return nil } -// fetchPipelineResolvers fetches pipeline resolvers in parallel +// fetchPipelineResolvers fetches pipeline resolvers in parallel. func (c *Client) fetchPipelineResolvers(ctx context.Context, pipeline *Pipeline, p *tailorv1.PipelineService, resources *Resources) error { pageToken := "" for { @@ -308,7 +308,7 @@ func (c *Client) fetchPipelineResolvers(ctx context.Context, pipeline *Pipeline, return nil } -// fetchPipelineResolverDetails fetches pipeline resolver details +// fetchPipelineResolverDetails fetches pipeline resolver details. func (c *Client) fetchPipelineResolverDetails(ctx context.Context, p *tailorv1.PipelineService, r *tailorv1.PipelineResolver, resources *Resources) (*PipelineResolver, error) { res, err := c.client.GetPipelineResolver(ctx, connect.NewRequest(&tailorv1.GetPipelineResolverRequest{ WorkspaceId: c.cfg.WorkspaceID, @@ -366,7 +366,7 @@ func (c *Client) fetchPipelineResolverDetails(ctx context.Context, p *tailorv1.P return resolver, nil } -// fetchExecutionResults fetches execution results for a resolver +// fetchExecutionResults fetches execution results for a resolver. func (c *Client) fetchExecutionResults(ctx context.Context, resolver *PipelineResolver, p *tailorv1.PipelineService, r *tailorv1.PipelineResolver, hasTest bool, resources *Resources) error { pageToken := "" view := tailorv1.PipelineResolverExecutionResultView_PIPELINE_RESOLVER_EXECUTION_RESULT_VIEW_BASIC @@ -392,7 +392,6 @@ L: if r.GetCreatedAt().AsTime().Before(*resources.executionResultsSince) { // Since the results are ordered by CreatedAt descending, // we can stop fetching more results once we reach an older entry. - pageToken = "" break L } resolver.ExecutionResults = append(resolver.ExecutionResults, r) @@ -405,7 +404,7 @@ L: return nil } -// fetchTailorDBServices fetches TailorDB services in parallel +// fetchTailorDBServices fetches TailorDB services in parallel. func (c *Client) fetchTailorDBServices(ctx context.Context, resources *Resources) error { pageToken := "" for { @@ -457,7 +456,7 @@ func (c *Client) fetchTailorDBServices(ctx context.Context, resources *Resources return nil } -// fetchTailorDBTypes fetches TailorDB types in parallel +// fetchTailorDBTypes fetches TailorDB types in parallel. func (c *Client) fetchTailorDBTypes(ctx context.Context, tailordb *TailorDB, t *tailorv1.TailorDBService) error { pageToken := "" for { @@ -504,7 +503,7 @@ func (c *Client) fetchTailorDBTypes(ctx context.Context, tailordb *TailorDB, t * return nil } -// fetchTailorDBTypeDetails fetches TailorDB type details +// fetchTailorDBTypeDetails fetches TailorDB type details. func (c *Client) fetchTailorDBTypeDetails(ctx context.Context, t *tailorv1.TailorDBService, tt *tailorv1.TailorDBType) (*TailorDBType, error) { res, err := c.client.GetTailorDBType(ctx, connect.NewRequest(&tailorv1.GetTailorDBTypeRequest{ WorkspaceId: c.cfg.WorkspaceID, @@ -543,7 +542,7 @@ func (c *Client) fetchTailorDBTypeDetails(ctx context.Context, t *tailorv1.Tailo return tailordbType, nil } -// fetchStateFlowServices fetches StateFlow services +// fetchStateFlowServices fetches StateFlow services. func (c *Client) fetchStateFlowServices(ctx context.Context, resources *Resources) error { pageToken := "" for {