Closed
Labels: enhancement (New feature or request)
Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
The Kafka CDC action should support the following format of Kafka messages collected from MongoDB via Debezium, regardless of whether the messages contain a schema:
```json
{
"before": null,
"after": "{\"_id\":{\"$oid\":\"64001c996f4de7ff3189d374\"},\"last_updated_at\":{\"$numberLong\":\"1732232838425\"},\"tags\":[\"completely\",\"pass\"],\"updated_by\":\"xxx\"}",
"updateDescription": null,
"source": {
"version": "2.7.0.Final",
"connector": "mongodb",
"name": "datapipeline",
"ts_ms": 1732644484000,
"snapshot": "false",
"db": "datapipeline",
"sequence": null,
"ts_us": 1732644484000000,
"ts_ns": 1732644484000000000,
"collection": "clips",
"ord": 22,
"lsid": null,
"txnNumber": null,
"wallTime": null
},
"op": "c",
"ts_ms": 1732644484231,
"transaction": null
}
```
Solution
Add a `debezium-bson` format for the Kafka CDC action:
- Parse the BSON string in the `before`/`after` field of the Kafka message
- Extract Java objects and basic data types from the BSON values
- Convert basic Java data types to strings and Java objects to JSON strings
- Support schema evolution, with every field fixed to the STRING type and `_id` fixed as the primary key
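The conversion steps above can be sketched in a few lines. This is a minimal sketch, assuming the `before`/`after` string is MongoDB Extended JSON as in the example message, and unwrapping only a handful of type wrappers (`$oid`, `$numberLong`, `$numberInt`, `$numberDouble`, `$numberDecimal`) by hand. The helpers `unwrap`, `to_string` and `parse_document` are hypothetical names; the proposed format would decode the value with a BSON library rather than a JSON parser.

```python
import json
from typing import Any, Dict, Optional

# Assumption: only these single-key Extended JSON wrappers are unwrapped here;
# a real implementation would decode every BSON type via a BSON library.
_SCALAR_WRAPPERS = ("$oid", "$numberLong", "$numberInt", "$numberDouble", "$numberDecimal")


def unwrap(value: Any) -> Any:
    """Recursively replace known single-key Extended JSON wrappers with their inner value."""
    if isinstance(value, dict):
        if len(value) == 1:
            key, inner = next(iter(value.items()))
            if key in _SCALAR_WRAPPERS:
                return inner
        return {k: unwrap(v) for k, v in value.items()}
    if isinstance(value, list):
        return [unwrap(v) for v in value]
    return value


def to_string(value: Any) -> Optional[str]:
    """Basic values become plain strings; objects and arrays become compact JSON strings."""
    if value is None:
        return None
    if isinstance(value, bool):  # checked before str() so booleans read as JSON literals
        return "true" if value else "false"
    if isinstance(value, (dict, list)):
        return json.dumps(value, separators=(",", ":"), ensure_ascii=False)
    return str(value)


def parse_document(doc: Optional[str]) -> Optional[Dict[str, Optional[str]]]:
    """Parse a before/after string into a row in which every column is a string."""
    if doc is None:  # e.g. "before" is null for an insert
        return None
    fields = unwrap(json.loads(doc))
    return {name: to_string(value) for name, value in fields.items()}
```

Applied to the `after` string of the example message, `parse_document` returns `_id` as `64001c996f4de7ff3189d374`, `last_updated_at` as `1732232838425`, and `tags` serialized as `["completely","pass"]`, which matches the expected record.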
Expected Results:
- Schema:
| Column | DataType | Key |
|---|---|---|
| _id | STRING | Primary Key |
| last_updated_at | STRING | |
| tags | STRING | |
| updated_by | STRING | |
- Records:
| RowKind | _id | last_updated_at | tags | updated_by |
|---|---|---|---|---|
| +I | 64001c996f4de7ff3189d374 | 1732232838425 | ["completely","pass"] | xxx |
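In the example, the Debezium event with `op: "c"` becomes a single `+I` record. The sketch below shows one way the other Debezium op codes (`r` for snapshot reads, `u` for updates, `d` for deletes) could map to row kinds, and how newly seen fields could be added to the all-STRING schema with `_id` as the fixed primary key. The mapping for `u`/`d` and the helpers `to_row_kinds`/`evolve_schema` are hypothetical and not part of the proposal; note that for MongoDB the `before` image may be null unless pre-images are available, which this sketch treats as an error for deletes.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, Optional[str]]

PRIMARY_KEY = ["_id"]  # fixed primary key, as proposed in the issue


def to_row_kinds(op: str, before: Optional[Row], after: Optional[Row]) -> List[Tuple[str, Row]]:
    """Map one Debezium change event to (row kind, row) pairs (illustrative mapping)."""
    if op in ("c", "r"):  # create, or read during a snapshot
        if after is None:
            raise ValueError(f"op {op!r} without an 'after' document")
        return [("+I", after)]
    if op == "u":  # update: retract the old image only when it is available
        if after is None:
            raise ValueError("op 'u' without an 'after' document")
        changes: List[Tuple[str, Row]] = []
        if before is not None:
            changes.append(("-U", before))
        changes.append(("+U", after))
        return changes
    if op == "d":  # delete: needs the old image to know which row to remove
        if before is None:
            raise ValueError("op 'd' without a 'before' document")
        return [("-D", before)]
    raise ValueError(f"unsupported Debezium op: {op!r}")


def evolve_schema(schema: Dict[str, str], row: Row) -> Dict[str, str]:
    """Return a schema that also contains every field of `row`, typed as STRING."""
    evolved = dict(schema)
    for name in row:
        evolved.setdefault(name, "STRING")  # existing columns are never changed
    return evolved
```

Starting from `{"_id": "STRING"}` and evolving with the example row yields the four STRING columns listed in the expected schema.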
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!