22 changes: 20 additions & 2 deletions .github/workflows/rust.yml
@@ -136,9 +136,9 @@ jobs:
- name: Verify Working Directory Clean
run: git diff --exit-code

# Run doc tests
# Run `cargo test doc` (test documentation examples)
linux-test-doc:
name: cargo doctest (amd64)
name: cargo test doc (amd64)
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
@@ -157,6 +157,24 @@ jobs:
- name: Verify Working Directory Clean
run: git diff --exit-code

# Run `cargo doc` to ensure the rustdoc is clean
linux-rustdoc:
name: cargo doc
needs: [ linux-build-lib ]
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v3
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: stable
- name: Run cargo doc
run: |
export RUSTDOCFLAGS="-D warnings -A rustdoc::private-intra-doc-links"
cargo doc --document-private-items --no-deps --workspace

# verify that the benchmark queries return the correct results
verify-benchmark-results:
name: verify benchmark results (amd64)
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
@@ -306,13 +306,19 @@ config_namespace! {
/// Should DataFusion execute sorts in a per-partition fashion and merge
/// afterwards instead of coalescing first and sorting globally.
/// When this flag is enabled, plans in the form below
///
/// ```text
/// "SortExec: [a@0 ASC]",
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
/// would turn into the plan below which performs better in multithreaded environments
///
/// ```text
/// "SortPreservingMergeExec: [a@0 ASC]",
/// " SortExec: [a@0 ASC]",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
pub repartition_sorts: bool, default = true

/// When set to true, the logical plan optimizer will produce warning
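The `repartition_sorts` flag documented above is exposed through the session configuration. A minimal sketch of toggling it, assuming the `datafusion` crate's `SessionConfig::set_bool` builder (the exact API may vary across versions):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Enable per-partition sorting followed by a merge; the key follows
    // the `datafusion.optimizer` config namespace shown in this file.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.repartition_sorts", true);
    let ctx = SessionContext::with_config(config);
    // Plans created through `ctx` should now prefer the
    // SortExec-per-partition + SortPreservingMergeExec shape.
    let _ = ctx;
}
```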
35 changes: 24 additions & 11 deletions datafusion/common/src/tree_node.rs
@@ -15,18 +15,29 @@
// specific language governing permissions and limitations
// under the License.

//! This module provides common traits for visiting or rewriting tree nodes easily.
//! This module provides common traits for visiting or rewriting tree
//! data structures easily.

use std::sync::Arc;

use crate::Result;

/// Trait for tree node. It can be [`ExecutionPlan`], [`PhysicalExpr`], [`LogicalPlan`], [`Expr`], etc.
/// Defines a visitable and rewriteable tree node. This trait is
/// implemented for plans ([`ExecutionPlan`] and [`LogicalPlan`]) as
/// well as expression trees ([`PhysicalExpr`], [`Expr`]) in
/// DataFusion.
///
/// <!-- Since these are in the datafusion-common crate, we can't use intra-doc links -->
/// [`ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
/// [`PhysicalExpr`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.PhysicalExpr.html
/// [`LogicalPlan`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html
/// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html
pub trait TreeNode: Sized {
/// Use preorder to iterate the node on the tree so that we can stop fast for some cases.
/// Use a preorder traversal to iterate over the nodes in the tree
/// so that the traversal can stop early in some cases.
///
/// [`op`] can be used to collect some info from the tree node
/// or do some checking for the tree node.
/// The `op` closure can be used to collect information from tree
/// nodes or to run checks on them.
fn apply<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
@@ -68,7 +79,8 @@ pub trait TreeNode: Sized {
/// children of that node will be visited, nor is post_visit
/// called on that node. For details, see [`TreeNodeVisitor`].
///
/// If using the default [`post_visit`] with nothing to do, the [`apply`] should be preferred
/// If using the default [`TreeNodeVisitor::post_visit`] that does
/// nothing, [`Self::apply`] should be preferred.
fn visit<V: TreeNodeVisitor<N = Self>>(
&self,
visitor: &mut V,
@@ -152,7 +164,8 @@ pub trait TreeNode: Sized {
/// children of that node will be visited, nor is mutate
/// called on that node
///
/// If using the default [`pre_visit`] with [`true`] returned, the [`transform`] should be preferred
/// If using the default [`TreeNodeRewriter::pre_visit`] which
/// returns `true`, [`Self::transform`] should be preferred.
fn rewrite<R: TreeNodeRewriter<N = Self>>(self, rewriter: &mut R) -> Result<Self> {
let need_mutate = match rewriter.pre_visit(&self)? {
RewriteRecursion::Mutate => return rewriter.mutate(self),
@@ -190,8 +203,8 @@ pub trait TreeNode: Sized {
/// tree and makes it easier to add new types of tree node and
/// algorithms.
///
/// When passed to[`TreeNode::visit`], [`TreeNode::pre_visit`]
/// and [`TreeNode::post_visit`] are invoked recursively
/// When passed to [`TreeNode::visit`], [`TreeNodeVisitor::pre_visit`]
/// and [`TreeNodeVisitor::post_visit`] are invoked recursively
/// on a node tree.
///
/// If an [`Err`] result is returned, recursion is stopped
@@ -239,7 +252,7 @@ pub trait TreeNodeRewriter: Sized {
fn mutate(&mut self, node: Self::N) -> Result<Self::N>;
}

/// Controls how the [TreeNode] recursion should proceed for [`rewrite`].
/// Controls how the [`TreeNode`] recursion should proceed for [`TreeNode::rewrite`].
#[derive(Debug)]
pub enum RewriteRecursion {
/// Continue rewriting this node tree.
@@ -252,7 +265,7 @@ pub enum RewriteRecursion {
Skip,
}

/// Controls how the [TreeNode] recursion should proceed for [`visit`].
/// Controls how the [`TreeNode`] recursion should proceed for [`TreeNode::visit`].
#[derive(Debug)]
pub enum VisitRecursion {
/// Continue the visit to this node tree.
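To make the traversal contract described above concrete, here is a self-contained sketch of the pre-order `apply` pattern on a toy tree. The names mirror the trait, but this is an illustration, not DataFusion's actual implementation:

```rust
enum VisitRecursion {
    Continue,
    Stop,
}

struct Node {
    value: i32,
    children: Vec<Node>,
}

impl Node {
    /// Pre-order traversal: apply `op` to `self` first, then to the
    /// children, stopping as soon as `op` asks for `Stop`.
    fn apply<F: FnMut(&Node) -> VisitRecursion>(&self, op: &mut F) -> VisitRecursion {
        if let VisitRecursion::Stop = op(self) {
            return VisitRecursion::Stop;
        }
        for child in &self.children {
            if let VisitRecursion::Stop = child.apply(op) {
                return VisitRecursion::Stop;
            }
        }
        VisitRecursion::Continue
    }
}

fn main() {
    let tree = Node {
        value: 1,
        children: vec![
            Node { value: 2, children: vec![] },
            Node { value: 3, children: vec![] },
        ],
    };
    // Collect values, stopping early once we have seen 2.
    let mut seen = vec![];
    tree.apply(&mut |n| {
        seen.push(n.value);
        if n.value == 2 {
            VisitRecursion::Stop
        } else {
            VisitRecursion::Continue
        }
    });
    assert_eq!(seen, vec![1, 2]); // node 3 was never visited
}
```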
2 changes: 1 addition & 1 deletion datafusion/common/src/utils.rs
@@ -263,7 +263,7 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
Ok(idents)
}

/// Construct a new Vec<ArrayRef> from the rows of the `arrays` at the `indices`.
/// Construct a new [`Vec`] of [`ArrayRef`] from the rows of the `arrays` at the `indices`.
pub fn get_arrayref_at_indices(
arrays: &[ArrayRef],
indices: &PrimitiveArray<UInt32Type>,
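A sketch of what `get_arrayref_at_indices` does, assuming the implementation delegates to arrow's `take` kernel (the free function `take_at_indices` is illustrative):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, UInt32Array};
use arrow::compute::take;
use arrow::error::Result;

/// Gather the rows at `indices` from every column in `arrays`.
fn take_at_indices(arrays: &[ArrayRef], indices: &UInt32Array) -> Result<Vec<ArrayRef>> {
    arrays
        .iter()
        .map(|array| take(array.as_ref(), indices, None))
        .collect()
}

fn main() -> Result<()> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let indices = UInt32Array::from(vec![2, 0]);
    let picked = take_at_indices(&[col], &indices)?;
    assert_eq!(picked[0].len(), 2); // rows 2 and 0, i.e. values [30, 10]
    Ok(())
}
```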
2 changes: 1 addition & 1 deletion datafusion/core/src/lib.rs
@@ -150,7 +150,7 @@
//! [`AggregateUDF`]: physical_plan::udaf::AggregateUDF
//! [`QueryPlanner`]: execution::context::QueryPlanner
//! [`OptimizerRule`]: datafusion_optimizer::optimizer::OptimizerRule
//! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule
//! [`PhysicalOptimizerRule`]: crate::physical_optimizer::optimizer::PhysicalOptimizerRule
//!
//! # Code Organization
//!
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -158,6 +158,9 @@ pub fn check_finiteness_requirements(
/// data pruning. For this to be possible, it needs to have a filter where
/// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support
/// interval calculations.
///
/// [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr
/// [`Operator`]: datafusion_expr::Operator
fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
join.filter().map_or(false, |filter| {
check_support(filter.expression())
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pruning.rs
@@ -736,7 +736,7 @@ fn build_is_null_column_expr(
/// expression that will evaluate to FALSE if it can be determined no
/// rows between the min/max values could pass the predicates.
///
/// Returns the pruning predicate as an [`Expr`]
/// Returns the pruning predicate as a [`PhysicalExpr`]
fn build_predicate_expression(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
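For intuition, the rewrite performed by `build_predicate_expression` can be sketched on logical `Expr`s (the real function builds a `PhysicalExpr`, and the `_max` column name here is a hypothetical stand-in for the statistics column): a predicate like `x > 5` can only be satisfied by a container whose maximum value exceeds 5.

```rust
use datafusion_expr::{col, lit, Expr};

/// Illustrative only: rewrite the row predicate `column > value` into a
/// statistics predicate over the container's max value.
fn prune_gt(column: &str, value: i64) -> Expr {
    col(format!("{column}_max").as_str()).gt(lit(value))
}

fn main() {
    let pruning_predicate = prune_gt("x", 5);
    // A container with x_max <= 5 can be skipped entirely.
    println!("{pruning_predicate}");
}
```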
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -37,6 +37,8 @@ use std::sync::Arc;
/// This is a "data class" we use within the [`EnforceSorting`] rule to push
/// down [`SortExec`] in the plan. In some cases, we can reduce the total
/// computational cost by pushing down `SortExec`s through some executors.
///
/// [`EnforceSorting`]: crate::physical_optimizer::sort_enforcement::EnforceSorting
#[derive(Debug, Clone)]
pub(crate) struct SortPushDown {
/// Current plan
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -47,7 +47,9 @@ pub(crate) struct AggregateStream {
///
/// This is wrapped into yet another struct because we need to interact with the async memory management subsystem
/// during poll. To have as little code "weirdness" as possible, we chose to just use [`BoxStream`] together with
/// [`futures::stream::unfold`]. The latter requires a state object, which is [`GroupedHashAggregateStreamV2Inner`].
/// [`futures::stream::unfold`].
///
/// The latter requires a state object, which is [`AggregateStreamInner`].
struct AggregateStreamInner {
schema: SchemaRef,
mode: AggregateMode,
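The `unfold` pattern referenced above is easy to see in isolation: a state object (playing the role of `AggregateStreamInner`) is threaded through a closure that yields one item per call. A minimal sketch using the `futures` crate:

```rust
use futures::stream::{self, StreamExt};

fn main() {
    // State is a counter; each step yields an item plus the next state,
    // and returning `None` ends the stream.
    let stream = stream::unfold(0u32, |count| async move {
        if count < 3 {
            Some((count * 10, count + 1)) // (item, next state)
        } else {
            None
        }
    });
    let items = futures::executor::block_on(stream.collect::<Vec<_>>());
    assert_eq!(items, vec![0, 10, 20]);
}
```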
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -97,13 +97,13 @@ enum FileStreamState {
/// Currently performing asynchronous IO to obtain a stream of RecordBatch
/// for a given parquet file
Open {
/// A [`FileOpenFuture`] returned by [`FormatReader::open`]
/// A [`FileOpenFuture`] returned by [`FileOpener::open`]
future: FileOpenFuture,
/// The partition values for this file
partition_values: Vec<ScalarValue>,
},
/// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`]
/// returned by [`FormatReader::open`]
/// returned by [`FileOpener::open`]
Scan {
/// Partitioning column values for the current batch_iter
partition_values: Vec<ScalarValue>,
@@ -149,7 +149,7 @@ impl StartableTime {
struct FileStreamMetrics {
/// Wall clock time elapsed for file opening.
///
/// Time between when [`FileReader::open`] is called and when the
/// Time between when [`FileOpener::open`] is called and when the
/// [`FileStream`] receives a stream for reading.
///
/// If there are multiple files being scanned, the stream
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_plan/file_format/mod.rs
@@ -77,6 +77,8 @@ use super::{ColumnStatistics, Statistics};
/// different dictionary type.
///
/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same way.
///
/// [`ListingTable`]: crate::datasource::listing::ListingTable
pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}
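As a quick usage illustration of the wrapping above (the function body is the one shown in this diff): a `Utf8` partition column is stored as a dictionary keyed by `UInt16`.

```rust
use arrow::datatypes::DataType;

fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}

fn main() {
    let wrapped = wrap_partition_type_in_dict(DataType::Utf8);
    assert_eq!(
        wrapped,
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
    );
}
```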
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -462,7 +462,7 @@ fn make_output_ordering_string(ordering: &[PhysicalSortExpr]) -> String {
w
}

/// Implements [`FormatReader`] for a parquet file
/// Implements [`FileOpener`] for a parquet file
struct ParquetOpener {
partition_index: usize,
projection: Arc<[usize]>,
@@ -263,7 +263,7 @@ impl SymmetricHashJoinExec {
/// # Error
/// This function errors when:
/// - It is not possible to join the left and right sides on keys `on`, or
/// - It fails to construct [SortedFilterExpr]s, or
/// - It fails to construct `SortedFilterExpr`s, or
/// - It fails to create the [ExprIntervalGraph].
pub fn try_new(
left: Arc<dyn ExecutionPlan>,
11 changes: 6 additions & 5 deletions datafusion/core/src/physical_plan/joins/utils.rs
@@ -735,14 +735,15 @@ pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool {
)
}

/// In the end of join execution, need to use bit map of the matched indices to generate the final left and
/// right indices.
/// At the end of join execution, we need to use the bit map of
/// matched indices to generate the final left and right indices.
///
/// For example:
/// left_bit_map: [true, false, true, true, false]
/// join_type: `Left`
///
/// The result is: ([1,4], [null, null])
/// 1. left_bit_map: `[true, false, true, true, false]`
/// 2. join_type: `Left`
///
/// The result is: `([1,4], [null, null])`
pub(crate) fn get_final_indices_from_bit_map(
left_bit_map: &BooleanBufferBuilder,
join_type: JoinType,
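A self-contained sketch of the bitmap bookkeeping described above for a `Left` join, using a plain `Vec<bool>` in place of arrow's `BooleanBufferBuilder`: the final pass emits only the still-unmatched left rows, each paired with a null (`None`) right index.

```rust
fn final_indices_for_left_join(left_bit_map: &[bool]) -> (Vec<u64>, Vec<Option<u64>>) {
    // Unmatched rows are the ones whose bit is still `false`.
    let left_indices: Vec<u64> = left_bit_map
        .iter()
        .enumerate()
        .filter(|(_, matched)| !**matched)
        .map(|(i, _)| i as u64)
        .collect();
    // Every unmatched left row pairs with a null right index.
    let right_indices = vec![None; left_indices.len()];
    (left_indices, right_indices)
}

fn main() {
    let bit_map = [true, false, true, true, false];
    let (left, right) = final_indices_for_left_join(&bit_map);
    assert_eq!(left, vec![1, 4]);
    assert_eq!(right, vec![None, None]);
}
```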
10 changes: 5 additions & 5 deletions datafusion/core/src/physical_plan/planner.rs
@@ -1292,16 +1292,16 @@ impl DefaultPhysicalPlanner {
}
}

/// Expand and align a GROUPING SET expression.
/// (see https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS)
/// Expand and align a GROUPING SET expression.
/// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
///
/// This will take a list of grouping sets and ensure that each group is
/// properly aligned for the physical execution plan. We do this by
/// identifying all unique expressions in each group and conforming each
/// group to the same set of expression types and ordering.
/// For example, if we have something like `GROUPING SETS ((a,b,c),(a),(b),(b,c))`
/// we would expand this to `GROUPING SETS ((a,b,c),(a,NULL,NULL),(NULL,b,NULL),(NULL,b,c))`
/// (see https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS)
/// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
fn merge_grouping_set_physical_expr(
grouping_sets: &[Vec<Expr>],
input_dfschema: &DFSchema,
@@ -1352,7 +1352,7 @@ fn merge_grouping_set_physical_expr(
}

/// Expand and align a CUBE expression. This is a special case of GROUPING SETS
/// (see https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS)
/// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
fn create_cube_physical_expr(
exprs: &[Expr],
input_dfschema: &DFSchema,
@@ -1399,7 +1399,7 @@ fn create_cube_physical_expr(
}

/// Expand and align a ROLLUP expression. This is a special case of GROUPING SETS
/// (see https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS)
/// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
fn create_rollup_physical_expr(
exprs: &[Expr],
input_dfschema: &DFSchema,
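A minimal sketch of the grouping-set alignment described above, using strings in place of `Expr`s and `None` in place of the injected NULL literal:

```rust
/// Conform every grouping set to the union of all grouping expressions.
fn align_grouping_sets<'a>(
    all_exprs: &[&'a str],
    sets: &[Vec<&'a str>],
) -> Vec<Vec<Option<&'a str>>> {
    sets.iter()
        .map(|set| {
            all_exprs
                .iter()
                .map(|e| if set.contains(e) { Some(*e) } else { None })
                .collect()
        })
        .collect()
}

fn main() {
    // GROUPING SETS ((a,b,c),(a),(b),(b,c))
    let aligned = align_grouping_sets(
        &["a", "b", "c"],
        &[vec!["a", "b", "c"], vec!["a"], vec!["b"], vec!["b", "c"]],
    );
    assert_eq!(aligned[1], vec![Some("a"), None, None]); // (a, NULL, NULL)
    assert_eq!(aligned[2], vec![None, Some("b"), None]); // (NULL, b, NULL)
}
```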
@@ -85,7 +85,7 @@ pub fn channels<T>(

/// Erroring during [send](DistributionSender::send).
///
/// This occurs when the [receiver](DistributedReceiver) is gone.
/// This occurs when the [receiver](DistributionReceiver) is gone.
#[derive(PartialEq, Eq)]
pub struct SendError<T>(pub T);

1 change: 1 addition & 0 deletions datafusion/core/src/scheduler/pipeline/mod.rs
@@ -70,6 +70,7 @@ pub mod repartition;
/// from the physical plan as a single [`Pipeline`]. Parallelized implementations
/// are also possible
///
/// [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan
pub trait Pipeline: Send + Sync + std::fmt::Debug {
/// Push a [`RecordBatch`] to the given input partition
fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()>;
10 changes: 6 additions & 4 deletions datafusion/core/src/scheduler/task.rs
@@ -128,7 +128,8 @@ impl Task {
}
}

/// Call [`Pipeline::poll_partition`], attempting to make progress on query execution
/// Call [`Pipeline::poll_partition`][super::pipeline::Pipeline::poll_partition],
/// attempting to make progress on query execution
pub fn do_work(self) {
assert!(is_worker(), "Task::do_work called outside of worker pool");
if self.context.is_cancelled() {
@@ -324,7 +325,9 @@ impl ExecutionContext {

struct TaskWaker {
/// Store a weak reference to the [`ExecutionContext`] to avoid reference cycles if this
/// [`Waker`] is stored within a [`Pipeline`] owned by the [`ExecutionContext`]
/// [`Waker`] is stored within a `Pipeline` owned by the [`ExecutionContext`]
///
/// [`Waker`]: std::task::Waker
context: Weak<ExecutionContext>,

/// A counter that stores the number of times this has been awoken
@@ -338,9 +341,8 @@ struct TaskWaker {
/// This ensures that a given [`Task`] is not enqueued multiple times
///
/// We store an integer, as opposed to a boolean, so that wake ups that
/// occur during [`Pipeline::poll_partition`] can be detected and handled
/// occur during `Pipeline::poll_partition` can be detected and handled
/// after it has finished executing
///
wake_count: AtomicUsize,

/// The index of the pipeline within `query` to poll
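The cycle-breaking pattern described above can be sketched in isolation with toy types (not the scheduler's real ones): because the waker holds only a `Weak` reference, dropping the context's last strong reference frees it even while wakers remain alive.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};

struct ExecutionContext {
    name: String,
}

struct TaskWaker {
    /// Weak reference to avoid a reference cycle with the context.
    context: Weak<ExecutionContext>,
    /// A counter (not a boolean) so wake-ups racing with a poll can be
    /// detected by comparing the count before and after the poll.
    wake_count: AtomicUsize,
}

impl TaskWaker {
    fn wake(&self) {
        self.wake_count.fetch_add(1, Ordering::SeqCst);
        if let Some(context) = self.context.upgrade() {
            println!("re-enqueue task on {}", context.name);
        } // if the context is gone, the wake-up is simply dropped
    }
}

fn main() {
    let context = Arc::new(ExecutionContext { name: "query-1".into() });
    let waker = TaskWaker {
        context: Arc::downgrade(&context),
        wake_count: AtomicUsize::new(0),
    };
    waker.wake();
    drop(context); // the waker does not keep the context alive
    waker.wake(); // upgrade() now fails; nothing is re-enqueued
    assert_eq!(waker.wake_count.load(Ordering::SeqCst), 2);
}
```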
4 changes: 2 additions & 2 deletions datafusion/execution/src/config.rs
@@ -94,7 +94,7 @@ impl SessionConfig {

/// Customize [`target_partitions`]
///
/// [`target_partitions`]: crate::config::ExecutionOptions::target_partitions
/// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
pub fn with_target_partitions(mut self, n: usize) -> Self {
// partition count must be greater than zero
assert!(n > 0);
@@ -104,7 +104,7 @@

/// Get [`target_partitions`]
///
/// [`target_partitions`]: crate::config::ExecutionOptions::target_partitions
/// [`target_partitions`]: datafusion_common::config::ExecutionOptions::target_partitions
pub fn target_partitions(&self) -> usize {
self.options.execution.target_partitions
}
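Usage of the builder and getter documented above, matching the API shown in this diff:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Request 8 output partitions for repartitioned operators.
    let config = SessionConfig::new().with_target_partitions(8);
    assert_eq!(config.target_partitions(), 8);
    let _ctx = SessionContext::with_config(config);
}
```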
3 changes: 2 additions & 1 deletion datafusion/execution/src/object_store.rs
@@ -117,7 +117,8 @@ impl std::fmt::Display for ObjectStoreUrl {
/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`]
/// lazily by providing a custom implementation of [`ObjectStoreRegistry`]
///
/// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
/// <!-- is in a different crate so normal rustdoc links don't work -->
/// [`ListingTableUrl`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTableUrl.html
pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
/// If a store with the same key existed before, it is replaced and returned
fn register_store(
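A toy illustration of the lazy, ad-hoc discovery behavior described in point 2. The `LazyRegistry` type and its `get_or_create` method are hypothetical, not DataFusion's actual trait; they only show the create-on-first-use shape a custom `ObjectStoreRegistry` might take:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

trait Store: Send + Sync {}

struct MemStore;
impl Store for MemStore {}

#[derive(Default)]
struct LazyRegistry {
    stores: Mutex<HashMap<String, Arc<dyn Store>>>,
}

impl LazyRegistry {
    /// Look up a store by URL scheme, creating it on first use.
    fn get_or_create(
        &self,
        scheme: &str,
        create: impl FnOnce() -> Arc<dyn Store>,
    ) -> Arc<dyn Store> {
        let mut stores = self.stores.lock().unwrap();
        stores.entry(scheme.to_string()).or_insert_with(create).clone()
    }
}

fn main() {
    let registry = LazyRegistry::default();
    let s3 = registry.get_or_create("s3", || Arc::new(MemStore) as Arc<dyn Store>);
    let again = registry.get_or_create("s3", || Arc::new(MemStore) as Arc<dyn Store>);
    assert!(Arc::ptr_eq(&s3, &again)); // created once, reused afterwards
}
```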