diff --git a/Cargo.lock b/Cargo.lock index 6e8676a..9bd5efd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -121,6 +121,12 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "async-trait" version = "0.1.89" @@ -144,6 +150,15 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "backon" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d" +dependencies = [ + "fastrand", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -552,6 +567,20 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -1678,7 +1707,7 @@ dependencies = [ "hyper", "libc", "pin-project-lite", - "socket2", + "socket2 0.6.0", "tokio", "tower-service", "tracing", @@ -1891,6 +1920,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -1952,8 +1990,10 @@ version = "0.11.9" dependencies = [ "async-trait", "reactor", + "redis", "slab", "smol_str", + "tracing", "wasmtime", ] @@ -2816,6 +2856,32 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03251193000f4bd3b042892be858ee50e8b3719f2b08e5833ac4353724632430" +[[package]] +name = "redis" +version = "0.27.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" +dependencies = [ + "arc-swap", + "async-trait", + "backon", + "bytes", + "combine", + "futures", + "futures-util", + "itertools 0.13.0", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.5.10", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.5.17" @@ -3201,6 +3267,12 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.9" @@ -3282,6 +3354,16 @@ dependencies = [ "serde", ] +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.0" @@ -3523,7 +3605,7 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "slab", - "socket2", + "socket2 0.6.0", "tokio-macros", "windows-sys 0.59.0", ] diff --git a/crates/key-value-store/Cargo.toml b/crates/key-value-store/Cargo.toml index 1505b79..05b2503 100644 --- a/crates/key-value-store/Cargo.toml +++ b/crates/key-value-store/Cargo.toml @@ -6,12 +6,18 @@ publish.workspace = true authors.workspace = true description = "key-value store host function" +[features] +default = [] +redis = ["dep:redis"] + [dependencies] reactor = { path = "../reactor" } wasmtime = {workspace = true} slab = "0.4" async-trait = "0.1" smol_str = {workspace = true} +tracing = "0.1" +redis = { version = "0.27.6", features = ["aio", "tokio-comp", "connection-manager"], optional = true} [lints] workspace = true diff --git a/crates/key-value-store/src/lib.rs b/crates/key-value-store/src/lib.rs index 56372aa..20ef503 100644 --- a/crates/key-value-store/src/lib.rs +++ b/crates/key-value-store/src/lib.rs @@ -1,19 +1,30 @@ +#[cfg(feature = "redis")] +mod redis_impl; + use reactor::gcore::fastedge::key_value; use slab::Slab; use smol_str::SmolStr; use std::collections::HashMap; use std::sync::Arc; +use tracing::instrument; use wasmtime::component::Resource; pub use key_value::{Error, Value}; +#[cfg(feature = "redis")] +pub use redis_impl::RedisStore; + #[async_trait::async_trait] pub trait Store: Sync + Send { async fn get(&self, key: &str) -> Result, Error>; - async fn get_by_range(&self, key: &str, min: u32, max: u32) -> Result, Error>; + async fn zrange(&self, key: &str, min: f64, max: f64) -> Result, Error>; + + async fn scan(&self, pattern: &str) -> Result, Error>; + + async fn zscan(&self, key: &str, pattern: &str) -> Result, Error>; - async fn bf_exists(&self, bf: &str, key: &str) -> Result; + async fn cf_exists(&self, key: &str, item: &str) -> Result; } #[async_trait::async_trait] @@ -44,25 +55,44 @@ impl key_value::HostStore for KeyValueStore { KeyValueStore::get(self, store_id, &key).await } - async fn get_by_range( + async fn scan( + &mut self, + store: Resource, + pattern: String, + ) -> Result, Error> { + let store_id = store.rep(); + KeyValueStore::scan(self, store_id, &pattern).await + } + + async fn zrange( &mut self, store: Resource, key: String, - min: u32, - max: u32, + min: f64, + max: f64, ) -> Result, Error> { let store_id = store.rep(); - KeyValueStore::get_by_range(self, store_id, &key, min, max).await + KeyValueStore::zrange(self, store_id, &key, min, max).await + } + + async fn zscan( + &mut self, + store: Resource, + key: String, + pattern: String, + ) -> Result, Error> { + let store_id = store.rep(); + KeyValueStore::zscan(self, store_id, &key, &pattern).await } - async fn bf_exists( + async fn cf_exists( &mut self, store: Resource, - bf: String, key: String, + item: String, ) -> Result { let store_id = store.rep(); - KeyValueStore::bf_exists(self, store_id, &bf, &key).await + KeyValueStore::cf_exists(self, store_id, &key, &item).await } async fn drop(&mut self, store: Resource) -> Result<(), wasmtime::Error> { @@ -74,6 +104,7 @@ impl key_value::HostStore for KeyValueStore { impl key_value::Host for KeyValueStore {} impl KeyValueStore { + #[instrument(skip(manager), level = "trace")] pub fn new(allowed_stores: Vec<(SmolStr, SmolStr)>, manager: Arc) -> Self { Self { allowed_stores: allowed_stores.into_iter().collect(), @@ -83,6 +114,7 @@ impl KeyValueStore { } /// Open a store by name. Return the store ID. + #[instrument(skip(self), level = "trace", ret, err)] pub async fn open(&mut self, name: &str) -> Result { if let Some(param) = self.allowed_stores.get(name) { let store = self.manager.get_store(¶m).await?; @@ -93,6 +125,7 @@ impl KeyValueStore { } /// Get a value from a store by key. + #[instrument(skip(self), level = "trace", ret, err)] pub async fn get(&self, store: u32, key: &str) -> Result, Error> { let Some(store) = self.stores.get(store as usize) else { return Err(Error::NoSuchStore); @@ -101,25 +134,48 @@ impl KeyValueStore { } /// Get a values from a store by key. - pub async fn get_by_range( + #[instrument(skip(self), level = "trace", ret, err)] + pub async fn zrange( &self, store: u32, key: &str, - min: u32, - max: u32, + min: f64, + max: f64, ) -> Result, Error> { let Some(store) = self.stores.get(store as usize) else { return Err(Error::NoSuchStore); }; - store.get_by_range(key, min, max).await + store.zrange(key, min, max).await + } + + #[instrument(skip(self), level = "trace", ret, err)] + async fn scan(&mut self, store: u32, pattern: &str) -> Result, Error> { + let Some(store) = self.stores.get(store as usize) else { + return Err(Error::NoSuchStore); + }; + store.scan(pattern).await + } + + #[instrument(skip(self), level = "trace", ret, err)] + async fn zscan( + &mut self, + store: u32, + key: &str, + pattern: &str, + ) -> Result, Error> { + let Some(store) = self.stores.get(store as usize) else { + return Err(Error::NoSuchStore); + }; + store.zscan(key, pattern).await } /// Get a value from a store by key. - pub async fn bf_exists(&self, store: u32, bf: &str, key: &str) -> Result { + #[instrument(skip(self), level = "trace", ret, err)] + pub async fn cf_exists(&self, store: u32, key: &str, item: &str) -> Result { let Some(store) = self.stores.get(store as usize) else { return Err(Error::NoSuchStore); }; - store.bf_exists(bf, key).await + store.cf_exists(key, item).await } } diff --git a/crates/key-value-store/src/redis_impl.rs b/crates/key-value-store/src/redis_impl.rs new file mode 100644 index 0000000..9d60658 --- /dev/null +++ b/crates/key-value-store/src/redis_impl.rs @@ -0,0 +1,85 @@ +use crate::Store; +use reactor::gcore::fastedge::key_value::{Error, Value}; +use redis::{AsyncCommands, AsyncIter}; + +#[derive(Clone)] +pub struct RedisStore { + inner: redis::aio::MultiplexedConnection, +} + +impl RedisStore { + pub async fn open(params: &str) -> Result { + let conn = ::redis::Client::open(params) + .map_err(|error| { + tracing::warn!(error=?error, "redis open"); + Error::InternalError + })? + .get_multiplexed_async_connection() + .await + .map_err(|error| { + tracing::warn!(error=?error, "redis open"); + Error::InternalError + })?; + Ok(Self { inner: conn }) + } +} + +#[async_trait::async_trait] +impl Store for RedisStore { + async fn get(&self, key: &str) -> Result, Error> { + self.inner.clone().get(key).await.map_err(|error| { + tracing::warn!(cause=?error, "redis get"); + Error::InternalError + }) + } + + async fn zrange(&self, key: &str, min: f64, max: f64) -> Result, Error> { + self.inner + .clone() + .zrangebyscore(key, min, max) + .await + .map_err(|error| { + tracing::warn!(cause=?error, "redis zrangebyscore"); + Error::InternalError + }) + } + + async fn scan(&self, pattern: &str) -> Result, Error> { + let mut conn = self.inner.clone(); + let mut it = conn.scan_match(pattern).await.map_err(|error| { + tracing::warn!(cause=?error, "redis scan_match"); + Error::InternalError + })?; + let mut ret = vec![]; + while let Some(element) = it.next_item().await { + ret.push(element); + } + Ok(ret) + } + + async fn zscan(&self, key: &str, pattern: &str) -> Result, Error> { + let mut conn = self.inner.clone(); + let mut it: AsyncIter<(Value, f64)> = + conn.zscan_match(key, pattern).await.map_err(|error| { + tracing::warn!(cause=?error, "redis zscan_match"); + Error::InternalError + })?; + let mut ret = vec![]; + while let Some(element) = it.next_item().await { + ret.push(element); + } + Ok(ret) + } + + async fn cf_exists(&self, key: &str, item: &str) -> Result { + redis::cmd("CF.EXISTS") + .arg(key) + .arg(item) + .query_async(&mut self.inner.clone()) + .await + .map_err(|error| { + tracing::warn!(cause=?error, "redis cf_exists"); + Error::InternalError + }) + } +} diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 558677a..16d757a 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -29,7 +29,7 @@ wit-component = "0.228.0" tracing = { workspace = true } bytesize = { workspace = true } http-backend = { path = "../http-backend" } -key-value-store = { path = "../key-value-store" } +key-value-store = { path = "../key-value-store" , features = ["redis"]} secret = { path = "../secret" } async-trait = "0.1" bytes = "1.10" diff --git a/sdk b/sdk index af1ab97..0a12055 160000 --- a/sdk +++ b/sdk @@ -1 +1 @@ -Subproject commit af1ab97436e8fa4a4db5e6191aae98397081ef02 +Subproject commit 0a12055ee06c894333a88617781785617fe93900 diff --git a/src/context.rs b/src/context.rs index 4a985ee..e5a964d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -74,11 +74,14 @@ impl ContextT for Context { } fn make_key_value_store(&self, stores: &Vec) -> KeyValueStore { - let stores = stores + let allowed_stores = stores .iter() .map(|s| (s.name.clone(), s.param.clone())) .collect(); - KeyValueStore::new(stores, Arc::new(CliStoreManager)) + let manager = CliStoreManager { + stores: stores.to_owned(), + }; + KeyValueStore::new(allowed_stores, Arc::new(manager)) } } diff --git a/src/dotenv.rs b/src/dotenv.rs index 3f79f1b..f077823 100644 --- a/src/dotenv.rs +++ b/src/dotenv.rs @@ -11,6 +11,7 @@ pub enum EnvArgType { ReqHeader, Env, Secrets, + KvStore, } #[derive(Debug, PartialEq)] @@ -20,6 +21,7 @@ enum EnvArgFileType { Variables, Secrets, DotEnv, + KvStore, } impl From for EnvArgFileType { @@ -29,6 +31,7 @@ impl From for EnvArgFileType { EnvArgType::ReqHeader => EnvArgFileType::ReqHeaders, EnvArgType::Env => EnvArgFileType::Variables, EnvArgType::Secrets => EnvArgFileType::Secrets, + EnvArgType::KvStore => EnvArgFileType::KvStore, } } } @@ -39,10 +42,8 @@ pub struct DotEnvInjector { impl DotEnvInjector { pub fn new(file_path: Option) -> Self { - let file_path = match file_path { - Some(path) => path, - None => current_dir().unwrap_or_else(|_| Path::new(".").to_path_buf()), - }; + let file_path = file_path + .unwrap_or_else(|| current_dir().unwrap_or_else(|_| Path::new(".").to_path_buf())); Self { file_path } } @@ -91,6 +92,7 @@ impl DotEnvInjector { EnvArgType::ReqHeader => "FASTEDGE_VAR_REQ_HEADER_", EnvArgType::Env => "FASTEDGE_VAR_ENV_", EnvArgType::Secrets => "FASTEDGE_VAR_SECRET_", + EnvArgType::KvStore => "FASTEDGE_VAR_KV_STORE", }; if key.starts_with("FASTEDGE_VAR_") { @@ -116,6 +118,7 @@ impl DotEnvInjector { EnvArgFileType::Variables => ".env.variables", EnvArgFileType::Secrets => ".env.secrets", EnvArgFileType::DotEnv => ".env", + EnvArgFileType::KvStore => ".env.kv_stores", }; let filename = self.file_path.join(env_arg_file_type_str); diff --git a/src/key_value.rs b/src/key_value.rs index def387d..88b4a6c 100644 --- a/src/key_value.rs +++ b/src/key_value.rs @@ -1,28 +1,19 @@ -use key_value_store::{Error, Store, StoreManager, Value}; +use key_value_store::RedisStore; +use key_value_store::{Error, Store, StoreManager}; +use runtime::app::KvStoreOption; use std::sync::Arc; -struct CliStore; - -pub(crate) struct CliStoreManager; - -#[async_trait::async_trait] -impl StoreManager for CliStoreManager { - async fn get_store(&self, _name: &str) -> Result, Error> { - Ok(Arc::new(CliStore)) - } +pub(crate) struct CliStoreManager { + pub(crate) stores: Vec, } #[async_trait::async_trait] -impl Store for CliStore { - async fn get(&self, key: &str) -> Result>, Error> { - Ok(Some(key.as_bytes().to_vec())) - } - - async fn get_by_range(&self, _key: &str, _min: u32, _max: u32) -> Result, Error> { - todo!() - } - - async fn bf_exists(&self, _bf: &str, _key: &str) -> Result { - todo!() +impl StoreManager for CliStoreManager { + async fn get_store(&self, name: &str) -> Result, Error> { + let Some(opts) = self.stores.iter().find(|store| store.name == name) else { + return Err(Error::NoSuchStore); + }; + let store = RedisStore::open(&opts.param).await?; + Ok(Arc::new(store)) } } diff --git a/src/main.rs b/src/main.rs index ed7282a..9e1e736 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use http_backend::{Backend, BackendStrategy}; use http_service::{HttpConfig, HttpService}; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; -use runtime::app::Status; +use runtime::app::{KvStoreOption, Status}; use runtime::service::{Service, ServiceBuilder}; use runtime::{App, SecretValue, WasmConfig}; use smol_str::{SmolStr, ToSmolStr}; @@ -72,6 +72,9 @@ struct HttpRunArgs { /// Headers added to response #[arg(long, value_parser = parse_key_value::< SmolStr, SmolStr >)] rsp_headers: Option>, + /// key-value store redis url list (ex: --kv-stores myredis=redis://localhost) + #[arg(long, value_parser = parse_key_value::< SmolStr, SmolStr >)] + kv_stores: Option>, } #[tokio::main] @@ -137,6 +140,23 @@ async fn main() -> anyhow::Result<()> { }); } + let kv_stores = dotenv_injector.merge_with_dotenv_variables( + has_dotenv_flag, + EnvArgType::KvStore, + run.kv_stores.unwrap_or_default().into_iter().collect(), + ); + + let kv_stores = kv_stores + .into_iter() + .map(|(name, param)| KvStoreOption { + param, + name, + prefix: Default::default(), + cache_size: 0, + cache_ttl: 0, + }) + .collect(); + let cli_app = App { binary_id: 0, max_duration: run.max_duration.map(|m| m / 10).unwrap_or(60000), @@ -150,7 +170,7 @@ async fn main() -> anyhow::Result<()> { status: Status::Enabled, debug_until: None, secrets, - kv_stores: vec![], + kv_stores, }; let mut headers = dotenv_injector.merge_with_dotenv_variables(