From d5c2caae53c9a7289369f29ccc9074a145dba93f Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Mon, 18 Nov 2024 11:16:44 +0800 Subject: [PATCH] [feat](catalog)Support Pre-Execution Authentication for HMS Type Iceberg Catalog Operations. (#43445) ### What problem does this PR solve? Support Pre-Execution Authentication for HMS Type Iceberg Catalog Operations Summary This PR introduces a new utility class, PreExecutionAuthenticator, which is designed to ensure pre-execution authentication for HMS (Hive Metastore) type operations on Iceberg catalogs. This is especially useful in environments where secure access is required, such as Kerberos-based Hadoop ecosystems. By integrating PreExecutionAuthenticator, each relevant operation will undergo an authentication step prior to execution, maintaining security compliance. ### Motivation In environments utilizing an Iceberg catalog with an HMS backend, many operations may require authentication to access secure data or perform privileged tasks. Given that operations on HMS-type catalogs typically run within a Hadoop environment secured by Kerberos, ensuring each operation is executed within an authenticated context is essential. Previously, there was no standardized mechanism to enforce pre-execution authentication, which led to potential security gaps. This PR aims to address this issue by introducing an extensible authentication utility. ### Key Changes Addition of PreExecutionAuthenticator Utility Class Provides a standard way to perform pre-execution authentication for tasks. Leverages HadoopAuthenticator (when available) to execute tasks within a privileged context using doAs. Supports execution with or without authentication, enabling flexibility for both secure and non-secure environments. Integration with Iceberg Catalog Operations All relevant HMS-type catalog operations will now use PreExecutionAuthenticator to perform pre-execution authentication. Ensures that operations like createDb, dropDb, and other privileged tasks are executed only after authentication. Extensible Design PreExecutionAuthenticator is adaptable to other future authentication methods, if needed, beyond Hadoop and Kerberos. CallableToPrivilegedExceptionActionAdapter class allows any Callable task to be executed within a PrivilegedExceptionAction, making it versatile for various task types. ### Check List (For Author) - Test - [x] Manual test (add detailed scripts or steps below) ``` mysql> CREATE TABLE ha -> ( -> vendor_id BIGINT, -> trip_id BIGINT, -> trip_distance FLOAT, -> fare_amount DOUBLE, -> store_and_fwd_flag STRING, -> ts DATETIME -> ); Query OK, 0 rows affected (2.08 sec) mysql> show create table ha; +-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Table | Create Table | +-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | ha | CREATE TABLE `ha` ( `vendor_id` bigint NULL, `trip_id` bigint NULL, `trip_distance` float NULL, `fare_amount` double NULL, `store_and_fwd_flag` text NULL, `ts` datetimev2(6) NULL ) ENGINE=ICEBERG_EXTERNAL_TABLE LOCATION 'xxxxx' PROPERTIES ( "doris.version" = "doris-2.1.6-rc04-67ee7f53e6", "write.parquet.compression-codec" = "zstd" ); mysql> INSERT INTO iceberg.ck_iceberg.ha -> VALUES -> (1, 1000371, 1.8, 15.32, 'N', '2024-01-01 9:15:23'), -> (2, 1000372, 2.5, 22.15, 'N', '2024-01-02 12:10:11'), -> (2, 1000373, 0.9, 9.01, 'N', '2024-01-01 3:25:15'), -> (1, 1000374, 8.4, 42.13, 'Y', '2024-01-03 7:12:33'); Query OK, 4 rows affected (5.10 sec) {'status':'COMMITTED', 'txnId':'35030'} mysql> select * from ha; +-----------+---------+---------------+-------------+--------------------+----------------------------+ | vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | ts | +-----------+---------+---------------+-------------+--------------------+----------------------------+ | 1 | 1000371 | 1.8 | 15.32 | N | 2024-01-01 09:15:23.000000 | | 2 | 1000372 | 2.5 | 22.15 | N | 2024-01-02 12:10:11.000000 | | 2 | 1000373 | 0.9 | 9.01 | N | 2024-01-01 03:25:15.000000 | | 1 | 1000374 | 8.4 | 42.13 | Y | 2024-01-03 07:12:33.000000 | +-----------+---------+---------------+-------------+--------------------+----------------------------+ 4 rows in set (1.20 sec) ``` --- .../PreExecutionAuthenticator.java | 116 ++++++++++++++++++ .../iceberg/IcebergExternalCatalog.java | 4 + .../iceberg/IcebergHMSExternalCatalog.java | 7 ++ .../iceberg/IcebergMetadataOps.java | 64 +++++++++- .../iceberg/IcebergTransaction.java | 23 ++-- .../iceberg/IcebergTransactionTest.java | 1 + 6 files changed, 204 insertions(+), 11 deletions(-) create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java new file mode 100644 index 00000000000000..6260833b7db558 --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.security.authentication; + +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.Callable; + +/** + * PreExecutionAuthenticator is a utility class that ensures specified tasks + * are executed with necessary authentication, particularly useful for systems + * like Hadoop that require Kerberos-based pre-execution authentication. + * + *

If a HadoopAuthenticator is provided, this class will execute tasks + * within a privileged context using Hadoop's authentication mechanisms + * (such as Kerberos). Otherwise, it will execute tasks normally. + */ +public class PreExecutionAuthenticator { + + private HadoopAuthenticator hadoopAuthenticator; + + /** + * Default constructor for PreExecutionAuthenticator. + * This allows setting the HadoopAuthenticator at a later point if needed. + */ + public PreExecutionAuthenticator() { + } + + /** + * Executes the specified task with necessary authentication. + *

If a HadoopAuthenticator is set, the task will be executed within a + * privileged context using the doAs method. If no authenticator is present, + * the task will be executed directly. + * + * @param task The task to execute, represented as a Callable + * @param The type of the result returned by the task + * @return The result of the executed task + * @throws Exception If an exception occurs during task execution + */ + public T execute(Callable task) throws Exception { + if (hadoopAuthenticator != null) { + // Adapts Callable to PrivilegedExceptionAction for use with Hadoop authentication + PrivilegedExceptionAction action = new CallableToPrivilegedExceptionActionAdapter<>(task); + return hadoopAuthenticator.doAs(action); + } else { + // Executes the task directly if no authentication is needed + return task.call(); + } + } + + /** + * Retrieves the current HadoopAuthenticator. + *

This allows checking if a HadoopAuthenticator is configured or + * changing it at runtime. + * + * @return The current HadoopAuthenticator instance, or null if none is set + */ + public HadoopAuthenticator getHadoopAuthenticator() { + return hadoopAuthenticator; + } + + /** + * Sets the HadoopAuthenticator, enabling pre-execution authentication + * for tasks requiring privileged access. + * + * @param hadoopAuthenticator An instance of HadoopAuthenticator to be used + */ + public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) { + this.hadoopAuthenticator = hadoopAuthenticator; + } + + /** + * Adapter class to convert a Callable into a PrivilegedExceptionAction. + *

This is necessary to run the task within a privileged context, + * particularly for Hadoop operations with Kerberos. + * + * @param The type of result returned by the action + */ + public class CallableToPrivilegedExceptionActionAdapter implements PrivilegedExceptionAction { + private final Callable callable; + + /** + * Constructs an adapter that wraps a Callable into a PrivilegedExceptionAction. + * + * @param callable The Callable to be adapted + */ + public CallableToPrivilegedExceptionActionAdapter(Callable callable) { + this.callable = callable; + } + + /** + * Executes the wrapped Callable as a PrivilegedExceptionAction. + * + * @return The result of the callable's call method + * @throws Exception If an exception occurs during callable execution + */ + @Override + public T run() throws Exception { + return callable.call(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 6f79afd5de5d7f..d8dfd1c128f162 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; @@ -42,6 +43,8 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { protected String icebergCatalogType; protected Catalog catalog; + protected PreExecutionAuthenticator preExecutionAuthenticator; + public IcebergExternalCatalog(long catalogId, String name, String comment) { super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment); } @@ -51,6 +54,7 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) { @Override protected void initLocalObjectsImpl() { + preExecutionAuthenticator = new PreExecutionAuthenticator(); initCatalog(); IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog); transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java index 51d39357b816fa..c5a99c157ce8e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java @@ -17,6 +17,8 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.property.PropertyConverter; @@ -35,6 +37,11 @@ public IcebergHMSExternalCatalog(long catalogId, String name, String resource, M protected void initCatalog() { icebergCatalogType = ICEBERG_HMS; catalog = IcebergUtils.createIcebergHiveCatalog(this, getName()); + if (preExecutionAuthenticator.getHadoopAuthenticator() == null) { + AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration()); + HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(config); + preExecutionAuthenticator.setHadoopAuthenticator(authenticator); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index 8f1c3741c51cf7..2fd62734dc36ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -28,6 +28,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.datasource.DorisTypeVisitor; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalDatabase; @@ -53,11 +54,14 @@ public class IcebergMetadataOps implements ExternalMetadataOps { protected Catalog catalog; protected IcebergExternalCatalog dorisCatalog; protected SupportsNamespaces nsCatalog; + private PreExecutionAuthenticator preExecutionAuthenticator; public IcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) { this.dorisCatalog = dorisCatalog; this.catalog = catalog; nsCatalog = (SupportsNamespaces) catalog; + this.preExecutionAuthenticator = dorisCatalog.preExecutionAuthenticator; + } public Catalog getCatalog() { @@ -82,9 +86,13 @@ public boolean databaseExist(String dbName) { } public List listDatabaseNames() { - return nsCatalog.listNamespaces().stream() - .map(e -> e.toString()) - .collect(Collectors.toList()); + try { + return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces().stream() + .map(Namespace::toString) + .collect(Collectors.toList())); + } catch (Exception e) { + throw new RuntimeException("Failed to list database names, error message is: " + e.getMessage()); + } } @Override @@ -95,6 +103,19 @@ public List listTableNames(String dbName) { @Override public void createDb(CreateDbStmt stmt) throws DdlException { + try { + preExecutionAuthenticator.execute(() -> { + performCreateDb(stmt); + return null; + + }); + } catch (Exception e) { + throw new DdlException("Failed to create database: " + + stmt.getFullDbName() + " ,error message is: " + e.getMessage()); + } + } + + private void performCreateDb(CreateDbStmt stmt) throws DdlException { SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; String dbName = stmt.getFullDbName(); Map properties = stmt.getProperties(); @@ -109,7 +130,7 @@ public void createDb(CreateDbStmt stmt) throws DdlException { String icebergCatalogType = dorisCatalog.getIcebergCatalogType(); if (!properties.isEmpty() && !IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) { throw new DdlException( - "Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType); + "Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType); } nsCatalog.createNamespace(Namespace.of(dbName), properties); dorisCatalog.onRefreshCache(true); @@ -117,6 +138,17 @@ public void createDb(CreateDbStmt stmt) throws DdlException { @Override public void dropDb(DropDbStmt stmt) throws DdlException { + try { + preExecutionAuthenticator.execute(() -> { + preformDropDb(stmt); + return null; + }); + } catch (Exception e) { + throw new DdlException("Failed to drop database: " + stmt.getDbName() + " ,error message is: ", e); + } + } + + private void preformDropDb(DropDbStmt stmt) throws DdlException { String dbName = stmt.getDbName(); if (!databaseExist(dbName)) { if (stmt.isSetIfExists()) { @@ -133,6 +165,15 @@ public void dropDb(DropDbStmt stmt) throws DdlException { @Override public boolean createTable(CreateTableStmt stmt) throws UserException { + try { + preExecutionAuthenticator.execute(() -> performCreateTable(stmt)); + } catch (Exception e) { + throw new DdlException("Failed to create table: " + stmt.getTableName() + " ,error message is:", e); + } + return false; + } + + public boolean performCreateTable(CreateTableStmt stmt) throws UserException { String dbName = stmt.getDbName(); ExternalDatabase db = dorisCatalog.getDbNullable(dbName); if (db == null) { @@ -165,6 +206,17 @@ public boolean createTable(CreateTableStmt stmt) throws UserException { @Override public void dropTable(DropTableStmt stmt) throws DdlException { + try { + preExecutionAuthenticator.execute(() -> { + performDropTable(stmt); + return null; + }); + } catch (Exception e) { + throw new DdlException("Failed to drop table: " + stmt.getTableName() + " ,error message is:", e); + } + } + + private void performDropTable(DropTableStmt stmt) throws DdlException { String dbName = stmt.getDbName(); String tableName = stmt.getTableName(); ExternalDatabase db = dorisCatalog.getDbNullable(dbName); @@ -193,4 +245,8 @@ public void dropTable(DropTableStmt stmt) throws DdlException { public void truncateTable(String dbName, String tblName, List partitions) { throw new UnsupportedOperationException("Truncate Iceberg table is not supported."); } + + public PreExecutionAuthenticator getPreExecutionAuthenticator() { + return preExecutionAuthenticator; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index c198b58b2a96bd..685915025d665e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -82,14 +82,23 @@ public void finishInsert(SimpleTableInfo tableInfo, Optional { + //create and start the iceberg transaction + TUpdateMode updateMode = TUpdateMode.APPEND; + if (insertCtx.isPresent()) { + updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite() + ? TUpdateMode.OVERWRITE + : TUpdateMode.APPEND; + } + updateManifestAfterInsert(updateMode); + return null; + }); + } catch (Exception e) { + LOG.warn("Failed to finish insert for iceberg table {}.", tableInfo, e); + throw new RuntimeException(e); } - updateManifestAfterInsert(updateMode); + } private void updateManifestAfterInsert(TUpdateMode updateMode) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java index 66c3ea197101d1..79f7d5b5ad6555 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java @@ -87,6 +87,7 @@ private void createCatalog() throws IOException { hadoopCatalog.setConf(new Configuration()); hadoopCatalog.initialize("df", props); this.externalCatalog = new IcebergHMSExternalCatalog(1L, "iceberg", "", Maps.newHashMap(), ""); + externalCatalog.initLocalObjectsImpl(); new MockUp() { @Mock public Catalog getCatalog() {