diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 2ca9b8f6f495a..5eb3626db1e7b 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -39,6 +39,7 @@ pub mod record_batch_stream; pub mod schema_provider; pub mod session; pub mod table_provider; +pub mod table_provider_factory; pub mod table_source; pub mod udaf; pub mod udf; diff --git a/datafusion/ffi/src/table_provider_factory.rs b/datafusion/ffi/src/table_provider_factory.rs new file mode 100644 index 0000000000000..15789eeab0421 --- /dev/null +++ b/datafusion/ffi/src/table_provider_factory.rs @@ -0,0 +1,429 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ffi::c_void, sync::Arc}; + +use abi_stable::{ + StableAbi, + std_types::{RResult, RString, RVec}, +}; +use async_ffi::{FfiFuture, FutureExt}; +use async_trait::async_trait; +use datafusion_catalog::{Session, TableProvider, TableProviderFactory}; +use datafusion_common::error::{DataFusionError, Result}; +use datafusion_execution::TaskContext; +use datafusion_expr::{CreateExternalTable, DdlStatement, LogicalPlan}; +use datafusion_proto::logical_plan::{ + AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec, +}; +use datafusion_proto::protobuf::LogicalPlanNode; +use prost::Message; +use tokio::runtime::Handle; + +use crate::execution::FFI_TaskContextProvider; +use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec; +use crate::session::{FFI_SessionRef, ForeignSession}; +use crate::table_provider::{FFI_TableProvider, ForeignTableProvider}; +use crate::{df_result, rresult_return}; + +/// A stable struct for sharing [`TableProviderFactory`] across FFI boundaries. +/// +/// Similar to [`FFI_TableProvider`], this struct uses the FFI-safe pattern where: +/// - The `FFI_*` struct exposes stable function pointers +/// - Private data is stored as an opaque pointer +/// - The `Foreign*` wrapper is used by consumers on the other side of the FFI boundary +/// +/// [`FFI_TableProvider`]: crate::table_provider::FFI_TableProvider +#[repr(C)] +#[derive(Debug, StableAbi)] +pub struct FFI_TableProviderFactory { + /// Create a TableProvider with the given command. + /// + /// # Arguments + /// + /// * `factory` - the table provider factory + /// * `session_config` - session configuration + /// * `cmd_serialized` - a ['CreateExternalTable`] encoded as a [`LogicalPlanNode`] protobuf message serialized into bytes + /// to pass across the FFI boundary. + create: unsafe extern "C" fn( + factory: &Self, + session: FFI_SessionRef, + cmd_serialized: RVec, + ) -> FfiFuture>, + + logical_codec: FFI_LogicalExtensionCodec, + + /// Used to create a clone of the factory. This should only need to be called + /// by the receiver of the factory. + clone: unsafe extern "C" fn(factory: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + release: unsafe extern "C" fn(factory: &mut Self), + + /// Return the major DataFusion version number of this factory. + version: unsafe extern "C" fn() -> u64, + + /// Internal data. This is only to be accessed by the provider of the factory. + /// A [`ForeignTableProviderFactory`] should never attempt to access this data. + private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + library_marker_id: extern "C" fn() -> usize, +} + +unsafe impl Send for FFI_TableProviderFactory {} +unsafe impl Sync for FFI_TableProviderFactory {} + +struct FactoryPrivateData { + factory: Arc, + runtime: Option, +} + +impl FFI_TableProviderFactory { + /// Creates a new [`FFI_TableProvider`]. + pub fn new( + factory: Arc, + runtime: Option, + task_ctx_provider: impl Into, + logical_codec: Option>, + ) -> Self { + let task_ctx_provider = task_ctx_provider.into(); + let logical_codec = + logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {})); + let logical_codec = FFI_LogicalExtensionCodec::new( + logical_codec, + runtime.clone(), + task_ctx_provider.clone(), + ); + Self::new_with_ffi_codec(factory, runtime, logical_codec) + } + + pub fn new_with_ffi_codec( + factory: Arc, + runtime: Option, + logical_codec: FFI_LogicalExtensionCodec, + ) -> Self { + let private_data = Box::new(FactoryPrivateData { factory, runtime }); + + Self { + create: create_fn_wrapper, + logical_codec, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, + } + } + + fn inner(&self) -> &Arc { + let private_data = self.private_data as *const FactoryPrivateData; + unsafe { &(*private_data).factory } + } + + fn runtime(&self) -> &Option { + let private_data = self.private_data as *const FactoryPrivateData; + unsafe { &(*private_data).runtime } + } + + fn deserialize_cmd( + &self, + cmd_serialized: &RVec, + ) -> Result { + let task_ctx: Arc = + (&self.logical_codec.task_ctx_provider).try_into()?; + let logical_codec: Arc = (&self.logical_codec).into(); + + let plan = LogicalPlanNode::decode(cmd_serialized.as_ref()) + .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?; + match plan.try_into_logical_plan(&task_ctx, logical_codec.as_ref())? { + LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(cmd), + _ => Err(DataFusionError::Internal( + "Invalid logical plan in FFI_TableProviderFactory.".to_owned(), + )), + } + } +} + +impl Clone for FFI_TableProviderFactory { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +impl Drop for FFI_TableProviderFactory { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl From<&FFI_TableProviderFactory> for Arc { + fn from(factory: &FFI_TableProviderFactory) -> Self { + if (factory.library_marker_id)() == crate::get_library_marker_id() { + Arc::clone(factory.inner()) as Arc + } else { + Arc::new(ForeignTableProviderFactory(factory.clone())) + } + } +} + +unsafe extern "C" fn create_fn_wrapper( + factory: &FFI_TableProviderFactory, + session: FFI_SessionRef, + cmd_serialized: RVec, +) -> FfiFuture> { + let factory = factory.clone(); + + async move { + let provider = rresult_return!( + create_fn_wrapper_impl(factory, session, cmd_serialized).await + ); + RResult::ROk(provider) + } + .into_ffi() +} + +async fn create_fn_wrapper_impl( + factory: FFI_TableProviderFactory, + session: FFI_SessionRef, + cmd_serialized: RVec, +) -> Result { + let runtime = factory.runtime().clone(); + let ffi_logical_codec = factory.logical_codec.clone(); + let internal_factory = Arc::clone(factory.inner()); + let cmd = factory.deserialize_cmd(&cmd_serialized)?; + + let mut foreign_session = None; + let session = session + .as_local() + .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>) + .unwrap_or_else(|| { + foreign_session = Some(ForeignSession::try_from(&session)?); + Ok(foreign_session.as_ref().unwrap()) + })?; + + let provider = internal_factory.create(session, &cmd).await?; + Ok(FFI_TableProvider::new_with_ffi_codec( + provider, + true, + runtime.clone(), + ffi_logical_codec, + )) +} + +unsafe extern "C" fn clone_fn_wrapper( + factory: &FFI_TableProviderFactory, +) -> FFI_TableProviderFactory { + let runtime = factory.runtime().clone(); + let old_factory = Arc::clone(factory.inner()); + + let private_data = Box::into_raw(Box::new(FactoryPrivateData { + factory: old_factory, + runtime, + })) as *mut c_void; + + FFI_TableProviderFactory { + create: create_fn_wrapper, + logical_codec: factory.logical_codec.clone(), + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + library_marker_id: crate::get_library_marker_id, + } +} + +unsafe extern "C" fn release_fn_wrapper(factory: &mut FFI_TableProviderFactory) { + unsafe { + debug_assert!(!factory.private_data.is_null()); + let private_data = Box::from_raw(factory.private_data as *mut FactoryPrivateData); + drop(private_data); + factory.private_data = std::ptr::null_mut(); + } +} + +/// This wrapper struct exists on the receiver side of the FFI interface, so it has +/// no guarantees about being able to access the data in `private_data`. Any functions +/// defined on this struct must only use the stable functions provided in +/// FFI_TableProviderFactory to interact with the foreign table provider factory. +#[derive(Debug)] +pub struct ForeignTableProviderFactory(pub FFI_TableProviderFactory); + +impl ForeignTableProviderFactory { + fn serialize_cmd( + &self, + cmd: CreateExternalTable, + ) -> Result, DataFusionError> { + let logical_codec: Arc = + (&self.0.logical_codec).into(); + + let plan = LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)); + let plan: LogicalPlanNode = + AsLogicalPlan::try_from_logical_plan(&plan, logical_codec.as_ref())?; + + let mut buf: Vec = Vec::new(); + plan.try_encode(&mut buf)?; + + Ok(buf.into()) + } +} + +unsafe impl Send for ForeignTableProviderFactory {} +unsafe impl Sync for ForeignTableProviderFactory {} + +#[async_trait] +impl TableProviderFactory for ForeignTableProviderFactory { + async fn create( + &self, + session: &dyn Session, + cmd: &CreateExternalTable, + ) -> Result> { + let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone()); + let cmd = self.serialize_cmd(cmd.clone())?; + + let provider = unsafe { + let maybe_provider = (self.0.create)(&self.0, session, cmd).await; + + let ffi_provider = df_result!(maybe_provider)?; + ForeignTableProvider(ffi_provider) + }; + + Ok(Arc::new(provider)) + } +} + +#[cfg(test)] +mod tests { + use arrow::datatypes::Schema; + use datafusion::prelude::SessionContext; + use datafusion_common::{TableReference, ToDFSchema}; + use datafusion_execution::TaskContextProvider; + use std::collections::HashMap; + + use super::*; + + #[derive(Debug)] + struct TestTableProviderFactory {} + + #[async_trait] + impl TableProviderFactory for TestTableProviderFactory { + async fn create( + &self, + _session: &dyn Session, + _cmd: &CreateExternalTable, + ) -> Result> { + use arrow::datatypes::Field; + use datafusion::arrow::array::Float32Array; + use datafusion::arrow::datatypes::DataType; + use datafusion::arrow::record_batch::RecordBatch; + use datafusion::datasource::MemTable; + + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + + let batch1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))], + )?; + let batch2 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Float32Array::from(vec![64.0]))], + )?; + + Ok(Arc::new(MemTable::try_new( + schema, + vec![vec![batch1], vec![batch2]], + )?)) + } + } + + #[tokio::test] + async fn test_round_trip_ffi_table_provider_factory() -> Result<()> { + let ctx = Arc::new(SessionContext::new()); + let task_ctx_provider = Arc::clone(&ctx) as Arc; + let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider); + + let factory = Arc::new(TestTableProviderFactory {}); + let mut ffi_factory = + FFI_TableProviderFactory::new(factory, None, task_ctx_provider, None); + ffi_factory.library_marker_id = crate::mock_foreign_marker_id; + + let factory: Arc = (&ffi_factory).into(); + + let cmd = CreateExternalTable { + schema: Schema::empty().to_dfschema_ref()?, + name: TableReference::bare("test_table"), + location: "test".to_string(), + file_type: "test".to_string(), + table_partition_cols: vec![], + if_not_exists: false, + or_replace: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Default::default(), + column_defaults: HashMap::new(), + }; + + let provider = factory.create(&ctx.state(), &cmd).await?; + + assert_eq!(provider.schema().fields().len(), 1); + + Ok(()) + } + + #[tokio::test] + async fn test_ffi_table_provider_factory_clone() -> Result<()> { + let ctx = Arc::new(SessionContext::new()); + let task_ctx_provider = Arc::clone(&ctx) as Arc; + let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider); + + let factory = Arc::new(TestTableProviderFactory {}); + let ffi_factory = + FFI_TableProviderFactory::new(factory, None, task_ctx_provider, None); + + // Test that we can clone the factory + let cloned_factory = ffi_factory.clone(); + let factory: Arc = (&cloned_factory).into(); + + let cmd = CreateExternalTable { + schema: Schema::empty().to_dfschema_ref()?, + name: TableReference::bare("cloned_test"), + location: "test".to_string(), + file_type: "test".to_string(), + table_partition_cols: vec![], + if_not_exists: false, + or_replace: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Default::default(), + column_defaults: HashMap::new(), + }; + + let provider = factory.create(&ctx.state(), &cmd).await?; + assert_eq!(provider.schema().fields().len(), 1); + + Ok(()) + } +} diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs index 9bcd7e0031083..c936330668468 100644 --- a/datafusion/ffi/src/tests/mod.rs +++ b/datafusion/ffi/src/tests/mod.rs @@ -34,19 +34,21 @@ use udf_udaf_udwf::{ create_ffi_stddev_func, create_ffi_sum_func, create_ffi_table_func, }; -use super::table_provider::FFI_TableProvider; -use super::udf::FFI_ScalarUDF; use crate::catalog_provider::FFI_CatalogProvider; use crate::catalog_provider_list::FFI_CatalogProviderList; use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec; +use crate::table_provider::FFI_TableProvider; +use crate::table_provider_factory::FFI_TableProviderFactory; use crate::tests::catalog::create_catalog_provider_list; use crate::udaf::FFI_AggregateUDF; +use crate::udf::FFI_ScalarUDF; use crate::udtf::FFI_TableFunction; use crate::udwf::FFI_WindowUDF; mod async_provider; pub mod catalog; mod sync_provider; +mod table_provider_factory; mod udf_udaf_udwf; pub mod utils; @@ -71,6 +73,10 @@ pub struct ForeignLibraryModule { codec: FFI_LogicalExtensionCodec, ) -> FFI_TableProvider, + /// Constructs the table provider factory + pub create_table_factory: + extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_TableProviderFactory, + /// Create a scalar UDF pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF, @@ -128,6 +134,14 @@ extern "C" fn construct_table_provider( } } +/// Here we only wish to create a simple table provider as an example. +/// We create an in-memory table and convert it to it's FFI counterpart. +extern "C" fn construct_table_provider_factory( + codec: FFI_LogicalExtensionCodec, +) -> FFI_TableProviderFactory { + table_provider_factory::create(codec) +} + #[export_root_module] /// This defines the entry point for using the module. pub fn get_foreign_library_module() -> ForeignLibraryModuleRef { @@ -135,6 +149,7 @@ pub fn get_foreign_library_module() -> ForeignLibraryModuleRef { create_catalog: create_catalog_provider, create_catalog_list: create_catalog_provider_list, create_table: construct_table_provider, + create_table_factory: construct_table_provider_factory, create_scalar_udf: create_ffi_abs_func, create_nullary_udf: create_ffi_random_func, create_table_function: create_ffi_table_func, diff --git a/datafusion/ffi/src/tests/table_provider_factory.rs b/datafusion/ffi/src/tests/table_provider_factory.rs new file mode 100644 index 0000000000000..29af6aacf6484 --- /dev/null +++ b/datafusion/ffi/src/tests/table_provider_factory.rs @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion_catalog::{MemTable, Session, TableProvider, TableProviderFactory}; +use datafusion_common::Result; +use datafusion_expr::CreateExternalTable; + +use super::{create_record_batch, create_test_schema}; +use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec; +use crate::table_provider_factory::FFI_TableProviderFactory; + +#[derive(Debug)] +pub struct TestTableProviderFactory {} + +#[async_trait] +impl TableProviderFactory for TestTableProviderFactory { + async fn create( + &self, + _session: &dyn Session, + _cmd: &CreateExternalTable, + ) -> Result> { + let schema = create_test_schema(); + + // It is useful to create these as multiple record batches + // so that we can demonstrate the FFI stream. + let batches = vec![ + create_record_batch(1, 5), + create_record_batch(6, 1), + create_record_batch(7, 5), + ]; + + let table_provider = MemTable::try_new(schema, vec![batches]).unwrap(); + + Ok(Arc::new(table_provider)) + } +} + +pub(crate) fn create(codec: FFI_LogicalExtensionCodec) -> FFI_TableProviderFactory { + let factory = TestTableProviderFactory {}; + FFI_TableProviderFactory::new_with_ffi_codec(Arc::new(factory), None, codec) +} diff --git a/datafusion/ffi/tests/ffi_integration.rs b/datafusion/ffi/tests/ffi_integration.rs index 2d18679cb018e..80538d4f92fb1 100644 --- a/datafusion/ffi/tests/ffi_integration.rs +++ b/datafusion/ffi/tests/ffi_integration.rs @@ -21,10 +21,15 @@ mod utils; /// when the feature integration-tests is built #[cfg(feature = "integration-tests")] mod tests { + use std::collections::HashMap; use std::sync::Arc; - use datafusion::catalog::TableProvider; + use arrow::datatypes::Schema; + use datafusion::catalog::{TableProvider, TableProviderFactory}; use datafusion::error::{DataFusionError, Result}; + use datafusion_common::TableReference; + use datafusion_common::ToDFSchema; + use datafusion_expr::CreateExternalTable; use datafusion_ffi::tests::create_record_batch; use datafusion_ffi::tests::utils::get_module; @@ -69,4 +74,43 @@ mod tests { async fn sync_test_table_provider() -> Result<()> { test_table_provider(true).await } + + #[tokio::test] + async fn test_table_provider_factory() -> Result<()> { + let table_provider_module = get_module()?; + let (ctx, codec) = super::utils::ctx_and_codec(); + + let ffi_table_provider_factory = table_provider_module + .create_table_factory() + .ok_or(DataFusionError::NotImplemented( + "External table provider factory failed to implement create".to_string(), + ))?(codec); + + let foreign_table_provider_factory: Arc = + (&ffi_table_provider_factory).into(); + + let cmd = CreateExternalTable { + schema: Schema::empty().to_dfschema_ref()?, + name: TableReference::bare("cloned_test"), + location: "test".to_string(), + file_type: "test".to_string(), + table_partition_cols: vec![], + if_not_exists: false, + or_replace: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Default::default(), + column_defaults: HashMap::new(), + }; + + let provider = foreign_table_provider_factory + .create(&ctx.state(), &cmd) + .await?; + assert_eq!(provider.schema().fields().len(), 2); + + Ok(()) + } }