Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 84 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/key-value-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ publish.workspace = true
authors.workspace = true
description = "key-value store host function"

[features]
default = []
redis = ["dep:redis"]

[dependencies]
reactor = { path = "../reactor" }
wasmtime = {workspace = true}
slab = "0.4"
async-trait = "0.1"
smol_str = {workspace = true}
tracing = "0.1"
redis = { version = "0.27.6", features = ["aio", "tokio-comp", "connection-manager"], optional = true}

[lints]
workspace = true
86 changes: 71 additions & 15 deletions crates/key-value-store/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
#[cfg(feature = "redis")]
mod redis_impl;

use reactor::gcore::fastedge::key_value;
use slab::Slab;
use smol_str::SmolStr;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::instrument;
use wasmtime::component::Resource;

pub use key_value::{Error, Value};

#[cfg(feature = "redis")]
pub use redis_impl::RedisStore;

#[async_trait::async_trait]
pub trait Store: Sync + Send {
async fn get(&self, key: &str) -> Result<Option<Value>, Error>;

async fn get_by_range(&self, key: &str, min: u32, max: u32) -> Result<Vec<Value>, Error>;
async fn zrange(&self, key: &str, min: f64, max: f64) -> Result<Vec<Value>, Error>;

async fn scan(&self, pattern: &str) -> Result<Vec<String>, Error>;

async fn zscan(&self, key: &str, pattern: &str) -> Result<Vec<(Value, f64)>, Error>;

async fn bf_exists(&self, bf: &str, key: &str) -> Result<bool, Error>;
async fn cf_exists(&self, key: &str, item: &str) -> Result<bool, Error>;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -44,25 +55,44 @@ impl key_value::HostStore for KeyValueStore {
KeyValueStore::get(self, store_id, &key).await
}

async fn get_by_range(
async fn scan(
&mut self,
store: Resource<key_value::Store>,
pattern: String,
) -> Result<Vec<String>, Error> {
let store_id = store.rep();
KeyValueStore::scan(self, store_id, &pattern).await
}

async fn zrange(
&mut self,
store: Resource<key_value::Store>,
key: String,
min: u32,
max: u32,
min: f64,
max: f64,
) -> Result<Vec<Value>, Error> {
let store_id = store.rep();
KeyValueStore::get_by_range(self, store_id, &key, min, max).await
KeyValueStore::zrange(self, store_id, &key, min, max).await
}

async fn zscan(
&mut self,
store: Resource<key_value::Store>,
key: String,
pattern: String,
) -> Result<Vec<(Value, f64)>, Error> {
let store_id = store.rep();
KeyValueStore::zscan(self, store_id, &key, &pattern).await
}

async fn bf_exists(
async fn cf_exists(
&mut self,
store: Resource<key_value::Store>,
bf: String,
key: String,
item: String,
) -> Result<bool, Error> {
let store_id = store.rep();
KeyValueStore::bf_exists(self, store_id, &bf, &key).await
KeyValueStore::cf_exists(self, store_id, &key, &item).await
}

async fn drop(&mut self, store: Resource<key_value::Store>) -> Result<(), wasmtime::Error> {
Expand All @@ -74,6 +104,7 @@ impl key_value::HostStore for KeyValueStore {
impl key_value::Host for KeyValueStore {}

impl KeyValueStore {
#[instrument(skip(manager), level = "trace")]
pub fn new(allowed_stores: Vec<(SmolStr, SmolStr)>, manager: Arc<dyn StoreManager>) -> Self {
Self {
allowed_stores: allowed_stores.into_iter().collect(),
Expand All @@ -83,6 +114,7 @@ impl KeyValueStore {
}

/// Open a store by name. Return the store ID.
#[instrument(skip(self), level = "trace", ret, err)]
pub async fn open(&mut self, name: &str) -> Result<u32, Error> {
if let Some(param) = self.allowed_stores.get(name) {
let store = self.manager.get_store(&param).await?;
Expand All @@ -93,6 +125,7 @@ impl KeyValueStore {
}

/// Get a value from a store by key.
#[instrument(skip(self), level = "trace", ret, err)]
pub async fn get(&self, store: u32, key: &str) -> Result<Option<Value>, Error> {
let Some(store) = self.stores.get(store as usize) else {
return Err(Error::NoSuchStore);
Expand All @@ -101,25 +134,48 @@ impl KeyValueStore {
}

/// Get a values from a store by key.
pub async fn get_by_range(
#[instrument(skip(self), level = "trace", ret, err)]
pub async fn zrange(
&self,
store: u32,
key: &str,
min: u32,
max: u32,
min: f64,
max: f64,
) -> Result<Vec<Value>, Error> {
let Some(store) = self.stores.get(store as usize) else {
return Err(Error::NoSuchStore);
};
store.get_by_range(key, min, max).await
store.zrange(key, min, max).await
}

#[instrument(skip(self), level = "trace", ret, err)]
async fn scan(&mut self, store: u32, pattern: &str) -> Result<Vec<String>, Error> {
let Some(store) = self.stores.get(store as usize) else {
return Err(Error::NoSuchStore);
};
store.scan(pattern).await
}

#[instrument(skip(self), level = "trace", ret, err)]
async fn zscan(
&mut self,
store: u32,
key: &str,
pattern: &str,
) -> Result<Vec<(Value, f64)>, Error> {
let Some(store) = self.stores.get(store as usize) else {
return Err(Error::NoSuchStore);
};
store.zscan(key, pattern).await
}

/// Get a value from a store by key.
pub async fn bf_exists(&self, store: u32, bf: &str, key: &str) -> Result<bool, Error> {
#[instrument(skip(self), level = "trace", ret, err)]
pub async fn cf_exists(&self, store: u32, key: &str, item: &str) -> Result<bool, Error> {
let Some(store) = self.stores.get(store as usize) else {
return Err(Error::NoSuchStore);
};
store.bf_exists(bf, key).await
store.cf_exists(key, item).await
}
}

Expand Down
Loading