Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, ExecutionPlan, SendableRecordBatchStream, Statistics,
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
SendableRecordBatchStream, Statistics,
};
use datafusion::prelude::*;
use datafusion_expr::{Expr, LogicalPlanBuilder};
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{Debug, Formatter};
use std::fmt::{self, Debug, Formatter};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::time::timeout;
Expand Down Expand Up @@ -204,6 +205,12 @@ impl CustomExec {
}
}

impl DisplayAs for CustomExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
write!(f, "CustomExec")
}
}

impl ExecutionPlan for CustomExec {
fn as_any(&self) -> &dyn Any {
self
Expand Down
20 changes: 11 additions & 9 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ impl ArrowExec {
}
}

impl DisplayAs for ArrowExec {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "ArrowExec: ")?;
self.base_config.fmt_as(t, f)
}
}

impl ExecutionPlan for ArrowExec {
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -132,15 +143,6 @@ impl ExecutionPlan for ArrowExec {
Some(self.metrics.clone_inner())
}

fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "ArrowExec: ")?;
self.base_config.fmt_as(t, f)
}

fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}
Expand Down
20 changes: 11 additions & 9 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ impl AvroExec {
}
}

impl DisplayAs for AvroExec {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "AvroExec: ")?;
self.base_config.fmt_as(t, f)
}
}

impl ExecutionPlan for AvroExec {
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -141,15 +152,6 @@ impl ExecutionPlan for AvroExec {
Ok(Box::pin(stream))
}

fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "AvroExec: ")?;
self.base_config.fmt_as(t, f)
}

fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}
Expand Down
22 changes: 12 additions & 10 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ impl CsvExec {
}
}

impl DisplayAs for CsvExec {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "CsvExec: ")?;
self.base_config.fmt_as(t, f)?;
write!(f, ", has_header={}", self.has_header)
}
}

impl ExecutionPlan for CsvExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -172,16 +184,6 @@ impl ExecutionPlan for CsvExec {
Ok(Box::pin(stream) as SendableRecordBatchStream)
}

fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "CsvExec: ")?;
self.base_config.fmt_as(t, f)?;
write!(f, ", has_header={}", self.has_header)
}

fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}
Expand Down
20 changes: 11 additions & 9 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ impl NdJsonExec {
}
}

impl DisplayAs for NdJsonExec {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "JsonExec: ")?;
self.base_config.fmt_as(t, f)
}
}

impl ExecutionPlan for NdJsonExec {
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -150,15 +161,6 @@ impl ExecutionPlan for NdJsonExec {
Ok(Box::pin(stream) as SendableRecordBatchStream)
}

fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "JsonExec: ")?;
self.base_config.fmt_as(t, f)
}

fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}
Expand Down
54 changes: 28 additions & 26 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,34 @@ impl ParquetExec {
}
}

impl DisplayAs for ParquetExec {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let predicate_string = self
.predicate
.as_ref()
.map(|p| format!(", predicate={p}"))
.unwrap_or_default();

let pruning_predicate_string = self
.pruning_predicate
.as_ref()
.map(|pre| format!(", pruning_predicate={}", pre.predicate_expr()))
.unwrap_or_default();

write!(f, "ParquetExec: ")?;
self.base_config.fmt_as(t, f)?;
write!(f, "{}{}", predicate_string, pruning_predicate_string,)
}
}
}
}

impl ExecutionPlan for ParquetExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -413,32 +441,6 @@ impl ExecutionPlan for ParquetExec {
Ok(Box::pin(stream))
}

fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let predicate_string = self
.predicate
.as_ref()
.map(|p| format!(", predicate={p}"))
.unwrap_or_default();

let pruning_predicate_string = self
.pruning_predicate
.as_ref()
.map(|pre| format!(", pruning_predicate={}", pre.predicate_expr()))
.unwrap_or_default();

write!(f, "ParquetExec: ")?;
self.base_config.fmt_as(t, f)?;
write!(f, "{}{}", predicate_string, pruning_predicate_string,)
}
}
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
Expand Down
20 changes: 11 additions & 9 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ mod tests {
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::{displayable, DisplayFormatType, Statistics};
use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
use datafusion_physical_expr::PhysicalSortRequirement;

fn schema() -> SchemaRef {
Expand Down Expand Up @@ -1136,6 +1136,16 @@ mod tests {
}
}

impl DisplayAs for SortRequiredExec {
fn fmt_as(
&self,
_t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "SortRequiredExec")
}
}

impl ExecutionPlan for SortRequiredExec {
fn as_any(&self) -> &dyn std::any::Any {
self
Expand Down Expand Up @@ -1184,13 +1194,5 @@ mod tests {
fn statistics(&self) -> Statistics {
self.input.statistics()
}

fn fmt_as(
&self,
_t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "SortRequiredExec")
}
}
}
Loading