From 327f8bceae270cceffeb843d271f053b8570ad5e Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 14 Aug 2025 09:26:31 -0400 Subject: [PATCH] FFI_RecordBatchStream was causing a memory leak --- datafusion/ffi/src/record_batch_stream.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/datafusion/ffi/src/record_batch_stream.rs b/datafusion/ffi/src/record_batch_stream.rs index 78d65a816fcc2..6c2282df88dd0 100644 --- a/datafusion/ffi/src/record_batch_stream.rs +++ b/datafusion/ffi/src/record_batch_stream.rs @@ -57,6 +57,9 @@ pub struct FFI_RecordBatchStream { /// Return the schema of the record batch pub schema: unsafe extern "C" fn(stream: &Self) -> WrappedSchema, + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + /// Internal data. This is only to be accessed by the provider of the plan. /// The foreign library should never attempt to access this data. pub private_data: *mut c_void, @@ -82,6 +85,7 @@ impl FFI_RecordBatchStream { FFI_RecordBatchStream { poll_next: poll_next_fn_wrapper, schema: schema_fn_wrapper, + release: release_fn_wrapper, private_data, } } @@ -96,6 +100,12 @@ unsafe extern "C" fn schema_fn_wrapper(stream: &FFI_RecordBatchStream) -> Wrappe (*stream).schema().into() } +unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_RecordBatchStream) { + let private_data = + Box::from_raw(provider.private_data as *mut RecordBatchStreamPrivateData); + drop(private_data); +} + fn record_batch_to_wrapped_array( record_batch: RecordBatch, ) -> RResult { @@ -197,6 +207,12 @@ impl Stream for FFI_RecordBatchStream { } } +impl Drop for FFI_RecordBatchStream { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + #[cfg(test)] mod tests { use std::sync::Arc;