150 changes: 94 additions & 56 deletions be/src/vec/sink/autoinc_buffer.cpp
@@ -19,14 +19,15 @@

#include <gen_cpp/HeartbeatService_types.h>

#include <string>
#include <chrono>
#include <mutex>

#include "common/logging.h"
#include "common/status.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "util/runtime_profile.h"
#include "util/thrift_rpc_helper.h"
#include "vec/sink/vtablet_block_convertor.h"

namespace doris::vectorized {

@@ -42,62 +43,19 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) {
}
}

void AutoIncIDBuffer::_wait_for_prefetching() {
if (_is_fetching) {
_rpc_token->wait();
}
}

Status AutoIncIDBuffer::sync_request_ids(size_t length,
std::vector<std::pair<int64_t, size_t>>* result) {
std::unique_lock<std::mutex> lock(_mutex);
RETURN_IF_ERROR(_prefetch_ids(_prefetch_size()));
if (_front_buffer.second > 0) {
auto min_length = std::min(_front_buffer.second, length);
length -= min_length;
result->emplace_back(_front_buffer.first, min_length);
_front_buffer.first += min_length;
_front_buffer.second -= min_length;
}
if (length > 0) {
_wait_for_prefetching();
if (!_rpc_status.ok()) {
return _rpc_status;
}

{
std::lock_guard<std::mutex> lock(_backend_buffer_latch);
std::swap(_front_buffer, _backend_buffer);
}

DCHECK_LE(length, _front_buffer.second);
if (length > _front_buffer.second) {
return Status::RpcError("auto inc sync result length > front buffer. " +
std::to_string(length) + " vs " +
std::to_string(_front_buffer.second));
}
result->emplace_back(_front_buffer.first, length);
_front_buffer.first += length;
_front_buffer.second -= length;
}
return Status::OK();
}

Status AutoIncIDBuffer::_prefetch_ids(size_t length) {
if (_front_buffer.second > _low_water_level_mark() || _is_fetching) {
return Status::OK();
}
Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
_rpc_status = Status::OK();
TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
_is_fetching = true;
RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
TAutoIncrementRangeRequest request;
TAutoIncrementRangeResult result;
request.__set_db_id(_db_id);
request.__set_table_id(_table_id);
request.__set_column_id(_column_id);
request.__set_length(length);

int64_t get_auto_inc_range_rpc_ns;
int64_t get_auto_inc_range_rpc_ns = 0;
{
SCOPED_RAW_TIMER(&get_auto_inc_range_rpc_ns);
_rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
@@ -109,15 +67,95 @@ Status AutoIncIDBuffer::_prefetch_ids(size_t length) {
LOG(INFO) << "[auto-inc-range][start=" << result.start << ",length=" << result.length
<< "][elapsed=" << get_auto_inc_range_rpc_ns / 1000000 << " ms]";

if (!_rpc_status.ok() || result.length <= 0) {
LOG(WARNING) << "Failed to fetch auto-incremnt range, encounter rpc failure."
<< "errmsg=" << _rpc_status.to_string();
return;
if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
LOG_WARNING(
"Failed to fetch auto-incremnt range, request to non-master FE, discard all "
"auto_increment ranges in _buffers. retry_time={}",
retry_times);
master_addr = result.master_address;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

if (!_rpc_status.ok()) {
LOG(WARNING)
<< "Failed to fetch auto-incremnt range, encounter rpc failure. retry_time="
<< retry_times << ", errmsg=" << _rpc_status.to_string();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
if (result.length != length) [[unlikely]] {
auto msg = fmt::format(
"Failed to fetch auto-incremnt range, request length={}, but get "
"result.length={}, retry_time={}",
length, result.length, retry_times);
LOG(WARNING) << msg;
_rpc_status = Status::RpcError<true>(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

return result.start;
}
CHECK(!_rpc_status.ok());
return _rpc_status;
}

void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
size_t& request_length, std::vector<std::pair<int64_t, size_t>>* result) {
std::lock_guard<std::mutex> lock {_latch};
while (request_length > 0 && !_buffers.empty()) {
auto& autoinc_range = _buffers.front();
CHECK_GT(autoinc_range.length, 0);
auto min_length = std::min(request_length, autoinc_range.length);
result->emplace_back(autoinc_range.start, min_length);
autoinc_range.consume(min_length);
_current_volume -= min_length;
request_length -= min_length;
if (autoinc_range.empty()) {
_buffers.pop_front();
}
}
}

Status AutoIncIDBuffer::sync_request_ids(size_t request_length,
std::vector<std::pair<int64_t, size_t>>* result) {
std::lock_guard<std::mutex> lock(_mutex);
while (request_length > 0) {
_get_autoinc_ranges_from_buffers(request_length, result);
if (request_length == 0) {
break;
}
if (!_is_fetching) {
RETURN_IF_ERROR(
_launch_async_fetch_task(std::max<size_t>(request_length, _prefetch_size())));
}
_rpc_token->wait();
CHECK(!_is_fetching);
if (!_rpc_status.ok()) {
return _rpc_status;
}
}
CHECK_EQ(request_length, 0);
if (!_is_fetching && _current_volume < _low_water_level_mark()) {
RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size()));
}
return Status::OK();
}

Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
_is_fetching = true;
RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
auto&& res = _fetch_ids_from_fe(length);
if (!res.has_value()) [[unlikely]] {
_is_fetching = false;
return;
}
int64_t start = res.value();
{
std::lock_guard<std::mutex> lock(_backend_buffer_latch);
_backend_buffer = {result.start, result.length};
std::lock_guard<std::mutex> lock {_latch};
_buffers.emplace_back(start, length);
_current_volume += length;
}
_is_fetching = false;
}));
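For orientation, here is a minimal caller-side sketch (not part of the patch) of how the (start, length) pairs filled in by sync_request_ids could be expanded into individual auto-increment values; the concrete ranges below are invented for the example.

// Hypothetical caller: expand the (start, length) ranges produced by
// AutoIncIDBuffer::sync_request_ids() into individual auto-increment values.
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

int main() {
    // Pretend sync_request_ids(5, &ranges) filled this in; a single request
    // may be satisfied from several non-contiguous ranges in _buffers.
    std::vector<std::pair<int64_t, size_t>> ranges = {{100, 3}, {7000, 2}};

    std::vector<int64_t> ids;
    for (const auto& [start, length] : ranges) {
        for (size_t i = 0; i < length; ++i) {
            ids.push_back(start + static_cast<int64_t>(i)); // consecutive within a range
        }
    }
    for (int64_t id : ids) {
        std::cout << id << '\n'; // prints 100 101 102 7000 7001
    }
    return 0;
}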
35 changes: 27 additions & 8 deletions be/src/vec/sink/autoinc_buffer.h
@@ -61,17 +61,35 @@ class AutoIncIDBuffer {
// all public functions are thread safe
AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id);
void set_batch_size_at_least(size_t batch_size);
Status sync_request_ids(size_t length, std::vector<std::pair<int64_t, size_t>>* result);
Status sync_request_ids(size_t request_length, std::vector<std::pair<int64_t, size_t>>* result);

struct AutoIncRange {
int64_t start;
size_t length;

bool empty() const { return length == 0; }

void consume(size_t l) {
start += l;
length -= l;
}
};

private:
Status _prefetch_ids(size_t length);
[[nodiscard]] size_t _prefetch_size() const {
return _batch_size * config::auto_inc_prefetch_size_ratio;
}

[[nodiscard]] size_t _low_water_level_mark() const {
return _batch_size * config::auto_inc_low_water_level_mark_size_ratio;
};
void _wait_for_prefetching();

void _get_autoinc_ranges_from_buffers(size_t& request_length,
std::vector<std::pair<int64_t, size_t>>* result);

Status _launch_async_fetch_task(size_t length);

Result<int64_t> _fetch_ids_from_fe(size_t length);

std::atomic<size_t> _batch_size {MIN_BATCH_SIZE};

@@ -81,12 +99,14 @@

std::unique_ptr<ThreadPoolToken> _rpc_token;
Status _rpc_status {Status::OK()};

std::atomic<bool> _is_fetching {false};

std::pair<int64_t, size_t> _front_buffer {0, 0};
std::pair<int64_t, size_t> _backend_buffer {0, 0};
std::mutex _backend_buffer_latch; // for _backend_buffer
std::mutex _mutex;

mutable std::mutex _latch;
std::list<AutoIncRange> _buffers;
size_t _current_volume {0};
};

class GlobalAutoIncBuffers {
@@ -115,8 +135,7 @@ class GlobalAutoIncBuffers {
auto key = std::make_tuple(db_id, table_id, column_id);
auto it = _buffers.find(key);
if (it == _buffers.end()) {
_buffers.emplace(std::make_pair(
key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id)));
_buffers.emplace(key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id));
}
return _buffers[{db_id, table_id, column_id}];
}
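As a standalone illustration of the AutoIncRange bookkeeping declared above (the numbers are invented, and the struct is copied only so the snippet compiles on its own):

#include <cassert>
#include <cstddef>
#include <cstdint>

struct AutoIncRange {
    int64_t start;
    size_t length;

    bool empty() const { return length == 0; }

    void consume(size_t l) {
        start += l;
        length -= l;
    }
};

int main() {
    AutoIncRange range {1000, 5}; // covers IDs 1000..1004
    range.consume(3);             // hand out 1000..1002
    assert(range.start == 1003 && range.length == 2);
    range.consume(2);             // hand out 1003..1004
    assert(range.empty());        // an empty range is popped from _buffers
    return 0;
}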
@@ -40,15 +40,14 @@ public class AutoIncrementGenerator implements Writable, GsonPostProcessable {

public static final long NEXT_ID_INIT_VALUE = 1;
// _MIN_BATCH_SIZE = 4064 in load task
private static final long BATCH_ID_INTERVAL = 50000;
private static final long BATCH_ID_INTERVAL = 500000;

@SerializedName(value = "dbId")
private Long dbId;
@SerializedName(value = "tableId")
private Long tableId;
@SerializedName(value = "columnId")
private Long columnId;
@SerializedName(value = "nextId")
private long nextId;
@SerializedName(value = "batchEndId")
private long batchEndId;
@@ -86,10 +85,10 @@ public synchronized Pair<Long, Long> getAutoIncrementRange(long columnId,
long endId = startId + length;
nextId = startId + length;
if (endId > batchEndId) {
batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
Preconditions.checkState(editLog != null);
AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId, tableId, columnId, batchEndId);
editLog.logUpdateAutoIncrementId(info);
batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
}
LOG.info("[getAutoIncrementRange result][{}, {}]", startId, length);
return Pair.of(startId, length);
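To make the persistence step above concrete, here is a small sketch (in C++, with an invented endId) that reproduces the batchEndId rounding after BATCH_ID_INTERVAL is raised from 50000 to 500000:

#include <cstdint>
#include <iostream>

int main() {
    constexpr int64_t BATCH_ID_INTERVAL = 500000; // new value in this patch
    int64_t end_id = 1200001;                     // hypothetical end of a granted range
    // Integer division rounds down, so the "+ 1" always lands on the next
    // multiple of the interval strictly above end_id.
    int64_t batch_end_id = (end_id / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
    std::cout << batch_end_id << '\n'; // prints 1500000
    return 0;
}

With a 10x larger interval, this branch, and the edit-log write it guards, is reached far less often for the same insert volume.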
@@ -2626,6 +2626,16 @@ public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeRequest
TAutoIncrementRangeResult result = new TAutoIncrementRangeResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);

if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to getAutoIncrementRange:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
return result;
}

try {
Env env = Env.getCurrentEnv();
Database db = env.getInternalCatalog().getDbOrMetaException(request.getDbId());
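Together with the retry loop in _fetch_ids_from_fe above, this makes a fetch that lands on a follower FE self-correcting: the follower answers NOT_MASTER and includes its view of the master address in the result, and the BE re-issues the RPC against that address after a short sleep instead of failing the load.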
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
@@ -1323,6 +1323,7 @@ struct TAutoIncrementRangeResult {
1: optional Status.TStatus status
2: optional i64 start
3: optional i64 length
4: optional Types.TNetworkAddress master_address
}

struct TCreatePartitionRequest {
@@ -0,0 +1,10 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1500000 1500000

-- !sql --
3000000 3000000

-- !sql --
4500000 4500000

@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_unique_table_auto_inc_concurrent") {

def table1 = "test_unique_table_auto_inc_concurrent"
sql "drop table if exists ${table1}"
sql """
CREATE TABLE IF NOT EXISTS `${table1}` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`value` int(11) NOT NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"enable_unique_key_merge_on_write" = "true"
)
"""

def run_test = {thread_num, rows, iters ->
def threads = []
(1..thread_num).each { id1 ->
threads.add(Thread.start {
(1..iters).each { id2 ->
sql """insert into ${table1}(value) select number from numbers("number" = "${rows}");"""
}
})
}

threads.each { thread -> thread.join() }

qt_sql "select count(id), count(distinct id) from ${table1};"
}

run_test(15, 10000, 10)
run_test(15, 100000, 1)
run_test(5, 30000, 10)

sql "drop table if exists ${table1};"
}
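A note on the expected output file above: each run_test call appends thread_num * rows * iters rows to the same table, so the cumulative counts checked by qt_sql are 15 * 10000 * 10 = 1,500,000, then + 15 * 100000 * 1 = 3,000,000, then + 5 * 30000 * 10 = 4,500,000, matching the three result pairs; count(distinct id) being equal to count(id) is what asserts that no auto-increment ID was handed out twice across the concurrent inserts.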