diff --git a/rust/datafusion/README.md b/rust/datafusion/README.md index 43e99e22bb5..4ca5f502e61 100644 --- a/rust/datafusion/README.md +++ b/rust/datafusion/README.md @@ -127,59 +127,6 @@ are mapped to Arrow types according to the following table This section describes how you can get started at developing DataFusion. -## Core concepts - -### Dynamic typing - -DataFusion's memory layout is the columnar format Arrow. Because a column type is only known -at runtime, DataFusion, like Arrow's create, uses dynamic typing throughout most of its code. Thus, a central aspect of DataFusion's query engine is keeping track of an expression's `datatype`. - -### Nullability - -Arrow's columnar format natively supports the notion of null values, and DataFusion also. Like types, -DataFusion keeps track of an expression's `nullability` throughout planning and execution. - -### Field and Schema - -Arrow's implementation in rust has a `Field` that contains information about a column: - -* name -* datatype -* nullability - -A `Schema` is essentially a vector of fields. - -### parse, plan, optimize, execute - -When a query is sent to DataFusion, there are different steps that it passes through until a result is -obtained. Broadly, they are: - -1. The string is parsed to an Abstract syntax tree (AST). We use [sqlparser](https://docs.rs/sqlparser/0.6.1/sqlparser/) for this. -2. The AST is converted to a logical plan ([src/sql](src/sql/planner.rs)) -3. The logical plan is optimized to a new logical plan ([src/optimizer](src/optimizer)) -4. The logical plan is converted to a physical plan ([src/physical_plan/planner](src/physical_plan/planner.rs)) -5. The physical plan is executed ([src/execution/context.rs](src/execution/context.rs)) - -Phases 1-4 are typically cheap/fast when compared to phase 5, and thus DataFusion puts a lot of effort to ensure that phase 5 runs without errors. - -#### Logical plan - -A logical plan is a representation of the plan without details of how it is executed. In general, - -* given a data schema and a logical plan, the resulting schema is known. -* given data and a logical plan, we agree on the result, irrespectively of how it is computed. - -A logical plan is composed by nodes (called `LogicalPlan`), and each node is composed by logical expressions (called `Expr`). All of these are located in [src/logical_plan/mod.rs](src/logical_plan/mod.rs). - -#### Physical plan - -A Physical plan is a plan that can be executed. Contrarily to a logical plan, the physical plan has specific -information about how the calculation should be performed (e.g. what actual rust functions are used). - -A physical plan is composed by nodes (implement the trait `ExecutionPlan`), and each node is composed by physical expressions (implement the trait `PhysicalExpr`) or aggreagate expressions (implement the trait `AggregateExpr`). All of these are located in [src/physical_plan](src/physical_plan). - -Physical expressions are evaluated against `RecordBatch` (a group of `Array`s and a `Schema`). - ### Bootstrap environment DataFusion is written in Rust and it uses a standard rust toolkit: diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/lib.rs index 92652c7b874..45a73eb74b8 100644 --- a/rust/datafusion/src/lib.rs +++ b/rust/datafusion/src/lib.rs @@ -18,43 +18,135 @@ #![warn(missing_docs)] //! DataFusion is an extensible query execution framework that uses -//! Apache Arrow as the memory model. +//! [Apache Arrow](https://arrow.apache.org) as its in-memory format. //! -//! 
DataFusion supports both SQL and a DataFrame API for building logical query plans
-//! and also provides a query optimizer and execution engine capable of parallel execution
+//! DataFusion supports both an SQL API and a DataFrame API for building logical query plans
+//! as well as a query optimizer and execution engine capable of parallel execution
 //! against partitioned data sources (CSV and Parquet) using threads.
 //!
-//! DataFusion currently supports simple projection, filter, and aggregate queries.
-//!
-/// [ExecutionContext](../execution/context/struct.ExecutionContext.html) is the main interface
-/// for executing queries with DataFusion.
-///
-/// The following example demonstrates how to use the context to execute a query against a CSV
-/// data source using the DataFrame API:
-///
-/// ```
-/// # use datafusion::prelude::*;
-/// # use datafusion::error::Result;
-/// # fn main() -> Result<()> {
-/// let mut ctx = ExecutionContext::new();
-/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
-/// let df = df.filter(col("a").lt_eq(col("b")))?
-///            .aggregate(vec![col("a")], vec![min(col("b"))])?
-///            .limit(100)?;
-/// let results = df.collect();
-/// # Ok(())
-/// # }
-/// ```
-///
-/// The following example demonstrates how to execute the same query using SQL:
-///
-/// ```
-/// use datafusion::prelude::*;
-///
-/// let mut ctx = ExecutionContext::new();
-/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap();
-/// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").unwrap();
-/// ```
+//! Below is an example of how to execute a query against a CSV file using the [`DataFrame`](dataframe::DataFrame) API:
+//!
+//! ```rust
+//! # use datafusion::prelude::*;
+//! # use datafusion::error::Result;
+//! # use arrow::record_batch::RecordBatch;
+//!
+//! # #[tokio::main]
+//! # async fn main() -> Result<()> {
+//! let mut ctx = ExecutionContext::new();
+//!
+//! // create the dataframe
+//! let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+//!
+//! // create a plan
+//! let df = df.filter(col("a").lt_eq(col("b")))?
+//!            .aggregate(vec![col("a")], vec![min(col("b"))])?
+//!            .limit(100)?;
+//!
+//! // execute the plan
+//! let results: Vec<RecordBatch> = df.collect().await?;
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! and below is an example of how to execute the same query using SQL:
+//!
+//! ```
+//! # use datafusion::prelude::*;
+//! # use datafusion::error::Result;
+//! # use arrow::record_batch::RecordBatch;
+//!
+//! # #[tokio::main]
+//! # async fn main() -> Result<()> {
+//! let mut ctx = ExecutionContext::new();
+//!
+//! ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
+//!
+//! // create a plan
+//! let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;
+//!
+//! // execute the plan
+//! let results: Vec<RecordBatch> = df.collect().await?;
+//! # Ok(())
+//! # }
+//! ```
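+//!
+//! The same context can also query Parquet files. The sketch below is illustrative only
+//! (hence `no_run`): `data.parquet` is a placeholder path, and pretty-printing the results
+//! assumes the [arrow] crate is built with its `prettyprint` feature:
+//!
+//! ```rust,no_run
+//! # use datafusion::prelude::*;
+//! # use datafusion::error::Result;
+//!
+//! # #[tokio::main]
+//! # async fn main() -> Result<()> {
+//! let mut ctx = ExecutionContext::new();
+//!
+//! // register a Parquet file as a table ("data.parquet" is a placeholder path)
+//! ctx.register_parquet("example", "data.parquet")?;
+//!
+//! // run a query and gather the results
+//! let df = ctx.sql("SELECT COUNT(*) FROM example")?;
+//! let results = df.collect().await?;
+//!
+//! // pretty-print the record batches to stdout
+//! arrow::util::pretty::print_batches(&results)?;
+//! # Ok(())
+//! # }
+//! ```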
+//!
+//! ## Parse, Plan, Optimize, Execute
+//!
+//! DataFusion is a fully fledged query engine capable of performing complex operations.
+//! Specifically, when DataFusion receives an SQL query, it passes through a series of
+//! steps before a result is obtained. Broadly, they are:
+//!
+//! 1. The string is parsed into an abstract syntax tree (AST) using [sqlparser](https://docs.rs/sqlparser/0.6.1/sqlparser/).
+//! 2. The planner [`SqlToRel`](sql::planner::SqlToRel) converts SQL expressions in the AST to logical expressions ([`Expr`s](logical_plan::Expr)).
+//! 3. The planner [`SqlToRel`](sql::planner::SqlToRel) converts logical nodes in the AST to a [`LogicalPlan`](logical_plan::LogicalPlan).
+//! 4. [`OptimizerRules`](optimizer::optimizer::OptimizerRule) are applied to the [`LogicalPlan`](logical_plan::LogicalPlan) to optimize it.
+//! 5. The [`LogicalPlan`](logical_plan::LogicalPlan) is converted to an [`ExecutionPlan`](physical_plan::ExecutionPlan) by a [`PhysicalPlanner`](physical_plan::PhysicalPlanner).
+//! 6. The [`ExecutionPlan`](physical_plan::ExecutionPlan) is executed against data through the [`ExecutionContext`](execution::context::ExecutionContext).
+//!
+//! With the [`DataFrame`](dataframe::DataFrame) API, steps 1-3 are skipped, as the DataFrame builds the [`LogicalPlan`](logical_plan::LogicalPlan) directly.
+//!
+//! Steps 1-5 are typically cheap when compared to step 6, and thus DataFusion puts a
+//! lot of effort into ensuring that step 6 runs efficiently and without errors.
+//!
+//! DataFusion's planning is divided into two main parts: logical planning and physical planning.
+//!
+//! ### Logical plan
+//!
+//! Logical planning yields [`logical plans`](logical_plan::LogicalPlan) and [`logical expressions`](logical_plan::Expr).
+//! These are [`Schema`](arrow::datatypes::Schema)-aware representations of statements whose result is independent of how it is physically executed.
+//!
+//! A [`LogicalPlan`](logical_plan::LogicalPlan) is a directed acyclic graph (DAG) of other [`LogicalPlan`s](logical_plan::LogicalPlan), and each node contains logical expressions ([`Expr`s](logical_plan::Expr)).
+//! All of these are located in [`logical_plan`](logical_plan).
+//!
+//! ### Physical plan
+//!
+//! A physical plan ([`ExecutionPlan`](physical_plan::ExecutionPlan)) is a plan that can be executed against data.
+//! In contrast to a logical plan, the physical plan has concrete information about how the calculation
+//! should be performed (e.g. what Rust functions are used) and how data should be loaded into memory.
+//!
+//! [`ExecutionPlan`](physical_plan::ExecutionPlan) uses the Arrow format as its in-memory representation of data, through the [arrow] crate.
+//! We recommend going through [its documentation](arrow) for details on how the data is physically represented.
+//!
+//! An [`ExecutionPlan`](physical_plan::ExecutionPlan) is composed of nodes (which implement the trait [`ExecutionPlan`](physical_plan::ExecutionPlan)),
+//! and each node is composed of physical expressions (which implement the trait [`PhysicalExpr`](physical_plan::PhysicalExpr))
+//! or aggregate expressions (which implement the trait [`AggregateExpr`](physical_plan::AggregateExpr)).
+//! All of these are located in the module [`physical_plan`](physical_plan).
+//!
+//! Broadly speaking,
+//!
+//! * an [`ExecutionPlan`](physical_plan::ExecutionPlan) receives a partition number and asynchronously returns
+//!   an iterator over [`RecordBatch`](arrow::record_batch::RecordBatch)
+//!   (a node-specific struct that implements [`RecordBatchReader`](arrow::record_batch::RecordBatchReader))
+//! * a [`PhysicalExpr`](physical_plan::PhysicalExpr) receives a [`RecordBatch`](arrow::record_batch::RecordBatch)
+//!   and returns an [`Array`](arrow::array::Array)
+//! * an [`AggregateExpr`](physical_plan::AggregateExpr) receives [`RecordBatch`es](arrow::record_batch::RecordBatch)
+//!   and returns a [`RecordBatch`](arrow::record_batch::RecordBatch) of a single row (*)
+//!
+//! (*) Technically, it aggregates the results on each partition and then merges the results into a single partition.
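+//!
+//! To make these steps concrete, below is a sketch of the same CSV query driven through
+//! each phase explicitly via [`ExecutionContext`](execution::context::ExecutionContext),
+//! using [`collect`](physical_plan::collect) to execute the resulting
+//! [`ExecutionPlan`](physical_plan::ExecutionPlan) and gather all of its partitions:
+//!
+//! ```rust
+//! # use datafusion::prelude::*;
+//! # use datafusion::error::Result;
+//! # use datafusion::physical_plan::collect;
+//!
+//! # #[tokio::main]
+//! # async fn main() -> Result<()> {
+//! let mut ctx = ExecutionContext::new();
+//! ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
+//!
+//! // parse the SQL string into a logical plan (steps 1-3)
+//! let plan = ctx.create_logical_plan("SELECT a, MIN(b) FROM example GROUP BY a")?;
+//!
+//! // optimize the logical plan (step 4)
+//! let plan = ctx.optimize(&plan)?;
+//!
+//! // convert the logical plan into a physical plan (step 5)
+//! let plan = ctx.create_physical_plan(&plan)?;
+//!
+//! // execute the physical plan and gather the results (step 6)
+//! let results = collect(plan).await?;
+//! # Ok(())
+//! # }
+//! ```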
+//!
+//! The following physical nodes are currently implemented:
+//!
+//! * Projection: [`ProjectionExec`](physical_plan::projection::ProjectionExec)
+//! * Filter: [`FilterExec`](physical_plan::filter::FilterExec)
+//! * Hash and grouped aggregations: [`HashAggregateExec`](physical_plan::hash_aggregate::HashAggregateExec)
+//! * Sort: [`SortExec`](physical_plan::sort::SortExec)
+//! * Merge (partitions): [`MergeExec`](physical_plan::merge::MergeExec)
+//! * Limit: [`LocalLimitExec`](physical_plan::limit::LocalLimitExec) and [`GlobalLimitExec`](physical_plan::limit::GlobalLimitExec)
+//! * Scan a CSV file: [`CsvExec`](physical_plan::csv::CsvExec)
+//! * Scan a Parquet file: [`ParquetExec`](physical_plan::parquet::ParquetExec)
+//! * Scan from memory: [`MemoryExec`](physical_plan::memory::MemoryExec)
+//! * Explain the plan: [`ExplainExec`](physical_plan::explain::ExplainExec)
+//!
+//! ## Customize
+//!
+//! DataFusion allows users to:
+//!
+//! * extend the planner with user-defined logical and physical nodes ([`QueryPlanner`](execution::context::QueryPlanner))
+//! * declare and use user-defined scalar functions ([`ScalarUDF`](physical_plan::udf::ScalarUDF))
+//! * declare and use user-defined aggregate functions ([`AggregateUDF`](physical_plan::udaf::AggregateUDF))
+//!
+//! You can find examples of each of them in the crate's `examples` directory.
+
 extern crate arrow;
 extern crate sqlparser;