feat: add CloudEvents deserialization support to all eventbus engines#177
feat: add CloudEvents deserialization support to all eventbus engines#177kristopherchun merged 7 commits intomainfrom
Conversation
External producers publish domain events using the CloudEvents v1.0 spec, but all eventbus engines (Kinesis, Kafka, NATS, Redis) unmarshal records directly into eventbus.Event whose JSON tags don't match CloudEvents fields. This causes CloudEvents records to deserialize as zero-value structs with empty topics that match no subscriptions, silently dropping messages. Introduce parseRecord() which detects CloudEvents by the presence of "specversion", maps type→Topic, data→Payload, time→CreatedAt, and preserves context/extension attributes in Metadata with a ce_ prefix. Native eventbus.Event format continues to work unchanged.
📋 API Contract Changes Summary✅ No breaking changes detected - only additions and non-breaking modifications Changed Components:Core FrameworkContract diff saved to artifacts/diffs/core.json Module: authContract diff saved to artifacts/diffs/auth.json Module: cacheContract diff saved to artifacts/diffs/cache.json Module: chimuxContract diff saved to artifacts/diffs/chimux.json Module: databaseContract diff saved to artifacts/diffs/database.json Module: eventbusContract diff saved to artifacts/diffs/eventbus.json Module: eventloggerContract diff saved to artifacts/diffs/eventlogger.json Module: httpclientContract diff saved to artifacts/diffs/httpclient.json Module: httpserverContract diff saved to artifacts/diffs/httpserver.json Module: jsonschemaContract diff saved to artifacts/diffs/jsonschema.json Module: letsencryptContract diff saved to artifacts/diffs/letsencrypt.json Module: logmaskerContract diff saved to artifacts/diffs/logmasker.json Module: reverseproxyContract diff saved to artifacts/diffs/reverseproxy.json Module: schedulerContract diff saved to artifacts/diffs/scheduler.json Artifacts📁 Full contract diffs and JSON artifacts are available in the workflow artifacts. |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #177 +/- ##
==========================================
- Coverage 60.44% 56.63% -3.82%
==========================================
Files 93 94 +1
Lines 19538 19637 +99
==========================================
- Hits 11810 11121 -689
- Misses 6607 7320 +713
- Partials 1121 1196 +75
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Adds CloudEvents v1.0 JSON envelope deserialization to the eventbus module so all supported engines can consume events produced in either the native eventbus.Event format or CloudEvents without configuration changes.
Changes:
- Introduces
parseRecord()to auto-detect CloudEvents (viaspecversion) and map CloudEvents →eventbus.Event. - Updates Kafka/Kinesis/NATS/Redis consume paths to use
parseRecord()instead of directjson.Unmarshal. - Adds unit tests covering CloudEvents detection/mapping and fallback to native
eventbus.Event.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| modules/eventbus/cloudevents_decode.go | Implements CloudEvents detection and mapping into eventbus.Event. |
| modules/eventbus/cloudevents_decode_test.go | Adds tests for CloudEvents detection/mapping and native fallback. |
| modules/eventbus/kafka.go | Switches Kafka consume path to use parseRecord(). |
| modules/eventbus/kinesis.go | Switches Kinesis record deserialization to use parseRecord(). |
| modules/eventbus/nats.go | Switches NATS message deserialization to use parseRecord(). |
| modules/eventbus/redis.go | Switches Redis message deserialization to use parseRecord(). |
Replace triple-unmarshal approach (probe map, envelope struct, extension map) with a single unmarshal into map[string]json.RawMessage that is reused for detection, field extraction, and extension collection. Add extractString helper and tests for non-string attributes, invalid data fields, and malformed extensions to achieve 100% coverage on cloudevents_decode.go.
- Add data_base64 handling per CloudEvents spec with JSON-aware decode when datacontenttype is application/json - Move Kafka parseRecord call before subscriber loop to deserialize once per message instead of once per subscription - Use sentinel errors for required attribute validation (err113) - Fix discarded time.Parse error in test
- Parse datacontenttype with mime.ParseMediaType to handle parameters like "application/json; charset=utf-8" - Preserve ce_type in metadata for full CloudEvent context reconstruction - Fix misleading parseRecord doc comment about single-unmarshal behavior
…tests Validate specversion is "1.0" (reject unknown versions), add dataschema to known keys, warn on data+data_base64 mutual exclusivity violation, and log warnings on extension attribute JSON parse failures. Add Kinesis readShard integration tests exercising CloudEvent and native Event deserialization through the mock KinesisClient.
Cover the deserialization and dispatch path in ConsumeClaim using lightweight test doubles for sarama.ConsumerGroupSession and ConsumerGroupClaim. Tests verify CloudEvent decode, native Event decode, invalid record rejection with offset commit, and multi- message batch dispatch to topic-matched subscribers.
Summary
External producers publishing domain events in CloudEvents v1.0 format are silently dropped because the eventbus engines deserialize records directly into
eventbus.Event, whose JSON tags don't match CloudEvents fields. This results in zero-value structs with empty topics that match no subscriptions.This PR introduces auto-detection and mapping of CloudEvents envelopes so that all four eventbus engines (Kafka, Kinesis, NATS, Redis) can consume both native
eventbus.Eventand CloudEvents records without any configuration changes.Changes
parseRecord()entry point that auto-detects CloudEvents by probing forspecversion, falling back to nativeeventbus.Eventdeserializationtype→Topic,data→Payload,time→CreatedAtMetadatawith ace_prefix (e.g.,ce_source,ce_id,ce_tenantid)json.Unmarshalcalls in all four engine consume paths (Kafka, Kinesis, NATS, Redis) withparseRecord()specversion,type,source,id)