From 12fcd7452d9e895e04802a07093ad046c61551c6 Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Thu, 11 Sep 2025 14:00:42 +0800 Subject: [PATCH] [cdc] Fix PostgreSQL DECIMAL type conversion issue --- .../action/cdc/format/debezium/DebeziumSchemaUtils.java | 2 +- .../flink/action/cdc/postgres/PostgresRecordParser.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java index 80f99165e6d4..983b760294af 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java @@ -167,7 +167,7 @@ public static String transformRawValue( } else if (("bytes".equals(debeziumType) && className == null)) { // MySQL binary, varbinary, blob transformed = new String(Base64.getDecoder().decode(rawValue)); - } else if ("bytes".equals(debeziumType) && decimalLogicalName().equals(className)) { + } else if ("bytes".equals(debeziumType) && className.endsWith(decimalLogicalName())) { // MySQL numeric, fixed, decimal try { new BigDecimal(rawValue); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java index c2565c1f2de2..8c5be3b6a13c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java @@ -179,7 +179,7 @@ private DataType extractFieldType(DebeziumEvent.Field field) { case "string": return DataTypes.STRING(); case "bytes": - if (decimalLogicalName().equals(field.name())) { + if (field.name() != null && field.name().endsWith(decimalLogicalName())) { int precision = field.parameters().get("connect.decimal.precision").asInt(); int scale = field.parameters().get("scale").asInt(); return DataTypes.DECIMAL(precision, scale); @@ -270,7 +270,8 @@ private Map extractRow(JsonNode recordRow, CdcSchema.Builder sch } else if (("bytes".equals(postgresSqlType) && className == null)) { // binary, varbinary newValue = new String(Base64.getDecoder().decode(oldValue)); - } else if ("bytes".equals(postgresSqlType) && decimalLogicalName().equals(className)) { + } else if ("bytes".equals(postgresSqlType) + && className.endsWith(decimalLogicalName())) { // numeric, decimal try { new BigDecimal(oldValue);