Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo
resolver = "2"

[workspace.dependencies]
arrow = { version = "53", features = ["ipc_compression"] }
arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
arrow = { version = "54", features = ["ipc_compression"] }
arrow-flight = { version = "54", features = ["flight-sql-experimental"] }
clap = { version = "4.5", features = ["derive", "cargo"] }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
datafusion = "44.0.0"
datafusion-cli = "44.0.0"
datafusion-proto = "44.0.0"
datafusion-proto-common = "44.0.0"
datafusion = "45.0.0"
datafusion-cli = "45.0.0"
datafusion-proto = "45.0.0"
datafusion-proto-common = "45.0.0"
object_store = "0.11"
prost = "0.13"
prost-types = "0.13"
Expand All @@ -45,15 +45,15 @@ ctor = { version = "0.2" }
mimalloc = { version = "0.1" }

tokio = { version = "1" }
uuid = { version = "1.10", features = ["v4", "v7"] }
rand = { version = "0.8" }
uuid = { version = "1.13", features = ["v4", "v7"] }
rand = { version = "0.9" }
env_logger = { version = "0.11" }
futures = { version = "0.3" }
log = { version = "0.4" }
parking_lot = { version = "0.12" }
tempfile = { version = "3" }
tempfile = { version = "3.16" }
dashmap = { version = "6.1" }
async-trait = { version = "0.1.4" }
async-trait = { version = "0.1" }
serde = { version = "1.0" }
tokio-stream = { version = "0.1" }
url = { version = "2.5" }
Expand Down
8 changes: 4 additions & 4 deletions ballista-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
[package]
name = "ballista-cli"
description = "Command Line Client for Ballista distributed query engine."
version = "44.0.0"
version = "45.0.0"
authors = ["Apache DataFusion <dev@datafusion.apache.org>"]
edition = "2021"
keywords = ["ballista", "cli"]
Expand All @@ -28,14 +28,14 @@ repository = "https://github.com/apache/arrow-ballista"
readme = "README.md"

[dependencies]
ballista = { path = "../ballista/client", version = "44.0.0", features = ["standalone"] }
ballista = { path = "../ballista/client", version = "45.0.0", features = ["standalone"] }
clap = { workspace = true, features = ["derive", "cargo"] }
datafusion = { workspace = true }
datafusion-cli = { workspace = true }
dirs = "5.0.1"
dirs = "6.0"
env_logger = { workspace = true }
mimalloc = { workspace = true }
rustyline = "14.0.0"
rustyline = "15.0.0"
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }

[features]
Expand Down
12 changes: 6 additions & 6 deletions ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name = "ballista"
description = "Ballista Distributed Compute"
license = "Apache-2.0"
version = "44.0.0"
version = "45.0.0"
homepage = "https://github.com/apache/arrow-ballista"
repository = "https://github.com/apache/arrow-ballista"
readme = "README.md"
Expand All @@ -28,18 +28,18 @@ edition = "2021"

[dependencies]
async-trait = { workspace = true }
ballista-core = { path = "../core", version = "44.0.0" }
ballista-executor = { path = "../executor", version = "44.0.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "44.0.0", optional = true }
ballista-core = { path = "../core", version = "45.0.0" }
ballista-executor = { path = "../executor", version = "45.0.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "45.0.0", optional = true }
datafusion = { workspace = true }
log = { workspace = true }

tokio = { workspace = true }
url = { workspace = true }

[dev-dependencies]
ballista-executor = { path = "../executor", version = "44.0.0" }
ballista-scheduler = { path = "../scheduler", version = "44.0.0" }
ballista-executor = { path = "../executor", version = "45.0.0" }
ballista-scheduler = { path = "../scheduler", version = "45.0.0" }
ctor = { workspace = true }
datafusion-proto = { workspace = true }
env_logger = { workspace = true }
Expand Down
99 changes: 99 additions & 0 deletions ballista/client/tests/context_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,103 @@ mod supported {

Ok(())
}

/// looks like `ctx.enable_url_table()` changes session context id.
#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
async fn should_execute_sql_show_with_url_table(
    #[future(awt)]
    #[case]
    ctx: SessionContext,
    test_data: String,
) {
    // Querying a bare file path requires the URL-table capability on the context.
    let ctx = ctx.enable_url_table();

    let query = format!("select string_col, timestamp_col from '{test_data}/alltypes_plain.parquet' where id > 4");
    let df = ctx.sql(&query).await.unwrap();
    let batches = df.collect().await.unwrap();

    let expected = [
        "+------------+---------------------+",
        "| string_col | timestamp_col |",
        "+------------+---------------------+",
        "| 31 | 2009-03-01T00:01:00 |",
        "| 30 | 2009-04-01T00:00:00 |",
        "| 31 | 2009-04-01T00:01:00 |",
        "+------------+---------------------+",
    ];

    assert_batches_eq!(expected, &batches);
}

#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
#[cfg(not(windows))] // test is failing at windows, can't debug it
async fn should_support_sql_insert_into(
    #[future(awt)]
    #[case]
    ctx: SessionContext,
    test_data: String,
) {
    // Register the sample parquet file as the source table.
    let source = format!("{test_data}/alltypes_plain.parquet");
    ctx.register_parquet("test", &source, Default::default())
        .await
        .unwrap();

    // Materialize a copy of the source rows into a temporary location.
    let tmp_dir = tempfile::tempdir().expect("temporary directory to be created");
    let tmp_path = tmp_dir
        .path()
        .to_str()
        .expect("path to be converted to str");

    let df = ctx.sql("select * from test").await.unwrap();
    df.write_parquet(tmp_path, Default::default(), Default::default())
        .await
        .unwrap();

    ctx.register_parquet("written_table", tmp_path, Default::default())
        .await
        .unwrap();

    // DML: append the source rows a second time into the written table.
    let insert = ctx
        .sql("INSERT INTO written_table select * from test")
        .await
        .unwrap();
    insert.show().await.unwrap();

    // Each id > 4 now appears twice (original copy + inserted copy).
    let batches = ctx
        .sql("select id, string_col, timestamp_col from written_table where id > 4 order by id")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    let expected = [
        "+----+------------+---------------------+",
        "| id | string_col | timestamp_col |",
        "+----+------------+---------------------+",
        "| 5 | 31 | 2009-03-01T00:01:00 |",
        "| 5 | 31 | 2009-03-01T00:01:00 |",
        "| 6 | 30 | 2009-04-01T00:00:00 |",
        "| 6 | 30 | 2009-04-01T00:00:00 |",
        "| 7 | 31 | 2009-04-01T00:01:00 |",
        "| 7 | 31 | 2009-04-01T00:01:00 |",
        "+----+------------+---------------------+",
    ];

    assert_batches_eq!(expected, &batches);
}
}
106 changes: 0 additions & 106 deletions ballista/client/tests/context_unsupported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,112 +144,6 @@ mod unsupported {
"+----+----------+---------------------+",
];

assert_batches_eq!(expected, &result);
}
#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
#[should_panic]
// "Error: Internal(failed to serialize logical plan: Internal(LogicalPlan serde is not yet implemented for Dml))"
async fn should_support_sql_insert_into(
    #[future(awt)]
    #[case]
    ctx: SessionContext,
    test_data: String,
) {
    // Register the sample parquet file as the source table.
    let source = format!("{test_data}/alltypes_plain.parquet");
    ctx.register_parquet("test", &source, Default::default())
        .await
        .unwrap();

    let tmp_dir = tempfile::tempdir().expect("temporary directory to be created");
    let tmp_path = tmp_dir
        .path()
        .to_str()
        .expect("path to be converted to str");

    // Copy the source rows into a fresh parquet location.
    let df = ctx.sql("select * from test").await.unwrap();
    df.write_parquet(tmp_path, Default::default(), Default::default())
        .await
        .unwrap();

    ctx.register_parquet("written_table", tmp_path, Default::default())
        .await
        .unwrap();

    // The DML statement is expected to panic (see the quoted serde error above).
    let insert = ctx
        .sql("INSERT INTO written_table select * from written_table")
        .await
        .unwrap();
    let _ = insert.collect().await.unwrap();

    let batches = ctx
        .sql("select id, string_col, timestamp_col from written_table where id > 4 order by id")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    let expected = [
        "+----+------------+---------------------+",
        "| id | string_col | timestamp_col |",
        "+----+------------+---------------------+",
        "| 5 | 31 | 2009-03-01T00:01:00 |",
        "| 5 | 31 | 2009-03-01T00:01:00 |",
        "| 6 | 30 | 2009-04-01T00:00:00 |",
        "| 6 | 30 | 2009-04-01T00:00:00 |",
        "| 7 | 31 | 2009-04-01T00:01:00 |",
        "| 7 | 31 | 2009-04-01T00:01:00 |",
        "+----+------------+---------------------+",
    ];

    assert_batches_eq!(expected, &batches);
}

/// looks like `ctx.enable_url_table()` changes session context id.
///
/// Error returned:
/// ```
/// Failed to load SessionContext for session ID b5530099-63d1-43b1-9e11-87ac83bb33e5:
/// General error: No session for b5530099-63d1-43b1-9e11-87ac83bb33e5 found
/// ```
#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
#[should_panic]
async fn should_execute_sql_show_with_url_table(
    #[future(awt)]
    #[case]
    ctx: SessionContext,
    test_data: String,
) {
    // Enabling URL tables replaces the context (and, per the note above,
    // its session id), which is what makes this case panic.
    let ctx = ctx.enable_url_table();

    let query = format!("select string_col, timestamp_col from '{test_data}/alltypes_plain.parquet' where id > 4");
    let df = ctx.sql(&query).await.unwrap();
    let batches = df.collect().await.unwrap();

    let expected = [
        "+------------+---------------------+",
        "| string_col | timestamp_col |",
        "+------------+---------------------+",
        "| 31 | 2009-03-01T00:01:00 |",
        "| 30 | 2009-04-01T00:00:00 |",
        "| 31 | 2009-04-01T00:01:00 |",
        "+------------+---------------------+",
    ];

    assert_batches_eq!(expected, &batches);
}
}
2 changes: 1 addition & 1 deletion ballista/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name = "ballista-core"
description = "Ballista Distributed Compute"
license = "Apache-2.0"
version = "44.0.0"
version = "45.0.0"
homepage = "https://github.com/apache/arrow-ballista"
repository = "https://github.com/apache/arrow-ballista"
readme = "README.md"
Expand Down
15 changes: 15 additions & 0 deletions ballista/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ pub const BALLISTA_STANDALONE_PARALLELISM: &str = "ballista.standalone.paralleli
/// max message size for gRPC clients
pub const BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE: &str =
"ballista.grpc_client_max_message_size";
/// Enable or disable the Ballista DML planner extension.
/// When enabled, the planner uses a custom logical-plan DML
/// extension that serializes the table provider referenced by
/// the DML statement.
///
/// This configuration should be disabled when using remote schema
/// registries.
pub const BALLISTA_PLANNER_DML_EXTENSION: &str = "ballista.planner.dml_extension";

pub type ParseResult<T> = result::Result<T, String>;
use std::sync::LazyLock;
Expand All @@ -48,6 +55,10 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
"Configuration for max message size in gRPC clients".to_string(),
DataType::UInt64,
Some((16 * 1024 * 1024).to_string())),
ConfigEntry::new(BALLISTA_PLANNER_DML_EXTENSION.to_string(),
"Enable ballista planner DML extension".to_string(),
DataType::Boolean,
Some((true).to_string())),
];
entries
.into_iter()
Expand Down Expand Up @@ -165,6 +176,10 @@ impl BallistaConfig {
self.get_usize_setting(BALLISTA_STANDALONE_PARALLELISM)
}

/// Returns whether the Ballista DML planner extension is enabled
/// (the `ballista.planner.dml_extension` setting; defaults to `true`).
pub fn planner_dml_extension(&self) -> bool {
    self.get_bool_setting(BALLISTA_PLANNER_DML_EXTENSION)
}

fn get_usize_setting(&self, key: &str) -> usize {
if let Some(v) = self.settings.get(key) {
// infallible because we validate all configs in the constructor
Expand Down
4 changes: 2 additions & 2 deletions ballista/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use itertools::Itertools;
use log::{error, info};
use rand::prelude::SliceRandom;
use rand::thread_rng;
use rand::rng;
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;

Expand Down Expand Up @@ -163,7 +163,7 @@ impl ExecutionPlan for ShuffleReaderExec {
.map(|(_, p)| p)
.collect();
// Shuffle partitions for evenly send fetching partition requests to avoid hot executors within multiple tasks
partition_locations.shuffle(&mut thread_rng());
partition_locations.shuffle(&mut rng());

let response_receiver =
send_fetch_partitions(partition_locations, max_request_num);
Expand Down
Loading
Loading