Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ Compaction::Compaction(const TabletSharedPtr& tablet, const std::string& label)
_state(CompactionState::INITED) {
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, label);
init_profile(label);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_rowid_conversion = std::make_unique<RowIdConversion>();
}

Compaction::~Compaction() {
Expand All @@ -90,6 +92,7 @@ Compaction::~Compaction() {
_input_rowsets.clear();
_output_rowset.reset();
_cur_tablet_schema.reset();
_rowid_conversion.reset();
}

void Compaction::init_profile(const std::string& label) {
Expand Down Expand Up @@ -378,7 +381,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
if (!ctx.columns_to_do_index_compaction.empty() ||
(_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
stats.rowid_conversion = &_rowid_conversion;
stats.rowid_conversion = _rowid_conversion.get();
}
int64_t way_num = merge_way_num();

Expand Down Expand Up @@ -964,7 +967,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
// TODO(LiaoXin): check if there are duplicate keys
std::size_t missed_rows_size = 0;
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, version.second + 1, missed_rows.get(),
_input_rowsets, *_rowid_conversion, 0, version.second + 1, missed_rows.get(),
location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);
if (missed_rows) {
Expand Down Expand Up @@ -1024,7 +1027,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
}
DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id());
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
_input_rowsets, *_rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
location_map.get(), *it.delete_bitmap.get(), &txn_output_delete_bitmap);
if (config::enable_merge_on_write_correctness_check) {
RowsetIdUnorderedSet rowsetids;
Expand All @@ -1044,7 +1047,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
// Convert the delete bitmap of the input rowsets to output rowset for
// incremental data.
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, version.second, UINT64_MAX,
_input_rowsets, *_rowid_conversion, version.second, UINT64_MAX,
missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class Compaction {
Version _output_version;

int64_t _newest_write_timestamp;
RowIdConversion _rowid_conversion;
std::unique_ptr<RowIdConversion> _rowid_conversion = nullptr;
TabletSchemaSPtr _cur_tablet_schema;

std::unique_ptr<RuntimeProfile> _profile;
Expand Down
29 changes: 26 additions & 3 deletions be/src/olap/rowid_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "olap/olap_common.h"
#include "olap/utils.h"
#include "runtime/thread_context.h"

namespace doris {

Expand All @@ -33,17 +34,24 @@ namespace doris {
class RowIdConversion {
public:
RowIdConversion() = default;
// Hands back, via RELEASE_THREAD_MEM_TRACKER, the byte count that track_mem_usage()
// last recorded in _seg_rowid_map_mem_used, so the manual accounting of
// _segments_rowid_map is balanced when the object goes away.
// Only one destructor declaration may exist; the defaulted one was redundant with this
// user-provided body and has been dropped.
// NOTE(review): an implicit copy would duplicate _seg_rowid_map_mem_used and release it
// twice -- confirm this type is never copied (or delete the copy operations).
~RowIdConversion() { RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used); }

// resize segment rowid map to its rows num
//
// For every entry `num_rows[i]` this registers segment `i` of `src_rowset_id`:
//   * assigns it the next dense id (the current size of _segments_rowid_map),
//   * records the (rowset, segment) <-> id mapping in both lookup tables,
//   * appends exactly ONE row map of `num_rows[i]` entries, each initialised to
//     <UINT32_MAX, UINT32_MAX>.
// Exactly one vector must be appended per segment: the dense id is taken from
// _segments_rowid_map.size(), so a second append per iteration would desynchronise the ids
// from _id_to_segment_map and leave memory that is never counted.
void init_segment_map(const RowsetId& src_rowset_id, const std::vector<uint32_t>& num_rows) {
    size_t delta_std_pair_cap = 0;
    for (size_t i = 0; i < num_rows.size(); i++) {
        uint32_t id = _segments_rowid_map.size();
        _segment_to_id_map.emplace(std::pair<RowsetId, uint32_t> {src_rowset_id, i}, id);
        _id_to_segment_map.emplace_back(src_rowset_id, i);
        std::vector<std::pair<uint32_t, uint32_t>> vec(
                num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX));
        // Read the capacity before the move: afterwards `vec` is in a moved-from state.
        delta_std_pair_cap += vec.capacity();
        _segments_rowid_map.emplace_back(std::move(vec));
    }
    // NOTE: manually count _segments_rowid_map's memory here, because _segments_rowid_map
    // could be used by indexCompaction. indexCompaction is third-party code and is too
    // complex to modify. Refer to compact_column.
    track_mem_usage(delta_std_pair_cap);
}

// set dst rowset id
Expand Down Expand Up @@ -109,12 +117,27 @@ class RowIdConversion {
return _segment_to_id_map.at(segment);
}

private:
// Re-computes the byte footprint of _segments_rowid_map and swaps the previously reported
// amount for the new one: the old figure is released, the new figure is consumed, and the
// new figure is remembered for the next call.
// `delta_std_pair_cap` is the number of additional pair slots (inner-vector capacity) that
// were added since the last call.
void track_mem_usage(size_t delta_std_pair_cap) {
    using RowIdPair = std::pair<uint32_t, uint32_t>;

    _std_pair_cap += delta_std_pair_cap;

    // Payload held by all inner vectors.
    const size_t pair_bytes = _std_pair_cap * sizeof(RowIdPair);
    // Control blocks of the inner vectors, stored in the outer vector's buffer.
    const size_t outer_bytes = _segments_rowid_map.capacity() * sizeof(std::vector<RowIdPair>);
    const size_t new_size = pair_bytes + outer_bytes;

    RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used);
    CONSUME_THREAD_MEM_TRACKER(new_size);
    _seg_rowid_map_mem_used = new_size;
}

private:
// the first level vector: index indicates src segment.
// the second level vector: index indicates row id of source segment,
// value indicates row id of destination segment.
// <UINT32_MAX, UINT32_MAX> indicates current row not exist.
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> _segments_rowid_map;
size_t _seg_rowid_map_mem_used {0};
size_t _std_pair_cap {0};

// Map source segment to 0 to n
std::map<std::pair<RowsetId, uint32_t>, uint32_t> _segment_to_id_map;
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
return Status::OK();
}

// Runs the base-class initialisation and, once that has succeeded, lets the segcompaction
// worker (if one exists) create its memory tracker labelled with the context's txn_id.
// Any error from the base initialisation is returned unchanged.
Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
    RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context));
    if (_segcompaction_worker == nullptr) {
        return Status::OK();
    }
    _segcompaction_worker->init_mem_tracker(rowset_writer_context.txn_id);
    return Status::OK();
}

Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
int32_t segment_id) {
DCHECK(_rowset_meta->is_local());
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {

Status build(RowsetSharedPtr& rowset) override;

Status init(const RowsetWriterContext& rowset_writer_context) override;

Status flush_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
KeyBoundsPB& key_bounds);
Expand Down Expand Up @@ -231,7 +233,7 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
// already been segment compacted
std::atomic<int32_t> _num_segcompacted {0}; // index for segment compaction

std::shared_ptr<SegcompactionWorker> _segcompaction_worker;
std::shared_ptr<SegcompactionWorker> _segcompaction_worker = nullptr;

// ensure only one inflight segcompaction task for each rowset
std::atomic<bool> _is_doing_segcompaction {false};
Expand Down
8 changes: 7 additions & 1 deletion be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ using namespace ErrorCode;

// Stores the writer this worker operates on. The memory tracker is NOT created here; it is
// set up separately by init_mem_tracker().
SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {}

// Creates this worker's COMPACTION-type memory tracker, labelled
// "segcompaction-<txn_id>", and stores it in _seg_compact_mem_tracker.
void SegcompactionWorker::init_mem_tracker(int64_t txn_id) {
    const std::string tracker_label = "segcompaction-" + std::to_string(txn_id);
    _seg_compact_mem_tracker = MemTrackerLimiter::create_shared(
            MemTrackerLimiter::Type::COMPACTION, tracker_label);
}

Status SegcompactionWorker::_get_segcompaction_reader(
SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet,
std::shared_ptr<Schema> schema, OlapReaderStatistics* stat,
Expand Down Expand Up @@ -220,7 +225,8 @@ Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
}

Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->segcompaction_mem_tracker());
DCHECK(_seg_compact_mem_tracker != nullptr);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker);
/* throttle segcompaction task if memory depleted */
if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to memory shortage");
Expand Down
13 changes: 12 additions & 1 deletion be/src/olap/rowset/segcompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ class SegcompactionWorker {
public:
explicit SegcompactionWorker(BetaRowsetWriter* writer);

// Destroys the row-id conversion table while this worker's memory tracker is the active
// thread tracker, so the release is attributed to the tracker the memory was charged to.
//
// The tracker only exists after init_mem_tracker() has run. A worker can be destroyed
// without that ever happening (e.g. the writer's init() returned early with an error), so
// a missing tracker is tolerated instead of being asserted on: there is nothing to
// attribute in that case and the members are cleaned up by their own destructors.
~SegcompactionWorker() {
    if (_seg_compact_mem_tracker == nullptr) {
        return;
    }
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker);
    // reset() on an empty unique_ptr is a no-op, so no extra null check is required.
    _rowid_conversion.reset();
}

void compact_segments(SegCompactionCandidatesSharedPtr segments);

bool need_convert_delete_bitmap();
Expand All @@ -65,6 +73,8 @@ class SegcompactionWorker {
// set the cancel flag, tasks already started will not be cancelled.
bool cancel();

void init_mem_tracker(int64_t txn_id);

private:
Status _create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end);
Expand All @@ -88,8 +98,9 @@ class SegcompactionWorker {
io::FileWriterPtr _file_writer;

// for unique key mow table
std::unique_ptr<SimpleRowIdConversion> _rowid_conversion;
std::unique_ptr<SimpleRowIdConversion> _rowid_conversion = nullptr;
DeleteBitmapPtr _converted_delete_bitmap;
std::shared_ptr<MemTrackerLimiter> _seg_compact_mem_tracker = nullptr;

// the state is not mutable when 1)actual compaction operation started or 2) cancelled
std::atomic<bool> _is_compacting_state_mutable = true;
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/simple_rowid_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "olap/olap_common.h"
#include "olap/utils.h"
#include "vec/common/custom_allocator.h"

namespace doris {

Expand All @@ -37,7 +38,7 @@ class SimpleRowIdConversion {
_cur_dst_segment_rowid = 0;
for (auto seg_rows : num_rows) {
_segments_rowid_map.emplace(seg_rows.first,
std::vector<uint32_t>(seg_rows.second, UINT32_MAX));
DorisVector<uint32_t>(seg_rows.second, UINT32_MAX));
}
}

Expand Down Expand Up @@ -72,7 +73,7 @@ class SimpleRowIdConversion {
// key: index indicates src segment.
// value: index indicates row id of source segment, value indicates row id of destination
// segment. UINT32_MAX indicates current row not exist.
std::map<uint32_t, std::vector<uint32_t>> _segments_rowid_map;
DorisMap<uint32_t, DorisVector<uint32_t>> _segments_rowid_map;

// dst rowset id
RowsetId _rowst_id;
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ class ThreadContext {
// to nullptr, but the object it points to is not initialized. At this time, when the memory
// is released somewhere, the hook is triggered to cause the crash.
std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr;
[[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const {
return thread_mem_tracker_mgr->limiter_mem_tracker_raw();
[[nodiscard]] std::shared_ptr<MemTrackerLimiter> thread_mem_tracker() const {
return thread_mem_tracker_mgr->limiter_mem_tracker();
}

QueryThreadContext query_thread_context();
Expand Down
82 changes: 82 additions & 0 deletions be/src/vec/common/custom_allocator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <new>
#include <utility>
#include <vector>

#include "vec/common/allocator.h"
#include "vec/common/allocator_fwd.h"

template <class T, typename MemoryAllocator = Allocator<true>>
class CustomStdAllocator;

template <typename T>
using DorisVector = std::vector<T, CustomStdAllocator<T>>;

template <class Key, class T, class Compare = std::less<Key>,
class Allocator = CustomStdAllocator<std::pair<const Key, T>>>
using DorisMap = std::map<Key, T, Compare, Allocator>;

// NOTE: Even though CustomStdAllocator's allocate/deallocate can modify the memory tracker,
// it is still stateless, because the thread context owns the mem tracker, not
// CustomStdAllocator.
//
// Standard-library-compatible allocator adaptor that forwards raw storage requests to
// `MemoryAllocator`, which must be default-constructible and callable as
// `void* alloc(size_t bytes)` and `void free(void* ptr, size_t bytes)`.
//
// All instances that share the same `MemoryAllocator` compare equal, and rebinding keeps
// the same `MemoryAllocator`, so node-based containers (e.g. std::map) allocate their
// nodes from the same backend as the one requested by the user.
template <class T, typename MemoryAllocator>
class CustomStdAllocator : private MemoryAllocator {
public:
    using value_type = T;
    using pointer = T*;
    using const_pointer = const T*;
    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    CustomStdAllocator() noexcept = default;

    // Rebind must carry MemoryAllocator along; otherwise a container using a non-default
    // backend would silently fall back to the default one for its internal nodes.
    template <class U>
    struct rebind {
        using other = CustomStdAllocator<U, MemoryAllocator>;
    };

    // Converting constructor required by the Allocator requirements (stateless: nothing
    // to copy).
    template <class Up>
    CustomStdAllocator(const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {}

    // Returns uninitialised storage for `n` objects of type T.
    // Throws std::bad_array_new_length when `n * sizeof(T)` would overflow size_t, instead
    // of requesting a wrapped-around (too small) byte count from the backend.
    T* allocate(size_t n) {
        if (n > max_size()) {
            throw std::bad_array_new_length();
        }
        return static_cast<T*>(MemoryAllocator::alloc(n * sizeof(T)));
    }

    // Returns storage previously obtained from allocate(n) to the backend.
    void deallocate(T* ptr, size_t n) noexcept {
        MemoryAllocator::free(static_cast<void*>(ptr), n * sizeof(T));
    }

    // Largest element count whose byte size is still representable in size_t.
    size_t max_size() const noexcept { return static_cast<size_type>(-1) / sizeof(T); }

    // Hinted overload kept for pre-C++17 style callers; the hint is ignored.
    T* allocate(size_t n, const void*) { return allocate(n); }

    template <class Up, class... Args>
    void construct(Up* p, Args&&... args) {
        ::new (static_cast<void*>(p)) Up(std::forward<Args>(args)...);
    }

    // Templated so that it destroys whatever object type the container hands in, not just T.
    template <class Up>
    void destroy(Up* p) {
        p->~Up();
    }

    T* address(T& t) const noexcept { return std::addressof(t); }

    // Must return a pointer-to-const: std::addressof on a const reference yields const T*.
    const T* address(const T& t) const noexcept { return std::addressof(t); }
};

// Stateless allocators sharing one backend are always interchangeable.
template <class T, class Up, class MemoryAllocator>
bool operator==(const CustomStdAllocator<T, MemoryAllocator>&,
                const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {
    return true;
}

template <class T, class Up, class MemoryAllocator>
bool operator!=(const CustomStdAllocator<T, MemoryAllocator>&,
                const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {
    return false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ static RowsetSharedPtr do_compaction(std::vector<RowsetSharedPtr> rowsets,
}

Merger::Statistics stats;
stats.rowid_conversion = &compaction._rowid_conversion;
stats.rowid_conversion = compaction._rowid_conversion.get();
Status st = Merger::vertical_merge_rowsets(
tablet, compaction.compaction_type(), compaction._cur_tablet_schema, input_rs_readers,
compaction._output_rs_writer.get(), 100000, 5, &stats);
Expand Down
Loading