From e33c84bf1608485539c1a12cc212206909d710bd Mon Sep 17 00:00:00 2001 From: laihui Date: Tue, 28 Oct 2025 17:30:11 +0800 Subject: [PATCH] introduce load job system table --- be/src/exec/schema_scanner.cpp | 3 + be/src/exec/schema_scanner/schema_helper.cpp | 9 + be/src/exec/schema_scanner/schema_helper.h | 5 + .../schema_load_job_scanner.cpp | 189 ++++++++++++++++++ .../schema_scanner/schema_load_job_scanner.h | 50 +++++ .../doris/analysis/SchemaTableType.java | 2 + .../org/apache/doris/catalog/SchemaTable.java | 24 +++ .../doris/service/FrontendServiceImpl.java | 81 ++++++++ gensrc/thrift/Descriptors.thrift | 1 + gensrc/thrift/FrontendService.thrift | 32 +++ .../test_load_job_info_system_table.groovy | 74 +++++++ 11 files changed, 470 insertions(+) create mode 100644 be/src/exec/schema_scanner/schema_load_job_scanner.cpp create mode 100644 be/src/exec/schema_scanner/schema_load_job_scanner.h create mode 100644 regression-test/suites/job_p0/job_system_table/test_load_job_info_system_table.groovy diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index 370ea72495ae0e..a39af09cc5123f 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -41,6 +41,7 @@ #include "exec/schema_scanner/schema_encryption_keys_scanner.h" #include "exec/schema_scanner/schema_file_cache_statistics.h" #include "exec/schema_scanner/schema_files_scanner.h" +#include "exec/schema_scanner/schema_load_job_scanner.h" #include "exec/schema_scanner/schema_metadata_name_ids_scanner.h" #include "exec/schema_scanner/schema_partitions_scanner.h" #include "exec/schema_scanner/schema_processlist_scanner.h" @@ -240,6 +241,8 @@ std::unique_ptr SchemaScanner::create(TSchemaTableType::type type return SchemaBackendKerberosTicketCacheScanner::create_unique(); case TSchemaTableType::SCH_ROUTINE_LOAD_JOBS: return SchemaRoutineLoadJobScanner::create_unique(); + case TSchemaTableType::SCH_LOAD_JOBS: + return SchemaLoadJobScanner::create_unique(); case TSchemaTableType::SCH_BACKEND_TABLETS: return SchemaTabletsScanner::create_unique(); case TSchemaTableType::SCH_VIEW_DEPENDENCY: diff --git a/be/src/exec/schema_scanner/schema_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp index 37c2af63142c97..33516e01726447 100644 --- a/be/src/exec/schema_scanner/schema_helper.cpp +++ b/be/src/exec/schema_scanner/schema_helper.cpp @@ -151,6 +151,15 @@ Status SchemaHelper::fetch_routine_load_job(const std::string& ip, const int32_t }); } +Status SchemaHelper::fetch_load_job(const std::string& ip, const int32_t port, + const TFetchLoadJobRequest& request, + TFetchLoadJobResult* result) { + return ThriftRpcHelper::rpc( + ip, port, [&request, &result](FrontendServiceConnection& client) { + client->fetchLoadJob(*result, request); + }); +} + Status SchemaHelper::fetch_schema_table_data(const std::string& ip, const int32_t port, const TFetchSchemaTableDataRequest& request, TFetchSchemaTableDataResult* result) { diff --git a/be/src/exec/schema_scanner/schema_helper.h b/be/src/exec/schema_scanner/schema_helper.h index cc931f305379b0..76bc7eaea46b8d 100644 --- a/be/src/exec/schema_scanner/schema_helper.h +++ b/be/src/exec/schema_scanner/schema_helper.h @@ -29,6 +29,8 @@ class TDescribeTablesParams; class TDescribeTablesResult; class TFetchRoutineLoadJobRequest; class TFetchRoutineLoadJobResult; +class TFetchLoadJobRequest; +class TFetchLoadJobResult; class TFetchSchemaTableDataRequest; class TFetchSchemaTableDataResult; class TGetDbsParams; @@ -94,6 +96,9 @@ class SchemaHelper { const TFetchRoutineLoadJobRequest& request, TFetchRoutineLoadJobResult* result); + static Status fetch_load_job(const std::string& ip, const int32_t port, + const TFetchLoadJobRequest& request, TFetchLoadJobResult* result); + static Status fetch_schema_table_data(const std::string& ip, const int32_t port, const TFetchSchemaTableDataRequest& request, TFetchSchemaTableDataResult* result); diff --git a/be/src/exec/schema_scanner/schema_load_job_scanner.cpp b/be/src/exec/schema_scanner/schema_load_job_scanner.cpp new file mode 100644 index 00000000000000..851882d13f9b83 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_load_job_scanner.cpp @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/schema_scanner/schema_load_job_scanner.h" + +#include +#include + +#include + +#include "exec/schema_scanner/schema_helper.h" +#include "runtime/runtime_state.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { +class RuntimeState; +namespace vectorized { +class Block; +} // namespace vectorized + +std::vector SchemaLoadJobScanner::_s_tbls_columns = { + // name, type, size, is_null + {"JOB_ID", TYPE_STRING, sizeof(StringRef), true}, + {"LABEL", TYPE_STRING, sizeof(StringRef), true}, + {"STATE", TYPE_STRING, sizeof(StringRef), true}, + {"PROGRESS", TYPE_STRING, sizeof(StringRef), true}, + {"TYPE", TYPE_STRING, sizeof(StringRef), true}, + {"ETL_INFO", TYPE_STRING, sizeof(StringRef), true}, + {"TASK_INFO", TYPE_STRING, sizeof(StringRef), true}, + {"ERROR_MSG", TYPE_STRING, sizeof(StringRef), true}, + {"CREATE_TIME", TYPE_STRING, sizeof(StringRef), true}, + {"ETL_START_TIME", TYPE_STRING, sizeof(StringRef), true}, + {"ETL_FINISH_TIME", TYPE_STRING, sizeof(StringRef), true}, + {"LOAD_START_TIME", TYPE_STRING, sizeof(StringRef), true}, + {"LOAD_FINISH_TIME", TYPE_STRING, sizeof(StringRef), true}, + {"URL", TYPE_STRING, sizeof(StringRef), true}, + {"JOB_DETAILS", TYPE_STRING, sizeof(StringRef), true}, + {"TRANSACTION_ID", TYPE_STRING, sizeof(StringRef), true}, + {"ERROR_TABLETS", TYPE_STRING, sizeof(StringRef), true}, + {"USER", TYPE_STRING, sizeof(StringRef), true}, + {"COMMENT", TYPE_STRING, sizeof(StringRef), true}, + {"FIRST_ERROR_MSG", TYPE_STRING, sizeof(StringRef), true}, +}; + +SchemaLoadJobScanner::SchemaLoadJobScanner() + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_LOAD_JOBS) {} + +SchemaLoadJobScanner::~SchemaLoadJobScanner() {} + +Status SchemaLoadJobScanner::start(RuntimeState* state) { + if (!_is_init) { + return Status::InternalError("used before initialized."); + } + TFetchLoadJobRequest request; + RETURN_IF_ERROR(SchemaHelper::fetch_load_job(*(_param->common_param->ip), + _param->common_param->port, request, &_result)); + return Status::OK(); +} + +Status SchemaLoadJobScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { + if (!_is_init) { + return Status::InternalError("call this before initial."); + } + if (block == nullptr || eos == nullptr) { + return Status::InternalError("invalid parameter."); + } + + *eos = true; + if (_result.loadJobs.empty()) { + return Status::OK(); + } + + return _fill_block_impl(block); +} + +Status SchemaLoadJobScanner::_fill_block_impl(vectorized::Block* block) { + SCOPED_TIMER(_fill_block_timer); + + const auto& jobs_info = _result.loadJobs; + size_t row_num = jobs_info.size(); + if (row_num == 0) { + return Status::OK(); + } + + for (size_t col_idx = 0; col_idx < _s_tbls_columns.size(); ++col_idx) { + const auto& col_desc = _s_tbls_columns[col_idx]; + + std::vector str_refs(row_num); + std::vector datas(row_num); + std::vector column_values(row_num); + + for (size_t row_idx = 0; row_idx < row_num; ++row_idx) { + const auto& job_info = jobs_info[row_idx]; + std::string& column_value = column_values[row_idx]; + + if (col_desc.type == TYPE_STRING) { + switch (col_idx) { + case 0: // JOB_ID + column_value = job_info.__isset.job_id ? job_info.job_id : ""; + break; + case 1: // LABEL + column_value = job_info.__isset.label ? job_info.label : ""; + break; + case 2: // STATE + column_value = job_info.__isset.state ? job_info.state : ""; + break; + case 3: // PROGRESS + column_value = job_info.__isset.progress ? job_info.progress : ""; + break; + case 4: // TYPE + column_value = job_info.__isset.type ? job_info.type : ""; + break; + case 5: // ETL_INFO + column_value = job_info.__isset.etl_info ? job_info.etl_info : ""; + break; + case 6: // TASK_INFO + column_value = job_info.__isset.task_info ? job_info.task_info : ""; + break; + case 7: // ERROR_MSG + column_value = job_info.__isset.error_msg ? job_info.error_msg : ""; + break; + case 8: // CREATE_TIME + column_value = job_info.__isset.create_time ? job_info.create_time : ""; + break; + case 9: // ETL_START_TIME + column_value = job_info.__isset.etl_start_time ? job_info.etl_start_time : ""; + break; + case 10: // ETL_FINISH_TIME + column_value = job_info.__isset.etl_finish_time ? job_info.etl_finish_time : ""; + break; + case 11: // LOAD_START_TIME + column_value = job_info.__isset.load_start_time ? job_info.load_start_time : ""; + break; + case 12: // LOAD_FINISH_TIME + column_value = + job_info.__isset.load_finish_time ? job_info.load_finish_time : ""; + break; + case 13: // URL + column_value = job_info.__isset.url ? job_info.url : ""; + break; + case 14: // JOB_DETAILS + column_value = job_info.__isset.job_details ? job_info.job_details : ""; + break; + case 15: // TRANSACTION_ID + column_value = job_info.__isset.transaction_id ? job_info.transaction_id : ""; + break; + case 16: // ERROR_TABLETS + column_value = job_info.__isset.error_tablets ? job_info.error_tablets : ""; + break; + case 17: // USER + column_value = job_info.__isset.user ? job_info.user : ""; + break; + case 18: // COMMENT + column_value = job_info.__isset.comment ? job_info.comment : ""; + break; + case 19: // FIRST_ERROR_MSG + column_value = job_info.__isset.first_error_msg ? job_info.first_error_msg : ""; + break; + } + + str_refs[row_idx] = + StringRef(column_values[row_idx].data(), column_values[row_idx].size()); + datas[row_idx] = &str_refs[row_idx]; + } + } + + RETURN_IF_ERROR(fill_dest_column_for_range(block, col_idx, datas)); + } + + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/exec/schema_scanner/schema_load_job_scanner.h b/be/src/exec/schema_scanner/schema_load_job_scanner.h new file mode 100644 index 00000000000000..8fcd5fa56d8166 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_load_job_scanner.h @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include + +#include "common/status.h" +#include "exec/schema_scanner.h" + +namespace doris { +class RuntimeState; +namespace vectorized { +class Block; +} // namespace vectorized + +class SchemaLoadJobScanner : public SchemaScanner { + ENABLE_FACTORY_CREATOR(SchemaLoadJobScanner); + +public: + SchemaLoadJobScanner(); + ~SchemaLoadJobScanner() override; + + Status start(RuntimeState* state) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; + +private: + Status _fill_block_impl(vectorized::Block* block); + + TFetchLoadJobResult _result; + static std::vector _s_tbls_columns; +}; + +} // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java index 4adbf468c42974..5576c6d294be9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java @@ -96,6 +96,8 @@ public enum SchemaTableType { TSchemaTableType.SCH_BACKEND_KERBEROS_TICKET_CACHE), SCH_ROUTINE_LOAD_JOBS("ROUTINE_LOAD_JOBS", "ROUTINE_LOAD_JOBS", TSchemaTableType.SCH_ROUTINE_LOAD_JOBS), + SCH_LOAD_JOBS("LOAD_JOBS", "LOAD_JOBS", + TSchemaTableType.SCH_LOAD_JOBS), SCH_VIEW_DEPENDENCY("VIEW_DEPENDENCY", "VIEW_DEPENDENCY", TSchemaTableType.SCH_VIEW_DEPENDENCY), SQL_BLOCK_RULE_STATUS("SQL_BLOCK_RULE_STATUS", "SQL_BLOCK_RULE_STATUS", diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index f3252cc15ec780..1fc83fdb925266 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -674,6 +674,30 @@ public class SchemaTable extends Table { .column("IS_ABNORMAL_PAUSE", ScalarType.createType(PrimitiveType.BOOLEAN)) .build()) ) + .put("load_jobs", + new SchemaTable(SystemIdGenerator.getNextId(), "load_jobs", TableType.SCHEMA, + builder().column("JOB_ID", ScalarType.createStringType()) + .column("LABEL", ScalarType.createStringType()) + .column("STATE", ScalarType.createStringType()) + .column("PROGRESS", ScalarType.createStringType()) + .column("TYPE", ScalarType.createStringType()) + .column("ETL_INFO", ScalarType.createStringType()) + .column("TASK_INFO", ScalarType.createStringType()) + .column("ERROR_MSG", ScalarType.createStringType()) + .column("CREATE_TIME", ScalarType.createStringType()) + .column("ETL_START_TIME", ScalarType.createStringType()) + .column("ETL_FINISH_TIME", ScalarType.createStringType()) + .column("LOAD_START_TIME", ScalarType.createStringType()) + .column("LOAD_FINISH_TIME", ScalarType.createStringType()) + .column("URL", ScalarType.createStringType()) + .column("JOB_DETAILS", ScalarType.createStringType()) + .column("TRANSACTION_ID", ScalarType.createStringType()) + .column("ERROR_TABLETS", ScalarType.createStringType()) + .column("USER", ScalarType.createStringType()) + .column("COMMENT", ScalarType.createStringType()) + .column("FIRST_ERROR_MSG", ScalarType.createStringType()) + .build()) + ) .put("backend_tablets", new SchemaTable(SystemIdGenerator.getNextId(), "backend_tablets", TableType.SCHEMA, builder().column("BE_ID", ScalarType.createType(PrimitiveType.BIGINT)) .column("TABLET_ID", ScalarType.createType(PrimitiveType.BIGINT)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index e89465d91619d6..f9cbad663816c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -85,6 +85,7 @@ import org.apache.doris.insertoverwrite.InsertOverwriteManager; import org.apache.doris.insertoverwrite.InsertOverwriteUtil; import org.apache.doris.load.StreamLoadHandler; +import org.apache.doris.load.loadv2.LoadManager; import org.apache.doris.load.routineload.ErrorReason; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.routineload.RoutineLoadJob.JobState; @@ -153,6 +154,8 @@ import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest; import org.apache.doris.thrift.TEncryptionAlgorithm; import org.apache.doris.thrift.TEncryptionKey; +import org.apache.doris.thrift.TFetchLoadJobRequest; +import org.apache.doris.thrift.TFetchLoadJobResult; import org.apache.doris.thrift.TFetchResourceResult; import org.apache.doris.thrift.TFetchRoutineLoadJobRequest; import org.apache.doris.thrift.TFetchRoutineLoadJobResult; @@ -201,6 +204,7 @@ import org.apache.doris.thrift.TListPrivilegesResult; import org.apache.doris.thrift.TListTableMetadataNameIdsResult; import org.apache.doris.thrift.TListTableStatusResult; +import org.apache.doris.thrift.TLoadJob; import org.apache.doris.thrift.TLoadTxn2PCRequest; import org.apache.doris.thrift.TLoadTxn2PCResult; import org.apache.doris.thrift.TLoadTxnBeginRequest; @@ -4384,6 +4388,83 @@ public TFetchRoutineLoadJobResult fetchRoutineLoadJob(TFetchRoutineLoadJobReques return result; } + @Override + public TFetchLoadJobResult fetchLoadJob(TFetchLoadJobRequest request) { + TFetchLoadJobResult result = new TFetchLoadJobResult(); + + if (!Env.getCurrentEnv().isReady()) { + return result; + } + + // Create a ConnectContext with skipAuth=true for system table access + // This is necessary because LoadJob.checkAuth() requires a ConnectContext + // and system table queries from backend don't have user context + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setSkipAuth(true); + ctx.setThreadLocalInfo(); + + try { + LoadManager loadManager = Env.getCurrentEnv().getLoadManager(); + List jobInfos = Lists.newArrayList(); + List dbNames = Env.getCurrentInternalCatalog().getDbNames(); + for (String dbName : dbNames) { + DatabaseIf db; + try { + db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName); + } catch (Exception e) { + LOG.warn("Failed to get database: {}", dbName, e); + continue; + } + long dbId = db.getId(); + try { + List> loadJobInfosByDb = loadManager.getLoadJobInfosByDb( + dbId, null, false, null); + for (List jobInfo : loadJobInfosByDb) { + TLoadJob tJob = new TLoadJob(); + // Based on LOAD_TITLE_NAMES order: + // JobId, Label, State, Progress, Type, EtlInfo, TaskInfo, ErrorMsg, CreateTime, + // EtlStartTime, EtlFinishTime, LoadStartTime, LoadFinishTime, URL, JobDetails, + // TransactionId, ErrorTablets, User, Comment, FirstErrorMsg + if (jobInfo.size() >= 20) { + tJob.setJobId(String.valueOf(jobInfo.get(0))); + tJob.setLabel(String.valueOf(jobInfo.get(1))); + tJob.setState(String.valueOf(jobInfo.get(2))); + tJob.setProgress(String.valueOf(jobInfo.get(3))); + tJob.setType(String.valueOf(jobInfo.get(4))); + tJob.setEtlInfo(String.valueOf(jobInfo.get(5))); + tJob.setTaskInfo(String.valueOf(jobInfo.get(6))); + tJob.setErrorMsg(String.valueOf(jobInfo.get(7))); + tJob.setCreateTime(String.valueOf(jobInfo.get(8))); + tJob.setEtlStartTime(String.valueOf(jobInfo.get(9))); + tJob.setEtlFinishTime(String.valueOf(jobInfo.get(10))); + tJob.setLoadStartTime(String.valueOf(jobInfo.get(11))); + tJob.setLoadFinishTime(String.valueOf(jobInfo.get(12))); + tJob.setUrl(String.valueOf(jobInfo.get(13))); + tJob.setJobDetails(String.valueOf(jobInfo.get(14))); + tJob.setTransactionId(String.valueOf(jobInfo.get(15))); + tJob.setErrorTablets(String.valueOf(jobInfo.get(16))); + tJob.setUser(String.valueOf(jobInfo.get(17))); + tJob.setComment(String.valueOf(jobInfo.get(18))); + tJob.setFirstErrorMsg(String.valueOf(jobInfo.get(19))); + jobInfos.add(tJob); + } + } + } catch (Exception e) { + LOG.warn("Failed to get load jobs for database: {}", dbName, e); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("load job infos: {}", jobInfos); + } + result.setLoadJobs(jobInfos); + + return result; + } finally { + ConnectContext.remove(); + } + } + @Override public TGetTableTDEInfoResult getTableTDEInfo(TGetTableTDEInfoRequest request) throws TException { String clientAddr = getClientAddrAsString(); diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index f7c1e32daad0d7..95ac677da28344 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -160,6 +160,7 @@ enum TSchemaTableType { SCH_CLUSTER_SNAPSHOT_PROPERTIES = 61; SCH_BLACKHOLE = 62; SCH_COLUMN_DATA_SIZES = 63; + SCH_LOAD_JOBS = 64; } enum THdfsCompression { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 54a6d1555e5393..88365ecba4562d 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1604,6 +1604,36 @@ struct TFetchRoutineLoadJobResult { 1: optional list routineLoadJobs } +struct TFetchLoadJobRequest { +} + +struct TLoadJob { + 1: optional string job_id + 2: optional string label + 3: optional string state + 4: optional string progress + 5: optional string type + 6: optional string etl_info + 7: optional string task_info + 8: optional string error_msg + 9: optional string create_time + 10: optional string etl_start_time + 11: optional string etl_finish_time + 12: optional string load_start_time + 13: optional string load_finish_time + 14: optional string url + 15: optional string job_details + 16: optional string transaction_id + 17: optional string error_tablets + 18: optional string user + 19: optional string comment + 20: optional string first_error_msg +} + +struct TFetchLoadJobResult { + 1: optional list loadJobs +} + struct TPlanNodeRuntimeStatsItem { // node_id means PlanNodeId, add this field so that we can merge RuntimeProfile of same node more easily 1: optional i32 node_id @@ -1749,6 +1779,8 @@ service FrontendService { TFetchRoutineLoadJobResult fetchRoutineLoadJob(1: TFetchRoutineLoadJobRequest request) + TFetchLoadJobResult fetchLoadJob(1: TFetchLoadJobRequest request) + TGetEncryptionKeysResult getEncryptionKeys(1: TGetEncryptionKeysRequest request) TGetTableTDEInfoResult getTableTDEInfo(1: TGetTableTDEInfoRequest request) diff --git a/regression-test/suites/job_p0/job_system_table/test_load_job_info_system_table.groovy b/regression-test/suites/job_p0/job_system_table/test_load_job_info_system_table.groovy new file mode 100644 index 00000000000000..05cad21b3cebf3 --- /dev/null +++ b/regression-test/suites/job_p0/job_system_table/test_load_job_info_system_table.groovy @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_load_job_info_system_table", "p0") { + def tableName = "test_load_job_info_table" + def label = "test_load_job_label_" + UUID.randomUUID().toString().replace("-", "") + sql """ + DROP TABLE IF EXISTS ${tableName} + """ + sql """ + CREATE TABLE ${tableName} + ( + k1 INT NOT NULL, + k2 VARCHAR(50) NOT NULL, + k3 DATETIME NOT NULL, + v1 INT SUM DEFAULT '0' + ) + AGGREGATE KEY(k1, k2, k3) + DISTRIBUTED BY HASH(k1) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + sql """ + INSERT INTO ${tableName} with label ${label} select 1, 'test1', '2023-01-01 00:00:00', 10 + """ + + def res = sql """ + SELECT + JOB_ID, + LABEL, + STATE, + PROGRESS, + TYPE, + ETL_INFO, + TASK_INFO, + ERROR_MSG, + CREATE_TIME, + ETL_START_TIME, + ETL_FINISH_TIME, + LOAD_START_TIME, + LOAD_FINISH_TIME, + URL, + JOB_DETAILS, + TRANSACTION_ID, + ERROR_TABLETS, + USER, + COMMENT, + FIRST_ERROR_MSG + FROM + information_schema.load_jobs + WHERE + LABEL = '${label}' + """ + + log.info("Result size: ${res.size()}") + assertTrue(res.size() > 0, "Job should appear in load_jobs system table after ${label} insert") +} +