From 820bdff732e48c4bc235b5943a265ea63218d45c Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Mon, 13 Apr 2026 13:01:17 -0400 Subject: [PATCH] Add raweventid as a CloudEvent "extras" field --- README.md | 2 + clickhouse/clickhouse_test.go | 18 +++++- cloudevent.go | 3 + cloudevent_decoder.go | 3 +- cloudevent_encoder.go | 3 + cloudevent_test.go | 116 ++++++++++++++++++++++++++++++++++ field_accessors.go | 11 +++- parquet/parquet_test.go | 23 ++++++- 8 files changed, 173 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 3236980..fcf7a29 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,7 @@ Each CloudEvent contains the following header fields: | DataSchema | URI pointing to a schema for the data field. | | DataVersion | An optional way for the data provider to specify the version of the data structure in the payload (e.g., "default/v1.0"). | | Signature | An optional cryptographic signature of the CloudEvent's data field for verification purposes. | +| RawEventID | An optional identifier linking a parsed CloudEvent back to its raw image or PDF event. | | Tags | An optional list of tags that can be used to filter and categorize a cloudevents (currently these are only used by `dimo.attestation`). | | Extras | Additional custom fields. | @@ -96,6 +97,7 @@ The DIMO-specific extensions to the CloudEvents specification include: - `Producer`: Provides additional context about the specific instance, process, or device that created the event - `DataVersion`: A DIMO-specific extension that is unique to each source. This can be used by a source to determine the shape of the data field, enabling version-based data processing - `Signature`: An optional cryptographic signature field for verifying the integrity and authenticity of the CloudEvent's data +- `RawEventID`: An optional identifier pointing from a parsed event to its raw source event - `Tags`: An optional list of tags for filtering and categorizing events, useful for event organization and query optimization ### Event Uniqueness diff --git a/clickhouse/clickhouse_test.go b/clickhouse/clickhouse_test.go index 868b379..b992b7e 100644 --- a/clickhouse/clickhouse_test.go +++ b/clickhouse/clickhouse_test.go @@ -189,6 +189,7 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) { SpecVersion: "1.0", DataSchema: "https://example.com/schema", Signature: "test-signature", + RawEventID: "raw-event-123", Tags: []string{"tag1", "tag2"}, } @@ -198,6 +199,7 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) { assert.NotContains(t, extras, "specversion") assert.Equal(t, "https://example.com/schema", extras["dataschema"]) assert.Equal(t, "test-signature", extras["signature"]) + assert.Equal(t, "raw-event-123", extras["raweventid"]) assert.Equal(t, []string{"tag1", "tag2"}, extras["tags"]) }) @@ -207,6 +209,7 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) { SpecVersion: "1.0", DataSchema: "https://example.com/schema", Signature: "test-signature", + RawEventID: "raw-event-123", Tags: []string{"tag1", "tag2"}, Extras: map[string]any{ "existing": "value", @@ -224,11 +227,13 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) { assert.NotContains(t, extras, "specversion") assert.Equal(t, "https://example.com/schema", extras["dataschema"]) assert.Equal(t, "test-signature", extras["signature"]) + assert.Equal(t, "raw-event-123", extras["raweventid"]) assert.Equal(t, []string{"tag1", "tag2"}, extras["tags"]) // Verify original extras map is not modified assert.NotContains(t, event.Extras, "dataschema") assert.NotContains(t, event.Extras, "signature") + assert.NotContains(t, event.Extras, "raweventid") assert.NotContains(t, event.Extras, "tags") }) @@ -238,6 +243,7 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) { SpecVersion: "", // zero value DataSchema: "", // zero value Signature: "", // zero value + RawEventID: "", // zero value Tags: nil, // zero value } @@ -247,6 +253,7 @@ func TestAddNonColumnFieldsToExtras(t *testing.T) { assert.NotContains(t, extras, "specversion") assert.NotContains(t, extras, "dataschema") assert.NotContains(t, extras, "signature") + assert.NotContains(t, extras, "raweventid") assert.NotContains(t, extras, "tags") }) @@ -280,6 +287,7 @@ func TestRestoreNonColumnFields(t *testing.T) { assert.Equal(t, "1.0", event.SpecVersion) assert.Empty(t, event.DataSchema) assert.Empty(t, event.Signature) + assert.Empty(t, event.RawEventID) assert.Nil(t, event.Tags) }) @@ -295,6 +303,7 @@ func TestRestoreNonColumnFields(t *testing.T) { assert.Equal(t, "1.0", event.SpecVersion) assert.Empty(t, event.DataSchema) assert.Empty(t, event.Signature) + assert.Empty(t, event.RawEventID) assert.Nil(t, event.Tags) }) @@ -305,6 +314,7 @@ func TestRestoreNonColumnFields(t *testing.T) { "specversion": "1.0", "dataschema": "https://example.com/schema", "signature": "test-signature", + "raweventid": "raw-event-123", "tags": []any{"tag1", "tag2"}, "other": "should-remain", }, @@ -316,12 +326,14 @@ func TestRestoreNonColumnFields(t *testing.T) { assert.Equal(t, "1.0", event.SpecVersion) assert.Equal(t, "https://example.com/schema", event.DataSchema) assert.Equal(t, "test-signature", event.Signature) + assert.Equal(t, "raw-event-123", event.RawEventID) assert.Equal(t, []string{"tag1", "tag2"}, event.Tags) // Check that non-column fields are removed from extras assert.NotContains(t, event.Extras, "specversion") assert.NotContains(t, event.Extras, "dataschema") assert.NotContains(t, event.Extras, "signature") + assert.NotContains(t, event.Extras, "raweventid") assert.NotContains(t, event.Extras, "tags") assert.Contains(t, event.Extras, "other") // other fields remain }) @@ -332,6 +344,7 @@ func TestRestoreNonColumnFields(t *testing.T) { Extras: map[string]any{ "specversion": 123, // wrong type "dataschema": []int{}, // wrong type + "raweventid": 999, // wrong type "tags": "not-a-slice", // wrong type }, } @@ -342,11 +355,13 @@ func TestRestoreNonColumnFields(t *testing.T) { // SpecVersion is always hardcoded to "1.0" regardless of extras assert.Equal(t, "1.0", event.SpecVersion) assert.Empty(t, event.DataSchema) + assert.Empty(t, event.RawEventID) assert.Nil(t, event.Tags) // Wrong-typed values should still be removed from extras for some fields assert.NotContains(t, event.Extras, "specversion") assert.NotContains(t, event.Extras, "dataschema") + assert.NotContains(t, event.Extras, "raweventid") assert.NotContains(t, event.Extras, "tags") }) @@ -355,7 +370,7 @@ func TestRestoreNonColumnFields(t *testing.T) { ID: "test-id", Extras: map[string]any{ "specversion": "1.0", - // missing dataschema, signature, tags + // missing dataschema, signature, raweventid, tags "other": "value", }, } @@ -366,6 +381,7 @@ func TestRestoreNonColumnFields(t *testing.T) { assert.Equal(t, "1.0", event.SpecVersion) assert.Empty(t, event.DataSchema) assert.Empty(t, event.Signature) + assert.Empty(t, event.RawEventID) assert.Nil(t, event.Tags) // specversion should be removed, other should remain diff --git a/cloudevent.go b/cloudevent.go index ad7ad31..4bcf84e 100644 --- a/cloudevent.go +++ b/cloudevent.go @@ -56,6 +56,9 @@ type CloudEventHeader struct { // Signature hold the signature of the a cloudevent's data field. Signature string `json:"signature,omitempty"` + // RawEventID optionally links a parsed event to the ID of its backing raw event. + RawEventID string `json:"raweventid,omitempty"` + // Tags are a list of tags that can be used to filter events. Tags []string `json:"tags,omitempty"` diff --git a/cloudevent_decoder.go b/cloudevent_decoder.go index 1fd0b20..e88bf5c 100644 --- a/cloudevent_decoder.go +++ b/cloudevent_decoder.go @@ -12,7 +12,7 @@ import ( var knownHeaderFields = map[string]struct{}{ "specversion": {}, "type": {}, "source": {}, "subject": {}, "id": {}, "time": {}, "datacontenttype": {}, "dataschema": {}, "dataversion": {}, - "producer": {}, "signature": {}, "tags": {}, + "producer": {}, "signature": {}, "raweventid": {}, "tags": {}, } // unmarshalHeader parses CloudEvent JSON with gjson and returns the populated @@ -34,6 +34,7 @@ func unmarshalHeader(data []byte) (CloudEventHeader, []byte, string, error) { header.DataSchema = result.Get("dataschema").Str header.DataVersion = result.Get("dataversion").Str header.Signature = result.Get("signature").Str + header.RawEventID = result.Get("raweventid").Str if tr := result.Get("time"); tr.Exists() { if tr.Type != gjson.String { diff --git a/cloudevent_encoder.go b/cloudevent_encoder.go index 025c8cf..9c94f2f 100644 --- a/cloudevent_encoder.go +++ b/cloudevent_encoder.go @@ -70,6 +70,9 @@ func (c *CloudEventHeader) marshalHeaderTo(buf *bytes.Buffer) error { if c.Signature != "" { writeStringField(buf, "signature", c.Signature) } + if c.RawEventID != "" { + writeStringField(buf, "raweventid", c.RawEventID) + } if len(c.Tags) > 0 { buf.WriteString(`,"tags":[`) for i, tag := range c.Tags { diff --git a/cloudevent_test.go b/cloudevent_test.go index e73bd34..882a68f 100644 --- a/cloudevent_test.go +++ b/cloudevent_test.go @@ -90,6 +90,39 @@ func TestCloudEvent_MarshalJSON(t *testing.T) { } }`, }, + { + name: "event with raw event id", + event: cloudevent.CloudEvent[TestData]{ + CloudEventHeader: cloudevent.CloudEventHeader{ + ID: "789", + Source: "test-source", + Producer: "test-producer", + SpecVersion: cloudevent.SpecVersion, + Subject: "test-subject", + Time: now, + Type: cloudevent.TypeFingerprint, + RawEventID: "raw-event-123", + }, + Data: TestData{ + Message: "test", + Count: 2, + }, + }, + expected: `{ + "id": "789", + "source": "test-source", + "producer": "test-producer", + "specversion": "1.0", + "subject": "test-subject", + "time": "` + now.Format(time.RFC3339Nano) + `", + "type": "dimo.fingerprint", + "raweventid": "raw-event-123", + "data": { + "message": "test", + "count": 2 + } + }`, + }, } for _, tt := range tests { @@ -184,6 +217,39 @@ func TestCloudEvent_UnmarshalJSON(t *testing.T) { }, }, }, + { + name: "event with raw event id", + json: `{ + "id": "789", + "source": "test-source", + "producer": "test-producer", + "specversion": "1.0", + "subject": "test-subject", + "time": "` + now.Format(time.RFC3339Nano) + `", + "type": "dimo.fingerprint", + "raweventid": "raw-event-123", + "data": { + "message": "test", + "count": 2 + } + }`, + expected: cloudevent.CloudEvent[TestData]{ + CloudEventHeader: cloudevent.CloudEventHeader{ + ID: "789", + Source: "test-source", + Producer: "test-producer", + SpecVersion: cloudevent.SpecVersion, + Subject: "test-subject", + Time: now, + Type: cloudevent.TypeFingerprint, + RawEventID: "raw-event-123", + }, + Data: TestData{ + Message: "test", + Count: 2, + }, + }, + }, } for _, tt := range tests { @@ -253,6 +319,29 @@ func TestCloudEventHeader_MarshalJSON(t *testing.T) { "extra2": 123 }`, }, + { + name: "header with raw event id", + header: cloudevent.CloudEventHeader{ + ID: "789", + Source: "test-source", + Producer: "test-producer", + SpecVersion: cloudevent.SpecVersion, + Subject: "test-subject", + Time: now, + Type: cloudevent.TypeFingerprint, + RawEventID: "raw-event-123", + }, + expected: `{ + "id": "789", + "source": "test-source", + "producer": "test-producer", + "specversion": "1.0", + "subject": "test-subject", + "time": "` + now.Format(time.RFC3339Nano) + `", + "type": "dimo.fingerprint", + "raweventid": "raw-event-123" + }`, + }, } for _, tt := range tests { @@ -333,6 +422,33 @@ func TestCloudEventHeader_UnmarshalJSON(t *testing.T) { }, }, }, + { + name: "header with raw event id", + json: `{ + "id": "789", + "source": "test-source", + "producer": "test-producer", + "specversion": "1.0", + "subject": "test-subject", + "time": "` + now.Format(time.RFC3339Nano) + `", + "type": "dimo.fingerprint", + "raweventid": "raw-event-123", + "extra1": "value1" + }`, + expected: cloudevent.CloudEventHeader{ + ID: "789", + Source: "test-source", + Producer: "test-producer", + SpecVersion: cloudevent.SpecVersion, + Subject: "test-subject", + Time: now, + Type: cloudevent.TypeFingerprint, + RawEventID: "raw-event-123", + Extras: map[string]any{ + "extra1": "value1", + }, + }, + }, } for _, tt := range tests { diff --git a/field_accessors.go b/field_accessors.go index bd5bed5..37ae3cf 100644 --- a/field_accessors.go +++ b/field_accessors.go @@ -21,6 +21,12 @@ func RestoreNonColumnFields(event *CloudEventHeader) { } delete(event.Extras, "signature") } + if val, ok := event.Extras["raweventid"]; ok { + if typedVal, ok := val.(string); ok { + event.RawEventID = typedVal + } + delete(event.Extras, "raweventid") + } if val, ok := event.Extras["tags"]; ok { if anySlice, ok := val.([]any); ok { typedSlice := make([]string, len(anySlice)) @@ -36,7 +42,7 @@ func RestoreNonColumnFields(event *CloudEventHeader) { // AddNonColumnFieldsToExtras adds fields without dedicated columns to Extras. // Returns nil when there are no extras and no non-column fields to add. func AddNonColumnFieldsToExtras(event *CloudEventHeader) map[string]any { - hasNonColumn := event.DataSchema != "" || event.Signature != "" || len(event.Tags) > 0 + hasNonColumn := event.DataSchema != "" || event.Signature != "" || event.RawEventID != "" || len(event.Tags) > 0 if !hasNonColumn && len(event.Extras) == 0 { return nil } @@ -52,6 +58,9 @@ func AddNonColumnFieldsToExtras(event *CloudEventHeader) map[string]any { if event.Signature != "" { extras["signature"] = event.Signature } + if event.RawEventID != "" { + extras["raweventid"] = event.RawEventID + } if len(event.Tags) > 0 { extras["tags"] = event.Tags } diff --git a/parquet/parquet_test.go b/parquet/parquet_test.go index 1671675..811ca68 100644 --- a/parquet/parquet_test.go +++ b/parquet/parquet_test.go @@ -238,6 +238,20 @@ func TestRoundtrip_Signature(t *testing.T) { assert.Equal(t, "0xdeadbeef", events[0].Signature) } +func TestRoundtrip_RawEventID(t *testing.T) { + t.Parallel() + original := makeEvent("raw-ref-evt", json.RawMessage(`{}`)) + original.RawEventID = "raw-event-123" + + buf, _ := encodeToBuffer(t, []cloudevent.RawEvent{original}, "rawref") + + r := bytes.NewReader(buf.Bytes()) + events, err := Decode(r, int64(buf.Len())) + require.NoError(t, err) + require.Len(t, events, 1) + assert.Equal(t, "raw-event-123", events[0].RawEventID) +} + func TestRoundtrip_Tags(t *testing.T) { t.Parallel() original := makeEvent("tag-evt", json.RawMessage(`{}`)) @@ -479,9 +493,9 @@ func TestIsParquetRef(t *testing.T) { func TestParseIndexKey_Valid(t *testing.T) { t.Parallel() tests := []struct { - input string - wantKey string - wantRow int64 + input string + wantKey string + wantRow int64 }{ {"obj/key#0", "obj/key", 0}, {"obj/key#42", "obj/key", 42}, @@ -551,6 +565,7 @@ func TestRoundtrip_AllNonColumnFields(t *testing.T) { original := makeEvent("full-evt", json.RawMessage(`{"complete":true}`)) original.DataSchema = "https://schema.dimo.zone/status/v2" original.Signature = "0xabcdef1234567890" + original.RawEventID = "raw-event-123" original.Tags = []string{"tagged", "event"} original.Extras = map[string]any{ "custom": "value", @@ -566,9 +581,11 @@ func TestRoundtrip_AllNonColumnFields(t *testing.T) { got := events[0] assert.Equal(t, original.DataSchema, got.DataSchema) assert.Equal(t, original.Signature, got.Signature) + assert.Equal(t, original.RawEventID, got.RawEventID) assert.Equal(t, original.Tags, got.Tags) assert.Equal(t, "value", got.Extras["custom"]) assert.NotContains(t, got.Extras, "signature") + assert.NotContains(t, got.Extras, "raweventid") } func TestRoundtrip_EmptyStringFields(t *testing.T) {