32 changes: 23 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -407,6 +407,13 @@ private void generateExportJobExecutor() {
             ExportTaskExecutor executor = new ExportTaskExecutor(selectStmts, this);
             jobExecutorList.add(executor);
         }
+
+        // add an empty task so that the export job can still finish
+        // when jobExecutorList is empty, i.e. the exported table has no data
+        if (jobExecutorList.isEmpty()) {
+            ExportTaskExecutor executor = new ExportTaskExecutor(Lists.newArrayList(), this);
+            jobExecutorList.add(executor);
+        }
     }
 
     /**
@@ -511,15 +518,23 @@ private List<List<List<Long>>> splitTablets() throws UserException {
         // get partitions
         // user specifies partitions, already checked in ExportCommand
         if (!this.partitionNames.isEmpty()) {
-            this.partitionNames.forEach(partitionName -> partitions.add(table.getPartition(partitionName)));
+            this.partitionNames.forEach(partitionName -> {
+                Partition partition = table.getPartition(partitionName);
+                if (partition.hasData()) {
+                    partitions.add(partition);
+                }
+            });
         } else {
-            if (table.getPartitions().size() > Config.maximum_number_of_export_partitions) {
-                throw new UserException("The partitions number of this export job is larger than the maximum number"
-                        + " of partitions allowed by a export job");
-            }
-            partitions.addAll(table.getPartitions());
+            table.getPartitions().forEach(partition -> {
+                if (partition.hasData()) {
+                    partitions.add(partition);
+                }
+            });
         }
+        if (partitions.size() > Config.maximum_number_of_export_partitions) {
+            throw new UserException("The partitions number of this export job is larger than the maximum number"
+                    + " of partitions allowed by a export job");
+        }
 
         // get tablets
         for (Partition partition : partitions) {
             // no need to verify the partition version for partition data consistency
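In the hunk above, the partition-count check now runs after empty partitions are filtered out, so partitions without data no longer count toward Config.maximum_number_of_export_partitions, and the cap now also applies when the user names partitions explicitly. A minimal standalone sketch of this filter-then-check flow, using a simplified Partition stand-in rather than the real Doris catalog class, and a plain IllegalStateException in place of UserException:

    import java.util.ArrayList;
    import java.util.List;

    public class PartitionFilter {
        // Simplified stand-in for org.apache.doris.catalog.Partition.
        record Partition(String name, boolean hasData) {}

        static List<Partition> selectExportPartitions(List<Partition> all, int maxPartitions) {
            List<Partition> selected = new ArrayList<>();
            for (Partition p : all) {
                if (p.hasData()) {          // skip partitions without data up front
                    selected.add(p);
                }
            }
            // The cap is enforced on the filtered set, mirroring the moved check.
            if (selected.size() > maxPartitions) {
                throw new IllegalStateException("too many partitions for one export job");
            }
            return selected;
        }

        public static void main(String[] args) {
            List<Partition> all = List.of(
                    new Partition("p1", true),
                    new Partition("p2", false),  // empty partition, filtered out
                    new Partition("p3", true));
            // Prints 2: the empty partition p2 does not count toward the cap.
            System.out.println(selectExportPartitions(all, 2).size());
        }
    }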
@@ -589,8 +604,7 @@ private List<List<List<Long>>> splitTablets() throws UserException {
             List<Long> tabletsList = new ArrayList<>(flatTabletIdList.subList(start, start + tabletsNum));
             List<List<Long>> tablets = new ArrayList<>();
             for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
-                int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size()
-                        ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size();
+                int end = Math.min(i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT, tabletsList.size());
                 tablets.add(new ArrayList<>(tabletsList.subList(i, end)));
             }
 
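The Math.min form above is behaviorally identical to the removed ternary: it clamps each chunk's end index to the list size. A runnable sketch of the chunk-splitting loop in isolation, with MAX_TABLETS_PER_OUTFILE as an illustrative stand-in for MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT:

    import java.util.ArrayList;
    import java.util.List;

    public class TabletChunking {
        // Illustrative stand-in for ExportJob.MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT.
        private static final int MAX_TABLETS_PER_OUTFILE = 10;

        // Split a flat tablet-id list into chunks of at most MAX_TABLETS_PER_OUTFILE ids.
        static List<List<Long>> chunk(List<Long> tabletIds) {
            List<List<Long>> chunks = new ArrayList<>();
            for (int i = 0; i < tabletIds.size(); i += MAX_TABLETS_PER_OUTFILE) {
                // Math.min clamps the final chunk to whatever remains.
                int end = Math.min(i + MAX_TABLETS_PER_OUTFILE, tabletIds.size());
                chunks.add(new ArrayList<>(tabletIds.subList(i, end)));
            }
            return chunks;
        }

        public static void main(String[] args) {
            List<Long> ids = new ArrayList<>();
            for (long t = 1; t <= 25; t++) {
                ids.add(t);
            }
            // 25 ids with a chunk size of 10 -> chunks of sizes 10, 10, and 5.
            chunk(ids).forEach(c -> System.out.println(c.size() + ": " + c));
        }
    }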
269 changes: 5 additions & 264 deletions regression-test/suites/export_p0/test_export_empty_table.groovy
@@ -143,62 +143,8 @@ suite("test_export_empty_table", "p0") {
         waiting_export.call(label)
 
         // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
-        // check data correctness
-        sql """ DROP TABLE IF EXISTS ${table_load_name} """
-        sql """
-        CREATE TABLE IF NOT EXISTS ${table_load_name} (
-            `user_id` INT NOT NULL COMMENT "user id",
-            `date` DATE NOT NULL COMMENT "data load datetime",
-            `datetime` DATETIME NOT NULL COMMENT "data load datetime",
-            `city` VARCHAR(20) COMMENT "user city",
-            `age` SMALLINT COMMENT "user age",
-            `sex` TINYINT COMMENT "user gender",
-            `bool_col` boolean COMMENT "",
-            `int_col` int COMMENT "",
-            `bigint_col` bigint COMMENT "",
-            `float_col` float COMMENT "",
-            `double_col` double COMMENT "",
-            `char_col` CHAR(10) COMMENT "",
-            `decimal_col` decimal COMMENT "",
-            `ipv4_col` ipv4 COMMENT "",
-            `ipv6_col` ipv6 COMMENT ""
-        )
-        DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-        """
-
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col'
-            set 'strict_mode', 'true'
-            set 'format', 'csv'
-            set 'column_separator', ','
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(0, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
-        }
-
-        sql """ sync; """
-
-        qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
-
+        check_file_amounts.call("${outFilePath}", 0)
     } finally {
         try_sql("DROP TABLE IF EXISTS ${table_load_name}")
         delete_files.call("${outFilePath}")
     }

@@ -222,59 +168,8 @@ suite("test_export_empty_table", "p0") {
         waiting_export.call(label)
 
         // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
-        // check data correctness
-        sql """ DROP TABLE IF EXISTS ${table_load_name} """
-        sql """
-        CREATE TABLE IF NOT EXISTS ${table_load_name} (
-            `user_id` INT NOT NULL COMMENT "user id",
-            `date` DATE NOT NULL COMMENT "data load datetime",
-            `datetime` DATETIME NOT NULL COMMENT "data load datetime",
-            `city` VARCHAR(20) COMMENT "user city",
-            `age` SMALLINT COMMENT "user age",
-            `sex` TINYINT COMMENT "user gender",
-            `bool_col` boolean COMMENT "",
-            `int_col` int COMMENT "",
-            `bigint_col` bigint COMMENT "",
-            `float_col` float COMMENT "",
-            `double_col` double COMMENT "",
-            `char_col` CHAR(10) COMMENT "",
-            `decimal_col` decimal COMMENT "",
-            `ipv4_col` ipv4 COMMENT "",
-            `ipv6_col` ipv6 COMMENT ""
-        )
-        DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-        """
-
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col'
-            set 'strict_mode', 'true'
-            set 'format', 'parquet'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(0, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
-        }
-
-        qt_select_load2 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
-
+        check_file_amounts.call("${outFilePath}", 0)
     } finally {
         try_sql("DROP TABLE IF EXISTS ${table_load_name}")
         delete_files.call("${outFilePath}")
     }

@@ -297,59 +192,8 @@ suite("test_export_empty_table", "p0") {
         waiting_export.call(label)
 
         // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
-        // check data correctness
-        sql """ DROP TABLE IF EXISTS ${table_load_name} """
-        sql """
-        CREATE TABLE IF NOT EXISTS ${table_load_name} (
-            `user_id` INT NOT NULL COMMENT "user id",
-            `date` DATE NOT NULL COMMENT "data load datetime",
-            `datetime` DATETIME NOT NULL COMMENT "data load datetime",
-            `city` VARCHAR(20) COMMENT "user city",
-            `age` SMALLINT COMMENT "user age",
-            `sex` TINYINT COMMENT "user gender",
-            `bool_col` boolean COMMENT "",
-            `int_col` int COMMENT "",
-            `bigint_col` bigint COMMENT "",
-            `float_col` float COMMENT "",
-            `double_col` double COMMENT "",
-            `char_col` CHAR(10) COMMENT "",
-            `decimal_col` decimal COMMENT "",
-            `ipv4_col` ipv4 COMMENT "",
-            `ipv6_col` ipv6 COMMENT ""
-        )
-        DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-        """
-
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col'
-            set 'strict_mode', 'true'
-            set 'format', 'orc'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(0, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
-        }
-
-        qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
-
+        check_file_amounts.call("${outFilePath}", 0)
     } finally {
         try_sql("DROP TABLE IF EXISTS ${table_load_name}")
         delete_files.call("${outFilePath}")
     }

@@ -373,58 +217,7 @@ suite("test_export_empty_table", "p0") {
         waiting_export.call(label)
 
         // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
-        // check data correctness
-        sql """ DROP TABLE IF EXISTS ${table_load_name} """
-        sql """
-        CREATE TABLE IF NOT EXISTS ${table_load_name} (
-            `user_id` INT NOT NULL COMMENT "user id",
-            `date` DATE NOT NULL COMMENT "data load datetime",
-            `datetime` DATETIME NOT NULL COMMENT "data load datetime",
-            `city` VARCHAR(20) COMMENT "user city",
-            `age` SMALLINT COMMENT "user age",
-            `sex` TINYINT COMMENT "user gender",
-            `bool_col` boolean COMMENT "",
-            `int_col` int COMMENT "",
-            `bigint_col` bigint COMMENT "",
-            `float_col` float COMMENT "",
-            `double_col` double COMMENT "",
-            `char_col` CHAR(10) COMMENT "",
-            `decimal_col` decimal COMMENT "",
-            `ipv4_col` ipv4 COMMENT "",
-            `ipv6_col` ipv6 COMMENT ""
-        )
-        DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-        """
-
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col'
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names'
-            set 'column_separator', ','
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(0, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
-        }
-
-        qt_select_load4 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
-
+        check_file_amounts.call("${outFilePath}", 0)
     } finally {
         try_sql("DROP TABLE IF EXISTS ${table_load_name}")
         delete_files.call("${outFilePath}")
@@ -451,60 +244,8 @@ suite("test_export_empty_table", "p0") {
         waiting_export.call(label)
 
         // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
-        // check data correctness
-        sql """ DROP TABLE IF EXISTS ${table_load_name} """
-        sql """
-        CREATE TABLE IF NOT EXISTS ${table_load_name} (
-            `user_id` INT NOT NULL COMMENT "user id",
-            `date` DATE NOT NULL COMMENT "data load datetime",
-            `datetime` DATETIME NOT NULL COMMENT "data load datetime",
-            `city` VARCHAR(20) COMMENT "user city",
-            `age` SMALLINT COMMENT "user age",
-            `sex` TINYINT COMMENT "user gender",
-            `bool_col` boolean COMMENT "",
-            `int_col` int COMMENT "",
-            `bigint_col` bigint COMMENT "",
-            `float_col` float COMMENT "",
-            `double_col` double COMMENT "",
-            `char_col` CHAR(10) COMMENT "",
-            `decimal_col` decimal COMMENT "",
-            `ipv4_col` ipv4 COMMENT "",
-            `ipv6_col` ipv6 COMMENT ""
-        )
-        DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-        """
-
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col'
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names_and_types'
-            set 'column_separator', ','
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(0, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
-        }
-
-        qt_select_load5 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
-
+        check_file_amounts.call("${outFilePath}", 0)
     } finally {
         try_sql("DROP TABLE IF EXISTS ${table_load_name}")
         delete_files.call("${outFilePath}")
     }
