diff --git a/Makefile b/Makefile index c5151b1..6f4fa9d 100644 --- a/Makefile +++ b/Makefile @@ -95,7 +95,12 @@ endif mv $(PATHINSTBIN)/protoclib/include $(PATHINSTBIN)/ rm $(PATHINSTBIN)/protoc.zip -make tools: tools-golangci-lint tools-protoc ## install all tools +tools-protoc-plugins: ## install protoc-gen-go and protoc-gen-go-grpc from go.mod tool directive + @mkdir -p $(PATHINSTBIN) + GOBIN=$(PATHINSTBIN) go install google.golang.org/protobuf/cmd/protoc-gen-go + GOBIN=$(PATHINSTBIN) go install google.golang.org/grpc/cmd/protoc-gen-go-grpc + +make tools: tools-golangci-lint tools-protoc tools-protoc-plugins ## install all tools gqlgen: ## Generate gqlgen code. @go tool gqlgen generate diff --git a/internal/graph/base.resolvers.go b/internal/graph/base.resolvers.go index 0946a91..e3fb053 100644 --- a/internal/graph/base.resolvers.go +++ b/internal/graph/base.resolvers.go @@ -114,6 +114,28 @@ func (r *queryResolver) CloudEvents(ctx context.Context, did string, limit *int, return out, nil } +// AvailableCloudEventTypes is the resolver for the availableCloudEventTypes field. +func (r *queryResolver) AvailableCloudEventTypes(ctx context.Context, did string, filter *model.CloudEventFilter) ([]*model.CloudEventTypeSummary, error) { + opts, err := r.requireSubjectOptsByDID(ctx, did, filter) + if err != nil { + return nil, err + } + summaries, err := r.EventService.GetCloudEventTypeSummaries(ctx, opts) + if err != nil { + return nil, err + } + out := make([]*model.CloudEventTypeSummary, len(summaries)) + for i, s := range summaries { + out[i] = &model.CloudEventTypeSummary{ + Type: s.Type, + Count: int(s.Count), + FirstSeen: s.FirstSeen, + LastSeen: s.LastSeen, + } + } + return out, nil +} + // CloudEvent returns CloudEventResolver implementation. func (r *Resolver) CloudEvent() CloudEventResolver { return &cloudEventResolver{r} } diff --git a/internal/graph/generated.go b/internal/graph/generated.go index 59140a5..d7b824f 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -74,11 +74,19 @@ type ComplexityRoot struct { IndexKey func(childComplexity int) int } + CloudEventTypeSummary struct { + Count func(childComplexity int) int + FirstSeen func(childComplexity int) int + LastSeen func(childComplexity int) int + Type func(childComplexity int) int + } + Query struct { - CloudEvents func(childComplexity int, did string, limit *int, filter *model.CloudEventFilter) int - Indexes func(childComplexity int, did string, limit *int, filter *model.CloudEventFilter) int - LatestCloudEvent func(childComplexity int, did string, filter *model.CloudEventFilter) int - LatestIndex func(childComplexity int, did string, filter *model.CloudEventFilter) int + AvailableCloudEventTypes func(childComplexity int, did string, filter *model.CloudEventFilter) int + CloudEvents func(childComplexity int, did string, limit *int, filter *model.CloudEventFilter) int + Indexes func(childComplexity int, did string, limit *int, filter *model.CloudEventFilter) int + LatestCloudEvent func(childComplexity int, did string, filter *model.CloudEventFilter) int + LatestIndex func(childComplexity int, did string, filter *model.CloudEventFilter) int } } @@ -92,6 +100,7 @@ type QueryResolver interface { Indexes(ctx context.Context, did string, limit *int, filter *model.CloudEventFilter) ([]*model.CloudEventIndex, error) LatestCloudEvent(ctx context.Context, did string, filter *model.CloudEventFilter) (*CloudEventWrapper, error) CloudEvents(ctx context.Context, did string, limit *int, filter *model.CloudEventFilter) ([]*CloudEventWrapper, error) + AvailableCloudEventTypes(ctx context.Context, did string, filter *model.CloudEventFilter) ([]*model.CloudEventTypeSummary, error) } type executableSchema struct { @@ -218,6 +227,42 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin return e.complexity.CloudEventIndex.IndexKey(childComplexity), true + case "CloudEventTypeSummary.count": + if e.complexity.CloudEventTypeSummary.Count == nil { + break + } + + return e.complexity.CloudEventTypeSummary.Count(childComplexity), true + case "CloudEventTypeSummary.firstSeen": + if e.complexity.CloudEventTypeSummary.FirstSeen == nil { + break + } + + return e.complexity.CloudEventTypeSummary.FirstSeen(childComplexity), true + case "CloudEventTypeSummary.lastSeen": + if e.complexity.CloudEventTypeSummary.LastSeen == nil { + break + } + + return e.complexity.CloudEventTypeSummary.LastSeen(childComplexity), true + case "CloudEventTypeSummary.type": + if e.complexity.CloudEventTypeSummary.Type == nil { + break + } + + return e.complexity.CloudEventTypeSummary.Type(childComplexity), true + + case "Query.availableCloudEventTypes": + if e.complexity.Query.AvailableCloudEventTypes == nil { + break + } + + args, err := ec.field_Query_availableCloudEventTypes_args(ctx, rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.AvailableCloudEventTypes(childComplexity, args["did"].(string), args["filter"].(*model.CloudEventFilter)), true case "Query.cloudEvents": if e.complexity.Query.CloudEvents == nil { break @@ -376,6 +421,20 @@ type CloudEvent { dataBase64: String } +""" +Summary of a single cloud event type for a subject. +""" +type CloudEventTypeSummary { + """Cloud event type string (e.g. "dimo.status").""" + type: String! + """Number of cloud events of this type.""" + count: Int! + """Earliest event timestamp.""" + firstSeen: Time! + """Latest event timestamp.""" + lastSeen: Time! +} + """ The root query type for the Fetch API GraphQL schema. ERC721 DID (e.g. did:eth:chainId:contract:tokenId). """ @@ -399,6 +458,11 @@ type Query { List full cloud events. """ cloudEvents(did: String!, limit: Int = 10, filter: CloudEventFilter): [CloudEvent!]! + + """ + List cloud event types available for a subject, with counts and time ranges. + """ + availableCloudEventTypes(did: String!, filter: CloudEventFilter): [CloudEventTypeSummary!]! } """ @@ -458,6 +522,22 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Query_availableCloudEventTypes_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { + var err error + args := map[string]any{} + arg0, err := graphql.ProcessArgField(ctx, rawArgs, "did", ec.unmarshalNString2string) + if err != nil { + return nil, err + } + args["did"] = arg0 + arg1, err := graphql.ProcessArgField(ctx, rawArgs, "filter", ec.unmarshalOCloudEventFilter2ᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐCloudEventFilter) + if err != nil { + return nil, err + } + args["filter"] = arg1 + return args, nil +} + func (ec *executionContext) field_Query_cloudEvents_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { var err error args := map[string]any{} @@ -1129,6 +1209,122 @@ func (ec *executionContext) fieldContext_CloudEventIndex_indexKey(_ context.Cont return fc, nil } +func (ec *executionContext) _CloudEventTypeSummary_type(ctx context.Context, field graphql.CollectedField, obj *model.CloudEventTypeSummary) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_CloudEventTypeSummary_type, + func(ctx context.Context) (any, error) { + return obj.Type, nil + }, + nil, + ec.marshalNString2string, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_CloudEventTypeSummary_type(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "CloudEventTypeSummary", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _CloudEventTypeSummary_count(ctx context.Context, field graphql.CollectedField, obj *model.CloudEventTypeSummary) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_CloudEventTypeSummary_count, + func(ctx context.Context) (any, error) { + return obj.Count, nil + }, + nil, + ec.marshalNInt2int, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_CloudEventTypeSummary_count(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "CloudEventTypeSummary", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _CloudEventTypeSummary_firstSeen(ctx context.Context, field graphql.CollectedField, obj *model.CloudEventTypeSummary) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_CloudEventTypeSummary_firstSeen, + func(ctx context.Context) (any, error) { + return obj.FirstSeen, nil + }, + nil, + ec.marshalNTime2timeᚐTime, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_CloudEventTypeSummary_firstSeen(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "CloudEventTypeSummary", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Time does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _CloudEventTypeSummary_lastSeen(ctx context.Context, field graphql.CollectedField, obj *model.CloudEventTypeSummary) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_CloudEventTypeSummary_lastSeen, + func(ctx context.Context) (any, error) { + return obj.LastSeen, nil + }, + nil, + ec.marshalNTime2timeᚐTime, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_CloudEventTypeSummary_lastSeen(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "CloudEventTypeSummary", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Time does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _Query_latestIndex(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, @@ -1321,6 +1517,57 @@ func (ec *executionContext) fieldContext_Query_cloudEvents(ctx context.Context, return fc, nil } +func (ec *executionContext) _Query_availableCloudEventTypes(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_Query_availableCloudEventTypes, + func(ctx context.Context) (any, error) { + fc := graphql.GetFieldContext(ctx) + return ec.resolvers.Query().AvailableCloudEventTypes(ctx, fc.Args["did"].(string), fc.Args["filter"].(*model.CloudEventFilter)) + }, + nil, + ec.marshalNCloudEventTypeSummary2ᚕᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐCloudEventTypeSummaryᚄ, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_Query_availableCloudEventTypes(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "type": + return ec.fieldContext_CloudEventTypeSummary_type(ctx, field) + case "count": + return ec.fieldContext_CloudEventTypeSummary_count(ctx, field) + case "firstSeen": + return ec.fieldContext_CloudEventTypeSummary_firstSeen(ctx, field) + case "lastSeen": + return ec.fieldContext_CloudEventTypeSummary_lastSeen(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type CloudEventTypeSummary", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Query_availableCloudEventTypes_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return fc, err + } + return fc, nil +} + func (ec *executionContext) _Query___type(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, @@ -3214,6 +3461,60 @@ func (ec *executionContext) _CloudEventIndex(ctx context.Context, sel ast.Select return out } +var cloudEventTypeSummaryImplementors = []string{"CloudEventTypeSummary"} + +func (ec *executionContext) _CloudEventTypeSummary(ctx context.Context, sel ast.SelectionSet, obj *model.CloudEventTypeSummary) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, cloudEventTypeSummaryImplementors) + + out := graphql.NewFieldSet(fields) + deferred := make(map[string]*graphql.FieldSet) + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("CloudEventTypeSummary") + case "type": + out.Values[i] = ec._CloudEventTypeSummary_type(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "count": + out.Values[i] = ec._CloudEventTypeSummary_count(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "firstSeen": + out.Values[i] = ec._CloudEventTypeSummary_firstSeen(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "lastSeen": + out.Values[i] = ec._CloudEventTypeSummary_lastSeen(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch(ctx) + if out.Invalids > 0 { + return graphql.Null + } + + atomic.AddInt32(&ec.deferred, int32(len(deferred))) + + for label, dfs := range deferred { + ec.processDeferredGroup(graphql.DeferredGroup{ + Label: label, + Path: graphql.GetPath(ctx), + FieldSet: dfs, + Context: ctx, + }) + } + + return out +} + var queryImplementors = []string{"Query"} func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) graphql.Marshaler { @@ -3320,6 +3621,28 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) + case "availableCloudEventTypes": + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_availableCloudEventTypes(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&fs.Invalids, 1) + } + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, + func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) case "__type": out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) { @@ -3833,6 +4156,76 @@ func (ec *executionContext) marshalNCloudEventIndex2ᚖgithubᚗcomᚋDIMOᚑNet return ec._CloudEventIndex(ctx, sel, v) } +func (ec *executionContext) marshalNCloudEventTypeSummary2ᚕᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐCloudEventTypeSummaryᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.CloudEventTypeSummary) graphql.Marshaler { + ret := make(graphql.Array, len(v)) + var wg sync.WaitGroup + isLen1 := len(v) == 1 + if !isLen1 { + wg.Add(len(v)) + } + for i := range v { + i := i + fc := &graphql.FieldContext{ + Index: &i, + Result: &v[i], + } + ctx := graphql.WithFieldContext(ctx, fc) + f := func(i int) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + if !isLen1 { + defer wg.Done() + } + ret[i] = ec.marshalNCloudEventTypeSummary2ᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐCloudEventTypeSummary(ctx, sel, v[i]) + } + if isLen1 { + f(i) + } else { + go f(i) + } + + } + wg.Wait() + + for _, e := range ret { + if e == graphql.Null { + return graphql.Null + } + } + + return ret +} + +func (ec *executionContext) marshalNCloudEventTypeSummary2ᚖgithubᚗcomᚋDIMOᚑNetworkᚋfetchᚑapiᚋinternalᚋgraphᚋmodelᚐCloudEventTypeSummary(ctx context.Context, sel ast.SelectionSet, v *model.CloudEventTypeSummary) graphql.Marshaler { + if v == nil { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + graphql.AddErrorf(ctx, "the requested element is null which the schema does not allow") + } + return graphql.Null + } + return ec._CloudEventTypeSummary(ctx, sel, v) +} + +func (ec *executionContext) unmarshalNInt2int(ctx context.Context, v any) (int, error) { + res, err := graphql.UnmarshalInt(v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) marshalNInt2int(ctx context.Context, sel ast.SelectionSet, v int) graphql.Marshaler { + _ = sel + res := graphql.MarshalInt(v) + if res == graphql.Null { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + graphql.AddErrorf(ctx, "the requested element is null which the schema does not allow") + } + } + return res +} + func (ec *executionContext) unmarshalNString2string(ctx context.Context, v any) (string, error) { res, err := graphql.UnmarshalString(v) return res, graphql.ErrorOnPath(ctx, err) diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 82fe636..17c4017 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -25,6 +25,18 @@ type CloudEventIndex struct { IndexKey string `json:"indexKey"` } +// Summary of a single cloud event type for a subject. +type CloudEventTypeSummary struct { + // Cloud event type string (e.g. "dimo.status"). + Type string `json:"type"` + // Number of cloud events of this type. + Count int `json:"count"` + // Earliest event timestamp. + FirstSeen time.Time `json:"firstSeen"` + // Latest event timestamp. + LastSeen time.Time `json:"lastSeen"` +} + // The root query type for the Fetch API GraphQL schema. ERC721 DID (e.g. did:eth:chainId:contract:tokenId). type Query struct { } diff --git a/pkg/eventrepo/event_repo_test.go b/pkg/eventrepo/event_repo_test.go index 5838ce8..75a879a 100644 --- a/pkg/eventrepo/event_repo_test.go +++ b/pkg/eventrepo/event_repo_test.go @@ -981,6 +981,140 @@ func TestListIndexesAdvanced(t *testing.T) { } } +// TestGetCloudEventTypeSummaries tests the GetCloudEventTypeSummaries function. +func TestGetCloudEventTypeSummaries(t *testing.T) { + t.Parallel() + chContainer := setupClickHouseContainer(t) + + conn, err := chContainer.GetClickHouseAsConn() + require.NoError(t, err) + ctx := context.Background() + + contractAddr := randAddress() + deviceTokenID := big.NewInt(555555) + subject := cloudevent.ERC721DID{ + ChainID: 153, + ContractAddress: contractAddr, + TokenID: deviceTokenID, + } + subjectStr := subject.String() + + now := time.Now().UTC().Truncate(time.Second) + + // Insert 3 status events and 1 fingerprint event + status1 := &cloudevent.CloudEventHeader{ + Subject: subjectStr, + Type: cloudevent.TypeStatus, + Source: "source-A", + Time: now.Add(-3 * time.Hour), + } + status2 := &cloudevent.CloudEventHeader{ + Subject: subjectStr, + Type: cloudevent.TypeStatus, + Source: "source-A", + Time: now.Add(-2 * time.Hour), + } + status3 := &cloudevent.CloudEventHeader{ + Subject: subjectStr, + Type: cloudevent.TypeStatus, + Source: "source-B", + Time: now.Add(-1 * time.Hour), + } + fp1 := &cloudevent.CloudEventHeader{ + Subject: subjectStr, + Type: cloudevent.TypeFingerprint, + Source: "source-A", + Time: now.Add(-30 * time.Minute), + } + + insertTestData(t, ctx, conn, status1) + insertTestData(t, ctx, conn, status2) + insertTestData(t, ctx, conn, status3) + insertTestData(t, ctx, conn, fp1) + + indexService := eventrepo.New(conn, nil, "") + + t.Run("no filter returns all types", func(t *testing.T) { + opts := &grpc.SearchOptions{ + Subject: &wrapperspb.StringValue{Value: subjectStr}, + } + summaries, err := indexService.GetCloudEventTypeSummaries(ctx, opts) + require.NoError(t, err) + require.Len(t, summaries, 2) + + // Results ordered by event_type; fingerprint < status alphabetically + fpSummary := summaries[0] + assert.Equal(t, cloudevent.TypeFingerprint, fpSummary.Type) + assert.Equal(t, uint64(1), fpSummary.Count) + assert.Equal(t, fp1.Time.UTC().Truncate(time.Second), fpSummary.FirstSeen.UTC().Truncate(time.Second)) + assert.Equal(t, fp1.Time.UTC().Truncate(time.Second), fpSummary.LastSeen.UTC().Truncate(time.Second)) + + statusSummary := summaries[1] + assert.Equal(t, cloudevent.TypeStatus, statusSummary.Type) + assert.Equal(t, uint64(3), statusSummary.Count) + assert.Equal(t, status1.Time.UTC().Truncate(time.Second), statusSummary.FirstSeen.UTC().Truncate(time.Second)) + assert.Equal(t, status3.Time.UTC().Truncate(time.Second), statusSummary.LastSeen.UTC().Truncate(time.Second)) + }) + + t.Run("type filter", func(t *testing.T) { + opts := &grpc.SearchOptions{ + Subject: &wrapperspb.StringValue{Value: subjectStr}, + Type: &wrapperspb.StringValue{Value: cloudevent.TypeStatus}, + } + summaries, err := indexService.GetCloudEventTypeSummaries(ctx, opts) + require.NoError(t, err) + require.Len(t, summaries, 1) + assert.Equal(t, cloudevent.TypeStatus, summaries[0].Type) + assert.Equal(t, uint64(3), summaries[0].Count) + }) + + t.Run("time range filter", func(t *testing.T) { + opts := &grpc.SearchOptions{ + Subject: &wrapperspb.StringValue{Value: subjectStr}, + After: ×tamppb.Timestamp{Seconds: now.Add(-2*time.Hour - 30*time.Second).Unix()}, + Before: ×tamppb.Timestamp{Seconds: now.Add(-30*time.Minute + 30*time.Second).Unix()}, + } + summaries, err := indexService.GetCloudEventTypeSummaries(ctx, opts) + require.NoError(t, err) + // Should include status2 (-2h), status3 (-1h), fp1 (-30m) + require.Len(t, summaries, 2) + + fpSummary := summaries[0] + assert.Equal(t, cloudevent.TypeFingerprint, fpSummary.Type) + assert.Equal(t, uint64(1), fpSummary.Count) + + statusSummary := summaries[1] + assert.Equal(t, cloudevent.TypeStatus, statusSummary.Type) + assert.Equal(t, uint64(2), statusSummary.Count) + }) + + t.Run("source filter", func(t *testing.T) { + opts := &grpc.SearchOptions{ + Subject: &wrapperspb.StringValue{Value: subjectStr}, + Source: &wrapperspb.StringValue{Value: "source-B"}, + } + summaries, err := indexService.GetCloudEventTypeSummaries(ctx, opts) + require.NoError(t, err) + require.Len(t, summaries, 1) + assert.Equal(t, cloudevent.TypeStatus, summaries[0].Type) + assert.Equal(t, uint64(1), summaries[0].Count) + }) + + t.Run("no matching events returns empty slice", func(t *testing.T) { + otherSubject := cloudevent.ERC721DID{ + ChainID: 153, + ContractAddress: contractAddr, + TokenID: big.NewInt(999999), + } + opts := &grpc.SearchOptions{ + Subject: &wrapperspb.StringValue{Value: otherSubject.String()}, + } + summaries, err := indexService.GetCloudEventTypeSummaries(ctx, opts) + require.NoError(t, err) + require.Empty(t, summaries) + }) +} + func randAddress() common.Address { privateKey, err := crypto.GenerateKey() if err != nil { diff --git a/pkg/eventrepo/eventrepo.go b/pkg/eventrepo/eventrepo.go index a18de4d..752c775 100644 --- a/pkg/eventrepo/eventrepo.go +++ b/pkg/eventrepo/eventrepo.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/DIMO-Network/cloudevent" @@ -154,6 +155,59 @@ func (s *Service) ListIndexesAdvanced(ctx context.Context, limit int, advancedOp return cloudEvents, nil } +// CloudEventTypeSummary holds per-type aggregate metadata for a subject. +type CloudEventTypeSummary struct { + Type string + Count uint64 + FirstSeen time.Time + LastSeen time.Time +} + +// GetCloudEventTypeSummaries returns per-type counts and time ranges for the given search options. +func (s *Service) GetCloudEventTypeSummaries(ctx context.Context, opts *grpc.SearchOptions) ([]CloudEventTypeSummary, error) { + advancedOpts := convertSearchOptionsToAdvanced(opts) + + mods := []qm.QueryMod{ + qm.Select( + chindexer.TypeColumn, + "count(*) AS count", + "MIN("+chindexer.TimestampColumn+") AS first_seen", + "MAX("+chindexer.TimestampColumn+") AS last_seen", + ), + qm.From(chindexer.TableName), + qm.GroupBy(chindexer.TypeColumn), + qm.OrderBy(chindexer.TypeColumn), + } + + if advancedOpts != nil { + mods = append(mods, AdvancedSearchOptionsToQueryMod(advancedOpts)...) + } + + query, args := newQuery(mods...) + rows, err := s.chConn.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to get event type summaries: %w", err) + } + + var summaries []CloudEventTypeSummary + for rows.Next() { + var s CloudEventTypeSummary + if err := rows.Scan(&s.Type, &s.Count, &s.FirstSeen, &s.LastSeen); err != nil { + _ = rows.Close() + return nil, fmt.Errorf("failed to scan event type summary: %w", err) + } + summaries = append(summaries, s) + } + _ = rows.Close() + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate event type summaries: %w", err) + } + if summaries == nil { + summaries = []CloudEventTypeSummary{} + } + return summaries, nil +} + // ListCloudEvents fetches and returns the cloud events that match the given options. func (s *Service) ListCloudEvents(ctx context.Context, bucketName string, limit int, opts *grpc.SearchOptions) ([]cloudevent.RawEvent, error) { advancedOpts := convertSearchOptionsToAdvanced(opts) diff --git a/pkg/grpc/cloudevent.pb.go b/pkg/grpc/cloudevent.pb.go index fced35c..dbeeb39 100644 --- a/pkg/grpc/cloudevent.pb.go +++ b/pkg/grpc/cloudevent.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.10 +// protoc-gen-go v1.36.11 // protoc v6.33.4 // source: pkg/grpc/cloudevent.proto diff --git a/pkg/grpc/fetch-api.pb.go b/pkg/grpc/fetch-api.pb.go index c132aa0..470c4df 100644 --- a/pkg/grpc/fetch-api.pb.go +++ b/pkg/grpc/fetch-api.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.10 +// protoc-gen-go v1.36.11 // protoc v6.33.4 // source: pkg/grpc/fetch-api.proto diff --git a/schema/base.graphqls b/schema/base.graphqls index 91b552e..aafb0b9 100644 --- a/schema/base.graphqls +++ b/schema/base.graphqls @@ -20,6 +20,20 @@ type CloudEvent { dataBase64: String } +""" +Summary of a single cloud event type for a subject. +""" +type CloudEventTypeSummary { + """Cloud event type string (e.g. "dimo.status").""" + type: String! + """Number of cloud events of this type.""" + count: Int! + """Earliest event timestamp.""" + firstSeen: Time! + """Latest event timestamp.""" + lastSeen: Time! +} + """ The root query type for the Fetch API GraphQL schema. ERC721 DID (e.g. did:eth:chainId:contract:tokenId). """ @@ -43,6 +57,11 @@ type Query { List full cloud events. """ cloudEvents(did: String!, limit: Int = 10, filter: CloudEventFilter): [CloudEvent!]! + + """ + List cloud event types available for a subject, with counts and time ranges. + """ + availableCloudEventTypes(did: String!, filter: CloudEventFilter): [CloudEventTypeSummary!]! } """