Doris supports batch delete
BACKGROUND
At present, Doris supports multiple import methods such as broker load, routine load, and stream load, but data can only be deleted through the DELETE statement. Each time a DELETE statement is executed, a new version of the data is generated, so frequent deletions seriously affect query performance. In addition, DELETE is implemented by generating an empty rowset that records the deletion conditions, and every read must filter against these conditions, which also hurts performance when there are many of them. Compared with other systems, greenplum's implementation is closer to a traditional database product, while snowflake implements this through the MERGE syntax.
For scenarios such as importing CDC data, inserts and deletes are generally interspersed in the data. Our current import methods cannot handle this scenario: even if the inserts could be separated from the deletes, the deletion problem itself would remain unsolved.
Design goals
Functional level:
Enhance the import function so that it can support the following scenarios:
- Simple batch import (currently supported)
- Batch point deletion
- Import and delete mixed data import
- Only supported for UNIQUE_KEYS tables
Ease of use:
Minimize changes to the import syntax and stay compatible with the current import syntax
Performance:
Import and read performance should be basically the same as with the current import methods, without significant performance loss
Detailed design
The import syntax adds a column that indicates whether the current row is to be inserted or deleted; if the column is absent, the default behavior is to insert the row. This feature is implemented only on segment V2; V1 is not considered for the time being. In the IndexRegion of the segment file, a bitmap index similar to the null bitmap is added to mark the rows to be deleted.
Data structure design
A bitmap index (delete_index_page) needs to be added to the segment structure to indicate which rows are marked for deletion. The PagePointerPB structure is the same as previously defined; a bitmap is used as the index.
message SegmentFooterPB {
    optional uint32 version = 1 [default = 1]; // file version
    repeated ColumnMetaPB columns = 2; // tablet schema
    optional uint32 num_rows = 3; // number of values
    optional uint64 index_footprint = 4; // total index footprint of all columns
    optional uint64 data_footprint = 5; // total data footprint of all columns
    optional uint64 raw_data_footprint = 6; // raw data footprint
    optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns
    repeated MetadataPairPB file_meta_datas = 8; // meta data of file
    // Short key index's page
    optional PagePointerPB short_key_index_page = 9;
    // Use bitmap index to indicate which rows are marked for deletion
    optional PagePointerPB delete_index_page = 10;
}
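To make the structure concrete, below is a minimal sketch in standard C++ of how the per-segment delete index could be collected while rows are appended and serialized into the bytes behind delete_index_page. The class and method names are hypothetical, and a plain vector-backed bitmap stands in for whatever page encoding (e.g. a Roaring-style bitmap) the segment writer would actually use.

#include <cstdint>
#include <vector>

class DeleteIndexBuilder {
public:
    // Mark the row at ordinal `row_id` (0-based within the segment) as deleted.
    void mark_deleted(uint32_t row_id) {
        if (row_id >= _bits.size()) {
            _bits.resize(row_id + 1, false);
        }
        _bits[row_id] = true;
    }

    // Serialize the bitmap into the bytes that would back delete_index_page.
    std::vector<uint8_t> finish(uint32_t num_rows) const {
        std::vector<uint8_t> page((num_rows + 7) / 8, 0);
        for (uint32_t i = 0; i < num_rows && i < _bits.size(); ++i) {
            if (_bits[i]) page[i / 8] |= (1u << (i % 8));
        }
        return page;
    }

private:
    std::vector<bool> _bits;
};

Since the index needs only one bit per row in the segment, it stays small even for large imports.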
Import syntax
The import syntax mainly adds a column mapping that specifies which field is the delete-mark column; this column must be present in the imported data. The settings for each import method are as follows.
stream load
Stream load adds header fields to set the delete-mark column, for example:
-H "columns: k1, k2, label_c3" -H "merge_type: [MERGE|APPEND|DELETE]" -H "delete: label_c3=1"
broker load
Specify the delete-mark column with the DELETE ON clause in the load statement, for example:
LOAD LABEL db1.label1
(
[MERGE|APPEND|DELETE] DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1,tmp_c2, label_c3)
SET
(
id=tmp_c2,
name=tmp_c1
)
[DELETE ON label=true]
)
WITH BROKER 'broker'
(
"username"="user",
"password"="pass"
)
PROPERTIES
(
"timeout" = "3600"
);
routine load
Routine load adds the mapping in the COLUMNS field. The mapping method is the same as above; an example is as follows:
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
[WITH MERGE|APPEND|DELETE]
COLUMNS(k1, k2, k3, v1, v2, label),
WHERE k1 > 100 and k2 like "%doris%"
[DELETE ON label=true]
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
Import
The process of data import is as follows:
When the imported data contains a delete mark and the mark is true, the row is written and its row number within the segment is recorded in the delete index; otherwise the data is written directly. An optimization is possible here: for a row marked for deletion, each value column can be set to the value that occupies the least space for its type. For example, for the VARCHAR type the value can be set to an empty string to save space.
st=>start: Start Load
flag_cond=>condition: delete flag is true
write_rowdata=>operation: write data
write_rowdata_opt=>operation: write Data with minimum values
write_delete_index=>operation: write delete index
e=>end
st->flag_cond
flag_cond(yes)->write_rowdata_opt->write_delete_index->e
flag_cond(no)->write_rowdata->e
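The write path in the flowchart can be sketched as follows. This is a simplified illustration with hypothetical structures (InputRow, Segment), not Doris's actual writer classes, and it models column values as strings for brevity.

#include <cstdint>
#include <string>
#include <vector>

struct InputRow {
    std::vector<std::string> keys;    // key columns
    std::vector<std::string> values;  // value columns
    bool delete_flag = false;         // the extra delete-mark column
};

struct Segment {
    std::vector<std::vector<std::string>> rows;
    std::vector<uint32_t> delete_index;  // ordinals of rows marked deleted
};

void append_row(Segment& seg, const InputRow& in) {
    std::vector<std::string> row = in.keys;
    if (in.delete_flag) {
        // Optimization from the text: value columns of a deleted row can be
        // replaced by the cheapest value of their type (empty string here).
        row.insert(row.end(), in.values.size(), std::string());
        seg.delete_index.push_back(static_cast<uint32_t>(seg.rows.size()));
    } else {
        row.insert(row.end(), in.values.begin(), in.values.end());
    }
    seg.rows.push_back(std::move(row));
}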
Suppose there is a table
+-------+-------------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+-------+-------------+------+-------+---------+---------+
| k1 | INT | Yes | true | 0 | |
| k2 | SMALLINT | Yes | true | NULL | |
| k3 | VARCHAR(32) | Yes | true | | |
| v1 | BIGINT | Yes | false | 0 | REPLACE |
+-------+-------------+------+-------+---------+---------+
The imported data is:
0,1,foo,3,false
0,1,foo,2,true
1,2,bar,3,true
0,1,foo,5,false
For a UNIQUE_KEYS table, only the latest result is kept, so the first and second rows are meaningless and will be dropped during the import aggregation. The data recorded in the data area of the rowset is then
1,2,bar,3
0,1,foo,5
The delete_index records [1, 0] in the bitmap, indicating that the first row of this batch of imported data is marked for deletion.
Read
UNIQUE_KEYS table
At present, when a UNIQUE_KEYS table is read, the data is merged from the highest version down to the lowest version. Since UNIQUE_KEYS guarantees that each key has at most one record per segment, only the first (newest) version of a key needs to be read: if that row is marked for deletion, all versions of the key are skipped; otherwise the first row is returned directly and the remaining versions are skipped.
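A simplified illustration of this merge-on-read behavior, using hypothetical structures rather than Doris's actual reader:

#include <set>
#include <string>
#include <vector>

struct VersionedRow {
    std::string key;
    std::string value;
    bool deleted = false;  // looked up from the segment's delete_index_page
};

// `versions` is ordered from the highest (newest) version to the lowest.
std::vector<VersionedRow> merge_read(const std::vector<std::vector<VersionedRow>>& versions) {
    std::vector<VersionedRow> result;
    std::set<std::string> seen;
    for (const auto& rowset : versions) {
        for (const auto& row : rowset) {
            if (!seen.insert(row.key).second) {
                continue;  // an older version of a key already resolved
            }
            if (!row.deleted) {
                result.push_back(row);  // newest version wins and is visible
            }
            // If the newest version is marked deleted, the key is skipped entirely.
        }
    }
    return result;
}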
Compaction
In compaction, cumulative compaction and base compaction behave differently: cumulative compaction needs to preserve the delete index, while the delete index can be dropped once base compaction is finished.
Cumulative Compaction
Cumulative compaction merges from lower versions to higher versions, similar to a read operation, except that the rows marked for deletion must also be read out, merged, and recorded in the delete index of the new segment. Since the delete index is a bitmap index, it exists in each segment file.
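A minimal sketch of this behavior, with hypothetical structures (MergedRow, OutputRowset) rather than Doris's actual compaction classes:

#include <cstdint>
#include <string>
#include <vector>

struct MergedRow {
    std::string key;
    std::string value;
    bool deleted = false;  // derived from the source segments' delete indexes
};

struct OutputRowset {
    std::vector<MergedRow> rows;
    std::vector<uint32_t> delete_index;  // rebuilt against the new row ordinals
};

OutputRowset cumulative_compact(const std::vector<MergedRow>& merged) {
    OutputRowset out;
    for (const auto& row : merged) {
        if (row.deleted) {
            // Keep the delete marker: it may still cancel rows that live in
            // older rowsets not covered by this cumulative compaction.
            out.delete_index.push_back(static_cast<uint32_t>(out.rows.size()));
        }
        out.rows.push_back(row);
    }
    return out;
}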
Base Compaction
Base compaction is also similar to a read, but because it is a full read there is no need to keep the delete index: the data is read directly and the rows marked for deletion are filtered out, following the existing compaction logic.
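Correspondingly, a minimal sketch of the base compaction case, where deleted rows are simply dropped and no delete index is written to the output. The structures are hypothetical, with MergedRow shaped as in the cumulative compaction sketch above.

#include <string>
#include <vector>

struct MergedRow {
    std::string key;
    std::string value;
    bool deleted = false;
};

// Base compaction reads the full history, so rows marked deleted can be
// dropped outright and no delete index is produced for the output rowset.
std::vector<MergedRow> base_compact(const std::vector<MergedRow>& merged) {
    std::vector<MergedRow> out;
    for (const auto& row : merged) {
        if (!row.deleted) {
            out.push_back(row);
        }
    }
    return out;
}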
