diff --git a/paimon-flink/paimon-flink-cdc/pom.xml b/paimon-flink/paimon-flink-cdc/pom.xml
index bb7c0d865564..6d4ecb3c4791 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -44,6 +44,7 @@ under the License.
4.0.0-1.177.5.03.3.0-1.20
+ 5.2.1
@@ -165,6 +166,13 @@ under the License.
${json-path.version}
+
+ org.mongodb
+ bson
+ ${bson.version}
+ provided
+
+
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormat.java
new file mode 100644
index 000000000000..e9b6a573f0a5
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormat.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.format.AbstractJsonDataFormat;
+import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;
+
+/**
+ * Supports the message queue's debezium bson data format and provides definitions for the message
+ * queue's record json deserialization class and parsing class {@link DebeziumBsonRecordParser}.
+ */
+public class DebeziumBsonDataFormat extends AbstractJsonDataFormat {
+
+ @Override
+ protected RecordParserFactory parser() {
+ return DebeziumBsonRecordParser::new;
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormatFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormatFactory.java
new file mode 100644
index 000000000000..2d55cb983280
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonDataFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.DataFormatFactory;
+
+/** Factory to create {@link DebeziumBsonDataFormat}. */
+public class DebeziumBsonDataFormatFactory implements DataFormatFactory {
+
+ public static final String IDENTIFIER = "debezium-bson";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public DataFormat create() {
+ return new DebeziumBsonDataFormat();
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
new file mode 100644
index 000000000000..10dcd56b80dd
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.TypeUtils;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import org.bson.BsonDocument;
+import org.bson.BsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_PAYLOAD;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_SCHEMA;
+import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString;
+
+/**
+ * The {@code DebeziumRecordParser} class extends the {@link DebeziumJsonRecordParser} and is
+ * designed to parse records from MongoDB's BSON change data capture (CDC) format via Debezium.
+ *
+ *
The class supports The following features:
+ *
+ *
Convert bson string to java object from before/after field
+ *
+ *
Parse the schema from bson, all fields are string type, and the _id field is the primary key
+ */
+public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DebeziumBsonRecordParser.class);
+
+ private static final String FIELD_COLLECTION = "collection";
+ private static final String FIELD_OBJECT_ID = "_id";
+ private static final List PRIMARY_KEYS = Collections.singletonList(FIELD_OBJECT_ID);
+
+ public DebeziumBsonRecordParser(TypeMapping typeMapping, List computedColumns) {
+ super(typeMapping, computedColumns);
+ }
+
+ @Override
+ protected void setRoot(CdcSourceRecord record) {
+ JsonNode node = (JsonNode) record.getValue();
+ if (node.has(FIELD_SCHEMA)) {
+ root = node.get(FIELD_PAYLOAD);
+ } else {
+ root = node;
+ }
+ }
+
+ @Override
+ protected Map extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
+ // bson record should be a string
+ Preconditions.checkArgument(
+ record.isTextual(),
+ "debezium bson record expected to be STRING type, but actual is %s",
+ record.getNodeType());
+
+ BsonDocument document = BsonDocument.parse(record.asText());
+ LinkedHashMap resultMap = new LinkedHashMap<>();
+ for (Map.Entry entry : document.entrySet()) {
+ String fieldName = entry.getKey();
+ resultMap.put(fieldName, toJsonString(BsonValueConvertor.convert(entry.getValue())));
+ rowTypeBuilder.field(fieldName, DataTypes.STRING());
+ }
+
+ evalComputedColumns(resultMap, rowTypeBuilder);
+
+ return resultMap;
+ }
+
+ private static String toJsonString(Object entry) {
+ if (entry == null) {
+ return null;
+ } else if (!TypeUtils.isBasicType(entry)) {
+ try {
+ return writeValueAsString(entry);
+ } catch (JsonProcessingException e) {
+ LOG.error("Failed to deserialize record.", e);
+ }
+ }
+ return Objects.toString(entry);
+ }
+
+ @Override
+ protected List extractPrimaryKeys() {
+ return PRIMARY_KEYS;
+ }
+
+ @Nullable
+ @Override
+ protected String getTableName() {
+ return getFromSourceField(FIELD_COLLECTION);
+ }
+
+ @Override
+ protected String format() {
+ return "debezium-bson";
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
index 6349fc6a95d2..66ddc2e1caaa 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
@@ -244,7 +244,7 @@ protected String format() {
}
@Nullable
- private String getFromSourceField(String key) {
+ protected String getFromSourceField(String key) {
JsonNode node = root.get(FIELD_SOURCE);
if (isNull(node)) {
return null;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
new file mode 100644
index 000000000000..16d43821bd12
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.util.CollectionUtil;
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBinarySubType;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDbPointer;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonJavaScript;
+import org.bson.BsonJavaScriptWithScope;
+import org.bson.BsonMaxKey;
+import org.bson.BsonMinKey;
+import org.bson.BsonNull;
+import org.bson.BsonObjectId;
+import org.bson.BsonRegularExpression;
+import org.bson.BsonString;
+import org.bson.BsonSymbol;
+import org.bson.BsonTimestamp;
+import org.bson.BsonUndefined;
+import org.bson.BsonValue;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The {@code BsonValueConvertor} class is designed to convert {@link BsonValue} to Java objects.
+ */
+public class BsonValueConvertor {
+ private static Integer convert(BsonTimestamp bsonTimestamp) {
+ return bsonTimestamp.getTime();
+ }
+
+ private static BigDecimal convert(BsonDecimal128 bsonValue) {
+ return convert(bsonValue.decimal128Value());
+ }
+
+ private static BigDecimal convert(Decimal128 bsonValue) {
+ if (bsonValue.isNaN() || bsonValue.isInfinite()) {
+ return null;
+ }
+ return bsonValue.bigDecimalValue();
+ }
+
+ private static String convert(BsonObjectId objectId) {
+ return convert(objectId.getValue());
+ }
+
+ private static String convert(ObjectId objectId) {
+ return objectId.toHexString();
+ }
+
+ private static String convert(BsonBinary bsonBinary) {
+ if (BsonBinarySubType.isUuid(bsonBinary.getType())) {
+ return bsonBinary.asUuid().toString();
+ } else {
+ return toHex(bsonBinary.getData());
+ }
+ }
+
+ private static String convert(BsonUndefined bsonUndefined) {
+ return null;
+ }
+
+ private static String convert(BsonRegularExpression regex) {
+ String escaped =
+ regex.getPattern().isEmpty() ? "(?:)" : regex.getPattern().replace("/", "\\/");
+ return String.format("/%s/%s", escaped, regex.getOptions());
+ }
+
+ private static Double convert(BsonDouble bsonDouble) {
+ double value = bsonDouble.getValue();
+ if (Double.isNaN(value) || Double.isInfinite(value)) {
+ return null;
+ }
+ return value;
+ }
+
+ private static String convert(BsonString string) {
+ return string.getValue();
+ }
+
+ private static Integer convert(BsonInt32 int32) {
+ return int32.getValue();
+ }
+
+ private static Long convert(BsonInt64 int64) {
+ return int64.getValue();
+ }
+
+ private static Boolean convert(BsonBoolean bool) {
+ return bool.getValue();
+ }
+
+ private static Long convert(BsonDateTime dateTime) {
+ return dateTime.getValue();
+ }
+
+ private static String convert(BsonSymbol symbol) {
+ return symbol.getSymbol();
+ }
+
+ private static String convert(BsonJavaScript javascript) {
+ return javascript.getCode();
+ }
+
+ private static Map convert(BsonJavaScriptWithScope javascriptWithScope) {
+ return CollectionUtil.map(
+ Pair.of("code", javascriptWithScope.getCode()),
+ Pair.of("scope", convert(javascriptWithScope.getScope())));
+ }
+
+ private static String convert(BsonNull bsonNull) {
+ return null;
+ }
+
+ private static String convert(BsonDbPointer dbPointer) {
+ return dbPointer.toString();
+ }
+
+ private static String convert(BsonMaxKey maxKey) {
+ return maxKey.toString();
+ }
+
+ private static String convert(BsonMinKey minKey) {
+ return minKey.toString();
+ }
+
+ private static Map convert(BsonDocument document) {
+ LinkedHashMap map = new LinkedHashMap<>(document.size());
+ for (Map.Entry entry : document.entrySet()) {
+ map.put(entry.getKey(), convert(entry.getValue()));
+ }
+ return map;
+ }
+
+ private static List