From 40776379bc1b930f6f10c56292dba2392247d3e4 Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Fri, 28 Dec 2018 11:10:17 -0800 Subject: [PATCH 1/3] Document most public code. Signed-off-by: Hoverbear --- examples/raw.rs | 79 +++--- examples/transaction.rs | 4 +- rust-toolchain | 2 +- src/errors.rs | 24 +- src/lib.rs | 398 ++++++++++++++++++++++++++--- src/raw.rs | 446 +++++++++++++++++++++++---------- src/rpc/client.rs | 4 +- src/rpc/tikv/client.rs | 4 +- src/transaction.rs | 537 +++++++++++++++++++++++++++++++--------- tests/raw.rs | 25 +- 10 files changed, 1182 insertions(+), 341 deletions(-) diff --git a/examples/raw.rs b/examples/raw.rs index 2f8b4d21..efa273ed 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -13,49 +13,69 @@ use futures::future::Future; use std::path::PathBuf; -use tikv_client::*; +use tikv_client::{Result, Config, raw::Client, Key, Value}; -fn main() { - let config = Config::new(vec!["127.0.0.1:2379"]).with_security( +const KEY: &str = "TiKV"; +const VALUE: &str = "Rust"; +const CUSTOM_CF: &str = "custom_cf"; + +fn main() -> Result<()> { + // Create a configuration to use for the example. + // Optionally encrypt the traffic. + let config = Config::new(vec![ + "192.168.0.101:3379", // Avoid a single point of failure, + "192.168.0.100:3379", // use at least two PD endpoints. + ]).with_security( PathBuf::from("/path/to/ca.pem"), PathBuf::from("/path/to/client.pem"), PathBuf::from("/path/to/client-key.pem"), ); - let raw = raw::Client::new(&config) - .wait() - .expect("Could not connect to tikv"); - let key: Key = b"Company".to_vec().into(); - let value: Value = b"PingCAP".to_vec().into(); + // When we first create a client we recieve a `Connect` structure which must be resolved before + // the client is actually connected and usable. + let unconnnected_client = Client::new(&config); + let client = unconnnected_client.wait()?; - raw.put(key.clone(), value.clone()) - .cf("test_cf") - .wait() - .expect("Could not put kv pair to tikv"); - println!("Successfully put {:?}:{:?} to tikv", key, value); + // Requests are created from the connected client. These calls return structures which + // implement `Future`. This means the `Future` must be resolved before the action ever takes + // place. + // + // Here we set the key `TiKV` to have the value `Rust` associated with it. + let put_request = client.put(KEY, VALUE); + let put_result: () = put_request.wait()?; // Returns a `tikv_client::Error` on failure. + println!("Put key \"{}\", value \"{}\".", KEY, VALUE); - let value = raw - .get(&key) - .cf("test_cf") - .wait() - .expect("Could not get value"); - println!("Found val: {:?} for key: {:?}", value, key); + // + // Unlike a standard Rust HashMap all calls take owned values. This is because under the hood + // protobufs must take ownership of the data. If we only took a borrow we'd need to internally // clone it. This is against Rust API guidelines, so you must manage this yourself. + // + // Above, you saw we can use a `&'static str`, this is primarily for making examples short. + // This type is practical to use for real things, and usage forces an internal copy. + // + // It is best to pass a `Vec` in terms of explictness and speed. `String`s and a few other + // types are supported as well, but it all ends up as `Vec` in the end. + let key: String = String::from(KEY); + let value: Value = client.get(key.clone()).wait()?; + assert_eq!(value.as_ref(), VALUE.as_bytes()); + println!("Get key \"{:?}\" returned value \"{:?}\".", value, KEY); - raw.delete(&key) - .cf("test_cf") + // You can also set the `ColumnFamily` used by the request. + // This is *advanced usage* and should have some special considerations. + let req = client.delete(key.clone()) + .cf(CUSTOM_CF) .wait() .expect("Could not delete value"); println!("Key: {:?} deleted", key); - raw.get(&key) + client.get(key) .cf("test_cf") .wait() .expect_err("Get returned value for not existing key"); - let keys = vec![b"k1".to_vec().into(), b"k2".to_vec().into()]; + let keys = vec![Key::from(b"k1".to_vec()), Key::from(b"k2".to_vec())]; - let values = raw - .batch_get(&keys) + let values = client + .batch_get(keys.clone()) .cf("test_cf") .wait() .expect("Could not get values"); @@ -63,16 +83,19 @@ fn main() { let start: Key = b"k1".to_vec().into(); let end: Key = b"k2".to_vec().into(); - raw.scan(&start..&end, 10) + client.scan(start.clone()..end.clone(), 10) .cf("test_cf") .key_only() .wait() .expect("Could not scan"); - let ranges = [&start..&end, &start..&end]; - raw.batch_scan(&ranges, 10) + let ranges = vec![start.clone()..end.clone(), start.clone()..end.clone()]; + client.batch_scan(ranges, 10) .cf("test_cf") .key_only() .wait() .expect("Could not batch scan"); + + // Cleanly exit. + Ok(()) } diff --git a/examples/transaction.rs b/examples/transaction.rs index 7904ca8c..f2e7a567 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -32,7 +32,7 @@ fn puts(client: &Client, pairs: impl IntoIterator>) { txn.commit().wait().expect("Could not commit transaction"); } -fn get(client: &Client, key: &Key) -> Value { +fn get(client: &Client, key: Key) -> Value { let txn = client.begin(); txn.get(key).wait().expect("Could not get value") } @@ -88,7 +88,7 @@ fn main() { // get let key1: Key = b"key1".to_vec().into(); - let value1 = get(&txn, &key1); + let value1 = get(&txn, key1.clone()); println!("{:?}", (key1, value1)); // scan diff --git a/rust-toolchain b/rust-toolchain index 870bbe4e..2bf5ad04 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -stable \ No newline at end of file +stable diff --git a/src/errors.rs b/src/errors.rs index 95a8d542..f9c93275 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -11,55 +11,70 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{error, result}; - use grpcio; use quick_error::quick_error; +use std::{error, result}; -quick_error! { +quick_error!{ + /// An error originating from the TiKV client or dependencies. + /// + /// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/) + /// for errors. *This may change in future versions.* #[derive(Debug)] pub enum Error { + /// Wraps a a `std::io::Error`. Io(err: ::std::io::Error) { from() cause(err) description(err.description()) } + /// Wraps a `grpcio::Error`. Grpc(err: grpcio::Error) { from() cause(err) description(err.description()) } + /// Represents that a futures oneshot channel was cancelled. Canceled(err: ::futures::sync::oneshot::Canceled) { from() cause(err) description(err.description()) } + /// An unknown error. + /// + /// Generally, this is not an expected error. Please report it if encountered. Other(err: Box) { from() cause(err.as_ref()) description(err.description()) display("unknown error {:?}", err) } + /// A region was not found for the given key. RegionForKeyNotFound(key: Vec) { description("region is not found") display("region is not found for key {:?}", key) } + /// A region was not found. RegionNotFound(region_id: u64, message: Option) { description("region is not found") display("region {:?} is not found. {}", region_id, message.as_ref().unwrap_or(&"".to_owned())) } + /// The peer is not a leader of the given region. NotLeader(region_id: u64, message: Option) { description("peer is not leader") display("peer is not leader for region {}. {}", region_id, message.as_ref().unwrap_or(&"".to_owned())) } + /// The store does not match. StoreNotMatch(request_store_id: u64, actual_store_id: u64, message: String) { description("store not match") display("requesting store '{}' when actual store is '{}'. {}", request_store_id, actual_store_id, message) } + /// The given key is not eithin the given region. KeyNotInRegion(key: Vec, region_id: u64, start_key: Vec, end_key: Vec) { description("region is not found") display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key) } + /// A stale epoch. StaleEpoch(message: Option) { description("stale epoch") display("{}", message.as_ref().unwrap_or(&"".to_owned())) @@ -68,10 +83,12 @@ quick_error! { description("stale command") display("{}", message) } + /// The server is too busy. ServerIsBusy(reason: String, backoff: u64) { description("server is busy") display("server is busy: {:?}. Backoff {} ms", reason, backoff) } + /// The given raft entry is too large for the region. RaftEntryTooLarge(region_id: u64, entry_size: u64, message: String) { description("raft entry too large") display("{:?} bytes raft entry of region {:?} is too large. {}", entry_size, region_id, message) @@ -111,4 +128,5 @@ quick_error! { } } +/// A result holding an [`Error`](enum.Error.html). pub type Result = result::Result; diff --git a/src/lib.rs b/src/lib.rs index 48cd651f..ccaabf40 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,37 +10,92 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. - #![recursion_limit = "128"] #![type_length_limit = "2097152"] -use std::{convert::AsRef, ops::Deref, path::PathBuf, time::Duration}; -use futures::Future; use serde_derive::*; +use std::{ + ops::{Deref, DerefMut, Bound, Range, RangeFrom, RangeFull, RangeInclusive, RangeTo, RangeToInclusive}, + path::PathBuf, + time::Duration, +}; +use futures::Future; -pub use crate::errors::{Error, Result}; - -pub mod errors; +mod errors; pub mod raw; mod rpc; pub mod transaction; +#[doc(inline)] +pub use crate::errors::Error; +#[doc(inline)] +pub use crate::errors::Result; + +/// The key part of a key/value pair. +/// +/// In TiKV, keys are an ordered sequence of bytes. This has an advantage over choosing `String` as valid `UTF-8` is not required. This means that the user is permitted to store any data they wish, +/// as long as it can be represented by bytes. (Which is to say, pretty much anything!) +/// +/// This is a *wrapper type* that implements `Deref>` so it can be used like one transparently. +/// +/// This type also implements `From` for many types. With one exception, these are all done without reallocation. Using a `&'static str`, like many examples do for simplicity, has an internal +/// allocation cost. +/// +/// This type wraps around an owned value, so it should be treated it like `String` or `Vec` +/// over a `&str` or `&[u8]`. +/// +/// ```rust +/// use tikv_client::Key; +/// +/// let static_str: &'static str = "TiKV"; +/// let from_static_str = Key::from(static_str); +/// +/// let string: String = String::from(static_str); +/// let from_string = Key::from(string); +/// assert_eq!(from_static_str, from_string); +/// +/// let vec: Vec = static_str.as_bytes().to_vec(); +/// let from_vec = Key::from(vec); +/// assert_eq!(from_static_str, from_vec); +/// +/// let bytes = static_str.as_bytes().to_vec(); +/// let from_bytes = Key::from(bytes); +/// assert_eq!(from_static_str, from_bytes); +/// ``` +/// +/// **But, you should not need to worry about all this:** Many functions which accept a `Key` +/// accept an `Into`, which means all of the above types can be passed directly to those +/// functions. #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct Key(Vec); -#[derive(Default, Clone, Eq, PartialEq, Hash, Debug)] -pub struct Value(Vec); -#[derive(Default, Clone, Eq, PartialEq, Debug)] -pub struct KvPair(Key, Value); + +impl Key { + pub fn new(value: Vec) -> Self { + Key(value) + } + + fn into_inner(self) -> Vec { + self.0 + } +} impl From> for Key { - fn from(vec: Vec) -> Key { - Key(vec) + fn from(v: Vec) -> Self { + Key(v) } } -impl<'a> From<&'a [u8]> for Key { - fn from(s: &'a [u8]) -> Key { - Key(s.to_vec()) +impl From for Key { + fn from(v: String) -> Key { + Key(v.into_bytes()) + } +} + +impl<'a> From<&'static str> for Key { + fn from(v: &'static str) -> Key { + let mut vec = Vec::new(); + vec.extend_from_slice(v.as_bytes()); + Key(vec) } } @@ -64,33 +119,77 @@ impl Deref for Key { } } -impl Key { - fn into_inner(self) -> Vec { - self.0 +impl DerefMut for Key { + fn deref_mut<'a>(&'a mut self) -> &'a mut [u8] { + &mut self.0 } } -impl From> for Value { - fn from(vec: Vec) -> Value { - Value(vec) +/// The value part of a key/value pair. +/// +/// In TiKV, values are an ordered sequence of bytes. This has an advantage over choosing `String` as valid `UTF-8` is not required. This means that the user is permitted to store any data they wish, +/// as long as it can be represented by bytes. (Which is to say, pretty much anything!) +/// +/// This is a *wrapper type* that implements `Deref>` so it can be used like one transparently. +/// +/// This type also implements `From` for many types. With one exception, these are all done without reallocation. Using a `&'static str`, like many examples do for simplicity, has an internal +/// allocation cost. +/// +/// This type wraps around an owned value, so it should be treated it like `String` or `Vec` +/// over a `&str` or `&[u8]`. +/// +/// ```rust +/// use tikv_client::Value; +/// +/// let static_str: &'static str = "TiKV"; +/// let from_static_str = Value::from(static_str); +/// +/// let string: String = String::from(static_str); +/// let from_string = Value::from(string); +/// assert_eq!(from_static_str, from_string); +/// +/// let vec: Vec = static_str.as_bytes().to_vec(); +/// let from_vec = Value::from(vec); +/// assert_eq!(from_static_str, from_vec); +/// +/// let bytes = static_str.as_bytes().to_vec(); +/// let from_bytes = Value::from(bytes); +/// assert_eq!(from_static_str, from_bytes); +/// ``` +/// +/// **But, you should not need to worry about all this:** Many functions which accept a `Value` +/// accept an `Into`, which means all of the above types can be passed directly to those +/// functions. +#[derive(Default, Clone, Eq, PartialEq, Hash, Debug)] +pub struct Value(Vec); + +impl Value { + pub fn new(value: Vec) -> Self { + Value(value) + } + + fn into_inner(self) -> Vec { + self.0 } } -impl<'a> From<&'a [u8]> for Value { - fn from(s: &'a [u8]) -> Value { - Value(s.to_vec()) +impl From> for Value { + fn from(v: Vec) -> Self { + Value(v) } } -impl AsRef for Value { - fn as_ref(&self) -> &Self { - self +impl From for Value { + fn from(v: String) -> Value { + Value(v.into_bytes()) } } -impl AsRef<[u8]> for Value { - fn as_ref(&self) -> &[u8] { - &self.0 +impl From<&'static str> for Value { + fn from(v: &'static str) -> Value { + let mut vec = Vec::new(); + vec.extend_from_slice(v.as_bytes()); + Value(vec) } } @@ -102,21 +201,35 @@ impl Deref for Value { } } -impl Value { - fn into_inner(self) -> Vec { - self.0 - } -} +/// A key/value pair. +/// +/// ```rust +/// # use tikv_client::{Key, Value, KvPair}; +/// let key = "key"; +/// let value = "value"; +/// let constructed = KvPair::new(key, value); +/// let from_tuple = KvPair::from((key, value)); +/// assert_eq!(constructed, from_tuple); +/// ``` +/// +/// **But, you should not need to worry about all this:** Many functions which accept a `KvPair` +/// accept an `Into`, which means all of the above types can be passed directly to those +/// functions. +#[derive(Default, Clone, Eq, PartialEq, Debug)] +pub struct KvPair(Key, Value); impl KvPair { - pub fn new(key: Key, value: Value) -> Self { - KvPair(key, value) + /// Create a new `KvPair`. + pub fn new(key: impl Into, value: impl Into) -> Self { + KvPair(key.into(), value.into()) } + /// Immutably borrow the `Key` part of the `KvPair`. pub fn key(&self) -> &Key { &self.0 } + /// Immutably borrow the `Value` part of the `KvPair`. pub fn value(&self) -> &Value { &self.1 } @@ -124,14 +237,54 @@ impl KvPair { pub fn into_inner(self) -> (Key, Value) { (self.0, self.1) } + + pub fn into_key(self) -> Key { + self.0 + } + + pub fn into_value(self) -> Value { + self.1 + } + + /// Mutably borrow the `Key` part of the `KvPair`. + pub fn key_mut(&mut self) -> &mut Key { + &mut self.0 + } + + /// Mutably borrow the `Value` part of the `KvPair`. + pub fn value_mut(&mut self) -> &mut Value { + &mut self.1 + } + + /// Set the `Key` part of the `KvPair`. + pub fn set_key(&mut self, k: impl Into) { + self.0 = k.into(); + } + + /// Set the `Value` part of the `KvPair`. + pub fn set_value(&mut self, v: impl Into) { + self.1 = v.into(); + } } -impl From<(Key, Value)> for KvPair { - fn from(pair: (Key, Value)) -> KvPair { - KvPair(pair.0, pair.1) +impl From<(K, V)> for KvPair +where K: Into, V: Into { + fn from((k, v): (K, V)) -> Self { + KvPair(k.into(), v.into()) } } +/// The configuration for either a [`raw::Client`](raw/struct.Client.html) or a +/// [`transaction::Client`](transaction/struct.Client.html). +/// +/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for PD +/// must be provided, **not** the TiKV nodes. +/// +/// It's important to **include more than one PD endpoint** (include all, if possible!) This helps avoid having a *single point of failure*. +/// +/// By default, this client will use an insecure connection over encryption-on-the-wire. Your +/// deployment may have chosen to rely on security measures such as a private network, or a VPN +/// layer providing secure transmission. TiKV does not currently offer encryption-at-rest. #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] #[serde(default)] #[serde(rename_all = "kebab-case")] @@ -144,6 +297,14 @@ pub struct Config { } impl Config { + /// Create a new [`Config`](struct.Config.html) which coordinates with the given PD endpoints. + /// + /// It's important to **include more than one PD endpoint** (include all, if possible!) This helps avoid having a *single point of failure*. + /// + /// ```rust + /// # use tikv_client::Config; + /// let config = Config::new(vec!["192.168.0.100:2379", "192.168.0.101:2379"]); + /// ``` pub fn new(pd_endpoints: impl IntoIterator>) -> Self { Config { pd_endpoints: pd_endpoints.into_iter().map(Into::into).collect(), @@ -154,6 +315,15 @@ impl Config { } } + /// Set the certificate authority, certificate, and key locations for the [`Config`](struct.Config.html). + /// + /// By default, TiKV connections do not have utilize transport layer security. Enable it by setting these values. + /// + /// ```rust + /// # use tikv_client::Config; + /// let config = Config::new(vec!["192.168.0.100:2379", "192.168.0.101:2379"]) + /// .with_security("root.ca", "internal.cert", "internal.key"); + /// ``` pub fn with_security( mut self, ca_path: impl Into, @@ -173,3 +343,151 @@ impl Config { } pub type KvFuture = Box>; + +/// A convenience trait for expressing ranges. +/// +/// In TiKV, keys are an ordered sequence of bytes. This means we can have ranges over those +/// bytes. Eg `001` is before `010`. +/// +/// This trait has implementations for common range types like `a..b`, `a..=b` where `a` and `b` +/// `impl Into`. You could implement this for your own types. +/// +/// ```rust +/// use tikv_client::{KeyRange, Key}; +/// use std::ops::{Range, RangeInclusive, RangeTo, RangeToInclusive, RangeFrom, RangeFull, Bound}; +/// +/// let explict_range: Range = Range { start: Key::from("Rust"), end: Key::from("TiKV") }; +/// let from_explict_range = explict_range.into_bounds(); +/// +/// let range: Range<&str> = "Rust".."TiKV"; +/// let from_range = range.into_bounds(); +/// assert_eq!(from_explict_range, from_range); +/// +/// let range: RangeInclusive<&str> = "Rust"..="TiKV"; +/// let from_range = range.into_bounds(); +/// assert_eq!( +/// (Bound::Included(Key::from("Rust")), Bound::Included(Key::from("TiKV"))), +/// from_range +/// ); +/// +/// let range_from: RangeFrom<&str> = "Rust"..; +/// let from_range_from = range_from.into_bounds(); +/// assert_eq!( +/// (Bound::Included(Key::from("Rust")), Bound::Unbounded), +/// from_range_from, +/// ); +/// +/// let range_to: RangeTo<&str> = .."TiKV"; +/// let from_range_to = range_to.into_bounds(); +/// assert_eq!( +/// (Bound::Unbounded, Bound::Excluded(Key::from("TiKV"))), +/// from_range_to, +/// ); +/// +/// let range_to_inclusive: RangeToInclusive<&str> = ..="TiKV"; +/// let from_range_to_inclusive = range_to_inclusive.into_bounds(); +/// assert_eq!( +/// (Bound::Unbounded, Bound::Included(Key::from("TiKV"))), +/// from_range_to_inclusive, +/// ); +/// +/// let range_full: RangeFull = ..; +/// let from_range_full = range_full.into_bounds(); +/// assert_eq!( +/// (Bound::Unbounded, Bound::Unbounded), +/// from_range_full +/// ); +/// ``` +/// +/// **But, you should not need to worry about all this:** Many functions accept a `impl KeyRange` +/// which means all of the above types can be passed directly to those functions. +pub trait KeyRange: Sized { + fn into_bounds(self) -> (Bound, Bound); + fn into_keys(self) -> Result<(Key, Option)> { + range_to_keys(self.into_bounds()) + } +} + +fn range_to_keys(range: (Bound, Bound)) -> Result<(Key, Option)> { + let start = match range.0 { + Bound::Included(v) => v, + Bound::Excluded(mut v) => { + let len = v.len(); + if len > 0 { + v.deref_mut().get_mut(len -1).map(|v| { *v += 1; v }); + } + v + }, + Bound::Unbounded => return Err(Error::InvalidKeyRange), + }; + let end = match range.1 { + Bound::Included(v) => Some(v), + Bound::Excluded(mut v) => Some({ + let len = v.len(); + if len > 0 { + v.deref_mut().get_mut(len -1).map(|v| { *v -= 1; v }); + } + v + }), + Bound::Unbounded => None, + }; + Ok((start, end)) +} + +impl> KeyRange for Range { + fn into_bounds(self) -> (Bound, Bound) { + ( + Bound::Included(self.start.into()), + Bound::Excluded(self.end.into()), + ) + } +} + +impl> KeyRange for RangeFrom { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Included(self.start.into()), Bound::Unbounded) + } +} + +impl KeyRange for RangeFull { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Unbounded, Bound::Unbounded) + } +} + +impl> KeyRange for RangeInclusive { + fn into_bounds(self) -> (Bound, Bound) { + let (start, end) = self.into_inner(); + (Bound::Included(start.into()), Bound::Included(end.into())) + } +} + +impl> KeyRange for RangeTo { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Unbounded, Bound::Excluded(self.end.into())) + } +} + +impl> KeyRange for RangeToInclusive { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Unbounded, Bound::Included(self.end.into())) + } +} + +impl> KeyRange for (Bound, Bound) { + fn into_bounds(self) -> (Bound, Bound) { + (transmute_bound(self.0), transmute_bound(self.1)) + } +} + +fn transmute_bound(b: Bound) -> Bound +where + K: Into, +{ + use std::ops::Bound::*; + match b { + Included(k) => Included(k.into()), + Excluded(k) => Excluded(k.into()), + Unbounded => Unbounded, + } +} diff --git a/src/raw.rs b/src/raw.rs index d6a060fb..13ed10cf 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -11,15 +11,282 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{ - ops::{Bound, Deref, DerefMut, RangeBounds}, - sync::Arc, -}; +/*! Raw related functionality. -use futures::{future, Async, Future, Poll}; +Using the [`raw::Client`](struct.Client.html) you can utilize TiKV's raw interface. -use crate::{rpc::RpcClient, Config, Error, Key, KvFuture, KvPair, Result, Value}; +This interface offers optimal performance as it does not require coordination with a timestamp +oracle, while the transactional interface does. +**Warning:** It is not advisible to use the both raw and transactional functionality in the same keyspace. + */ + +use crate::{rpc::RpcClient, Config, Error, Key, KeyRange, KvFuture, KvPair, Result, Value}; +use futures::{future, Future, Poll, Async}; +use std::{u32, sync::Arc, ops::{Bound, Deref, RangeBounds, DerefMut}}; + + +/// The TiKV raw [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. +pub struct Client { + rpc: Arc +} + +impl Client { + /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves. + /// + /// ```rust,no_run + /// use tikv_client::{Config, raw::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait(); + /// ``` + #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] + pub fn new(config: &Config) -> Connect { + Connect::new(config.clone()) + } + + #[inline] + fn rpc(&self) -> Arc { + Arc::clone(&self.rpc) + } + + /// Create a new [`Get`](struct.Get.html) request. + /// + /// Once resolved this request will result in the fetching of the value associated with the + /// given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let req = connected_client.get(key); + /// let result: Value = req.wait().unwrap(); + /// ``` + pub fn get(&self, key: impl Into) -> Get { + Get::new(self.rpc(), GetInner::new(key.into())) + } + + /// Create a new [`BatchGet`](struct.BatchGet.html) request. + /// + /// Once resolved this request will result in the fetching of the values associated with the + /// given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let keys = vec!["TiKV", "TiDB"]; + /// let req = connected_client.batch_get(keys); + /// let result: Vec = req.wait().unwrap(); + /// ``` + pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { + BatchGet::new(self.rpc(), BatchGetInner::new(keys.into_iter().map(Into::into).collect())) + } + + /// Create a new [`Put`](struct.Put.html) request. + /// + /// Once resolved this request will result in the setting of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let val = "TiKV"; + /// let req = connected_client.put(key, val); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn put(&self, key: impl Into, value: impl Into) -> Put { + Put::new(self.rpc(), PutInner::new(key.into(), value.into())) + } + + /// Create a new [`BatchPut`](struct.BatchPut.html) request. + /// + /// Once resolved this request will result in the setting of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Error, Result, KvPair, Key, Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let kvpair1 = ("PD", "Go"); + /// let kvpair2 = ("TiKV", "Rust"); + /// let iterable = vec![kvpair1, kvpair2]; + /// let req = connected_client.batch_put(iterable); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { + BatchPut::new(self.rpc(), BatchPutInner::new(pairs.into_iter().map(Into::into).collect())) + } + + /// Create a new [`Delete`](struct.Delete.html) request. + /// + /// Once resolved this request will result in the deletion of the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let req = connected_client.delete(key); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn delete(&self, key: impl Into) -> Delete { + Delete::new(self.rpc(), DeleteInner::new(key.into())) + } + + /// Create a new [`BatchDelete`](struct.BatchDelete.html) request. + /// + /// Once resolved this request will result in the deletion of the given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let keys = vec!["TiKV", "TiDB"]; + /// let req = connected_client.batch_delete(keys); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn batch_delete(&self, keys: impl IntoIterator>) -> BatchDelete { + BatchDelete::new(self.rpc(), BatchDeleteInner::new(keys.into_iter().map(Into::into).collect())) + } + + /// Create a new [`Scan`](struct.Scan.html) request. + /// + /// Once resolved this request will result in a scanner over the given keys. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range = "TiKV"..="TiDB"; + /// let req = connected_client.scan(inclusive_range, 2); + /// let result: Vec = req.wait().unwrap(); + /// ``` + pub fn scan(&self, range: impl KeyRange, limit: impl Into>) -> Scan { + Scan::new(self.rpc(), ScanInner::new(range.into_bounds(), limit.into().unwrap_or(u32::MAX))) + } + + /// Create a new [`BatchScan`](struct.BatchScan.html) request. + /// + /// Once resolved this request will result in a set of scanners over the given keys. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range1 = "TiDB"..="TiKV"; + /// let inclusive_range2 = "TiKV"..="TiSpark"; + /// let iterable = vec![inclusive_range1, inclusive_range2]; + /// let req = connected_client.batch_scan(iterable, 2); + /// let result = req.wait(); + /// ``` + pub fn batch_scan( + &self, + ranges: impl IntoIterator, + each_limit: impl Into>, + ) -> BatchScan { + BatchScan::new( + self.rpc(), + BatchScanInner::new( + ranges.into_iter().map(KeyRange::into_keys).collect(), + each_limit.into().unwrap_or(u32::MAX), + ), + ) + } + + /// Create a new [`DeleteRange`](struct.DeleteRange.html) request. + /// + /// Once resolved this request will result in the deletion of all keys over the given range. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range = "TiKV"..="TiDB"; + /// let req = connected_client.delete_range(inclusive_range); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn delete_range(&self, range: impl KeyRange) -> DeleteRange { + DeleteRange::new(self.rpc(), DeleteRangeInner::new(range.into_keys())) + } +} + +/// An unresolved [`Client`](struct.Client.html) connection to a TiKV cluster. +/// +/// Once resolved it will result in a connected [`Client`](struct.Client.html). +/// +/// ```rust,no_run +/// use tikv_client::{Config, raw::{Client, Connect}}; +/// use futures::Future; +/// +/// let connect: Connect = Client::new(&Config::default()); +/// let client: Client = connect.wait().unwrap(); +/// ``` +pub struct Connect { + config: Config, +} + +impl Connect { + fn new(config: Config) -> Self { + Connect { config } + } +} + +impl Future for Connect { + type Item = Client; + type Error = Error; + + fn poll(&mut self) -> Poll { + let config = &self.config; + let rpc = Arc::new(RpcClient::connect(config)?); + Ok(Async::Ready(Client { rpc })) + } +} + + +/// A [`ColumnFamily`](struct.ColumnFamily.html) is an optional parameter for [`raw::Client`](struct.Client.html) requests. +/// +/// TiKV uses RocksDB's `ColumnFamily` support. You can learn more about RocksDB's `ColumnFamily`s [on their wiki](https://github.com/facebook/rocksdb/wiki/Column-Families). +/// +/// By default in TiKV data is stored in three different `ColumnFamily` values, configurable in the TiKV server's configuration: +/// +/// * Default: Where real user data is stored. Set by `[rocksdb.defaultcf]`. +/// * Write: Where MVCC and index related data are stored. Set by `[rocksdb.writecf]`. +/// * Lock: Where lock information is stored. Set by `[rocksdb.lockcf]`. +/// +/// Not providing a call a `ColumnFamily` means it will use the default value of `default`. +/// +/// The best (and only) way to create a [`ColumnFamily`](struct.ColumnFamily.html) is via the `From` implementation: +/// +/// ```rust +/// # use tikv_client::raw::ColumnFamily; +/// +/// let cf = ColumnFamily::from("write"); +/// let cf = ColumnFamily::from(String::from("write")); +/// let cf = ColumnFamily::from(&String::from("write")); +/// ``` +/// +/// This is a *wrapper type* that implements `Deref` so it can be used like one transparently. +/// +/// **But, you should not need to worry about all this:** Many functions which accept a +/// `ColumnFamily` accept an `Into`, which means all of the above types can be passed +/// directly to those functions. #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct ColumnFamily(String); @@ -38,6 +305,15 @@ impl ColumnFamily { } } +impl Deref for ColumnFamily { + type Target = String; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + + + pub trait RequestInner: Sized { type Resp; @@ -65,6 +341,7 @@ where } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -128,6 +405,10 @@ impl RequestInner for GetInner { } } +/// An unresolved [`Client::get`](struct.Client.html#method.get) request. +/// +/// Once resolved this request will result in the fetching of the value associated with the given +/// key. pub type Get = Request; pub struct BatchGetInner { @@ -148,8 +429,14 @@ impl BatchGetInner { } } +/// An unresolved [`Client::batch_get`](struct.Client.html#method.batch_get) request. +/// +/// Once resolved this request will result in the fetching of the values associated with the given +/// keys. pub type BatchGet = Request; + + pub struct PutInner { key: Key, value: Value, @@ -170,8 +457,15 @@ impl RequestInner for PutInner { } } +/// An unresolved [`Client::put`](struct.Client.html#method.put) request. +/// +/// Once resolved this request will result in the putting of the value associated with the given +/// key. pub type Put = Request; +/// An unresolved [`Client::batch_put`](struct.Client.html#method.batch_put) request. +/// +/// Once resolved this request will result in the setting of the value associated with the given key. pub struct BatchPutInner { pairs: Vec, } @@ -190,6 +484,10 @@ impl RequestInner for BatchPutInner { } } + +/// An unresolved [`Client::batch_put`](struct.Client.html#method.batch_put) request. +/// +/// Once resolved this request will result in the setting of the value associated with the given key. pub type BatchPut = Request; pub struct DeleteInner { @@ -210,6 +508,9 @@ impl RequestInner for DeleteInner { } } +/// An unresolved [`Client::delete`](struct.Client.html#method.delete) request. +/// +/// Once resolved this request will result in the deletion of the given key. pub type Delete = Request; pub struct BatchDeleteInner { @@ -233,13 +534,13 @@ impl RequestInner for BatchDeleteInner { pub type BatchDelete = Request; pub struct ScanInner { - range: Result<(Key, Option)>, + range: (Bound, Bound), limit: u32, key_only: bool, } impl ScanInner { - fn new(range: Result<(Key, Option)>, limit: u32) -> Self { + fn new(range: (Bound, Bound), limit: u32) -> Self { ScanInner { range, limit, @@ -252,10 +553,11 @@ impl RequestInner for ScanInner { type Resp = Vec; fn execute(self, client: Arc, cf: Option) -> KvFuture { - match self.range { - Ok(range) => Box::new(client.raw_scan(range, self.limit, self.key_only, cf)), - Err(e) => Box::new(future::err(e)), - } + let keys = match self.range.into_keys() { + Err(e) => return Box::new(future::err(e)), + Ok(v) => v, + }; + Box::new(client.raw_scan(keys, self.limit, self.key_only, cf)) } } @@ -270,6 +572,7 @@ impl Scan { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.request = self.request.cf(cf); self @@ -339,6 +642,7 @@ impl BatchScan { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.request = self.request.cf(cf); self @@ -362,6 +666,8 @@ impl Future for BatchScan { } } +pub type DeleteRange = Request; + pub struct DeleteRangeInner { range: Result<(Key, Option)>, } @@ -382,119 +688,3 @@ impl RequestInner for DeleteRangeInner { } } } - -pub type DeleteRange = Request; - -pub struct Connect { - config: Config, -} - -impl Connect { - fn new(config: Config) -> Self { - Connect { config } - } -} - -impl Future for Connect { - type Item = Client; - type Error = Error; - - fn poll(&mut self) -> Poll { - let config = &self.config; - let rpc = Arc::new(RpcClient::connect(config)?); - Ok(Async::Ready(Client { rpc })) - } -} - -pub struct Client { - rpc: Arc, -} - -impl Client { - #![allow(clippy::new_ret_no_self)] - pub fn new(config: &Config) -> Connect { - Connect::new(config.clone()) - } - - #[inline] - fn rpc(&self) -> Arc { - Arc::clone(&self.rpc) - } - - pub fn get(&self, key: impl AsRef) -> Get { - Get::new(self.rpc(), GetInner::new(key.as_ref().clone())) - } - - pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { - BatchGet::new(self.rpc(), BatchGetInner::new(keys.as_ref().to_vec())) - } - - pub fn put(&self, key: impl Into, value: impl Into) -> Put { - Put::new(self.rpc(), PutInner::new(key.into(), value.into())) - } - - pub fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { - BatchPut::new( - self.rpc(), - BatchPutInner::new(pairs.into_iter().map(Into::into).collect()), - ) - } - - pub fn delete(&self, key: impl AsRef) -> Delete { - Delete::new(self.rpc(), DeleteInner::new(key.as_ref().clone())) - } - - pub fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete { - BatchDelete::new(self.rpc(), BatchDeleteInner::new(keys.as_ref().to_vec())) - } - - pub fn scan(&self, range: impl RangeBounds, limit: u32) -> Scan { - Scan::new( - self.rpc(), - ScanInner::new(Self::range_bounds(&range), limit), - ) - } - - pub fn batch_scan(&self, ranges: Ranges, each_limit: u32) -> BatchScan - where - Ranges: AsRef<[Bounds]>, - Bounds: RangeBounds, - { - BatchScan::new( - self.rpc(), - BatchScanInner::new( - ranges.as_ref().iter().map(Self::range_bounds).collect(), - each_limit, - ), - ) - } - - pub fn delete_range(&self, range: impl RangeBounds) -> DeleteRange { - DeleteRange::new( - self.rpc(), - DeleteRangeInner::new(Self::range_bounds(&range)), - ) - } - - fn bound(bound: Bound<&Key>) -> Option { - match bound { - Bound::Included(k) => Some(k.clone()), - Bound::Excluded(k) => Some(k.clone()), - Bound::Unbounded => None, - } - } - - fn range_bounds(range: &impl RangeBounds) -> Result<(Key, Option)> { - if let Bound::Included(_) = range.end_bound() { - return Err(Error::InvalidKeyRange); - } - if let Bound::Excluded(_) = range.start_bound() { - return Err(Error::InvalidKeyRange); - } - - Ok(( - Self::bound(range.start_bound()).unwrap_or_else(Key::default), - Self::bound(range.end_bound()), - )) - } -} diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 3680a0dc..a33ce31d 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -486,11 +486,11 @@ impl RegionContext { } fn start_key(&self) -> Key { - self.region.start_key().into() + self.region.start_key().to_vec().into() } fn end_key(&self) -> Key { - self.region.end_key().into() + self.region.end_key().to_vec().into() } fn range(&self) -> (Key, Key) { diff --git a/src/rpc/tikv/client.rs b/src/rpc/tikv/client.rs index 0dfc591e..e7d85b3c 100644 --- a/src/rpc/tikv/client.rs +++ b/src/rpc/tikv/client.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt, sync::Arc, time::Duration}; +use std::{fmt, sync::Arc, time::Duration, ops::Bound}; use futures::Future; use grpcio::{CallOption, Environment}; @@ -662,7 +662,7 @@ impl KvClient { #[inline] fn convert_from_grpc_pair(mut pair: kvrpcpb::KvPair) -> KvPair { - KvPair::new(pair.take_key().into(), pair.take_value().into()) + KvPair::new(Key::from(pair.take_key()), Value::from(pair.take_value())) } #[inline] diff --git a/src/transaction.rs b/src/transaction.rs index c43f58f3..1d6e425c 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -11,11 +11,118 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::RangeBounds; +/*! Transactional related functionality. -use futures::{Future, Poll, Stream}; +Using the [`transaction::Client`](struct.Client.html) you can utilize TiKV's transactional interface. +This interface offers SQL-like transactions on top of the raw interface. + +**Warning:** It is not advisible to use the both raw and transactional functionality in the same keyspace. + */ use crate::{Config, Error, Key, KvPair, Value}; +use futures::{Future, Poll, Stream}; +use std::ops::RangeBounds; + +/// The TiKV transactional [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. +pub struct Client; + +impl Client { + /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves. + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait(); + /// ``` + #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] + pub fn new(config: &Config) -> Connect { + Connect::new(config.clone()) + } + + /// Create a new [`Transaction`](struct.Transaction.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp). + /// + /// Using the transaction you can issue commands like [`get`](struct.Transaction.html#method.get) or [`set`](file:///home/hoverbear/git/client-rust/target/doc/tikv_client/transaction/struct.Transaction.html#method.set). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let transaction = client.begin(); + /// // ... Issue some commands. + /// let commit = transaction.commit(); + /// let result: () = commit.wait().unwrap(); + /// ``` + pub fn begin(&self) -> Transaction { + unimplemented!() + } + + /// Create a new [`Transaction`](struct.Transaction.html) at the provded timestamp. + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let timestamp = client.current_timestamp(); + /// let transaction = client.begin_with_timestamp(timestamp); + /// // ... Issue some commands. + /// let commit = transaction.commit(); + /// let result: () = commit.wait().unwrap(); + /// ``` + pub fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction { + unimplemented!() + } + + /// Get a [`Snapshot`](struct.Snapshot.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let snapshot = client.snapshot(); + /// // ... Issue some commands. + /// ``` + pub fn snapshot(&self) -> Snapshot { + unimplemented!() + } + + /// Retrieve the current [`Timestamp`](struct.Timestamp.html). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let timestamp = client.current_timestamp(); + /// ``` + pub fn current_timestamp(&self) -> Timestamp { + unimplemented!() + } +} + +/// An unresolved [`Client`](struct.Client.html) connection to a TiKV cluster. +/// +/// Once resolved it will result in a connected [`Client`](struct.Client.html). +/// +/// ```rust,no_run +/// use tikv_client::{Config, transaction::{Client, Connect}}; +/// use futures::Future; +/// +/// let connect: Connect = Client::new(&Config::default()); +/// let client: Client = connect.wait().unwrap(); +/// ``` +pub struct Connect { + config: Config, +} + +impl Connect { + fn new(config: Config) -> Self { + Connect { config } + } +} pub enum Mutation { Put(Key, Value), @@ -29,12 +136,23 @@ pub struct TxnInfo { pub status: u64, } +impl Future for Connect { + type Item = Client; + type Error = Error; + + fn poll(&mut self) -> Poll { + let _config = &self.config; + unimplemented!() + } +} + +/// A logical timestamp produced by PD. #[derive(Copy, Clone)] pub struct Timestamp(u64); -impl Into for u64 { - fn into(self) -> Timestamp { - Timestamp(self) +impl From for Timestamp { + fn from(v: u64) -> Self { + Timestamp(v) } } @@ -52,6 +170,246 @@ impl Timestamp { } } +/// A undo-able set of actions on the dataset. +/// +/// Using a transaction you can prepare a set of actions (such as `get`, or `set`) on data at a +/// particular timestamp obtained from the placement driver. +/// +/// Once a transaction is commited, a new commit timestamp is obtained from the placement driver. +pub struct Transaction { + snapshot: Snapshot, +} + +impl Transaction { + /// Create a new transaction operating on the given snapshot. + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let txn = client.begin(); + /// ``` + pub fn new(snapshot: Snapshot) -> Self { + Self { snapshot } + } + + /// Commit the actions of the transaction. + /// + /// Once committed, it is no longer possible to `rollback` the actions in the transaction. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::Client}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let req = txn.commit(); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn commit(self) -> Commit { + Commit::new(self) + } + + /// Rollback the actions of the transaction. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::Client}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let req = txn.rollback(); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn rollback(self) -> Rollback { + Rollback::new(self) + } + + /// Lock the given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::Client}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// // ... Do some actions. + /// let req = txn.lock_keys(vec!["TiKV", "Rust"]); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn lock_keys(&mut self, keys: impl IntoIterator>) -> LockKeys { + LockKeys::new(keys.into_iter().map(|v| v.into()).collect()) + } + + pub fn is_readonly(&self) -> bool { + unimplemented!() + } + + /// Returns the timestamp which the transaction started at. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::{Client, Timestamp}}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let ts: Timestamp = txn.start_ts(); + /// ``` + pub fn start_ts(&self) -> Timestamp { + unimplemented!() + } + + /// Get the `Snapshot` the transaction is operating on. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::{Client, Snapshot}}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let snap: Snapshot = txn.snapshot(); + /// ``` + pub fn snapshot(&self) -> Snapshot { + unimplemented!() + } + + /// Set the isolation level of the transaction. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::{Client, IsolationLevel}}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// txn.set_isolation_level(IsolationLevel::SnapshotIsolation); + /// ``` + pub fn set_isolation_level(&mut self, _level: IsolationLevel) { + unimplemented!() + } + + /// Create a new [`Get`](struct.Get.html) request. + /// + /// Once resolved this request will result in the fetching of the value associated with the + /// given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Value, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let key = "TiKV"; + /// let req = txn.get(key); + /// let result: Value = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn get(&self, key: impl Into) -> Get { + self.snapshot.get(key.into()) + } + + /// Create a new [`BatchGet`](struct.BatchGet.html) request. + /// + /// Once resolved this request will result in the fetching of the values associated with the + /// given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let keys = vec!["TiKV", "TiDB"]; + /// let req = txn.batch_get(keys); + /// let result: Vec = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { + self.snapshot.batch_get(keys) + } + + pub fn scan(&self, range: impl RangeBounds) -> Scanner { + self.snapshot.scan(range) + } + + pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { + self.snapshot.scan_reverse(range) + } + + /// Create a new [`Set`](struct.Set.html) request. + /// + /// Once resolved this request will result in the setting of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Value, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let key = "TiKV"; + /// let val = "TiKV"; + /// let req = txn.set(key, val); + /// let result: () = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn set(&mut self, key: impl Into, value: impl Into) -> Set { + Set::new(key.into(), value.into()) + } + + /// Create a new [`Delete`](struct.Delete.html) request. + /// + /// Once resolved this request will result in the deletion of the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let key = "TiKV"; + /// let req = txn.delete(key); + /// let result: () = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn delete(&mut self, key: impl Into) -> Delete { + Delete::new(key.into()) + } +} + +/// A snapshot of dataset at a particular point in time. +pub struct Snapshot; + +impl Snapshot { + pub fn get(&self, key: impl Into) -> Get { + Get::new(key.into()) + } + + pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { + BatchGet::new(keys.into_iter().map(|v| v.into()).collect()) + } + + pub fn scan(&self, range: impl RangeBounds) -> Scanner { + drop(range); + unimplemented!() + } + + pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { + drop(range); + unimplemented!() + } +} + +/// An unresolved [`Transaction::scan`](struct.Transaction.html#method.scan) request. +/// +/// Once resolved this request will result in a scanner over the given keys. pub struct Scanner; impl Stream for Scanner { @@ -63,12 +421,41 @@ impl Stream for Scanner { } } +/// The isolation level guarantees provided by the transaction. #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum IsolationLevel { + /// Consistent reads and conflict free writes. + /// + /// Snapshot isolation guarantees: + /// * All reads will see the last committed value of the data at the snapshot timestamp. + /// * The transaction will only successfully commit if no updates to the data have created a + /// conflict with concurrent updates made sine the snapshot. + /// + /// Using this level means: + /// * Lost updates don't occur. + /// * Dirty reads don't occur. + /// * Non-repeatable reads don't occur. + /// * Phantom reads don't occur. SnapshotIsolation, + /// Reads may not be consistent, but writes are conflict free. + /// + /// Read committed guarantees: + /// * All reads are committed at the moment it is read. + /// not repeatable. + /// * Write locks are only released at the end of the transaction. + /// + /// Using this level means: + /// * Lost updates don't occur. + /// * Dirty reads don't occur. + /// * Non-repeatable reads may occur. + /// * Phantom reads may occur. ReadCommitted, } +/// An unresolved [`Transaction::get`](struct.Transaction.html#method.get) request. +/// +/// Once resolved this request will result in the fetching of the value associated with the given +/// key. pub struct Get { key: Key, } @@ -89,6 +476,10 @@ impl Future for Get { } } +/// An unresolved [`Transaction::batch_get`](struct.Transaction.html#method.batch_get) request. +/// +/// Once resolved this request will result in the fetching of the values associated with the given +/// keys. pub struct BatchGet { keys: Vec, } @@ -100,7 +491,7 @@ impl BatchGet { } impl Future for BatchGet { - type Item = Value; + type Item = Vec; type Error = Error; fn poll(&mut self) -> Poll { @@ -109,6 +500,9 @@ impl Future for BatchGet { } } +/// An unresolved [`Transaction::commit`](struct.Transaction.html#method.commit) request. +/// +/// Once resolved this request will result in the committing of the transaction. pub struct Commit { txn: Transaction, } @@ -129,6 +523,9 @@ impl Future for Commit { } } +/// An unresolved [`Transaction::rollback`](struct.Transaction.html#method.rollback) request. +/// +/// Once resolved this request will result in the rolling back of the transaction. pub struct Rollback { txn: Transaction, } @@ -149,6 +546,9 @@ impl Future for Rollback { } } +/// An unresolved [`Transaction::lock_keys`](struct.Transaction.html#method.lock_keys) request. +/// +/// Once resolved this request will result in the locking of the given keys. pub struct LockKeys { keys: Vec, } @@ -169,6 +569,10 @@ impl Future for LockKeys { } } +/// An unresolved [`Transaction::set`](struct.Transaction.html#method.set) request. +/// +/// Once resolved this request will result in the setting of the value associated with the given +/// key. pub struct Set { key: Key, value: Value, @@ -191,6 +595,9 @@ impl Future for Set { } } +/// An unresolved [`Transaction::delete`](struct.Transaction.html#method.delete) request. +/// +/// Once resolved this request will result in the deletion of the given key. pub struct Delete { key: Key, } @@ -209,120 +616,4 @@ impl Future for Delete { let _key = &self.key; unimplemented!() } -} - -pub struct Transaction { - snapshot: Snapshot, -} - -impl Transaction { - pub fn commit(self) -> Commit { - Commit::new(self) - } - - pub fn rollback(self) -> Rollback { - Rollback::new(self) - } - - pub fn lock_keys(&mut self, keys: impl AsRef<[Key]>) -> LockKeys { - LockKeys::new(keys.as_ref().to_vec().clone()) - } - - pub fn is_readonly(&self) -> bool { - unimplemented!() - } - - pub fn start_ts(&self) -> Timestamp { - unimplemented!() - } - - pub fn snapshot(&self) -> Snapshot { - unimplemented!() - } - - pub fn set_isolation_level(&mut self, _level: IsolationLevel) { - unimplemented!() - } - - pub fn get(&self, key: impl AsRef) -> Get { - self.snapshot.get(key) - } - - pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { - self.snapshot.batch_get(keys) - } - - pub fn scan(&self, range: impl RangeBounds) -> Scanner { - self.snapshot.scan(range) - } - - pub fn set(&mut self, key: impl Into, value: impl Into) -> Set { - Set::new(key.into(), value.into()) - } - - pub fn delete(&mut self, key: impl AsRef) -> Delete { - Delete::new(key.as_ref().clone()) - } -} - -pub struct Snapshot; - -impl Snapshot { - pub fn get(&self, key: impl AsRef) -> Get { - Get::new(key.as_ref().clone()) - } - - pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { - BatchGet::new(keys.as_ref().to_vec().clone()) - } - - pub fn scan(&self, range: impl RangeBounds) -> Scanner { - drop(range); - unimplemented!() - } -} - -pub struct Connect { - config: Config, -} - -impl Connect { - fn new(config: Config) -> Self { - Connect { config } - } -} - -impl Future for Connect { - type Item = Client; - type Error = Error; - - fn poll(&mut self) -> Poll { - let _config = &self.config; - unimplemented!() - } -} - -pub struct Client {} - -impl Client { - #![allow(clippy::new_ret_no_self)] - pub fn new(config: &Config) -> Connect { - Connect::new(config.clone()) - } - - pub fn begin(&self) -> Transaction { - unimplemented!() - } - - pub fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction { - unimplemented!() - } - - pub fn snapshot(&self) -> Snapshot { - unimplemented!() - } - - pub fn current_timestamp(&self) -> Timestamp { - unimplemented!() - } -} +} \ No newline at end of file diff --git a/tests/raw.rs b/tests/raw.rs index cfe17177..46e85b54 100644 --- a/tests/raw.rs +++ b/tests/raw.rs @@ -28,7 +28,7 @@ fn wipe_all(client: &Client) { let test_key_start = generate_key(0); let test_key_end = generate_key(NUM_TEST_KEYS as i32 - 1); client - .delete_range(&test_key_start..&test_key_end) + .delete_range(test_key_start..test_key_end) .wait() .expect("Could not delete test keys"); } @@ -46,7 +46,7 @@ fn test_empty(client: &Client) { let test_key_end = generate_key(NUM_TEST_KEYS as i32 - 1); assert!(client - .scan(&test_key_start..&test_key_end, NUM_TEST_KEYS) + .scan(test_key_start..test_key_end, NUM_TEST_KEYS) .wait() .expect("Could not scan") .is_empty()); @@ -56,14 +56,15 @@ fn test_existence(client: &Client, existing_pairs: Vec, not_existing_key let test_key_start = generate_key(0); let test_key_end = generate_key(NUM_TEST_KEYS as i32 - 1); - for pair in existing_pairs.iter() { + for pair in existing_pairs.clone().into_iter() { + let (key, value) = pair.into_inner(); assert_eq!( - client.get(pair.key()).wait().expect("Could not get value"), - pair.value().clone(), + client.get(key).wait().expect("Could not get value"), + value.clone(), ); } - for key in not_existing_keys.iter() { + for key in not_existing_keys.clone().into_iter() { let r = client.get(key).wait(); assert!(r.is_err()); } @@ -81,7 +82,7 @@ fn test_existence(client: &Client, existing_pairs: Vec, not_existing_key assert_eq!( client - .batch_get(&all_keys) + .batch_get(all_keys) .wait() .expect("Could not get value in batch"), existing_pairs, @@ -89,7 +90,7 @@ fn test_existence(client: &Client, existing_pairs: Vec, not_existing_key assert_eq!( client - .batch_get(¬_existing_keys) + .batch_get(not_existing_keys) .wait() .expect("Could not get value in batch"), Vec::new(), @@ -97,7 +98,7 @@ fn test_existence(client: &Client, existing_pairs: Vec, not_existing_key assert_eq!( client - .scan(&test_key_start..&test_key_end, NUM_TEST_KEYS) + .scan(test_key_start.clone()..test_key_end.clone(), NUM_TEST_KEYS) .wait() .expect("Could not scan"), existing_pairs, @@ -105,7 +106,7 @@ fn test_existence(client: &Client, existing_pairs: Vec, not_existing_key assert_eq!( client - .scan(&test_key_start..&test_key_end, NUM_TEST_KEYS) + .scan(test_key_start.clone()..test_key_end.clone(), NUM_TEST_KEYS) .key_only() .wait() .expect("Could not scan"), @@ -129,7 +130,7 @@ fn basic_raw_test() { vec![generate_key(1), generate_key(2)], ); - assert!(client.delete(&generate_key(0)).wait().is_ok()); + assert!(client.delete(generate_key(0)).wait().is_ok()); test_existence( &client, Vec::new(), @@ -147,7 +148,7 @@ fn basic_raw_test() { ); let keys: Vec = vec![generate_key(8), generate_key(9)]; - assert!(client.batch_delete(&keys).wait().is_ok()); + assert!(client.batch_delete(keys).wait().is_ok()); let mut pairs = pairs; pairs.truncate(8); test_existence( From 2cf29060f73cb96bafd9f2b7a668b6c4a1a44c7b Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Fri, 28 Dec 2018 11:22:26 -0800 Subject: [PATCH 2/3] Reduce pub surface Signed-off-by: Hoverbear --- src/raw.rs | 96 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 59 insertions(+), 37 deletions(-) diff --git a/src/raw.rs b/src/raw.rs index 13ed10cf..6800dd47 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -387,7 +387,15 @@ where } } -pub struct GetInner { +/// An unresolved [`Client::get`](struct.Client.html#method.get) request. +/// +/// Once resolved this request will result in the fetching of the value associated with the given +/// key. +pub struct Get { + request: Request, +} + +pub(crate) struct GetInner { key: Key, } @@ -405,13 +413,15 @@ impl RequestInner for GetInner { } } -/// An unresolved [`Client::get`](struct.Client.html#method.get) request. -/// -/// Once resolved this request will result in the fetching of the value associated with the given -/// key. -pub type Get = Request; +/// An unresolved [`Client::batch_get`](struct.Client.html#method.batch_get) request. +/// +/// Once resolved this request will result in the fetching of the values associated with the given +/// keys. +pub struct BatchGet { + request: Request, +} -pub struct BatchGetInner { +pub(crate) struct BatchGetInner { keys: Vec, } @@ -429,15 +439,15 @@ impl BatchGetInner { } } -/// An unresolved [`Client::batch_get`](struct.Client.html#method.batch_get) request. -/// -/// Once resolved this request will result in the fetching of the values associated with the given -/// keys. -pub type BatchGet = Request; - - +/// An unresolved [`Client::put`](struct.Client.html#method.put) request. +/// +/// Once resolved this request will result in the putting of the value associated with the given +/// key. +pub struct Put { + request: Request, +} -pub struct PutInner { +pub(crate) struct PutInner { key: Key, value: Value, } @@ -457,16 +467,14 @@ impl RequestInner for PutInner { } } -/// An unresolved [`Client::put`](struct.Client.html#method.put) request. -/// -/// Once resolved this request will result in the putting of the value associated with the given -/// key. -pub type Put = Request; - /// An unresolved [`Client::batch_put`](struct.Client.html#method.batch_put) request. /// /// Once resolved this request will result in the setting of the value associated with the given key. -pub struct BatchPutInner { +pub struct BatchPut { + request: Request, +} + +pub(crate) struct BatchPutInner { pairs: Vec, } @@ -484,13 +492,14 @@ impl RequestInner for BatchPutInner { } } +/// An unresolved [`Client::delete`](struct.Client.html#method.delete) request. +/// +/// Once resolved this request will result in the deletion of the given key. +pub struct Delete { + request: Request, +} -/// An unresolved [`Client::batch_put`](struct.Client.html#method.batch_put) request. -/// -/// Once resolved this request will result in the setting of the value associated with the given key. -pub type BatchPut = Request; - -pub struct DeleteInner { +pub(crate) struct DeleteInner { key: Key, } @@ -508,12 +517,14 @@ impl RequestInner for DeleteInner { } } -/// An unresolved [`Client::delete`](struct.Client.html#method.delete) request. +/// An unresolved [`Client::batch_delete`](struct.Client.html#method.batch_delete) request. /// -/// Once resolved this request will result in the deletion of the given key. -pub type Delete = Request; +/// Once resolved this request will result in the deletion of the given keys. +pub struct BatchDelete { + request: Request, +} -pub struct BatchDeleteInner { +pub(crate) struct BatchDeleteInner { keys: Vec, } @@ -531,9 +542,8 @@ impl RequestInner for BatchDeleteInner { } } -pub type BatchDelete = Request; -pub struct ScanInner { +pub(crate) struct ScanInner { range: (Bound, Bound), limit: u32, key_only: bool, @@ -561,6 +571,9 @@ impl RequestInner for ScanInner { } } +/// An unresolved [`Client::scan`](struct.Client.html#method.scan) request. +/// +/// Once resolved this request will result in a scanner over the given range. pub struct Scan { request: Request, } @@ -596,7 +609,7 @@ impl Future for Scan { } } -pub struct BatchScanInner { +pub(crate) struct BatchScanInner { ranges: Vec)>>, each_limit: u32, key_only: bool, @@ -631,6 +644,9 @@ impl RequestInner for BatchScanInner { } } +/// An unresolved [`Client::batch_scan`](struct.Client.html#method.batch_scan) request. +/// +/// Once resolved this request will result in a scanner over the given ranges. pub struct BatchScan { request: Request, } @@ -666,9 +682,15 @@ impl Future for BatchScan { } } -pub type DeleteRange = Request; +/// An unresolved [`Client::delete_range`](struct.Client.html#method.delete_range) request. +/// +/// Once resolved this request will result in the deletion of the values in the given +/// range. +pub struct DeleteRange { + request: Request, +} -pub struct DeleteRangeInner { +pub(crate) struct DeleteRangeInner { range: Result<(Key, Option)>, } From 0ebb7b0f5c5c76b595dd740d1befbbe4eea8e221 Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Fri, 28 Dec 2018 11:37:08 -0800 Subject: [PATCH 3/3] fmt/lint Signed-off-by: Hoverbear --- examples/raw.rs | 21 +++-- src/errors.rs | 2 +- src/lib.rs | 78 +++++++++-------- src/raw.rs | 184 +++++++++++++++++++++++++++++++++++------ src/rpc/tikv/client.rs | 2 +- src/transaction.rs | 8 +- 6 files changed, 221 insertions(+), 74 deletions(-) diff --git a/examples/raw.rs b/examples/raw.rs index efa273ed..5c166c4b 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -13,7 +13,7 @@ use futures::future::Future; use std::path::PathBuf; -use tikv_client::{Result, Config, raw::Client, Key, Value}; +use tikv_client::{raw::Client, Config, Key, Result, Value}; const KEY: &str = "TiKV"; const VALUE: &str = "Rust"; @@ -25,7 +25,8 @@ fn main() -> Result<()> { let config = Config::new(vec![ "192.168.0.101:3379", // Avoid a single point of failure, "192.168.0.100:3379", // use at least two PD endpoints. - ]).with_security( + ]) + .with_security( PathBuf::from("/path/to/ca.pem"), PathBuf::from("/path/to/client.pem"), PathBuf::from("/path/to/client-key.pem"), @@ -36,7 +37,7 @@ fn main() -> Result<()> { let unconnnected_client = Client::new(&config); let client = unconnnected_client.wait()?; - // Requests are created from the connected client. These calls return structures which + // Requests are created from the connected client. These calls return structures which // implement `Future`. This means the `Future` must be resolved before the action ever takes // place. // @@ -45,7 +46,7 @@ fn main() -> Result<()> { let put_result: () = put_request.wait()?; // Returns a `tikv_client::Error` on failure. println!("Put key \"{}\", value \"{}\".", KEY, VALUE); - // + // // Unlike a standard Rust HashMap all calls take owned values. This is because under the hood // protobufs must take ownership of the data. If we only took a borrow we'd need to internally // clone it. This is against Rust API guidelines, so you must manage this yourself. // @@ -61,13 +62,15 @@ fn main() -> Result<()> { // You can also set the `ColumnFamily` used by the request. // This is *advanced usage* and should have some special considerations. - let req = client.delete(key.clone()) + let req = client + .delete(key.clone()) .cf(CUSTOM_CF) .wait() .expect("Could not delete value"); println!("Key: {:?} deleted", key); - client.get(key) + client + .get(key) .cf("test_cf") .wait() .expect_err("Get returned value for not existing key"); @@ -83,14 +86,16 @@ fn main() -> Result<()> { let start: Key = b"k1".to_vec().into(); let end: Key = b"k2".to_vec().into(); - client.scan(start.clone()..end.clone(), 10) + client + .scan(start.clone()..end.clone(), 10) .cf("test_cf") .key_only() .wait() .expect("Could not scan"); let ranges = vec![start.clone()..end.clone(), start.clone()..end.clone()]; - client.batch_scan(ranges, 10) + client + .batch_scan(ranges, 10) .cf("test_cf") .key_only() .wait() diff --git a/src/errors.rs b/src/errors.rs index f9c93275..deee4291 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -15,7 +15,7 @@ use grpcio; use quick_error::quick_error; use std::{error, result}; -quick_error!{ +quick_error! { /// An error originating from the TiKV client or dependencies. /// /// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/) diff --git a/src/lib.rs b/src/lib.rs index ccaabf40..4ca64c78 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,13 +13,16 @@ #![recursion_limit = "128"] #![type_length_limit = "2097152"] +use futures::Future; use serde_derive::*; use std::{ - ops::{Deref, DerefMut, Bound, Range, RangeFrom, RangeFull, RangeInclusive, RangeTo, RangeToInclusive}, + ops::{ + Bound, Deref, DerefMut, Range, RangeFrom, RangeFull, RangeInclusive, RangeTo, + RangeToInclusive, + }, path::PathBuf, time::Duration, }; -use futures::Future; mod errors; pub mod raw; @@ -32,38 +35,38 @@ pub use crate::errors::Error; pub use crate::errors::Result; /// The key part of a key/value pair. -/// +/// /// In TiKV, keys are an ordered sequence of bytes. This has an advantage over choosing `String` as valid `UTF-8` is not required. This means that the user is permitted to store any data they wish, /// as long as it can be represented by bytes. (Which is to say, pretty much anything!) -/// +/// /// This is a *wrapper type* that implements `Deref>` so it can be used like one transparently. -/// +/// /// This type also implements `From` for many types. With one exception, these are all done without reallocation. Using a `&'static str`, like many examples do for simplicity, has an internal -/// allocation cost. -/// +/// allocation cost. +/// /// This type wraps around an owned value, so it should be treated it like `String` or `Vec` /// over a `&str` or `&[u8]`. -/// +/// /// ```rust /// use tikv_client::Key; -/// +/// /// let static_str: &'static str = "TiKV"; /// let from_static_str = Key::from(static_str); -/// +/// /// let string: String = String::from(static_str); /// let from_string = Key::from(string); /// assert_eq!(from_static_str, from_string); -/// +/// /// let vec: Vec = static_str.as_bytes().to_vec(); /// let from_vec = Key::from(vec); /// assert_eq!(from_static_str, from_vec); -/// +/// /// let bytes = static_str.as_bytes().to_vec(); /// let from_bytes = Key::from(bytes); /// assert_eq!(from_static_str, from_bytes); /// ``` -/// -/// **But, you should not need to worry about all this:** Many functions which accept a `Key` +/// +/// **But, you should not need to worry about all this:** Many functions which accept a `Key` /// accept an `Into`, which means all of the above types can be passed directly to those /// functions. #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] @@ -114,13 +117,13 @@ impl AsRef<[u8]> for Key { impl Deref for Key { type Target = [u8]; - fn deref<'a>(&'a self) -> &'a Self::Target { + fn deref(&self) -> &Self::Target { &self.0 } } impl DerefMut for Key { - fn deref_mut<'a>(&'a mut self) -> &'a mut [u8] { + fn deref_mut(&mut self) -> &mut [u8] { &mut self.0 } } @@ -196,13 +199,13 @@ impl From<&'static str> for Value { impl Deref for Value { type Target = [u8]; - fn deref<'a>(&'a self) -> &'a Self::Target { + fn deref(&self) -> &Self::Target { &self.0 } } /// A key/value pair. -/// +/// /// ```rust /// # use tikv_client::{Key, Value, KvPair}; /// let key = "key"; @@ -211,7 +214,7 @@ impl Deref for Value { /// let from_tuple = KvPair::from((key, value)); /// assert_eq!(constructed, from_tuple); /// ``` -/// +/// /// **But, you should not need to worry about all this:** Many functions which accept a `KvPair` /// accept an `Into`, which means all of the above types can be passed directly to those /// functions. @@ -268,7 +271,10 @@ impl KvPair { } impl From<(K, V)> for KvPair -where K: Into, V: Into { +where + K: Into, + V: Into, +{ fn from((k, v): (K, V)) -> Self { KvPair(k.into(), v.into()) } @@ -276,12 +282,12 @@ where K: Into, V: Into { /// The configuration for either a [`raw::Client`](raw/struct.Client.html) or a /// [`transaction::Client`](transaction/struct.Client.html). -/// +/// /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for PD /// must be provided, **not** the TiKV nodes. /// /// It's important to **include more than one PD endpoint** (include all, if possible!) This helps avoid having a *single point of failure*. -/// +/// /// By default, this client will use an insecure connection over encryption-on-the-wire. Your /// deployment may have chosen to rely on security measures such as a private network, or a VPN /// layer providing secure transmission. TiKV does not currently offer encryption-at-rest. @@ -298,9 +304,9 @@ pub struct Config { impl Config { /// Create a new [`Config`](struct.Config.html) which coordinates with the given PD endpoints. - /// + /// /// It's important to **include more than one PD endpoint** (include all, if possible!) This helps avoid having a *single point of failure*. - /// + /// /// ```rust /// # use tikv_client::Config; /// let config = Config::new(vec!["192.168.0.100:2379", "192.168.0.101:2379"]); @@ -316,9 +322,9 @@ impl Config { } /// Set the certificate authority, certificate, and key locations for the [`Config`](struct.Config.html). - /// + /// /// By default, TiKV connections do not have utilize transport layer security. Enable it by setting these values. - /// + /// /// ```rust /// # use tikv_client::Config; /// let config = Config::new(vec!["192.168.0.100:2379", "192.168.0.101:2379"]) @@ -358,18 +364,18 @@ pub type KvFuture = Box>; /// /// let explict_range: Range = Range { start: Key::from("Rust"), end: Key::from("TiKV") }; /// let from_explict_range = explict_range.into_bounds(); -/// +/// /// let range: Range<&str> = "Rust".."TiKV"; /// let from_range = range.into_bounds(); /// assert_eq!(from_explict_range, from_range); -/// +/// /// let range: RangeInclusive<&str> = "Rust"..="TiKV"; /// let from_range = range.into_bounds(); /// assert_eq!( /// (Bound::Included(Key::from("Rust")), Bound::Included(Key::from("TiKV"))), /// from_range /// ); -/// +/// /// let range_from: RangeFrom<&str> = "Rust"..; /// let from_range_from = range_from.into_bounds(); /// assert_eq!( @@ -383,7 +389,7 @@ pub type KvFuture = Box>; /// (Bound::Unbounded, Bound::Excluded(Key::from("TiKV"))), /// from_range_to, /// ); -/// +/// /// let range_to_inclusive: RangeToInclusive<&str> = ..="TiKV"; /// let from_range_to_inclusive = range_to_inclusive.into_bounds(); /// assert_eq!( @@ -414,10 +420,13 @@ fn range_to_keys(range: (Bound, Bound)) -> Result<(Key, Option)> Bound::Excluded(mut v) => { let len = v.len(); if len > 0 { - v.deref_mut().get_mut(len -1).map(|v| { *v += 1; v }); + v.deref_mut().get_mut(len - 1).map(|v| { + *v += 1; + v + }); } v - }, + } Bound::Unbounded => return Err(Error::InvalidKeyRange), }; let end = match range.1 { @@ -425,7 +434,10 @@ fn range_to_keys(range: (Bound, Bound)) -> Result<(Key, Option)> Bound::Excluded(mut v) => Some({ let len = v.len(); if len > 0 { - v.deref_mut().get_mut(len -1).map(|v| { *v -= 1; v }); + v.deref_mut().get_mut(len - 1).map(|v| { + *v -= 1; + v + }); } v }), diff --git a/src/raw.rs b/src/raw.rs index 6800dd47..674e3155 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -22,13 +22,16 @@ oracle, while the transactional interface does. */ use crate::{rpc::RpcClient, Config, Error, Key, KeyRange, KvFuture, KvPair, Result, Value}; -use futures::{future, Future, Poll, Async}; -use std::{u32, sync::Arc, ops::{Bound, Deref, RangeBounds, DerefMut}}; - +use futures::{future, Async, Future, Poll}; +use std::{ + ops::{Bound, Deref, DerefMut}, + sync::Arc, + u32, +}; /// The TiKV raw [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. pub struct Client { - rpc: Arc + rpc: Arc, } impl Client { @@ -70,7 +73,7 @@ impl Client { /// Create a new [`BatchGet`](struct.BatchGet.html) request. /// - /// Once resolved this request will result in the fetching of the values associated with the + /// Once resolved this request will result in the fetching of the values associated with the /// given keys. /// /// ```rust,no_run @@ -83,7 +86,10 @@ impl Client { /// let result: Vec = req.wait().unwrap(); /// ``` pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { - BatchGet::new(self.rpc(), BatchGetInner::new(keys.into_iter().map(Into::into).collect())) + BatchGet::new( + self.rpc(), + BatchGetInner::new(keys.into_iter().map(Into::into).collect()), + ) } /// Create a new [`Put`](struct.Put.html) request. @@ -120,7 +126,10 @@ impl Client { /// let result: () = req.wait().unwrap(); /// ``` pub fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { - BatchPut::new(self.rpc(), BatchPutInner::new(pairs.into_iter().map(Into::into).collect())) + BatchPut::new( + self.rpc(), + BatchPutInner::new(pairs.into_iter().map(Into::into).collect()), + ) } /// Create a new [`Delete`](struct.Delete.html) request. @@ -154,7 +163,10 @@ impl Client { /// let result: () = req.wait().unwrap(); /// ``` pub fn batch_delete(&self, keys: impl IntoIterator>) -> BatchDelete { - BatchDelete::new(self.rpc(), BatchDeleteInner::new(keys.into_iter().map(Into::into).collect())) + BatchDelete::new( + self.rpc(), + BatchDeleteInner::new(keys.into_iter().map(Into::into).collect()), + ) } /// Create a new [`Scan`](struct.Scan.html) request. @@ -173,7 +185,10 @@ impl Client { /// let result: Vec = req.wait().unwrap(); /// ``` pub fn scan(&self, range: impl KeyRange, limit: impl Into>) -> Scan { - Scan::new(self.rpc(), ScanInner::new(range.into_bounds(), limit.into().unwrap_or(u32::MAX))) + Scan::new( + self.rpc(), + ScanInner::new(range.into_bounds(), limit.into().unwrap_or(u32::MAX)), + ) } /// Create a new [`BatchScan`](struct.BatchScan.html) request. @@ -228,13 +243,13 @@ impl Client { } /// An unresolved [`Client`](struct.Client.html) connection to a TiKV cluster. -/// +/// /// Once resolved it will result in a connected [`Client`](struct.Client.html). /// /// ```rust,no_run /// use tikv_client::{Config, raw::{Client, Connect}}; /// use futures::Future; -/// +/// /// let connect: Connect = Client::new(&Config::default()); /// let client: Client = connect.wait().unwrap(); /// ``` @@ -259,31 +274,30 @@ impl Future for Connect { } } - /// A [`ColumnFamily`](struct.ColumnFamily.html) is an optional parameter for [`raw::Client`](struct.Client.html) requests. -/// +/// /// TiKV uses RocksDB's `ColumnFamily` support. You can learn more about RocksDB's `ColumnFamily`s [on their wiki](https://github.com/facebook/rocksdb/wiki/Column-Families). -/// +/// /// By default in TiKV data is stored in three different `ColumnFamily` values, configurable in the TiKV server's configuration: -/// +/// /// * Default: Where real user data is stored. Set by `[rocksdb.defaultcf]`. /// * Write: Where MVCC and index related data are stored. Set by `[rocksdb.writecf]`. /// * Lock: Where lock information is stored. Set by `[rocksdb.lockcf]`. -/// +/// /// Not providing a call a `ColumnFamily` means it will use the default value of `default`. -/// +/// /// The best (and only) way to create a [`ColumnFamily`](struct.ColumnFamily.html) is via the `From` implementation: -/// +/// /// ```rust /// # use tikv_client::raw::ColumnFamily; -/// +/// /// let cf = ColumnFamily::from("write"); /// let cf = ColumnFamily::from(String::from("write")); /// let cf = ColumnFamily::from(&String::from("write")); /// ``` -/// +/// /// This is a *wrapper type* that implements `Deref` so it can be used like one transparently. -/// +/// /// **But, you should not need to worry about all this:** Many functions which accept a /// `ColumnFamily` accept an `Into`, which means all of the above types can be passed /// directly to those functions. @@ -312,8 +326,6 @@ impl Deref for ColumnFamily { } } - - pub trait RequestInner: Sized { type Resp; @@ -388,13 +400,30 @@ where } /// An unresolved [`Client::get`](struct.Client.html#method.get) request. -/// +/// /// Once resolved this request will result in the fetching of the value associated with the given /// key. pub struct Get { request: Request, } +impl Get { + fn new(client: Arc, inner: GetInner) -> Self { + Self { + request: Request::new(client, inner) + } + } +} + +impl Future for Get { + type Item = Value; + type Error = Error; + + fn poll(&mut self) -> Poll { + self.request.poll() + } +} + pub(crate) struct GetInner { key: Key, } @@ -421,6 +450,23 @@ pub struct BatchGet { request: Request, } +impl BatchGet { + fn new(client: Arc, inner: BatchGetInner) -> Self { + Self { + request: Request::new(client, inner) + } + } +} + +impl Future for BatchGet { + type Item = Vec; + type Error = Error; + + fn poll(&mut self) -> Poll { + self.request.poll() + } +} + pub(crate) struct BatchGetInner { keys: Vec, } @@ -440,13 +486,30 @@ impl BatchGetInner { } /// An unresolved [`Client::put`](struct.Client.html#method.put) request. -/// +/// /// Once resolved this request will result in the putting of the value associated with the given /// key. pub struct Put { request: Request, } +impl Put { + fn new(client: Arc, inner: PutInner) -> Self { + Self { + request: Request::new(client, inner) + } + } +} + +impl Future for Put { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + self.request.poll() + } +} + pub(crate) struct PutInner { key: Key, value: Value, @@ -468,12 +531,29 @@ impl RequestInner for PutInner { } /// An unresolved [`Client::batch_put`](struct.Client.html#method.batch_put) request. -/// +/// /// Once resolved this request will result in the setting of the value associated with the given key. pub struct BatchPut { request: Request, } +impl BatchPut { + fn new(client: Arc, inner: BatchPutInner) -> Self { + Self { + request: Request::new(client, inner) + } + } +} + +impl Future for BatchPut { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + self.request.poll() + } +} + pub(crate) struct BatchPutInner { pairs: Vec, } @@ -499,6 +579,23 @@ pub struct Delete { request: Request, } +impl Delete { + fn new(client: Arc, inner: DeleteInner) -> Self { + Self { + request: Request::new(client, inner) + } + } +} + +impl Future for Delete { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + self.request.poll() + } +} + pub(crate) struct DeleteInner { key: Key, } @@ -524,6 +621,23 @@ pub struct BatchDelete { request: Request, } +impl BatchDelete { + fn new(client: Arc, inner: BatchDeleteInner) -> Self { + Self { + request: Request::new(client, inner) + } + } +} + +impl Future for BatchDelete { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + self.request.poll() + } +} + pub(crate) struct BatchDeleteInner { keys: Vec, } @@ -542,7 +656,6 @@ impl RequestInner for BatchDeleteInner { } } - pub(crate) struct ScanInner { range: (Bound, Bound), limit: u32, @@ -690,6 +803,23 @@ pub struct DeleteRange { request: Request, } +impl DeleteRange { + fn new(client: Arc, inner: DeleteRangeInner) -> Self { + Self { + request: Request::new(client, inner) + } + } +} + +impl Future for DeleteRange { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + self.request.poll() + } +} + pub(crate) struct DeleteRangeInner { range: Result<(Key, Option)>, } diff --git a/src/rpc/tikv/client.rs b/src/rpc/tikv/client.rs index e7d85b3c..febe1541 100644 --- a/src/rpc/tikv/client.rs +++ b/src/rpc/tikv/client.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt, sync::Arc, time::Duration, ops::Bound}; +use std::{fmt, sync::Arc, time::Duration}; use futures::Future; use grpcio::{CallOption, Environment}; diff --git a/src/transaction.rs b/src/transaction.rs index 1d6e425c..28cb88d6 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -213,7 +213,7 @@ impl Transaction { } /// Rollback the actions of the transaction. - /// + /// /// ```rust,no_run /// # use tikv_client::{Config, transaction::Client}; /// # use futures::Future; @@ -229,7 +229,7 @@ impl Transaction { } /// Lock the given keys. - /// + /// /// ```rust,no_run /// # use tikv_client::{Config, transaction::Client}; /// # use futures::Future; @@ -315,7 +315,7 @@ impl Transaction { /// Create a new [`BatchGet`](struct.BatchGet.html) request. /// - /// Once resolved this request will result in the fetching of the values associated with the + /// Once resolved this request will result in the fetching of the values associated with the /// given keys. /// /// ```rust,no_run @@ -616,4 +616,4 @@ impl Future for Delete { let _key = &self.key; unimplemented!() } -} \ No newline at end of file +}