Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,5 +1079,9 @@ mod tests {
fn table_options_mut(&mut self) -> &mut TableOptions {
unimplemented!()
}

fn task_ctx(&self) -> Arc<datafusion_execution::TaskContext> {
unimplemented!()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@
use std::sync::Arc;
use std::{any::Any, borrow::Cow};

use crate::Session;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_physical_plan::work_table::WorkTableExec;

use crate::{
error::Result,
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown},
physical_plan::ExecutionPlan,
};
use datafusion_physical_plan::ExecutionPlan;

use crate::datasource::{TableProvider, TableType};
use datafusion_common::error::Result;
use datafusion_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableType};

use crate::TableProvider;

/// The temporary working table where the previous iteration of a recursive query is stored
/// Naming is based on PostgreSQL's implementation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::sync::Arc;
use std::{any::Any, borrow::Cow};

use crate::datasource::TableProvider;
use crate::TableProvider;

use arrow::datatypes::SchemaRef;
use datafusion_common::{internal_err, Constraints};
Expand Down Expand Up @@ -133,7 +133,7 @@ fn preserves_table_type() {

async fn scan(
&self,
_: &dyn datafusion_catalog::Session,
_: &dyn crate::Session,
_: Option<&Vec<usize>>,
_: &[Expr],
_: Option<usize>,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub use r#async::*;
pub use schema::*;
pub use session::*;
pub use table::*;
pub mod cte_worktable;
pub mod default_table_source;
pub mod stream;
pub mod streaming;
pub mod view;
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
//! [`MemoryCatalogProvider`], [`MemoryCatalogProviderList`]: In-memory
//! implementations of [`CatalogProviderList`] and [`CatalogProvider`].

use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
use async_trait::async_trait;
use crate::{CatalogProvider, CatalogProviderList, SchemaProvider};
use dashmap::DashMap;
use datafusion_common::{exec_err, DataFusionError};
use datafusion_common::exec_err;
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -134,67 +133,3 @@ impl CatalogProvider for MemoryCatalogProvider {
}
}
}

/// Simple in-memory implementation of a schema.
#[derive(Debug)]
pub struct MemorySchemaProvider {
tables: DashMap<String, Arc<dyn TableProvider>>,
}

impl MemorySchemaProvider {
/// Instantiates a new MemorySchemaProvider with an empty collection of tables.
pub fn new() -> Self {
Self {
tables: DashMap::new(),
}
}
}

impl Default for MemorySchemaProvider {
fn default() -> Self {
Self::new()
}
}

#[async_trait]
impl SchemaProvider for MemorySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
self.tables
.iter()
.map(|table| table.key().clone())
.collect()
}

async fn table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
Ok(self.tables.get(name).map(|table| Arc::clone(table.value())))
}

fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
if self.table_exist(name.as_str()) {
return exec_err!("The table {name} already exists");
}
Ok(self.tables.insert(name, table))
}

fn deregister_table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Ok(self.tables.remove(name).map(|(_, table)| table))
}

fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
}
}
22 changes: 22 additions & 0 deletions datafusion/catalog/src/memory/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub(crate) mod catalog;
pub(crate) mod schema;

pub use catalog::*;
pub use schema::*;
89 changes: 89 additions & 0 deletions datafusion/catalog/src/memory/schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`MemorySchemaProvider`]: In-memory implementations of [`SchemaProvider`].

use crate::{SchemaProvider, TableProvider};
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_common::{exec_err, DataFusionError};
use std::any::Any;
use std::sync::Arc;

/// Simple in-memory implementation of a schema.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is nice to break this out into a new module

#[derive(Debug)]
pub struct MemorySchemaProvider {
tables: DashMap<String, Arc<dyn TableProvider>>,
}

impl MemorySchemaProvider {
/// Instantiates a new MemorySchemaProvider with an empty collection of tables.
pub fn new() -> Self {
Self {
tables: DashMap::new(),
}
}
}

impl Default for MemorySchemaProvider {
fn default() -> Self {
Self::new()
}
}

#[async_trait]
impl SchemaProvider for MemorySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
self.tables
.iter()
.map(|table| table.key().clone())
.collect()
}

async fn table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
Ok(self.tables.get(name).map(|table| Arc::clone(table.value())))
}

fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
if self.table_exist(name.as_str()) {
return exec_err!("The table {name} already exists");
}
Ok(self.tables.insert(name, table))
}

fn deregister_table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Ok(self.tables.remove(name).map(|(_, table)| table))
}

fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
}
}
3 changes: 3 additions & 0 deletions datafusion/catalog/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ pub trait Session: Send + Sync {

/// Returns a mutable reference to [`TableOptions`]
fn table_options_mut(&mut self) -> &mut TableOptions;

/// Get a new TaskContext to run in this session
fn task_ctx(&self) -> Arc<TaskContext>;
}

/// Create a new task context instance from Session
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::sync::Arc;

use crate::datasource::{TableProvider, TableType};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::repartition::RepartitionExec;
Expand Down Expand Up @@ -129,7 +128,7 @@ impl MemTable {
pub async fn load(
t: Arc<dyn TableProvider>,
output_partitions: Option<usize>,
state: &SessionState,
state: &dyn Session,
) -> Result<Self> {
let schema = t.schema();
let constraints = t.constraints();
Expand Down Expand Up @@ -267,6 +266,8 @@ impl TableProvider for MemTable {
/// # Returns
///
/// * A plan that returns the number of rows written.
///
/// [`SessionState`]: crate::execution::context::SessionState
async fn insert_into(
&self,
_state: &dyn Session,
Expand Down
13 changes: 6 additions & 7 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
//!
//! [`ListingTable`]: crate::datasource::listing::ListingTable

pub mod cte_worktable;
pub mod default_table_source;
pub mod dynamic_file;
pub mod empty;
pub mod file_format;
Expand All @@ -32,11 +30,6 @@ pub mod provider;
mod statistics;
mod view_test;

pub use datafusion_catalog::stream;
pub use datafusion_catalog::view;
pub use datafusion_datasource::schema_adapter;
pub use datafusion_datasource::source;

// backwards compatibility
pub use self::default_table_source::{
provider_as_source, source_as_provider, DefaultTableSource,
Expand All @@ -45,6 +38,12 @@ pub use self::memory::MemTable;
pub use self::view::ViewTable;
pub use crate::catalog::TableProvider;
pub use crate::logical_expr::TableType;
pub use datafusion_catalog::cte_worktable;
pub use datafusion_catalog::default_table_source;
pub use datafusion_catalog::stream;
pub use datafusion_catalog::view;
pub use datafusion_datasource::schema_adapter;
pub use datafusion_datasource::source;
pub use datafusion_execution::object_store;
pub use datafusion_physical_expr::create_ordering;
pub use statistics::get_statistics_with_limit;
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,4 @@ pub use session_state_defaults::SessionStateDefaults;

// backwards compatibility
pub use crate::datasource::file_format::options;

// backwards compatibility
pub use datafusion_execution::*;
4 changes: 4 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ impl Session for SessionState {
fn table_options_mut(&mut self) -> &mut TableOptions {
self.table_options_mut()
}

fn task_ctx(&self) -> Arc<TaskContext> {
self.task_ctx()
}
}

impl SessionState {
Expand Down
Loading