diff --git a/docs/content/cdc-ingestion/debezium-bson.md b/docs/content/cdc-ingestion/debezium-bson.md
new file mode 100644
index 000000000000..52eb0422d54b
--- /dev/null
+++ b/docs/content/cdc-ingestion/debezium-bson.md
@@ -0,0 +1,329 @@
+---
+title: "Debezium BSON"
+weight: 6
+type: docs
+aliases:
+- /cdc-ingestion/debezium-bson.html
+---
+
+
+# Debezium BSON Format
+
+
+The debezium-bson format is one of the formats supported by [Kafka CDC]({{< ref "cdc-ingestion/kafka-cdc" >}}).
+It is produced when Debezium captures changes from MongoDB, and it is similar to the debezium-json format.
+However, MongoDB has no fixed schema and the field types of each document may differ, so the before/after fields
+in the JSON are string types, whereas the debezium-json format requires them to be JSON object types.
+
+
+## Prepare MongoDB BSON Jar
+
+It can be downloaded from the [Maven repository](https://mvnrepository.com/artifact/org.mongodb/bson):
+
+```
+bson-*.jar
+```
+
+## Introduction
+
+{{< hint info >}}
+The debezium-bson format requires that insert/update/delete event messages include the full document, as well as a field representing the state of the document before the change.
+This requires setting Debezium's capture.mode to change_streams_update_full_with_pre_image and [capture.mode.full.update.type](https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-property-capture-mode-full-update-type) to post_image.
+Before MongoDB 6.0, the 'update before' state cannot be obtained, so the id field of the Kafka key is used as the 'update before' information instead.
+{{< /hint >}}
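+
+For reference, a minimal sketch of the relevant Debezium MongoDB connector settings might look like this (connection settings such as hosts and credentials are omitted; the property names follow the Debezium documentation linked above):
+
+```json
+{
+  "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
+  "capture.mode": "change_streams_update_full_with_pre_image",
+  "capture.mode.full.update.type": "post_image"
+}
+```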
+
+Here is a simple example of an update operation captured from a MongoDB customers collection, in JSON format:
+
+```json
+{
+ "schema": {
+ "type": "struct",
+ "fields": [
+ {
+ "type": "string",
+ "optional": true,
+ "name": "io.debezium.data.Json",
+ "version": 1,
+ "field": "before"
+ },
+ {
+ "type": "string",
+ "optional": true,
+ "name": "io.debezium.data.Json",
+ "version": 1,
+ "field": "after"
+ },
+ ...
+ ]
+ },
+ "payload": {
+ "before": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"success\"]}",
+ "after": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"passion\",\"success\"]}",
+ "source": {
+ "db": "inventory",
+ "rs": "rs0",
+ "collection": "customers",
+ ...
+ },
+ "op": "u",
+ "ts_ms": 1558965515240,
+ "ts_us": 1558965515240142,
+ "ts_ns": 1558965515240142879
+ }
+}
+```
+
+This document from the MongoDB collection customers has 4 fields: _id is a BSON ObjectId, name is a string,
+create_time is a long, and tags is an array of strings. The following is the processing result in the debezium-bson format:
+
+Document Schema:
+
+| Field Name | Field Type | Key |
+|------------|------------|-------------|
+| _id | STRING | Primary Key |
+| name | STRING | |
+| create_time| STRING | |
+| tags | STRING | |
+
+Records:
+
+| RowKind | _id | name | create_time | tags |
+|---------|--------------------------|-------|----------------------------|-----------------------|
+| -U | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["success"] |
+| +U | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["passion","success"] |
+
+
+### How it works
+Because the schema field of the event message does not contain the document's field information, the debezium-bson format does not require event messages to carry schema information. The processing steps are as follows (see the sketch after this list):
+
+- Parse the before/after fields of the event message into a BSONDocument.
+- Recursively traverse all fields of the BSONDocument and convert each BsonValue to a Java object.
+- Convert all top-level fields of before/after to string type; _id is always used as the primary key.
+- If a top-level field of before/after is a basic type (such as Integer or Long), convert it directly to a string; otherwise, convert it to a JSON string.
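+
+To make this concrete, here is a minimal sketch (not the actual parser code) of how a before/after string can be flattened into top-level string fields. It assumes the BsonValueConvertor.convert helper changed in this PR, and uses plain Jackson for JSON serialization where the real code uses Paimon's shaded Jackson:
+
+```java
+import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.bson.BsonDocument;
+import org.bson.BsonValue;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TopLevelFieldSketch {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /** Flattens an extended-JSON before/after string into top-level string fields. */
+    static Map<String, String> toStringFields(String extendedJson) throws Exception {
+        // Parse the before/after string into a BSONDocument.
+        BsonDocument doc = BsonDocument.parse(extendedJson);
+        Map<String, String> fields = new LinkedHashMap<>();
+        for (Map.Entry<String, BsonValue> entry : doc.entrySet()) {
+            // Recursively convert the BsonValue to a plain Java object.
+            Object value = BsonValueConvertor.convert(entry.getValue());
+            String converted;
+            if (value == null) {
+                converted = null;
+            } else if (value instanceof Map || value instanceof List) {
+                // Non-basic types (documents/arrays) become JSON strings.
+                converted = MAPPER.writeValueAsString(value);
+            } else {
+                // Basic types (Integer/Long/...) become plain strings.
+                converted = String.valueOf(value);
+            }
+            fields.put(entry.getKey(), converted);
+        }
+        return fields;
+    }
+}
+```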
+
+Below is a list of top-level field BsonValue conversion examples:
+
+| BsonValue Type | Json Value | Conversion Result String |
+|----------------|------------|--------------------------|
+| BsonString | "hello" | "hello" |
+| BsonInt32 | 123 | "123" |
+| BsonInt64 | 1735934393769 <br> {"$numberLong": "1735934393769"} | "1735934393769" |
+| BsonDouble | {"$numberDouble": "3.14"} <br> {"$numberDouble": "NaN"} <br> {"$numberDouble": "Infinity"} <br> {"$numberDouble": "-Infinity"} | "3.14" <br> "NaN" <br> "Infinity" <br> "-Infinity" |
+| BsonBoolean | true <br> false | "true" <br> "false" |
+| BsonArray | [1,2,{"$numberLong": "1735934393769"}] | "[1,2,1735934393769]" |
+| BsonObjectId | {"$oid": "596e275826f08b2730779e1f"} | "596e275826f08b2730779e1f" |
+| BsonDateTime | {"$date": 1735934393769} | "1735934393769" |
+| BsonNull | null | null |
+| BsonUndefined | {"$undefined": true} | null |
+| BsonBinary | {"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "0"} | "uE2/4v5MSVOiJZkOo3APKQ==" |
+| BsonBinary(type=UUID) | {"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "4"} | "b84dbfe2-fe4c-4953-a225-990ea3700f29" |
+| BsonDecimal128 | {"$numberDecimal": "3.14"} <br> {"$numberDecimal": "NaN"} | "3.14" <br> "NaN" |
+| BsonRegularExpression | {"$regularExpression": {"pattern": "^pass$", "options": "i"}} | "/^pass$/i" |
+| BsonSymbol | {"$symbol": "symbol"} | "symbol" |
+| BsonTimestamp | {"$timestamp": {"t": 1736997330, "i": 2}} | "1736997330" |
+| BsonMinKey | {"$minKey": 1} | "BsonMinKey" |
+| BsonMaxKey | {"$maxKey": 1} | "BsonMaxKey" |
+| BsonJavaScript | {"$code": "function(){}"} | "function(){}" |
+| BsonJavaScriptWithScope | {"$code": "function(){}", "$scope": {"name": "Anne"}} | '{"$code": "function(){}", "$scope": {"name": "Anne"}}' |
+| BsonDocument | {"decimalPi": {"$numberDecimal": "3.14"}, "doublePi": {"$numberDouble": "3.14"}, "doubleNaN": {"$numberDouble": "NaN"}, "decimalNaN": {"$numberDecimal": "NaN"}, "long": {"$numberLong": "100"}, "bool": true, "array": [{"$numberInt": "1"}, {"$numberLong": "2"}]} | '{"decimalPi":3.14,"doublePi":3.14,"doubleNaN":"NaN","decimalNaN":"NaN","long":100,"bool":true,"array":[1,2]}' |
+
+### How to use
+Use debezium-bson by adding the kafka_conf parameter **value.format=debezium-bson**. Let’s take table synchronization as an example:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ kafka_sync_table \
+ --warehouse hdfs:///path/to/warehouse \
+ --database test_db \
+ --table ods_mongodb_customers \
+ --primary_keys _id \
+ --kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \
+ --kafka_conf topic=customers \
+ --kafka_conf properties.group.id=123456 \
+ --kafka_conf value.format=debezium-bson \
+ --catalog_conf metastore=filesystem \
+ --table_conf bucket=4 \
+ --table_conf changelog-producer=input \
+ --table_conf sink.parallelism=4
+```
+
+
diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md
index fc16c5b0fc1f..7747495c5bb9 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -67,6 +67,10 @@ If a message in a Kafka topic is a change event captured from another database u
      <td>aws-dms-json</td>
      <td>True</td>
     </tr>
+    <tr>
+      <td><a href="{{< ref "cdc-ingestion/debezium-bson" >}}">debezium-bson</a></td>
+      <td>True</td>
+    </tr>
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
index 10dcd56b80dd..397575c8e598 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
@@ -22,13 +22,17 @@
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.TypeUtils;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
import org.bson.BsonDocument;
import org.bson.BsonValue;
@@ -37,14 +41,23 @@
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_BEFORE;
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_PAYLOAD;
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_SCHEMA;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_TYPE;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_DELETE;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_INSERT;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_READE;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_UPDATE;
+import static org.apache.paimon.utils.JsonSerdeUtil.fromJson;
+import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString;
/**
@@ -63,19 +76,47 @@ public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser {
private static final String FIELD_COLLECTION = "collection";
private static final String FIELD_OBJECT_ID = "_id";
+ private static final String FIELD_KEY_ID = "id";
private static final List PRIMARY_KEYS = Collections.singletonList(FIELD_OBJECT_ID);
+ private ObjectNode keyRoot;
+
    public DebeziumBsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
super(typeMapping, computedColumns);
}
+ @Override
+    public List<RichCdcMultiplexRecord> extractRecords() {
+        String operation = getAndCheck(FIELD_TYPE).asText();
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+ switch (operation) {
+ case OP_INSERT:
+ case OP_READE:
+ processRecord(getData(), RowKind.INSERT, records);
+ break;
+ case OP_UPDATE:
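+                // An update is emitted as a DELETE of the old row followed by an INSERT of the new row.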
+ processDeleteRecord(operation, records);
+ processRecord(getData(), RowKind.INSERT, records);
+ break;
+ case OP_DELETE:
+ processDeleteRecord(operation, records);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown record operation: " + operation);
+ }
+ return records;
+ }
+
@Override
protected void setRoot(CdcSourceRecord record) {
- JsonNode node = (JsonNode) record.getValue();
- if (node.has(FIELD_SCHEMA)) {
- root = node.get(FIELD_PAYLOAD);
- } else {
- root = node;
+ root = (JsonNode) record.getValue();
+ if (root.has(FIELD_SCHEMA)) {
+ root = root.get(FIELD_PAYLOAD);
+ }
+
+ keyRoot = (ObjectNode) record.getKey();
+ if (!isNull(keyRoot) && keyRoot.has(FIELD_SCHEMA)) {
+ keyRoot = (ObjectNode) keyRoot.get(FIELD_PAYLOAD);
}
}
@@ -128,4 +169,37 @@ protected String getTableName() {
protected String format() {
return "debezium-bson";
}
+
+ public boolean checkBeforeExists() {
+ return !isNull(root.get(FIELD_BEFORE));
+ }
+
+    private void processDeleteRecord(String operation, List<RichCdcMultiplexRecord> records) {
+ if (checkBeforeExists()) {
+ processRecord(getBefore(operation), RowKind.DELETE, records);
+ } else {
+ // Before version 6.0 of MongoDB, it was not possible to obtain 'Update Before'
+ // information. Therefore, data is first deleted using the key 'id'
+ JsonNode idNode = null;
+ Preconditions.checkArgument(
+ !isNull(keyRoot) && !isNull(idNode = keyRoot.get(FIELD_KEY_ID)),
+ "Invalid %s format: missing '%s' field in key when '%s' is '%s' for: %s.",
+ format(),
+ FIELD_KEY_ID,
+ FIELD_TYPE,
+ operation,
+ keyRoot);
+
+ // Deserialize id from json string to JsonNode
+            Map<String, JsonNode> record =
+ Collections.singletonMap(
+ FIELD_OBJECT_ID, fromJson(idNode.asText(), JsonNode.class));
+
+ try {
+ processRecord(new TextNode(writeValueAsString(record)), RowKind.DELETE, records);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to deserialize key record.", e);
+ }
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
index 66ddc2e1caaa..6e06e3adcaab 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
@@ -109,11 +109,11 @@ public List<RichCdcMultiplexRecord> extractRecords() {
return records;
}
- private JsonNode getData() {
+ protected JsonNode getData() {
return getAndCheck(dataField());
}
- private JsonNode getBefore(String op) {
+ protected JsonNode getBefore(String op) {
return getAndCheck(FIELD_BEFORE, FIELD_TYPE, op);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
index 76211cf56dad..507c9eb63c3d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
@@ -64,7 +64,14 @@ public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> message) throws Exception {
}
try {
- return new CdcSourceRecord(objectMapper.readValue(message.value(), JsonNode.class));
+ byte[] key = message.key();
+ JsonNode keyNode = null;
+ if (key != null) {
+ keyNode = objectMapper.readValue(key, JsonNode.class);
+ }
+
+ JsonNode valueNode = objectMapper.readValue(message.value(), JsonNode.class);
+ return new CdcSourceRecord(null, keyNode, valueNode);
} catch (Exception e) {
LOG.error("Invalid Json:\n{}", new String(message.value()));
throw e;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
index 16d43821bd12..666ef9224268 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
@@ -47,8 +47,8 @@
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
-import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -61,15 +61,22 @@ private static Integer convert(BsonTimestamp bsonTimestamp) {
return bsonTimestamp.getTime();
}
- private static BigDecimal convert(BsonDecimal128 bsonValue) {
+ private static Number convert(BsonDecimal128 bsonValue) {
return convert(bsonValue.decimal128Value());
}
- private static BigDecimal convert(Decimal128 bsonValue) {
- if (bsonValue.isNaN() || bsonValue.isInfinite()) {
- return null;
+ private static Number convert(Decimal128 bsonValue) {
+ if (bsonValue.isNaN()) {
+ return Double.NaN;
+ } else if (bsonValue.isInfinite()) {
+ if (bsonValue.isNegative()) {
+ return Double.NEGATIVE_INFINITY;
+ } else {
+ return Double.POSITIVE_INFINITY;
+ }
+ } else {
+ return bsonValue.bigDecimalValue();
}
- return bsonValue.bigDecimalValue();
}
private static String convert(BsonObjectId objectId) {
@@ -84,7 +91,7 @@ private static String convert(BsonBinary bsonBinary) {
if (BsonBinarySubType.isUuid(bsonBinary.getType())) {
return bsonBinary.asUuid().toString();
} else {
- return toHex(bsonBinary.getData());
+ return Base64.getEncoder().encodeToString(bsonBinary.getData());
}
}
@@ -99,11 +106,7 @@ private static String convert(BsonRegularExpression regex) {
}
private static Double convert(BsonDouble bsonDouble) {
- double value = bsonDouble.getValue();
- if (Double.isNaN(value) || Double.isInfinite(value)) {
- return null;
- }
- return value;
+ return bsonDouble.getValue();
}
private static String convert(BsonString string) {
@@ -136,8 +139,8 @@ private static String convert(BsonJavaScript javascript) {
    private static Map<String, Object> convert(BsonJavaScriptWithScope javascriptWithScope) {
return CollectionUtil.map(
- Pair.of("code", javascriptWithScope.getCode()),
- Pair.of("scope", convert(javascriptWithScope.getScope())));
+ Pair.of("$code", javascriptWithScope.getCode()),
+ Pair.of("$scope", convert(javascriptWithScope.getScope())));
}
private static String convert(BsonNull bsonNull) {
@@ -224,19 +227,4 @@ public static Object convert(BsonValue bsonValue) {
"Unsupported BSON type: " + bsonValue.getBsonType());
}
}
-
- public static String toHex(byte[] bytes) {
- StringBuilder sb = new StringBuilder();
-
- for (byte b : bytes) {
- String s = Integer.toHexString(255 & b);
- if (s.length() < 2) {
- sb.append("0");
- }
-
- sb.append(s);
- }
-
- return sb.toString();
- }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
new file mode 100644
index 000000000000..78d3bcc3de60
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
+
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Test for DebeziumBsonRecordParser. */
+public class DebeziumBsonRecordParserTest {
+
+ private static final Logger log = LoggerFactory.getLogger(DebeziumBsonRecordParserTest.class);
+    private static List<CdcSourceRecord> insertList = new ArrayList<>();
+    private static List<CdcSourceRecord> updateList = new ArrayList<>();
+    private static List<CdcSourceRecord> deleteList = new ArrayList<>();
+
+    private static ArrayList<CdcSourceRecord> bsonRecords = new ArrayList<>();
+    private static ArrayList<CdcSourceRecord> jsonRecords = new ArrayList<>();
+
+    private static Map<String, String> keyEvent = new HashMap<>();
+
+    private static KafkaDeserializationSchema<CdcSourceRecord> kafkaDeserializationSchema = null;
+
+    private static Map<String, String> beforeEvent = new HashMap<>();
+
+    private static Map<String, String> afterEvent = new HashMap<>();
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ DataFormat dataFormat = new DebeziumBsonDataFormatFactory().create();
+ kafkaDeserializationSchema = dataFormat.createKafkaDeserializer(null);
+
+ keyEvent.put("_id", "67ab25755c0d5ac87eb8c632");
+
+ beforeEvent.put("_id", "67ab25755c0d5ac87eb8c632");
+ beforeEvent.put("created_at", "1736207571013");
+ beforeEvent.put("created_by", "peter");
+ beforeEvent.put("tags", "[\"pending\"]");
+ beforeEvent.put("updated_at", "1739455297970");
+
+ afterEvent.put("_id", "67ab25755c0d5ac87eb8c632");
+ afterEvent.put("created_at", "1736207571013");
+ afterEvent.put("created_by", "peter");
+ afterEvent.put("tags", "[\"succeed\"]");
+ afterEvent.put("updated_at", "1739455397970");
+
+ String insertRes = "kafka/debezium-bson/table/event/event-insert.txt";
+ String updateRes = "kafka/debezium-bson/table/event/event-update.txt";
+ String deleteRes = "kafka/debezium-bson/table/event/event-delete.txt";
+ String bsonPth = "kafka/debezium-bson/table/event/event-bson.txt";
+ String jsonPath = "kafka/debezium-bson/table/event/event-json.txt";
+
+ parseCdcSourceRecords(insertRes, insertList);
+
+ parseCdcSourceRecords(updateRes, updateList);
+
+ parseCdcSourceRecords(deleteRes, deleteList);
+
+ parseCdcSourceRecords(bsonPth, bsonRecords);
+
+ parseCdcSourceRecords(jsonPath, jsonRecords);
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ insertList.clear();
+ updateList.clear();
+ deleteList.clear();
+ bsonRecords.clear();
+ jsonRecords.clear();
+ }
+
+    private static void parseCdcSourceRecords(String resourcePath, List<CdcSourceRecord> records)
+ throws Exception {
+ URL url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(resourcePath);
+        List<String> lines = Files.readAllLines(Paths.get(url.toURI()));
+        String key = null;
+        for (String json : lines) {
+ if (StringUtils.isNullOrWhitespaceOnly(json) || !json.startsWith("{")) {
+ continue;
+ }
+ if (key == null) {
+ key = json;
+ } else {
+ // test kafka deserialization
+ records.add(deserializeKafkaSchema(key, json));
+ key = null;
+ }
+ }
+ }
+
+ @Test
+ public void extractInsertRecord() throws Exception {
+ DebeziumBsonRecordParser parser =
+ new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+ Assertions.assertFalse(insertList.isEmpty());
+ for (CdcSourceRecord cdcRecord : insertList) {
+ Schema schema = parser.buildSchema(cdcRecord);
+ Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+ Assertions.assertEquals(records.size(), 1);
+
+ CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+ Assertions.assertEquals(result.kind(), RowKind.INSERT);
+ Assertions.assertEquals(beforeEvent, result.data());
+
+ String dbName = parser.getDatabaseName();
+ Assertions.assertEquals(dbName, "bigdata_test");
+
+ String tableName = parser.getTableName();
+ Assertions.assertEquals(tableName, "sync_test_table");
+
+ MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+ Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+ }
+ }
+
+ @Test
+ public void extractUpdateRecord() throws Exception {
+ DebeziumBsonRecordParser parser =
+ new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+ Assertions.assertFalse(updateList.isEmpty());
+ for (CdcSourceRecord cdcRecord : updateList) {
+ Schema schema = parser.buildSchema(cdcRecord);
+ Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+ Assertions.assertEquals(records.size(), 2);
+
+ CdcRecord updateBefore = records.get(0).toRichCdcRecord().toCdcRecord();
+ Assertions.assertEquals(updateBefore.kind(), RowKind.DELETE);
+ if (parser.checkBeforeExists()) {
+ Assertions.assertEquals(beforeEvent, updateBefore.data());
+ } else {
+ Assertions.assertEquals(keyEvent, updateBefore.data());
+ }
+
+ CdcRecord updateAfter = records.get(1).toRichCdcRecord().toCdcRecord();
+ Assertions.assertEquals(updateAfter.kind(), RowKind.INSERT);
+ Assertions.assertEquals(afterEvent, updateAfter.data());
+
+ String dbName = parser.getDatabaseName();
+ Assertions.assertEquals(dbName, "bigdata_test");
+
+ String tableName = parser.getTableName();
+ Assertions.assertEquals(tableName, "sync_test_table");
+
+ MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+ Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+ }
+ }
+
+ @Test
+ public void extractDeleteRecord() throws Exception {
+ DebeziumBsonRecordParser parser =
+ new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+ Assertions.assertFalse(deleteList.isEmpty());
+ for (CdcSourceRecord cdcRecord : deleteList) {
+ Schema schema = parser.buildSchema(cdcRecord);
+ Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+ Assertions.assertEquals(records.size(), 1);
+
+ CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+ Assertions.assertEquals(result.kind(), RowKind.DELETE);
+ if (parser.checkBeforeExists()) {
+ Assertions.assertEquals(beforeEvent, result.data());
+ } else {
+ Assertions.assertEquals(keyEvent, result.data());
+ }
+
+ String dbName = parser.getDatabaseName();
+ Assertions.assertEquals(dbName, "bigdata_test");
+
+ String tableName = parser.getTableName();
+ Assertions.assertEquals(tableName, "sync_test_table");
+
+ MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+ Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+ }
+ }
+
+ @Test
+ public void bsonConvertJsonTest() throws Exception {
+ DebeziumBsonRecordParser parser =
+ new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+
+ Assertions.assertFalse(jsonRecords.isEmpty());
+ for (int i = 0; i < jsonRecords.size(); i++) {
+ CdcSourceRecord bsonRecord = bsonRecords.get(i);
+ CdcSourceRecord jsonRecord = jsonRecords.get(i);
+
+ JsonNode bsonTextNode =
+ new TextNode(JsonSerdeUtil.writeValueAsString(bsonRecord.getValue()));
+            Map<String, String> resultMap = parser.extractRowData(bsonTextNode, RowType.builder());
+
+ ObjectNode expectNode = (ObjectNode) jsonRecord.getValue();
+
+ expectNode
+ .fields()
+ .forEachRemaining(
+ entry -> {
+ String key = entry.getKey();
+ String expectValue = null;
+ if (!JsonSerdeUtil.isNull(entry.getValue())) {
+ expectValue = entry.getValue().asText();
+ }
+ Assertions.assertEquals(expectValue, resultMap.get(key));
+ });
+ }
+ }
+
+ private static CdcSourceRecord deserializeKafkaSchema(String key, String value)
+ throws Exception {
+ return kafkaDeserializationSchema.deserialize(
+ new ConsumerRecord<>("topic", 0, 0, key.getBytes(), value.getBytes()));
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt
new file mode 100644
index 000000000000..be744a114770
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "dummy"}
+{"id": {"$oid": "596e275826f08b2730779e1f"},"name": "Sally","age": 25,"created_time": {"$numberLong": "1735934393769"},"updated_time": 1735934393769,"deleted_time": {"$date": 1735934393769 },"register_time": {"$timestamp": {"t": 1736997330, "i": 2}},"double_nan": {"$numberDouble": "NaN"},"double_inf": {"$numberDouble": "Infinity"},"double_ninf": {"$numberDouble": "-Infinity"},"double_zero": {"$numberDouble": "0"},"boolean_true": true,"boolean_false": false,"array": ["a", "b", "c"],"array_mix": ["1", 2, {"$numberLong": "1735934393769"}],"decimal": {"$numberDecimal": "3.14"},"decimal_nan": {"$numberDecimal": "NaN"},"regex": {"$regularExpression": {"pattern": "^pass$", "options": "i"}},"symbol": {"$symbol": "symbol"},"minKey": {"$minKey": 1},"maxKey": {"$maxKey": 1},"code": {"$code": "function(){}"},"null": null,"undefined": {"$undefined": true},"binary": {"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "0"},"uuid": {"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "4"} ,"context": { "city": "Shanghai", "location": { "lat": 31.2304, "lon": {"$numberDouble": "121.4737"} }, "start_time": 0, "end_time": {"$numberLong": "1735934393769"}, "args": [1, 2], "nan": {"$numberDouble": "NaN"}}}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt
new file mode 100644
index 000000000000..a1de2bc514c8
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": null,"after": null,"updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "d","ts_ms": 1739321992890,"transaction": null}
+{"id": "\"67ab25755c0d5ac87eb8c632\""}
+{"before": null,"after": null,"updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "d","ts_ms": 1739321992890,"transaction": null}
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","after": null,"updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "d","ts_ms": 1739321992890,"transaction": null}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt
new file mode 100644
index 000000000000..ff978460151b
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": null,"after": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "c","ts_ms": 1739321992890,"transaction": null}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt
new file mode 100644
index 000000000000..115ee5a91b34
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "dummy"}
+{"id":"596e275826f08b2730779e1f","name":"Sally","age":"25","created_time":"1735934393769","updated_time":"1735934393769","deleted_time":"1735934393769","register_time":"1736997330","double_nan":"NaN","double_inf":"Infinity","double_ninf":"-Infinity","double_zero":"0.0","boolean_true":"true","boolean_false":"false","array":"[\"a\",\"b\",\"c\"]","array_mix":"[\"1\",2,1735934393769]","decimal":"3.14","decimal_nan":"NaN","regex":"/^pass$/i","symbol":"symbol","minKey":"BsonMinKey","maxKey":"BsonMaxKey","code":"function(){}","null":null,"undefined":null,"binary":"uE2/4v5MSVOiJZkOo3APKQ==","uuid":"b84dbfe2-fe4c-4953-a225-990ea3700f29","context":"{\"city\":\"Shanghai\",\"location\":{\"lat\":31.2304,\"lon\":121.4737},\"start_time\":0,\"end_time\":1735934393769,\"args\":[1,2],\"nan\":\"NaN\"}"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt
new file mode 100644
index 000000000000..035ce3ff873a
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": null,"after": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "u","ts_ms": 1739321992890,"transaction": null}
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","after": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "u","ts_ms": 1739321992890,"transaction": null}