From 709c49e66a70ccbc2aff3c5d6d1fbee871b69d81 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 11 Mar 2019 17:19:52 +0800 Subject: [PATCH] Fix bug that data consumer should be removed from pool when being got --- .../runtime/routine_load/data_consumer_pool.cpp | 16 ++++++++++------ .../runtime/routine_load_task_executor_test.cpp | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp b/be/src/runtime/routine_load/data_consumer_pool.cpp index 958e518bcaf1a5..f7ee3af1a79259 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.cpp +++ b/be/src/runtime/routine_load/data_consumer_pool.cpp @@ -26,13 +26,17 @@ Status DataConsumerPool::get_consumer( std::unique_lock l(_lock); // check if there is an available consumer. - // if has, return it - for (auto& c : _pool) { - if (c->match(ctx)) { - VLOG(3) << "get an available data consumer from pool: " << c->id(); - c->reset(); - *ret = c; + // if has, return it, also remove it from the pool + auto iter = std::begin(_pool); + while (iter != std::end(_pool)) { + if ((*iter)->match(ctx)) { + VLOG(3) << "get an available data consumer from pool: " << (*iter)->id(); + (*iter)->reset(); + *ret = *iter; + iter = _pool.erase(iter); return Status::OK; + } else { + ++iter; } } diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp index 3c078cb4d9cbbc..628a2ac6a615ac 100644 --- a/be/test/runtime/routine_load_task_executor_test.cpp +++ b/be/test/runtime/routine_load_task_executor_test.cpp @@ -98,7 +98,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) { ASSERT_TRUE(st.ok()); sleep(10); - k_info.brokers = "127.0.0.2:9092"; + k_info.brokers = "127.0.0.1:9092"; task.__set_kafka_load_info(k_info); st = executor.submit_task(task); ASSERT_TRUE(st.ok());