catalog = loadCatalog(conf, catalogName);
- SortOrder sortOrder = getSortOrder(props, schema);
+ SortOrder sortOrder = HMSTablePropertyHelper.getSortOrder(props, schema);
if (catalog.isPresent()) {
String name = props.getProperty(NAME);
Preconditions.checkNotNull(name, "Table identifier not set");
@@ -156,12 +156,6 @@ public static Table createTable(Configuration conf, Properties props) {
return new HadoopTables(conf).create(schema, spec, sortOrder, map, location);
}
- private static SortOrder getSortOrder(Properties props, Schema schema) {
- String sortOrderJsonString = props.getProperty(TableProperties.DEFAULT_SORT_ORDER);
- return Strings.isNullOrEmpty(sortOrderJsonString) ?
- SortOrder.unsorted() : SortOrderParser.fromJson(schema, sortOrderJsonString);
- }
-
/**
* Drops an Iceberg table using the catalog specified by the configuration.
*
@@ -227,7 +221,7 @@ public static Table registerTable(Configuration conf, Properties props, String m
return catalog.get().registerTable(TableIdentifier.parse(name), metadataLocation);
}
Preconditions.checkNotNull(location, "Table location not set");
- SortOrder sortOrder = getSortOrder(props, schema);
+ SortOrder sortOrder = HMSTablePropertyHelper.getSortOrder(props, schema);
return new HadoopTables(conf).create(schema, spec, sortOrder, map, location);
}
@@ -263,7 +257,8 @@ static Optional loadCatalog(Configuration conf, String catalogName) {
*/
private static Map getCatalogProperties(Configuration conf, String catalogName) {
Map catalogProperties = Maps.newHashMap();
- String keyPrefix = InputFormatConfig.CATALOG_CONFIG_PREFIX + catalogName;
+ String keyPrefix = REST_CATALOG_TYPE.equals(catalogName) ?
+ InputFormatConfig.CATALOG_REST_CONFIG_PREFIX : InputFormatConfig.CATALOG_CONFIG_PREFIX + catalogName;
conf.forEach(config -> {
if (config.getKey().startsWith(InputFormatConfig.CATALOG_DEFAULT_CONFIG_PREFIX)) {
catalogProperties.putIfAbsent(
@@ -275,7 +270,9 @@ private static Map getCatalogProperties(Configuration conf, Stri
config.getValue());
}
});
-
+ if (REST_CATALOG_TYPE.equals(catalogName)) {
+ catalogProperties.put("type", "rest");
+ }
return catalogProperties;
}
@@ -298,7 +295,8 @@ private static String getCatalogType(Configuration conf, String catalogName) {
return catalogType;
}
} else {
- String catalogType = conf.get(CatalogUtil.ICEBERG_CATALOG_TYPE);
+ String catalogType = conf.get(InputFormatConfig.catalogPropertyConfigKey(
+ "", CatalogUtil.ICEBERG_CATALOG_TYPE));
if (catalogType != null && catalogType.equals(LOCATION)) {
return NO_CATALOG_TYPE;
} else {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index 4898b4b8954f..d26bdef45588 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -28,6 +28,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.util.SerializationUtil;
+import org.apache.parquet.Strings;
public class InputFormatConfig {
@@ -69,23 +70,15 @@ private InputFormatConfig() {
public static final int COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT = 10;
public static final String COMMIT_FILE_THREAD_POOL_SIZE = "iceberg.mr.commit.file.thread.pool.size";
public static final int COMMIT_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
- public static final String WRITE_TARGET_FILE_SIZE = "iceberg.mr.write.target.file.size";
public static final String CASE_SENSITIVE = "iceberg.mr.case.sensitive";
public static final boolean CASE_SENSITIVE_DEFAULT = true;
public static final String CATALOG_NAME = "iceberg.catalog";
- public static final String HADOOP_CATALOG = "hadoop.catalog";
- public static final String HADOOP_TABLES = "hadoop.tables";
public static final String HIVE_CATALOG = "hive.catalog";
- public static final String ICEBERG_SNAPSHOTS_TABLE_SUFFIX = ".snapshots";
- public static final String SNAPSHOT_TABLE = "iceberg.snapshots.table";
- public static final String SNAPSHOT_TABLE_SUFFIX = "__snapshots";
public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
- public static final String CATALOG_TYPE_TEMPLATE = "iceberg.catalog.%s.type";
- public static final String CATALOG_WAREHOUSE_TEMPLATE = "iceberg.catalog.%s.warehouse";
- public static final String CATALOG_CLASS_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
+ public static final String CATALOG_REST_CONFIG_PREFIX = "iceberg.rest-catalog";
public static final String CATALOG_DEFAULT_CONFIG_PREFIX = "iceberg.catalog-default.";
public enum InMemoryDataModel {
@@ -224,7 +217,9 @@ public static boolean fetchVirtualColumns(Configuration conf) {
* @return Hadoop config key of a catalog property for the catalog name
*/
public static String catalogPropertyConfigKey(String catalogName, String catalogProperty) {
- return String.format("%s%s.%s", CATALOG_CONFIG_PREFIX, catalogName, catalogProperty);
+ return Strings.isNullOrEmpty(catalogName) ?
+ String.format("%s%s", CATALOG_CONFIG_PREFIX, catalogProperty) :
+ String.format("%s%s.%s", CATALOG_CONFIG_PREFIX, catalogName, catalogProperty);
}
private static Schema schema(Configuration conf, String key) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index fb53247536dc..cf05e7f137e1 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -921,7 +921,7 @@ private static PartitionSpec spec(Configuration configuration, Schema schema,
"We can only handle non-partitioned Hive tables. The Iceberg schema should be in " +
InputFormatConfig.PARTITION_SPEC + " or already converted to a partition transform ");
- PartitionSpec spec = IcebergTableUtil.spec(configuration, schema);
+ PartitionSpec spec = HMSTablePropertyHelper.createPartitionSpec(configuration, schema);
if (spec != null) {
Preconditions.checkArgument(hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC) == null,
"Provide only one of the following: Hive partition transform specification, or the " +
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index b4d9f0bea227..38cef81076db 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -173,6 +173,7 @@
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.hive.HMSTablePropertyHelper;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.hive.HiveTableOperations;
import org.apache.iceberg.io.CloseableIterable;
@@ -1662,7 +1663,7 @@ static void overlayTableProperties(Configuration configuration, TableDesc tableD
AbstractSerDe serDe = tableDesc.getDeserializer(configuration);
HiveIcebergSerDe icebergSerDe = (HiveIcebergSerDe) serDe;
schema = icebergSerDe.getTableSchema();
- spec = IcebergTableUtil.spec(configuration, icebergSerDe.getTableSchema());
+ spec = HMSTablePropertyHelper.createPartitionSpec(configuration, icebergSerDe.getTableSchema());
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 2e4e5f0a094e..570e2408de85 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -80,6 +80,7 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.hive.HMSTablePropertyHelper;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.Catalogs;
@@ -229,54 +230,9 @@ static PartitionStatisticsFile getPartitionStatsFile(Table table, long snapshotI
.findAny().orElse(null);
}
- /**
- * Create {@link PartitionSpec} based on the partition information stored in
- * {@link TransformSpec}.
- * @param configuration a Hadoop configuration
- * @param schema iceberg table schema
- * @return iceberg partition spec, always non-null
- */
- public static PartitionSpec spec(Configuration configuration, Schema schema) {
- List partitionTransformSpecList = SessionStateUtil
- .getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
- .map(o -> (List) o).orElse(null);
-
- if (partitionTransformSpecList == null) {
- LOG.warn("Iceberg partition transform spec is not found in QueryState.");
- return null;
- }
- PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
- partitionTransformSpecList.forEach(spec -> {
- switch (spec.getTransformType()) {
- case IDENTITY:
- builder.identity(spec.getColumnName().toLowerCase());
- break;
- case YEAR:
- builder.year(spec.getColumnName());
- break;
- case MONTH:
- builder.month(spec.getColumnName());
- break;
- case DAY:
- builder.day(spec.getColumnName());
- break;
- case HOUR:
- builder.hour(spec.getColumnName());
- break;
- case TRUNCATE:
- builder.truncate(spec.getColumnName(), spec.getTransformParam().get());
- break;
- case BUCKET:
- builder.bucket(spec.getColumnName(), spec.getTransformParam().get());
- break;
- }
- });
- return builder.build();
- }
-
public static void updateSpec(Configuration configuration, Table table) {
// get the new partition transform spec
- PartitionSpec newPartitionSpec = spec(configuration, table.schema());
+ PartitionSpec newPartitionSpec = HMSTablePropertyHelper.createPartitionSpec(configuration, table.schema());
if (newPartitionSpec == null) {
LOG.warn("Iceberg partition spec is not updated due to empty partition spec definition.");
return;
diff --git a/pom.xml b/pom.xml
index 5fbc655ef816..19d90c614872 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,6 +151,8 @@
4.5.13
4.4.13
2.9.2
+ 5.3
+ 5.3
2.5.2
2.16.1
2.3.4
@@ -608,6 +610,21 @@
httpcore
${httpcomponents.core.version}