From 6c8180787dd3d4af7256f3c46400d006323d745c Mon Sep 17 00:00:00 2001 From: lizc9 Date: Thu, 16 Jan 2025 15:07:17 +0800 Subject: [PATCH 1/2] [cdc] Add debezium-bson format document and bugfix bson value convert to java value --- docs/content/cdc-ingestion/debezium-bson.md | 329 ++++++++++++++++++ docs/content/cdc-ingestion/kafka-cdc.md | 4 + .../debezium/DebeziumBsonRecordParser.java | 30 ++ .../debezium/DebeziumJsonRecordParser.java | 4 +- .../cdc/mongodb/BsonValueConvertor.java | 46 +-- .../DebeziumBsonRecordParserTest.java | 196 +++++++++++ .../table/event/event-delete.txt | 18 + .../table/event/event-insert.txt | 18 + .../table/event/event-update.txt | 18 + 9 files changed, 632 insertions(+), 31 deletions(-) create mode 100644 docs/content/cdc-ingestion/debezium-bson.md create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt diff --git a/docs/content/cdc-ingestion/debezium-bson.md b/docs/content/cdc-ingestion/debezium-bson.md new file mode 100644 index 000000000000..978bb1202562 --- /dev/null +++ b/docs/content/cdc-ingestion/debezium-bson.md @@ -0,0 +1,329 @@ +--- +title: "Debezium BSON" +weight: 6 +type: docs +aliases: +- /cdc-ingestion/debezium-bson.html +--- + + +# Debezium BSON Format + + +The debezium-bson format is one of the formats supported by }}">Kafka CDC. +It is the format obtained by collecting mongodb through debezium, which is similar to +debezium-json format. +However, MongoDB does not have a fixed schema, and the field types of each document may be different, so the before/after fields +in JSON are all string types, while the debezium-json format requires a JSON object type. + + +## Prepare MongoDB BSON Jar + +Can be downloaded from the [Maven repository](https://mvnrepository.com/artifact/org.mongodb/bson) + +``` +bson-*.jar +``` + +## Introduction + +{{< hint info >}} +The debezium bson format requires insert/update/delete event messages include the full document, and include a field that represents the state of the document before the change. +This requires setting debezium's capture.mode to change_streams_update_full_with_pre_image and [capture.mode.full.update.type](https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-property-capture-mode-full-update-type) to post_image. +The database must be running **MongoDB 6.0 or later** to use this option. +{{< /hint >}} + +Here is a simple example for an update operation captured from a Mongodb customers collection in JSON format: + +```json +{ + "schema": { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": true, + "name": "io.debezium.data.Json", + "version": 1, + "field": "before" + }, + { + "type": "string", + "optional": true, + "name": "io.debezium.data.Json", + "version": 1, + "field": "after" + }, + ... 
+    ]
+  },
+  "payload": {
+    "before": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"success\"]}",
+    "after": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : \"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, \"tags\":[\"passion\",\"success\"]}",
+    "source": {
+      "db": "inventory",
+      "rs": "rs0",
+      "collection": "customers",
+      ...
+    },
+    "op": "u",
+    "ts_ms": 1558965515240,
+    "ts_us": 1558965515240142,
+    "ts_ns": 1558965515240142879
+  }
+}
+```
+
+This document from the MongoDB collection customers has 4 fields: `_id` is a BSON ObjectId, `name` is a string,
+`create_time` is a long, and `tags` is an array of strings. The following is the processing result in debezium-bson format:
+
+Document Schema:
+
+| Field Name | Field Type | Key |
+|------------|------------|-------------|
+| _id | STRING | Primary Key |
+| name | STRING | |
+| create_time| STRING | |
+| tags | STRING | |
+
+Records:
+
+| RowKind | _id | name | create_time | tags |
+|---------|--------------------------|-------|----------------------------|-----------------------|
+| -U | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["success"] |
+| +U | 596e275826f08b2730779e1f | Anne | 1558965506000 | ["passion","success"] |
+
+
+### How it works
+Because the schema field of the event message carries no field information about the document, the debezium-bson format does not require event messages to contain schema information. The processing steps are:
+
+- Parse the before/after fields of the event message into a BsonDocument.
+- Recursively traverse all fields of the BsonDocument and convert each BsonValue to a Java object.
+- Convert every top-level field of before/after to string type; `_id` is always used as the primary key.
+- If a top-level field of before/after is a basic type (such as Integer/Long, etc.), it is converted directly to a string; otherwise it is converted to a JSON string (see the sketch after the table below).
+
+Below is a list of top-level field BsonValue conversion examples:
+
+| BsonValue Type | Json Value | Conversion Result String |
+|----------------|------------|--------------------------|
+| BsonString | `"hello"` | `"hello"` |
+| BsonInt32 | `123` | `"123"` |
+| BsonInt64 | `1735934393769`<br>`{"$numberLong": 1735934393769}` | `"1735934393769"` |
+| BsonDouble | `{"$numberDouble": "3.14"}`<br>`{"$numberDouble": "NaN"}`<br>`{"$numberDouble": "Infinity"}`<br>`{"$numberDouble": "-Infinity"}` | `"3.14"`<br>`"NaN"`<br>`"Infinity"`<br>`"-Infinity"` |
+| BsonBoolean | `true`<br>`false` | `"true"`<br>`"false"` |
+| BsonArray | `[1,2,{"$numberLong": 1735934393769}]` | `"[1,2,1735934393769]"` |
+| BsonObjectId | `{"$oid": "596e275826f08b2730779e1f"}` | `"596e275826f08b2730779e1f"` |
+| BsonDateTime | `{"$date": 1735934393769}` | `"1735934393769"` |
+| BsonNull | `null` | null |
+| BsonUndefined | `{"$undefined": true}` | null |
+| BsonBinary | `{"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "0"}` | `"uE2/4v5MSVOiJZkOo3APKQ=="` |
+| BsonBinary(type=UUID) | `{"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "4"}` | `"b84dbfe2-fe4c-4953-a225-990ea3700f29"` |
+| BsonDecimal128 | `{"$numberDecimal": "3.14"}`<br>`{"$numberDecimal": "NaN"}` | `"3.14"`<br>`"NaN"` |
+| BsonRegularExpression | `{"$regularExpression": {"pattern": "^pass$", "options": "i"}}` | `"/^pass$/i"` |
+| BsonSymbol | `{"$symbol": "symbol"}` | `"symbol"` |
+| BsonTimestamp | `{"$timestamp": {"t": 1736997330, "i": 2}}` | `"1736997330"` |
+| BsonMinKey | `{"$minKey": 1}` | `"BsonMinKey"` |
+| BsonMaxKey | `{"$maxKey": 1}` | `"BsonMaxKey"` |
+| BsonJavaScript | `{"$code": "function(){}"}` | `"function(){}"` |
+| BsonJavaScriptWithScope | `{"$code": "function(){}", "$scope": {"name": "Anne"}}` | `'{"$code": "function(){}", "$scope": {"name": "Anne"}}'` |
+| BsonDocument | `{"decimalPi": {"$numberDecimal": "3.14"}, "doublePi": {"$numberDouble": "3.14"}, "doubleNaN": {"$numberDouble": "NaN"}, "decimalNaN": {"$numberDecimal": "NaN"}, "long": {"$numberLong": "100"}, "bool": true, "array": [{"$numberInt": "1"}, {"$numberLong": "2"}]}` | `'{"decimalPi":3.14,"doublePi":3.14,"doubleNaN":"NaN","decimalNaN":"NaN","long":100,"bool":true,"array":[1,2]}'` |
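+To make these rules concrete, here is a minimal, self-contained sketch built on the `org.mongodb:bson`
+API. It only illustrates the rules above and is not the actual implementation (the real logic lives in
+`DebeziumBsonRecordParser` and `BsonValueConvertor`); the class and method names below are invented for
+this example, and the fallback branch is deliberately simplified.
+
+```java
+import org.bson.BsonDocument;
+import org.bson.BsonValue;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class TopLevelConversionSketch {
+
+    /** Converts every top-level field of an extended-JSON document to a string. */
+    public static Map<String, String> toStringFields(String extendedJson) {
+        BsonDocument doc = BsonDocument.parse(extendedJson);
+        Map<String, String> result = new LinkedHashMap<>();
+        for (Map.Entry<String, BsonValue> entry : doc.entrySet()) {
+            BsonValue value = entry.getValue();
+            String converted;
+            switch (value.getBsonType()) {
+                case STRING:
+                    converted = value.asString().getValue();
+                    break;
+                case INT32:
+                    converted = String.valueOf(value.asInt32().getValue());
+                    break;
+                case INT64:
+                    converted = String.valueOf(value.asInt64().getValue());
+                    break;
+                case BOOLEAN:
+                    converted = String.valueOf(value.asBoolean().getValue());
+                    break;
+                case DATE_TIME:
+                    // epoch milliseconds, as in the BsonDateTime row above
+                    converted = String.valueOf(value.asDateTime().getValue());
+                    break;
+                case OBJECT_ID:
+                    converted = value.asObjectId().getValue().toHexString();
+                    break;
+                case NULL:
+                    converted = null;
+                    break;
+                case DOCUMENT:
+                    // non-basic types become JSON strings
+                    converted = value.asDocument().toJson();
+                    break;
+                default:
+                    // simplified fallback; the real convertor recursively converts
+                    // arrays, decimals, binaries, etc. element by element
+                    converted = String.valueOf(value);
+                    break;
+            }
+            result.put(entry.getKey(), converted);
+        }
+        return result;
+    }
+}
+```
+
+Applied to the `after` document of the update example above, this sketch reproduces the `_id`, `name`
+and `create_time` values from the Records table; the `tags` array would additionally need the recursive
+element conversion that the real convertor performs.
+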
+ + +### How to use +Use debezium-bson by adding the kafka_conf parameter **value.format=debezium-bson**. Let’s take table synchronization as an example: + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + kafka_sync_table \ + --warehouse hdfs:///path/to/warehouse \ + --database test_db \ + --table ods_mongodb_customers \ + --primary_keys _id \ + --kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \ + --kafka_conf topic=customers \ + --kafka_conf properties.group.id=123456 \ + --kafka_conf value.format=debezium-bson \ + --catalog_conf metastore=filesystem \ + --table_conf bucket=4 \ + --table_conf changelog-producer=input \ + --table_conf sink.parallelism=4 +``` + + diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md index fc16c5b0fc1f..7747495c5bb9 100644 --- a/docs/content/cdc-ingestion/kafka-cdc.md +++ b/docs/content/cdc-ingestion/kafka-cdc.md @@ -67,6 +67,10 @@ If a message in a Kafka topic is a change event captured from another database u aws-dms-json True + + }}">debezium-bson + True + diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java index 10dcd56b80dd..525a5af81930 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java @@ -22,7 +22,9 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor; +import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.TypeUtils; @@ -37,6 +39,7 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -45,6 +48,11 @@ import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_PAYLOAD; import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_SCHEMA; +import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_TYPE; +import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_DELETE; +import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_INSERT; +import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_READE; +import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_UPDATE; import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString; /** @@ -69,6 +77,28 @@ public DebeziumBsonRecordParser(TypeMapping typeMapping, List co super(typeMapping, computedColumns); } + @Override + public List extractRecords() { + String operation = getAndCheck(FIELD_TYPE).asText(); + List records = new ArrayList<>(); + switch (operation) { + case OP_INSERT: + case OP_READE: + processRecord(getData(), RowKind.INSERT, records); + break; + case OP_UPDATE: + processRecord(getBefore(operation), RowKind.DELETE, records); + 
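// an update emits a retraction of the before image, then an insert of the after image below
+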
processRecord(getData(), RowKind.INSERT, records); + break; + case OP_DELETE: + processRecord(getBefore(operation), RowKind.DELETE, records); + break; + default: + throw new UnsupportedOperationException("Unknown record operation: " + operation); + } + return records; + } + @Override protected void setRoot(CdcSourceRecord record) { JsonNode node = (JsonNode) record.getValue(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java index 66ddc2e1caaa..6e06e3adcaab 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java @@ -109,11 +109,11 @@ public List extractRecords() { return records; } - private JsonNode getData() { + protected JsonNode getData() { return getAndCheck(dataField()); } - private JsonNode getBefore(String op) { + protected JsonNode getBefore(String op) { return getAndCheck(FIELD_BEFORE, FIELD_TYPE, op); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java index 16d43821bd12..666ef9224268 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java @@ -47,8 +47,8 @@ import org.bson.types.Decimal128; import org.bson.types.ObjectId; -import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Base64; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -61,15 +61,22 @@ private static Integer convert(BsonTimestamp bsonTimestamp) { return bsonTimestamp.getTime(); } - private static BigDecimal convert(BsonDecimal128 bsonValue) { + private static Number convert(BsonDecimal128 bsonValue) { return convert(bsonValue.decimal128Value()); } - private static BigDecimal convert(Decimal128 bsonValue) { - if (bsonValue.isNaN() || bsonValue.isInfinite()) { - return null; + private static Number convert(Decimal128 bsonValue) { + if (bsonValue.isNaN()) { + return Double.NaN; + } else if (bsonValue.isInfinite()) { + if (bsonValue.isNegative()) { + return Double.NEGATIVE_INFINITY; + } else { + return Double.POSITIVE_INFINITY; + } + } else { + return bsonValue.bigDecimalValue(); } - return bsonValue.bigDecimalValue(); } private static String convert(BsonObjectId objectId) { @@ -84,7 +91,7 @@ private static String convert(BsonBinary bsonBinary) { if (BsonBinarySubType.isUuid(bsonBinary.getType())) { return bsonBinary.asUuid().toString(); } else { - return toHex(bsonBinary.getData()); + return Base64.getEncoder().encodeToString(bsonBinary.getData()); } } @@ -99,11 +106,7 @@ private static String convert(BsonRegularExpression regex) { } private static Double convert(BsonDouble bsonDouble) { - double value = bsonDouble.getValue(); - if (Double.isNaN(value) || Double.isInfinite(value)) { - return null; - } - return value; + return bsonDouble.getValue(); } private static String convert(BsonString string) { @@ -136,8 +139,8 @@ private static String convert(BsonJavaScript javascript) { 
private static Map convert(BsonJavaScriptWithScope javascriptWithScope) { return CollectionUtil.map( - Pair.of("code", javascriptWithScope.getCode()), - Pair.of("scope", convert(javascriptWithScope.getScope()))); + Pair.of("$code", javascriptWithScope.getCode()), + Pair.of("$scope", convert(javascriptWithScope.getScope()))); } private static String convert(BsonNull bsonNull) { @@ -224,19 +227,4 @@ public static Object convert(BsonValue bsonValue) { "Unsupported BSON type: " + bsonValue.getBsonType()); } } - - public static String toHex(byte[] bytes) { - StringBuilder sb = new StringBuilder(); - - for (byte b : bytes) { - String s = Integer.toHexString(255 & b); - if (s.length() < 2) { - sb.append("0"); - } - - sb.append(s); - } - - return sb.toString(); - } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java new file mode 100644 index 000000000000..8da539f47c1e --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.format.debezium; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase; +import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor; +import org.apache.paimon.flink.sink.cdc.CdcRecord; +import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.RowKind; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Test for DebeziumBsonRecordParser. 
*/ +public class DebeziumBsonRecordParserTest extends KafkaActionITCaseBase { + + private static final Logger log = LoggerFactory.getLogger(DebeziumBsonRecordParserTest.class); + private static List insertList = new ArrayList<>(); + private static List updateList = new ArrayList<>(); + private static List deleteList = new ArrayList<>(); + + private static ObjectMapper objMapper = new ObjectMapper(); + + private static Map beforeEvent = new HashMap<>(); + + static { + beforeEvent.put("_id", "67ab25755c0d5ac87eb8c632"); + beforeEvent.put("created_at", "1736207571013"); + beforeEvent.put("created_by", "peter"); + beforeEvent.put("tags", "[\"pending\"]"); + beforeEvent.put("updated_at", "1739455297970"); + } + + private static Map afterEvent = new HashMap<>(); + + static { + afterEvent.put("_id", "67ab25755c0d5ac87eb8c632"); + afterEvent.put("created_at", "1736207571013"); + afterEvent.put("created_by", "peter"); + afterEvent.put("tags", "[\"succeed\"]"); + afterEvent.put("updated_at", "1739455397970"); + } + + @Before + public void setup() { + String insertRes = "kafka/debezium-bson/table/event/event-insert.txt"; + String updateRes = "kafka/debezium-bson/table/event/event-update.txt"; + String deleteRes = "kafka/debezium-bson/table/event/event-delete.txt"; + URL url; + try { + url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(insertRes); + Files.readAllLines(Paths.get(url.toURI())).stream() + .filter(this::isRecordLine) + .forEach(e -> insertList.add(e)); + + url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(updateRes); + Files.readAllLines(Paths.get(url.toURI())).stream() + .filter(this::isRecordLine) + .forEach(e -> updateList.add(e)); + + url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(deleteRes); + Files.readAllLines(Paths.get(url.toURI())).stream() + .filter(this::isRecordLine) + .forEach(e -> deleteList.add(e)); + + } catch (Exception e) { + log.error("Fail to init debezium-json cases", e); + } + } + + @Test + public void extractInsertRecord() throws Exception { + DebeziumBsonRecordParser parser = + new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList()); + for (String json : insertList) { + // 将json解析为JsonNode对象 + JsonNode rootNode = objMapper.readValue(json, JsonNode.class); + CdcSourceRecord cdcRecord = new CdcSourceRecord(rootNode); + Schema schema = parser.buildSchema(cdcRecord); + Assert.assertEquals(schema.primaryKeys(), Arrays.asList("_id")); + + List records = parser.extractRecords(); + Assert.assertEquals(records.size(), 1); + + CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord(); + Assert.assertEquals(result.kind(), RowKind.INSERT); + Assert.assertEquals(beforeEvent, result.data()); + + String dbName = parser.getDatabaseName(); + Assert.assertEquals(dbName, "bigdata_test"); + + String tableName = parser.getTableName(); + Assert.assertEquals(tableName, "sync_test_table"); + + MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor(); + Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); + } + } + + @Test + public void extractUpdateRecord() throws Exception { + DebeziumBsonRecordParser parser = + new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList()); + for (String json : updateList) { + // 将json解析为JsonNode对象 + JsonNode jsonNode = objMapper.readValue(json, JsonNode.class); + CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode); + Schema schema = parser.buildSchema(cdcRecord); + 
Assert.assertEquals(schema.primaryKeys(), Arrays.asList("_id")); + + List records = parser.extractRecords(); + Assert.assertEquals(records.size(), 2); + + CdcRecord updateBefore = records.get(0).toRichCdcRecord().toCdcRecord(); + Assert.assertEquals(updateBefore.kind(), RowKind.DELETE); + Assert.assertEquals(beforeEvent, updateBefore.data()); + + CdcRecord updateAfter = records.get(1).toRichCdcRecord().toCdcRecord(); + Assert.assertEquals(updateAfter.kind(), RowKind.INSERT); + Assert.assertEquals(afterEvent, updateAfter.data()); + + String dbName = parser.getDatabaseName(); + Assert.assertEquals(dbName, "bigdata_test"); + + String tableName = parser.getTableName(); + Assert.assertEquals(tableName, "sync_test_table"); + + MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor(); + Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); + } + } + + @Test + public void extractDeleteRecord() throws Exception { + DebeziumBsonRecordParser parser = + new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList()); + for (String json : deleteList) { + // 将json解析为JsonNode对象 + JsonNode jsonNode = objMapper.readValue(json, JsonNode.class); + CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode); + Schema schema = parser.buildSchema(cdcRecord); + Assert.assertEquals(schema.primaryKeys(), Arrays.asList("_id")); + + List records = parser.extractRecords(); + Assert.assertEquals(records.size(), 1); + + CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord(); + Assert.assertEquals(result.kind(), RowKind.DELETE); + Assert.assertEquals(afterEvent, result.data()); + + String dbName = parser.getDatabaseName(); + Assert.assertEquals(dbName, "bigdata_test"); + + String tableName = parser.getTableName(); + Assert.assertEquals(tableName, "sync_test_table"); + + MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor(); + Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt new file mode 100644 index 000000000000..1c095e4394db --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","after": null,"updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "d","ts_ms": 1739321992890,"transaction": null} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt new file mode 100644 index 000000000000..4467768ece4c --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"before": null,"after": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "c","ts_ms": 1739321992890,"transaction": null} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt new file mode 100644 index 000000000000..add415d29c37 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","after": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "u","ts_ms": 1739321992890,"transaction": null}
\ No newline at end of file

From 67b2b6b9449b3c550ad1d79f56c8d944d6db6aad Mon Sep 17 00:00:00 2001
From: lizc9
Date: Fri, 28 Feb 2025 16:12:28 +0800
Subject: [PATCH 2/2] [Flink] debezium-bson format using the id field in the
 Kafka Key as Update before information

---
 docs/content/cdc-ingestion/debezium-bson.md   |   6 +-
 .../debezium/DebeziumBsonRecordParser.java    |  58 ++++-
 ...afkaDebeziumJsonDeserializationSchema.java |   9 +-
 .../DebeziumBsonRecordParserTest.java         | 217 ++++++++++------
 .../debezium-bson/table/event/event-bson.txt  |  19 ++
 .../table/event/event-delete.txt              |   7 +-
 .../table/event/event-insert.txt              |   1 +
 .../debezium-bson/table/event/event-json.txt  |  19 ++
 .../table/event/event-update.txt              |   5 +-
 9 files changed, 255 insertions(+), 86 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt

diff --git a/docs/content/cdc-ingestion/debezium-bson.md b/docs/content/cdc-ingestion/debezium-bson.md
index 978bb1202562..52eb0422d54b 100644
--- a/docs/content/cdc-ingestion/debezium-bson.md
+++ b/docs/content/cdc-ingestion/debezium-bson.md
@@ -47,7 +47,7 @@ bson-*.jar
 {{< hint info >}}
 The debezium bson format requires insert/update/delete event messages include the full document, and include a field that represents the state of the document before the change.
 This requires setting debezium's capture.mode to change_streams_update_full_with_pre_image and [capture.mode.full.update.type](https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-property-capture-mode-full-update-type) to post_image.
-The database must be running **MongoDB 6.0 or later** to use this option.
+Before MongoDB 6.0 the 'update before' document cannot be captured, so the id field of the Kafka message key is used as the 'update before' information instead.
 {{< /hint >}}
 
 Here is a simple example for an update operation captured from a Mongodb customers collection in JSON format:
@@ -145,7 +145,7 @@ Below is a list of top-level field BsonValue conversion examples:
 | BsonInt32 | `123` | `"123"` |
-| BsonInt64 | `1735934393769`<br>`{"$numberLong": 1735934393769}` | `"1735934393769"` |
+| BsonInt64 | `1735934393769`<br>`{"$numberLong": "1735934393769"}` | `"1735934393769"` |
@@ -186,7 +186,7 @@ Below is a list of top-level field BsonValue conversion examples:
 | BsonBoolean | `true`<br>`false` | `"true"`<br>`"false"` |
- [1,2,{"$numberLong": 1735934393769}] + [1,2,{"$numberLong": "1735934393769"}] "[1,2,1735934393769]" diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java index 525a5af81930..397575c8e598 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java @@ -31,6 +31,8 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode; import org.bson.BsonDocument; import org.bson.BsonValue; @@ -46,6 +48,7 @@ import java.util.Map; import java.util.Objects; +import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_BEFORE; import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_PAYLOAD; import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_SCHEMA; import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_TYPE; @@ -53,6 +56,8 @@ import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_INSERT; import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_READE; import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_UPDATE; +import static org.apache.paimon.utils.JsonSerdeUtil.fromJson; +import static org.apache.paimon.utils.JsonSerdeUtil.isNull; import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString; /** @@ -71,8 +76,11 @@ public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser { private static final String FIELD_COLLECTION = "collection"; private static final String FIELD_OBJECT_ID = "_id"; + private static final String FIELD_KEY_ID = "id"; private static final List PRIMARY_KEYS = Collections.singletonList(FIELD_OBJECT_ID); + private ObjectNode keyRoot; + public DebeziumBsonRecordParser(TypeMapping typeMapping, List computedColumns) { super(typeMapping, computedColumns); } @@ -87,11 +95,11 @@ public List extractRecords() { processRecord(getData(), RowKind.INSERT, records); break; case OP_UPDATE: - processRecord(getBefore(operation), RowKind.DELETE, records); + processDeleteRecord(operation, records); processRecord(getData(), RowKind.INSERT, records); break; case OP_DELETE: - processRecord(getBefore(operation), RowKind.DELETE, records); + processDeleteRecord(operation, records); break; default: throw new UnsupportedOperationException("Unknown record operation: " + operation); @@ -101,11 +109,14 @@ public List extractRecords() { @Override protected void setRoot(CdcSourceRecord record) { - JsonNode node = (JsonNode) record.getValue(); - if (node.has(FIELD_SCHEMA)) { - root = node.get(FIELD_PAYLOAD); - } else { - root = node; + root = (JsonNode) record.getValue(); + if (root.has(FIELD_SCHEMA)) { + root = root.get(FIELD_PAYLOAD); + } + + keyRoot = (ObjectNode) record.getKey(); + if (!isNull(keyRoot) && keyRoot.has(FIELD_SCHEMA)) { + keyRoot = (ObjectNode) 
keyRoot.get(FIELD_PAYLOAD); } } @@ -158,4 +169,37 @@ protected String getTableName() { protected String format() { return "debezium-bson"; } + + public boolean checkBeforeExists() { + return !isNull(root.get(FIELD_BEFORE)); + } + + private void processDeleteRecord(String operation, List records) { + if (checkBeforeExists()) { + processRecord(getBefore(operation), RowKind.DELETE, records); + } else { + // Before version 6.0 of MongoDB, it was not possible to obtain 'Update Before' + // information. Therefore, data is first deleted using the key 'id' + JsonNode idNode = null; + Preconditions.checkArgument( + !isNull(keyRoot) && !isNull(idNode = keyRoot.get(FIELD_KEY_ID)), + "Invalid %s format: missing '%s' field in key when '%s' is '%s' for: %s.", + format(), + FIELD_KEY_ID, + FIELD_TYPE, + operation, + keyRoot); + + // Deserialize id from json string to JsonNode + Map record = + Collections.singletonMap( + FIELD_OBJECT_ID, fromJson(idNode.asText(), JsonNode.class)); + + try { + processRecord(new TextNode(writeValueAsString(record)), RowKind.DELETE, records); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to deserialize key record.", e); + } + } + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java index 76211cf56dad..507c9eb63c3d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java @@ -64,7 +64,14 @@ public CdcSourceRecord deserialize(ConsumerRecord message) throw } try { - return new CdcSourceRecord(objectMapper.readValue(message.value(), JsonNode.class)); + byte[] key = message.key(); + JsonNode keyNode = null; + if (key != null) { + keyNode = objectMapper.readValue(key, JsonNode.class); + } + + JsonNode valueNode = objectMapper.readValue(message.value(), JsonNode.class); + return new CdcSourceRecord(null, keyNode, valueNode); } catch (Exception e) { LOG.error("Invalid Json:\n{}", new String(message.value())); throw e; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java index 8da539f47c1e..78d3bcc3de60 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java @@ -20,19 +20,26 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord; import org.apache.paimon.flink.action.cdc.TypeMapping; -import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase; +import org.apache.paimon.flink.action.cdc.format.DataFormat; import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor; import org.apache.paimon.flink.sink.cdc.CdcRecord; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; import org.apache.paimon.schema.Schema; import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.JsonSerdeUtil; 
+import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode; + +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,59 +54,85 @@ import java.util.Map; /** Test for DebeziumBsonRecordParser. */ -public class DebeziumBsonRecordParserTest extends KafkaActionITCaseBase { +public class DebeziumBsonRecordParserTest { private static final Logger log = LoggerFactory.getLogger(DebeziumBsonRecordParserTest.class); - private static List insertList = new ArrayList<>(); - private static List updateList = new ArrayList<>(); - private static List deleteList = new ArrayList<>(); + private static List insertList = new ArrayList<>(); + private static List updateList = new ArrayList<>(); + private static List deleteList = new ArrayList<>(); + + private static ArrayList bsonRecords = new ArrayList<>(); + private static ArrayList jsonRecords = new ArrayList<>(); - private static ObjectMapper objMapper = new ObjectMapper(); + private static Map keyEvent = new HashMap<>(); + + private static KafkaDeserializationSchema kafkaDeserializationSchema = null; private static Map beforeEvent = new HashMap<>(); - static { + private static Map afterEvent = new HashMap<>(); + + @BeforeAll + public static void beforeAll() throws Exception { + DataFormat dataFormat = new DebeziumBsonDataFormatFactory().create(); + kafkaDeserializationSchema = dataFormat.createKafkaDeserializer(null); + + keyEvent.put("_id", "67ab25755c0d5ac87eb8c632"); + beforeEvent.put("_id", "67ab25755c0d5ac87eb8c632"); beforeEvent.put("created_at", "1736207571013"); beforeEvent.put("created_by", "peter"); beforeEvent.put("tags", "[\"pending\"]"); beforeEvent.put("updated_at", "1739455297970"); - } - private static Map afterEvent = new HashMap<>(); - - static { afterEvent.put("_id", "67ab25755c0d5ac87eb8c632"); afterEvent.put("created_at", "1736207571013"); afterEvent.put("created_by", "peter"); afterEvent.put("tags", "[\"succeed\"]"); afterEvent.put("updated_at", "1739455397970"); - } - @Before - public void setup() { String insertRes = "kafka/debezium-bson/table/event/event-insert.txt"; String updateRes = "kafka/debezium-bson/table/event/event-update.txt"; String deleteRes = "kafka/debezium-bson/table/event/event-delete.txt"; - URL url; - try { - url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(insertRes); - Files.readAllLines(Paths.get(url.toURI())).stream() - .filter(this::isRecordLine) - .forEach(e -> insertList.add(e)); - - url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(updateRes); - Files.readAllLines(Paths.get(url.toURI())).stream() - .filter(this::isRecordLine) - .forEach(e -> updateList.add(e)); - - url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(deleteRes); - Files.readAllLines(Paths.get(url.toURI())).stream() - .filter(this::isRecordLine) - .forEach(e -> deleteList.add(e)); - - } 
catch (Exception e) { - log.error("Fail to init debezium-json cases", e); + String bsonPth = "kafka/debezium-bson/table/event/event-bson.txt"; + String jsonPath = "kafka/debezium-bson/table/event/event-json.txt"; + + parseCdcSourceRecords(insertRes, insertList); + + parseCdcSourceRecords(updateRes, updateList); + + parseCdcSourceRecords(deleteRes, deleteList); + + parseCdcSourceRecords(bsonPth, bsonRecords); + + parseCdcSourceRecords(jsonPath, jsonRecords); + } + + @AfterAll + public static void afterAll() { + insertList.clear(); + updateList.clear(); + deleteList.clear(); + bsonRecords.clear(); + jsonRecords.clear(); + } + + private static void parseCdcSourceRecords(String resourcePath, List records) + throws Exception { + URL url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(resourcePath); + List line = Files.readAllLines(Paths.get(url.toURI())); + String key = null; + for (String json : line) { + if (StringUtils.isNullOrWhitespaceOnly(json) || !json.startsWith("{")) { + continue; + } + if (key == null) { + key = json; + } else { + // test kafka deserialization + records.add(deserializeKafkaSchema(key, json)); + key = null; + } } } @@ -107,28 +140,26 @@ public void setup() { public void extractInsertRecord() throws Exception { DebeziumBsonRecordParser parser = new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList()); - for (String json : insertList) { - // 将json解析为JsonNode对象 - JsonNode rootNode = objMapper.readValue(json, JsonNode.class); - CdcSourceRecord cdcRecord = new CdcSourceRecord(rootNode); + Assertions.assertFalse(insertList.isEmpty()); + for (CdcSourceRecord cdcRecord : insertList) { Schema schema = parser.buildSchema(cdcRecord); - Assert.assertEquals(schema.primaryKeys(), Arrays.asList("_id")); + Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id")); List records = parser.extractRecords(); - Assert.assertEquals(records.size(), 1); + Assertions.assertEquals(records.size(), 1); CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord(); - Assert.assertEquals(result.kind(), RowKind.INSERT); - Assert.assertEquals(beforeEvent, result.data()); + Assertions.assertEquals(result.kind(), RowKind.INSERT); + Assertions.assertEquals(beforeEvent, result.data()); String dbName = parser.getDatabaseName(); - Assert.assertEquals(dbName, "bigdata_test"); + Assertions.assertEquals(dbName, "bigdata_test"); String tableName = parser.getTableName(); - Assert.assertEquals(tableName, "sync_test_table"); + Assertions.assertEquals(tableName, "sync_test_table"); MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor(); - Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); + Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); } } @@ -136,32 +167,34 @@ public void extractInsertRecord() throws Exception { public void extractUpdateRecord() throws Exception { DebeziumBsonRecordParser parser = new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList()); - for (String json : updateList) { - // 将json解析为JsonNode对象 - JsonNode jsonNode = objMapper.readValue(json, JsonNode.class); - CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode); + Assertions.assertFalse(updateList.isEmpty()); + for (CdcSourceRecord cdcRecord : updateList) { Schema schema = parser.buildSchema(cdcRecord); - Assert.assertEquals(schema.primaryKeys(), Arrays.asList("_id")); + Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id")); List records = parser.extractRecords(); - 
Assert.assertEquals(records.size(), 2); + Assertions.assertEquals(records.size(), 2); CdcRecord updateBefore = records.get(0).toRichCdcRecord().toCdcRecord(); - Assert.assertEquals(updateBefore.kind(), RowKind.DELETE); - Assert.assertEquals(beforeEvent, updateBefore.data()); + Assertions.assertEquals(updateBefore.kind(), RowKind.DELETE); + if (parser.checkBeforeExists()) { + Assertions.assertEquals(beforeEvent, updateBefore.data()); + } else { + Assertions.assertEquals(keyEvent, updateBefore.data()); + } CdcRecord updateAfter = records.get(1).toRichCdcRecord().toCdcRecord(); - Assert.assertEquals(updateAfter.kind(), RowKind.INSERT); - Assert.assertEquals(afterEvent, updateAfter.data()); + Assertions.assertEquals(updateAfter.kind(), RowKind.INSERT); + Assertions.assertEquals(afterEvent, updateAfter.data()); String dbName = parser.getDatabaseName(); - Assert.assertEquals(dbName, "bigdata_test"); + Assertions.assertEquals(dbName, "bigdata_test"); String tableName = parser.getTableName(); - Assert.assertEquals(tableName, "sync_test_table"); + Assertions.assertEquals(tableName, "sync_test_table"); MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor(); - Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); + Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); } } @@ -169,28 +202,66 @@ public void extractUpdateRecord() throws Exception { public void extractDeleteRecord() throws Exception { DebeziumBsonRecordParser parser = new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList()); - for (String json : deleteList) { - // 将json解析为JsonNode对象 - JsonNode jsonNode = objMapper.readValue(json, JsonNode.class); - CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode); + Assertions.assertFalse(deleteList.isEmpty()); + for (CdcSourceRecord cdcRecord : deleteList) { Schema schema = parser.buildSchema(cdcRecord); - Assert.assertEquals(schema.primaryKeys(), Arrays.asList("_id")); + Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id")); List records = parser.extractRecords(); - Assert.assertEquals(records.size(), 1); + Assertions.assertEquals(records.size(), 1); CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord(); - Assert.assertEquals(result.kind(), RowKind.DELETE); - Assert.assertEquals(afterEvent, result.data()); + Assertions.assertEquals(result.kind(), RowKind.DELETE); + if (parser.checkBeforeExists()) { + Assertions.assertEquals(beforeEvent, result.data()); + } else { + Assertions.assertEquals(keyEvent, result.data()); + } String dbName = parser.getDatabaseName(); - Assert.assertEquals(dbName, "bigdata_test"); + Assertions.assertEquals(dbName, "bigdata_test"); String tableName = parser.getTableName(); - Assert.assertEquals(tableName, "sync_test_table"); + Assertions.assertEquals(tableName, "sync_test_table"); MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor(); - Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); + Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0); + } + } + + @Test + public void bsonConvertJsonTest() throws Exception { + DebeziumBsonRecordParser parser = + new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList()); + + Assertions.assertFalse(jsonRecords.isEmpty()); + for (int i = 0; i < jsonRecords.size(); i++) { + CdcSourceRecord bsonRecord = bsonRecords.get(i); + CdcSourceRecord jsonRecord = jsonRecords.get(i); + + JsonNode bsonTextNode = + new 
TextNode(JsonSerdeUtil.writeValueAsString(bsonRecord.getValue())); + Map resultMap = parser.extractRowData(bsonTextNode, RowType.builder()); + + ObjectNode expectNode = (ObjectNode) jsonRecord.getValue(); + + expectNode + .fields() + .forEachRemaining( + entry -> { + String key = entry.getKey(); + String expectValue = null; + if (!JsonSerdeUtil.isNull(entry.getValue())) { + expectValue = entry.getValue().asText(); + } + Assertions.assertEquals(expectValue, resultMap.get(key)); + }); } } + + private static CdcSourceRecord deserializeKafkaSchema(String key, String value) + throws Exception { + return kafkaDeserializationSchema.deserialize( + new ConsumerRecord<>("topic", 0, 0, key.getBytes(), value.getBytes())); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt new file mode 100644 index 000000000000..be744a114770 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{"id": "dummy"} +{"id": {"$oid": "596e275826f08b2730779e1f"},"name": "Sally","age": 25,"created_time": {"$numberLong": "1735934393769"},"updated_time": 1735934393769,"deleted_time": {"$date": 1735934393769 },"register_time": {"$timestamp": {"t": 1736997330, "i": 2}},"double_nan": {"$numberDouble": "NaN"},"double_inf": {"$numberDouble": "Infinity"},"double_ninf": {"$numberDouble": "-Infinity"},"double_zero": {"$numberDouble": "0"},"boolean_true": true,"boolean_false": false,"array": ["a", "b", "c"],"array_mix": ["1", 2, {"$numberLong": "1735934393769"}],"decimal": {"$numberDecimal": "3.14"},"decimal_nan": {"$numberDecimal": "NaN"},"regex": {"$regularExpression": {"pattern": "^pass$", "options": "i"}},"symbol": {"$symbol": "symbol"},"minKey": {"$minKey": 1},"maxKey": {"$maxKey": 1},"code": {"$code": "function(){}"},"null": null,"undefined": {"$undefined": true},"binary": {"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "0"},"uuid": {"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "4"} ,"context": { "city": "Shanghai", "location": { "lat": 31.2304, "lon": {"$numberDouble": "121.4737"} }, "start_time": 0, "end_time": {"$numberLong": "1735934393769"}, "args": [1, 2], "nan": {"$numberDouble": "NaN"}}} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt index 1c095e4394db..a1de2bc514c8 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt @@ -15,4 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","after": null,"updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "d","ts_ms": 1739321992890,"transaction": null} \ No newline at end of file +{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"} +{"before": null,"after": null,"updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "d","ts_ms": 1739321992890,"transaction": null} +{"id": "\"67ab25755c0d5ac87eb8c632\""} +{"before": null,"after": null,"updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "d","ts_ms": 1739321992890,"transaction": null} +{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"} +{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","after": null,"updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "d","ts_ms": 1739321992890,"transaction": null} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt index 4467768ece4c..ff978460151b 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt @@ -15,4 +15,5 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"} {"before": null,"after": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "c","ts_ms": 1739321992890,"transaction": null} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt new file mode 100644 index 000000000000..115ee5a91b34 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"id": "dummy"} +{"id":"596e275826f08b2730779e1f","name":"Sally","age":"25","created_time":"1735934393769","updated_time":"1735934393769","deleted_time":"1735934393769","register_time":"1736997330","double_nan":"NaN","double_inf":"Infinity","double_ninf":"-Infinity","double_zero":"0.0","boolean_true":"true","boolean_false":"false","array":"[\"a\",\"b\",\"c\"]","array_mix":"[\"1\",2,1735934393769]","decimal":"3.14","decimal_nan":"NaN","regex":"/^pass$/i","symbol":"symbol","minKey":"BsonMinKey","maxKey":"BsonMaxKey","code":"function(){}","null":null,"undefined":null,"binary":"uE2/4v5MSVOiJZkOo3APKQ==","uuid":"b84dbfe2-fe4c-4953-a225-990ea3700f29","context":"{\"city\":\"Shanghai\",\"location\":{\"lat\":31.2304,\"lon\":121.4737},\"start_time\":0,\"end_time\":1735934393769,\"args\":[1,2],\"nan\":\"NaN\"}"} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt index add415d29c37..035ce3ff873a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt @@ -15,4 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","after": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "u","ts_ms": 1739321992890,"transaction": null} \ No newline at end of file +{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"} +{"before": null,"after": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "u","ts_ms": 1739321992890,"transaction": null} +{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"} +{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","after": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": [\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","updateDescription": null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": "bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": "sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": null},"op": "u","ts_ms": 1739321992890,"transaction": null}