diff --git a/go.mod b/go.mod index 094b91e..07d8a31 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/99designs/gqlgen v0.17.86 github.com/ClickHouse/clickhouse-go/v2 v2.43.0 github.com/DIMO-Network/clickhouse-infra v0.0.7 - github.com/DIMO-Network/cloudevent v0.2.5 + github.com/DIMO-Network/cloudevent v0.2.7 github.com/DIMO-Network/server-garage v0.0.7 github.com/DIMO-Network/shared v1.1.7 github.com/DIMO-Network/token-exchange-api v0.4.0 diff --git a/go.sum b/go.sum index 8611027..7432874 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7Oputl github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DIMO-Network/clickhouse-infra v0.0.7 h1:TAsjkFFKu3D5Xg6dwBcRBryjCVSlXsNjVbTwJ4UDlTg= github.com/DIMO-Network/clickhouse-infra v0.0.7/go.mod h1:XS80lhSJNWBWGgZ+m4j7++zFj1wAXfmtV2gJfhGlabQ= -github.com/DIMO-Network/cloudevent v0.2.5 h1:3s6ZKtBXHYy0ujMYBdOBF9uJAc5C+EKEF7/SToW2cJA= -github.com/DIMO-Network/cloudevent v0.2.5/go.mod h1:zFG6pf7ejwC0y3//mHpISDHR/blsSz8Lew2c4ebc8Lw= +github.com/DIMO-Network/cloudevent v0.2.7 h1:/cgFhUcWcliZYrmITkB8oIZb+zDhZvYNxWVGS2D3894= +github.com/DIMO-Network/cloudevent v0.2.7/go.mod h1:I/9NcpMozV5Fw194WimhbkAsJtKVZf5UKYJ9hgc8Cdg= github.com/DIMO-Network/server-garage v0.0.7 h1:kOBVyOtIbxa1x9pAf1epABTb9l/U3khf0PwUaHeHiKs= github.com/DIMO-Network/server-garage v0.0.7/go.mod h1:7DFor8MMJ8fLv9EB16Z5LrN+ftW3qeIk+swpkT7F2cU= github.com/DIMO-Network/shared v1.1.7 h1:5Ex8bZ6BpOjcLj4u7n5Kih1Ho6b9BVJsKpKn4iU2EaM= diff --git a/internal/graph/convert_test.go b/internal/graph/convert_test.go index 2717259..14ee144 100644 --- a/internal/graph/convert_test.go +++ b/internal/graph/convert_test.go @@ -22,6 +22,7 @@ func TestIndexToModel(t *testing.T) { Type: "dimo.status", DataContentType: "application/json", DataVersion: "r/v0/s", + RawEventID: "raw-event-123", }, Data: eventrepo.ObjectInfo{Key: "s3://bucket/key"}, } @@ -33,6 +34,7 @@ func TestIndexToModel(t *testing.T) { assert.Equal(t, "dimo.status", out.Header.Type) assert.Equal(t, "application/json", out.Header.DataContentType) assert.Equal(t, "r/v0/s", out.Header.DataVersion) + assert.Equal(t, "raw-event-123", out.Header.RawEventID) assert.Equal(t, "s3://bucket/key", out.IndexKey) }) @@ -51,5 +53,6 @@ func TestIndexToModel(t *testing.T) { assert.Empty(t, out.Header.DataContentType) assert.Empty(t, out.Header.DataVersion) assert.Empty(t, out.Header.Signature) + assert.Empty(t, out.Header.RawEventID) }) } diff --git a/internal/graph/generated.go b/internal/graph/generated.go index 596c305..7e13e7d 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -61,6 +61,7 @@ type ComplexityRoot struct { DataVersion func(childComplexity int) int ID func(childComplexity int) int Producer func(childComplexity int) int + RawEventID func(childComplexity int) int Signature func(childComplexity int) int Source func(childComplexity int) int SpecVersion func(childComplexity int) int @@ -178,6 +179,12 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin } return e.complexity.CloudEventHeader.Producer(childComplexity), true + case "CloudEventHeader.raweventid": + if e.complexity.CloudEventHeader.RawEventID == nil { + break + } + + return e.complexity.CloudEventHeader.RawEventID(childComplexity), true case "CloudEventHeader.signature": if e.complexity.CloudEventHeader.Signature == nil { break @@ -489,6 +496,7 @@ type CloudEventHeader { dataversion: String producer: String! signature: String + raweventid: String tags: [String!]! } @@ -721,6 +729,8 @@ func (ec *executionContext) fieldContext_CloudEvent_header(_ context.Context, fi return ec.fieldContext_CloudEventHeader_producer(ctx, field) case "signature": return ec.fieldContext_CloudEventHeader_signature(ctx, field) + case "raweventid": + return ec.fieldContext_CloudEventHeader_raweventid(ctx, field) case "tags": return ec.fieldContext_CloudEventHeader_tags(ctx, field) } @@ -1136,6 +1146,35 @@ func (ec *executionContext) fieldContext_CloudEventHeader_signature(_ context.Co return fc, nil } +func (ec *executionContext) _CloudEventHeader_raweventid(ctx context.Context, field graphql.CollectedField, obj *cloudevent.CloudEventHeader) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_CloudEventHeader_raweventid, + func(ctx context.Context) (any, error) { + return obj.RawEventID, nil + }, + nil, + ec.marshalOString2string, + true, + false, + ) +} + +func (ec *executionContext) fieldContext_CloudEventHeader_raweventid(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "CloudEventHeader", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _CloudEventHeader_tags(ctx context.Context, field graphql.CollectedField, obj *cloudevent.CloudEventHeader) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, @@ -1211,6 +1250,8 @@ func (ec *executionContext) fieldContext_CloudEventIndex_header(_ context.Contex return ec.fieldContext_CloudEventHeader_producer(ctx, field) case "signature": return ec.fieldContext_CloudEventHeader_signature(ctx, field) + case "raweventid": + return ec.fieldContext_CloudEventHeader_raweventid(ctx, field) case "tags": return ec.fieldContext_CloudEventHeader_tags(ctx, field) } @@ -3442,6 +3483,8 @@ func (ec *executionContext) _CloudEventHeader(ctx context.Context, sel ast.Selec } case "signature": out.Values[i] = ec._CloudEventHeader_signature(ctx, field, obj) + case "raweventid": + out.Values[i] = ec._CloudEventHeader_raweventid(ctx, field, obj) case "tags": out.Values[i] = ec._CloudEventHeader_tags(ctx, field, obj) if out.Values[i] == graphql.Null { diff --git a/pkg/eventrepo/event_repo_test.go b/pkg/eventrepo/event_repo_test.go index 83061d8..1437b8f 100644 --- a/pkg/eventrepo/event_repo_test.go +++ b/pkg/eventrepo/event_repo_test.go @@ -399,6 +399,7 @@ func TestGetEventWithAllHeaderFields(t *testing.T) { DataVersion: dataType, SpecVersion: cloudevent.SpecVersion, Signature: "0x1234567890", + RawEventID: "raw-event-id-123", Tags: []string{"tests.tag1", "tests.tag2"}, Extras: map[string]any{ "extraField": "extra-value", @@ -458,6 +459,7 @@ func TestGetEventWithAllHeaderFields(t *testing.T) { assert.Equal(t, fullHeaderEvent.DataVersion, retrievedEvent.DataVersion, "DataVersion mismatch") assert.Equal(t, cloudevent.SpecVersion, retrievedEvent.SpecVersion, "SpecVersion mismatch") assert.Equal(t, fullHeaderEvent.Signature, retrievedEvent.Signature, "Signature mismatch") + assert.Equal(t, fullHeaderEvent.RawEventID, retrievedEvent.RawEventID, "RawEventID mismatch") assert.Equal(t, fullHeaderEvent.Tags, retrievedEvent.Tags, "Tags mismatch") // Verify extras @@ -492,6 +494,37 @@ func TestGetEventWithAllHeaderFields(t *testing.T) { assert.Equal(t, "0x09876543210", retrievedEvent.Signature, "Signature field not set correctly") assert.Nil(t, retrievedEvent.Extras["signature"], "Signature should not be in extras") }) + + t.Run("retrieve event with raw event id that is originally in extras", func(t *testing.T) { + fullHeaderEvent3 := fullHeaderEvent + fullHeaderEvent3.Subject = eventDID2.String() + fullHeaderEvent3.RawEventID = "" + fullHeaderEvent3.Extras = map[string]any{ + "raweventid": "raw-event-id-from-extras", + } + indexKey3 := insertTestData(t, ctx, conn, &fullHeaderEvent3) + + mockS3Client.EXPECT().GetObject(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + require.Equal(t, indexKey3, *params.Key) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader(eventDataEnvelope)), + ContentLength: ref(int64(len(eventDataEnvelope))), + }, nil + }, + ) + opts := &grpc.SearchOptions{ + DataVersion: &wrapperspb.StringValue{Value: dataType}, + Subject: &wrapperspb.StringValue{Value: eventDID2.String()}, + } + + retrievedEvent, err := indexService.GetLatestCloudEvent(ctx, "test-bucket", opts) + require.NoError(t, err) + + assert.Equal(t, fullHeaderEvent3.ID, retrievedEvent.ID, "ID mismatch") + assert.Equal(t, "raw-event-id-from-extras", retrievedEvent.RawEventID, "RawEventID field not set correctly") + assert.Nil(t, retrievedEvent.Extras["raweventid"], "RawEventID should not be in extras") + }) } func ref[T any](x T) *T { diff --git a/pkg/grpc/cloudevent.pb.go b/pkg/grpc/cloudevent.pb.go index dbeeb39..48f41ff 100644 --- a/pkg/grpc/cloudevent.pb.go +++ b/pkg/grpc/cloudevent.pb.go @@ -112,7 +112,9 @@ type CloudEventHeader struct { // Tags are a list of tags that can be used to filter the events. Tags []string `protobuf:"bytes,12,rep,name=tags,proto3" json:"tags,omitempty"` // Signature hold the signature of the a cloudevent's data field. - Signature string `protobuf:"bytes,13,opt,name=signature,proto3" json:"signature,omitempty"` + Signature string `protobuf:"bytes,13,opt,name=signature,proto3" json:"signature,omitempty"` + // RawEventID links a parsed event to the backing raw event when present. + RawEventId string `protobuf:"bytes,14,opt,name=raw_event_id,json=rawEventId,proto3" json:"raw_event_id,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -238,6 +240,13 @@ func (x *CloudEventHeader) GetSignature() string { return "" } +func (x *CloudEventHeader) GetRawEventId() string { + if x != nil { + return x.RawEventId + } + return "" +} + var File_pkg_grpc_cloudevent_proto protoreflect.FileDescriptor const file_pkg_grpc_cloudevent_proto_rawDesc = "" + @@ -247,7 +256,7 @@ const file_pkg_grpc_cloudevent_proto_rawDesc = "" + "\n" + "CloudEvent\x124\n" + "\x06header\x18\x01 \x01(\v2\x1c.cloudevent.CloudEventHeaderR\x06header\x12\x12\n" + - "\x04data\x18\x02 \x01(\fR\x04data\"\xf6\x03\n" + + "\x04data\x18\x02 \x01(\fR\x04data\"\x98\x04\n" + "\x10CloudEventHeader\x12\x0e\n" + "\x02id\x18\x01 \x01(\tR\x02id\x12\x16\n" + "\x06source\x18\x02 \x01(\tR\x06source\x12\x1a\n" + @@ -263,7 +272,9 @@ const file_pkg_grpc_cloudevent_proto_rawDesc = "" + " \x01(\tR\vdataVersion\x12@\n" + "\x06extras\x18\v \x03(\v2(.cloudevent.CloudEventHeader.ExtrasEntryR\x06extras\x12\x12\n" + "\x04tags\x18\f \x03(\tR\x04tags\x12\x1c\n" + - "\tsignature\x18\r \x01(\tR\tsignature\x1a9\n" + + "\tsignature\x18\r \x01(\tR\tsignature\x12 \n" + + "\fraw_event_id\x18\x0e \x01(\tR\n" + + "rawEventId\x1a9\n" + "\vExtrasEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\fR\x05value:\x028\x01B,Z*github.com/DIMO-Network/fetch-api/pkg/grpcb\x06proto3" diff --git a/pkg/grpc/cloudevent.proto b/pkg/grpc/cloudevent.proto index c1a96b9..20bed46 100644 --- a/pkg/grpc/cloudevent.proto +++ b/pkg/grpc/cloudevent.proto @@ -61,5 +61,7 @@ message CloudEventHeader { // Signature hold the signature of the a cloudevent's data field. string signature = 13; -} + // RawEventID links a parsed event to the backing raw event when present. + string raw_event_id = 14; +} diff --git a/pkg/grpc/common.go b/pkg/grpc/common.go index 12df9ff..ebfcf9f 100644 --- a/pkg/grpc/common.go +++ b/pkg/grpc/common.go @@ -53,6 +53,7 @@ func (c *CloudEventHeader) AsCloudEventHeader() cloudevent.CloudEventHeader { DataVersion: c.GetDataVersion(), Extras: extras, Signature: c.GetSignature(), + RawEventID: c.GetRawEventId(), Tags: TagsOrEmpty(c.GetTags()), } } @@ -84,6 +85,7 @@ func CloudEventHeaderToProto(event *cloudevent.CloudEventHeader) *CloudEventHead DataVersion: event.DataVersion, Extras: extras, Signature: event.Signature, + RawEventId: event.RawEventID, Tags: TagsOrEmpty(event.Tags), } } diff --git a/schema/base.graphqls b/schema/base.graphqls index ace6652..467070e 100644 --- a/schema/base.graphqls +++ b/schema/base.graphqls @@ -81,6 +81,7 @@ type CloudEventHeader { dataversion: String producer: String! signature: String + raweventid: String tags: [String!]! }