From e142a2db0c52826f3db73937ad2b33a37449d88d Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Mon, 21 Oct 2024 23:22:05 +0800 Subject: [PATCH 1/2] [Configuration](transactional-hive) Add `skip_checking_acid_version_file` session var to skip checking acid version file in some hive envs. (#42111) [Configuration] (transactional-hive) Add `skip_checking_acid_version_file` session var to skip checking acid version file in some hive envs. --- .../create_preinstalled_scripts/run25.hql | 7 ---- .../datasource/hive/HiveMetaStoreCache.java | 40 ++++++++++--------- .../datasource/hive/source/HiveScanNode.java | 5 ++- .../org/apache/doris/qe/SessionVariable.java | 15 +++++++ .../hive/test_transactional_hive.groovy | 11 ++++- 5 files changed, 50 insertions(+), 28 deletions(-) diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql index 2cf7197de95775..814df4cdc5ff90 100755 --- a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql +++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql @@ -1,4 +1,3 @@ --- Currently docker is hive 2.x version. Hive 2.x versioned full-acid tables need to run major compaction. SET hive.support.concurrency=true; SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; @@ -25,8 +24,6 @@ insert into orc_full_acid values update orc_full_acid set value = 'CC' where id = 3; -alter table orc_full_acid compact 'major'; - create table orc_full_acid_par (id INT, value STRING) PARTITIONED BY (part_col INT) CLUSTERED BY (id) INTO 3 BUCKETS @@ -44,7 +41,3 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values (6, 'F'); update orc_full_acid_par set value = 'BB' where id = 2; - -alter table orc_full_acid_par PARTITION(part_col=20230101) compact 'major'; -alter table orc_full_acid_par PARTITION(part_col=20230102) compact 'major'; - diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index bae5978cffb682..8a300c1e579dfa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -738,7 +738,7 @@ public LoadingCache getPartitionCache() { } public List getFilesByTransaction(List partitions, ValidWriteIdList validWriteIds, - boolean isFullAcid, long tableId, String bindBrokerName) { + boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) { List fileCacheValues = Lists.newArrayList(); String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME); try { @@ -772,25 +772,27 @@ public List getFilesByTransaction(List partitions if (baseOrDeltaPath == null) { return Collections.emptyList(); } - String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString(); - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(), - bindBrokerName), - catalog.getCatalogProperty().getProperties(), - bindBrokerName, jobConf)); - Status status = fs.exists(acidVersionPath); - if (status != Status.OK) { - if (status.getErrCode() == ErrCode.NOT_FOUND) { - acidVersion = 0; - } else { - throw new Exception(String.format("Failed to check remote path {} exists.", - acidVersionPath)); + if (!skipCheckingAcidVersionFile) { + String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString(); + RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( + new FileSystemCache.FileSystemCacheKey( + LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(), + bindBrokerName), + catalog.getCatalogProperty().getProperties(), + bindBrokerName, jobConf)); + Status status = fs.exists(acidVersionPath); + if (status != Status.OK) { + if (status.getErrCode() == ErrCode.NOT_FOUND) { + acidVersion = 0; + } else { + throw new Exception(String.format("Failed to check remote path {} exists.", + acidVersionPath)); + } + } + if (acidVersion == 0 && !directory.getCurrentDirectories().isEmpty()) { + throw new Exception( + "Hive 2.x versioned full-acid tables need to run major compaction."); } - } - if (acidVersion == 0 && !directory.getCurrentDirectories().isEmpty()) { - throw new Exception( - "Hive 2.x versioned full-acid tables need to run major compaction."); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 1dbcd5064f30ef..f17de4bfe0a116 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -90,6 +90,8 @@ public class HiveScanNode extends FileQueryScanNode { private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT); private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION); + private boolean skipCheckingAcidVersionFile = false; + /** * * External file scan node for Query Hive table * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv @@ -117,6 +119,7 @@ protected void doInitialize() throws UserException { this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()), ConnectContext.get().getQualifiedUser(), hmsTable, hmsTable.isFullAcidTable()); Env.getCurrentHiveTransactionMgr().register(hiveTransaction); + skipCheckingAcidVersionFile = ConnectContext.get().getSessionVariable().skipCheckingAcidVersionFile; } } @@ -343,7 +346,7 @@ private List getFileSplitByTransaction(HiveMetaStoreCache cache, ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds( ((HMSExternalCatalog) hmsTable.getCatalog()).getClient()); return cache.getFilesByTransaction(partitions, validWriteIds, - hiveTransaction.isFullAcid(), hmsTable.getId(), bindBrokerName); + hiveTransaction.isFullAcid(), skipCheckingAcidVersionFile, hmsTable.getId(), bindBrokerName); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index b817125f6810c4..5b2971125a2030 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -642,6 +642,15 @@ public class SessionVariable implements Serializable, Writable { "adaptive_pipeline_task_serial_read_on_limit"; public static final String REQUIRE_SEQUENCE_IN_INSERT = "require_sequence_in_insert"; +<<<<<<< HEAD +======= + public static final String ENABLE_PHRASE_QUERY_SEQUENYIAL_OPT = "enable_phrase_query_sequential_opt"; + + public static final String ENABLE_COOLDOWN_REPLICA_AFFINITY = + "enable_cooldown_replica_affinity"; + public static final String SKIP_CHECKING_ACID_VERSION_FILE = "skip_checking_acid_version_file"; + +>>>>>>> f46946eef2 ([Configuration](transactional-hive) Add `skip_checking_acid_version_file` session var to skip checking acid version file in some hive envs. (#42111)) /** * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources. */ @@ -2111,6 +2120,12 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) { }) public boolean requireSequenceInInsert = true; + @VariableMgr.VarAttr(name = SKIP_CHECKING_ACID_VERSION_FILE, needForward = true, description = { + "跳过检查 transactional hive 版本文件 '_orc_acid_version.'", + "Skip checking transactional hive version file '_orc_acid_version.'" + }) + public boolean skipCheckingAcidVersionFile = false; + public void setEnableEsParallelScroll(boolean enableESParallelScroll) { this.enableESParallelScroll = enableESParallelScroll; } diff --git a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy index 305c1f6615b2c5..dbe20395ec95ec 100644 --- a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy +++ b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy @@ -16,7 +16,10 @@ // under the License. suite("test_transactional_hive", "p0,external,hive,external_docker,external_docker_hive") { + String skip_checking_acid_version_file = "false" + def q01 = { + sql """set skip_checking_acid_version_file=${skip_checking_acid_version_file}""" qt_q01 """ select * from orc_full_acid order by id; """ @@ -32,6 +35,7 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock } def q01_par = { + sql """set skip_checking_acid_version_file=${skip_checking_acid_version_file}""" qt_q01 """ select * from orc_full_acid_par order by id; """ @@ -54,7 +58,7 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock return; } - for (String hivePrefix : ["hive2", "hive3"]) { + for (String hivePrefix : ["hive3"]) { try { String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") String catalog_name = "test_transactional_${hivePrefix}" @@ -67,6 +71,11 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_dock );""" sql """use `${catalog_name}`.`default`""" + skip_checking_acid_version_file = "false" + q01() + q01_par() + + skip_checking_acid_version_file = "true" q01() q01_par() From 9c66a2439292d3dbf3d16c6a14869c64bba4d449 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 30 Oct 2024 23:04:21 +0800 Subject: [PATCH 2/2] 1 --- .../src/main/java/org/apache/doris/qe/SessionVariable.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 5b2971125a2030..9946108bec5c95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -642,15 +642,8 @@ public class SessionVariable implements Serializable, Writable { "adaptive_pipeline_task_serial_read_on_limit"; public static final String REQUIRE_SEQUENCE_IN_INSERT = "require_sequence_in_insert"; -<<<<<<< HEAD -======= - public static final String ENABLE_PHRASE_QUERY_SEQUENYIAL_OPT = "enable_phrase_query_sequential_opt"; - - public static final String ENABLE_COOLDOWN_REPLICA_AFFINITY = - "enable_cooldown_replica_affinity"; public static final String SKIP_CHECKING_ACID_VERSION_FILE = "skip_checking_acid_version_file"; ->>>>>>> f46946eef2 ([Configuration](transactional-hive) Add `skip_checking_acid_version_file` session var to skip checking acid version file in some hive envs. (#42111)) /** * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources. */