Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ pub use self::persy::Persy;
#[cfg(feature = "services-redis")]
mod redis;
#[cfg(feature = "services-redis")]
pub use self::redis::Redis;
pub use redis::Redis;
#[cfg(feature = "services-redis")]
pub use redis::RedisConfig;

#[cfg(feature = "services-rocksdb")]
mod rocksdb;
Expand Down
100 changes: 57 additions & 43 deletions core/src/services/redis/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use redis::ConnectionAddr;
use redis::ConnectionInfo;
use redis::RedisConnectionInfo;
use redis::RedisError;
use serde::Deserialize;
use tokio::sync::OnceCell;

use crate::raw::adapters::kv;
Expand All @@ -42,10 +43,11 @@ use crate::*;
const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
const DEFAULT_REDIS_PORT: u16 = 6379;

/// [Redis](https://redis.io/) services support.
#[doc = include_str!("docs.md")]
#[derive(Clone, Default)]
pub struct RedisBuilder {
/// Config for Redis services support.
#[derive(Default, Deserialize, Clone)]
#[serde(default)]
#[non_exhaustive]
pub struct RedisConfig {
Comment thread
Xuanwo marked this conversation as resolved.
/// network address of the Redis service. Can be "tcp://127.0.0.1:6379", e.g.
///
/// default is "tcp://127.0.0.1:6379"
Expand Down Expand Up @@ -74,24 +76,42 @@ pub struct RedisBuilder {
default_ttl: Option<Duration>,
}

impl Debug for RedisBuilder {
impl Debug for RedisConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("Builder");
ds.field("db", &self.db.to_string());
ds.field("root", &self.root);
let mut d = f.debug_struct("RedisConfig");

d.field("db", &self.db.to_string());
d.field("root", &self.root);
if let Some(endpoint) = self.endpoint.clone() {
ds.field("endpoint", &endpoint);
d.field("endpoint", &endpoint);
}
if let Some(cluster_endpoints) = self.cluster_endpoints.clone() {
ds.field("cluster_endpoints", &cluster_endpoints);
d.field("cluster_endpoints", &cluster_endpoints);
}
if let Some(username) = self.username.clone() {
ds.field("username", &username);
d.field("username", &username);
}
if self.password.is_some() {
ds.field("password", &"<redacted>");
d.field("password", &"<redacted>");
}
ds.finish()

d.finish_non_exhaustive()
}
}

/// [Redis](https://redis.io/) services support.
#[doc = include_str!("docs.md")]
#[derive(Clone, Default)]
pub struct RedisBuilder {
config: RedisConfig,
}

impl Debug for RedisBuilder {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut d = f.debug_struct("RedisBuilder");

d.field("config", &self.config);
d.finish_non_exhaustive()
}
}

Expand All @@ -104,7 +124,7 @@ impl RedisBuilder {
/// - "unix" or "redis+unix": unix socket connection
pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
if !endpoint.is_empty() {
self.endpoint = Some(endpoint.to_owned());
self.config.endpoint = Some(endpoint.to_owned());
}
self
}
Expand All @@ -118,7 +138,7 @@ impl RedisBuilder {
/// - "unix" or "redis+unix": unix socket connection
pub fn cluster_endpoints(&mut self, cluster_endpoints: &str) -> &mut Self {
if !cluster_endpoints.is_empty() {
self.cluster_endpoints = Some(cluster_endpoints.to_owned());
self.config.cluster_endpoints = Some(cluster_endpoints.to_owned());
}
self
}
Expand All @@ -128,7 +148,7 @@ impl RedisBuilder {
/// default: no username
pub fn username(&mut self, username: &str) -> &mut Self {
if !username.is_empty() {
self.username = Some(username.to_owned());
self.config.username = Some(username.to_owned());
}
self
}
Expand All @@ -138,7 +158,7 @@ impl RedisBuilder {
/// default: no password
pub fn password(&mut self, password: &str) -> &mut Self {
if !password.is_empty() {
self.password = Some(password.to_owned());
self.config.password = Some(password.to_owned());
}
self
}
Expand All @@ -147,15 +167,15 @@ impl RedisBuilder {
///
/// default: 0
pub fn db(&mut self, db: i64) -> &mut Self {
self.db = db;
self.config.db = db;
self
}

/// Set the default ttl for redis services.
///
/// If set, we will specify `EX` for write operations.
pub fn default_ttl(&mut self, ttl: Duration) -> &mut Self {
self.default_ttl = Some(ttl);
self.config.default_ttl = Some(ttl);
self
}

Expand All @@ -164,7 +184,7 @@ impl RedisBuilder {
/// default: "/"
pub fn root(&mut self, root: &str) -> &mut Self {
if !root.is_empty() {
self.root = Some(root.to_owned());
self.config.root = Some(root.to_owned());
}
self
}
Expand All @@ -175,38 +195,31 @@ impl Builder for RedisBuilder {
type Accessor = RedisBackend;

fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = RedisBuilder::default();

map.get("root").map(|v| builder.root(v));
map.get("endpoint").map(|v| builder.endpoint(v));
map.get("cluster_endpoints")
.map(|v| builder.cluster_endpoints(v));
map.get("username").map(|v| builder.username(v));
map.get("password").map(|v| builder.password(v));
map.get("db")
.map(|v| v.parse::<i64>().map(|v| builder.db(v)));

builder
let config = RedisConfig::deserialize(ConfigDeserializer::new(map))
.expect("config deserialize must succeed");

RedisBuilder { config }
}

fn build(&mut self) -> Result<Self::Accessor> {
let root = normalize_root(
self.root
self.config
.root
.clone()
.unwrap_or_else(|| "/".to_string())
.as_str(),
);

if let Some(endpoints) = self.cluster_endpoints.clone() {
if let Some(endpoints) = self.config.cluster_endpoints.clone() {
let mut cluster_endpoints: Vec<ConnectionInfo> = Vec::default();
for endpoint in endpoints.split(',') {
cluster_endpoints.push(self.get_connection_info(endpoint.to_string())?);
}
let mut client_builder = ClusterClientBuilder::new(cluster_endpoints);
if let Some(username) = &self.username {
if let Some(username) = &self.config.username {
client_builder = client_builder.username(username.clone());
}
if let Some(password) = &self.password {
if let Some(password) = &self.config.password {
client_builder = client_builder.password(password.clone());
}
let client = client_builder.build()?;
Expand All @@ -218,11 +231,12 @@ impl Builder for RedisBuilder {
client: None,
cluster_client: Some(client),
conn,
default_ttl: self.default_ttl,
default_ttl: self.config.default_ttl,
})
.with_root(&root))
} else {
let endpoint = self
.config
.endpoint
.clone()
.unwrap_or_else(|| DEFAULT_REDIS_ENDPOINT.to_string());
Expand All @@ -231,8 +245,8 @@ impl Builder for RedisBuilder {
Client::open(self.get_connection_info(endpoint.clone())?).map_err(|e| {
Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported scheme")
.with_context("service", Scheme::Redis)
.with_context("endpoint", self.endpoint.as_ref().unwrap())
.with_context("db", self.db.to_string())
.with_context("endpoint", self.config.endpoint.as_ref().unwrap())
.with_context("db", self.config.db.to_string())
.set_source(e)
})?;

Expand All @@ -242,7 +256,7 @@ impl Builder for RedisBuilder {
client: Some(client),
cluster_client: None,
conn,
default_ttl: self.default_ttl,
default_ttl: self.config.default_ttl,
})
.with_root(&root))
}
Expand Down Expand Up @@ -293,9 +307,9 @@ impl RedisBuilder {
};

let redis_info = RedisConnectionInfo {
db: self.db,
username: self.username.clone(),
password: self.password.clone(),
db: self.config.db,
username: self.config.username.clone(),
password: self.config.password.clone(),
};

Ok(ConnectionInfo {
Expand Down
1 change: 1 addition & 0 deletions core/src/services/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@

mod backend;
pub use backend::RedisBuilder as Redis;
pub use backend::RedisConfig;
10 changes: 5 additions & 5 deletions core/src/services/sqlite/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,22 @@ pub struct SqliteConfig {
/// - `file://data.db`
///
/// For more information, please refer to [Opening A New Database Connection](http://www.sqlite.org/c3ref/open.html)
connection_string: Option<String>,
pub connection_string: Option<String>,

/// Set the table name of the sqlite service to read/write.
table: Option<String>,
pub table: Option<String>,
/// Set the key field name of the sqlite service to read/write.
///
/// Default to `key` if not specified.
key_field: Option<String>,
pub key_field: Option<String>,
/// Set the value field name of the sqlite service to read/write.
///
/// Default to `value` if not specified.
value_field: Option<String>,
pub value_field: Option<String>,
/// set the working directory, all operations will be performed under it.
///
/// default: "/"
root: Option<String>,
pub root: Option<String>,
}

impl Debug for SqliteConfig {
Expand Down