diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f29ec307914eeb..212233dcbae6ee 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1181,14 +1181,15 @@ DEFINE_mString(ca_cert_file_paths, "/etc/ssl/ca-bundle.pem"); /** Table sink configurations(currently contains only external table types) **/ -// Minimum data processed to scale writers when non partition writing +// Minimum data processed to scale writers in exchange when non partition writing DEFINE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold, - "125829120"); // 120MB -// Minimum data processed to start rebalancing in exchange when partition writing -DEFINE_mInt64(table_sink_partition_write_data_processed_threshold, "209715200"); // 200MB + "26214400"); // 25MB // Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing -DEFINE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold, - "209715200"); // 200MB +DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold, + "26214400"); // 25MB +// Minimum partition data processed to rebalance writers in exchange when partition writing +DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold, + "15728640"); // 15MB // Maximum processed partition nums of per writer when partition writing DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 81fabfb9517879..b6b7e0c1d3660f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1265,12 +1265,12 @@ DECLARE_String(trino_connector_plugin_dir); DECLARE_mString(ca_cert_file_paths); /** Table sink configurations(currently contains only external table types) **/ -// Minimum data processed to scale writers when non partition writing +// Minimum data processed to scale writers in exchange when non partition writing 
DECLARE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold); -// Minimum data processed to start rebalancing in exchange when partition writing -DECLARE_mInt64(table_sink_partition_write_data_processed_threshold); // Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing -DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold); +DECLARE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold); +// Minimum partition data processed to rebalance writers in exchange when partition writing +DECLARE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold); // Maximum processed partition nums of per writer when partition writing DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index bc55bc8f805803..8323e20cfd17c1 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -257,17 +257,23 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _partitioner.reset( new vectorized::Crc32HashPartitioner(_partition_count)); _partition_function.reset(new HashPartitionFunction(_partitioner.get())); - // const long MEGABYTE = 1024 * 1024; - // const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 10000 * MEGABYTE; // 1MB - // const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50000 * MEGABYTE; // 50MB - // const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 1MB - // const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 50MB + // The configured thresholds are global; give each task of this pipeline an equal share. 
+ // Fall back to the full configured value when task_num is unset (<= 0), which would + // otherwise divide by zero, or when the per-task share would round down to 0 + // (a zero threshold is never passed to the exchanger). + const auto per_task_threshold = [task_num = state->task_num()](int64_t threshold) { + return (task_num > 0 && threshold / task_num != 0) ? threshold / task_num + : threshold; + }; scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger< HashPartitionFunction>( channels.size(), *_partition_function, _partition_count, channels.size(), 1, - config::table_sink_partition_write_data_processed_threshold, - 
config::table_sink_partition_write_skewed_data_processed_rebalance_threshold)); + per_task_threshold( + config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold), + per_task_threshold( + config::table_sink_partition_write_min_data_processed_rebalance_threshold))); + RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); _profile->add_info_string("Partitioner", diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index ce2d4a507487eb..f23e39472ab993 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -593,6 +593,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( init_runtime_state(task_runtime_state); auto cur_task_id = _total_tasks++; task_runtime_state->set_task_id(cur_task_id); + task_runtime_state->set_task_num(pipeline->num_tasks()); auto task = std::make_unique( pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this, pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline), diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 6fae242d53cd95..85413406fd2978 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -624,6 +624,10 @@ class RuntimeState { int task_id() const { return _task_id; } + void set_task_num(int task_num) { _task_num = task_num; } + + int task_num() const { return _task_num; } + private: Status create_error_log_file(); @@ 
-734,6 +738,7 @@ class RuntimeState { std::vector _error_tablet_infos; int _max_operator_id = 0; int _task_id = -1; + int _task_num = 0; std::vector _hive_partition_updates;