@@ -102,27 +102,19 @@ public Type atomic(org.apache.doris.catalog.Type atomic) {
         PrimitiveType primitiveType = atomic.getPrimitiveType();
         if (primitiveType.equals(PrimitiveType.BOOLEAN)) {
             return Types.BooleanType.get();
-        } else if (primitiveType.equals(PrimitiveType.TINYINT)
-                || primitiveType.equals(PrimitiveType.SMALLINT)
-                || primitiveType.equals(PrimitiveType.INT)) {
+        } else if (primitiveType.equals(PrimitiveType.INT)) {
             return Types.IntegerType.get();
-        } else if (primitiveType.equals(PrimitiveType.BIGINT)
-                || primitiveType.equals(PrimitiveType.LARGEINT)) {
+        } else if (primitiveType.equals(PrimitiveType.BIGINT)) {
             return Types.LongType.get();
         } else if (primitiveType.equals(PrimitiveType.FLOAT)) {
             return Types.FloatType.get();
         } else if (primitiveType.equals(PrimitiveType.DOUBLE)) {
             return Types.DoubleType.get();
-        } else if (primitiveType.equals(PrimitiveType.CHAR)
-                || primitiveType.equals(PrimitiveType.VARCHAR)
-                || primitiveType.equals(PrimitiveType.STRING)) {
+        } else if (primitiveType.equals(PrimitiveType.STRING)) {
             return Types.StringType.get();
         } else if (primitiveType.equals(PrimitiveType.DATE)
                 || primitiveType.equals(PrimitiveType.DATEV2)) {
             return Types.DateType.get();
-        } else if (primitiveType.equals(PrimitiveType.TIME)
-                || primitiveType.equals(PrimitiveType.TIMEV2)) {
-            return Types.TimeType.get();
         } else if (primitiveType.equals(PrimitiveType.DECIMALV2)
                 || primitiveType.isDecimalV3Type()) {
             return Types.DecimalType.of(
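The hunk above narrows the Doris-to-Iceberg type mapping to exact matches: TINYINT and SMALLINT are no longer silently widened to Iceberg INTEGER, LARGEINT no longer maps to LONG, CHAR and VARCHAR no longer map to STRING, and the TIME branch is dropped entirely. A minimal sketch of the resulting one-to-one table for the simple types (the class and field names here are illustrative, not part of the PR):

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.util.Map;

// Hypothetical lookup equivalent to the if/else chain above: only Doris
// primitives with an exact Iceberg counterpart survive the conversion.
class DorisIcebergExactTypes {
    static final Map<String, Type> EXACT = Map.of(
            "BOOLEAN", Types.BooleanType.get(),
            "INT", Types.IntegerType.get(),
            "BIGINT", Types.LongType.get(),
            "FLOAT", Types.FloatType.get(),
            "DOUBLE", Types.DoubleType.get(),
            "STRING", Types.StringType.get(),
            "DATE", Types.DateType.get());
}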
@@ -149,7 +149,7 @@ public void createTable(CreateTableStmt stmt) throws UserException {
         Schema schema = new Schema(visit.asNestedType().asStructType().fields());
         Map<String, String> properties = stmt.getProperties();
         properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
-        PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(properties, schema);
+        PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
         catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
         db.setUnInitialized(true);
     }
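With this change the Iceberg partition spec is derived from the statement's parsed PARTITION BY clause instead of a "partition" table property. Roughly, in DDL terms (the old property syntax comes from the comment removed in the next file; the new clause follows the test added at the bottom of this PR, with an illustrative table name):

// Old style (removed): partitioning passed as a table property,
//   "partition" = "c1;day(c1);bucket(4,c3)"
// New style: a regular PARTITION BY clause, parsed into a PartitionDesc.
String sql = "create table db.tb (c1 datetime, c3 string) engine = iceberg "
        + "partition by (c1, day(c1), bucket(4, c3)) ()";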
@@ -25,10 +25,12 @@
 import org.apache.doris.analysis.DecimalLiteral;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.InPredicate;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.PartitionDesc;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.Subquery;
@@ -63,8 +65,6 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Iceberg utils
@@ -78,7 +78,6 @@ public Integer initialValue() {
         }
     };
     static long MILLIS_TO_NANO_TIME = 1000;
-    private static final Pattern PARTITION_REG = Pattern.compile("(\\w+)\\((\\d+)?,?(\\w+)\\)");
     // https://iceberg.apache.org/spec/#schemas-and-data-types
     // All time and timestamp values are stored with microsecond precision
     private static final int ICEBERG_DATETIME_SCALE_MS = 6;
@@ -415,57 +414,51 @@ private static SlotRef convertDorisExprToSlotRef(Expr expr) {
         return slotRef;
     }
 
-    // "partition"="c1;day(c1);bucket(4,c3)"
-    public static PartitionSpec solveIcebergPartitionSpec(Map<String, String> properties, Schema schema)
-            throws UserException {
-        if (properties.containsKey("partition")) {
-            PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
-            String par = properties.get("partition").replaceAll(" ", "");
-            String[] pars = par.split(";");
-            for (String func : pars) {
-                if (func.contains("(")) {
-                    Matcher matcher = PARTITION_REG.matcher(func);
-                    if (matcher.matches()) {
-                        switch (matcher.group(1).toLowerCase()) {
-                            case "bucket":
-                                builder.bucket(matcher.group(3), Integer.parseInt(matcher.group(2)));
-                                break;
-                            case "year":
-                            case "years":
-                                builder.year(matcher.group(3));
-                                break;
-                            case "month":
-                            case "months":
-                                builder.month(matcher.group(3));
-                                break;
-                            case "date":
-                            case "day":
-                            case "days":
-                                builder.day(matcher.group(3));
-                                break;
-                            case "date_hour":
-                            case "hour":
-                            case "hours":
-                                builder.hour(matcher.group(3));
-                                break;
-                            case "truncate":
-                                builder.truncate(matcher.group(3), Integer.parseInt(matcher.group(2)));
-                                break;
-                            default:
-                                throw new UserException("unsupported partition for " + matcher.group(1));
-                        }
-                    } else {
-                        throw new UserException("failed to get partition info from " + func);
-                    }
-                } else {
-                    builder.identity(func);
-                }
-            }
-            properties.remove("partition");
-            return builder.build();
-        } else {
-            return PartitionSpec.unpartitioned();
-        }
-    }
+    public static PartitionSpec solveIcebergPartitionSpec(PartitionDesc partitionDesc, Schema schema)
+            throws UserException {
+        if (partitionDesc == null) {
+            return PartitionSpec.unpartitioned();
+        }
+
+        ArrayList<Expr> partitionExprs = partitionDesc.getPartitionExprs();
+        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+        for (Expr expr : partitionExprs) {
+            if (expr instanceof SlotRef) {
+                builder.identity(((SlotRef) expr).getColumnName());
+            } else if (expr instanceof FunctionCallExpr) {
+                String exprName = expr.getExprName();
+                List<Expr> params = ((FunctionCallExpr) expr).getParams().exprs();
+                switch (exprName.toLowerCase()) {
+                    case "bucket":
+                        builder.bucket(params.get(1).getExprName(), Integer.parseInt(params.get(0).getStringValue()));
+                        break;
+                    case "year":
+                    case "years":
+                        builder.year(params.get(0).getExprName());
+                        break;
+                    case "month":
+                    case "months":
+                        builder.month(params.get(0).getExprName());
+                        break;
+                    case "date":
+                    case "day":
+                    case "days":
+                        builder.day(params.get(0).getExprName());
+                        break;
+                    case "date_hour":
+                    case "hour":
+                    case "hours":
+                        builder.hour(params.get(0).getExprName());
+                        break;
+                    case "truncate":
+                        builder.truncate(params.get(1).getExprName(), Integer.parseInt(params.get(0).getStringValue()));
+                        break;
+                    default:
+                        throw new UserException("unsupported partition for " + exprName);
+                }
+            }
+        }
+        return builder.build();
+    }
 
     private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
@@ -567,5 +560,4 @@ public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, St
         }
         return -1;
     }
-
 }
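For reference, a self-contained sketch of the Iceberg builder API that the rewritten solveIcebergPartitionSpec drives. Note the argument order: in the Doris DDL the numeric parameter comes first (bucket(2, id), truncate(10, s)), which is why the switch reads params.get(1) for the column name and params.get(0) for the count or width. The schema below is made up for illustration; the builder calls are the same ones used above:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class PartitionSpecBuilderSketch {
    public static void main(String[] args) {
        // Illustrative schema: field ids and names are arbitrary.
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "ts", Types.TimestampType.withoutZone()),
                Types.NestedField.required(3, "s", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema)
                .identity("id")      // SlotRef         -> identity transform
                .day("ts")           // day(ts)         -> day transform
                .bucket("id", 2)     // bucket(2, id)   -> bucket transform
                .truncate("s", 10)   // truncate(10, s) -> truncate transform
                .build();
        System.out.println(spec);
    }
}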
@@ -747,4 +747,8 @@ public CreateTableStmt translateToLegacyStmt() {
                 partitionDesc, distributionDesc, Maps.newHashMap(properties), extProperties,
                 comment, addRollups, null);
     }
+
+    public void setIsExternal(boolean isExternal) {
+        this.isExternal = isExternal;
+    }
 }
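The new setter exists so a Nereids CreateTableInfo can be flagged as external before translation to the legacy statement; the test below uses it exactly this way:

// From the new test: mark the table external so the legacy
// CreateTableStmt targets the external (Iceberg) engine.
createTableInfo.setIsExternal(true);
CreateTableStmt createTableStmt = createTableInfo.translateToLegacyStmt();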
@@ -0,0 +1,196 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg;

import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DbName;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogFactory;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;

public class CreateIcebergTableTest {

    public static String warehouse;
    public static IcebergHadoopExternalCatalog icebergCatalog;
    public static IcebergMetadataOps ops;
    public static String dbName = "testdb";
    public static ConnectContext connectContext;

    @BeforeClass
    public static void beforeClass() throws Throwable {
        Path warehousePath = Files.createTempDirectory("test_warehouse_");
        warehouse = "file://" + warehousePath.toAbsolutePath() + "/";

        HashMap<String, String> param = new HashMap<>();
        param.put("type", "iceberg");
        param.put("iceberg.catalog.type", "hadoop");
        param.put("warehouse", warehouse);

        // create catalog
        CreateCatalogStmt createCatalogStmt = new CreateCatalogStmt(true, "iceberg", "", param, "comment");
        icebergCatalog = (IcebergHadoopExternalCatalog) CatalogFactory.createFromStmt(1, createCatalogStmt);
        icebergCatalog.setInitialized(true);

        // create db
        ops = new IcebergMetadataOps(icebergCatalog, icebergCatalog.getCatalog());
        CreateDbStmt createDbStmt = new CreateDbStmt(true, new DbName("iceberg", dbName), null);
        ops.createDb(createDbStmt);
        icebergCatalog.setInitialized(true);
        IcebergExternalDatabase db = new IcebergExternalDatabase(icebergCatalog, 1L, dbName);
        icebergCatalog.addDatabaseForTest(db);

        // context
        connectContext = new ConnectContext();
        connectContext.setThreadLocalInfo();
    }

    @Test
    public void testSimpleTable() throws UserException {
        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
        String sql = "create table " + tb + " (id int) engine = iceberg";
        createTable(sql);
        Table table = ops.getCatalog().loadTable(tb);
        Schema schema = table.schema();
        Assert.assertEquals(1, schema.columns().size());
        Assert.assertEquals(PartitionSpec.unpartitioned(), table.spec());
    }

    @Test
    public void testProperties() throws UserException {
        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
        String sql = "create table " + tb + " (id int) engine = iceberg properties(\"a\"=\"b\")";
        createTable(sql);
        Table table = ops.getCatalog().loadTable(tb);
        Schema schema = table.schema();
        Assert.assertEquals(1, schema.columns().size());
        Assert.assertEquals(PartitionSpec.unpartitioned(), table.spec());
        Assert.assertEquals("b", table.properties().get("a"));
    }

    @Test
    public void testType() throws UserException {
        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
        String sql = "create table " + tb + " ("
                + "c0 int, "
                + "c1 bigint, "
                + "c2 float, "
                + "c3 double, "
                + "c4 string, "
                + "c5 date, "
                + "c6 decimal(20, 10), "
                + "c7 datetime"
                + ") engine = iceberg "
                + "properties(\"a\"=\"b\")";
        createTable(sql);
        Table table = ops.getCatalog().loadTable(tb);
        Schema schema = table.schema();
        List<Types.NestedField> columns = schema.columns();
        Assert.assertEquals(8, columns.size());
        Assert.assertEquals(Type.TypeID.INTEGER, columns.get(0).type().typeId());
        Assert.assertEquals(Type.TypeID.LONG, columns.get(1).type().typeId());
        Assert.assertEquals(Type.TypeID.FLOAT, columns.get(2).type().typeId());
        Assert.assertEquals(Type.TypeID.DOUBLE, columns.get(3).type().typeId());
        Assert.assertEquals(Type.TypeID.STRING, columns.get(4).type().typeId());
        Assert.assertEquals(Type.TypeID.DATE, columns.get(5).type().typeId());
        Assert.assertEquals(Type.TypeID.DECIMAL, columns.get(6).type().typeId());
        Assert.assertEquals(Type.TypeID.TIMESTAMP, columns.get(7).type().typeId());
    }

    @Test
    public void testPartition() throws UserException {
        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
        String sql = "create table " + tb + " ("
                + "id int, "
                + "ts1 datetime, "
                + "ts2 datetime, "
                + "ts3 datetime, "
                + "ts4 datetime, "
                + "dt1 date, "
                + "dt2 date, "
                + "dt3 date, "
                + "s string"
                + ") engine = iceberg "
                + "partition by ("
                + "id, "
                + "bucket(2, id), "
                + "year(ts1), "
                + "year(dt1), "
                + "month(ts2), "
                + "month(dt2), "
                + "day(ts3), "
                + "day(dt3), "
                + "hour(ts4), "
                + "truncate(10, s)) ()"
                + "properties(\"a\"=\"b\")";
        createTable(sql);
        Table table = ops.getCatalog().loadTable(tb);
        Schema schema = table.schema();
        Assert.assertEquals(9, schema.columns().size());
        PartitionSpec spec = PartitionSpec.builderFor(schema)
                .identity("id")
                .bucket("id", 2)
                .year("ts1")
                .year("dt1")
                .month("ts2")
                .month("dt2")
                .day("ts3")
                .day("dt3")
                .hour("ts4")
                .truncate("s", 10)
                .build();
        Assert.assertEquals(spec, table.spec());
        Assert.assertEquals("b", table.properties().get("a"));
    }

    public void createTable(String sql) throws UserException {
        LogicalPlan plan = new NereidsParser().parseSingle(sql);
        Assertions.assertTrue(plan instanceof CreateTableCommand);
        CreateTableInfo createTableInfo = ((CreateTableCommand) plan).getCreateTableInfo();
        createTableInfo.setIsExternal(true);
        CreateTableStmt createTableStmt = createTableInfo.translateToLegacyStmt();
        ops.createTable(createTableStmt);
    }

    public String getTableName() {
        String s = "test_tb_" + UUID.randomUUID();
        return s.replaceAll("-", "");
    }
}