From b66740189dcc2708a57d952be56498cae85b1da2 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 6 Feb 2025 17:56:10 -0800 Subject: [PATCH 1/9] Adding an optional `hdfs` crate --- native/Cargo.toml | 5 +- native/core/Cargo.toml | 8 +- native/hdfs/Cargo.toml | 49 ++ native/hdfs/src/lib.rs | 24 + native/hdfs/src/object_store/hdfs.rs | 676 +++++++++++++++++++++++++++ native/hdfs/src/object_store/mod.rs | 18 + 6 files changed, 776 insertions(+), 4 deletions(-) create mode 100644 native/hdfs/Cargo.toml create mode 100644 native/hdfs/src/lib.rs create mode 100644 native/hdfs/src/object_store/hdfs.rs create mode 100644 native/hdfs/src/object_store/mod.rs diff --git a/native/Cargo.toml b/native/Cargo.toml index 6c905554fb..3a6a2613e1 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -16,7 +16,8 @@ # under the License. [workspace] -members = ["core", "spark-expr", "proto"] +default-members = ["core", "spark-expr", "proto"] +members = ["core", "spark-expr", "proto", "hdfs"] resolver = "2" [workspace.package] @@ -38,6 +39,8 @@ arrow-array = { version = "54.1.0" } arrow-buffer = { version = "54.1.0" } arrow-data = { version = "54.1.0" } arrow-schema = { version = "54.1.0" } +async-trait = { version = "0.1" } +bytes = { version = "1.10.0" } parquet = { version = "54.1.0", default-features = false, features = ["experimental"] } datafusion = { version = "45.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } datafusion-common = { version = "45.0.0", default-features = false } diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index d3f17a7056..42dfeaad42 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -44,7 +44,7 @@ parquet = { workspace = true, default-features = false, features = ["experimenta futures = { workspace = true } mimalloc = { version = "*", default-features = false, optional = true } tokio = { version = "1", features = ["rt-multi-thread"] } -async-trait = "0.1" +async-trait = { workspace = true } log = "0.4" log4rs = "1.2.0" thiserror = { workspace = true } @@ -58,9 +58,9 @@ lz4_flex = { version = "0.11.3", default-features = false } zstd = "0.11" rand = { workspace = true} num = { workspace = true } -bytes = "1.5.0" +bytes = { workspace = true } tempfile = "3.8.0" -itertools = "0.11.0" +itertools = "0.14.0" paste = "1.0.14" datafusion-common = { workspace = true, features= ["object_store"] } datafusion = { workspace = true } @@ -77,6 +77,7 @@ datafusion-comet-proto = { workspace = true } object_store = { workspace = true } url = { workspace = true } chrono = { workspace = true } +datafusion-objectstore-hdfs = { path = "../hdfs", optional = true} [dev-dependencies] pprof = { version = "0.14.0", features = ["flamegraph"] } @@ -88,6 +89,7 @@ hex = "0.4.3" [features] default = [] +hdfs=["datafusion-objectstore-hdfs"] [lib] name = "comet" diff --git a/native/hdfs/Cargo.toml b/native/hdfs/Cargo.toml new file mode 100644 index 0000000000..e0e7ff1e7e --- /dev/null +++ b/native/hdfs/Cargo.toml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This is an optional HDFS crate +# To build it from root is required to provide a valid JAVA_HOME +# and enable `hdfs` feature +# Example: JAVA_HOME="/opt/homebrew/opt/openjdk@11" cargo build --features=hdfs + +[package] +name = "datafusion-objectstore-hdfs" +description = "Comet HDFS integration. Initiated by Yanghong Zhong (https://github.com/datafusion-contrib/datafusion-objectstore-hdfs)" +version = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +authors = { workspace = true } +readme = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +[features] +default = ["hdfs", "try_spawn_blocking"] +hdfs = ["fs-hdfs"] +hdfs3 = ["fs-hdfs3"] +# Used for trying to spawn a blocking thread for implementing each object store interface when running in a tokio runtime +try_spawn_blocking = [] + +[dependencies] +async-trait = { workspace = true } +bytes = { workspace = true } +chrono = { workspace = true } +fs-hdfs = { version = "^0.1.12", optional = true } +fs-hdfs3 = { version = "^0.1.12", optional = true } +futures = { workspace = true } +object_store = { workspace = true } +tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } diff --git a/native/hdfs/src/lib.rs b/native/hdfs/src/lib.rs new file mode 100644 index 0000000000..9e8fffa1c1 --- /dev/null +++ b/native/hdfs/src/lib.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! HDFS as a remote ObjectStore for [Datafusion](https://github.com/apache/datafusion). +//! +//! This crate introduces ``HadoopFileSystem`` as a remote ObjectStore which provides the ability of querying on HDFS files. +//! +//! For the HDFS access, We leverage the library [fs-hdfs](https://github.com/datafusion-contrib/fs-hdfs). +//! Basically, the library only provides Rust FFI APIs for the ``libhdfs`` which can be compiled by a set of C files provided by the [official Hadoop Community](https://github.com/apache/hadoop). +pub mod object_store; diff --git a/native/hdfs/src/object_store/hdfs.rs b/native/hdfs/src/object_store/hdfs.rs new file mode 100644 index 0000000000..5a8101b2fd --- /dev/null +++ b/native/hdfs/src/object_store/hdfs.rs @@ -0,0 +1,676 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Object store that represents the HDFS File System. + +use std::collections::{BTreeSet, VecDeque}; +use std::fmt::{Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use futures::{stream::BoxStream, StreamExt, TryStreamExt}; +use hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs}; +use hdfs::walkdir::HdfsWalkDir; +use object_store::{ + path::{self, Path}, + Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload, + ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, +}; + +/// scheme for HDFS File System +pub static HDFS_SCHEME: &str = "hdfs"; +/// scheme for HDFS Federation File System +pub static VIEWFS_SCHEME: &str = "viewfs"; + +#[derive(Debug)] +/// Hadoop File System as Object Store. +pub struct HadoopFileSystem { + hdfs: Arc, +} + +impl Default for HadoopFileSystem { + fn default() -> Self { + Self { + hdfs: get_hdfs_by_full_path("default").expect("Fail to get default HdfsFs"), + } + } +} + +impl HadoopFileSystem { + /// Get HDFS from the full path, like hdfs://localhost:8020/xxx/xxx + pub fn new(full_path: &str) -> Option { + get_hdfs_by_full_path(full_path) + .map(|hdfs| Some(Self { hdfs })) + .unwrap_or(None) + } + + /// Return filesystem path of the given location + fn path_to_filesystem(location: &Path) -> String { + format!("/{}", location.as_ref()) + } + + pub fn get_path_root(&self) -> String { + self.hdfs.url().to_owned() + } + + pub fn get_path(&self, full_path: &str) -> Path { + get_path(full_path, self.hdfs.url()) + } + + pub fn get_hdfs_host(&self) -> String { + let hdfs_url = self.hdfs.url(); + if hdfs_url.starts_with(HDFS_SCHEME) { + hdfs_url[7..].to_owned() + } else if hdfs_url.starts_with(VIEWFS_SCHEME) { + hdfs_url[9..].to_owned() + } else { + "".to_owned() + } + } + + fn read_range(range: &Range, file: &HdfsFile) -> Result { + let to_read = range.end - range.start; + let mut buf = vec![0; to_read]; + let read = file + .read_with_pos(range.start as i64, buf.as_mut_slice()) + .map_err(to_error)?; + assert_eq!( + to_read as i32, + read, + "Read path {} from {} with expected size {} and actual size {}", + file.path(), + range.start, + to_read, + read + ); + Ok(buf.into()) + } +} + +impl Display for HadoopFileSystem { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "HadoopFileSystem") + } +} + +#[async_trait] +impl ObjectStore for HadoopFileSystem { + async fn put_opts( + &self, + _location: &Path, + _bytes: PutPayload, + _opts: PutOptions, + ) -> Result { + todo!() + } + + async fn put_multipart_opts( + &self, + _location: &Path, + _opts: PutMultipartOpts, + ) -> object_store::Result> { + unimplemented!() + } + + async fn get(&self, location: 
&Path) -> Result { + let hdfs = self.hdfs.clone(); + let hdfs_root = self.hdfs.url().to_owned(); + let location = HadoopFileSystem::path_to_filesystem(location); + + let (blob, object_metadata, range) = maybe_spawn_blocking(move || { + let file = hdfs.open(&location).map_err(to_error)?; + + let file_status = file.get_file_status().map_err(to_error)?; + + let to_read = file_status.len(); + let mut buf = vec![0; to_read]; + let read = file.read(buf.as_mut_slice()).map_err(to_error)?; + assert_eq!( + to_read as i32, read, + "Read path {} with expected size {} and actual size {}", + &location, to_read, read + ); + + file.close().map_err(to_error)?; + + let object_metadata = convert_metadata(file_status.clone(), &hdfs_root); + + let range = Range { + start: 0, + end: file_status.len(), + }; + + Ok((buf.into(), object_metadata, range)) + }) + .await?; + + Ok(GetResult { + payload: GetResultPayload::Stream( + futures::stream::once(async move { Ok(blob) }).boxed(), + ), + meta: object_metadata, + range, + attributes: Default::default(), + }) + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + if options.if_match.is_some() || options.if_none_match.is_some() { + return Err(Error::Generic { + store: "HadoopFileSystem", + source: Box::new(HdfsErr::Generic("ETags not supported".to_string())), + }); + } + + let hdfs = self.hdfs.clone(); + let hdfs_root = self.hdfs.url().to_owned(); + let location = HadoopFileSystem::path_to_filesystem(location); + + let (blob, object_metadata, range) = maybe_spawn_blocking(move || { + let file = hdfs.open(&location).map_err(to_error)?; + + let file_status = file.get_file_status().map_err(to_error)?; + + if options.if_unmodified_since.is_some() || options.if_modified_since.is_some() { + check_modified(&options, &location, last_modified(&file_status))?; + } + + let range = match options.range { + Some(GetRange::Bounded(range)) => range, + _ => Range { + start: 0, + end: file_status.len(), + }, + }; + + let buf = Self::read_range(&range, &file)?; + + file.close().map_err(to_error)?; + + let object_meta = convert_metadata(file_status, &hdfs_root); + + Ok((buf, object_meta, range)) + }) + .await?; + + Ok(GetResult { + payload: GetResultPayload::Stream( + futures::stream::once(async move { Ok(blob) }).boxed(), + ), + meta: object_metadata, + range, + attributes: Default::default(), + }) + } + + async fn get_range(&self, location: &Path, range: Range) -> Result { + let hdfs = self.hdfs.clone(); + let location = HadoopFileSystem::path_to_filesystem(location); + + maybe_spawn_blocking(move || { + let file = hdfs.open(&location).map_err(to_error)?; + let buf = Self::read_range(&range, &file)?; + file.close().map_err(to_error)?; + + Ok(buf) + }) + .await + } + + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { + coalesce_ranges( + ranges, + |range| self.get_range(location, range), + HDFS_COALESCE_DEFAULT, + ) + .await + } + + async fn head(&self, location: &Path) -> Result { + let hdfs = self.hdfs.clone(); + let hdfs_root = self.hdfs.url().to_owned(); + let location = HadoopFileSystem::path_to_filesystem(location); + + maybe_spawn_blocking(move || { + let file_status = hdfs.get_file_status(&location).map_err(to_error)?; + Ok(convert_metadata(file_status, &hdfs_root)) + }) + .await + } + + async fn delete(&self, location: &Path) -> Result<()> { + let hdfs = self.hdfs.clone(); + let location = HadoopFileSystem::path_to_filesystem(location); + + maybe_spawn_blocking(move || { + hdfs.delete(&location, 
false).map_err(to_error)?; + + Ok(()) + }) + .await + } + + /// List all of the leaf files under the prefix path. + /// It will recursively search leaf files whose depth is larger than 1 + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + let default_path = Path::from(self.get_path_root()); + let prefix = prefix.unwrap_or(&default_path); + let hdfs = self.hdfs.clone(); + let hdfs_root = self.hdfs.url().to_owned(); + let walkdir = + HdfsWalkDir::new_with_hdfs(HadoopFileSystem::path_to_filesystem(prefix), hdfs) + .min_depth(1); + + let s = + walkdir.into_iter().flat_map(move |result_dir_entry| { + match convert_walkdir_result(result_dir_entry) { + Err(e) => Some(Err(e)), + Ok(None) => None, + Ok(entry @ Some(_)) => entry + .filter(|dir_entry| dir_entry.is_file()) + .map(|entry| Ok(convert_metadata(entry, &hdfs_root))), + } + }); + + // If no tokio context, return iterator directly as no + // need to perform chunked spawn_blocking reads + if tokio::runtime::Handle::try_current().is_err() { + return futures::stream::iter(s).boxed(); + } + + // Otherwise list in batches of CHUNK_SIZE + const CHUNK_SIZE: usize = 1024; + + let buffer = VecDeque::with_capacity(CHUNK_SIZE); + let stream = futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move { + if buffer.is_empty() { + (s, buffer) = tokio::task::spawn_blocking(move || { + for _ in 0..CHUNK_SIZE { + match s.next() { + Some(r) => buffer.push_back(r), + None => break, + } + } + (s, buffer) + }) + .await?; + } + + match buffer.pop_front() { + Some(Err(e)) => Err(e), + Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))), + None => Ok(None), + } + }); + + stream.boxed() + } + + /// List files and directories directly under the prefix path. + /// It will not recursively search leaf files whose depth is larger than 1 + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + let default_path = Path::from(self.get_path_root()); + let prefix = prefix.unwrap_or(&default_path); + let hdfs = self.hdfs.clone(); + let hdfs_root = self.hdfs.url().to_owned(); + let walkdir = + HdfsWalkDir::new_with_hdfs(HadoopFileSystem::path_to_filesystem(prefix), hdfs) + .min_depth(1) + .max_depth(1); + + let prefix = prefix.clone(); + maybe_spawn_blocking(move || { + let mut common_prefixes = BTreeSet::new(); + let mut objects = Vec::new(); + + for entry_res in walkdir.into_iter().map(convert_walkdir_result) { + if let Some(entry) = entry_res? { + let is_directory = entry.is_directory(); + let entry_location = get_path(entry.name(), &hdfs_root); + + let mut parts = match entry_location.prefix_match(&prefix) { + Some(parts) => parts, + None => continue, + }; + + let common_prefix = match parts.next() { + Some(p) => p, + None => continue, + }; + + drop(parts); + + if is_directory { + common_prefixes.insert(prefix.child(common_prefix)); + } else { + objects.push(convert_metadata(entry, &hdfs_root)); + } + } + } + + Ok(ListResult { + common_prefixes: common_prefixes.into_iter().collect(), + objects, + }) + }) + .await + } + + /// Copy an object from one path to another. + /// If there exists an object at the destination, it will be overwritten. 
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + let hdfs = self.hdfs.clone(); + let from = HadoopFileSystem::path_to_filesystem(from); + let to = HadoopFileSystem::path_to_filesystem(to); + + maybe_spawn_blocking(move || { + // We need to make sure the source exist + if !hdfs.exist(&from) { + return Err(Error::NotFound { + path: from.clone(), + source: Box::new(HdfsErr::FileNotFound(from)), + }); + } + // Delete destination if exists + if hdfs.exist(&to) { + hdfs.delete(&to, false).map_err(to_error)?; + } + + hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to) + .map_err(to_error)?; + + Ok(()) + }) + .await + } + + /// It's only allowed for the same HDFS + async fn rename(&self, from: &Path, to: &Path) -> Result<()> { + let hdfs = self.hdfs.clone(); + let from = HadoopFileSystem::path_to_filesystem(from); + let to = HadoopFileSystem::path_to_filesystem(to); + + maybe_spawn_blocking(move || { + hdfs.rename(&from, &to).map_err(to_error)?; + + Ok(()) + }) + .await + } + + /// Copy an object from one path to another, only if destination is empty. + /// Will return an error if the destination already has an object. + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let hdfs = self.hdfs.clone(); + let from = HadoopFileSystem::path_to_filesystem(from); + let to = HadoopFileSystem::path_to_filesystem(to); + + maybe_spawn_blocking(move || { + if hdfs.exist(&to) { + return Err(Error::AlreadyExists { + path: from, + source: Box::new(HdfsErr::FileAlreadyExists(to)), + }); + } + + hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to) + .map_err(to_error)?; + + Ok(()) + }) + .await + } +} + +/// Create Path without prefix +pub fn get_path(full_path: &str, prefix: &str) -> Path { + let partial_path = &full_path[prefix.len()..]; + Path::parse(partial_path).unwrap() +} + +/// Convert HDFS file status to ObjectMeta +pub fn convert_metadata(file: FileStatus, prefix: &str) -> ObjectMeta { + ObjectMeta { + location: get_path(file.name(), prefix), + last_modified: last_modified(&file), + size: file.len(), + e_tag: None, + version: None, + } +} + +fn last_modified(file: &FileStatus) -> DateTime { + DateTime::from_timestamp(file.last_modified(), 0).unwrap() +} + +fn check_modified( + get_options: &GetOptions, + location: &str, + last_modified: DateTime, +) -> Result<()> { + if let Some(date) = get_options.if_modified_since { + if last_modified <= date { + return Err(Error::NotModified { + path: location.to_string(), + source: format!("{} >= {}", date, last_modified).into(), + }); + } + } + + if let Some(date) = get_options.if_unmodified_since { + if last_modified > date { + return Err(Error::Precondition { + path: location.to_string(), + source: format!("{} < {}", date, last_modified).into(), + }); + } + } + Ok(()) +} + +/// Convert walkdir results and converts not-found errors into `None`. 
+fn convert_walkdir_result( + res: std::result::Result, +) -> Result> { + match res { + Ok(entry) => Ok(Some(entry)), + Err(walkdir_err) => match walkdir_err { + HdfsErr::FileNotFound(_) => Ok(None), + _ => Err(to_error(HdfsErr::Generic( + "Fail to walk hdfs directory".to_owned(), + ))), + }, + } +} + +/// Range requests with a gap less than or equal to this, +/// will be coalesced into a single request by [`coalesce_ranges`] +pub const HDFS_COALESCE_DEFAULT: usize = 1024 * 1024; + +/// Up to this number of range requests will be performed in parallel by [`coalesce_ranges`] +pub const OBJECT_STORE_COALESCE_PARALLEL: usize = 10; + +/// Takes a function to fetch ranges and coalesces adjacent ranges if they are +/// less than `coalesce` bytes apart. +pub async fn coalesce_ranges( + ranges: &[Range], + fetch: F, + coalesce: usize, +) -> Result> +where + F: FnMut(Range) -> Fut, + Fut: std::future::Future>, +{ + let fetch_ranges = merge_ranges(ranges, coalesce); + + let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned()) + .map(fetch) + .buffered(OBJECT_STORE_COALESCE_PARALLEL) + .try_collect() + .await?; + + Ok(ranges + .iter() + .map(|range| { + let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1; + let fetch_range = &fetch_ranges[idx]; + let fetch_bytes = &fetched[idx]; + + let start = range.start - fetch_range.start; + let end = range.end - fetch_range.start; + fetch_bytes.slice(start..end) + }) + .collect()) +} + +/// Takes a function and spawns it to a tokio blocking pool if available +pub async fn maybe_spawn_blocking(f: F) -> Result +where + F: FnOnce() -> Result + Send + 'static, + T: Send + 'static, +{ + #[cfg(feature = "try_spawn_blocking")] + match tokio::runtime::Handle::try_current() { + Ok(runtime) => runtime.spawn_blocking(f).await?, + Err(_) => f(), + } + + #[cfg(not(feature = "try_spawn_blocking"))] + f() +} + +/// Returns a sorted list of ranges that cover `ranges` +fn merge_ranges(ranges: &[Range], coalesce: usize) -> Vec> { + if ranges.is_empty() { + return vec![]; + } + + let mut ranges = ranges.to_vec(); + ranges.sort_unstable_by_key(|range| range.start); + + let mut ret = Vec::with_capacity(ranges.len()); + let mut start_idx = 0; + let mut end_idx = 1; + + while start_idx != ranges.len() { + let mut range_end = ranges[start_idx].end; + + while end_idx != ranges.len() + && ranges[end_idx] + .start + .checked_sub(range_end) + .map(|delta| delta <= coalesce) + .unwrap_or(true) + { + range_end = range_end.max(ranges[end_idx].end); + end_idx += 1; + } + + let start = ranges[start_idx].start; + let end = range_end; + ret.push(start..end); + + start_idx = end_idx; + end_idx += 1; + } + + ret +} + +fn to_error(err: HdfsErr) -> Error { + match err { + HdfsErr::FileNotFound(path) => Error::NotFound { + path: path.clone(), + source: Box::new(HdfsErr::FileNotFound(path)), + }, + HdfsErr::FileAlreadyExists(path) => Error::AlreadyExists { + path: path.clone(), + source: Box::new(HdfsErr::FileAlreadyExists(path)), + }, + HdfsErr::InvalidUrl(path) => Error::InvalidPath { + source: path::Error::InvalidPath { + path: PathBuf::from(path), + }, + }, + HdfsErr::CannotConnectToNameNode(namenode_uri) => Error::Generic { + store: "HadoopFileSystem", + source: Box::new(HdfsErr::CannotConnectToNameNode(namenode_uri)), + }, + HdfsErr::Generic(err_str) => Error::Generic { + store: "HadoopFileSystem", + source: Box::new(HdfsErr::Generic(err_str)), + }, + } +} + +#[allow(clippy::single_range_in_vec_init)] +#[cfg(test)] +mod tests { + use super::*; + use 
std::ops::Range; + + #[tokio::test] + async fn test_coalesce_ranges() { + let do_fetch = |ranges: Vec>, coalesce: usize| async move { + let max = ranges.iter().map(|x| x.end).max().unwrap_or(0); + let src: Vec<_> = (0..max).map(|x| x as u8).collect(); + + let mut fetches = vec![]; + let coalesced = coalesce_ranges( + &ranges, + |range| { + fetches.push(range.clone()); + futures::future::ready(Ok(Bytes::from(src[range].to_vec()))) + }, + coalesce, + ) + .await + .unwrap(); + + assert_eq!(ranges.len(), coalesced.len()); + for (range, bytes) in ranges.iter().zip(coalesced) { + assert_eq!(bytes.as_ref(), &src[range.clone()]); + } + fetches + }; + + let fetches = do_fetch(vec![], 0).await; + assert_eq!(fetches, vec![]); + + let fetches = do_fetch(vec![0..3], 0).await; + assert_eq!(fetches, vec![0..3]); + + let fetches = do_fetch(vec![0..2, 3..5], 0).await; + assert_eq!(fetches, vec![0..2, 3..5]); + + let fetches = do_fetch(vec![0..1, 1..2], 0).await; + assert_eq!(fetches, vec![0..2]); + + let fetches = do_fetch(vec![0..1, 2..72], 1).await; + assert_eq!(fetches, vec![0..72]); + + let fetches = do_fetch(vec![0..1, 56..72, 73..75], 1).await; + assert_eq!(fetches, vec![0..1, 56..75]); + + let fetches = do_fetch(vec![0..1, 5..6, 7..9, 4..6], 1).await; + assert_eq!(fetches, vec![0..1, 4..9]); + } +} diff --git a/native/hdfs/src/object_store/mod.rs b/native/hdfs/src/object_store/mod.rs new file mode 100644 index 0000000000..db4754a010 --- /dev/null +++ b/native/hdfs/src/object_store/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +pub mod hdfs; From af60b8438580028bde2673c080908626d3a86320 Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 7 Feb 2025 10:34:48 -0800 Subject: [PATCH 2/9] Adding an optional `hdfs` crate --- .github/actions/rust-test/action.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml index a1cde99ba2..aa0f467b29 100644 --- a/.github/actions/rust-test/action.yaml +++ b/.github/actions/rust-test/action.yaml @@ -29,7 +29,7 @@ runs: shell: bash run: | cd native - cargo clippy --color=never --all-targets --workspace -- -D warnings + cargo clippy --color=never --all-targets --workspace --exclude hdfs -- -D warnings - name: Check compilation shell: bash From 25657d340e7e2c11b95816979e574732993c987b Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 7 Feb 2025 12:01:02 -0800 Subject: [PATCH 3/9] Adding an optional `hdfs` crate --- .github/actions/rust-test/action.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml index aa0f467b29..99ec3a0a2e 100644 --- a/.github/actions/rust-test/action.yaml +++ b/.github/actions/rust-test/action.yaml @@ -29,7 +29,7 @@ runs: shell: bash run: | cd native - cargo clippy --color=never --all-targets --workspace --exclude hdfs -- -D warnings + cargo clippy --color=never --all-targets --workspace --exclude datafusion-objectstore-hdfs -- -D warnings - name: Check compilation shell: bash From 127dc4e5e887c770074cd721a2944f44be7f6147 Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 7 Feb 2025 14:33:02 -0800 Subject: [PATCH 4/9] Adding an optional `hdfs` crate --- native/core/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 42dfeaad42..fb4608d38c 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -91,6 +91,10 @@ hex = "0.4.3" default = [] hdfs=["datafusion-objectstore-hdfs"] +# exclude optional packages from cargo machete verifications +[package.metadata.cargo-machete] +ignored = ["datafusion-objectstore-hdfs"] + [lib] name = "comet" # "rlib" is for benchmarking with criterion. 
From 4e4c7bce1b4367af7b66e1f677c8bb4a9fac146e Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 7 Feb 2025 14:38:13 -0800 Subject: [PATCH 5/9] Adding an optional `hdfs` crate --- native/hdfs/Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/native/hdfs/Cargo.toml b/native/hdfs/Cargo.toml index e0e7ff1e7e..82db7f257c 100644 --- a/native/hdfs/Cargo.toml +++ b/native/hdfs/Cargo.toml @@ -47,3 +47,6 @@ fs-hdfs3 = { version = "^0.1.12", optional = true } futures = { workspace = true } object_store = { workspace = true } tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } + +[package.metadata.cargo-machete] +ignored = ["fs-hdfs", "fs-hdfs3"] \ No newline at end of file From c288600ea1c72a655ccf1500a18a3e9b9b1d66eb Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 12 Feb 2025 08:46:48 -0800 Subject: [PATCH 6/9] Adding an optional `hdfs` crate --- .github/actions/rust-test/action.yaml | 2 +- NOTICE.txt | 3 +++ native/core/Cargo.toml | 6 ++--- native/hdfs/Cargo.toml | 4 ++-- native/hdfs/README.md | 33 +++++++++++++++++++++++++++ 5 files changed, 42 insertions(+), 6 deletions(-) create mode 100644 native/hdfs/README.md diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml index 99ec3a0a2e..a1cde99ba2 100644 --- a/.github/actions/rust-test/action.yaml +++ b/.github/actions/rust-test/action.yaml @@ -29,7 +29,7 @@ runs: shell: bash run: | cd native - cargo clippy --color=never --all-targets --workspace --exclude datafusion-objectstore-hdfs -- -D warnings + cargo clippy --color=never --all-targets --workspace -- -D warnings - name: Check compilation shell: bash diff --git a/NOTICE.txt b/NOTICE.txt index b572b1fa29..3a68eec957 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -8,3 +8,6 @@ This product includes software developed at Apache Gluten (https://github.com/apache/incubator-gluten/) Specifically: - Optimizer rule to replace SortMergeJoin with ShuffleHashJoin + +This produce includes software developed at +Apache DataFusion HDFS ObjectStore Contrib Package(https://github.com/datafusion-contrib/datafusion-objectstore-hdfs) \ No newline at end of file diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 9a1e55478d..f54642618b 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -78,7 +78,7 @@ object_store = { workspace = true } url = { workspace = true } chrono = { workspace = true } parking_lot = "0.12.3" -datafusion-objectstore-hdfs = { path = "../hdfs", optional = true} +datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true} [dev-dependencies] pprof = { version = "0.14.0", features = ["flamegraph"] } @@ -90,11 +90,11 @@ hex = "0.4.3" [features] default = [] -hdfs=["datafusion-objectstore-hdfs"] +hdfs=["datafusion-comet-objectstore-hdfs"] # exclude optional packages from cargo machete verifications [package.metadata.cargo-machete] -ignored = ["datafusion-objectstore-hdfs"] +ignored = ["datafusion-comet-objectstore-hdfs"] [lib] name = "comet" diff --git a/native/hdfs/Cargo.toml b/native/hdfs/Cargo.toml index 82db7f257c..dc8f970ef7 100644 --- a/native/hdfs/Cargo.toml +++ b/native/hdfs/Cargo.toml @@ -21,8 +21,8 @@ # Example: JAVA_HOME="/opt/homebrew/opt/openjdk@11" cargo build --features=hdfs [package] -name = "datafusion-objectstore-hdfs" -description = "Comet HDFS integration. 
Initiated by Yanghong Zhong (https://github.com/datafusion-contrib/datafusion-objectstore-hdfs)" +name = "datafusion-comet-objectstore-hdfs" +description = "Comet HDFS integration" version = { workspace = true } homepage = { workspace = true } repository = { workspace = true } diff --git a/native/hdfs/README.md b/native/hdfs/README.md new file mode 100644 index 0000000000..b5dd2b5557 --- /dev/null +++ b/native/hdfs/README.md @@ -0,0 +1,33 @@ + + +# Apache DataFusion Comet: HDFS integration + +This crate contains the HDFS cluster integration +and is intended to be used as part of the Apache DataFusion Comet project + +The HDFS access powered by [fs-hdfs](https://github.com/datafusion-contrib/fs-hdfs). +The crate provides `object_store` implementation leveraged by Rust FFI APIs for the `libhdfs` which can be compiled +by a set of C files provided by the [official Hadoop Community](https://github.com/apache/hadoop). + +# Supported HDFS versions + +Currently supported Apache Hadoop clients are: +- 2.* +- 3.* \ No newline at end of file From ecc7995f3613d872556f415d0ba492f5d946ca07 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 12 Feb 2025 09:01:10 -0800 Subject: [PATCH 7/9] Adding an optional `hdfs` crate --- .github/actions/setup-builder/action.yaml | 1 + native/Cargo.lock | 143 ++++++++++++++++++++-- 2 files changed, 133 insertions(+), 11 deletions(-) diff --git a/.github/actions/setup-builder/action.yaml b/.github/actions/setup-builder/action.yaml index 0787d9eed3..0ccd01ad72 100644 --- a/.github/actions/setup-builder/action.yaml +++ b/.github/actions/setup-builder/action.yaml @@ -34,6 +34,7 @@ runs: run: | apt-get update apt-get install -y protobuf-compiler + apt-get install -y clang - name: Install JDK ${{inputs.jdk-version}} uses: actions/setup-java@v4 diff --git a/native/Cargo.lock b/native/Cargo.lock index 8e37079f66..55a1713874 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -399,6 +399,28 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bindgen" +version = "0.64.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4" +dependencies = [ + "bitflags 1.3.2", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "log", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 1.0.109", + "which", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -510,6 +532,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -598,6 +629,17 @@ dependencies = [ "half", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading 0.8.6", +] + [[package]] name = "clap" version = "4.5.28" @@ -898,6 +940,7 @@ dependencies = [ "crc32fast", "criterion", "datafusion", + "datafusion-comet-objectstore-hdfs", "datafusion-comet-proto", "datafusion-comet-spark-expr", "datafusion-common", @@ -907,7 +950,7 @@ dependencies = [ "datafusion-physical-expr", "futures", "hex", - "itertools 
0.11.0", + "itertools 0.14.0", "jni", "lazy_static", "log", @@ -934,6 +977,20 @@ dependencies = [ "zstd 0.11.2+zstd.1.5.2", ] +[[package]] +name = "datafusion-comet-objectstore-hdfs" +version = "0.6.0" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "fs-hdfs", + "fs-hdfs3", + "futures", + "object_store", + "tokio", +] + [[package]] name = "datafusion-comet-proto" version = "0.6.0" @@ -1490,6 +1547,34 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs-hdfs" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25f164ff6334da016dffd1c29a3c05b81c35b857ef829d3fa9e58ae8d3e6f76b" +dependencies = [ + "bindgen", + "cc", + "lazy_static", + "libc", + "log", + "url", +] + +[[package]] +name = "fs-hdfs3" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f38e500596a428817fd4fd8a9a21da32f4edb3250e87886039670b12ea02f5d" +dependencies = [ + "bindgen", + "cc", + "lazy_static", + "libc", + "log", + "url", +] + [[package]] name = "futures" version = "0.3.31" @@ -1909,15 +1994,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.12.1" @@ -1971,7 +2047,7 @@ dependencies = [ "combine", "java-locator", "jni-sys", - "libloading", + "libloading 0.7.4", "log", "thiserror", "walkdir", @@ -2009,6 +2085,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "lexical-core" version = "1.0.5" @@ -2089,6 +2171,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "libloading" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" +dependencies = [ + "cfg-if", + "windows-targets 0.52.6", +] + [[package]] name = "libm" version = "0.2.11" @@ -2213,6 +2305,12 @@ dependencies = [ "libmimalloc-sys", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.3" @@ -2239,6 +2337,16 @@ dependencies = [ "libc", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "num" version = "0.4.3" @@ -2449,6 +2557,12 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -2796,6 +2910,12 @@ version = "0.1.24" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.4.1" @@ -3201,6 +3321,7 @@ checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" dependencies = [ "backtrace", "bytes", + "parking_lot", "pin-project-lite", "tokio-macros", ] From 7ae091005284b2a753ccdf8397f5e717da336a2f Mon Sep 17 00:00:00 2001 From: Oleks V Date: Wed, 12 Feb 2025 17:13:52 -0800 Subject: [PATCH 8/9] Update NOTICE.txt Co-authored-by: Andy Grove --- NOTICE.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NOTICE.txt b/NOTICE.txt index 3a68eec957..fe08b076cd 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -10,4 +10,4 @@ Specifically: - Optimizer rule to replace SortMergeJoin with ShuffleHashJoin This produce includes software developed at -Apache DataFusion HDFS ObjectStore Contrib Package(https://github.com/datafusion-contrib/datafusion-objectstore-hdfs) \ No newline at end of file +DataFusion HDFS ObjectStore Contrib Package(https://github.com/datafusion-contrib/datafusion-objectstore-hdfs) \ No newline at end of file From adfacc77098e6bafe017cb719f2ae18a1fb22b54 Mon Sep 17 00:00:00 2001 From: Oleks V Date: Wed, 12 Feb 2025 17:14:08 -0800 Subject: [PATCH 9/9] Update NOTICE.txt Co-authored-by: Andy Grove --- NOTICE.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NOTICE.txt b/NOTICE.txt index fe08b076cd..8a77627012 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -9,5 +9,5 @@ Apache Gluten (https://github.com/apache/incubator-gluten/) Specifically: - Optimizer rule to replace SortMergeJoin with ShuffleHashJoin -This produce includes software developed at +This product includes software developed at DataFusion HDFS ObjectStore Contrib Package(https://github.com/datafusion-contrib/datafusion-objectstore-hdfs) \ No newline at end of file