196 changes: 191 additions & 5 deletions src/query/mod.rs
@@ -21,26 +21,38 @@ mod listing_table_builder;
pub mod stream_schema_provider;

use actix_web::Either;
use arrow_schema::SchemaRef;
use chrono::NaiveDateTime;
use chrono::{DateTime, Duration, Utc};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::tree_node::Transformed;
use datafusion::execution::disk_manager::DiskManager;
-use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder};
+use datafusion::execution::{
+    RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder,
+};
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::{
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
ExecutionPlan, ExecutionPlanProperties, collect_partitioned, execute_stream_partitioned,
};
use datafusion::prelude::*;
use datafusion::sql::parser::DFParser;
use datafusion::sql::resolve::resolve_table_references;
use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
use futures::Stream;
use futures::stream::select_all;
use itertools::Itertools;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::ops::Bound;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use sysinfo::System;
use tokio::runtime::Runtime;

@@ -55,6 +67,7 @@ use crate::catalog::manifest::Manifest;
use crate::catalog::snapshot::Snapshot;
use crate::event::DEFAULT_TIMESTAMP_KEY;
use crate::handlers::http::query::QueryError;
use crate::metrics::increment_bytes_scanned_in_query_by_date;
use crate::option::Mode;
use crate::parseable::PARSEABLE;
use crate::storage::{ObjectStorageProvider, ObjectStoreFormat};
@@ -77,7 +90,27 @@ pub async fn execute(
is_streaming: bool,
) -> Result<
(
-        Either<Vec<RecordBatch>, SendableRecordBatchStream>,
+        Either<
+            Vec<RecordBatch>,
+            Pin<
+                Box<
+                    RecordBatchStreamAdapter<
+                        select_all::SelectAll<
+                            Pin<
+                                Box<
+                                    dyn RecordBatchStream<
+                                            Item = Result<
+                                                RecordBatch,
+                                                datafusion::error::DataFusionError,
+                                            >,
+                                        > + Send,
+                                >,
+                            >,
+                        >,
+                    >,
+                >,
+            >,
+        >,
Vec<String>,
),
ExecuteError,
@@ -178,7 +211,27 @@ impl Query {
is_streaming: bool,
) -> Result<
(
-        Either<Vec<RecordBatch>, SendableRecordBatchStream>,
+        Either<
+            Vec<RecordBatch>,
+            Pin<
+                Box<
+                    RecordBatchStreamAdapter<
+                        select_all::SelectAll<
+                            Pin<
+                                Box<
+                                    dyn RecordBatchStream<
+                                            Item = Result<
+                                                RecordBatch,
+                                                datafusion::error::DataFusionError,
+                                            >,
+                                        > + Send,
+                                >,
+                            >,
+                        >,
+                    >,
+                >,
+            >,
+        >,
Vec<String>,
),
ExecuteError,
@@ -199,10 +252,49 @@ impl Query {
return Ok((Either::Left(vec![]), fields));
}

+        let plan = QUERY_SESSION
+            .state()
+            .create_physical_plan(df.logical_plan())
+            .await?;
+
        let results = if !is_streaming {
-            Either::Left(df.collect().await?)
+            let task_ctx = QUERY_SESSION.task_ctx();
+
+            let batches = collect_partitioned(plan.clone(), task_ctx.clone())
+                .await?
+                .into_iter()
+                .flatten()
+                .collect();
+
+            let actual_io_bytes = get_total_bytes_scanned(&plan);
+
+            // Track billing metrics for query scan
+            let current_date = chrono::Utc::now().date_naive().to_string();
+            increment_bytes_scanned_in_query_by_date(actual_io_bytes, &current_date);
+
+            Either::Left(batches)
        } else {
-            Either::Right(df.execute_stream().await?)
+            let task_ctx = QUERY_SESSION.task_ctx();
+
+            let output_partitions = plan.output_partitioning().partition_count();
+
+            let monitor_state = Arc::new(MonitorState {
+                plan: plan.clone(),
+                active_streams: AtomicUsize::new(output_partitions),
+            });
+
+            let streams = execute_stream_partitioned(plan.clone(), task_ctx.clone())?
+                .into_iter()
+                .map(|s| {
+                    let wrapped = PartitionedMetricMonitor::new(s, monitor_state.clone());
+                    Box::pin(wrapped) as SendableRecordBatchStream
+                })
+                .collect_vec();
+
+            let merged_stream = futures::stream::select_all(streams);
+
+            let final_stream = RecordBatchStreamAdapter::new(plan.schema(), merged_stream);
+            Either::Right(Box::pin(final_stream))
        };

Ok((results, fields))
@@ -293,6 +385,24 @@ impl Query {
}
}

/// Recursively sums up "bytes_scanned" from all nodes in the plan
fn get_total_bytes_scanned(plan: &Arc<dyn ExecutionPlan>) -> u64 {
let mut total_bytes = 0;

if let Some(metrics) = plan.metrics() {
// "bytes_scanned" is the standard key used by ParquetExec
if let Some(scanned) = metrics.sum_by_name("bytes_scanned") {
total_bytes += scanned.as_usize() as u64;
}
}

for child in plan.children() {
total_bytes += get_total_bytes_scanned(child);
}

total_bytes
}

/// Record of counts for a given time bin.
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct CountsRecord {
@@ -741,6 +851,82 @@ pub mod error {
}
}

/// Shared state across all partitions
struct MonitorState {
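// Keeps the physical plan alive so its metrics can still be read after every partition stream has finished.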
plan: Arc<dyn ExecutionPlan>,
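// Number of partition streams that have not yet completed.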
active_streams: AtomicUsize,
}

/// A wrapper that monitors the ExecutionPlan and logs metrics when the stream finishes.
pub struct PartitionedMetricMonitor {
// The actual stream doing the work
inner: SendableRecordBatchStream,
/// Shared monitor state used to detect when the last partition stream finishes
state: Arc<MonitorState>,
// Ensure we only emit metrics once even if polled after completion/error
is_finished: bool,
}

impl PartitionedMetricMonitor {
fn new(inner: SendableRecordBatchStream, state: Arc<MonitorState>) -> Self {
Self {
inner,
state,
is_finished: false,
}
}
}

impl Stream for PartitionedMetricMonitor {
type Item = datafusion::error::Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
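// Behave like a fused stream: once finished (or errored), keep returning Ready(None) instead of polling the inner stream again.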
if self.is_finished {
return Poll::Ready(None);
}

let poll = self.inner.as_mut().poll_next(cx);

// Check if the stream just finished
match &poll {
Poll::Ready(None) => {
self.is_finished = true;
self.check_if_last_stream();
}
Poll::Ready(Some(Err(e))) => {
tracing::error!("Stream failed with error: {}", e);
self.is_finished = true;
self.check_if_last_stream();
}
_ => {}
}

poll
}

fn size_hint(&self) -> (usize, Option<usize>) {
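// The number of remaining record batches is unknown, so report the loosest possible bounds.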
(0, None)
}
}

impl RecordBatchStream for PartitionedMetricMonitor {
fn schema(&self) -> SchemaRef {
self.inner.schema()
}
}

impl PartitionedMetricMonitor {
fn check_if_last_stream(&self) {
let prev_count = self.state.active_streams.fetch_sub(1, Ordering::SeqCst);
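// fetch_sub returns the previous value; a previous count of 1 means this was the last active stream, so the metrics below are emitted exactly once.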

if prev_count == 1 {
let bytes = get_total_bytes_scanned(&self.state.plan);
let current_date = chrono::Utc::now().date_naive().to_string();
increment_bytes_scanned_in_query_by_date(bytes, &current_date);
}
}
}

#[cfg(test)]
mod tests {
use serde_json::json;
11 changes: 1 addition & 10 deletions src/query/stream_schema_provider.rs
@@ -56,10 +56,7 @@ use crate::{
},
event::DEFAULT_TIMESTAMP_KEY,
hottier::HotTierManager,
-    metrics::{
-        QUERY_CACHE_HIT, increment_bytes_scanned_in_query_by_date,
-        increment_files_scanned_in_query_by_date,
-    },
+    metrics::{QUERY_CACHE_HIT, increment_files_scanned_in_query_by_date},
option::Mode,
parseable::{PARSEABLE, STREAM_EXISTS},
storage::{ObjectStorage, ObjectStoreFormat},
@@ -349,7 +346,6 @@ impl StandardTableProvider {
let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
let mut column_statistics = HashMap::<String, Option<TypedStatistics>>::new();
let mut count = 0;
-        let mut total_compressed_size = 0u64;
let mut file_count = 0u64;
for (index, file) in manifest_files
.into_iter()
@@ -366,9 +362,6 @@

// Track billing metrics for files scanned in query
file_count += 1;
-            // Calculate actual compressed bytes that will be read from storage
-            let compressed_bytes: u64 = columns.iter().map(|col| col.compressed_size).sum();
-            total_compressed_size += compressed_bytes;

// object_store::path::Path doesn't automatically deal with Windows path separators
// to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem
@@ -429,8 +422,6 @@
// Track billing metrics for query scan
let current_date = chrono::Utc::now().date_naive().to_string();
increment_files_scanned_in_query_by_date(file_count, &current_date);
-        // Use compressed size as it represents actual bytes read from storage (S3/object store charges)
-        increment_bytes_scanned_in_query_by_date(total_compressed_size, &current_date);

(partitioned_files, statistics)
}