From 2a5ae0b377f2e48ad2e4ff532c825d8d97b42d25 Mon Sep 17 00:00:00 2001 From: Fedomn Date: Mon, 26 Dec 2022 22:36:57 +0800 Subject: [PATCH] feat(planner): support limit operator Signed-off-by: Fedomn --- src/execution/expression_executor.rs | 14 +- src/execution/physical_plan/mod.rs | 4 + src/execution/physical_plan/physical_limit.rs | 21 +++ src/execution/physical_plan_generator.rs | 1 + src/execution/util.rs | 18 ++- src/execution/volcano_executor/limit.rs | 122 ++++++++++++++++++ src/execution/volcano_executor/mod.rs | 7 + src/planner_v2/binder/errors.rs | 7 + .../binder/query_node/bind_result_modifier.rs | 80 ++++++++++++ .../binder/query_node/bind_select_node.rs | 15 ++- src/planner_v2/binder/query_node/mod.rs | 4 + .../binder/query_node/plan_result_modifier.rs | 28 ++++ .../binder/query_node/plan_select_node.rs | 3 + .../binder/statement/bind_select.rs | 5 +- src/planner_v2/operator/logical_limit.rs | 13 ++ src/planner_v2/operator/mod.rs | 25 ++++ src/util/tree_render.rs | 15 +++ tests/slt/limit.slt | 38 ++++++ 18 files changed, 414 insertions(+), 6 deletions(-) create mode 100644 src/execution/physical_plan/physical_limit.rs create mode 100644 src/execution/volcano_executor/limit.rs create mode 100644 src/planner_v2/binder/query_node/bind_result_modifier.rs create mode 100644 src/planner_v2/binder/query_node/plan_result_modifier.rs create mode 100644 src/planner_v2/operator/logical_limit.rs diff --git a/src/execution/expression_executor.rs b/src/execution/expression_executor.rs index a9f9df4..5831f5c 100644 --- a/src/execution/expression_executor.rs +++ b/src/execution/expression_executor.rs @@ -1,8 +1,10 @@ use arrow::array::ArrayRef; +use arrow::datatypes::DataType; use arrow::record_batch::RecordBatch; -use super::ExecutorError; +use super::{ExecutorError, RecordBatchUtil}; use crate::planner_v2::BoundExpression; +use crate::types_v2::ScalarValue; /// ExpressionExecutor is responsible for executing a set of expressions and storing the result in a /// data 
chunk @@ -20,6 +22,16 @@ impl ExpressionExecutor { Ok(result) } + pub fn execute_scalar(expression: &BoundExpression) -> Result { + let input = RecordBatchUtil::new_one_row_dummy_batch()?; + let res = Self::execute(&[expression.clone()], &input)?; + assert!(res.len() == 1); + let col = res.get(0).unwrap(); + assert_eq!(DataType::from(expression.return_type()), *col.data_type()); + let val = ScalarValue::try_from_array(col, 0)?; + Ok(val) + } + fn execute_internal( expr: &BoundExpression, input: &RecordBatch, diff --git a/src/execution/physical_plan/mod.rs b/src/execution/physical_plan/mod.rs index bb34ca5..e9230f4 100644 --- a/src/execution/physical_plan/mod.rs +++ b/src/execution/physical_plan/mod.rs @@ -5,6 +5,7 @@ mod physical_explain; mod physical_expression_scan; mod physical_filter; mod physical_insert; +mod physical_limit; mod physical_projection; mod physical_table_scan; @@ -16,6 +17,7 @@ pub use physical_explain::*; pub use physical_expression_scan::*; pub use physical_filter::*; pub use physical_insert::*; +pub use physical_limit::*; pub use physical_projection::*; pub use physical_table_scan::*; @@ -38,6 +40,7 @@ pub enum PhysicalOperator { PhysicalProjection(PhysicalProjection), PhysicalColumnDataScan(PhysicalColumnDataScan), PhysicalFilter(PhysicalFilter), + PhysicalLimit(PhysicalLimit), } impl PhysicalOperator { @@ -51,6 +54,7 @@ impl PhysicalOperator { PhysicalOperator::PhysicalDummyScan(op) => &op.base.children, PhysicalOperator::PhysicalColumnDataScan(op) => &op.base.children, PhysicalOperator::PhysicalFilter(op) => &op.base.children, + PhysicalOperator::PhysicalLimit(op) => &op.base.children, } } } diff --git a/src/execution/physical_plan/physical_limit.rs b/src/execution/physical_plan/physical_limit.rs new file mode 100644 index 0000000..84b7753 --- /dev/null +++ b/src/execution/physical_plan/physical_limit.rs @@ -0,0 +1,21 @@ +use derive_new::new; + +use super::{PhysicalOperator, PhysicalOperatorBase}; +use 
crate::execution::PhysicalPlanGenerator; +use crate::planner_v2::LogicalLimit; + +#[derive(new, Clone)] +pub struct PhysicalLimit { + pub(crate) base: PhysicalOperatorBase, + pub(crate) limit: Option, + pub(crate) offset: Option, +} + +impl PhysicalPlanGenerator { + pub(crate) fn create_physical_limit(&self, op: LogicalLimit) -> PhysicalOperator { + let base = self.create_physical_operator_base(op.base); + let limit = op.limit.map(|_| op.limit_value); + let offset = op.offset.map(|_| op.offsert_value); + PhysicalOperator::PhysicalLimit(PhysicalLimit::new(base, limit, offset)) + } +} diff --git a/src/execution/physical_plan_generator.rs b/src/execution/physical_plan_generator.rs index 76bf1c5..650d957 100644 --- a/src/execution/physical_plan_generator.rs +++ b/src/execution/physical_plan_generator.rs @@ -43,6 +43,7 @@ impl PhysicalPlanGenerator { LogicalOperator::LogicalDummyScan(op) => self.create_physical_dummy_scan(op), LogicalOperator::LogicalExplain(op) => self.create_physical_explain(op), LogicalOperator::LogicalFilter(op) => self.create_physical_filter(op), + LogicalOperator::LogicalLimit(op) => self.create_physical_limit(op), } } diff --git a/src/execution/util.rs b/src/execution/util.rs index 64d144c..68620aa 100644 --- a/src/execution/util.rs +++ b/src/execution/util.rs @@ -1,9 +1,11 @@ use std::collections::HashMap; -use arrow::datatypes::{Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use super::ExecutorError; use crate::planner_v2::BoundExpression; -use crate::types_v2::LogicalType; +use crate::types_v2::{LogicalType, ScalarValue}; pub struct SchemaUtil; @@ -25,3 +27,15 @@ impl SchemaUtil { SchemaRef::new(Schema::new_with_metadata(fields, HashMap::new())) } } + +pub struct RecordBatchUtil; + +impl RecordBatchUtil { + pub fn new_one_row_dummy_batch() -> Result { + let fields = vec![Field::new("dummy", DataType::Boolean, true)]; + let schema = 
SchemaRef::new(Schema::new_with_metadata(fields, HashMap::new())); + let array = ScalarValue::Boolean(Some(true)).to_array(); + let batch = RecordBatch::try_new(schema, vec![array])?; + Ok(batch) + } +} diff --git a/src/execution/volcano_executor/limit.rs b/src/execution/volcano_executor/limit.rs new file mode 100644 index 0000000..710a619 --- /dev/null +++ b/src/execution/volcano_executor/limit.rs @@ -0,0 +1,122 @@ +use std::sync::Arc; + +use arrow::record_batch::RecordBatch; +use derive_new::new; +use futures_async_stream::try_stream; + +use crate::execution::{BoxedExecutor, ExecutionContext, ExecutorError, PhysicalLimit}; + +#[derive(new)] +pub struct Limit { + pub(crate) plan: PhysicalLimit, + pub(crate) child: BoxedExecutor, +} + +impl Limit { + #[try_stream(boxed, ok = RecordBatch, error = ExecutorError)] + pub async fn execute(self, _context: Arc) { + let limit = self.plan.limit; + + let offset_val = self.plan.offset.unwrap_or(0); + + if limit.is_some() && limit.unwrap() == 0 { + return Ok(()); + } + + let mut returned_count = 0; + + #[for_await] + for batch in self.child { + let batch = batch?; + + let cardinality = batch.num_rows() as u64; + let limit_val = limit.unwrap_or(cardinality); + + let start = returned_count.max(offset_val) - returned_count; + let end = { + // from total returned rows level, the total_end is end index of whole returned + // rows level. + let total_end = offset_val + limit_val; + let current_batch_end = returned_count + cardinality; + // we choose the min of total_end and current_batch_end as the end index of to + // match limit semantics. + let real_end = total_end.min(current_batch_end); + // to calculate the end index of current batch + real_end - returned_count + }; + + returned_count += cardinality; + + // example: offset=1000, limit=2, cardinality=100 + // when first loop: + // start = 0.max(1000)-0 = 1000 + // end = (1000+2).min(0+100)-0 = 100 + // so, start(1000) > end(100), we skip this loop batch. 
+ if start >= end { + continue; + } + + if (start..end) == (0..cardinality) { + yield batch; + } else { + let length = end - start; + yield batch.slice(start as usize, length as usize); + } + + // due to returned_count is always += cardinality, and returned_batch may be sliced, + // so it will be larger than the real total_end. + // example: offset=1, limit=4, cardinality=6, data=[(0..6)] + // returned_count=6 > 1+4, meanwhile returned_batch size is 4 (rows 1..5) + if returned_count >= offset_val + limit_val { + break; + } + } + } +} + +#[cfg(test)] +mod tests { + use std::ops::Range; + use std::sync::Arc; + + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use futures::{StreamExt, TryStreamExt}; + use itertools::Itertools; + use test_case::test_case; + + use super::*; + use crate::execution::PhysicalOperatorBase; + use crate::main_entry::{ClientContext, DatabaseInstance}; + + #[test_case(&[(0..6)], 1, 4, &[(1..5)])] + #[test_case(&[(0..6)], 0, 10, &[(0..6)])] + #[test_case(&[(0..6)], 10, 0, &[])] + #[test_case(&[(0..2), (2..4), (4..6)], 1, 4, &[(1..2),(2..4),(4..5)])] + #[test_case(&[(0..2), (2..4), (4..6)], 1, 2, &[(1..2),(2..3)])] + #[test_case(&[(0..2), (2..4), (4..6)], 3, 0, &[])] + #[tokio::test] + async fn limit( + inputs: &'static [Range], + offset: u64, + limit: u64, + outputs: &'static [Range], + ) { + let executor = Limit { + plan: PhysicalLimit::new(PhysicalOperatorBase::default(), Some(limit), Some(offset)), + child: futures::stream::iter(inputs.iter().map(range_to_chunk).map(Ok)).boxed(), + }; + let ctx = Arc::new(ExecutionContext::new(ClientContext::new(Arc::new( + DatabaseInstance::default(), + )))); + let actual = executor.execute(ctx).try_collect::>().await.unwrap(); + let outputs = outputs.iter().map(range_to_chunk).collect_vec(); + assert_eq!(actual, outputs); + } + + fn range_to_chunk(range: &Range) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let data: Vec<_> 
= range.clone().collect(); + RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(data))]).unwrap() + } +} diff --git a/src/execution/volcano_executor/mod.rs b/src/execution/volcano_executor/mod.rs index 9ee0806..705e7b0 100644 --- a/src/execution/volcano_executor/mod.rs +++ b/src/execution/volcano_executor/mod.rs @@ -4,6 +4,7 @@ mod dummy_scan; mod expression_scan; mod filter; mod insert; +mod limit; mod projection; mod table_scan; use std::sync::Arc; @@ -17,6 +18,7 @@ pub use filter::*; use futures::stream::BoxStream; use futures::TryStreamExt; pub use insert::*; +pub use limit::*; pub use projection::*; pub use table_scan::*; @@ -60,6 +62,11 @@ impl VolcanoExecutor { let child_executor = self.build(child, context.clone()); Filter::new(op, child_executor).execute(context) } + PhysicalOperator::PhysicalLimit(op) => { + let child = op.base.children.first().unwrap().clone(); + let child_executor = self.build(child, context.clone()); + Limit::new(op, child_executor).execute(context) + } } } diff --git a/src/planner_v2/binder/errors.rs b/src/planner_v2/binder/errors.rs index 39290dd..db6903f 100644 --- a/src/planner_v2/binder/errors.rs +++ b/src/planner_v2/binder/errors.rs @@ -1,4 +1,5 @@ use crate::catalog_v2::CatalogError; +use crate::execution::ExecutorError; use crate::function::FunctionError; #[derive(thiserror::Error, Debug)] @@ -31,4 +32,10 @@ pub enum BindError { #[source] FunctionError, ), + #[error("executor error: {0}")] + ExecutorError( + #[from] + #[source] + ExecutorError, + ), } diff --git a/src/planner_v2/binder/query_node/bind_result_modifier.rs b/src/planner_v2/binder/query_node/bind_result_modifier.rs new file mode 100644 index 0000000..9562af3 --- /dev/null +++ b/src/planner_v2/binder/query_node/bind_result_modifier.rs @@ -0,0 +1,80 @@ +use derive_new::new; +use sqlparser::ast::{Expr, Query}; + +use crate::execution::ExpressionExecutor; +use crate::planner_v2::{ + BindError, Binder, BoundCastExpression, BoundExpression, ExpressionBinder, +}; 
+use crate::types_v2::{LogicalType, ScalarValue}; + +#[derive(Debug)] +pub enum BoundResultModifier { + BoundLimitModifier(BoundLimitModifier), +} + +#[derive(new, Debug)] +pub struct BoundLimitModifier { + pub(crate) limit_value: u64, + pub(crate) offsert_value: u64, + pub(crate) limit: Option, + pub(crate) offset: Option, +} + +impl Binder { + fn bind_delimiter( + expr_binder: &mut ExpressionBinder, + expr: &Expr, + ) -> Result { + let bound_expr = expr_binder.bind_expression(expr, &mut vec![], &mut vec![])?; + let new_expr = + BoundCastExpression::try_add_cast_to_type(bound_expr, LogicalType::UBigint, false)?; + Ok(new_expr) + } + + fn cast_delimiter_val(val: ScalarValue) -> u64 { + match val { + ScalarValue::UInt64(Some(v)) => v, + _ => unreachable!("delimiter val must be int64 due to previous cast"), + } + } + + pub fn bind_limit_modifier( + &mut self, + query: &Query, + ) -> Result, BindError> { + let mut expr_binder = ExpressionBinder::new(self); + let limit = query + .limit + .as_ref() + .map(|expr| Self::bind_delimiter(&mut expr_binder, expr)) + .transpose()?; + let limit_value = if let Some(limit_expr) = &limit { + let val = ExpressionExecutor::execute_scalar(limit_expr)?; + Self::cast_delimiter_val(val) + } else { + u64::max_value() + }; + + let offset = query + .offset + .as_ref() + .map(|expr| Self::bind_delimiter(&mut expr_binder, &expr.value)) + .transpose()?; + let offsert_value = if let Some(offset_expr) = &offset { + let val = ExpressionExecutor::execute_scalar(offset_expr)?; + Self::cast_delimiter_val(val) + } else { + 0 + }; + + let modifier = if limit.is_none() && offset.is_none() { + None + } else { + Some(BoundResultModifier::BoundLimitModifier( + BoundLimitModifier::new(limit_value, offsert_value, limit, offset), + )) + }; + + Ok(modifier) + } +} diff --git a/src/planner_v2/binder/query_node/bind_select_node.rs b/src/planner_v2/binder/query_node/bind_select_node.rs index 6e050f5..592c37e 100644 --- 
a/src/planner_v2/binder/query_node/bind_select_node.rs +++ b/src/planner_v2/binder/query_node/bind_select_node.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use derive_new::new; use sqlparser::ast::{Ident, Query}; +use super::BoundResultModifier; use crate::planner_v2::{ BindError, Binder, BoundExpression, BoundTableRef, ColumnAliasData, ExpressionBinder, SqlparserResolver, VALUES_LIST_ALIAS, @@ -20,7 +21,6 @@ pub struct BoundSelectNode { /// The FROM clause pub(crate) from_table: BoundTableRef, /// The WHERE clause - #[allow(dead_code)] pub(crate) where_clause: Option, /// The original unparsed expressions. This is exported after binding, because the binding /// might change the expressions (e.g. when a * clause is present) @@ -29,6 +29,8 @@ pub struct BoundSelectNode { /// Index used by the LogicalProjection #[new(default)] pub(crate) projection_index: usize, + /// The result modifiers that should be applied to this query node + pub(crate) modifiers: Vec, } impl Binder { @@ -66,7 +68,15 @@ impl Binder { .try_collect::>()?; let bound_table_ref = BoundTableRef::BoundExpressionListRef(bound_expression_list_ref); - let node = BoundSelectNode::new(names, types, select_list, bound_table_ref, None, None); + let node = BoundSelectNode::new( + names, + types, + select_list, + bound_table_ref, + None, + None, + vec![], + ); Ok(node) } @@ -132,6 +142,7 @@ impl Binder { from_table, where_clause, Some(original_select_items), + vec![], )) } diff --git a/src/planner_v2/binder/query_node/mod.rs b/src/planner_v2/binder/query_node/mod.rs index 9a196c2..ab76c44 100644 --- a/src/planner_v2/binder/query_node/mod.rs +++ b/src/planner_v2/binder/query_node/mod.rs @@ -1,4 +1,8 @@ +mod bind_result_modifier; mod bind_select_node; +mod plan_result_modifier; mod plan_select_node; +pub use bind_result_modifier::*; pub use bind_select_node::*; +pub use plan_result_modifier::*; pub use plan_select_node::*; diff --git a/src/planner_v2/binder/query_node/plan_result_modifier.rs 
b/src/planner_v2/binder/query_node/plan_result_modifier.rs new file mode 100644 index 0000000..9ce62f4 --- /dev/null +++ b/src/planner_v2/binder/query_node/plan_result_modifier.rs @@ -0,0 +1,28 @@ +use super::BoundResultModifier; +use crate::planner_v2::{BindError, Binder, LogicalLimit, LogicalOperator, LogicalOperatorBase}; + +impl Binder { + pub fn plan_for_result_modifiers( + &mut self, + result_modifiers: Vec, + root: LogicalOperator, + ) -> Result { + let mut root_op = root; + for modifier in result_modifiers.into_iter() { + match modifier { + BoundResultModifier::BoundLimitModifier(limit) => { + let mut op = LogicalOperator::LogicalLimit(LogicalLimit::new( + LogicalOperatorBase::default(), + limit.limit_value, + limit.offsert_value, + limit.limit, + limit.offset, + )); + op.add_child(root_op); + root_op = op; + } + } + } + Ok(root_op) + } +} diff --git a/src/planner_v2/binder/query_node/plan_select_node.rs b/src/planner_v2/binder/query_node/plan_select_node.rs index 31e1d70..1ac53e0 100644 --- a/src/planner_v2/binder/query_node/plan_select_node.rs +++ b/src/planner_v2/binder/query_node/plan_select_node.rs @@ -35,6 +35,9 @@ impl Binder { node.projection_index, )); + let result_modifiers = node.modifiers; + let root = self.plan_for_result_modifiers(result_modifiers, root)?; + Ok(BoundStatement::new(root, node.types, node.names)) } diff --git a/src/planner_v2/binder/statement/bind_select.rs b/src/planner_v2/binder/statement/bind_select.rs index 0d53ddd..117f927 100644 --- a/src/planner_v2/binder/statement/bind_select.rs +++ b/src/planner_v2/binder/statement/bind_select.rs @@ -7,7 +7,10 @@ impl Binder { pub fn bind_select(&mut self, stmt: &Statement) -> Result { match stmt { Statement::Query(query) => { - let node = self.bind_select_node(query)?; + let mut node = self.bind_select_node(query)?; + if let Some(limit_modifier) = self.bind_limit_modifier(query)? 
{ + node.modifiers.push(limit_modifier); + } self.create_plan_for_select_node(node) } _ => Err(BindError::UnsupportedStmt(format!("{:?}", stmt))), diff --git a/src/planner_v2/operator/logical_limit.rs b/src/planner_v2/operator/logical_limit.rs new file mode 100644 index 0000000..90d6390 --- /dev/null +++ b/src/planner_v2/operator/logical_limit.rs @@ -0,0 +1,13 @@ +use derive_new::new; + +use super::LogicalOperatorBase; +use crate::planner_v2::BoundExpression; + +#[derive(new, Debug, Clone)] +pub struct LogicalLimit { + pub(crate) base: LogicalOperatorBase, + pub(crate) limit_value: u64, + pub(crate) offsert_value: u64, + pub(crate) limit: Option, + pub(crate) offset: Option, +} diff --git a/src/planner_v2/operator/mod.rs b/src/planner_v2/operator/mod.rs index eba85c5..3f2be4a 100644 --- a/src/planner_v2/operator/mod.rs +++ b/src/planner_v2/operator/mod.rs @@ -7,6 +7,7 @@ mod logical_expression_get; mod logical_filter; mod logical_get; mod logical_insert; +mod logical_limit; mod logical_projection; use derive_new::new; pub use logical_create_table::*; @@ -16,6 +17,7 @@ pub use logical_expression_get::*; pub use logical_filter::*; pub use logical_get::*; pub use logical_insert::*; +pub use logical_limit::*; pub use logical_projection::*; use super::{BoundExpression, ColumnBinding}; @@ -39,6 +41,7 @@ pub enum LogicalOperator { LogicalProjection(LogicalProjection), LogicalExplain(LogicalExplain), LogicalFilter(LogicalFilter), + LogicalLimit(LogicalLimit), } impl LogicalOperator { @@ -52,6 +55,7 @@ impl LogicalOperator { LogicalOperator::LogicalDummyScan(op) => &mut op.base.children, LogicalOperator::LogicalExplain(op) => &mut op.base.children, LogicalOperator::LogicalFilter(op) => &mut op.base.children, + LogicalOperator::LogicalLimit(op) => &mut op.base.children, } } @@ -65,6 +69,21 @@ impl LogicalOperator { LogicalOperator::LogicalDummyScan(op) => &op.base.children, LogicalOperator::LogicalExplain(op) => &op.base.children, LogicalOperator::LogicalFilter(op) => 
&op.base.children, + LogicalOperator::LogicalLimit(op) => &op.base.children, + } + } + + pub fn add_child(&mut self, child: LogicalOperator) { + match self { + LogicalOperator::LogicalCreateTable(op) => op.base.children.push(child), + LogicalOperator::LogicalExpressionGet(op) => op.base.children.push(child), + LogicalOperator::LogicalInsert(op) => op.base.children.push(child), + LogicalOperator::LogicalGet(op) => op.base.children.push(child), + LogicalOperator::LogicalProjection(op) => op.base.children.push(child), + LogicalOperator::LogicalDummyScan(op) => op.base.children.push(child), + LogicalOperator::LogicalExplain(op) => op.base.children.push(child), + LogicalOperator::LogicalFilter(op) => op.base.children.push(child), + LogicalOperator::LogicalLimit(op) => op.base.children.push(child), } } @@ -78,6 +97,7 @@ impl LogicalOperator { LogicalOperator::LogicalDummyScan(op) => &mut op.base.expressioins, LogicalOperator::LogicalExplain(op) => &mut op.base.expressioins, LogicalOperator::LogicalFilter(op) => &mut op.base.expressioins, + LogicalOperator::LogicalLimit(op) => &mut op.base.expressioins, } } @@ -91,6 +111,7 @@ impl LogicalOperator { LogicalOperator::LogicalDummyScan(op) => &op.base.types, LogicalOperator::LogicalExplain(op) => &op.base.types, LogicalOperator::LogicalFilter(op) => &op.base.types, + LogicalOperator::LogicalLimit(op) => &op.base.types, } } @@ -113,6 +134,7 @@ impl LogicalOperator { vec![ColumnBinding::new(0, 0), ColumnBinding::new(0, 1)] } LogicalOperator::LogicalFilter(op) => op.base.children[0].get_column_bindings(), + LogicalOperator::LogicalLimit(op) => op.base.children[0].get_column_bindings(), } } @@ -145,6 +167,9 @@ impl LogicalOperator { LogicalOperator::LogicalFilter(op) => { op.base.types = op.base.children[0].types().to_vec(); } + LogicalOperator::LogicalLimit(op) => { + op.base.types = op.base.children[0].types().to_vec(); + } } } diff --git a/src/util/tree_render.rs b/src/util/tree_render.rs index 2dfd787..9e11786 100644 --- 
a/src/util/tree_render.rs +++ b/src/util/tree_render.rs @@ -123,6 +123,20 @@ impl TreeRender { .join(", "); format!("LogicalFilter: {}", exprs) } + LogicalOperator::LogicalLimit(op) => { + let limit = op + .limit + .as_ref() + .map(|_| format!("{}", op.limit_value)) + .unwrap_or_else(|| "None".to_string()); + + let offset = op + .offset + .as_ref() + .map(|_| format!("{}", op.offsert_value)) + .unwrap_or_else(|| "None".to_string()); + format!("LogicalLimit: limit[{}], offset[{}]", limit, offset) + } } } @@ -154,6 +168,7 @@ impl TreeRender { PhysicalOperator::PhysicalProjection(_) => "PhysicalProjection".to_string(), PhysicalOperator::PhysicalColumnDataScan(_) => "PhysicalColumnDataScan".to_string(), PhysicalOperator::PhysicalFilter(_) => "PhysicalFilter".to_string(), + PhysicalOperator::PhysicalLimit(_) => "PhysicalLimit".to_string(), } } diff --git a/tests/slt/limit.slt b/tests/slt/limit.slt index ca6fa22..9fa4ef2 100644 --- a/tests/slt/limit.slt +++ b/tests/slt/limit.slt @@ -28,3 +28,41 @@ select id from employee limit 2 ---- 1 2 + + +onlyif sqlrs_v2 +statement ok +create table t1(v1 int, v2 int, v3 int); +insert into t1(v1, v2, v3) values (0, 4, 1), (1, 5, 2), (2, 6, 3), (3, 7, 4), (4, 8, 5); + +onlyif sqlrs_v2 +query I +select v1 from t1 limit 2 offset 1; +---- +1 +2 + +onlyif sqlrs_v2 +query I +select v1 from t1 limit 1 offset 10; +---- + +onlyif sqlrs_v2 +query I +select v1 from t1 limit 0 offset 0; +---- + +onlyif sqlrs_v2 +query I +select v1 from t1 offset 2; +---- +2 +3 +4 + +onlyif sqlrs_v2 +query I +select v1 from t1 limit 2; +---- +0 +1