From 404ca15b1535f951f2a9c4697e6eb3ad1a6f5f13 Mon Sep 17 00:00:00 2001 From: Kristopher Chun Date: Fri, 6 Feb 2026 15:21:45 -0800 Subject: [PATCH 1/6] feat: add CloudEvents deserialization support to all eventbus engines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit External producers publish domain events using the CloudEvents v1.0 spec, but all eventbus engines (Kinesis, Kafka, NATS, Redis) unmarshal records directly into eventbus.Event whose JSON tags don't match CloudEvents fields. This causes CloudEvents records to deserialize as zero-value structs with empty topics that match no subscriptions, silently dropping messages. Introduce parseRecord() which detects CloudEvents by the presence of "specversion", maps type→Topic, data→Payload, time→CreatedAt, and preserves context/extension attributes in Metadata with a ce_ prefix. Native eventbus.Event format continues to work unchanged. --- modules/eventbus/cloudevents_decode.go | 147 +++++++++ modules/eventbus/cloudevents_decode_test.go | 320 ++++++++++++++++++++ modules/eventbus/kafka.go | 4 +- modules/eventbus/kinesis.go | 4 +- modules/eventbus/nats.go | 4 +- modules/eventbus/redis.go | 4 +- 6 files changed, 475 insertions(+), 8 deletions(-) create mode 100644 modules/eventbus/cloudevents_decode.go create mode 100644 modules/eventbus/cloudevents_decode_test.go diff --git a/modules/eventbus/cloudevents_decode.go b/modules/eventbus/cloudevents_decode.go new file mode 100644 index 00000000..bdbf26b6 --- /dev/null +++ b/modules/eventbus/cloudevents_decode.go @@ -0,0 +1,147 @@ +package eventbus + +import ( + "encoding/json" + "fmt" + "log/slog" + "time" +) + +// cloudEventEnvelope is a lightweight representation of a CloudEvents v1.0 +// JSON envelope. Only the required and commonly-used attributes are given +// dedicated fields; extension attributes are captured separately. 
+type cloudEventEnvelope struct { + SpecVersion string `json:"specversion"` + Type string `json:"type"` + Source string `json:"source"` + ID string `json:"id"` + Time string `json:"time,omitempty"` + DataContentType string `json:"datacontenttype,omitempty"` + Data json.RawMessage `json:"data,omitempty"` + Subject string `json:"subject,omitempty"` +} + +// knownCloudEventKeys are the CloudEvents spec-defined keys that have +// dedicated handling. Anything else is treated as an extension attribute. +var knownCloudEventKeys = map[string]bool{ + "specversion": true, + "type": true, + "source": true, + "id": true, + "time": true, + "datacontenttype": true, + "data": true, + "data_base64": true, + "subject": true, +} + +// isCloudEvent checks whether raw JSON contains a CloudEvents envelope +// by probing for the required "specversion" key. +func isCloudEvent(raw json.RawMessage) bool { + var probe map[string]json.RawMessage + if err := json.Unmarshal(raw, &probe); err != nil { + return false + } + _, ok := probe["specversion"] + return ok +} + +// parseCloudEvent maps a CloudEvents JSON envelope to an eventbus.Event. 
+// +// Mapping: +// - type → Event.Topic +// - data → Event.Payload +// - time → Event.CreatedAt (RFC3339; falls back to time.Now()) +// - specversion, source, id, datacontenttype, subject, and all extension +// attributes → Event.Metadata (prefixed with "ce_") +func parseCloudEvent(raw json.RawMessage) (Event, error) { + var ce cloudEventEnvelope + if err := json.Unmarshal(raw, &ce); err != nil { + return Event{}, fmt.Errorf("failed to parse CloudEvent envelope: %w", err) + } + + if ce.SpecVersion == "" { + return Event{}, fmt.Errorf("CloudEvent missing required 'specversion' attribute") + } + if ce.Type == "" { + return Event{}, fmt.Errorf("CloudEvent missing required 'type' attribute") + } + if ce.Source == "" { + return Event{}, fmt.Errorf("CloudEvent missing required 'source' attribute") + } + if ce.ID == "" { + return Event{}, fmt.Errorf("CloudEvent missing required 'id' attribute") + } + + var createdAt time.Time + if ce.Time != "" { + var err error + createdAt, err = time.Parse(time.RFC3339, ce.Time) + if err != nil { + slog.Warn("CloudEvent has unparseable 'time' attribute, using current time", + "time", ce.Time, "error", err) + createdAt = time.Now() + } + } else { + createdAt = time.Now() + } + + var payload interface{} + if len(ce.Data) > 0 && string(ce.Data) != "null" { + if err := json.Unmarshal(ce.Data, &payload); err != nil { + return Event{}, fmt.Errorf("failed to parse CloudEvent 'data' field: %w", err) + } + } + + // Build metadata from known attributes and extension attributes. 
+ var fullMap map[string]json.RawMessage + if err := json.Unmarshal(raw, &fullMap); err != nil { + return Event{}, fmt.Errorf("failed to parse CloudEvent for extensions: %w", err) + } + + metadata := make(map[string]interface{}) + metadata["ce_specversion"] = ce.SpecVersion + metadata["ce_source"] = ce.Source + metadata["ce_id"] = ce.ID + if ce.DataContentType != "" { + metadata["ce_datacontenttype"] = ce.DataContentType + } + if ce.Subject != "" { + metadata["ce_subject"] = ce.Subject + } + + for key, val := range fullMap { + if knownCloudEventKeys[key] { + continue + } + var extVal interface{} + if err := json.Unmarshal(val, &extVal); err != nil { + metadata["ce_"+key] = string(val) + } else { + metadata["ce_"+key] = extVal + } + } + + return Event{ + Topic: ce.Type, + Payload: payload, + Metadata: metadata, + CreatedAt: createdAt, + }, nil +} + +// parseRecord attempts to parse raw JSON as either a CloudEvents envelope +// or a native eventbus.Event. This is the entry point used by engine +// deserialization paths. 
+func parseRecord(raw []byte) (Event, error) { + if isCloudEvent(json.RawMessage(raw)) { + return parseCloudEvent(json.RawMessage(raw)) + } + + var event Event + if err := json.Unmarshal(raw, &event); err != nil { + return Event{}, fmt.Errorf("failed to deserialize record: %w", err) + } + + return event, nil +} diff --git a/modules/eventbus/cloudevents_decode_test.go b/modules/eventbus/cloudevents_decode_test.go new file mode 100644 index 00000000..ea327054 --- /dev/null +++ b/modules/eventbus/cloudevents_decode_test.go @@ -0,0 +1,320 @@ +package eventbus + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIsCloudEvent(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + input string + expected bool + }{ + { + name: "valid CloudEvent with specversion", + input: `{"specversion":"1.0","type":"test","source":"test","id":"1"}`, + expected: true, + }, + { + name: "native Event without specversion", + input: `{"topic":"user.created","payload":{"id":"123"},"createdAt":"2026-01-01T00:00:00Z"}`, + expected: false, + }, + { + name: "empty JSON object", + input: `{}`, + expected: false, + }, + { + name: "invalid JSON", + input: `not json`, + expected: false, + }, + { + name: "array not object", + input: `[1,2,3]`, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result := isCloudEvent(json.RawMessage(tt.input)) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestParseCloudEvent(t *testing.T) { + t.Parallel() + + t.Run("full CloudEvent with all fields", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{ + "specversion": "1.0", + "type": "conversations.conversation.started", + "source": "platform.conversations", + "id": "evt-ff9745302bb23718d9da693c", + "time": "2026-02-06T23:02:35+00:00", + "datacontenttype": "application/json", + "data": {"id": "123", "texterId": "987", 
"keyword": "HELLO"} + }`) + + event, err := parseCloudEvent(raw) + require.NoError(t, err) + + assert.Equal(t, "conversations.conversation.started", event.Topic) + assert.NotNil(t, event.Payload) + assert.Equal(t, "1.0", event.Metadata["ce_specversion"]) + assert.Equal(t, "platform.conversations", event.Metadata["ce_source"]) + assert.Equal(t, "evt-ff9745302bb23718d9da693c", event.Metadata["ce_id"]) + assert.Equal(t, "application/json", event.Metadata["ce_datacontenttype"]) + + expectedTime, _ := time.Parse(time.RFC3339, "2026-02-06T23:02:35+00:00") + assert.Equal(t, expectedTime, event.CreatedAt) + + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "123", payloadMap["id"]) + assert.Equal(t, "987", payloadMap["texterId"]) + assert.Equal(t, "HELLO", payloadMap["keyword"]) + }) + + t.Run("CloudEvent with extension attributes", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{ + "specversion": "1.0", + "type": "user.created", + "source": "user-service", + "id": "abc-123", + "tenantid": "tenant-456", + "traceparent": "00-abc-def-01" + }`) + + event, err := parseCloudEvent(raw) + require.NoError(t, err) + + assert.Equal(t, "user.created", event.Topic) + assert.Equal(t, "tenant-456", event.Metadata["ce_tenantid"]) + assert.Equal(t, "00-abc-def-01", event.Metadata["ce_traceparent"]) + }) + + t.Run("CloudEvent without time uses current time", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1" + }`) + + before := time.Now() + event, err := parseCloudEvent(raw) + after := time.Now() + require.NoError(t, err) + + assert.False(t, event.CreatedAt.Before(before)) + assert.False(t, event.CreatedAt.After(after)) + }) + + t.Run("CloudEvent with unparseable time falls back to now", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + 
"time": "not-a-timestamp" + }`) + + before := time.Now() + event, err := parseCloudEvent(raw) + after := time.Now() + require.NoError(t, err) + + assert.False(t, event.CreatedAt.IsZero()) + assert.False(t, event.CreatedAt.Before(before)) + assert.False(t, event.CreatedAt.After(after)) + }) + + t.Run("CloudEvent with null data", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data": null + }`) + + event, err := parseCloudEvent(raw) + require.NoError(t, err) + assert.Nil(t, event.Payload) + }) + + t.Run("CloudEvent with no data field", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1" + }`) + + event, err := parseCloudEvent(raw) + require.NoError(t, err) + assert.Nil(t, event.Payload) + }) + + t.Run("missing required type returns error", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{"specversion": "1.0", "source": "test", "id": "1"}`) + _, err := parseCloudEvent(raw) + assert.Error(t, err) + assert.Contains(t, err.Error(), "type") + }) + + t.Run("missing required source returns error", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{"specversion": "1.0", "type": "test", "id": "1"}`) + _, err := parseCloudEvent(raw) + assert.Error(t, err) + assert.Contains(t, err.Error(), "source") + }) + + t.Run("missing required id returns error", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{"specversion": "1.0", "type": "test", "source": "test"}`) + _, err := parseCloudEvent(raw) + assert.Error(t, err) + assert.Contains(t, err.Error(), "id") + }) + + t.Run("missing required specversion returns error", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{"type": "test", "source": "test", "id": "1"}`) + _, err := parseCloudEvent(raw) + assert.Error(t, err) + assert.Contains(t, err.Error(), "specversion") + }) + + t.Run("CloudEvent 
with subject attribute", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "subject": "resource-123" + }`) + + event, err := parseCloudEvent(raw) + require.NoError(t, err) + assert.Equal(t, "resource-123", event.Metadata["ce_subject"]) + }) + + t.Run("CloudEvent with string data payload", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data": "plain text payload" + }`) + + event, err := parseCloudEvent(raw) + require.NoError(t, err) + assert.Equal(t, "plain text payload", event.Payload) + }) + + t.Run("CloudEvent with array data payload", func(t *testing.T) { + t.Parallel() + raw := json.RawMessage(`{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data": [1, 2, 3] + }`) + + event, err := parseCloudEvent(raw) + require.NoError(t, err) + arr, ok := event.Payload.([]interface{}) + require.True(t, ok) + assert.Len(t, arr, 3) + }) +} + +func TestParseRecord(t *testing.T) { + t.Parallel() + + t.Run("routes CloudEvent to parseCloudEvent", func(t *testing.T) { + t.Parallel() + raw := []byte(`{ + "specversion": "1.0", + "type": "order.placed", + "source": "order-service", + "id": "evt-123", + "data": {"orderId": "456"} + }`) + + event, err := parseRecord(raw) + require.NoError(t, err) + assert.Equal(t, "order.placed", event.Topic) + assert.Equal(t, "1.0", event.Metadata["ce_specversion"]) + }) + + t.Run("routes native Event to json.Unmarshal", func(t *testing.T) { + t.Parallel() + raw := []byte(`{ + "topic": "user.created", + "payload": {"userId": "789"}, + "metadata": {"source": "internal"}, + "createdAt": "2026-01-15T10:00:00Z" + }`) + + event, err := parseRecord(raw) + require.NoError(t, err) + assert.Equal(t, "user.created", event.Topic) + _, hasCeSpec := event.Metadata["ce_specversion"] + assert.False(t, hasCeSpec) + assert.Equal(t, 
"internal", event.Metadata["source"]) + }) + + t.Run("invalid JSON returns error", func(t *testing.T) { + t.Parallel() + _, err := parseRecord([]byte(`not json at all`)) + assert.Error(t, err) + }) + + t.Run("empty JSON object returns native Event", func(t *testing.T) { + t.Parallel() + event, err := parseRecord([]byte(`{}`)) + require.NoError(t, err) + assert.Equal(t, "", event.Topic) + }) + + t.Run("native event with __topic metadata preserved", func(t *testing.T) { + t.Parallel() + raw := []byte(`{ + "topic": "user.created", + "payload": {"userId": "123"}, + "metadata": {"__topic": "user.created"}, + "createdAt": "2026-01-15T10:00:00Z" + }`) + + event, err := parseRecord(raw) + require.NoError(t, err) + assert.Equal(t, "user.created", event.Topic) + assert.Equal(t, "user.created", event.Metadata["__topic"]) + }) +} diff --git a/modules/eventbus/kafka.go b/modules/eventbus/kafka.go index 73d466e5..f7433d98 100644 --- a/modules/eventbus/kafka.go +++ b/modules/eventbus/kafka.go @@ -118,8 +118,8 @@ func (h *KafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSes // Process message for each matching subscription for _, sub := range subs { // Deserialize event - var event Event - if err := json.Unmarshal(msg.Value, &event); err != nil { + event, err := parseRecord(msg.Value) + if err != nil { slog.Error("Failed to deserialize Kafka message", "error", err, "topic", msg.Topic) continue } diff --git a/modules/eventbus/kinesis.go b/modules/eventbus/kinesis.go index 6aa79979..01cfc054 100644 --- a/modules/eventbus/kinesis.go +++ b/modules/eventbus/kinesis.go @@ -360,8 +360,8 @@ func (k *KinesisEventBus) readShard(shardID string) { // Process records for _, record := range resp.Records { - var event Event - if err := json.Unmarshal(record.Data, &event); err != nil { + event, err := parseRecord(record.Data) + if err != nil { slog.Error("Failed to deserialize Kinesis record", "error", err) continue } diff --git a/modules/eventbus/nats.go 
b/modules/eventbus/nats.go index ce0b4dcf..d3a80c6d 100644 --- a/modules/eventbus/nats.go +++ b/modules/eventbus/nats.go @@ -299,8 +299,8 @@ func (n *NatsEventBus) subscribe(ctx context.Context, topic string, handler Even sub.mutex.RUnlock() // Deserialize event - var event Event - if err := json.Unmarshal(msg.Data, &event); err != nil { + event, err := parseRecord(msg.Data) + if err != nil { slog.Error("Failed to deserialize NATS message", "error", err, "subject", msg.Subject) return } diff --git a/modules/eventbus/redis.go b/modules/eventbus/redis.go index 0dab6783..a9b916c8 100644 --- a/modules/eventbus/redis.go +++ b/modules/eventbus/redis.go @@ -350,8 +350,8 @@ func (r *RedisEventBus) handleMessages(sub *redisSubscription) { } // Deserialize event - var event Event - if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil { + event, err := parseRecord([]byte(msg.Payload)) + if err != nil { slog.Error("Failed to deserialize Redis message", "error", err, "topic", msg.Channel) continue } From c967f06a90e083ac9a60a782015b60e48b2adc98 Mon Sep 17 00:00:00 2001 From: Kristopher Chun Date: Mon, 9 Feb 2026 15:21:32 -0800 Subject: [PATCH 2/6] refactor: single-pass JSON decode for CloudEvents deserialization Replace triple-unmarshal approach (probe map, envelope struct, extension map) with a single unmarshal into map[string]json.RawMessage that is reused for detection, field extraction, and extension collection. Add extractString helper and tests for non-string attributes, invalid data fields, and malformed extensions to achieve 100% coverage on cloudevents_decode.go. 
--- modules/eventbus/cloudevents_decode.go | 102 +++++----- modules/eventbus/cloudevents_decode_test.go | 213 ++++++++++++++------ 2 files changed, 194 insertions(+), 121 deletions(-) diff --git a/modules/eventbus/cloudevents_decode.go b/modules/eventbus/cloudevents_decode.go index bdbf26b6..bc627cb0 100644 --- a/modules/eventbus/cloudevents_decode.go +++ b/modules/eventbus/cloudevents_decode.go @@ -7,20 +7,6 @@ import ( "time" ) -// cloudEventEnvelope is a lightweight representation of a CloudEvents v1.0 -// JSON envelope. Only the required and commonly-used attributes are given -// dedicated fields; extension attributes are captured separately. -type cloudEventEnvelope struct { - SpecVersion string `json:"specversion"` - Type string `json:"type"` - Source string `json:"source"` - ID string `json:"id"` - Time string `json:"time,omitempty"` - DataContentType string `json:"datacontenttype,omitempty"` - Data json.RawMessage `json:"data,omitempty"` - Subject string `json:"subject,omitempty"` -} - // knownCloudEventKeys are the CloudEvents spec-defined keys that have // dedicated handling. Anything else is treated as an extension attribute. var knownCloudEventKeys = map[string]bool{ @@ -35,18 +21,23 @@ var knownCloudEventKeys = map[string]bool{ "subject": true, } -// isCloudEvent checks whether raw JSON contains a CloudEvents envelope -// by probing for the required "specversion" key. -func isCloudEvent(raw json.RawMessage) bool { - var probe map[string]json.RawMessage - if err := json.Unmarshal(raw, &probe); err != nil { - return false +// extractString extracts a JSON string value from a pre-parsed map. +// Returns ("", false) if the key is absent or the value is not a JSON string. 
+func extractString(m map[string]json.RawMessage, key string) (string, bool) { + raw, ok := m[key] + if !ok { + return "", false + } + var s string + if err := json.Unmarshal(raw, &s); err != nil { + return "", false } - _, ok := probe["specversion"] - return ok + return s, true } -// parseCloudEvent maps a CloudEvents JSON envelope to an eventbus.Event. +// parseCloudEvent maps a pre-parsed CloudEvents JSON map to an eventbus.Event. +// The caller is expected to have already unmarshalled the raw bytes into the +// map, so this function performs no redundant decoding. // // Mapping: // - type → Event.Topic @@ -54,32 +45,31 @@ func isCloudEvent(raw json.RawMessage) bool { // - time → Event.CreatedAt (RFC3339; falls back to time.Now()) // - specversion, source, id, datacontenttype, subject, and all extension // attributes → Event.Metadata (prefixed with "ce_") -func parseCloudEvent(raw json.RawMessage) (Event, error) { - var ce cloudEventEnvelope - if err := json.Unmarshal(raw, &ce); err != nil { - return Event{}, fmt.Errorf("failed to parse CloudEvent envelope: %w", err) - } - - if ce.SpecVersion == "" { +func parseCloudEvent(m map[string]json.RawMessage) (Event, error) { + specversion, ok := extractString(m, "specversion") + if !ok || specversion == "" { return Event{}, fmt.Errorf("CloudEvent missing required 'specversion' attribute") } - if ce.Type == "" { + ceType, ok := extractString(m, "type") + if !ok || ceType == "" { return Event{}, fmt.Errorf("CloudEvent missing required 'type' attribute") } - if ce.Source == "" { + source, ok := extractString(m, "source") + if !ok || source == "" { return Event{}, fmt.Errorf("CloudEvent missing required 'source' attribute") } - if ce.ID == "" { + id, ok := extractString(m, "id") + if !ok || id == "" { return Event{}, fmt.Errorf("CloudEvent missing required 'id' attribute") } var createdAt time.Time - if ce.Time != "" { + if timeStr, hasTime := extractString(m, "time"); hasTime && timeStr != "" { var err error - createdAt, 
err = time.Parse(time.RFC3339, ce.Time) + createdAt, err = time.Parse(time.RFC3339, timeStr) if err != nil { slog.Warn("CloudEvent has unparseable 'time' attribute, using current time", - "time", ce.Time, "error", err) + "time", timeStr, "error", err) createdAt = time.Now() } } else { @@ -87,30 +77,25 @@ func parseCloudEvent(raw json.RawMessage) (Event, error) { } var payload interface{} - if len(ce.Data) > 0 && string(ce.Data) != "null" { - if err := json.Unmarshal(ce.Data, &payload); err != nil { + if data, hasData := m["data"]; hasData && len(data) > 0 && string(data) != "null" { + if err := json.Unmarshal(data, &payload); err != nil { return Event{}, fmt.Errorf("failed to parse CloudEvent 'data' field: %w", err) } } // Build metadata from known attributes and extension attributes. - var fullMap map[string]json.RawMessage - if err := json.Unmarshal(raw, &fullMap); err != nil { - return Event{}, fmt.Errorf("failed to parse CloudEvent for extensions: %w", err) - } - metadata := make(map[string]interface{}) - metadata["ce_specversion"] = ce.SpecVersion - metadata["ce_source"] = ce.Source - metadata["ce_id"] = ce.ID - if ce.DataContentType != "" { - metadata["ce_datacontenttype"] = ce.DataContentType + metadata["ce_specversion"] = specversion + metadata["ce_source"] = source + metadata["ce_id"] = id + if dct, ok := extractString(m, "datacontenttype"); ok { + metadata["ce_datacontenttype"] = dct } - if ce.Subject != "" { - metadata["ce_subject"] = ce.Subject + if subj, ok := extractString(m, "subject"); ok { + metadata["ce_subject"] = subj } - for key, val := range fullMap { + for key, val := range m { if knownCloudEventKeys[key] { continue } @@ -123,7 +108,7 @@ func parseCloudEvent(raw json.RawMessage) (Event, error) { } return Event{ - Topic: ce.Type, + Topic: ceType, Payload: payload, Metadata: metadata, CreatedAt: createdAt, @@ -132,10 +117,17 @@ func parseCloudEvent(raw json.RawMessage) (Event, error) { // parseRecord attempts to parse raw JSON as either a 
CloudEvents envelope // or a native eventbus.Event. This is the entry point used by engine -// deserialization paths. +// deserialization paths. It performs a single JSON unmarshal into a generic +// map; if the map contains "specversion" the record is treated as a +// CloudEvent, otherwise it falls back to native Event deserialization. func parseRecord(raw []byte) (Event, error) { - if isCloudEvent(json.RawMessage(raw)) { - return parseCloudEvent(json.RawMessage(raw)) + var m map[string]json.RawMessage + if err := json.Unmarshal(raw, &m); err != nil { + return Event{}, fmt.Errorf("failed to deserialize record: %w", err) + } + + if _, ok := m["specversion"]; ok { + return parseCloudEvent(m) } var event Event diff --git a/modules/eventbus/cloudevents_decode_test.go b/modules/eventbus/cloudevents_decode_test.go index ea327054..350bb960 100644 --- a/modules/eventbus/cloudevents_decode_test.go +++ b/modules/eventbus/cloudevents_decode_test.go @@ -9,48 +9,40 @@ import ( "github.com/stretchr/testify/require" ) -func TestIsCloudEvent(t *testing.T) { +// helper to unmarshal a JSON string into the map that parseCloudEvent expects. 
+func ceMap(t *testing.T, raw string) map[string]json.RawMessage { + t.Helper() + var m map[string]json.RawMessage + require.NoError(t, json.Unmarshal([]byte(raw), &m)) + return m +} + +func TestExtractString(t *testing.T) { t.Parallel() - tests := []struct { - name string - input string - expected bool - }{ - { - name: "valid CloudEvent with specversion", - input: `{"specversion":"1.0","type":"test","source":"test","id":"1"}`, - expected: true, - }, - { - name: "native Event without specversion", - input: `{"topic":"user.created","payload":{"id":"123"},"createdAt":"2026-01-01T00:00:00Z"}`, - expected: false, - }, - { - name: "empty JSON object", - input: `{}`, - expected: false, - }, - { - name: "invalid JSON", - input: `not json`, - expected: false, - }, - { - name: "array not object", - input: `[1,2,3]`, - expected: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - result := isCloudEvent(json.RawMessage(tt.input)) - assert.Equal(t, tt.expected, result) - }) - } + t.Run("key present and string", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"k": json.RawMessage(`"hello"`)} + v, ok := extractString(m, "k") + assert.True(t, ok) + assert.Equal(t, "hello", v) + }) + + t.Run("key absent", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{} + v, ok := extractString(m, "k") + assert.False(t, ok) + assert.Equal(t, "", v) + }) + + t.Run("key present but not a string", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"k": json.RawMessage(`123`)} + v, ok := extractString(m, "k") + assert.False(t, ok) + assert.Equal(t, "", v) + }) } func TestParseCloudEvent(t *testing.T) { @@ -58,7 +50,7 @@ func TestParseCloudEvent(t *testing.T) { t.Run("full CloudEvent with all fields", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{ + m := ceMap(t, `{ "specversion": "1.0", "type": "conversations.conversation.started", "source": "platform.conversations", @@ 
-68,7 +60,7 @@ func TestParseCloudEvent(t *testing.T) { "data": {"id": "123", "texterId": "987", "keyword": "HELLO"} }`) - event, err := parseCloudEvent(raw) + event, err := parseCloudEvent(m) require.NoError(t, err) assert.Equal(t, "conversations.conversation.started", event.Topic) @@ -90,7 +82,7 @@ func TestParseCloudEvent(t *testing.T) { t.Run("CloudEvent with extension attributes", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{ + m := ceMap(t, `{ "specversion": "1.0", "type": "user.created", "source": "user-service", @@ -99,7 +91,7 @@ func TestParseCloudEvent(t *testing.T) { "traceparent": "00-abc-def-01" }`) - event, err := parseCloudEvent(raw) + event, err := parseCloudEvent(m) require.NoError(t, err) assert.Equal(t, "user.created", event.Topic) @@ -109,7 +101,7 @@ func TestParseCloudEvent(t *testing.T) { t.Run("CloudEvent without time uses current time", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{ + m := ceMap(t, `{ "specversion": "1.0", "type": "test.event", "source": "test", @@ -117,7 +109,7 @@ func TestParseCloudEvent(t *testing.T) { }`) before := time.Now() - event, err := parseCloudEvent(raw) + event, err := parseCloudEvent(m) after := time.Now() require.NoError(t, err) @@ -127,7 +119,7 @@ func TestParseCloudEvent(t *testing.T) { t.Run("CloudEvent with unparseable time falls back to now", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{ + m := ceMap(t, `{ "specversion": "1.0", "type": "test.event", "source": "test", @@ -136,7 +128,7 @@ func TestParseCloudEvent(t *testing.T) { }`) before := time.Now() - event, err := parseCloudEvent(raw) + event, err := parseCloudEvent(m) after := time.Now() require.NoError(t, err) @@ -147,7 +139,7 @@ func TestParseCloudEvent(t *testing.T) { t.Run("CloudEvent with null data", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{ + m := ceMap(t, `{ "specversion": "1.0", "type": "test.event", "source": "test", @@ -155,60 +147,112 @@ func TestParseCloudEvent(t 
*testing.T) { "data": null }`) - event, err := parseCloudEvent(raw) + event, err := parseCloudEvent(m) require.NoError(t, err) assert.Nil(t, event.Payload) }) t.Run("CloudEvent with no data field", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{ + m := ceMap(t, `{ "specversion": "1.0", "type": "test.event", "source": "test", "id": "1" }`) - event, err := parseCloudEvent(raw) + event, err := parseCloudEvent(m) require.NoError(t, err) assert.Nil(t, event.Payload) }) t.Run("missing required type returns error", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{"specversion": "1.0", "source": "test", "id": "1"}`) - _, err := parseCloudEvent(raw) + m := ceMap(t, `{"specversion": "1.0", "source": "test", "id": "1"}`) + _, err := parseCloudEvent(m) assert.Error(t, err) assert.Contains(t, err.Error(), "type") }) t.Run("missing required source returns error", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{"specversion": "1.0", "type": "test", "id": "1"}`) - _, err := parseCloudEvent(raw) + m := ceMap(t, `{"specversion": "1.0", "type": "test", "id": "1"}`) + _, err := parseCloudEvent(m) assert.Error(t, err) assert.Contains(t, err.Error(), "source") }) t.Run("missing required id returns error", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{"specversion": "1.0", "type": "test", "source": "test"}`) - _, err := parseCloudEvent(raw) + m := ceMap(t, `{"specversion": "1.0", "type": "test", "source": "test"}`) + _, err := parseCloudEvent(m) assert.Error(t, err) assert.Contains(t, err.Error(), "id") }) t.Run("missing required specversion returns error", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{"type": "test", "source": "test", "id": "1"}`) - _, err := parseCloudEvent(raw) + m := ceMap(t, `{"type": "test", "source": "test", "id": "1"}`) + _, err := parseCloudEvent(m) assert.Error(t, err) assert.Contains(t, err.Error(), "specversion") }) + t.Run("non-string specversion returns error", func(t *testing.T) { + 
t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`1.0`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`"1"`), + } + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "specversion") + }) + + t.Run("non-string type returns error", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`42`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`"1"`), + } + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "type") + }) + + t.Run("non-string source returns error", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`true`), + "id": json.RawMessage(`"1"`), + } + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "source") + }) + + t.Run("non-string id returns error", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`99`), + } + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "id") + }) + t.Run("CloudEvent with subject attribute", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{ + m := ceMap(t, `{ "specversion": "1.0", "type": "test.event", "source": "test", @@ -216,14 +260,14 @@ func TestParseCloudEvent(t *testing.T) { "subject": "resource-123" }`) - event, err := parseCloudEvent(raw) + event, err := parseCloudEvent(m) require.NoError(t, err) assert.Equal(t, "resource-123", event.Metadata["ce_subject"]) }) t.Run("CloudEvent with string data payload", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{ + m := ceMap(t, `{ "specversion": "1.0", 
"type": "test.event", "source": "test", @@ -231,14 +275,14 @@ func TestParseCloudEvent(t *testing.T) { "data": "plain text payload" }`) - event, err := parseCloudEvent(raw) + event, err := parseCloudEvent(m) require.NoError(t, err) assert.Equal(t, "plain text payload", event.Payload) }) t.Run("CloudEvent with array data payload", func(t *testing.T) { t.Parallel() - raw := json.RawMessage(`{ + m := ceMap(t, `{ "specversion": "1.0", "type": "test.event", "source": "test", @@ -246,12 +290,40 @@ func TestParseCloudEvent(t *testing.T) { "data": [1, 2, 3] }`) - event, err := parseCloudEvent(raw) + event, err := parseCloudEvent(m) require.NoError(t, err) arr, ok := event.Payload.([]interface{}) require.True(t, ok) assert.Len(t, arr, 3) }) + + t.Run("invalid data field returns error", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`"1"`), + "data": json.RawMessage(`{invalid`), + } + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "data") + }) + + t.Run("extension with non-JSON value falls back to string", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`"1"`), + "customext": json.RawMessage(`{bad json`), + } + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Equal(t, "{bad json", event.Metadata["ce_customext"]) + }) } func TestParseRecord(t *testing.T) { @@ -303,6 +375,15 @@ func TestParseRecord(t *testing.T) { assert.Equal(t, "", event.Topic) }) + t.Run("valid JSON but invalid native Event returns error", func(t *testing.T) { + t.Parallel() + // createdAt must be a valid RFC3339 timestamp; a bare string triggers unmarshal error. 
+ raw := []byte(`{"topic":"t","createdAt":"not-a-time"}`) + _, err := parseRecord(raw) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to deserialize record") + }) + t.Run("native event with __topic metadata preserved", func(t *testing.T) { t.Parallel() raw := []byte(`{ From e7341e56c375d2ae288f6d70198e04da219cb5f3 Mon Sep 17 00:00:00 2001 From: Kristopher Chun Date: Mon, 9 Feb 2026 15:42:51 -0800 Subject: [PATCH 3/6] fix: address PR review feedback for CloudEvents decode - Add data_base64 handling per CloudEvents spec with JSON-aware decode when datacontenttype is application/json - Move Kafka parseRecord call before subscriber loop to deserialize once per message instead of once per subscription - Use sentinel errors for required attribute validation (err113) - Fix discarded time.Parse error in test --- modules/eventbus/cloudevents_decode.go | 36 ++++- modules/eventbus/cloudevents_decode_test.go | 141 +++++++++++++++++--- modules/eventbus/kafka.go | 16 +-- 3 files changed, 164 insertions(+), 29 deletions(-) diff --git a/modules/eventbus/cloudevents_decode.go b/modules/eventbus/cloudevents_decode.go index bc627cb0..2f68bd0b 100644 --- a/modules/eventbus/cloudevents_decode.go +++ b/modules/eventbus/cloudevents_decode.go @@ -1,12 +1,22 @@ package eventbus import ( + "encoding/base64" "encoding/json" + "errors" "fmt" "log/slog" "time" ) +// Sentinel errors for CloudEvent validation. +var ( + ErrCloudEventMissingSpecVersion = errors.New("CloudEvent missing required 'specversion' attribute") + ErrCloudEventMissingType = errors.New("CloudEvent missing required 'type' attribute") + ErrCloudEventMissingSource = errors.New("CloudEvent missing required 'source' attribute") + ErrCloudEventMissingID = errors.New("CloudEvent missing required 'id' attribute") +) + // knownCloudEventKeys are the CloudEvents spec-defined keys that have // dedicated handling. Anything else is treated as an extension attribute. 
var knownCloudEventKeys = map[string]bool{ @@ -48,19 +58,19 @@ func extractString(m map[string]json.RawMessage, key string) (string, bool) { func parseCloudEvent(m map[string]json.RawMessage) (Event, error) { specversion, ok := extractString(m, "specversion") if !ok || specversion == "" { - return Event{}, fmt.Errorf("CloudEvent missing required 'specversion' attribute") + return Event{}, ErrCloudEventMissingSpecVersion } ceType, ok := extractString(m, "type") if !ok || ceType == "" { - return Event{}, fmt.Errorf("CloudEvent missing required 'type' attribute") + return Event{}, ErrCloudEventMissingType } source, ok := extractString(m, "source") if !ok || source == "" { - return Event{}, fmt.Errorf("CloudEvent missing required 'source' attribute") + return Event{}, ErrCloudEventMissingSource } id, ok := extractString(m, "id") if !ok || id == "" { - return Event{}, fmt.Errorf("CloudEvent missing required 'id' attribute") + return Event{}, ErrCloudEventMissingID } var createdAt time.Time @@ -81,6 +91,24 @@ func parseCloudEvent(m map[string]json.RawMessage) (Event, error) { if err := json.Unmarshal(data, &payload); err != nil { return Event{}, fmt.Errorf("failed to parse CloudEvent 'data' field: %w", err) } + } else if dataB64, hasB64 := m["data_base64"]; hasB64 && len(dataB64) > 0 && string(dataB64) != "null" { + var b64str string + if err := json.Unmarshal(dataB64, &b64str); err != nil { + return Event{}, fmt.Errorf("failed to parse CloudEvent 'data_base64' field: %w", err) + } + decoded, err := base64.StdEncoding.DecodeString(b64str) + if err != nil { + return Event{}, fmt.Errorf("failed to base64-decode CloudEvent 'data_base64' field: %w", err) + } + // If datacontenttype indicates JSON, unmarshal the decoded bytes. 
+ dct, _ := extractString(m, "datacontenttype") + if dct == "application/json" { + if err := json.Unmarshal(decoded, &payload); err != nil { + return Event{}, fmt.Errorf("failed to parse CloudEvent 'data_base64' JSON content: %w", err) + } + } else { + payload = decoded + } } // Build metadata from known attributes and extension attributes. diff --git a/modules/eventbus/cloudevents_decode_test.go b/modules/eventbus/cloudevents_decode_test.go index 350bb960..f17fc31e 100644 --- a/modules/eventbus/cloudevents_decode_test.go +++ b/modules/eventbus/cloudevents_decode_test.go @@ -70,7 +70,8 @@ func TestParseCloudEvent(t *testing.T) { assert.Equal(t, "evt-ff9745302bb23718d9da693c", event.Metadata["ce_id"]) assert.Equal(t, "application/json", event.Metadata["ce_datacontenttype"]) - expectedTime, _ := time.Parse(time.RFC3339, "2026-02-06T23:02:35+00:00") + expectedTime, err := time.Parse(time.RFC3339, "2026-02-06T23:02:35+00:00") + require.NoError(t, err) assert.Equal(t, expectedTime, event.CreatedAt) payloadMap, ok := event.Payload.(map[string]interface{}) @@ -170,32 +171,28 @@ func TestParseCloudEvent(t *testing.T) { t.Parallel() m := ceMap(t, `{"specversion": "1.0", "source": "test", "id": "1"}`) _, err := parseCloudEvent(m) - assert.Error(t, err) - assert.Contains(t, err.Error(), "type") + assert.ErrorIs(t, err, ErrCloudEventMissingType) }) t.Run("missing required source returns error", func(t *testing.T) { t.Parallel() m := ceMap(t, `{"specversion": "1.0", "type": "test", "id": "1"}`) _, err := parseCloudEvent(m) - assert.Error(t, err) - assert.Contains(t, err.Error(), "source") + assert.ErrorIs(t, err, ErrCloudEventMissingSource) }) t.Run("missing required id returns error", func(t *testing.T) { t.Parallel() m := ceMap(t, `{"specversion": "1.0", "type": "test", "source": "test"}`) _, err := parseCloudEvent(m) - assert.Error(t, err) - assert.Contains(t, err.Error(), "id") + assert.ErrorIs(t, err, ErrCloudEventMissingID) }) t.Run("missing required specversion returns 
error", func(t *testing.T) { t.Parallel() m := ceMap(t, `{"type": "test", "source": "test", "id": "1"}`) _, err := parseCloudEvent(m) - assert.Error(t, err) - assert.Contains(t, err.Error(), "specversion") + assert.ErrorIs(t, err, ErrCloudEventMissingSpecVersion) }) t.Run("non-string specversion returns error", func(t *testing.T) { @@ -207,8 +204,7 @@ func TestParseCloudEvent(t *testing.T) { "id": json.RawMessage(`"1"`), } _, err := parseCloudEvent(m) - assert.Error(t, err) - assert.Contains(t, err.Error(), "specversion") + assert.ErrorIs(t, err, ErrCloudEventMissingSpecVersion) }) t.Run("non-string type returns error", func(t *testing.T) { @@ -220,8 +216,7 @@ func TestParseCloudEvent(t *testing.T) { "id": json.RawMessage(`"1"`), } _, err := parseCloudEvent(m) - assert.Error(t, err) - assert.Contains(t, err.Error(), "type") + assert.ErrorIs(t, err, ErrCloudEventMissingType) }) t.Run("non-string source returns error", func(t *testing.T) { @@ -233,8 +228,7 @@ func TestParseCloudEvent(t *testing.T) { "id": json.RawMessage(`"1"`), } _, err := parseCloudEvent(m) - assert.Error(t, err) - assert.Contains(t, err.Error(), "source") + assert.ErrorIs(t, err, ErrCloudEventMissingSource) }) t.Run("non-string id returns error", func(t *testing.T) { @@ -246,8 +240,7 @@ func TestParseCloudEvent(t *testing.T) { "id": json.RawMessage(`99`), } _, err := parseCloudEvent(m) - assert.Error(t, err) - assert.Contains(t, err.Error(), "id") + assert.ErrorIs(t, err, ErrCloudEventMissingID) }) t.Run("CloudEvent with subject attribute", func(t *testing.T) { @@ -297,6 +290,120 @@ func TestParseCloudEvent(t *testing.T) { assert.Len(t, arr, 3) }) + t.Run("CloudEvent with data_base64 binary payload", func(t *testing.T) { + t.Parallel() + // "SGVsbG8gV29ybGQ=" is base64 for "Hello World" + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data_base64": "SGVsbG8gV29ybGQ=" + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + 
assert.Equal(t, []byte("Hello World"), event.Payload) + }) + + t.Run("CloudEvent with data_base64 JSON payload", func(t *testing.T) { + t.Parallel() + // base64 of `{"key":"value"}` + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "datacontenttype": "application/json", + "data_base64": "eyJrZXkiOiJ2YWx1ZSJ9" + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "value", payloadMap["key"]) + }) + + t.Run("CloudEvent with invalid data_base64 returns error", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data_base64": "!!!not-base64!!!" + }`) + + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "data_base64") + }) + + t.Run("CloudEvent with non-string data_base64 returns error", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`"1"`), + "data_base64": json.RawMessage(`12345`), + } + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "data_base64") + }) + + t.Run("CloudEvent with data_base64 invalid JSON content returns error", func(t *testing.T) { + t.Parallel() + // base64 of "not json" = "bm90IGpzb24=" + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "datacontenttype": "application/json", + "data_base64": "bm90IGpzb24=" + }`) + + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "data_base64") + }) + + t.Run("data takes precedence over data_base64", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data": 
{"from": "data"}, + "data_base64": "SGVsbG8=" + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "data", payloadMap["from"]) + }) + + t.Run("CloudEvent with null data_base64", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data_base64": null + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Nil(t, event.Payload) + }) + t.Run("invalid data field returns error", func(t *testing.T) { t.Parallel() m := map[string]json.RawMessage{ diff --git a/modules/eventbus/kafka.go b/modules/eventbus/kafka.go index f7433d98..e72f556c 100644 --- a/modules/eventbus/kafka.go +++ b/modules/eventbus/kafka.go @@ -115,16 +115,16 @@ func (h *KafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSes } h.mutex.RUnlock() + // Deserialize once per message, reuse for all matching subscriptions + event, err := parseRecord(msg.Value) + if err != nil { + slog.Error("Failed to deserialize Kafka message", "error", err, "topic", msg.Topic) + session.MarkMessage(msg, "") + continue + } + // Process message for each matching subscription for _, sub := range subs { - // Deserialize event - event, err := parseRecord(msg.Value) - if err != nil { - slog.Error("Failed to deserialize Kafka message", "error", err, "topic", msg.Topic) - continue - } - - // Process the event if sub.isAsync { go h.eventBus.processEventAsync(sub, event) } else { From aa04337c594eb46eeec73eecf30127e20e7b4a2e Mon Sep 17 00:00:00 2001 From: Kristopher Chun Date: Mon, 9 Feb 2026 15:56:19 -0800 Subject: [PATCH 4/6] fix: address second round of PR review feedback - Parse datacontenttype with mime.ParseMediaType to handle parameters like "application/json; charset=utf-8" - Preserve ce_type in metadata for full CloudEvent context reconstruction - Fix misleading parseRecord doc comment about 
single-unmarshal behavior --- modules/eventbus/cloudevents_decode.go | 28 ++++++++++++--- modules/eventbus/cloudevents_decode_test.go | 39 +++++++++++++++++++-- 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/modules/eventbus/cloudevents_decode.go b/modules/eventbus/cloudevents_decode.go index 2f68bd0b..9a5b4757 100644 --- a/modules/eventbus/cloudevents_decode.go +++ b/modules/eventbus/cloudevents_decode.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log/slog" + "mime" "time" ) @@ -31,6 +32,21 @@ var knownCloudEventKeys = map[string]bool{ "subject": true, } +// isJSONContentType checks whether the datacontenttype attribute in a +// CloudEvent map indicates a JSON media type (e.g. "application/json", +// "application/json; charset=utf-8"). +func isJSONContentType(m map[string]json.RawMessage) bool { + dct, ok := extractString(m, "datacontenttype") + if !ok { + return false + } + mediaType, _, err := mime.ParseMediaType(dct) + if err != nil { + return false + } + return mediaType == "application/json" +} + // extractString extracts a JSON string value from a pre-parsed map. // Returns ("", false) if the key is absent or the value is not a JSON string. func extractString(m map[string]json.RawMessage, key string) (string, bool) { @@ -101,8 +117,8 @@ func parseCloudEvent(m map[string]json.RawMessage) (Event, error) { return Event{}, fmt.Errorf("failed to base64-decode CloudEvent 'data_base64' field: %w", err) } // If datacontenttype indicates JSON, unmarshal the decoded bytes. - dct, _ := extractString(m, "datacontenttype") - if dct == "application/json" { + // Use mime.ParseMediaType to handle parameters like charset. + if isJSONContentType(m) { if err := json.Unmarshal(decoded, &payload); err != nil { return Event{}, fmt.Errorf("failed to parse CloudEvent 'data_base64' JSON content: %w", err) } @@ -114,6 +130,7 @@ func parseCloudEvent(m map[string]json.RawMessage) (Event, error) { // Build metadata from known attributes and extension attributes. 
metadata := make(map[string]interface{}) metadata["ce_specversion"] = specversion + metadata["ce_type"] = ceType metadata["ce_source"] = source metadata["ce_id"] = id if dct, ok := extractString(m, "datacontenttype"); ok { @@ -145,9 +162,10 @@ func parseCloudEvent(m map[string]json.RawMessage) (Event, error) { // parseRecord attempts to parse raw JSON as either a CloudEvents envelope // or a native eventbus.Event. This is the entry point used by engine -// deserialization paths. It performs a single JSON unmarshal into a generic -// map; if the map contains "specversion" the record is treated as a -// CloudEvent, otherwise it falls back to native Event deserialization. +// deserialization paths. It first unmarshals into a generic map to probe +// for the "specversion" key; CloudEvents are then decoded from that map +// in a single pass, while native Events require a second unmarshal into +// the Event struct. func parseRecord(raw []byte) (Event, error) { var m map[string]json.RawMessage if err := json.Unmarshal(raw, &m); err != nil { diff --git a/modules/eventbus/cloudevents_decode_test.go b/modules/eventbus/cloudevents_decode_test.go index f17fc31e..3edfda97 100644 --- a/modules/eventbus/cloudevents_decode_test.go +++ b/modules/eventbus/cloudevents_decode_test.go @@ -45,6 +45,40 @@ func TestExtractString(t *testing.T) { }) } +func TestIsJSONContentType(t *testing.T) { + t.Parallel() + + t.Run("application/json", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"datacontenttype": json.RawMessage(`"application/json"`)} + assert.True(t, isJSONContentType(m)) + }) + + t.Run("application/json with charset", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"datacontenttype": json.RawMessage(`"application/json; charset=utf-8"`)} + assert.True(t, isJSONContentType(m)) + }) + + t.Run("non-JSON content type", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"datacontenttype": json.RawMessage(`"text/plain"`)} + 
assert.False(t, isJSONContentType(m)) + }) + + t.Run("missing datacontenttype", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{} + assert.False(t, isJSONContentType(m)) + }) + + t.Run("invalid media type", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"datacontenttype": json.RawMessage(`";;;invalid"`)} + assert.False(t, isJSONContentType(m)) + }) +} + func TestParseCloudEvent(t *testing.T) { t.Parallel() @@ -66,6 +100,7 @@ func TestParseCloudEvent(t *testing.T) { assert.Equal(t, "conversations.conversation.started", event.Topic) assert.NotNil(t, event.Payload) assert.Equal(t, "1.0", event.Metadata["ce_specversion"]) + assert.Equal(t, "conversations.conversation.started", event.Metadata["ce_type"]) assert.Equal(t, "platform.conversations", event.Metadata["ce_source"]) assert.Equal(t, "evt-ff9745302bb23718d9da693c", event.Metadata["ce_id"]) assert.Equal(t, "application/json", event.Metadata["ce_datacontenttype"]) @@ -308,13 +343,13 @@ func TestParseCloudEvent(t *testing.T) { t.Run("CloudEvent with data_base64 JSON payload", func(t *testing.T) { t.Parallel() - // base64 of `{"key":"value"}` + // base64 of `{"key":"value"}`, with charset parameter in content type m := ceMap(t, `{ "specversion": "1.0", "type": "test.event", "source": "test", "id": "1", - "datacontenttype": "application/json", + "datacontenttype": "application/json; charset=utf-8", "data_base64": "eyJrZXkiOiJ2YWx1ZSJ9" }`) From 6cae8448f11bd15f87aefbb9e5094ee07f13d1b8 Mon Sep 17 00:00:00 2001 From: Kristopher Chun Date: Tue, 10 Feb 2026 10:53:29 -0800 Subject: [PATCH 5/6] fix: improve CloudEvents spec compliance and add Kinesis integration tests Validate specversion is "1.0" (reject unknown versions), add dataschema to known keys, warn on data+data_base64 mutual exclusivity violation, and log warnings on extension attribute JSON parse failures. 
Add Kinesis readShard integration tests exercising CloudEvent and native Event deserialization through the mock KinesisClient. --- modules/eventbus/cloudevents_decode.go | 26 +- modules/eventbus/cloudevents_decode_test.go | 73 ++++- modules/eventbus/kinesis_test.go | 304 ++++++++++++++++++++ 3 files changed, 398 insertions(+), 5 deletions(-) diff --git a/modules/eventbus/cloudevents_decode.go b/modules/eventbus/cloudevents_decode.go index 9a5b4757..74fe76f8 100644 --- a/modules/eventbus/cloudevents_decode.go +++ b/modules/eventbus/cloudevents_decode.go @@ -12,10 +12,11 @@ import ( // Sentinel errors for CloudEvent validation. var ( - ErrCloudEventMissingSpecVersion = errors.New("CloudEvent missing required 'specversion' attribute") - ErrCloudEventMissingType = errors.New("CloudEvent missing required 'type' attribute") - ErrCloudEventMissingSource = errors.New("CloudEvent missing required 'source' attribute") - ErrCloudEventMissingID = errors.New("CloudEvent missing required 'id' attribute") + ErrCloudEventMissingSpecVersion = errors.New("CloudEvent missing required 'specversion' attribute") + ErrCloudEventUnsupportedSpecVersion = errors.New("CloudEvent has unsupported 'specversion' (expected \"1.0\")") + ErrCloudEventMissingType = errors.New("CloudEvent missing required 'type' attribute") + ErrCloudEventMissingSource = errors.New("CloudEvent missing required 'source' attribute") + ErrCloudEventMissingID = errors.New("CloudEvent missing required 'id' attribute") ) // knownCloudEventKeys are the CloudEvents spec-defined keys that have @@ -27,6 +28,7 @@ var knownCloudEventKeys = map[string]bool{ "id": true, "time": true, "datacontenttype": true, + "dataschema": true, "data": true, "data_base64": true, "subject": true, @@ -76,6 +78,9 @@ func parseCloudEvent(m map[string]json.RawMessage) (Event, error) { if !ok || specversion == "" { return Event{}, ErrCloudEventMissingSpecVersion } + if specversion != "1.0" { + return Event{}, fmt.Errorf("%w: got %q", 
ErrCloudEventUnsupportedSpecVersion, specversion) + } ceType, ok := extractString(m, "type") if !ok || ceType == "" { return Event{}, ErrCloudEventMissingType @@ -102,6 +107,14 @@ func parseCloudEvent(m map[string]json.RawMessage) (Event, error) { createdAt = time.Now() } + // Warn if both data and data_base64 are present (spec violation). + if rawData, hasData := m["data"]; hasData && len(rawData) > 0 && string(rawData) != "null" { + if rawB64, hasB64 := m["data_base64"]; hasB64 && len(rawB64) > 0 && string(rawB64) != "null" { + slog.Warn("CloudEvent contains both 'data' and 'data_base64' (spec violation); using 'data'", + "id", id, "source", source, "type", ceType) + } + } + var payload interface{} if data, hasData := m["data"]; hasData && len(data) > 0 && string(data) != "null" { if err := json.Unmarshal(data, &payload); err != nil { @@ -139,6 +152,9 @@ func parseCloudEvent(m map[string]json.RawMessage) (Event, error) { if subj, ok := extractString(m, "subject"); ok { metadata["ce_subject"] = subj } + if ds, ok := extractString(m, "dataschema"); ok { + metadata["ce_dataschema"] = ds + } for key, val := range m { if knownCloudEventKeys[key] { @@ -146,6 +162,8 @@ func parseCloudEvent(m map[string]json.RawMessage) (Event, error) { } var extVal interface{} if err := json.Unmarshal(val, &extVal); err != nil { + slog.Warn("CloudEvent extension attribute has non-JSON value, storing as raw string", + "key", key, "error", err) metadata["ce_"+key] = string(val) } else { metadata["ce_"+key] = extVal diff --git a/modules/eventbus/cloudevents_decode_test.go b/modules/eventbus/cloudevents_decode_test.go index 3edfda97..aed4baec 100644 --- a/modules/eventbus/cloudevents_decode_test.go +++ b/modules/eventbus/cloudevents_decode_test.go @@ -382,7 +382,7 @@ func TestParseCloudEvent(t *testing.T) { "type": json.RawMessage(`"test"`), "source": json.RawMessage(`"src"`), "id": json.RawMessage(`"1"`), - "data_base64": json.RawMessage(`12345`), + "data_base64": json.RawMessage(`12345`), 
} _, err := parseCloudEvent(m) assert.Error(t, err) @@ -453,6 +453,77 @@ func TestParseCloudEvent(t *testing.T) { assert.Contains(t, err.Error(), "data") }) + t.Run("unsupported specversion returns error", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "99.9", + "type": "test.event", + "source": "test", + "id": "1" + }`) + _, err := parseCloudEvent(m) + assert.ErrorIs(t, err, ErrCloudEventUnsupportedSpecVersion) + assert.Contains(t, err.Error(), `"99.9"`) + }) + + t.Run("specversion 1.0 is accepted", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1" + }`) + _, err := parseCloudEvent(m) + require.NoError(t, err) + }) + + t.Run("dataschema attribute is captured in metadata", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "dataschema": "https://example.com/schema/v1" + }`) + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Equal(t, "https://example.com/schema/v1", event.Metadata["ce_dataschema"]) + }) + + t.Run("dataschema is not treated as extension attribute", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "dataschema": "https://example.com/schema/v1", + "customext": "val" + }`) + event, err := parseCloudEvent(m) + require.NoError(t, err) + // dataschema should be captured explicitly, not duplicated via extension loop + assert.Equal(t, "https://example.com/schema/v1", event.Metadata["ce_dataschema"]) + assert.Equal(t, "val", event.Metadata["ce_customext"]) + }) + + t.Run("data null with data_base64 present uses data_base64", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data": null, + "data_base64": "SGVsbG8=" + }`) + event, err := parseCloudEvent(m) + 
require.NoError(t, err) + assert.Equal(t, []byte("Hello"), event.Payload) + }) + t.Run("extension with non-JSON value falls back to string", func(t *testing.T) { t.Parallel() m := map[string]json.RawMessage{ diff --git a/modules/eventbus/kinesis_test.go b/modules/eventbus/kinesis_test.go index aeb3ef31..d73ecd46 100644 --- a/modules/eventbus/kinesis_test.go +++ b/modules/eventbus/kinesis_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -254,3 +255,306 @@ func TestKinesisPublishNotStarted(t *testing.T) { err := bus.Publish(context.Background(), Event{Topic: "test", Payload: "data"}) assert.ErrorIs(t, err, ErrEventBusNotStarted) } + +// --- ReadShard integration tests: CloudEvents deserialization via mock Kinesis --- + +// newReadShardTestBus creates a test bus with subscriptions wired directly +// (bypasses Subscribe to avoid starting shard reader goroutines). 
+func newReadShardTestBus(t *testing.T, client KinesisClient, subs map[string]map[string]*kinesisSubscription) *KinesisEventBus { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + return &KinesisEventBus{ + config: &KinesisConfig{ + StreamName: "test-stream", + ShardCount: 1, + }, + client: client, + subscriptions: subs, + ctx: ctx, + cancel: cancel, + isStarted: true, + } +} + +func TestKinesisReadShardCloudEvent(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := mocks.NewMockKinesisClient(ctrl) + + received := make(chan Event, 1) + subs := map[string]map[string]*kinesisSubscription{ + "order.placed": { + "test-sub": { + id: "test-sub", + topic: "order.placed", + handler: func(ctx context.Context, event Event) error { + received <- event + return nil + }, + done: make(chan struct{}), + }, + }, + } + bus := newReadShardTestBus(t, mockClient, subs) + subs["order.placed"]["test-sub"].bus = bus + + ceJSON := []byte(`{ + "specversion": "1.0", + "type": "order.placed", + "source": "order-service", + "id": "evt-001", + "time": "2026-02-06T12:00:00Z", + "data": {"orderId": "abc-123"} + }`) + + iteratorStr := "shard-iter-1" + mockClient.EXPECT(). + GetShardIterator(gomock.Any(), gomock.Any()). + Return(&kinesis.GetShardIteratorOutput{ShardIterator: &iteratorStr}, nil) + + // Return records then nil iterator to terminate the loop. + mockClient.EXPECT(). + GetRecords(gomock.Any(), gomock.Any()). 
+ Return(&kinesis.GetRecordsOutput{ + Records: []types.Record{{Data: ceJSON}}, + NextShardIterator: nil, + }, nil) + + bus.readShard("shard-0") + + select { + case event := <-received: + assert.Equal(t, "order.placed", event.Topic) + assert.Equal(t, "1.0", event.Metadata["ce_specversion"]) + assert.Equal(t, "order-service", event.Metadata["ce_source"]) + assert.Equal(t, "evt-001", event.Metadata["ce_id"]) + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "abc-123", payloadMap["orderId"]) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for event") + } +} + +func TestKinesisReadShardNativeEvent(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := mocks.NewMockKinesisClient(ctrl) + + received := make(chan Event, 1) + subs := map[string]map[string]*kinesisSubscription{ + "user.created": { + "test-sub": { + id: "test-sub", + topic: "user.created", + handler: func(ctx context.Context, event Event) error { + received <- event + return nil + }, + done: make(chan struct{}), + }, + }, + } + bus := newReadShardTestBus(t, mockClient, subs) + subs["user.created"]["test-sub"].bus = bus + + nativeJSON := []byte(`{ + "topic": "user.created", + "payload": {"userId": "u-789"}, + "metadata": {"__topic": "user.created"}, + "createdAt": "2026-01-15T10:00:00Z" + }`) + + iteratorStr := "shard-iter-1" + mockClient.EXPECT(). + GetShardIterator(gomock.Any(), gomock.Any()). + Return(&kinesis.GetShardIteratorOutput{ShardIterator: &iteratorStr}, nil) + + mockClient.EXPECT(). + GetRecords(gomock.Any(), gomock.Any()). 
+ Return(&kinesis.GetRecordsOutput{ + Records: []types.Record{{Data: nativeJSON}}, + NextShardIterator: nil, + }, nil) + + bus.readShard("shard-0") + + select { + case event := <-received: + assert.Equal(t, "user.created", event.Topic) + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "u-789", payloadMap["userId"]) + assert.Equal(t, "user.created", event.Metadata["__topic"]) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for event") + } +} + +func TestKinesisReadShardCloudEventBase64(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := mocks.NewMockKinesisClient(ctrl) + + received := make(chan Event, 1) + subs := map[string]map[string]*kinesisSubscription{ + "file.uploaded": { + "test-sub": { + id: "test-sub", + topic: "file.uploaded", + handler: func(ctx context.Context, event Event) error { + received <- event + return nil + }, + done: make(chan struct{}), + }, + }, + } + bus := newReadShardTestBus(t, mockClient, subs) + subs["file.uploaded"]["test-sub"].bus = bus + + // "SGVsbG8gV29ybGQ=" is base64 for "Hello World" + ceJSON := []byte(`{ + "specversion": "1.0", + "type": "file.uploaded", + "source": "storage-service", + "id": "evt-002", + "data_base64": "SGVsbG8gV29ybGQ=" + }`) + + iteratorStr := "shard-iter-1" + mockClient.EXPECT(). + GetShardIterator(gomock.Any(), gomock.Any()). + Return(&kinesis.GetShardIteratorOutput{ShardIterator: &iteratorStr}, nil) + + mockClient.EXPECT(). + GetRecords(gomock.Any(), gomock.Any()). 
+ Return(&kinesis.GetRecordsOutput{ + Records: []types.Record{{Data: ceJSON}}, + NextShardIterator: nil, + }, nil) + + bus.readShard("shard-0") + + select { + case event := <-received: + assert.Equal(t, "file.uploaded", event.Topic) + assert.Equal(t, []byte("Hello World"), event.Payload) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for event") + } +} + +func TestKinesisReadShardRejectsInvalidSpecversion(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := mocks.NewMockKinesisClient(ctrl) + + handlerCalled := make(chan struct{}, 1) + subs := map[string]map[string]*kinesisSubscription{ + "order.placed": { + "test-sub": { + id: "test-sub", + topic: "order.placed", + handler: func(ctx context.Context, event Event) error { + handlerCalled <- struct{}{} + return nil + }, + done: make(chan struct{}), + }, + }, + } + bus := newReadShardTestBus(t, mockClient, subs) + subs["order.placed"]["test-sub"].bus = bus + + badCE := []byte(`{ + "specversion": "99.9", + "type": "order.placed", + "source": "order-service", + "id": "evt-bad" + }`) + + iteratorStr := "shard-iter-1" + mockClient.EXPECT(). + GetShardIterator(gomock.Any(), gomock.Any()). + Return(&kinesis.GetShardIteratorOutput{ShardIterator: &iteratorStr}, nil) + + mockClient.EXPECT(). + GetRecords(gomock.Any(), gomock.Any()). + Return(&kinesis.GetRecordsOutput{ + Records: []types.Record{{Data: badCE}}, + NextShardIterator: nil, + }, nil) + + bus.readShard("shard-0") + + // Handler should NOT have been called for invalid specversion. + select { + case <-handlerCalled: + t.Fatal("handler should not have been called for invalid specversion") + case <-time.After(100 * time.Millisecond): + // Success: handler was not called. 
+ } +} + +func TestKinesisReadShardMultipleRecords(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := mocks.NewMockKinesisClient(ctrl) + + orderReceived := make(chan Event, 1) + userReceived := make(chan Event, 1) + subs := map[string]map[string]*kinesisSubscription{ + "order.placed": { + "order-sub": { + id: "order-sub", + topic: "order.placed", + handler: func(ctx context.Context, e Event) error { + orderReceived <- e + return nil + }, + done: make(chan struct{}), + }, + }, + "user.created": { + "user-sub": { + id: "user-sub", + topic: "user.created", + handler: func(ctx context.Context, e Event) error { + userReceived <- e + return nil + }, + done: make(chan struct{}), + }, + }, + } + bus := newReadShardTestBus(t, mockClient, subs) + subs["order.placed"]["order-sub"].bus = bus + subs["user.created"]["user-sub"].bus = bus + + iteratorStr := "shard-iter-1" + mockClient.EXPECT(). + GetShardIterator(gomock.Any(), gomock.Any()). + Return(&kinesis.GetShardIteratorOutput{ShardIterator: &iteratorStr}, nil) + + mockClient.EXPECT(). + GetRecords(gomock.Any(), gomock.Any()). 
+ Return(&kinesis.GetRecordsOutput{ + Records: []types.Record{ + {Data: []byte(`{"specversion":"1.0","type":"order.placed","source":"orders","id":"1","data":{"orderId":"o1"}}`)}, + {Data: []byte(`{"specversion":"1.0","type":"user.created","source":"users","id":"2","data":{"userId":"u1"}}`)}, + }, + NextShardIterator: nil, + }, nil) + + bus.readShard("shard-0") + + select { + case e := <-orderReceived: + assert.Equal(t, "order.placed", e.Topic) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for order event") + } + select { + case e := <-userReceived: + assert.Equal(t, "user.created", e.Topic) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for user event") + } +} From cf22350fba371f59b829bae7b718006cbf6ae29a Mon Sep 17 00:00:00 2001 From: Kristopher Chun Date: Tue, 10 Feb 2026 11:24:30 -0800 Subject: [PATCH 6/6] test: add Kafka ConsumeClaim integration tests Cover the deserialization and dispatch path in ConsumeClaim using lightweight test doubles for sarama.ConsumerGroupSession and ConsumerGroupClaim. Tests verify CloudEvent decode, native Event decode, invalid record rejection with offset commit, and multi- message batch dispatch to topic-matched subscribers. --- modules/eventbus/kafka_test.go | 245 +++++++++++++++++++++++++++++++++ 1 file changed, 245 insertions(+) diff --git a/modules/eventbus/kafka_test.go b/modules/eventbus/kafka_test.go index 2e5a4b16..f97c470a 100644 --- a/modules/eventbus/kafka_test.go +++ b/modules/eventbus/kafka_test.go @@ -282,3 +282,248 @@ func TestKafkaPublishNotStarted(t *testing.T) { err := bus.Publish(context.Background(), Event{Topic: "test", Payload: "data"}) assert.ErrorIs(t, err, ErrEventBusNotStarted) } + +// --- ConsumeClaim integration tests: CloudEvents deserialization via Kafka --- + +// testConsumerGroupSession implements sarama.ConsumerGroupSession for tests. 
+type testConsumerGroupSession struct {
+	// ctx is handed back unchanged by Context().
+	ctx context.Context
+	// markedMsgs collects every message passed to MarkMessage, in call order,
+	// so tests can assert how many messages were marked.
+	markedMsgs []*sarama.ConsumerMessage
+}
+
+// The methods below return fixed values or do nothing; only Context and
+// MarkMessage carry state that the tests in this file inspect.
+func (s *testConsumerGroupSession) Claims() map[string][]int32 { return nil }
+func (s *testConsumerGroupSession) MemberID() string { return "test-member" }
+func (s *testConsumerGroupSession) GenerationID() int32 { return 1 }
+func (s *testConsumerGroupSession) MarkOffset(_ string, _ int32, _ int64, _ string) {}
+func (s *testConsumerGroupSession) Commit() {}
+func (s *testConsumerGroupSession) ResetOffset(_ string, _ int32, _ int64, _ string) {}
+func (s *testConsumerGroupSession) Context() context.Context { return s.ctx }
+
+// MarkMessage records msg in markedMsgs; the metadata string is ignored.
+// The slice is appended to without locking.
+func (s *testConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, _ string) {
+	s.markedMsgs = append(s.markedMsgs, msg)
+}
+
+// testConsumerGroupClaim implements sarama.ConsumerGroupClaim for tests.
+type testConsumerGroupClaim struct {
+	// messages is the channel returned (receive-only) by Messages().
+	messages chan *sarama.ConsumerMessage
+}
+
+// Topic, Partition and the offset accessors return fixed values; only
+// Messages exposes data supplied by the test.
+func (c *testConsumerGroupClaim) Topic() string { return "test-topic" }
+func (c *testConsumerGroupClaim) Partition() int32 { return 0 }
+func (c *testConsumerGroupClaim) InitialOffset() int64 { return 0 }
+func (c *testConsumerGroupClaim) HighWaterMarkOffset() int64 { return 0 }
+func (c *testConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { return c.messages }
+
+// TestKafkaConsumeClaimCloudEvent passes a single CloudEvents-formatted JSON
+// message through ConsumeClaim and expects the "order.placed" subscriber to
+// receive an Event whose Topic comes from the CloudEvents "type", whose
+// Metadata carries ce_specversion and ce_source, and whose Payload is the
+// decoded "data" object. It also expects exactly one message to be marked.
+func TestKafkaConsumeClaimCloudEvent(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	producer := mocks.NewMockSyncProducer(ctrl)
+	bus := newTestKafkaEventBus(producer)
+	defer bus.cancel()
+
+	// Buffered so the subscription handler's send does not block on the test.
+	received := make(chan Event, 1)
+	handler := &KafkaConsumerGroupHandler{
+		eventBus: bus,
+		subscriptions: map[string]*kafkaSubscription{
+			"sub-1": {
+				id:    "sub-1",
+				topic: "order.placed",
+				handler: func(ctx context.Context, event Event) error {
+					received <- event
+					return nil
+				},
+				done: make(chan struct{}),
+				bus:  bus,
+			},
+		},
+	}
+
+	// The channel is buffered and filled before ConsumeClaim runs, then closed.
+	// NOTE(review): presumably ConsumeClaim returns once the closed channel is
+	// drained; its loop is not visible here, confirm against kafka.go.
+	messages := make(chan *sarama.ConsumerMessage, 1)
+	messages <- &sarama.ConsumerMessage{
+		Topic: "order.placed",
+		Value: []byte(`{"specversion":"1.0","type":"order.placed","source":"order-svc","id":"evt-1","data":{"orderId":"42"}}`),
+	}
+	close(messages)
+
+	session := &testConsumerGroupSession{ctx: context.Background()}
+	claim := &testConsumerGroupClaim{messages: messages}
+
+	err := handler.ConsumeClaim(session, claim)
+	require.NoError(t, err)
+
+	// Wait up to two seconds for the handler to deliver the event.
+	select {
+	case event := <-received:
+		assert.Equal(t, "order.placed", event.Topic)
+		assert.Equal(t, "1.0", event.Metadata["ce_specversion"])
+		assert.Equal(t, "order-svc", event.Metadata["ce_source"])
+		payloadMap, ok := event.Payload.(map[string]interface{})
+		require.True(t, ok)
+		assert.Equal(t, "42", payloadMap["orderId"])
+	case <-time.After(2 * time.Second):
+		t.Fatal("timed out waiting for event")
+	}
+
+	assert.Len(t, session.markedMsgs, 1)
+}
+
+// TestKafkaConsumeClaimNativeEvent passes a single message in the native
+// Event JSON shape (topic/payload/metadata/createdAt) through ConsumeClaim
+// and expects the "user.created" subscriber to receive it with its Topic and
+// Payload intact. It also expects exactly one message to be marked.
+func TestKafkaConsumeClaimNativeEvent(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	producer := mocks.NewMockSyncProducer(ctrl)
+	bus := newTestKafkaEventBus(producer)
+	defer bus.cancel()
+
+	// Buffered so the subscription handler's send does not block on the test.
+	received := make(chan Event, 1)
+	handler := &KafkaConsumerGroupHandler{
+		eventBus: bus,
+		subscriptions: map[string]*kafkaSubscription{
+			"sub-1": {
+				id:    "sub-1",
+				topic: "user.created",
+				handler: func(ctx context.Context, event Event) error {
+					received <- event
+					return nil
+				},
+				done: make(chan struct{}),
+				bus:  bus,
+			},
+		},
+	}
+
+	// One pre-loaded message on a closed channel, as in the CloudEvent test.
+	messages := make(chan *sarama.ConsumerMessage, 1)
+	messages <- &sarama.ConsumerMessage{
+		Topic: "user.created",
+		Value: []byte(`{"topic":"user.created","payload":{"userId":"u-789"},"metadata":{"__topic":"user.created"},"createdAt":"2026-01-15T10:00:00Z"}`),
+	}
+	close(messages)
+
+	session := &testConsumerGroupSession{ctx: context.Background()}
+	claim := &testConsumerGroupClaim{messages: messages}
+
+	err := handler.ConsumeClaim(session, claim)
+	require.NoError(t, err)
+
+	// Wait up to two seconds for the handler to deliver the event.
+	select {
+	case event := <-received:
+		assert.Equal(t, "user.created", event.Topic)
+		payloadMap, ok := event.Payload.(map[string]interface{})
+		require.True(t, ok)
+		assert.Equal(t, "u-789", payloadMap["userId"])
+	case <-time.After(2 * time.Second):
+		t.Fatal("timed out waiting for event")
+	}
+
+	assert.Len(t, session.markedMsgs, 1)
+}
+
+// TestKafkaConsumeClaimRejectsInvalidRecord passes a message whose value is
+// not valid JSON through ConsumeClaim and expects ConsumeClaim to return no
+// error, the subscription handler never to be invoked, and the message still
+// to be marked on the session.
+func TestKafkaConsumeClaimRejectsInvalidRecord(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	producer := mocks.NewMockSyncProducer(ctrl)
+	bus := newTestKafkaEventBus(producer)
+	defer bus.cancel()
+
+	// Signals (without payload) that the handler ran; the test expects it to
+	// stay empty.
+	handlerCalled := make(chan struct{}, 1)
+	handler := &KafkaConsumerGroupHandler{
+		eventBus: bus,
+		subscriptions: map[string]*kafkaSubscription{
+			"sub-1": {
+				id:    "sub-1",
+				topic: "order.placed",
+				handler: func(ctx context.Context, event Event) error {
+					handlerCalled <- struct{}{}
+					return nil
+				},
+				done: make(chan struct{}),
+				bus:  bus,
+			},
+		},
+	}
+
+	// The Kafka topic matches the subscription, so only the undecodable body
+	// distinguishes this message from a deliverable one.
+	messages := make(chan *sarama.ConsumerMessage, 1)
+	messages <- &sarama.ConsumerMessage{
+		Topic: "order.placed",
+		Value: []byte(`not valid json`),
+	}
+	close(messages)
+
+	session := &testConsumerGroupSession{ctx: context.Background()}
+	claim := &testConsumerGroupClaim{messages: messages}
+
+	err := handler.ConsumeClaim(session, claim)
+	require.NoError(t, err)
+
+	// Handler should NOT have been called.
+	// The 100ms timer bounds how long the test waits for a call that should
+	// never arrive.
+	select {
+	case <-handlerCalled:
+		t.Fatal("handler should not have been called for invalid record")
+	case <-time.After(100 * time.Millisecond):
+		// Success: handler was not called.
+	}
+
+	// Message should still be marked (offset committed even on error).
+	assert.Len(t, session.markedMsgs, 1)
+}
+
+// TestKafkaConsumeClaimMultipleMessages passes two CloudEvents messages with
+// different "type" values through one ConsumeClaim call and expects each to
+// reach only the subscriber registered for its topic, with both messages
+// marked on the session.
+func TestKafkaConsumeClaimMultipleMessages(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	producer := mocks.NewMockSyncProducer(ctrl)
+	bus := newTestKafkaEventBus(producer)
+	defer bus.cancel()
+
+	// One buffered channel per subscriber so deliveries can be checked
+	// independently of the order in which the handlers run.
+	orderReceived := make(chan Event, 1)
+	userReceived := make(chan Event, 1)
+	handler := &KafkaConsumerGroupHandler{
+		eventBus: bus,
+		subscriptions: map[string]*kafkaSubscription{
+			"order-sub": {
+				id:    "order-sub",
+				topic: "order.placed",
+				handler: func(ctx context.Context, e Event) error {
+					orderReceived <- e
+					return nil
+				},
+				done: make(chan struct{}),
+				bus:  bus,
+			},
+			"user-sub": {
+				id:    "user-sub",
+				topic: "user.created",
+				handler: func(ctx context.Context, e Event) error {
+					userReceived <- e
+					return nil
+				},
+				done: make(chan struct{}),
+				bus:  bus,
+			},
+		},
+	}
+
+	// Capacity 2 lets both messages be queued before ConsumeClaim starts.
+	messages := make(chan *sarama.ConsumerMessage, 2)
+	messages <- &sarama.ConsumerMessage{
+		Topic: "order.placed",
+		Value: []byte(`{"specversion":"1.0","type":"order.placed","source":"orders","id":"1","data":{"orderId":"o1"}}`),
+	}
+	messages <- &sarama.ConsumerMessage{
+		Topic: "user.created",
+		Value: []byte(`{"specversion":"1.0","type":"user.created","source":"users","id":"2","data":{"userId":"u1"}}`),
+	}
+	close(messages)
+
+	session := &testConsumerGroupSession{ctx: context.Background()}
+	claim := &testConsumerGroupClaim{messages: messages}
+
+	err := handler.ConsumeClaim(session, claim)
+	require.NoError(t, err)
+
+	// Each subscriber is awaited separately, with its own two-second timeout.
+	select {
+	case e := <-orderReceived:
+		assert.Equal(t, "order.placed", e.Topic)
+	case <-time.After(2 * time.Second):
+		t.Fatal("timed out waiting for order event")
+	}
+	select {
+	case e := <-userReceived:
+		assert.Equal(t, "user.created", e.Topic)
+	case <-time.After(2 * time.Second):
+		t.Fatal("timed out waiting for user event")
+	}
+
+	assert.Len(t, session.markedMsgs, 2)
+}