File 1: Hive SQL test-data script (path not shown in this view)
@@ -1,4 +1,3 @@
--- Currently docker is hive 2.x version. Hive 2.x versioned full-acid tables need to run major compaction.
 SET hive.support.concurrency=true;
 SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 
@@ -25,8 +24,6 @@ insert into orc_full_acid values
 
 update orc_full_acid set value = 'CC' where id = 3;
 
-alter table orc_full_acid compact 'major';
-
 create table orc_full_acid_par (id INT, value STRING)
 PARTITIONED BY (part_col INT)
 CLUSTERED BY (id) INTO 3 BUCKETS
@@ -44,7 +41,3 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values
 (6, 'F');
 
 update orc_full_acid_par set value = 'BB' where id = 2;
-
-alter table orc_full_acid_par PARTITION(part_col=20230101) compact 'major';
-alter table orc_full_acid_par PARTITION(part_col=20230102) compact 'major';
-
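Note: the dropped comment and compaction statements only matter when the docker environment runs Hive 2.x. On a Hive 2.x cluster, the full-acid tables above would still need a major compaction before Doris can read them, using exactly the statements removed here:

alter table orc_full_acid compact 'major';
alter table orc_full_acid_par PARTITION(part_col=20230101) compact 'major';
alter table orc_full_acid_par PARTITION(part_col=20230102) compact 'major';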
File 2: HiveMetaStoreCache.java
@@ -744,7 +744,7 @@ public LoadingCache<PartitionCacheKey, HivePartition> getPartitionCache() {
     }
 
     public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
-            boolean isFullAcid, long tableId, String bindBrokerName) {
+            boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
         String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME);
         try {
@@ -778,25 +778,27 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                 if (baseOrDeltaPath == null) {
                     return Collections.emptyList();
                 }
-                String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
-                RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                        new FileSystemCache.FileSystemCacheKey(
-                                LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
-                                        bindBrokerName),
-                                catalog.getCatalogProperty().getProperties(),
-                                bindBrokerName, jobConf));
-                Status status = fs.exists(acidVersionPath);
-                if (status != Status.OK) {
-                    if (status.getErrCode() == ErrCode.NOT_FOUND) {
-                        acidVersion = 0;
-                    } else {
-                        throw new Exception(String.format("Failed to check remote path {} exists.",
-                                acidVersionPath));
-                    }
-                }
-                if (acidVersion == 0 && !directory.getCurrentDirectories().isEmpty()) {
-                    throw new Exception(
-                            "Hive 2.x versioned full-acid tables need to run major compaction.");
+                if (!skipCheckingAcidVersionFile) {
+                    String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
+                    RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
+                            new FileSystemCache.FileSystemCacheKey(
+                                    LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
+                                            bindBrokerName),
+                                    catalog.getCatalogProperty().getProperties(),
+                                    bindBrokerName, jobConf));
+                    Status status = fs.exists(acidVersionPath);
+                    if (status != Status.OK) {
+                        if (status.getErrCode() == ErrCode.NOT_FOUND) {
+                            acidVersion = 0;
+                        } else {
+                            throw new Exception(String.format("Failed to check remote path {} exists.",
+                                    acidVersionPath));
+                        }
+                    }
+                    if (acidVersion == 0 && !directory.getCurrentDirectories().isEmpty()) {
+                        throw new Exception(
+                                "Hive 2.x versioned full-acid tables need to run major compaction.");
+                    }
                 }
             }

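The effect of the change: the existence probe for the _orc_acid_version marker file (one fs.exists() round-trip per base/delta directory) and the follow-on Hive 2.x guard are now both gated behind the new session variable. A minimal SQL sketch of the intended behavior, assuming a full-acid table such as orc_full_acid from the test data:

-- default (skip_checking_acid_version_file = false): the marker file is probed,
-- and an uncompacted Hive 2.x table fails with
-- "Hive 2.x versioned full-acid tables need to run major compaction."
select * from orc_full_acid order by id;

-- skip the probe for this session; this saves the filesystem round-trip,
-- but an uncompacted Hive 2.x table is no longer rejected up front
set skip_checking_acid_version_file=true;
select * from orc_full_acid order by id;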
File 3: HiveScanNode.java
@@ -91,6 +91,8 @@ public class HiveScanNode extends FileQueryScanNode {
     private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT);
     private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION);
 
+    private boolean skipCheckingAcidVersionFile = false;
+
     /**
      * * External file scan node for Query Hive table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -118,6 +120,7 @@ protected void doInitialize() throws UserException {
             this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
                     ConnectContext.get().getQualifiedUser(), hmsTable, hmsTable.isFullAcidTable());
             Env.getCurrentHiveTransactionMgr().register(hiveTransaction);
+            skipCheckingAcidVersionFile = ConnectContext.get().getSessionVariable().skipCheckingAcidVersionFile;
         }
     }
 
@@ -374,7 +377,7 @@ private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache,
         ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
                 ((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
         return cache.getFilesByTransaction(partitions, validWriteIds,
-                hiveTransaction.isFullAcid(), hmsTable.getId(), bindBrokerName);
+                hiveTransaction.isFullAcid(), skipCheckingAcidVersionFile, hmsTable.getId(), bindBrokerName);
     }
 
     @Override
File 4: SessionVariable.java
@@ -662,6 +662,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String REQUIRE_SEQUENCE_IN_INSERT = "require_sequence_in_insert";
 
+    public static final String SKIP_CHECKING_ACID_VERSION_FILE = "skip_checking_acid_version_file";
+
     /**
      * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources.
      */
@@ -2165,6 +2167,13 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
     })
     public boolean requireSequenceInInsert = true;
 
+
+    @VariableMgr.VarAttr(name = SKIP_CHECKING_ACID_VERSION_FILE, needForward = true, description = {
+            "跳过检查 transactional hive 版本文件 '_orc_acid_version.'",
+            "Skip checking transactional hive version file '_orc_acid_version.'"
+    })
+    public boolean skipCheckingAcidVersionFile = false;
+
     public void setEnableEsParallelScroll(boolean enableESParallelScroll) {
         this.enableESParallelScroll = enableESParallelScroll;
     }
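The variable follows the usual Doris session-variable pattern; needForward = true means the setting is forwarded to the master FE. A minimal usage sketch (the GLOBAL form is the standard way to persist a session variable for new sessions, shown here as an assumption rather than something this diff exercises):

set skip_checking_acid_version_file = true;
set global skip_checking_acid_version_file = true;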
File 5: test_transactional_hive regression suite (Groovy)
@@ -16,7 +16,10 @@
 // under the License.
 
 suite("test_transactional_hive", "p0,external,hive,external_docker,external_docker_hive") {
+    String skip_checking_acid_version_file = "false"
+
     def q01 = {
+        sql """set skip_checking_acid_version_file=${skip_checking_acid_version_file}"""
         qt_q01 """
         select * from orc_full_acid order by id;
         """
@@ -32,6 +35,7 @@
     }
 
     def q01_par = {
+        sql """set skip_checking_acid_version_file=${skip_checking_acid_version_file}"""
         qt_q01 """
         select * from orc_full_acid_par order by id;
         """
@@ -54,7 +58,7 @@
         return;
     }
 
-    for (String hivePrefix : ["hive2", "hive3"]) {
+    for (String hivePrefix : ["hive3"]) {
         try {
             String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
             String catalog_name = "test_transactional_${hivePrefix}"
@@ -67,6 +71,11 @@
             );"""
             sql """use `${catalog_name}`.`default`"""
 
+            skip_checking_acid_version_file = "false"
             q01()
             q01_par()
+
+            skip_checking_acid_version_file = "true"
+            q01()
+            q01_par()
 