From a66fa4f25e7f762d7de53e43a0ebd3106271e5d6 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Thu, 9 Oct 2025 15:51:23 +0300 Subject: [PATCH 1/6] feat: Implement FFI_TableProviderFactory --- datafusion/ffi/src/lib.rs | 1 + datafusion/ffi/src/table_provider_factory.rs | 404 +++++++++++++++++++ 2 files changed, 405 insertions(+) create mode 100644 datafusion/ffi/src/table_provider_factory.rs diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 2ca9b8f6f495a..5eb3626db1e7b 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -39,6 +39,7 @@ pub mod record_batch_stream; pub mod schema_provider; pub mod session; pub mod table_provider; +pub mod table_provider_factory; pub mod table_source; pub mod udaf; pub mod udf; diff --git a/datafusion/ffi/src/table_provider_factory.rs b/datafusion/ffi/src/table_provider_factory.rs new file mode 100644 index 0000000000000..de3eb220d3cea --- /dev/null +++ b/datafusion/ffi/src/table_provider_factory.rs @@ -0,0 +1,404 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{ffi::c_void, sync::Arc}; + +use abi_stable::{ + std_types::{RResult, RString, RVec}, + StableAbi, +}; +use async_ffi::{FfiFuture, FutureExt}; +use async_trait::async_trait; +use datafusion::{ + catalog::{Session, TableProvider, TableProviderFactory}, + error::DataFusionError, + execution::session_state::SessionStateBuilder, + logical_expr::CreateExternalTable, + prelude::SessionContext, +}; +use datafusion_common::TableReference; +use datafusion_proto::{ + logical_plan::{from_proto, to_proto, DefaultLogicalExtensionCodec}, + protobuf::{CreateExternalTableNode, SortExprNodeCollection}, +}; +use prost::Message; +use std::collections::HashMap; +use tokio::runtime::Handle; + +use crate::{ + df_result, rresult_return, + session_config::FFI_SessionConfig, + table_provider::{FFI_TableProvider, ForeignTableProvider}, +}; + +use super::session_config::ForeignSessionConfig; +use datafusion::error::Result; + +/// A stable struct for sharing [`TableProviderFactory`] across FFI boundaries. +/// +/// Similar to [`FFI_TableProvider`], this struct uses the FFI-safe pattern where: +/// - The `FFI_*` struct exposes stable function pointers +/// - Private data is stored as an opaque pointer +/// - The `Foreign*` wrapper is used by consumers on the other side of the FFI boundary +/// +/// [`FFI_TableProvider`]: crate::table_provider::FFI_TableProvider +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_TableProviderFactory { + /// Create a TableProvider with the given command. + /// + /// # Arguments + /// + /// * `factory` - the table provider factory + /// * `session_config` - session configuration + /// * `cmd_serialized` - a [`CreateExternalTableNode`] protobuf message serialized into bytes + /// to pass across the FFI boundary. + pub create: unsafe extern "C" fn( + factory: &Self, + session_config: &FFI_SessionConfig, + cmd_serialized: RVec, + ) + -> FfiFuture>, + + /// Used to create a clone of the factory. 
This should only need to be called + /// by the receiver of the factory. + pub clone: unsafe extern "C" fn(factory: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(factory: &mut Self), + + /// Return the major DataFusion version number of this factory. + pub version: unsafe extern "C" fn() -> u64, + + /// Internal data. This is only to be accessed by the provider of the factory. + /// A [`ForeignTableProviderFactory`] should never attempt to access this data. + pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_TableProviderFactory {} +unsafe impl Sync for FFI_TableProviderFactory {} + +struct FactoryPrivateData { + factory: Arc, + runtime: Option, +} + +unsafe extern "C" fn create_fn_wrapper( + factory: &FFI_TableProviderFactory, + session_config: &FFI_SessionConfig, + cmd_serialized: RVec, +) -> FfiFuture> { + let private_data = factory.private_data as *mut FactoryPrivateData; + let internal_factory = &(*private_data).factory; + let session_config = session_config.clone(); + let runtime = &(*private_data).runtime; + + async move { + let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); + let session = SessionStateBuilder::new() + .with_default_features() + .with_config(config.0) + .build(); + let ctx = SessionContext::new_with_state(session); + + let codec = DefaultLogicalExtensionCodec {}; + + let proto_cmd = + rresult_return!(CreateExternalTableNode::decode(cmd_serialized.as_ref())); + + // Deserialize CreateExternalTable from protobuf + let pb_schema = rresult_return!(proto_cmd.schema.ok_or_else(|| { + DataFusionError::Internal( + "CreateExternalTableNode was missing required field schema".to_string(), + ) + })); + + let constraints = rresult_return!(proto_cmd.constraints.ok_or_else(|| { + DataFusionError::Internal( + "CreateExternalTableNode was missing required field constraints" + .to_string(), + ) + })); + + let definition = if 
!proto_cmd.definition.is_empty() { + Some(proto_cmd.definition) + } else { + None + }; + + let mut order_exprs = vec![]; + for expr in &proto_cmd.order_exprs { + let sorts = rresult_return!(from_proto::parse_sorts( + &expr.sort_expr_nodes, + &ctx, + &codec + )); + order_exprs.push(sorts); + } + + let mut column_defaults = HashMap::with_capacity(proto_cmd.column_defaults.len()); + for (col_name, expr) in &proto_cmd.column_defaults { + let expr = rresult_return!(from_proto::parse_expr(expr, &ctx, &codec)); + column_defaults.insert(col_name.clone(), expr); + } + + let table_ref = rresult_return!(proto_cmd.name.as_ref().ok_or_else(|| { + DataFusionError::Internal( + "CreateExternalTableNode was missing required field name".to_string(), + ) + })); + + let name: TableReference = rresult_return!(table_ref.clone().try_into()); + + let cmd = CreateExternalTable { + schema: rresult_return!(pb_schema.try_into()), + name, + location: proto_cmd.location, + file_type: proto_cmd.file_type, + table_partition_cols: proto_cmd.table_partition_cols, + order_exprs, + if_not_exists: proto_cmd.if_not_exists, + or_replace: proto_cmd.or_replace, + temporary: proto_cmd.temporary, + definition, + unbounded: proto_cmd.unbounded, + options: proto_cmd.options, + constraints: constraints.into(), + column_defaults, + }; + + let provider = rresult_return!(internal_factory.create(&ctx.state(), &cmd).await); + + RResult::ROk(FFI_TableProvider::new(provider, true, runtime.clone())) + } + .into_ffi() +} + +unsafe extern "C" fn release_fn_wrapper(factory: &mut FFI_TableProviderFactory) { + let private_data = Box::from_raw(factory.private_data as *mut FactoryPrivateData); + drop(private_data); +} + +unsafe extern "C" fn clone_fn_wrapper( + factory: &FFI_TableProviderFactory, +) -> FFI_TableProviderFactory { + let old_private_data = factory.private_data as *const FactoryPrivateData; + let runtime = (*old_private_data).runtime.clone(); + + let private_data = Box::into_raw(Box::new(FactoryPrivateData { + 
factory: Arc::clone(&(*old_private_data).factory), + runtime, + })) as *mut c_void; + + FFI_TableProviderFactory { + create: create_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + } +} + +impl Drop for FFI_TableProviderFactory { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl FFI_TableProviderFactory { + /// Creates a new [`FFI_TableProviderFactory`]. + pub fn new(factory: Arc, runtime: Option) -> Self { + let private_data = Box::new(FactoryPrivateData { factory, runtime }); + + Self { + create: create_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data: Box::into_raw(private_data) as *mut c_void, + } + } +} + +/// This wrapper struct exists on the receiver side of the FFI interface, so it has +/// no guarantees about being able to access the data in `private_data`. Any functions +/// defined on this struct must only use the stable functions provided in +/// FFI_TableProviderFactory to interact with the foreign table provider factory. 
+#[derive(Debug)] +pub struct ForeignTableProviderFactory(pub FFI_TableProviderFactory); + +unsafe impl Send for ForeignTableProviderFactory {} +unsafe impl Sync for ForeignTableProviderFactory {} + +impl From<&FFI_TableProviderFactory> for ForeignTableProviderFactory { + fn from(factory: &FFI_TableProviderFactory) -> Self { + Self(factory.clone()) + } +} + +impl Clone for FFI_TableProviderFactory { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +#[async_trait] +impl TableProviderFactory for ForeignTableProviderFactory { + async fn create( + &self, + state: &dyn Session, + cmd: &CreateExternalTable, + ) -> Result> { + let session_config: FFI_SessionConfig = state.config().into(); + + let codec = DefaultLogicalExtensionCodec {}; + + // Serialize CreateExternalTable to protobuf + let mut converted_order_exprs: Vec = vec![]; + for order in &cmd.order_exprs { + let temp = SortExprNodeCollection { + sort_expr_nodes: to_proto::serialize_sorts(order, &codec)?, + }; + converted_order_exprs.push(temp); + } + + let mut converted_column_defaults = + HashMap::with_capacity(cmd.column_defaults.len()); + for (col_name, expr) in &cmd.column_defaults { + converted_column_defaults + .insert(col_name.clone(), to_proto::serialize_expr(expr, &codec)?); + } + + let proto_cmd = CreateExternalTableNode { + name: Some(cmd.name.clone().into()), + location: cmd.location.clone(), + file_type: cmd.file_type.clone(), + schema: Some(cmd.schema.as_ref().try_into()?), + table_partition_cols: cmd.table_partition_cols.clone(), + if_not_exists: cmd.if_not_exists, + or_replace: cmd.or_replace, + temporary: cmd.temporary, + order_exprs: converted_order_exprs, + definition: cmd.definition.clone().unwrap_or_default(), + unbounded: cmd.unbounded, + options: cmd.options.clone(), + constraints: Some(cmd.constraints.clone().into()), + column_defaults: converted_column_defaults, + }; + + let cmd_serialized = proto_cmd.encode_to_vec().into(); + + let provider = unsafe { + let 
maybe_provider = + (self.0.create)(&self.0, &session_config, cmd_serialized).await; + + let ffi_provider = df_result!(maybe_provider)?; + ForeignTableProvider::from(&ffi_provider) + }; + + Ok(Arc::new(provider)) + } +} + +#[cfg(test)] +mod tests { + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::datasource::listing_table_factory::ListingTableFactory; + use datafusion_common::ToDFSchema; + use std::collections::HashMap; + + use super::*; + + #[tokio::test] + async fn test_round_trip_ffi_table_provider_factory() -> Result<()> { + let factory = Arc::new(ListingTableFactory::new()); + + let ffi_factory = FFI_TableProviderFactory::new(factory, None); + + let foreign_factory: ForeignTableProviderFactory = (&ffi_factory).into(); + + let ctx = SessionContext::new(); + + // Create a simple external table command + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ]); + let df_schema = schema.to_dfschema_ref()?; + + let cmd = CreateExternalTable { + schema: df_schema, + name: TableReference::bare("test_table"), + location: "file:///tmp/test.csv".to_string(), + file_type: "CSV".to_string(), + table_partition_cols: vec![], + if_not_exists: false, + or_replace: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Default::default(), + column_defaults: HashMap::new(), + }; + + let provider = foreign_factory.create(&ctx.state(), &cmd).await?; + + assert_eq!(provider.schema().fields().len(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_ffi_table_provider_factory_clone() -> Result<()> { + let factory = Arc::new(ListingTableFactory::new()); + let ffi_factory = FFI_TableProviderFactory::new(factory, None); + + // Test that we can clone the factory + let cloned_factory = ffi_factory.clone(); + let foreign_factory: ForeignTableProviderFactory = (&cloned_factory).into(); + + let ctx = SessionContext::new(); + + 
let schema = Schema::new(vec![Field::new("c", DataType::Float64, true)]); + let df_schema = schema.to_dfschema_ref()?; + + let cmd = CreateExternalTable { + schema: df_schema, + name: TableReference::bare("cloned_test"), + location: "file:///tmp/cloned.parquet".to_string(), + file_type: "PARQUET".to_string(), + table_partition_cols: vec![], + if_not_exists: false, + or_replace: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Default::default(), + column_defaults: HashMap::new(), + }; + + let provider = foreign_factory.create(&ctx.state(), &cmd).await?; + assert_eq!(provider.schema().fields().len(), 1); + + Ok(()) + } +} From 926747f6aa0a93432d20725f7cb73414570a0175 Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Thu, 12 Feb 2026 14:16:49 -0600 Subject: [PATCH 2/6] feat: Finish implementing FFI_TableProviderFactory --- datafusion/ffi/src/table_provider_factory.rs | 330 +++++++++++------- datafusion/ffi/src/tests/mod.rs | 19 +- .../ffi/src/tests/table_provider_factory.rs | 58 +++ datafusion/ffi/tests/ffi_integration.rs | 46 ++- 4 files changed, 329 insertions(+), 124 deletions(-) create mode 100644 datafusion/ffi/src/tests/table_provider_factory.rs diff --git a/datafusion/ffi/src/table_provider_factory.rs b/datafusion/ffi/src/table_provider_factory.rs index de3eb220d3cea..93c432bd8504f 100644 --- a/datafusion/ffi/src/table_provider_factory.rs +++ b/datafusion/ffi/src/table_provider_factory.rs @@ -18,35 +18,31 @@ use std::{ffi::c_void, sync::Arc}; use abi_stable::{ - std_types::{RResult, RString, RVec}, StableAbi, + std_types::{RResult, RString, RVec}, }; use async_ffi::{FfiFuture, FutureExt}; use async_trait::async_trait; -use datafusion::{ - catalog::{Session, TableProvider, TableProviderFactory}, - error::DataFusionError, - execution::session_state::SessionStateBuilder, - logical_expr::CreateExternalTable, - prelude::SessionContext, -}; +use datafusion_catalog::{Session, 
TableProvider, TableProviderFactory}; use datafusion_common::TableReference; -use datafusion_proto::{ - logical_plan::{from_proto, to_proto, DefaultLogicalExtensionCodec}, - protobuf::{CreateExternalTableNode, SortExprNodeCollection}, +use datafusion_common::error::{DataFusionError, Result}; +use datafusion_execution::TaskContext; +use datafusion_expr::CreateExternalTable; +use datafusion_proto::logical_plan::from_proto::{parse_expr, parse_sorts}; +use datafusion_proto::logical_plan::to_proto::{serialize_expr, serialize_sorts}; +use datafusion_proto::logical_plan::{ + DefaultLogicalExtensionCodec, LogicalExtensionCodec, }; +use datafusion_proto::protobuf::{CreateExternalTableNode, SortExprNodeCollection}; use prost::Message; use std::collections::HashMap; use tokio::runtime::Handle; -use crate::{ - df_result, rresult_return, - session_config::FFI_SessionConfig, - table_provider::{FFI_TableProvider, ForeignTableProvider}, -}; - -use super::session_config::ForeignSessionConfig; -use datafusion::error::Result; +use crate::execution::FFI_TaskContextProvider; +use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec; +use crate::session::{FFI_SessionRef, ForeignSession}; +use crate::table_provider::{FFI_TableProvider, ForeignTableProvider}; +use crate::{df_result, rresult_return}; /// A stable struct for sharing [`TableProviderFactory`] across FFI boundaries. /// @@ -58,7 +54,6 @@ use datafusion::error::Result; /// [`FFI_TableProvider`]: crate::table_provider::FFI_TableProvider #[repr(C)] #[derive(Debug, StableAbi)] -#[allow(non_camel_case_types)] pub struct FFI_TableProviderFactory { /// Create a TableProvider with the given command. /// @@ -68,12 +63,13 @@ pub struct FFI_TableProviderFactory { /// * `session_config` - session configuration /// * `cmd_serialized` - a [`CreateExternalTableNode`] protobuf message serialized into bytes /// to pass across the FFI boundary. 
- pub create: unsafe extern "C" fn( + create: unsafe extern "C" fn( factory: &Self, - session_config: &FFI_SessionConfig, + session: FFI_SessionRef, cmd_serialized: RVec, - ) - -> FfiFuture>, + ) -> FfiFuture>, + + pub logical_codec: FFI_LogicalExtensionCodec, /// Used to create a clone of the factory. This should only need to be called /// by the receiver of the factory. @@ -88,35 +84,106 @@ pub struct FFI_TableProviderFactory { /// Internal data. This is only to be accessed by the provider of the factory. /// A [`ForeignTableProviderFactory`] should never attempt to access this data. pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_TableProviderFactory {} unsafe impl Sync for FFI_TableProviderFactory {} struct FactoryPrivateData { - factory: Arc, + factory: Arc, runtime: Option, } +impl FFI_TableProviderFactory { + /// Creates a new [`FFI_TableProviderFactory`]. 
+ pub fn new( + factory: Arc, + runtime: Option, + task_ctx_provider: impl Into, + logical_codec: Option>, + ) -> Self { + let task_ctx_provider = task_ctx_provider.into(); + let logical_codec = + logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {})); + let logical_codec = FFI_LogicalExtensionCodec::new( + logical_codec, + runtime.clone(), + task_ctx_provider.clone(), + ); + Self::new_with_ffi_codec(factory, runtime, logical_codec) + } + + pub fn new_with_ffi_codec( + factory: Arc, + runtime: Option, + logical_codec: FFI_LogicalExtensionCodec, + ) -> Self { + let private_data = Box::new(FactoryPrivateData { factory, runtime }); + + Self { + create: create_fn_wrapper, + logical_codec, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, + } + } + + fn inner(&self) -> &Arc { + let private_data = self.private_data as *const FactoryPrivateData; + unsafe { &(*private_data).factory } + } + + fn runtime(&self) -> &Option { + let private_data = self.private_data as *const FactoryPrivateData; + unsafe { &(*private_data).runtime } + } +} + +impl Clone for FFI_TableProviderFactory { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +impl Drop for FFI_TableProviderFactory { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + unsafe extern "C" fn create_fn_wrapper( factory: &FFI_TableProviderFactory, - session_config: &FFI_SessionConfig, + session: FFI_SessionRef, cmd_serialized: RVec, ) -> FfiFuture> { - let private_data = factory.private_data as *mut FactoryPrivateData; - let internal_factory = &(*private_data).factory; - let session_config = session_config.clone(); - let runtime = &(*private_data).runtime; + let task_ctx: Result, DataFusionError> = + (&factory.logical_codec.task_ctx_provider).try_into(); + let runtime = factory.runtime().clone(); + let logical_codec: Arc = 
(&factory.logical_codec).into(); + let ffi_logical_codec = factory.logical_codec.clone(); + let internal_factory = Arc::clone(factory.inner()); async move { - let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); - let session = SessionStateBuilder::new() - .with_default_features() - .with_config(config.0) - .build(); - let ctx = SessionContext::new_with_state(session); - - let codec = DefaultLogicalExtensionCodec {}; + let mut foreign_session = None; + let session = rresult_return!( + session + .as_local() + .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>) + .unwrap_or_else(|| { + foreign_session = Some(ForeignSession::try_from(&session)?); + Ok(foreign_session.as_ref().unwrap()) + }) + ); + + let task_ctx = rresult_return!(task_ctx); let proto_cmd = rresult_return!(CreateExternalTableNode::decode(cmd_serialized.as_ref())); @@ -141,19 +208,23 @@ unsafe extern "C" fn create_fn_wrapper( None }; - let mut order_exprs = vec![]; + let mut order_exprs = Vec::with_capacity(proto_cmd.order_exprs.len()); for expr in &proto_cmd.order_exprs { - let sorts = rresult_return!(from_proto::parse_sorts( + let sorts = rresult_return!(parse_sorts( &expr.sort_expr_nodes, - &ctx, - &codec + task_ctx.as_ref(), + logical_codec.as_ref(), )); order_exprs.push(sorts); } let mut column_defaults = HashMap::with_capacity(proto_cmd.column_defaults.len()); for (col_name, expr) in &proto_cmd.column_defaults { - let expr = rresult_return!(from_proto::parse_expr(expr, &ctx, &codec)); + let expr = rresult_return!(parse_expr( + expr, + task_ctx.as_ref(), + logical_codec.as_ref() + )); column_defaults.insert(col_name.clone(), expr); } @@ -182,56 +253,46 @@ unsafe extern "C" fn create_fn_wrapper( column_defaults, }; - let provider = rresult_return!(internal_factory.create(&ctx.state(), &cmd).await); + let provider = rresult_return!(internal_factory.create(session, &cmd).await); - RResult::ROk(FFI_TableProvider::new(provider, true, runtime.clone())) + 
RResult::ROk(FFI_TableProvider::new_with_ffi_codec( + provider, + true, + runtime.clone(), + ffi_logical_codec, + )) } .into_ffi() } -unsafe extern "C" fn release_fn_wrapper(factory: &mut FFI_TableProviderFactory) { - let private_data = Box::from_raw(factory.private_data as *mut FactoryPrivateData); - drop(private_data); -} - unsafe extern "C" fn clone_fn_wrapper( factory: &FFI_TableProviderFactory, ) -> FFI_TableProviderFactory { - let old_private_data = factory.private_data as *const FactoryPrivateData; - let runtime = (*old_private_data).runtime.clone(); + let runtime = factory.runtime().clone(); + let old_factory = Arc::clone(factory.inner()); let private_data = Box::into_raw(Box::new(FactoryPrivateData { - factory: Arc::clone(&(*old_private_data).factory), + factory: old_factory, runtime, })) as *mut c_void; FFI_TableProviderFactory { create: create_fn_wrapper, + logical_codec: factory.logical_codec.clone(), clone: clone_fn_wrapper, release: release_fn_wrapper, version: super::version, private_data, + library_marker_id: crate::get_library_marker_id, } } -impl Drop for FFI_TableProviderFactory { - fn drop(&mut self) { - unsafe { (self.release)(self) } - } -} - -impl FFI_TableProviderFactory { - /// Creates a new [`FFI_TableProviderFactory`]. 
- pub fn new(factory: Arc, runtime: Option) -> Self { - let private_data = Box::new(FactoryPrivateData { factory, runtime }); - - Self { - create: create_fn_wrapper, - clone: clone_fn_wrapper, - release: release_fn_wrapper, - version: super::version, - private_data: Box::into_raw(private_data) as *mut c_void, - } +unsafe extern "C" fn release_fn_wrapper(factory: &mut FFI_TableProviderFactory) { + unsafe { + debug_assert!(!factory.private_data.is_null()); + let private_data = Box::from_raw(factory.private_data as *mut FactoryPrivateData); + drop(private_data); + factory.private_data = std::ptr::null_mut(); } } @@ -242,37 +303,35 @@ impl FFI_TableProviderFactory { #[derive(Debug)] pub struct ForeignTableProviderFactory(pub FFI_TableProviderFactory); -unsafe impl Send for ForeignTableProviderFactory {} -unsafe impl Sync for ForeignTableProviderFactory {} - -impl From<&FFI_TableProviderFactory> for ForeignTableProviderFactory { +impl From<&FFI_TableProviderFactory> for Arc { fn from(factory: &FFI_TableProviderFactory) -> Self { - Self(factory.clone()) + if (factory.library_marker_id)() == crate::get_library_marker_id() { + Arc::clone(factory.inner()) as Arc + } else { + Arc::new(ForeignTableProviderFactory(factory.clone())) + } } } -impl Clone for FFI_TableProviderFactory { - fn clone(&self) -> Self { - unsafe { (self.clone)(self) } - } -} +unsafe impl Send for ForeignTableProviderFactory {} +unsafe impl Sync for ForeignTableProviderFactory {} #[async_trait] impl TableProviderFactory for ForeignTableProviderFactory { async fn create( &self, - state: &dyn Session, + session: &dyn Session, cmd: &CreateExternalTable, ) -> Result> { - let session_config: FFI_SessionConfig = state.config().into(); + let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone()); - let codec = DefaultLogicalExtensionCodec {}; + let codec: Arc = (&self.0.logical_codec).into(); // Serialize CreateExternalTable to protobuf let mut converted_order_exprs: Vec = vec![]; for order 
in &cmd.order_exprs { let temp = SortExprNodeCollection { - sort_expr_nodes: to_proto::serialize_sorts(order, &codec)?, + sort_expr_nodes: serialize_sorts(order, codec.as_ref())?, }; converted_order_exprs.push(temp); } @@ -281,7 +340,7 @@ impl TableProviderFactory for ForeignTableProviderFactory { HashMap::with_capacity(cmd.column_defaults.len()); for (col_name, expr) in &cmd.column_defaults { converted_column_defaults - .insert(col_name.clone(), to_proto::serialize_expr(expr, &codec)?); + .insert(col_name.clone(), serialize_expr(expr, codec.as_ref())?); } let proto_cmd = CreateExternalTableNode { @@ -304,11 +363,10 @@ impl TableProviderFactory for ForeignTableProviderFactory { let cmd_serialized = proto_cmd.encode_to_vec().into(); let provider = unsafe { - let maybe_provider = - (self.0.create)(&self.0, &session_config, cmd_serialized).await; + let maybe_provider = (self.0.create)(&self.0, session, cmd_serialized).await; let ffi_provider = df_result!(maybe_provider)?; - ForeignTableProvider::from(&ffi_provider) + ForeignTableProvider(ffi_provider) }; Ok(Arc::new(provider)) @@ -317,35 +375,65 @@ impl TableProviderFactory for ForeignTableProviderFactory { #[cfg(test)] mod tests { - use arrow::datatypes::{DataType, Field, Schema}; - use datafusion::datasource::listing_table_factory::ListingTableFactory; + use arrow::datatypes::Schema; + use datafusion::prelude::SessionContext; use datafusion_common::ToDFSchema; + use datafusion_execution::TaskContextProvider; use std::collections::HashMap; use super::*; + #[derive(Debug)] + struct TestTableProviderFactory {} + + #[async_trait] + impl TableProviderFactory for TestTableProviderFactory { + async fn create( + &self, + _session: &dyn Session, + _cmd: &CreateExternalTable, + ) -> Result> { + use arrow::datatypes::Field; + use datafusion::arrow::array::Float32Array; + use datafusion::arrow::datatypes::DataType; + use datafusion::arrow::record_batch::RecordBatch; + use datafusion::datasource::MemTable; + + let schema = + 
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + + let batch1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))], + )?; + let batch2 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Float32Array::from(vec![64.0]))], + )?; + + Ok(Arc::new(MemTable::try_new( + schema, + vec![vec![batch1], vec![batch2]], + )?)) + } + } + #[tokio::test] async fn test_round_trip_ffi_table_provider_factory() -> Result<()> { - let factory = Arc::new(ListingTableFactory::new()); - - let ffi_factory = FFI_TableProviderFactory::new(factory, None); - - let foreign_factory: ForeignTableProviderFactory = (&ffi_factory).into(); + let ctx = Arc::new(SessionContext::new()); + let task_ctx_provider = Arc::clone(&ctx) as Arc; + let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider); - let ctx = SessionContext::new(); - - // Create a simple external table command - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Utf8, false), - ]); - let df_schema = schema.to_dfschema_ref()?; + let factory = Arc::new(TestTableProviderFactory {}); + let ffi_factory = + FFI_TableProviderFactory::new(factory, None, task_ctx_provider, None); + let factory: Arc = (&ffi_factory).into(); let cmd = CreateExternalTable { - schema: df_schema, + schema: Schema::empty().to_dfschema_ref()?, name: TableReference::bare("test_table"), - location: "file:///tmp/test.csv".to_string(), - file_type: "CSV".to_string(), + location: "test".to_string(), + file_type: "test".to_string(), table_partition_cols: vec![], if_not_exists: false, or_replace: false, @@ -358,32 +446,32 @@ mod tests { column_defaults: HashMap::new(), }; - let provider = foreign_factory.create(&ctx.state(), &cmd).await?; + let provider = factory.create(&ctx.state(), &cmd).await?; - assert_eq!(provider.schema().fields().len(), 2); + assert_eq!(provider.schema().fields().len(), 1); Ok(()) } #[tokio::test] async 
fn test_ffi_table_provider_factory_clone() -> Result<()> { - let factory = Arc::new(ListingTableFactory::new()); - let ffi_factory = FFI_TableProviderFactory::new(factory, None); + let ctx = Arc::new(SessionContext::new()); + let task_ctx_provider = Arc::clone(&ctx) as Arc; + let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider); + + let factory = Arc::new(TestTableProviderFactory {}); + let ffi_factory = + FFI_TableProviderFactory::new(factory, None, task_ctx_provider, None); // Test that we can clone the factory let cloned_factory = ffi_factory.clone(); - let foreign_factory: ForeignTableProviderFactory = (&cloned_factory).into(); - - let ctx = SessionContext::new(); - - let schema = Schema::new(vec![Field::new("c", DataType::Float64, true)]); - let df_schema = schema.to_dfschema_ref()?; + let factory: Arc = (&cloned_factory).into(); let cmd = CreateExternalTable { - schema: df_schema, + schema: Schema::empty().to_dfschema_ref()?, name: TableReference::bare("cloned_test"), - location: "file:///tmp/cloned.parquet".to_string(), - file_type: "PARQUET".to_string(), + location: "test".to_string(), + file_type: "test".to_string(), table_partition_cols: vec![], if_not_exists: false, or_replace: false, @@ -396,7 +484,7 @@ mod tests { column_defaults: HashMap::new(), }; - let provider = foreign_factory.create(&ctx.state(), &cmd).await?; + let provider = factory.create(&ctx.state(), &cmd).await?; assert_eq!(provider.schema().fields().len(), 1); Ok(()) diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs index 9bcd7e0031083..c936330668468 100644 --- a/datafusion/ffi/src/tests/mod.rs +++ b/datafusion/ffi/src/tests/mod.rs @@ -34,19 +34,21 @@ use udf_udaf_udwf::{ create_ffi_stddev_func, create_ffi_sum_func, create_ffi_table_func, }; -use super::table_provider::FFI_TableProvider; -use super::udf::FFI_ScalarUDF; use crate::catalog_provider::FFI_CatalogProvider; use crate::catalog_provider_list::FFI_CatalogProviderList; use 
crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec; +use crate::table_provider::FFI_TableProvider; +use crate::table_provider_factory::FFI_TableProviderFactory; use crate::tests::catalog::create_catalog_provider_list; use crate::udaf::FFI_AggregateUDF; +use crate::udf::FFI_ScalarUDF; use crate::udtf::FFI_TableFunction; use crate::udwf::FFI_WindowUDF; mod async_provider; pub mod catalog; mod sync_provider; +mod table_provider_factory; mod udf_udaf_udwf; pub mod utils; @@ -71,6 +73,10 @@ pub struct ForeignLibraryModule { codec: FFI_LogicalExtensionCodec, ) -> FFI_TableProvider, + /// Constructs the table provider factory + pub create_table_factory: + extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_TableProviderFactory, + /// Create a scalar UDF pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF, @@ -128,6 +134,14 @@ extern "C" fn construct_table_provider( } } +/// Here we only wish to create a simple table provider factory as an example. +/// We create a test factory and convert it to its FFI counterpart. +extern "C" fn construct_table_provider_factory( + codec: FFI_LogicalExtensionCodec, +) -> FFI_TableProviderFactory { + table_provider_factory::create(codec) +} + #[export_root_module] /// This defines the entry point for using the module. 
pub fn get_foreign_library_module() -> ForeignLibraryModuleRef { @@ -135,6 +149,7 @@ pub fn get_foreign_library_module() -> ForeignLibraryModuleRef { create_catalog: create_catalog_provider, create_catalog_list: create_catalog_provider_list, create_table: construct_table_provider, + create_table_factory: construct_table_provider_factory, create_scalar_udf: create_ffi_abs_func, create_nullary_udf: create_ffi_random_func, create_table_function: create_ffi_table_func, diff --git a/datafusion/ffi/src/tests/table_provider_factory.rs b/datafusion/ffi/src/tests/table_provider_factory.rs new file mode 100644 index 0000000000000..29af6aacf6484 --- /dev/null +++ b/datafusion/ffi/src/tests/table_provider_factory.rs @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion_catalog::{MemTable, Session, TableProvider, TableProviderFactory}; +use datafusion_common::Result; +use datafusion_expr::CreateExternalTable; + +use super::{create_record_batch, create_test_schema}; +use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec; +use crate::table_provider_factory::FFI_TableProviderFactory; + +#[derive(Debug)] +pub struct TestTableProviderFactory {} + +#[async_trait] +impl TableProviderFactory for TestTableProviderFactory { + async fn create( + &self, + _session: &dyn Session, + _cmd: &CreateExternalTable, + ) -> Result> { + let schema = create_test_schema(); + + // It is useful to create these as multiple record batches + // so that we can demonstrate the FFI stream. + let batches = vec![ + create_record_batch(1, 5), + create_record_batch(6, 1), + create_record_batch(7, 5), + ]; + + let table_provider = MemTable::try_new(schema, vec![batches]).unwrap(); + + Ok(Arc::new(table_provider)) + } +} + +pub(crate) fn create(codec: FFI_LogicalExtensionCodec) -> FFI_TableProviderFactory { + let factory = TestTableProviderFactory {}; + FFI_TableProviderFactory::new_with_ffi_codec(Arc::new(factory), None, codec) +} diff --git a/datafusion/ffi/tests/ffi_integration.rs b/datafusion/ffi/tests/ffi_integration.rs index 2d18679cb018e..80538d4f92fb1 100644 --- a/datafusion/ffi/tests/ffi_integration.rs +++ b/datafusion/ffi/tests/ffi_integration.rs @@ -21,10 +21,15 @@ mod utils; /// when the feature integration-tests is built #[cfg(feature = "integration-tests")] mod tests { + use std::collections::HashMap; use std::sync::Arc; - use datafusion::catalog::TableProvider; + use arrow::datatypes::Schema; + use datafusion::catalog::{TableProvider, TableProviderFactory}; use datafusion::error::{DataFusionError, Result}; + use datafusion_common::TableReference; + use datafusion_common::ToDFSchema; + use datafusion_expr::CreateExternalTable; use 
datafusion_ffi::tests::create_record_batch; use datafusion_ffi::tests::utils::get_module; @@ -69,4 +74,43 @@ mod tests { async fn sync_test_table_provider() -> Result<()> { test_table_provider(true).await } + + #[tokio::test] + async fn test_table_provider_factory() -> Result<()> { + let table_provider_module = get_module()?; + let (ctx, codec) = super::utils::ctx_and_codec(); + + let ffi_table_provider_factory = table_provider_module + .create_table_factory() + .ok_or(DataFusionError::NotImplemented( + "External table provider factory failed to implement create".to_string(), + ))?(codec); + + let foreign_table_provider_factory: Arc = + (&ffi_table_provider_factory).into(); + + let cmd = CreateExternalTable { + schema: Schema::empty().to_dfschema_ref()?, + name: TableReference::bare("cloned_test"), + location: "test".to_string(), + file_type: "test".to_string(), + table_partition_cols: vec![], + if_not_exists: false, + or_replace: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Default::default(), + column_defaults: HashMap::new(), + }; + + let provider = foreign_table_provider_factory + .create(&ctx.state(), &cmd) + .await?; + assert_eq!(provider.schema().fields().len(), 2); + + Ok(()) + } } From 8ca3a0fd63d59c08f7e80b4d56ccea40ee7a2bf4 Mon Sep 17 00:00:00 2001 From: "Paul J. 
Davis" Date: Fri, 20 Feb 2026 13:34:09 -0600 Subject: [PATCH 3/6] Remove pub access in FFI_TableProviderFactory --- datafusion/ffi/src/table_provider_factory.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/ffi/src/table_provider_factory.rs b/datafusion/ffi/src/table_provider_factory.rs index 93c432bd8504f..7f73f794bfdc2 100644 --- a/datafusion/ffi/src/table_provider_factory.rs +++ b/datafusion/ffi/src/table_provider_factory.rs @@ -69,26 +69,26 @@ pub struct FFI_TableProviderFactory { cmd_serialized: RVec, ) -> FfiFuture>, - pub logical_codec: FFI_LogicalExtensionCodec, + logical_codec: FFI_LogicalExtensionCodec, /// Used to create a clone of the factory. This should only need to be called /// by the receiver of the factory. - pub clone: unsafe extern "C" fn(factory: &Self) -> Self, + clone: unsafe extern "C" fn(factory: &Self) -> Self, /// Release the memory of the private data when it is no longer being used. - pub release: unsafe extern "C" fn(factory: &mut Self), + release: unsafe extern "C" fn(factory: &mut Self), /// Return the major DataFusion version number of this factory. - pub version: unsafe extern "C" fn() -> u64, + version: unsafe extern "C" fn() -> u64, /// Internal data. This is only to be accessed by the provider of the factory. /// A [`ForeignTableProviderFactory`] should never attempt to access this data. - pub private_data: *mut c_void, + private_data: *mut c_void, /// Utility to identify when FFI objects are accessed locally through /// the foreign interface. See [`crate::get_library_marker_id`] and /// the crate's `README.md` for more information. - pub library_marker_id: extern "C" fn() -> usize, + library_marker_id: extern "C" fn() -> usize, } unsafe impl Send for FFI_TableProviderFactory {} From 1a6bfa4a0812aff88611780fa644b937ab12a9e5 Mon Sep 17 00:00:00 2001 From: "Paul J. 
Davis" Date: Fri, 20 Feb 2026 15:17:00 -0600 Subject: [PATCH 4/6] Use datafusion_proto conversion functions --- datafusion/ffi/src/table_provider_factory.rs | 215 +++++++------------ 1 file changed, 75 insertions(+), 140 deletions(-) diff --git a/datafusion/ffi/src/table_provider_factory.rs b/datafusion/ffi/src/table_provider_factory.rs index 7f73f794bfdc2..5048a7c1849a7 100644 --- a/datafusion/ffi/src/table_provider_factory.rs +++ b/datafusion/ffi/src/table_provider_factory.rs @@ -24,18 +24,14 @@ use abi_stable::{ use async_ffi::{FfiFuture, FutureExt}; use async_trait::async_trait; use datafusion_catalog::{Session, TableProvider, TableProviderFactory}; -use datafusion_common::TableReference; use datafusion_common::error::{DataFusionError, Result}; use datafusion_execution::TaskContext; -use datafusion_expr::CreateExternalTable; -use datafusion_proto::logical_plan::from_proto::{parse_expr, parse_sorts}; -use datafusion_proto::logical_plan::to_proto::{serialize_expr, serialize_sorts}; +use datafusion_expr::{CreateExternalTable, DdlStatement, LogicalPlan}; use datafusion_proto::logical_plan::{ - DefaultLogicalExtensionCodec, LogicalExtensionCodec, + AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec, }; -use datafusion_proto::protobuf::{CreateExternalTableNode, SortExprNodeCollection}; +use datafusion_proto::protobuf::LogicalPlanNode; use prost::Message; -use std::collections::HashMap; use tokio::runtime::Handle; use crate::execution::FFI_TaskContextProvider; @@ -145,6 +141,24 @@ impl FFI_TableProviderFactory { let private_data = self.private_data as *const FactoryPrivateData; unsafe { &(*private_data).runtime } } + + fn deserialize_cmd( + &self, + cmd_serialized: &RVec, + ) -> Result { + let task_ctx: Arc = + (&self.logical_codec.task_ctx_provider).try_into()?; + let logical_codec: Arc = (&self.logical_codec).into(); + + let plan = LogicalPlanNode::decode(cmd_serialized.as_ref()) + .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?; + match 
plan.try_into_logical_plan(&task_ctx, logical_codec.as_ref())? { + LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(cmd), + _ => Err(DataFusionError::Internal( + "Invalid logical plan in FFI_TableProviderFactory.".to_owned(), + )), + } + } } impl Clone for FFI_TableProviderFactory { @@ -164,107 +178,45 @@ unsafe extern "C" fn create_fn_wrapper( session: FFI_SessionRef, cmd_serialized: RVec, ) -> FfiFuture> { - let task_ctx: Result, DataFusionError> = - (&factory.logical_codec.task_ctx_provider).try_into(); - let runtime = factory.runtime().clone(); - let logical_codec: Arc = (&factory.logical_codec).into(); - let ffi_logical_codec = factory.logical_codec.clone(); - let internal_factory = Arc::clone(factory.inner()); + let factory = factory.clone(); async move { - let mut foreign_session = None; - let session = rresult_return!( - session - .as_local() - .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>) - .unwrap_or_else(|| { - foreign_session = Some(ForeignSession::try_from(&session)?); - Ok(foreign_session.as_ref().unwrap()) - }) + let provider = rresult_return!( + create_fn_wrapper_impl(factory, session, cmd_serialized).await ); - - let task_ctx = rresult_return!(task_ctx); - - let proto_cmd = - rresult_return!(CreateExternalTableNode::decode(cmd_serialized.as_ref())); - - // Deserialize CreateExternalTable from protobuf - let pb_schema = rresult_return!(proto_cmd.schema.ok_or_else(|| { - DataFusionError::Internal( - "CreateExternalTableNode was missing required field schema".to_string(), - ) - })); - - let constraints = rresult_return!(proto_cmd.constraints.ok_or_else(|| { - DataFusionError::Internal( - "CreateExternalTableNode was missing required field constraints" - .to_string(), - ) - })); - - let definition = if !proto_cmd.definition.is_empty() { - Some(proto_cmd.definition) - } else { - None - }; - - let mut order_exprs = Vec::with_capacity(proto_cmd.order_exprs.len()); - for expr in &proto_cmd.order_exprs { - let sorts = 
rresult_return!(parse_sorts( - &expr.sort_expr_nodes, - task_ctx.as_ref(), - logical_codec.as_ref(), - )); - order_exprs.push(sorts); - } - - let mut column_defaults = HashMap::with_capacity(proto_cmd.column_defaults.len()); - for (col_name, expr) in &proto_cmd.column_defaults { - let expr = rresult_return!(parse_expr( - expr, - task_ctx.as_ref(), - logical_codec.as_ref() - )); - column_defaults.insert(col_name.clone(), expr); - } - - let table_ref = rresult_return!(proto_cmd.name.as_ref().ok_or_else(|| { - DataFusionError::Internal( - "CreateExternalTableNode was missing required field name".to_string(), - ) - })); - - let name: TableReference = rresult_return!(table_ref.clone().try_into()); - - let cmd = CreateExternalTable { - schema: rresult_return!(pb_schema.try_into()), - name, - location: proto_cmd.location, - file_type: proto_cmd.file_type, - table_partition_cols: proto_cmd.table_partition_cols, - order_exprs, - if_not_exists: proto_cmd.if_not_exists, - or_replace: proto_cmd.or_replace, - temporary: proto_cmd.temporary, - definition, - unbounded: proto_cmd.unbounded, - options: proto_cmd.options, - constraints: constraints.into(), - column_defaults, - }; - - let provider = rresult_return!(internal_factory.create(session, &cmd).await); - - RResult::ROk(FFI_TableProvider::new_with_ffi_codec( - provider, - true, - runtime.clone(), - ffi_logical_codec, - )) + RResult::ROk(provider) } .into_ffi() } +async fn create_fn_wrapper_impl( + factory: FFI_TableProviderFactory, + session: FFI_SessionRef, + cmd_serialized: RVec, +) -> Result { + let runtime = factory.runtime().clone(); + let ffi_logical_codec = factory.logical_codec.clone(); + let internal_factory = Arc::clone(factory.inner()); + let cmd = factory.deserialize_cmd(&cmd_serialized)?; + + let mut foreign_session = None; + let session = session + .as_local() + .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>) + .unwrap_or_else(|| { + foreign_session = Some(ForeignSession::try_from(&session)?); + 
Ok(foreign_session.as_ref().unwrap()) + })?; + + let provider = internal_factory.create(session, &cmd).await?; + Ok(FFI_TableProvider::new_with_ffi_codec( + provider, + true, + runtime.clone(), + ffi_logical_codec, + )) +} + unsafe extern "C" fn clone_fn_wrapper( factory: &FFI_TableProviderFactory, ) -> FFI_TableProviderFactory { @@ -303,6 +255,25 @@ unsafe extern "C" fn release_fn_wrapper(factory: &mut FFI_TableProviderFactory) #[derive(Debug)] pub struct ForeignTableProviderFactory(pub FFI_TableProviderFactory); +impl ForeignTableProviderFactory { + fn serialize_cmd( + &self, + cmd: CreateExternalTable, + ) -> Result, DataFusionError> { + let logical_codec: Arc = + (&self.0.logical_codec).into(); + + let plan = LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)); + let plan: LogicalPlanNode = + AsLogicalPlan::try_from_logical_plan(&plan, logical_codec.as_ref())?; + + let mut buf: Vec = Vec::new(); + plan.try_encode(&mut buf)?; + + Ok(buf.into()) + } +} + impl From<&FFI_TableProviderFactory> for Arc { fn from(factory: &FFI_TableProviderFactory) -> Self { if (factory.library_marker_id)() == crate::get_library_marker_id() { @@ -324,46 +295,10 @@ impl TableProviderFactory for ForeignTableProviderFactory { cmd: &CreateExternalTable, ) -> Result> { let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone()); - - let codec: Arc = (&self.0.logical_codec).into(); - - // Serialize CreateExternalTable to protobuf - let mut converted_order_exprs: Vec = vec![]; - for order in &cmd.order_exprs { - let temp = SortExprNodeCollection { - sort_expr_nodes: serialize_sorts(order, codec.as_ref())?, - }; - converted_order_exprs.push(temp); - } - - let mut converted_column_defaults = - HashMap::with_capacity(cmd.column_defaults.len()); - for (col_name, expr) in &cmd.column_defaults { - converted_column_defaults - .insert(col_name.clone(), serialize_expr(expr, codec.as_ref())?); - } - - let proto_cmd = CreateExternalTableNode { - name: 
Some(cmd.name.clone().into()), - location: cmd.location.clone(), - file_type: cmd.file_type.clone(), - schema: Some(cmd.schema.as_ref().try_into()?), - table_partition_cols: cmd.table_partition_cols.clone(), - if_not_exists: cmd.if_not_exists, - or_replace: cmd.or_replace, - temporary: cmd.temporary, - order_exprs: converted_order_exprs, - definition: cmd.definition.clone().unwrap_or_default(), - unbounded: cmd.unbounded, - options: cmd.options.clone(), - constraints: Some(cmd.constraints.clone().into()), - column_defaults: converted_column_defaults, - }; - - let cmd_serialized = proto_cmd.encode_to_vec().into(); + let cmd = self.serialize_cmd(cmd.clone())?; let provider = unsafe { - let maybe_provider = (self.0.create)(&self.0, session, cmd_serialized).await; + let maybe_provider = (self.0.create)(&self.0, session, cmd).await; let ffi_provider = df_result!(maybe_provider)?; ForeignTableProvider(ffi_provider) @@ -377,7 +312,7 @@ impl TableProviderFactory for ForeignTableProviderFactory { mod tests { use arrow::datatypes::Schema; use datafusion::prelude::SessionContext; - use datafusion_common::ToDFSchema; + use datafusion_common::{TableReference, ToDFSchema}; use datafusion_execution::TaskContextProvider; use std::collections::HashMap; From 92f352b120ab1868acce406e9b46635d54b6aa8e Mon Sep 17 00:00:00 2001 From: "Paul J. 
Davis" Date: Fri, 20 Feb 2026 15:26:28 -0600 Subject: [PATCH 5/6] Force the creation of a ForeignTableProviderFactory --- datafusion/ffi/src/table_provider_factory.rs | 24 +++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/datafusion/ffi/src/table_provider_factory.rs b/datafusion/ffi/src/table_provider_factory.rs index 5048a7c1849a7..38b0a41c72179 100644 --- a/datafusion/ffi/src/table_provider_factory.rs +++ b/datafusion/ffi/src/table_provider_factory.rs @@ -173,6 +173,16 @@ impl Drop for FFI_TableProviderFactory { } } +impl From<&FFI_TableProviderFactory> for Arc { + fn from(factory: &FFI_TableProviderFactory) -> Self { + if (factory.library_marker_id)() == crate::get_library_marker_id() { + Arc::clone(factory.inner()) as Arc + } else { + Arc::new(ForeignTableProviderFactory(factory.clone())) + } + } +} + unsafe extern "C" fn create_fn_wrapper( factory: &FFI_TableProviderFactory, session: FFI_SessionRef, @@ -274,16 +284,6 @@ impl ForeignTableProviderFactory { } } -impl From<&FFI_TableProviderFactory> for Arc { - fn from(factory: &FFI_TableProviderFactory) -> Self { - if (factory.library_marker_id)() == crate::get_library_marker_id() { - Arc::clone(factory.inner()) as Arc - } else { - Arc::new(ForeignTableProviderFactory(factory.clone())) - } - } -} - unsafe impl Send for ForeignTableProviderFactory {} unsafe impl Sync for ForeignTableProviderFactory {} @@ -360,8 +360,10 @@ mod tests { let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider); let factory = Arc::new(TestTableProviderFactory {}); - let ffi_factory = + let mut ffi_factory = FFI_TableProviderFactory::new(factory, None, task_ctx_provider, None); + ffi_factory.library_marker_id = crate::mock_foreign_marker_id; + let factory: Arc = (&ffi_factory).into(); let cmd = CreateExternalTable { From eba0ca46abf481f433d1aa64a423c33ad78a097a Mon Sep 17 00:00:00 2001 From: "Paul J. 
Davis" Date: Fri, 20 Feb 2026 15:38:52 -0600 Subject: [PATCH 6/6] Fix doc comment --- datafusion/ffi/src/table_provider_factory.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/ffi/src/table_provider_factory.rs b/datafusion/ffi/src/table_provider_factory.rs index 38b0a41c72179..15789eeab0421 100644 --- a/datafusion/ffi/src/table_provider_factory.rs +++ b/datafusion/ffi/src/table_provider_factory.rs @@ -57,7 +57,7 @@ pub struct FFI_TableProviderFactory { /// /// * `factory` - the table provider factory /// * `session_config` - session configuration - /// * `cmd_serialized` - a [`CreateExternalTableNode`] protobuf message serialized into bytes + /// * `cmd_serialized` - a [`CreateExternalTable`] encoded as a [`LogicalPlanNode`] protobuf message serialized into bytes /// to pass across the FFI boundary. create: unsafe extern "C" fn( factory: &Self,