From 05b4443c4e5abdb6e2fcb2e73d015f967654ac35 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sun, 7 May 2023 19:29:38 +0800 Subject: [PATCH 1/3] feat: Add typed kv adapter and migrate moka to it Signed-off-by: Xuanwo --- core/src/raw/adapters/kv/mod.rs | 6 - core/src/raw/adapters/mod.rs | 1 + core/src/raw/adapters/typed_kv/api.rs | 89 +++++++ core/src/raw/adapters/typed_kv/backend.rs | 282 ++++++++++++++++++++++ core/src/raw/adapters/typed_kv/mod.rs | 27 +++ core/src/raw/oio/cursor.rs | 21 ++ core/src/services/moka/backend.rs | 34 ++- 7 files changed, 434 insertions(+), 26 deletions(-) create mode 100644 core/src/raw/adapters/typed_kv/api.rs create mode 100644 core/src/raw/adapters/typed_kv/backend.rs create mode 100644 core/src/raw/adapters/typed_kv/mod.rs diff --git a/core/src/raw/adapters/kv/mod.rs b/core/src/raw/adapters/kv/mod.rs index d24179ca5dec..facb6efe1c59 100644 --- a/core/src/raw/adapters/kv/mod.rs +++ b/core/src/raw/adapters/kv/mod.rs @@ -18,12 +18,6 @@ //! Providing Key Value Adapter for OpenDAL. //! //! Any services that implement `Adapter` can be used an OpenDAL Service. -//! -//! # Notes -//! -//! This adapter creates a new storage format which is not stable. -//! -//! Any service that built upon this adapter should not be persisted. mod api; pub use api::Adapter; diff --git a/core/src/raw/adapters/mod.rs b/core/src/raw/adapters/mod.rs index d69a595a12ab..9864797c01f8 100644 --- a/core/src/raw/adapters/mod.rs +++ b/core/src/raw/adapters/mod.rs @@ -46,3 +46,4 @@ //! - [`kv::Adapter`]: Adapter for Key Value Services like in-memory map, `redis`. pub mod kv; +pub mod typed_kv; diff --git a/core/src/raw/adapters/typed_kv/api.rs b/core/src/raw/adapters/typed_kv/api.rs new file mode 100644 index 000000000000..1f2008c4f405 --- /dev/null +++ b/core/src/raw/adapters/typed_kv/api.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{fmt::Debug, mem::size_of}; + +use async_trait::async_trait; +use bytes::Bytes; +use chrono::Utc; + +use crate::*; + +/// Adapter is the typed adapter to underlying kv services. +/// +/// By implement this trait, any kv service can work as an OpenDAL Service. +/// +/// # Notes +/// +/// `typed_kv::Adapter` is the typed version of `kv::Adapter`. It's more +/// efficient if the uderlying kv service can store data with its type. For +/// example, we can store `Bytes` along with it's metadata so that we don't +/// need to serialize/deserialize it when we get it from the service. +/// +/// Ideally, we should use `typed_kv::Adapter` instead of `kv::Adapter` for +/// in-memory rust libs like moka and dashmap. +#[async_trait] +pub trait Adapter: Send + Sync + Debug + Unpin + 'static { + /// Get the scheme and name of current adapter. + fn metadata(&self) -> (Scheme, String); + + /// Get a value from adapter. + async fn get(&self, path: &str) -> Result>; + + /// Get a value from adapter. + fn blocking_get(&self, path: &str) -> Result>; + + /// Set a value into adapter. + async fn set(&self, path: &str, value: Value) -> Result<()>; + + /// Set a value into adapter. + fn blocking_set(&self, path: &str, value: Value) -> Result<()>; + + /// Delete a value from adapter. + async fn delete(&self, path: &str) -> Result<()>; + + /// Delete a value from adapter. + fn blocking_delete(&self, path: &str) -> Result<()>; +} + +/// Value is the typed value stored in adapter. +/// +/// It's cheap to clone so that users can read data without extra copy. +#[derive(Debug, Clone)] +pub struct Value { + /// Metadata of this value. + pub metadata: Metadata, + /// The correbonding content of this value. + pub value: Bytes, +} + +impl Value { + /// Create a new dir of value. + pub fn new_dir() -> Self { + Self { + metadata: Metadata::new(EntryMode::DIR) + .with_content_length(0) + .with_last_modified(Utc::now()), + value: Bytes::new(), + } + } + + /// Size returns the in-memory size of Value. + pub fn size(&self) -> usize { + size_of::() + self.value.len() + } +} diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs new file mode 100644 index 000000000000..cf7b963f2cc8 --- /dev/null +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; + +use super::Adapter; +use super::Value; +use crate::ops::*; +use crate::raw::oio::VectorCursor; +use crate::raw::*; +use crate::*; + +/// The typed kv backend which implmenets Accessor for for typed kv adapter. +#[derive(Debug, Clone)] +pub struct Backend { + kv: Arc, + root: String, +} + +impl Backend +where + S: Adapter, +{ + /// Create a new kv backend. + pub fn new(kv: S) -> Self { + Self { + kv: Arc::new(kv), + root: "/".to_string(), + } + } + + /// Configure root within this backend. + pub fn with_root(mut self, root: &str) -> Self { + self.root = normalize_root(root); + self + } +} + +#[async_trait] +impl Accessor for Backend { + type Reader = oio::Cursor; + type BlockingReader = oio::Cursor; + type Writer = KvWriter; + type BlockingWriter = KvWriter; + type Pager = (); + type BlockingPager = (); + + fn info(&self) -> AccessorInfo { + let (scheme, name) = self.kv.metadata(); + + let mut am = AccessorInfo::default(); + am.set_scheme(scheme); + am.set_name(&name); + am.set_root(&self.root); + + let cap = am.capability_mut(); + cap.read = true; + cap.read_can_seek = true; + cap.read_can_next = true; + cap.read_with_range = true; + cap.stat = true; + + cap.write = true; + cap.write_with_cache_control = true; + cap.write_with_content_disposition = true; + cap.write_with_content_type = true; + cap.write_without_content_length = true; + cap.create_dir = true; + cap.delete = true; + + am + } + + async fn create_dir(&self, path: &str, _: OpCreate) -> Result { + let p = build_abs_path(&self.root, path); + self.kv.set(&p, Value::new_dir()).await?; + Ok(RpCreate::default()) + } + + fn blocking_create_dir(&self, path: &str, _: OpCreate) -> Result { + let p = build_abs_path(&self.root, path); + self.kv.blocking_set(&p, Value::new_dir())?; + + Ok(RpCreate::default()) + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let p = build_abs_path(&self.root, path); + + let bs = match self.kv.get(&p).await? { + // TODO: we can reuse the metadata in value to build content range. + Some(bs) => bs.value, + None => return Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")), + }; + + let bs = self.apply_range(bs, args.range()); + + let length = bs.len(); + Ok((RpRead::new(length as u64), oio::Cursor::from(bs))) + } + + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + let p = build_abs_path(&self.root, path); + + let bs = match self.kv.blocking_get(&p)? { + // TODO: we can reuse the metadata in value to build content range. + Some(bs) => bs.value, + None => return Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")), + }; + + let bs = self.apply_range(bs, args.range()); + Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs))) + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let p = build_abs_path(&self.root, path); + + Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p, args))) + } + + fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + let p = build_abs_path(&self.root, path); + + Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p, args))) + } + + async fn stat(&self, path: &str, _: OpStat) -> Result { + let p = build_abs_path(&self.root, path); + + if p.is_empty() || p.ends_with('/') { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } else { + let bs = self.kv.get(&p).await?; + match bs { + Some(bs) => Ok(RpStat::new(bs.metadata)), + None => Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")), + } + } + } + + fn blocking_stat(&self, path: &str, _: OpStat) -> Result { + let p = build_abs_path(&self.root, path); + + if p.is_empty() || p.ends_with('/') { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } else { + let bs = self.kv.blocking_get(&p)?; + match bs { + Some(bs) => Ok(RpStat::new(bs.metadata)), + None => Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")), + } + } + } + + async fn delete(&self, path: &str, _: OpDelete) -> Result { + let p = build_abs_path(&self.root, path); + + self.kv.delete(&p).await?; + Ok(RpDelete::default()) + } + + fn blocking_delete(&self, path: &str, _: OpDelete) -> Result { + let p = build_abs_path(&self.root, path); + + self.kv.blocking_delete(&p)?; + Ok(RpDelete::default()) + } +} + +impl Backend +where + S: Adapter, +{ + fn apply_range(&self, mut bs: Bytes, br: BytesRange) -> Bytes { + match (br.offset(), br.size()) { + (Some(offset), Some(size)) => { + let mut bs = bs.split_off(offset as usize); + if (size as usize) < bs.len() { + let _ = bs.split_off(size as usize); + } + bs + } + (Some(offset), None) => bs.split_off(offset as usize), + (None, Some(size)) => bs.split_off(bs.len() - size as usize), + (None, None) => bs, + } + } +} + +pub struct KvWriter { + kv: Arc, + path: String, + + op: OpWrite, + buf: VectorCursor, +} + +impl KvWriter { + fn new(kv: Arc, path: String, op: OpWrite) -> Self { + KvWriter { + kv, + path, + op, + buf: VectorCursor::new(), + } + } + + fn build(&self) -> Value { + let mut metadata = Metadata::new(EntryMode::FILE); + if let Some(v) = self.op.cache_control() { + metadata.set_cache_control(v); + } + if let Some(v) = self.op.content_disposition() { + metadata.set_content_disposition(v); + } + if let Some(v) = self.op.content_type() { + metadata.set_content_type(v); + } + if let Some(v) = self.op.content_length() { + metadata.set_content_length(v); + } else { + metadata.set_content_length(self.buf.len() as u64); + } + + Value { + metadata, + value: self.buf.peak_all(), + } + } +} + +#[async_trait] +impl oio::Write for KvWriter { + // TODO: we need to support append in the future. + async fn write(&mut self, bs: Bytes) -> Result<()> { + self.buf.push(bs); + + Ok(()) + } + + async fn abort(&mut self) -> Result<()> { + self.buf.clear(); + + Ok(()) + } + + async fn close(&mut self) -> Result<()> { + self.kv.set(&self.path, self.build()).await?; + Ok(()) + } +} + +impl oio::BlockingWrite for KvWriter { + fn write(&mut self, bs: Bytes) -> Result<()> { + self.buf.push(bs); + + Ok(()) + } + + fn close(&mut self) -> Result<()> { + self.kv.blocking_set(&self.path, self.build())?; + + Ok(()) + } +} diff --git a/core/src/raw/adapters/typed_kv/mod.rs b/core/src/raw/adapters/typed_kv/mod.rs new file mode 100644 index 000000000000..12a4e599411f --- /dev/null +++ b/core/src/raw/adapters/typed_kv/mod.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Providing Typed Key Value Adapter for OpenDAL. +//! +//! Any services that implement `Adapter` can be used an OpenDAL Service. + +mod api; +pub use api::Adapter; +pub use api::Value; + +mod backend; +pub use backend::Backend; diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index 1e34c9b9a3d1..7ce2247d3364 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -47,6 +47,12 @@ impl Cursor { } } +impl From for Cursor { + fn from(v: Bytes) -> Self { + Cursor { inner: v, pos: 0 } + } +} + impl From> for Cursor { fn from(v: Vec) -> Self { Cursor { @@ -259,6 +265,21 @@ impl VectorCursor { bs.freeze() } + /// peak all will read and copy all bytes from current cursor + /// without change it's content. + pub fn peak_all(&self) -> Bytes { + // Avoid data copy if we only have one bytes. + if self.inner.len() == 1 { + return self.inner[0].clone(); + } + + let mut bs = BytesMut::with_capacity(self.len()); + for b in &self.inner { + bs.extend_from_slice(b); + } + bs.freeze() + } + /// Take will consume n bytes from current cursor. /// /// # Panics diff --git a/core/src/services/moka/backend.rs b/core/src/services/moka/backend.rs index f33b698c06b1..f2dd7eee0c8c 100644 --- a/core/src/services/moka/backend.rs +++ b/core/src/services/moka/backend.rs @@ -24,7 +24,7 @@ use log::debug; use moka::sync::CacheBuilder; use moka::sync::SegmentedCache; -use crate::raw::adapters::kv; +use crate::raw::adapters::typed_kv; use crate::*; /// [moka](https://github.com/moka-rs/moka) backend support. @@ -151,11 +151,11 @@ impl Builder for MokaBuilder { fn build(&mut self) -> Result { debug!("backend build started: {:?}", &self); - let mut builder: CacheBuilder, _> = + let mut builder: CacheBuilder = SegmentedCache::builder(self.num_segments.unwrap_or(1)) .thread_pool_enabled(self.thread_pool_enabled.unwrap_or(false)); // Use entries' bytes as capacity weigher. - builder = builder.weigher(|k, v| (k.len() + v.len()) as u32); + builder = builder.weigher(|k, v| (k.len() + v.size()) as u32); if let Some(v) = &self.name { builder = builder.name(v); } @@ -177,11 +177,11 @@ impl Builder for MokaBuilder { } /// Backend is used to serve `Accessor` support in moka. -pub type MokaBackend = kv::Backend; +pub type MokaBackend = typed_kv::Backend; #[derive(Clone)] pub struct Adapter { - inner: SegmentedCache>, + inner: SegmentedCache, } impl Debug for Adapter { @@ -194,37 +194,31 @@ impl Debug for Adapter { } #[async_trait] -impl kv::Adapter for Adapter { - fn metadata(&self) -> kv::Metadata { - kv::Metadata::new( +impl typed_kv::Adapter for Adapter { + fn metadata(&self) -> (Scheme, String) { + ( Scheme::Moka, - self.inner.name().unwrap_or("moka"), - Capability { - read: true, - write: true, - - ..Default::default() - }, + self.inner.name().unwrap_or("moka").to_string(), ) } - async fn get(&self, path: &str) -> Result>> { + async fn get(&self, path: &str) -> Result> { self.blocking_get(path) } - fn blocking_get(&self, path: &str) -> Result>> { + fn blocking_get(&self, path: &str) -> Result> { match self.inner.get(path) { None => Ok(None), Some(bs) => Ok(Some(bs)), } } - async fn set(&self, path: &str, value: &[u8]) -> Result<()> { + async fn set(&self, path: &str, value: typed_kv::Value) -> Result<()> { self.blocking_set(path, value) } - fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> { - self.inner.insert(path.to_string(), value.to_vec()); + fn blocking_set(&self, path: &str, value: typed_kv::Value) -> Result<()> { + self.inner.insert(path.to_string(), value); Ok(()) } From f998ffcc053e2ffa36ce4d3bc84cd56593b29b0f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sun, 7 May 2023 19:40:44 +0800 Subject: [PATCH 2/3] Add bench for moka Signed-off-by: Xuanwo --- core/benches/ops/utils.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/benches/ops/utils.rs b/core/benches/ops/utils.rs index 15dc1915b191..7c29aa2f2e97 100644 --- a/core/benches/ops/utils.rs +++ b/core/benches/ops/utils.rs @@ -45,6 +45,8 @@ pub fn services() -> Vec<(&'static str, Option)> { ("fs", service::()), ("s3", service::()), ("memory", service::()), + #[cfg(feature = "services-moka")] + ("moka", service::()), ] } From cb4efaa333aee45b6c40c8f237b0296fac9b072d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sun, 7 May 2023 19:44:52 +0800 Subject: [PATCH 3/3] Fix typo Signed-off-by: Xuanwo --- core/src/raw/adapters/typed_kv/backend.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index cf7b963f2cc8..e7a85dbed27a 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -27,7 +27,7 @@ use crate::raw::oio::VectorCursor; use crate::raw::*; use crate::*; -/// The typed kv backend which implmenets Accessor for for typed kv adapter. +/// The typed kv backend which implements Accessor for for typed kv adapter. #[derive(Debug, Clone)] pub struct Backend { kv: Arc,