diff --git a/iceberg/iceberg-catalog/pom.xml b/iceberg/iceberg-catalog/pom.xml
index 50058ddfb179..643acc49832d 100644
--- a/iceberg/iceberg-catalog/pom.xml
+++ b/iceberg/iceberg-catalog/pom.xml
@@ -114,5 +114,17 @@
+ <dependency>
+ <groupId>org.apache.httpcomponents.client5</groupId>
+ <artifactId>httpclient5</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents.core5</groupId>
+ <artifactId>httpcore5</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents.core5</groupId>
+ <artifactId>httpcore5-h2</artifactId>
+ </dependency>
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CatalogUtils.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CatalogUtils.java
new file mode 100644
index 000000000000..f2fbd5401798
--- /dev/null
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CatalogUtils.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class CatalogUtils {
+ public static final String NAME = "name";
+ public static final String LOCATION = "location";
+ public static final String CATALOG_NAME = "iceberg.catalog";
+ public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
+ public static final String CATALOG_WAREHOUSE_TEMPLATE = "iceberg.catalog.%s.warehouse";
+ public static final String CATALOG_IMPL_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
+ public static final String CATALOG_DEFAULT_CONFIG_PREFIX = "iceberg.catalog-default.";
+ public static final String ICEBERG_HADOOP_TABLE_NAME = "location_based_table";
+ public static final String ICEBERG_DEFAULT_CATALOG_NAME = "default_iceberg";
+ public static final String NO_CATALOG_TYPE = "no catalog";
+ public static final Set<String> PROPERTIES_TO_REMOVE = ImmutableSet.of(
+ // We don't want to push down the metadata location props to Iceberg from HMS,
+ // since the snapshot pointer in HMS would always be one step ahead
+ BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
+ BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP);
+
+ private CatalogUtils() {
+
+ }
+
+ /**
+ * Calculates the properties we would like to send to the catalog.
+ * <ul>
+ * <li>The base of the properties is the properties stored at the Hive Metastore for the given table
+ * <li>We add the {@link CatalogUtils#LOCATION} as the table location
+ * <li>We add the {@link CatalogUtils#NAME} as
+ * TableIdentifier defined by the database name and table name
+ * <li>We add the serdeProperties of the HMS table
+ * <li>We remove some parameters that we don't want to push down to the Iceberg table props
+ * </ul>
+ *
+ * @param hmsTable Table for which we are calculating the properties
+ * @return The properties we can provide for Iceberg functions
+ */
+ public static Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ Properties properties = new Properties();
+ properties.putAll(toIcebergProperties(hmsTable.getParameters()));
+
+ if (properties.get(LOCATION) == null && hmsTable.getSd() != null &&
+ hmsTable.getSd().getLocation() != null) {
+ properties.put(LOCATION, hmsTable.getSd().getLocation());
+ }
+
+ if (properties.get(NAME) == null) {
+ properties.put(NAME, TableIdentifier.of(hmsTable.getDbName(),
+ hmsTable.getTableName()).toString());
+ }
+
+ SerDeInfo serdeInfo = hmsTable.getSd().getSerdeInfo();
+ if (serdeInfo != null) {
+ properties.putAll(toIcebergProperties(serdeInfo.getParameters()));
+ }
+
+ // Remove HMS table parameters we don't want to propagate to Iceberg
+ PROPERTIES_TO_REMOVE.forEach(properties::remove);
+
+ return properties;
+ }
+
+ private static Properties toIcebergProperties(Map<String, String> parameters) {
+ Properties properties = new Properties();
+ parameters.entrySet().stream()
+ .filter(e -> e.getKey() != null && e.getValue() != null)
+ .forEach(e -> {
+ String icebergKey = HMSTablePropertyHelper.translateToIcebergProp(e.getKey());
+ properties.put(icebergKey, e.getValue());
+ });
+ return properties;
+ }
+
+ /**
+ * Collect all the catalog specific configuration from the global hive configuration.
+ * @param conf a Hadoop configuration
+ * @param catalogName name of the catalog
+ * @return complete map of catalog properties
+ */
+ public static Map<String, String> getCatalogProperties(Configuration conf, String catalogName) {
+ Map<String, String> catalogProperties = Maps.newHashMap();
+ String keyPrefix = CATALOG_CONFIG_PREFIX + catalogName;
+ conf.forEach(config -> {
+ if (config.getKey().startsWith(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX)) {
+ catalogProperties.putIfAbsent(
+ config.getKey().substring(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX.length()),
+ config.getValue());
+ } else if (config.getKey().startsWith(keyPrefix)) {
+ catalogProperties.put(
+ config.getKey().substring(keyPrefix.length() + 1),
+ config.getValue());
+ }
+ });
+
+ return catalogProperties;
+ }
+
+ public static String getCatalogName(Configuration conf) {
+ return MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CATALOG_DEFAULT);
+ }
+
+ public static String getCatalogType(Configuration conf) {
+ return getCatalogType(conf, CatalogUtils.getCatalogName(conf));
+ }
+
+ public static boolean isHadoopTable(Configuration conf, Properties catalogProperties) {
+ String catalogName = catalogProperties.getProperty(CATALOG_NAME);
+ return ICEBERG_HADOOP_TABLE_NAME.equals(catalogName) || hadoopCatalog(conf, catalogProperties);
+ }
+
+ public static boolean hadoopCatalog(Configuration conf, Properties props) {
+ return assertCatalogType(conf, props, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, CatalogUtil.ICEBERG_CATALOG_HADOOP);
+ }
+
+ /**
+ * Get Hadoop config key of a catalog property based on catalog name
+ * @param catalogName catalog name
+ * @param catalogProperty catalog property, can be any custom property,
+ * a commonly used list of properties can be found
+ * at {@link org.apache.iceberg.CatalogProperties}
+ * @return Hadoop config key of a catalog property for the catalog name
+ */
+ public static String catalogPropertyConfigKey(String catalogName, String catalogProperty) {
+ return String.format("%s%s.%s", CATALOG_CONFIG_PREFIX, catalogName, catalogProperty);
+ }
+
+ /**
+ * Return the catalog type based on the catalog name.
+ * <p>
+ * See Catalogs documentation for catalog type resolution strategy.
+ *
+ * @param conf global hive configuration
+ * @param catalogName name of the catalog
+ * @return type of the catalog, can be null
+ */
+ public static String getCatalogType(Configuration conf, String catalogName) {
+ if (!StringUtils.isEmpty(catalogName)) {
+ String catalogType = conf.get(catalogPropertyConfigKey(
+ catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE));
+ if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME)) {
+ return NO_CATALOG_TYPE;
+ } else {
+ return catalogType;
+ }
+ } else {
+ String catalogType = conf.get(CatalogUtil.ICEBERG_CATALOG_TYPE);
+ if (catalogType != null && catalogType.equals(LOCATION)) {
+ return NO_CATALOG_TYPE;
+ } else {
+ return catalogType;
+ }
+ }
+ }
+
+ public static String getCatalogImpl(Configuration conf, String catalogName) {
+ return Optional.ofNullable(catalogName)
+ .filter(StringUtils::isNotEmpty)
+ .map(name -> String.format(CatalogUtils.CATALOG_IMPL_TEMPLATE, name))
+ .map(conf::get)
+ .orElse(null);
+ }
+
+ public static boolean assertCatalogType(Configuration conf, Properties props, String expectedType,
+ String expectedImpl) {
+ String catalogName = props.getProperty(CATALOG_NAME);
+ String catalogType = Optional.ofNullable(CatalogUtils.getCatalogType(conf, catalogName))
+ .orElseGet(() -> CatalogUtils.getCatalogType(conf, ICEBERG_DEFAULT_CATALOG_NAME));
+
+ if (catalogType != null) {
+ return expectedType.equalsIgnoreCase(catalogType);
+ }
+
+ String actualImpl = CatalogUtils.getCatalogProperties(conf, catalogName).get(CatalogProperties.CATALOG_IMPL);
+
+ // Return true immediately if the strings are equal (this also handles both being null).
+ if (StringUtils.equals(expectedImpl, actualImpl)) {
+ return true;
+ }
+
+ // If they are not equal, but one of them is null, they can't be subtypes.
+ if (expectedImpl == null || actualImpl == null) {
+ return false;
+ }
+
+ // Now that we know both are non-null and not equal, check the class hierarchy.
+ try {
+ return Class.forName(expectedImpl).isAssignableFrom(Class.forName(actualImpl));
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(String.format("Error checking if catalog %s is subtype of %s",
+ catalogName, expectedImpl), e);
+ }
+ }
+}
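
For reference, a minimal sketch (not part of the patch) of how CatalogUtils resolves catalog configuration from a Hadoop Configuration; the catalog name "ice01" and the property values are placeholders:

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.hive.CatalogUtils;

public class CatalogUtilsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Catalog-specific keys carry the "iceberg.catalog.<name>." prefix, which getCatalogProperties strips off.
    conf.set("iceberg.catalog.ice01.type", "rest");
    conf.set("iceberg.catalog.ice01.uri", "http://localhost:8181");
    // "iceberg.catalog-default." keys are fallbacks, applied only when not set for the catalog itself.
    conf.set("iceberg.catalog-default.warehouse", "/tmp/warehouse");

    Map<String, String> props = CatalogUtils.getCatalogProperties(conf, "ice01");
    System.out.println(props); // contains type=rest, uri=http://localhost:8181, warehouse=/tmp/warehouse

    // Going the other way: build the Hadoop config key for a catalog property.
    System.out.println(CatalogUtils.catalogPropertyConfigKey("ice01", CatalogUtil.ICEBERG_CATALOG_TYPE));
    // prints iceberg.catalog.ice01.type
  }
}
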
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
index 2eaaaee1272e..45253e1dd8d1 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
@@ -22,17 +22,20 @@
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hive.iceberg.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
@@ -42,6 +45,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.JsonUtil;
+import org.apache.parquet.Strings;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +55,7 @@
public class HMSTablePropertyHelper {
private static final Logger LOG = LoggerFactory.getLogger(HMSTablePropertyHelper.class);
public static final String HIVE_ICEBERG_STORAGE_HANDLER = "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
+ public static final String PARTITION_SPEC = "iceberg.mr.table.partition.spec";
private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of(
// gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things
@@ -63,7 +68,7 @@ private HMSTablePropertyHelper() {
/**
* Provides key translation where necessary between Iceberg and HMS props. This translation is needed because some
- * properties control the same behaviour but are named differently in Iceberg and Hive. Therefore changes to these
+ * properties control the same behaviour but are named differently in Iceberg and Hive. Therefore, changes to these
* property pairs should be synchronized.
*
* Example: Deleting data files upon DROP TABLE is enabled using gc.enabled=true in Iceberg and
@@ -131,6 +136,12 @@ public static void updateHmsTableForIcebergTable(
tbl.setParameters(parameters);
}
+ public static SortOrder getSortOrder(Properties props, Schema schema) {
+ String sortOrderJsonString = props.getProperty(TableProperties.DEFAULT_SORT_ORDER);
+ return Strings.isNullOrEmpty(sortOrderJsonString) ? SortOrder.unsorted() : SortOrderParser.fromJson(schema,
+ sortOrderJsonString);
+ }
+
private static void setCommonParameters(
String newMetadataLocation,
String uuid,
@@ -143,8 +154,9 @@ private static void setCommonParameters(
if (uuid != null) {
parameters.put(TableProperties.UUID, uuid);
}
-
- obsoleteProps.forEach(parameters::remove);
+ if (obsoleteProps != null) {
+ obsoleteProps.forEach(parameters::remove);
+ }
parameters.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, tableType);
parameters.put(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, newMetadataLocation);
@@ -158,7 +170,7 @@ private static void setCommonParameters(
@VisibleForTesting
static void setStorageHandler(Map<String, String> parameters, boolean hiveEngineEnabled) {
- // If needed set the 'storage_handler' property to enable query from Hive
+ // If needed, set the 'storage_handler' property to enable query from Hive
if (hiveEngineEnabled) {
parameters.put(hive_metastoreConstants.META_TABLE_STORAGE, HIVE_ICEBERG_STORAGE_HANDLER);
} else {
@@ -209,6 +221,16 @@ static void setPartitionSpec(TableMetadata metadata, Map paramet
}
}
+ public static PartitionSpec getPartitionSpec(Map<String, String> props, Schema schema) {
+ String specJson = props.getOrDefault(
+ PARTITION_SPEC,
+ props.get(TableProperties.DEFAULT_PARTITION_SPEC)
+ );
+ return Optional.ofNullable(specJson)
+ .map(spec -> PartitionSpecParser.fromJson(schema, spec))
+ .orElseGet(PartitionSpec::unpartitioned);
+ }
+
@VisibleForTesting
static void setSortOrder(TableMetadata metadata, Map<String, String> parameters, long maxHiveTablePropertySize) {
parameters.remove(TableProperties.DEFAULT_SORT_ORDER);
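
A minimal sketch (illustrative only, not part of the patch) of how the new getPartitionSpec/getSortOrder helpers resolve specs from plain property maps; the schema and spec below are made up for the example:

import java.util.Map;
import java.util.Properties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.hive.HMSTablePropertyHelper;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;

public class HMSTablePropertyHelperSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "city", Types.StringType.get()));

    // "iceberg.mr.table.partition.spec" wins over TableProperties.DEFAULT_PARTITION_SPEC when both are set;
    // an empty map falls back to PartitionSpec.unpartitioned().
    Map<String, String> props = Maps.newHashMap();
    props.put(HMSTablePropertyHelper.PARTITION_SPEC,
        PartitionSpecParser.toJson(PartitionSpec.builderFor(schema).identity("city").build()));
    PartitionSpec spec = HMSTablePropertyHelper.getPartitionSpec(props, schema);
    System.out.println(spec.isPartitioned()); // true

    // Without a default-sort-order JSON in the properties, the sort order resolves to unsorted.
    SortOrder order = HMSTablePropertyHelper.getSortOrder(new Properties(), schema);
    System.out.println(order.isUnsorted()); // true
  }
}
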
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
index dd9fc44a5c85..1744b9b135cb 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
@@ -19,17 +19,39 @@
package org.apache.iceberg.hive;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.thrift.TException;
public class MetastoreUtil {
+
+ public static final String DEFAULT_INPUT_FORMAT_CLASS = "org.apache.iceberg.mr.hive.HiveIcebergInputFormat";
+ public static final String DEFAULT_OUTPUT_FORMAT_CLASS = "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat";
+ public static final String DEFAULT_SERDE_CLASS = "org.apache.iceberg.mr.hive.HiveIcebergSerDe";
+
private static final DynMethods.UnboundMethod ALTER_TABLE =
DynMethods.builder("alter_table")
.impl(
@@ -87,4 +109,60 @@ public static void alterTable(
}
}
}
+
+ public static List<FieldSchema> getPartitionKeys(org.apache.iceberg.Table table, int specId) {
+ Schema schema = table.specs().get(specId).schema();
+ List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
+ Map<String, String> colNameToColType = hiveSchema.stream()
+ .collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
+ return table.specs().get(specId).fields().stream()
+ .map(partField -> new FieldSchema(
+ schema.findColumnName(partField.sourceId()),
+ colNameToColType.get(schema.findColumnName(partField.sourceId())),
+ String.format("Transform: %s", partField.transform().toString()))
+ )
+ .toList();
+ }
+
+ public static Table toHiveTable(org.apache.iceberg.Table table, Configuration conf) {
+ var result = new Table();
+ TableName tableName = TableName.fromString(table.name(), MetaStoreUtils.getDefaultCatalog(conf),
+ Warehouse.DEFAULT_DATABASE_NAME);
+ result.setCatName(tableName.getCat());
+ result.setDbName(tableName.getDb());
+ result.setTableName(tableName.getTable());
+ result.setTableType(TableType.EXTERNAL_TABLE.toString());
+ result.setPartitionKeys(getPartitionKeys(table, table.spec().specId()));
+ TableMetadata metadata = ((BaseTable) table).operations().current();
+ long maxHiveTablePropertySize = conf.getLong(HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
+ HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
+ HMSTablePropertyHelper.updateHmsTableForIcebergTable(metadata.metadataFileLocation(), result, metadata,
+ null, true, maxHiveTablePropertySize, null);
+ String catalogType = CatalogUtils.getCatalogType(conf);
+ if (!StringUtils.isEmpty(catalogType) && !CatalogUtils.NO_CATALOG_TYPE.equals(catalogType)) {
+ result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtils.getCatalogType(conf));
+ }
+ result.setSd(getHiveStorageDescriptor(table));
+ return result;
+ }
+
+ private static StorageDescriptor getHiveStorageDescriptor(org.apache.iceberg.Table table) {
+ var result = new StorageDescriptor();
+ result.setCols(HiveSchemaUtil.convert(table.schema()));
+ result.setBucketCols(Lists.newArrayList());
+ result.setNumBuckets(-1);
+ result.setSortCols(Lists.newArrayList());
+ result.setInputFormat(DEFAULT_INPUT_FORMAT_CLASS);
+ result.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
+ result.setSerdeInfo(getHiveSerdeInfo());
+ result.setLocation(table.location());
+ result.setParameters(Maps.newHashMap());
+ return result;
+ }
+
+ private static SerDeInfo getHiveSerdeInfo() {
+ var result = new SerDeInfo("icebergSerde", DEFAULT_SERDE_CLASS, Maps.newHashMap());
+ result.getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1"); // Default serialization format.
+ return result;
+ }
}
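
A short sketch of what the new conversion helpers produce (illustrative; assumes an already loaded org.apache.iceberg.Table named icebergTable and a session Configuration named conf are in scope):

// The resulting storage descriptor is always wired to the Hive-Iceberg handler classes declared above.
org.apache.hadoop.hive.metastore.api.Table hmsTable = MetastoreUtil.toHiveTable(icebergTable, conf);
assert MetastoreUtil.DEFAULT_INPUT_FORMAT_CLASS.equals(hmsTable.getSd().getInputFormat());
assert MetastoreUtil.DEFAULT_OUTPUT_FORMAT_CLASS.equals(hmsTable.getSd().getOutputFormat());
assert MetastoreUtil.DEFAULT_SERDE_CLASS.equals(hmsTable.getSd().getSerdeInfo().getSerializationLib());

// Partition columns are derived from the table's current partition spec.
java.util.List<org.apache.hadoop.hive.metastore.api.FieldSchema> partCols =
    MetastoreUtil.getPartitionKeys(icebergTable, icebergTable.spec().specId());
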
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
new file mode 100644
index 000000000000..c1ffaa4a666c
--- /dev/null
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive.client;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.client.BaseMetaStoreClient;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.hive.RuntimeMetaException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveRESTCatalogClient extends BaseMetaStoreClient {
+
+ public static final String NAMESPACE_SEPARATOR = ".";
+ public static final String DB_OWNER = "owner";
+ public static final String DB_OWNER_TYPE = "ownerType";
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveRESTCatalogClient.class);
+
+ private RESTCatalog restCatalog;
+
+ public HiveRESTCatalogClient(Configuration conf, boolean allowEmbedded) {
+ this(conf);
+ }
+
+ public HiveRESTCatalogClient(Configuration conf) {
+ super(conf);
+ reconnect();
+ }
+
+ @Override
+ public void reconnect() {
+ close();
+ String catName = MetaStoreUtils.getDefaultCatalog(conf);
+ Map<String, String> properties = CatalogUtils.getCatalogProperties(conf, CatalogUtils.getCatalogName(conf));
+ restCatalog = (RESTCatalog) CatalogUtil.buildIcebergCatalog(catName, properties, null);
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (restCatalog != null) {
+ restCatalog.close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeMetaException(e.getCause(), "Failed to close existing REST catalog");
+ }
+ }
+
+ @Override
+ public List<String> getDatabases(String catName, String dbPattern) {
+ validateCurrentCatalog(catName);
+ // Convert the Hive glob pattern (e.g., "db*") to a valid Java regex ("db.*").
+ String regex = dbPattern.replace("*", ".*");
+ Pattern pattern = Pattern.compile(regex);
+
+ return restCatalog.listNamespaces(Namespace.empty()).stream()
+ .map(Namespace::toString)
+ .filter(pattern.asPredicate())
+ .toList();
+ }
+
+ @Override
+ public List<String> getAllDatabases(String catName) {
+ return getDatabases(catName, "*");
+ }
+
+ @Override
+ public List<String> getTables(String catName, String dbName, String tablePattern) {
+ validateCurrentCatalog(catName);
+
+ // Convert the Hive glob pattern to a Java regex.
+ String regex = tablePattern.replace("*", ".*");
+ Pattern pattern = Pattern.compile(regex);
+
+ // List tables from the specific database (namespace) and filter them.
+ return restCatalog.listTables(Namespace.of(dbName)).stream()
+ .map(TableIdentifier::name)
+ .filter(pattern.asPredicate())
+ .toList();
+ }
+
+ @Override
+ public List<String> getAllTables(String catName, String dbName) {
+ return getTables(catName, dbName, "*");
+ }
+
+ @Override
+ public void dropTable(Table table, boolean deleteData, boolean ignoreUnknownTab, boolean ifPurge) throws TException {
+ restCatalog.dropTable(TableIdentifier.of(table.getDbName(), table.getTableName()));
+ }
+
+ private void validateCurrentCatalog(String catName) {
+ if (!restCatalog.name().equals(catName)) {
+ throw new IllegalArgumentException(
+ String.format("Catalog name '%s' does not match the current catalog '%s'", catName, restCatalog.name()));
+ }
+ }
+
+ @Override
+ public boolean tableExists(String catName, String dbName, String tableName) {
+ validateCurrentCatalog(catName);
+ return restCatalog.tableExists(TableIdentifier.of(dbName, tableName));
+ }
+
+ @Override
+ public Database getDatabase(String catName, String dbName) throws NoSuchObjectException {
+ validateCurrentCatalog(catName);
+
+ return restCatalog.listNamespaces(Namespace.empty()).stream()
+ .filter(namespace -> namespace.levels()[0].equals(dbName))
+ .map(namespace -> {
+ Database database = new Database();
+ database.setName(String.join(NAMESPACE_SEPARATOR, namespace.levels()));
+ Map<String, String> namespaceMetadata = restCatalog.loadNamespaceMetadata(Namespace.of(dbName));
+ database.setLocationUri(namespaceMetadata.get(CatalogUtils.LOCATION));
+ database.setCatalogName(restCatalog.name());
+ database.setOwnerName(namespaceMetadata.get(DB_OWNER));
+ try {
+ database.setOwnerType(PrincipalType.valueOf(namespaceMetadata.get(DB_OWNER_TYPE)));
+ } catch (Exception e) {
+ LOG.warn("Can not set ownerType: {}", namespaceMetadata.get(DB_OWNER_TYPE), e);
+ }
+ return database;
+ }).findFirst().orElseThrow(() ->
+ new NoSuchObjectException("Database " + dbName + " not found"));
+ }
+
+ @Override
+ public Table getTable(GetTableRequest tableRequest) throws TException {
+ validateCurrentCatalog(tableRequest.getCatName());
+ org.apache.iceberg.Table icebergTable;
+ try {
+ icebergTable = restCatalog.loadTable(TableIdentifier.of(tableRequest.getDbName(),
+ tableRequest.getTblName()));
+ } catch (NoSuchTableException exception) {
+ throw new NoSuchObjectException();
+ }
+ return MetastoreUtil.toHiveTable(icebergTable, conf);
+ }
+
+ @Override
+ public void createTable(CreateTableRequest request) throws TException {
+ Table table = request.getTable();
+ List<FieldSchema> cols = Lists.newArrayList(table.getSd().getCols());
+ if (table.isSetPartitionKeys() && !table.getPartitionKeys().isEmpty()) {
+ cols.addAll(table.getPartitionKeys());
+ }
+ Properties catalogProperties = CatalogUtils.getCatalogProperties(table);
+ Schema schema = HiveSchemaUtil.convert(cols, true);
+ Map<String, String> envCtxProps = Optional.ofNullable(request.getEnvContext())
+ .map(EnvironmentContext::getProperties)
+ .orElse(Collections.emptyMap());
+ org.apache.iceberg.PartitionSpec partitionSpec =
+ HMSTablePropertyHelper.getPartitionSpec(envCtxProps, schema);
+ SortOrder sortOrder = HMSTablePropertyHelper.getSortOrder(catalogProperties, schema);
+
+ restCatalog.buildTable(TableIdentifier.of(table.getDbName(), table.getTableName()), schema)
+ .withPartitionSpec(partitionSpec)
+ .withLocation(catalogProperties.getProperty(CatalogUtils.LOCATION))
+ .withSortOrder(sortOrder)
+ .withProperties(Maps.fromProperties(catalogProperties))
+ .create();
+ }
+
+ @Override
+ public void createDatabase(Database db) {
+ validateCurrentCatalog(db.getCatalogName());
+ Map<String, String> props = ImmutableMap.of(
+ CatalogUtils.LOCATION, db.getLocationUri(),
+ DB_OWNER, db.getOwnerName(),
+ DB_OWNER_TYPE, db.getOwnerType().toString()
+ );
+ restCatalog.createNamespace(Namespace.of(db.getName()), props);
+ }
+
+
+ @Override
+ public void dropDatabase(DropDatabaseRequest req) {
+ validateCurrentCatalog(req.getCatalogName());
+ restCatalog.dropNamespace(Namespace.of(req.getName()));
+ }
+}
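
A minimal wiring sketch for the new client (illustrative, not from the patch): reconnect() collects the "iceberg.catalog.<name>.*" keys for the metastore default catalog name (typically "hive") and hands them to CatalogUtil.buildIcebergCatalog, so the catalog type and URI below are placeholder assumptions and require a reachable REST catalog service:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.iceberg.hive.client.HiveRESTCatalogClient;

public class HiveRESTCatalogClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = MetastoreConf.newMetastoreConf();
    conf.set("iceberg.catalog.hive.type", "rest");
    conf.set("iceberg.catalog.hive.uri", "http://localhost:8181");

    HiveRESTCatalogClient client = new HiveRESTCatalogClient(conf);
    // Hive glob patterns are converted to regexes internally ("db*" -> "db.*").
    List<String> dbs = client.getDatabases("hive", "db*");
    List<String> tables = client.getAllTables("hive", "default");
    System.out.println(dbs + " " + tables);
    client.close();
  }
}
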
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/client/TestHiveRESTCatalogClient.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/client/TestHiveRESTCatalogClient.java
new file mode 100644
index 000000000000..f42e7e3775ee
--- /dev/null
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/client/TestHiveRESTCatalogClient.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive.client;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+
+public class TestHiveRESTCatalogClient {
+
+ private static HiveRESTCatalogClient spyHiveRESTCatalogClient;
+ private static RESTCatalog mockRestCatalog;
+ private static Catalog.TableBuilder mockTableBuilder;
+ private static MockedStatic<CatalogUtil> mockCatalogUtil;
+
+ private static final TableOperations ops = new TableOperations() {
+ @Override
+ public TableMetadata current() {
+ return TableMetadata.newTableMetadata(new Schema(), PartitionSpec.unpartitioned(), "location",
+ Maps.newHashMap());
+ }
+
+ @Override
+ public TableMetadata refresh() {
+ return null;
+ }
+
+ @Override
+ public void commit(TableMetadata base, TableMetadata metadata) {
+
+ }
+
+ @Override
+ public FileIO io() {
+ return null;
+ }
+
+ @Override
+ public String metadataFileLocation(String fileName) {
+ return null;
+ }
+
+ @Override
+ public LocationProvider locationProvider() {
+ return null;
+ }
+ };
+
+ @BeforeAll
+ public static void before() throws MetaException {
+ Configuration configuration = new Configuration();
+ configuration.set("iceberg.catalog", "ice01");
+ configuration.set("iceberg.catalog.ice01.uri", "http://localhost");
+ mockCatalogUtil = Mockito.mockStatic(CatalogUtil.class);
+ mockRestCatalog = Mockito.mock(RESTCatalog.class);
+ mockCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(any(), any(), any())).thenReturn(mockRestCatalog);
+ spyHiveRESTCatalogClient = Mockito.spy(new HiveRESTCatalogClient(configuration));
+ spyHiveRESTCatalogClient.reconnect();
+ }
+
+ @BeforeEach
+ public void resetMocks() {
+ Mockito.reset(mockRestCatalog);
+
+ mockTableBuilder = Mockito.mock(Catalog.TableBuilder.class);
+ Mockito.when(mockTableBuilder.withPartitionSpec(any(PartitionSpec.class))).thenReturn(mockTableBuilder);
+ Mockito.when(mockTableBuilder.withLocation(any())).thenReturn(mockTableBuilder);
+ Mockito.when(mockTableBuilder.withSortOrder(any())).thenReturn(mockTableBuilder);
+ Mockito.when(mockTableBuilder.withProperties(any())).thenReturn(mockTableBuilder);
+ Mockito.doReturn(mockTableBuilder).when(mockRestCatalog).buildTable(any(), any());
+
+ Mockito.doReturn(new BaseTable(ops, "tableName")).when(mockRestCatalog).loadTable(any());
+ Namespace namespace = Namespace.of("default");
+ Mockito.doReturn(Collections.singletonList(namespace)).when(mockRestCatalog).listNamespaces(any());
+ Mockito.doReturn("hive").when(mockRestCatalog).name();
+ Mockito.doReturn(new BaseTable(ops, "tableName")).when(mockRestCatalog).createTable(any(), any(), any(),
+ any());
+ }
+
+ @AfterEach
+ public void after() {
+
+ }
+
+ @Test
+ public void testGetTable() throws TException {
+ spyHiveRESTCatalogClient.getTable("default", "tableName");
+ Mockito.verify(mockRestCatalog).loadTable(TableIdentifier.of("default", "tableName"));
+ }
+
+ @Test
+ public void testCreateTable() throws TException {
+ Table table = new Table();
+ table.setTableName("tableName");
+ table.setDbName("default");
+ table.setSd(new StorageDescriptor());
+ table.getSd().setCols(new LinkedList<>());
+ table.setParameters(Maps.newHashMap());
+ spyHiveRESTCatalogClient.createTable(table);
+ Mockito.verify(mockRestCatalog).buildTable(any(), any());
+ }
+
+ @Test
+ public void testCreatePartitionedTable() throws TException {
+ Table table = new Table();
+ table.setTableName("tableName");
+ table.setDbName("default");
+ table.setParameters(Maps.newHashMap());
+
+ FieldSchema col1 = new FieldSchema("id", "string", "");
+ FieldSchema col2 = new FieldSchema("city", "string", "");
+ List<FieldSchema> cols = Arrays.asList(col1, col2);
+
+ table.setSd(new StorageDescriptor());
+ table.getSd().setCols(cols);
+
+ Schema schema = HiveSchemaUtil.convert(cols, false);
+ PartitionSpec spec = PartitionSpec.builderFor(schema).identity("city").build();
+ String specString = PartitionSpecParser.toJson(spec);
+
+ CreateTableRequest request = new CreateTableRequest(table);
+ request.setEnvContext(new EnvironmentContext(
+ Map.ofEntries(Map.entry(TableProperties.DEFAULT_PARTITION_SPEC, specString))));
+ spyHiveRESTCatalogClient.createTable(request);
+
+ ArgumentCaptor<PartitionSpec> captor = ArgumentCaptor.forClass(PartitionSpec.class);
+ // Verify that buildTable was called
+ Mockito.verify(mockRestCatalog).buildTable(any(), any());
+
+ // Verify that withPartitionSpec was called
+ Mockito.verify(mockTableBuilder).withPartitionSpec(captor.capture());
+
+ // Assert that the correct PartitionSpec was passed to .withPartitionSpec()
+ PartitionSpec capturedSpec = captor.getValue();
+ assertThat(capturedSpec.isPartitioned()).isTrue();
+ assertThat(capturedSpec.fields()).hasSize(1);
+ assertThat(capturedSpec.fields().getFirst().sourceId()).isEqualTo(schema.findField("city").fieldId());
+ }
+
+ @Test
+ public void testGetDatabase() throws TException {
+ Database aDefault = spyHiveRESTCatalogClient.getDatabase("default");
+ assertThat(aDefault.getName()).isEqualTo("default");
+ Mockito.verify(mockRestCatalog).listNamespaces(Namespace.empty());
+ }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index f65ed35af5c3..e431e0323a45 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -24,23 +24,21 @@
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.HMSTablePropertyHelper;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.parquet.Strings;
/**
* Class for catalog resolution and accessing the common functions for {@link Catalog} API.
@@ -73,6 +71,7 @@ public final class Catalogs {
public static final String SNAPSHOT_REF = "snapshot_ref";
private static final String NO_CATALOG_TYPE = "no catalog";
+
private static final Set<String> PROPERTIES_TO_REMOVE =
ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME,
InputFormatConfig.CATALOG_NAME);
@@ -144,7 +143,7 @@ public static Table createTable(Configuration conf, Properties props) {
Map<String, String> map = filterIcebergTableProperties(props);
Optional<Catalog> catalog = loadCatalog(conf, catalogName);
- SortOrder sortOrder = getSortOrder(props, schema);
+ SortOrder sortOrder = HMSTablePropertyHelper.getSortOrder(props, schema);
if (catalog.isPresent()) {
String name = props.getProperty(NAME);
Preconditions.checkNotNull(name, "Table identifier not set");
@@ -156,12 +155,6 @@ public static Table createTable(Configuration conf, Properties props) {
return new HadoopTables(conf).create(schema, spec, sortOrder, map, location);
}
- private static SortOrder getSortOrder(Properties props, Schema schema) {
- String sortOrderJsonString = props.getProperty(TableProperties.DEFAULT_SORT_ORDER);
- return Strings.isNullOrEmpty(sortOrderJsonString) ?
- SortOrder.unsorted() : SortOrderParser.fromJson(schema, sortOrderJsonString);
- }
-
/**
* Drops an Iceberg table using the catalog specified by the configuration.
*
- * See {@link Catalogs} documentation for catalog type resolution strategy.
- *
- * @param conf global hive configuration
- * @param catalogName name of the catalog
- * @return type of the catalog, can be null
- */
- private static String getCatalogType(Configuration conf, String catalogName) {
- if (catalogName != null) {
- String catalogType = conf.get(InputFormatConfig.catalogPropertyConfigKey(
- catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE));
- if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME)) {
- return NO_CATALOG_TYPE;
- } else {
- return catalogType;
- }
- } else {
- String catalogType = conf.get(CatalogUtil.ICEBERG_CATALOG_TYPE);
- if (catalogType != null && catalogType.equals(LOCATION)) {
- return NO_CATALOG_TYPE;
- } else {
- return catalogType;
- }
+ CatalogUtils.getCatalogProperties(conf, name), conf));
}
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index 4898b4b8954f..823c7a37ba37 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -69,24 +69,14 @@ private InputFormatConfig() {
public static final int COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT = 10;
public static final String COMMIT_FILE_THREAD_POOL_SIZE = "iceberg.mr.commit.file.thread.pool.size";
public static final int COMMIT_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
- public static final String WRITE_TARGET_FILE_SIZE = "iceberg.mr.write.target.file.size";
public static final String CASE_SENSITIVE = "iceberg.mr.case.sensitive";
public static final boolean CASE_SENSITIVE_DEFAULT = true;
public static final String CATALOG_NAME = "iceberg.catalog";
- public static final String HADOOP_CATALOG = "hadoop.catalog";
- public static final String HADOOP_TABLES = "hadoop.tables";
public static final String HIVE_CATALOG = "hive.catalog";
- public static final String ICEBERG_SNAPSHOTS_TABLE_SUFFIX = ".snapshots";
- public static final String SNAPSHOT_TABLE = "iceberg.snapshots.table";
- public static final String SNAPSHOT_TABLE_SUFFIX = "__snapshots";
public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
- public static final String CATALOG_TYPE_TEMPLATE = "iceberg.catalog.%s.type";
- public static final String CATALOG_WAREHOUSE_TEMPLATE = "iceberg.catalog.%s.warehouse";
- public static final String CATALOG_CLASS_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
- public static final String CATALOG_DEFAULT_CONFIG_PREFIX = "iceberg.catalog-default.";
public enum InMemoryDataModel {
HIVE,
@@ -215,18 +205,6 @@ public static boolean fetchVirtualColumns(Configuration conf) {
return conf.getBoolean(InputFormatConfig.FETCH_VIRTUAL_COLUMNS, false);
}
- /**
- * Get Hadoop config key of a catalog property based on catalog name
- * @param catalogName catalog name
- * @param catalogProperty catalog property, can be any custom property,
- * a commonly used list of properties can be found
- * at {@link org.apache.iceberg.CatalogProperties}
- * @return Hadoop config key of a catalog property for the catalog name
- */
- public static String catalogPropertyConfigKey(String catalogName, String catalogProperty) {
- return String.format("%s%s.%s", CATALOG_CONFIG_PREFIX, catalogName, catalogProperty);
- }
-
private static Schema schema(Configuration conf, String key) {
String json = conf.get(key);
return json == null ? null : SchemaParser.fromJson(json);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
new file mode 100644
index 000000000000..e160deebf418
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+
+public class BaseHiveIcebergMetaHook implements HiveMetaHook {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseHiveIcebergMetaHook.class);
+ private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
+ public static final Map<String, String> COMMON_HMS_PROPERTIES = ImmutableMap.of(
+ BaseMetastoreTableOperations.TABLE_TYPE_PROP, BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()
+ );
+ private static final Set<String> PARAMETERS_TO_REMOVE = ImmutableSet
+ .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME, InputFormatConfig.PARTITION_SPEC);
+ static final String ORC_FILES_ONLY = "iceberg.orc.files.only";
+
+ protected final Configuration conf;
+ protected Table icebergTable = null;
+ protected Properties catalogProperties;
+ protected boolean createHMSTableInHook = false;
+
+ public enum FileFormat {
+ ORC("orc"), PARQUET("parquet"), AVRO("avro");
+
+ private final String label;
+
+ FileFormat(String label) {
+ this.label = label;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+ }
+
+ public BaseHiveIcebergMetaHook(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ CreateTableRequest request = new CreateTableRequest(hmsTable);
+ preCreateTable(request);
+ }
+
+ @Override
+ public void preCreateTable(CreateTableRequest request) {
+ org.apache.hadoop.hive.metastore.api.Table hmsTable = request.getTable();
+ if (hmsTable.isTemporary()) {
+ throw new UnsupportedOperationException("Creation of temporary iceberg tables is not supported.");
+ }
+ this.catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
+
+ // Set the table type even for non HiveCatalog based tables
+ hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+ BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());
+
+ if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
+ if (Boolean.parseBoolean(this.catalogProperties.getProperty(hive_metastoreConstants.TABLE_IS_CTLT))) {
+ throw new UnsupportedOperationException("CTLT target table must be a HiveCatalog table.");
+ }
+ // For non-HiveCatalog tables too, we should set the input and output format
+ // so that the table can be read by other engines like Impala
+ hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
+ hmsTable.getSd().setOutputFormat(HiveIcebergOutputFormat.class.getCanonicalName());
+
+ // If not using HiveCatalog check for existing table
+ try {
+ this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, true);
+
+ if (CatalogUtils.hadoopCatalog(conf, catalogProperties) && hmsTable.getSd() != null &&
+ hmsTable.getSd().getLocation() == null) {
+ hmsTable.getSd().setLocation(icebergTable.location());
+ }
+ Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null,
+ "Iceberg table already created - can not use provided schema");
+ Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
+ "Iceberg table already created - can not use provided partition specification");
+
+ LOG.info("Iceberg table already exists {}", icebergTable);
+ return;
+ } catch (NoSuchTableException nte) {
+ // If the table does not exist we will create it below
+ }
+ }
+
+ // If the table does not exist collect data for table creation
+ // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC takes precedence so the user can override the
+ // Iceberg schema and specification generated by the code
+
+ Set<String> identifierFields = Optional.ofNullable(request.getPrimaryKeys())
+ .map(primaryKeys ->
+ primaryKeys.stream().map(SQLPrimaryKey::getColumn_name).collect(Collectors.toSet()))
+ .orElse(Collections.emptySet());
+
+ Schema schema = schema(catalogProperties, hmsTable, identifierFields);
+ PartitionSpec spec = spec(conf, schema, hmsTable);
+
+ // If there are partition keys specified remove them from the HMS table and add them to the column list
+ if (hmsTable.isSetPartitionKeys()) {
+ hmsTable.getSd().getCols().addAll(hmsTable.getPartitionKeys());
+ hmsTable.setPartitionKeysIsSet(false);
+ }
+
+ catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
+ String specString = PartitionSpecParser.toJson(spec);
+ catalogProperties.put(InputFormatConfig.PARTITION_SPEC, specString);
+ validateCatalogConfigsDefined();
+
+ if (request.getEnvContext() == null) {
+ request.setEnvContext(new EnvironmentContext());
+ }
+ request.getEnvContext().putToProperties(TableProperties.DEFAULT_PARTITION_SPEC, specString);
+ setCommonHmsTablePropertiesForIceberg(hmsTable);
+
+ if (hmsTable.getParameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)) {
+ createHMSTableInHook = true;
+ }
+
+ assertFileFormat(catalogProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+
+ // Set whether the format is ORC, to be used during vectorization.
+ setOrcOnlyFilesParam(hmsTable);
+ // Remove hive primary key columns from table request, as iceberg doesn't support hive primary key.
+ request.setPrimaryKeys(null);
+ setSortOrder(hmsTable, schema, catalogProperties);
+ }
+
+ /**
+ * Verifies that the necessary catalog configs are defined in the session conf.
+ *
+ * <p>If the catalog name is provided in the 'iceberg.catalog' table property, and the name is neither the
+ * default catalog nor the hadoop catalog, checks that one of the two configs is defined in the session conf:
+ * iceberg.catalog.catalogName.type or iceberg.catalog.catalogName.catalog-impl.
+ * See the description in Catalogs.java for more details.
+ */
+ private void validateCatalogConfigsDefined() {
+ String catalogName = catalogProperties.getProperty(InputFormatConfig.CATALOG_NAME);
+ if (!StringUtils.isEmpty(catalogName) && !Catalogs.ICEBERG_HADOOP_TABLE_NAME.equals(catalogName)) {
+
+ boolean configsExist = !StringUtils.isEmpty(CatalogUtils.getCatalogType(conf, catalogName)) ||
+ !StringUtils.isEmpty(CatalogUtils.getCatalogImpl(conf, catalogName));
+
+ Preconditions.checkArgument(configsExist, "Catalog type or impl must be set for catalog: %s", catalogName);
+ }
+ }
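+
+ // Illustrative example (not required by this patch): for a hypothetical catalog named "ice01",
+ // either of the following session-conf settings would satisfy the check above:
+ //   iceberg.catalog.ice01.type=rest
+ //   iceberg.catalog.ice01.catalog-impl=org.apache.iceberg.rest.RESTCatalog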
+
+ private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
+ Properties properties) {
+ String sortOrderJSONString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER);
+ SortFields sortFields = null;
+ if (!Strings.isNullOrEmpty(sortOrderJSONString)) {
+ try {
+ sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class);
+ } catch (Exception e) {
+ LOG.warn("Cannot read the write order json: {}", sortOrderJSONString, e);
+ return;
+ }
+ if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
+ SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
+ sortFields.getSortFields().forEach(fieldDesc -> {
+ NullOrder nullOrder = fieldDesc.getNullOrdering() == NullOrdering.NULLS_FIRST ?
+ NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
+ SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ?
+ SortDirection.ASC : SortDirection.DESC;
+ sortOrderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder);
+ });
+ properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
+ }
+ }
+ }
+
+ @Override
+ public void rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ // do nothing
+ }
+
+ @Override
+ public void commitCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ // do nothing
+ }
+
+ @Override
+ public void preDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ // do nothing
+ }
+
+ @Override
+ public void rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ // do nothing
+ }
+
+ @Override
+ public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, boolean deleteData) {
+ // do nothing
+ }
+
+ @Override
+ public boolean createHMSTableInHook() {
+ return createHMSTableInHook;
+ }
+
+ private static void assertFileFormat(String format) {
+ if (format == null) {
+ return;
+ }
+ String lowerCaseFormat = format.toLowerCase();
+ Preconditions.checkArgument(Arrays.stream(FileFormat.values()).anyMatch(v -> lowerCaseFormat.contains(v.label)),
+ String.format("Unsupported fileformat %s", format));
+ }
+
+ protected void setCommonHmsTablePropertiesForIceberg(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ if (CatalogUtils.isHadoopTable(conf, catalogProperties)) {
+ String location = (hmsTable.getSd() != null) ? hmsTable.getSd().getLocation() : null;
+ if (location == null && CatalogUtils.hadoopCatalog(conf, catalogProperties)) {
+ location = IcebergTableUtil.defaultWarehouseLocation(
+ TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()),
+ conf, catalogProperties);
+ hmsTable.getSd().setLocation(location);
+ }
+ Preconditions.checkArgument(location != null, "Table location not set");
+ }
+
+ Map hmsParams = hmsTable.getParameters();
+ COMMON_HMS_PROPERTIES.forEach(hmsParams::putIfAbsent);
+
+ // Remove null values from hms table properties
+ hmsParams.entrySet().removeIf(e -> e.getKey() == null || e.getValue() == null);
+
+ // Remove creation related properties
+ PARAMETERS_TO_REMOVE.forEach(hmsParams::remove);
+
+ setWriteModeDefaults(null, hmsParams, null);
+ }
+
+ protected Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable,
+ Set<String> identifierFields) {
+ boolean autoConversion = conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
+
+ if (properties.getProperty(InputFormatConfig.TABLE_SCHEMA) != null) {
+ return SchemaParser.fromJson(properties.getProperty(InputFormatConfig.TABLE_SCHEMA));
+ }
+ List<FieldSchema> cols = Lists.newArrayList(hmsTable.getSd().getCols());
+ if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
+ cols.addAll(hmsTable.getPartitionKeys());
+ }
+ Schema schema = HiveSchemaUtil.convert(cols, autoConversion);
+
+ return getSchemaWithIdentifierFields(schema, identifierFields);
+ }
+
+ private Schema getSchemaWithIdentifierFields(Schema schema, Set<String> identifierFields) {
+ if (identifierFields == null || identifierFields.isEmpty()) {
+ return schema;
+ }
+ Set<Integer> identifierFieldIds = identifierFields.stream()
+ .map(column -> {
+ Types.NestedField field = schema.findField(column);
+ Preconditions.checkNotNull(field,
+ "Cannot find identifier field ID for the column %s in schema %s", column, schema);
+ return field.fieldId();
+ })
+ .collect(Collectors.toSet());
+
+ List<Types.NestedField> cols = schema.columns().stream()
+ .map(column -> identifierFieldIds.contains(column.fieldId()) ? column.asRequired() : column)
+ .toList();
+
+ return new Schema(cols, identifierFieldIds);
+ }
+
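+ /**
+ * Resolves the partition spec: a Hive partition transform takes precedence, otherwise the spec
+ * is derived from the HMS table parameters.
+ */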
+ protected static PartitionSpec spec(Configuration configuration, Schema schema,
+ org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+
+ Preconditions.checkArgument(!hmsTable.isSetPartitionKeys() || hmsTable.getPartitionKeys().isEmpty(),
+ "We can only handle non-partitioned Hive tables. The Iceberg schema should be in " +
+ InputFormatConfig.PARTITION_SPEC + " or already converted to a partition transform ");
+
+ PartitionSpec spec = IcebergTableUtil.spec(configuration, schema);
+ if (spec != null) {
+ Preconditions.checkArgument(hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC) == null,
+ "Provide only one of the following: Hive partition transform specification, or the " +
+ InputFormatConfig.PARTITION_SPEC + " property");
+ return spec;
+ }
+
+ return HMSTablePropertyHelper.getPartitionSpec(hmsTable.getParameters(), schema);
+ }
+
+ protected void setOrcOnlyFilesParam(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ hmsTable.getParameters().put(ORC_FILES_ONLY, String.valueOf(isOrcOnlyFiles(hmsTable)));
+ }
+
+ protected boolean isOrcOnlyFiles(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ return !"FALSE".equalsIgnoreCase(hmsTable.getParameters().get(ORC_FILES_ONLY)) &&
+ (hmsTable.getSd().getInputFormat() != null &&
+ hmsTable.getSd().getInputFormat().toUpperCase().contains(org.apache.iceberg.FileFormat.ORC.name()) ||
+ org.apache.iceberg.FileFormat.ORC.name()
+ .equalsIgnoreCase(hmsTable.getSd().getSerdeInfo().getParameters()
+ .get(TableProperties.DEFAULT_FILE_FORMAT)) ||
+ org.apache.iceberg.FileFormat.ORC.name()
+ .equalsIgnoreCase(hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT)));
+ }
+
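+ /**
+ * When a table becomes format v2 or above, defaults the delete/update/merge write modes to
+ * merge-on-read unless they were set explicitly.
+ */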
+ protected void setWriteModeDefaults(Table icebergTbl, Map<String, String> newProps, EnvironmentContext context) {
+ if ((icebergTbl == null || ((BaseTable) icebergTbl).operations().current().formatVersion() == 1) &&
+ IcebergTableUtil.isV2TableOrAbove(newProps)) {
+ List<String> writeModeList = ImmutableList.of(
+ TableProperties.DELETE_MODE, TableProperties.UPDATE_MODE, TableProperties.MERGE_MODE);
+ writeModeList.stream()
+ .filter(writeMode -> catalogProperties.get(writeMode) == null)
+ .forEach(writeMode -> {
+ catalogProperties.put(writeMode, MERGE_ON_READ.modeName());
+ newProps.put(writeMode, MERGE_ON_READ.modeName());
+ });
+
+ if (context != null) {
+ Splitter splitter = Splitter.on(PROPERTIES_SEPARATOR);
+ Map<String, String> contextProperties = context.getProperties();
+ if (contextProperties.containsKey(SET_PROPERTIES)) {
+ String propValue = context.getProperties().get(SET_PROPERTIES);
+ String writeModeStr = writeModeList.stream()
+ .filter(writeMode -> !splitter.splitToList(propValue).contains(writeMode))
+ .collect(Collectors.joining("'"));
+ if (!writeModeStr.isEmpty()) {
+ contextProperties.put(SET_PROPERTIES, propValue + "'" + writeModeStr);
+ }
+ }
+ }
+ }
+ }
+
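+ /**
+ * Decorates the fetched HMS table with the Iceberg format version, the Iceberg input/output
+ * formats and SerDe, and the Iceberg storage handler when none is set.
+ */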
+ @Override
+ public void postGetTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+ if (hmsTable != null) {
+ try {
+ Table tbl = IcebergTableUtil.getTable(conf, hmsTable);
+ String formatVersion = String.valueOf(((BaseTable) tbl).operations().current().formatVersion());
+ hmsTable.getParameters().put(TableProperties.FORMAT_VERSION, formatVersion);
+ // Set the serde info
+ hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getName());
+ hmsTable.getSd().setOutputFormat(HiveIcebergOutputFormat.class.getName());
+ hmsTable.getSd().getSerdeInfo().setSerializationLib(HiveIcebergSerDe.class.getName());
+ String storageHandler = hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+ // Check if META_TABLE_STORAGE is not present or is not an instance of ICEBERG_STORAGE_HANDLER
+ if (storageHandler == null || !isHiveIcebergStorageHandler(storageHandler)) {
+ hmsTable.getParameters()
+ .put(hive_metastoreConstants.META_TABLE_STORAGE, HMSTablePropertyHelper.HIVE_ICEBERG_STORAGE_HANDLER);
+ }
+ } catch (NoSuchTableException | NotFoundException ex) {
+ // If the table doesn't exist, don't propagate the exception from here
+ }
+ }
+ }
+
+ private static boolean isHiveIcebergStorageHandler(String storageHandler) {
+ try {
+ Class<?> storageHandlerClass = Class.forName(storageHandler);
+ return Class.forName(HIVE_ICEBERG_STORAGE_HANDLER).isAssignableFrom(storageHandlerClass);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Error checking storage handler class", e);
+ }
+ }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index a7389f7f20f5..aec7ab7b03b3 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -19,11 +19,9 @@
package org.apache.iceberg.mr.hive;
-import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -31,8 +29,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
@@ -44,13 +40,11 @@
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.PartitionDropOptions;
import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
-import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -58,8 +52,6 @@
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
-import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -76,7 +68,6 @@
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
-import org.apache.hadoop.hive.ql.util.NullOrdering;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -92,7 +83,6 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
-import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
@@ -100,9 +90,6 @@
import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
-import org.apache.iceberg.SortDirection;
-import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
@@ -113,14 +100,13 @@
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.expressions.UnboundTerm;
import org.apache.iceberg.hive.CachedClientPool;
-import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.CatalogUtils;
import org.apache.iceberg.hive.HiveLock;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.hive.HiveTableOperations;
@@ -135,10 +121,8 @@
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
-import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -151,21 +135,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
-
-public class HiveIcebergMetaHook implements HiveMetaHook {
+public class HiveIcebergMetaHook extends BaseHiveIcebergMetaHook {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class);
- private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
- public static final Map COMMON_HMS_PROPERTIES = ImmutableMap.of(
- BaseMetastoreTableOperations.TABLE_TYPE_PROP, BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()
- );
- private static final Set PARAMETERS_TO_REMOVE = ImmutableSet
- .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME, InputFormatConfig.PARTITION_SPEC);
- private static final Set PROPERTIES_TO_REMOVE = ImmutableSet
- // We don't want to push down the metadata location props to Iceberg from HMS,
- // since the snapshot pointer in HMS would always be one step ahead
- .of(BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
- BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP);
static final EnumSet<AlterTableType> SUPPORTED_ALTER_OPS = EnumSet.of(
AlterTableType.ADDCOLS, AlterTableType.REPLACE_COLUMNS, AlterTableType.RENAME_COLUMN, AlterTableType.DROP_COLUMN,
AlterTableType.ADDPROPS, AlterTableType.DROPPROPS, AlterTableType.SETPARTITIONSPEC,
@@ -180,14 +151,9 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
private static final List<org.apache.commons.lang3.tuple.Pair<Integer, byte[]>> EMPTY_FILTER =
Lists.newArrayList(org.apache.commons.lang3.tuple.Pair.of(1, new byte[0]));
static final String MIGRATED_TO_ICEBERG = "MIGRATED_TO_ICEBERG";
- static final String ORC_FILES_ONLY = "iceberg.orc.files.only";
-
static final String DECIMAL64_VECTORIZATION = "iceberg.decimal64.vectorization";
static final String MANUAL_ICEBERG_METADATA_LOCATION_CHANGE = "MANUAL_ICEBERG_METADATA_LOCATION_CHANGE";
- private final Configuration conf;
- private Table icebergTable = null;
- private Properties catalogProperties;
private boolean deleteIcebergTable;
private FileIO deleteIo;
private TableMetadata deleteMetadata;
@@ -196,127 +162,10 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
private UpdateSchema updateSchema;
private Transaction transaction;
private AlterTableType currentAlterTableOp;
- private boolean createHMSTableInHook = false;
private HiveLock commitLock;
- private enum FileFormat {
- ORC("orc"), PARQUET("parquet"), AVRO("avro");
-
- private final String label;
-
- FileFormat(String label) {
- this.label = label;
- }
- }
-
public HiveIcebergMetaHook(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
- CreateTableRequest request = new CreateTableRequest(hmsTable);
- preCreateTable(request);
- }
- @Override
- public void preCreateTable(CreateTableRequest request) {
- org.apache.hadoop.hive.metastore.api.Table hmsTable = request.getTable();
- if (hmsTable.isTemporary()) {
- throw new UnsupportedOperationException("Creation of temporary iceberg tables is not supported.");
- }
- this.catalogProperties = getCatalogProperties(hmsTable);
-
- // Set the table type even for non HiveCatalog based tables
- hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
- BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());
-
- if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
- if (Boolean.parseBoolean(this.catalogProperties.getProperty(hive_metastoreConstants.TABLE_IS_CTLT))) {
- throw new UnsupportedOperationException("CTLT target table must be a HiveCatalog table.");
- }
- // For non-HiveCatalog tables too, we should set the input and output format
- // so that the table can be read by other engines like Impala
- hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
- hmsTable.getSd().setOutputFormat(HiveIcebergOutputFormat.class.getCanonicalName());
-
- // If not using HiveCatalog check for existing table
- try {
- this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, true);
-
- if (Catalogs.hadoopCatalog(conf, catalogProperties) && hmsTable.getSd() != null &&
- hmsTable.getSd().getLocation() == null) {
- hmsTable.getSd().setLocation(icebergTable.location());
- }
- Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null,
- "Iceberg table already created - can not use provided schema");
- Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
- "Iceberg table already created - can not use provided partition specification");
-
- LOG.info("Iceberg table already exists {}", icebergTable);
- return;
- } catch (NoSuchTableException nte) {
- // If the table does not exist we will create it below
- }
- }
-
- // If the table does not exist collect data for table creation
- // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC takes precedence so the user can override the
- // Iceberg schema and specification generated by the code
-
- Set identifierFields = Optional.ofNullable(request.getPrimaryKeys())
- .map(primaryKeys ->
- primaryKeys.stream().map(SQLPrimaryKey::getColumn_name).collect(Collectors.toSet()))
- .orElse(Collections.emptySet());
-
- Schema schema = schema(catalogProperties, hmsTable, identifierFields);
- PartitionSpec spec = spec(conf, schema, hmsTable);
-
- // If there are partition keys specified remove them from the HMS table and add them to the column list
- if (hmsTable.isSetPartitionKeys()) {
- hmsTable.getSd().getCols().addAll(hmsTable.getPartitionKeys());
- hmsTable.setPartitionKeysIsSet(false);
- }
-
- catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
- catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));
- setCommonHmsTablePropertiesForIceberg(hmsTable);
-
- if (hmsTable.getParameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)) {
- createHMSTableInHook = true;
- }
-
- assertFileFormat(catalogProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
-
- // Set whether the format is ORC, to be used during vectorization.
- setOrcOnlyFilesParam(hmsTable);
- // Remove hive primary key columns from table request, as iceberg doesn't support hive primary key.
- request.setPrimaryKeys(null);
- setSortOrder(hmsTable, schema, catalogProperties);
- }
-
- private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
- Properties properties) {
- String sortOderJSONString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER);
- SortFields sortFields = null;
- if (!Strings.isNullOrEmpty(sortOderJSONString)) {
- try {
- sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOderJSONString, SortFields.class);
- } catch (Exception e) {
- LOG.warn("Can not read write order json: {}", sortOderJSONString, e);
- return;
- }
- if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
- SortOrder.Builder sortOderBuilder = SortOrder.builderFor(schema);
- sortFields.getSortFields().forEach(fieldDesc -> {
- NullOrder nullOrder = fieldDesc.getNullOrdering() == NullOrdering.NULLS_FIRST ?
- NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
- SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ?
- SortDirection.ASC : SortDirection.DESC;
- sortOderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder);
- });
- properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOderBuilder.build()));
- }
- }
+ super(conf);
}
@Override
@@ -358,7 +207,7 @@ public void preDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
@Override
public void preDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, boolean deleteData) {
- this.catalogProperties = getCatalogProperties(hmsTable);
+ this.catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
this.deleteIcebergTable = hmsTable.getParameters() != null &&
"TRUE".equalsIgnoreCase(hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
@@ -408,7 +257,7 @@ public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable,
@Override
public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context)
throws MetaException {
- catalogProperties = getCatalogProperties(hmsTable);
+ catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
setupAlterOperationType(hmsTable, context);
if (AlterTableType.RENAME.equals(currentAlterTableOp)) {
catalogProperties.put(Catalogs.NAME, TableIdentifier.of(context.getProperties().get(OLD_DB_NAME),
@@ -637,7 +486,7 @@ public void commitAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable
}
commitLock.unlock();
if (isTableMigration) {
- catalogProperties = getCatalogProperties(hmsTable);
+ catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(preAlterTableProperties.schema));
catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(preAlterTableProperties.spec));
setFileFormat(preAlterTableProperties.format);
@@ -678,7 +527,7 @@ public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTab
LOG.debug("Initiating rollback for table {} at location {}",
hmsTable.getTableName(), hmsTable.getSd().getLocation());
context.getProperties().put(INITIALIZE_ROLLBACK_MIGRATION, "true");
- this.catalogProperties = getCatalogProperties(hmsTable);
+ this.catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
try {
this.icebergTable = Catalogs.loadTable(conf, catalogProperties);
} catch (NoSuchTableException nte) {
@@ -703,7 +552,7 @@ public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTab
public void preTruncateTable(org.apache.hadoop.hive.metastore.api.Table table, EnvironmentContext context,
List<String> partNames)
throws MetaException {
- this.catalogProperties = getCatalogProperties(table);
+ this.catalogProperties = CatalogUtils.getCatalogProperties(table);
this.icebergTable = Catalogs.loadTable(conf, catalogProperties);
Map<String, PartitionField> partitionFieldMap = icebergTable.spec().fields().stream()
.collect(Collectors.toMap(PartitionField::name, Function.identity()));
@@ -796,149 +645,12 @@ private void setFileFormat(String format) {
String lowerCaseFormat = format.toLowerCase();
for (FileFormat fileFormat : FileFormat.values()) {
- if (lowerCaseFormat.contains(fileFormat.label)) {
- catalogProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.label);
+ if (lowerCaseFormat.contains(fileFormat.getLabel())) {
+ catalogProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.getLabel());
}
}
}
- private void assertFileFormat(String format) {
- if (format == null) {
- return;
- }
- String lowerCaseFormat = format.toLowerCase();
- Preconditions.checkArgument(Arrays.stream(FileFormat.values()).anyMatch(v -> lowerCaseFormat.contains(v.label)),
- String.format("Unsupported fileformat %s", format));
- }
-
- private void setCommonHmsTablePropertiesForIceberg(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
- // If the table is not managed by Hive or Hadoop catalog, then the location should be set
- if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
- String location = (hmsTable.getSd() != null) ? hmsTable.getSd().getLocation() : null;
- if (location == null && Catalogs.hadoopCatalog(conf, catalogProperties)) {
- location = IcebergTableUtil.defaultWarehouseLocation(
- TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()),
- conf, catalogProperties);
- hmsTable.getSd().setLocation(location);
- }
- Preconditions.checkArgument(location != null, "Table location not set");
- }
-
- Map hmsParams = hmsTable.getParameters();
- COMMON_HMS_PROPERTIES.forEach(hmsParams::putIfAbsent);
-
- // Remove null values from hms table properties
- hmsParams.entrySet().removeIf(e -> e.getKey() == null || e.getValue() == null);
-
- // Remove creation related properties
- PARAMETERS_TO_REMOVE.forEach(hmsParams::remove);
-
- setWriteModeDefaults(null, hmsParams, null);
- }
-
- /**
- * Calculates the properties we would like to send to the catalog.
- *
- * <ul>
- * <li>The base of the properties is the properties stored at the Hive Metastore for the given table
- * <li>We add the {@link Catalogs#LOCATION} as the table location
- * <li>We add the {@link Catalogs#NAME} as TableIdentifier defined by the database name and table name
- * <li>We add the serdeProperties of the HMS table
- * <li>We remove some parameters that we don't want to push down to the Iceberg table props
- * </ul>
- *
- * @param hmsTable Table for which we are calculating the properties
- * @return The properties we can provide for Iceberg functions, like {@link Catalogs}
- */
- private static Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
- Properties properties = new Properties();
-
- hmsTable.getParameters().entrySet().stream().filter(e -> e.getKey() != null && e.getValue() != null).forEach(e -> {
- // translate key names between HMS and Iceberg where needed
- String icebergKey = HMSTablePropertyHelper.translateToIcebergProp(e.getKey());
- properties.put(icebergKey, e.getValue());
- });
-
- if (properties.get(Catalogs.LOCATION) == null &&
- hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null) {
- properties.put(Catalogs.LOCATION, hmsTable.getSd().getLocation());
- }
-
- if (properties.get(Catalogs.NAME) == null) {
- properties.put(Catalogs.NAME, TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()).toString());
- }
-
- SerDeInfo serdeInfo = hmsTable.getSd().getSerdeInfo();
- if (serdeInfo != null) {
- serdeInfo.getParameters().entrySet().stream()
- .filter(e -> e.getKey() != null && e.getValue() != null).forEach(e -> {
- String icebergKey = HMSTablePropertyHelper.translateToIcebergProp(e.getKey());
- properties.put(icebergKey, e.getValue());
- });
- }
-
- // Remove HMS table parameters we don't want to propagate to Iceberg
- PROPERTIES_TO_REMOVE.forEach(properties::remove);
-
- return properties;
- }
-
- private Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable,
- Set identifierFields) {
- boolean autoConversion = conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
-
- if (properties.getProperty(InputFormatConfig.TABLE_SCHEMA) != null) {
- return SchemaParser.fromJson(properties.getProperty(InputFormatConfig.TABLE_SCHEMA));
- }
- List cols = Lists.newArrayList(hmsTable.getSd().getCols());
- if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
- cols.addAll(hmsTable.getPartitionKeys());
- }
- Schema schema = HiveSchemaUtil.convert(cols, autoConversion);
-
- return getSchemaWithIdentifierFields(schema, identifierFields);
- }
-
- private Schema getSchemaWithIdentifierFields(Schema schema, Set identifierFields) {
- if (identifierFields == null || identifierFields.isEmpty()) {
- return schema;
- }
- Set identifierFieldIds = identifierFields.stream()
- .map(column -> {
- Types.NestedField field = schema.findField(column);
- Preconditions.checkNotNull(field,
- "Cannot find identifier field ID for the column %s in schema %s", column, schema);
- return field.fieldId();
- })
- .collect(Collectors.toSet());
-
- List cols = schema.columns().stream()
- .map(column -> identifierFieldIds.contains(column.fieldId()) ? column.asRequired() : column)
- .collect(Collectors.toList());
-
- return new Schema(cols, identifierFieldIds);
- }
-
- private static PartitionSpec spec(Configuration configuration, Schema schema,
- org.apache.hadoop.hive.metastore.api.Table hmsTable) {
-
- Preconditions.checkArgument(!hmsTable.isSetPartitionKeys() || hmsTable.getPartitionKeys().isEmpty(),
- "We can only handle non-partitioned Hive tables. The Iceberg schema should be in " +
- InputFormatConfig.PARTITION_SPEC + " or already converted to a partition transform ");
-
- PartitionSpec spec = IcebergTableUtil.spec(configuration, schema);
- if (spec != null) {
- Preconditions.checkArgument(hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC) == null,
- "Provide only one of the following: Hive partition transform specification, or the " +
- InputFormatConfig.PARTITION_SPEC + " property");
- return spec;
- }
-
- if (hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC) != null) {
- return PartitionSpecParser.fromJson(schema, hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC));
- } else {
- return PartitionSpec.unpartitioned();
- }
- }
-
private void handleAddColumns(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
Collection<FieldSchema> addedCols =
HiveSchemaUtil.getSchemaDiff(hmsTable.getSd().getCols(), HiveSchemaUtil.convert(icebergTable.schema()), false)
@@ -1090,84 +802,6 @@ private Type.PrimitiveType getPrimitiveTypeOrThrow(FieldSchema field) throws Met
return (Type.PrimitiveType) newType;
}
- private void setOrcOnlyFilesParam(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
- if (isOrcOnlyFiles(hmsTable)) {
- hmsTable.getParameters().put(ORC_FILES_ONLY, "true");
- } else {
- hmsTable.getParameters().put(ORC_FILES_ONLY, "false");
- }
- }
-
- private boolean isOrcOnlyFiles(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
- return !"FALSE".equalsIgnoreCase(hmsTable.getParameters().get(ORC_FILES_ONLY)) &&
- (hmsTable.getSd().getInputFormat() != null &&
- hmsTable.getSd().getInputFormat().toUpperCase().contains(org.apache.iceberg.FileFormat.ORC.name()) ||
- org.apache.iceberg.FileFormat.ORC.name()
- .equalsIgnoreCase(hmsTable.getSd().getSerdeInfo().getParameters().get("write.format.default")) ||
- org.apache.iceberg.FileFormat.ORC.name()
- .equalsIgnoreCase(hmsTable.getParameters().get("write.format.default")));
- }
-
- private void setWriteModeDefaults(Table icebergTbl, Map newProps, EnvironmentContext context) {
- if ((icebergTbl == null || ((BaseTable) icebergTbl).operations().current().formatVersion() == 1) &&
- IcebergTableUtil.isV2TableOrAbove(newProps)) {
- List writeModeList = ImmutableList.of(
- TableProperties.DELETE_MODE, TableProperties.UPDATE_MODE, TableProperties.MERGE_MODE);
- writeModeList.stream()
- .filter(writeMode -> catalogProperties.get(writeMode) == null)
- .forEach(writeMode -> {
- catalogProperties.put(writeMode, MERGE_ON_READ.modeName());
- newProps.put(writeMode, MERGE_ON_READ.modeName());
- });
-
- if (context != null) {
- Splitter splitter = Splitter.on(PROPERTIES_SEPARATOR);
- Map contextProperties = context.getProperties();
- if (contextProperties.containsKey(SET_PROPERTIES)) {
- String propValue = context.getProperties().get(SET_PROPERTIES);
- String writeModeStr = writeModeList.stream()
- .filter(writeMode -> !splitter.splitToList(propValue).contains(writeMode))
- .collect(Collectors.joining("'"));
- if (!writeModeStr.isEmpty()) {
- contextProperties.put(SET_PROPERTIES, propValue + "'" + writeModeStr);
- }
- }
- }
- }
- }
-
- @Override
- public void postGetTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
- if (hmsTable != null) {
- try {
- Table tbl = IcebergTableUtil.getTable(conf, hmsTable);
- String formatVersion = String.valueOf(((BaseTable) tbl).operations().current().formatVersion());
- hmsTable.getParameters().put(TableProperties.FORMAT_VERSION, formatVersion);
- // Set the serde info
- hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getName());
- hmsTable.getSd().setOutputFormat(HiveIcebergOutputFormat.class.getName());
- hmsTable.getSd().getSerdeInfo().setSerializationLib(HiveIcebergSerDe.class.getName());
- String storageHandler = hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
- // Check if META_TABLE_STORAGE is not present or is not an instance of ICEBERG_STORAGE_HANDLER
- if (storageHandler == null || !isHiveIcebergStorageHandler(storageHandler)) {
- hmsTable.getParameters()
- .put(hive_metastoreConstants.META_TABLE_STORAGE, HMSTablePropertyHelper.HIVE_ICEBERG_STORAGE_HANDLER);
- }
- } catch (NoSuchTableException | NotFoundException ex) {
- // If the table doesn't exist, ignore throwing exception from here
- }
- }
- }
-
- private static boolean isHiveIcebergStorageHandler(String storageHandler) {
- try {
- Class> storageHandlerClass = Class.forName(storageHandler);
- return Class.forName(HIVE_ICEBERG_STORAGE_HANDLER).isAssignableFrom(storageHandlerClass);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Error checking storage handler class", e);
- }
- }
-
@Override
public void preDropPartitions(org.apache.hadoop.hive.metastore.api.Table hmsTable,
EnvironmentContext context,
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 88aa9c449911..67b50c3f3489 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -131,6 +131,7 @@
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.ExpireSnapshots;
@@ -172,8 +173,10 @@
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.hadoop.ConfigProperties;
+import org.apache.iceberg.hive.CatalogUtils;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.hive.MetastoreUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
@@ -257,7 +260,13 @@ public Class<? extends AbstractSerDe> getSerDeClass() {
public HiveMetaHook getMetaHook() {
// Make sure to always return a new instance here, as HiveIcebergMetaHook might hold state relevant for the
// operation.
- return new HiveIcebergMetaHook(conf);
+ String catalogType = CatalogUtils.getCatalogType(conf);
+ if (StringUtils.isEmpty(catalogType) || CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equals(catalogType)) {
+ return new HiveIcebergMetaHook(conf);
+ } else {
+ conf.set(ConfigProperties.LOCK_HIVE_ENABLED, "false");
+ return new BaseHiveIcebergMetaHook(conf);
+ }
}
@Override
@@ -2114,7 +2123,7 @@ public List<FieldSchema> getPartitionKeys(org.apache.hadoop.hive.ql.metadata.Tab
return Collections.emptyList();
}
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
- return IcebergTableUtil.getPartitionKeys(icebergTable, icebergTable.spec().specId());
+ return MetastoreUtil.getPartitionKeys(icebergTable, icebergTable.spec().specId());
}
@Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 098928bb2043..11fc7a624238 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -44,7 +44,6 @@
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.utils.TableFetcher;
@@ -87,7 +86,7 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
-import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.CatalogUtils;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
@@ -106,7 +105,6 @@
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
import static org.apache.iceberg.mr.InputFormatConfig.CATALOG_NAME;
-import static org.apache.iceberg.mr.InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE;
public class IcebergTableUtil {
private static final Logger LOG = LoggerFactory.getLogger(IcebergTableUtil.class);
@@ -501,20 +499,6 @@ public static Expression generateExpressionFromPartitionSpec(Table table, Map
- public static List<FieldSchema> getPartitionKeys(Table table, int specId) {
- Schema schema = table.specs().get(specId).schema();
- List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
- Map<String, String> colNameToColType = hiveSchema.stream()
- return table.specs().get(specId).fields().stream()
- .map(partField -> new FieldSchema(
- schema.findColumnName(partField.sourceId()),
- colNameToColType.get(schema.findColumnName(partField.sourceId())),
- String.format("Transform: %s", partField.transform().toString()))
- )
- .collect(Collectors.toList());
- }
-
public static List<PartitionField> getPartitionFields(Table table, boolean latestSpecOnly) {
return latestSpecOnly ? table.spec().fields() :
table.specs().values().stream()
@@ -646,7 +630,7 @@ public static String defaultWarehouseLocation(TableIdentifier tableIdentifier,
Configuration conf, Properties catalogProperties) {
StringBuilder sb = new StringBuilder();
String warehouseLocation = conf.get(String.format(
- CATALOG_WAREHOUSE_TEMPLATE, catalogProperties.getProperty(CATALOG_NAME))
+ CatalogUtils.CATALOG_WAREHOUSE_TEMPLATE, catalogProperties.getProperty(CATALOG_NAME))
);
sb.append(warehouseLocation).append('/');
for (String level : tableIdentifier.namespace().levels()) {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
index 42a6b0c77c6d..060ffa8fba82 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
@@ -35,6 +35,7 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.hive.CatalogUtils;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
@@ -209,7 +210,7 @@ public void testLoadCatalogDefault() {
@Test
public void testLoadCatalogHive() {
String catalogName = "barCatalog";
- conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE),
+ conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE),
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, catalogName);
Assert.assertTrue(hiveCatalog.isPresent());
@@ -222,9 +223,9 @@ public void testLoadCatalogHive() {
@Test
public void testLoadCatalogHadoop() {
String catalogName = "barCatalog";
- conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE),
+ conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE),
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.WAREHOUSE_LOCATION),
+ conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, CatalogProperties.WAREHOUSE_LOCATION),
"/tmp/mylocation");
Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
Assert.assertTrue(hadoopCatalog.isPresent());
@@ -238,9 +239,9 @@ public void testLoadCatalogHadoop() {
@Test
public void testLoadCatalogCustom() {
String catalogName = "barCatalog";
- conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.CATALOG_IMPL),
+ conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, CatalogProperties.CATALOG_IMPL),
CustomHadoopCatalog.class.getName());
- conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.WAREHOUSE_LOCATION),
+ conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, CatalogProperties.WAREHOUSE_LOCATION),
"/tmp/mylocation");
Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
Assert.assertTrue(customHadoopCatalog.isPresent());
@@ -258,7 +259,7 @@ public void testLoadCatalogLocation() {
@Test
public void testLoadCatalogUnknown() {
String catalogName = "barCatalog";
- conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE), "fooType");
+ conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE), "fooType");
Assertions.assertThatThrownBy(() -> Catalogs.loadCatalog(conf, catalogName))
.isInstanceOf(UnsupportedOperationException.class)
@@ -269,7 +270,7 @@ public void testLoadCatalogUnknown() {
public void testDefaultCatalogProperties() {
String catalogProperty = "io.manifest.cache-enabled";
// Set global property
- final String defaultCatalogProperty = InputFormatConfig.CATALOG_DEFAULT_CONFIG_PREFIX + catalogProperty;
+ final String defaultCatalogProperty = CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX + catalogProperty;
conf.setBoolean(defaultCatalogProperty, true);
HiveCatalog defaultCatalog = (HiveCatalog) Catalogs.loadCatalog(conf, null).get();
Assert.assertEquals("true", defaultCatalog.properties().get(catalogProperty));
@@ -301,9 +302,9 @@ public CustomHadoopCatalog(Configuration conf, String warehouseLocation) {
}
private void setCustomCatalogProperties(String catalogName, String warehouseLocation) {
- conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.WAREHOUSE_LOCATION),
+ conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, CatalogProperties.WAREHOUSE_LOCATION),
warehouseLocation);
- conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.CATALOG_IMPL),
+ conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, CatalogProperties.CATALOG_IMPL),
CustomHadoopCatalog.class.getName());
conf.set(InputFormatConfig.CATALOG_NAME, catalogName);
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
index fddc0cca9b18..30f7bb087636 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -57,6 +57,7 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.hive.CatalogUtils;
import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
@@ -372,9 +373,9 @@ public void testCustomCatalog() throws IOException {
String warehouseLocation = temp.newFolder("hadoop_catalog").getAbsolutePath();
conf.set("warehouse.location", warehouseLocation);
conf.set(InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
- conf.set(InputFormatConfig.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
+ conf.set(CatalogUtils.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
CatalogUtil.ICEBERG_CATALOG_TYPE), CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- conf.set(InputFormatConfig.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
+ conf.set(CatalogUtils.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
CatalogProperties.WAREHOUSE_LOCATION), warehouseLocation);
Catalog catalog = new HadoopCatalog(conf, conf.get("warehouse.location"));
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 732e6f9f94e0..09229d3b91ba 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -699,7 +699,8 @@ public void testCreateTableError() {
"'='" +
testTables.catalogName() +
"')"));
- if (testTableType != TestTables.TestTableType.HADOOP_CATALOG) {
+ if (testTableType != TestTables.TestTableType.HADOOP_CATALOG &&
+ testTableType != TestTables.TestTableType.CUSTOM_CATALOG) {
assertThatThrownBy
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Failed to execute Hive query")
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index c66a3240978a..8e885f36ff72 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -49,6 +49,7 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.hive.CatalogUtils;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveVersion;
import org.apache.iceberg.mr.Catalogs;
@@ -542,10 +543,10 @@ static class CustomCatalogTestTables extends TestTables {
@Override
public Map<String, String> properties() {
return ImmutableMap.of(
- InputFormatConfig.catalogPropertyConfigKey(catalog, CatalogProperties.CATALOG_IMPL),
- TestCatalogs.CustomHadoopCatalog.class.getName(),
- InputFormatConfig.catalogPropertyConfigKey(catalog, CatalogProperties.WAREHOUSE_LOCATION),
- warehouseLocation
+ CatalogUtils.catalogPropertyConfigKey(catalog, CatalogProperties.CATALOG_IMPL),
+ TestCatalogs.CustomHadoopCatalog.class.getName(),
+ CatalogUtils.catalogPropertyConfigKey(catalog, CatalogProperties.WAREHOUSE_LOCATION),
+ warehouseLocation
);
}
@@ -573,10 +574,10 @@ static class HadoopCatalogTestTables extends TestTables {
@Override
public Map<String, String> properties() {
return ImmutableMap.of(
- InputFormatConfig.catalogPropertyConfigKey(catalog, CatalogUtil.ICEBERG_CATALOG_TYPE),
- CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
- InputFormatConfig.catalogPropertyConfigKey(catalog, CatalogProperties.WAREHOUSE_LOCATION),
- warehouseLocation
+ CatalogUtils.catalogPropertyConfigKey(catalog, CatalogUtil.ICEBERG_CATALOG_TYPE),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
+ CatalogUtils.catalogPropertyConfigKey(catalog, CatalogProperties.WAREHOUSE_LOCATION),
+ warehouseLocation
);
}
@@ -627,7 +628,7 @@ static class HiveTestTables extends TestTables {
@Override
public Map<String, String> properties() {
- return ImmutableMap.of(InputFormatConfig.catalogPropertyConfigKey(catalog, CatalogUtil.ICEBERG_CATALOG_TYPE),
+ return ImmutableMap.of(CatalogUtils.catalogPropertyConfigKey(catalog, CatalogUtil.ICEBERG_CATALOG_TYPE),
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
}
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog.q
new file mode 100644
index 000000000000..b87d9e0af671
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog.q
@@ -0,0 +1,64 @@
+-- SORT_QUERY_RESULTS
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+--! qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask iceberg version
+--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
+
+set metastore.client.impl=org.apache.iceberg.hive.client.HiveRESTCatalogClient;
+set metastore.catalog.default=ice01;
+set iceberg.catalog.ice01.type=rest;
+
+--! This config is set in the driver setup (see TestIcebergRESTCatalogLlapLocalCliDriver.java)
+--! conf.set('iceberg.catalog.ice01.uri', );
+
+create database ice_rest;
+use ice_rest;
+
+--! Creating table without catalog name in table properties
+create table ice_orc1 (
+ first_name string,
+ last_name string,
+ dept_id bigint,
+ team_id bigint
+ )
+partitioned by (company_id bigint)
+stored by iceberg stored as orc;
+
+--! Creating table with a valid catalog name in table properties
+create table ice_orc2 (
+ first_name string,
+ last_name string,
+ dept_id bigint,
+ team_id bigint
+ )
+partitioned by (company_id bigint)
+stored by iceberg stored as orc
+TBLPROPERTIES('format-version'='2', 'iceberg.catalog'='ice01');
+
+--! Output should contain: 'type' = 'rest'
+show create table ice_orc2;
+
+show tables;
+drop table ice_orc1;
+drop table ice_orc2;
+show tables;
+
+show databases;
+drop database ice_rest;
+show databases;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog.q.out
new file mode 100644
index 000000000000..2622a47ae1cf
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog.q.out
@@ -0,0 +1,154 @@
+PREHOOK: query: create database ice_rest
+PREHOOK: type: CREATEDATABASE
+PREHOOK: Output: database:ice_rest
+POSTHOOK: query: create database ice_rest
+POSTHOOK: type: CREATEDATABASE
+POSTHOOK: Output: database:ice_rest
+PREHOOK: query: use ice_rest
+PREHOOK: type: SWITCHDATABASE
+PREHOOK: Input: database:ice_rest
+POSTHOOK: query: use ice_rest
+POSTHOOK: type: SWITCHDATABASE
+POSTHOOK: Input: database:ice_rest
+PREHOOK: query: create table ice_orc1 (
+ first_name string,
+ last_name string,
+ dept_id bigint,
+ team_id bigint
+ )
+partitioned by (company_id bigint)
+stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:ice_rest
+PREHOOK: Output: ice_rest@ice_orc1
+POSTHOOK: query: create table ice_orc1 (
+ first_name string,
+ last_name string,
+ dept_id bigint,
+ team_id bigint
+ )
+partitioned by (company_id bigint)
+stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:ice_rest
+POSTHOOK: Output: ice_rest@ice_orc1
+PREHOOK: query: create table ice_orc2 (
+ first_name string,
+ last_name string,
+ dept_id bigint,
+ team_id bigint
+ )
+partitioned by (company_id bigint)
+stored by iceberg stored as orc
+TBLPROPERTIES('format-version'='2', 'iceberg.catalog'='ice01')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:ice_rest
+PREHOOK: Output: ice_rest@ice_orc2
+POSTHOOK: query: create table ice_orc2 (
+ first_name string,
+ last_name string,
+ dept_id bigint,
+ team_id bigint
+ )
+partitioned by (company_id bigint)
+stored by iceberg stored as orc
+TBLPROPERTIES('format-version'='2', 'iceberg.catalog'='ice01')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:ice_rest
+POSTHOOK: Output: ice_rest@ice_orc2
+PREHOOK: query: show create table ice_orc2
+PREHOOK: type: SHOW_CREATETABLE
+PREHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: query: show create table ice_orc2
+POSTHOOK: type: SHOW_CREATETABLE
+POSTHOOK: Input: ice_rest@ice_orc2
+CREATE EXTERNAL TABLE `ice_orc2`(
+ `first_name` string,
+ `last_name` string,
+ `dept_id` bigint,
+ `team_id` bigint,
+ `company_id` bigint)
+PARTITIONED BY (
+ `company_id` bigint COMMENT 'Transform: identity')
+PARTITIONED BY SPEC (
+`company_id`)
+ROW FORMAT SERDE
+ 'org.apache.iceberg.mr.hive.HiveIcebergSerDe'
+STORED BY
+ 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
+WITH SERDEPROPERTIES (
+ 'serialization.format'='1')
+LOCATION
+#### A masked pattern was here ####
+TBLPROPERTIES (
+ 'bucketing_version'='2',
+ 'current-schema'='{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"first_name","required":false,"type":"string"},{"id":2,"name":"last_name","required":false,"type":"string"},{"id":3,"name":"dept_id","required":false,"type":"long"},{"id":4,"name":"team_id","required":false,"type":"long"},{"id":5,"name":"company_id","required":false,"type":"long"}]}',
+ 'default-partition-spec'='{"spec-id":0,"fields":[{"name":"company_id","transform":"identity","source-id":5,"field-id":1000}]}',
+ 'format-version'='2',
+ 'iceberg.catalog'='ice01',
+ 'iceberg.orc.files.only'='true',
+#### A masked pattern was here ####
+ 'name'='ice_rest.ice_orc2',
+ 'parquet.compression'='zstd',
+ 'serialization.format'='1',
+ 'snapshot-count'='0',
+ 'table_type'='ICEBERG',
+ 'type'='rest',
+ 'uuid'='#Masked#',
+ 'write.delete.mode'='merge-on-read',
+ 'write.format.default'='orc',
+ 'write.merge.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read')
+PREHOOK: query: show tables
+PREHOOK: type: SHOWTABLES
+PREHOOK: Input: database:ice_rest
+POSTHOOK: query: show tables
+POSTHOOK: type: SHOWTABLES
+POSTHOOK: Input: database:ice_rest
+ice_orc1
+ice_orc2
+PREHOOK: query: drop table ice_orc1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: ice_rest@ice_orc1
+PREHOOK: Output: database:ice_rest
+PREHOOK: Output: ice_rest@ice_orc1
+POSTHOOK: query: drop table ice_orc1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: ice_rest@ice_orc1
+POSTHOOK: Output: database:ice_rest
+POSTHOOK: Output: ice_rest@ice_orc1
+PREHOOK: query: drop table ice_orc2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Output: database:ice_rest
+PREHOOK: Output: ice_rest@ice_orc2
+POSTHOOK: query: drop table ice_orc2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Output: database:ice_rest
+POSTHOOK: Output: ice_rest@ice_orc2
+PREHOOK: query: show tables
+PREHOOK: type: SHOWTABLES
+PREHOOK: Input: database:ice_rest
+POSTHOOK: query: show tables
+POSTHOOK: type: SHOWTABLES
+POSTHOOK: Input: database:ice_rest
+PREHOOK: query: show databases
+PREHOOK: type: SHOWDATABASES
+POSTHOOK: query: show databases
+POSTHOOK: type: SHOWDATABASES
+default
+ice_rest
+PREHOOK: query: drop database ice_rest
+PREHOOK: type: DROPDATABASE
+PREHOOK: Input: database:ice_rest
+PREHOOK: Output: database:ice_rest
+POSTHOOK: query: drop database ice_rest
+POSTHOOK: type: DROPDATABASE
+POSTHOOK: Input: database:ice_rest
+POSTHOOK: Output: database:ice_rest
+PREHOOK: query: show databases
+PREHOOK: type: SHOWDATABASES
+POSTHOOK: query: show databases
+POSTHOOK: type: SHOWDATABASES
+default
diff --git a/itests/hive-iceberg/pom.xml b/itests/hive-iceberg/pom.xml
new file mode 100644
index 000000000000..170a6c89abd8
--- /dev/null
+++ b/itests/hive-iceberg/pom.xml
@@ -0,0 +1,155 @@
+
+
+
+ 4.0.0
+
+ org.apache.hive
+ hive-it
+ 4.2.0-SNAPSHOT
+ ../pom.xml
+
+ hive-it-iceberg
+ jar
+ Hive Iceberg Integration - Unit Tests
+
+ ../..
+ UTF-8
+ false
+ 1.9.1
+
+
+
+ org.apache.hive
+ hive-standalone-metastore-server
+ ${project.version}
+
+
+ org.apache.hive
+ hive-standalone-metastore-rest-catalog
+ ${project.version}
+
+
+ org.apache.hive
+ hive-standalone-metastore-rest-catalog
+ tests
+ ${project.version}
+
+
+ org.apache.hive
+ hive-standalone-metastore-common
+ ${project.version}
+
+
+ org.apache.hive
+ hive-iceberg-catalog
+ ${project.version}
+
+
+ org.apache.hive
+ hive-iceberg-handler
+ ${project.version}
+ test
+
+
+ org.apache.hive
+ hive-standalone-metastore-common
+ ${project.version}
+ tests
+ test
+
+
+ org.apache.hive
+ hive-standalone-metastore-server
+ ${project.version}
+ tests
+ test
+
+
+ org.apache.iceberg
+ iceberg-api
+ ${iceberg.version}
+ tests
+ test
+
+
+ org.apache.iceberg
+ iceberg-core
+ ${iceberg.version}
+ tests
+ test
+
+
+ org.apache.iceberg
+ iceberg-open-api
+ ${iceberg.version}
+ test-fixtures
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit.jupiter.version}
+ test
+
+
+ org.apache.hive
+ hive-exec
+ tests
+ test
+ ${project.version}
+
+
+ org.apache.hive
+ hive-it-util
+ test
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ setup-metastore-scripts
+ process-test-resources
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+
+
diff --git a/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientIT.java b/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientIT.java
new file mode 100644
index 000000000000..b6386e0ec42f
--- /dev/null
+++ b/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientIT.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.ServletSecurity;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/*
+ * Integration test for the hive-iceberg REST catalog client and the HMS RESTCatalog server.
+ * It uses a HiveMetaStoreClient backed by the hive-iceberg REST catalog adapter to talk to the HMS RESTCatalog server.
+ * The call flow is:
+ * Hive QL wrapper --> HiveMetaStoreClient --> HiveRESTCatalogClient --> HMS RESTCatalog Server --> HMS
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class TestHiveRESTCatalogClientIT {
+
+ private static final String DB_NAME = "ice_db";
+ private static final String TABLE_NAME = "ice_tbl";
+ private static final String CATALOG_NAME = "ice01";
+ private static final String HIVE_ICEBERG_STORAGE_HANDLER = "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
+
+ private Configuration conf;
+ private HiveConf hiveConf;
+ private Hive hive;
+
+ private IMetaStoreClient msClient;
+
+ @RegisterExtension
+ private static final HiveRESTCatalogServerExtension REST_CATALOG_EXTENSION =
+ HiveRESTCatalogServerExtension.builder(ServletSecurity.AuthType.NONE)
+ .addMetaStoreSchemaClassName(ITestsSchemaInfo.class)
+ .build();
+
+ @BeforeAll
+ public void setup() throws Exception {
+ // Starting msClient with Iceberg REST Catalog client underneath
+ String restCatalogPrefix = String.format("%s%s.", CatalogUtils.CATALOG_CONFIG_PREFIX, CATALOG_NAME);
+
+ conf = REST_CATALOG_EXTENSION.getConf();
+
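+ // Route metastore calls through the Iceberg REST catalog adapter instead of the default Thrift client,
+ // and register the embedded REST server endpoint under the "ice01" catalog prefix.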
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_CLIENT_IMPL,
+ "org.apache.iceberg.hive.client.HiveRESTCatalogClient");
+ conf.set(MetastoreConf.ConfVars.CATALOG_DEFAULT.getVarname(), CATALOG_NAME);
+ conf.set(restCatalogPrefix + "uri", REST_CATALOG_EXTENSION.getRestEndpoint());
+ conf.set(restCatalogPrefix + "type", CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+
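+ // Resolve the Iceberg storage handler's meta hook so that table create/drop calls made through
+ // this client are routed through the Iceberg hook as well.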
+ HiveMetaHookLoader hookLoader = tbl -> {
+ HiveStorageHandler storageHandler;
+ try {
+ storageHandler = HiveUtils.getStorageHandler(conf, HIVE_ICEBERG_STORAGE_HANDLER);
+ } catch (HiveException e) {
+ throw new MetaException(e.getMessage());
+ }
+ return storageHandler == null ? null : storageHandler.getMetaHook();
+ };
+
+ msClient = new HiveMetaStoreClient(conf, hookLoader);
+ hiveConf = new HiveConf(conf, HiveConf.class);
+ hive = Hive.get(hiveConf);
+ }
+
+ @AfterAll
+ public void tearDown() {
+ if (msClient != null) {
+ msClient.close();
+ }
+ }
+
+ @Test
+ public void testIceberg() throws Exception {
+
+ // --- Create Database ---
+ Database db = new Database();
+ db.setCatalogName(CATALOG_NAME);
+ db.setName(DB_NAME);
+ db.setOwnerType(PrincipalType.USER);
+ db.setOwnerName(System.getProperty("user.name"));
+ String warehouseDir = MetastoreConf.get(conf, MetastoreConf.ConfVars.WAREHOUSE_EXTERNAL.getVarname());
+ db.setLocationUri(warehouseDir + "/" + DB_NAME + ".db");
+ hive.createDatabase(db, true);
+
+ // --- Get Database ---
+ Database retrievedDB = hive.getDatabase(CATALOG_NAME, DB_NAME);
+ Assertions.assertEquals(DB_NAME, retrievedDB.getName());
+ Assertions.assertEquals(CATALOG_NAME, retrievedDB.getCatalogName());
+
+ // --- Get Databases ---
+ List<String> dbs = msClient.getDatabases(CATALOG_NAME, "ice_*");
+ Assertions.assertEquals(1, dbs.size());
+ Assertions.assertEquals(DB_NAME, dbs.get(0));
+
+ // --- Get All Databases ---
+ List<String> allDbs = msClient.getAllDatabases(CATALOG_NAME);
+ Assertions.assertEquals(2, allDbs.size());
+ Assertions.assertTrue(allDbs.contains("default"));
+ Assertions.assertTrue(allDbs.contains(DB_NAME));
+
+ // --- Create Table ---
+ org.apache.hadoop.hive.metastore.api.Table tTable = createPartitionedTable(msClient,
+ CATALOG_NAME, DB_NAME, TABLE_NAME, new java.util.HashMap<>());
+ Assertions.assertNotNull(tTable);
+ Assertions.assertEquals(HiveMetaHook.ICEBERG, tTable.getParameters().get(HiveMetaHook.TABLE_TYPE));
+
+ // --- Create Table --- with an invalid catalog name in table parameters (should fail)
+ Map<String, String> tableParameters = new java.util.HashMap<>();
+ tableParameters.put(CatalogUtils.CATALOG_NAME, "some_missing_catalog");
+ assertThrows(IllegalArgumentException.class, () ->
+ createPartitionedTable(msClient, CATALOG_NAME, DB_NAME, TABLE_NAME + "_2", tableParameters));
+
+ // --- tableExists ---
+ Assertions.assertTrue(msClient.tableExists(CATALOG_NAME, DB_NAME, TABLE_NAME));
+
+ // --- Get Table ---
+ org.apache.hadoop.hive.metastore.api.Table table = msClient.getTable(CATALOG_NAME, DB_NAME, TABLE_NAME);
+ Assertions.assertEquals(DB_NAME, table.getDbName());
+ Assertions.assertEquals(TABLE_NAME, table.getTableName());
+ Assertions.assertEquals(HIVE_ICEBERG_STORAGE_HANDLER, table.getParameters().get("storage_handler"));
+ Assertions.assertNotNull(table.getParameters().get(TableProperties.DEFAULT_PARTITION_SPEC));
+ Assertions.assertEquals(1, table.getPartitionKeys().size());
+ Assertions.assertEquals("city", table.getPartitionKeys().getFirst().getName());
+
+ // --- Get Tables ---
+ List<String> tables = msClient.getTables(CATALOG_NAME, DB_NAME, "ice_*");
+ Assertions.assertEquals(1, tables.size());
+ Assertions.assertEquals(TABLE_NAME, tables.getFirst());
+
+ // --- Get All Tables ---
+ List<String> allTables = msClient.getAllTables(CATALOG_NAME, DB_NAME);
+ Assertions.assertEquals(1, allTables.size());
+ Assertions.assertEquals(TABLE_NAME, allTables.getFirst());
+
+ // --- Drop Table ---
+ msClient.dropTable(CATALOG_NAME, DB_NAME, TABLE_NAME);
+ Assertions.assertFalse(msClient.tableExists(CATALOG_NAME, DB_NAME, TABLE_NAME));
+
+ // --- Drop Database ---
+ msClient.dropDatabase(DB_NAME);
+ Assertions.assertFalse(msClient.getAllDatabases(CATALOG_NAME).contains(DB_NAME));
+ }
+
+ private static Table createPartitionedTable(IMetaStoreClient db, String catName, String dbName, String tableName,
+ Map<String, String> tableParameters) throws Exception {
+ db.dropTable(catName, dbName, tableName);
+ Table table = new Table();
+ table.setCatName(catName);
+ table.setDbName(dbName);
+ table.setTableName(tableName);
+
+ FieldSchema col1 = new FieldSchema("key", "string", "");
+ FieldSchema col2 = new FieldSchema("value", "int", "");
+ FieldSchema col3 = new FieldSchema("city", "string", "");
+ List<FieldSchema> cols = Arrays.asList(col1, col2, col3);
+
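+ // The plain text input/output formats below are placeholders; the created table still comes back
+ // as an Iceberg table (see the storage_handler and table-type assertions in testIceberg()).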
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.setInputFormat(TextInputFormat.class.getCanonicalName());
+ sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getCanonicalName());
+ sd.setCols(cols);
+ sd.getSerdeInfo().setParameters(new java.util.HashMap<>());
+ table.setSd(sd);
+
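+ // Convert the Hive columns to an Iceberg schema and declare an identity partition on "city",
+ // passed to the catalog as the default partition spec table property.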
+ Schema schema = HiveSchemaUtil.convert(cols, false);
+ PartitionSpec spec = PartitionSpec.builderFor(schema).identity("city").build();
+ String specString = PartitionSpecParser.toJson(spec);
+ table.setParameters(new java.util.HashMap<>());
+ table.getParameters().putAll(tableParameters);
+ table.getParameters().put(TableProperties.DEFAULT_PARTITION_SPEC, specString);
+
+ db.createTable(table);
+ return db.getTable(catName, dbName, tableName);
+ }
+}
diff --git a/itests/pom.xml b/itests/pom.xml
index 938f5267c9fa..2d8b04efc5de 100644
--- a/itests/pom.xml
+++ b/itests/pom.xml
@@ -44,6 +44,7 @@
 <module>qtest-druid</module>
 <module>qtest-kudu</module>
 <module>qtest-iceberg</module>
+ <module>hive-iceberg</module>
diff --git a/itests/qtest-iceberg/pom.xml b/itests/qtest-iceberg/pom.xml
index 57ac18684931..6ac62e8fa0b3 100644
--- a/itests/qtest-iceberg/pom.xml
+++ b/itests/qtest-iceberg/pom.xml
@@ -20,13 +20,23 @@
 <relativePath>../pom.xml</relativePath>
 </parent>
 <modelVersion>4.0.0</modelVersion>
- <artifactId>hive-it-iceberg</artifactId>
+ <artifactId>hive-it-iceberg-qfile</artifactId>
 <packaging>jar</packaging>
 <name>Hive Integration - QFile Iceberg Tests</name>
 <properties>
 <hive.path.to.root>../..</hive.path.to.root>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-standalone-metastore-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
@@ -452,6 +462,19 @@
 <version>${mockito-core.version}</version>
 <scope>test</scope>
 </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-standalone-metastore-rest-catalog</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-standalone-metastore-rest-catalog</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/HiveRESTCatalogServerExtension.java b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/HiveRESTCatalogServerExtension.java
new file mode 100644
index 000000000000..bc39ab612ea4
--- /dev/null
+++ b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/HiveRESTCatalogServerExtension.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.cli;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.MetaStoreSchemaInfo;
+import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.iceberg.rest.extension.RESTCatalogServer;
+import org.junit.rules.ExternalResource;
+
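+/**
+ * JUnit 4 {@link ExternalResource} that starts an embedded HMS RESTCatalog server around a test class.
+ * Used as a {@code @ClassRule} by the Iceberg REST catalog CLI drivers; tests can plug in a custom
+ * {@link MetaStoreSchemaInfo} before the server starts.
+ */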
+public class HiveRESTCatalogServerExtension extends ExternalResource {
+ private final Configuration conf;
+ private final RESTCatalogServer restCatalogServer;
+
+ private HiveRESTCatalogServerExtension(AuthType authType, Class<? extends MetaStoreSchemaInfo> schemaInfoClass) {
+ this.conf = MetastoreConf.newMetastoreConf();
+ MetastoreConf.setVar(conf, ConfVars.CATALOG_SERVLET_AUTH, authType.name());
+ restCatalogServer = new RESTCatalogServer();
+ if (schemaInfoClass != null) {
+ restCatalogServer.setSchemaInfoClass(schemaInfoClass);
+ }
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ restCatalogServer.start(conf);
+ }
+
+ @Override
+ protected void after() {
+ restCatalogServer.stop();
+ }
+
+ public String getRestEndpoint() {
+ return restCatalogServer.getRestEndpoint();
+ }
+
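+ /**
+ * Builder obtained via {@link #builder(AuthType)}; lets tests optionally supply a custom
+ * {@link MetaStoreSchemaInfo} implementation used by the embedded server.
+ */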
+ public static class Builder {
+ private final AuthType authType;
+ private Class<? extends MetaStoreSchemaInfo> metaStoreSchemaClass;
+
+ private Builder(AuthType authType) {
+ this.authType = authType;
+ }
+
+ public Builder addMetaStoreSchemaClassName(Class<? extends MetaStoreSchemaInfo> metaStoreSchemaClass) {
+ this.metaStoreSchemaClass = metaStoreSchemaClass;
+ return this;
+ }
+
+ public HiveRESTCatalogServerExtension build() {
+ return new HiveRESTCatalogServerExtension(authType, metaStoreSchemaClass);
+ }
+ }
+
+ public static Builder builder(AuthType authType) {
+ return new Builder(authType);
+ }
+
+ public RESTCatalogServer getRestCatalogServer() {
+ return restCatalogServer;
+ }
+}
\ No newline at end of file
diff --git a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogLlapLocalCliDriver.java b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogLlapLocalCliDriver.java
new file mode 100644
index 000000000000..ef28f6257cb1
--- /dev/null
+++ b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogLlapLocalCliDriver.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.cli;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.cli.control.CliAdapter;
+import org.apache.hadoop.hive.cli.control.CliConfigs;
+import org.apache.hadoop.hive.metastore.ServletSecurity;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.ITestsSchemaInfo;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.hive.CatalogUtils;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Stream;
+
+@RunWith(Parameterized.class)
+public class TestIcebergRESTCatalogLlapLocalCliDriver {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestIcebergRESTCatalogLlapLocalCliDriver.class);
+ private static final String CATALOG_NAME = "ice01";
+ private static final CliAdapter adapter = new CliConfigs.TestIcebergRESTCatalogLlapLocalCliDriver().getCliAdapter();
+
+ private final String name;
+ private final File qfile;
+
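+ // Starts the embedded HMS RESTCatalog server once for the whole CLI driver class.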
+ @ClassRule
+ public static final HiveRESTCatalogServerExtension REST_CATALOG_EXTENSION =
+ HiveRESTCatalogServerExtension.builder(ServletSecurity.AuthType.NONE)
+ .addMetaStoreSchemaClassName(ITestsSchemaInfo.class)
+ .build();
+
+ @ClassRule
+ public static final TestRule cliClassRule = adapter.buildClassRule();
+
+ @Rule
+ public final TestRule cliTestRule = adapter.buildTestRule();
+
+ @Parameters(name = "{0}")
+ public static List