Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ Status CloudBaseCompaction::modify_rowsets() {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudBaseCompaction, tablet_id={}, range=[{}-{}]",
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]",
Expand Down
11 changes: 7 additions & 4 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ Compaction::Compaction(BaseTabletSPtr tablet, const std::string& label)
_allow_delete_in_cumu_compaction(config::enable_delete_when_cumu_compaction) {
;
init_profile(label);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_rowid_conversion = std::make_unique<RowIdConversion>();
}

Compaction::~Compaction() {
Expand All @@ -132,6 +134,7 @@ Compaction::~Compaction() {
_input_rowsets.clear();
_output_rowset.reset();
_cur_tablet_schema.reset();
_rowid_conversion.reset();
}

void Compaction::init_profile(const std::string& label) {
Expand Down Expand Up @@ -179,7 +182,7 @@ Status Compaction::merge_input_rowsets() {
if (!ctx.columns_to_do_index_compaction.empty() ||
(_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
_stats.rowid_conversion = &_rowid_conversion;
_stats.rowid_conversion = _rowid_conversion.get();
}

int64_t way_num = merge_way_num();
Expand Down Expand Up @@ -1023,7 +1026,7 @@ Status CompactionMixin::modify_rowsets() {
// TODO(LiaoXin): check if there are duplicate keys
std::size_t missed_rows_size = 0;
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, version.second + 1, missed_rows.get(),
_input_rowsets, *_rowid_conversion, 0, version.second + 1, missed_rows.get(),
location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);
if (missed_rows) {
Expand Down Expand Up @@ -1121,7 +1124,7 @@ Status CompactionMixin::modify_rowsets() {
}
DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id());
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
_input_rowsets, *_rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
location_map.get(), *it.delete_bitmap.get(), &txn_output_delete_bitmap);
if (config::enable_merge_on_write_correctness_check) {
RowsetIdUnorderedSet rowsetids;
Expand All @@ -1141,7 +1144,7 @@ Status CompactionMixin::modify_rowsets() {
// Convert the delete bitmap of the input rowsets to output rowset for
// incremental data.
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, version.second, UINT64_MAX,
_input_rowsets, *_rowid_conversion, version.second, UINT64_MAX,
missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class Compaction {
Version _output_version;

int64_t _newest_write_timestamp {-1};
RowIdConversion _rowid_conversion;
std::unique_ptr<RowIdConversion> _rowid_conversion = nullptr;
TabletSchemaSPtr _cur_tablet_schema;

std::unique_ptr<RuntimeProfile> _profile;
Expand Down
29 changes: 26 additions & 3 deletions be/src/olap/rowid_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "olap/olap_common.h"
#include "olap/utils.h"
#include "runtime/thread_context.h"

namespace doris {

Expand All @@ -33,17 +34,24 @@ namespace doris {
class RowIdConversion {
public:
RowIdConversion() = default;
~RowIdConversion() = default;
~RowIdConversion() { RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used); }

// resize segment rowid map to its rows num
void init_segment_map(const RowsetId& src_rowset_id, const std::vector<uint32_t>& num_rows) {
size_t delta_std_pair_cap = 0;
for (size_t i = 0; i < num_rows.size(); i++) {
uint32_t id = _segments_rowid_map.size();
_segment_to_id_map.emplace(std::pair<RowsetId, uint32_t> {src_rowset_id, i}, id);
_id_to_segment_map.emplace_back(src_rowset_id, i);
_segments_rowid_map.emplace_back(std::vector<std::pair<uint32_t, uint32_t>>(
num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX)));
std::vector<std::pair<uint32_t, uint32_t>> vec(
num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX));
delta_std_pair_cap += vec.capacity();
_segments_rowid_map.emplace_back(std::move(vec));
}
// NOTE: manually count _segments_rowid_map's memory here, because _segments_rowid_map could be used by indexCompaction.
// indexCompaction is third-party code and is too complex to modify.
// Refer to compact_column.
track_mem_usage(delta_std_pair_cap);
}

// set dst rowset id
Expand Down Expand Up @@ -109,12 +117,27 @@ class RowIdConversion {
return _segment_to_id_map.at(segment);
}

private:
// Re-reports the memory held by _segments_rowid_map to the thread mem tracker.
// `delta_std_pair_cap` is the number of pair slots (inner-vector capacity) added
// since the previous call; it is accumulated into _std_pair_cap.
void track_mem_usage(size_t delta_std_pair_cap) {
    using RowIdPair = std::pair<uint32_t, uint32_t>;
    using RowIdPairVec = std::vector<RowIdPair>;

    _std_pair_cap += delta_std_pair_cap;

    // Bytes of all inner-vector slots plus the outer vector's own element storage.
    const size_t inner_bytes = _std_pair_cap * sizeof(RowIdPair);
    const size_t outer_bytes = _segments_rowid_map.capacity() * sizeof(RowIdPairVec);
    const size_t new_size = inner_bytes + outer_bytes;

    // Replace the previously reported amount with the freshly computed one.
    RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used);
    CONSUME_THREAD_MEM_TRACKER(new_size);
    _seg_rowid_map_mem_used = new_size;
}

private:
// the first level vector: index indicates src segment.
// the second level vector: index indicates row id of source segment,
// value indicates row id of destination segment.
// <UINT32_MAX, UINT32_MAX> indicates current row not exist.
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> _segments_rowid_map;
size_t _seg_rowid_map_mem_used {0};
size_t _std_pair_cap {0};

// Map source segment to 0 to n
std::map<std::pair<RowsetId, uint32_t>, uint32_t> _segment_to_id_map;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
// already been segment compacted
std::atomic<int32_t> _num_segcompacted {0}; // index for segment compaction

std::shared_ptr<SegcompactionWorker> _segcompaction_worker;
std::shared_ptr<SegcompactionWorker> _segcompaction_worker = nullptr;

// ensure only one inflight segcompaction task for each rowset
std::atomic<bool> _is_doing_segcompaction {false};
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
}

Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->segcompaction_mem_tracker());
DCHECK(_seg_compact_mem_tracker != nullptr);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker);
/* throttle segcompaction task if memory depleted */
if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to memory shortage");
Expand Down
10 changes: 9 additions & 1 deletion be/src/olap/rowset/segcompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ class SegcompactionWorker {
public:
explicit SegcompactionWorker(BetaRowsetWriter* writer);

// Destroys the row-id conversion while _seg_compact_mem_tracker is switched in as the
// thread's mem tracker limiter.
// NOTE(review): presumably so the memory released here is attributed to the
// segcompaction tracker - confirm against SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER.
~SegcompactionWorker() {
    DCHECK(_seg_compact_mem_tracker != nullptr);
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker);
    // reset() on an empty unique_ptr is a no-op, so no explicit null check is required.
    _rowid_conversion.reset();
}

void compact_segments(SegCompactionCandidatesSharedPtr segments);

bool need_convert_delete_bitmap();
Expand Down Expand Up @@ -95,7 +103,7 @@ class SegcompactionWorker {
InvertedIndexFileWriterPtr _inverted_index_file_writer = nullptr;

// for unique key mow table
std::unique_ptr<SimpleRowIdConversion> _rowid_conversion;
std::unique_ptr<SimpleRowIdConversion> _rowid_conversion {nullptr};
DeleteBitmapPtr _converted_delete_bitmap;
std::shared_ptr<MemTrackerLimiter> _seg_compact_mem_tracker = nullptr;

Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/simple_rowid_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "olap/olap_common.h"
#include "olap/utils.h"
#include "vec/common/custom_allocator.h"

namespace doris {

Expand All @@ -37,7 +38,7 @@ class SimpleRowIdConversion {
_cur_dst_segment_rowid = 0;
for (auto seg_rows : num_rows) {
_segments_rowid_map.emplace(seg_rows.first,
std::vector<uint32_t>(seg_rows.second, UINT32_MAX));
DorisVector<uint32_t>(seg_rows.second, UINT32_MAX));
}
}

Expand Down Expand Up @@ -72,7 +73,7 @@ class SimpleRowIdConversion {
// key: index indicates src segment.
// value: index indicates row id of source segment, value indicates row id of destination
// segment. UINT32_MAX indicates current row not exist.
std::map<uint32_t, std::vector<uint32_t>> _segments_rowid_map;
DorisMap<uint32_t, DorisVector<uint32_t>> _segments_rowid_map;

// dst rowset id
RowsetId _rowst_id;
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ class ThreadContext {
// to nullptr, but the object it points to is not initialized. At this time, when the memory
// is released somewhere, the hook is triggered to cause the crash.
std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr;
[[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const {
return thread_mem_tracker_mgr->limiter_mem_tracker().get();
[[nodiscard]] std::shared_ptr<MemTrackerLimiter> thread_mem_tracker() const {
return thread_mem_tracker_mgr->limiter_mem_tracker();
}

QueryThreadContext query_thread_context();
Expand Down
82 changes: 82 additions & 0 deletions be/src/vec/common/custom_allocator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "vec/common/allocator.h"
#include "vec/common/allocator_fwd.h"

template <class T, typename MemoryAllocator = Allocator<true>>
class CustomStdAllocator;

template <typename T>
using DorisVector = std::vector<T, CustomStdAllocator<T>>;

template <class Key, class T, class Compare = std::less<Key>,
class Allocator = CustomStdAllocator<std::pair<const Key, T>>>
using DorisMap = std::map<Key, T, Compare, Allocator>;

// NOTE: Even though CustomStdAllocator's allocate/deallocate may modify a memory tracker, the
// allocator itself is still stateless, because the thread context owns the mem tracker, not
// CustomStdAllocator.
//
// Adapter that exposes `MemoryAllocator` through the standard allocator interface so it can
// back std containers (see DorisVector / DorisMap).
// `MemoryAllocator` must provide `alloc(bytes)` returning a pointer convertible to `void*`
// and `free(ptr, bytes)`.
template <class T, typename MemoryAllocator>
class CustomStdAllocator : private MemoryAllocator {
public:
    using value_type = T;
    using pointer = T*;
    using const_pointer = const T*;
    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    CustomStdAllocator() noexcept = default;

    // Rebinding must keep the same MemoryAllocator; otherwise node-based containers
    // (e.g. std::map) would silently fall back to the default backend.
    template <class U>
    struct rebind {
        using other = CustomStdAllocator<U, MemoryAllocator>;
    };

    // Converting constructor required by the allocator model; there is no state to copy.
    template <class Up>
    CustomStdAllocator(const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {}

    // Allocates uninitialized storage for `n` objects of type T.
    T* allocate(size_t n) { return static_cast<T*>(MemoryAllocator::alloc(n * sizeof(T))); }

    // Releases storage previously obtained from allocate(n); `n` must match that call
    // because the byte size is forwarded to MemoryAllocator::free.
    void deallocate(T* ptr, size_t n) noexcept {
        MemoryAllocator::free(static_cast<void*>(ptr), n * sizeof(T));
    }

    // Largest element count whose byte size still fits in size_type.
    size_t max_size() const noexcept { return static_cast<size_type>(-1) / sizeof(T); }

    // Hinted overload from the legacy allocator interface; the hint is ignored.
    T* allocate(size_t n, const void*) { return allocate(n); }

    // Constructs an `Up` in place at `p`, forwarding `args` to its constructor.
    template <class Up, class... Args>
    void construct(Up* p, Args&&... args) {
        ::new (static_cast<void*>(p)) Up(std::forward<Args>(args)...);
    }

    // Runs the destructor of the object at `p` without releasing its storage.
    void destroy(T* p) { p->~T(); }

    T* address(T& t) const noexcept { return std::addressof(t); }

    // Returns a pointer-to-const: std::addressof on a const reference yields `const T*`,
    // which cannot be converted to `T*`.
    const T* address(const T& t) const noexcept { return std::addressof(t); }
};

// CustomStdAllocator carries no state, so storage obtained through one instance can be
// released through any other instance using the same MemoryAllocator backend: such
// allocators always compare equal. Allocators with different backends are not comparable.
template <class T, class Up, class MemoryAllocator>
bool operator==(const CustomStdAllocator<T, MemoryAllocator>&,
                const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {
    return true;
}

// Counterpart of operator==: CustomStdAllocator instances sharing the same MemoryAllocator
// backend carry no state and therefore never compare unequal.
template <class T, class Up, class MemoryAllocator>
bool operator!=(const CustomStdAllocator<T, MemoryAllocator>&,
                const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {
    return false;
}
Loading