diff --git a/docs/content/migration/iceberg-compatibility.md b/docs/content/migration/iceberg-compatibility.md
index ffbeb2592124..3040424955ea 100644
--- a/docs/content/migration/iceberg-compatibility.md
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -345,6 +345,28 @@ You can configure the following table option, so that Paimon is forced to perfor
 Note that full compaction is a resource-consuming process, so the value of this table option should not be too small. We recommend full compaction to be performed once or twice per hour.
 
+## Supported Types
+
+Paimon Iceberg compatibility currently supports the following data types.
+
+| Paimon Data Type | Iceberg Data Type |
+|------------------|-------------------|
+| `BOOLEAN`        | `boolean`         |
+| `INT`            | `int`             |
+| `BIGINT`         | `long`            |
+| `FLOAT`          | `float`           |
+| `DOUBLE`         | `double`          |
+| `DECIMAL`        | `decimal`         |
+| `CHAR`           | `string`          |
+| `VARCHAR`        | `string`          |
+| `BINARY`         | `binary`          |
+| `VARBINARY`      | `binary`          |
+| `DATE`           | `date`            |
+| `TIMESTAMP`*     | `timestamp`       |
+| `TIMESTAMP_LTZ`* | `timestamptz`     |
+
+*: `TIMESTAMP` and `TIMESTAMP_LTZ` types only support precision from 4 to 6.
+
 ## Other Related Table Options
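The precision rule in the footnote above is enforced by `Preconditions` checks in the code changes below. As a rough standalone sketch (the helper class here is illustrative, not part of this patch), the rule amounts to:

```java
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.TimestampType;

// Illustrative helper mirroring the footnote: Iceberg stores timestamps with
// microsecond precision, so only Paimon TIMESTAMP / TIMESTAMP_LTZ columns
// declared with precision 4 to 6 can be mapped.
final class IcebergTimestampSupport {

    static boolean isSupported(DataType type) {
        if (type instanceof TimestampType) {
            return isSupportedPrecision(((TimestampType) type).getPrecision());
        }
        if (type instanceof LocalZonedTimestampType) {
            return isSupportedPrecision(((LocalZonedTimestampType) type).getPrecision());
        }
        return false;
    }

    private static boolean isSupportedPrecision(int precision) {
        return precision > 3 && precision <= 6;
    }
}
```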
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index a208d59b2e1a..399f0b5d6c19 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -33,7 +33,6 @@ under the License.
         <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
-        <iceberg.version>1.6.1</iceberg.version>
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
index 1d9e1c3b16e9..9048d46e44f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
@@ -20,8 +20,12 @@
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Preconditions;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -79,11 +83,26 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) {
             case DECIMAL:
                 Decimal decimal = (Decimal) value;
                 return ByteBuffer.wrap((decimal.toUnscaledBytes()));
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return timestampToByteBuffer(
+                        (Timestamp) value, ((TimestampType) type).getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return timestampToByteBuffer(
+                        (Timestamp) value, ((LocalZonedTimestampType) type).getPrecision());
             default:
                 throw new UnsupportedOperationException("Cannot serialize type: " + type);
         }
     }
 
+    private static ByteBuffer timestampToByteBuffer(Timestamp timestamp, int precision) {
+        Preconditions.checkArgument(
+                precision > 3 && precision <= 6,
+                "Paimon Iceberg compatibility only supports timestamp types with precision from 4 to 6.");
+        return ByteBuffer.allocate(8)
+                .order(ByteOrder.LITTLE_ENDIAN)
+                .putLong(0, timestamp.toMicros());
+    }
+
     public static Object toPaimonObject(DataType type, byte[] bytes) {
         switch (type.getTypeRoot()) {
             case BOOLEAN:
@@ -112,6 +131,20 @@ public static Object toPaimonObject(DataType type, byte[] bytes) {
                 DecimalType decimalType = (DecimalType) type;
                 return Decimal.fromUnscaledBytes(
                         bytes, decimalType.getPrecision(), decimalType.getScale());
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                // LocalZonedTimestampType is not a TimestampType, so the precision
+                // must be read from whichever type class is actually present.
+                int timestampPrecision =
+                        type instanceof TimestampType
+                                ? ((TimestampType) type).getPrecision()
+                                : ((LocalZonedTimestampType) type).getPrecision();
+                Preconditions.checkArgument(
+                        timestampPrecision > 3 && timestampPrecision <= 6,
+                        "Paimon Iceberg compatibility only supports timestamp types with precision from 4 to 6.");
+                long timestampLong =
+                        ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
+                return Timestamp.fromMicros(timestampLong);
             default:
                 throw new UnsupportedOperationException("Cannot deserialize type: " + type);
         }
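The byte layout used by `timestampToByteBuffer` (and decoded in `toPaimonObject`) is Iceberg's single-value serialization: epoch microseconds in an 8-byte little-endian long. A self-contained sketch of the round trip, using only the JDK:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Round-trips a timestamp through the encoding used above: epoch
// microseconds stored in an 8-byte little-endian long.
public class TimestampEncodingRoundTrip {

    public static void main(String[] args) {
        long micros =
                ChronoUnit.MICROS.between(
                        Instant.EPOCH, Instant.parse("2024-10-10T11:22:33.123456Z"));

        // Encode, as in timestampToByteBuffer.
        byte[] bytes =
                ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, micros).array();

        // Decode, as in toPaimonObject.
        long decoded = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong();

        System.out.println(micros == decoded); // true
    }
}
```

Both directions go through `Timestamp.toMicros()` / `Timestamp.fromMicros()`, so the declared precision only matters for validation, not for the stored bytes.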
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
index 183fd5c00dc2..c4505bbdd9b4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
@@ -20,8 +20,10 @@
 
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
@@ -57,6 +59,8 @@ public class IcebergDataField {
     @JsonProperty(FIELD_TYPE)
     private final String type;
 
+    @JsonIgnore private final DataType dataType;
+
     @JsonProperty(FIELD_DOC)
     private final String doc;
 
@@ -66,6 +70,7 @@ public IcebergDataField(DataField dataField, int bias) {
                 dataField.name(),
                 !dataField.type().isNullable(),
                 toTypeString(dataField.type()),
+                dataField.type(),
                 dataField.description());
     }
 
@@ -76,10 +81,16 @@ public IcebergDataField(
             @JsonProperty(FIELD_REQUIRED) boolean required,
             @JsonProperty(FIELD_TYPE) String type,
             @JsonProperty(FIELD_DOC) String doc) {
+        this(id, name, required, type, null, doc);
+    }
+
+    public IcebergDataField(
+            int id, String name, boolean required, String type, DataType dataType, String doc) {
         this.id = id;
         this.name = name;
         this.required = required;
         this.type = type;
+        this.dataType = dataType;
         this.doc = doc;
     }
 
@@ -110,7 +121,7 @@ public String doc() {
 
     @JsonIgnore
     public DataType dataType() {
-        return fromTypeString(type);
+        return Preconditions.checkNotNull(dataType);
     }
 
     private static String toTypeString(DataType dataType) {
@@ -137,38 +148,23 @@ private static String toTypeString(DataType dataType) {
                 DecimalType decimalType = (DecimalType) dataType;
                 return String.format(
                         "decimal(%d, %d)", decimalType.getPrecision(), decimalType.getScale());
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                int timestampPrecision = ((TimestampType) dataType).getPrecision();
+                Preconditions.checkArgument(
+                        timestampPrecision > 3 && timestampPrecision <= 6,
+                        "Paimon Iceberg compatibility only supports timestamp types with precision from 4 to 6.");
+                return "timestamp";
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                int timestampLtzPrecision = ((LocalZonedTimestampType) dataType).getPrecision();
+                Preconditions.checkArgument(
+                        timestampLtzPrecision > 3 && timestampLtzPrecision <= 6,
+                        "Paimon Iceberg compatibility only supports timestamp types with precision from 4 to 6.");
+                return "timestamptz";
             default:
                 throw new UnsupportedOperationException("Unsupported data type: " + dataType);
         }
     }
 
-    private static DataType fromTypeString(String type) {
-        if ("boolean".equals(type)) {
-            return DataTypes.BOOLEAN();
-        } else if ("int".equals(type)) {
-            return DataTypes.INT();
-        } else if ("long".equals(type)) {
-            return DataTypes.BIGINT();
-        } else if ("float".equals(type)) {
-            return DataTypes.FLOAT();
-        } else if ("double".equals(type)) {
-            return DataTypes.DOUBLE();
-        } else if ("date".equals(type)) {
-            return DataTypes.DATE();
-        } else if ("string".equals(type)) {
-            return DataTypes.STRING();
-        } else if ("binary".equals(type)) {
-            return DataTypes.BYTES();
-        } else if (type.startsWith("decimal")) {
-            String[] precisionAndScale =
-                    type.substring("decimal(".length(), type.length() - 1).split(", ");
-            return DataTypes.DECIMAL(
-                    Integer.parseInt(precisionAndScale[0]),
-                    Integer.parseInt(precisionAndScale[1]));
-        } else {
-            throw new UnsupportedOperationException("Unsupported data type: " + type);
-        }
-    }
-
     @Override
     public int hashCode() {
         return Objects.hash(id, name, required, type, doc);
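With `fromTypeString` removed, the Paimon `DataType` now travels alongside the serialized Iceberg type string and is simply kept out of the JSON. A minimal Jackson sketch of that pattern (stand-in class; assumes plain `jackson-databind`, whereas Paimon uses a shaded copy):

```java
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Stand-in for IcebergDataField: the Iceberg type string is serialized into
// metadata, while the richer in-memory type is kept but never written.
public class JsonIgnoreSketch {

    public static class Field {
        @JsonProperty("type")
        public String type = "timestamp";

        // In the real class this is the Paimon DataType.
        @JsonIgnore public Object dataType = "TIMESTAMP(6)";
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new ObjectMapper().writeValueAsString(new Field()));
        // prints: {"type":"timestamp"}
    }
}
```

This also explains the `Preconditions.checkNotNull` in `dataType()`: instances rebuilt from JSON no longer carry the Paimon type, so the getter is only valid on fields created from a `DataField`.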
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index dadbd557a5dd..9a27d5618459 100644
--- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -26,6 +26,7 @@
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -59,6 +60,7 @@
 
 import java.math.BigDecimal;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -374,7 +376,8 @@ public void testAllTypeStatistics() throws Exception {
                             DataTypes.STRING(),
                             DataTypes.BINARY(20),
                             DataTypes.VARBINARY(20),
-                            DataTypes.DATE()
+                            DataTypes.DATE(),
+                            DataTypes.TIMESTAMP(6)
                         },
                         new String[] {
                             "v_int",
@@ -387,7 +390,8 @@ public void testAllTypeStatistics() throws Exception {
                             "v_varchar",
                             "v_binary",
                             "v_varbinary",
-                            "v_date"
+                            "v_date",
+                            "v_timestamp"
                         });
         FileStoreTable table =
                 createPaimonTable(rowType, Collections.emptyList(), Collections.emptyList(), -1);
@@ -408,7 +412,8 @@ public void testAllTypeStatistics() throws Exception {
                         BinaryString.fromString("cat"),
                         "B_apple".getBytes(),
                         "B_cat".getBytes(),
-                        100);
+                        100,
+                        Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 10, 10, 11, 22, 33)));
         write.write(lowerBounds);
         GenericRow upperBounds =
                 GenericRow.of(
@@ -422,7 +427,8 @@ public void testAllTypeStatistics() throws Exception {
                         BinaryString.fromString("dog"),
                         "B_banana".getBytes(),
                         "B_dog".getBytes(),
-                        200);
+                        200,
+                        Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 10, 20, 11, 22, 33)));
         write.write(upperBounds);
         commit.commit(1, write.prepareCommit(false, 1));
 
@@ -450,6 +456,9 @@ public void testAllTypeStatistics() throws Exception {
                 } else if (type.getTypeRoot() == DataTypeRoot.DECIMAL) {
                     lower = new BigDecimal(lowerBounds.getField(i).toString());
                     upper = new BigDecimal(upperBounds.getField(i).toString());
+                } else if (type.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
+                    lower = ((Timestamp) lowerBounds.getField(i)).toMicros();
+                    upper = ((Timestamp) upperBounds.getField(i)).toMicros();
                 } else {
                     lower = lowerBounds.getField(i);
                     upper = upperBounds.getField(i);
@@ -460,6 +469,9 @@ public void testAllTypeStatistics() throws Exception {
                 if (type.getTypeRoot() == DataTypeRoot.DATE) {
                     expectedLower = LocalDate.ofEpochDay((int) lower).toString();
                     expectedUpper = LocalDate.ofEpochDay((int) upper).toString();
+                } else if (type.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
+                    expectedLower = Timestamp.fromMicros((long) lower).toString();
+                    expectedUpper = Timestamp.fromMicros((long) upper).toString();
                 }
 
                 assertThat(
diff --git a/paimon-flink/paimon-flink-1.17/pom.xml b/paimon-flink/paimon-flink-1.17/pom.xml
index 112ecc4c7c7c..7b64f1cdfc67 100644
--- a/paimon-flink/paimon-flink-1.17/pom.xml
+++ b/paimon-flink/paimon-flink-1.17/pom.xml
@@ -36,7 +36,6 @@ under the License.
         <flink.version>1.17.2</flink.version>
         <flink.binary.version>1.17</flink.binary.version>
-        <iceberg.version>1.6.1</iceberg.version>
diff --git a/paimon-flink/paimon-flink-1.18/pom.xml b/paimon-flink/paimon-flink-1.18/pom.xml
index 5aaf42eb4971..db74cf11c91a 100644
--- a/paimon-flink/paimon-flink-1.18/pom.xml
+++ b/paimon-flink/paimon-flink-1.18/pom.xml
@@ -36,7 +36,6 @@ under the License.
        <flink.version>1.18.1</flink.version>
        <flink.binary.version>1.18</flink.binary.version>
-        <iceberg.version>1.6.1</iceberg.version>
diff --git a/paimon-flink/paimon-flink-1.19/pom.xml b/paimon-flink/paimon-flink-1.19/pom.xml
index ab8c2540e9cf..fd874c018a48 100644
--- a/paimon-flink/paimon-flink-1.19/pom.xml
+++ b/paimon-flink/paimon-flink-1.19/pom.xml
@@ -36,7 +36,6 @@ under the License.
        <flink.version>1.19.1</flink.version>
        <flink.binary.version>1.19</flink.binary.version>
-        <iceberg.version>1.6.1</iceberg.version>
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index ef0f7fe1776a..77fe611eebe7 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -150,6 +150,19 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
+
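The two Iceberg modules above are test-scoped: they let the integration test below read, with Iceberg's own generic reader, the Iceberg metadata that Paimon maintains. Boiled down (warehouse path and table name are illustrative, and the dependencies above are assumed on the classpath), the verification pattern is:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;

// Open the Iceberg metadata that Paimon maintains under <warehouse>/iceberg
// and scan the table with Iceberg's generic reader.
public class ReadPaimonTableAsIceberg {

    public static void main(String[] args) throws Exception {
        HadoopCatalog catalog =
                new HadoopCatalog(new Configuration(), "/tmp/warehouse/iceberg");
        Table table = catalog.loadTable(TableIdentifier.of("default", "T"));

        try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
            for (Record record : records) {
                System.out.println(record);
            }
        }
    }
}
```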
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
index efa36020f920..d76394c9ee5b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
@@ -24,6 +24,13 @@
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
 
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -192,7 +199,10 @@ public void testFilterAllTypes(String format) throws Exception {
                         + " v_decimal DECIMAL(8, 3),\n"
                         + " v_varchar STRING,\n"
                         + " v_varbinary VARBINARY(20),\n"
-                        + " v_date DATE\n"
+                        + " v_date DATE,\n"
+                        // the Iceberg Flink connector seems to have a bug when filtering on
+                        // timestamp_ltz, so we don't test it here
+                        + " v_timestamp TIMESTAMP(6)\n"
                         + ") PARTITIONED BY (pt) WITH (\n"
                         + " 'metadata.iceberg.storage' = 'hadoop-catalog',\n"
                         + " 'file.format' = '"
@@ -201,9 +211,9 @@ public void testFilterAllTypes(String format) throws Exception {
                         + ")");
         tEnv.executeSql(
                         "INSERT INTO paimon.`default`.T VALUES "
-                                + "(1, 1, 1, true, 10, CAST(100.0 AS FLOAT), 1000.0, 123.456, 'cat', CAST('B_cat' AS VARBINARY(20)), DATE '2024-10-10'), "
-                                + "(2, 2, 2, false, 20, CAST(200.0 AS FLOAT), 2000.0, 234.567, 'dog', CAST('B_dog' AS VARBINARY(20)), DATE '2024-10-20'), "
-                                + "(3, 3, CAST(NULL AS INT), CAST(NULL AS BOOLEAN), CAST(NULL AS BIGINT), CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS DECIMAL(8, 3)), CAST(NULL AS STRING), CAST(NULL AS VARBINARY(20)), CAST(NULL AS DATE))")
+                                + "(1, 1, 1, true, 10, CAST(100.0 AS FLOAT), 1000.0, 123.456, 'cat', CAST('B_cat' AS VARBINARY(20)), DATE '2024-10-10', TIMESTAMP '2024-10-10 11:22:33.123456'), "
+                                + "(2, 2, 2, false, 20, CAST(200.0 AS FLOAT), 2000.0, 234.567, 'dog', CAST('B_dog' AS VARBINARY(20)), DATE '2024-10-20', TIMESTAMP '2024-10-20 11:22:33.123456'), "
+                                + "(3, 3, CAST(NULL AS INT), CAST(NULL AS BOOLEAN), CAST(NULL AS BIGINT), CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS DECIMAL(8, 3)), CAST(NULL AS STRING), CAST(NULL AS VARBINARY(20)), CAST(NULL AS DATE), CAST(NULL AS TIMESTAMP(6)))")
                 .await();
 
         tEnv.executeSql(
@@ -218,7 +228,8 @@ public void testFilterAllTypes(String format) throws Exception {
                         + " v_decimal DECIMAL(8, 3),\n"
                         + " v_varchar STRING,\n"
                         + " v_varbinary VARBINARY(20),\n"
-                        + " v_date DATE\n"
+                        + " v_date DATE,\n"
+                        + " v_timestamp TIMESTAMP(6)\n"
                         + ") PARTITIONED BY (pt) WITH (\n"
                         + " 'connector' = 'iceberg',\n"
                         + " 'catalog-type' = 'hadoop',\n"
@@ -246,6 +257,11 @@ public void testFilterAllTypes(String format) throws Exception {
                 .containsExactly(Row.of(1));
         assertThat(collect(tEnv.executeSql("SELECT id FROM T where v_date = '2024-10-10'")))
                 .containsExactly(Row.of(1));
+        assertThat(
+                        collect(
+                                tEnv.executeSql(
+                                        "SELECT id FROM T where v_timestamp = TIMESTAMP '2024-10-10 11:22:33.123456'")))
+                .containsExactly(Row.of(1));
         assertThat(collect(tEnv.executeSql("SELECT id FROM T where v_int IS NULL")))
                 .containsExactly(Row.of(3));
         assertThat(collect(tEnv.executeSql("SELECT id FROM T where v_boolean IS NULL")))
@@ -264,6 +280,67 @@ public void testFilterAllTypes(String format) throws Exception {
                 .containsExactly(Row.of(3));
         assertThat(collect(tEnv.executeSql("SELECT id FROM T where v_date IS NULL")))
                 .containsExactly(Row.of(3));
+        assertThat(collect(tEnv.executeSql("SELECT id FROM T where v_timestamp IS NULL")))
+                .containsExactly(Row.of(3));
+    }
+
+    @ParameterizedTest
+    // the orc writer does not write timestamp_ltz correctly; we won't change it due to
+    // compatibility concerns, so we don't test orc here
+    @ValueSource(strings = {"parquet"})
+    public void testFilterTimestampLtz(String format) throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();
+        tEnv.executeSql(
+                "CREATE CATALOG paimon WITH (\n"
+                        + " 'type' = 'paimon',\n"
+                        + " 'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql(
+                "CREATE TABLE paimon.`default`.T (\n"
+                        + " id INT,"
+                        + " v_timestampltz TIMESTAMP_LTZ(6)\n"
+                        + ") WITH (\n"
+                        + " 'metadata.iceberg.storage' = 'hadoop-catalog',\n"
+                        + " 'file.format' = '"
+                        + format
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql(
+                        "INSERT INTO paimon.`default`.T VALUES "
+                                + "(1, CAST(TO_TIMESTAMP_LTZ(1100000000321, 3) AS TIMESTAMP_LTZ(6))), "
+                                + "(2, CAST(TO_TIMESTAMP_LTZ(1200000000321, 3) AS TIMESTAMP_LTZ(6))), "
+                                + "(3, CAST(NULL AS TIMESTAMP_LTZ(6)))")
+                .await();
+
+        HadoopCatalog icebergCatalog =
+                new HadoopCatalog(new Configuration(), warehouse + "/iceberg");
+        TableIdentifier icebergIdentifier = TableIdentifier.of("default", "T");
+        org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier);
+
+        CloseableIterable<Record> result =
+                IcebergGenerics.read(icebergTable)
+                        .where(Expressions.equal("v_timestampltz", 1100000000321000L))
+                        .build();
+        List<Object> actual = new ArrayList<>();
+        for (Record record : result) {
+            actual.add(record.get(0));
+        }
+        result.close();
+        assertThat(actual).containsExactly(1);
+
+        result =
+                IcebergGenerics.read(icebergTable)
+                        .where(Expressions.isNull("v_timestampltz"))
+                        .build();
+        actual = new ArrayList<>();
+        for (Record record : result) {
+            actual.add(record.get(0));
+        }
+        result.close();
+        assertThat(actual).containsExactly(3);
     }
 
     @ParameterizedTest
diff --git a/pom.xml b/pom.xml
index 1f0d47860124..f8f827d8e1da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,6 +97,7 @@ under the License.
         <flink.forkCount>1C</flink.forkCount>
         <flink.reuseForks>true</flink.reuseForks>
         <test.flink.version>1.19.1</test.flink.version>
+        <iceberg.version>1.6.1</iceberg.version>
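A closing note on the new test's filter literal: Iceberg compares `timestamptz` values as epoch microseconds, while Flink's `TO_TIMESTAMP_LTZ(..., 3)` consumes epoch milliseconds, hence the extra factor of 1000 in `1100000000321000L`. A quick plain-JDK sanity check:

```java
import java.time.Instant;

// Why the Iceberg filter uses 1100000000321000L: the row was inserted with
// TO_TIMESTAMP_LTZ(1100000000321, 3), i.e. epoch milliseconds, and Iceberg
// timestamptz literals are epoch microseconds.
public class TimestampLtzLiteralCheck {

    public static void main(String[] args) {
        long millis = 1_100_000_000_321L;
        long micros = millis * 1_000L;

        System.out.println(micros); // 1100000000321000
        System.out.println(Instant.ofEpochMilli(millis)); // 2004-11-09T11:33:20.321Z
    }
}
```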