diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 017a03ef558b46..7c7d4f5bee6142 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -237,6 +237,7 @@ Status BrokerScanner::open_file_reader() { case TFileType::FILE_STREAM: { _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); return Status("unknown stream load id"); } _cur_file_reader = _stream_load_pipe.get(); diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index dcb30023f83d65..0e701b3153a4a8 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -145,6 +145,8 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) { << ", batch size: " << left_bytes << ". " << ctx->brief(); + // copy one + std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset; MonotonicStopWatch watch; watch.start(); Status st; @@ -157,6 +159,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) { if (_finished) { kakfa_pipe ->finish(); + ctx->kafka_info->cmt_offset = std::move(cmt_offset); return Status::OK; } @@ -175,6 +178,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) { DCHECK(left_bytes < ctx->kafka_info->max_batch_size); DCHECK(left_rows < ctx->kafka_info->max_batch_rows); kakfa_pipe->finish(); + ctx->kafka_info->cmt_offset = std::move(cmt_offset); _finished = true; return Status::OK; } @@ -195,7 +199,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) { if (st.ok()) { left_rows--; left_bytes -= msg->len(); - ctx->kafka_info->cmt_offset[msg->partition()] = msg->offset(); + cmt_offset[msg->partition()] = msg->offset(); VLOG(3) << "consume partition[" << msg->partition() << " - " << msg->offset() << "]"; } diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 8dbff529c00b60..de385dafcd2d27 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -37,7 +37,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { std::unique_lock<std::mutex> l(_lock); if (_task_map.find(task.id) != _task_map.end()) { // already submitted - LOG(INFO) << "routine load task " << task.id << " has already been submitted"; + LOG(INFO) << "routine load task " << UniqueId(task.id) << " has already been submitted"; return Status::OK; } @@ -127,6 +127,8 @@ void RoutineLoadTaskExecutor::exec_task( } \ } while (false); + VLOG(1) << "begin to execute routine load task: " << ctx->brief(); + // get or create data consumer std::shared_ptr<DataConsumer> consumer; HANDLE_ERROR(consumer_pool->get_consumer(ctx, &consumer), "failed to get consumer"); diff --git a/be/src/runtime/stream_load/load_stream_mgr.h b/be/src/runtime/stream_load/load_stream_mgr.h index 97e5777adc1779..cfaf08a94a0a63 100644 --- a/be/src/runtime/stream_load/load_stream_mgr.h +++ b/be/src/runtime/stream_load/load_stream_mgr.h @@ -40,6 +40,7 @@ class LoadStreamMgr { return Status("id already exist"); } _stream_map.emplace(id, stream); + VLOG(3) << "put stream load pipe: " << id; return Status::OK; } @@ -59,6 +60,7 @@ class LoadStreamMgr { auto it = _stream_map.find(id); if (it != std::end(_stream_map)) { _stream_map.erase(it); + VLOG(3) << "remove stream load pipe: " << id; } return; }