Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/content/migration/iceberg-compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,28 @@ You can configure the following table option, so that Paimon is forced to perfor
Note that full compaction is a resource-consuming process, so the value of this table option should not be too small.
We recommend full compaction to be performed once or twice per hour.

## Supported Types

Paimon Iceberg compatibility currently supports the following data types.

| Paimon Data Type | Iceberg Data Type |
|----------------------------|-------------------|
| `BOOLEAN` | `boolean` |
| `INT` | `int` |
| `BIGINT` | `long` |
| `FLOAT` | `float` |
| `DOUBLE` | `double` |
| `DECIMAL` | `decimal` |
| `CHAR` | `string` |
| `VARCHAR` | `string` |
| `BINARY` | `binary` |
| `VARBINARY` | `binary` |
| `DATE` | `date` |
| `TIMESTAMP`* | `timestamp` |
| `TIMESTAMP_LTZ`* | `timestamptz` |

*: `TIMESTAMP` and `TIMESTAMP_LTZ` types only support precisions from 4 to 6.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from 4 to 6.


## Other Related Table Options

<table class="table table-bordered">
Expand Down
1 change: 0 additions & 1 deletion paimon-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ under the License.

<properties>
<frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
<iceberg.version>1.6.1</iceberg.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.Preconditions;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -79,11 +83,26 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) {
case DECIMAL:
Decimal decimal = (Decimal) value;
return ByteBuffer.wrap((decimal.toUnscaledBytes()));
case TIMESTAMP_WITHOUT_TIME_ZONE:
return timestampToByteBuffer(
(Timestamp) value, ((TimestampType) type).getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return timestampToByteBuffer(
(Timestamp) value, ((LocalZonedTimestampType) type).getPrecision());
default:
throw new UnsupportedOperationException("Cannot serialize type: " + type);
}
}

/**
 * Serializes a Paimon {@link Timestamp} into the 8-byte little-endian microsecond
 * representation that Iceberg uses for {@code timestamp} / {@code timestamptz} values.
 *
 * @param timestamp the timestamp value to serialize
 * @param precision the Paimon timestamp precision; must be in the range [4, 6]
 * @return a little-endian {@link ByteBuffer} containing the epoch microseconds
 */
private static ByteBuffer timestampToByteBuffer(Timestamp timestamp, int precision) {
    Preconditions.checkArgument(
            precision > 3 && precision <= 6,
            "Paimon Iceberg compatibility only support timestamp type with precision from 4 to 6.");
    long micros = timestamp.toMicros();
    ByteBuffer buffer = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
    buffer.putLong(0, micros);
    return buffer;
}

public static Object toPaimonObject(DataType type, byte[] bytes) {
switch (type.getTypeRoot()) {
case BOOLEAN:
Expand Down Expand Up @@ -112,6 +131,15 @@ public static Object toPaimonObject(DataType type, byte[] bytes) {
DecimalType decimalType = (DecimalType) type;
return Decimal.fromUnscaledBytes(
bytes, decimalType.getPrecision(), decimalType.getScale());
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
int timestampPrecision = ((TimestampType) type).getPrecision();
long timestampLong =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the order of these two statements, for example:

Preconditions.checkArgument(
timestampPrecision > 3 && timestampPrecision <= 6,
"Paimon Iceberg compatibility only support timestamp type with precision from 4 to 6.");
long timestampLong =
ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong();

ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
Preconditions.checkArgument(
timestampPrecision > 3 && timestampPrecision <= 6,
"Paimon Iceberg compatibility only support timestamp type with precision from 4 to 6.");
return Timestamp.fromMicros(timestampLong);
default:
throw new UnsupportedOperationException("Cannot deserialize type: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
Expand Down Expand Up @@ -57,6 +59,8 @@ public class IcebergDataField {
@JsonProperty(FIELD_TYPE)
private final String type;

@JsonIgnore private final DataType dataType;

@JsonProperty(FIELD_DOC)
private final String doc;

Expand All @@ -66,6 +70,7 @@ public IcebergDataField(DataField dataField, int bias) {
dataField.name(),
!dataField.type().isNullable(),
toTypeString(dataField.type()),
dataField.type(),
dataField.description());
}

Expand All @@ -76,10 +81,16 @@ public IcebergDataField(
@JsonProperty(FIELD_REQUIRED) boolean required,
@JsonProperty(FIELD_TYPE) String type,
@JsonProperty(FIELD_DOC) String doc) {
this(id, name, required, type, null, doc);
}

/**
 * Creates an Iceberg field definition.
 *
 * <p>{@code dataType} caches the original Paimon type so {@code dataType()} need not re-parse
 * the Iceberg type string. It is {@code null} when this object is built through the JSON
 * creator, which only has the serialized type string available.
 */
public IcebergDataField(
        int id, String name, boolean required, String type, DataType dataType, String doc) {
    this.id = id;
    this.name = name;
    this.required = required;
    this.type = type;
    this.dataType = dataType;
    this.doc = doc;
}

Expand Down Expand Up @@ -110,7 +121,7 @@ public String doc() {

/**
 * Returns the Paimon {@link DataType} of this field.
 *
 * <p>The cached {@code dataType} is only populated when this object was constructed directly
 * from a Paimon {@code DataField}; instances deserialized from JSON carry {@code null} here.
 * Fail fast with a descriptive message instead of a bare {@link NullPointerException} so the
 * caller can tell where the unusable instance came from.
 */
@JsonIgnore
public DataType dataType() {
    return Preconditions.checkNotNull(
            dataType,
            "dataType is null. This IcebergDataField was likely deserialized from JSON, "
                    + "which does not preserve the original Paimon data type.");
}

private static String toTypeString(DataType dataType) {
Expand All @@ -137,38 +148,23 @@ private static String toTypeString(DataType dataType) {
DecimalType decimalType = (DecimalType) dataType;
return String.format(
"decimal(%d, %d)", decimalType.getPrecision(), decimalType.getScale());
case TIMESTAMP_WITHOUT_TIME_ZONE:
int timestampPrecision = ((TimestampType) dataType).getPrecision();
Preconditions.checkArgument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are four places that check this precision.
Should we extract it into a shared helper to reduce duplicated code?

timestampPrecision > 3 && timestampPrecision <= 6,
"Paimon Iceberg compatibility only support timestamp type with precision from 4 to 6.");
return "timestamp";
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
int timestampLtzPrecision = ((LocalZonedTimestampType) dataType).getPrecision();
Preconditions.checkArgument(
timestampLtzPrecision > 3 && timestampLtzPrecision <= 6,
"Paimon Iceberg compatibility only support timestamp type with precision from 4 to 6.");
return "timestamptz";
default:
throw new UnsupportedOperationException("Unsupported data type: " + dataType);
}
}

/**
 * Parses an Iceberg type string back into the corresponding Paimon {@link DataType}.
 *
 * @param type the Iceberg type string, e.g. {@code "long"} or {@code "decimal(10, 2)"}
 * @return the equivalent Paimon data type
 * @throws UnsupportedOperationException if the type string is not recognized or malformed
 */
private static DataType fromTypeString(String type) {
    switch (type) {
        case "boolean":
            return DataTypes.BOOLEAN();
        case "int":
            return DataTypes.INT();
        case "long":
            return DataTypes.BIGINT();
        case "float":
            return DataTypes.FLOAT();
        case "double":
            return DataTypes.DOUBLE();
        case "date":
            return DataTypes.DATE();
        case "string":
            return DataTypes.STRING();
        case "binary":
            return DataTypes.BYTES();
        default:
            // Accept both "decimal(p, s)" and "decimal(p,s)": be lenient about whitespace
            // around the separator instead of requiring exactly one space. Malformed decimal
            // strings fall through to the UnsupportedOperationException below rather than
            // surfacing as StringIndexOutOfBounds/ArrayIndexOutOfBounds errors.
            if (type.startsWith("decimal(") && type.endsWith(")")) {
                String[] precisionAndScale =
                        type.substring("decimal(".length(), type.length() - 1).split(",");
                if (precisionAndScale.length == 2) {
                    return DataTypes.DECIMAL(
                            Integer.parseInt(precisionAndScale[0].trim()),
                            Integer.parseInt(precisionAndScale[1].trim()));
                }
            }
            throw new UnsupportedOperationException("Unsupported data type: " + type);
    }
}

@Override
public int hashCode() {
return Objects.hash(id, name, required, type, doc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
Expand Down Expand Up @@ -59,6 +60,7 @@

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -374,7 +376,8 @@ public void testAllTypeStatistics() throws Exception {
DataTypes.STRING(),
DataTypes.BINARY(20),
DataTypes.VARBINARY(20),
DataTypes.DATE()
DataTypes.DATE(),
DataTypes.TIMESTAMP(6)
},
new String[] {
"v_int",
Expand All @@ -387,7 +390,8 @@ public void testAllTypeStatistics() throws Exception {
"v_varchar",
"v_binary",
"v_varbinary",
"v_date"
"v_date",
"v_timestamp"
});
FileStoreTable table =
createPaimonTable(rowType, Collections.emptyList(), Collections.emptyList(), -1);
Expand All @@ -408,7 +412,8 @@ public void testAllTypeStatistics() throws Exception {
BinaryString.fromString("cat"),
"B_apple".getBytes(),
"B_cat".getBytes(),
100);
100,
Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 10, 10, 11, 22, 33)));
write.write(lowerBounds);
GenericRow upperBounds =
GenericRow.of(
Expand All @@ -422,7 +427,8 @@ public void testAllTypeStatistics() throws Exception {
BinaryString.fromString("dog"),
"B_banana".getBytes(),
"B_dog".getBytes(),
200);
200,
Timestamp.fromLocalDateTime(LocalDateTime.of(2024, 10, 20, 11, 22, 33)));
write.write(upperBounds);
commit.commit(1, write.prepareCommit(false, 1));

Expand Down Expand Up @@ -450,6 +456,9 @@ public void testAllTypeStatistics() throws Exception {
} else if (type.getTypeRoot() == DataTypeRoot.DECIMAL) {
lower = new BigDecimal(lowerBounds.getField(i).toString());
upper = new BigDecimal(upperBounds.getField(i).toString());
} else if (type.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
lower = ((Timestamp) lowerBounds.getField(i)).toMicros();
upper = ((Timestamp) upperBounds.getField(i)).toMicros();
} else {
lower = lowerBounds.getField(i);
upper = upperBounds.getField(i);
Expand All @@ -460,6 +469,9 @@ public void testAllTypeStatistics() throws Exception {
if (type.getTypeRoot() == DataTypeRoot.DATE) {
expectedLower = LocalDate.ofEpochDay((int) lower).toString();
expectedUpper = LocalDate.ofEpochDay((int) upper).toString();
} else if (type.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
expectedLower = Timestamp.fromMicros((long) lower).toString();
expectedUpper = Timestamp.fromMicros((long) upper).toString();
}

assertThat(
Expand Down
1 change: 0 additions & 1 deletion paimon-flink/paimon-flink-1.17/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ under the License.
<properties>
<flink.version>1.17.2</flink.version>
<iceberg.flink.version>1.17</iceberg.flink.version>
<iceberg.version>1.6.1</iceberg.version>
</properties>

<dependencies>
Expand Down
1 change: 0 additions & 1 deletion paimon-flink/paimon-flink-1.18/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ under the License.
<properties>
<flink.version>1.18.1</flink.version>
<iceberg.flink.version>1.18</iceberg.flink.version>
<iceberg.version>1.6.1</iceberg.version>
</properties>

<dependencies>
Expand Down
1 change: 0 additions & 1 deletion paimon-flink/paimon-flink-1.19/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ under the License.
<properties>
<flink.version>1.19.1</flink.version>
<iceberg.flink.version>1.19</iceberg.flink.version>
<iceberg.version>1.6.1</iceberg.version>
</properties>

<dependencies>
Expand Down
13 changes: 13 additions & 0 deletions paimon-flink/paimon-flink-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,19 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${iceberg.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Loading