From 10d7a2d41c7b581cf5c04a2d807dbcbd1f5929ba Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Wed, 21 May 2025 15:53:03 +1200 Subject: [PATCH 1/2] Add example of what Parquet encryption config might look like --- datafusion-examples/Cargo.toml | 2 + .../examples/parquet_encryption_with_kms.rs | 205 ++++++++++++++++++ 2 files changed, 207 insertions(+) create mode 100644 datafusion-examples/examples/parquet_encryption_with_kms.rs diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index b31708a5c1cc7..bdc95d6bb8fce 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -69,6 +69,8 @@ futures = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } +parquet = { workspace = true, features = ["encryption"] } +parquet-key-management = { version = "0.1", features = ["_test_utils"] } prost = { workspace = true } tempfile = { workspace = true } test-utils = { path = "../test-utils" } diff --git a/datafusion-examples/examples/parquet_encryption_with_kms.rs b/datafusion-examples/examples/parquet_encryption_with_kms.rs new file mode 100644 index 0000000000000..4bebc070bb9e7 --- /dev/null +++ b/datafusion-examples/examples/parquet_encryption_with_kms.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use datafusion::common::extensions_options; +use datafusion::config::{ConfigExtension, TableParquetOptions}; +use datafusion::dataframe::DataFrameWriteOptions; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::ListingOptions; +use datafusion::datasource::physical_plan::FileSinkConfig; +use datafusion::error::{DataFusionError, Result}; +use datafusion::physical_plan::execute_stream; +use datafusion::prelude::SessionContext; +use futures::StreamExt; +use parquet::encryption::decrypt::FileDecryptionProperties; +use parquet::encryption::encrypt::FileEncryptionProperties; +use parquet_key_management::crypto_factory::{ + CryptoFactory, DecryptionConfiguration, EncryptionConfiguration, +}; +use parquet_key_management::kms::KmsConnectionConfig; +use parquet_key_management::test_kms::TestKmsClientFactory; +use std::sync::Arc; +use tempfile::TempDir; + +/// This example demonstrates reading and writing Parquet files that +/// are encrypted using Parquet Modular Encryption, and uses the +/// parquet-key-management crate to integrate with a Key Management Server (KMS). + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + + // Register an `EncryptionFactory` implementation to be used for Parquet encryption + // in the session context. + // This uses an in-memory test KMS from the `parquet_key_management` crate with + // a custom `KmsEncryptionFactory` wrapper type to integrate with DataFusion. + // `EncryptionFactory` instances are registered with a name to identify them so + // they can be later referenced in configuration options, and it's possible to register + // multiple different factories to handle different ways of encrypting Parquet. + // In future it could be possible to have built-in implementations in DataFusion. + let crypto_factory = CryptoFactory::new(TestKmsClientFactory::with_default_keys()); + let encryption_factory = KmsEncryptionFactory { crypto_factory }; + ctx.register_parquet_encryption_factory( + "example.inmem_kms_encryption", + Arc::new(encryption_factory), + ); + + // Register options required for our encryption factory. + // This works similarly to register_table_options_extension but registers extension options + // specific to Parquet, so they can be set in `TableParquetOptions`. + ctx.register_parquet_options_extension(EncryptionConfig::default()); + + let tmpdir = TempDir::new()?; + write_encrypted(&ctx, &tmpdir).await?; + read_encrypted(&ctx, &tmpdir).await?; + Ok(()) +} + +/// Write an encrypted Parquet file +async fn write_encrypted(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { + let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100])); + let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?; + + ctx.register_batch("test_data", batch)?; + let df = ctx.table("test_data").await?; + + let mut parquet_options = TableParquetOptions::new(); + // We specify that we want to use Parquet encryption by setting the name of the + // encryption factory to use. + parquet_options.global.encryption_factory = "example.inmem_kms_encryption".to_owned(); + // Our encryption factory requires specifying the master key identifier to + // use for encryption: + let encryption_options = parquet_options + .extensions + .get_mut::() + .ok_or_else(|| { + DataFusionError::Configuration( + "EncryptionConfig options not registered".to_owned(), + ) + }); + encryption_options.footer_key_id = Some("kf1".to_owned()); + // Question: I don't think extensions.get will work when creating new `TableParquetOptions`, + // we probably need to register the extension options here but also allow registering them + // on the session context for compatibility with setting options via strings?. + + df.write_parquet( + tmpdir.path().to_str().unwrap(), + DataFrameWriteOptions::new(), + Some(parquet_options), + ) + .await?; + + println!("Encrypted Parquet written to {:?}", tmpdir.path()); + + Ok(()) +} + +/// Read from an encrypted Parquet file +async fn read_encrypted(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { + let mut parquet_options = TableParquetOptions::new(); + // Specify the encryption factory to use for decrypting Parquet. + // We don't require setting any additional configuration options when reading. + parquet_options.global.encryption_factory = "example.inmem_kms_encryption".to_owned(); + + let file_format = ParquetFormat::default().with_options(parquet_options); + let listing_options = ListingOptions::new(Arc::new(file_format)); + + let file_name = std::fs::read_dir(tmpdir)?.next().unwrap()?; + let table_path = format!("file://{}", file_name.path().as_os_str().to_str().unwrap()); + + let _ = ctx + .register_listing_table( + "parquet_table", + &table_path, + listing_options.clone(), + None, + None, + ) + .await?; + + let df = ctx.sql("SELECT * FROM parquet_table").await?; + let plan = df.create_physical_plan().await?; + + let mut batch_stream = execute_stream(plan.clone(), ctx.task_ctx())?; + println!("Reading encrypted Parquet as a RecordBatch stream"); + while let Some(batch) = batch_stream.next().await { + let batch = batch?; + println!("Read batch with {} rows", batch.num_rows()); + } + println!("Finished reading"); + Ok(()) +} + +// Custom options used to configure our encryption factory +extensions_options! { + pub struct EncryptionConfig { + pub footer_key_id: Option, default = None + } +} + +impl ConfigExtension for EncryptionConfig { + const PREFIX: &'static str = "kms_encryption"; +} + +/// Wrapper type for `CryptoFactory` to allow implementing the `EncryptionFactory` trait +struct KmsEncryptionFactory { + crypto_factory: CryptoFactory, +} + +/// `EncryptionFactory` is a trait defined by DataFusion that allows generating +/// file encryption and decryption properties. +/// The `FileSinkConfig is provided so that the schema may be used to dynamically specify +/// per-column encryption. +impl EncryptionFactory for KmsEncryptionFactory { + fn get_file_encryption_properties( + &self, + options: &TableParquetOptions, + sink_config: &FileSinkConfig, + ) -> Result { + let config: &EncryptionConfig = options.extensions.get().ok_or_else(|| { + DataFusionError::Configuration( + "EncryptionConfig options not registered".to_owned(), + ) + })?; + let footer_key_id = config.footer_key_id.as_ref().cloned().ok_or_else(|| { + DataFusionError::Configuration("Footer key id not configured".to_owned()) + })?; + // We could configure per-column keys using the provided schema, + // but for simplicity this example uses uniform encryption. + let config = + EncryptionConfiguration::builder(footer_key_id.to_owned()).build()?; + // Similarly, KMS connection could be configured from the options. + let kms_config = Arc::new(KmsConnectionConfig::default()); + Ok(self + .crypto_factory + .file_encryption_properties(kms_config, &config)?) + } + + fn get_file_decryption_properties( + &self, + options: &TableParquetOptions, + file_path: &str, + ) -> Result { + let config = DecryptionConfiguration::builder().build()?; + let kms_config = Arc::new(KmsConnectionConfig::default()); + Ok(self + .crypto_factory + .file_decryption_properties(kms_config, config)?) + } +} From 4cb361540ca7dce597ca944da547782bbcdd401e Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Tue, 3 Jun 2025 15:52:02 +1200 Subject: [PATCH 2/2] Tidy up and remove use of ConfigExtension --- .../examples/parquet_encryption_with_kms.rs | 92 +++++++++---------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/datafusion-examples/examples/parquet_encryption_with_kms.rs b/datafusion-examples/examples/parquet_encryption_with_kms.rs index 4bebc070bb9e7..47dd2fe5f7cca 100644 --- a/datafusion-examples/examples/parquet_encryption_with_kms.rs +++ b/datafusion-examples/examples/parquet_encryption_with_kms.rs @@ -16,8 +16,7 @@ // under the License. use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; -use datafusion::common::extensions_options; -use datafusion::config::{ConfigExtension, TableParquetOptions}; +use datafusion::config::TableParquetOptions; use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; @@ -33,6 +32,7 @@ use parquet_key_management::crypto_factory::{ }; use parquet_key_management::kms::KmsConnectionConfig; use parquet_key_management::test_kms::TestKmsClientFactory; +use std::collections::HashMap; use std::sync::Arc; use tempfile::TempDir; @@ -40,13 +40,15 @@ use tempfile::TempDir; /// are encrypted using Parquet Modular Encryption, and uses the /// parquet-key-management crate to integrate with a Key Management Server (KMS). +const ENCRYPTION_FACTORY_ID: &'static str = "example.inmem_kms_encryption"; + #[tokio::main] async fn main() -> Result<()> { let ctx = SessionContext::new(); // Register an `EncryptionFactory` implementation to be used for Parquet encryption // in the session context. - // This uses an in-memory test KMS from the `parquet_key_management` crate with + // This example uses an in-memory test KMS from the `parquet_key_management` crate with // a custom `KmsEncryptionFactory` wrapper type to integrate with DataFusion. // `EncryptionFactory` instances are registered with a name to identify them so // they can be later referenced in configuration options, and it's possible to register @@ -55,15 +57,10 @@ async fn main() -> Result<()> { let crypto_factory = CryptoFactory::new(TestKmsClientFactory::with_default_keys()); let encryption_factory = KmsEncryptionFactory { crypto_factory }; ctx.register_parquet_encryption_factory( - "example.inmem_kms_encryption", + ENCRYPTION_FACTORY_ID, Arc::new(encryption_factory), ); - // Register options required for our encryption factory. - // This works similarly to register_table_options_extension but registers extension options - // specific to Parquet, so they can be set in `TableParquetOptions`. - ctx.register_parquet_options_extension(EncryptionConfig::default()); - let tmpdir = TempDir::new()?; write_encrypted(&ctx, &tmpdir).await?; read_encrypted(&ctx, &tmpdir).await?; @@ -80,23 +77,14 @@ async fn write_encrypted(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { let df = ctx.table("test_data").await?; let mut parquet_options = TableParquetOptions::new(); - // We specify that we want to use Parquet encryption by setting the name of the + // We specify that we want to use Parquet encryption by setting the identifier of the // encryption factory to use. - parquet_options.global.encryption_factory = "example.inmem_kms_encryption".to_owned(); + parquet_options.encryption.factory_id = ENCRYPTION_FACTORY_ID.to_owned(); // Our encryption factory requires specifying the master key identifier to - // use for encryption: - let encryption_options = parquet_options - .extensions - .get_mut::() - .ok_or_else(|| { - DataFusionError::Configuration( - "EncryptionConfig options not registered".to_owned(), - ) - }); - encryption_options.footer_key_id = Some("kf1".to_owned()); - // Question: I don't think extensions.get will work when creating new `TableParquetOptions`, - // we probably need to register the extension options here but also allow registering them - // on the session context for compatibility with setting options via strings?. + // use for encryption. To support arbitrary configuration options for different encryption factories, + // DataFusion could use a HashMap field for encryption options. + let encryption_config = EncryptionConfig::new("kf1".to_owned()); + parquet_options.encryption.factory_options = encryption_config.to_config_map(); df.write_parquet( tmpdir.path().to_str().unwrap(), @@ -114,8 +102,9 @@ async fn write_encrypted(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { async fn read_encrypted(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { let mut parquet_options = TableParquetOptions::new(); // Specify the encryption factory to use for decrypting Parquet. - // We don't require setting any additional configuration options when reading. - parquet_options.global.encryption_factory = "example.inmem_kms_encryption".to_owned(); + // In this example, we don't require any additional configuration options when reading + // as key identifiers are stored in the key metadata. + parquet_options.encryption.factory_id = ENCRYPTION_FACTORY_ID.to_owned(); let file_format = ParquetFormat::default().with_options(parquet_options); let listing_options = ListingOptions::new(Arc::new(file_format)); @@ -125,7 +114,7 @@ async fn read_encrypted(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { let _ = ctx .register_listing_table( - "parquet_table", + "encrypted_parquet_table", &table_path, listing_options.clone(), None, @@ -133,7 +122,7 @@ async fn read_encrypted(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { ) .await?; - let df = ctx.sql("SELECT * FROM parquet_table").await?; + let df = ctx.sql("SELECT * FROM encrypted_parquet_table").await?; let plan = df.create_physical_plan().await?; let mut batch_stream = execute_stream(plan.clone(), ctx.task_ctx())?; @@ -146,51 +135,62 @@ async fn read_encrypted(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { Ok(()) } -// Custom options used to configure our encryption factory -extensions_options! { - pub struct EncryptionConfig { - pub footer_key_id: Option, default = None - } +// Options used to configure our example encryption factory +struct EncryptionConfig { + pub footer_key_id: String, } -impl ConfigExtension for EncryptionConfig { - const PREFIX: &'static str = "kms_encryption"; +impl EncryptionConfig { + pub fn new(footer_key_id: String) -> Self { + Self { footer_key_id } + } + + pub fn to_config_map(self) -> HashMap { + let mut config = HashMap::new(); + config.insert("footer_key_id".to_string(), self.footer_key_id); + config + } } -/// Wrapper type for `CryptoFactory` to allow implementing the `EncryptionFactory` trait +/// Wrapper type around `CryptoFactory` to allow implementing the `EncryptionFactory` trait struct KmsEncryptionFactory { crypto_factory: CryptoFactory, } /// `EncryptionFactory` is a trait defined by DataFusion that allows generating /// file encryption and decryption properties. -/// The `FileSinkConfig is provided so that the schema may be used to dynamically specify -/// per-column encryption. impl EncryptionFactory for KmsEncryptionFactory { + /// Generate file encryption properties to use when writing a Parquet file. + /// The `FileSinkConfig` is provided so that the schema may be used to dynamically configure + /// per-column encryption keys. + /// Because `FileSinkConfig` can represent multiple output files, we also provide a + /// single file path so that external key material may be used (where key metadata is + /// stored in a JSON file alongside Parquet files). fn get_file_encryption_properties( &self, options: &TableParquetOptions, sink_config: &FileSinkConfig, + file_path: &str, ) -> Result { - let config: &EncryptionConfig = options.extensions.get().ok_or_else(|| { + let config: &HashMap = options.encryption.factory_options; + let footer_key_id = config.get("footer_key_id").cloned().ok_or_else(|| { DataFusionError::Configuration( - "EncryptionConfig options not registered".to_owned(), + "Footer key id for encryption is not set".to_owned(), ) })?; - let footer_key_id = config.footer_key_id.as_ref().cloned().ok_or_else(|| { - DataFusionError::Configuration("Footer key id not configured".to_owned()) - })?; // We could configure per-column keys using the provided schema, // but for simplicity this example uses uniform encryption. - let config = - EncryptionConfiguration::builder(footer_key_id.to_owned()).build()?; - // Similarly, KMS connection could be configured from the options. + let config = EncryptionConfiguration::builder(footer_key_id).build()?; + // Similarly, the KMS connection could be configured from the options if needed, but this + // example just uses the default options. let kms_config = Arc::new(KmsConnectionConfig::default()); Ok(self .crypto_factory .file_encryption_properties(kms_config, &config)?) } + /// Generate file decryption properties to use when reading a Parquet file. + /// The `file_path` needs to be known to support encryption factories that use external key material. fn get_file_decryption_properties( &self, options: &TableParquetOptions,