Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,21 @@ public boolean equals(Object o) {
return elementType.equals(arrayType.elementType);
}

@Override
public boolean equalsIgnoreFieldId(Object o) {
    // Same reference is trivially equal.
    if (this == o) {
        return true;
    }
    // Reject null / different concrete class / mismatched nullability or type root
    // (the latter two are what the base-class equals checks).
    if (o == null || getClass() != o.getClass() || !super.equals(o)) {
        return false;
    }
    // Recurse into the element type with field ids ignored.
    return elementType.equalsIgnoreFieldId(((ArrayType) o).elementType);
}

@Override
public boolean isPrunedFrom(Object o) {
if (this == o) {
Expand Down
26 changes: 19 additions & 7 deletions paimon-common/src/main/java/org/apache/paimon/types/DataField.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,29 @@ public boolean equals(Object o) {
&& Objects.equals(description, field.description);
}

public boolean isPrunedFrom(DataField field) {
if (this == field) {
public boolean equalsIgnoreFieldId(DataField other) {
if (this == other) {
return true;
}
if (field == null) {
if (other == null) {
return false;
}
return Objects.equals(id, field.id)
&& Objects.equals(name, field.name)
&& type.isPrunedFrom(field.type)
&& Objects.equals(description, field.description);
return Objects.equals(name, other.name)
&& type.equalsIgnoreFieldId(other.type)
&& Objects.equals(description, other.description);
}

/**
 * Whether this field is the result of pruning {@code other} (e.g. selecting a subset of a
 * nested type) or is identical to it. Unlike {@code equalsIgnoreFieldId}, the field id
 * must match exactly; only the type may be a pruned variant.
 *
 * @param other the candidate source field, may be null
 * @return true if this field equals or is pruned from {@code other}
 */
public boolean isPrunedFrom(DataField other) {
    if (other == null) {
        return false;
    }
    if (this == other) {
        return true;
    }
    return Objects.equals(id, other.id)
            && Objects.equals(name, other.name)
            && type.isPrunedFrom(other.type)
            && Objects.equals(description, other.description);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ public boolean equals(Object o) {
return isNullable == that.isNullable && typeRoot == that.typeRoot;
}

/**
 * Equality that ignores field ids. A plain {@code DataType} carries no field ids, so this
 * base implementation simply delegates to {@link #equals(Object)}; composite types
 * (array, map, row) override it to compare their children recursively with ids ignored.
 */
public boolean equalsIgnoreFieldId(Object o) {
    return this.equals(o);
}

/**
* Determine whether the current type is the result of the target type after pruning (e.g.
* select some fields from a nested type) or just the same.
Expand Down
16 changes: 16 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/types/MapType.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,22 @@ public boolean equals(Object o) {
return keyType.equals(mapType.keyType) && valueType.equals(mapType.valueType);
}

@Override
public boolean equalsIgnoreFieldId(Object o) {
    // Same reference is trivially equal.
    if (this == o) {
        return true;
    }
    // Reject null / different concrete class / mismatched nullability or type root
    // (the latter two are what the base-class equals checks).
    if (o == null || getClass() != o.getClass() || !super.equals(o)) {
        return false;
    }
    // Both key and value types must match with field ids ignored.
    MapType that = (MapType) o;
    return keyType.equalsIgnoreFieldId(that.keyType)
            && valueType.equalsIgnoreFieldId(that.valueType);
}

@Override
public boolean isPrunedFrom(Object o) {
if (this == o) {
Expand Down
24 changes: 18 additions & 6 deletions paimon-common/src/main/java/org/apache/paimon/types/RowType.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,25 @@ public boolean equals(Object o) {
return false;
}
RowType rowType = (RowType) o;
// For nested RowTypes e.g. DataField.dataType = RowType we need to ignoreIds as they can be
// different
if (fields.size() != rowType.fields.size()) {
return fields.equals(rowType.fields);
}

public boolean equalsIgnoreFieldId(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
for (int i = 0; i < fields.size(); ++i) {
if (!DataField.dataFieldEqualsIgnoreId(fields.get(i), rowType.fields.get(i))) {
if (!super.equals(o)) {
return false;
}
RowType other = (RowType) o;
if (fields.size() != other.fields.size()) {
return false;
}
for (int i = 0; i < fields.size(); i++) {
if (!fields.get(i).equalsIgnoreFieldId(other.fields.get(i))) {
return false;
}
}
Expand All @@ -236,7 +248,7 @@ public boolean isPrunedFrom(Object o) {
}
RowType rowType = (RowType) o;
for (DataField field : fields) {
if (!field.isPrunedFrom(rowType.getField(field.name()))) {
if (!field.isPrunedFrom(rowType.getField(field.id()))) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public Schema(
this.partitionKeys = normalizePartitionKeys(partitionKeys);
this.primaryKeys = normalizePrimaryKeys(primaryKeys);
this.fields = normalizeFields(fields, this.primaryKeys, this.partitionKeys);

this.comment = comment;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public TableSchema createTable(Schema schema, boolean externalTable) throws Exce
if (latest.isPresent()) {
TableSchema latestSchema = latest.get();
if (externalTable) {
checkSchemaForExternalTable(latestSchema, schema);
checkSchemaForExternalTable(latestSchema.toSchema(), schema);
return latestSchema;
} else {
throw new IllegalStateException(
Expand Down Expand Up @@ -248,14 +248,15 @@ public TableSchema createTable(Schema schema, boolean externalTable) throws Exce
}
}

private void checkSchemaForExternalTable(TableSchema existsSchema, Schema newSchema) {
private void checkSchemaForExternalTable(Schema existsSchema, Schema newSchema) {
// When creating an external table, if the table already exists in the location, we can
// choose not to specify the fields.
if (newSchema.fields().isEmpty()
// When the fields are explicitly specified, we need check for consistency.
|| (Objects.equals(existsSchema.fields(), newSchema.fields())
&& Objects.equals(existsSchema.partitionKeys(), newSchema.partitionKeys())
&& Objects.equals(existsSchema.primaryKeys(), newSchema.primaryKeys()))) {
if ((newSchema.fields().isEmpty()
|| newSchema.rowType().equalsIgnoreFieldId(existsSchema.rowType()))
&& (newSchema.partitionKeys().isEmpty()
|| Objects.equals(newSchema.partitionKeys(), existsSchema.partitionKeys()))
&& (newSchema.primaryKeys().isEmpty()
|| Objects.equals(newSchema.primaryKeys(), existsSchema.primaryKeys()))) {
// check for options
Map<String, String> existsOptions = existsSchema.options();
Map<String, String> newOptions = newSchema.options();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ public static List<DataField> newFields(RowType rowType) {
return rowType.getFields();
}

/**
 * Converts this table schema back into a {@code Schema} value object, carrying over the
 * fields, partition keys, primary keys, options and comment unchanged.
 */
public Schema toSchema() {
return new Schema(fields, partitionKeys, primaryKeys, options, comment);
}

// =================== Utils for reading =========================

public static TableSchema fromJson(String json) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,63 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
}
}

// Verifies that creating an external table over an existing Paimon location works after
// schema evolution: the declared schema must match the evolved one when field ids are
// ignored, and must be rejected when column order differs from the evolved schema.
test("Paimon DDL with hive catalog: create external table with schema evolution") {
Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
catalogName =>
spark.sql(s"USE $catalogName")
withTempDir {
tbLocation =>
withDatabase("paimon_db") {
spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon_db")
spark.sql(s"USE paimon_db")
withTable("t1", "t2") {
val expertTbLocation = tbLocation.getCanonicalPath
spark.sql(
s"""
|CREATE TABLE t1 (a INT, b INT, c STRUCT<f1: INT, f2: INT, f3: INT>) USING paimon
|LOCATION '$expertTbLocation'
|""".stripMargin)
spark.sql("INSERT INTO t1 VALUES (1, 1, STRUCT(1, 1, 1))")
// Evolve the schema: drop and re-add top-level column b and nested field c.f2.
// After this the re-added columns appear last, as the next INSERT's order shows.
spark.sql("ALTER TABLE t1 DROP COLUMN b")
spark.sql("ALTER TABLE t1 ADD COLUMN b INT")
spark.sql("ALTER TABLE t1 DROP COLUMN c.f2")
spark.sql("ALTER TABLE t1 ADD COLUMN c.f2 INT")
// Column order is now (a, c, b); pre-evolution row 1 reads nulls for re-added fields.
spark.sql("INSERT INTO t1 VALUES (2, STRUCT(1, 1, 1), 1)")
checkAnswer(
spark.sql("SELECT * FROM t1 ORDER by a"),
Seq(Row(1, Row(1, 1, null), null), Row(2, Row(1, 1, 1), 1)))

// Re-declaring the table at the same location with the evolved column order must
// succeed even though the re-added fields carry different field ids.
spark.sql(
s"""
|CREATE TABLE t2 (a INT, c STRUCT<f1: INT, f3: INT, f2: INT>, b INT) USING paimon
|LOCATION '$expertTbLocation'
|""".stripMargin)
checkAnswer(
spark.sql("SELECT * FROM t2 ORDER by a"),
Seq(Row(1, Row(1, 1, null), null), Row(2, Row(1, 1, 1), 1)))

// create table with wrong schema
// Top-level column order matches the ORIGINAL (pre-evolution) schema -> rejected.
intercept[Exception] {
spark.sql(
s"""
|CREATE TABLE t3 (a INT, b INT, c STRUCT<f1: INT, f3: INT, f2: INT>) USING paimon
|LOCATION '$expertTbLocation'
|""".stripMargin)
}

// Nested struct field order matches the pre-evolution layout -> rejected.
intercept[Exception] {
spark.sql(
s"""
|CREATE TABLE t4 (a INT, c STRUCT<f1: INT, f2: INT, f3: INT>, b INT) USING paimon
|LOCATION '$expertTbLocation'
|""".stripMargin)
}
}
}
}
}
}

def getDatabaseProp(dbName: String, propertyName: String): String = {
spark
.sql(s"DESC DATABASE EXTENDED $dbName")
Expand Down
Loading