From 297943669d469580ce51de2e92d25bbcf38441e0 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Mon, 17 Jun 2024 12:48:27 +0800 Subject: [PATCH 1/8] tmp --- be/src/vec/sink/autoinc_buffer.cpp | 20 ++++-- be/src/vec/sink/autoinc_buffer.h | 5 +- .../test_unique_auto_inc_concurrent.out | 4 ++ .../test_unique_auto_inc_concurrent.groovy | 61 +++++++++++++++++++ 4 files changed, 83 insertions(+), 7 deletions(-) create mode 100644 regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out create mode 100644 regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index c7c096ec6e8c1e..8c27127da0a5b2 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -65,10 +65,12 @@ Status AutoIncIDBuffer::sync_request_ids(size_t length, return _rpc_status; } + DCHECK(_is_backend_buffer_full); { std::lock_guard lock(_backend_buffer_latch); std::swap(_front_buffer, _backend_buffer); } + _is_backend_buffer_full = false; DCHECK_LE(length, _front_buffer.second); if (length > _front_buffer.second) { @@ -84,20 +86,19 @@ Status AutoIncIDBuffer::sync_request_ids(size_t length, } Status AutoIncIDBuffer::_prefetch_ids(size_t length) { - if (_front_buffer.second > _low_water_level_mark() || _is_fetching) { + if (_front_buffer.second > _low_water_level_mark() || _is_fetching || _is_backend_buffer_full) { return Status::OK(); } - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; - _is_fetching = true; RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() { TAutoIncrementRangeRequest request; TAutoIncrementRangeResult result; + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; request.__set_db_id(_db_id); request.__set_table_id(_table_id); request.__set_column_id(_column_id); request.__set_length(length); - int64_t get_auto_inc_range_rpc_ns; + int64_t 
get_auto_inc_range_rpc_ns = 0; { SCOPED_RAW_TIMER(&get_auto_inc_range_rpc_ns); _rpc_status = ThriftRpcHelper::rpc( @@ -109,18 +110,27 @@ Status AutoIncIDBuffer::_prefetch_ids(size_t length) { LOG(INFO) << "[auto-inc-range][start=" << result.start << ",length=" << result.length << "][elapsed=" << get_auto_inc_range_rpc_ns / 1000000 << " ms]"; - if (!_rpc_status.ok() || result.length <= 0) { + if (!_rpc_status.ok()) { LOG(WARNING) << "Failed to fetch auto-incremnt range, encounter rpc failure." << "errmsg=" << _rpc_status.to_string(); return; } + if (result.length != length) [[unlikely]] { + _rpc_status = Status::RpcError( + "Failed to fetch auto-incremnt range, request length={}, but get " + "result.length={}", + length, result.length); + return; + } { std::lock_guard lock(_backend_buffer_latch); _backend_buffer = {result.start, result.length}; } + _is_backend_buffer_full = true; _is_fetching = false; })); + _is_fetching = true; return Status::OK(); } diff --git a/be/src/vec/sink/autoinc_buffer.h b/be/src/vec/sink/autoinc_buffer.h index 3ec009b0960657..8687f90078e2db 100644 --- a/be/src/vec/sink/autoinc_buffer.h +++ b/be/src/vec/sink/autoinc_buffer.h @@ -81,7 +81,9 @@ class AutoIncIDBuffer { std::unique_ptr _rpc_token; Status _rpc_status {Status::OK()}; + std::atomic _is_fetching {false}; + std::atomic _is_backend_buffer_full {false}; std::pair _front_buffer {0, 0}; std::pair _backend_buffer {0, 0}; @@ -115,8 +117,7 @@ class GlobalAutoIncBuffers { auto key = std::make_tuple(db_id, table_id, column_id); auto it = _buffers.find(key); if (it == _buffers.end()) { - _buffers.emplace(std::make_pair( - key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id))); + _buffers.emplace(key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id)); } return _buffers[{db_id, table_id, column_id}]; } diff --git a/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out new 
file mode 100644 index 00000000000000..bdf9609742cca3 --- /dev/null +++ b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +3000000 3000000 + diff --git a/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy new file mode 100644 index 00000000000000..fb064ce584ab37 --- /dev/null +++ b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_unique_table_auto_inc_concurrent") { + + def table1 = "test_unique_table_auto_inc_concurrent" + sql "drop table if exists ${table1}" + sql """ + CREATE TABLE IF NOT EXISTS `${table1}` ( + `id` BIGINT NOT NULL AUTO_INCREMENT, + `value` int(11) NOT NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + + def threads = [] + def thread_num = 30 + def rows = 10000 + def iters = 10 + + def load_task = { + (1..iters).each { id -> + sql """insert into ${table1}(value) select number from numbers("number" = "${rows}");""" + } + } + + (1..thread_num).each { id -> + threads.add(Thread.start { + load_task() + }) + } + + threads.each { thread -> thread.join() } + + qt_sql "select count(id), count(distinct id) from ${table1};" + + sql "drop table if exists ${table1};" +} + From ea6c87240ecc75fadc81d33ebb3d7bdb5f23d140 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 19 Jun 2024 15:14:58 +0800 Subject: [PATCH 2/8] tmp --- .../test_unique_auto_inc_concurrent.groovy | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy index fb064ce584ab37..8a27416f9e1c23 100644 --- a/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy +++ b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy @@ -35,26 +35,22 @@ suite("test_unique_table_auto_inc_concurrent") { ) """ - def threads = [] - def thread_num = 30 - def rows = 10000 - def iters = 10 - - def load_task = { - (1..iters).each { id -> - sql """insert into ${table1}(value) select number from numbers("number" = "${rows}");""" + def run_test = {thread_num, 
rows, iters -> + def threads = [] + (1..thread_num).each { id1 -> + threads.add(Thread.start { + (1..iters).each { id2 -> + sql """insert into ${table1}(value) select number from numbers("number" = "${rows}");""" + } + }) } - } - (1..thread_num).each { id -> - threads.add(Thread.start { - load_task() - }) - } + threads.each { thread -> thread.join() } - threads.each { thread -> thread.join() } + qt_sql "select count(id), count(distinct id) from ${table1};" + } - qt_sql "select count(id), count(distinct id) from ${table1};" + run_test(30, 10000, 10) sql "drop table if exists ${table1};" } From 32d75ea6b7acaf05ccec618ebc7a228cddd8dc63 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 20 Jun 2024 18:31:10 +0800 Subject: [PATCH 3/8] check master in getAutoIncrementRange service --- .../org/apache/doris/service/FrontendServiceImpl.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 1b106576720453..d0ab049e4e8943 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2626,6 +2626,15 @@ public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeReques TAutoIncrementRangeResult result = new TAutoIncrementRangeResult(); TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); + + if (!Env.getCurrentEnv().isMaster()) { + status.setStatusCode(TStatusCode.NOT_MASTER); + status.addToErrorMsgs(NOT_MASTER_ERR_MSG); + LOG.error("failed to getAutoIncrementRange:{}, request:{}, backend:{}", + NOT_MASTER_ERR_MSG, request, getClientAddrAsString()); + return result; + } + try { Env env = Env.getCurrentEnv(); Database db = env.getInternalCatalog().getDbOrMetaException(request.getDbId()); From 86b5da16e65c4e2bc264d5f35b85b497d210b6a5 Mon Sep 17 00:00:00 2001 From: bobhan1 
Date: Thu, 20 Jun 2024 20:03:24 +0800 Subject: [PATCH 4/8] refactor autoinc_buffer async fetching mechanism --- be/src/vec/sink/autoinc_buffer.cpp | 150 +++++++++++------- be/src/vec/sink/autoinc_buffer.h | 38 ++++- .../doris/catalog/AutoIncrementGenerator.java | 2 +- .../test_unique_auto_inc_concurrent.out | 6 + .../test_unique_auto_inc_concurrent.groovy | 2 + 5 files changed, 132 insertions(+), 66 deletions(-) diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index 8c27127da0a5b2..9531e782db8903 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -19,6 +19,7 @@ #include +#include #include #include "common/status.h" @@ -42,54 +43,10 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) { } } -void AutoIncIDBuffer::_wait_for_prefetching() { - if (_is_fetching) { - _rpc_token->wait(); - } -} - -Status AutoIncIDBuffer::sync_request_ids(size_t length, - std::vector>* result) { - std::unique_lock lock(_mutex); - RETURN_IF_ERROR(_prefetch_ids(_prefetch_size())); - if (_front_buffer.second > 0) { - auto min_length = std::min(_front_buffer.second, length); - length -= min_length; - result->emplace_back(_front_buffer.first, min_length); - _front_buffer.first += min_length; - _front_buffer.second -= min_length; - } - if (length > 0) { - _wait_for_prefetching(); - if (!_rpc_status.ok()) { - return _rpc_status; - } - - DCHECK(_is_backend_buffer_full); - { - std::lock_guard lock(_backend_buffer_latch); - std::swap(_front_buffer, _backend_buffer); - } - _is_backend_buffer_full = false; - - DCHECK_LE(length, _front_buffer.second); - if (length > _front_buffer.second) { - return Status::RpcError("auto inc sync result length > front buffer. 
" + - std::to_string(length) + " vs " + - std::to_string(_front_buffer.second)); - } - result->emplace_back(_front_buffer.first, length); - _front_buffer.first += length; - _front_buffer.second -= length; - } - return Status::OK(); -} - -Status AutoIncIDBuffer::_prefetch_ids(size_t length) { - if (_front_buffer.second > _low_water_level_mark() || _is_fetching || _is_backend_buffer_full) { - return Status::OK(); - } - RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() { +Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { + constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3; + _rpc_status = Status::OK(); + for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) { TAutoIncrementRangeRequest request; TAutoIncrementRangeResult result; TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; @@ -111,23 +68,102 @@ Status AutoIncIDBuffer::_prefetch_ids(size_t length) { << "][elapsed=" << get_auto_inc_range_rpc_ns / 1000000 << " ms]"; if (!_rpc_status.ok()) { - LOG(WARNING) << "Failed to fetch auto-incremnt range, encounter rpc failure." - << "errmsg=" << _rpc_status.to_string(); - return; + LOG(WARNING) + << "Failed to fetch auto-incremnt range, encounter rpc failure. 
retry_time=" + << retry_times << ", errmsg=" << _rpc_status.to_string(); + continue; } if (result.length != length) [[unlikely]] { _rpc_status = Status::RpcError( "Failed to fetch auto-incremnt range, request length={}, but get " - "result.length={}", - length, result.length); - return; + "result.length={}, retry_time={}", + length, result.length, retry_times); + continue; } + return result.start; + } + CHECK(!_rpc_status.ok()); + return _rpc_status; +} + +void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers( + size_t& request_length, std::vector>* result) { + std::lock_guard lock {_latch}; + while (request_length > 0 && !_buffers.empty()) { + auto& autoinc_range = _buffers.front(); + CHECK_GT(autoinc_range.length, 0); + auto min_length = std::min(request_length, autoinc_range.length); + result->emplace_back(autoinc_range.start, min_length); + autoinc_range.consume(min_length); + request_length -= min_length; + _current_volume -= min_length; + if (autoinc_range.empty()) { + _buffers.pop_front(); + } + } +} + +Status AutoIncIDBuffer::sync_request_ids(size_t request_length, + std::vector>* result) { + std::unique_lock lock(_mutex); + int current = current_volume(); + if (current < request_length) { + // ids from _buffers is NOT sufficient for current request, + // first, we wait for any potential asynchronous fetch task + if (_is_fetching) { + _rpc_token->wait(); + } + if (!_rpc_status.ok()) { + return _rpc_status; + } + // then try to feed the request_length + _get_autoinc_ranges_from_buffers(request_length, result); + // If ids from buffers is still not enough for request, we should fetch ids from fe synchronously. + // After that, if current_volume <= low_water_level_mark, we should launch an asynchronous fetch task. 
+ // However, in order to reduce latency, we combine these two RPCs into one + + // it's guarenteed that there's no background task here, so no need to lock _latch + if (request_length > 0 || _current_volume <= _low_water_level_mark()) { + int64_t remained_length = _prefetch_size(); + int64_t start = DORIS_TRY(_fetch_ids_from_fe(request_length + remained_length)); + if (request_length > 0) { + result->emplace_back(start, request_length); + } + start += request_length; + _buffers.emplace_back(start, remained_length); + _current_volume += remained_length; + } + + } else if (current <= _low_water_level_mark()) { + // ids from _buffers is sufficient for current request, + // but current_volume <= low_water_level_mark, need to launch an asynchronous fetch task if no such task exists + _get_autoinc_ranges_from_buffers(request_length, result); + CHECK_EQ(request_length, 0); + if (!_is_fetching) { + RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size())); + } + } else { + // ids from _buffers is sufficient for current request, + // and current_volume > low_water_level_mark, no need to launch asynchronous fetch task + _get_autoinc_ranges_from_buffers(request_length, result); + CHECK_EQ(request_length, 0); + } + return Status::OK(); +} + +Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) { + RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() { + auto&& res = _fetch_ids_from_fe(length); + if (!res.has_value()) [[unlikely]] { + return; + } + int64_t start = res.value(); { - std::lock_guard lock(_backend_buffer_latch); - _backend_buffer = {result.start, result.length}; + std::lock_guard lock {_latch}; + _buffers.emplace_back(start, length); + _current_volume += length; } - _is_backend_buffer_full = true; _is_fetching = false; })); _is_fetching = true; diff --git a/be/src/vec/sink/autoinc_buffer.h b/be/src/vec/sink/autoinc_buffer.h index 8687f90078e2db..60fb6773f6e427 100644 --- a/be/src/vec/sink/autoinc_buffer.h +++ b/be/src/vec/sink/autoinc_buffer.h @@ -43,7 
+43,7 @@ struct AutoIncIDAllocator { return ids.front().first++; } - void insert_ids(int64_t start, size_t length) { + void insert_ids(int64_t start, std::size_t length) { total_count += length; ids.emplace_back(start, length); } @@ -61,17 +61,39 @@ class AutoIncIDBuffer { // all public functions are thread safe AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id); void set_batch_size_at_least(size_t batch_size); - Status sync_request_ids(size_t length, std::vector>* result); + Status sync_request_ids(size_t request_length, std::vector>* result); + + struct AutoIncRange { + int64_t start; + size_t length; + + bool empty() const { return length == 0; } + + void consume(size_t l) { + start += l; + length -= l; + } + }; private: - Status _prefetch_ids(size_t length); [[nodiscard]] size_t _prefetch_size() const { return _batch_size * config::auto_inc_prefetch_size_ratio; } + [[nodiscard]] size_t _low_water_level_mark() const { return _batch_size * config::auto_inc_low_water_level_mark_size_ratio; }; - void _wait_for_prefetching(); + + size_t current_volume() const { + std::lock_guard lock {_latch}; + return _current_volume; + } + void _get_autoinc_ranges_from_buffers(size_t& request_length, + std::vector>* result); + + Status _launch_async_fetch_task(size_t length); + + Result _fetch_ids_from_fe(size_t length); std::atomic _batch_size {MIN_BATCH_SIZE}; @@ -83,12 +105,12 @@ class AutoIncIDBuffer { Status _rpc_status {Status::OK()}; std::atomic _is_fetching {false}; - std::atomic _is_backend_buffer_full {false}; - std::pair _front_buffer {0, 0}; - std::pair _backend_buffer {0, 0}; - std::mutex _backend_buffer_latch; // for _backend_buffer std::mutex _mutex; + + mutable std::mutex _latch; + size_t _current_volume {0}; + std::list _buffers; }; class GlobalAutoIncBuffers { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java index 
be110360850cab..3562259a5f52b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java @@ -40,7 +40,7 @@ public class AutoIncrementGenerator implements Writable, GsonPostProcessable { public static final long NEXT_ID_INIT_VALUE = 1; // _MIN_BATCH_SIZE = 4064 in load task - private static final long BATCH_ID_INTERVAL = 50000; + private static final long BATCH_ID_INTERVAL = 500000; @SerializedName(value = "dbId") private Long dbId; diff --git a/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out index bdf9609742cca3..796adb08837731 100644 --- a/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out +++ b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out @@ -2,3 +2,9 @@ -- !sql -- 3000000 3000000 +-- !sql -- +6000000 6000000 + +-- !sql -- +9000000 9000000 + diff --git a/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy index 8a27416f9e1c23..c738cc1a1dc046 100644 --- a/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy +++ b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy @@ -51,6 +51,8 @@ suite("test_unique_table_auto_inc_concurrent") { } run_test(30, 10000, 10) + run_test(30, 100000, 1) + run_test(5, 60000, 10) sql "drop table if exists ${table1};" } From 506c4836ec879798b11444d1457dfde33e96e814 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Mon, 24 Jun 2024 22:10:50 +0800 Subject: [PATCH 5/8] tmp save, refactor autoinc_buffer.cpp, remove current_volume --- be/src/vec/sink/autoinc_buffer.cpp | 48 ++++++------------------------ be/src/vec/sink/autoinc_buffer.h | 5 ---- 2 files changed, 9 insertions(+), 44 deletions(-) 
diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index 9531e782db8903..33fa6f3b446969 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -97,7 +97,6 @@ void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers( result->emplace_back(autoinc_range.start, min_length); autoinc_range.consume(min_length); request_length -= min_length; - _current_volume -= min_length; if (autoinc_range.empty()) { _buffers.pop_front(); } @@ -107,47 +106,19 @@ void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers( Status AutoIncIDBuffer::sync_request_ids(size_t request_length, std::vector>* result) { std::unique_lock lock(_mutex); - int current = current_volume(); - if (current < request_length) { - // ids from _buffers is NOT sufficient for current request, - // first, we wait for any potential asynchronous fetch task - if (_is_fetching) { - _rpc_token->wait(); - } - if (!_rpc_status.ok()) { - return _rpc_status; - } - // then try to feed the request_length + while (request_length > 0) { _get_autoinc_ranges_from_buffers(request_length, result); - // If ids from buffers is still not enough for request, we should fetch ids from fe synchronously. - // After that, if current_volume <= low_water_level_mark, we should launch an asynchronous fetch task. 
- // However, in order to reduce latency, we combine these two RPCs into one - - // it's guarenteed that there's no background task here, so no need to lock _latch - if (request_length > 0 || _current_volume <= _low_water_level_mark()) { - int64_t remained_length = _prefetch_size(); - int64_t start = DORIS_TRY(_fetch_ids_from_fe(request_length + remained_length)); - if (request_length > 0) { - result->emplace_back(start, request_length); - } - start += request_length; - _buffers.emplace_back(start, remained_length); - _current_volume += remained_length; + if (request_length == 0) { + break; } - - } else if (current <= _low_water_level_mark()) { - // ids from _buffers is sufficient for current request, - // but current_volume <= low_water_level_mark, need to launch an asynchronous fetch task if no such task exists - _get_autoinc_ranges_from_buffers(request_length, result); - CHECK_EQ(request_length, 0); if (!_is_fetching) { - RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size())); + RETURN_IF_ERROR(_launch_async_fetch_task(request_length)); } - } else { - // ids from _buffers is sufficient for current request, - // and current_volume > low_water_level_mark, no need to launch asynchronous fetch task - _get_autoinc_ranges_from_buffers(request_length, result); - CHECK_EQ(request_length, 0); + _rpc_token->wait(); + } + CHECK_EQ(request_length, 0); + if (!_is_fetching) { + RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size())); } return Status::OK(); } @@ -162,7 +133,6 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) { { std::lock_guard lock {_latch}; _buffers.emplace_back(start, length); - _current_volume += length; } _is_fetching = false; })); diff --git a/be/src/vec/sink/autoinc_buffer.h b/be/src/vec/sink/autoinc_buffer.h index 60fb6773f6e427..f10b7e77c50d62 100644 --- a/be/src/vec/sink/autoinc_buffer.h +++ b/be/src/vec/sink/autoinc_buffer.h @@ -84,10 +84,6 @@ class AutoIncIDBuffer { return _batch_size * 
config::auto_inc_low_water_level_mark_size_ratio; }; - size_t current_volume() const { - std::lock_guard lock {_latch}; - return _current_volume; - } void _get_autoinc_ranges_from_buffers(size_t& request_length, std::vector>* result); @@ -109,7 +105,6 @@ class AutoIncIDBuffer { std::mutex _mutex; mutable std::mutex _latch; - size_t _current_volume {0}; std::list _buffers; }; From 696a3c640fae699ac494a89b5fcd4d44f52c4fdf Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 26 Jun 2024 16:46:33 +0800 Subject: [PATCH 6/8] refactor autoinc_buffer --- be/src/vec/sink/autoinc_buffer.cpp | 39 ++++++++++++++++--- be/src/vec/sink/autoinc_buffer.h | 5 ++- .../doris/service/FrontendServiceImpl.java | 1 + gensrc/thrift/FrontendService.thrift | 1 + 4 files changed, 39 insertions(+), 7 deletions(-) diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index 33fa6f3b446969..fa12afd57b078d 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -22,6 +22,7 @@ #include #include +#include "common/logging.h" #include "common/status.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" @@ -46,10 +47,10 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) { Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3; _rpc_status = Status::OK(); + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) { TAutoIncrementRangeRequest request; TAutoIncrementRangeResult result; - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; request.__set_db_id(_db_id); request.__set_table_id(_table_id); request.__set_column_id(_column_id); @@ -67,6 +68,16 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { LOG(INFO) << "[auto-inc-range][start=" << result.start << ",length=" << result.length << 
"][elapsed=" << get_auto_inc_range_rpc_ns / 1000000 << " ms]"; + if (_rpc_status.is()) { + LOG_WARNING( + "Failed to fetch auto-incremnt range, request to non-master FE, discard all " + "auto_increment ranges in _buffers. retry_time={}", + retry_times); + master_addr = result.master_address; + _discard_all(); + continue; + } + if (!_rpc_status.ok()) { LOG(WARNING) << "Failed to fetch auto-incremnt range, encounter rpc failure. retry_time=" @@ -74,10 +85,12 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { continue; } if (result.length != length) [[unlikely]] { - _rpc_status = Status::RpcError( + auto msg = fmt::format( "Failed to fetch auto-incremnt range, request length={}, but get " "result.length={}, retry_time={}", length, result.length, retry_times); + LOG(WARNING) << msg; + _rpc_status = Status::RpcError(msg); continue; } @@ -96,6 +109,7 @@ void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers( auto min_length = std::min(request_length, autoinc_range.length); result->emplace_back(autoinc_range.start, min_length); autoinc_range.consume(min_length); + _current_volume -= min_length; request_length -= min_length; if (autoinc_range.empty()) { _buffers.pop_front(); @@ -105,39 +119,52 @@ void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers( Status AutoIncIDBuffer::sync_request_ids(size_t request_length, std::vector>* result) { - std::unique_lock lock(_mutex); + std::lock_guard lock(_mutex); while (request_length > 0) { _get_autoinc_ranges_from_buffers(request_length, result); if (request_length == 0) { break; } if (!_is_fetching) { - RETURN_IF_ERROR(_launch_async_fetch_task(request_length)); + RETURN_IF_ERROR( + _launch_async_fetch_task(std::max(request_length, _prefetch_size()))); } _rpc_token->wait(); + CHECK(!_is_fetching); + if (!_rpc_status.ok()) { + return _rpc_status; + } } CHECK_EQ(request_length, 0); - if (!_is_fetching) { + if (!_is_fetching && _current_volume < _low_water_level_mark()) { 
RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size())); } return Status::OK(); } Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) { + _is_fetching = true; RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() { auto&& res = _fetch_ids_from_fe(length); if (!res.has_value()) [[unlikely]] { + _is_fetching = false; return; } int64_t start = res.value(); { std::lock_guard lock {_latch}; _buffers.emplace_back(start, length); + _current_volume += length; } _is_fetching = false; })); - _is_fetching = true; return Status::OK(); } +void AutoIncIDBuffer::_discard_all() { + std::lock_guard lock {_latch}; + _buffers.clear(); + _current_volume = 0; +} + } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/autoinc_buffer.h b/be/src/vec/sink/autoinc_buffer.h index f10b7e77c50d62..9ea183e8f8949d 100644 --- a/be/src/vec/sink/autoinc_buffer.h +++ b/be/src/vec/sink/autoinc_buffer.h @@ -43,7 +43,7 @@ struct AutoIncIDAllocator { return ids.front().first++; } - void insert_ids(int64_t start, std::size_t length) { + void insert_ids(int64_t start, size_t length) { total_count += length; ids.emplace_back(start, length); } @@ -91,6 +91,8 @@ class AutoIncIDBuffer { Result _fetch_ids_from_fe(size_t length); + void _discard_all(); + std::atomic _batch_size {MIN_BATCH_SIZE}; int64_t _db_id; @@ -106,6 +108,7 @@ class AutoIncIDBuffer { mutable std::mutex _latch; std::list _buffers; + size_t _current_volume {0}; }; class GlobalAutoIncBuffers { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index d0ab049e4e8943..75a1987eb235b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2630,6 +2630,7 @@ public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeReques if (!Env.getCurrentEnv().isMaster()) 
{ status.setStatusCode(TStatusCode.NOT_MASTER); status.addToErrorMsgs(NOT_MASTER_ERR_MSG); + result.setMasterAddress(getMasterAddress()); LOG.error("failed to getAutoIncrementRange:{}, request:{}, backend:{}", NOT_MASTER_ERR_MSG, request, getClientAddrAsString()); return result; diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 9a57abc9b613b0..d25f5c0ac2be63 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1323,6 +1323,7 @@ struct TAutoIncrementRangeResult { 1: optional Status.TStatus status 2: optional i64 start 3: optional i64 length + 4: optional Types.TNetworkAddress master_address } struct TCreatePartitionRequest { From 1f06a05e7bf93791e5353f972f3125f050d84d88 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 26 Jun 2024 20:36:18 +0800 Subject: [PATCH 7/8] reduce test concurrency --- be/src/vec/sink/autoinc_buffer.cpp | 6 ++++-- .../unique/test_unique_auto_inc_concurrent.out | 6 +++--- .../unique/test_unique_auto_inc_concurrent.groovy | 6 +++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index fa12afd57b078d..2d4eb792f4729e 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -19,8 +19,8 @@ #include +#include #include -#include #include "common/logging.h" #include "common/status.h" @@ -28,7 +28,6 @@ #include "runtime/exec_env.h" #include "util/runtime_profile.h" #include "util/thrift_rpc_helper.h" -#include "vec/sink/vtablet_block_convertor.h" namespace doris::vectorized { @@ -75,6 +74,7 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { retry_times); master_addr = result.master_address; _discard_all(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } @@ -82,6 +82,7 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { LOG(WARNING) << "Failed to fetch auto-incremnt range, encounter rpc failure. 
retry_time=" << retry_times << ", errmsg=" << _rpc_status.to_string(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } if (result.length != length) [[unlikely]] { @@ -91,6 +92,7 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { length, result.length, retry_times); LOG(WARNING) << msg; _rpc_status = Status::RpcError(msg); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } diff --git a/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out index 796adb08837731..03819c9a717a90 100644 --- a/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out +++ b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out @@ -1,10 +1,10 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !sql -- -3000000 3000000 +1500000 1500000 -- !sql -- -6000000 6000000 +3000000 3000000 -- !sql -- -9000000 9000000 +4500000 4500000 diff --git a/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy index c738cc1a1dc046..bf6d584b2af555 100644 --- a/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy +++ b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy @@ -50,9 +50,9 @@ suite("test_unique_table_auto_inc_concurrent") { qt_sql "select count(id), count(distinct id) from ${table1};" } - run_test(30, 10000, 10) - run_test(30, 100000, 1) - run_test(5, 60000, 10) + run_test(15, 10000, 10) + run_test(15, 100000, 1) + run_test(5, 30000, 10) sql "drop table if exists ${table1};" } From 95f675031bdba2fb65ac04a3cc1f42a95ab0e87b Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 27 Jun 2024 22:33:22 +0800 Subject: [PATCH 8/8] fix AutoIncrementGenerator --- 
be/src/vec/sink/autoinc_buffer.cpp | 7 ------- be/src/vec/sink/autoinc_buffer.h | 2 -- .../org/apache/doris/catalog/AutoIncrementGenerator.java | 3 +-- 3 files changed, 1 insertion(+), 11 deletions(-) diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index 2d4eb792f4729e..f83dbcb55b8a9f 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -73,7 +73,6 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { "auto_increment ranges in _buffers. retry_time={}", retry_times); master_addr = result.master_address; - _discard_all(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } @@ -163,10 +162,4 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) { return Status::OK(); } -void AutoIncIDBuffer::_discard_all() { - std::lock_guard lock {_latch}; - _buffers.clear(); - _current_volume = 0; -} - } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/autoinc_buffer.h b/be/src/vec/sink/autoinc_buffer.h index 9ea183e8f8949d..032ac18981f4da 100644 --- a/be/src/vec/sink/autoinc_buffer.h +++ b/be/src/vec/sink/autoinc_buffer.h @@ -91,8 +91,6 @@ class AutoIncIDBuffer { Result _fetch_ids_from_fe(size_t length); - void _discard_all(); - std::atomic _batch_size {MIN_BATCH_SIZE}; int64_t _db_id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java index 3562259a5f52b3..e4c8cf5de01566 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java @@ -48,7 +48,6 @@ public class AutoIncrementGenerator implements Writable, GsonPostProcessable { private Long tableId; @SerializedName(value = "columnId") private Long columnId; - @SerializedName(value = "nextId") private long nextId; @SerializedName(value = 
"batchEndId") private long batchEndId; @@ -86,10 +85,10 @@ public synchronized Pair getAutoIncrementRange(long columnId, long endId = startId + length; nextId = startId + length; if (endId > batchEndId) { - batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL; Preconditions.checkState(editLog != null); AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId, tableId, columnId, batchEndId); editLog.logUpdateAutoIncrementId(info); + batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL; } LOG.info("[getAutoIncrementRange result][{}, {}]", startId, length); return Pair.of(startId, length);