@@ -213,7 +213,10 @@ public Status listDirectories(String remotePath, Set<String> result) {
try {
S3URI s3Uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
String bucket = s3Uri.getBucket();
String prefix = s3Uri.getKey();
String key = s3Uri.getKey();
String schemeAndBucket = remotePath.substring(0, remotePath.length() - key.length());

String prefix = key.endsWith("/") ? key : key + "/";
ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder()
.bucket(bucket)
.prefix(prefix)
@@ -228,7 +231,7 @@ public Status listDirectories(String remotePath, Set<String> result) {
ListObjectsV2Response response = getClient().listObjectsV2(requestBuilder.build());

for (CommonPrefix dir : response.commonPrefixes()) {
result.add("s3://" + bucket + "/" + dir.prefix());
result.add(schemeAndBucket + dir.prefix());
}
continuationToken = response.nextContinuationToken();
} while (continuationToken != null);
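
A minimal sketch (not part of the diff) of what the new prefix handling does, using hypothetical values; the point is that the scheme and bucket of the caller's remotePath are now preserved instead of the previously hard-coded "s3://":

// Hypothetical values chosen only to illustrate the substring logic above.
String remotePath = "oss://my-bucket/warehouse/db";        // caller's path
String key = "warehouse/db";                               // what s3Uri.getKey() would return
String schemeAndBucket = remotePath.substring(0, remotePath.length() - key.length());
// schemeAndBucket == "oss://my-bucket/"
String prefix = key.endsWith("/") ? key : key + "/";       // "warehouse/db/"
// Each CommonPrefix returned by ListObjectsV2 is then reported as
// schemeAndBucket + dir.prefix(), e.g. "oss://my-bucket/warehouse/db/sub_dir/".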
@@ -183,6 +183,103 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") {
assert dropResult.size() == 0
}

/*--------test insert overwrite---------*/
def testInsertOverwrite = { String catalogProperties, String prefix, String dbLocation ->
def catalog_name = "${prefix}_catalog"
sql """
DROP CATALOG IF EXISTS ${catalog_name};
"""
sql """
CREATE CATALOG IF NOT EXISTS ${catalog_name} PROPERTIES (
${catalogProperties}
);
"""
sql """
switch ${catalog_name};
"""

def db_name = prefix + "_db" + System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(1000)
sql """
DROP DATABASE IF EXISTS ${db_name} FORCE;
"""
sql """
CREATE DATABASE IF NOT EXISTS ${db_name}
PROPERTIES ('location'='${dbLocation}');
"""

def dbResult = sql """
show databases like "${db_name}";
"""
assert dbResult.size() == 1

sql """
use ${db_name};
"""

def table_name = prefix + ThreadLocalRandom.current().nextInt(1000) + "_overwrite_table"

// Create non-partitioned table for insert overwrite test
sql """
CREATE TABLE ${table_name} (
id INT COMMENT 'id',
name VARCHAR(20) COMMENT 'name',
age INT COMMENT 'age'
) ENGINE=hive
PROPERTIES (
'file_format'='parquet'
);
"""

// Test 1: Initial insert
sql """
insert into ${table_name} values (1, 'alice', 20), (2, 'bob', 25);
"""
def result1 = sql """
SELECT COUNT(*) FROM ${table_name};
"""
assert result1[0][0] == 2

// Test 2: Insert overwrite - should replace all data
sql """
insert overwrite table ${table_name} values (3, 'charlie', 30);
"""
def result2 = sql """
SELECT * FROM ${table_name};
"""
assert result2.size() == 1
assert result2[0][0] == 3

// Test 3: Another insert overwrite with multiple rows
sql """
insert overwrite table ${table_name} values (4, 'david', 35), (5, 'eve', 28), (6, 'frank', 40);
"""
def result3 = sql """
SELECT COUNT(*) FROM ${table_name};
"""
assert result3[0][0] == 3

// Test 4: Verify data integrity after overwrite
def result4 = sql """
SELECT * FROM ${table_name} ORDER BY id;
"""
assert result4.size() == 3
assert result4[0][0] == 4
assert result4[1][0] == 5
assert result4[2][0] == 6

sql """
DROP TABLE ${table_name};
"""
sql """
DROP DATABASE ${db_name} FORCE;
"""

def dropResult = sql """
show databases like "${db_name}";
"""
assert dropResult.size() == 0
}

/*--------only execute query---------*/
def testQuery = { String catalog_properties, String prefix, String db_name, String table_name, int data_count ->

@@ -371,15 +468,21 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") {
//OBS - Partition table tests
db_location = "obs://${obs_parent_path}/hive/hms/partition/" + System.currentTimeMillis()
testPartitionTableInsert(hms_properties + obs_storage_properties, "hive_hms_obs_partition_test", db_location)
//OBS - Insert overwrite tests (verifies scheme preservation in listDirectories)
db_location = "obs://${obs_parent_path}/hive/hms/overwrite/" + System.currentTimeMillis()
testInsertOverwrite(hms_properties + obs_storage_properties, "hive_hms_obs_overwrite_test", db_location)
//GCS
if(context.config.otherConfigs.get("enableGCS")){
db_location = "gs://${gcs_parent_path}/hive/hms/" + System.currentTimeMillis()
testQueryAndInsert(hms_properties + gcs_storage_old_properties, "hive_hms_gcs_test", db_location)
testQueryAndInsert(hms_properties + gcs_storage_new_properties, "hive_hms_gcs_test_new", db_location)
testQueryAndInsert(hms_type_properties + hms_kerberos_old_prop + gcs_storage_old_properties, "hive_hms_on_gcs_kerberos_old", db_location)
testQueryAndInsert(hms_type_properties + hms_kerberos_new_prop + gcs_storage_new_properties, "hive_hms_on_gcs_kerberos_new", db_location)
//GCS - Insert overwrite tests
db_location = "gs://${gcs_parent_path}/hive/hms/overwrite/" + System.currentTimeMillis()
testInsertOverwrite(hms_properties + gcs_storage_new_properties, "hive_hms_gcs_overwrite_test", db_location)
}

//COS
db_location = "cosn://${cos_parent_path}/hive/hms/" + System.currentTimeMillis()
testQueryAndInsert(hms_properties + cos_storage_properties, "hive_hms_cos_test", db_location)
@@ -390,6 +493,9 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") {
//COS - Partition table tests
db_location = "cosn://${cos_parent_path}/hive/hms/partition/" + System.currentTimeMillis()
testPartitionTableInsert(hms_properties + cos_storage_properties, "hive_hms_cos_partition_test", db_location)
//COS - Insert overwrite tests
db_location = "cosn://${cos_parent_path}/hive/hms/overwrite/" + System.currentTimeMillis()
testInsertOverwrite(hms_properties + cos_storage_properties, "hive_hms_cos_overwrite_test", db_location)

db_location = "cos://${cos_parent_path}/hive/hms/" + System.currentTimeMillis()
testQueryAndInsert(hms_properties + cos_storage_properties, "hive_hms_cos_test", db_location)
@@ -405,9 +511,12 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") {
db_location = "oss://${oss_parent_path}/hive/hms/partition/" + System.currentTimeMillis()
testPartitionTableInsert(hms_properties + oss_storage_properties, "hive_hms_oss_partition_test", db_location)
testPartitionTableInsert(hms_properties + oss_region_param + oss_storage_properties, "hive_hms_oss_partition_test_region", db_location)
//OSS - Insert overwrite tests
db_location = "oss://${oss_parent_path}/hive/hms/overwrite/" + System.currentTimeMillis()
testInsertOverwrite(hms_properties + oss_storage_properties, "hive_hms_oss_overwrite_test", db_location)

//s3
db_location = "s3a://${s3_parent_path}/hive/hms/"+System.currentTimeMillis()
db_location = "s3a://${s3_parent_path}/hive/hms/"+System.currentTimeMillis()
testQueryAndInsert(hms_properties + s3_storage_properties, "hive_hms_s3_test", db_location)
db_location = "s3a://${s3_parent_path}/hive/hms/"+System.currentTimeMillis()
testQueryAndInsert(hms_properties + s3_region_param + s3_storage_properties, "hive_hms_s3_test_region", db_location)
@@ -419,6 +528,9 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") {
//testQueryAndInsert(hms_type_properties + hms_kerberos_new_prop + s3_storage_properties, "hive_hms_on_s3_kerberos_new",db_location)
db_location = "s3://${s3_parent_path}/hive/hms/"+System.currentTimeMillis()
testQueryAndInsert(hms_properties + s3_storage_properties, "hive_hms_s3_test", db_location)
//S3 - Insert overwrite tests
db_location = "s3://${s3_parent_path}/hive/hms/overwrite/"+System.currentTimeMillis()
testInsertOverwrite(hms_properties + s3_storage_properties, "hive_hms_s3_overwrite_test", db_location)
//HDFS
db_location = "${hdfs_parent_path}/hive/hms/" + System.currentTimeMillis()
testQueryAndInsert(hms_properties + hdfs_properties, "hive_hms_hdfs_test", db_location)
@@ -143,6 +143,178 @@ suite("iceberg_and_hive_on_glue", "p2,external,hive,new_catalog_property") {
assert dropResult.size() == 0
}

/*--------test insert overwrite for hive---------*/
def testInsertOverwrite = { String catalogProperties, String prefix, String dbLocation ->
def catalog_name = "${prefix}_catalog"
sql """
DROP CATALOG IF EXISTS ${catalog_name};
"""
sql """
CREATE CATALOG IF NOT EXISTS ${catalog_name} PROPERTIES (
${catalogProperties}
);
"""
sql """
switch ${catalog_name};
"""

def db_name = prefix + "_db" + System.currentTimeMillis()
sql """
DROP DATABASE IF EXISTS ${db_name} FORCE;
"""
sql """
CREATE DATABASE IF NOT EXISTS ${db_name}
PROPERTIES ('location'='${dbLocation}');
"""

def dbResult = sql """
show databases like "${db_name}";
"""
assert dbResult.size() == 1

sql """
use ${db_name};
"""

def table_name = prefix + "_overwrite_table"

// Create non-partitioned table for insert overwrite test
sql """
CREATE TABLE ${table_name} (
id INT COMMENT 'id',
name VARCHAR(20) COMMENT 'name',
age INT COMMENT 'age'
) ENGINE=hive
PROPERTIES (
'file_format'='parquet'
);
"""

// Test 1: Initial insert
sql """
insert into ${table_name} values (1, 'alice', 20), (2, 'bob', 25);
"""
def result1 = sql """
SELECT COUNT(*) FROM ${table_name};
"""
assert result1[0][0] == 2

// Test 2: Insert overwrite - should replace all data
sql """
insert overwrite table ${table_name} values (3, 'charlie', 30);
"""
def result2 = sql """
SELECT * FROM ${table_name};
"""
assert result2.size() == 1
assert result2[0][0] == 3

// Test 3: Another insert overwrite with multiple rows
sql """
insert overwrite table ${table_name} values (4, 'david', 35), (5, 'eve', 28), (6, 'frank', 40);
"""
def result3 = sql """
SELECT COUNT(*) FROM ${table_name};
"""
assert result3[0][0] == 3

sql """
DROP TABLE ${table_name};
"""
sql """
DROP DATABASE ${db_name} FORCE;
"""

def dropResult = sql """
show databases like "${db_name}";
"""
assert dropResult.size() == 0
}

/*--------test insert overwrite for iceberg---------*/
def testInsertOverwriteIceberg = { String catalogProperties, String prefix ->
def catalog_name = "${prefix}_catalog"
sql """
DROP CATALOG IF EXISTS ${catalog_name};
"""
sql """
CREATE CATALOG IF NOT EXISTS ${catalog_name} PROPERTIES (
${catalogProperties}
);
"""
sql """
switch ${catalog_name};
"""

def db_name = prefix + "_db"
sql """
DROP DATABASE IF EXISTS ${db_name} FORCE;
"""
sql """
CREATE DATABASE IF NOT EXISTS ${db_name};
"""

def dbResult = sql """
show databases like "${db_name}";
"""
assert dbResult.size() == 1

sql """
use ${db_name};
"""

def table_name = prefix + "_overwrite_table"

// Create table for insert overwrite test
sql """
CREATE TABLE ${table_name} (
id INT NOT NULL COMMENT 'id',
name VARCHAR(20) COMMENT 'name',
age INT COMMENT 'age'
);
"""

// Test 1: Initial insert
sql """
insert into ${table_name} values (1, 'alice', 20), (2, 'bob', 25);
"""
def result1 = sql """
SELECT COUNT(*) FROM ${table_name};
"""
assert result1[0][0] == 2

// Test 2: Insert overwrite - should replace all data
sql """
insert overwrite table ${table_name} values (3, 'charlie', 30);
"""
def result2 = sql """
SELECT * FROM ${table_name};
"""
assert result2.size() == 1
assert result2[0][0] == 3

// Test 3: Another insert overwrite with multiple rows
sql """
insert overwrite table ${table_name} values (4, 'david', 35), (5, 'eve', 28), (6, 'frank', 40);
"""
def result3 = sql """
SELECT COUNT(*) FROM ${table_name};
"""
assert result3[0][0] == 3

sql """
DROP TABLE ${table_name};
"""
sql """
DROP DATABASE ${db_name} FORCE;
"""

def dropResult = sql """
show databases like "${db_name}";
"""
assert dropResult.size() == 0
}

/*--------only execute query---------*/
def testQuery = { String catalog_properties, String prefix, String db_name, String table_name, int data_count ->

@@ -223,4 +395,11 @@ suite("iceberg_and_hive_on_glue", "p2,external,hive,new_catalog_property") {
testQueryAndInsertIcerberg(warehouse_location + iceberg_glue_catalog_base_properties + glue_properties_1, "iceberg_glue_on_s3")
testQueryAndInsertIcerberg(warehouse_location + iceberg_glue_catalog_base_properties + glue_properties_2, "iceberg_glue_on_s3")
testQueryAndInsertIcerberg(warehouse_location + iceberg_glue_catalog_base_properties + glue_properties_3, "iceberg_glue_on_s3")

// Iceberg - Insert overwrite tests
testInsertOverwriteIceberg(warehouse_location + iceberg_glue_catalog_base_properties + glue_properties_3, "iceberg_glue_overwrite_on_s3")

// Hive on Glue - Insert overwrite tests
def db_location = "${s3_warehouse}hive-glue-s3-warehouse/hive-overwrite/" + System.currentTimeMillis()
testInsertOverwrite(hms_glue_catalog_base_properties + glue_properties_3, "hive_glue_overwrite_on_s3", db_location)
}