12 changes: 12 additions & 0 deletions iceberg/iceberg-catalog/pom.xml
@@ -114,5 +114,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5-h2</artifactId>
</dependency>
</dependencies>
</project>
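The HttpComponents 5 artifacts added above (httpclient5, httpcore5, httpcore5-h2) match the HTTP stack used by Iceberg's REST catalog client, so they are presumably needed on this module's classpath once a REST catalog is configured. A minimal sketch of such a configuration, with a hypothetical catalog name and endpoint:

// Hypothetical REST catalog registration; "rest_cat" and the URI are illustrative only.
Configuration conf = new Configuration();
conf.set("iceberg.catalog.rest_cat.type", "rest");
conf.set("iceberg.catalog.rest_cat.uri", "http://localhost:8181");
// CatalogUtils.getCatalogProperties(conf, "rest_cat") would then yield {type=rest, uri=http://localhost:8181}.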
@@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive;

import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class CatalogUtils {
public static final String NAME = "name";
public static final String LOCATION = "location";
public static final String CATALOG_NAME = "iceberg.catalog";
public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
public static final String CATALOG_WAREHOUSE_TEMPLATE = "iceberg.catalog.%s.warehouse";
public static final String CATALOG_IMPL_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
public static final String CATALOG_DEFAULT_CONFIG_PREFIX = "iceberg.catalog-default.";
public static final String ICEBERG_HADOOP_TABLE_NAME = "location_based_table";
public static final String ICEBERG_DEFAULT_CATALOG_NAME = "default_iceberg";
public static final String NO_CATALOG_TYPE = "no catalog";
public static final Set<String> PROPERTIES_TO_REMOVE = ImmutableSet.of(
// We don't want to push down the metadata location props to Iceberg from HMS,
// since the snapshot pointer in HMS would always be one step ahead
BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP);

private CatalogUtils() {

}

/**
* Calculates the properties we would like to send to the catalog.
* <ul>
* <li>The base of the properties is the properties stored at the Hive Metastore for the given table
* <li>We add the {@link CatalogUtils#LOCATION} as the table location
* <li>We add the {@link CatalogUtils#NAME} as the
* TableIdentifier defined by the database name and table name
* <li>We add the serdeProperties of the HMS table
* <li>We remove some parameters that we don't want to push down to the Iceberg table props
* </ul>
* @param hmsTable Table for which we are calculating the properties
* @return The properties we can provide for Iceberg functions
*/
public static Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
Properties properties = new Properties();
properties.putAll(toIcebergProperties(hmsTable.getParameters()));

if (properties.get(LOCATION) == null && hmsTable.getSd() != null &&
hmsTable.getSd().getLocation() != null) {
properties.put(LOCATION, hmsTable.getSd().getLocation());
}

if (properties.get(NAME) == null) {
properties.put(NAME, TableIdentifier.of(hmsTable.getDbName(),
hmsTable.getTableName()).toString());
}

SerDeInfo serdeInfo = hmsTable.getSd().getSerdeInfo();
if (serdeInfo != null) {
properties.putAll(toIcebergProperties(serdeInfo.getParameters()));
}

// Remove HMS table parameters we don't want to propagate to Iceberg
PROPERTIES_TO_REMOVE.forEach(properties::remove);

return properties;
}

private static Properties toIcebergProperties(Map<String, String> parameters) {
Properties properties = new Properties();
parameters.entrySet().stream()
.filter(e -> e.getKey() != null && e.getValue() != null)
.forEach(e -> {
String icebergKey = HMSTablePropertyHelper.translateToIcebergProp(e.getKey());
properties.put(icebergKey, e.getValue());
});
return properties;
}

/**
* Collect all the catalog-specific configuration from the global Hive configuration.
* @param conf a Hadoop configuration
* @param catalogName name of the catalog
* @return complete map of catalog properties
*/
public static Map<String, String> getCatalogProperties(Configuration conf, String catalogName) {
Map<String, String> catalogProperties = Maps.newHashMap();
String keyPrefix = CATALOG_CONFIG_PREFIX + catalogName;
conf.forEach(config -> {
if (config.getKey().startsWith(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX)) {
catalogProperties.putIfAbsent(
config.getKey().substring(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX.length()),
config.getValue());
} else if (config.getKey().startsWith(keyPrefix)) {
catalogProperties.put(
config.getKey().substring(keyPrefix.length() + 1),
config.getValue());
}
});

return catalogProperties;
}

public static String getCatalogName(Configuration conf) {
return MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CATALOG_DEFAULT);
}

public static String getCatalogType(Configuration conf) {
return getCatalogType(conf, CatalogUtils.getCatalogName(conf));
}

public static boolean isHadoopTable(Configuration conf, Properties catalogProperties) {
String catalogName = catalogProperties.getProperty(CATALOG_NAME);
return ICEBERG_HADOOP_TABLE_NAME.equals(catalogName) || hadoopCatalog(conf, catalogProperties);
}

public static boolean hadoopCatalog(Configuration conf, Properties props) {
return assertCatalogType(conf, props, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, CatalogUtil.ICEBERG_CATALOG_HADOOP);
}

/**
* Get Hadoop config key of a catalog property based on catalog name
* @param catalogName catalog name
* @param catalogProperty catalog property, can be any custom property,
* a commonly used list of properties can be found
* at {@link org.apache.iceberg.CatalogProperties}
* @return Hadoop config key of a catalog property for the catalog name
*/
public static String catalogPropertyConfigKey(String catalogName, String catalogProperty) {
return String.format("%s%s.%s", CATALOG_CONFIG_PREFIX, catalogName, catalogProperty);
}

/**
* Return the catalog type based on the catalog name.
* <p>
* See Catalogs documentation for catalog type resolution strategy.
*
* @param conf global hive configuration
* @param catalogName name of the catalog
* @return type of the catalog, can be null
*/
public static String getCatalogType(Configuration conf, String catalogName) {
if (!StringUtils.isEmpty(catalogName)) {
String catalogType = conf.get(catalogPropertyConfigKey(
catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE));
if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME)) {
return NO_CATALOG_TYPE;
} else {
return catalogType;
}
} else {
String catalogType = conf.get(CatalogUtil.ICEBERG_CATALOG_TYPE);
if (catalogType != null && catalogType.equals(LOCATION)) {
return NO_CATALOG_TYPE;
} else {
return catalogType;
}
}
}

public static String getCatalogImpl(Configuration conf, String catalogName) {
return Optional.ofNullable(catalogName)
.filter(StringUtils::isNotEmpty)
.map(name -> String.format(CatalogUtils.CATALOG_IMPL_TEMPLATE, name))
.map(conf::get)
.orElse(null);
}

public static boolean assertCatalogType(Configuration conf, Properties props, String expectedType,
String expectedImpl) {
String catalogName = props.getProperty(CATALOG_NAME);
String catalogType = Optional.ofNullable(CatalogUtils.getCatalogType(conf, catalogName))
.orElseGet(() -> CatalogUtils.getCatalogType(conf, ICEBERG_DEFAULT_CATALOG_NAME));

if (catalogType != null) {
return expectedType.equalsIgnoreCase(catalogType);
}

String actualImpl = CatalogUtils.getCatalogProperties(conf, catalogName).get(CatalogProperties.CATALOG_IMPL);

// Return true immediately if the strings are equal (this also handles both being null).
if (StringUtils.equals(expectedImpl, actualImpl)) {
return true;
}

// If they are not equal, but one of them is null, they can't be subtypes.
if (expectedImpl == null || actualImpl == null) {
return false;
}

// Now that we know both are non-null and not equal, check the class hierarchy.
try {
return Class.forName(expectedImpl).isAssignableFrom(Class.forName(actualImpl));
} catch (ClassNotFoundException e) {
throw new RuntimeException(String.format("Error checking if catalog %s is subtype of %s",
catalogName, expectedImpl), e);
}
}
}
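A short usage sketch of the new helper (assuming the usual org.apache.hadoop.conf and org.apache.iceberg imports); the catalog name and values are hypothetical:

Configuration conf = new Configuration();
conf.set("iceberg.catalog.my_hive.type", "hive");                        // catalog-specific property
conf.set("iceberg.catalog.my_hive.warehouse", "s3://bucket/warehouse");  // catalog-specific property
conf.set("iceberg.catalog-default.clients", "4");                        // default applied to every catalog

// Prefixes are stripped, so this yields {type=hive, warehouse=s3://bucket/warehouse, clients=4}.
Map<String, String> props = CatalogUtils.getCatalogProperties(conf, "my_hive");

String type = CatalogUtils.getCatalogType(conf, "my_hive");  // "hive"
String key = CatalogUtils.catalogPropertyConfigKey("my_hive", CatalogProperties.WAREHOUSE_LOCATION);
// key == "iceberg.catalog.my_hive.warehouse"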
@@ -22,17 +22,20 @@
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hive.iceberg.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
@@ -42,6 +45,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.JsonUtil;
import org.apache.parquet.Strings;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +55,7 @@
public class HMSTablePropertyHelper {
private static final Logger LOG = LoggerFactory.getLogger(HMSTablePropertyHelper.class);
public static final String HIVE_ICEBERG_STORAGE_HANDLER = "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
public static final String PARTITION_SPEC = "iceberg.mr.table.partition.spec";

private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of(
// gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things
Expand All @@ -63,7 +68,7 @@ private HMSTablePropertyHelper() {

/**
* Provides key translation where necessary between Iceberg and HMS props. This translation is needed because some
* properties control the same behaviour but are named differently in Iceberg and Hive. Therefore changes to these
* properties control the same behaviour but are named differently in Iceberg and Hive. Therefore, changes to these
* property pairs should be synchronized.
*
* Example: Deleting data files upon DROP TABLE is enabled using gc.enabled=true in Iceberg and
@@ -131,6 +136,12 @@ public static void updateHmsTableForIcebergTable(
tbl.setParameters(parameters);
}

public static SortOrder getSortOrder(Properties props, Schema schema) {
String sortOrderJsonString = props.getProperty(TableProperties.DEFAULT_SORT_ORDER);
return Strings.isNullOrEmpty(sortOrderJsonString) ? SortOrder.unsorted() : SortOrderParser.fromJson(schema,
sortOrderJsonString);
}

private static void setCommonParameters(
String newMetadataLocation,
String uuid,
@@ -143,8 +154,9 @@ private static void setCommonParameters(
if (uuid != null) {
parameters.put(TableProperties.UUID, uuid);
}

obsoleteProps.forEach(parameters::remove);
if (obsoleteProps != null) {
obsoleteProps.forEach(parameters::remove);
}

parameters.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, tableType);
parameters.put(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, newMetadataLocation);
@@ -158,7 +170,7 @@ private static void setCommonParameters(

@VisibleForTesting
static void setStorageHandler(Map<String, String> parameters, boolean hiveEngineEnabled) {
// If needed set the 'storage_handler' property to enable query from Hive
// If needed, set the 'storage_handler' property to enable query from Hive
if (hiveEngineEnabled) {
parameters.put(hive_metastoreConstants.META_TABLE_STORAGE, HIVE_ICEBERG_STORAGE_HANDLER);
} else {
@@ -209,6 +221,16 @@ static void setPartitionSpec(TableMetadata metadata, Map<String, String> paramet
}
}

public static PartitionSpec getPartitionSpec(Map<String, String> props, Schema schema) {
String specJson = props.getOrDefault(
PARTITION_SPEC,
props.get(TableProperties.DEFAULT_PARTITION_SPEC)
);
return Optional.ofNullable(specJson)
.map(spec -> PartitionSpecParser.fromJson(schema, spec))
.orElseGet(PartitionSpec::unpartitioned);
}

@VisibleForTesting
static void setSortOrder(TableMetadata metadata, Map<String, String> parameters, long maxHiveTablePropertySize) {
parameters.remove(TableProperties.DEFAULT_SORT_ORDER);
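A quick sketch of the new getPartitionSpec and getSortOrder helpers (assuming the usual org.apache.iceberg and org.apache.iceberg.types imports); the schema and partition spec are hypothetical, and the spec JSON is produced with PartitionSpecParser just to round-trip a valid value:

Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.required(2, "ts", Types.TimestampType.withZone()));

Map<String, String> props = Maps.newHashMap();
props.put(HMSTablePropertyHelper.PARTITION_SPEC,
    PartitionSpecParser.toJson(PartitionSpec.builderFor(schema).day("ts").build()));

// Reads PARTITION_SPEC first, falls back to TableProperties.DEFAULT_PARTITION_SPEC,
// and returns PartitionSpec.unpartitioned() if neither is present.
PartitionSpec spec = HMSTablePropertyHelper.getPartitionSpec(props, schema);

// No default sort order property is set here, so this returns SortOrder.unsorted().
SortOrder order = HMSTablePropertyHelper.getSortOrder(new Properties(), schema);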