Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/execution/expression_executor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use arrow::array::ArrayRef;
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;

use super::ExecutorError;
use super::{ExecutorError, RecordBatchUtil};
use crate::planner_v2::BoundExpression;
use crate::types_v2::ScalarValue;

/// ExpressionExecutor is responsible for executing a set of expressions and storing the result in a
/// data chunk
Expand All @@ -20,6 +22,16 @@ impl ExpressionExecutor {
Ok(result)
}

/// Evaluate a constant expression against a one-row dummy batch and return
/// the single resulting value.
///
/// The expression must be foldable to a constant: it is executed over a
/// dummy batch, so it cannot reference real input columns.
///
/// # Errors
/// Returns an [`ExecutorError`] if building the dummy batch, executing the
/// expression, or extracting the scalar fails.
pub fn execute_scalar(expression: &BoundExpression) -> Result<ScalarValue, ExecutorError> {
    let input = RecordBatchUtil::new_one_row_dummy_batch()?;
    // `execute` takes a slice; borrow the single expression instead of cloning it.
    let res = Self::execute(std::slice::from_ref(expression), &input)?;
    // One expression in, exactly one result column out — anything else is a bug.
    assert_eq!(res.len(), 1);
    let col = res.first().expect("exactly one result column");
    // Sanity-check the produced arrow type against the planned return type.
    assert_eq!(DataType::from(expression.return_type()), *col.data_type());
    let val = ScalarValue::try_from_array(col, 0)?;
    Ok(val)
}

fn execute_internal(
expr: &BoundExpression,
input: &RecordBatch,
Expand Down
4 changes: 4 additions & 0 deletions src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod physical_explain;
mod physical_expression_scan;
mod physical_filter;
mod physical_insert;
mod physical_limit;
mod physical_projection;
mod physical_table_scan;

Expand All @@ -16,6 +17,7 @@ pub use physical_explain::*;
pub use physical_expression_scan::*;
pub use physical_filter::*;
pub use physical_insert::*;
pub use physical_limit::*;
pub use physical_projection::*;
pub use physical_table_scan::*;

Expand All @@ -38,6 +40,7 @@ pub enum PhysicalOperator {
PhysicalProjection(PhysicalProjection),
PhysicalColumnDataScan(PhysicalColumnDataScan),
PhysicalFilter(PhysicalFilter),
PhysicalLimit(PhysicalLimit),
}

impl PhysicalOperator {
Expand All @@ -51,6 +54,7 @@ impl PhysicalOperator {
PhysicalOperator::PhysicalDummyScan(op) => &op.base.children,
PhysicalOperator::PhysicalColumnDataScan(op) => &op.base.children,
PhysicalOperator::PhysicalFilter(op) => &op.base.children,
PhysicalOperator::PhysicalLimit(op) => &op.base.children,
}
}
}
21 changes: 21 additions & 0 deletions src/execution/physical_plan/physical_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use derive_new::new;

use super::{PhysicalOperator, PhysicalOperatorBase};
use crate::execution::PhysicalPlanGenerator;
use crate::planner_v2::LogicalLimit;

/// Physical LIMIT/OFFSET operator node produced from a `LogicalLimit`.
#[derive(new, Clone)]
pub struct PhysicalLimit {
    pub(crate) base: PhysicalOperatorBase,
    // Maximum number of rows to emit; `None` when the query had no LIMIT clause.
    pub(crate) limit: Option<u64>,
    // Number of leading rows to skip; `None` when the query had no OFFSET clause.
    pub(crate) offset: Option<u64>,
}

impl PhysicalPlanGenerator {
    /// Lower a [`LogicalLimit`] into a [`PhysicalLimit`] operator.
    pub(crate) fn create_physical_limit(&self, op: LogicalLimit) -> PhysicalOperator {
        let base = self.create_physical_operator_base(op.base);
        // The pre-computed values are only meaningful when the corresponding
        // clause was actually present, so gate each on its bound expression.
        let limit = if op.limit.is_some() {
            Some(op.limit_value)
        } else {
            None
        };
        let offset = if op.offset.is_some() {
            Some(op.offsert_value)
        } else {
            None
        };
        PhysicalOperator::PhysicalLimit(PhysicalLimit::new(base, limit, offset))
    }
}
1 change: 1 addition & 0 deletions src/execution/physical_plan_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl PhysicalPlanGenerator {
LogicalOperator::LogicalDummyScan(op) => self.create_physical_dummy_scan(op),
LogicalOperator::LogicalExplain(op) => self.create_physical_explain(op),
LogicalOperator::LogicalFilter(op) => self.create_physical_filter(op),
LogicalOperator::LogicalLimit(op) => self.create_physical_limit(op),
}
}

Expand Down
18 changes: 16 additions & 2 deletions src/execution/util.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::collections::HashMap;

use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;

use super::ExecutorError;
use crate::planner_v2::BoundExpression;
use crate::types_v2::LogicalType;
use crate::types_v2::{LogicalType, ScalarValue};

pub struct SchemaUtil;

Expand All @@ -25,3 +27,15 @@ impl SchemaUtil {
SchemaRef::new(Schema::new_with_metadata(fields, HashMap::new()))
}
}

/// Helpers for constructing [`RecordBatch`] values used by the executor.
pub struct RecordBatchUtil;

impl RecordBatchUtil {
    /// Build a batch with a single nullable boolean column ("dummy") holding
    /// one `true` row — a placeholder input for constant-expression execution.
    pub fn new_one_row_dummy_batch() -> Result<RecordBatch, ExecutorError> {
        let dummy_field = Field::new("dummy", DataType::Boolean, true);
        let schema =
            SchemaRef::new(Schema::new_with_metadata(vec![dummy_field], HashMap::new()));
        let column = ScalarValue::Boolean(Some(true)).to_array();
        Ok(RecordBatch::try_new(schema, vec![column])?)
    }
}
122 changes: 122 additions & 0 deletions src/execution/volcano_executor/limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use derive_new::new;
use futures_async_stream::try_stream;

use crate::execution::{BoxedExecutor, ExecutionContext, ExecutorError, PhysicalLimit};

/// Volcano-style LIMIT/OFFSET executor: streams batches from `child` and
/// truncates the output according to `plan.limit` / `plan.offset`.
#[derive(new)]
pub struct Limit {
    // Physical plan node carrying the optional limit and offset values.
    pub(crate) plan: PhysicalLimit,
    // Upstream executor producing the batches to be limited.
    pub(crate) child: BoxedExecutor,
}

impl Limit {
    /// Stream batches from the child, skipping the first `offset` rows and
    /// yielding at most `limit` rows in total. Batches that straddle the
    /// boundaries are sliced; fully in-range batches are passed through.
    #[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
    pub async fn execute(self, _context: Arc<ExecutionContext>) {
        let limit = self.plan.limit;

        // A missing OFFSET clause means "skip nothing".
        let offset_val = self.plan.offset.unwrap_or(0);

        // LIMIT 0 yields no rows at all; don't even pull from the child.
        if limit.is_some() && limit.unwrap() == 0 {
            return Ok(());
        }

        // A missing LIMIT clause means "unlimited". The previous code used
        // `unwrap_or(cardinality)` here, which re-derived a per-batch cap and
        // wrongly truncated the stream for OFFSET-without-LIMIT queries.
        let limit_val = limit.unwrap_or(u64::MAX);
        // Exclusive end, in absolute row position, of the rows to emit.
        // Saturating: offset + u64::MAX must not wrap around.
        let total_end = offset_val.saturating_add(limit_val);

        // Number of input rows consumed so far (not the number yielded).
        let mut returned_count = 0;

        #[for_await]
        for batch in self.child {
            let batch = batch?;

            let cardinality = batch.num_rows() as u64;

            // Start index within this batch: rows before `offset_val` are skipped.
            let start = returned_count.max(offset_val) - returned_count;
            let end = {
                // Absolute end of this batch in the overall stream.
                let current_batch_end = returned_count + cardinality;
                // Emit no further than both the global cap and this batch's end.
                let real_end = total_end.min(current_batch_end);
                // Translate back to an index within the current batch.
                real_end - returned_count
            };

            returned_count += cardinality;

            // example: offset=1000, limit=2, cardinality=100
            // first loop iteration:
            //   start = 0.max(1000)-0 = 1000
            //   end   = (1000+2).min(0+100)-0 = 100
            // start(1000) >= end(100), so the whole batch is skipped.
            if start >= end {
                continue;
            }

            if (start..end) == (0..cardinality) {
                yield batch;
            } else {
                let length = end - start;
                yield batch.slice(start as usize, length as usize);
            }

            // `returned_count` counts consumed input rows even when the yielded
            // batch was sliced, so it can exceed `total_end`.
            // example: offset=1, limit=4, cardinality=6, data=[(0..6)]
            //   returned_count=6 > 1+4, while the yielded batch has 4 rows ([1..5])
            if returned_count >= total_end {
                break;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use futures::{StreamExt, TryStreamExt};
    use itertools::Itertools;
    use test_case::test_case;

    use super::*;
    use crate::execution::PhysicalOperatorBase;
    use crate::main_entry::{ClientContext, DatabaseInstance};

    // Each case: input batches (as int ranges), offset, limit, expected output batches.
    // The multi-batch cases check that slicing works across batch boundaries.
    #[test_case(&[(0..6)], 1, 4, &[(1..5)])]
    #[test_case(&[(0..6)], 0, 10, &[(0..6)])]
    #[test_case(&[(0..6)], 10, 0, &[])]
    #[test_case(&[(0..2), (2..4), (4..6)], 1, 4, &[(1..2),(2..4),(4..5)])]
    #[test_case(&[(0..2), (2..4), (4..6)], 1, 2, &[(1..2),(2..3)])]
    #[test_case(&[(0..2), (2..4), (4..6)], 3, 0, &[])]
    #[tokio::test]
    async fn limit(
        inputs: &'static [Range<i32>],
        offset: u64,
        limit: u64,
        outputs: &'static [Range<i32>],
    ) {
        // Build a Limit executor over an in-memory stream of input batches.
        let executor = Limit {
            plan: PhysicalLimit::new(PhysicalOperatorBase::default(), Some(limit), Some(offset)),
            child: futures::stream::iter(inputs.iter().map(range_to_chunk).map(Ok)).boxed(),
        };
        let ctx = Arc::new(ExecutionContext::new(ClientContext::new(Arc::new(
            DatabaseInstance::default(),
        ))));
        let actual = executor.execute(ctx).try_collect::<Vec<_>>().await.unwrap();
        let outputs = outputs.iter().map(range_to_chunk).collect_vec();
        assert_eq!(actual, outputs);
    }

    // Materialize an int range as a single-column Int32 RecordBatch.
    fn range_to_chunk(range: &Range<i32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let data: Vec<_> = range.clone().collect();
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(data))]).unwrap()
    }
}
7 changes: 7 additions & 0 deletions src/execution/volcano_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod dummy_scan;
mod expression_scan;
mod filter;
mod insert;
mod limit;
mod projection;
mod table_scan;
use std::sync::Arc;
Expand All @@ -17,6 +18,7 @@ pub use filter::*;
use futures::stream::BoxStream;
use futures::TryStreamExt;
pub use insert::*;
pub use limit::*;
pub use projection::*;
pub use table_scan::*;

Expand Down Expand Up @@ -60,6 +62,11 @@ impl VolcanoExecutor {
let child_executor = self.build(child, context.clone());
Filter::new(op, child_executor).execute(context)
}
PhysicalOperator::PhysicalLimit(op) => {
let child = op.base.children.first().unwrap().clone();
let child_executor = self.build(child, context.clone());
Limit::new(op, child_executor).execute(context)
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/planner_v2/binder/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::catalog_v2::CatalogError;
use crate::execution::ExecutorError;
use crate::function::FunctionError;

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -31,4 +32,10 @@ pub enum BindError {
#[source]
FunctionError,
),
#[error("executor error: {0}")]
ExecutorError(
#[from]
#[source]
ExecutorError,
),
}
80 changes: 80 additions & 0 deletions src/planner_v2/binder/query_node/bind_result_modifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use derive_new::new;
use sqlparser::ast::{Expr, Query};

use crate::execution::ExpressionExecutor;
use crate::planner_v2::{
BindError, Binder, BoundCastExpression, BoundExpression, ExpressionBinder,
};
use crate::types_v2::{LogicalType, ScalarValue};

/// A bound modifier applied to a query's result set.
/// Currently only LIMIT/OFFSET is supported.
#[derive(Debug)]
pub enum BoundResultModifier {
    BoundLimitModifier(BoundLimitModifier),
}

/// Bound LIMIT/OFFSET clause with its constant-folded values.
#[derive(new, Debug)]
pub struct BoundLimitModifier {
    // Constant-folded LIMIT value; u64::MAX when no LIMIT clause was given.
    pub(crate) limit_value: u64,
    // Constant-folded OFFSET value; 0 when no OFFSET clause was given.
    // NOTE(review): field name has a typo ("offsert"); renaming would touch
    // other modules, so it is documented rather than changed here.
    pub(crate) offsert_value: u64,
    // Original bound LIMIT expression, if present in the query.
    pub(crate) limit: Option<BoundExpression>,
    // Original bound OFFSET expression, if present in the query.
    pub(crate) offset: Option<BoundExpression>,
}

impl Binder {
    /// Bind a LIMIT/OFFSET delimiter expression and cast it to `UBigint`.
    fn bind_delimiter(
        expr_binder: &mut ExpressionBinder,
        expr: &Expr,
    ) -> Result<BoundExpression, BindError> {
        let bound_expr = expr_binder.bind_expression(expr, &mut vec![], &mut vec![])?;
        let new_expr =
            BoundCastExpression::try_add_cast_to_type(bound_expr, LogicalType::UBigint, false)?;
        Ok(new_expr)
    }

    /// Extract the `u64` out of a scalar produced by a delimiter expression.
    fn cast_delimiter_val(val: ScalarValue) -> u64 {
        match val {
            ScalarValue::UInt64(Some(v)) => v,
            // bind_delimiter casts to UBigint, so any other variant is a bug.
            _ => unreachable!("delimiter val must be UInt64 due to previous cast"),
        }
    }

    /// Bind the LIMIT/OFFSET clauses of `query`, constant-folding each to a
    /// concrete value by evaluating the bound expression.
    ///
    /// Returns `Ok(None)` when neither clause is present.
    pub fn bind_limit_modifier(
        &mut self,
        query: &Query,
    ) -> Result<Option<BoundResultModifier>, BindError> {
        let mut expr_binder = ExpressionBinder::new(self);
        let limit = query
            .limit
            .as_ref()
            .map(|expr| Self::bind_delimiter(&mut expr_binder, expr))
            .transpose()?;
        // No LIMIT clause means "unlimited".
        let limit_value = if let Some(limit_expr) = &limit {
            let val = ExpressionExecutor::execute_scalar(limit_expr)?;
            Self::cast_delimiter_val(val)
        } else {
            u64::MAX
        };

        let offset = query
            .offset
            .as_ref()
            .map(|expr| Self::bind_delimiter(&mut expr_binder, &expr.value))
            .transpose()?;
        // No OFFSET clause means "skip nothing".
        let offsert_value = if let Some(offset_expr) = &offset {
            let val = ExpressionExecutor::execute_scalar(offset_expr)?;
            Self::cast_delimiter_val(val)
        } else {
            0
        };

        let modifier = if limit.is_none() && offset.is_none() {
            None
        } else {
            Some(BoundResultModifier::BoundLimitModifier(
                BoundLimitModifier::new(limit_value, offsert_value, limit, offset),
            ))
        };

        Ok(modifier)
    }
}
Loading