diff --git a/modules/eventbus/cloudevents_decode.go b/modules/eventbus/cloudevents_decode.go new file mode 100644 index 00000000..74fe76f8 --- /dev/null +++ b/modules/eventbus/cloudevents_decode.go @@ -0,0 +1,203 @@ +package eventbus + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "log/slog" + "mime" + "time" +) + +// Sentinel errors for CloudEvent validation. +var ( + ErrCloudEventMissingSpecVersion = errors.New("CloudEvent missing required 'specversion' attribute") + ErrCloudEventUnsupportedSpecVersion = errors.New("CloudEvent has unsupported 'specversion' (expected \"1.0\")") + ErrCloudEventMissingType = errors.New("CloudEvent missing required 'type' attribute") + ErrCloudEventMissingSource = errors.New("CloudEvent missing required 'source' attribute") + ErrCloudEventMissingID = errors.New("CloudEvent missing required 'id' attribute") +) + +// knownCloudEventKeys are the CloudEvents spec-defined keys that have +// dedicated handling. Anything else is treated as an extension attribute. +var knownCloudEventKeys = map[string]bool{ + "specversion": true, + "type": true, + "source": true, + "id": true, + "time": true, + "datacontenttype": true, + "dataschema": true, + "data": true, + "data_base64": true, + "subject": true, +} + +// isJSONContentType checks whether the datacontenttype attribute in a +// CloudEvent map indicates a JSON media type (e.g. "application/json", +// "application/json; charset=utf-8"). +func isJSONContentType(m map[string]json.RawMessage) bool { + dct, ok := extractString(m, "datacontenttype") + if !ok { + return false + } + mediaType, _, err := mime.ParseMediaType(dct) + if err != nil { + return false + } + return mediaType == "application/json" +} + +// extractString extracts a JSON string value from a pre-parsed map. +// Returns ("", false) if the key is absent or the value is not a JSON string. +func extractString(m map[string]json.RawMessage, key string) (string, bool) { + raw, ok := m[key] + if !ok { + return "", false + } + var s string + if err := json.Unmarshal(raw, &s); err != nil { + return "", false + } + return s, true +} + +// parseCloudEvent maps a pre-parsed CloudEvents JSON map to an eventbus.Event. +// The caller is expected to have already unmarshalled the raw bytes into the +// map, so this function performs no redundant decoding. +// +// Mapping: +// - type → Event.Topic +// - data → Event.Payload +// - time → Event.CreatedAt (RFC3339; falls back to time.Now()) +// - specversion, source, id, datacontenttype, subject, and all extension +// attributes → Event.Metadata (prefixed with "ce_") +func parseCloudEvent(m map[string]json.RawMessage) (Event, error) { + specversion, ok := extractString(m, "specversion") + if !ok || specversion == "" { + return Event{}, ErrCloudEventMissingSpecVersion + } + if specversion != "1.0" { + return Event{}, fmt.Errorf("%w: got %q", ErrCloudEventUnsupportedSpecVersion, specversion) + } + ceType, ok := extractString(m, "type") + if !ok || ceType == "" { + return Event{}, ErrCloudEventMissingType + } + source, ok := extractString(m, "source") + if !ok || source == "" { + return Event{}, ErrCloudEventMissingSource + } + id, ok := extractString(m, "id") + if !ok || id == "" { + return Event{}, ErrCloudEventMissingID + } + + var createdAt time.Time + if timeStr, hasTime := extractString(m, "time"); hasTime && timeStr != "" { + var err error + createdAt, err = time.Parse(time.RFC3339, timeStr) + if err != nil { + slog.Warn("CloudEvent has unparseable 'time' attribute, using current time", + "time", timeStr, "error", err) + createdAt = time.Now() + } + } else { + createdAt = time.Now() + } + + // Warn if both data and data_base64 are present (spec violation). + if rawData, hasData := m["data"]; hasData && len(rawData) > 0 && string(rawData) != "null" { + if rawB64, hasB64 := m["data_base64"]; hasB64 && len(rawB64) > 0 && string(rawB64) != "null" { + slog.Warn("CloudEvent contains both 'data' and 'data_base64' (spec violation); using 'data'", + "id", id, "source", source, "type", ceType) + } + } + + var payload interface{} + if data, hasData := m["data"]; hasData && len(data) > 0 && string(data) != "null" { + if err := json.Unmarshal(data, &payload); err != nil { + return Event{}, fmt.Errorf("failed to parse CloudEvent 'data' field: %w", err) + } + } else if dataB64, hasB64 := m["data_base64"]; hasB64 && len(dataB64) > 0 && string(dataB64) != "null" { + var b64str string + if err := json.Unmarshal(dataB64, &b64str); err != nil { + return Event{}, fmt.Errorf("failed to parse CloudEvent 'data_base64' field: %w", err) + } + decoded, err := base64.StdEncoding.DecodeString(b64str) + if err != nil { + return Event{}, fmt.Errorf("failed to base64-decode CloudEvent 'data_base64' field: %w", err) + } + // If datacontenttype indicates JSON, unmarshal the decoded bytes. + // Use mime.ParseMediaType to handle parameters like charset. + if isJSONContentType(m) { + if err := json.Unmarshal(decoded, &payload); err != nil { + return Event{}, fmt.Errorf("failed to parse CloudEvent 'data_base64' JSON content: %w", err) + } + } else { + payload = decoded + } + } + + // Build metadata from known attributes and extension attributes. + metadata := make(map[string]interface{}) + metadata["ce_specversion"] = specversion + metadata["ce_type"] = ceType + metadata["ce_source"] = source + metadata["ce_id"] = id + if dct, ok := extractString(m, "datacontenttype"); ok { + metadata["ce_datacontenttype"] = dct + } + if subj, ok := extractString(m, "subject"); ok { + metadata["ce_subject"] = subj + } + if ds, ok := extractString(m, "dataschema"); ok { + metadata["ce_dataschema"] = ds + } + + for key, val := range m { + if knownCloudEventKeys[key] { + continue + } + var extVal interface{} + if err := json.Unmarshal(val, &extVal); err != nil { + slog.Warn("CloudEvent extension attribute has non-JSON value, storing as raw string", + "key", key, "error", err) + metadata["ce_"+key] = string(val) + } else { + metadata["ce_"+key] = extVal + } + } + + return Event{ + Topic: ceType, + Payload: payload, + Metadata: metadata, + CreatedAt: createdAt, + }, nil +} + +// parseRecord attempts to parse raw JSON as either a CloudEvents envelope +// or a native eventbus.Event. This is the entry point used by engine +// deserialization paths. It first unmarshals into a generic map to probe +// for the "specversion" key; CloudEvents are then decoded from that map +// in a single pass, while native Events require a second unmarshal into +// the Event struct. +func parseRecord(raw []byte) (Event, error) { + var m map[string]json.RawMessage + if err := json.Unmarshal(raw, &m); err != nil { + return Event{}, fmt.Errorf("failed to deserialize record: %w", err) + } + + if _, ok := m["specversion"]; ok { + return parseCloudEvent(m) + } + + var event Event + if err := json.Unmarshal(raw, &event); err != nil { + return Event{}, fmt.Errorf("failed to deserialize record: %w", err) + } + + return event, nil +} diff --git a/modules/eventbus/cloudevents_decode_test.go b/modules/eventbus/cloudevents_decode_test.go new file mode 100644 index 00000000..aed4baec --- /dev/null +++ b/modules/eventbus/cloudevents_decode_test.go @@ -0,0 +1,614 @@ +package eventbus + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// helper to unmarshal a JSON string into the map that parseCloudEvent expects. +func ceMap(t *testing.T, raw string) map[string]json.RawMessage { + t.Helper() + var m map[string]json.RawMessage + require.NoError(t, json.Unmarshal([]byte(raw), &m)) + return m +} + +func TestExtractString(t *testing.T) { + t.Parallel() + + t.Run("key present and string", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"k": json.RawMessage(`"hello"`)} + v, ok := extractString(m, "k") + assert.True(t, ok) + assert.Equal(t, "hello", v) + }) + + t.Run("key absent", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{} + v, ok := extractString(m, "k") + assert.False(t, ok) + assert.Equal(t, "", v) + }) + + t.Run("key present but not a string", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"k": json.RawMessage(`123`)} + v, ok := extractString(m, "k") + assert.False(t, ok) + assert.Equal(t, "", v) + }) +} + +func TestIsJSONContentType(t *testing.T) { + t.Parallel() + + t.Run("application/json", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"datacontenttype": json.RawMessage(`"application/json"`)} + assert.True(t, isJSONContentType(m)) + }) + + t.Run("application/json with charset", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"datacontenttype": json.RawMessage(`"application/json; charset=utf-8"`)} + assert.True(t, isJSONContentType(m)) + }) + + t.Run("non-JSON content type", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"datacontenttype": json.RawMessage(`"text/plain"`)} + assert.False(t, isJSONContentType(m)) + }) + + t.Run("missing datacontenttype", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{} + assert.False(t, isJSONContentType(m)) + }) + + t.Run("invalid media type", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{"datacontenttype": json.RawMessage(`";;;invalid"`)} + assert.False(t, isJSONContentType(m)) + }) +} + +func TestParseCloudEvent(t *testing.T) { + t.Parallel() + + t.Run("full CloudEvent with all fields", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "conversations.conversation.started", + "source": "platform.conversations", + "id": "evt-ff9745302bb23718d9da693c", + "time": "2026-02-06T23:02:35+00:00", + "datacontenttype": "application/json", + "data": {"id": "123", "texterId": "987", "keyword": "HELLO"} + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + + assert.Equal(t, "conversations.conversation.started", event.Topic) + assert.NotNil(t, event.Payload) + assert.Equal(t, "1.0", event.Metadata["ce_specversion"]) + assert.Equal(t, "conversations.conversation.started", event.Metadata["ce_type"]) + assert.Equal(t, "platform.conversations", event.Metadata["ce_source"]) + assert.Equal(t, "evt-ff9745302bb23718d9da693c", event.Metadata["ce_id"]) + assert.Equal(t, "application/json", event.Metadata["ce_datacontenttype"]) + + expectedTime, err := time.Parse(time.RFC3339, "2026-02-06T23:02:35+00:00") + require.NoError(t, err) + assert.Equal(t, expectedTime, event.CreatedAt) + + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "123", payloadMap["id"]) + assert.Equal(t, "987", payloadMap["texterId"]) + assert.Equal(t, "HELLO", payloadMap["keyword"]) + }) + + t.Run("CloudEvent with extension attributes", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "user.created", + "source": "user-service", + "id": "abc-123", + "tenantid": "tenant-456", + "traceparent": "00-abc-def-01" + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + + assert.Equal(t, "user.created", event.Topic) + assert.Equal(t, "tenant-456", event.Metadata["ce_tenantid"]) + assert.Equal(t, "00-abc-def-01", event.Metadata["ce_traceparent"]) + }) + + t.Run("CloudEvent without time uses current time", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1" + }`) + + before := time.Now() + event, err := parseCloudEvent(m) + after := time.Now() + require.NoError(t, err) + + assert.False(t, event.CreatedAt.Before(before)) + assert.False(t, event.CreatedAt.After(after)) + }) + + t.Run("CloudEvent with unparseable time falls back to now", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "time": "not-a-timestamp" + }`) + + before := time.Now() + event, err := parseCloudEvent(m) + after := time.Now() + require.NoError(t, err) + + assert.False(t, event.CreatedAt.IsZero()) + assert.False(t, event.CreatedAt.Before(before)) + assert.False(t, event.CreatedAt.After(after)) + }) + + t.Run("CloudEvent with null data", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data": null + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Nil(t, event.Payload) + }) + + t.Run("CloudEvent with no data field", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1" + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Nil(t, event.Payload) + }) + + t.Run("missing required type returns error", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{"specversion": "1.0", "source": "test", "id": "1"}`) + _, err := parseCloudEvent(m) + assert.ErrorIs(t, err, ErrCloudEventMissingType) + }) + + t.Run("missing required source returns error", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{"specversion": "1.0", "type": "test", "id": "1"}`) + _, err := parseCloudEvent(m) + assert.ErrorIs(t, err, ErrCloudEventMissingSource) + }) + + t.Run("missing required id returns error", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{"specversion": "1.0", "type": "test", "source": "test"}`) + _, err := parseCloudEvent(m) + assert.ErrorIs(t, err, ErrCloudEventMissingID) + }) + + t.Run("missing required specversion returns error", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{"type": "test", "source": "test", "id": "1"}`) + _, err := parseCloudEvent(m) + assert.ErrorIs(t, err, ErrCloudEventMissingSpecVersion) + }) + + t.Run("non-string specversion returns error", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`1.0`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`"1"`), + } + _, err := parseCloudEvent(m) + assert.ErrorIs(t, err, ErrCloudEventMissingSpecVersion) + }) + + t.Run("non-string type returns error", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`42`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`"1"`), + } + _, err := parseCloudEvent(m) + assert.ErrorIs(t, err, ErrCloudEventMissingType) + }) + + t.Run("non-string source returns error", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`true`), + "id": json.RawMessage(`"1"`), + } + _, err := parseCloudEvent(m) + assert.ErrorIs(t, err, ErrCloudEventMissingSource) + }) + + t.Run("non-string id returns error", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`99`), + } + _, err := parseCloudEvent(m) + assert.ErrorIs(t, err, ErrCloudEventMissingID) + }) + + t.Run("CloudEvent with subject attribute", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "subject": "resource-123" + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Equal(t, "resource-123", event.Metadata["ce_subject"]) + }) + + t.Run("CloudEvent with string data payload", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data": "plain text payload" + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Equal(t, "plain text payload", event.Payload) + }) + + t.Run("CloudEvent with array data payload", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data": [1, 2, 3] + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + arr, ok := event.Payload.([]interface{}) + require.True(t, ok) + assert.Len(t, arr, 3) + }) + + t.Run("CloudEvent with data_base64 binary payload", func(t *testing.T) { + t.Parallel() + // "SGVsbG8gV29ybGQ=" is base64 for "Hello World" + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data_base64": "SGVsbG8gV29ybGQ=" + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Equal(t, []byte("Hello World"), event.Payload) + }) + + t.Run("CloudEvent with data_base64 JSON payload", func(t *testing.T) { + t.Parallel() + // base64 of `{"key":"value"}`, with charset parameter in content type + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "datacontenttype": "application/json; charset=utf-8", + "data_base64": "eyJrZXkiOiJ2YWx1ZSJ9" + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "value", payloadMap["key"]) + }) + + t.Run("CloudEvent with invalid data_base64 returns error", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data_base64": "!!!not-base64!!!" + }`) + + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "data_base64") + }) + + t.Run("CloudEvent with non-string data_base64 returns error", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`"1"`), + "data_base64": json.RawMessage(`12345`), + } + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "data_base64") + }) + + t.Run("CloudEvent with data_base64 invalid JSON content returns error", func(t *testing.T) { + t.Parallel() + // base64 of "not json" = "bm90IGpzb24=" + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "datacontenttype": "application/json", + "data_base64": "bm90IGpzb24=" + }`) + + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "data_base64") + }) + + t.Run("data takes precedence over data_base64", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data": {"from": "data"}, + "data_base64": "SGVsbG8=" + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "data", payloadMap["from"]) + }) + + t.Run("CloudEvent with null data_base64", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data_base64": null + }`) + + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Nil(t, event.Payload) + }) + + t.Run("invalid data field returns error", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`"1"`), + "data": json.RawMessage(`{invalid`), + } + _, err := parseCloudEvent(m) + assert.Error(t, err) + assert.Contains(t, err.Error(), "data") + }) + + t.Run("unsupported specversion returns error", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "99.9", + "type": "test.event", + "source": "test", + "id": "1" + }`) + _, err := parseCloudEvent(m) + assert.ErrorIs(t, err, ErrCloudEventUnsupportedSpecVersion) + assert.Contains(t, err.Error(), `"99.9"`) + }) + + t.Run("specversion 1.0 is accepted", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1" + }`) + _, err := parseCloudEvent(m) + require.NoError(t, err) + }) + + t.Run("dataschema attribute is captured in metadata", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "dataschema": "https://example.com/schema/v1" + }`) + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Equal(t, "https://example.com/schema/v1", event.Metadata["ce_dataschema"]) + }) + + t.Run("dataschema is not treated as extension attribute", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "dataschema": "https://example.com/schema/v1", + "customext": "val" + }`) + event, err := parseCloudEvent(m) + require.NoError(t, err) + // dataschema should be captured explicitly, not duplicated via extension loop + assert.Equal(t, "https://example.com/schema/v1", event.Metadata["ce_dataschema"]) + assert.Equal(t, "val", event.Metadata["ce_customext"]) + }) + + t.Run("data null with data_base64 present uses data_base64", func(t *testing.T) { + t.Parallel() + m := ceMap(t, `{ + "specversion": "1.0", + "type": "test.event", + "source": "test", + "id": "1", + "data": null, + "data_base64": "SGVsbG8=" + }`) + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Equal(t, []byte("Hello"), event.Payload) + }) + + t.Run("extension with non-JSON value falls back to string", func(t *testing.T) { + t.Parallel() + m := map[string]json.RawMessage{ + "specversion": json.RawMessage(`"1.0"`), + "type": json.RawMessage(`"test"`), + "source": json.RawMessage(`"src"`), + "id": json.RawMessage(`"1"`), + "customext": json.RawMessage(`{bad json`), + } + event, err := parseCloudEvent(m) + require.NoError(t, err) + assert.Equal(t, "{bad json", event.Metadata["ce_customext"]) + }) +} + +func TestParseRecord(t *testing.T) { + t.Parallel() + + t.Run("routes CloudEvent to parseCloudEvent", func(t *testing.T) { + t.Parallel() + raw := []byte(`{ + "specversion": "1.0", + "type": "order.placed", + "source": "order-service", + "id": "evt-123", + "data": {"orderId": "456"} + }`) + + event, err := parseRecord(raw) + require.NoError(t, err) + assert.Equal(t, "order.placed", event.Topic) + assert.Equal(t, "1.0", event.Metadata["ce_specversion"]) + }) + + t.Run("routes native Event to json.Unmarshal", func(t *testing.T) { + t.Parallel() + raw := []byte(`{ + "topic": "user.created", + "payload": {"userId": "789"}, + "metadata": {"source": "internal"}, + "createdAt": "2026-01-15T10:00:00Z" + }`) + + event, err := parseRecord(raw) + require.NoError(t, err) + assert.Equal(t, "user.created", event.Topic) + _, hasCeSpec := event.Metadata["ce_specversion"] + assert.False(t, hasCeSpec) + assert.Equal(t, "internal", event.Metadata["source"]) + }) + + t.Run("invalid JSON returns error", func(t *testing.T) { + t.Parallel() + _, err := parseRecord([]byte(`not json at all`)) + assert.Error(t, err) + }) + + t.Run("empty JSON object returns native Event", func(t *testing.T) { + t.Parallel() + event, err := parseRecord([]byte(`{}`)) + require.NoError(t, err) + assert.Equal(t, "", event.Topic) + }) + + t.Run("valid JSON but invalid native Event returns error", func(t *testing.T) { + t.Parallel() + // createdAt must be a valid RFC3339 timestamp; a bare string triggers unmarshal error. + raw := []byte(`{"topic":"t","createdAt":"not-a-time"}`) + _, err := parseRecord(raw) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to deserialize record") + }) + + t.Run("native event with __topic metadata preserved", func(t *testing.T) { + t.Parallel() + raw := []byte(`{ + "topic": "user.created", + "payload": {"userId": "123"}, + "metadata": {"__topic": "user.created"}, + "createdAt": "2026-01-15T10:00:00Z" + }`) + + event, err := parseRecord(raw) + require.NoError(t, err) + assert.Equal(t, "user.created", event.Topic) + assert.Equal(t, "user.created", event.Metadata["__topic"]) + }) +} diff --git a/modules/eventbus/kafka.go b/modules/eventbus/kafka.go index c456a637..880836ba 100644 --- a/modules/eventbus/kafka.go +++ b/modules/eventbus/kafka.go @@ -117,16 +117,16 @@ func (h *KafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSes } h.mutex.RUnlock() + // Deserialize once per message, reuse for all matching subscriptions + event, err := parseRecord(msg.Value) + if err != nil { + slog.Error("Failed to deserialize Kafka message", "error", err, "topic", msg.Topic) + session.MarkMessage(msg, "") + continue + } + // Process message for each matching subscription for _, sub := range subs { - // Deserialize event - var event Event - if err := json.Unmarshal(msg.Value, &event); err != nil { - slog.Error("Failed to deserialize Kafka message", "error", err, "topic", msg.Topic) - continue - } - - // Process the event if sub.isAsync { go h.eventBus.processEventAsync(sub, event) } else { diff --git a/modules/eventbus/kafka_test.go b/modules/eventbus/kafka_test.go index 2e5a4b16..f97c470a 100644 --- a/modules/eventbus/kafka_test.go +++ b/modules/eventbus/kafka_test.go @@ -282,3 +282,248 @@ func TestKafkaPublishNotStarted(t *testing.T) { err := bus.Publish(context.Background(), Event{Topic: "test", Payload: "data"}) assert.ErrorIs(t, err, ErrEventBusNotStarted) } + +// --- ConsumeClaim integration tests: CloudEvents deserialization via Kafka --- + +// testConsumerGroupSession implements sarama.ConsumerGroupSession for tests. +type testConsumerGroupSession struct { + ctx context.Context + markedMsgs []*sarama.ConsumerMessage +} + +func (s *testConsumerGroupSession) Claims() map[string][]int32 { return nil } +func (s *testConsumerGroupSession) MemberID() string { return "test-member" } +func (s *testConsumerGroupSession) GenerationID() int32 { return 1 } +func (s *testConsumerGroupSession) MarkOffset(_ string, _ int32, _ int64, _ string) {} +func (s *testConsumerGroupSession) Commit() {} +func (s *testConsumerGroupSession) ResetOffset(_ string, _ int32, _ int64, _ string) {} +func (s *testConsumerGroupSession) Context() context.Context { return s.ctx } +func (s *testConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, _ string) { + s.markedMsgs = append(s.markedMsgs, msg) +} + +// testConsumerGroupClaim implements sarama.ConsumerGroupClaim for tests. +type testConsumerGroupClaim struct { + messages chan *sarama.ConsumerMessage +} + +func (c *testConsumerGroupClaim) Topic() string { return "test-topic" } +func (c *testConsumerGroupClaim) Partition() int32 { return 0 } +func (c *testConsumerGroupClaim) InitialOffset() int64 { return 0 } +func (c *testConsumerGroupClaim) HighWaterMarkOffset() int64 { return 0 } +func (c *testConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { return c.messages } + +func TestKafkaConsumeClaimCloudEvent(t *testing.T) { + ctrl := gomock.NewController(t) + producer := mocks.NewMockSyncProducer(ctrl) + bus := newTestKafkaEventBus(producer) + defer bus.cancel() + + received := make(chan Event, 1) + handler := &KafkaConsumerGroupHandler{ + eventBus: bus, + subscriptions: map[string]*kafkaSubscription{ + "sub-1": { + id: "sub-1", + topic: "order.placed", + handler: func(ctx context.Context, event Event) error { + received <- event + return nil + }, + done: make(chan struct{}), + bus: bus, + }, + }, + } + + messages := make(chan *sarama.ConsumerMessage, 1) + messages <- &sarama.ConsumerMessage{ + Topic: "order.placed", + Value: []byte(`{"specversion":"1.0","type":"order.placed","source":"order-svc","id":"evt-1","data":{"orderId":"42"}}`), + } + close(messages) + + session := &testConsumerGroupSession{ctx: context.Background()} + claim := &testConsumerGroupClaim{messages: messages} + + err := handler.ConsumeClaim(session, claim) + require.NoError(t, err) + + select { + case event := <-received: + assert.Equal(t, "order.placed", event.Topic) + assert.Equal(t, "1.0", event.Metadata["ce_specversion"]) + assert.Equal(t, "order-svc", event.Metadata["ce_source"]) + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "42", payloadMap["orderId"]) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for event") + } + + assert.Len(t, session.markedMsgs, 1) +} + +func TestKafkaConsumeClaimNativeEvent(t *testing.T) { + ctrl := gomock.NewController(t) + producer := mocks.NewMockSyncProducer(ctrl) + bus := newTestKafkaEventBus(producer) + defer bus.cancel() + + received := make(chan Event, 1) + handler := &KafkaConsumerGroupHandler{ + eventBus: bus, + subscriptions: map[string]*kafkaSubscription{ + "sub-1": { + id: "sub-1", + topic: "user.created", + handler: func(ctx context.Context, event Event) error { + received <- event + return nil + }, + done: make(chan struct{}), + bus: bus, + }, + }, + } + + messages := make(chan *sarama.ConsumerMessage, 1) + messages <- &sarama.ConsumerMessage{ + Topic: "user.created", + Value: []byte(`{"topic":"user.created","payload":{"userId":"u-789"},"metadata":{"__topic":"user.created"},"createdAt":"2026-01-15T10:00:00Z"}`), + } + close(messages) + + session := &testConsumerGroupSession{ctx: context.Background()} + claim := &testConsumerGroupClaim{messages: messages} + + err := handler.ConsumeClaim(session, claim) + require.NoError(t, err) + + select { + case event := <-received: + assert.Equal(t, "user.created", event.Topic) + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "u-789", payloadMap["userId"]) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for event") + } + + assert.Len(t, session.markedMsgs, 1) +} + +func TestKafkaConsumeClaimRejectsInvalidRecord(t *testing.T) { + ctrl := gomock.NewController(t) + producer := mocks.NewMockSyncProducer(ctrl) + bus := newTestKafkaEventBus(producer) + defer bus.cancel() + + handlerCalled := make(chan struct{}, 1) + handler := &KafkaConsumerGroupHandler{ + eventBus: bus, + subscriptions: map[string]*kafkaSubscription{ + "sub-1": { + id: "sub-1", + topic: "order.placed", + handler: func(ctx context.Context, event Event) error { + handlerCalled <- struct{}{} + return nil + }, + done: make(chan struct{}), + bus: bus, + }, + }, + } + + messages := make(chan *sarama.ConsumerMessage, 1) + messages <- &sarama.ConsumerMessage{ + Topic: "order.placed", + Value: []byte(`not valid json`), + } + close(messages) + + session := &testConsumerGroupSession{ctx: context.Background()} + claim := &testConsumerGroupClaim{messages: messages} + + err := handler.ConsumeClaim(session, claim) + require.NoError(t, err) + + // Handler should NOT have been called. + select { + case <-handlerCalled: + t.Fatal("handler should not have been called for invalid record") + case <-time.After(100 * time.Millisecond): + // Success: handler was not called. + } + + // Message should still be marked (offset committed even on error). + assert.Len(t, session.markedMsgs, 1) +} + +func TestKafkaConsumeClaimMultipleMessages(t *testing.T) { + ctrl := gomock.NewController(t) + producer := mocks.NewMockSyncProducer(ctrl) + bus := newTestKafkaEventBus(producer) + defer bus.cancel() + + orderReceived := make(chan Event, 1) + userReceived := make(chan Event, 1) + handler := &KafkaConsumerGroupHandler{ + eventBus: bus, + subscriptions: map[string]*kafkaSubscription{ + "order-sub": { + id: "order-sub", + topic: "order.placed", + handler: func(ctx context.Context, e Event) error { + orderReceived <- e + return nil + }, + done: make(chan struct{}), + bus: bus, + }, + "user-sub": { + id: "user-sub", + topic: "user.created", + handler: func(ctx context.Context, e Event) error { + userReceived <- e + return nil + }, + done: make(chan struct{}), + bus: bus, + }, + }, + } + + messages := make(chan *sarama.ConsumerMessage, 2) + messages <- &sarama.ConsumerMessage{ + Topic: "order.placed", + Value: []byte(`{"specversion":"1.0","type":"order.placed","source":"orders","id":"1","data":{"orderId":"o1"}}`), + } + messages <- &sarama.ConsumerMessage{ + Topic: "user.created", + Value: []byte(`{"specversion":"1.0","type":"user.created","source":"users","id":"2","data":{"userId":"u1"}}`), + } + close(messages) + + session := &testConsumerGroupSession{ctx: context.Background()} + claim := &testConsumerGroupClaim{messages: messages} + + err := handler.ConsumeClaim(session, claim) + require.NoError(t, err) + + select { + case e := <-orderReceived: + assert.Equal(t, "order.placed", e.Topic) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for order event") + } + select { + case e := <-userReceived: + assert.Equal(t, "user.created", e.Topic) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for user event") + } + + assert.Len(t, session.markedMsgs, 2) +} diff --git a/modules/eventbus/kinesis.go b/modules/eventbus/kinesis.go index 1e4243e5..b242f4ff 100644 --- a/modules/eventbus/kinesis.go +++ b/modules/eventbus/kinesis.go @@ -378,8 +378,8 @@ func (k *KinesisEventBus) readShard(shardID string) { // Process records for _, record := range resp.Records { - var event Event - if err := json.Unmarshal(record.Data, &event); err != nil { + event, err := parseRecord(record.Data) + if err != nil { slog.Error("Failed to deserialize Kinesis record", "error", err) continue } diff --git a/modules/eventbus/kinesis_test.go b/modules/eventbus/kinesis_test.go index aeb3ef31..d73ecd46 100644 --- a/modules/eventbus/kinesis_test.go +++ b/modules/eventbus/kinesis_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -254,3 +255,306 @@ func TestKinesisPublishNotStarted(t *testing.T) { err := bus.Publish(context.Background(), Event{Topic: "test", Payload: "data"}) assert.ErrorIs(t, err, ErrEventBusNotStarted) } + +// --- ReadShard integration tests: CloudEvents deserialization via mock Kinesis --- + +// newReadShardTestBus creates a test bus with subscriptions wired directly +// (bypasses Subscribe to avoid starting shard reader goroutines). +func newReadShardTestBus(t *testing.T, client KinesisClient, subs map[string]map[string]*kinesisSubscription) *KinesisEventBus { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + return &KinesisEventBus{ + config: &KinesisConfig{ + StreamName: "test-stream", + ShardCount: 1, + }, + client: client, + subscriptions: subs, + ctx: ctx, + cancel: cancel, + isStarted: true, + } +} + +func TestKinesisReadShardCloudEvent(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := mocks.NewMockKinesisClient(ctrl) + + received := make(chan Event, 1) + subs := map[string]map[string]*kinesisSubscription{ + "order.placed": { + "test-sub": { + id: "test-sub", + topic: "order.placed", + handler: func(ctx context.Context, event Event) error { + received <- event + return nil + }, + done: make(chan struct{}), + }, + }, + } + bus := newReadShardTestBus(t, mockClient, subs) + subs["order.placed"]["test-sub"].bus = bus + + ceJSON := []byte(`{ + "specversion": "1.0", + "type": "order.placed", + "source": "order-service", + "id": "evt-001", + "time": "2026-02-06T12:00:00Z", + "data": {"orderId": "abc-123"} + }`) + + iteratorStr := "shard-iter-1" + mockClient.EXPECT(). + GetShardIterator(gomock.Any(), gomock.Any()). + Return(&kinesis.GetShardIteratorOutput{ShardIterator: &iteratorStr}, nil) + + // Return records then nil iterator to terminate the loop. + mockClient.EXPECT(). + GetRecords(gomock.Any(), gomock.Any()). + Return(&kinesis.GetRecordsOutput{ + Records: []types.Record{{Data: ceJSON}}, + NextShardIterator: nil, + }, nil) + + bus.readShard("shard-0") + + select { + case event := <-received: + assert.Equal(t, "order.placed", event.Topic) + assert.Equal(t, "1.0", event.Metadata["ce_specversion"]) + assert.Equal(t, "order-service", event.Metadata["ce_source"]) + assert.Equal(t, "evt-001", event.Metadata["ce_id"]) + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "abc-123", payloadMap["orderId"]) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for event") + } +} + +func TestKinesisReadShardNativeEvent(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := mocks.NewMockKinesisClient(ctrl) + + received := make(chan Event, 1) + subs := map[string]map[string]*kinesisSubscription{ + "user.created": { + "test-sub": { + id: "test-sub", + topic: "user.created", + handler: func(ctx context.Context, event Event) error { + received <- event + return nil + }, + done: make(chan struct{}), + }, + }, + } + bus := newReadShardTestBus(t, mockClient, subs) + subs["user.created"]["test-sub"].bus = bus + + nativeJSON := []byte(`{ + "topic": "user.created", + "payload": {"userId": "u-789"}, + "metadata": {"__topic": "user.created"}, + "createdAt": "2026-01-15T10:00:00Z" + }`) + + iteratorStr := "shard-iter-1" + mockClient.EXPECT(). + GetShardIterator(gomock.Any(), gomock.Any()). + Return(&kinesis.GetShardIteratorOutput{ShardIterator: &iteratorStr}, nil) + + mockClient.EXPECT(). + GetRecords(gomock.Any(), gomock.Any()). + Return(&kinesis.GetRecordsOutput{ + Records: []types.Record{{Data: nativeJSON}}, + NextShardIterator: nil, + }, nil) + + bus.readShard("shard-0") + + select { + case event := <-received: + assert.Equal(t, "user.created", event.Topic) + payloadMap, ok := event.Payload.(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "u-789", payloadMap["userId"]) + assert.Equal(t, "user.created", event.Metadata["__topic"]) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for event") + } +} + +func TestKinesisReadShardCloudEventBase64(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := mocks.NewMockKinesisClient(ctrl) + + received := make(chan Event, 1) + subs := map[string]map[string]*kinesisSubscription{ + "file.uploaded": { + "test-sub": { + id: "test-sub", + topic: "file.uploaded", + handler: func(ctx context.Context, event Event) error { + received <- event + return nil + }, + done: make(chan struct{}), + }, + }, + } + bus := newReadShardTestBus(t, mockClient, subs) + subs["file.uploaded"]["test-sub"].bus = bus + + // "SGVsbG8gV29ybGQ=" is base64 for "Hello World" + ceJSON := []byte(`{ + "specversion": "1.0", + "type": "file.uploaded", + "source": "storage-service", + "id": "evt-002", + "data_base64": "SGVsbG8gV29ybGQ=" + }`) + + iteratorStr := "shard-iter-1" + mockClient.EXPECT(). + GetShardIterator(gomock.Any(), gomock.Any()). + Return(&kinesis.GetShardIteratorOutput{ShardIterator: &iteratorStr}, nil) + + mockClient.EXPECT(). + GetRecords(gomock.Any(), gomock.Any()). + Return(&kinesis.GetRecordsOutput{ + Records: []types.Record{{Data: ceJSON}}, + NextShardIterator: nil, + }, nil) + + bus.readShard("shard-0") + + select { + case event := <-received: + assert.Equal(t, "file.uploaded", event.Topic) + assert.Equal(t, []byte("Hello World"), event.Payload) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for event") + } +} + +func TestKinesisReadShardRejectsInvalidSpecversion(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := mocks.NewMockKinesisClient(ctrl) + + handlerCalled := make(chan struct{}, 1) + subs := map[string]map[string]*kinesisSubscription{ + "order.placed": { + "test-sub": { + id: "test-sub", + topic: "order.placed", + handler: func(ctx context.Context, event Event) error { + handlerCalled <- struct{}{} + return nil + }, + done: make(chan struct{}), + }, + }, + } + bus := newReadShardTestBus(t, mockClient, subs) + subs["order.placed"]["test-sub"].bus = bus + + badCE := []byte(`{ + "specversion": "99.9", + "type": "order.placed", + "source": "order-service", + "id": "evt-bad" + }`) + + iteratorStr := "shard-iter-1" + mockClient.EXPECT(). + GetShardIterator(gomock.Any(), gomock.Any()). + Return(&kinesis.GetShardIteratorOutput{ShardIterator: &iteratorStr}, nil) + + mockClient.EXPECT(). + GetRecords(gomock.Any(), gomock.Any()). + Return(&kinesis.GetRecordsOutput{ + Records: []types.Record{{Data: badCE}}, + NextShardIterator: nil, + }, nil) + + bus.readShard("shard-0") + + // Handler should NOT have been called for invalid specversion. + select { + case <-handlerCalled: + t.Fatal("handler should not have been called for invalid specversion") + case <-time.After(100 * time.Millisecond): + // Success: handler was not called. + } +} + +func TestKinesisReadShardMultipleRecords(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := mocks.NewMockKinesisClient(ctrl) + + orderReceived := make(chan Event, 1) + userReceived := make(chan Event, 1) + subs := map[string]map[string]*kinesisSubscription{ + "order.placed": { + "order-sub": { + id: "order-sub", + topic: "order.placed", + handler: func(ctx context.Context, e Event) error { + orderReceived <- e + return nil + }, + done: make(chan struct{}), + }, + }, + "user.created": { + "user-sub": { + id: "user-sub", + topic: "user.created", + handler: func(ctx context.Context, e Event) error { + userReceived <- e + return nil + }, + done: make(chan struct{}), + }, + }, + } + bus := newReadShardTestBus(t, mockClient, subs) + subs["order.placed"]["order-sub"].bus = bus + subs["user.created"]["user-sub"].bus = bus + + iteratorStr := "shard-iter-1" + mockClient.EXPECT(). + GetShardIterator(gomock.Any(), gomock.Any()). + Return(&kinesis.GetShardIteratorOutput{ShardIterator: &iteratorStr}, nil) + + mockClient.EXPECT(). + GetRecords(gomock.Any(), gomock.Any()). + Return(&kinesis.GetRecordsOutput{ + Records: []types.Record{ + {Data: []byte(`{"specversion":"1.0","type":"order.placed","source":"orders","id":"1","data":{"orderId":"o1"}}`)}, + {Data: []byte(`{"specversion":"1.0","type":"user.created","source":"users","id":"2","data":{"userId":"u1"}}`)}, + }, + NextShardIterator: nil, + }, nil) + + bus.readShard("shard-0") + + select { + case e := <-orderReceived: + assert.Equal(t, "order.placed", e.Topic) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for order event") + } + select { + case e := <-userReceived: + assert.Equal(t, "user.created", e.Topic) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for user event") + } +} diff --git a/modules/eventbus/nats.go b/modules/eventbus/nats.go index ce0b4dcf..d3a80c6d 100644 --- a/modules/eventbus/nats.go +++ b/modules/eventbus/nats.go @@ -299,8 +299,8 @@ func (n *NatsEventBus) subscribe(ctx context.Context, topic string, handler Even sub.mutex.RUnlock() // Deserialize event - var event Event - if err := json.Unmarshal(msg.Data, &event); err != nil { + event, err := parseRecord(msg.Data) + if err != nil { slog.Error("Failed to deserialize NATS message", "error", err, "subject", msg.Subject) return } diff --git a/modules/eventbus/redis.go b/modules/eventbus/redis.go index 0dab6783..a9b916c8 100644 --- a/modules/eventbus/redis.go +++ b/modules/eventbus/redis.go @@ -350,8 +350,8 @@ func (r *RedisEventBus) handleMessages(sub *redisSubscription) { } // Deserialize event - var event Event - if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil { + event, err := parseRecord([]byte(msg.Payload)) + if err != nil { slog.Error("Failed to deserialize Redis message", "error", err, "topic", msg.Channel) continue }