From 7eba16e22982cb3646436d0c3ad75f6244754224 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Wed, 11 Dec 2024 22:59:23 +0800 Subject: [PATCH 1/2] update --- .../apache/paimon/table/system/BinlogTable.java | 17 +++++++++++++++++ .../spark/sql/PaimonSystemTableTest.scala | 16 ++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java index b17d61d44e77..27a4e6b65753 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java @@ -99,6 +99,23 @@ private BinlogRead(InnerTableRead dataRead) { super(dataRead); } + @Override + public InnerTableRead withReadType(RowType readType) { + List fields = new ArrayList<>(); + for (DataField field : readType.getFields()) { + if (field.name().equals(SpecialFields.ROW_KIND.name())) { + fields.add(field); + } else { + fields.add( + new DataField( + field.id(), + field.name(), + ((ArrayType) field.type()).getElementType())); + } + } + return super.withReadType(readType.copy(fields)); + } + @Override public RecordReader createReader(Split split) throws IOException { DataSplit dataSplit = (DataSplit) split; diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala index 64baf6232fd8..7baa57a54d90 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala @@ -81,4 +81,20 @@ class PaimonSystemTableTest extends PaimonSparkTestBase { spark.sql("select partition,bucket from `T$buckets`"), Row("[2024-10-10, 01]", 0) :: Row("[2024-10-10, 01]", 1) :: Row("[2024-10-10, 01]", 2) :: Nil) } + + test("system table: binlog table") { + sql(""" + |CREATE TABLE T (a INT, b INT) + |TBLPROPERTIES ('primary-key'='a', 'changelog-producer' = 'lookup', 'bucket' = '2') + |""".stripMargin) + + sql("INSERT INTO T VALUES (1, 2)") + sql("INSERT INTO T VALUES (1, 3)") + sql("INSERT INTO T VALUES (2, 2)") + + checkAnswer( + sql("SELECT * FROM `T$binlog`"), + Seq(Row("+I", Array(1), Array(3)), Row("+I", Array(2), Array(2))) + ) + } } From 6618c625643c661247d7a15638f01455fbe5a89c Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Wed, 11 Dec 2024 23:06:14 +0800 Subject: [PATCH 2/2] update --- .../apache/paimon/table/system/BinlogTable.java | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java index 27a4e6b65753..08eea468ea70 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java @@ -72,13 +72,8 @@ public RowType rowType() { List fields = new ArrayList<>(); fields.add(SpecialFields.ROW_KIND); for (DataField field : wrapped.rowType().getFields()) { - DataField newField = - new DataField( - field.id(), - field.name(), - new ArrayType(field.type().nullable()), // convert to nullable - field.description()); - fields.add(newField); + // convert to nullable + fields.add(field.newType(new ArrayType(field.type().nullable()))); } return new RowType(fields); } @@ -106,11 +101,7 @@ public InnerTableRead withReadType(RowType readType) { if (field.name().equals(SpecialFields.ROW_KIND.name())) { fields.add(field); } else { - fields.add( - new DataField( - field.id(), - field.name(), - ((ArrayType) field.type()).getElementType())); + fields.add(field.newType(((ArrayType) field.type()).getElementType())); } } return super.withReadType(readType.copy(fields));