Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ Each CloudEvent contains the following header fields:
| DataSchema | URI pointing to a schema for the data field. |
| DataVersion | An optional way for the data provider to specify the version of the data structure in the payload (e.g., "default/v1.0"). |
| Signature | An optional cryptographic signature of the CloudEvent's data field for verification purposes. |
| RawEventID | An optional identifier linking a parsed CloudEvent back to its raw image or PDF event. |
| Tags | An optional list of tags that can be used to filter and categorize a cloudevents (currently these are only used by `dimo.attestation`). |
| Extras | Additional custom fields. |

Expand All @@ -96,6 +97,7 @@ The DIMO-specific extensions to the CloudEvents specification include:
- `Producer`: Provides additional context about the specific instance, process, or device that created the event
- `DataVersion`: A DIMO-specific extension that is unique to each source. This can be used by a source to determine the shape of the data field, enabling version-based data processing
- `Signature`: An optional cryptographic signature field for verifying the integrity and authenticity of the CloudEvent's data
- `RawEventID`: An optional identifier pointing from a parsed event to its raw source event
- `Tags`: An optional list of tags for filtering and categorizing events, useful for event organization and query optimization

### Event Uniqueness
Expand Down
18 changes: 17 additions & 1 deletion clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) {
SpecVersion: "1.0",
DataSchema: "https://example.com/schema",
Signature: "test-signature",
RawEventID: "raw-event-123",
Tags: []string{"tag1", "tag2"},
}

Expand All @@ -198,6 +199,7 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) {
assert.NotContains(t, extras, "specversion")
assert.Equal(t, "https://example.com/schema", extras["dataschema"])
assert.Equal(t, "test-signature", extras["signature"])
assert.Equal(t, "raw-event-123", extras["raweventid"])
assert.Equal(t, []string{"tag1", "tag2"}, extras["tags"])
})

Expand All @@ -207,6 +209,7 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) {
SpecVersion: "1.0",
DataSchema: "https://example.com/schema",
Signature: "test-signature",
RawEventID: "raw-event-123",
Tags: []string{"tag1", "tag2"},
Extras: map[string]any{
"existing": "value",
Expand All @@ -224,11 +227,13 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) {
assert.NotContains(t, extras, "specversion")
assert.Equal(t, "https://example.com/schema", extras["dataschema"])
assert.Equal(t, "test-signature", extras["signature"])
assert.Equal(t, "raw-event-123", extras["raweventid"])
assert.Equal(t, []string{"tag1", "tag2"}, extras["tags"])

// Verify original extras map is not modified
assert.NotContains(t, event.Extras, "dataschema")
assert.NotContains(t, event.Extras, "signature")
assert.NotContains(t, event.Extras, "raweventid")
assert.NotContains(t, event.Extras, "tags")
})

Expand All @@ -238,6 +243,7 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) {
SpecVersion: "", // zero value
DataSchema: "", // zero value
Signature: "", // zero value
RawEventID: "", // zero value
Tags: nil, // zero value
}

Expand All @@ -247,6 +253,7 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) {
assert.NotContains(t, extras, "specversion")
assert.NotContains(t, extras, "dataschema")
assert.NotContains(t, extras, "signature")
assert.NotContains(t, extras, "raweventid")
assert.NotContains(t, extras, "tags")
})

Expand Down Expand Up @@ -280,6 +287,7 @@ func TestRestoreNonColumnFields(t *testing.T) {
assert.Equal(t, "1.0", event.SpecVersion)
assert.Empty(t, event.DataSchema)
assert.Empty(t, event.Signature)
assert.Empty(t, event.RawEventID)
assert.Nil(t, event.Tags)
})

Expand All @@ -295,6 +303,7 @@ func TestRestoreNonColumnFields(t *testing.T) {
assert.Equal(t, "1.0", event.SpecVersion)
assert.Empty(t, event.DataSchema)
assert.Empty(t, event.Signature)
assert.Empty(t, event.RawEventID)
assert.Nil(t, event.Tags)
})

Expand All @@ -305,6 +314,7 @@ func TestRestoreNonColumnFields(t *testing.T) {
"specversion": "1.0",
"dataschema": "https://example.com/schema",
"signature": "test-signature",
"raweventid": "raw-event-123",
"tags": []any{"tag1", "tag2"},
"other": "should-remain",
},
Expand All @@ -316,12 +326,14 @@ func TestRestoreNonColumnFields(t *testing.T) {
assert.Equal(t, "1.0", event.SpecVersion)
assert.Equal(t, "https://example.com/schema", event.DataSchema)
assert.Equal(t, "test-signature", event.Signature)
assert.Equal(t, "raw-event-123", event.RawEventID)
assert.Equal(t, []string{"tag1", "tag2"}, event.Tags)

// Check that non-column fields are removed from extras
assert.NotContains(t, event.Extras, "specversion")
assert.NotContains(t, event.Extras, "dataschema")
assert.NotContains(t, event.Extras, "signature")
assert.NotContains(t, event.Extras, "raweventid")
assert.NotContains(t, event.Extras, "tags")
assert.Contains(t, event.Extras, "other") // other fields remain
})
Expand All @@ -332,6 +344,7 @@ func TestRestoreNonColumnFields(t *testing.T) {
Extras: map[string]any{
"specversion": 123, // wrong type
"dataschema": []int{}, // wrong type
"raweventid": 999, // wrong type
"tags": "not-a-slice", // wrong type
},
}
Expand All @@ -342,11 +355,13 @@ func TestRestoreNonColumnFields(t *testing.T) {
// SpecVersion is always hardcoded to "1.0" regardless of extras
assert.Equal(t, "1.0", event.SpecVersion)
assert.Empty(t, event.DataSchema)
assert.Empty(t, event.RawEventID)
assert.Nil(t, event.Tags)

// Wrong-typed values should still be removed from extras for some fields
assert.NotContains(t, event.Extras, "specversion")
assert.NotContains(t, event.Extras, "dataschema")
assert.NotContains(t, event.Extras, "raweventid")
assert.NotContains(t, event.Extras, "tags")
})

Expand All @@ -355,7 +370,7 @@ func TestRestoreNonColumnFields(t *testing.T) {
ID: "test-id",
Extras: map[string]any{
"specversion": "1.0",
// missing dataschema, signature, tags
// missing dataschema, signature, raweventid, tags
"other": "value",
},
}
Expand All @@ -366,6 +381,7 @@ func TestRestoreNonColumnFields(t *testing.T) {
assert.Equal(t, "1.0", event.SpecVersion)
assert.Empty(t, event.DataSchema)
assert.Empty(t, event.Signature)
assert.Empty(t, event.RawEventID)
assert.Nil(t, event.Tags)

// specversion should be removed, other should remain
Expand Down
3 changes: 3 additions & 0 deletions cloudevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type CloudEventHeader struct {
// Signature hold the signature of the a cloudevent's data field.
Signature string `json:"signature,omitempty"`

// RawEventID optionally links a parsed event to the ID of its backing raw event.
RawEventID string `json:"raweventid,omitempty"`

// Tags are a list of tags that can be used to filter events.
Tags []string `json:"tags,omitempty"`

Expand Down
3 changes: 2 additions & 1 deletion cloudevent_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
var knownHeaderFields = map[string]struct{}{
"specversion": {}, "type": {}, "source": {}, "subject": {}, "id": {},
"time": {}, "datacontenttype": {}, "dataschema": {}, "dataversion": {},
"producer": {}, "signature": {}, "tags": {},
"producer": {}, "signature": {}, "raweventid": {}, "tags": {},
}

// unmarshalHeader parses CloudEvent JSON with gjson and returns the populated
Expand All @@ -34,6 +34,7 @@ func unmarshalHeader(data []byte) (CloudEventHeader, []byte, string, error) {
header.DataSchema = result.Get("dataschema").Str
header.DataVersion = result.Get("dataversion").Str
header.Signature = result.Get("signature").Str
header.RawEventID = result.Get("raweventid").Str

if tr := result.Get("time"); tr.Exists() {
if tr.Type != gjson.String {
Expand Down
3 changes: 3 additions & 0 deletions cloudevent_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func (c *CloudEventHeader) marshalHeaderTo(buf *bytes.Buffer) error {
if c.Signature != "" {
writeStringField(buf, "signature", c.Signature)
}
if c.RawEventID != "" {
writeStringField(buf, "raweventid", c.RawEventID)
}
if len(c.Tags) > 0 {
buf.WriteString(`,"tags":[`)
for i, tag := range c.Tags {
Expand Down
116 changes: 116 additions & 0 deletions cloudevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,39 @@ func TestCloudEvent_MarshalJSON(t *testing.T) {
}
}`,
},
{
name: "event with raw event id",
event: cloudevent.CloudEvent[TestData]{
CloudEventHeader: cloudevent.CloudEventHeader{
ID: "789",
Source: "test-source",
Producer: "test-producer",
SpecVersion: cloudevent.SpecVersion,
Subject: "test-subject",
Time: now,
Type: cloudevent.TypeFingerprint,
RawEventID: "raw-event-123",
},
Data: TestData{
Message: "test",
Count: 2,
},
},
expected: `{
"id": "789",
"source": "test-source",
"producer": "test-producer",
"specversion": "1.0",
"subject": "test-subject",
"time": "` + now.Format(time.RFC3339Nano) + `",
"type": "dimo.fingerprint",
"raweventid": "raw-event-123",
"data": {
"message": "test",
"count": 2
}
}`,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -184,6 +217,39 @@ func TestCloudEvent_UnmarshalJSON(t *testing.T) {
},
},
},
{
name: "event with raw event id",
json: `{
"id": "789",
"source": "test-source",
"producer": "test-producer",
"specversion": "1.0",
"subject": "test-subject",
"time": "` + now.Format(time.RFC3339Nano) + `",
"type": "dimo.fingerprint",
"raweventid": "raw-event-123",
"data": {
"message": "test",
"count": 2
}
}`,
expected: cloudevent.CloudEvent[TestData]{
CloudEventHeader: cloudevent.CloudEventHeader{
ID: "789",
Source: "test-source",
Producer: "test-producer",
SpecVersion: cloudevent.SpecVersion,
Subject: "test-subject",
Time: now,
Type: cloudevent.TypeFingerprint,
RawEventID: "raw-event-123",
},
Data: TestData{
Message: "test",
Count: 2,
},
},
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -253,6 +319,29 @@ func TestCloudEventHeader_MarshalJSON(t *testing.T) {
"extra2": 123
}`,
},
{
name: "header with raw event id",
header: cloudevent.CloudEventHeader{
ID: "789",
Source: "test-source",
Producer: "test-producer",
SpecVersion: cloudevent.SpecVersion,
Subject: "test-subject",
Time: now,
Type: cloudevent.TypeFingerprint,
RawEventID: "raw-event-123",
},
expected: `{
"id": "789",
"source": "test-source",
"producer": "test-producer",
"specversion": "1.0",
"subject": "test-subject",
"time": "` + now.Format(time.RFC3339Nano) + `",
"type": "dimo.fingerprint",
"raweventid": "raw-event-123"
}`,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -333,6 +422,33 @@ func TestCloudEventHeader_UnmarshalJSON(t *testing.T) {
},
},
},
{
name: "header with raw event id",
json: `{
"id": "789",
"source": "test-source",
"producer": "test-producer",
"specversion": "1.0",
"subject": "test-subject",
"time": "` + now.Format(time.RFC3339Nano) + `",
"type": "dimo.fingerprint",
"raweventid": "raw-event-123",
"extra1": "value1"
}`,
expected: cloudevent.CloudEventHeader{
ID: "789",
Source: "test-source",
Producer: "test-producer",
SpecVersion: cloudevent.SpecVersion,
Subject: "test-subject",
Time: now,
Type: cloudevent.TypeFingerprint,
RawEventID: "raw-event-123",
Extras: map[string]any{
"extra1": "value1",
},
},
},
}

for _, tt := range tests {
Expand Down
11 changes: 10 additions & 1 deletion field_accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ func RestoreNonColumnFields(event *CloudEventHeader) {
}
delete(event.Extras, "signature")
}
if val, ok := event.Extras["raweventid"]; ok {
if typedVal, ok := val.(string); ok {
event.RawEventID = typedVal
}
delete(event.Extras, "raweventid")
}
if val, ok := event.Extras["tags"]; ok {
if anySlice, ok := val.([]any); ok {
typedSlice := make([]string, len(anySlice))
Expand All @@ -36,7 +42,7 @@ func RestoreNonColumnFields(event *CloudEventHeader) {
// AddNonColumnFieldsToExtras adds fields without dedicated columns to Extras.
// Returns nil when there are no extras and no non-column fields to add.
func AddNonColumnFieldsToExtras(event *CloudEventHeader) map[string]any {
hasNonColumn := event.DataSchema != "" || event.Signature != "" || len(event.Tags) > 0
hasNonColumn := event.DataSchema != "" || event.Signature != "" || event.RawEventID != "" || len(event.Tags) > 0
if !hasNonColumn && len(event.Extras) == 0 {
return nil
}
Expand All @@ -52,6 +58,9 @@ func AddNonColumnFieldsToExtras(event *CloudEventHeader) map[string]any {
if event.Signature != "" {
extras["signature"] = event.Signature
}
if event.RawEventID != "" {
extras["raweventid"] = event.RawEventID
}
if len(event.Tags) > 0 {
extras["tags"] = event.Tags
}
Expand Down
Loading
Loading