From 1e76290a659185a049a14caf972d3d3a42540610 Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Thu, 19 Mar 2026 14:56:50 -0400 Subject: [PATCH 1/3] Add a query for available event types Shows type, count, first seen, last seen. Much like the query in Telemetry. --- internal/graph/base.resolvers.go | 22 ++ internal/graph/generated.go | 393 +++++++++++++++++++++++++++++ internal/graph/model/models_gen.go | 12 + pkg/eventrepo/event_repo_test.go | 134 ++++++++++ pkg/eventrepo/eventrepo.go | 54 ++++ schema/base.graphqls | 19 ++ 6 files changed, 634 insertions(+) diff --git a/internal/graph/base.resolvers.go b/internal/graph/base.resolvers.go index 0946a91..a2e468c 100644 --- a/internal/graph/base.resolvers.go +++ b/internal/graph/base.resolvers.go @@ -114,6 +114,28 @@ func (r *queryResolver) CloudEvents(ctx context.Context, did string, limit *int, return out, nil } +// AvailableEvents is the resolver for the availableEvents field. +func (r *queryResolver) AvailableEvents(ctx context.Context, did string, filter *model.CloudEventFilter) ([]*model.EventTypeSummary, error) { + opts, err := r.requireSubjectOptsByDID(ctx, did, filter) + if err != nil { + return nil, err + } + summaries, err := r.EventService.GetEventTypeSummaries(ctx, opts) + if err != nil { + return nil, err + } + out := make([]*model.EventTypeSummary, len(summaries)) + for i, s := range summaries { + out[i] = &model.EventTypeSummary{ + Type: s.Type, + Count: int(s.Count), + FirstSeen: s.FirstSeen, + LastSeen: s.LastSeen, + } + } + return out, nil +} + // CloudEvent returns CloudEventResolver implementation. func (r *Resolver) CloudEvent() CloudEventResolver { return &cloudEventResolver{r} } diff --git a/internal/graph/generated.go b/internal/graph/generated.go index 59140a5..ff709c5 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -74,7 +74,15 @@ type ComplexityRoot struct { IndexKey func(childComplexity int) int } + EventTypeSummary struct { + Count func(childComplexity int) int + FirstSeen func(childComplexity int) int + LastSeen func(childComplexity int) int + Type func(childComplexity int) int + } + Query struct { + AvailableEvents func(childComplexity int, did string, filter *model.CloudEventFilter) int CloudEvents func(childComplexity int, did string, limit *int, filter *model.CloudEventFilter) int Indexes func(childComplexity int, did string, limit *int, filter *model.CloudEventFilter) int LatestCloudEvent func(childComplexity int, did string, filter *model.CloudEventFilter) int @@ -92,6 +100,7 @@ type QueryResolver interface { Indexes(ctx context.Context, did string, limit *int, filter *model.CloudEventFilter) ([]*model.CloudEventIndex, error) LatestCloudEvent(ctx context.Context, did string, filter *model.CloudEventFilter) (*CloudEventWrapper, error) CloudEvents(ctx context.Context, did string, limit *int, filter *model.CloudEventFilter) ([]*CloudEventWrapper, error) + AvailableEvents(ctx context.Context, did string, filter *model.CloudEventFilter) ([]*model.EventTypeSummary, error) } type executableSchema struct { @@ -218,6 +227,42 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin return e.complexity.CloudEventIndex.IndexKey(childComplexity), true + case "EventTypeSummary.count": + if e.complexity.EventTypeSummary.Count == nil { + break + } + + return e.complexity.EventTypeSummary.Count(childComplexity), true + case "EventTypeSummary.firstSeen": + if e.complexity.EventTypeSummary.FirstSeen == nil { + break + } + + return e.complexity.EventTypeSummary.FirstSeen(childComplexity), true + case "EventTypeSummary.lastSeen": + if e.complexity.EventTypeSummary.LastSeen == nil { + break + } + + return e.complexity.EventTypeSummary.LastSeen(childComplexity), true + case "EventTypeSummary.type": + if e.complexity.EventTypeSummary.Type == nil { + break + } + + return e.complexity.EventTypeSummary.Type(childComplexity), true + + case "Query.availableEvents": + if e.complexity.Query.AvailableEvents == nil { + break + } + + args, err := ec.field_Query_availableEvents_args(ctx, rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.AvailableEvents(childComplexity, args["did"].(string), args["filter"].(*model.CloudEventFilter)), true case "Query.cloudEvents": if e.complexity.Query.CloudEvents == nil { break @@ -376,6 +421,20 @@ type CloudEvent { dataBase64: String } +""" +Summary of a single cloud event type for a subject. +""" +type EventTypeSummary { + """Cloud event type string (e.g. "dimo.status").""" + type: String! + """Number of cloud events of this type.""" + count: Int! + """Earliest event timestamp.""" + firstSeen: Time! + """Latest event timestamp.""" + lastSeen: Time! +} + """ The root query type for the Fetch API GraphQL schema. ERC721 DID (e.g. did:eth:chainId:contract:tokenId). """ @@ -399,6 +458,11 @@ type Query { List full cloud events. """ cloudEvents(did: String!, limit: Int = 10, filter: CloudEventFilter): [CloudEvent!]! + + """ + List event types available for a subject, with counts and time ranges. + """ + availableEvents(did: String!, filter: CloudEventFilter): [EventTypeSummary!]! } """ @@ -458,6 +522,22 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Query_availableEvents_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { + var err error + args := map[string]any{} + arg0, err := graphql.ProcessArgField(ctx, rawArgs, "did", ec.unmarshalNString2string) + if err != nil { + return nil, err + } + args["did"] = arg0 + arg1, err := graphql.ProcessArgField(ctx, rawArgs, "filter", ec.unmarshalOCloudEventFilter2ᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐCloudEventFilter) + if err != nil { + return nil, err + } + args["filter"] = arg1 + return args, nil +} + func (ec *executionContext) field_Query_cloudEvents_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { var err error args := map[string]any{} @@ -1129,6 +1209,122 @@ func (ec *executionContext) fieldContext_CloudEventIndex_indexKey(_ context.Cont return fc, nil } +func (ec *executionContext) _EventTypeSummary_type(ctx context.Context, field graphql.CollectedField, obj *model.EventTypeSummary) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_EventTypeSummary_type, + func(ctx context.Context) (any, error) { + return obj.Type, nil + }, + nil, + ec.marshalNString2string, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_EventTypeSummary_type(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "EventTypeSummary", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _EventTypeSummary_count(ctx context.Context, field graphql.CollectedField, obj *model.EventTypeSummary) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_EventTypeSummary_count, + func(ctx context.Context) (any, error) { + return obj.Count, nil + }, + nil, + ec.marshalNInt2int, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_EventTypeSummary_count(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "EventTypeSummary", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _EventTypeSummary_firstSeen(ctx context.Context, field graphql.CollectedField, obj *model.EventTypeSummary) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_EventTypeSummary_firstSeen, + func(ctx context.Context) (any, error) { + return obj.FirstSeen, nil + }, + nil, + ec.marshalNTime2timeᚐTime, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_EventTypeSummary_firstSeen(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "EventTypeSummary", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Time does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _EventTypeSummary_lastSeen(ctx context.Context, field graphql.CollectedField, obj *model.EventTypeSummary) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_EventTypeSummary_lastSeen, + func(ctx context.Context) (any, error) { + return obj.LastSeen, nil + }, + nil, + ec.marshalNTime2timeᚐTime, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_EventTypeSummary_lastSeen(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "EventTypeSummary", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Time does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _Query_latestIndex(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, @@ -1321,6 +1517,57 @@ func (ec *executionContext) fieldContext_Query_cloudEvents(ctx context.Context, return fc, nil } +func (ec *executionContext) _Query_availableEvents(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_Query_availableEvents, + func(ctx context.Context) (any, error) { + fc := graphql.GetFieldContext(ctx) + return ec.resolvers.Query().AvailableEvents(ctx, fc.Args["did"].(string), fc.Args["filter"].(*model.CloudEventFilter)) + }, + nil, + ec.marshalNEventTypeSummary2ᚕᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐEventTypeSummaryᚄ, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_Query_availableEvents(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "type": + return ec.fieldContext_EventTypeSummary_type(ctx, field) + case "count": + return ec.fieldContext_EventTypeSummary_count(ctx, field) + case "firstSeen": + return ec.fieldContext_EventTypeSummary_firstSeen(ctx, field) + case "lastSeen": + return ec.fieldContext_EventTypeSummary_lastSeen(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type EventTypeSummary", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Query_availableEvents_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return fc, err + } + return fc, nil +} + func (ec *executionContext) _Query___type(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, @@ -3214,6 +3461,60 @@ func (ec *executionContext) _CloudEventIndex(ctx context.Context, sel ast.Select return out } +var eventTypeSummaryImplementors = []string{"EventTypeSummary"} + +func (ec *executionContext) _EventTypeSummary(ctx context.Context, sel ast.SelectionSet, obj *model.EventTypeSummary) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, eventTypeSummaryImplementors) + + out := graphql.NewFieldSet(fields) + deferred := make(map[string]*graphql.FieldSet) + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("EventTypeSummary") + case "type": + out.Values[i] = ec._EventTypeSummary_type(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "count": + out.Values[i] = ec._EventTypeSummary_count(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "firstSeen": + out.Values[i] = ec._EventTypeSummary_firstSeen(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "lastSeen": + out.Values[i] = ec._EventTypeSummary_lastSeen(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch(ctx) + if out.Invalids > 0 { + return graphql.Null + } + + atomic.AddInt32(&ec.deferred, int32(len(deferred))) + + for label, dfs := range deferred { + ec.processDeferredGroup(graphql.DeferredGroup{ + Label: label, + Path: graphql.GetPath(ctx), + FieldSet: dfs, + Context: ctx, + }) + } + + return out +} + var queryImplementors = []string{"Query"} func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) graphql.Marshaler { @@ -3320,6 +3621,28 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) + case "availableEvents": + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_availableEvents(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&fs.Invalids, 1) + } + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, + func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) case "__type": out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) { @@ -3833,6 +4156,76 @@ func (ec *executionContext) marshalNCloudEventIndex2ᚖgithubᚗcomᚋDIMOᚑNet return ec._CloudEventIndex(ctx, sel, v) } +func (ec *executionContext) marshalNEventTypeSummary2ᚕᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐEventTypeSummaryᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.EventTypeSummary) graphql.Marshaler { + ret := make(graphql.Array, len(v)) + var wg sync.WaitGroup + isLen1 := len(v) == 1 + if !isLen1 { + wg.Add(len(v)) + } + for i := range v { + i := i + fc := &graphql.FieldContext{ + Index: &i, + Result: &v[i], + } + ctx := graphql.WithFieldContext(ctx, fc) + f := func(i int) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + if !isLen1 { + defer wg.Done() + } + ret[i] = ec.marshalNEventTypeSummary2ᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐEventTypeSummary(ctx, sel, v[i]) + } + if isLen1 { + f(i) + } else { + go f(i) + } + + } + wg.Wait() + + for _, e := range ret { + if e == graphql.Null { + return graphql.Null + } + } + + return ret +} + +func (ec *executionContext) marshalNEventTypeSummary2ᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐEventTypeSummary(ctx context.Context, sel ast.SelectionSet, v *model.EventTypeSummary) graphql.Marshaler { + if v == nil { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + graphql.AddErrorf(ctx, "the requested element is null which the schema does not allow") + } + return graphql.Null + } + return ec._EventTypeSummary(ctx, sel, v) +} + +func (ec *executionContext) unmarshalNInt2int(ctx context.Context, v any) (int, error) { + res, err := graphql.UnmarshalInt(v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) marshalNInt2int(ctx context.Context, sel ast.SelectionSet, v int) graphql.Marshaler { + _ = sel + res := graphql.MarshalInt(v) + if res == graphql.Null { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + graphql.AddErrorf(ctx, "the requested element is null which the schema does not allow") + } + } + return res +} + func (ec *executionContext) unmarshalNString2string(ctx context.Context, v any) (string, error) { res, err := graphql.UnmarshalString(v) return res, graphql.ErrorOnPath(ctx, err) diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 82fe636..5b48fae 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -25,6 +25,18 @@ type CloudEventIndex struct { IndexKey string `json:"indexKey"` } +// Summary of a single cloud event type for a subject. +type EventTypeSummary struct { + // Cloud event type string (e.g. "dimo.status"). + Type string `json:"type"` + // Number of cloud events of this type. + Count int `json:"count"` + // Earliest event timestamp. + FirstSeen time.Time `json:"firstSeen"` + // Latest event timestamp. + LastSeen time.Time `json:"lastSeen"` +} + // The root query type for the Fetch API GraphQL schema. ERC721 DID (e.g. did:eth:chainId:contract:tokenId). type Query struct { } diff --git a/pkg/eventrepo/event_repo_test.go b/pkg/eventrepo/event_repo_test.go index 5838ce8..ce761fe 100644 --- a/pkg/eventrepo/event_repo_test.go +++ b/pkg/eventrepo/event_repo_test.go @@ -981,6 +981,140 @@ func TestListIndexesAdvanced(t *testing.T) { } } +// TestGetEventTypeSummaries tests the GetEventTypeSummaries function. +func TestGetEventTypeSummaries(t *testing.T) { + t.Parallel() + chContainer := setupClickHouseContainer(t) + + conn, err := chContainer.GetClickHouseAsConn() + require.NoError(t, err) + ctx := context.Background() + + contractAddr := randAddress() + deviceTokenID := big.NewInt(555555) + subject := cloudevent.ERC721DID{ + ChainID: 153, + ContractAddress: contractAddr, + TokenID: deviceTokenID, + } + subjectStr := subject.String() + + now := time.Now().UTC().Truncate(time.Second) + + // Insert 3 status events and 1 fingerprint event + status1 := &cloudevent.CloudEventHeader{ + Subject: subjectStr, + Type: cloudevent.TypeStatus, + Source: "source-A", + Time: now.Add(-3 * time.Hour), + } + status2 := &cloudevent.CloudEventHeader{ + Subject: subjectStr, + Type: cloudevent.TypeStatus, + Source: "source-A", + Time: now.Add(-2 * time.Hour), + } + status3 := &cloudevent.CloudEventHeader{ + Subject: subjectStr, + Type: cloudevent.TypeStatus, + Source: "source-B", + Time: now.Add(-1 * time.Hour), + } + fp1 := &cloudevent.CloudEventHeader{ + Subject: subjectStr, + Type: cloudevent.TypeFingerprint, + Source: "source-A", + Time: now.Add(-30 * time.Minute), + } + + insertTestData(t, ctx, conn, status1) + insertTestData(t, ctx, conn, status2) + insertTestData(t, ctx, conn, status3) + insertTestData(t, ctx, conn, fp1) + + indexService := eventrepo.New(conn, nil, "") + + t.Run("no filter returns all types", func(t *testing.T) { + opts := &grpc.SearchOptions{ + Subject: &wrapperspb.StringValue{Value: subjectStr}, + } + summaries, err := indexService.GetEventTypeSummaries(ctx, opts) + require.NoError(t, err) + require.Len(t, summaries, 2) + + // Results ordered by event_type; fingerprint < status alphabetically + fpSummary := summaries[0] + assert.Equal(t, cloudevent.TypeFingerprint, fpSummary.Type) + assert.Equal(t, uint64(1), fpSummary.Count) + assert.Equal(t, fp1.Time.UTC().Truncate(time.Second), fpSummary.FirstSeen.UTC().Truncate(time.Second)) + assert.Equal(t, fp1.Time.UTC().Truncate(time.Second), fpSummary.LastSeen.UTC().Truncate(time.Second)) + + statusSummary := summaries[1] + assert.Equal(t, cloudevent.TypeStatus, statusSummary.Type) + assert.Equal(t, uint64(3), statusSummary.Count) + assert.Equal(t, status1.Time.UTC().Truncate(time.Second), statusSummary.FirstSeen.UTC().Truncate(time.Second)) + assert.Equal(t, status3.Time.UTC().Truncate(time.Second), statusSummary.LastSeen.UTC().Truncate(time.Second)) + }) + + t.Run("type filter", func(t *testing.T) { + opts := &grpc.SearchOptions{ + Subject: &wrapperspb.StringValue{Value: subjectStr}, + Type: &wrapperspb.StringValue{Value: cloudevent.TypeStatus}, + } + summaries, err := indexService.GetEventTypeSummaries(ctx, opts) + require.NoError(t, err) + require.Len(t, summaries, 1) + assert.Equal(t, cloudevent.TypeStatus, summaries[0].Type) + assert.Equal(t, uint64(3), summaries[0].Count) + }) + + t.Run("time range filter", func(t *testing.T) { + opts := &grpc.SearchOptions{ + Subject: &wrapperspb.StringValue{Value: subjectStr}, + After: ×tamppb.Timestamp{Seconds: now.Add(-2*time.Hour - 30*time.Second).Unix()}, + Before: ×tamppb.Timestamp{Seconds: now.Add(-30*time.Minute + 30*time.Second).Unix()}, + } + summaries, err := indexService.GetEventTypeSummaries(ctx, opts) + require.NoError(t, err) + // Should include status2 (-2h), status3 (-1h), fp1 (-30m) + require.Len(t, summaries, 2) + + fpSummary := summaries[0] + assert.Equal(t, cloudevent.TypeFingerprint, fpSummary.Type) + assert.Equal(t, uint64(1), fpSummary.Count) + + statusSummary := summaries[1] + assert.Equal(t, cloudevent.TypeStatus, statusSummary.Type) + assert.Equal(t, uint64(2), statusSummary.Count) + }) + + t.Run("source filter", func(t *testing.T) { + opts := &grpc.SearchOptions{ + Subject: &wrapperspb.StringValue{Value: subjectStr}, + Source: &wrapperspb.StringValue{Value: "source-B"}, + } + summaries, err := indexService.GetEventTypeSummaries(ctx, opts) + require.NoError(t, err) + require.Len(t, summaries, 1) + assert.Equal(t, cloudevent.TypeStatus, summaries[0].Type) + assert.Equal(t, uint64(1), summaries[0].Count) + }) + + t.Run("no matching events returns empty slice", func(t *testing.T) { + otherSubject := cloudevent.ERC721DID{ + ChainID: 153, + ContractAddress: contractAddr, + TokenID: big.NewInt(999999), + } + opts := &grpc.SearchOptions{ + Subject: &wrapperspb.StringValue{Value: otherSubject.String()}, + } + summaries, err := indexService.GetEventTypeSummaries(ctx, opts) + require.NoError(t, err) + require.Empty(t, summaries) + }) +} + func randAddress() common.Address { privateKey, err := crypto.GenerateKey() if err != nil { diff --git a/pkg/eventrepo/eventrepo.go b/pkg/eventrepo/eventrepo.go index a18de4d..7b84bc5 100644 --- a/pkg/eventrepo/eventrepo.go +++ b/pkg/eventrepo/eventrepo.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/DIMO-Network/cloudevent" @@ -154,6 +155,59 @@ func (s *Service) ListIndexesAdvanced(ctx context.Context, limit int, advancedOp return cloudEvents, nil } +// EventTypeSummary holds per-type aggregate metadata for a subject. +type EventTypeSummary struct { + Type string + Count uint64 + FirstSeen time.Time + LastSeen time.Time +} + +// GetEventTypeSummaries returns per-type counts and time ranges for the given search options. +func (s *Service) GetEventTypeSummaries(ctx context.Context, opts *grpc.SearchOptions) ([]EventTypeSummary, error) { + advancedOpts := convertSearchOptionsToAdvanced(opts) + + mods := []qm.QueryMod{ + qm.Select( + chindexer.TypeColumn, + "count(*) AS count", + "MIN("+chindexer.TimestampColumn+") AS first_seen", + "MAX("+chindexer.TimestampColumn+") AS last_seen", + ), + qm.From(chindexer.TableName), + qm.GroupBy(chindexer.TypeColumn), + qm.OrderBy(chindexer.TypeColumn), + } + + if advancedOpts != nil { + mods = append(mods, AdvancedSearchOptionsToQueryMod(advancedOpts)...) + } + + query, args := newQuery(mods...) + rows, err := s.chConn.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to get event type summaries: %w", err) + } + + var summaries []EventTypeSummary + for rows.Next() { + var s EventTypeSummary + if err := rows.Scan(&s.Type, &s.Count, &s.FirstSeen, &s.LastSeen); err != nil { + _ = rows.Close() + return nil, fmt.Errorf("failed to scan event type summary: %w", err) + } + summaries = append(summaries, s) + } + _ = rows.Close() + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate event type summaries: %w", err) + } + if summaries == nil { + summaries = []EventTypeSummary{} + } + return summaries, nil +} + // ListCloudEvents fetches and returns the cloud events that match the given options. func (s *Service) ListCloudEvents(ctx context.Context, bucketName string, limit int, opts *grpc.SearchOptions) ([]cloudevent.RawEvent, error) { advancedOpts := convertSearchOptionsToAdvanced(opts) diff --git a/schema/base.graphqls b/schema/base.graphqls index 91b552e..d363410 100644 --- a/schema/base.graphqls +++ b/schema/base.graphqls @@ -20,6 +20,20 @@ type CloudEvent { dataBase64: String } +""" +Summary of a single cloud event type for a subject. +""" +type EventTypeSummary { + """Cloud event type string (e.g. "dimo.status").""" + type: String! + """Number of cloud events of this type.""" + count: Int! + """Earliest event timestamp.""" + firstSeen: Time! + """Latest event timestamp.""" + lastSeen: Time! +} + """ The root query type for the Fetch API GraphQL schema. ERC721 DID (e.g. did:eth:chainId:contract:tokenId). """ @@ -43,6 +57,11 @@ type Query { List full cloud events. """ cloudEvents(did: String!, limit: Int = 10, filter: CloudEventFilter): [CloudEvent!]! + + """ + List event types available for a subject, with counts and time ranges. + """ + availableEvents(did: String!, filter: CloudEventFilter): [EventTypeSummary!]! } """ From bf8716cc2141debbb24327ea0de326488528ea7d Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Fri, 20 Mar 2026 12:23:49 -0400 Subject: [PATCH 2/3] Change language from "event" to "cloudEvent" --- internal/graph/base.resolvers.go | 10 +-- internal/graph/generated.go | 136 ++++++++++++++--------------- internal/graph/model/models_gen.go | 2 +- pkg/eventrepo/event_repo_test.go | 14 +-- pkg/eventrepo/eventrepo.go | 14 +-- schema/base.graphqls | 6 +- 6 files changed, 91 insertions(+), 91 deletions(-) diff --git a/internal/graph/base.resolvers.go b/internal/graph/base.resolvers.go index a2e468c..e3fb053 100644 --- a/internal/graph/base.resolvers.go +++ b/internal/graph/base.resolvers.go @@ -114,19 +114,19 @@ func (r *queryResolver) CloudEvents(ctx context.Context, did string, limit *int, return out, nil } -// AvailableEvents is the resolver for the availableEvents field. -func (r *queryResolver) AvailableEvents(ctx context.Context, did string, filter *model.CloudEventFilter) ([]*model.EventTypeSummary, error) { +// AvailableCloudEventTypes is the resolver for the availableCloudEventTypes field. +func (r *queryResolver) AvailableCloudEventTypes(ctx context.Context, did string, filter *model.CloudEventFilter) ([]*model.CloudEventTypeSummary, error) { opts, err := r.requireSubjectOptsByDID(ctx, did, filter) if err != nil { return nil, err } - summaries, err := r.EventService.GetEventTypeSummaries(ctx, opts) + summaries, err := r.EventService.GetCloudEventTypeSummaries(ctx, opts) if err != nil { return nil, err } - out := make([]*model.EventTypeSummary, len(summaries)) + out := make([]*model.CloudEventTypeSummary, len(summaries)) for i, s := range summaries { - out[i] = &model.EventTypeSummary{ + out[i] = &model.CloudEventTypeSummary{ Type: s.Type, Count: int(s.Count), FirstSeen: s.FirstSeen, diff --git a/internal/graph/generated.go b/internal/graph/generated.go index ff709c5..d7b824f 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -74,7 +74,7 @@ type ComplexityRoot struct { IndexKey func(childComplexity int) int } - EventTypeSummary struct { + CloudEventTypeSummary struct { Count func(childComplexity int) int FirstSeen func(childComplexity int) int LastSeen func(childComplexity int) int @@ -82,11 +82,11 @@ type ComplexityRoot struct { } Query struct { - AvailableEvents func(childComplexity int, did string, filter *model.CloudEventFilter) int - CloudEvents func(childComplexity int, did string, limit *int, filter *model.CloudEventFilter) int - Indexes func(childComplexity int, did string, limit *int, filter *model.CloudEventFilter) int - LatestCloudEvent func(childComplexity int, did string, filter *model.CloudEventFilter) int - LatestIndex func(childComplexity int, did string, filter *model.CloudEventFilter) int + AvailableCloudEventTypes func(childComplexity int, did string, filter *model.CloudEventFilter) int + CloudEvents func(childComplexity int, did string, limit *int, filter *model.CloudEventFilter) int + Indexes func(childComplexity int, did string, limit *int, filter *model.CloudEventFilter) int + LatestCloudEvent func(childComplexity int, did string, filter *model.CloudEventFilter) int + LatestIndex func(childComplexity int, did string, filter *model.CloudEventFilter) int } } @@ -100,7 +100,7 @@ type QueryResolver interface { Indexes(ctx context.Context, did string, limit *int, filter *model.CloudEventFilter) ([]*model.CloudEventIndex, error) LatestCloudEvent(ctx context.Context, did string, filter *model.CloudEventFilter) (*CloudEventWrapper, error) CloudEvents(ctx context.Context, did string, limit *int, filter *model.CloudEventFilter) ([]*CloudEventWrapper, error) - AvailableEvents(ctx context.Context, did string, filter *model.CloudEventFilter) ([]*model.EventTypeSummary, error) + AvailableCloudEventTypes(ctx context.Context, did string, filter *model.CloudEventFilter) ([]*model.CloudEventTypeSummary, error) } type executableSchema struct { @@ -227,42 +227,42 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin return e.complexity.CloudEventIndex.IndexKey(childComplexity), true - case "EventTypeSummary.count": - if e.complexity.EventTypeSummary.Count == nil { + case "CloudEventTypeSummary.count": + if e.complexity.CloudEventTypeSummary.Count == nil { break } - return e.complexity.EventTypeSummary.Count(childComplexity), true - case "EventTypeSummary.firstSeen": - if e.complexity.EventTypeSummary.FirstSeen == nil { + return e.complexity.CloudEventTypeSummary.Count(childComplexity), true + case "CloudEventTypeSummary.firstSeen": + if e.complexity.CloudEventTypeSummary.FirstSeen == nil { break } - return e.complexity.EventTypeSummary.FirstSeen(childComplexity), true - case "EventTypeSummary.lastSeen": - if e.complexity.EventTypeSummary.LastSeen == nil { + return e.complexity.CloudEventTypeSummary.FirstSeen(childComplexity), true + case "CloudEventTypeSummary.lastSeen": + if e.complexity.CloudEventTypeSummary.LastSeen == nil { break } - return e.complexity.EventTypeSummary.LastSeen(childComplexity), true - case "EventTypeSummary.type": - if e.complexity.EventTypeSummary.Type == nil { + return e.complexity.CloudEventTypeSummary.LastSeen(childComplexity), true + case "CloudEventTypeSummary.type": + if e.complexity.CloudEventTypeSummary.Type == nil { break } - return e.complexity.EventTypeSummary.Type(childComplexity), true + return e.complexity.CloudEventTypeSummary.Type(childComplexity), true - case "Query.availableEvents": - if e.complexity.Query.AvailableEvents == nil { + case "Query.availableCloudEventTypes": + if e.complexity.Query.AvailableCloudEventTypes == nil { break } - args, err := ec.field_Query_availableEvents_args(ctx, rawArgs) + args, err := ec.field_Query_availableCloudEventTypes_args(ctx, rawArgs) if err != nil { return 0, false } - return e.complexity.Query.AvailableEvents(childComplexity, args["did"].(string), args["filter"].(*model.CloudEventFilter)), true + return e.complexity.Query.AvailableCloudEventTypes(childComplexity, args["did"].(string), args["filter"].(*model.CloudEventFilter)), true case "Query.cloudEvents": if e.complexity.Query.CloudEvents == nil { break @@ -424,7 +424,7 @@ type CloudEvent { """ Summary of a single cloud event type for a subject. """ -type EventTypeSummary { +type CloudEventTypeSummary { """Cloud event type string (e.g. "dimo.status").""" type: String! """Number of cloud events of this type.""" @@ -460,9 +460,9 @@ type Query { cloudEvents(did: String!, limit: Int = 10, filter: CloudEventFilter): [CloudEvent!]! """ - List event types available for a subject, with counts and time ranges. + List cloud event types available for a subject, with counts and time ranges. """ - availableEvents(did: String!, filter: CloudEventFilter): [EventTypeSummary!]! + availableCloudEventTypes(did: String!, filter: CloudEventFilter): [CloudEventTypeSummary!]! } """ @@ -522,7 +522,7 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs return args, nil } -func (ec *executionContext) field_Query_availableEvents_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { +func (ec *executionContext) field_Query_availableCloudEventTypes_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { var err error args := map[string]any{} arg0, err := graphql.ProcessArgField(ctx, rawArgs, "did", ec.unmarshalNString2string) @@ -1209,12 +1209,12 @@ func (ec *executionContext) fieldContext_CloudEventIndex_indexKey(_ context.Cont return fc, nil } -func (ec *executionContext) _EventTypeSummary_type(ctx context.Context, field graphql.CollectedField, obj *model.EventTypeSummary) (ret graphql.Marshaler) { +func (ec *executionContext) _CloudEventTypeSummary_type(ctx context.Context, field graphql.CollectedField, obj *model.CloudEventTypeSummary) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, ec.OperationContext, field, - ec.fieldContext_EventTypeSummary_type, + ec.fieldContext_CloudEventTypeSummary_type, func(ctx context.Context) (any, error) { return obj.Type, nil }, @@ -1225,9 +1225,9 @@ func (ec *executionContext) _EventTypeSummary_type(ctx context.Context, field gr ) } -func (ec *executionContext) fieldContext_EventTypeSummary_type(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_CloudEventTypeSummary_type(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ - Object: "EventTypeSummary", + Object: "CloudEventTypeSummary", Field: field, IsMethod: false, IsResolver: false, @@ -1238,12 +1238,12 @@ func (ec *executionContext) fieldContext_EventTypeSummary_type(_ context.Context return fc, nil } -func (ec *executionContext) _EventTypeSummary_count(ctx context.Context, field graphql.CollectedField, obj *model.EventTypeSummary) (ret graphql.Marshaler) { +func (ec *executionContext) _CloudEventTypeSummary_count(ctx context.Context, field graphql.CollectedField, obj *model.CloudEventTypeSummary) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, ec.OperationContext, field, - ec.fieldContext_EventTypeSummary_count, + ec.fieldContext_CloudEventTypeSummary_count, func(ctx context.Context) (any, error) { return obj.Count, nil }, @@ -1254,9 +1254,9 @@ func (ec *executionContext) _EventTypeSummary_count(ctx context.Context, field g ) } -func (ec *executionContext) fieldContext_EventTypeSummary_count(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_CloudEventTypeSummary_count(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ - Object: "EventTypeSummary", + Object: "CloudEventTypeSummary", Field: field, IsMethod: false, IsResolver: false, @@ -1267,12 +1267,12 @@ func (ec *executionContext) fieldContext_EventTypeSummary_count(_ context.Contex return fc, nil } -func (ec *executionContext) _EventTypeSummary_firstSeen(ctx context.Context, field graphql.CollectedField, obj *model.EventTypeSummary) (ret graphql.Marshaler) { +func (ec *executionContext) _CloudEventTypeSummary_firstSeen(ctx context.Context, field graphql.CollectedField, obj *model.CloudEventTypeSummary) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, ec.OperationContext, field, - ec.fieldContext_EventTypeSummary_firstSeen, + ec.fieldContext_CloudEventTypeSummary_firstSeen, func(ctx context.Context) (any, error) { return obj.FirstSeen, nil }, @@ -1283,9 +1283,9 @@ func (ec *executionContext) _EventTypeSummary_firstSeen(ctx context.Context, fie ) } -func (ec *executionContext) fieldContext_EventTypeSummary_firstSeen(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_CloudEventTypeSummary_firstSeen(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ - Object: "EventTypeSummary", + Object: "CloudEventTypeSummary", Field: field, IsMethod: false, IsResolver: false, @@ -1296,12 +1296,12 @@ func (ec *executionContext) fieldContext_EventTypeSummary_firstSeen(_ context.Co return fc, nil } -func (ec *executionContext) _EventTypeSummary_lastSeen(ctx context.Context, field graphql.CollectedField, obj *model.EventTypeSummary) (ret graphql.Marshaler) { +func (ec *executionContext) _CloudEventTypeSummary_lastSeen(ctx context.Context, field graphql.CollectedField, obj *model.CloudEventTypeSummary) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, ec.OperationContext, field, - ec.fieldContext_EventTypeSummary_lastSeen, + ec.fieldContext_CloudEventTypeSummary_lastSeen, func(ctx context.Context) (any, error) { return obj.LastSeen, nil }, @@ -1312,9 +1312,9 @@ func (ec *executionContext) _EventTypeSummary_lastSeen(ctx context.Context, fiel ) } -func (ec *executionContext) fieldContext_EventTypeSummary_lastSeen(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_CloudEventTypeSummary_lastSeen(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ - Object: "EventTypeSummary", + Object: "CloudEventTypeSummary", Field: field, IsMethod: false, IsResolver: false, @@ -1517,24 +1517,24 @@ func (ec *executionContext) fieldContext_Query_cloudEvents(ctx context.Context, return fc, nil } -func (ec *executionContext) _Query_availableEvents(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { +func (ec *executionContext) _Query_availableCloudEventTypes(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, ec.OperationContext, field, - ec.fieldContext_Query_availableEvents, + ec.fieldContext_Query_availableCloudEventTypes, func(ctx context.Context) (any, error) { fc := graphql.GetFieldContext(ctx) - return ec.resolvers.Query().AvailableEvents(ctx, fc.Args["did"].(string), fc.Args["filter"].(*model.CloudEventFilter)) + return ec.resolvers.Query().AvailableCloudEventTypes(ctx, fc.Args["did"].(string), fc.Args["filter"].(*model.CloudEventFilter)) }, nil, - ec.marshalNEventTypeSummary2ᚕᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐEventTypeSummaryᚄ, + ec.marshalNCloudEventTypeSummary2ᚕᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐCloudEventTypeSummaryᚄ, true, true, ) } -func (ec *executionContext) fieldContext_Query_availableEvents(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_Query_availableCloudEventTypes(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ Object: "Query", Field: field, @@ -1543,15 +1543,15 @@ func (ec *executionContext) fieldContext_Query_availableEvents(ctx context.Conte Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { switch field.Name { case "type": - return ec.fieldContext_EventTypeSummary_type(ctx, field) + return ec.fieldContext_CloudEventTypeSummary_type(ctx, field) case "count": - return ec.fieldContext_EventTypeSummary_count(ctx, field) + return ec.fieldContext_CloudEventTypeSummary_count(ctx, field) case "firstSeen": - return ec.fieldContext_EventTypeSummary_firstSeen(ctx, field) + return ec.fieldContext_CloudEventTypeSummary_firstSeen(ctx, field) case "lastSeen": - return ec.fieldContext_EventTypeSummary_lastSeen(ctx, field) + return ec.fieldContext_CloudEventTypeSummary_lastSeen(ctx, field) } - return nil, fmt.Errorf("no field named %q was found under type EventTypeSummary", field.Name) + return nil, fmt.Errorf("no field named %q was found under type CloudEventTypeSummary", field.Name) }, } defer func() { @@ -1561,7 +1561,7 @@ func (ec *executionContext) fieldContext_Query_availableEvents(ctx context.Conte } }() ctx = graphql.WithFieldContext(ctx, fc) - if fc.Args, err = ec.field_Query_availableEvents_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + if fc.Args, err = ec.field_Query_availableCloudEventTypes_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { ec.Error(ctx, err) return fc, err } @@ -3461,34 +3461,34 @@ func (ec *executionContext) _CloudEventIndex(ctx context.Context, sel ast.Select return out } -var eventTypeSummaryImplementors = []string{"EventTypeSummary"} +var cloudEventTypeSummaryImplementors = []string{"CloudEventTypeSummary"} -func (ec *executionContext) _EventTypeSummary(ctx context.Context, sel ast.SelectionSet, obj *model.EventTypeSummary) graphql.Marshaler { - fields := graphql.CollectFields(ec.OperationContext, sel, eventTypeSummaryImplementors) +func (ec *executionContext) _CloudEventTypeSummary(ctx context.Context, sel ast.SelectionSet, obj *model.CloudEventTypeSummary) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, cloudEventTypeSummaryImplementors) out := graphql.NewFieldSet(fields) deferred := make(map[string]*graphql.FieldSet) for i, field := range fields { switch field.Name { case "__typename": - out.Values[i] = graphql.MarshalString("EventTypeSummary") + out.Values[i] = graphql.MarshalString("CloudEventTypeSummary") case "type": - out.Values[i] = ec._EventTypeSummary_type(ctx, field, obj) + out.Values[i] = ec._CloudEventTypeSummary_type(ctx, field, obj) if out.Values[i] == graphql.Null { out.Invalids++ } case "count": - out.Values[i] = ec._EventTypeSummary_count(ctx, field, obj) + out.Values[i] = ec._CloudEventTypeSummary_count(ctx, field, obj) if out.Values[i] == graphql.Null { out.Invalids++ } case "firstSeen": - out.Values[i] = ec._EventTypeSummary_firstSeen(ctx, field, obj) + out.Values[i] = ec._CloudEventTypeSummary_firstSeen(ctx, field, obj) if out.Values[i] == graphql.Null { out.Invalids++ } case "lastSeen": - out.Values[i] = ec._EventTypeSummary_lastSeen(ctx, field, obj) + out.Values[i] = ec._CloudEventTypeSummary_lastSeen(ctx, field, obj) if out.Values[i] == graphql.Null { out.Invalids++ } @@ -3622,7 +3622,7 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr } out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) - case "availableEvents": + case "availableCloudEventTypes": field := field innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { @@ -3631,7 +3631,7 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr ec.Error(ctx, ec.Recover(ctx, r)) } }() - res = ec._Query_availableEvents(ctx, field) + res = ec._Query_availableCloudEventTypes(ctx, field) if res == graphql.Null { atomic.AddUint32(&fs.Invalids, 1) } @@ -4156,7 +4156,7 @@ func (ec *executionContext) marshalNCloudEventIndex2ᚖgithubᚗcomᚋDIMOᚑNet return ec._CloudEventIndex(ctx, sel, v) } -func (ec *executionContext) marshalNEventTypeSummary2ᚕᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐEventTypeSummaryᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.EventTypeSummary) graphql.Marshaler { +func (ec *executionContext) marshalNCloudEventTypeSummary2ᚕᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐCloudEventTypeSummaryᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.CloudEventTypeSummary) graphql.Marshaler { ret := make(graphql.Array, len(v)) var wg sync.WaitGroup isLen1 := len(v) == 1 @@ -4180,7 +4180,7 @@ func (ec *executionContext) marshalNEventTypeSummary2ᚕᚖgithubᚗcomᚋDIMO if !isLen1 { defer wg.Done() } - ret[i] = ec.marshalNEventTypeSummary2ᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐEventTypeSummary(ctx, sel, v[i]) + ret[i] = ec.marshalNCloudEventTypeSummary2ᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐCloudEventTypeSummary(ctx, sel, v[i]) } if isLen1 { f(i) @@ -4200,14 +4200,14 @@ func (ec *executionContext) marshalNEventTypeSummary2ᚕᚖgithubᚗcomᚋDIMO return ret } -func (ec *executionContext) marshalNEventTypeSummary2ᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐEventTypeSummary(ctx context.Context, sel ast.SelectionSet, v *model.EventTypeSummary) graphql.Marshaler { +func (ec *executionContext) marshalNCloudEventTypeSummary2ᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐCloudEventTypeSummary(ctx context.Context, sel ast.SelectionSet, v *model.CloudEventTypeSummary) graphql.Marshaler { if v == nil { if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { graphql.AddErrorf(ctx, "the requested element is null which the schema does not allow") } return graphql.Null } - return ec._EventTypeSummary(ctx, sel, v) + return ec._CloudEventTypeSummary(ctx, sel, v) } func (ec *executionContext) unmarshalNInt2int(ctx context.Context, v any) (int, error) { diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 5b48fae..17c4017 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -26,7 +26,7 @@ type CloudEventIndex struct { } // Summary of a single cloud event type for a subject. -type EventTypeSummary struct { +type CloudEventTypeSummary struct { // Cloud event type string (e.g. "dimo.status"). Type string `json:"type"` // Number of cloud events of this type. diff --git a/pkg/eventrepo/event_repo_test.go b/pkg/eventrepo/event_repo_test.go index ce761fe..75a879a 100644 --- a/pkg/eventrepo/event_repo_test.go +++ b/pkg/eventrepo/event_repo_test.go @@ -981,8 +981,8 @@ func TestListIndexesAdvanced(t *testing.T) { } } -// TestGetEventTypeSummaries tests the GetEventTypeSummaries function. -func TestGetEventTypeSummaries(t *testing.T) { +// TestGetCloudEventTypeSummaries tests the GetCloudEventTypeSummaries function. +func TestGetCloudEventTypeSummaries(t *testing.T) { t.Parallel() chContainer := setupClickHouseContainer(t) @@ -1038,7 +1038,7 @@ func TestGetEventTypeSummaries(t *testing.T) { opts := &grpc.SearchOptions{ Subject: &wrapperspb.StringValue{Value: subjectStr}, } - summaries, err := indexService.GetEventTypeSummaries(ctx, opts) + summaries, err := indexService.GetCloudEventTypeSummaries(ctx, opts) require.NoError(t, err) require.Len(t, summaries, 2) @@ -1061,7 +1061,7 @@ func TestGetEventTypeSummaries(t *testing.T) { Subject: &wrapperspb.StringValue{Value: subjectStr}, Type: &wrapperspb.StringValue{Value: cloudevent.TypeStatus}, } - summaries, err := indexService.GetEventTypeSummaries(ctx, opts) + summaries, err := indexService.GetCloudEventTypeSummaries(ctx, opts) require.NoError(t, err) require.Len(t, summaries, 1) assert.Equal(t, cloudevent.TypeStatus, summaries[0].Type) @@ -1074,7 +1074,7 @@ func TestGetEventTypeSummaries(t *testing.T) { After: ×tamppb.Timestamp{Seconds: now.Add(-2*time.Hour - 30*time.Second).Unix()}, Before: ×tamppb.Timestamp{Seconds: now.Add(-30*time.Minute + 30*time.Second).Unix()}, } - summaries, err := indexService.GetEventTypeSummaries(ctx, opts) + summaries, err := indexService.GetCloudEventTypeSummaries(ctx, opts) require.NoError(t, err) // Should include status2 (-2h), status3 (-1h), fp1 (-30m) require.Len(t, summaries, 2) @@ -1093,7 +1093,7 @@ func TestGetEventTypeSummaries(t *testing.T) { Subject: &wrapperspb.StringValue{Value: subjectStr}, Source: &wrapperspb.StringValue{Value: "source-B"}, } - summaries, err := indexService.GetEventTypeSummaries(ctx, opts) + summaries, err := indexService.GetCloudEventTypeSummaries(ctx, opts) require.NoError(t, err) require.Len(t, summaries, 1) assert.Equal(t, cloudevent.TypeStatus, summaries[0].Type) @@ -1109,7 +1109,7 @@ func TestGetEventTypeSummaries(t *testing.T) { opts := &grpc.SearchOptions{ Subject: &wrapperspb.StringValue{Value: otherSubject.String()}, } - summaries, err := indexService.GetEventTypeSummaries(ctx, opts) + summaries, err := indexService.GetCloudEventTypeSummaries(ctx, opts) require.NoError(t, err) require.Empty(t, summaries) }) diff --git a/pkg/eventrepo/eventrepo.go b/pkg/eventrepo/eventrepo.go index 7b84bc5..752c775 100644 --- a/pkg/eventrepo/eventrepo.go +++ b/pkg/eventrepo/eventrepo.go @@ -155,16 +155,16 @@ func (s *Service) ListIndexesAdvanced(ctx context.Context, limit int, advancedOp return cloudEvents, nil } -// EventTypeSummary holds per-type aggregate metadata for a subject. -type EventTypeSummary struct { +// CloudEventTypeSummary holds per-type aggregate metadata for a subject. +type CloudEventTypeSummary struct { Type string Count uint64 FirstSeen time.Time LastSeen time.Time } -// GetEventTypeSummaries returns per-type counts and time ranges for the given search options. -func (s *Service) GetEventTypeSummaries(ctx context.Context, opts *grpc.SearchOptions) ([]EventTypeSummary, error) { +// GetCloudEventTypeSummaries returns per-type counts and time ranges for the given search options. +func (s *Service) GetCloudEventTypeSummaries(ctx context.Context, opts *grpc.SearchOptions) ([]CloudEventTypeSummary, error) { advancedOpts := convertSearchOptionsToAdvanced(opts) mods := []qm.QueryMod{ @@ -189,9 +189,9 @@ func (s *Service) GetEventTypeSummaries(ctx context.Context, opts *grpc.SearchOp return nil, fmt.Errorf("failed to get event type summaries: %w", err) } - var summaries []EventTypeSummary + var summaries []CloudEventTypeSummary for rows.Next() { - var s EventTypeSummary + var s CloudEventTypeSummary if err := rows.Scan(&s.Type, &s.Count, &s.FirstSeen, &s.LastSeen); err != nil { _ = rows.Close() return nil, fmt.Errorf("failed to scan event type summary: %w", err) @@ -203,7 +203,7 @@ func (s *Service) GetEventTypeSummaries(ctx context.Context, opts *grpc.SearchOp return nil, fmt.Errorf("failed to iterate event type summaries: %w", err) } if summaries == nil { - summaries = []EventTypeSummary{} + summaries = []CloudEventTypeSummary{} } return summaries, nil } diff --git a/schema/base.graphqls b/schema/base.graphqls index d363410..aafb0b9 100644 --- a/schema/base.graphqls +++ b/schema/base.graphqls @@ -23,7 +23,7 @@ type CloudEvent { """ Summary of a single cloud event type for a subject. """ -type EventTypeSummary { +type CloudEventTypeSummary { """Cloud event type string (e.g. "dimo.status").""" type: String! """Number of cloud events of this type.""" @@ -59,9 +59,9 @@ type Query { cloudEvents(did: String!, limit: Int = 10, filter: CloudEventFilter): [CloudEvent!]! """ - List event types available for a subject, with counts and time ranges. + List cloud event types available for a subject, with counts and time ranges. """ - availableEvents(did: String!, filter: CloudEventFilter): [EventTypeSummary!]! + availableCloudEventTypes(did: String!, filter: CloudEventFilter): [CloudEventTypeSummary!]! } """ From c5f3a38b997b8fd4797a51ce3d5b127725e82916 Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Fri, 20 Mar 2026 12:43:24 -0400 Subject: [PATCH 3/3] Fix this protobuf generation headache We were not putting the Go-specific binaries in ./bin, so you tended to get whatever the system had. --- Makefile | 7 ++++++- pkg/grpc/cloudevent.pb.go | 2 +- pkg/grpc/fetch-api.pb.go | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index c5151b1..6f4fa9d 100644 --- a/Makefile +++ b/Makefile @@ -95,7 +95,12 @@ endif mv $(PATHINSTBIN)/protoclib/include $(PATHINSTBIN)/ rm $(PATHINSTBIN)/protoc.zip -make tools: tools-golangci-lint tools-protoc ## install all tools +tools-protoc-plugins: ## install protoc-gen-go and protoc-gen-go-grpc from go.mod tool directive + @mkdir -p $(PATHINSTBIN) + GOBIN=$(PATHINSTBIN) go install google.golang.org/protobuf/cmd/protoc-gen-go + GOBIN=$(PATHINSTBIN) go install google.golang.org/grpc/cmd/protoc-gen-go-grpc + +make tools: tools-golangci-lint tools-protoc tools-protoc-plugins ## install all tools gqlgen: ## Generate gqlgen code. @go tool gqlgen generate diff --git a/pkg/grpc/cloudevent.pb.go b/pkg/grpc/cloudevent.pb.go index fced35c..dbeeb39 100644 --- a/pkg/grpc/cloudevent.pb.go +++ b/pkg/grpc/cloudevent.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.10 +// protoc-gen-go v1.36.11 // protoc v6.33.4 // source: pkg/grpc/cloudevent.proto diff --git a/pkg/grpc/fetch-api.pb.go b/pkg/grpc/fetch-api.pb.go index c132aa0..470c4df 100644 --- a/pkg/grpc/fetch-api.pb.go +++ b/pkg/grpc/fetch-api.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.10 +// protoc-gen-go v1.36.11 // protoc v6.33.4 // source: pkg/grpc/fetch-api.proto