From 27676a06db3a5d717b91b6a1361f032faafe70e0 Mon Sep 17 00:00:00 2001 From: "westhide.dev" Date: Fri, 21 Mar 2025 00:07:30 +0800 Subject: [PATCH] feat: Support serde for FileScanConfig `batch_size` --- datafusion/datasource/src/file_scan_config.rs | 15 ++++++++++++- datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/generated/pbjson.rs | 22 +++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 2 ++ .../proto/src/physical_plan/from_proto.rs | 3 ++- .../proto/src/physical_plan/to_proto.rs | 1 + 6 files changed, 42 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 91b5f01577392..82308bda70124 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -167,6 +167,9 @@ pub struct FileScanConfig { pub new_lines_in_values: bool, /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc. pub file_source: Arc, + /// Batch size while creating new batches + /// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size. + pub batch_size: Option, } impl DataSource for FileScanConfig { @@ -176,10 +179,13 @@ impl DataSource for FileScanConfig { context: Arc, ) -> Result { let object_store = context.runtime_env().object_store(&self.object_store_url)?; + let batch_size = self + .batch_size + .unwrap_or_else(|| context.session_config().batch_size()); let source = self .file_source - .with_batch_size(context.session_config().batch_size()) + .with_batch_size(batch_size) .with_schema(Arc::clone(&self.file_schema)) .with_projection(self); @@ -338,6 +344,7 @@ impl FileScanConfig { file_compression_type: FileCompressionType::UNCOMPRESSED, new_lines_in_values: false, file_source: Arc::clone(&file_source), + batch_size: None, }; config = config.with_source(Arc::clone(&file_source)); @@ -492,6 +499,12 @@ impl FileScanConfig { self } + /// Set the batch_size property + pub fn with_batch_size(mut self, batch_size: Option) -> Self { + self.batch_size = batch_size; + self + } + /// Specifies whether newlines in (quoted) values are supported. /// /// Parsing newlines in quoted values may be affected by execution behaviour such as diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 7c34afe7ff5f0..2e028eb291181 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -997,6 +997,7 @@ message FileScanExecConf { reserved 10; datafusion_common.Constraints constraints = 11; + optional uint64 batch_size = 12; } message ParquetScanExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 75e8ef55b7df0..6166b6ec47961 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -5780,6 +5780,9 @@ impl serde::Serialize for FileScanExecConf { if self.constraints.is_some() { len += 1; } + if self.batch_size.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FileScanExecConf", len)?; if !self.file_groups.is_empty() { struct_ser.serialize_field("fileGroups", &self.file_groups)?; @@ -5808,6 +5811,11 @@ impl serde::Serialize for FileScanExecConf { if let Some(v) = self.constraints.as_ref() { struct_ser.serialize_field("constraints", v)?; } + if let Some(v) = self.batch_size.as_ref() { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("batchSize", ToString::to_string(&v).as_str())?; + } struct_ser.end() } } @@ -5831,6 +5839,8 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "output_ordering", "outputOrdering", "constraints", + "batch_size", + "batchSize", ]; #[allow(clippy::enum_variant_names)] @@ -5844,6 +5854,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { ObjectStoreUrl, OutputOrdering, Constraints, + BatchSize, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -5874,6 +5885,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "objectStoreUrl" | "object_store_url" => Ok(GeneratedField::ObjectStoreUrl), "outputOrdering" | "output_ordering" => Ok(GeneratedField::OutputOrdering), "constraints" => Ok(GeneratedField::Constraints), + "batchSize" | "batch_size" => Ok(GeneratedField::BatchSize), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -5902,6 +5914,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { let mut object_store_url__ = None; let mut output_ordering__ = None; let mut constraints__ = None; + let mut batch_size__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::FileGroups => { @@ -5961,6 +5974,14 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { } constraints__ = map_.next_value()?; } + GeneratedField::BatchSize => { + if batch_size__.is_some() { + return Err(serde::de::Error::duplicate_field("batchSize")); + } + batch_size__ = + map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) + ; + } } } Ok(FileScanExecConf { @@ -5973,6 +5994,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { object_store_url: object_store_url__.unwrap_or_default(), output_ordering: output_ordering__.unwrap_or_default(), constraints: constraints__, + batch_size: batch_size__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 81c821c0d2d18..d2165dad48501 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1517,6 +1517,8 @@ pub struct FileScanExecConf { pub output_ordering: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "11")] pub constraints: ::core::option::Option, + #[prost(uint64, optional, tag = "12")] + pub batch_size: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetScanExecNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 0bf9fdb63d593..a417eccee1cdb 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -544,7 +544,8 @@ pub fn parse_protobuf_file_scan_config( .with_projection(projection) .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_table_partition_cols(table_partition_cols) - .with_output_ordering(output_ordering); + .with_output_ordering(output_ordering) + .with_batch_size(proto.batch_size.map(|s| s as usize)); Ok(config) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 3f67842fe625c..c2cf506eb96de 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -530,6 +530,7 @@ pub fn serialize_file_scan_config( }) .collect::>(), constraints: Some(conf.constraints.clone().into()), + batch_size: conf.batch_size.map(|s| s as u64), }) }