Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 31 additions & 35 deletions be/src/vec/aggregate_functions/aggregate_function_window.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
#pragma once

#include <glog/logging.h>
#include <stddef.h>
#include <stdint.h>

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <cstdint>
#include <memory>
#include <ostream>

#include "gutil/integral_types.h"
#include "vec/aggregate_functions/aggregate_function.h"
Expand Down Expand Up @@ -354,15 +352,10 @@ struct LeadLagData {
static constexpr bool result_nullable = result_is_nullable;
void reset() {
_data_value.reset();
_default_value.reset();
_is_inited = false;
_offset_value = 0;
}

bool default_is_null() { return _default_value.is_null(); }

// here _ptr pointer default column from third
void set_value_from_default() { this->_data_value = _default_value; }

void insert_result_into(IColumn& to) const {
if constexpr (result_is_nullable) {
if (_data_value.is_null()) {
Expand All @@ -379,8 +372,6 @@ struct LeadLagData {
}
}

void set_is_null() { this->_data_value.reset(); }

void set_value(const IColumn** columns, size_t pos) {
if constexpr (arg_is_nullable) {
if (assert_cast<const ColumnNullable*, TypeCheckOnRelease::DISABLE>(columns[0])
Expand All @@ -394,40 +385,47 @@ struct LeadLagData {
_data_value.set_value(columns[0], pos);
}

void check_default(const IColumn* column) {
if (!_is_inited) {
if (is_column_nullable(*column)) {
const auto* nullable_column =
assert_cast<const ColumnNullable*, TypeCheckOnRelease::DISABLE>(column);
if (nullable_column->is_null_at(0)) {
_default_value.reset();
} else {
_default_value.set_value(nullable_column->get_nested_column_ptr().get(), 0);
}
// Loads _data_value from row `pos` of `column`. The visible callers pass
// columns[2] (the third argument of lead/lag) together with a row index
// derived from frame_end and the cached offset.
void set_value_from_default(const IColumn* column, size_t pos) {
// NOTE(review): `pos` is size_t, so this check can never fail. Callers compute
// the index with int64_t arithmetic, so a negative result would wrap to a huge
// value instead of being caught here — confirm whether a signed check was intended.
DCHECK_GE(pos, 0);
if (is_column_nullable(*column)) {
// Nullable column: inspect the null flag of the requested row first.
const auto* nullable_column =
assert_cast<const ColumnNullable*, TypeCheckOnRelease::DISABLE>(column);
if (nullable_column->is_null_at(pos)) {
// Row is NULL: clear the stored value (presumably leaving it in the null state).
this->_data_value.reset();
} else {
// NOTE(review): this statement writes row 0 of the nullable wrapper into
// _default_value and looks like a leftover from the removed check_default()
// path that got interleaved here — confirm whether it belongs in this branch.
_default_value.set_value(column, 0);
// Row is not NULL: read the value from the nested (non-nullable) column.
this->_data_value.set_value(nullable_column->get_nested_column_ptr().get(), pos);
}
} else {
// Non-nullable column: read the row directly.
this->_data_value.set_value(column, pos);
}
}

// Caches the offset argument the first time it is called; later calls are
// no-ops for as long as _is_inited stays true.
void set_offset_value(const IColumn* column) {
    if (_is_inited) {
        return;
    }
    // The offset is taken from row 0 of the Int64 column.
    _offset_value = assert_cast<const ColumnInt64*>(column)->get_data()[0];
    _is_inited = true;
}

int64_t get_offset_value() const { return _offset_value; }

private:
BaseValue<ColVecType, arg_is_nullable> _data_value;
BaseValue<ColVecType, arg_is_nullable> _default_value;
bool _is_inited = false;
int64_t _offset_value = 0;
};

template <typename Data, bool = false>
struct WindowFunctionLeadImpl : Data {
void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
int64_t frame_end, const IColumn** columns) {
this->check_default(columns[2]);
if (frame_end > partition_end) { //output default value, win end is under partition
if (this->default_is_null()) {
this->set_is_null();
} else {
this->set_value_from_default();
}
this->set_offset_value(columns[1]);
// e.g. lead(column, 10, default_value) where the column may have only 3 rows:
// the offset value 10 comes from the second argument, and pos 11 is calculated as frame_end
auto pos = frame_end - 1 - this->get_offset_value();
this->set_value_from_default(columns[2], pos);
return;
}
this->set_value(columns, frame_end - 1);
Expand All @@ -440,13 +438,11 @@ template <typename Data, bool = false>
struct WindowFunctionLagImpl : Data {
void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
int64_t frame_end, const IColumn** columns) {
this->check_default(columns[2]);
// window start is beyond partition
if (partition_start >= frame_end) { //[unbound preceding(0), offset preceding(-123)]
if (this->default_is_null()) { // win start is beyond partition
this->set_is_null();
} else {
this->set_value_from_default();
}
this->set_offset_value(columns[1]);
auto pos = frame_end - 1 + this->get_offset_value();
this->set_value_from_default(columns[2], pos);
return;
}
this->set_value(columns, frame_end - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void checkLegalityBeforeTypeCoercion() {
return;
}
if (children().size() >= 2) {
checkValidParams(getOffset(), true);
checkValidParams(getOffset());
if (getOffset() instanceof Literal) {
if (((Literal) getOffset()).getDouble() < 0) {
throw new AnalysisException(
Expand All @@ -109,9 +109,6 @@ public void checkLegalityBeforeTypeCoercion() {
throw new AnalysisException(
"The offset parameter of LAG must be a constant positive integer: " + this.toSql());
}
if (children().size() >= 3) {
checkValidParams(getDefaultValue(), false);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void checkLegalityBeforeTypeCoercion() {
return;
}
if (children().size() >= 2) {
checkValidParams(getOffset(), true);
checkValidParams(getOffset());
if (getOffset() instanceof Literal) {
if (((Literal) getOffset()).getDouble() < 0) {
throw new AnalysisException(
Expand All @@ -104,9 +104,6 @@ public void checkLegalityBeforeTypeCoercion() {
throw new AnalysisException(
"The offset parameter of LAG must be a constant positive integer: " + this.toSql());
}
if (children().size() >= 3) {
checkValidParams(getDefaultValue(), false);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ public int computeHashCode() {
/**
* LAG/LEAD param must be const, and offset must be number
*/
protected void checkValidParams(Expression param, boolean isOffset) {
protected void checkValidParams(Expression param) {
DataType type = param.getDataType();
if (isOffset == true && !type.isNumericType()) {
if (!type.isNumericType()) {
throw new AnalysisException("The offset of LAG/LEAD must be a number: " + this.toSql());
}
if (!param.isConstant()) {
throw new AnalysisException(
"The parameter 2 or parameter 3 of LAG/LEAD must be a constant value: " + this.toSql());
"The parameter 2 of LAG/LEAD must be a constant value: " + this.toSql());
}
}
}
11 changes: 11 additions & 0 deletions regression-test/data/correctness_p0/test_lag_lead_window.out
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ a aa /wyyt-image/2021/11/13/595345040188712460.jpg
b aa /wyyt-image/2022/04/13/1434607674511761493.jpg /wyyt-image/2022/04/13/1434607674511761493.jpg
c cc /wyyt-image/2022/04/13/1434607674511761493.jpg

-- !select_default3 --
a /wyyt-image/2021/11/13/595345040188712460.jpg aa aa aa
b /wyyt-image/2022/04/13/1434607674511761493.jpg aa /wyyt-image/2022/04/13/1434607674511761493.jpg aa
c /wyyt-image/2022/04/13/1434607674511761493.jpg cc cc /wyyt-image/2022/04/13/1434607674511761493.jpg

-- !select_default --
c 2022-09-06T00:00:02 2022-09-06T00:00:01
b 2022-09-06T00:00:01 2022-09-06T00:00
Expand Down Expand Up @@ -49,3 +54,9 @@ a 2022-09-06T00:00 \N
b 2022-09-06T00:00:01 \N
c 2022-09-06T00:00:02 \N

-- !select_lead_7 --
2023-01-01 1 10 20 10
2023-01-02 1 20 30 10
2023-01-03 1 30 \N 20
2023-01-04 1 \N \N 30

39 changes: 39 additions & 0 deletions regression-test/suites/correctness_p0/test_lag_lead_window.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ suite("test_lag_lead_window") {
lead(cc,1,'') over (PARTITION by cc order by aa) as lead_cc
from ${tableName}
order by aa; """

qt_select_default3 """ select aa,cc,bb,lead(cc,1,bb) over (PARTITION by cc order by aa) as lead_res,
lag(cc,1,bb) over (PARTITION by cc order by aa) as lag_res
from ${tableName}
order by aa; """

sql """ DROP TABLE IF EXISTS test1 """
sql """ CREATE TABLE IF NOT EXISTS test1 (id varchar(255), create_time datetime)
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); """
Expand All @@ -61,4 +67,37 @@ suite("test_lag_lead_window") {

sql """ DROP TABLE IF EXISTS test1 """


qt_select_lead_7 """ SELECT
sale_date,
product_id,
quantity,
LEAD (quantity, 1, quantity) OVER (
PARTITION BY
product_id
ORDER BY
sale_date
) AS next_day_quantity,
LAG (quantity, 1, quantity) OVER (
PARTITION BY
product_id
ORDER BY
sale_date
) AS pre_day_quantity
FROM
(
select 1 AS product_id, '2023-01-01' AS sale_date, 10 AS quantity
UNION ALL
select 1, '2023-01-02', 20
UNION ALL
select 1, '2023-01-03', 30
UNION ALL
select 1, '2023-01-04', NULL
) AS t
ORDER BY
product_id,
sale_date;
"""


}
Loading