53 changes: 0 additions & 53 deletions rust/datafusion/README.md
@@ -127,59 +127,6 @@ are mapped to Arrow types according to the following table

This section describes how you can get started developing DataFusion.

## Core concepts

### Dynamic typing

DataFusion uses Arrow's columnar format as its memory layout. Because a column's type is only known
at runtime, DataFusion, like the Arrow crate itself, uses dynamic typing throughout most of its code. Thus, a central aspect of DataFusion's query engine is keeping track of an expression's `datatype`.
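
Because a column's `DataType` is a runtime value, code that operates on columns typically dispatches by matching on Arrow's `DataType` enum. As an illustration (the helper below is hypothetical, not a DataFusion API), resolving the result type of an addition might look like:

```rust
use arrow::datatypes::DataType;

/// Hypothetical helper: pick the result type of `lhs + rhs`,
/// dispatching on types that are only known at runtime.
fn add_result_type(lhs: &DataType, rhs: &DataType) -> Option<DataType> {
    match (lhs, rhs) {
        (DataType::Int64, DataType::Int64) => Some(DataType::Int64),
        (DataType::Float64, _) | (_, DataType::Float64) => Some(DataType::Float64),
        _ => None, // combination not handled in this sketch
    }
}
```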

### Nullability

Arrow's columnar format natively supports the notion of null values, and so does DataFusion. As with types,
DataFusion keeps track of an expression's `nullability` throughout planning and execution.
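
For instance, Arrow arrays carry a validity bitmap recording which values are null; this is the information DataFusion's `nullability` tracking propagates. A minimal example using the arrow crate directly:

```rust
use arrow::array::{Array, Int32Array};

// the second value is null; the array's validity bitmap records this
let a = Int32Array::from(vec![Some(1), None, Some(3)]);
assert!(a.is_null(1));
assert_eq!(a.null_count(), 1);
```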

### Field and Schema

Arrow's Rust implementation has a `Field` that contains information about a column:

* name
* datatype
* nullability

A `Schema` is essentially a vector of fields.
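
For instance, a two-column schema can be built from `Field`s as follows (the column names here are arbitrary examples):

```rust
use arrow::datatypes::{DataType, Field, Schema};

// "a" is a non-nullable 32-bit integer; "b" is a nullable string
let schema = Schema::new(vec![
    Field::new("a", DataType::Int32, false),
    Field::new("b", DataType::Utf8, true),
]);
```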

### parse, plan, optimize, execute

When a query is sent to DataFusion, there are different steps that it passes through until a result is
obtained. Broadly, they are:

1. The string is parsed into an abstract syntax tree (AST). We use [sqlparser](https://docs.rs/sqlparser/0.6.1/sqlparser/) for this.
2. The AST is converted to a logical plan ([src/sql](src/sql/planner.rs))
3. The logical plan is optimized to a new logical plan ([src/optimizer](src/optimizer))
4. The logical plan is converted to a physical plan ([src/physical_plan/planner](src/physical_plan/planner.rs))
5. The physical plan is executed ([src/execution/context.rs](src/execution/context.rs))

Phases 1-4 are typically cheap and fast compared to phase 5, and thus DataFusion puts a lot of effort into ensuring that phase 5 runs without errors.

#### Logical plan

A logical plan is a representation of a query that omits the details of how it is executed. In general,

* given a data schema and a logical plan, the resulting schema is known.
* given data and a logical plan, the result is determined, irrespective of how it is computed.

A logical plan is composed of nodes (called `LogicalPlan`), and each node is composed of logical expressions (called `Expr`). All of these are located in [src/logical_plan/mod.rs](src/logical_plan/mod.rs).

#### Physical plan

A physical plan is a plan that can be executed. In contrast to a logical plan, the physical plan has specific
information about how the calculation should be performed (e.g. which actual Rust functions are used).

A physical plan is composed of nodes (which implement the trait `ExecutionPlan`), and each node is composed of physical expressions (which implement the trait `PhysicalExpr`) or aggregate expressions (which implement the trait `AggregateExpr`). All of these are located in [src/physical_plan](src/physical_plan).

Physical expressions are evaluated against `RecordBatch` (a group of `Array`s and a `Schema`).

### Bootstrap environment

DataFusion is written in Rust and uses a standard Rust toolkit:
160 changes: 126 additions & 34 deletions rust/datafusion/src/lib.rs
@@ -18,43 +18,135 @@
#![warn(missing_docs)]

//! DataFusion is an extensible query execution framework that uses
//! Apache Arrow as the memory model.
//! [Apache Arrow](https://arrow.apache.org) as its in-memory format.
//!
//! DataFusion supports both SQL and a DataFrame API for building logical query plans
//! and also provides a query optimizer and execution engine capable of parallel execution
//! DataFusion supports both an SQL and a DataFrame API for building logical query plans,
//! and provides a query optimizer and execution engine capable of parallel execution
//! against partitioned data sources (CSV and Parquet) using threads.
//!
//! DataFusion currently supports simple projection, filter, and aggregate queries.
//!
/// [ExecutionContext](../execution/context/struct.ExecutionContext.html) is the main interface
/// for executing queries with DataFusion.
///
/// The following example demonstrates how to use the context to execute a query against a CSV
/// data source using the DataFrame API:
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.filter(col("a").lt_eq(col("b")))?
/// .aggregate(vec![col("a")], vec![min(col("b"))])?
/// .limit(100)?;
/// let results = df.collect();
/// # Ok(())
/// # }
/// ```
///
/// The following example demonstrates how to execute the same query using SQL:
///
/// ```
/// use datafusion::prelude::*;
///
/// let mut ctx = ExecutionContext::new();
/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap();
/// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").unwrap();
/// ```
//! Below is an example of how to execute a query against a CSV file using [`DataFrames`](dataframe::DataFrame):
//!
//! ```rust
//! # use datafusion::prelude::*;
//! # use datafusion::error::Result;
//! # use arrow::record_batch::RecordBatch;
//!
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! let mut ctx = ExecutionContext::new();
//!
//! // create the dataframe
//! let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
//!
//! // create a plan
//! let df = df.filter(col("a").lt_eq(col("b")))?
//! .aggregate(vec![col("a")], vec![min(col("b"))])?
//! .limit(100)?;
//!
//! // execute the plan
//! let results: Vec<RecordBatch> = df.collect().await?;
//! # Ok(())
//! # }
//! ```
//!
//! and how to execute a query against a CSV file using SQL:
//!
//! ```
//! # use datafusion::prelude::*;
//! # use datafusion::error::Result;
//! # use arrow::record_batch::RecordBatch;
//!
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! let mut ctx = ExecutionContext::new();
//!
//! ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
//!
//! // create a plan
//! let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;
//!
//! // execute the plan
//! let results: Vec<RecordBatch> = df.collect().await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## Parse, Plan, Optimize, Execute
//!
//! DataFusion is a fully fledged query engine capable of performing complex operations.
//! Specifically, when DataFusion receives an SQL query, there are different steps
//! that it passes through until a result is obtained. Broadly, they are:
//!
//! 1. The string is parsed into an abstract syntax tree (AST) using [sqlparser](https://docs.rs/sqlparser/0.6.1/sqlparser/).
//! 2. The planner [`SqlToRel`](sql::planner::SqlToRel) converts expressions in the AST to logical expressions ([`Expr`s](logical_plan::Expr)).
//! 3. The planner [`SqlToRel`](sql::planner::SqlToRel) converts nodes in the AST to a [`LogicalPlan`](logical_plan::LogicalPlan).
//! 4. [`OptimizerRules`](optimizer::optimizer::OptimizerRule) are applied to the [`LogicalPlan`](logical_plan::LogicalPlan) to optimize it.
//! 5. The [`LogicalPlan`](logical_plan::LogicalPlan) is converted to an [`ExecutionPlan`](physical_plan::ExecutionPlan) by a [`PhysicalPlanner`](physical_plan::PhysicalPlanner).
//! 6. The [`ExecutionPlan`](physical_plan::ExecutionPlan) is executed against data through the [`ExecutionContext`](execution::context::ExecutionContext).
//!
//! With the [`DataFrame`](dataframe::DataFrame) API, steps 1-3 are not used, as the DataFrame builds the [`LogicalPlan`](logical_plan::LogicalPlan) directly.
//!
//! Phases 1-5 are typically cheap when compared to phase 6, and thus DataFusion puts a
//! lot of effort into ensuring that phase 6 runs efficiently and without errors.
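//!
//! These phases can also be driven one at a time. The sketch below assumes that
//! [`ExecutionContext`](execution::context::ExecutionContext) exposes `create_logical_plan`,
//! `optimize` and `create_physical_plan` helpers, and that a `physical_plan::collect`
//! function executes a physical plan; the exact signatures are assumptions and may differ:
//!
//! ```
//! # use datafusion::prelude::*;
//! # use datafusion::error::Result;
//! # use datafusion::physical_plan::collect;
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! let mut ctx = ExecutionContext::new();
//! ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
//!
//! // 1-3: parse the SQL string into an AST and then into a LogicalPlan
//! let plan = ctx.create_logical_plan("SELECT a, MIN(b) FROM example GROUP BY a")?;
//! // 4: apply optimizer rules, yielding an optimized LogicalPlan
//! let plan = ctx.optimize(&plan)?;
//! // 5: convert the LogicalPlan into an ExecutionPlan
//! let physical_plan = ctx.create_physical_plan(&plan)?;
//! // 6: execute the ExecutionPlan, collecting all partitions
//! let results = collect(physical_plan).await?;
//! # Ok(())
//! # }
//! ```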
//!
//! DataFusion's planning is divided into two main parts: logical planning and physical planning.
//!
//! ### Logical plan
//!
//! Logical planning yields [`logical plans`](logical_plan::LogicalPlan) and [`logical expressions`](logical_plan::Expr).
//! These are [`Schema`](arrow::datatypes::Schema)-aware representations of statements whose result is independent of how it is physically executed.
//!
//! A [`LogicalPlan`](logical_plan::LogicalPlan) is a directed acyclic graph (DAG) of other [`LogicalPlan`s](logical_plan::LogicalPlan), and each node contains logical expressions ([`Expr`s](logical_plan::Expr)).
//! All of these are located in [`logical_plan`](logical_plan).
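//!
//! For example, the logical expression `a <= 5` is a tree of [`Expr`s](logical_plan::Expr)
//! that describes *what* to compute, not *how* to compute it. A minimal sketch (assuming
//! `col` and a `lit` literal helper are exported by the prelude; `lit` is an assumption here):
//!
//! ```
//! # use datafusion::prelude::*;
//! // a schema-aware description of a computation; nothing is executed here
//! let expr = col("a").lt_eq(lit(5));
//! ```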
//!
//! ### Physical plan
//!
//! A physical plan ([`ExecutionPlan`](physical_plan::ExecutionPlan)) is a plan that can be executed against data.
//! In contrast to a logical plan, the physical plan has concrete information about how the calculation
//! should be performed (e.g. what Rust functions are used) and how data should be loaded into memory.
//!
//! [`ExecutionPlan`](physical_plan::ExecutionPlan) uses the Arrow format as its in-memory representation of data, through the [arrow] crate.
//! We recommend going through [its documentation](arrow) for details on how the data is physically represented.
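//!
//! As a small illustration of that representation: the [`RecordBatch`es](arrow::record_batch::RecordBatch)
//! produced by execution are plain Arrow data, whose columns are dynamically typed and must be
//! downcast to a concrete array type before their values can be read. A sketch, assuming that
//! column 0 is a non-null `Int32` column:
//!
//! ```
//! # use arrow::array::{Array, Int32Array};
//! # use arrow::record_batch::RecordBatch;
//! fn sum_first_column(batch: &RecordBatch) -> i32 {
//!     let column = batch
//!         .column(0)
//!         .as_any()
//!         .downcast_ref::<Int32Array>()
//!         .expect("column 0 is Int32"); // assumption of this sketch
//!     (0..column.len()).map(|i| column.value(i)).sum()
//! }
//! ```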
//!
//! An [`ExecutionPlan`](physical_plan::ExecutionPlan) is composed of nodes (which implement the trait [`ExecutionPlan`](physical_plan::ExecutionPlan)),
//! and each node is composed of physical expressions ([`PhysicalExpr`](physical_plan::PhysicalExpr))
//! or aggregate expressions ([`AggregateExpr`](physical_plan::AggregateExpr)).
//! All of these are located in the module [`physical_plan`](physical_plan).
//!
//! Broadly speaking,
//!
//! * an [`ExecutionPlan`](physical_plan::ExecutionPlan) receives a partition number and asynchronously returns
//! an iterator over [`RecordBatch`](arrow::record_batch::RecordBatch)
//! (a node-specific struct that implements [`RecordBatchReader`](arrow::record_batch::RecordBatchReader))
//! * a [`PhysicalExpr`](physical_plan::PhysicalExpr) receives a [`RecordBatch`](arrow::record_batch::RecordBatch)
//! and returns an [`Array`](arrow::array::Array)
//! * an [`AggregateExpr`](physical_plan::AggregateExpr) receives [`RecordBatch`es](arrow::record_batch::RecordBatch)
//! and returns a [`RecordBatch`](arrow::record_batch::RecordBatch) of a single row (*)
//!
//! (*) Technically, it aggregates the results on each partition and then merges the results into a single partition.
//!
//! The following physical nodes are currently implemented:
//!
//! * Projection: [`ProjectionExec`](physical_plan::projection::ProjectionExec)
//! * Filter: [`FilterExec`](physical_plan::filter::FilterExec)
//! * Hash and grouped aggregations: [`HashAggregateExec`](physical_plan::hash_aggregate::HashAggregateExec)
//! * Sort: [`SortExec`](physical_plan::sort::SortExec)
//! * Merge (partitions): [`MergeExec`](physical_plan::merge::MergeExec)
//! * Limit: [`LocalLimitExec`](physical_plan::limit::LocalLimitExec) and [`GlobalLimitExec`](physical_plan::limit::GlobalLimitExec)
//! * Scan a CSV file: [`CsvExec`](physical_plan::csv::CsvExec)
//! * Scan a Parquet file: [`ParquetExec`](physical_plan::parquet::ParquetExec)
//! * Scan from memory: [`MemoryExec`](physical_plan::memory::MemoryExec)
//! * Explain the plan: [`ExplainExec`](physical_plan::explain::ExplainExec)
//!
//! ## Customize
//!
//! DataFusion allows users to
//!
//! * extend the planner to use user-defined logical and physical nodes ([`QueryPlanner`](execution::context::QueryPlanner))
//! * declare and use user-defined scalar functions ([`ScalarUDF`](physical_plan::udf::ScalarUDF))
//! * declare and use user-defined aggregate functions ([`AggregateUDF`](physical_plan::udaf::AggregateUDF))
//!
//! You can find examples of each of them in the examples section.
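//!
//! As a sketch of the scalar UDF flow (the location and signature of `create_udf`, and the
//! exact function type it accepts, are assumptions here and may differ between versions):
//!
//! ```
//! # use std::sync::Arc;
//! # use arrow::array::{Array, ArrayRef, Float64Array, Float64Builder};
//! # use arrow::datatypes::DataType;
//! # use datafusion::prelude::*;
//! # use datafusion::error::Result;
//! # use datafusion::logical_plan::create_udf; // assumed location of the helper
//! let mut ctx = ExecutionContext::new();
//!
//! // a function that doubles every value of its single argument, preserving nulls
//! let double = |args: &[ArrayRef]| -> Result<ArrayRef> {
//!     let input = args[0].as_any().downcast_ref::<Float64Array>().expect("f64 input");
//!     let mut builder = Float64Builder::new(input.len());
//!     for i in 0..input.len() {
//!         if input.is_null(i) {
//!             builder.append_null()?;
//!         } else {
//!             builder.append_value(input.value(i) * 2.0)?;
//!         }
//!     }
//!     Ok(Arc::new(builder.finish()) as ArrayRef)
//! };
//!
//! // declare the UDF and register it, making `double(...)` callable from SQL and DataFrames
//! let udf = create_udf("double", vec![DataType::Float64], Arc::new(DataType::Float64), Arc::new(double));
//! ctx.register_udf(udf);
//! ```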

extern crate arrow;
extern crate sqlparser;
