Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class StorageReadOptions {
std::vector<uint32_t>* read_orderby_key_columns = nullptr;

IOContext io_ctx;
Version version;
};

// Used to read data in RowBlockV2 one by one
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
// convert RowsetReaderContext to StorageReadOptions
_read_options.stats = _stats;
_read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt;
_read_options.version = _rowset->version();
if (read_context->lower_bound_keys != nullptr) {
for (int i = 0; i < read_context->lower_bound_keys->size(); ++i) {
_read_options.key_ranges.emplace_back(&read_context->lower_bound_keys->at(i),
Expand Down
31 changes: 31 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,33 @@ Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32
return Status::OK();
}

// Overwrites the __DORIS_VERSION_COL__ column of the current batch with the
// version of the rowset being read.
//
// Doris can't determine the version before publish_version has finished, so
// the segment writer cannot store it and the on-disk value is 0 by default.
// The real version is therefore substituted here, at read time.
//
// num_rows: number of rows the replacement column must contain.
void SegmentIterator::_replace_version_col(size_t num_rows) {
    // Only a rowset that covers a single version needs the replacement.
    if (_opts.version.first != _opts.version.second) {
        return;
    }

    const int32_t version_idx = _schema.version_col_idx();
    // A negative index (-1 is the schema's default) means no version column was
    // recorded. Return explicitly instead of relying on the signed/unsigned
    // comparison that std::find would otherwise perform against the column ids.
    if (version_idx < 0) {
        return;
    }

    // Bind by reference so the id list is not copied on every batch.
    const auto& cids = _schema.column_ids();
    if (std::find(cids.begin(), cids.end(), version_idx) == cids.end()) {
        // The version column is not among the columns of this read schema.
        return;
    }

    const auto column_desc = _schema.column(version_idx);
    // Validate the declared type before building an Int64 column for it.
    DCHECK(column_desc->type() == FieldType::OLAP_FIELD_TYPE_BIGINT);

    auto column = Schema::get_data_type_ptr(*column_desc)->create_column();
    // NOTE(review): the cast is only valid because of the BIGINT check above;
    // consider a checked cast helper if the project provides one.
    auto* col_ptr = reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(column.get());
    const auto version = _opts.version.second;
    for (size_t row = 0; row < num_rows; ++row) {
        col_ptr->insert_value(version);
    }

    _current_return_columns[version_idx] = std::move(column);
    VLOG_DEBUG << "replaced version column in segment iterator, version_col_idx:" << version_idx;
}

uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_idx,
uint16_t selected_size) {
SCOPED_RAW_TIMER(&_opts.stats->vec_cond_ns);
Expand Down Expand Up @@ -1158,6 +1185,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
}

if (!_is_need_vec_eval && !_is_need_short_eval) {
_replace_version_col(_current_batch_rows_read);
_output_non_pred_columns(block);
} else {
_convert_dict_code_for_predicate_if_necessary();
Expand All @@ -1184,6 +1212,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
if (!_lazy_materialization_read) {
Status ret = Status::OK();
if (selected_size > 0) {
_replace_version_col(selected_size);
ret = _output_column_by_sel_idx(block, _first_read_column_ids, sel_rowid_idx,
selected_size);
}
Expand All @@ -1203,6 +1232,8 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, _block_rowids,
sel_rowid_idx, selected_size));

_replace_version_col(selected_size);

// step4: output columns
// 4.1 output non-predicate column
_output_non_pred_columns(block);
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class SegmentIterator : public RowwiseIterator {
vectorized::MutableColumns& column_block, size_t nrows);
Status _read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read,
bool set_block_rowid);
// Replaces the version column of the current batch with a column of
// `num_rows` values holding the rowset's version (single-version rowsets only).
void _replace_version_col(size_t num_rows);
void _init_current_block(vectorized::Block* block,
std::vector<vectorized::MutableColumnPtr>& non_pred_vector);
uint16_t _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size);
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class Schema {
if (column.is_key()) {
++num_key_columns;
}
if (column.name() == VERSION_COL) {
_version_col_idx = cid;
}
columns.push_back(column);
}
_delete_sign_idx = tablet_schema->delete_sign_idx();
Expand All @@ -72,6 +75,9 @@ class Schema {
if (columns[i].name() == DELETE_SIGN) {
_delete_sign_idx = i;
}
if (columns[i].name() == VERSION_COL) {
_version_col_idx = i;
}
_unique_ids[i] = columns[i].unique_id();
}
_init(columns, col_ids, num_key_columns);
Expand All @@ -97,6 +103,9 @@ class Schema {
if (cols.at(cid)->name() == DELETE_SIGN) {
_delete_sign_idx = cid;
}
if (cols.at(cid)->name() == VERSION_COL) {
_version_col_idx = cid;
}
_unique_ids[cid] = cols[cid]->unique_id();
}

Expand Down Expand Up @@ -145,6 +154,7 @@ class Schema {
// Returns the unique id stored for the column at position `index`.
int32_t unique_id(size_t index) const { return _unique_ids[index]; }
// Returns the index of the delete-sign column; the member defaults to -1.
int32_t delete_sign_idx() const { return _delete_sign_idx; }
// Returns the stored sequence-column flag.
bool has_sequence_col() const { return _has_sequence_col; }
// Returns the index of the column named VERSION_COL; the member defaults to -1.
int32_t version_col_idx() const { return _version_col_idx; }

private:
void _init(const std::vector<TabletColumn>& cols, const std::vector<ColumnId>& col_ids,
Expand All @@ -169,6 +179,7 @@ class Schema {
size_t _schema_size;
int32_t _delete_sign_idx = -1;
bool _has_sequence_col = false;
int32_t _version_col_idx = -1;
};

} // namespace doris
6 changes: 6 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ void TabletSchema::append_column(TabletColumn column, bool is_dropped_column) {
_delete_sign_idx = _num_columns;
} else if (UNLIKELY(column.name() == SEQUENCE_COL)) {
_sequence_col_idx = _num_columns;
} else if (UNLIKELY(column.name() == VERSION_COL)) {
_version_col_idx = _num_columns;
}
// The dropped column may have same name with exsiting column, so that
// not add to name to index map, only for uid to index map
Expand Down Expand Up @@ -599,6 +601,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
_disable_auto_compaction = schema.disable_auto_compaction();
_delete_sign_idx = schema.delete_sign_idx();
_sequence_col_idx = schema.sequence_col_idx();
_version_col_idx = schema.version_col_idx();
_sort_type = schema.sort_type();
_sort_col_num = schema.sort_col_num();
_compression_type = schema.compression_type();
Expand Down Expand Up @@ -657,6 +660,8 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version
_delete_sign_idx = _num_columns;
} else if (UNLIKELY(column->name() == SEQUENCE_COL)) {
_sequence_col_idx = _num_columns;
} else if (UNLIKELY(column->name() == VERSION_COL)) {
_version_col_idx = _num_columns;
}
_field_name_to_index[column->name()] = _num_columns;
_field_id_to_index[column->unique_id()] = _num_columns;
Expand Down Expand Up @@ -726,6 +731,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
tablet_schema_pb->set_sort_col_num(_sort_col_num);
tablet_schema_pb->set_schema_version(_schema_version);
tablet_schema_pb->set_compression_type(_compression_type);
tablet_schema_pb->set_version_col_idx(_version_col_idx);
}

uint32_t TabletSchema::mem_size() const {
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ class TabletSchema {
// Overrides the stored index of the delete-sign column.
void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx = delete_sign_idx; }
// True when a sequence column index has been recorded (i.e. it is not -1).
bool has_sequence_col() const { return _sequence_col_idx != -1; }
// Returns the stored index of the sequence column; the member defaults to -1.
int32_t sequence_col_idx() const { return _sequence_col_idx; }
// Overrides the stored index of the version column.
void set_version_col_idx(int32_t version_col_idx) { _version_col_idx = version_col_idx; }
// Returns the stored index of the version column; the member defaults to -1.
int32_t version_col_idx() const { return _version_col_idx; }
// Returns the compression type recorded for this schema.
segment_v2::CompressionTypePB compression_type() const { return _compression_type; }

// Returns the indexes recorded for this schema.
const std::vector<TabletIndex>& indexes() const { return _indexes; }
Expand Down Expand Up @@ -241,6 +243,7 @@ class TabletSchema {
bool _is_in_memory = false;
int32_t _delete_sign_idx = -1;
int32_t _sequence_col_idx = -1;
int32_t _version_col_idx = -1;
int32_t _schema_version = -1;
bool _disable_auto_compaction = false;
};
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
namespace doris {
void write_log_info(char* buf, size_t buf_len, const char* fmt, ...);
// Reserved column name used to locate the delete-sign column in a schema.
static const std::string DELETE_SIGN = "__DORIS_DELETE_SIGN__";
// Reserved column name used to locate the version column in a schema.
static const std::string VERSION_COL = "__DORIS_VERSION_COL__";

// Used to speed up computation
const static int32_t g_power_table[] = {1, 10, 100, 1000, 10000,
Expand Down
3 changes: 2 additions & 1 deletion be/test/olap/test_data/header_without_inc_rs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
"sort_col_num": 0,
"compression_type": "LZ4F",
"schema_version": 0,
"disable_auto_compaction": false
"disable_auto_compaction": false,
"version_col_idx": -1
},
"rs_metas": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_batch_delete_by_default = true;

/**
 * Whether to add a hidden version column by default when creating a unique-key table.
 */
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_hidden_version_column_by_default = true;

/**
* Used to set default db data quota bytes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,8 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP
throw new DdlException("Can not enable batch delete support, already supported batch delete.");
} else if (newColName.equalsIgnoreCase(Column.SEQUENCE_COL)) {
throw new DdlException("Can not enable sequence column support, already supported sequence column.");
} else if (newColName.equalsIgnoreCase(Column.VERSION_COL)) {
throw new DdlException("Can not enable version column support, already supported version column.");
} else {
if (ignoreSameColumn && newColumn.equals(foundColumn)) {
//for add columns rpc, allow add same type column.
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ public static ColumnDef newSequenceColumnDef(Type type, AggregateType aggregateT
"sequence column hidden column", false);
}

/**
 * Builds the definition of the hidden version column ({@code Column.VERSION_COL}):
 * a BIGINT column with default value "0" and no aggregate type.
 */
public static ColumnDef newVersionColumnDef() {
    final TypeDef bigintType = TypeDef.create(PrimitiveType.BIGINT);
    final ColumnDef.DefaultValue zeroDefault = new ColumnDef.DefaultValue(true, "0");
    return new ColumnDef(Column.VERSION_COL, bigintType, false, null, false, zeroDefault,
            "doris version hidden column", false);
}

/**
 * Builds the definition of the hidden version column ({@code Column.VERSION_COL}):
 * a BIGINT column with default value "0" and the given aggregate type.
 *
 * @param aggregateType aggregate type to assign to the version column
 */
public static ColumnDef newVersionColumnDef(AggregateType aggregateType) {
    final TypeDef bigintType = TypeDef.create(PrimitiveType.BIGINT);
    final ColumnDef.DefaultValue zeroDefault = new ColumnDef.DefaultValue(true, "0");
    return new ColumnDef(Column.VERSION_COL, bigintType, false, aggregateType, false, zeroDefault,
            "doris version hidden column", false);
}

public boolean isAllowNull() {
return isAllowNull;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,14 @@ public void analyze(Analyzer analyzer) throws UserException, AnalysisException {
columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE));
}
}
if (Config.enable_hidden_version_column_by_default && keysDesc != null
&& keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
if (enableUniqueKeyMergeOnWrite) {
columnDefs.add(ColumnDef.newVersionColumnDef(AggregateType.NONE));
} else {
columnDefs.add(ColumnDef.newVersionColumnDef(AggregateType.REPLACE));
}
}
boolean hasObjectStored = false;
String objectStoredColumn = "";
Set<String> columnSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
Expand Down
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class Column implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(Column.class);
public static final String DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String SEQUENCE_COL = "__DORIS_SEQUENCE_COL__";
public static final String VERSION_COL = "__DORIS_VERSION_COL__";
private static final String COLUMN_ARRAY_CHILDREN = "item";
public static final int COLUMN_UNIQUE_ID_INIT_VALUE = -1;

Expand Down Expand Up @@ -277,6 +278,12 @@ public boolean isSequenceColumn() {
|| aggregationType == AggregateType.NONE) && nameEquals(SEQUENCE_COL, true);
}

/**
 * Tells whether this column is the hidden version column: it must be invisible,
 * aggregated with REPLACE or NONE, and named {@code VERSION_COL}.
 */
public boolean isVersionColumn() {
    if (visible) {
        return false;
    }
    // aggregationType is NONE for unique table with merge on write.
    final boolean aggregationMatches = aggregationType == AggregateType.REPLACE
            || aggregationType == AggregateType.NONE;
    return aggregationMatches && nameEquals(VERSION_COL, true);
}

public PrimitiveType getDataType() {
return type.getPrimitiveType();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public TCreateTabletReq toThrift() {
}
int deleteSign = -1;
int sequenceCol = -1;
int versionCol = -1;
List<TColumn> tColumns = new ArrayList<TColumn>();
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
Expand All @@ -213,10 +214,14 @@ public TCreateTabletReq toThrift() {
if (column.isSequenceColumn()) {
sequenceCol = i;
}
if (column.isVersionColumn()) {
versionCol = i;
}
}
tSchema.setColumns(tColumns);
tSchema.setDeleteSignIdx(deleteSign);
tSchema.setSequenceColIdx(sequenceCol);
tSchema.setVersionColIdx(versionCol);

if (CollectionUtils.isNotEmpty(indexes)) {
List<TOlapTableIndex> tIndexes = new ArrayList<>();
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/olap_file.proto
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ message TabletSchemaPB {
optional int32 schema_version = 14;
optional bool disable_auto_compaction = 15 [default=false];
repeated TabletIndexPB index = 16;
optional int32 version_col_idx = 17 [default = -1];
}

enum TabletStatePB {
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/AgentService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct TTabletSchema {
11: optional Types.TSortType sort_type
12: optional i32 sort_col_num
13: optional bool disable_auto_compaction
14: optional i32 version_col_idx = -1
}

// this enum stands for different storage format in src_backends
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
3 1

-- !select_skip_merge --
1 1 0
1 11 0
2 1 0
2 11 0
3 1 0
1 1 0 2
1 11 0 3
2 1 0 2
2 11 0 3
3 1 0 4

-- !select_batch_delete --
2 11
Expand All @@ -23,16 +23,16 @@
3 1

-- !select_skip_merge_after_delete --
1 1 0
1 11 0
1 111 1
3 1 0
1 1 0 2
1 11 0 3
1 111 1 5
3 1 0 4

-- !select_skip_delete2 --
1 1 0
1 11 0
1 111 1
2 1 0
2 11 0
3 1 0
1 1 0 2
1 11 0 3
1 111 1 5
2 1 0 2
2 11 0 3
3 1 0 4

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ int_value INT Yes false \N REPLACE
char_value CHAR(10) Yes false \N REPLACE
date_value DATE Yes false \N REPLACE
__DORIS_DELETE_SIGN__ TINYINT No false 0 REPLACE
__DORIS_VERSION_COL__ BIGINT No false 0 REPLACE
__DORIS_SEQUENCE_COL__ INT Yes false \N REPLACE

-- !desc_uniq_table --
Expand All @@ -13,5 +14,6 @@ int_value INT Yes false \N REPLACE
char_value CHAR(10) Yes false \N REPLACE
date_value DATE Yes false \N REPLACE
__DORIS_DELETE_SIGN__ TINYINT No false 0 REPLACE
__DORIS_VERSION_COL__ BIGINT No false 0 REPLACE
__DORIS_SEQUENCE_COL__ INT Yes false \N REPLACE

Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,18 @@
3 6 11

-- !all --
1 10 15 16 17 0 15
15 8 19 20 21 0 19
2 5 14 13 14 0 12
3 6 11 14 15 0 13
1 10 15 16 17 0 4 15
15 8 19 20 21 0 7 19
2 5 14 13 14 0 5 12
3 6 11 14 15 0 6 13

-- !desc --
k1 INT Yes true \N
v1 TINYINT Yes false \N REPLACE
v2 INT Yes false \N REPLACE
v3 INT Yes false \N REPLACE
v4 INT Yes false \N REPLACE
__DORIS_DELETE_SIGN__ TINYINT No false 0 REPLACE
__DORIS_VERSION_COL__ BIGINT No false 0 REPLACE
__DORIS_SEQUENCE_COL__ INT Yes false \N REPLACE

Loading