From 4318ae4dbad5cb094ca26b6bda3745e10479ca57 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Mon, 22 Oct 2018 09:32:47 +0800 Subject: [PATCH 01/17] Public API Request for Comments Signed-off-by: Xiaoguang Sun --- Cargo.toml | 16 ++++ src/lib.rs | 88 +++++++++++++++++ src/raw.rs | 62 ++++++++++++ src/transaction.rs | 229 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 395 insertions(+) create mode 100644 Cargo.toml create mode 100644 src/lib.rs create mode 100644 src/raw.rs create mode 100644 src/transaction.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 00000000..0a395358 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "tikv-client" +version = "0.0.0" +keywords = ["TiKV", "KV", "distributed-systems"] +license = "Apache-2.0" +authors = ["The TiKV Project Developers"] +repository = "https://github.com/tikv/client-rust" +description = "The rust language implementation of TiKV client." + +[lib] +name = "tikv_client" + +[dependencies] +futures = "0.1" +serde = "1.0" +serde_derive = "1.0" diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 00000000..8567b9f7 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,88 @@ +extern crate futures; +extern crate serde; +#[macro_use] +extern crate serde_derive; + +use std::io::Error; +use std::path::PathBuf; + +use futures::Future; + +pub mod raw; +pub mod transaction; + +#[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] +pub struct Key(Vec); +#[derive(Default, Clone, Eq, PartialEq, Hash, Debug)] +pub struct Value(Vec); +#[derive(Default, Clone, Eq, PartialEq, Debug)] +pub struct KvPair(Key, Value); +#[derive(Default, Clone, Eq, PartialEq, Debug)] +pub struct KeyRange(Key, Key); + +pub type KvFuture = Box + Send>; + +impl Into for Vec { + fn into(self) -> Key { + Key(self) + } +} + +impl Into for Vec { + fn into(self) -> Value { + Value(self) + } +} + +impl Into for (Key, Value) { + fn into(self) -> KvPair { + KvPair(self.0, self.1) + } +} + +impl Into for (Key, Key) { + fn into(self) -> KeyRange { + KeyRange(self.0, self.1) + } +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] +#[serde(default)] +#[serde(rename_all = "kebab-case")] +pub struct Config { + pub pd_endpoints: Vec, + pub ca_path: Option, + pub cert_path: Option, + pub key_path: Option, +} + +impl Config { + pub fn new(pd_endpoints: E) -> Self + where + E: IntoIterator, + { + Config { + pd_endpoints: pd_endpoints.into_iter().collect(), + ca_path: None, + cert_path: None, + key_path: None, + } + } + + pub fn with_security( + pd_endpoints: E, + ca_path: PathBuf, + cert_path: PathBuf, + key_path: PathBuf, + ) -> Self + where + E: IntoIterator, + { + Config { + pd_endpoints: pd_endpoints.into_iter().collect(), + ca_path: Some(ca_path), + cert_path: Some(cert_path), + key_path: Some(key_path), + } + } +} diff --git a/src/raw.rs b/src/raw.rs new file mode 100644 index 00000000..2178c561 --- /dev/null +++ b/src/raw.rs @@ -0,0 +1,62 @@ +use {Config, Key, KeyRange, KvFuture, KvPair, Value}; + +pub trait Client { + fn new(_config: &Config) -> KvFuture { + unimplemented!() + } + + fn get(&self, key: K, cf: C) -> KvFuture + where + K: Into, + C: Into>; + + fn batch_get(&self, keys: I, cf: C) -> KvFuture> + where + I: IntoIterator, + K: Into, + C: Into>; + + fn put(&self, pair: P, cf: C) -> KvFuture<()> + where + P: Into, + C: Into>; + + fn batch_put(&self, pairs: I, cf: C) -> KvFuture<()> + where + I: IntoIterator, + P: Into, + C: Into>; + + fn delete(&self, key: K, cf: C) -> KvFuture<()> + where + K: Into, + C: Into>; + + fn batch_delete(&self, keys: I, cf: C) -> KvFuture<()> + where + I: IntoIterator, + K: Into, + C: Into>; + + fn scan(&self, range: R, limit: u32, key_only: bool, cf: C) -> KvFuture> + where + R: Into, + C: Into>; + + fn batch_scan( + &self, + ranges: I, + each_limit: u32, + key_only: bool, + cf: C, + ) -> KvFuture> + where + I: IntoIterator, + R: Into, + C: Into>; + + fn delete_range(&self, range: R, cf: C) -> KvFuture<()> + where + R: Into, + C: Into>; +} diff --git a/src/transaction.rs b/src/transaction.rs new file mode 100644 index 00000000..3a37afc2 --- /dev/null +++ b/src/transaction.rs @@ -0,0 +1,229 @@ +use std::io::Error; + +use futures::{Poll, Stream}; + +use {Config, Key, KvFuture, KvPair, Value}; + +#[derive(Copy, Clone)] +pub struct Timestamp(u64); + +impl Into for u64 { + fn into(self) -> Timestamp { + Timestamp(self) + } +} + +impl Timestamp { + pub fn timestamp(self) -> u64 { + self.0 + } + + pub fn physical(self) -> i64 { + (self.0 >> 16) as i64 + } + + pub fn logical(self) -> i64 { + (self.0 & 0xFFFF as u64) as i64 + } +} + +pub struct Scanner; + +impl Stream for Scanner { + type Item = KvPair; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + unimplemented!() + } +} + +pub trait Retriever { + fn get(&self, key: K) -> KvFuture + where + K: Into; + + fn batch_get(&self, keys: I) -> KvFuture> + where + I: IntoIterator, + K: Into; + + fn seek(&self, key: K) -> KvFuture + where + K: Into; + + fn seek_reverse(&self, key: K) -> KvFuture + where + K: Into; +} + +pub trait Mutator { + fn set

(&mut self, pair: P) -> KvFuture<()> + where + P: Into; + + fn delete(&mut self, key: K) -> KvFuture<()> + where + K: Into; +} + +pub struct Transaction; + +impl Transaction { + pub fn commit(&mut self) -> KvFuture<()> { + unimplemented!() + } + + pub fn rollback(&mut self) -> KvFuture<()> { + unimplemented!() + } + + pub fn lock_keys(&mut self, keys: I) -> KvFuture<()> + where + I: IntoIterator, + K: Into, + { + drop(keys); + unimplemented!() + } + + pub fn is_readonly(&self) -> bool { + unimplemented!() + } + + pub fn start_ts(&self) -> Timestamp { + unimplemented!() + } + + pub fn snapshot(&self) -> KvFuture { + unimplemented!() + } +} + +impl Retriever for Transaction { + fn get(&self, key: K) -> KvFuture + where + K: Into, + { + drop(key); + unimplemented!() + } + + fn batch_get(&self, keys: I) -> KvFuture> + where + I: IntoIterator, + K: Into, + { + drop(keys); + unimplemented!() + } + + fn seek(&self, key: K) -> KvFuture + where + K: Into, + { + drop(key); + unimplemented!() + } + + fn seek_reverse(&self, key: K) -> KvFuture + where + K: Into, + { + drop(key); + unimplemented!() + } +} + +impl Mutator for Transaction { + fn set

(&mut self, pair: P) -> KvFuture<()> + where + P: Into, + { + drop(pair); + unimplemented!() + } + + fn delete(&mut self, key: K) -> KvFuture<()> + where + K: Into, + { + drop(key); + unimplemented!() + } +} + +pub struct Snapshot; + +impl Retriever for Snapshot { + fn get(&self, key: K) -> KvFuture + where + K: Into, + { + drop(key); + unimplemented!() + } + + fn batch_get(&self, keys: I) -> KvFuture> + where + I: IntoIterator, + K: Into, + { + drop(keys); + unimplemented!() + } + + fn seek(&self, key: K) -> KvFuture + where + K: Into, + { + drop(key); + unimplemented!() + } + + fn seek_reverse(&self, key: K) -> KvFuture + where + K: Into, + { + drop(key); + unimplemented!() + } +} + +pub struct Oracle; + +impl Oracle { + pub fn timestamp(&self) -> KvFuture { + unimplemented!() + } + + pub fn is_expired(&self, _lock_timestamp: Timestamp, _ttl: Timestamp) -> KvFuture { + unimplemented!() + } +} + +pub trait Client { + fn new(_config: &Config) -> KvFuture { + unimplemented!() + } + + fn begin(&self) -> KvFuture { + unimplemented!() + } + + fn begin_with_timestamp(&self, _timestamp: Timestamp) -> KvFuture { + unimplemented!() + } + + fn snapshot(&self) -> KvFuture { + unimplemented!() + } + + fn current_timestamp(&self) -> Timestamp { + unimplemented!() + } + + fn oracle(&self) -> Oracle { + unimplemented!() + } +} From c9b96412fded65ed6e0e1bd3c5f14c2dffce4a28 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Tue, 23 Oct 2018 23:19:41 +0800 Subject: [PATCH 02/17] Remove Oracle from transaction API Signed-off-by: Xiaoguang Sun --- src/transaction.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/transaction.rs b/src/transaction.rs index 3a37afc2..18151c0f 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -190,18 +190,6 @@ impl Retriever for Snapshot { } } -pub struct Oracle; - -impl Oracle { - pub fn timestamp(&self) -> KvFuture { - unimplemented!() - } - - pub fn is_expired(&self, _lock_timestamp: Timestamp, _ttl: Timestamp) -> KvFuture { - unimplemented!() - } -} - pub trait Client { fn new(_config: &Config) -> KvFuture { unimplemented!() @@ -222,8 +210,4 @@ pub trait Client { fn current_timestamp(&self) -> Timestamp { unimplemented!() } - - fn oracle(&self) -> Oracle { - unimplemented!() - } } From 1e345ca55137527d634dbeccac521f5afff3ac68 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Thu, 25 Oct 2018 15:00:11 +0800 Subject: [PATCH 03/17] Add set_limit/set_key_only to Scanner Signed-off-by: Xiaoguang Sun --- src/transaction.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/transaction.rs b/src/transaction.rs index 18151c0f..90bf6faa 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -29,6 +29,16 @@ impl Timestamp { pub struct Scanner; +impl Scanner { + pub fn set_limit(&mut self, _limit: u32) { + unimplemented!() + } + + pub fn set_key_only(&mut self, _key_only: bool) { + unimplemented!() + } +} + impl Stream for Scanner { type Item = KvPair; type Error = Error; From 2806bfb7e0213307b9bc7baba67bc1c7231fea1f Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Fri, 9 Nov 2018 21:45:34 +0800 Subject: [PATCH 04/17] Do not move argument unless necessary. Signed-off-by: Xiaoguang Sun --- examples/raw.rs | 48 ++++++++++++++++ examples/transaction.rs | 86 ++++++++++++++++++++++++++++ src/lib.rs | 21 ++++--- src/raw.rs | 120 ++++++++++++++++++++++++++++++++++------ src/transaction.rs | 62 ++++++++++++--------- 5 files changed, 283 insertions(+), 54 deletions(-) create mode 100644 examples/raw.rs create mode 100644 examples/transaction.rs diff --git a/examples/raw.rs b/examples/raw.rs new file mode 100644 index 00000000..a8174a04 --- /dev/null +++ b/examples/raw.rs @@ -0,0 +1,48 @@ +extern crate futures; +extern crate tikv_client; + +use futures::future::Future; +use tikv_client::raw::Client; +use tikv_client::*; + +fn main() { + let config = Config::new(vec!["127.0.0.1:3379"]); + let raw = raw::RawClient::new(&config) + .wait() + .expect("Could not connect to tikv"); + + let key: Key = b"Company".to_vec().into(); + let value: Value = b"PingCAP".to_vec().into(); + + raw.put((Clone::clone(&key), Clone::clone(&value)), None) + .wait() + .expect("Could not put kv pair to tikv"); + println!("Successfully put {:?}:{:?} to tikv", key, value); + + let value = raw.get(&key, None).wait().expect("Could not get value"); + println!("Found val: {:?} for key: {:?}", value, key); + + raw.delete(&key, None) + .wait() + .expect("Could not delete value"); + println!("Key: {:?} deleted", key); + + raw.get(&key, None) + .wait() + .expect_err("Get returned value for not existing key"); + + let keys = vec![b"k1".to_vec().into(), b"k2".to_vec().into()]; + + let values = raw + .batch_get(&keys, None) + .wait() + .expect("Could not get values"); + println!("Found values: {:?} for keys: {:?}", values, keys); + + let start: Key = b"k1".to_vec().into(); + let end: Key = b"k2".to_vec().into(); + raw.scan(&start..&end, 10, false, None); + + let ranges = [&start..&end, &start..&end]; + raw.batch_scan(&ranges, 10, false, None); +} diff --git a/examples/transaction.rs b/examples/transaction.rs new file mode 100644 index 00000000..e8f9fd12 --- /dev/null +++ b/examples/transaction.rs @@ -0,0 +1,86 @@ +extern crate futures; +extern crate tikv_client; + +use futures::{Async, Future, Stream}; +use tikv_client::transaction::{Client, Mutator, Retriever, TxnClient}; +use tikv_client::*; + +fn puts(client: &TxnClient, pairs: P) +where + P: IntoIterator, + I: Into, +{ + let mut txn = client.begin().wait().expect("Could not begin transaction"); + let _: Vec<()> = pairs + .into_iter() + .map(Into::into) + .map(|p| { + txn.set(p).wait().expect("Could not set key value pair"); + }).collect(); + txn.commit().wait().expect("Could not commit transaction"); +} + +fn get(client: &TxnClient, key: &Key) -> Value { + let txn = client.begin().wait().expect("Could not begin transaction"); + txn.get(key).wait().expect("Could not get value") +} + +fn scan(client: &TxnClient, start: &Key, limit: usize) { + let txn = client.begin().wait().expect("Could not begin transaction"); + let mut scanner = txn.seek(start).wait().expect("Could not seek to start key"); + let mut limit = limit; + loop { + if limit == 0 { + break; + } + match scanner.poll() { + Ok(Async::Ready(None)) => return, + Ok(Async::Ready(Some(pair))) => { + limit -= 1; + println!("{:?}", pair); + } + _ => break, + } + } +} + +fn dels

(client: &TxnClient, pairs: P) +where + P: IntoIterator, +{ + let mut txn = client.begin().wait().expect("Could not begin transaction"); + let _: Vec<()> = pairs + .into_iter() + .map(|p| { + txn.delete(p).wait().expect("Could not delete key"); + }).collect(); + txn.commit().wait().expect("Could not commit transaction"); +} + +fn main() { + let config = Config::new(vec!["127.0.0.1:3379"]); + let txn = TxnClient::new(&config) + .wait() + .expect("Could not connect to tikv"); + + // set + let key1: Key = b"key1".to_vec().into(); + let value1: Value = b"value1".to_vec().into(); + let key2: Key = b"key2".to_vec().into(); + let value2: Value = b"value2".to_vec().into(); + puts(&txn, vec![(key1, value1), (key2, value2)]); + + // get + let key1: Key = b"key1".to_vec().into(); + let value1 = get(&txn, &key1); + println!("{:?}", (key1, value1)); + + // scan + let key1: Key = b"key1".to_vec().into(); + scan(&txn, &key1, 10); + + // delete + let key1: Key = b"key1".to_vec().into(); + let key2: Key = b"key2".to_vec().into(); + dels(&txn, vec![key1, key2]); +} diff --git a/src/lib.rs b/src/lib.rs index 8567b9f7..8826c167 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,8 +17,6 @@ pub struct Key(Vec); pub struct Value(Vec); #[derive(Default, Clone, Eq, PartialEq, Debug)] pub struct KvPair(Key, Value); -#[derive(Default, Clone, Eq, PartialEq, Debug)] -pub struct KeyRange(Key, Key); pub type KvFuture = Box + Send>; @@ -28,6 +26,12 @@ impl Into for Vec { } } +impl AsRef for Key { + fn as_ref(&self) -> &Self { + self + } +} + impl Into for Vec { fn into(self) -> Value { Value(self) @@ -40,12 +44,6 @@ impl Into for (Key, Value) { } } -impl Into for (Key, Key) { - fn into(self) -> KeyRange { - KeyRange(self.0, self.1) - } -} - #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] #[serde(default)] #[serde(rename_all = "kebab-case")] @@ -57,12 +55,13 @@ pub struct Config { } impl Config { - pub fn new(pd_endpoints: E) -> Self + pub fn new(pd_endpoints: E) -> Self where - E: IntoIterator, + E: IntoIterator, + S: Into, { Config { - pd_endpoints: pd_endpoints.into_iter().collect(), + pd_endpoints: pd_endpoints.into_iter().map(Into::into).collect(), ca_path: None, cert_path: None, key_path: None, diff --git a/src/raw.rs b/src/raw.rs index 2178c561..ccb4e14b 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -1,19 +1,106 @@ -use {Config, Key, KeyRange, KvFuture, KvPair, Value}; +use std::ops::RangeBounds; -pub trait Client { - fn new(_config: &Config) -> KvFuture { +use {Config, Key, KvFuture, KvPair, Value}; + +pub struct RawClient; + +impl RawClient { + pub fn new(_config: &Config) -> KvFuture { + unimplemented!() + } +} + +impl Client for RawClient { + fn get(&self, _key: K, _cf: C) -> KvFuture + where + K: AsRef, + C: Into>, + { + unimplemented!() + } + + fn batch_get(&self, _keys: K, _cf: C) -> KvFuture> + where + K: AsRef<[Key]>, + C: Into>, + { unimplemented!() } + fn put(&self, _pair: P, _cf: C) -> KvFuture<()> + where + P: Into, + C: Into>, + { + unimplemented!() + } + + fn batch_put(&self, _pairs: I, _cf: C) -> KvFuture<()> + where + I: IntoIterator, + P: Into, + C: Into>, + { + unimplemented!() + } + + fn delete(&self, _key: K, _cf: C) -> KvFuture<()> + where + K: AsRef, + C: Into>, + { + unimplemented!() + } + + fn batch_delete(&self, _keys: K, _cf: C) -> KvFuture<()> + where + K: AsRef<[Key]>, + C: Into>, + { + unimplemented!() + } + + fn scan(&self, _range: R, _limit: u32, _key_only: bool, _cf: C) -> KvFuture> + where + R: RangeBounds, + C: Into>, + { + unimplemented!() + } + + fn batch_scan( + &self, + _ranges: R, + _each_limit: u32, + _key_only: bool, + _cf: C, + ) -> KvFuture> + where + R: AsRef<[B]>, + B: RangeBounds, + C: Into>, + { + unimplemented!() + } + + fn delete_range(&self, _range: R, _cf: C) -> KvFuture<()> + where + R: RangeBounds, + C: Into>, + { + unimplemented!() + } +} + +pub trait Client { fn get(&self, key: K, cf: C) -> KvFuture where - K: Into, + K: AsRef, C: Into>; - fn batch_get(&self, keys: I, cf: C) -> KvFuture> + fn batch_get(&self, keys: K, cf: C) -> KvFuture> where - I: IntoIterator, - K: Into, + K: AsRef<[Key]>, C: Into>; fn put(&self, pair: P, cf: C) -> KvFuture<()> @@ -29,34 +116,33 @@ pub trait Client { fn delete(&self, key: K, cf: C) -> KvFuture<()> where - K: Into, + K: AsRef, C: Into>; - fn batch_delete(&self, keys: I, cf: C) -> KvFuture<()> + fn batch_delete(&self, keys: K, cf: C) -> KvFuture<()> where - I: IntoIterator, - K: Into, + K: AsRef<[Key]>, C: Into>; fn scan(&self, range: R, limit: u32, key_only: bool, cf: C) -> KvFuture> where - R: Into, + R: RangeBounds, C: Into>; - fn batch_scan( + fn batch_scan( &self, - ranges: I, + ranges: R, each_limit: u32, key_only: bool, cf: C, ) -> KvFuture> where - I: IntoIterator, - R: Into, + R: AsRef<[B]>, + B: RangeBounds, C: Into>; fn delete_range(&self, range: R, cf: C) -> KvFuture<()> where - R: Into, + R: RangeBounds, C: Into>; } diff --git a/src/transaction.rs b/src/transaction.rs index 90bf6faa..59859644 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -51,20 +51,19 @@ impl Stream for Scanner { pub trait Retriever { fn get(&self, key: K) -> KvFuture where - K: Into; + K: AsRef; - fn batch_get(&self, keys: I) -> KvFuture> + fn batch_get(&self, keys: K) -> KvFuture> where - I: IntoIterator, - K: Into; + K: AsRef<[Key]>; fn seek(&self, key: K) -> KvFuture where - K: Into; + K: AsRef; fn seek_reverse(&self, key: K) -> KvFuture where - K: Into; + K: AsRef; } pub trait Mutator { @@ -74,24 +73,23 @@ pub trait Mutator { fn delete(&mut self, key: K) -> KvFuture<()> where - K: Into; + K: AsRef; } pub struct Transaction; impl Transaction { - pub fn commit(&mut self) -> KvFuture<()> { + pub fn commit(self) -> KvFuture<()> { unimplemented!() } - pub fn rollback(&mut self) -> KvFuture<()> { + pub fn rollback(self) -> KvFuture<()> { unimplemented!() } - pub fn lock_keys(&mut self, keys: I) -> KvFuture<()> + pub fn lock_keys(&mut self, keys: K) -> KvFuture<()> where - I: IntoIterator, - K: Into, + K: AsRef<[Key]>, { drop(keys); unimplemented!() @@ -113,16 +111,15 @@ impl Transaction { impl Retriever for Transaction { fn get(&self, key: K) -> KvFuture where - K: Into, + K: AsRef, { drop(key); unimplemented!() } - fn batch_get(&self, keys: I) -> KvFuture> + fn batch_get(&self, keys: K) -> KvFuture> where - I: IntoIterator, - K: Into, + K: AsRef<[Key]>, { drop(keys); unimplemented!() @@ -130,7 +127,7 @@ impl Retriever for Transaction { fn seek(&self, key: K) -> KvFuture where - K: Into, + K: AsRef, { drop(key); unimplemented!() @@ -138,7 +135,7 @@ impl Retriever for Transaction { fn seek_reverse(&self, key: K) -> KvFuture where - K: Into, + K: AsRef, { drop(key); unimplemented!() @@ -156,7 +153,7 @@ impl Mutator for Transaction { fn delete(&mut self, key: K) -> KvFuture<()> where - K: Into, + K: AsRef, { drop(key); unimplemented!() @@ -168,16 +165,15 @@ pub struct Snapshot; impl Retriever for Snapshot { fn get(&self, key: K) -> KvFuture where - K: Into, + K: AsRef, { drop(key); unimplemented!() } - fn batch_get(&self, keys: I) -> KvFuture> + fn batch_get(&self, keys: K) -> KvFuture> where - I: IntoIterator, - K: Into, + K: AsRef<[Key]>, { drop(keys); unimplemented!() @@ -185,7 +181,7 @@ impl Retriever for Snapshot { fn seek(&self, key: K) -> KvFuture where - K: Into, + K: AsRef, { drop(key); unimplemented!() @@ -193,7 +189,7 @@ impl Retriever for Snapshot { fn seek_reverse(&self, key: K) -> KvFuture where - K: Into, + K: AsRef, { drop(key); unimplemented!() @@ -201,10 +197,24 @@ impl Retriever for Snapshot { } pub trait Client { - fn new(_config: &Config) -> KvFuture { + fn begin(&self) -> KvFuture; + + fn begin_with_timestamp(&self, _timestamp: Timestamp) -> KvFuture; + + fn snapshot(&self) -> KvFuture; + + fn current_timestamp(&self) -> Timestamp; +} + +pub struct TxnClient {} + +impl TxnClient { + pub fn new(_config: &Config) -> KvFuture { unimplemented!() } +} +impl Client for TxnClient { fn begin(&self) -> KvFuture { unimplemented!() } From ef2d11f77216a73f0abc20c3eaa2731aeda316c1 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 14 Nov 2018 09:08:29 +0800 Subject: [PATCH 05/17] Use builder style interface for raw kv Signed-off-by: Xiaoguang Sun --- examples/raw.rs | 30 ++- src/raw.rs | 479 ++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 404 insertions(+), 105 deletions(-) diff --git a/examples/raw.rs b/examples/raw.rs index a8174a04..3e328f6f 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -14,35 +14,51 @@ fn main() { let key: Key = b"Company".to_vec().into(); let value: Value = b"PingCAP".to_vec().into(); - raw.put((Clone::clone(&key), Clone::clone(&value)), None) + raw.put((Clone::clone(&key), Clone::clone(&value))) + .cf("test_cf") .wait() .expect("Could not put kv pair to tikv"); println!("Successfully put {:?}:{:?} to tikv", key, value); - let value = raw.get(&key, None).wait().expect("Could not get value"); + let value = raw + .get(&key) + .cf("test_cf") + .wait() + .expect("Could not get value"); println!("Found val: {:?} for key: {:?}", value, key); - raw.delete(&key, None) + raw.delete(&key) + .cf("test_cf") .wait() .expect("Could not delete value"); println!("Key: {:?} deleted", key); - raw.get(&key, None) + raw.get(&key) + .cf("test_cf") .wait() .expect_err("Get returned value for not existing key"); let keys = vec![b"k1".to_vec().into(), b"k2".to_vec().into()]; let values = raw - .batch_get(&keys, None) + .batch_get(&keys) + .cf("test_cf") .wait() .expect("Could not get values"); println!("Found values: {:?} for keys: {:?}", values, keys); let start: Key = b"k1".to_vec().into(); let end: Key = b"k2".to_vec().into(); - raw.scan(&start..&end, 10, false, None); + raw.scan(&start..&end, 10) + .cf("test_cf") + .key_only() + .wait() + .expect("Could not scan"); let ranges = [&start..&end, &start..&end]; - raw.batch_scan(&ranges, 10, false, None); + raw.batch_scan(&ranges, 10) + .cf("test_cf") + .key_only() + .wait() + .expect("Could not batch scan"); } diff --git a/src/raw.rs b/src/raw.rs index ccb4e14b..8e8879ec 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -1,148 +1,431 @@ use std::ops::RangeBounds; +use futures::{Future, Poll}; + use {Config, Key, KvFuture, KvPair, Value}; -pub struct RawClient; +#[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] +pub struct ColumnFamily(String); -impl RawClient { - pub fn new(_config: &Config) -> KvFuture { - unimplemented!() +impl Into for String { + fn into(self) -> ColumnFamily { + ColumnFamily(self) } } -impl Client for RawClient { - fn get(&self, _key: K, _cf: C) -> KvFuture - where - K: AsRef, - C: Into>, - { +impl<'a> Into for &'a str { + fn into(self) -> ColumnFamily { + ColumnFamily(self.to_owned()) + } +} + +pub struct Get<'a, AClient: Client> { + client: &'a AClient, + key: Key, + cf: Option, +} + +impl<'a, AClient: Client> Get<'a, AClient> { + fn new(client: &'a AClient, key: Key) -> Self { + Get { + client, + key, + cf: None, + } + } + + pub fn cf(mut self, cf: impl Into) -> Self { + self.cf = Some(cf.into()); + self + } +} + +impl<'a, AClient: Client> Future for Get<'a, AClient> { + type Item = Value; + type Error = (); + + fn poll(&mut self) -> Poll { + let _ = &self.client; + let _ = &self.key; + let _ = &self.cf; unimplemented!() } +} - fn batch_get(&self, _keys: K, _cf: C) -> KvFuture> - where - K: AsRef<[Key]>, - C: Into>, - { +pub struct BatchGet<'a, AClient: Client> { + client: &'a AClient, + keys: Vec, + cf: Option, +} + +impl<'a, AClient: Client> BatchGet<'a, AClient> { + fn new(client: &'a AClient, keys: Vec) -> Self { + BatchGet { + client, + keys, + cf: None, + } + } + + pub fn cf(mut self, cf: impl Into) -> Self { + self.cf = Some(cf.into()); + self + } +} + +impl<'a, AClient: Client> Future for BatchGet<'a, AClient> { + type Item = Vec; + type Error = (); + + fn poll(&mut self) -> Poll { + let _ = &self.client; + let _ = &self.keys; + let _ = &self.cf; unimplemented!() } +} - fn put(&self, _pair: P, _cf: C) -> KvFuture<()> - where - P: Into, - C: Into>, - { +pub struct Put<'a, AClient: Client> { + client: &'a AClient, + pair: KvPair, + cf: Option, +} + +impl<'a, AClient: Client> Put<'a, AClient> { + fn new(client: &'a AClient, pair: KvPair) -> Self { + Put { + client, + pair, + cf: None, + } + } + + pub fn cf(mut self, cf: impl Into) -> Self { + self.cf = Some(cf.into()); + self + } +} + +impl<'a, AClient: Client> Future for Put<'a, AClient> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + let _ = &self.client; + let _ = &self.pair; + let _ = &self.cf; unimplemented!() } +} - fn batch_put(&self, _pairs: I, _cf: C) -> KvFuture<()> - where - I: IntoIterator, - P: Into, - C: Into>, - { +pub struct BatchPut<'a, AClient: Client> { + client: &'a AClient, + pairs: Vec, + cf: Option, +} + +impl<'a, AClient: Client> BatchPut<'a, AClient> { + fn new(client: &'a AClient, pairs: Vec) -> Self { + BatchPut { + client, + pairs, + cf: None, + } + } + + pub fn cf(mut self, cf: impl Into) -> Self { + self.cf = Some(cf.into()); + self + } +} + +impl<'a, AClient: Client> Future for BatchPut<'a, AClient> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + let _ = &self.client; + let _ = &self.pairs; + let _ = &self.cf; unimplemented!() } +} - fn delete(&self, _key: K, _cf: C) -> KvFuture<()> - where - K: AsRef, - C: Into>, - { +pub struct Delete<'a, AClient: Client> { + client: &'a AClient, + key: Key, + cf: Option, +} + +impl<'a, AClient: Client> Delete<'a, AClient> { + fn new(client: &'a AClient, key: Key) -> Self { + Delete { + client, + key, + cf: None, + } + } + + pub fn cf(mut self, cf: impl Into) -> Self { + self.cf = Some(cf.into()); + self + } +} + +impl<'a, AClient: Client> Future for Delete<'a, AClient> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + let _ = &self.client; + let _ = &self.key; + let _ = &self.cf; unimplemented!() } +} - fn batch_delete(&self, _keys: K, _cf: C) -> KvFuture<()> - where - K: AsRef<[Key]>, - C: Into>, - { +pub struct BatchDelete<'a, AClient: Client> { + client: &'a AClient, + keys: Vec, + cf: Option, +} + +impl<'a, AClient: Client> BatchDelete<'a, AClient> { + fn new(client: &'a AClient, keys: Vec) -> Self { + BatchDelete { + client, + keys, + cf: None, + } + } + + pub fn cf(mut self, cf: impl Into) -> Self { + self.cf = Some(cf.into()); + self + } +} + +impl<'a, AClient: Client> Future for BatchDelete<'a, AClient> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + let _ = &self.client; + let _ = &self.keys; + let _ = &self.cf; unimplemented!() } +} - fn scan(&self, _range: R, _limit: u32, _key_only: bool, _cf: C) -> KvFuture> - where - R: RangeBounds, - C: Into>, - { +pub struct Scan<'a, AClient: Client> { + client: &'a AClient, + range: (Key, Key), + limit: u32, + key_only: bool, + cf: Option, +} + +impl<'a, AClient: Client> Scan<'a, AClient> { + fn new(client: &'a AClient, range: (Key, Key), limit: u32) -> Self { + Scan { + client, + range, + limit, + key_only: false, + cf: None, + } + } + + pub fn key_only(mut self) -> Self { + self.key_only = true; + self + } + + pub fn cf(mut self, cf: impl Into) -> Self { + self.cf = Some(cf.into()); + self + } +} + +impl<'a, AClient: Client> Future for Scan<'a, AClient> { + type Item = Vec; + type Error = (); + + fn poll(&mut self) -> Poll { + let _ = &self.client; + let _ = &self.range; + let _ = &self.limit; + let _ = &self.key_only; + let _ = &self.cf; unimplemented!() } +} - fn batch_scan( - &self, - _ranges: R, - _each_limit: u32, - _key_only: bool, - _cf: C, - ) -> KvFuture> - where - R: AsRef<[B]>, - B: RangeBounds, - C: Into>, - { +pub struct BatchScan<'a, AClient: Client> { + client: &'a AClient, + ranges: Vec<(Key, Key)>, + each_limit: u32, + key_only: bool, + cf: Option, +} + +impl<'a, AClient: Client> BatchScan<'a, AClient> { + fn new(client: &'a AClient, ranges: Vec<(Key, Key)>, each_limit: u32) -> Self { + BatchScan { + client, + ranges, + each_limit, + key_only: false, + cf: None, + } + } + + pub fn key_only(mut self) -> Self { + self.key_only = true; + self + } + + pub fn cf(mut self, cf: impl Into) -> Self { + self.cf = Some(cf.into()); + self + } +} + +impl<'a, AClient: Client> Future for BatchScan<'a, AClient> { + type Item = Vec; + type Error = (); + + fn poll(&mut self) -> Poll { + let _ = &self.client; + let _ = &self.ranges; + let _ = &self.each_limit; + let _ = &self.key_only; + let _ = &self.cf; unimplemented!() } +} - fn delete_range(&self, _range: R, _cf: C) -> KvFuture<()> - where - R: RangeBounds, - C: Into>, - { +pub struct DeleteRange<'a, AClient: Client> { + client: &'a AClient, + range: (Key, Key), + cf: Option, +} + +impl<'a, AClient: Client> DeleteRange<'a, AClient> { + fn new(client: &'a AClient, range: (Key, Key)) -> Self { + DeleteRange { + client, + range, + cf: None, + } + } + + pub fn cf(mut self, cf: impl Into) -> Self { + self.cf = Some(cf.into()); + self + } +} + +impl<'a, AClient: Client> Future for DeleteRange<'a, AClient> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + let _ = &self.client; + let _ = &self.range; + let _ = &self.cf; unimplemented!() } } pub trait Client { - fn get(&self, key: K, cf: C) -> KvFuture - where - K: AsRef, - C: Into>; + type AClient: Client; - fn batch_get(&self, keys: K, cf: C) -> KvFuture> - where - K: AsRef<[Key]>, - C: Into>; + fn get(&self, key: impl AsRef) -> Get; - fn put(&self, pair: P, cf: C) -> KvFuture<()> - where - P: Into, - C: Into>; + fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet; - fn batch_put(&self, pairs: I, cf: C) -> KvFuture<()> - where - I: IntoIterator, - P: Into, - C: Into>; + fn put(&self, pair: impl Into) -> Put; - fn delete(&self, key: K, cf: C) -> KvFuture<()> - where - K: AsRef, - C: Into>; + fn batch_put( + &self, + pairs: impl IntoIterator>, + ) -> BatchPut; - fn batch_delete(&self, keys: K, cf: C) -> KvFuture<()> - where - K: AsRef<[Key]>, - C: Into>; + fn delete(&self, key: impl AsRef) -> Delete; - fn scan(&self, range: R, limit: u32, key_only: bool, cf: C) -> KvFuture> - where - R: RangeBounds, - C: Into>; + fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete; + + fn scan(&self, range: impl RangeBounds, limit: u32) -> Scan; - fn batch_scan( + fn batch_scan( &self, - ranges: R, + ranges: Ranges, each_limit: u32, - key_only: bool, - cf: C, - ) -> KvFuture> + ) -> BatchScan where - R: AsRef<[B]>, - B: RangeBounds, - C: Into>; + Ranges: AsRef<[Bounds]>, + Bounds: RangeBounds; + + fn delete_range(&self, range: impl RangeBounds) -> DeleteRange; +} + +pub struct RawClient; + +impl RawClient { + pub fn new(_config: &Config) -> KvFuture { + unimplemented!() + } + + fn extract_range(_range: &impl RangeBounds) -> (Key, Key) { + unimplemented!() + } +} + +impl Client for RawClient { + type AClient = Self; + + fn get(&self, key: impl AsRef) -> Get { + Get::new(self, Clone::clone(key.as_ref())) + } + + fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { + BatchGet::new(self, keys.as_ref().to_vec()) + } - fn delete_range(&self, range: R, cf: C) -> KvFuture<()> + fn put(&self, pair: impl Into) -> Put { + Put::new(self, pair.into()) + } + + fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { + BatchPut::new(self, pairs.into_iter().map(Into::into).collect()) + } + + fn delete(&self, key: impl AsRef) -> Delete { + Delete::new(self, Clone::clone(key.as_ref())) + } + + fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete { + BatchDelete::new(self, keys.as_ref().to_vec()) + } + + fn scan(&self, range: impl RangeBounds, limit: u32) -> Scan { + Scan::new(self, Self::extract_range(&range), limit) + } + + fn batch_scan(&self, ranges: Ranges, each_limit: u32) -> BatchScan where - R: RangeBounds, - C: Into>; + Ranges: AsRef<[Bounds]>, + Bounds: RangeBounds, + { + BatchScan::new( + self, + ranges.as_ref().iter().map(Self::extract_range).collect(), + each_limit, + ) + } + + fn delete_range(&self, range: impl RangeBounds) -> DeleteRange { + DeleteRange::new(self, Self::extract_range(&range)) + } } From c3af980c9e18cbdaccc345c257b2cf1d3d6f5d05 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 14 Nov 2018 22:52:58 +0800 Subject: [PATCH 06/17] Add errors definition Signed-off-by: Xiaoguang Sun --- Cargo.toml | 2 ++ src/errors.rs | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 12 ++++++-- 3 files changed, 87 insertions(+), 3 deletions(-) create mode 100644 src/errors.rs diff --git a/Cargo.toml b/Cargo.toml index 0a395358..8f2a59a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,5 @@ name = "tikv_client" futures = "0.1" serde = "1.0" serde_derive = "1.0" +quick-error = "1.2" +grpcio = { version = "0.4", features = [ "secure" ] } diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 00000000..02bc4cd1 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,76 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::error; +use std::result; + +quick_error!{ + #[derive(Debug)] + pub enum Error { + Io(err: ::std::io::Error) { + from() + cause(err) + description(err.description()) + } + Grpc(err: ::grpc::Error) { + from() + cause(err) + description(err.description()) + } + Canceled(err: ::futures::sync::oneshot::Canceled) { + from() + cause(err) + description(err.description()) + } + Other(err: Box) { + from() + cause(err.as_ref()) + description(err.description()) + display("unknown error {:?}", err) + } + RegionForKeyNotFound(key: Vec) { + description("region is not found") + display("region is not found for key {:?}", key) + } + RegionNotFound(id: u64) { + description("region is not found") + display("region {:?} is not found", id) + } + NotLeader(region_id: u64) { + description("peer is not leader") + display("peer is not leader for region {:?}.", region_id) + } + StoreNotMatch { + description("store not match") + display("store not match") + } + KeyNotInRegion(key: Vec, region_id: u64, start_key: Vec, end_key: Vec) { + description("region is not found") + display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key) + } + StaleEpoch { + description("stale epoch") + display("stale epoch") + } + ServerIsBusy(reason: String) { + description("server is busy") + display("server is busy: {:?}", reason) + } + RaftEntryTooLarge(region_id: u64, entry_size: u64) { + description("raft entry too large") + display("{:?} bytes raft entry of region {:?} is too large", entry_size, region_id) + } + } +} + +pub type Result = result::Result; diff --git a/src/lib.rs b/src/lib.rs index 8826c167..58db956f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,14 +2,20 @@ extern crate futures; extern crate serde; #[macro_use] extern crate serde_derive; +#[macro_use] +extern crate quick_error; +extern crate grpcio as grpc; + +pub mod raw; +pub mod transaction; +pub mod errors; -use std::io::Error; use std::path::PathBuf; use futures::Future; -pub mod raw; -pub mod transaction; +pub use errors::Error; +pub use errors::Result; #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct Key(Vec); From 9c790b1cdf0bd8750fbd84d0ff8586a4ccf52e5d Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 14 Nov 2018 23:14:14 +0800 Subject: [PATCH 07/17] Use impl Trait whenever possible Signed-off-by: Xiaoguang Sun --- examples/transaction.rs | 9 ++--- src/lib.rs | 17 +++------ src/transaction.rs | 79 +++++++++-------------------------------- 3 files changed, 24 insertions(+), 81 deletions(-) diff --git a/examples/transaction.rs b/examples/transaction.rs index e8f9fd12..1c37b3e6 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -5,10 +5,7 @@ use futures::{Async, Future, Stream}; use tikv_client::transaction::{Client, Mutator, Retriever, TxnClient}; use tikv_client::*; -fn puts(client: &TxnClient, pairs: P) -where - P: IntoIterator, - I: Into, +fn puts(client: &TxnClient, pairs: impl IntoIterator>) { let mut txn = client.begin().wait().expect("Could not begin transaction"); let _: Vec<()> = pairs @@ -44,9 +41,7 @@ fn scan(client: &TxnClient, start: &Key, limit: usize) { } } -fn dels

(client: &TxnClient, pairs: P) -where - P: IntoIterator, +fn dels(client: &TxnClient, pairs: impl IntoIterator) { let mut txn = client.begin().wait().expect("Could not begin transaction"); let _: Vec<()> = pairs diff --git a/src/lib.rs b/src/lib.rs index 58db956f..ba1ed0ab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,9 +6,9 @@ extern crate serde_derive; extern crate quick_error; extern crate grpcio as grpc; +pub mod errors; pub mod raw; pub mod transaction; -pub mod errors; use std::path::PathBuf; @@ -61,11 +61,7 @@ pub struct Config { } impl Config { - pub fn new(pd_endpoints: E) -> Self - where - E: IntoIterator, - S: Into, - { + pub fn new(pd_endpoints: impl IntoIterator>) -> Self { Config { pd_endpoints: pd_endpoints.into_iter().map(Into::into).collect(), ca_path: None, @@ -75,16 +71,13 @@ impl Config { } pub fn with_security( - pd_endpoints: E, + pd_endpoints: impl IntoIterator>, ca_path: PathBuf, cert_path: PathBuf, key_path: PathBuf, - ) -> Self - where - E: IntoIterator, - { + ) -> Self { Config { - pd_endpoints: pd_endpoints.into_iter().collect(), + pd_endpoints: pd_endpoints.into_iter().map(Into::into).collect(), ca_path: Some(ca_path), cert_path: Some(cert_path), key_path: Some(key_path), diff --git a/src/transaction.rs b/src/transaction.rs index 59859644..63dcdc68 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -49,31 +49,19 @@ impl Stream for Scanner { } pub trait Retriever { - fn get(&self, key: K) -> KvFuture - where - K: AsRef; + fn get(&self, key: impl AsRef) -> KvFuture; - fn batch_get(&self, keys: K) -> KvFuture> - where - K: AsRef<[Key]>; + fn batch_get(&self, keys: impl AsRef<[Key]>) -> KvFuture>; - fn seek(&self, key: K) -> KvFuture - where - K: AsRef; + fn seek(&self, key: impl AsRef) -> KvFuture; - fn seek_reverse(&self, key: K) -> KvFuture - where - K: AsRef; + fn seek_reverse(&self, key: impl AsRef) -> KvFuture; } pub trait Mutator { - fn set

(&mut self, pair: P) -> KvFuture<()> - where - P: Into; + fn set(&mut self, pair: impl Into) -> KvFuture<()>; - fn delete(&mut self, key: K) -> KvFuture<()> - where - K: AsRef; + fn delete(&mut self, key: impl AsRef) -> KvFuture<()>; } pub struct Transaction; @@ -87,10 +75,7 @@ impl Transaction { unimplemented!() } - pub fn lock_keys(&mut self, keys: K) -> KvFuture<()> - where - K: AsRef<[Key]>, - { + pub fn lock_keys(&mut self, keys: impl AsRef<[Key]>) -> KvFuture<()> { drop(keys); unimplemented!() } @@ -109,52 +94,34 @@ impl Transaction { } impl Retriever for Transaction { - fn get(&self, key: K) -> KvFuture - where - K: AsRef, - { + fn get(&self, key: impl AsRef) -> KvFuture { drop(key); unimplemented!() } - fn batch_get(&self, keys: K) -> KvFuture> - where - K: AsRef<[Key]>, - { + fn batch_get(&self, keys: impl AsRef<[Key]>) -> KvFuture> { drop(keys); unimplemented!() } - fn seek(&self, key: K) -> KvFuture - where - K: AsRef, - { + fn seek(&self, key: impl AsRef) -> KvFuture { drop(key); unimplemented!() } - fn seek_reverse(&self, key: K) -> KvFuture - where - K: AsRef, - { + fn seek_reverse(&self, key: impl AsRef) -> KvFuture { drop(key); unimplemented!() } } impl Mutator for Transaction { - fn set

(&mut self, pair: P) -> KvFuture<()> - where - P: Into, - { + fn set(&mut self, pair: impl Into) -> KvFuture<()> { drop(pair); unimplemented!() } - fn delete(&mut self, key: K) -> KvFuture<()> - where - K: AsRef, - { + fn delete(&mut self, key: impl AsRef) -> KvFuture<()> { drop(key); unimplemented!() } @@ -163,34 +130,22 @@ impl Mutator for Transaction { pub struct Snapshot; impl Retriever for Snapshot { - fn get(&self, key: K) -> KvFuture - where - K: AsRef, - { + fn get(&self, key: impl AsRef) -> KvFuture { drop(key); unimplemented!() } - fn batch_get(&self, keys: K) -> KvFuture> - where - K: AsRef<[Key]>, - { + fn batch_get(&self, keys: impl AsRef<[Key]>) -> KvFuture> { drop(keys); unimplemented!() } - fn seek(&self, key: K) -> KvFuture - where - K: AsRef, - { + fn seek(&self, key: impl AsRef) -> KvFuture { drop(key); unimplemented!() } - fn seek_reverse(&self, key: K) -> KvFuture - where - K: AsRef, - { + fn seek_reverse(&self, key: impl AsRef) -> KvFuture { drop(key); unimplemented!() } From bb0f0b44cfd89ff7cd52b96dc8202668bbeacb10 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Thu, 15 Nov 2018 08:53:50 +0800 Subject: [PATCH 08/17] Make it compile on stable toolchain Signed-off-by: Xiaoguang Sun --- examples/raw.rs | 2 +- examples/transaction.rs | 6 ++--- src/lib.rs | 17 ++++++++++++ src/raw.rs | 58 ++++++++++++++++++++--------------------- 4 files changed, 49 insertions(+), 34 deletions(-) diff --git a/examples/raw.rs b/examples/raw.rs index 3e328f6f..a5d7e4df 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -14,7 +14,7 @@ fn main() { let key: Key = b"Company".to_vec().into(); let value: Value = b"PingCAP".to_vec().into(); - raw.put((Clone::clone(&key), Clone::clone(&value))) + raw.put((key.clone(), value.clone())) .cf("test_cf") .wait() .expect("Could not put kv pair to tikv"); diff --git a/examples/transaction.rs b/examples/transaction.rs index 1c37b3e6..1b78d2b8 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -5,8 +5,7 @@ use futures::{Async, Future, Stream}; use tikv_client::transaction::{Client, Mutator, Retriever, TxnClient}; use tikv_client::*; -fn puts(client: &TxnClient, pairs: impl IntoIterator>) -{ +fn puts(client: &TxnClient, pairs: impl IntoIterator>) { let mut txn = client.begin().wait().expect("Could not begin transaction"); let _: Vec<()> = pairs .into_iter() @@ -41,8 +40,7 @@ fn scan(client: &TxnClient, start: &Key, limit: usize) { } } -fn dels(client: &TxnClient, pairs: impl IntoIterator) -{ +fn dels(client: &TxnClient, pairs: impl IntoIterator) { let mut txn = client.begin().wait().expect("Could not begin transaction"); let _: Vec<()> = pairs .into_iter() diff --git a/src/lib.rs b/src/lib.rs index ba1ed0ab..5b3f79d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ pub mod raw; pub mod transaction; use std::path::PathBuf; +use std::ops::Deref; use futures::Future; @@ -38,12 +39,28 @@ impl AsRef for Key { } } +impl Deref for Key { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + impl Into for Vec { fn into(self) -> Value { Value(self) } } +impl Deref for Value { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + impl Into for (Key, Value) { fn into(self) -> KvPair { KvPair(self.0, self.1) diff --git a/src/raw.rs b/src/raw.rs index 8e8879ec..e972fb4c 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -19,13 +19,13 @@ impl<'a> Into for &'a str { } } -pub struct Get<'a, AClient: Client> { +pub struct Get<'a, AClient: Client + 'a> { client: &'a AClient, key: Key, cf: Option, } -impl<'a, AClient: Client> Get<'a, AClient> { +impl<'a, AClient: Client + 'a> Get<'a, AClient> { fn new(client: &'a AClient, key: Key) -> Self { Get { client, @@ -40,7 +40,7 @@ impl<'a, AClient: Client> Get<'a, AClient> { } } -impl<'a, AClient: Client> Future for Get<'a, AClient> { +impl<'a, AClient: Client + 'a> Future for Get<'a, AClient> { type Item = Value; type Error = (); @@ -52,13 +52,13 @@ impl<'a, AClient: Client> Future for Get<'a, AClient> { } } -pub struct BatchGet<'a, AClient: Client> { +pub struct BatchGet<'a, AClient: Client + 'a> { client: &'a AClient, keys: Vec, cf: Option, } -impl<'a, AClient: Client> BatchGet<'a, AClient> { +impl<'a, AClient: Client + 'a> BatchGet<'a, AClient> { fn new(client: &'a AClient, keys: Vec) -> Self { BatchGet { client, @@ -73,7 +73,7 @@ impl<'a, AClient: Client> BatchGet<'a, AClient> { } } -impl<'a, AClient: Client> Future for BatchGet<'a, AClient> { +impl<'a, AClient: Client + 'a> Future for BatchGet<'a, AClient> { type Item = Vec; type Error = (); @@ -85,13 +85,13 @@ impl<'a, AClient: Client> Future for BatchGet<'a, AClient> { } } -pub struct Put<'a, AClient: Client> { +pub struct Put<'a, AClient: Client + 'a> { client: &'a AClient, pair: KvPair, cf: Option, } -impl<'a, AClient: Client> Put<'a, AClient> { +impl<'a, AClient: Client + 'a> Put<'a, AClient> { fn new(client: &'a AClient, pair: KvPair) -> Self { Put { client, @@ -106,7 +106,7 @@ impl<'a, AClient: Client> Put<'a, AClient> { } } -impl<'a, AClient: Client> Future for Put<'a, AClient> { +impl<'a, AClient: Client + 'a> Future for Put<'a, AClient> { type Item = (); type Error = (); @@ -118,13 +118,13 @@ impl<'a, AClient: Client> Future for Put<'a, AClient> { } } -pub struct BatchPut<'a, AClient: Client> { +pub struct BatchPut<'a, AClient: Client + 'a> { client: &'a AClient, pairs: Vec, cf: Option, } -impl<'a, AClient: Client> BatchPut<'a, AClient> { +impl<'a, AClient: Client + 'a> BatchPut<'a, AClient> { fn new(client: &'a AClient, pairs: Vec) -> Self { BatchPut { client, @@ -139,7 +139,7 @@ impl<'a, AClient: Client> BatchPut<'a, AClient> { } } -impl<'a, AClient: Client> Future for BatchPut<'a, AClient> { +impl<'a, AClient: Client + 'a> Future for BatchPut<'a, AClient> { type Item = (); type Error = (); @@ -151,13 +151,13 @@ impl<'a, AClient: Client> Future for BatchPut<'a, AClient> { } } -pub struct Delete<'a, AClient: Client> { +pub struct Delete<'a, AClient: Client + 'a> { client: &'a AClient, key: Key, cf: Option, } -impl<'a, AClient: Client> Delete<'a, AClient> { +impl<'a, AClient: Client + 'a> Delete<'a, AClient> { fn new(client: &'a AClient, key: Key) -> Self { Delete { client, @@ -172,7 +172,7 @@ impl<'a, AClient: Client> Delete<'a, AClient> { } } -impl<'a, AClient: Client> Future for Delete<'a, AClient> { +impl<'a, AClient: Client + 'a> Future for Delete<'a, AClient> { type Item = (); type Error = (); @@ -184,13 +184,13 @@ impl<'a, AClient: Client> Future for Delete<'a, AClient> { } } -pub struct BatchDelete<'a, AClient: Client> { +pub struct BatchDelete<'a, AClient: Client + 'a> { client: &'a AClient, keys: Vec, cf: Option, } -impl<'a, AClient: Client> BatchDelete<'a, AClient> { +impl<'a, AClient: Client + 'a> BatchDelete<'a, AClient> { fn new(client: &'a AClient, keys: Vec) -> Self { BatchDelete { client, @@ -205,7 +205,7 @@ impl<'a, AClient: Client> BatchDelete<'a, AClient> { } } -impl<'a, AClient: Client> Future for BatchDelete<'a, AClient> { +impl<'a, AClient: Client + 'a> Future for BatchDelete<'a, AClient> { type Item = (); type Error = (); @@ -217,7 +217,7 @@ impl<'a, AClient: Client> Future for BatchDelete<'a, AClient> { } } -pub struct Scan<'a, AClient: Client> { +pub struct Scan<'a, AClient: Client + 'a> { client: &'a AClient, range: (Key, Key), limit: u32, @@ -225,7 +225,7 @@ pub struct Scan<'a, AClient: Client> { cf: Option, } -impl<'a, AClient: Client> Scan<'a, AClient> { +impl<'a, AClient: Client + 'a> Scan<'a, AClient> { fn new(client: &'a AClient, range: (Key, Key), limit: u32) -> Self { Scan { client, @@ -247,7 +247,7 @@ impl<'a, AClient: Client> Scan<'a, AClient> { } } -impl<'a, AClient: Client> Future for Scan<'a, AClient> { +impl<'a, AClient: Client + 'a> Future for Scan<'a, AClient> { type Item = Vec; type Error = (); @@ -261,7 +261,7 @@ impl<'a, AClient: Client> Future for Scan<'a, AClient> { } } -pub struct BatchScan<'a, AClient: Client> { +pub struct BatchScan<'a, AClient: Client + 'a> { client: &'a AClient, ranges: Vec<(Key, Key)>, each_limit: u32, @@ -269,7 +269,7 @@ pub struct BatchScan<'a, AClient: Client> { cf: Option, } -impl<'a, AClient: Client> BatchScan<'a, AClient> { +impl<'a, AClient: Client + 'a> BatchScan<'a, AClient> { fn new(client: &'a AClient, ranges: Vec<(Key, Key)>, each_limit: u32) -> Self { BatchScan { client, @@ -291,7 +291,7 @@ impl<'a, AClient: Client> BatchScan<'a, AClient> { } } -impl<'a, AClient: Client> Future for BatchScan<'a, AClient> { +impl<'a, AClient: Client + 'a> Future for BatchScan<'a, AClient> { type Item = Vec; type Error = (); @@ -305,13 +305,13 @@ impl<'a, AClient: Client> Future for BatchScan<'a, AClient> { } } -pub struct DeleteRange<'a, AClient: Client> { +pub struct DeleteRange<'a, AClient: Client + 'a> { client: &'a AClient, range: (Key, Key), cf: Option, } -impl<'a, AClient: Client> DeleteRange<'a, AClient> { +impl<'a, AClient: Client + 'a> DeleteRange<'a, AClient> { fn new(client: &'a AClient, range: (Key, Key)) -> Self { DeleteRange { client, @@ -326,7 +326,7 @@ impl<'a, AClient: Client> DeleteRange<'a, AClient> { } } -impl<'a, AClient: Client> Future for DeleteRange<'a, AClient> { +impl<'a, AClient: Client + 'a> Future for DeleteRange<'a, AClient> { type Item = (); type Error = (); @@ -386,7 +386,7 @@ impl Client for RawClient { type AClient = Self; fn get(&self, key: impl AsRef) -> Get { - Get::new(self, Clone::clone(key.as_ref())) + Get::new(self, key.as_ref().clone()) } fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { @@ -402,7 +402,7 @@ impl Client for RawClient { } fn delete(&self, key: impl AsRef) -> Delete { - Delete::new(self, Clone::clone(key.as_ref())) + Delete::new(self, key.as_ref().clone()) } fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete { From d5e2b9e401d5859103060c8e38a86f22636f4747 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Thu, 15 Nov 2018 10:18:40 +0800 Subject: [PATCH 09/17] Use RangeBounds for transactional api Scanner Signed-off-by: Xiaoguang Sun --- examples/transaction.rs | 43 +++++++------ src/lib.rs | 4 +- src/raw.rs | 134 +++++++++++++++++++++------------------- src/transaction.rs | 33 ++++------ 4 files changed, 108 insertions(+), 106 deletions(-) diff --git a/examples/transaction.rs b/examples/transaction.rs index 1b78d2b8..a0641aea 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -1,7 +1,9 @@ extern crate futures; extern crate tikv_client; -use futures::{Async, Future, Stream}; +use std::ops::RangeBounds; + +use futures::{Future, Stream}; use tikv_client::transaction::{Client, Mutator, Retriever, TxnClient}; use tikv_client::*; @@ -21,28 +23,29 @@ fn get(client: &TxnClient, key: &Key) -> Value { txn.get(key).wait().expect("Could not get value") } -fn scan(client: &TxnClient, start: &Key, limit: usize) { - let txn = client.begin().wait().expect("Could not begin transaction"); - let mut scanner = txn.seek(start).wait().expect("Could not seek to start key"); - let mut limit = limit; - loop { - if limit == 0 { - break; - } - match scanner.poll() { - Ok(Async::Ready(None)) => return, - Ok(Async::Ready(Some(pair))) => { +fn scan(client: &TxnClient, range: impl RangeBounds, mut limit: usize) { + client + .begin() + .wait() + .expect("Could not begin transaction") + .scan(range) + .take_while(move |_| { + Ok(if limit == 0 { + false + } else { limit -= 1; - println!("{:?}", pair); - } - _ => break, - } - } + true + }) + }).for_each(|pair| { + println!("{:?}", pair); + Ok(()) + }).wait() + .expect("Could not scan keys"); } -fn dels(client: &TxnClient, pairs: impl IntoIterator) { +fn dels(client: &TxnClient, keys: impl IntoIterator) { let mut txn = client.begin().wait().expect("Could not begin transaction"); - let _: Vec<()> = pairs + let _: Vec<()> = keys .into_iter() .map(|p| { txn.delete(p).wait().expect("Could not delete key"); @@ -70,7 +73,7 @@ fn main() { // scan let key1: Key = b"key1".to_vec().into(); - scan(&txn, &key1, 10); + scan(&txn, key1.., 10); // delete let key1: Key = b"key1".to_vec().into(); diff --git a/src/lib.rs b/src/lib.rs index 5b3f79d9..2bdd68e4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,8 @@ pub mod errors; pub mod raw; pub mod transaction; -use std::path::PathBuf; use std::ops::Deref; +use std::path::PathBuf; use futures::Future; @@ -87,7 +87,7 @@ impl Config { } } - pub fn with_security( + pub fn with_security( pd_endpoints: impl IntoIterator>, ca_path: PathBuf, cert_path: PathBuf, diff --git a/src/raw.rs b/src/raw.rs index e972fb4c..b015684a 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -19,14 +19,14 @@ impl<'a> Into for &'a str { } } -pub struct Get<'a, AClient: Client + 'a> { - client: &'a AClient, +pub struct Get<'a, Impl: Client + 'a> { + client: &'a Impl, key: Key, cf: Option, } -impl<'a, AClient: Client + 'a> Get<'a, AClient> { - fn new(client: &'a AClient, key: Key) -> Self { +impl<'a, Impl: Client + 'a> Get<'a, Impl> { + fn new(client: &'a Impl, key: Key) -> Self { Get { client, key, @@ -40,7 +40,7 @@ impl<'a, AClient: Client + 'a> Get<'a, AClient> { } } -impl<'a, AClient: Client + 'a> Future for Get<'a, AClient> { +impl<'a, Impl: Client + 'a> Future for Get<'a, Impl> { type Item = Value; type Error = (); @@ -52,14 +52,14 @@ impl<'a, AClient: Client + 'a> Future for Get<'a, AClient> { } } -pub struct BatchGet<'a, AClient: Client + 'a> { - client: &'a AClient, +pub struct BatchGet<'a, Impl: Client + 'a> { + client: &'a Impl, keys: Vec, cf: Option, } -impl<'a, AClient: Client + 'a> BatchGet<'a, AClient> { - fn new(client: &'a AClient, keys: Vec) -> Self { +impl<'a, Impl: Client + 'a> BatchGet<'a, Impl> { + fn new(client: &'a Impl, keys: Vec) -> Self { BatchGet { client, keys, @@ -73,7 +73,7 @@ impl<'a, AClient: Client + 'a> BatchGet<'a, AClient> { } } -impl<'a, AClient: Client + 'a> Future for BatchGet<'a, AClient> { +impl<'a, Impl: Client + 'a> Future for BatchGet<'a, Impl> { type Item = Vec; type Error = (); @@ -85,14 +85,14 @@ impl<'a, AClient: Client + 'a> Future for BatchGet<'a, AClient> { } } -pub struct Put<'a, AClient: Client + 'a> { - client: &'a AClient, +pub struct Put<'a, Impl: Client + 'a> { + client: &'a Impl, pair: KvPair, cf: Option, } -impl<'a, AClient: Client + 'a> Put<'a, AClient> { - fn new(client: &'a AClient, pair: KvPair) -> Self { +impl<'a, Impl: Client + 'a> Put<'a, Impl> { + fn new(client: &'a Impl, pair: KvPair) -> Self { Put { client, pair, @@ -106,7 +106,7 @@ impl<'a, AClient: Client + 'a> Put<'a, AClient> { } } -impl<'a, AClient: Client + 'a> Future for Put<'a, AClient> { +impl<'a, Impl: Client + 'a> Future for Put<'a, Impl> { type Item = (); type Error = (); @@ -118,14 +118,14 @@ impl<'a, AClient: Client + 'a> Future for Put<'a, AClient> { } } -pub struct BatchPut<'a, AClient: Client + 'a> { - client: &'a AClient, +pub struct BatchPut<'a, Impl: Client + 'a> { + client: &'a Impl, pairs: Vec, cf: Option, } -impl<'a, AClient: Client + 'a> BatchPut<'a, AClient> { - fn new(client: &'a AClient, pairs: Vec) -> Self { +impl<'a, Impl: Client + 'a> BatchPut<'a, Impl> { + fn new(client: &'a Impl, pairs: Vec) -> Self { BatchPut { client, pairs, @@ -139,7 +139,7 @@ impl<'a, AClient: Client + 'a> BatchPut<'a, AClient> { } } -impl<'a, AClient: Client + 'a> Future for BatchPut<'a, AClient> { +impl<'a, Impl: Client + 'a> Future for BatchPut<'a, Impl> { type Item = (); type Error = (); @@ -151,14 +151,14 @@ impl<'a, AClient: Client + 'a> Future for BatchPut<'a, AClient> { } } -pub struct Delete<'a, AClient: Client + 'a> { - client: &'a AClient, +pub struct Delete<'a, Impl: Client + 'a> { + client: &'a Impl, key: Key, cf: Option, } -impl<'a, AClient: Client + 'a> Delete<'a, AClient> { - fn new(client: &'a AClient, key: Key) -> Self { +impl<'a, Impl: Client + 'a> Delete<'a, Impl> { + fn new(client: &'a Impl, key: Key) -> Self { Delete { client, key, @@ -172,7 +172,7 @@ impl<'a, AClient: Client + 'a> Delete<'a, AClient> { } } -impl<'a, AClient: Client + 'a> Future for Delete<'a, AClient> { +impl<'a, Impl: Client + 'a> Future for Delete<'a, Impl> { type Item = (); type Error = (); @@ -184,14 +184,14 @@ impl<'a, AClient: Client + 'a> Future for Delete<'a, AClient> { } } -pub struct BatchDelete<'a, AClient: Client + 'a> { - client: &'a AClient, +pub struct BatchDelete<'a, Impl: Client + 'a> { + client: &'a Impl, keys: Vec, cf: Option, } -impl<'a, AClient: Client + 'a> BatchDelete<'a, AClient> { - fn new(client: &'a AClient, keys: Vec) -> Self { +impl<'a, Impl: Client + 'a> BatchDelete<'a, Impl> { + fn new(client: &'a Impl, keys: Vec) -> Self { BatchDelete { client, keys, @@ -205,7 +205,7 @@ impl<'a, AClient: Client + 'a> BatchDelete<'a, AClient> { } } -impl<'a, AClient: Client + 'a> Future for BatchDelete<'a, AClient> { +impl<'a, Impl: Client + 'a> Future for BatchDelete<'a, Impl> { type Item = (); type Error = (); @@ -217,22 +217,24 @@ impl<'a, AClient: Client + 'a> Future for BatchDelete<'a, AClient> { } } -pub struct Scan<'a, AClient: Client + 'a> { - client: &'a AClient, +pub struct Scan<'a, Impl: Client + 'a> { + client: &'a Impl, range: (Key, Key), limit: u32, key_only: bool, cf: Option, + reverse: bool, } -impl<'a, AClient: Client + 'a> Scan<'a, AClient> { - fn new(client: &'a AClient, range: (Key, Key), limit: u32) -> Self { +impl<'a, Impl: Client + 'a> Scan<'a, Impl> { + fn new(client: &'a Impl, range: (Key, Key), limit: u32) -> Self { Scan { client, range, limit, key_only: false, cf: None, + reverse: false, } } @@ -245,9 +247,14 @@ impl<'a, AClient: Client + 'a> Scan<'a, AClient> { self.cf = Some(cf.into()); self } + + pub fn reverse(mut self) -> Self { + self.reverse = true; + self + } } -impl<'a, AClient: Client + 'a> Future for Scan<'a, AClient> { +impl<'a, Impl: Client + 'a> Future for Scan<'a, Impl> { type Item = Vec; type Error = (); @@ -261,22 +268,24 @@ impl<'a, AClient: Client + 'a> Future for Scan<'a, AClient> { } } -pub struct BatchScan<'a, AClient: Client + 'a> { - client: &'a AClient, +pub struct BatchScan<'a, Impl: Client + 'a> { + client: &'a Impl, ranges: Vec<(Key, Key)>, each_limit: u32, key_only: bool, cf: Option, + reverse: bool, } -impl<'a, AClient: Client + 'a> BatchScan<'a, AClient> { - fn new(client: &'a AClient, ranges: Vec<(Key, Key)>, each_limit: u32) -> Self { +impl<'a, Impl: Client + 'a> BatchScan<'a, Impl> { + fn new(client: &'a Impl, ranges: Vec<(Key, Key)>, each_limit: u32) -> Self { BatchScan { client, ranges, each_limit, key_only: false, cf: None, + reverse: false, } } @@ -289,9 +298,14 @@ impl<'a, AClient: Client + 'a> BatchScan<'a, AClient> { self.cf = Some(cf.into()); self } + + pub fn reverse(mut self) -> Self { + self.reverse = true; + self + } } -impl<'a, AClient: Client + 'a> Future for BatchScan<'a, AClient> { +impl<'a, Impl: Client + 'a> Future for BatchScan<'a, Impl> { type Item = Vec; type Error = (); @@ -305,14 +319,14 @@ impl<'a, AClient: Client + 'a> Future for BatchScan<'a, AClient> { } } -pub struct DeleteRange<'a, AClient: Client + 'a> { - client: &'a AClient, +pub struct DeleteRange<'a, Impl: Client + 'a> { + client: &'a Impl, range: (Key, Key), cf: Option, } -impl<'a, AClient: Client + 'a> DeleteRange<'a, AClient> { - fn new(client: &'a AClient, range: (Key, Key)) -> Self { +impl<'a, Impl: Client + 'a> DeleteRange<'a, Impl> { + fn new(client: &'a Impl, range: (Key, Key)) -> Self { DeleteRange { client, range, @@ -326,7 +340,7 @@ impl<'a, AClient: Client + 'a> DeleteRange<'a, AClient> { } } -impl<'a, AClient: Client + 'a> Future for DeleteRange<'a, AClient> { +impl<'a, Impl: Client + 'a> Future for DeleteRange<'a, Impl> { type Item = (); type Error = (); @@ -339,35 +353,29 @@ impl<'a, AClient: Client + 'a> Future for DeleteRange<'a, AClient> { } pub trait Client { - type AClient: Client; + type Impl: Client; - fn get(&self, key: impl AsRef) -> Get; + fn get(&self, key: impl AsRef) -> Get; - fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet; + fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet; - fn put(&self, pair: impl Into) -> Put; + fn put(&self, pair: impl Into) -> Put; - fn batch_put( - &self, - pairs: impl IntoIterator>, - ) -> BatchPut; + fn batch_put(&self, pairs: impl IntoIterator>) + -> BatchPut; - fn delete(&self, key: impl AsRef) -> Delete; + fn delete(&self, key: impl AsRef) -> Delete; - fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete; + fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete; - fn scan(&self, range: impl RangeBounds, limit: u32) -> Scan; + fn scan(&self, range: impl RangeBounds, limit: u32) -> Scan; - fn batch_scan( - &self, - ranges: Ranges, - each_limit: u32, - ) -> BatchScan + fn batch_scan(&self, ranges: Ranges, each_limit: u32) -> BatchScan where Ranges: AsRef<[Bounds]>, Bounds: RangeBounds; - fn delete_range(&self, range: impl RangeBounds) -> DeleteRange; + fn delete_range(&self, range: impl RangeBounds) -> DeleteRange; } pub struct RawClient; @@ -383,7 +391,7 @@ impl RawClient { } impl Client for RawClient { - type AClient = Self; + type Impl = Self; fn get(&self, key: impl AsRef) -> Get { Get::new(self, key.as_ref().clone()) diff --git a/src/transaction.rs b/src/transaction.rs index 63dcdc68..3a5574dd 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -1,4 +1,5 @@ -use std::io::Error; +use std::ops::RangeBounds; +use Error; use futures::{Poll, Stream}; @@ -29,16 +30,6 @@ impl Timestamp { pub struct Scanner; -impl Scanner { - pub fn set_limit(&mut self, _limit: u32) { - unimplemented!() - } - - pub fn set_key_only(&mut self, _key_only: bool) { - unimplemented!() - } -} - impl Stream for Scanner { type Item = KvPair; type Error = Error; @@ -53,9 +44,9 @@ pub trait Retriever { fn batch_get(&self, keys: impl AsRef<[Key]>) -> KvFuture>; - fn seek(&self, key: impl AsRef) -> KvFuture; + fn scan(&self, range: impl RangeBounds) -> Scanner; - fn seek_reverse(&self, key: impl AsRef) -> KvFuture; + fn scan_reverse(&self, range: impl RangeBounds) -> Scanner; } pub trait Mutator { @@ -104,13 +95,13 @@ impl Retriever for Transaction { unimplemented!() } - fn seek(&self, key: impl AsRef) -> KvFuture { - drop(key); + fn scan(&self, range: impl RangeBounds) -> Scanner { + drop(range); unimplemented!() } - fn seek_reverse(&self, key: impl AsRef) -> KvFuture { - drop(key); + fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { + drop(range); unimplemented!() } } @@ -140,13 +131,13 @@ impl Retriever for Snapshot { unimplemented!() } - fn seek(&self, key: impl AsRef) -> KvFuture { - drop(key); + fn scan(&self, range: impl RangeBounds) -> Scanner { + drop(range); unimplemented!() } - fn seek_reverse(&self, key: impl AsRef) -> KvFuture { - drop(key); + fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { + drop(range); unimplemented!() } } From fd8e249e822227490761e87abe897276a77943ff Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Thu, 15 Nov 2018 22:41:06 +0800 Subject: [PATCH 10/17] Add transaction isolation level Signed-off-by: Xiaoguang Sun --- examples/transaction.rs | 19 +++++++------------ src/transaction.rs | 24 +++++++++++++++++------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/examples/transaction.rs b/examples/transaction.rs index a0641aea..176c592a 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -3,31 +3,26 @@ extern crate tikv_client; use std::ops::RangeBounds; -use futures::{Future, Stream}; +use futures::{future, Future, Stream}; use tikv_client::transaction::{Client, Mutator, Retriever, TxnClient}; use tikv_client::*; fn puts(client: &TxnClient, pairs: impl IntoIterator>) { - let mut txn = client.begin().wait().expect("Could not begin transaction"); - let _: Vec<()> = pairs - .into_iter() - .map(Into::into) - .map(|p| { - txn.set(p).wait().expect("Could not set key value pair"); - }).collect(); + let mut txn = client.begin(); + let _: Vec<()> = future::join_all(pairs.into_iter().map(Into::into).map(|p| txn.set(p))) + .wait() + .expect("Could not set key value pairs"); txn.commit().wait().expect("Could not commit transaction"); } fn get(client: &TxnClient, key: &Key) -> Value { - let txn = client.begin().wait().expect("Could not begin transaction"); + let txn = client.begin(); txn.get(key).wait().expect("Could not get value") } fn scan(client: &TxnClient, range: impl RangeBounds, mut limit: usize) { client .begin() - .wait() - .expect("Could not begin transaction") .scan(range) .take_while(move |_| { Ok(if limit == 0 { @@ -44,7 +39,7 @@ fn scan(client: &TxnClient, range: impl RangeBounds, mut limit: usize) { } fn dels(client: &TxnClient, keys: impl IntoIterator) { - let mut txn = client.begin().wait().expect("Could not begin transaction"); + let mut txn = client.begin(); let _: Vec<()> = keys .into_iter() .map(|p| { diff --git a/src/transaction.rs b/src/transaction.rs index 3a5574dd..0519e037 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -55,6 +55,12 @@ pub trait Mutator { fn delete(&mut self, key: impl AsRef) -> KvFuture<()>; } +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum IsolationLevel { + SnapshotIsolation, + ReadCommitted, +} + pub struct Transaction; impl Transaction { @@ -79,7 +85,11 @@ impl Transaction { unimplemented!() } - pub fn snapshot(&self) -> KvFuture { + pub fn snapshot(&self) -> Snapshot { + unimplemented!() + } + + pub fn set_isolation_level(&mut self, _level: IsolationLevel) { unimplemented!() } } @@ -143,11 +153,11 @@ impl Retriever for Snapshot { } pub trait Client { - fn begin(&self) -> KvFuture; + fn begin(&self) -> Transaction; - fn begin_with_timestamp(&self, _timestamp: Timestamp) -> KvFuture; + fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction; - fn snapshot(&self) -> KvFuture; + fn snapshot(&self) -> Snapshot; fn current_timestamp(&self) -> Timestamp; } @@ -161,15 +171,15 @@ impl TxnClient { } impl Client for TxnClient { - fn begin(&self) -> KvFuture { + fn begin(&self) -> Transaction { unimplemented!() } - fn begin_with_timestamp(&self, _timestamp: Timestamp) -> KvFuture { + fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction { unimplemented!() } - fn snapshot(&self) -> KvFuture { + fn snapshot(&self) -> Snapshot { unimplemented!() } From 59f9c706128adc3d7bdaed02c3c9730f6f37a210 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Fri, 16 Nov 2018 23:14:23 +0800 Subject: [PATCH 11/17] Remove trait from raw client api Signed-off-by: Xiaoguang Sun --- examples/raw.rs | 3 +- examples/transaction.rs | 3 +- src/raw.rs | 142 ++++++++++++++++------------------------ 3 files changed, 59 insertions(+), 89 deletions(-) diff --git a/examples/raw.rs b/examples/raw.rs index a5d7e4df..88a5b224 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -2,12 +2,11 @@ extern crate futures; extern crate tikv_client; use futures::future::Future; -use tikv_client::raw::Client; use tikv_client::*; fn main() { let config = Config::new(vec!["127.0.0.1:3379"]); - let raw = raw::RawClient::new(&config) + let raw = raw::Client::new(&config) .wait() .expect("Could not connect to tikv"); diff --git a/examples/transaction.rs b/examples/transaction.rs index 176c592a..a6ae2262 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -4,7 +4,7 @@ extern crate tikv_client; use std::ops::RangeBounds; use futures::{future, Future, Stream}; -use tikv_client::transaction::{Client, Mutator, Retriever, TxnClient}; +use tikv_client::transaction::{Client, IsolationLevel, Mutator, Retriever, TxnClient}; use tikv_client::*; fn puts(client: &TxnClient, pairs: impl IntoIterator>) { @@ -40,6 +40,7 @@ fn scan(client: &TxnClient, range: impl RangeBounds, mut limit: usize) { fn dels(client: &TxnClient, keys: impl IntoIterator) { let mut txn = client.begin(); + txn.set_isolation_level(IsolationLevel::ReadCommitted); let _: Vec<()> = keys .into_iter() .map(|p| { diff --git a/src/raw.rs b/src/raw.rs index b015684a..41b701f8 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -19,14 +19,14 @@ impl<'a> Into for &'a str { } } -pub struct Get<'a, Impl: Client + 'a> { - client: &'a Impl, +pub struct Get<'a> { + client: &'a Client, key: Key, cf: Option, } -impl<'a, Impl: Client + 'a> Get<'a, Impl> { - fn new(client: &'a Impl, key: Key) -> Self { +impl<'a> Get<'a> { + fn new(client: &'a Client, key: Key) -> Self { Get { client, key, @@ -40,7 +40,7 @@ impl<'a, Impl: Client + 'a> Get<'a, Impl> { } } -impl<'a, Impl: Client + 'a> Future for Get<'a, Impl> { +impl<'a> Future for Get<'a> { type Item = Value; type Error = (); @@ -52,14 +52,14 @@ impl<'a, Impl: Client + 'a> Future for Get<'a, Impl> { } } -pub struct BatchGet<'a, Impl: Client + 'a> { - client: &'a Impl, +pub struct BatchGet<'a> { + client: &'a Client, keys: Vec, cf: Option, } -impl<'a, Impl: Client + 'a> BatchGet<'a, Impl> { - fn new(client: &'a Impl, keys: Vec) -> Self { +impl<'a> BatchGet<'a> { + fn new(client: &'a Client, keys: Vec) -> Self { BatchGet { client, keys, @@ -73,7 +73,7 @@ impl<'a, Impl: Client + 'a> BatchGet<'a, Impl> { } } -impl<'a, Impl: Client + 'a> Future for BatchGet<'a, Impl> { +impl<'a> Future for BatchGet<'a> { type Item = Vec; type Error = (); @@ -85,14 +85,14 @@ impl<'a, Impl: Client + 'a> Future for BatchGet<'a, Impl> { } } -pub struct Put<'a, Impl: Client + 'a> { - client: &'a Impl, +pub struct Put<'a> { + client: &'a Client, pair: KvPair, cf: Option, } -impl<'a, Impl: Client + 'a> Put<'a, Impl> { - fn new(client: &'a Impl, pair: KvPair) -> Self { +impl<'a> Put<'a> { + fn new(client: &'a Client, pair: KvPair) -> Self { Put { client, pair, @@ -106,7 +106,7 @@ impl<'a, Impl: Client + 'a> Put<'a, Impl> { } } -impl<'a, Impl: Client + 'a> Future for Put<'a, Impl> { +impl<'a> Future for Put<'a> { type Item = (); type Error = (); @@ -118,14 +118,14 @@ impl<'a, Impl: Client + 'a> Future for Put<'a, Impl> { } } -pub struct BatchPut<'a, Impl: Client + 'a> { - client: &'a Impl, +pub struct BatchPut<'a> { + client: &'a Client, pairs: Vec, cf: Option, } -impl<'a, Impl: Client + 'a> BatchPut<'a, Impl> { - fn new(client: &'a Impl, pairs: Vec) -> Self { +impl<'a> BatchPut<'a> { + fn new(client: &'a Client, pairs: Vec) -> Self { BatchPut { client, pairs, @@ -139,7 +139,7 @@ impl<'a, Impl: Client + 'a> BatchPut<'a, Impl> { } } -impl<'a, Impl: Client + 'a> Future for BatchPut<'a, Impl> { +impl<'a> Future for BatchPut<'a> { type Item = (); type Error = (); @@ -151,14 +151,14 @@ impl<'a, Impl: Client + 'a> Future for BatchPut<'a, Impl> { } } -pub struct Delete<'a, Impl: Client + 'a> { - client: &'a Impl, +pub struct Delete<'a> { + client: &'a Client, key: Key, cf: Option, } -impl<'a, Impl: Client + 'a> Delete<'a, Impl> { - fn new(client: &'a Impl, key: Key) -> Self { +impl<'a> Delete<'a> { + fn new(client: &'a Client, key: Key) -> Self { Delete { client, key, @@ -172,7 +172,7 @@ impl<'a, Impl: Client + 'a> Delete<'a, Impl> { } } -impl<'a, Impl: Client + 'a> Future for Delete<'a, Impl> { +impl<'a> Future for Delete<'a> { type Item = (); type Error = (); @@ -184,14 +184,14 @@ impl<'a, Impl: Client + 'a> Future for Delete<'a, Impl> { } } -pub struct BatchDelete<'a, Impl: Client + 'a> { - client: &'a Impl, +pub struct BatchDelete<'a> { + client: &'a Client, keys: Vec, cf: Option, } -impl<'a, Impl: Client + 'a> BatchDelete<'a, Impl> { - fn new(client: &'a Impl, keys: Vec) -> Self { +impl<'a> BatchDelete<'a> { + fn new(client: &'a Client, keys: Vec) -> Self { BatchDelete { client, keys, @@ -205,7 +205,7 @@ impl<'a, Impl: Client + 'a> BatchDelete<'a, Impl> { } } -impl<'a, Impl: Client + 'a> Future for BatchDelete<'a, Impl> { +impl<'a> Future for BatchDelete<'a> { type Item = (); type Error = (); @@ -217,8 +217,8 @@ impl<'a, Impl: Client + 'a> Future for BatchDelete<'a, Impl> { } } -pub struct Scan<'a, Impl: Client + 'a> { - client: &'a Impl, +pub struct Scan<'a> { + client: &'a Client, range: (Key, Key), limit: u32, key_only: bool, @@ -226,8 +226,8 @@ pub struct Scan<'a, Impl: Client + 'a> { reverse: bool, } -impl<'a, Impl: Client + 'a> Scan<'a, Impl> { - fn new(client: &'a Impl, range: (Key, Key), limit: u32) -> Self { +impl<'a> Scan<'a> { + fn new(client: &'a Client, range: (Key, Key), limit: u32) -> Self { Scan { client, range, @@ -254,7 +254,7 @@ impl<'a, Impl: Client + 'a> Scan<'a, Impl> { } } -impl<'a, Impl: Client + 'a> Future for Scan<'a, Impl> { +impl<'a> Future for Scan<'a> { type Item = Vec; type Error = (); @@ -268,8 +268,8 @@ impl<'a, Impl: Client + 'a> Future for Scan<'a, Impl> { } } -pub struct BatchScan<'a, Impl: Client + 'a> { - client: &'a Impl, +pub struct BatchScan<'a> { + client: &'a Client, ranges: Vec<(Key, Key)>, each_limit: u32, key_only: bool, @@ -277,8 +277,8 @@ pub struct BatchScan<'a, Impl: Client + 'a> { reverse: bool, } -impl<'a, Impl: Client + 'a> BatchScan<'a, Impl> { - fn new(client: &'a Impl, ranges: Vec<(Key, Key)>, each_limit: u32) -> Self { +impl<'a> BatchScan<'a> { + fn new(client: &'a Client, ranges: Vec<(Key, Key)>, each_limit: u32) -> Self { BatchScan { client, ranges, @@ -305,7 +305,7 @@ impl<'a, Impl: Client + 'a> BatchScan<'a, Impl> { } } -impl<'a, Impl: Client + 'a> Future for BatchScan<'a, Impl> { +impl<'a> Future for BatchScan<'a> { type Item = Vec; type Error = (); @@ -319,14 +319,14 @@ impl<'a, Impl: Client + 'a> Future for BatchScan<'a, Impl> { } } -pub struct DeleteRange<'a, Impl: Client + 'a> { - client: &'a Impl, +pub struct DeleteRange<'a> { + client: &'a Client, range: (Key, Key), cf: Option, } -impl<'a, Impl: Client + 'a> DeleteRange<'a, Impl> { - fn new(client: &'a Impl, range: (Key, Key)) -> Self { +impl<'a> DeleteRange<'a> { + fn new(client: &'a Client, range: (Key, Key)) -> Self { DeleteRange { client, range, @@ -340,7 +340,7 @@ impl<'a, Impl: Client + 'a> DeleteRange<'a, Impl> { } } -impl<'a, Impl: Client + 'a> Future for DeleteRange<'a, Impl> { +impl<'a> Future for DeleteRange<'a> { type Item = (); type Error = (); @@ -352,35 +352,9 @@ impl<'a, Impl: Client + 'a> Future for DeleteRange<'a, Impl> { } } -pub trait Client { - type Impl: Client; +pub struct Client; - fn get(&self, key: impl AsRef) -> Get; - - fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet; - - fn put(&self, pair: impl Into) -> Put; - - fn batch_put(&self, pairs: impl IntoIterator>) - -> BatchPut; - - fn delete(&self, key: impl AsRef) -> Delete; - - fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete; - - fn scan(&self, range: impl RangeBounds, limit: u32) -> Scan; - - fn batch_scan(&self, ranges: Ranges, each_limit: u32) -> BatchScan - where - Ranges: AsRef<[Bounds]>, - Bounds: RangeBounds; - - fn delete_range(&self, range: impl RangeBounds) -> DeleteRange; -} - -pub struct RawClient; - -impl RawClient { +impl Client { pub fn new(_config: &Config) -> KvFuture { unimplemented!() } @@ -388,40 +362,36 @@ impl RawClient { fn extract_range(_range: &impl RangeBounds) -> (Key, Key) { unimplemented!() } -} - -impl Client for RawClient { - type Impl = Self; - fn get(&self, key: impl AsRef) -> Get { + pub fn get(&self, key: impl AsRef) -> Get { Get::new(self, key.as_ref().clone()) } - fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { + pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { BatchGet::new(self, keys.as_ref().to_vec()) } - fn put(&self, pair: impl Into) -> Put { + pub fn put(&self, pair: impl Into) -> Put { Put::new(self, pair.into()) } - fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { + pub fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { BatchPut::new(self, pairs.into_iter().map(Into::into).collect()) } - fn delete(&self, key: impl AsRef) -> Delete { + pub fn delete(&self, key: impl AsRef) -> Delete { Delete::new(self, key.as_ref().clone()) } - fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete { + pub fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete { BatchDelete::new(self, keys.as_ref().to_vec()) } - fn scan(&self, range: impl RangeBounds, limit: u32) -> Scan { + pub fn scan(&self, range: impl RangeBounds, limit: u32) -> Scan { Scan::new(self, Self::extract_range(&range), limit) } - fn batch_scan(&self, ranges: Ranges, each_limit: u32) -> BatchScan + pub fn batch_scan(&self, ranges: Ranges, each_limit: u32) -> BatchScan where Ranges: AsRef<[Bounds]>, Bounds: RangeBounds, @@ -433,7 +403,7 @@ impl Client for RawClient { ) } - fn delete_range(&self, range: impl RangeBounds) -> DeleteRange { + pub fn delete_range(&self, range: impl RangeBounds) -> DeleteRange { DeleteRange::new(self, Self::extract_range(&range)) } } From 131248837549205a92632b99e1ea466ca60d106f Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Sat, 17 Nov 2018 00:13:11 +0800 Subject: [PATCH 12/17] Do not use trait in APIs Signed-off-by: Xiaoguang Sun --- examples/raw.rs | 2 +- examples/transaction.rs | 12 ++--- src/lib.rs | 5 +- src/raw.rs | 32 ++++++------ src/transaction.rs | 109 ++++++++++++++++------------------------ 5 files changed, 69 insertions(+), 91 deletions(-) diff --git a/examples/raw.rs b/examples/raw.rs index 88a5b224..28754099 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -13,7 +13,7 @@ fn main() { let key: Key = b"Company".to_vec().into(); let value: Value = b"PingCAP".to_vec().into(); - raw.put((key.clone(), value.clone())) + raw.put(key.clone(), value.clone()) .cf("test_cf") .wait() .expect("Could not put kv pair to tikv"); diff --git a/examples/transaction.rs b/examples/transaction.rs index a6ae2262..5104466e 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -4,10 +4,10 @@ extern crate tikv_client; use std::ops::RangeBounds; use futures::{future, Future, Stream}; -use tikv_client::transaction::{Client, IsolationLevel, Mutator, Retriever, TxnClient}; +use tikv_client::transaction::{Client, IsolationLevel}; use tikv_client::*; -fn puts(client: &TxnClient, pairs: impl IntoIterator>) { +fn puts(client: &Client, pairs: impl IntoIterator>) { let mut txn = client.begin(); let _: Vec<()> = future::join_all(pairs.into_iter().map(Into::into).map(|p| txn.set(p))) .wait() @@ -15,12 +15,12 @@ fn puts(client: &TxnClient, pairs: impl IntoIterator>) txn.commit().wait().expect("Could not commit transaction"); } -fn get(client: &TxnClient, key: &Key) -> Value { +fn get(client: &Client, key: &Key) -> Value { let txn = client.begin(); txn.get(key).wait().expect("Could not get value") } -fn scan(client: &TxnClient, range: impl RangeBounds, mut limit: usize) { +fn scan(client: &Client, range: impl RangeBounds, mut limit: usize) { client .begin() .scan(range) @@ -38,7 +38,7 @@ fn scan(client: &TxnClient, range: impl RangeBounds, mut limit: usize) { .expect("Could not scan keys"); } -fn dels(client: &TxnClient, keys: impl IntoIterator) { +fn dels(client: &Client, keys: impl IntoIterator) { let mut txn = client.begin(); txn.set_isolation_level(IsolationLevel::ReadCommitted); let _: Vec<()> = keys @@ -51,7 +51,7 @@ fn dels(client: &TxnClient, keys: impl IntoIterator) { fn main() { let config = Config::new(vec!["127.0.0.1:3379"]); - let txn = TxnClient::new(&config) + let txn = Client::new(&config) .wait() .expect("Could not connect to tikv"); diff --git a/src/lib.rs b/src/lib.rs index 2bdd68e4..9b2d70b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![cfg_attr(not(feature = "cargo-clippy"), allow(unknown_lints))] extern crate futures; extern crate serde; #[macro_use] @@ -13,8 +14,6 @@ pub mod transaction; use std::ops::Deref; use std::path::PathBuf; -use futures::Future; - pub use errors::Error; pub use errors::Result; @@ -25,8 +24,6 @@ pub struct Value(Vec); #[derive(Default, Clone, Eq, PartialEq, Debug)] pub struct KvPair(Key, Value); -pub type KvFuture = Box + Send>; - impl Into for Vec { fn into(self) -> Key { Key(self) diff --git a/src/raw.rs b/src/raw.rs index 41b701f8..8abb8e76 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -1,8 +1,8 @@ use std::ops::RangeBounds; -use futures::{Future, Poll}; +use futures::{future, Future, Poll}; -use {Config, Key, KvFuture, KvPair, Value}; +use {Config, Error, Key, KvPair, Value}; #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct ColumnFamily(String); @@ -87,15 +87,17 @@ impl<'a> Future for BatchGet<'a> { pub struct Put<'a> { client: &'a Client, - pair: KvPair, + key: Key, + value: Value, cf: Option, } impl<'a> Put<'a> { - fn new(client: &'a Client, pair: KvPair) -> Self { + fn new(client: &'a Client, key: Key, value: Value) -> Self { Put { client, - pair, + key, + value, cf: None, } } @@ -112,7 +114,8 @@ impl<'a> Future for Put<'a> { fn poll(&mut self) -> Poll { let _ = &self.client; - let _ = &self.pair; + let _ = &self.key; + let _ = &self.value; let _ = &self.cf; unimplemented!() } @@ -355,12 +358,9 @@ impl<'a> Future for DeleteRange<'a> { pub struct Client; impl Client { - pub fn new(_config: &Config) -> KvFuture { - unimplemented!() - } - - fn extract_range(_range: &impl RangeBounds) -> (Key, Key) { - unimplemented!() + #![allow(new_ret_no_self)] + pub fn new(_config: &Config) -> impl Future + Send { + future::ok(Client {}) } pub fn get(&self, key: impl AsRef) -> Get { @@ -371,8 +371,8 @@ impl Client { BatchGet::new(self, keys.as_ref().to_vec()) } - pub fn put(&self, pair: impl Into) -> Put { - Put::new(self, pair.into()) + pub fn put(&self, key: impl Into, value: impl Into) -> Put { + Put::new(self, key.into(), value.into()) } pub fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { @@ -406,4 +406,8 @@ impl Client { pub fn delete_range(&self, range: impl RangeBounds) -> DeleteRange { DeleteRange::new(self, Self::extract_range(&range)) } + + fn extract_range(_range: &impl RangeBounds) -> (Key, Key) { + unimplemented!() + } } diff --git a/src/transaction.rs b/src/transaction.rs index 0519e037..babb1ade 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -1,9 +1,8 @@ use std::ops::RangeBounds; -use Error; -use futures::{Poll, Stream}; +use futures::{future, Future, Poll, Stream}; -use {Config, Key, KvFuture, KvPair, Value}; +use {Config, Error, Key, KvPair, Value}; #[derive(Copy, Clone)] pub struct Timestamp(u64); @@ -39,22 +38,6 @@ impl Stream for Scanner { } } -pub trait Retriever { - fn get(&self, key: impl AsRef) -> KvFuture; - - fn batch_get(&self, keys: impl AsRef<[Key]>) -> KvFuture>; - - fn scan(&self, range: impl RangeBounds) -> Scanner; - - fn scan_reverse(&self, range: impl RangeBounds) -> Scanner; -} - -pub trait Mutator { - fn set(&mut self, pair: impl Into) -> KvFuture<()>; - - fn delete(&mut self, key: impl AsRef) -> KvFuture<()>; -} - #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum IsolationLevel { SnapshotIsolation, @@ -64,17 +47,20 @@ pub enum IsolationLevel { pub struct Transaction; impl Transaction { - pub fn commit(self) -> KvFuture<()> { - unimplemented!() + pub fn commit(self) -> impl Future + Send { + future::ok(()) } - pub fn rollback(self) -> KvFuture<()> { - unimplemented!() + pub fn rollback(self) -> impl Future + Send { + future::ok(()) } - pub fn lock_keys(&mut self, keys: impl AsRef<[Key]>) -> KvFuture<()> { + pub fn lock_keys( + &mut self, + keys: impl AsRef<[Key]>, + ) -> impl Future + Send { drop(keys); - unimplemented!() + future::ok(()) } pub fn is_readonly(&self) -> bool { @@ -92,98 +78,89 @@ impl Transaction { pub fn set_isolation_level(&mut self, _level: IsolationLevel) { unimplemented!() } -} -impl Retriever for Transaction { - fn get(&self, key: impl AsRef) -> KvFuture { + pub fn get(&self, key: impl AsRef) -> impl Future + Send { drop(key); - unimplemented!() + future::ok(b"".to_vec().into()) } - fn batch_get(&self, keys: impl AsRef<[Key]>) -> KvFuture> { + pub fn batch_get( + &self, + keys: impl AsRef<[Key]>, + ) -> impl Future, Error = Error> + Send { drop(keys); - unimplemented!() + future::ok(Vec::new()) } - fn scan(&self, range: impl RangeBounds) -> Scanner { + pub fn scan(&self, range: impl RangeBounds) -> Scanner { drop(range); unimplemented!() } - fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { + pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { drop(range); unimplemented!() } -} -impl Mutator for Transaction { - fn set(&mut self, pair: impl Into) -> KvFuture<()> { + pub fn set(&mut self, pair: impl Into) -> impl Future + Send { drop(pair); - unimplemented!() + future::ok(()) } - fn delete(&mut self, key: impl AsRef) -> KvFuture<()> { + pub fn delete(&mut self, key: impl AsRef) -> impl Future + Send { drop(key); - unimplemented!() + future::ok(()) } } pub struct Snapshot; -impl Retriever for Snapshot { - fn get(&self, key: impl AsRef) -> KvFuture { +impl Snapshot { + pub fn get(&self, key: impl AsRef) -> impl Future + Send { drop(key); - unimplemented!() + future::ok(b"".to_vec().into()) } - fn batch_get(&self, keys: impl AsRef<[Key]>) -> KvFuture> { + pub fn batch_get( + &self, + keys: impl AsRef<[Key]>, + ) -> impl Future, Error = Error> + Send { drop(keys); - unimplemented!() + future::ok(Vec::new()) } - fn scan(&self, range: impl RangeBounds) -> Scanner { + pub fn scan(&self, range: impl RangeBounds) -> Scanner { drop(range); unimplemented!() } - fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { + pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { drop(range); unimplemented!() } } -pub trait Client { - fn begin(&self) -> Transaction; - - fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction; - - fn snapshot(&self) -> Snapshot; +pub struct Client {} - fn current_timestamp(&self) -> Timestamp; -} - -pub struct TxnClient {} - -impl TxnClient { - pub fn new(_config: &Config) -> KvFuture { - unimplemented!() +impl Client { + #![allow(new_ret_no_self)] + pub fn new(_config: &Config) -> impl Future + Send { + future::ok(Client {}) } -} -impl Client for TxnClient { - fn begin(&self) -> Transaction { + pub fn begin(&self) -> Transaction { unimplemented!() } - fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction { + pub fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction { unimplemented!() } - fn snapshot(&self) -> Snapshot { + pub fn snapshot(&self) -> Snapshot { unimplemented!() } - fn current_timestamp(&self) -> Timestamp { + pub fn current_timestamp(&self) -> Timestamp { unimplemented!() } } From fd66665661b4ead5aa936e929f4ed3fb07c46af2 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Sat, 17 Nov 2018 00:44:41 +0800 Subject: [PATCH 13/17] Do not allow unknown_lints Signed-off-by: Xiaoguang Sun --- src/lib.rs | 1 - src/raw.rs | 2 +- src/transaction.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9b2d70b3..b0f83bbe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -#![cfg_attr(not(feature = "cargo-clippy"), allow(unknown_lints))] extern crate futures; extern crate serde; #[macro_use] diff --git a/src/raw.rs b/src/raw.rs index 8abb8e76..687e50f0 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -358,7 +358,7 @@ impl<'a> Future for DeleteRange<'a> { pub struct Client; impl Client { - #![allow(new_ret_no_self)] + #![cfg_attr(feature = "cargo-clippy", allow(new_ret_no_self))] pub fn new(_config: &Config) -> impl Future + Send { future::ok(Client {}) } diff --git a/src/transaction.rs b/src/transaction.rs index babb1ade..9ce8909d 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -143,7 +143,7 @@ impl Snapshot { pub struct Client {} impl Client { - #![allow(new_ret_no_self)] + #![cfg_attr(feature = "cargo-clippy", allow(new_ret_no_self))] pub fn new(_config: &Config) -> impl Future + Send { future::ok(Client {}) } From 233eb9e273d1a471216845b2d152408e11f90080 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Sun, 18 Nov 2018 23:10:57 +0800 Subject: [PATCH 14/17] Change transactional api to use concrete future Signed-off-by: Xiaoguang Sun --- examples/transaction.rs | 10 +- src/lib.rs | 14 +++ src/raw.rs | 28 ++++- src/transaction.rs | 230 ++++++++++++++++++++++++++++++++-------- 4 files changed, 233 insertions(+), 49 deletions(-) diff --git a/examples/transaction.rs b/examples/transaction.rs index 5104466e..2278cd2f 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -9,9 +9,13 @@ use tikv_client::*; fn puts(client: &Client, pairs: impl IntoIterator>) { let mut txn = client.begin(); - let _: Vec<()> = future::join_all(pairs.into_iter().map(Into::into).map(|p| txn.set(p))) - .wait() - .expect("Could not set key value pairs"); + let _: Vec<()> = future::join_all( + pairs + .into_iter() + .map(Into::into) + .map(|p| txn.set(p.key().clone(), p.value().clone())), + ).wait() + .expect("Could not set key value pairs"); txn.commit().wait().expect("Could not commit transaction"); } diff --git a/src/lib.rs b/src/lib.rs index b0f83bbe..01e6917d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,6 +57,20 @@ impl Deref for Value { } } +impl KvPair { + pub fn new(key: Key, value: Value) -> Self { + KvPair(key, value) + } + + pub fn key(&self) -> &Key { + &self.0 + } + + pub fn value(&self) -> &Value { + &self.1 + } +} + impl Into for (Key, Value) { fn into(self) -> KvPair { KvPair(self.0, self.1) diff --git a/src/raw.rs b/src/raw.rs index 687e50f0..5b82fb33 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -1,6 +1,6 @@ use std::ops::RangeBounds; -use futures::{future, Future, Poll}; +use futures::{Future, Poll}; use {Config, Error, Key, KvPair, Value}; @@ -42,7 +42,7 @@ impl<'a> Get<'a> { impl<'a> Future for Get<'a> { type Item = Value; - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -355,12 +355,32 @@ impl<'a> Future for DeleteRange<'a> { } } +pub struct Connect { + config: Config, +} + +impl Connect { + fn new(config: Config) -> Self { + Connect { config } + } +} + +impl Future for Connect { + type Item = Client; + type Error = Error; + + fn poll(&mut self) -> Poll { + let _config = &self.config; + unimplemented!() + } +} + pub struct Client; impl Client { #![cfg_attr(feature = "cargo-clippy", allow(new_ret_no_self))] - pub fn new(_config: &Config) -> impl Future + Send { - future::ok(Client {}) + pub fn new(config: &Config) -> Connect { + Connect::new(config.clone()) } pub fn get(&self, key: impl AsRef) -> Get { diff --git a/src/transaction.rs b/src/transaction.rs index 9ce8909d..6f53efe4 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -1,6 +1,6 @@ use std::ops::RangeBounds; -use futures::{future, Future, Poll, Stream}; +use futures::{Future, Poll, Stream}; use {Config, Error, Key, KvPair, Value}; @@ -44,23 +44,163 @@ pub enum IsolationLevel { ReadCommitted, } -pub struct Transaction; +pub struct Get { + key: Key, +} + +impl Get { + fn new(key: Key) -> Self { + Get { key } + } +} + +impl Future for Get { + type Item = Value; + type Error = Error; + + fn poll(&mut self) -> Poll { + let _key = &self.key; + unimplemented!() + } +} + +pub struct BatchGet { + keys: Vec, +} + +impl BatchGet { + fn new(keys: Vec) -> Self { + BatchGet { keys } + } +} + +impl Future for BatchGet { + type Item = Value; + type Error = Error; + + fn poll(&mut self) -> Poll { + let _keys = &self.keys; + unimplemented!() + } +} + +pub struct Commit { + txn: Transaction, +} + +impl Commit { + fn new(txn: Transaction) -> Self { + Commit { txn } + } +} + +impl Future for Commit { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + let _txn = &self.txn; + unimplemented!() + } +} + +pub struct Rollback { + txn: Transaction, +} + +impl Rollback { + fn new(txn: Transaction) -> Self { + Rollback { txn } + } +} + +impl Future for Rollback { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + let _txn = &self.txn; + unimplemented!() + } +} + +pub struct LockKeys { + keys: Vec, +} + +impl LockKeys { + fn new(keys: Vec) -> Self { + LockKeys { keys } + } +} + +impl Future for LockKeys { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + let _keys = &self.keys; + unimplemented!() + } +} + +pub struct Set { + key: Key, + value: Value, +} + +impl Set { + fn new(key: Key, value: Value) -> Self { + Set { key, value } + } +} + +impl Future for Set { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + let _key = &self.key; + let _value = &self.value; + unimplemented!() + } +} + +pub struct Delete { + key: Key, +} + +impl Delete { + fn new(key: Key) -> Self { + Delete { key } + } +} + +impl Future for Delete { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + let _key = &self.key; + unimplemented!() + } +} + +pub struct Transaction { + snapshot: Snapshot, +} impl Transaction { - pub fn commit(self) -> impl Future + Send { - future::ok(()) + pub fn commit(self) -> Commit { + Commit::new(self) } - pub fn rollback(self) -> impl Future + Send { - future::ok(()) + pub fn rollback(self) -> Rollback { + Rollback::new(self) } - pub fn lock_keys( - &mut self, - keys: impl AsRef<[Key]>, - ) -> impl Future + Send { - drop(keys); - future::ok(()) + pub fn lock_keys(&mut self, keys: impl AsRef<[Key]>) -> LockKeys { + LockKeys::new(keys.as_ref().to_vec().clone()) } pub fn is_readonly(&self) -> bool { @@ -79,54 +219,40 @@ impl Transaction { unimplemented!() } - pub fn get(&self, key: impl AsRef) -> impl Future + Send { - drop(key); - future::ok(b"".to_vec().into()) + pub fn get(&self, key: impl AsRef) -> Get { + self.snapshot.get(key) } - pub fn batch_get( - &self, - keys: impl AsRef<[Key]>, - ) -> impl Future, Error = Error> + Send { - drop(keys); - future::ok(Vec::new()) + pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { + self.snapshot.batch_get(keys) } pub fn scan(&self, range: impl RangeBounds) -> Scanner { - drop(range); - unimplemented!() + self.snapshot.scan(range) } pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { - drop(range); - unimplemented!() + self.snapshot.scan_reverse(range) } - pub fn set(&mut self, pair: impl Into) -> impl Future + Send { - drop(pair); - future::ok(()) + pub fn set(&mut self, key: impl Into, value: impl Into) -> Set { + Set::new(key.into(), value.into()) } - pub fn delete(&mut self, key: impl AsRef) -> impl Future + Send { - drop(key); - future::ok(()) + pub fn delete(&mut self, key: impl AsRef) -> Delete { + Delete::new(key.as_ref().clone()) } } pub struct Snapshot; impl Snapshot { - pub fn get(&self, key: impl AsRef) -> impl Future + Send { - drop(key); - future::ok(b"".to_vec().into()) + pub fn get(&self, key: impl AsRef) -> Get { + Get::new(key.as_ref().clone()) } - pub fn batch_get( - &self, - keys: impl AsRef<[Key]>, - ) -> impl Future, Error = Error> + Send { - drop(keys); - future::ok(Vec::new()) + pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { + BatchGet::new(keys.as_ref().to_vec().clone()) } pub fn scan(&self, range: impl RangeBounds) -> Scanner { @@ -140,12 +266,32 @@ impl Snapshot { } } +pub struct Connect { + config: Config, +} + +impl Connect { + fn new(config: Config) -> Self { + Connect { config } + } +} + +impl Future for Connect { + type Item = Client; + type Error = Error; + + fn poll(&mut self) -> Poll { + let _config = &self.config; + unimplemented!() + } +} + pub struct Client {} impl Client { #![cfg_attr(feature = "cargo-clippy", allow(new_ret_no_self))] - pub fn new(_config: &Config) -> impl Future + Send { - future::ok(Client {}) + pub fn new(config: &Config) -> Connect { + Connect::new(config.clone()) } pub fn begin(&self) -> Transaction { From 1c34827879ddce46fddc5907f899a155188399d4 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Mon, 19 Nov 2018 19:55:41 +0800 Subject: [PATCH 15/17] Use ToString to convert input to ColumnFamily Signed-off-by: Xiaoguang Sun --- src/raw.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/raw.rs b/src/raw.rs index 5b82fb33..18a8ea0f 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -7,15 +7,12 @@ use {Config, Error, Key, KvPair, Value}; #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct ColumnFamily(String); -impl Into for String { - fn into(self) -> ColumnFamily { - ColumnFamily(self) - } -} - -impl<'a> Into for &'a str { - fn into(self) -> ColumnFamily { - ColumnFamily(self.to_owned()) +impl From for ColumnFamily +where + T: ToString, +{ + fn from(i: T) -> ColumnFamily { + ColumnFamily(i.to_string()) } } From 6f94151e5f887fb065223cfc745cdae3c5e331f1 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Mon, 19 Nov 2018 22:12:09 +0800 Subject: [PATCH 16/17] Change Config to builder style Signed-off-by: Xiaoguang Sun --- src/lib.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 01e6917d..182a25d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -98,16 +98,14 @@ impl Config { } pub fn with_security( - pd_endpoints: impl IntoIterator>, + mut self, ca_path: PathBuf, cert_path: PathBuf, key_path: PathBuf, ) -> Self { - Config { - pd_endpoints: pd_endpoints.into_iter().map(Into::into).collect(), - ca_path: Some(ca_path), - cert_path: Some(cert_path), - key_path: Some(key_path), - } + self.ca_path = Some(ca_path); + self.cert_path = Some(cert_path); + self.key_path = Some(key_path); + self } } From f02d339c9f4f14fa980e73d8f6b45e853d104d36 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Mon, 19 Nov 2018 22:18:46 +0800 Subject: [PATCH 17/17] Config::with_security use impl Into Signed-off-by: Xiaoguang Sun --- examples/raw.rs | 8 +++++++- examples/transaction.rs | 7 ++++++- src/lib.rs | 12 ++++++------ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/examples/raw.rs b/examples/raw.rs index 28754099..4c5887a2 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -1,11 +1,17 @@ extern crate futures; extern crate tikv_client; +use std::path::PathBuf; + use futures::future::Future; use tikv_client::*; fn main() { - let config = Config::new(vec!["127.0.0.1:3379"]); + let config = Config::new(vec!["127.0.0.1:3379"]).with_security( + PathBuf::from("/path/to/ca.pem"), + PathBuf::from("/path/to/client.pem"), + PathBuf::from("/path/to/client-key.pem"), + ); let raw = raw::Client::new(&config) .wait() .expect("Could not connect to tikv"); diff --git a/examples/transaction.rs b/examples/transaction.rs index 2278cd2f..032c651e 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -2,6 +2,7 @@ extern crate futures; extern crate tikv_client; use std::ops::RangeBounds; +use std::path::PathBuf; use futures::{future, Future, Stream}; use tikv_client::transaction::{Client, IsolationLevel}; @@ -54,7 +55,11 @@ fn dels(client: &Client, keys: impl IntoIterator) { } fn main() { - let config = Config::new(vec!["127.0.0.1:3379"]); + let config = Config::new(vec!["127.0.0.1:3379"]).with_security( + PathBuf::from("/path/to/ca.pem"), + PathBuf::from("/path/to/client.pem"), + PathBuf::from("/path/to/client-key.pem"), + ); let txn = Client::new(&config) .wait() .expect("Could not connect to tikv"); diff --git a/src/lib.rs b/src/lib.rs index 182a25d0..772cc2f0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -99,13 +99,13 @@ impl Config { pub fn with_security( mut self, - ca_path: PathBuf, - cert_path: PathBuf, - key_path: PathBuf, + ca_path: impl Into, + cert_path: impl Into, + key_path: impl Into, ) -> Self { - self.ca_path = Some(ca_path); - self.cert_path = Some(cert_path); - self.key_path = Some(key_path); + self.ca_path = Some(ca_path.into()); + self.cert_path = Some(cert_path.into()); + self.key_path = Some(key_path.into()); self } }