Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ Status BaseBetaRowsetWriter::create_inverted_index_file_writer(
return Status::OK();
}

Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
Status BetaRowsetWriter::create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) {
DCHECK(begin >= 0 && end >= 0);
std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
Expand Down Expand Up @@ -996,6 +996,11 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close());
}
_segcompaction_worker->get_file_writer().reset(file_writer.release());
if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer();
idx_file_writer != nullptr) {
RETURN_IF_ERROR(idx_file_writer->close());
}
_segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release());
return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
Status flush_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
KeyBoundsPB& key_bounds);
Status create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end);

bool is_segcompacted() const { return _num_segcompacted > 0; }

Expand All @@ -291,8 +293,6 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
Status _check_segment_number_limit(size_t segnum) override;
int64_t _num_seg() const override;
Status _wait_flying_segcompaction();
Status _create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end);
Status _segcompaction_if_necessary();
Status _segcompaction_rename_last_segments();
Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat

// Thin forwarder: asks the BetaRowsetWriter this worker points at (_writer) to
// create the segment writer used for segcompaction, and returns that call's
// Status unchanged. `writer` is the out-parameter handed straight through;
// `begin` and `end` are passed along as given.
Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end) {
// NOTE(review): the two return statements below look like the removed and the
// added side of this one-line diff hunk (old private name vs. new public name).
// As literally written only the first one would ever execute -- confirm that
// the checked-in file keeps just the second.
return _writer->_create_segment_writer_for_segcompaction(writer, begin, end);
return _writer->create_segment_writer_for_segcompaction(writer, begin, end);
}

Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) {
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/segcompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "olap/merger.h"
#include "olap/simple_rowid_conversion.h"
#include "olap/tablet.h"
#include "segment_v2/inverted_index_file_writer.h"
#include "segment_v2/segment.h"

namespace doris {
Expand Down Expand Up @@ -69,6 +70,9 @@ class SegcompactionWorker {
// Hands back (by value) the converted delete bitmap pointer held by this worker.
DeleteBitmapPtr get_converted_delete_bitmap() { return this->_converted_delete_bitmap; }

// Mutable reference to the file writer pointer held by this worker, so the
// caller can close the current writer or install a new one in place.
io::FileWriterPtr& get_file_writer() { return this->_file_writer; }
// Mutable reference to the inverted index file writer pointer held by this
// worker, so the caller can close the current writer or install a new one.
InvertedIndexFileWriterPtr& get_inverted_index_file_writer() {
    return this->_inverted_index_file_writer;
}

// set the cancel flag, tasks already started will not be cancelled.
bool cancel();
Expand Down Expand Up @@ -96,6 +100,7 @@ class SegcompactionWorker {
// Currently cloud storage engine doesn't need segcompaction
BetaRowsetWriter* _writer = nullptr;
io::FileWriterPtr _file_writer;
InvertedIndexFileWriterPtr _inverted_index_file_writer = nullptr;

// for unique key mow table
std::unique_ptr<SimpleRowIdConversion> _rowid_conversion = nullptr;
Expand Down
64 changes: 64 additions & 0 deletions be/test/olap/segcompaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "olap/rowset/rowset_reader_context.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
Expand Down Expand Up @@ -178,6 +179,24 @@ class SegCompactionTest : public testing::Test {
tablet_schema->init_from_pb(tablet_schema_pb);
}

// Test helper: fills in one nullable, non-key column and the inverted index
// that covers it.
//
//   column_pb      column message to populate (unique id, name, type)
//   tablet_index   index message to populate (id, name, INVERTED type, and the
//                  unique id of the column it applies to)
//   parser         when true, additionally sets the index property
//                  INVERTED_INDEX_PARSER_KEY to INVERTED_INDEX_PARSER_UNICODE
void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index, int64_t index_id,
                      const std::string& index_name, int32_t col_unique_id,
                      const std::string& column_type, const std::string& column_name,
                      bool parser = false) {
    // Column description.
    column_pb->set_unique_id(col_unique_id);
    column_pb->set_name(column_name);
    column_pb->set_type(column_type);
    column_pb->set_is_key(false);
    column_pb->set_is_nullable(true);

    // Inverted index bound to that column.
    tablet_index->set_index_id(index_id);
    tablet_index->set_index_name(index_name);
    tablet_index->set_index_type(IndexType::INVERTED);
    tablet_index->add_col_unique_id(col_unique_id);

    if (!parser) {
        return;
    }
    // Only indexes that asked for a parser get the parser property.
    auto& index_properties = *tablet_index->mutable_properties();
    index_properties[INVERTED_INDEX_PARSER_KEY] = INVERTED_INDEX_PARSER_UNICODE;
}
// use different id to avoid conflict
void create_rowset_writer_context(int64_t id, TabletSchemaSPtr tablet_schema,
RowsetWriterContext* rowset_writer_context) {
Expand Down Expand Up @@ -830,6 +849,51 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {
}
}

TEST_F(SegCompactionTest, CreateSegCompactionWriter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'TEST_F' exceeds recommended size/complexity thresholds [readability-function-size]

TEST_F(SegCompactionTest, CreateSegCompactionWriter) {
^
Additional context

be/test/olap/segcompaction_test.cpp:851: 264 lines including whitespace and comments (threshold 80)

TEST_F(SegCompactionTest, CreateSegCompactionWriter) {
^

config::enable_segcompaction = true;
Status s;
TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
TabletSchemaPB schema_pb;
schema_pb.set_keys_type(KeysType::DUP_KEYS);
schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2);

construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000, "key_index", 0, "INT",
"key");
construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, "v1_index", 1, "STRING",
"v1");
construct_column(schema_pb.add_column(), schema_pb.add_index(), 10002, "v2_index", 2, "STRING",
"v2", true);
construct_column(schema_pb.add_column(), schema_pb.add_index(), 10003, "v3_index", 3, "INT",
"v3");

tablet_schema.reset(new TabletSchema);
tablet_schema->init_from_pb(schema_pb);
RowsetSharedPtr rowset;
config::segcompaction_candidate_max_rows = 6000; // set threshold above
// rows_per_segment
config::segcompaction_batch_size = 3;
std::vector<uint32_t> segment_num_rows;
{
RowsetWriterContext writer_context;
create_rowset_writer_context(10052, tablet_schema, &writer_context);

auto res = RowsetFactory::create_rowset_writer(*l_engine, writer_context, false);
EXPECT_TRUE(res.has_value()) << res.error();
auto rowset_writer = std::move(res).value();
EXPECT_EQ(Status::OK(), s);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant?

auto beta_rowset_writer = dynamic_cast<BetaRowsetWriter*>(rowset_writer.get());
EXPECT_TRUE(beta_rowset_writer != nullptr);
std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
auto status = beta_rowset_writer->create_segment_writer_for_segcompaction(&writer, 0, 1);
EXPECT_TRUE(beta_rowset_writer != nullptr);
EXPECT_TRUE(status == Status::OK());
int64_t inverted_index_file_size = 0;
status = writer->close_inverted_index(&inverted_index_file_size);
EXPECT_TRUE(status == Status::OK());
std::cout << inverted_index_file_size << std::endl;
}
}

TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
config::enable_segcompaction = true;
Status s;
Expand Down