Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 49 additions & 25 deletions rust/benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use arrow::util::pretty;
use datafusion::error::Result;
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};

use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::{CsvFile, MemTable, TableProvider};
use datafusion::physical_plan::csv::CsvReadOptions;
use structopt::StructOpt;

Expand Down Expand Up @@ -59,6 +61,10 @@ struct TpchOpt {
/// File format: `csv` or `parquet`
#[structopt(short = "f", long = "format", default_value = "csv")]
file_format: String,

/// Load the data into a MemTable before executing the query
#[structopt(short = "m", long = "mem-table")]
mem_table: bool,
}

#[tokio::main]
Expand All @@ -73,31 +79,49 @@ async fn main() -> Result<()> {

let path = opt.path.to_str().unwrap();

match opt.file_format.as_str() {
// dbgen creates .tbl ('|' delimited) files
"tbl" => {
let path = format!("{}/lineitem.tbl", path);
let schema = lineitem_schema();
let options = CsvReadOptions::new()
.schema(&schema)
.delimiter(b'|')
.file_extension(".tbl");
ctx.register_csv("lineitem", &path, options)?
}
"csv" => {
let path = format!("{}/lineitem", path);
let schema = lineitem_schema();
let options = CsvReadOptions::new().schema(&schema).has_header(true);
ctx.register_csv("lineitem", &path, options)?
}
"parquet" => {
let path = format!("{}/lineitem", path);
ctx.register_parquet("lineitem", &path)?
}
other => {
println!("Invalid file format '{}'", other);
process::exit(-1);
}
let tableprovider: Box<dyn TableProvider + Send + Sync> =
match opt.file_format.as_str() {
// dbgen creates .tbl ('|' delimited) files
"tbl" => {
let path = format!("{}/lineitem.tbl", path);
let schema = lineitem_schema();
let options = CsvReadOptions::new()
.schema(&schema)
.delimiter(b'|')
.file_extension(".tbl");

Box::new(CsvFile::try_new(&path, options)?)
}
"csv" => {
let path = format!("{}/lineitem", path);
let schema = lineitem_schema();
let options = CsvReadOptions::new().schema(&schema).has_header(true);

Box::new(CsvFile::try_new(&path, options)?)
}
"parquet" => {
let path = format!("{}/lineitem", path);
Box::new(ParquetTable::try_new(&path)?)
}
other => {
println!("Invalid file format '{}'", other);
process::exit(-1);
}
};

if opt.mem_table {
println!("Loading data into memory");
let start = Instant::now();

let memtable = MemTable::load(tableprovider.as_ref(), opt.batch_size).await?;
println!(
"Loaded data into memory in {} ms",
start.elapsed().as_millis()
);

ctx.register_table("lineitem", Box::new(memtable));
} else {
ctx.register_table("lineitem", tableprovider);
}

let sql = match opt.query {
Expand Down
2 changes: 1 addition & 1 deletion rust/datafusion/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
let ctx_holder: Arc<Mutex<Vec<Arc<Mutex<ExecutionContext>>>>> =
Arc::new(Mutex::new(vec![]));
rt.block_on(async {
let mem_table = MemTable::load(&csv).await.unwrap();
let mem_table = MemTable::load(&csv, 16 * 1024).await.unwrap();

// create local execution context
let mut ctx = ExecutionContext::new();
Expand Down
4 changes: 2 additions & 2 deletions rust/datafusion/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ impl MemTable {
}

/// Create a mem table by reading from another data source
pub async fn load(t: &dyn TableProvider) -> Result<Self> {
pub async fn load(t: &dyn TableProvider, batch_size: usize) -> Result<Self> {
let schema = t.schema();
let exec = t.scan(&None, 1024 * 1024)?;
let exec = t.scan(&None, batch_size)?;

let mut data: Vec<Vec<RecordBatch>> =
Vec::with_capacity(exec.output_partitioning().partition_count());
Expand Down