From a470954b1eb68094f275419356a4153966e7ad24 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Fri, 12 Sep 2025 13:55:32 +0300 Subject: [PATCH 1/7] refactor: generic store type --- crates/http-service/Cargo.toml | 1 + crates/http-service/src/executor/http.rs | 16 +++-- crates/http-service/src/executor/wasi_http.rs | 16 +++-- crates/http-service/src/lib.rs | 10 +-- crates/http-service/src/state.rs | 6 +- crates/key-value-store/src/lib.rs | 63 ++++++++++++------- crates/runtime/src/lib.rs | 4 +- crates/runtime/src/store.rs | 12 ---- src/context.rs | 17 ++--- src/executor.rs | 5 +- src/key_value.rs | 18 +++--- 11 files changed, 95 insertions(+), 73 deletions(-) diff --git a/crates/http-service/Cargo.toml b/crates/http-service/Cargo.toml index 11d00e6..8516f5f 100644 --- a/crates/http-service/Cargo.toml +++ b/crates/http-service/Cargo.toml @@ -27,6 +27,7 @@ runtime = { path = "../runtime" } http-backend = { path = "../http-backend" } dictionary = { path = "../dictionary" } secret = { path = "../secret" } +key-value-store = { path = "../key-value-store" } nanoid = "0.4" bytesize = { workspace = true } async-trait = "0.1" diff --git a/crates/http-service/src/executor/http.rs b/crates/http-service/src/executor/http.rs index 4afd206..d9c30cf 100644 --- a/crates/http-service/src/executor/http.rs +++ b/crates/http-service/src/executor/http.rs @@ -8,6 +8,7 @@ use http::{Method, Request, Response, StatusCode}; use http_backend::Backend; use http_body_util::{BodyExt, Full}; use hyper::body::Body; +use key_value_store::StoreManager; use reactor::gcore::fastedge; use runtime::{store::StoreBuilder, InstancePre}; use std::time::{Duration, Instant}; @@ -15,17 +16,18 @@ use wasmtime_wasi_http::body::HyperOutgoingBody; /// Execute context used by ['HttpService'] #[derive(Clone)] -pub struct HttpExecutorImpl { - instance_pre: InstancePre>, +pub struct HttpExecutorImpl { + instance_pre: InstancePre>, store_builder: StoreBuilder, backend: Backend, dictionary: Dictionary, } #[async_trait] -impl HttpExecutor for HttpExecutorImpl +impl HttpExecutor for HttpExecutorImpl where - C: Clone + Send + Sync + 'static, + C: Clone + Send + Sync , + M: StoreManager + Default , { async fn execute( &self, @@ -89,6 +91,7 @@ where propagate_headers: parts.headers, propagate_header_names, dictionary: self.dictionary.clone(), + key_value_store: Default::default(), }; let mut store = store_builder.build(state)?; @@ -143,12 +146,13 @@ where } } -impl HttpExecutorImpl +impl HttpExecutorImpl where C: Clone + Send + Sync + 'static, + M: StoreManager { pub fn new( - instance_pre: InstancePre>, + instance_pre: InstancePre>, store_builder: StoreBuilder, backend: Backend, dictionary: Dictionary, diff --git a/crates/http-service/src/executor/wasi_http.rs b/crates/http-service/src/executor/wasi_http.rs index 1d603d8..f8a4000 100644 --- a/crates/http-service/src/executor/wasi_http.rs +++ b/crates/http-service/src/executor/wasi_http.rs @@ -14,20 +14,22 @@ use runtime::{store::StoreBuilder, InstancePre}; use wasmtime_wasi_http::bindings::http::types::Scheme; use wasmtime_wasi_http::bindings::ProxyPre; use wasmtime_wasi_http::{body::HyperOutgoingBody, WasiHttpView}; +use key_value_store::StoreManager; /// Execute context used by ['HttpService'] #[derive(Clone)] -pub struct WasiHttpExecutorImpl { - instance_pre: InstancePre>, +pub struct WasiHttpExecutorImpl { + instance_pre: InstancePre>, store_builder: StoreBuilder, backend: Backend, dictionary: Dictionary, } #[async_trait] -impl HttpExecutor for WasiHttpExecutorImpl +impl HttpExecutor for WasiHttpExecutorImpl where C: Clone + Send + Sync + 'static, + M: StoreManager + Default { async fn execute( &self, @@ -61,7 +63,7 @@ where parts.uri = Uri::from_parts(uparts)?; } - //FIXME send streamed request body + //TODO send streamed request body let body = body .collect() .await @@ -100,6 +102,7 @@ where propagate_headers, propagate_header_names, dictionary: self.dictionary.clone(), + key_value_store: Default::default(), }; let mut store = store_builder.build(state).context("store build")?; @@ -186,12 +189,13 @@ where } } -impl WasiHttpExecutorImpl +impl WasiHttpExecutorImpl where C: Clone + Send + Sync + 'static, + M: StoreManager { pub fn new( - instance_pre: InstancePre>, + instance_pre: InstancePre>, store_builder: StoreBuilder, backend: Backend, dictionary: Dictionary, diff --git a/crates/http-service/src/lib.rs b/crates/http-service/src/lib.rs index a000b12..e7790f7 100644 --- a/crates/http-service/src/lib.rs +++ b/crates/http-service/src/lib.rs @@ -60,7 +60,7 @@ pub struct HttpConfig { } pub struct HttpService { - engine: WasmEngine>, + engine: WasmEngine>, context: T, } @@ -74,7 +74,7 @@ where + StatsWriter + Router + ContextHeaders - + ExecutorFactory> + + ExecutorFactory> + Clone + Sync + Send @@ -82,7 +82,7 @@ where T::BackendConnector: Connect + Clone + Send + Sync + 'static, T::Executor: HttpExecutor + Send + Sync, { - type State = HttpState; + type State = HttpState; type Config = HttpConfig; type Context = T; @@ -203,7 +203,7 @@ where })?; reactor::gcore::fastedge::key_value::add_to_linker::<_, HasSelf<_>>(linker, |data| { - &mut data.key_value_store + &mut data.as_mut().key_value_store })?; Ok(()) @@ -216,7 +216,7 @@ where + StatsWriter + Router + ContextHeaders - + ExecutorFactory> + + ExecutorFactory> + Sync + Send + 'static diff --git a/crates/http-service/src/state.rs b/crates/http-service/src/state.rs index 090d7d5..0a15339 100644 --- a/crates/http-service/src/state.rs +++ b/crates/http-service/src/state.rs @@ -6,16 +6,18 @@ use http::{header, HeaderMap, HeaderName, Uri}; use http_backend::Backend; use runtime::BackendRequest; use tracing::instrument; +use key_value_store::{KeyValueStore, StoreManager}; -pub struct HttpState { +pub struct HttpState { pub(super) http_backend: Backend, pub(super) uri: Uri, pub(super) propagate_headers: HeaderMap, pub(super) propagate_header_names: Vec, pub(super) dictionary: Dictionary, + pub(super) key_value_store: KeyValueStore, } -impl BackendRequest for HttpState { +impl BackendRequest for HttpState { #[instrument(skip(self), ret, err)] fn backend_request(&mut self, mut head: Parts) -> anyhow::Result { match self.http_backend.strategy { diff --git a/crates/key-value-store/src/lib.rs b/crates/key-value-store/src/lib.rs index 56372aa..2c26208 100644 --- a/crates/key-value-store/src/lib.rs +++ b/crates/key-value-store/src/lib.rs @@ -2,34 +2,34 @@ use reactor::gcore::fastedge::key_value; use slab::Slab; use smol_str::SmolStr; use std::collections::HashMap; -use std::sync::Arc; use wasmtime::component::Resource; pub use key_value::{Error, Value}; #[async_trait::async_trait] -pub trait Store: Sync + Send { - async fn get(&self, key: &str) -> Result, Error>; +pub trait Store: Sync + Send{ + async fn get(&mut self, key: &str) -> Result, Error>; - async fn get_by_range(&self, key: &str, min: u32, max: u32) -> Result, Error>; + async fn get_by_range(&mut self, key: &str, min: u32, max: u32) -> Result, Error>; - async fn bf_exists(&self, bf: &str, key: &str) -> Result; + async fn bf_exists(&mut self, bf: &str, key: &str) -> Result; } #[async_trait::async_trait] pub trait StoreManager: Sync + Send { + type StoreType: Store; /// Get a store by db url. - async fn get_store(&self, param: &str) -> Result, Error>; + async fn get_store(&mut self, param: &str) -> Result; } #[derive(Clone)] -pub struct KeyValueStore { +pub struct KeyValueStore { allowed_stores: HashMap, - manager: Arc, - stores: Slab>, + manager: M, + stores: Slab, } -impl key_value::HostStore for KeyValueStore { +impl key_value::HostStore for KeyValueStore { async fn open(&mut self, name: String) -> Result, Error> { let store_id = KeyValueStore::open(self, &name).await?; Ok(Resource::new_own(store_id)) @@ -71,10 +71,10 @@ impl key_value::HostStore for KeyValueStore { } } -impl key_value::Host for KeyValueStore {} +impl key_value::Host for KeyValueStore {} -impl KeyValueStore { - pub fn new(allowed_stores: Vec<(SmolStr, SmolStr)>, manager: Arc) -> Self { +impl KeyValueStore { + pub fn new(allowed_stores: Vec<(SmolStr, SmolStr)>, manager: M) -> Self { Self { allowed_stores: allowed_stores.into_iter().collect(), manager, @@ -93,8 +93,8 @@ impl KeyValueStore { } /// Get a value from a store by key. - pub async fn get(&self, store: u32, key: &str) -> Result, Error> { - let Some(store) = self.stores.get(store as usize) else { + pub async fn get(&mut self, store: u32, key: &str) -> Result, Error> { + let Some(store) = self.stores.get_mut(store as usize) else { return Err(Error::NoSuchStore); }; store.get(key).await @@ -102,32 +102,32 @@ impl KeyValueStore { /// Get a values from a store by key. pub async fn get_by_range( - &self, + &mut self, store: u32, key: &str, min: u32, max: u32, ) -> Result, Error> { - let Some(store) = self.stores.get(store as usize) else { + let Some(store) = self.stores.get_mut(store as usize) else { return Err(Error::NoSuchStore); }; store.get_by_range(key, min, max).await } /// Get a value from a store by key. - pub async fn bf_exists(&self, store: u32, bf: &str, key: &str) -> Result { - let Some(store) = self.stores.get(store as usize) else { + pub async fn bf_exists(&mut self, store: u32, bf: &str, key: &str) -> Result { + let Some(store) = self.stores.get_mut(store as usize) else { return Err(Error::NoSuchStore); }; store.bf_exists(bf, key).await } } -impl Default for KeyValueStore { +impl Default for KeyValueStore { fn default() -> Self { Self { allowed_stores: Default::default(), - manager: Arc::new(NoSuchStoreManager), + manager: M::default(), stores: Slab::new(), } } @@ -135,9 +135,28 @@ impl Default for KeyValueStore { pub struct NoSuchStoreManager; +pub struct NoStore; + +#[async_trait::async_trait] +impl Store for NoStore { + async fn get(&mut self, _key: &str) -> Result, Error> { + Err(Error::NoSuchStore) + } + + async fn get_by_range(&mut self, _key: &str, _min: u32, _max: u32) -> Result, Error> { + Err(Error::NoSuchStore) + } + + async fn bf_exists(&mut self, _bf: &str, _key: &str) -> Result { + Err(Error::NoSuchStore) + } +} + #[async_trait::async_trait] impl StoreManager for NoSuchStoreManager { - async fn get_store(&self, _name: &str) -> Result, Error> { + type StoreType = NoStore; + + async fn get_store(&mut self, _name: &str) -> Result { Err(Error::NoSuchStore) } } diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 69f332e..9d3aca7 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -1,5 +1,5 @@ use crate::app::KvStoreOption; -use key_value_store::KeyValueStore; +use key_value_store::{KeyValueStore, StoreManager}; use std::{fmt::Debug, ops::Deref}; use wasmtime_wasi::ResourceTable; use wasmtime_wasi::WasiCtxView; @@ -393,7 +393,7 @@ pub trait PreCompiledLoader { } pub trait ContextT { - type BackendConnector: 'static; + type BackendConnector: 'static; fn make_logger(&self, app_name: SmolStr, wrk: &App) -> Logger; diff --git a/crates/runtime/src/store.rs b/crates/runtime/src/store.rs index 1b73919..388dc44 100644 --- a/crates/runtime/src/store.rs +++ b/crates/runtime/src/store.rs @@ -3,7 +3,6 @@ use crate::logger::Logger; use crate::registry::CachedGraphRegistry; use crate::{Data, Wasi, WasiVersion, DEFAULT_EPOCH_TICK_INTERVAL}; use anyhow::Result; -use key_value_store::KeyValueStore; use secret::SecretStore; use std::{ collections::HashMap, @@ -83,7 +82,6 @@ pub struct StoreBuilder { properties: HashMap, registry: CachedGraphRegistry, secret_store: SecretStore, - key_value_store: KeyValueStore, } impl StoreBuilder { @@ -99,7 +97,6 @@ impl StoreBuilder { properties: Default::default(), registry: CachedGraphRegistry::new(), secret_store: Default::default(), - key_value_store: KeyValueStore::default(), } } @@ -160,14 +157,6 @@ impl StoreBuilder { } } - /// Set key value store - pub fn key_value_store(self, key_value_store: KeyValueStore) -> Self { - Self { - key_value_store, - ..self - } - } - pub fn make_wasi_nn(&self) -> Result { // initialize application specific graph let backends: Vec<&str> = self @@ -246,7 +235,6 @@ impl StoreBuilder { logger, http: WasiHttpCtx::new(), secret_store: self.secret_store, - key_value_store: self.key_value_store, }, ); inner.limiter(|state| &mut state.store_limits); diff --git a/src/context.rs b/src/context.rs index 4a985ee..6be76c4 100644 --- a/src/context.rs +++ b/src/context.rs @@ -8,7 +8,7 @@ use http_service::state::HttpState; use http_service::{ContextHeaders, ExecutorFactory}; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; -use key_value_store::KeyValueStore; +use key_value_store::{KeyValueStore}; use runtime::app::{KvStoreOption, SecretOption}; use runtime::logger::{Console, Logger}; use runtime::util::stats::{StatRow, StatsWriter}; @@ -47,6 +47,8 @@ impl PreCompiledLoader for Context { impl ContextT for Context { type BackendConnector = HttpsConnector; + type StoreManager = CliStoreManager; + fn make_logger(&self, _app_name: SmolStr, _wrk: &App) -> Logger { Logger::new(Console::default()) } @@ -73,23 +75,23 @@ impl ContextT for Context { Ok(SecretStore::new(Arc::new(secret_impl))) } - fn make_key_value_store(&self, stores: &Vec) -> KeyValueStore { + fn make_key_value_store(&self, stores: &Vec) -> KeyValueStore { let stores = stores .iter() .map(|s| (s.name.clone(), s.param.clone())) .collect(); - KeyValueStore::new(stores, Arc::new(CliStoreManager)) + KeyValueStore::new(stores, CliStoreManager) } } -impl ExecutorFactory>> for Context { +impl ExecutorFactory, CliStoreManager>> for Context { type Executor = RunExecutor; fn get_executor( &self, name: SmolStr, app: &App, - engine: &WasmEngine>>, + engine: &WasmEngine, CliStoreManager>>, ) -> anyhow::Result { let mut dictionary = Dictionary::new(); for (k, v) in app.env.iter() { @@ -100,7 +102,7 @@ impl ExecutorFactory>> for Context { let logger = self.make_logger(name, app); let secret_store = self.make_secret_store(app.secrets.as_ref())?; - let key_value_store = self.make_key_value_store(app.kv_stores.as_ref()); + //let key_value_store = self.make_key_value_store(app.kv_stores.as_ref()); let version = WasiVersion::Preview2; let store_builder = engine @@ -109,8 +111,7 @@ impl ExecutorFactory>> for Context { .max_memory_size(app.mem_limit) .max_epoch_ticks(app.max_duration) .logger(logger) - .secret_store(secret_store) - .key_value_store(key_value_store); + .secret_store(secret_store); let component = self.loader().load_component(app.binary_id)?; let instance_pre = engine.component_instantiate_pre(&component)?; diff --git a/src/executor.rs b/src/executor.rs index 6a153b4..ab8e311 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -8,10 +8,11 @@ use hyper::body::Body; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; use std::time::Duration; +use crate::key_value::CliStoreManager; pub enum RunExecutor { - Http(HttpExecutorImpl>), - Wasi(WasiHttpExecutorImpl>), + Http(HttpExecutorImpl, CliStoreManager>), + Wasi(WasiHttpExecutorImpl, CliStoreManager>), } #[async_trait] diff --git a/src/key_value.rs b/src/key_value.rs index def387d..b4aff40 100644 --- a/src/key_value.rs +++ b/src/key_value.rs @@ -1,28 +1,30 @@ use key_value_store::{Error, Store, StoreManager, Value}; -use std::sync::Arc; -struct CliStore; +pub struct CliStore; -pub(crate) struct CliStoreManager; +#[derive(Default)] +pub struct CliStoreManager; #[async_trait::async_trait] impl StoreManager for CliStoreManager { - async fn get_store(&self, _name: &str) -> Result, Error> { - Ok(Arc::new(CliStore)) + type StoreType = CliStore; + + async fn get_store(&mut self, _name: &str) -> Result { + Ok(CliStore) } } #[async_trait::async_trait] impl Store for CliStore { - async fn get(&self, key: &str) -> Result>, Error> { + async fn get(&mut self, key: &str) -> Result>, Error> { Ok(Some(key.as_bytes().to_vec())) } - async fn get_by_range(&self, _key: &str, _min: u32, _max: u32) -> Result, Error> { + async fn get_by_range(&mut self, _key: &str, _min: u32, _max: u32) -> Result, Error> { todo!() } - async fn bf_exists(&self, _bf: &str, _key: &str) -> Result { + async fn bf_exists(&mut self, _bf: &str, _key: &str) -> Result { todo!() } } From 3fab88d56bdbafd2f1eaf28bc4856eec7ca50937 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Mon, 15 Sep 2025 09:51:54 +0300 Subject: [PATCH 2/7] Revert "refactor: generic store type" This reverts commit 2558edfeddf3199565838d12315e8225db61576e. --- crates/http-service/Cargo.toml | 1 - crates/http-service/src/executor/http.rs | 23 +++---- crates/http-service/src/executor/wasi_http.rs | 16 ++--- crates/http-service/src/lib.rs | 10 +-- crates/http-service/src/state.rs | 6 +- crates/key-value-store/src/lib.rs | 63 +++++++------------ crates/runtime/src/lib.rs | 2 +- crates/runtime/src/store.rs | 12 ++++ src/context.rs | 17 +++-- src/executor.rs | 5 +- src/key_value.rs | 18 +++--- 11 files changed, 74 insertions(+), 99 deletions(-) diff --git a/crates/http-service/Cargo.toml b/crates/http-service/Cargo.toml index 8516f5f..11d00e6 100644 --- a/crates/http-service/Cargo.toml +++ b/crates/http-service/Cargo.toml @@ -27,7 +27,6 @@ runtime = { path = "../runtime" } http-backend = { path = "../http-backend" } dictionary = { path = "../dictionary" } secret = { path = "../secret" } -key-value-store = { path = "../key-value-store" } nanoid = "0.4" bytesize = { workspace = true } async-trait = "0.1" diff --git a/crates/http-service/src/executor/http.rs b/crates/http-service/src/executor/http.rs index d9c30cf..d322b56 100644 --- a/crates/http-service/src/executor/http.rs +++ b/crates/http-service/src/executor/http.rs @@ -8,7 +8,6 @@ use http::{Method, Request, Response, StatusCode}; use http_backend::Backend; use http_body_util::{BodyExt, Full}; use hyper::body::Body; -use key_value_store::StoreManager; use reactor::gcore::fastedge; use runtime::{store::StoreBuilder, InstancePre}; use std::time::{Duration, Instant}; @@ -16,18 +15,17 @@ use wasmtime_wasi_http::body::HyperOutgoingBody; /// Execute context used by ['HttpService'] #[derive(Clone)] -pub struct HttpExecutorImpl { - instance_pre: InstancePre>, +pub struct HttpExecutorImpl { + instance_pre: InstancePre>, store_builder: StoreBuilder, backend: Backend, dictionary: Dictionary, } #[async_trait] -impl HttpExecutor for HttpExecutorImpl +impl HttpExecutor for HttpExecutorImpl where - C: Clone + Send + Sync , - M: StoreManager + Default , + C: Clone + Send + Sync + 'static, { async fn execute( &self, @@ -91,14 +89,12 @@ where propagate_headers: parts.headers, propagate_header_names, dictionary: self.dictionary.clone(), - key_value_store: Default::default(), }; let mut store = store_builder.build(state)?; let instance = self.instance_pre.instantiate_async(&mut store).await?; - let http_handler = - instance.get_export_index(&mut store, None, "gcore:fastedge/http-handler"); + let http_handler = instance.get_export_index(&mut store, None, "gcore:fastedge/http-handler"); let process = instance .get_export_index(&mut store, http_handler.as_ref(), "process") .ok_or_else(|| anyhow!("gcore:fastedge/http-handler instance not found"))?; @@ -114,9 +110,7 @@ where Err(error) => { // log to application logger error if let Some(ref logger) = store.data().logger { - logger - .write_msg(format!("Execution error: {}", error)) - .await; + logger.write_msg(format!("Execution error: {}", error)).await; } return Err(error); } @@ -146,13 +140,12 @@ where } } -impl HttpExecutorImpl +impl HttpExecutorImpl where C: Clone + Send + Sync + 'static, - M: StoreManager { pub fn new( - instance_pre: InstancePre>, + instance_pre: InstancePre>, store_builder: StoreBuilder, backend: Backend, dictionary: Dictionary, diff --git a/crates/http-service/src/executor/wasi_http.rs b/crates/http-service/src/executor/wasi_http.rs index f8a4000..1d603d8 100644 --- a/crates/http-service/src/executor/wasi_http.rs +++ b/crates/http-service/src/executor/wasi_http.rs @@ -14,22 +14,20 @@ use runtime::{store::StoreBuilder, InstancePre}; use wasmtime_wasi_http::bindings::http::types::Scheme; use wasmtime_wasi_http::bindings::ProxyPre; use wasmtime_wasi_http::{body::HyperOutgoingBody, WasiHttpView}; -use key_value_store::StoreManager; /// Execute context used by ['HttpService'] #[derive(Clone)] -pub struct WasiHttpExecutorImpl { - instance_pre: InstancePre>, +pub struct WasiHttpExecutorImpl { + instance_pre: InstancePre>, store_builder: StoreBuilder, backend: Backend, dictionary: Dictionary, } #[async_trait] -impl HttpExecutor for WasiHttpExecutorImpl +impl HttpExecutor for WasiHttpExecutorImpl where C: Clone + Send + Sync + 'static, - M: StoreManager + Default { async fn execute( &self, @@ -63,7 +61,7 @@ where parts.uri = Uri::from_parts(uparts)?; } - //TODO send streamed request body + //FIXME send streamed request body let body = body .collect() .await @@ -102,7 +100,6 @@ where propagate_headers, propagate_header_names, dictionary: self.dictionary.clone(), - key_value_store: Default::default(), }; let mut store = store_builder.build(state).context("store build")?; @@ -189,13 +186,12 @@ where } } -impl WasiHttpExecutorImpl +impl WasiHttpExecutorImpl where C: Clone + Send + Sync + 'static, - M: StoreManager { pub fn new( - instance_pre: InstancePre>, + instance_pre: InstancePre>, store_builder: StoreBuilder, backend: Backend, dictionary: Dictionary, diff --git a/crates/http-service/src/lib.rs b/crates/http-service/src/lib.rs index e7790f7..a000b12 100644 --- a/crates/http-service/src/lib.rs +++ b/crates/http-service/src/lib.rs @@ -60,7 +60,7 @@ pub struct HttpConfig { } pub struct HttpService { - engine: WasmEngine>, + engine: WasmEngine>, context: T, } @@ -74,7 +74,7 @@ where + StatsWriter + Router + ContextHeaders - + ExecutorFactory> + + ExecutorFactory> + Clone + Sync + Send @@ -82,7 +82,7 @@ where T::BackendConnector: Connect + Clone + Send + Sync + 'static, T::Executor: HttpExecutor + Send + Sync, { - type State = HttpState; + type State = HttpState; type Config = HttpConfig; type Context = T; @@ -203,7 +203,7 @@ where })?; reactor::gcore::fastedge::key_value::add_to_linker::<_, HasSelf<_>>(linker, |data| { - &mut data.as_mut().key_value_store + &mut data.key_value_store })?; Ok(()) @@ -216,7 +216,7 @@ where + StatsWriter + Router + ContextHeaders - + ExecutorFactory> + + ExecutorFactory> + Sync + Send + 'static diff --git a/crates/http-service/src/state.rs b/crates/http-service/src/state.rs index 0a15339..090d7d5 100644 --- a/crates/http-service/src/state.rs +++ b/crates/http-service/src/state.rs @@ -6,18 +6,16 @@ use http::{header, HeaderMap, HeaderName, Uri}; use http_backend::Backend; use runtime::BackendRequest; use tracing::instrument; -use key_value_store::{KeyValueStore, StoreManager}; -pub struct HttpState { +pub struct HttpState { pub(super) http_backend: Backend, pub(super) uri: Uri, pub(super) propagate_headers: HeaderMap, pub(super) propagate_header_names: Vec, pub(super) dictionary: Dictionary, - pub(super) key_value_store: KeyValueStore, } -impl BackendRequest for HttpState { +impl BackendRequest for HttpState { #[instrument(skip(self), ret, err)] fn backend_request(&mut self, mut head: Parts) -> anyhow::Result { match self.http_backend.strategy { diff --git a/crates/key-value-store/src/lib.rs b/crates/key-value-store/src/lib.rs index 2c26208..56372aa 100644 --- a/crates/key-value-store/src/lib.rs +++ b/crates/key-value-store/src/lib.rs @@ -2,34 +2,34 @@ use reactor::gcore::fastedge::key_value; use slab::Slab; use smol_str::SmolStr; use std::collections::HashMap; +use std::sync::Arc; use wasmtime::component::Resource; pub use key_value::{Error, Value}; #[async_trait::async_trait] -pub trait Store: Sync + Send{ - async fn get(&mut self, key: &str) -> Result, Error>; +pub trait Store: Sync + Send { + async fn get(&self, key: &str) -> Result, Error>; - async fn get_by_range(&mut self, key: &str, min: u32, max: u32) -> Result, Error>; + async fn get_by_range(&self, key: &str, min: u32, max: u32) -> Result, Error>; - async fn bf_exists(&mut self, bf: &str, key: &str) -> Result; + async fn bf_exists(&self, bf: &str, key: &str) -> Result; } #[async_trait::async_trait] pub trait StoreManager: Sync + Send { - type StoreType: Store; /// Get a store by db url. - async fn get_store(&mut self, param: &str) -> Result; + async fn get_store(&self, param: &str) -> Result, Error>; } #[derive(Clone)] -pub struct KeyValueStore { +pub struct KeyValueStore { allowed_stores: HashMap, - manager: M, - stores: Slab, + manager: Arc, + stores: Slab>, } -impl key_value::HostStore for KeyValueStore { +impl key_value::HostStore for KeyValueStore { async fn open(&mut self, name: String) -> Result, Error> { let store_id = KeyValueStore::open(self, &name).await?; Ok(Resource::new_own(store_id)) @@ -71,10 +71,10 @@ impl key_value::HostStore for KeyValueStore { } } -impl key_value::Host for KeyValueStore {} +impl key_value::Host for KeyValueStore {} -impl KeyValueStore { - pub fn new(allowed_stores: Vec<(SmolStr, SmolStr)>, manager: M) -> Self { +impl KeyValueStore { + pub fn new(allowed_stores: Vec<(SmolStr, SmolStr)>, manager: Arc) -> Self { Self { allowed_stores: allowed_stores.into_iter().collect(), manager, @@ -93,8 +93,8 @@ impl KeyValueStore { } /// Get a value from a store by key. - pub async fn get(&mut self, store: u32, key: &str) -> Result, Error> { - let Some(store) = self.stores.get_mut(store as usize) else { + pub async fn get(&self, store: u32, key: &str) -> Result, Error> { + let Some(store) = self.stores.get(store as usize) else { return Err(Error::NoSuchStore); }; store.get(key).await @@ -102,32 +102,32 @@ impl KeyValueStore { /// Get a values from a store by key. pub async fn get_by_range( - &mut self, + &self, store: u32, key: &str, min: u32, max: u32, ) -> Result, Error> { - let Some(store) = self.stores.get_mut(store as usize) else { + let Some(store) = self.stores.get(store as usize) else { return Err(Error::NoSuchStore); }; store.get_by_range(key, min, max).await } /// Get a value from a store by key. - pub async fn bf_exists(&mut self, store: u32, bf: &str, key: &str) -> Result { - let Some(store) = self.stores.get_mut(store as usize) else { + pub async fn bf_exists(&self, store: u32, bf: &str, key: &str) -> Result { + let Some(store) = self.stores.get(store as usize) else { return Err(Error::NoSuchStore); }; store.bf_exists(bf, key).await } } -impl Default for KeyValueStore { +impl Default for KeyValueStore { fn default() -> Self { Self { allowed_stores: Default::default(), - manager: M::default(), + manager: Arc::new(NoSuchStoreManager), stores: Slab::new(), } } @@ -135,28 +135,9 @@ impl Default for KeyValueStore { pub struct NoSuchStoreManager; -pub struct NoStore; - -#[async_trait::async_trait] -impl Store for NoStore { - async fn get(&mut self, _key: &str) -> Result, Error> { - Err(Error::NoSuchStore) - } - - async fn get_by_range(&mut self, _key: &str, _min: u32, _max: u32) -> Result, Error> { - Err(Error::NoSuchStore) - } - - async fn bf_exists(&mut self, _bf: &str, _key: &str) -> Result { - Err(Error::NoSuchStore) - } -} - #[async_trait::async_trait] impl StoreManager for NoSuchStoreManager { - type StoreType = NoStore; - - async fn get_store(&mut self, _name: &str) -> Result { + async fn get_store(&self, _name: &str) -> Result, Error> { Err(Error::NoSuchStore) } } diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 9d3aca7..0461f2e 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -1,5 +1,5 @@ use crate::app::KvStoreOption; -use key_value_store::{KeyValueStore, StoreManager}; +use key_value_store::KeyValueStore; use std::{fmt::Debug, ops::Deref}; use wasmtime_wasi::ResourceTable; use wasmtime_wasi::WasiCtxView; diff --git a/crates/runtime/src/store.rs b/crates/runtime/src/store.rs index 388dc44..1b73919 100644 --- a/crates/runtime/src/store.rs +++ b/crates/runtime/src/store.rs @@ -3,6 +3,7 @@ use crate::logger::Logger; use crate::registry::CachedGraphRegistry; use crate::{Data, Wasi, WasiVersion, DEFAULT_EPOCH_TICK_INTERVAL}; use anyhow::Result; +use key_value_store::KeyValueStore; use secret::SecretStore; use std::{ collections::HashMap, @@ -82,6 +83,7 @@ pub struct StoreBuilder { properties: HashMap, registry: CachedGraphRegistry, secret_store: SecretStore, + key_value_store: KeyValueStore, } impl StoreBuilder { @@ -97,6 +99,7 @@ impl StoreBuilder { properties: Default::default(), registry: CachedGraphRegistry::new(), secret_store: Default::default(), + key_value_store: KeyValueStore::default(), } } @@ -157,6 +160,14 @@ impl StoreBuilder { } } + /// Set key value store + pub fn key_value_store(self, key_value_store: KeyValueStore) -> Self { + Self { + key_value_store, + ..self + } + } + pub fn make_wasi_nn(&self) -> Result { // initialize application specific graph let backends: Vec<&str> = self @@ -235,6 +246,7 @@ impl StoreBuilder { logger, http: WasiHttpCtx::new(), secret_store: self.secret_store, + key_value_store: self.key_value_store, }, ); inner.limiter(|state| &mut state.store_limits); diff --git a/src/context.rs b/src/context.rs index 6be76c4..4a985ee 100644 --- a/src/context.rs +++ b/src/context.rs @@ -8,7 +8,7 @@ use http_service::state::HttpState; use http_service::{ContextHeaders, ExecutorFactory}; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; -use key_value_store::{KeyValueStore}; +use key_value_store::KeyValueStore; use runtime::app::{KvStoreOption, SecretOption}; use runtime::logger::{Console, Logger}; use runtime::util::stats::{StatRow, StatsWriter}; @@ -47,8 +47,6 @@ impl PreCompiledLoader for Context { impl ContextT for Context { type BackendConnector = HttpsConnector; - type StoreManager = CliStoreManager; - fn make_logger(&self, _app_name: SmolStr, _wrk: &App) -> Logger { Logger::new(Console::default()) } @@ -75,23 +73,23 @@ impl ContextT for Context { Ok(SecretStore::new(Arc::new(secret_impl))) } - fn make_key_value_store(&self, stores: &Vec) -> KeyValueStore { + fn make_key_value_store(&self, stores: &Vec) -> KeyValueStore { let stores = stores .iter() .map(|s| (s.name.clone(), s.param.clone())) .collect(); - KeyValueStore::new(stores, CliStoreManager) + KeyValueStore::new(stores, Arc::new(CliStoreManager)) } } -impl ExecutorFactory, CliStoreManager>> for Context { +impl ExecutorFactory>> for Context { type Executor = RunExecutor; fn get_executor( &self, name: SmolStr, app: &App, - engine: &WasmEngine, CliStoreManager>>, + engine: &WasmEngine>>, ) -> anyhow::Result { let mut dictionary = Dictionary::new(); for (k, v) in app.env.iter() { @@ -102,7 +100,7 @@ impl ExecutorFactory, CliStoreManager>> let logger = self.make_logger(name, app); let secret_store = self.make_secret_store(app.secrets.as_ref())?; - //let key_value_store = self.make_key_value_store(app.kv_stores.as_ref()); + let key_value_store = self.make_key_value_store(app.kv_stores.as_ref()); let version = WasiVersion::Preview2; let store_builder = engine @@ -111,7 +109,8 @@ impl ExecutorFactory, CliStoreManager>> .max_memory_size(app.mem_limit) .max_epoch_ticks(app.max_duration) .logger(logger) - .secret_store(secret_store); + .secret_store(secret_store) + .key_value_store(key_value_store); let component = self.loader().load_component(app.binary_id)?; let instance_pre = engine.component_instantiate_pre(&component)?; diff --git a/src/executor.rs b/src/executor.rs index ab8e311..6a153b4 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -8,11 +8,10 @@ use hyper::body::Body; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; use std::time::Duration; -use crate::key_value::CliStoreManager; pub enum RunExecutor { - Http(HttpExecutorImpl, CliStoreManager>), - Wasi(WasiHttpExecutorImpl, CliStoreManager>), + Http(HttpExecutorImpl>), + Wasi(WasiHttpExecutorImpl>), } #[async_trait] diff --git a/src/key_value.rs b/src/key_value.rs index b4aff40..def387d 100644 --- a/src/key_value.rs +++ b/src/key_value.rs @@ -1,30 +1,28 @@ use key_value_store::{Error, Store, StoreManager, Value}; +use std::sync::Arc; -pub struct CliStore; +struct CliStore; -#[derive(Default)] -pub struct CliStoreManager; +pub(crate) struct CliStoreManager; #[async_trait::async_trait] impl StoreManager for CliStoreManager { - type StoreType = CliStore; - - async fn get_store(&mut self, _name: &str) -> Result { - Ok(CliStore) + async fn get_store(&self, _name: &str) -> Result, Error> { + Ok(Arc::new(CliStore)) } } #[async_trait::async_trait] impl Store for CliStore { - async fn get(&mut self, key: &str) -> Result>, Error> { + async fn get(&self, key: &str) -> Result>, Error> { Ok(Some(key.as_bytes().to_vec())) } - async fn get_by_range(&mut self, _key: &str, _min: u32, _max: u32) -> Result, Error> { + async fn get_by_range(&self, _key: &str, _min: u32, _max: u32) -> Result, Error> { todo!() } - async fn bf_exists(&mut self, _bf: &str, _key: &str) -> Result { + async fn bf_exists(&self, _bf: &str, _key: &str) -> Result { todo!() } } From 7603aee357b3249155785b67c2b539a6e869f3f0 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Wed, 17 Sep 2025 09:51:33 +0300 Subject: [PATCH 3/7] feat: updating key-value store interface --- Cargo.lock | 1 + crates/key-value-store/Cargo.toml | 1 + crates/key-value-store/src/lib.rs | 72 ++++++++++++++++++++++++++----- sdk | 2 +- src/key_value.rs | 14 ++++-- 5 files changed, 75 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e8676a..61465ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1954,6 +1954,7 @@ dependencies = [ "reactor", "slab", "smol_str", + "tracing", "wasmtime", ] diff --git a/crates/key-value-store/Cargo.toml b/crates/key-value-store/Cargo.toml index 1505b79..9adc0cd 100644 --- a/crates/key-value-store/Cargo.toml +++ b/crates/key-value-store/Cargo.toml @@ -12,6 +12,7 @@ wasmtime = {workspace = true} slab = "0.4" async-trait = "0.1" smol_str = {workspace = true} +tracing = "0.1" [lints] workspace = true diff --git a/crates/key-value-store/src/lib.rs b/crates/key-value-store/src/lib.rs index 56372aa..93fad90 100644 --- a/crates/key-value-store/src/lib.rs +++ b/crates/key-value-store/src/lib.rs @@ -3,6 +3,7 @@ use slab::Slab; use smol_str::SmolStr; use std::collections::HashMap; use std::sync::Arc; +use tracing::instrument; use wasmtime::component::Resource; pub use key_value::{Error, Value}; @@ -11,9 +12,13 @@ pub use key_value::{Error, Value}; pub trait Store: Sync + Send { async fn get(&self, key: &str) -> Result, Error>; - async fn get_by_range(&self, key: &str, min: u32, max: u32) -> Result, Error>; + async fn zrange(&self, key: &str, min: u32, max: u32) -> Result, Error>; - async fn bf_exists(&self, bf: &str, key: &str) -> Result; + async fn scan(&self, pattern: &str) -> Result, Error>; + + async fn zscan(&self, key: &str, pattern: &str) -> Result, Error>; + + async fn cf_exists(&self, key: &str, item: &str) -> Result; } #[async_trait::async_trait] @@ -44,7 +49,7 @@ impl key_value::HostStore for KeyValueStore { KeyValueStore::get(self, store_id, &key).await } - async fn get_by_range( + async fn zrange( &mut self, store: Resource, key: String, @@ -52,17 +57,36 @@ impl key_value::HostStore for KeyValueStore { max: u32, ) -> Result, Error> { let store_id = store.rep(); - KeyValueStore::get_by_range(self, store_id, &key, min, max).await + KeyValueStore::zrange(self, store_id, &key, min, max).await + } + + async fn scan( + &mut self, + store: Resource, + pattern: String, + ) -> Result, Error> { + let store_id = store.rep(); + KeyValueStore::scan(self, store_id, &pattern).await + } + + async fn zscan( + &mut self, + store: Resource, + key: String, + pattern: String, + ) -> Result, Error> { + let store_id = store.rep(); + KeyValueStore::zscan(self, store_id, &key, &pattern).await } - async fn bf_exists( + async fn cf_exists( &mut self, store: Resource, - bf: String, key: String, + item: String, ) -> Result { let store_id = store.rep(); - KeyValueStore::bf_exists(self, store_id, &bf, &key).await + KeyValueStore::cf_exists(self, store_id, &key, &item).await } async fn drop(&mut self, store: Resource) -> Result<(), wasmtime::Error> { @@ -74,6 +98,7 @@ impl key_value::HostStore for KeyValueStore { impl key_value::Host for KeyValueStore {} impl KeyValueStore { + #[instrument(skip(manager), level = "trace")] pub fn new(allowed_stores: Vec<(SmolStr, SmolStr)>, manager: Arc) -> Self { Self { allowed_stores: allowed_stores.into_iter().collect(), @@ -83,6 +108,7 @@ impl KeyValueStore { } /// Open a store by name. Return the store ID. + #[instrument(skip(self), level = "trace", ret, err)] pub async fn open(&mut self, name: &str) -> Result { if let Some(param) = self.allowed_stores.get(name) { let store = self.manager.get_store(¶m).await?; @@ -93,6 +119,7 @@ impl KeyValueStore { } /// Get a value from a store by key. + #[instrument(skip(self), level = "trace", ret, err)] pub async fn get(&self, store: u32, key: &str) -> Result, Error> { let Some(store) = self.stores.get(store as usize) else { return Err(Error::NoSuchStore); @@ -101,7 +128,8 @@ impl KeyValueStore { } /// Get a values from a store by key. - pub async fn get_by_range( + #[instrument(skip(self), level = "trace", ret, err)] + pub async fn zrange( &self, store: u32, key: &str, @@ -111,15 +139,37 @@ impl KeyValueStore { let Some(store) = self.stores.get(store as usize) else { return Err(Error::NoSuchStore); }; - store.get_by_range(key, min, max).await + store.zrange(key, min, max).await + } + + #[instrument(skip(self), level = "trace", ret, err)] + async fn scan(&mut self, store: u32, pattern: &str) -> Result, Error> { + let Some(store) = self.stores.get(store as usize) else { + return Err(Error::NoSuchStore); + }; + store.scan(pattern).await + } + + #[instrument(skip(self), level = "trace", ret, err)] + async fn zscan( + &mut self, + store: u32, + key: &str, + pattern: &str, + ) -> Result, Error> { + let Some(store) = self.stores.get(store as usize) else { + return Err(Error::NoSuchStore); + }; + store.zscan(key, pattern).await } /// Get a value from a store by key. - pub async fn bf_exists(&self, store: u32, bf: &str, key: &str) -> Result { + #[instrument(skip(self), level = "trace", ret, err)] + pub async fn cf_exists(&self, store: u32, key: &str, item: &str) -> Result { let Some(store) = self.stores.get(store as usize) else { return Err(Error::NoSuchStore); }; - store.bf_exists(bf, key).await + store.cf_exists(key, item).await } } diff --git a/sdk b/sdk index af1ab97..5d29c6a 160000 --- a/sdk +++ b/sdk @@ -1 +1 @@ -Subproject commit af1ab97436e8fa4a4db5e6191aae98397081ef02 +Subproject commit 5d29c6a7420a83af0e114b723a431239b84fa300 diff --git a/src/key_value.rs b/src/key_value.rs index def387d..b32b59c 100644 --- a/src/key_value.rs +++ b/src/key_value.rs @@ -14,15 +14,23 @@ impl StoreManager for CliStoreManager { #[async_trait::async_trait] impl Store for CliStore { - async fn get(&self, key: &str) -> Result>, Error> { + async fn get(&self, key: &str) -> Result, Error> { Ok(Some(key.as_bytes().to_vec())) } - async fn get_by_range(&self, _key: &str, _min: u32, _max: u32) -> Result, Error> { + async fn zrange(&self, _key: &str, _min: u32, _max: u32) -> Result, Error> { todo!() } - async fn bf_exists(&self, _bf: &str, _key: &str) -> Result { + async fn scan(&self, _pattern: &str) -> Result, Error> { + todo!() + } + + async fn zscan(&self, _key: &str, _pattern: &str) -> Result, Error> { + todo!() + } + + async fn cf_exists(&self, _key: &str, _item: &str) -> Result { todo!() } } From 02515052ee76f6112d93605a7d67532095d0ef5b Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Wed, 17 Sep 2025 11:49:10 +0300 Subject: [PATCH 4/7] feat: adding redis store impl --- Cargo.lock | 85 +++++++++++++++++++++++- crates/key-value-store/Cargo.toml | 5 ++ crates/key-value-store/src/lib.rs | 24 ++++--- crates/key-value-store/src/redis_impl.rs | 85 ++++++++++++++++++++++++ crates/runtime/Cargo.toml | 2 +- src/context.rs | 5 +- src/dotenv.rs | 10 +-- src/key_value.rs | 38 ++++------- src/main.rs | 24 ++++++- 9 files changed, 232 insertions(+), 46 deletions(-) create mode 100644 crates/key-value-store/src/redis_impl.rs diff --git a/Cargo.lock b/Cargo.lock index 61465ca..9bd5efd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -121,6 +121,12 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "async-trait" version = "0.1.89" @@ -144,6 +150,15 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "backon" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d" +dependencies = [ + "fastrand", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -552,6 +567,20 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -1678,7 +1707,7 @@ dependencies = [ "hyper", "libc", "pin-project-lite", - "socket2", + "socket2 0.6.0", "tokio", "tower-service", "tracing", @@ -1891,6 +1920,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -1952,6 +1990,7 @@ version = "0.11.9" dependencies = [ "async-trait", "reactor", + "redis", "slab", "smol_str", "tracing", @@ -2817,6 +2856,32 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03251193000f4bd3b042892be858ee50e8b3719f2b08e5833ac4353724632430" +[[package]] +name = "redis" +version = "0.27.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" +dependencies = [ + "arc-swap", + "async-trait", + "backon", + "bytes", + "combine", + "futures", + "futures-util", + "itertools 0.13.0", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.5.10", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.5.17" @@ -3202,6 +3267,12 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.9" @@ -3283,6 +3354,16 @@ dependencies = [ "serde", ] +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.0" @@ -3524,7 +3605,7 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "slab", - "socket2", + "socket2 0.6.0", "tokio-macros", "windows-sys 0.59.0", ] diff --git a/crates/key-value-store/Cargo.toml b/crates/key-value-store/Cargo.toml index 9adc0cd..05b2503 100644 --- a/crates/key-value-store/Cargo.toml +++ b/crates/key-value-store/Cargo.toml @@ -6,6 +6,10 @@ publish.workspace = true authors.workspace = true description = "key-value store host function" +[features] +default = [] +redis = ["dep:redis"] + [dependencies] reactor = { path = "../reactor" } wasmtime = {workspace = true} @@ -13,6 +17,7 @@ slab = "0.4" async-trait = "0.1" smol_str = {workspace = true} tracing = "0.1" +redis = { version = "0.27.6", features = ["aio", "tokio-comp", "connection-manager"], optional = true} [lints] workspace = true diff --git a/crates/key-value-store/src/lib.rs b/crates/key-value-store/src/lib.rs index 93fad90..1eb5bda 100644 --- a/crates/key-value-store/src/lib.rs +++ b/crates/key-value-store/src/lib.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "redis")] +mod redis_impl; + use reactor::gcore::fastedge::key_value; use slab::Slab; use smol_str::SmolStr; @@ -8,6 +11,9 @@ use wasmtime::component::Resource; pub use key_value::{Error, Value}; +#[cfg(feature = "redis")] +pub use redis_impl::RedisStore; + #[async_trait::async_trait] pub trait Store: Sync + Send { async fn get(&self, key: &str) -> Result, Error>; @@ -49,6 +55,15 @@ impl key_value::HostStore for KeyValueStore { KeyValueStore::get(self, store_id, &key).await } + async fn scan( + &mut self, + store: Resource, + pattern: String, + ) -> Result, Error> { + let store_id = store.rep(); + KeyValueStore::scan(self, store_id, &pattern).await + } + async fn zrange( &mut self, store: Resource, @@ -60,15 +75,6 @@ impl key_value::HostStore for KeyValueStore { KeyValueStore::zrange(self, store_id, &key, min, max).await } - async fn scan( - &mut self, - store: Resource, - pattern: String, - ) -> Result, Error> { - let store_id = store.rep(); - KeyValueStore::scan(self, store_id, &pattern).await - } - async fn zscan( &mut self, store: Resource, diff --git a/crates/key-value-store/src/redis_impl.rs b/crates/key-value-store/src/redis_impl.rs new file mode 100644 index 0000000..09e88a7 --- /dev/null +++ b/crates/key-value-store/src/redis_impl.rs @@ -0,0 +1,85 @@ +use crate::Store; +use redis::{AsyncCommands, AsyncIter}; +use reactor::gcore::fastedge::key_value::{Error, Value}; + +#[derive(Clone)] +pub struct RedisStore { + inner: redis::aio::MultiplexedConnection, +} + +impl RedisStore { + pub async fn open(params: &str) -> Result { + let conn = ::redis::Client::open(params).map_err(|error| { + tracing::warn!(error=?error, "redis open"); + Error::InternalError + })? + .get_multiplexed_async_connection() + .await.map_err(|error| { + tracing::warn!(error=?error, "redis open"); + Error::InternalError + })?; + Ok(Self{ + inner: conn + }) + } +} + +#[async_trait::async_trait] +impl Store for RedisStore { + async fn get(&self, key: &str) -> Result, Error> { + self.inner.clone().get(key).await.map_err(|error| { + tracing::warn!(cause=?error, "redis get"); + Error::InternalError + }) + } + + async fn zrange(&self, key: &str, min: u32, max: u32) -> Result, Error> { + self.inner + .clone() + .zrangebyscore(key, min, max) + .await + .map_err(|error| { + tracing::warn!(cause=?error, "redis zrangebyscore"); + Error::InternalError + }) + } + + async fn scan(&self, pattern: &str) -> Result, Error> { + let mut conn = self.inner.clone(); + let mut it = conn.scan_match(pattern).await.map_err(|error| { + tracing::warn!(cause=?error, "redis scan_match"); + Error::InternalError + })?; + let mut ret = vec![]; + while let Some(element) = it.next_item().await { + ret.push(element); + } + Ok(ret) + } + + async fn zscan(&self, key: &str, pattern: &str) -> Result, Error> { + let mut conn = self.inner.clone(); + let mut it: AsyncIter<(Value, u32)> = + conn.zscan_match(key, pattern).await.map_err(|error| { + tracing::warn!(cause=?error, "redis zscan_match"); + Error::InternalError + })?; + let mut ret = vec![]; + while let Some(element) = it.next_item().await { + ret.push(element); + } + Ok(ret) + } + + async fn cf_exists(&self, key: &str, item: &str) -> Result { + redis::cmd("CF.EXISTS") + .arg(key) + .arg(item) + .query_async(&mut self.inner.clone()) + .await + .map_err(|error| { + tracing::warn!(cause=?error, "redis cf_exists"); + Error::InternalError + }) + } +} diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 558677a..16d757a 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -29,7 +29,7 @@ wit-component = "0.228.0" tracing = { workspace = true } bytesize = { workspace = true } http-backend = { path = "../http-backend" } -key-value-store = { path = "../key-value-store" } +key-value-store = { path = "../key-value-store" , features = ["redis"]} secret = { path = "../secret" } async-trait = "0.1" bytes = "1.10" diff --git a/src/context.rs b/src/context.rs index 4a985ee..28ecdb3 100644 --- a/src/context.rs +++ b/src/context.rs @@ -74,11 +74,12 @@ impl ContextT for Context { } fn make_key_value_store(&self, stores: &Vec) -> KeyValueStore { - let stores = stores + let allowed_stores = stores .iter() .map(|s| (s.name.clone(), s.param.clone())) .collect(); - KeyValueStore::new(stores, Arc::new(CliStoreManager)) + let manager = CliStoreManager{stores: stores.to_owned()}; + KeyValueStore::new(allowed_stores, Arc::new(manager)) } } diff --git a/src/dotenv.rs b/src/dotenv.rs index 3f79f1b..03cd379 100644 --- a/src/dotenv.rs +++ b/src/dotenv.rs @@ -11,6 +11,7 @@ pub enum EnvArgType { ReqHeader, Env, Secrets, + KvStore } #[derive(Debug, PartialEq)] @@ -20,6 +21,7 @@ enum EnvArgFileType { Variables, Secrets, DotEnv, + KvStore } impl From for EnvArgFileType { @@ -29,6 +31,7 @@ impl From for EnvArgFileType { EnvArgType::ReqHeader => EnvArgFileType::ReqHeaders, EnvArgType::Env => EnvArgFileType::Variables, EnvArgType::Secrets => EnvArgFileType::Secrets, + EnvArgType::KvStore => EnvArgFileType::KvStore } } } @@ -39,10 +42,7 @@ pub struct DotEnvInjector { impl DotEnvInjector { pub fn new(file_path: Option) -> Self { - let file_path = match file_path { - Some(path) => path, - None => current_dir().unwrap_or_else(|_| Path::new(".").to_path_buf()), - }; + let file_path = file_path.unwrap_or_else(|| current_dir().unwrap_or_else(|_| Path::new(".").to_path_buf())); Self { file_path } } @@ -91,6 +91,7 @@ impl DotEnvInjector { EnvArgType::ReqHeader => "FASTEDGE_VAR_REQ_HEADER_", EnvArgType::Env => "FASTEDGE_VAR_ENV_", EnvArgType::Secrets => "FASTEDGE_VAR_SECRET_", + EnvArgType::KvStore => "FASTEDGE_VAR_KV_STORE" }; if key.starts_with("FASTEDGE_VAR_") { @@ -116,6 +117,7 @@ impl DotEnvInjector { EnvArgFileType::Variables => ".env.variables", EnvArgFileType::Secrets => ".env.secrets", EnvArgFileType::DotEnv => ".env", + EnvArgFileType::KvStore => ".env.kv_stores" }; let filename = self.file_path.join(env_arg_file_type_str); diff --git a/src/key_value.rs b/src/key_value.rs index b32b59c..6a0e24c 100644 --- a/src/key_value.rs +++ b/src/key_value.rs @@ -1,36 +1,22 @@ -use key_value_store::{Error, Store, StoreManager, Value}; +use key_value_store::{Error, Store, StoreManager}; use std::sync::Arc; +use key_value_store::RedisStore; +use runtime::app::KvStoreOption; -struct CliStore; -pub(crate) struct CliStoreManager; +pub(crate) struct CliStoreManager { + pub(crate) stores: Vec, +} #[async_trait::async_trait] impl StoreManager for CliStoreManager { - async fn get_store(&self, _name: &str) -> Result, Error> { - Ok(Arc::new(CliStore)) + async fn get_store(&self, name: &str) -> Result, Error> { + let Some(opts) = self.stores.iter().find(|store| store.name == name) else { + return Err(Error::NoSuchStore); + }; + let store = RedisStore::open(&opts.param).await?; + Ok(Arc::new(store)) } } -#[async_trait::async_trait] -impl Store for CliStore { - async fn get(&self, key: &str) -> Result, Error> { - Ok(Some(key.as_bytes().to_vec())) - } - - async fn zrange(&self, _key: &str, _min: u32, _max: u32) -> Result, Error> { - todo!() - } - async fn scan(&self, _pattern: &str) -> Result, Error> { - todo!() - } - - async fn zscan(&self, _key: &str, _pattern: &str) -> Result, Error> { - todo!() - } - - async fn cf_exists(&self, _key: &str, _item: &str) -> Result { - todo!() - } -} diff --git a/src/main.rs b/src/main.rs index ed7282a..9e1e736 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use http_backend::{Backend, BackendStrategy}; use http_service::{HttpConfig, HttpService}; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; -use runtime::app::Status; +use runtime::app::{KvStoreOption, Status}; use runtime::service::{Service, ServiceBuilder}; use runtime::{App, SecretValue, WasmConfig}; use smol_str::{SmolStr, ToSmolStr}; @@ -72,6 +72,9 @@ struct HttpRunArgs { /// Headers added to response #[arg(long, value_parser = parse_key_value::< SmolStr, SmolStr >)] rsp_headers: Option>, + /// key-value store redis url list (ex: --kv-stores myredis=redis://localhost) + #[arg(long, value_parser = parse_key_value::< SmolStr, SmolStr >)] + kv_stores: Option>, } #[tokio::main] @@ -137,6 +140,23 @@ async fn main() -> anyhow::Result<()> { }); } + let kv_stores = dotenv_injector.merge_with_dotenv_variables( + has_dotenv_flag, + EnvArgType::KvStore, + run.kv_stores.unwrap_or_default().into_iter().collect(), + ); + + let kv_stores = kv_stores + .into_iter() + .map(|(name, param)| KvStoreOption { + param, + name, + prefix: Default::default(), + cache_size: 0, + cache_ttl: 0, + }) + .collect(); + let cli_app = App { binary_id: 0, max_duration: run.max_duration.map(|m| m / 10).unwrap_or(60000), @@ -150,7 +170,7 @@ async fn main() -> anyhow::Result<()> { status: Status::Enabled, debug_until: None, secrets, - kv_stores: vec![], + kv_stores, }; let mut headers = dotenv_injector.merge_with_dotenv_variables( From 56b8efaf0ce070826ae4ada9e48def0c1966b16f Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Wed, 17 Sep 2025 12:49:26 +0300 Subject: [PATCH 5/7] fix: changing score type to f64 --- crates/key-value-store/src/lib.rs | 16 ++++++++-------- crates/key-value-store/src/redis_impl.rs | 6 +++--- sdk | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/crates/key-value-store/src/lib.rs b/crates/key-value-store/src/lib.rs index 1eb5bda..20ef503 100644 --- a/crates/key-value-store/src/lib.rs +++ b/crates/key-value-store/src/lib.rs @@ -18,11 +18,11 @@ pub use redis_impl::RedisStore; pub trait Store: Sync + Send { async fn get(&self, key: &str) -> Result, Error>; - async fn zrange(&self, key: &str, min: u32, max: u32) -> Result, Error>; + async fn zrange(&self, key: &str, min: f64, max: f64) -> Result, Error>; async fn scan(&self, pattern: &str) -> Result, Error>; - async fn zscan(&self, key: &str, pattern: &str) -> Result, Error>; + async fn zscan(&self, key: &str, pattern: &str) -> Result, Error>; async fn cf_exists(&self, key: &str, item: &str) -> Result; } @@ -68,8 +68,8 @@ impl key_value::HostStore for KeyValueStore { &mut self, store: Resource, key: String, - min: u32, - max: u32, + min: f64, + max: f64, ) -> Result, Error> { let store_id = store.rep(); KeyValueStore::zrange(self, store_id, &key, min, max).await @@ -80,7 +80,7 @@ impl key_value::HostStore for KeyValueStore { store: Resource, key: String, pattern: String, - ) -> Result, Error> { + ) -> Result, Error> { let store_id = store.rep(); KeyValueStore::zscan(self, store_id, &key, &pattern).await } @@ -139,8 +139,8 @@ impl KeyValueStore { &self, store: u32, key: &str, - min: u32, - max: u32, + min: f64, + max: f64, ) -> Result, Error> { let Some(store) = self.stores.get(store as usize) else { return Err(Error::NoSuchStore); @@ -162,7 +162,7 @@ impl KeyValueStore { store: u32, key: &str, pattern: &str, - ) -> Result, Error> { + ) -> Result, Error> { let Some(store) = self.stores.get(store as usize) else { return Err(Error::NoSuchStore); }; diff --git a/crates/key-value-store/src/redis_impl.rs b/crates/key-value-store/src/redis_impl.rs index 09e88a7..bbebd2c 100644 --- a/crates/key-value-store/src/redis_impl.rs +++ b/crates/key-value-store/src/redis_impl.rs @@ -33,7 +33,7 @@ impl Store for RedisStore { }) } - async fn zrange(&self, key: &str, min: u32, max: u32) -> Result, Error> { + async fn zrange(&self, key: &str, min: f64, max: f64) -> Result, Error> { self.inner .clone() .zrangebyscore(key, min, max) @@ -57,9 +57,9 @@ impl Store for RedisStore { Ok(ret) } - async fn zscan(&self, key: &str, pattern: &str) -> Result, Error> { + async fn zscan(&self, key: &str, pattern: &str) -> Result, Error> { let mut conn = self.inner.clone(); - let mut it: AsyncIter<(Value, u32)> = + let mut it: AsyncIter<(Value, f64)> = conn.zscan_match(key, pattern).await.map_err(|error| { tracing::warn!(cause=?error, "redis zscan_match"); Error::InternalError diff --git a/sdk b/sdk index 5d29c6a..2dfcd06 160000 --- a/sdk +++ b/sdk @@ -1 +1 @@ -Subproject commit 5d29c6a7420a83af0e114b723a431239b84fa300 +Subproject commit 2dfcd0699ffd493fdb3f59301ede4cd456ded408 From ea67d4b60361f23ae21fc06dee44cacc2ee46936 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Wed, 17 Sep 2025 13:02:40 +0300 Subject: [PATCH 6/7] fix: formatting --- crates/http-service/src/executor/http.rs | 7 +++++-- crates/key-value-store/src/redis_impl.rs | 24 ++++++++++++------------ crates/runtime/src/lib.rs | 2 +- src/context.rs | 4 +++- src/dotenv.rs | 13 +++++++------ src/key_value.rs | 7 ++----- 6 files changed, 30 insertions(+), 27 deletions(-) diff --git a/crates/http-service/src/executor/http.rs b/crates/http-service/src/executor/http.rs index d322b56..4afd206 100644 --- a/crates/http-service/src/executor/http.rs +++ b/crates/http-service/src/executor/http.rs @@ -94,7 +94,8 @@ where let mut store = store_builder.build(state)?; let instance = self.instance_pre.instantiate_async(&mut store).await?; - let http_handler = instance.get_export_index(&mut store, None, "gcore:fastedge/http-handler"); + let http_handler = + instance.get_export_index(&mut store, None, "gcore:fastedge/http-handler"); let process = instance .get_export_index(&mut store, http_handler.as_ref(), "process") .ok_or_else(|| anyhow!("gcore:fastedge/http-handler instance not found"))?; @@ -110,7 +111,9 @@ where Err(error) => { // log to application logger error if let Some(ref logger) = store.data().logger { - logger.write_msg(format!("Execution error: {}", error)).await; + logger + .write_msg(format!("Execution error: {}", error)) + .await; } return Err(error); } diff --git a/crates/key-value-store/src/redis_impl.rs b/crates/key-value-store/src/redis_impl.rs index bbebd2c..9d60658 100644 --- a/crates/key-value-store/src/redis_impl.rs +++ b/crates/key-value-store/src/redis_impl.rs @@ -1,6 +1,6 @@ use crate::Store; -use redis::{AsyncCommands, AsyncIter}; use reactor::gcore::fastedge::key_value::{Error, Value}; +use redis::{AsyncCommands, AsyncIter}; #[derive(Clone)] pub struct RedisStore { @@ -9,18 +9,18 @@ pub struct RedisStore { impl RedisStore { pub async fn open(params: &str) -> Result { - let conn = ::redis::Client::open(params).map_err(|error| { - tracing::warn!(error=?error, "redis open"); - Error::InternalError - })? + let conn = ::redis::Client::open(params) + .map_err(|error| { + tracing::warn!(error=?error, "redis open"); + Error::InternalError + })? .get_multiplexed_async_connection() - .await.map_err(|error| { - tracing::warn!(error=?error, "redis open"); - Error::InternalError - })?; - Ok(Self{ - inner: conn - }) + .await + .map_err(|error| { + tracing::warn!(error=?error, "redis open"); + Error::InternalError + })?; + Ok(Self { inner: conn }) } } diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 0461f2e..69f332e 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -393,7 +393,7 @@ pub trait PreCompiledLoader { } pub trait ContextT { - type BackendConnector: 'static; + type BackendConnector: 'static; fn make_logger(&self, app_name: SmolStr, wrk: &App) -> Logger; diff --git a/src/context.rs b/src/context.rs index 28ecdb3..e5a964d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -78,7 +78,9 @@ impl ContextT for Context { .iter() .map(|s| (s.name.clone(), s.param.clone())) .collect(); - let manager = CliStoreManager{stores: stores.to_owned()}; + let manager = CliStoreManager { + stores: stores.to_owned(), + }; KeyValueStore::new(allowed_stores, Arc::new(manager)) } } diff --git a/src/dotenv.rs b/src/dotenv.rs index 03cd379..f077823 100644 --- a/src/dotenv.rs +++ b/src/dotenv.rs @@ -11,7 +11,7 @@ pub enum EnvArgType { ReqHeader, Env, Secrets, - KvStore + KvStore, } #[derive(Debug, PartialEq)] @@ -21,7 +21,7 @@ enum EnvArgFileType { Variables, Secrets, DotEnv, - KvStore + KvStore, } impl From for EnvArgFileType { @@ -31,7 +31,7 @@ impl From for EnvArgFileType { EnvArgType::ReqHeader => EnvArgFileType::ReqHeaders, EnvArgType::Env => EnvArgFileType::Variables, EnvArgType::Secrets => EnvArgFileType::Secrets, - EnvArgType::KvStore => EnvArgFileType::KvStore + EnvArgType::KvStore => EnvArgFileType::KvStore, } } } @@ -42,7 +42,8 @@ pub struct DotEnvInjector { impl DotEnvInjector { pub fn new(file_path: Option) -> Self { - let file_path = file_path.unwrap_or_else(|| current_dir().unwrap_or_else(|_| Path::new(".").to_path_buf())); + let file_path = file_path + .unwrap_or_else(|| current_dir().unwrap_or_else(|_| Path::new(".").to_path_buf())); Self { file_path } } @@ -91,7 +92,7 @@ impl DotEnvInjector { EnvArgType::ReqHeader => "FASTEDGE_VAR_REQ_HEADER_", EnvArgType::Env => "FASTEDGE_VAR_ENV_", EnvArgType::Secrets => "FASTEDGE_VAR_SECRET_", - EnvArgType::KvStore => "FASTEDGE_VAR_KV_STORE" + EnvArgType::KvStore => "FASTEDGE_VAR_KV_STORE", }; if key.starts_with("FASTEDGE_VAR_") { @@ -117,7 +118,7 @@ impl DotEnvInjector { EnvArgFileType::Variables => ".env.variables", EnvArgFileType::Secrets => ".env.secrets", EnvArgFileType::DotEnv => ".env", - EnvArgFileType::KvStore => ".env.kv_stores" + EnvArgFileType::KvStore => ".env.kv_stores", }; let filename = self.file_path.join(env_arg_file_type_str); diff --git a/src/key_value.rs b/src/key_value.rs index 6a0e24c..88b4a6c 100644 --- a/src/key_value.rs +++ b/src/key_value.rs @@ -1,8 +1,7 @@ -use key_value_store::{Error, Store, StoreManager}; -use std::sync::Arc; use key_value_store::RedisStore; +use key_value_store::{Error, Store, StoreManager}; use runtime::app::KvStoreOption; - +use std::sync::Arc; pub(crate) struct CliStoreManager { pub(crate) stores: Vec, @@ -18,5 +17,3 @@ impl StoreManager for CliStoreManager { Ok(Arc::new(store)) } } - - From bbcf0035bb32d2dee8237b799a8c45c83342cd6f Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Wed, 17 Sep 2025 14:40:45 +0300 Subject: [PATCH 7/7] forward sdk --- sdk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk b/sdk index 2dfcd06..0a12055 160000 --- a/sdk +++ b/sdk @@ -1 +1 @@ -Subproject commit 2dfcd0699ffd493fdb3f59301ede4cd456ded408 +Subproject commit 0a12055ee06c894333a88617781785617fe93900