Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 21 additions & 30 deletions be/src/olap/accept_null_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace doris {
* but pass (set/return true) for NULL value rows.
*
* At parent, it's used for topn runtime predicate.
 * Eg: the original input indexes are '1,2,3,7,8,9' and the value at index 9 is NULL; the nested predicate outputs indexes '1,2,3', but we finally output '1,2,3,9'
*/
class AcceptNullPredicate : public ColumnPredicate {
ENABLE_FACTORY_CREATOR(AcceptNullPredicate);
Expand All @@ -44,8 +45,6 @@ class AcceptNullPredicate : public ColumnPredicate {

PredicateType type() const override { return _nested->type(); }

void set_nested(ColumnPredicate* nested) { _nested.reset(nested); }

Status evaluate(BitmapIndexIterator* iterator, uint32_t num_rows,
roaring::Roaring* roaring) const override {
return _nested->evaluate(iterator, num_rows, roaring);
Expand All @@ -64,18 +63,14 @@ class AcceptNullPredicate : public ColumnPredicate {
void evaluate_and(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
bool* flags) const override {
if (column.has_null()) {
// copy original flags
bool original_flags[size];
memcpy(original_flags, flags, size * sizeof(bool));
std::vector<uint8_t> original_flags(size);
memcpy(original_flags.data(), flags, size);

const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
// call evaluate_and and restore true for NULL rows
_nested->evaluate_and(nullable_col.get_nested_column(), sel, size, flags);
const auto& nullmap = nullable_col.get_null_map_data();
for (uint16_t i = 0; i < size; ++i) {
uint16_t idx = sel[i];
if (original_flags[i] && !flags[i] && nullable_col.is_null_at(idx)) {
flags[i] = true;
}
flags[i] |= (original_flags[i] && nullmap[sel[i]]);
}
} else {
_nested->evaluate_and(column, sel, size, flags);
Expand All @@ -84,20 +79,7 @@ class AcceptNullPredicate : public ColumnPredicate {

void evaluate_or(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
bool* flags) const override {
if (column.has_null()) {
const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
_nested->evaluate_or(nullable_col.get_nested_column(), sel, size, flags);

// call evaluate_or and set true for NULL rows
for (uint16_t i = 0; i < size; ++i) {
uint16_t idx = sel[i];
if (!flags[i] && nullable_col.is_null_at(idx)) {
flags[i] = true;
}
}
} else {
_nested->evaluate_or(column, sel, size, flags);
}
DCHECK(false) << "should not reach here";
}

bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const override {
Expand Down Expand Up @@ -138,8 +120,8 @@ class AcceptNullPredicate : public ColumnPredicate {
bool* flags) const override {
if (column.has_null()) {
// copy original flags
bool original_flags[size];
memcpy(original_flags, flags, size * sizeof(bool));
std::vector<uint8_t> original_flags(size);
memcpy(original_flags.data(), flags, size);

const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
// call evaluate_and_vec and restore true for NULL rows
Expand All @@ -160,23 +142,32 @@ class AcceptNullPredicate : public ColumnPredicate {
uint16_t _evaluate_inner(const vectorized::IColumn& column, uint16_t* sel,
uint16_t size) const override {
if (column.has_null()) {
if (size == 0) return 0;
if (size == 0) {
return 0;
}
// create selected_flags
uint16_t max_idx = sel[size - 1];
bool selected[max_idx + 1];
std::vector<uint16_t> old_sel(size);
memcpy(old_sel.data(), sel, sizeof(uint16_t) * size);

const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
memcpy(selected, nullable_col.get_null_map_data().data(), (max_idx + 1) * sizeof(bool));
// call nested predicate evaluate
uint16_t new_size = _nested->evaluate(nullable_col.get_nested_column(), sel, size);

// process NULL values
if (new_size < size) {
std::vector<uint8_t> selected(max_idx + 1, 0);
const auto* nullmap = nullable_col.get_null_map_data().data();
// add rows selected by _nested->evaluate
for (uint16_t i = 0; i < new_size; ++i) {
uint16_t row_idx = sel[i];
selected[row_idx] = true;
}
// additionally keep rows whose original value is NULL selected
for (uint16_t i = 0; i < size; ++i) {
uint16_t row_idx = old_sel[i];
selected[row_idx] |= nullmap[row_idx];
}

// recalculate new_size and the sel array
new_size = 0;
Expand All @@ -198,4 +189,4 @@ class AcceptNullPredicate : public ColumnPredicate {
std::unique_ptr<ColumnPredicate> _nested;
};

} //namespace doris
} //namespace doris
4 changes: 2 additions & 2 deletions be/src/olap/shared_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ class SharedPredicate : public ColumnPredicate {
std::string _debug_string() const override {
std::shared_lock<std::shared_mutex> lock(_mtx);
if (!_nested) {
return "shared_predicate<unknow>";
return "shared_predicate(unknow)";
}
return "shared_predicate<" + _nested->debug_string() + ">";
return "shared_predicate(" + _nested->debug_string() + ")";
}

mutable std::shared_mutex _mtx;
Expand Down
13 changes: 13 additions & 0 deletions regression-test/data/nereids_arith_p0/topn/accept_null.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !test --
100 dd 100 0
1000 dd 1000 0
10000 dd 10000 0
10001 dd 10001 0
10002 dd 10002 0
10003 dd 10003 0
10004 dd 10004 0
10005 dd 10005 0
10006 dd 10006 0
10007 dd 10007 0

110 changes: 110 additions & 0 deletions regression-test/suites/nereids_arith_p0/topn/accept_null.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

// Regression test for the topn runtime predicate NULL-handling path
// (AcceptNullPredicate): rows carrying a delete sign / NULL values must still
// be handled correctly when reading after a full compaction.
suite ("accept_null") {
    sql """ drop table IF EXISTS detail_tmp;"""

    // Merge-on-write unique-key table so DELETE produces __DORIS_DELETE_SIGN__ rows.
    sql """
        CREATE TABLE `detail_tmp` (
            `id` VARCHAR(512) NOT NULL,
            `accident_no` VARCHAR(512) NULL,
            `accident_type_name` VARCHAR(512) NULL
        ) ENGINE=OLAP
        UNIQUE KEY(`id`)
        DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "min_load_replica_num" = "-1",
            "is_being_synced" = "false",
            "storage_medium" = "hdd",
            "storage_format" = "V2",
            "inverted_index_storage_format" = "V1",
            "enable_unique_key_merge_on_write" = "true",
            "light_schema_change" = "true",
            "disable_auto_compaction" = "false",
            "enable_single_replica_compaction" = "false",
            "group_commit_interval_ms" = "10000",
            "group_commit_data_bytes" = "134217728",
            "enable_mow_light_delete" = "false"
        );
    """

    // Load 100k rows, then delete the first 100 (they remain physically present
    // as delete-signed rows until compaction merges them away).
    sql "insert into detail_tmp(id,accident_type_name,accident_no) select e1,'dd',e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1;"
    sql "delete from detail_tmp where accident_no <100;"

    def tablets = sql_return_maparray """ show tablets from detail_tmp; """

    // Sanity check: every tablet's compaction status endpoint is reachable and
    // returns a well-formed rowset list.
    for (def tablet : tablets) {
        def (code, out, err) = curl("GET", tablet.CompactionStatus)
        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(code, 0)
        def tabletJson = parseJson(out.trim())
        assert tabletJson.rowsets instanceof List
    }

    // Read each BE's disable_auto_compaction setting so we know whether the
    // manual full compaction below is required to succeed.
    def backendId_to_backendIP = [:]
    def backendId_to_backendHttpPort = [:]
    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
    boolean disableAutoCompaction = true
    for(int i=0;i<backendId_to_backendIP.keySet().size();i++){
        backend_id = backendId_to_backendIP.keySet()[i]
        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(code, 0)
        def configList = parseJson(out.trim())
        assert configList instanceof List

        for (Object ele in (List) configList) {
            assert ele instanceof List<String>
            if (((List<String>) ele)[0] == "disable_auto_compaction") {
                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
            }
        }
    }

    // Trigger full compaction by table id on every tablet's backend, retrying
    // up to 10 times (2s apart) until the BE reports success.
    for (def tablet : tablets) {
        String tablet_id = tablet.TabletId
        def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """
        logger.info("tablet"+tablet_info)
        def table_id = tablet_info[0].TableId
        backend_id = tablet.BackendId
        def times = 1
        def code, out, err
        do{
            (code, out, err) = be_run_full_compaction_by_table_id(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), table_id)
            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
            ++times
            sleep(2000)
        } while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10)

        def compactJson = parseJson(out.trim())
        if (compactJson.status.toLowerCase() == "fail") {
            // A failure is only acceptable when auto compaction already ran.
            assertEquals(disableAutoCompaction, false)
            logger.info("Compaction was done automatically!")
        }
        if (disableAutoCompaction) {
            assertEquals("success", compactJson.status.toLowerCase())
        }
    }

    // Query with the delete sign exposed; the first 10 surviving rows ordered
    // by (accident_no, id) are compared against accept_null.out.
    qt_test "select id,accident_type_name,accident_no,__DORIS_DELETE_SIGN__ From detail_tmp where accident_type_name = 'dd' order by accident_no,id limit 10;"
}