Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_is_partial_update = pschema.partial_update();
_is_strict_mode = pschema.is_strict_mode();
_timestamp_ms = pschema.timestamp_ms();
if (pschema.has_nano_seconds()) {
_nano_seconds = pschema.nano_seconds();
}
_timezone = pschema.timezone();

for (auto& col : pschema.partial_update_input_columns()) {
Expand Down Expand Up @@ -211,6 +214,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
pschema->set_nano_seconds(_nano_seconds);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class OlapTableSchemaParam {
}
void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
int64_t timestamp_ms() const { return _timestamp_ms; }
void set_nano_seconds(int32_t nano_seconds) { _nano_seconds = nano_seconds; }
int32_t nano_seconds() const { return _nano_seconds; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'nano_seconds' should be marked [[nodiscard]] [modernize-use-nodiscard]

Suggested change
int32_t nano_seconds() const { return _nano_seconds; }
[[nodiscard]] int32_t nano_seconds() const { return _nano_seconds; }

void set_timezone(std::string timezone) { _timezone = timezone; }
std::string timezone() const { return _timezone; }
bool is_strict_mode() const { return _is_strict_mode; }
Expand All @@ -109,6 +111,7 @@ class OlapTableSchemaParam {
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
int64_t _timestamp_ms = 0;
int32_t _nano_seconds {0};
std::string _timezone;
};

Expand Down
10 changes: 5 additions & 5 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,11 +648,11 @@ void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
_cur_max_version);
_partial_update_info->init(
*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(), _cur_max_version);
}

void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
Expand Down
24 changes: 19 additions & 5 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ namespace doris {

void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
int64_t cur_max_version) {
int64_t timestamp_ms, int32_t nano_seconds,
const std::string& timezone, int64_t cur_max_version) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
this->timestamp_ms = timestamp_ms;
this->nano_seconds = nano_seconds;
this->timezone = timezone;
missing_cids.clear();
update_cids.clear();
Expand Down Expand Up @@ -66,6 +67,7 @@ void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const
can_insert_new_rows_in_partial_update);
partial_update_info_pb->set_is_strict_mode(is_strict_mode);
partial_update_info_pb->set_timestamp_ms(timestamp_ms);
partial_update_info_pb->set_nano_seconds(nano_seconds);
partial_update_info_pb->set_timezone(timezone);
for (const auto& value : default_values) {
partial_update_info_pb->add_default_values(value);
Expand Down Expand Up @@ -94,6 +96,9 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
is_strict_mode = partial_update_info_pb->is_strict_mode();
timestamp_ms = partial_update_info_pb->timestamp_ms();
timezone = partial_update_info_pb->timezone();
if (partial_update_info_pb->has_nano_seconds()) {
nano_seconds = partial_update_info_pb->nano_seconds();
}
default_values.clear();
for (const auto& value : partial_update_info_pb->default_values()) {
default_values.push_back(value);
Expand All @@ -117,9 +122,18 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids(
to_lower(tablet_schema.column(cur_cid).default_value())
.find(to_lower("CURRENT_TIMESTAMP")) !=
std::string::npos)) {
vectorized::DateV2Value<vectorized::DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
auto pos = to_lower(tablet_schema.column(cur_cid).default_value()).find('(');
if (pos == std::string::npos) {
vectorized::DateV2Value<vectorized::DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
} else {
int precision = std::stoi(
tablet_schema.column(cur_cid).default_value().substr(pos + 1));
vectorized::DateV2Value<vectorized::DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, nano_seconds, timezone, precision);
default_value = dtv.debug_string();
}
} else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
FieldType::OLAP_FIELD_TYPE_DATEV2 &&
to_lower(tablet_schema.column(cur_cid).default_value())
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class PartialUpdateInfoPB;
struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone, int64_t cur_max_version = -1);
int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone,
int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
std::string summary() const;
Expand All @@ -47,6 +48,7 @@ struct PartialUpdateInfo {
bool can_insert_new_rows_in_partial_update {true};
bool is_strict_mode {false};
int64_t timestamp_ms {0};
int32_t nano_seconds {0};
std::string timezone;

// default values for missing cids
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
Original file line number Diff line number Diff line change
Expand Up @@ -976,4 +976,10 @@ public boolean isMaterializedViewColumn() {
return getName().startsWith(CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX)
|| getName().startsWith(CreateMaterializedViewStmt.MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX);
}

public void setDefaultValueInfo(Column refColumn) {
this.defaultValue = refColumn.defaultValue;
this.defaultValueExprDef = refColumn.defaultValueExprDef;
this.realDefaultValue = refColumn.realDefaultValue;
}
}
21 changes: 20 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,7 @@ public void setSequenceMapCol(String colName) {
getOrCreatTableProperty().setSequenceMapCol(colName);
}

public void setSequenceInfo(Type type) {
public void setSequenceInfo(Type type, Column refColumn) {
this.hasSequenceCol = true;
this.sequenceType = type;

Expand All @@ -1114,6 +1114,9 @@ public void setSequenceInfo(Type type) {
// unique key table
sequenceCol = ColumnDef.newSequenceColumnDef(type, AggregateType.REPLACE).toColumn();
}
if (refColumn != null) {
sequenceCol.setDefaultValueInfo(refColumn);
}
// add sequence column at last
fullSchema.add(sequenceCol);
nameToColumn.put(Column.SEQUENCE_COL, sequenceCol);
Expand Down Expand Up @@ -1581,6 +1584,18 @@ public void readFields(DataInput in) throws IOException {
defaultDistributionInfo.markAutoBucket();
}

if (isUniqKeyMergeOnWrite() && getSequenceMapCol() != null) {
// set the hidden sequence column's default value the same with
// the sequence map column's for partial update
String seqMapColName = getSequenceMapCol();
Column seqMapCol = getBaseSchema().stream().filter(col -> col.getName().equalsIgnoreCase(seqMapColName))
.findFirst().orElse(null);
Column hiddenSeqCol = getSequenceCol();
if (seqMapCol != null && hiddenSeqCol != null) {
hiddenSeqCol.setDefaultValueInfo(seqMapCol);
}
}

// temp partitions
tempPartitions = TempPartitions.read(in);
RangePartitionInfo tempRangeInfo = tempPartitions.getPartitionInfo();
Expand Down Expand Up @@ -2350,6 +2365,10 @@ public boolean getEnableUniqueKeyMergeOnWrite() {
return tableProperty.getEnableUniqueKeyMergeOnWrite();
}

public boolean isUniqKeyMergeOnWrite() {
return getKeysType() == KeysType.UNIQUE_KEYS && getEnableUniqueKeyMergeOnWrite();
}

public boolean isDuplicateWithoutKey() {
return getKeysType() == KeysType.DUP_KEYS && getKeysNum() == 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2499,7 +2499,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
throw new DdlException("Sequence type only support integer types and date types");
}
olapTable.setSequenceMapCol(col.getName());
olapTable.setSequenceInfo(col.getType());
olapTable.setSequenceInfo(col.getType(), col);
}
} catch (Exception e) {
throw new DdlException(e.getMessage());
Expand All @@ -2513,7 +2513,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
throw new DdlException("The sequence_col and sequence_type cannot be set at the same time");
}
if (sequenceColType != null) {
olapTable.setSequenceInfo(sequenceColType);
olapTable.setSequenceInfo(sequenceColType, null);
}
} catch (Exception e) {
throw new DdlException(e.getMessage());
Expand Down
3 changes: 3 additions & 0 deletions gensrc/proto/descriptors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,8 @@ message POlapTableSchemaParam {
optional bool is_strict_mode = 9 [default = false];
optional int64 timestamp_ms = 11 [default = 0];
optional string timezone = 12;
// not used, for upgrade compatibility
optional int32 auto_increment_column_unique_id = 13 [default = -1];
optional int32 nano_seconds = 14 [default = 0];
};

1 change: 1 addition & 0 deletions gensrc/proto/olap_file.proto
Original file line number Diff line number Diff line change
Expand Up @@ -373,4 +373,5 @@ message PartialUpdateInfoPB {
optional bool is_schema_contains_auto_inc_column = 10 [default = false];
repeated string default_values = 11;
optional int64 max_version_in_flush_phase = 12 [default = -1];
optional int32 nano_seconds = 13 [default = 0];
}
Loading