diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index c042778265abb..d20de3106bd32 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -40,6 +40,7 @@ futures = "0.3" env_logger = "0.9" mimalloc = { version = "0.1", optional = true, default-features = false } snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] } +rand = "0.8.4" [dev-dependencies] ballista-core = { path = "../ballista/rust/core" } diff --git a/benchmarks/README.md b/benchmarks/README.md index a63761b6c2b3d..e6c17430d6e28 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -178,5 +178,20 @@ Query 'fare_amt_by_passenger' iteration 1 took 7599 ms Query 'fare_amt_by_passenger' iteration 2 took 7969 ms ``` +## Running the Ballista Loadtest + +```bash + cargo run --bin tpch -- loadtest ballista-load \ + --query-list 1,3,5,6,7,10,12,13 \ + --requests 200 \ + --concurrency 10 \ + --data-path /**** \ + --format parquet \ + --host localhost \ + --port 50050 \ + --sql-path /*** \ + --debug +``` + [1]: http://www.tpc.org/tpch/ [2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 71e68b6c4b75a..d9317fe38dd35 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -17,6 +17,9 @@ //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. 
+use futures::future::join_all; +use rand::prelude::*; +use std::ops::Div; use std::{ fs, iter::Iterator, @@ -137,6 +140,48 @@ struct DataFusionBenchmarkOpt { mem_table: bool, } +#[derive(Debug, StructOpt, Clone)] +struct BallistaLoadtestOpt { + #[structopt(short = "q", long)] + query_list: String, + + /// Activate debug mode to see query results + #[structopt(short, long)] + debug: bool, + + /// Number of requests + #[structopt(short = "r", long = "requests", default_value = "100")] + requests: usize, + + /// Number of connections + #[structopt(short = "c", long = "concurrency", default_value = "5")] + concurrency: usize, + + /// Number of partitions to process in parallel + #[structopt(short = "n", long = "partitions", default_value = "2")] + partitions: usize, + + /// Path to data files + #[structopt(parse(from_os_str), required = true, short = "p", long = "data-path")] + path: PathBuf, + + /// Path to sql files + #[structopt(parse(from_os_str), required = true, long = "sql-path")] + sql_path: PathBuf, + + /// File format: `csv` or `parquet` + #[structopt(short = "f", long = "format", default_value = "parquet")] + file_format: String, + + /// Ballista executor host + #[structopt(long = "host")] + host: Option<String>, + + /// Ballista executor port + #[structopt(long = "port")] + port: Option<u16>, +} + #[derive(Debug, StructOpt)] struct ConvertOpt { /// Path to csv files @@ -173,11 +218,19 @@ enum BenchmarkSubCommandOpt { DataFusionBenchmark(DataFusionBenchmarkOpt), } +#[derive(Debug, StructOpt)] +#[structopt(about = "loadtest command")] +enum LoadtestOpt { + #[structopt(name = "ballista-load")] + BallistaLoadtest(BallistaLoadtestOpt), +} + #[derive(Debug, StructOpt)] #[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")] enum TpchOpt { Benchmark(BenchmarkSubCommandOpt), Convert(ConvertOpt), + Loadtest(LoadtestOpt), } const TABLES: &[&str] = &[ @@ -187,6 +240,7 @@ const TABLES: &[&str] = &[ #[tokio::main] async fn main() -> Result<()> { use BenchmarkSubCommandOpt::*; + 
use LoadtestOpt::*; env_logger::init(); match TpchOpt::from_args() { @@ -197,6 +251,9 @@ async fn main() -> Result<()> { benchmark_datafusion(opt).await.map(|_| ()) } TpchOpt::Convert(opt) => convert_tbl(opt).await, + TpchOpt::Loadtest(BallistaLoadtest(opt)) => { + loadtest_ballista(opt).await.map(|_| ()) + } } } @@ -268,6 +325,151 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> { // register tables with Ballista context let path = opt.path.to_str().unwrap(); let file_format = opt.file_format.as_str(); + + register_tables(path, file_format, &ctx).await; + + let mut millis = vec![]; + + // run benchmark + let sql = get_query_sql(opt.query)?; + println!("Running benchmark with query {}:\n {}", opt.query, sql); + for i in 0..opt.iterations { + let start = Instant::now(); + let df = ctx + .sql(&sql) + .await + .map_err(|e| DataFusionError::Plan(format!("{:?}", e))) + .unwrap(); + let batches = df + .collect() + .await + .map_err(|e| DataFusionError::Plan(format!("{:?}", e))) + .unwrap(); + let elapsed = start.elapsed().as_secs_f64() * 1000.0; + millis.push(elapsed as f64); + println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed); + if opt.debug { + pretty::print_batches(&batches)?; + } + } + + let avg = millis.iter().sum::<f64>() / millis.len() as f64; + println!("Query {} avg time: {:.2} ms", opt.query, avg); + + Ok(()) +} + +async fn loadtest_ballista(opt: BallistaLoadtestOpt) -> Result<()> { + println!( + "Running loadtest_ballista with the following options: {:?}", + opt + ); + + let config = BallistaConfig::builder() + .set( + BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, + &format!("{}", opt.partitions), + ) + .build() + .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + + let concurrency = opt.concurrency; + let request_amount = opt.requests; + let mut clients = vec![]; + + for _num in 0..concurrency { + clients.push(BallistaContext::remote( + opt.host.clone().unwrap().as_str(), + opt.port.unwrap(), + &config, + )); + } + 
+ // register tables with Ballista context + let path = opt.path.to_str().unwrap(); + let file_format = opt.file_format.as_str(); + let sql_path = opt.sql_path.to_str().unwrap().to_string(); + + for ctx in &clients { + register_tables(path, file_format, ctx).await; + } + + let request_per_thread = request_amount.div(concurrency); + // run benchmark + let query_list: Vec<usize> = opt + .query_list + .split(',') + .map(|s| s.parse().unwrap()) + .collect(); + println!("query list: {:?} ", &query_list); + + let total = Instant::now(); + let mut futures = vec![]; + + for (client_id, client) in clients.into_iter().enumerate() { + let query_list_clone = query_list.clone(); + let sql_path_clone = sql_path.clone(); + let handle = tokio::spawn(async move { + for i in 0..request_per_thread { + let query_id = query_list_clone + .get( + (0..query_list_clone.len()) + .choose(&mut rand::thread_rng()) + .unwrap(), + ) + .unwrap(); + let sql = + get_query_sql_by_path(query_id.to_owned(), sql_path_clone.clone()) + .unwrap(); + println!( + "Client {} Round {} Query {} started", + &client_id, &i, query_id + ); + let start = Instant::now(); + let df = client + .sql(&sql) + .await + .map_err(|e| DataFusionError::Plan(format!("{:?}", e))) + .unwrap(); + let batches = df + .collect() + .await + .map_err(|e| DataFusionError::Plan(format!("{:?}", e))) + .unwrap(); + let elapsed = start.elapsed().as_secs_f64() * 1000.0; + println!( + "Client {} Round {} Query {} took {:.1} ms ", + &client_id, &i, query_id, elapsed + ); + if opt.debug { + pretty::print_batches(&batches).unwrap(); + } + } + }); + futures.push(handle); + } + join_all(futures).await; + let elapsed = total.elapsed().as_secs_f64() * 1000.0; + println!("###############################"); + println!("load test took {:.1} ms", elapsed); + Ok(()) +} + +fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> { + if sql_path.ends_with('/') { + sql_path.pop(); + } + if query > 0 && query < 23 { + let filename = 
format!("{}/q{}.sql", sql_path, query); + Ok(fs::read_to_string(&filename).expect("failed to read query")) + } else { + Err(DataFusionError::Plan( + "invalid query. Expected value between 1 and 22".to_owned(), + )) + } +} + +async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) { for table in TABLES { match file_format { // dbgen creates .tbl ('|' delimited) files without header @@ -281,7 +483,8 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> { .file_extension(".tbl"); ctx.register_csv(table, &path, options) .await - .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; + .map_err(|e| DataFusionError::Plan(format!("{:?}", e))) + .unwrap(); } "csv" => { let path = format!("{}/{}", path, table); @@ -289,47 +492,21 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> { let options = CsvReadOptions::new().schema(&schema).has_header(true); ctx.register_csv(table, &path, options) .await - .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; + .map_err(|e| DataFusionError::Plan(format!("{:?}", e))) + .unwrap(); } "parquet" => { let path = format!("{}/{}", path, table); ctx.register_parquet(table, &path) .await - .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; + .map_err(|e| DataFusionError::Plan(format!("{:?}", e))) + .unwrap(); } other => { unimplemented!("Invalid file format '{}'", other); } } } - - let mut millis = vec![]; - - // run benchmark - let sql = get_query_sql(opt.query)?; - println!("Running benchmark with query {}:\n {}", opt.query, sql); - for i in 0..opt.iterations { - let start = Instant::now(); - let df = ctx - .sql(&sql) - .await - .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; - let batches = df - .collect() - .await - .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; - let elapsed = start.elapsed().as_secs_f64() * 1000.0; - millis.push(elapsed as f64); - println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed); - if opt.debug { - 
pretty::print_batches(&batches)?; - } - } - - let avg = millis.iter().sum::<f64>() / millis.len() as f64; - println!("Query {} avg time: {:.2} ms", opt.query, avg); - - Ok(()) } fn get_query_sql(query: usize) -> Result<String> {