Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ E(INVERTED_INDEX_NO_TERMS, -6005);
E(INVERTED_INDEX_RENAME_FILE_FAILED, -6006);
E(INVERTED_INDEX_EVALUATE_SKIPPED, -6007);
E(INVERTED_INDEX_BUILD_WAITTING, -6008);
E(KEY_NOT_FOUND, -6009);
E(KEY_ALREADY_EXISTS, -6010);
#undef E
} // namespace ErrorCode

Expand Down Expand Up @@ -305,7 +307,9 @@ constexpr bool capture_stacktrace(int code) {
&& code != ErrorCode::TRANSACTION_NOT_EXIST
&& code != ErrorCode::TRANSACTION_ALREADY_VISIBLE
&& code != ErrorCode::TOO_MANY_TRANSACTIONS
&& code != ErrorCode::TRANSACTION_ALREADY_COMMITTED;
&& code != ErrorCode::TRANSACTION_ALREADY_COMMITTED
&& code != ErrorCode::KEY_NOT_FOUND
&& code != ErrorCode::KEY_ALREADY_EXISTS;
}
// clang-format on

Expand Down
9 changes: 5 additions & 4 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,14 @@ Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, RowLocation*

DCHECK(_pk_index_reader != nullptr);
if (!_pk_index_reader->check_present(key_without_seq)) {
return Status::NotFound("Can't find key in the segment");
return Status::Error<ErrorCode::KEY_NOT_FOUND>("Can't find key in the segment");
}
bool exact_match = false;
std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator;
RETURN_IF_ERROR(_pk_index_reader->new_iterator(&index_iterator));
RETURN_IF_ERROR(index_iterator->seek_at_or_after(&key_without_seq, &exact_match));
if (!has_seq_col && !exact_match) {
return Status::NotFound("Can't find key in the segment");
return Status::Error<ErrorCode::KEY_NOT_FOUND>("Can't find key in the segment");
}
row_location->row_id = index_iterator->get_current_ordinal();
row_location->segment_id = _segment_id;
Expand All @@ -396,7 +396,7 @@ Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, RowLocation*

// compare key
if (key_without_seq.compare(sought_key_without_seq) != 0) {
return Status::NotFound("Can't find key in the segment");
return Status::Error<ErrorCode::KEY_NOT_FOUND>("Can't find key in the segment");
}

if (!with_seq_col) {
Expand All @@ -409,7 +409,8 @@ Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, RowLocation*
Slice previous_sequence_id = Slice(
sought_key.get_data() + sought_key_without_seq.get_size() + 1, seq_col_length - 1);
if (sequence_id.compare(previous_sequence_id) < 0) {
return Status::AlreadyExist("key with higher sequence id exists");
return Status::Error<ErrorCode::KEY_ALREADY_EXISTS>(
"key with higher sequence id exists");
}
}

Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
RowsetSharedPtr rowset;
auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset);
if (st.is<NOT_FOUND>()) {
if (st.is<KEY_NOT_FOUND>()) {
if (_tablet_schema->is_strict_mode()) {
++num_rows_filtered;
// delete the invalid newly inserted row
Expand All @@ -436,7 +436,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
use_default_or_null_flag.emplace_back(true);
continue;
}
if (!st.ok() && !st.is<ALREADY_EXIST>()) {
if (!st.ok() && !st.is<KEY_ALREADY_EXISTS>()) {
LOG(WARNING) << "failed to lookup row key, error: " << st;
return st;
}
Expand All @@ -454,7 +454,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
_tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid);
}

if (st.is<ALREADY_EXIST>()) {
if (st.is<KEY_ALREADY_EXISTS>()) {
// although we need to mark delete current row, we still need to read missing columns
// for this row, we need to ensure that each column is aligned
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id, 0},
Expand Down
34 changes: 7 additions & 27 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2829,10 +2829,10 @@ Status Tablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,

for (auto id : picked_segments) {
Status s = segments[id]->lookup_row_key(encoded_key, with_seq_col, &loc);
if (s.is<NOT_FOUND>()) {
if (s.is<KEY_NOT_FOUND>()) {
continue;
}
if (!s.ok() && !s.is<ALREADY_EXIST>()) {
if (!s.ok() && !s.is<KEY_ALREADY_EXISTS>()) {
return s;
}
if (s.ok() && _tablet_meta->delete_bitmap().contains_agg_without_cache(
Expand All @@ -2845,7 +2845,7 @@ Status Tablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,
// The key is deleted, we don't need to search for it any more.
break;
}
// `st` is either OK or ALREADY_EXIST now.
// `st` is either OK or KEY_ALREADY_EXISTS now.
// for partial update, even if the key is already exists, we still need to
// read it's original values to keep all columns align.
*row_location = loc;
Expand All @@ -2858,7 +2858,7 @@ Status Tablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,
}
}
g_tablet_pk_not_found << 1;
return Status::NotFound("can't find key in all rowsets");
return Status::Error<ErrorCode::KEY_NOT_FOUND>("can't find key in all rowsets");
}

// load segment may do io so it should out lock
Expand Down Expand Up @@ -2972,17 +2972,17 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
RowsetSharedPtr rowset_find;
auto st = lookup_row_key(key, true, specified_rowsets, &loc, dummy_version.first - 1,
segment_caches, &rowset_find);
bool expected_st = st.ok() || st.is<NOT_FOUND>() || st.is<ALREADY_EXIST>();
bool expected_st = st.ok() || st.is<KEY_NOT_FOUND>() || st.is<KEY_ALREADY_EXISTS>();
DCHECK(expected_st) << "unexpected error status while lookup_row_key:" << st;
if (!expected_st) {
return st;
}
if (st.is<NOT_FOUND>()) {
if (st.is<KEY_NOT_FOUND>()) {
continue;
}

// sequence id smaller than the previous one, so delete current row
if (st.is<ALREADY_EXIST>()) {
if (st.is<KEY_ALREADY_EXISTS>()) {
delete_bitmap->add({rowset_id, seg->id(), 0}, row_id);
continue;
} else if (is_partial_update && rowset_writer != nullptr) {
Expand Down Expand Up @@ -3212,26 +3212,6 @@ void Tablet::prepare_to_read(const RowLocation& row_location, size_t pos,
seg_it->second.emplace_back(RidAndPos {row_location.row_id, pos});
}

Status Tablet::_check_pk_in_pre_segments(
RowsetId rowset_id, const std::vector<segment_v2::SegmentSharedPtr>& pre_segments,
const Slice& key, DeleteBitmapPtr delete_bitmap, RowLocation* loc) {
for (auto it = pre_segments.rbegin(); it != pre_segments.rend(); ++it) {
auto st = (*it)->lookup_row_key(key, true, loc);
DCHECK(st.ok() || st.is<NOT_FOUND>() || st.is<ALREADY_EXIST>())
<< "unexpected error status while lookup_row_key:" << st;
if (st.is<NOT_FOUND>()) {
continue;
} else if (st.ok() && _schema->has_sequence_col() &&
delete_bitmap->contains({rowset_id, loc->segment_id, 0}, loc->row_id)) {
// if has sequence col, we continue to compare the sequence_id of
// all segments, util we find an existing key.
continue;
}
return st;
}
return Status::NotFound("Can't find key in the segment");
}

void Tablet::_rowset_ids_difference(const RowsetIdUnorderedSet& cur,
const RowsetIdUnorderedSet& pre, RowsetIdUnorderedSet* to_add,
RowsetIdUnorderedSet* to_del) {
Expand Down
4 changes: 0 additions & 4 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,6 @@ class Tablet : public BaseTablet {
bool _reconstruct_version_tracker_if_necessary();
void _init_context_common_fields(RowsetWriterContext& context);

Status _check_pk_in_pre_segments(RowsetId rowset_id,
const std::vector<segment_v2::SegmentSharedPtr>& pre_segments,
const Slice& key, DeleteBitmapPtr delete_bitmap,
RowLocation* loc);
void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const RowsetIdUnorderedSet& pre,
RowsetIdUnorderedSet* to_add, RowsetIdUnorderedSet* to_del);
Status _load_rowset_segments(const RowsetSharedPtr& rowset,
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/point_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ Status PointQueryExecutor::_lookup_row_key() {
st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, true, specified_rowsets,
&location, INT32_MAX /*rethink?*/, segment_caches,
rowset_ptr.get()));
if (st.is<ErrorCode::NOT_FOUND>()) {
if (st.is<ErrorCode::KEY_NOT_FOUND>()) {
continue;
}
RETURN_IF_ERROR(st);
Expand Down