Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,4 @@ Status SchemaActiveQueriesScanner::get_next_block_internal(vectorized::Block* bl
return Status::OK();
}

} // namespace doris
} // namespace doris
22 changes: 10 additions & 12 deletions be/src/exec/schema_scanner/schema_scanner_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,43 +31,41 @@ void SchemaScannerHelper::insert_string_value(int col_index, std::string str_val
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
str_val.size());
assert_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(), str_val.size());
nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_datetime_value(int col_index, const std::vector<void*>& datas,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
auto data = datas[0];
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(data), 0);
nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_int_value(int col_index, int64_t int_val,
vectorized::Block* block) {
void SchemaScannerHelper::insert_int64_value(int col_index, int64_t int_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(int_val);
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(int_val);
nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_double_value(int col_index, double double_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
double_val);
assert_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(double_val);
nullable_column->get_null_map_data().emplace_back(0);
}
} // namespace doris
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_scanner_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SchemaScannerHelper {
static void insert_datetime_value(int col_index, const std::vector<void*>& datas,
vectorized::Block* block);

static void insert_int_value(int col_index, int64_t int_val, vectorized::Block* block);
static void insert_int64_value(int col_index, int64_t int_val, vectorized::Block* block);
static void insert_double_value(int col_index, double double_val, vectorized::Block* block);
};

Expand Down
20 changes: 10 additions & 10 deletions be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,26 +521,26 @@ void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo
for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
TQueryStatistics tqs;
qs_ctx_ptr->collect_query_statistics(&tqs);
SchemaScannerHelper::insert_int_value(0, be_id, block);
SchemaScannerHelper::insert_int64_value(0, be_id, block);
SchemaScannerHelper::insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block);
SchemaScannerHelper::insert_string_value(2, query_id, block);

int64_t task_time = qs_ctx_ptr->_is_query_finished
? qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time
: MonotonicMillis() - qs_ctx_ptr->_query_start_time;
SchemaScannerHelper::insert_int_value(3, task_time, block);
SchemaScannerHelper::insert_int_value(4, tqs.cpu_ms, block);
SchemaScannerHelper::insert_int_value(5, tqs.scan_rows, block);
SchemaScannerHelper::insert_int_value(6, tqs.scan_bytes, block);
SchemaScannerHelper::insert_int_value(7, tqs.max_peak_memory_bytes, block);
SchemaScannerHelper::insert_int_value(8, tqs.current_used_memory_bytes, block);
SchemaScannerHelper::insert_int_value(9, tqs.shuffle_send_bytes, block);
SchemaScannerHelper::insert_int_value(10, tqs.shuffle_send_rows, block);
SchemaScannerHelper::insert_int64_value(3, task_time, block);
SchemaScannerHelper::insert_int64_value(4, tqs.cpu_ms, block);
SchemaScannerHelper::insert_int64_value(5, tqs.scan_rows, block);
SchemaScannerHelper::insert_int64_value(6, tqs.scan_bytes, block);
SchemaScannerHelper::insert_int64_value(7, tqs.max_peak_memory_bytes, block);
SchemaScannerHelper::insert_int64_value(8, tqs.current_used_memory_bytes, block);
SchemaScannerHelper::insert_int64_value(9, tqs.shuffle_send_bytes, block);
SchemaScannerHelper::insert_int64_value(10, tqs.shuffle_send_rows, block);

std::stringstream ss;
ss << qs_ctx_ptr->_query_type;
SchemaScannerHelper::insert_string_value(11, ss.str(), block);
}
}

} // namespace doris
} // namespace doris
10 changes: 5 additions & 5 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,18 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
block->reserve(_workload_groups.size());
for (const auto& [id, wg] : _workload_groups) {
SchemaScannerHelper::insert_int_value(0, be_id, block);
SchemaScannerHelper::insert_int_value(1, wg->id(), block);
SchemaScannerHelper::insert_int_value(2, wg->get_mem_used(), block);
SchemaScannerHelper::insert_int64_value(0, be_id, block);
SchemaScannerHelper::insert_int64_value(1, wg->id(), block);
SchemaScannerHelper::insert_int64_value(2, wg->get_mem_used(), block);

double cpu_usage_p =
(double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100;
cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0;

SchemaScannerHelper::insert_double_value(3, cpu_usage_p, block);

SchemaScannerHelper::insert_int_value(4, wg->get_local_scan_bytes_per_second(), block);
SchemaScannerHelper::insert_int_value(5, wg->get_remote_scan_bytes_per_second(), block);
SchemaScannerHelper::insert_int64_value(4, wg->get_local_scan_bytes_per_second(), block);
SchemaScannerHelper::insert_int64_value(5, wg->get_remote_scan_bytes_per_second(), block);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,8 @@ public class SchemaTable extends Table {
.column("PARTITION_NAME", ScalarType.createVarchar(64))
.column("SUBPARTITION_NAME", ScalarType.createVarchar(64))
.column("PARTITION_ORDINAL_POSITION", ScalarType.createType(PrimitiveType.INT))
.column("SUBPARTITION_ORDINAL_POSITION", ScalarType.createType(PrimitiveType.INT))
.column("SUBPARTITION_ORDINAL_POSITION",
ScalarType.createType(PrimitiveType.INT))
.column("PARTITION_METHOD", ScalarType.createVarchar(13))
.column("SUBPARTITION_METHOD", ScalarType.createVarchar(13))
.column("PARTITION_EXPRESSION", ScalarType.createVarchar(2048))
Expand Down
Loading