diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index e32e9c9efcf4cc..62ff0b2fccee81 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -388,21 +388,18 @@ Status VOlapTablePartitionParam::init() { // for both auto/non-auto partition table. _is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED; - // initial partitions. if meet dummy partitions only for open BE nodes, not generate key of them for finding + // initial partitions for (const auto& t_part : _t_param.partitions) { VOlapTablePartition* part = nullptr; RETURN_IF_ERROR(generate_partition_from(t_part, part)); _partitions.emplace_back(part); - - if (!_t_param.partitions_is_fake) { - if (_is_in_partition) { - for (auto& in_key : part->in_keys) { - _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part); - } - } else { - _partitions_map->emplace( - std::tuple {part->end_key.first, part->end_key.second, false}, part); + if (_is_in_partition) { + for (auto& in_key : part->in_keys) { + _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part); } + } else { + _partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false}, + part); } } diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 726016a7a04bd4..146575feac9fa9 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -33,11 +33,11 @@ namespace doris { bvar::Adder g_loadchannel_cnt("loadchannel_cnt"); LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority, - std::string sender_ip, int64_t backend_id, bool enable_profile) + const std::string& sender_ip, int64_t backend_id, bool enable_profile) : _load_id(load_id), _timeout_s(timeout_s), _is_high_priority(is_high_priority), - _sender_ip(std::move(sender_ip)), + _sender_ip(sender_ip), _backend_id(backend_id), _enable_profile(enable_profile) { std::shared_ptr query_context = @@ -161,7 +161,6 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request, } // 3. handle eos - // if channel is incremental, maybe hang on close until all close request arrived. if (request.has_eos() && request.eos()) { st = _handle_eos(channel.get(), request, response); _report_profile(response); @@ -183,23 +182,6 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel, auto index_id = request.index_id(); RETURN_IF_ERROR(channel->close(this, request, response, &finished)); - - // for init node, we close waiting(hang on) all close request and let them return together. - if (request.has_hang_wait() && request.hang_wait()) { - DCHECK(!channel->is_incremental_channel()); - VLOG_TRACE << "reciever close waiting!" << request.sender_id(); - int count = 0; - while (!channel->is_finished()) { - bthread_usleep(1000); - count++; - } - // now maybe finished or cancelled. - VLOG_TRACE << "reciever close wait finished!" 
<< request.sender_id(); - if (count >= 1000 * _timeout_s) { // maybe config::streaming_load_rpc_max_alive_time_sec - return Status::InternalError("Tablets channel didn't wait all close"); - } - } - if (finished) { std::lock_guard l(_lock); { @@ -209,7 +191,6 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel, std::make_pair(channel->total_received_rows(), channel->num_rows_filtered()))); _tablets_channels.erase(index_id); } - VLOG_NOTICE << "load " << _load_id.to_string() << " closed tablets_channel " << index_id; _finished_channel_ids.emplace(index_id); } return Status::OK(); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 98a8d7c9f81203..4a437e51907d8c 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -17,7 +17,10 @@ #pragma once +#include #include +#include +#include #include #include #include @@ -25,11 +28,15 @@ #include #include #include +#include #include "common/status.h" +#include "olap/memtable_memory_limiter.h" +#include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "util/runtime_profile.h" #include "util/spinlock.h" +#include "util/thrift_util.h" #include "util/uid_util.h" namespace doris { @@ -45,7 +52,7 @@ class BaseTabletsChannel; class LoadChannel { public: LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority, - std::string sender_ip, int64_t backend_id, bool enable_profile); + const std::string& sender_ip, int64_t backend_id, bool enable_profile); ~LoadChannel(); // open a new load channel if not exist diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index d236645b1fe79c..4b0cc32f9c99ac 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -24,17 +24,25 @@ // IWYU pragma: no_include #include // IWYU pragma: keep #include +#include +#include #include #include +#include #include +#include #include #include "common/config.h" #include "common/logging.h" #include "runtime/exec_env.h" #include "runtime/load_channel.h" +#include "runtime/memory/mem_tracker.h" #include "util/doris_metrics.h" +#include "util/mem_info.h" #include "util/metrics.h" +#include "util/perf_counters.h" +#include "util/pretty_printer.h" #include "util/thread.h" namespace doris { diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 77cad55b9e73a9..266a4b97183dcf 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -360,7 +360,7 @@ LoadStream::~LoadStream() { Status LoadStream::init(const POpenLoadStreamRequest* request) { _txn_id = request->txn_id(); _total_streams = request->total_streams(); - _is_incremental = (_total_streams == 0); + DCHECK(_total_streams > 0) << "total streams should be greater than 0"; _schema = std::make_shared(); RETURN_IF_ERROR(_schema->init(request->schema())); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index b2635698379f6d..c61a2d163de495 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -117,9 +117,6 @@ class LoadStream : public brpc::StreamInputHandler { void add_source(int64_t src_id) { std::lock_guard lock_guard(_lock); _open_streams[src_id]++; - if (_is_incremental) { - _total_streams++; - } } Status close(int64_t src_id, const std::vector& tablets_to_commit, @@ -170,7 +167,6 @@ class LoadStream : public brpc::StreamInputHandler { RuntimeProfile::Counter* _close_wait_timer = nullptr; LoadStreamMgr* _load_stream_mgr = nullptr; QueryThreadContext
_query_thread_context; - bool _is_incremental = false; }; using LoadStreamPtr = std::unique_ptr; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index f8b0116b2f9608..526c979968d48a 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -140,29 +140,9 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { RETURN_IF_ERROR(_schema->init(request.schema())); _tuple_desc = _schema->tuple_desc(); - int max_sender = request.num_senders(); - /* - * a tablets channel in reciever is related to a bulk of VNodeChannel of sender. each instance one or none. - * there are two possibilities: - * 1. there's partitions originally broadcasted by FE. so all sender(instance) know it at start. and open() will be - * called directly, not by incremental_open(). and after _state changes to kOpened. _open_by_incremental will never - * be true. in this case, _num_remaining_senders will keep same with senders number. when all sender sent close rpc, - * the tablets channel will close. and if for auto partition table, these channel's closing will hang on reciever and - * return together to avoid close-then-incremental-open problem. - * 2. this tablets channel is opened by incremental_open of sender's sink node. so only this sender will know this partition - * (this TabletsChannel) at that time. and we are not sure how many sender will know in the end. it depends on data - * distribution. in this situation open() is called by incremental_open() at first time. so _open_by_incremental is true. - * then _num_remaining_senders will not be set here. but inc every time when incremental_open() called. so it's dynamic - * and also need same number of senders' close to close. but will not hang. - */ - if (_open_by_incremental) { - DCHECK(_num_remaining_senders == 0) << _num_remaining_senders; - } else { - _num_remaining_senders = max_sender; - } - // just use max_sender no matter incremental or not cuz we dont know how many senders will open. - _next_seqs.resize(max_sender, 0); - _closed_senders.Reset(max_sender); + _num_remaining_senders = request.num_senders(); + _next_seqs.resize(_num_remaining_senders, 0); + _closed_senders.Reset(_num_remaining_senders); RETURN_IF_ERROR(_open_all_writers(request)); @@ -172,19 +152,10 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) { SCOPED_TIMER(_incremental_open_timer); - - // current node first opened by incremental open - if (_state == kInitialized) { - _open_by_incremental = true; + if (_state == kInitialized) { // haven't opened RETURN_IF_ERROR(open(params)); } - std::lock_guard l(_lock); - - if (_open_by_incremental) { - _num_remaining_senders++; - } - std::vector* index_slots = nullptr; int32_t schema_hash = 0; for (const auto& index : _schema->indexes()) { diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index f3b996d91dce8c..27db9387602658 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -21,6 +21,8 @@ #include #include +#include +#include #include #include #include @@ -111,11 +113,6 @@ class BaseTabletsChannel { size_t num_rows_filtered() const { return _num_rows_filtered; } - // means this tablets in this BE is incremental opened partitions. 
- bool is_incremental_channel() const { return _open_by_incremental; } - - bool is_finished() const { return _state == kFinished; } - protected: Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request); @@ -154,8 +151,8 @@ class BaseTabletsChannel { int64_t _txn_id = -1; int64_t _index_id = -1; std::shared_ptr _schema; + TupleDescriptor* _tuple_desc = nullptr; - bool _open_by_incremental = false; // next sequence we expect int _num_remaining_senders = 0; diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index 7a3072ade6e70b..fdcfe190dbf5c5 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -35,7 +35,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, DCHECK(num_use > 0) << "use num should be greater than 0"; } -std::shared_ptr LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) { +std::shared_ptr LoadStreamMap::get_or_create(int64_t dst_id) { std::lock_guard lock(_mutex); std::shared_ptr streams = _streams_for_node[dst_id]; if (streams != nullptr) { @@ -44,7 +44,7 @@ std::shared_ptr LoadStreamMap::get_or_create(int64_t dst_id, bool incre streams = std::make_shared(); for (int i = 0; i < _num_streams; i++) { streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index, - _enable_unique_mow_for_index, incremental)); + _enable_unique_mow_for_index)); } _streams_for_node[dst_id] = streams; return streams; @@ -101,13 +101,10 @@ bool LoadStreamMap::release() { return false; } -Status LoadStreamMap::close_load(bool incremental) { - return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status { +Status LoadStreamMap::close_load() { + return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status { const auto& tablets = _tablets_to_commit[dst_id]; for (auto& stream : streams) { - if (stream->is_incremental() != incremental) { - continue; - } RETURN_IF_ERROR(stream->close_load(tablets)); } return Status::OK(); diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index d0f72ab7e004e0..aad12dba2aa4ac 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -78,7 +78,7 @@ class LoadStreamMap { LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, LoadStreamMapPool* pool); - std::shared_ptr get_or_create(int64_t dst_id, bool incremental = false); + std::shared_ptr get_or_create(int64_t dst_id); std::shared_ptr at(int64_t dst_id); @@ -95,7 +95,7 @@ class LoadStreamMap { // send CLOSE_LOAD to all streams, return ERROR if any. // only call this method after release() returns true. 
- Status close_load(bool incremental); + Status close_load(); private: const UniqueId _load_id; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index caebb381db6048..92670c1c930090 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -127,12 +127,11 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, std::shared_ptr schema_map, - std::shared_ptr mow_map, bool incremental) + std::shared_ptr mow_map) : _load_id(load_id), _src_id(src_id), _tablet_schema_for_index(schema_map), - _enable_unique_mow_for_index(mow_map), - _is_incremental(incremental) {}; + _enable_unique_mow_for_index(mow_map) {}; LoadStreamStub::~LoadStreamStub() { if (_is_init.load() && !_is_closed.load()) { @@ -169,13 +168,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, request.set_src_id(_src_id); request.set_txn_id(txn_id); request.set_enable_profile(enable_profile); - if (_is_incremental) { - request.set_total_streams(0); - } else if (total_streams > 0) { - request.set_total_streams(total_streams); - } else { - return Status::InternalError("total_streams should be greator than 0"); - } + request.set_total_streams(total_streams); request.set_idle_timeout_ms(idle_timeout_ms); schema.to_protobuf(request.mutable_schema()); for (auto& tablet : tablets_for_schema) { diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 1bf0fac4e381b8..1f0d2e459d3344 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -111,12 +111,12 @@ class LoadStreamStub : public std::enable_shared_from_this { // construct new stub LoadStreamStub(PUniqueId load_id, int64_t src_id, std::shared_ptr schema_map, - std::shared_ptr mow_map, bool incremental = false); + std::shared_ptr mow_map); LoadStreamStub(UniqueId load_id, int64_t src_id, std::shared_ptr schema_map, - std::shared_ptr mow_map, bool incremental = false) - : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental) {}; + std::shared_ptr mow_map) + : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) {}; // for mock this class in UT #ifdef BE_TEST @@ -195,8 +195,6 @@ class LoadStreamStub : public std::enable_shared_from_this { int64_t dst_id() const { return _dst_id; } - bool is_incremental() const { return _is_incremental; } - friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub); std::string to_string(); @@ -257,8 +255,6 @@ class LoadStreamStub : public std::enable_shared_from_this { bthread::Mutex _failed_tablets_mutex; std::vector _success_tablets; std::unordered_map _failed_tablets; - - bool _is_incremental = false; }; } // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 818bff422f9583..64fb092e736c8a 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -35,9 +35,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -48,18 +50,23 @@ #include "util/runtime_profile.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" +#include "vec/runtime/vdatetime_value.h" +#include "vec/sink/volap_table_sink.h" #include "vec/sink/vrow_distribution.h" #ifdef DEBUG #include #endif +#include "bvar/bvar.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/logging.h" #include 
"common/object_pool.h" #include "common/signal_handler.h" #include "common/status.h" #include "exec/tablet_info.h" +#include "runtime/client_cache.h" +#include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -79,8 +86,11 @@ #include "util/uid_util.h" #include "vec/columns/column.h" #include "vec/columns/column_const.h" +#include "vec/columns/column_decimal.h" +#include "vec/columns/column_nullable.h" #include "vec/columns/column_vector.h" #include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" #include "vec/core/block.h" #include "vec/core/types.h" #include "vec/data_types/data_type_nullable.h" @@ -100,8 +110,7 @@ bvar::Adder g_sink_write_rows; bvar::PerSecond> g_sink_write_rows_per_second("sink_throughput_row", &g_sink_write_rows, 60); -Status IndexChannel::init(RuntimeState* state, const std::vector& tablets, - bool incremental) { +Status IndexChannel::init(RuntimeState* state, const std::vector& tablets) { SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get()); for (const auto& tablet : tablets) { // First find the location BEs of this tablet @@ -119,15 +128,8 @@ Status IndexChannel::init(RuntimeState* state, const std::vector_pool. // Because the deconstruction of NodeChannel may take a long time to wait rpc finish. // but the ObjectPool will hold a spin lock to delete objects. - channel = - std::make_shared(_parent, this, replica_node_id, incremental); + channel = std::make_shared(_parent, this, replica_node_id); _node_channels.emplace(replica_node_id, channel); - // incremental opened new node. when close we have use two-stage close. - if (incremental) { - _has_inc_node = true; - } - LOG(INFO) << "init new node for instance " << _parent->_sender_id - << ", incremantal:" << incremental; } else { channel = it->second; } @@ -357,23 +359,22 @@ Status VNodeChannel::init(RuntimeState* state) { // add block closure // Has to using value to capture _task_exec_ctx because tablet writer may destroyed during callback. _send_block_callback = WriteBlockCallback::create_shared(); - _send_block_callback->addFailedHandler( - [&, task_exec_ctx = _task_exec_ctx](const WriteBlockCallbackContext& ctx) { - std::shared_ptr ctx_lock = task_exec_ctx.lock(); - if (ctx_lock == nullptr) { - return; - } - _add_block_failed_callback(ctx); - }); + _send_block_callback->addFailedHandler([&, task_exec_ctx = _task_exec_ctx](bool is_last_rpc) { + auto ctx_lock = task_exec_ctx.lock(); + if (ctx_lock == nullptr) { + return; + } + _add_block_failed_callback(is_last_rpc); + }); _send_block_callback->addSuccessHandler( [&, task_exec_ctx = _task_exec_ctx](const PTabletWriterAddBlockResult& result, - const WriteBlockCallbackContext& ctx) { - std::shared_ptr ctx_lock = task_exec_ctx.lock(); + bool is_last_rpc) { + auto ctx_lock = task_exec_ctx.lock(); if (ctx_lock == nullptr) { return; } - _add_block_success_callback(result, ctx); + _add_block_success_callback(result, is_last_rpc); }); _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id); @@ -676,7 +677,6 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { } // eos request must be the last request-> it's a signal makeing callback function to set _add_batch_finished true. - // end_mark makes is_last_rpc true when rpc finished and call callbacks. 
_send_block_callback->end_mark(); _send_finished = true; CHECK(_pending_batches_num == 0) << _pending_batches_num; @@ -726,7 +726,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { } void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult& result, - const WriteBlockCallbackContext& ctx) { + bool is_last_rpc) { std::lock_guard l(this->_closed_lock); if (this->_is_closed) { // if the node channel is closed, no need to call the following logic, @@ -744,7 +744,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult Status st = _index_channel->check_intolerable_failure(); if (!st.ok()) { _cancel_with_msg(st.to_string()); - } else if (ctx._is_last_rpc) { + } else if (is_last_rpc) { for (const auto& tablet : result.tablet_vec()) { TTabletCommitInfo commit_info; commit_info.tabletId = tablet.tablet_id(); @@ -802,7 +802,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult } } -void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext& ctx) { +void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) { std::lock_guard l(this->_closed_lock); if (this->_is_closed) { // if the node channel is closed, no need to call `mark_as_failed`, @@ -819,7 +819,7 @@ void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext& c Status st = _index_channel->check_intolerable_failure(); if (!st.ok()) { _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string())); - } else if (ctx._is_last_rpc) { + } else if (is_last_rpc) { // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait // will be blocked. _add_batches_finished = true; @@ -892,14 +892,12 @@ Status VNodeChannel::close_wait(RuntimeState* state) { } } - // Waiting for finished until _add_batches_finished changed by rpc's finished callback. - // it may take a long time, so we couldn't set a timeout + // waiting for finished, it may take a long time, so we couldn't set a timeout // For pipeline engine, the close is called in async writer's process block method, // so that it will not block pipeline thread. while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { bthread_usleep(1000); } - VLOG_CRITICAL << _parent->_sender_id << " close wait finished"; _close_time_ms = UnixMillis() - _close_time_ms; if (_cancelled || state->is_cancelled()) { @@ -927,18 +925,17 @@ void VNodeChannel::_close_check() { CHECK(_cur_mutable_block == nullptr) << name(); } -void VNodeChannel::mark_close(bool hang_wait) { +void VNodeChannel::mark_close() { auto st = none_of({_cancelled, _eos_is_produced}); if (!st.ok()) { return; } _cur_add_block_request->set_eos(true); - _cur_add_block_request->set_hang_wait(hang_wait); { std::lock_guard l(_pending_batches_lock); if (!_cur_mutable_block) [[unlikely]] { - // never had a block arrived. 
add a dummy block + // add a dummy block _cur_mutable_block = vectorized::MutableBlock::create_unique(); } auto tmp_add_block_request = @@ -1171,7 +1168,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { return Status::InternalError("unknown destination tuple descriptor"); } - if (!_vec_output_expr_ctxs.empty() && + if (_vec_output_expr_ctxs.size() > 0 && _output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) { LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, " << "output_tuple_slot_num " << _output_tuple_desc->slots().size() @@ -1282,7 +1279,7 @@ Status VTabletWriter::_incremental_open_node_channel( // update and reinit for existing channels. std::shared_ptr channel = _index_id_to_channel[index->index_id]; DCHECK(channel != nullptr); - RETURN_IF_ERROR(channel->init(_state, tablets, true)); // add tablets into it + RETURN_IF_ERROR(channel->init(_state, tablets)); // add tablets into it } fmt::memory_buffer buf; @@ -1377,63 +1374,14 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status _try_close = true; // will stop periodic thread if (status.ok()) { - // BE id -> add_batch method counter - std::unordered_map node_add_batch_counter_map; - // only if status is ok can we call this _profile->total_time_counter(). // if status is not ok, this sink may not be prepared, so that _profile is null SCOPED_TIMER(_profile->total_time_counter()); - for (const auto& index_channel : _channels) { - // two-step mark close. first we send close_origin to recievers to close all originly exist TabletsChannel. - // when they all closed, we are sure all Writer of instances called _do_try_close. that means no new channel - // will be opened. the refcount of recievers will be monotonically decreasing. then we are safe to close all - // our channels. 
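
Illustration, not part of the patch: the comment block ending just above describes the two-stage close this diff removes. The originally opened ("init") channels were closed first with hang_wait=true, so each sender's close request blocked on the receiver until every sender had closed; only then were the incrementally opened channels closed. A minimal standalone model of that receiver-side hang-wait, using std::thread and std::atomic in place of bthread primitives; every name below is invented for the sketch:

#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
    constexpr int kSenders = 3;
    std::atomic<int> remaining{kSenders};

    // Each sender's close request decrements the count, then spins until every
    // other sender has also closed (or a timeout counter runs out), mirroring
    // the removed `while (!channel->is_finished()) bthread_usleep(1000);` loop.
    auto handle_eos_with_hang_wait = [&remaining] {
        remaining.fetch_sub(1);
        int count = 0;
        while (remaining.load() > 0 && count < 1000) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            ++count;
        }
        std::printf("close returned, remaining=%d\n", remaining.load());
    };

    std::vector<std::thread> senders;
    senders.reserve(kSenders);
    for (int i = 0; i < kSenders; ++i) senders.emplace_back(handle_eos_with_hang_wait);
    for (auto& t : senders) t.join(); // all close requests return together
}

Because the rest of this diff removes the incremental-open path entirely (the receiver now always knows total_streams and num_senders up front), no channel can be opened mid-load anymore, and both sides of this wait protocol become dead code.
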
- if (index_channel->has_incremental_node_channel()) { - if (!status.ok()) { - break; - } - VLOG_TRACE << _sender_id << " first stage close start"; - index_channel->for_init_node_channel( - [&index_channel, &status](const std::shared_ptr& ch) { - if (!status.ok() || ch->is_closed()) { - return; - } - ch->mark_close(true); - if (ch->is_cancelled()) { - status = cancel_channel_and_check_intolerable_failure( - status, ch->get_cancel_msg(), index_channel, ch); - } - }); - if (!status.ok()) { - break; - } - index_channel->for_init_node_channel( - [this, &index_channel, &status](const std::shared_ptr& ch) { - if (!status.ok() || ch->is_closed()) { - return; - } - auto s = ch->close_wait(_state); - if (!s.ok()) { - status = cancel_channel_and_check_intolerable_failure( - status, s.to_string(), index_channel, ch); - } - }); + { + for (const auto& index_channel : _channels) { if (!status.ok()) { break; } - index_channel->for_inc_node_channel( - [&index_channel, &status](const std::shared_ptr& ch) { - if (!status.ok() || ch->is_closed()) { - return; - } - // only first try close, all node channels will mark_close() - ch->mark_close(); - if (ch->is_cancelled()) { - status = cancel_channel_and_check_intolerable_failure( - status, ch->get_cancel_msg(), index_channel, ch); - } - }); - } else { // not has_incremental_node_channel index_channel->for_each_node_channel( [&index_channel, &status](const std::shared_ptr& ch) { if (!status.ok() || ch->is_closed()) { @@ -1446,8 +1394,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status status, ch->get_cancel_msg(), index_channel, ch); } }); - } - } // end for index channels + } // end for index channels + } } if (!status.ok()) { diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 603034cea6d7a5..bcc5228457a02a 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -36,11 +36,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include @@ -53,17 +55,23 @@ #include "common/status.h" #include "exec/data_sink.h" #include "exec/tablet_info.h" +#include "gutil/ref_counted.h" +#include "runtime/decimalv2_value.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "runtime/thread_context.h" +#include "runtime/types.h" +#include "util/countdown_latch.h" #include "util/ref_count_closure.h" #include "util/runtime_profile.h" #include "util/spinlock.h" #include "util/stopwatch.hpp" #include "vec/columns/column.h" +#include "vec/common/allocator.h" #include "vec/core/block.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" +#include "vec/runtime/vfile_format_transformer.h" #include "vec/sink/vrow_distribution.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" @@ -106,10 +114,6 @@ struct AddBatchCounter { } }; -struct WriteBlockCallbackContext { - std::atomic _is_last_rpc {false}; -}; - // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. // Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted. 
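
Illustration, not part of the patch: the struct removed just above and the hunk that follows swap the one-field WriteBlockCallbackContext for a plain std::atomic<bool> that end_mark() sets before the final (eos) rpc and that the success/failure handlers receive by value. A self-contained toy of the same pattern; ToyWriteCallback and all other names here are invented:

#include <atomic>
#include <cstdio>
#include <functional>
#include <string>

class ToyWriteCallback {
public:
    void add_success_handler(std::function<void(const std::string&, bool)> fn) {
        _success_handler = std::move(fn);
    }
    // Called once before the final rpc, like WriteBlockCallback::end_mark().
    void end_mark() { _is_last_rpc = true; }
    // Simulates the rpc completion path invoking the registered handler.
    void call(const std::string& result) { _success_handler(result, _is_last_rpc); }

private:
    std::atomic<bool> _is_last_rpc{false};
    std::function<void(const std::string&, bool)> _success_handler;
};

int main() {
    ToyWriteCallback cb;
    cb.add_success_handler([](const std::string& result, bool is_last_rpc) {
        std::printf("%s, is_last_rpc=%d\n", result.c_str(), is_last_rpc);
    });
    cb.call("batch-1"); // is_last_rpc=0
    cb.end_mark();      // the eos request is about to be sent
    cb.call("eos");     // is_last_rpc=1, handler can finalize commit info
}
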
@@ -123,13 +127,8 @@ class WriteBlockCallback final : public ::doris::DummyBrpcCallback { WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {} ~WriteBlockCallback() override = default; - void addFailedHandler(const std::function& fn) { - failed_handler = fn; - } - void addSuccessHandler( - const std::function& fn) { - success_handler = fn; - } + void addFailedHandler(const std::function& fn) { failed_handler = fn; } + void addSuccessHandler(const std::function& fn) { success_handler = fn; } void join() override { // We rely on in_flight to assure one rpc is running, @@ -166,8 +165,8 @@ class WriteBlockCallback final : public ::doris::DummyBrpcCallback { bool is_packet_in_flight() { return _packet_in_flight; } void end_mark() { - DCHECK(_ctx._is_last_rpc == false); - _ctx._is_last_rpc = true; + DCHECK(_is_last_rpc == false); + _is_last_rpc = true; } void call() override { @@ -176,9 +175,9 @@ class WriteBlockCallback final : public ::doris::DummyBrpcCallback { LOG(WARNING) << "failed to send brpc batch, error=" << berror(::doris::DummyBrpcCallback::cntl_->ErrorCode()) << ", error_text=" << ::doris::DummyBrpcCallback::cntl_->ErrorText(); - failed_handler(_ctx); + failed_handler(_is_last_rpc); } else { - success_handler(*(::doris::DummyBrpcCallback::response_), _ctx); + success_handler(*(::doris::DummyBrpcCallback::response_), _is_last_rpc); } clear_in_flight(); } @@ -186,9 +185,9 @@ class WriteBlockCallback final : public ::doris::DummyBrpcCallback { private: brpc::CallId cid; std::atomic _packet_in_flight {false}; - WriteBlockCallbackContext _ctx; - std::function failed_handler; - std::function success_handler; + std::atomic _is_last_rpc {false}; + std::function failed_handler; + std::function success_handler; }; class IndexChannel; @@ -259,8 +258,7 @@ class VNodeChannel { // two ways to stop channel: // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response. // 2. just cancel() - // hang_wait = true will make reciever hang until all sender mark_closed. 
- void mark_close(bool hang_wait = false); + void mark_close(); bool is_closed() const { return _is_closed; } bool is_cancelled() const { return _cancelled; } @@ -322,9 +320,8 @@ class VNodeChannel { void _close_check(); void _cancel_with_msg(const std::string& msg); - void _add_block_success_callback(const PTabletWriterAddBlockResult& result, - const WriteBlockCallbackContext& ctx); - void _add_block_failed_callback(const WriteBlockCallbackContext& ctx); + void _add_block_success_callback(const PTabletWriterAddBlockResult& result, bool is_last_rpc); + void _add_block_failed_callback(bool is_last_rpc); VTabletWriter* _parent = nullptr; IndexChannel* _index_channel = nullptr; @@ -428,8 +425,7 @@ class IndexChannel { ~IndexChannel() = default; // allow to init multi times, for incremental open more tablets for one index(table) - Status init(RuntimeState* state, const std::vector& tablets, - bool incremental = false); + Status init(RuntimeState* state, const std::vector& tablets); void for_each_node_channel( const std::function&)>& func) { @@ -438,26 +434,6 @@ class IndexChannel { } } - void for_init_node_channel( - const std::function&)>& func) { - for (auto& it : _node_channels) { - if (!it.second->is_incremental()) { - func(it.second); - } - } - } - - void for_inc_node_channel( - const std::function&)>& func) { - for (auto& it : _node_channels) { - if (it.second->is_incremental()) { - func(it.second); - } - } - } - - bool has_incremental_node_channel() const { return _has_inc_node; } - void mark_as_failed(const VNodeChannel* node_channel, const std::string& err, int64_t tablet_id = -1); Status check_intolerable_failure(); @@ -516,7 +492,6 @@ class IndexChannel { std::unordered_map> _node_channels; // from tablet_id to backend channel std::unordered_map>> _channels_by_tablet; - bool _has_inc_node = false; // lock to protect _failed_channels and _failed_channels_msgs mutable doris::SpinLock _fail_lock; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 1b14a57d15403d..c1b43722c33b99 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -111,7 +111,7 @@ Status VTabletWriterV2::_incremental_open_streams( } } for (int64_t dst_id : new_backends) { - auto streams = _load_stream_map->get_or_create(dst_id, true); + auto streams = _load_stream_map->get_or_create(dst_id); RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); } return Status::OK(); @@ -310,11 +310,6 @@ Status VTabletWriterV2::_build_tablet_node_mapping() { tablet.set_index_id(index.index_id); tablet.set_tablet_id(tablet_id); _tablets_for_node[node].emplace(tablet_id, tablet); - constexpr int64_t DUMMY_TABLET_ID = 0; - if (tablet_id == DUMMY_TABLET_ID) [[unlikely]] { - // ignore fake tablet for auto partition - continue; - } if (known_indexes.contains(index.index_id)) [[likely]] { continue; } @@ -553,26 +548,32 @@ Status VTabletWriterV2::close(Status exec_status) { LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink << ", load_id=" << print_id(_load_id); - // send CLOSE_LOAD on all non-incremental streams if this is the last sink + // send CLOSE_LOAD on all streams if this is the last sink if (is_last_sink) { - RETURN_IF_ERROR(_load_stream_map->close_load(false)); + RETURN_IF_ERROR(_load_stream_map->close_load()); } - // close_wait on all non-incremental streams, even if this is not the last sink. + // close_wait on all streams, even if this is not the last sink. 
// because some per-instance data structures are now shared among all sinks // due to sharing delta writers and load stream stubs. - RETURN_IF_ERROR(_close_wait(false)); - - // send CLOSE_LOAD on all incremental streams if this is the last sink. - // this must happen after all non-incremental streams are closed, - // so we can ensure all sinks are in close phase before closing incremental streams. - if (is_last_sink) { - RETURN_IF_ERROR(_load_stream_map->close_load(true)); + { + SCOPED_TIMER(_close_load_timer); + RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id, + const Streams& streams) -> Status { + for (auto& stream : streams) { + int64_t remain_ms = static_cast(_state->execution_timeout()) * 1000 - + _timeout_watch.elapsed_time() / 1000 / 1000; + if (remain_ms <= 0) { + LOG(WARNING) << "load timed out before close waiting, load_id=" + << print_id(_load_id); + return Status::TimedOut("load timed out before close waiting"); + } + RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); + } + return Status::OK(); + })); } - // close_wait on all incremental streams, even if this is not the last sink. - RETURN_IF_ERROR(_close_wait(true)); - // calculate and submit commit info if (is_last_sink) { DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", { @@ -623,27 +624,6 @@ Status VTabletWriterV2::close(Status exec_status) { return status; } -Status VTabletWriterV2::_close_wait(bool incremental) { - SCOPED_TIMER(_close_load_timer); - return _load_stream_map->for_each_st( - [this, incremental](int64_t dst_id, const Streams& streams) -> Status { - for (auto& stream : streams) { - if (stream->is_incremental() != incremental) { - continue; - } - int64_t remain_ms = static_cast(_state->execution_timeout()) * 1000 - - _timeout_watch.elapsed_time() / 1000 / 1000; - if (remain_ms <= 0) { - LOG(WARNING) << "load timed out before close waiting, load_id=" - << print_id(_load_id); - return Status::TimedOut("load timed out before close waiting"); - } - RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); - } - return Status::OK(); - }); -} - void VTabletWriterV2::_calc_tablets_to_commit() { LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 5a9890cdb4942c..e3d31fb32b9899 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -147,8 +147,6 @@ class VTabletWriterV2 final : public AsyncResultWriter { void _calc_tablets_to_commit(); - Status _close_wait(bool incremental); - Status _cancel(Status status); std::shared_ptr _mem_tracker; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java index dafdcdc49f5393..1a4d188a0ca7dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java @@ -35,7 +35,7 @@ import java.util.stream.Collectors; public class ListPartitionItem extends PartitionItem { - public static final ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList()); + public static ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList()); private final List partitionKeys; private boolean isDefaultPartition = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java index ff5fa91ee6ca6e..b227afdc142eab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java @@ -87,13 +87,6 @@ public static PartitionKey createInfinityPartitionKey(List columns, bool return partitionKey; } - public static PartitionKey createMaxPartitionKey() { - PartitionKey partitionKey = new PartitionKey(); - partitionKey.keys.add(MaxLiteral.MAX_VALUE); - // type not set - return partitionKey; - } - public static PartitionKey createPartitionKey(List keys, List columns) throws AnalysisException { PartitionKey partitionKey = new PartitionKey(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java index bb7ddabbaa4b7b..56214aaa0eadbd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java @@ -30,12 +30,10 @@ public class RangePartitionItem extends PartitionItem { private Range partitionKeyRange; - public static final Range DUMMY_RANGE; - public static final RangePartitionItem DUMMY_ITEM; + public static final Range DUMMY_ITEM; static { - DUMMY_RANGE = Range.closed(new PartitionKey(), new PartitionKey()); - DUMMY_ITEM = new RangePartitionItem(Range.closed(new PartitionKey(), PartitionKey.createMaxPartitionKey())); + DUMMY_ITEM = Range.closed(new PartitionKey(), new PartitionKey()); } public RangePartitionItem(Range range) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index df6b02b8324ebe..f52bc11829f443 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1698,12 +1698,12 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa isTempPartition, partitionInfo.getIsMutable(partitionId)); } else if (partitionInfo.getType() == PartitionType.LIST) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty, + RangePartitionItem.DUMMY_ITEM, partitionInfo.getItem(partitionId), dataProperty, partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), isTempPartition, partitionInfo.getIsMutable(partitionId)); } else { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty, + RangePartitionItem.DUMMY_ITEM, ListPartitionItem.DUMMY_ITEM, dataProperty, partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), isTempPartition, partitionInfo.getIsMutable(partitionId)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index e3195eec13549d..ada7c6b770bab0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -339,84 +339,18 @@ private List getDistColumns(DistributionInfo distInfo) throws UserExcept return distColumns; } - private PartitionItem createDummyPartitionItem(PartitionType partType) throws 
UserException { - if (partType == PartitionType.LIST) { - return ListPartitionItem.DUMMY_ITEM; - } else if (partType == PartitionType.RANGE) { - return RangePartitionItem.DUMMY_ITEM; - } else { - throw new UserException("unsupported partition for OlapTable, partition=" + partType); - } - } - - private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table, Analyzer analyzer, - TOlapTablePartitionParam partitionParam, PartitionInfo partitionInfo, PartitionType partType) - throws UserException { - partitionParam.setEnableAutomaticPartition(true); - // these partitions only use in locations. not find partition. - partitionParam.setPartitionsIsFake(true); - - // set columns - for (Column partCol : partitionInfo.getPartitionColumns()) { - partitionParam.addToPartitionColumns(partCol.getName()); - } - - int partColNum = partitionInfo.getPartitionColumns().size(); - - TOlapTablePartition fakePartition = new TOlapTablePartition(); - fakePartition.setId(0); - // set partition keys - setPartitionKeys(fakePartition, createDummyPartitionItem(partType), partColNum); - - for (Long indexId : table.getIndexIdToMeta().keySet()) { - fakePartition.addToIndexes(new TOlapTableIndexTablets(indexId, Arrays.asList(0L))); - fakePartition.setNumBuckets(1); - } - fakePartition.setIsMutable(true); - - DistributionInfo distInfo = table.getDefaultDistributionInfo(); - partitionParam.setDistributedColumns(getDistColumns(distInfo)); - partitionParam.addToPartitions(fakePartition); - - ArrayList exprSource = partitionInfo.getPartitionExprs(); - if (exprSource != null && analyzer != null) { - Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), analyzer.getContext()); - tupleDescriptor.setTable(table); - funcAnalyzer.registerTupleDescriptor(tupleDescriptor); - // we must clone the exprs. otherwise analyze will influence the origin exprs. - ArrayList exprs = new ArrayList(); - for (Expr e : exprSource) { - exprs.add(e.clone()); - } - for (Expr e : exprs) { - e.reset(); - e.analyze(funcAnalyzer); - } - partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs)); - } - - return partitionParam; - } - public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer) throws UserException { TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam(); - PartitionInfo partitionInfo = table.getPartitionInfo(); - boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition(); - PartitionType partType = table.getPartitionInfo().getType(); partitionParam.setDbId(dbId); partitionParam.setTableId(table.getId()); partitionParam.setVersion(0); - partitionParam.setPartitionType(partType.toThrift()); - - // create shadow partition for empty auto partition table. only use in this load. - if (enableAutomaticPartition && partitionIds.isEmpty()) { - return createDummyPartition(dbId, table, analyzer, partitionParam, partitionInfo, partType); - } + PartitionType partType = table.getPartitionInfo().getType(); switch (partType) { case LIST: case RANGE: { + PartitionInfo partitionInfo = table.getPartitionInfo(); for (Column partCol : partitionInfo.getPartitionColumns()) { partitionParam.addToPartitionColumns(partCol.getName()); } @@ -461,6 +395,7 @@ public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Anal } } } + boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition(); // for auto create partition by function expr, there is no any partition firstly, // But this is required in thrift struct. 
if (enableAutomaticPartition && partitionIds.isEmpty()) { @@ -529,6 +464,7 @@ public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Anal throw new UserException("unsupported partition for OlapTable, partition=" + partType); } } + partitionParam.setPartitionType(partType.toThrift()); return partitionParam; } @@ -569,46 +505,7 @@ public static void setPartitionKeys(TOlapTablePartition tPartition, PartitionIte } } - public List createDummyLocation(OlapTable table) throws UserException { - TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); - TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); - - final long fakeTabletId = 0; - SystemInfoService clusterInfo = Env.getCurrentSystemInfo(); - List aliveBe = clusterInfo.getAllBackendIds(true); - if (aliveBe.isEmpty()) { - throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "no available BE in cluster"); - } - for (int i = 0; i < table.getIndexNumber(); i++) { - // only one fake tablet here - if (singleReplicaLoad) { - Long[] nodes = aliveBe.toArray(new Long[0]); - List slaveBe = aliveBe; - - Random random = new SecureRandom(); - int masterNode = random.nextInt(nodes.length); - locationParam.addToTablets(new TTabletLocation(fakeTabletId, - Arrays.asList(nodes[masterNode]))); - - slaveBe.remove(masterNode); - slaveLocationParam.addToTablets(new TTabletLocation(fakeTabletId, - slaveBe)); - } else { - locationParam.addToTablets(new TTabletLocation(fakeTabletId, - Arrays.asList(aliveBe.get(0)))); // just one fake location is enough - - LOG.info("created dummy location tablet_id={}, be_id={}", fakeTabletId, aliveBe.get(0)); - } - } - - return Arrays.asList(locationParam, slaveLocationParam); - } - public List createLocation(OlapTable table) throws UserException { - if (table.getPartitionInfo().enableAutomaticPartition() && partitionIds.isEmpty()) { - return createDummyLocation(table); - } - TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); // BE id -> path hash diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index af86660be21293..b0abcc671417d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3511,7 +3511,7 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request if (!Env.getCurrentEnv().isMaster()) { errorStatus.setStatusCode(TStatusCode.NOT_MASTER); errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG); - LOG.warn("failed to replace Partition: {}", NOT_MASTER_ERR_MSG); + LOG.warn("failed to replacePartition: {}", NOT_MASTER_ERR_MSG); return result; } @@ -3546,8 +3546,10 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request List allReqPartNames; // all request partitions try { taskLock.lock(); - // we dont lock the table. other thread in this txn will be controled by taskLock. - // if we have already replaced. dont do it again, but acquire the recorded new partition directly. + // we don't lock the table. other threads in this txn will be controlled by + // taskLock. + // if we have already replaced, don't do it again, but acquire the recorded new + // partition directly. // if not by this txn, just let it fail naturally is ok.
List replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds); // here if replacedPartIds still have null. this will throw exception. @@ -3557,7 +3559,8 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced .mapToObj(partitionIds::get) .collect(Collectors.toList()); - // from here we ONLY deal the pending partitions. not include the dealed(by others). + // from here we ONLY deal with the pending partitions, not including those + // already dealt with by others. if (!pendingPartitionIds.isEmpty()) { // below two must have same order inner. List pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds); @@ -3568,7 +3571,8 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request overwriteManager.registerTaskInGroup(taskGroupId, taskId); InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames); InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames); - // now temp partitions are bumped up and use new names. we get their ids and record them. + // now temp partitions are bumped up and use new names. we get their ids and + // record them. List newPartitionIds = new ArrayList(); for (String newPartName : pendingPartitionNames) { newPartitionIds.add(olapTable.getPartition(newPartName).getId()); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index cf0c14ea47cb8d..12b1b6b1edaea0 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -152,8 +152,6 @@ message PTabletWriterAddBlockRequest { optional bool write_single_replica = 12 [default = false]; map slave_tablet_nodes = 13; optional bool is_single_tablet_block = 14 [default = false]; - // for auto-partition first stage close, we should hang. - optional bool hang_wait = 15 [default = false]; }; message PSlaveTabletNodes { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 5da7b4df7de799..ef7a8451684982 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -211,7 +211,6 @@ struct TOlapTablePartitionParam { // insert overwrite partition(*) 11: optional bool enable_auto_detect_overwrite 12: optional i64 overwrite_group_id - 13: optional bool partitions_is_fake = false } struct TOlapTableIndex { diff --git a/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out deleted file mode 100644 index 4ee136aef2b9c5..00000000000000 --- a/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out +++ /dev/null @@ -1,4 +0,0 @@ --- This file is automatically generated.
You should know what you did if you want to edit this --- !sql -- -2 - diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy index f52dc2945f086a..508f086f865729 100644 --- a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy +++ b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy @@ -140,7 +140,8 @@ suite("test_auto_range_partition") { logger.info("${result2}") assertEquals(result2.size(), 2) - // insert into select have multi sender in load + // partition expr extraction + sql " drop table if exists isit " sql " drop table if exists isit_src " sql """ diff --git a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy index 8d43d90ff15503..4f9b7a365b4713 100644 --- a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy +++ b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy @@ -19,7 +19,7 @@ import groovy.io.FileType import java.nio.file.Files import java.nio.file.Paths -suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use resource fully``` +suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use resource fully // get doris-db from s3 def dirPath = context.file.parent def fatherPath = context.file.parentFile.parentFile.getPath() diff --git a/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy deleted file mode 100644 index c9f2f04aab31c5..00000000000000 --- a/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy +++ /dev/null @@ -1,45 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -suite("two_instance_correctness") { - - // finish time of instances have diff - sql "DROP TABLE IF EXISTS two_bkt;" - sql """ - create table two_bkt( - k0 date not null - ) - DISTRIBUTED BY HASH(`k0`) BUCKETS 2 - properties("replication_num" = "1"); - """ - - sql """ insert into two_bkt values ("2012-12-11"); """ - sql """ insert into two_bkt select "2020-12-12" from numbers("number" = "20000"); """ - - sql " DROP TABLE IF EXISTS two_bkt_dest; " - sql """ - create table two_bkt_dest( - k0 date not null - ) - AUTO PARTITION BY RANGE (date_trunc(k0, 'day')) () - DISTRIBUTED BY HASH(`k0`) BUCKETS 10 - properties("replication_num" = "1"); - """ - sql " insert into two_bkt_dest select * from two_bkt; " - - qt_sql " select count(distinct k0) from two_bkt_dest; " -}
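
Illustration, not part of the patch, tying back to the first hunk of this diff (tablet_info.cpp): with the fake-partition branch gone, every partition the FE sends is registered in _partitions_map unconditionally, a list partition once per in-key and a range partition under its end key. A rough standalone sketch of that registration logic, with toy int keys standing in for the real row-key tuples:

#include <cstdio>
#include <map>
#include <string>
#include <tuple>
#include <vector>

// Toy stand-ins: the real code keys the map on row tuples, not ints.
struct Partition {
    std::string name;
    std::vector<int> in_keys; // LIST partition values
    int end_key = 0;          // RANGE partition upper bound
};

int main() {
    bool is_in_partition = true; // i.e. TPartitionType::LIST_PARTITIONED
    std::map<std::tuple<int, bool>, const Partition*> partitions_map;

    Partition p{"p1", {3, 7}, 0};
    if (is_in_partition) {
        // a list partition is findable under each of its in-keys
        for (int key : p.in_keys) partitions_map.emplace(std::tuple{key, false}, &p);
    } else {
        // a range partition is findable under its end key
        partitions_map.emplace(std::tuple{p.end_key, false}, &p);
    }
    std::printf("registered %zu lookup keys for %s\n", partitions_map.size(), p.name.c_str());
}
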