diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index 5834fbd76fadd6..95e294d37b2c9a 100644 --- a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -143,10 +143,13 @@ Status EsHttpScanNode::open(RuntimeState* state) { Status EsHttpScanNode::start_scanners() { { std::unique_lock l(_batch_queue_lock); - _num_running_scanners = 1; + _num_running_scanners = _scan_ranges.size(); + } + + for (int i = 0; i < _scan_ranges.size(); i++) { + _scanner_threads.emplace_back(&EsHttpScanNode::scanner_worker, this, i, + _scan_ranges.size()); } - _scanner_threads.emplace_back(&EsHttpScanNode::scanner_worker, this, 0, - _scan_ranges.size()); return Status::OK; } @@ -378,6 +381,7 @@ static std::string get_host_port(const std::vector& es_hosts) { void EsHttpScanNode::scanner_worker(int start_idx, int length) { // Clone expr context std::vector scanner_expr_ctxs; + DCHECK(start_idx < length); auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, &scanner_expr_ctxs); if (!status.ok()) { @@ -385,31 +389,29 @@ void EsHttpScanNode::scanner_worker(int start_idx, int length) { } EsScanCounter counter; - for (int i = 0; i < length && status.ok(); ++i) { - const TEsScanRange& es_scan_range = - _scan_ranges[start_idx + i].scan_range.es_scan_range; - - // Collect the informations from scan range to perperties - std::map properties(_properties); - properties[ESScanReader::KEY_INDEX] = es_scan_range.index; - if (es_scan_range.__isset.type) { - properties[ESScanReader::KEY_TYPE] = es_scan_range.type; - } - properties[ESScanReader::KEY_SHARD] = std::to_string(es_scan_range.shard_id); - properties[ESScanReader::KEY_BATCH_SIZE] = std::to_string(_runtime_state->batch_size()); - properties[ESScanReader::KEY_HOST_PORT] = get_host_port(es_scan_range.es_hosts); - properties[ESScanReader::KEY_QUERY] - = ESScrollQueryBuilder::build(properties, _column_names, _predicates); - - // start scanner to scan - std::unique_ptr scanner(new EsHttpScanner( - _runtime_state, runtime_profile(), _tuple_id, - properties, scanner_expr_ctxs, &counter)); - status = scanner_scan(std::move(scanner), scanner_expr_ctxs, &counter); - if (!status.ok()) { - LOG(WARNING) << "Scanner[" << start_idx + i << "] process failed. status=" - << status.get_error_msg(); - } + const TEsScanRange& es_scan_range = + _scan_ranges[start_idx].scan_range.es_scan_range; + + // Collect the informations from scan range to perperties + std::map properties(_properties); + properties[ESScanReader::KEY_INDEX] = es_scan_range.index; + if (es_scan_range.__isset.type) { + properties[ESScanReader::KEY_TYPE] = es_scan_range.type; + } + properties[ESScanReader::KEY_SHARD] = std::to_string(es_scan_range.shard_id); + properties[ESScanReader::KEY_BATCH_SIZE] = std::to_string(_runtime_state->batch_size()); + properties[ESScanReader::KEY_HOST_PORT] = get_host_port(es_scan_range.es_hosts); + properties[ESScanReader::KEY_QUERY] + = ESScrollQueryBuilder::build(properties, _column_names, _predicates); + + // start scanner to scan + std::unique_ptr scanner(new EsHttpScanner( + _runtime_state, runtime_profile(), _tuple_id, + properties, scanner_expr_ctxs, &counter)); + status = scanner_scan(std::move(scanner), scanner_expr_ctxs, &counter); + if (!status.ok()) { + LOG(WARNING) << "Scanner[" << start_idx << "] process failed. status=" + << status.get_error_msg(); } // Update stats