From 6bb3731e00beed0450fbcc94125445c22286c0a1 Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Mon, 19 Nov 2018 09:09:10 -0800 Subject: [PATCH 1/9] Bump to 2018 edition. Signed-off-by: Hoverbear --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 1f481829..344150b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ license = "Apache-2.0" authors = ["The TiKV Project Authors"] repository = "https://github.com/tikv/client-rust" description = "The rust language implementation of TiKV client." +edition = "2018" [lib] name = "tikv_client" From 81e5d1a5a67dd89ecd5f09724cf930388ecc68a0 Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Mon, 19 Nov 2018 09:17:43 -0800 Subject: [PATCH 2/9] Add toolchain Signed-off-by: Hoverbear --- rust-toolchain | 4 ++++ src/errors.rs | 4 +++- src/lib.rs | 17 +++++------------ src/raw.rs | 2 +- src/transaction.rs | 2 +- 5 files changed, 14 insertions(+), 15 deletions(-) create mode 100644 rust-toolchain diff --git a/rust-toolchain b/rust-toolchain new file mode 100644 index 00000000..f22b1d64 --- /dev/null +++ b/rust-toolchain @@ -0,0 +1,4 @@ +nightly + +# We explicitly use nightly for the moment even though we target stable so we +# can develop for the 2018 edition. \ No newline at end of file diff --git a/src/errors.rs b/src/errors.rs index a8d81488..59ced9bd 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use quick_error::quick_error; +use grpcio; use std::error; use std::result; @@ -22,7 +24,7 @@ quick_error!{ cause(err) description(err.description()) } - Grpc(err: ::grpc::Error) { + Grpc(err: grpcio::Error) { from() cause(err) description(err.description()) diff --git a/src/lib.rs b/src/lib.rs index 1827b26d..89db422e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,23 +11,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -extern crate futures; -extern crate serde; -#[macro_use] -extern crate serde_derive; -#[macro_use] -extern crate quick_error; -extern crate grpcio as grpc; +use std::ops::Deref; +use std::path::PathBuf; +use serde_derive::*; pub mod errors; pub mod raw; pub mod transaction; -use std::ops::Deref; -use std::path::PathBuf; - -pub use errors::Error; -pub use errors::Result; +pub use crate::errors::Error; +pub use crate::errors::Result; #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct Key(Vec); diff --git a/src/raw.rs b/src/raw.rs index 0bad042d..62ef5a6e 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -15,7 +15,7 @@ use std::ops::RangeBounds; use futures::{Future, Poll}; -use {Config, Error, Key, KvPair, Value}; +use crate::{Config, Error, Key, KvPair, Value}; #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct ColumnFamily(String); diff --git a/src/transaction.rs b/src/transaction.rs index ffa4021b..abe219e8 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -15,7 +15,7 @@ use std::ops::RangeBounds; use futures::{Future, Poll, Stream}; -use {Config, Error, Key, KvPair, Value}; +use crate::{Config, Error, Key, KvPair, Value}; #[derive(Copy, Clone)] pub struct Timestamp(u64); From eb481391d9449c9e8b0a9796678249a76872d615 Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Mon, 19 Nov 2018 09:22:25 -0800 Subject: [PATCH 3/9] Fix examples Signed-off-by: Hoverbear --- examples/raw.rs | 6 +----- examples/transaction.rs | 24 +++++++++++++----------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/examples/raw.rs b/examples/raw.rs index c8bfbdfc..403c1e79 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -11,12 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -extern crate futures; -extern crate tikv_client; - -use std::path::PathBuf; - use futures::future::Future; +use std::path::PathBuf; use tikv_client::*; fn main() { diff --git a/examples/transaction.rs b/examples/transaction.rs index 3c77c070..23a1e285 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -11,15 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -extern crate futures; -extern crate tikv_client; - +use futures::{future, Future, Stream}; use std::ops::RangeBounds; use std::path::PathBuf; - -use futures::{future, Future, Stream}; -use tikv_client::transaction::{Client, IsolationLevel}; -use tikv_client::*; +use tikv_client::{ + transaction::{Client, IsolationLevel}, + Config, Key, KvPair, Value, +}; fn puts(client: &Client, pairs: impl IntoIterator>) { let mut txn = client.begin(); @@ -28,7 +26,8 @@ fn puts(client: &Client, pairs: impl IntoIterator>) { .into_iter() .map(Into::into) .map(|p| txn.set(p.key().clone(), p.value().clone())), - ).wait() + ) + .wait() .expect("Could not set key value pairs"); txn.commit().wait().expect("Could not commit transaction"); } @@ -49,10 +48,12 @@ fn scan(client: &Client, range: impl RangeBounds, mut limit: usize) { limit -= 1; true }) - }).for_each(|pair| { + }) + .for_each(|pair| { println!("{:?}", pair); Ok(()) - }).wait() + }) + .wait() .expect("Could not scan keys"); } @@ -63,7 +64,8 @@ fn dels(client: &Client, keys: impl IntoIterator) { .into_iter() .map(|p| { txn.delete(p).wait().expect("Could not delete key"); - }).collect(); + }) + .collect(); txn.commit().wait().expect("Could not commit transaction"); } From 4630ebcaa4011b95b7ef9f82696e489b3894f396 Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Mon, 19 Nov 2018 09:23:55 -0800 Subject: [PATCH 4/9] fmt and lint Signed-off-by: Hoverbear --- src/errors.rs | 5 ++--- src/lib.rs | 2 +- src/raw.rs | 8 +++----- src/transaction.rs | 8 +++----- 4 files changed, 9 insertions(+), 14 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index 59ced9bd..e3e81c39 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -11,10 +11,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use quick_error::quick_error; use grpcio; -use std::error; -use std::result; +use quick_error::quick_error; +use std::{error, result}; quick_error!{ #[derive(Debug)] diff --git a/src/lib.rs b/src/lib.rs index 89db422e..3589dc89 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,9 +11,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use serde_derive::*; use std::ops::Deref; use std::path::PathBuf; -use serde_derive::*; pub mod errors; pub mod raw; diff --git a/src/raw.rs b/src/raw.rs index 62ef5a6e..93b2ee38 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -11,11 +11,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::RangeBounds; - -use futures::{Future, Poll}; - use crate::{Config, Error, Key, KvPair, Value}; +use futures::{Future, Poll}; +use std::ops::RangeBounds; #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct ColumnFamily(String); @@ -388,7 +386,7 @@ impl Future for Connect { pub struct Client; impl Client { - #![cfg_attr(feature = "cargo-clippy", allow(new_ret_no_self))] + #![cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] pub fn new(config: &Config) -> Connect { Connect::new(config.clone()) } diff --git a/src/transaction.rs b/src/transaction.rs index abe219e8..3db2ae53 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -11,11 +11,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::RangeBounds; - -use futures::{Future, Poll, Stream}; - use crate::{Config, Error, Key, KvPair, Value}; +use futures::{Future, Poll, Stream}; +use std::ops::RangeBounds; #[derive(Copy, Clone)] pub struct Timestamp(u64); @@ -302,7 +300,7 @@ impl Future for Connect { pub struct Client {} impl Client { - #![cfg_attr(feature = "cargo-clippy", allow(new_ret_no_self))] + #![cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] pub fn new(config: &Config) -> Connect { Connect::new(config.clone()) } From 176c887bfcf53f3111dc74f8d7dff65ef23882c2 Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Mon, 19 Nov 2018 09:04:25 -0800 Subject: [PATCH 5/9] Documentation, first pass --- src/lib.rs | 100 ++++++++++++++++++++++++++++++++++++++++----- src/raw.rs | 60 ++++++++++++++++++++++++++- src/transaction.rs | 9 ++-- 3 files changed, 152 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3589dc89..2cd4b48e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,16 +22,68 @@ pub mod transaction; pub use crate::errors::Error; pub use crate::errors::Result; +/// The key part of a key/value pair. +/// +/// In TiKV, keys are an ordered sequence of bytes. This has an advantage over choosing `String` as valid `UTF-8` is not required. +/// This means that the user is permitted to store any data they wish, as long as it can be represented by bytes. +/// +/// This type implements `Deref>` so it can be used like one transparently. +/// +/// This type contains an owned value, so it should be treated it like `String` or `Vec` over a `&str` or `&[u8]`. +/// +/// ```rust +/// # use tikv_client::Key; +/// let from_bytes = Key::from(b"TiKV".to_vec()); +/// let from_string = Key::from(String::from("TiKV")); +/// +/// assert_eq!(from_bytes, from_string); +/// assert_eq!(*from_bytes, b"TiKV".to_vec()); +/// ``` #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct Key(Vec); + +/// The value part of a key/value pair. +/// +/// In TiKV, values are an ordered sequence of bytes. This has an advantage over choosing `String` as valid `UTF-8` is not required. +/// This means that the user is permitted to store any data they wish, as long as it can be represented by bytes. +/// +/// This type implements `Deref>` so it can be used like one transparently. +/// +/// This type contains an owned value, so it should be treated it like `String` or `Vec` over a `&str` or `&[u8]`. +/// +/// ```rust +/// # use tikv_client::Value; +/// let from_bytes = Value::from(b"TiKV".to_vec()); +/// let from_string = Value::from(String::from("TiKV")); +/// +/// assert_eq!(from_bytes, from_string); +/// assert_eq!(*from_bytes, b"TiKV".to_vec()); +/// ``` #[derive(Default, Clone, Eq, PartialEq, Hash, Debug)] pub struct Value(Vec); +/// A key/value pair. +/// +/// Used primarily in batch and scan requests. +/// +/// ```rust +/// # use tikv_client::{Key, Value, KvPair}; +/// let key = b"key".to_vec(); +/// let value = b"value".to_vec(); +/// let constructed = KvPair::new(key.clone(), value.clone()); +/// let from_tuple = KvPair::from((key.into(), value.into())); +/// assert_eq!(constructed, from_tuple); #[derive(Default, Clone, Eq, PartialEq, Debug)] pub struct KvPair(Key, Value); -impl Into for Vec { - fn into(self) -> Key { - Key(self) +impl From> for Key { + fn from(v: Vec) -> Self { + Key(v) + } +} + +impl From for Key { + fn from(v: String) -> Key { + Key(v.into_bytes()) } } @@ -49,9 +101,15 @@ impl Deref for Key { } } -impl Into for Vec { - fn into(self) -> Value { - Value(self) +impl From> for Value { + fn from(v: Vec) -> Self { + Value(v) + } +} + +impl From for Value { + fn from(v: String) -> Value { + Value(v.into_bytes()) } } @@ -64,8 +122,8 @@ impl Deref for Value { } impl KvPair { - pub fn new(key: Key, value: Value) -> Self { - KvPair(key, value) + pub fn new(key: impl Into, value: impl Into) -> Self { + KvPair(key.into(), value.into()) } pub fn key(&self) -> &Key { @@ -77,12 +135,17 @@ impl KvPair { } } -impl Into for (Key, Value) { - fn into(self) -> KvPair { - KvPair(self.0, self.1) +impl From<(Key, Value)> for KvPair { + fn from((k, v): (Key, Value)) -> KvPair { + KvPair(k, v) } } +/// The configuration of either a [`raw::Client`](raw/struct.Client.html) or a [`transaction::Client`](transaction/struct.Client.html). +/// +/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster the endpoints for PD must be provided, **not** the TiKV nodes. +/// +/// It's important to **include more than one PD endpoint** (include all, if possible!) This helps avoid having a *single point of failure*. #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] #[serde(default)] #[serde(rename_all = "kebab-case")] @@ -94,6 +157,12 @@ pub struct Config { } impl Config { + /// Create a new [`Config`](struct.Config.html) which coordinates with the given PD endpoints. + /// + /// ```rust + /// # use tikv_client::Config; + /// let config = Config::new(vec!["192.168.0.100:2379", "192.168.0.101:2379"]); + /// ``` pub fn new(pd_endpoints: impl IntoIterator>) -> Self { Config { pd_endpoints: pd_endpoints.into_iter().map(Into::into).collect(), @@ -103,6 +172,15 @@ impl Config { } } + /// Set the certificate authority, certificate, and key locations for the [`Config`](struct.Config.html). + /// + /// By default, TiKV connections do not have utilize transport layer security. Enable it by setting these values. + /// + /// ```rust + /// # use tikv_client::Config; + /// let config = Config::new(vec!["192.168.0.100:2379", "192.168.0.101:2379"]) + /// .with_security("root.ca", "internal.cert", "internal.key"); + /// ``` pub fn with_security( mut self, ca_path: impl Into, diff --git a/src/raw.rs b/src/raw.rs index 93b2ee38..9985a909 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -15,6 +15,26 @@ use crate::{Config, Error, Key, KvPair, Value}; use futures::{Future, Poll}; use std::ops::RangeBounds; +/// A [`ColumnFamily`](struct.ColumnFamily.html) is an optional parameter for [`raw::Client`](struct.Client.html) requests. +/// +/// TiKV uses RocksDB's `ColumnFamily` support. You can learn more about RocksDB's `ColumnFamily`s [on their wiki](https://github.com/facebook/rocksdb/wiki/Column-Families). +/// +/// By default in TiKV data is stored in three different `ColumnFamily` values, configurable in the TiKV server's configuration: +/// +/// * Default: Where real user data is stored. Set by `[rocksdb.defaultcf]`. +/// * Write: Where MVCC and index related data are stored. Set by `[rocksdb.writecf]`. +/// * Lock: Where lock information is stored. Set by `[rocksdb.lockcf]`. +/// +/// Not providing a call a `ColumnFamily` means it will use the default value of `default`. +/// +/// The best (and only) way to create a [`ColumnFamily`](struct.ColumnFamily.html) is via the `From` implementation: +/// +/// ```rust +/// # use tikv_client::raw::ColumnFamily; +/// let cf = ColumnFamily::from("write"); +/// let cf = ColumnFamily::from(String::from("write")); +/// let cf = ColumnFamily::from(&String::from("write")); +/// ``` #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct ColumnFamily(String); @@ -27,6 +47,7 @@ where } } +/// A raw request to get the [`Value`](struct.Value.html) of a given [`Key`](struct.key.html). pub struct Get<'a> { client: &'a Client, key: Key, @@ -42,6 +63,7 @@ impl<'a> Get<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -74,7 +96,7 @@ impl<'a> BatchGet<'a> { cf: None, } } - + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -110,6 +132,7 @@ impl<'a> Put<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -144,6 +167,7 @@ impl<'a> BatchPut<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -177,6 +201,7 @@ impl<'a> Delete<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -210,6 +235,7 @@ impl<'a> BatchDelete<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -254,6 +280,7 @@ impl<'a> Scan<'a> { self } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -305,6 +332,7 @@ impl<'a> BatchScan<'a> { self } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -345,6 +373,7 @@ impl<'a> DeleteRange<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -363,6 +392,14 @@ impl<'a> Future for DeleteRange<'a> { } } +/// A future which resolves the initial connection between the [`Client`](struct.Client.html) and the TiKV cluster. +/// +/// ```rust,no_run +/// # use tikv_client::{Config, raw::{Client, Connect}}; +/// # use futures::Future; +/// let connect = Client::new(&Config::default()); +/// let client = connect.wait(); +/// ``` pub struct Connect { config: Config, } @@ -383,42 +420,59 @@ impl Future for Connect { } } +/// The TiKV raw [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. pub struct Client; impl Client { #![cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] + /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, raw::{Client, Connect}}; + /// # use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait(); + /// ``` pub fn new(config: &Config) -> Connect { Connect::new(config.clone()) } + /// Create a new [`Get`](struct.Get.html) request. pub fn get(&self, key: impl AsRef) -> Get { Get::new(self, key.as_ref().clone()) } + /// Create a new [`BatchGet`](struct.BatchGet.html) request. pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { BatchGet::new(self, keys.as_ref().to_vec()) } + /// Create a new [`Put`](struct.Put.html) request. pub fn put(&self, key: impl Into, value: impl Into) -> Put { Put::new(self, key.into(), value.into()) } - + + /// Create a new [`BatchPut`](struct.BatchPut.html) request. pub fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { BatchPut::new(self, pairs.into_iter().map(Into::into).collect()) } + /// Create a new [`Delete`](struct.Delete.html) request. pub fn delete(&self, key: impl AsRef) -> Delete { Delete::new(self, key.as_ref().clone()) } + /// Create a new [`BatchDelete`](struct.BatchDelete.html) request. pub fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete { BatchDelete::new(self, keys.as_ref().to_vec()) } + /// Create a new [`Scan`](struct.Scan.html) request. pub fn scan(&self, range: impl RangeBounds, limit: u32) -> Scan { Scan::new(self, Self::extract_range(&range), limit) } + /// Create a new [`BatchScan`](struct.BatchScan.html) request. pub fn batch_scan(&self, ranges: Ranges, each_limit: u32) -> BatchScan where Ranges: AsRef<[Bounds]>, @@ -431,10 +485,12 @@ impl Client { ) } + /// Create a new [`DeleteRange`](struct.DeleteRange.html) request. pub fn delete_range(&self, range: impl RangeBounds) -> DeleteRange { DeleteRange::new(self, Self::extract_range(&range)) } + // Returns the bounds for a given [`RangeBounds`](struct.RangeBounds.html). fn extract_range(_range: &impl RangeBounds) -> (Key, Key) { unimplemented!() } diff --git a/src/transaction.rs b/src/transaction.rs index 3db2ae53..2b1c97ca 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -13,14 +13,15 @@ use crate::{Config, Error, Key, KvPair, Value}; use futures::{Future, Poll, Stream}; -use std::ops::RangeBounds; +use std::ops::{RangeBounds, Deref}; +/// A logical timestamp produced by PD. #[derive(Copy, Clone)] pub struct Timestamp(u64); -impl Into for u64 { - fn into(self) -> Timestamp { - Timestamp(self) +impl From for Timestamp { + fn from(v: u64) -> Self { + Timestamp(v) } } From fd2f08bc304c64025a91016fb43d683c87ba40d8 Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Thu, 29 Nov 2018 17:03:32 -0800 Subject: [PATCH 6/9] Add more documentation for raw client --- src/errors.rs | 18 ++++ src/lib.rs | 191 ++++++++++++++++++++++++++++---------- src/raw.rs | 223 ++++++++++++++++++++++++++++++++++++++------- src/transaction.rs | 10 +- 4 files changed, 362 insertions(+), 80 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index e3e81c39..e2da0547 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -16,57 +16,75 @@ use quick_error::quick_error; use std::{error, result}; quick_error!{ + /// An error originating from Raft or dependencies. + /// + /// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/) + /// for errors. *This may change in future versions.* #[derive(Debug)] pub enum Error { + /// Wraps a a `std::io::Error`. Io(err: ::std::io::Error) { from() cause(err) description(err.description()) } + /// Wraps a `grpcio::Error`. Grpc(err: grpcio::Error) { from() cause(err) description(err.description()) } + /// Represents that a futures oneshot channel was cancelled. Canceled(err: ::futures::sync::oneshot::Canceled) { from() cause(err) description(err.description()) } + /// An unknown error. + /// + /// Generally, this is not an expected error. Other(err: Box) { from() cause(err.as_ref()) description(err.description()) display("unknown error {:?}", err) } + /// A region was not found for the given key. RegionForKeyNotFound(key: Vec) { description("region is not found") display("region is not found for key {:?}", key) } + /// A region was not found. RegionNotFound(id: u64) { description("region is not found") display("region {:?} is not found", id) } + /// The peer is not a leader of the given region. NotLeader(region_id: u64) { description("peer is not leader") display("peer is not leader for region {:?}.", region_id) } + /// The store does not match. StoreNotMatch { description("store not match") display("store not match") } + /// The given key is not eithin the given region. KeyNotInRegion(key: Vec, region_id: u64, start_key: Vec, end_key: Vec) { description("region is not found") display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key) } + /// A stale epoch. StaleEpoch { description("stale epoch") display("stale epoch") } + /// The server is too busy. ServerIsBusy(reason: String) { description("server is busy") display("server is busy: {:?}", reason) } + /// The given raft entry is too large for the region. RaftEntryTooLarge(region_id: u64, entry_size: u64) { description("raft entry too large") display("{:?} bytes raft entry of region {:?} is too large", entry_size, region_id) diff --git a/src/lib.rs b/src/lib.rs index 2cd4b48e..f67b3bb2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,68 +12,54 @@ // limitations under the License. use serde_derive::*; -use std::ops::Deref; -use std::path::PathBuf; +use std::{ops::{Deref, Bound}, path::PathBuf}; -pub mod errors; +mod errors; pub mod raw; pub mod transaction; +#[doc(inline)] pub use crate::errors::Error; +#[doc(inline)] pub use crate::errors::Result; /// The key part of a key/value pair. /// -/// In TiKV, keys are an ordered sequence of bytes. This has an advantage over choosing `String` as valid `UTF-8` is not required. -/// This means that the user is permitted to store any data they wish, as long as it can be represented by bytes. +/// In TiKV, keys are an ordered sequence of bytes. This has an advantage over choosing `String` as valid `UTF-8` is not required. This means that the user is permitted to store any data they wish, +/// as long as it can be represented by bytes. (Which is to say, pretty much anything!) /// -/// This type implements `Deref>` so it can be used like one transparently. +/// This is a *wrapper type* that implements `Deref>` so it can be used like one transparently. /// -/// This type contains an owned value, so it should be treated it like `String` or `Vec` over a `&str` or `&[u8]`. +/// This type also implements `From` for many types. With one exception, these are all done without reallocation. Using a `&'static str`, like many examples do for simplicity, has an internal +/// allocation cost. /// -/// ```rust -/// # use tikv_client::Key; -/// let from_bytes = Key::from(b"TiKV".to_vec()); -/// let from_string = Key::from(String::from("TiKV")); +/// This type wraps around an owned value, so it should be treated it like `String` or `Vec` +/// over a `&str` or `&[u8]`. /// -/// assert_eq!(from_bytes, from_string); -/// assert_eq!(*from_bytes, b"TiKV".to_vec()); -/// ``` -#[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] -pub struct Key(Vec); - -/// The value part of a key/value pair. -/// -/// In TiKV, values are an ordered sequence of bytes. This has an advantage over choosing `String` as valid `UTF-8` is not required. -/// This means that the user is permitted to store any data they wish, as long as it can be represented by bytes. +/// ```rust +/// use tikv_client::Key; /// -/// This type implements `Deref>` so it can be used like one transparently. +/// let static_str: &'static str = "TiKV"; +/// let from_static_str = Key::from(static_str); /// -/// This type contains an owned value, so it should be treated it like `String` or `Vec` over a `&str` or `&[u8]`. +/// let string: String = String::from(static_str); +/// let from_string = Key::from(string); +/// assert_eq!(from_static_str, from_string); /// -/// ```rust -/// # use tikv_client::Value; -/// let from_bytes = Value::from(b"TiKV".to_vec()); -/// let from_string = Value::from(String::from("TiKV")); +/// let vec: Vec = static_str.as_bytes().to_vec(); +/// let from_vec = Key::from(vec); +/// assert_eq!(from_static_str, from_vec); /// -/// assert_eq!(from_bytes, from_string); -/// assert_eq!(*from_bytes, b"TiKV".to_vec()); +/// let bytes = static_str.as_bytes().to_vec(); +/// let from_bytes = Key::from(bytes); +/// assert_eq!(from_static_str, from_bytes); /// ``` -#[derive(Default, Clone, Eq, PartialEq, Hash, Debug)] -pub struct Value(Vec); -/// A key/value pair. -/// -/// Used primarily in batch and scan requests. /// -/// ```rust -/// # use tikv_client::{Key, Value, KvPair}; -/// let key = b"key".to_vec(); -/// let value = b"value".to_vec(); -/// let constructed = KvPair::new(key.clone(), value.clone()); -/// let from_tuple = KvPair::from((key.into(), value.into())); -/// assert_eq!(constructed, from_tuple); -#[derive(Default, Clone, Eq, PartialEq, Debug)] -pub struct KvPair(Key, Value); +/// **But, you should not need to worry about all this:** Many functions which accept a `Key` +/// accept an `Into`, which means all of the above types can be passed directly to those +/// functions. +#[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] +pub struct Key(Vec); impl From> for Key { fn from(v: Vec) -> Self { @@ -87,6 +73,14 @@ impl From for Key { } } +impl<'a> From<&'static str> for Key { + fn from(v: &'static str) -> Key { + let mut vec = Vec::new(); + vec.extend_from_slice(v.as_bytes()); + Key(vec) + } +} + impl AsRef for Key { fn as_ref(&self) -> &Self { self @@ -101,6 +95,50 @@ impl Deref for Key { } } +impl Key { + pub fn new(value: Vec) -> Self { + Key(value) + } +} + +/// The value part of a key/value pair. +/// +/// In TiKV, values are an ordered sequence of bytes. This has an advantage over choosing `String` as valid `UTF-8` is not required. This means that the user is permitted to store any data they wish, +/// as long as it can be represented by bytes. (Which is to say, pretty much anything!) +/// +/// This is a *wrapper type* that implements `Deref>` so it can be used like one transparently. +/// +/// This type also implements `From` for many types. With one exception, these are all done without reallocation. Using a `&'static str`, like many examples do for simplicity, has an internal +/// allocation cost. +/// +/// This type wraps around an owned value, so it should be treated it like `String` or `Vec` +/// over a `&str` or `&[u8]`. +/// +/// ```rust +/// use tikv_client::Value; +/// +/// let static_str: &'static str = "TiKV"; +/// let from_static_str = Value::from(static_str); +/// +/// let string: String = String::from(static_str); +/// let from_string = Value::from(string); +/// assert_eq!(from_static_str, from_string); +/// +/// let vec: Vec = static_str.as_bytes().to_vec(); +/// let from_vec = Value::from(vec); +/// assert_eq!(from_static_str, from_vec); +/// +/// let bytes = static_str.as_bytes().to_vec(); +/// let from_bytes = Value::from(bytes); +/// assert_eq!(from_static_str, from_bytes); +/// ``` +/// +/// **But, you should not need to worry about all this:** Many functions which accept a `Value` +/// accept an `Into`, which means all of the above types can be passed directly to those +/// functions. +#[derive(Default, Clone, Eq, PartialEq, Hash, Debug)] +pub struct Value(Vec); + impl From> for Value { fn from(v: Vec) -> Self { Value(v) @@ -113,6 +151,14 @@ impl From for Value { } } +impl From<&'static str> for Value { + fn from(v: &'static str) -> Value { + let mut vec = Vec::new(); + vec.extend_from_slice(v.as_bytes()); + Value(vec) + } +} + impl Deref for Value { type Target = Vec; @@ -121,23 +167,64 @@ impl Deref for Value { } } +/// A key/value pair. +/// +/// ```rust +/// # use tikv_client::{Key, Value, KvPair}; +/// let key = "key"; +/// let value = "value"; +/// let constructed = KvPair::new(key, value); +/// let from_tuple = KvPair::from((key, value)); +/// assert_eq!(constructed, from_tuple); +/// ``` +/// +/// **But, you should not need to worry about all this:** Many functions which accept a `KvPair` +/// accept an `Into`, which means all of the above types can be passed directly to those +/// functions. +#[derive(Default, Clone, Eq, PartialEq, Debug)] +pub struct KvPair(Key, Value); + impl KvPair { + /// Create a new `KvPair`. pub fn new(key: impl Into, value: impl Into) -> Self { KvPair(key.into(), value.into()) } + /// Immutably borrow the `Key` part of the `KvPair`. pub fn key(&self) -> &Key { &self.0 } + /// Immutably borrow the `Value` part of the `KvPair`. pub fn value(&self) -> &Value { &self.1 } + + /// Mutably borrow the `Key` part of the `KvPair`. + pub fn key_mut(&mut self) -> &mut Key { + &mut self.0 + } + + /// Mutably borrow the `Value` part of the `KvPair`. + pub fn value_mut(&mut self) -> &mut Value { + &mut self.1 + } + + /// Set the `Key` part of the `KvPair`. + pub fn set_key(&mut self, k: impl Into) { + self.0 = k.into(); + } + + /// Set the `Value` part of the `KvPair`. + pub fn set_value(&mut self, v: impl Into) { + self.1 = v.into(); + } } -impl From<(Key, Value)> for KvPair { - fn from((k, v): (Key, Value)) -> KvPair { - KvPair(k, v) +impl From<(K, V)> for KvPair +where K: Into, V: Into { + fn from((k, v): (K, V)) -> Self { + KvPair(k.into(), v.into()) } } @@ -193,3 +280,15 @@ impl Config { self } } + +fn transmute_bound(b: Bound<&K>) -> Bound +where + K: Into + Clone, +{ + use std::ops::Bound::*; + match b { + Included(k) => Included(k.clone().into()), + Excluded(k) => Excluded(k.clone().into()), + Unbounded => Unbounded, + } +} diff --git a/src/raw.rs b/src/raw.rs index 9985a909..f22d2908 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -11,9 +11,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{Config, Error, Key, KvPair, Value}; +/*! Raw related functionality. + +Using the [`raw::Client`](struct.Client.html) you can utilize TiKV's raw interface. + +This interface offers optimal performance as it does not require coordination with a timestamp +oracle, while the transactional interface does. + +**Warning:** It is not advisible to use the both raw and transactional functionality in the same keyspace. + */ + +use crate::{transmute_bound, Config, Error, Key, KvPair, Value}; use futures::{Future, Poll}; -use std::ops::RangeBounds; +use std::{u32, ops::{RangeBounds, Bound}}; /// A [`ColumnFamily`](struct.ColumnFamily.html) is an optional parameter for [`raw::Client`](struct.Client.html) requests. /// @@ -186,6 +196,19 @@ impl<'a> Future for BatchPut<'a> { } } +/// An unresolved delete request. +/// +/// Once resolved this request will result in the deletion of the given key. +/// +/// ```rust,no_run +/// use tikv_client::{Config, raw::Client}; +/// use futures::Future; +/// let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); +/// let connected_client = connecting_client.wait().unwrap(); +/// let key = "TiKV"; +/// let delete_req = connected_client.delete(key); +/// delete_req.wait(); +/// ``` pub struct Delete<'a> { client: &'a Client, key: Key, @@ -256,7 +279,7 @@ impl<'a> Future for BatchDelete<'a> { pub struct Scan<'a> { client: &'a Client, - range: (Key, Key), + range: (Bound, Bound), limit: u32, key_only: bool, cf: Option, @@ -264,7 +287,7 @@ pub struct Scan<'a> { } impl<'a> Scan<'a> { - fn new(client: &'a Client, range: (Key, Key), limit: u32) -> Self { + fn new(client: &'a Client, range: (Bound, Bound), limit: u32) -> Self { Scan { client, range, @@ -308,7 +331,7 @@ impl<'a> Future for Scan<'a> { pub struct BatchScan<'a> { client: &'a Client, - ranges: Vec<(Key, Key)>, + ranges: Vec<(Bound, Bound)>, each_limit: u32, key_only: bool, cf: Option, @@ -316,7 +339,7 @@ pub struct BatchScan<'a> { } impl<'a> BatchScan<'a> { - fn new(client: &'a Client, ranges: Vec<(Key, Key)>, each_limit: u32) -> Self { + fn new(client: &'a Client, ranges: Vec<(Bound, Bound)>, each_limit: u32) -> Self { BatchScan { client, ranges, @@ -360,12 +383,12 @@ impl<'a> Future for BatchScan<'a> { pub struct DeleteRange<'a> { client: &'a Client, - range: (Key, Key), + range: (Bound, Bound), cf: Option, } impl<'a> DeleteRange<'a> { - fn new(client: &'a Client, range: (Key, Key)) -> Self { + fn new(client: &'a Client, range: (Bound, Bound)) -> Self { DeleteRange { client, range, @@ -424,74 +447,208 @@ impl Future for Connect { pub struct Client; impl Client { - #![cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves. /// /// ```rust,no_run - /// # use tikv_client::{Config, raw::{Client, Connect}}; - /// # use futures::Future; + /// use tikv_client::{Config, raw::{Client, Connect}}; + /// use futures::Future; /// let connect = Client::new(&Config::default()); /// let client = connect.wait(); /// ``` + #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] pub fn new(config: &Config) -> Connect { Connect::new(config.clone()) } /// Create a new [`Get`](struct.Get.html) request. - pub fn get(&self, key: impl AsRef) -> Get { - Get::new(self, key.as_ref().clone()) + /// + /// Once resolved this request will result in the fetching of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let req = connected_client.get(key); + /// let result: Value = req.wait().unwrap(); + /// ``` + pub fn get(&self, key: impl Into) -> Get { + Get::new(self, key.into()) } /// Create a new [`BatchGet`](struct.BatchGet.html) request. - pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { - BatchGet::new(self, keys.as_ref().to_vec()) + /// + /// Once resolved this request will result in the fetching of the values associated with the given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let keys = vec!["TiKV", "TiDB"]; + /// let req = connected_client.batch_get(keys); + /// let result: Vec = req.wait().unwrap(); + /// ``` + pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { + BatchGet::new(self, keys.into_iter().map(|v| v.into()).collect()) } /// Create a new [`Put`](struct.Put.html) request. + /// + /// Once resolved this request will result in the setting of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let val = "TiKV"; + /// let req = connected_client.put(key, val); + /// let result: () = req.wait().unwrap(); + /// ``` pub fn put(&self, key: impl Into, value: impl Into) -> Put { Put::new(self, key.into(), value.into()) } /// Create a new [`BatchPut`](struct.BatchPut.html) request. + /// + /// Once resolved this request will result in the setting of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Error, Result, KvPair, Key, Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let kvpair1 = ("PD", "Go"); + /// let kvpair2 = ("TiKV", "Rust"); + /// let iterable = vec![kvpair1, kvpair2]; + /// let req = connected_client.batch_put(iterable); + /// let result: () = req.wait().unwrap(); + /// ``` pub fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { BatchPut::new(self, pairs.into_iter().map(Into::into).collect()) } /// Create a new [`Delete`](struct.Delete.html) request. - pub fn delete(&self, key: impl AsRef) -> Delete { - Delete::new(self, key.as_ref().clone()) + /// + /// Once resolved this request will result in the deletion of the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let req = connected_client.delete(key); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn delete(&self, key: impl Into) -> Delete { + Delete::new(self, key.into()) } /// Create a new [`BatchDelete`](struct.BatchDelete.html) request. - pub fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete { - BatchDelete::new(self, keys.as_ref().to_vec()) + /// + /// Once resolved this request will result in the deletion of the given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let keys = vec!["TiKV", "TiDB"]; + /// let req = connected_client.batch_delete(keys); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn batch_delete(&self, keys: impl IntoIterator>) -> BatchDelete { + BatchDelete::new(self, keys.into_iter().map(Into::into).collect()) } /// Create a new [`Scan`](struct.Scan.html) request. - pub fn scan(&self, range: impl RangeBounds, limit: u32) -> Scan { - Scan::new(self, Self::extract_range(&range), limit) + /// + /// Once resolved this request will result in a scanner over the given keys. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range = "TiKV"..="TiDB"; + /// let req = connected_client.scan(inclusive_range, 2); + /// let result: Vec = req.wait().unwrap(); + /// ``` + pub fn scan(&self, range: impl RangeBounds, limit: impl Into>) -> Scan + where + K: Into + Clone, + { + Scan::new(self, + (transmute_bound(range.start_bound()), transmute_bound(range.end_bound())), + limit.into().unwrap_or(u32::MAX) + ) } /// Create a new [`BatchScan`](struct.BatchScan.html) request. - pub fn batch_scan(&self, ranges: Ranges, each_limit: u32) -> BatchScan + /// + /// Once resolved this request will result in a set of scanners over the given keys. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range1 = "TiDB"..="TiKV"; + /// let inclusive_range2 = "TiKV"..="TiSpark"; + /// let iterable = vec![inclusive_range1, inclusive_range2]; + /// let req = connected_client.batch_scan(iterable, 2); + /// let result = req.wait(); + /// ``` + pub fn batch_scan( + &self, + ranges: impl IntoIterator>, + each_limit: impl Into>, + ) -> BatchScan where - Ranges: AsRef<[Bounds]>, - Bounds: RangeBounds, + K: Into + Clone, { BatchScan::new( self, - ranges.as_ref().iter().map(Self::extract_range).collect(), - each_limit, + ranges.into_iter().map(|v| + (transmute_bound(v.start_bound()), transmute_bound(v.end_bound())) + ).collect(), + each_limit.into().unwrap_or(u32::MAX), ) } /// Create a new [`DeleteRange`](struct.DeleteRange.html) request. - pub fn delete_range(&self, range: impl RangeBounds) -> DeleteRange { - DeleteRange::new(self, Self::extract_range(&range)) - } - - // Returns the bounds for a given [`RangeBounds`](struct.RangeBounds.html). - fn extract_range(_range: &impl RangeBounds) -> (Key, Key) { - unimplemented!() + /// + /// Once resolved this request will result in the deletion of all keys over the given range. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range = "TiKV"..="TiDB"; + /// let req = connected_client.delete_range(inclusive_range); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn delete_range(&self, range: impl RangeBounds) -> DeleteRange + where + K: Into + Clone, + { + DeleteRange::new( + self, + ( + transmute_bound(range.start_bound()), + transmute_bound(range.end_bound()), + ), + ) } } diff --git a/src/transaction.rs b/src/transaction.rs index 2b1c97ca..8cfc39f3 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -11,9 +11,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +/*! Transactional related functionality. + +Using the [`transaction::Client`](struct.Client.html) you can utilize TiKV's transactional interface. + +This interface offers SQL-like transactions on top of the raw interface. + +**Warning:** It is not advisible to use the both raw and transactional functionality in the same keyspace. + */ use crate::{Config, Error, Key, KvPair, Value}; use futures::{Future, Poll, Stream}; -use std::ops::{RangeBounds, Deref}; +use std::ops::RangeBounds; /// A logical timestamp produced by PD. #[derive(Copy, Clone)] From 9034c02646963160752c900ae5f75f196bdffa4f Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Fri, 7 Dec 2018 10:04:36 -0800 Subject: [PATCH 7/9] Document various parts of the raw/transaction interface. --- examples/raw.rs | 67 ++++-- src/lib.rs | 28 ++- src/raw.rs | 557 ++++++++++++++++++++++++--------------------- src/transaction.rs | 68 +++++- 4 files changed, 432 insertions(+), 288 deletions(-) diff --git a/examples/raw.rs b/examples/raw.rs index 403c1e79..0be637a9 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -13,36 +13,56 @@ use futures::future::Future; use std::path::PathBuf; -use tikv_client::*; +use tikv_client::{Result, Config, raw::Client, Key, Value}; -fn main() { - let config = Config::new(vec!["127.0.0.1:3379"]).with_security( +const KEY: &str = "TiKV"; +const VALUE: &str = "Rust"; +const CUSTOM_CF: &str = "custom_cf"; + +fn main() -> Result<()> { + // Create a configuration to use for the example. + // Optionally encrypt the traffic. + let config = Config::new(vec![ + "192.168.0.101:3379", // Avoid a single point of failure, + "192.168.0.100:3379", // use at least two PD endpoints. + ]).with_security( PathBuf::from("/path/to/ca.pem"), PathBuf::from("/path/to/client.pem"), PathBuf::from("/path/to/client-key.pem"), ); - let raw = raw::Client::new(&config) - .wait() - .expect("Could not connect to tikv"); - let key: Key = b"Company".to_vec().into(); - let value: Value = b"PingCAP".to_vec().into(); + // When we first create a client we recieve a `Connect` structure which must be resolved before + // the client is actually connected and usable. + let unconnnected_client = Client::new(&config); + let client = unconnnected_client.wait()?; - raw.put(key.clone(), value.clone()) - .cf("test_cf") - .wait() - .expect("Could not put kv pair to tikv"); - println!("Successfully put {:?}:{:?} to tikv", key, value); + // Requests are created from the connected client. These calls return structures which + // implement `Future`. This means the `Future` must be resolved before the action ever takes + // place. + // + // Here we set the key `TiKV` to have the value `Rust` associated with it. + let put_request = client.put(KEY, VALUE); + let put_result: () = put_request.wait()?; // Returns a `tikv_client::Error` on failure. + println!("Put key \"{}\", value \"{}\".", KEY, VALUE); - let value = raw - .get(&key) - .cf("test_cf") - .wait() - .expect("Could not get value"); - println!("Found val: {:?} for key: {:?}", value, key); + // + // Unlike a standard Rust HashMap all calls take owned values. This is because under the hood + // protobufs must take ownership of the data. If we only took a borrow we'd need to internally // clone it. This is against Rust API guidelines, so you must manage this yourself. + // + // Above, you saw we can use a `&'static str`, this is primarily for making examples short. + // This type is practical to use for real things, and usage forces an internal copy. + // + // It is best to pass a `Vec` in terms of explictness and speed. `String`s and a few other + // types are supported as well, but it all ends up as `Vec` in the end. + let key: String = String::from(KEY); + let value: Value = client.get(key).wait()?; + assert_eq!(value.as_ref(), VALUE); + println!("Get key \"{:?}\" returned value \"{:?}\".", value, KEY); - raw.delete(&key) - .cf("test_cf") + // You can also set the `ColumnFamily` used by the request. + // This is *advanced usage* and should have some special considerations. + let req = raw.delete(&key) + .cf(CUSTOM_CF) .wait() .expect("Could not delete value"); println!("Key: {:?} deleted", key); @@ -70,9 +90,12 @@ fn main() { .expect("Could not scan"); let ranges = [&start..&end, &start..&end]; - raw.batch_scan(&ranges, 10) + raw.batch_scan(ranges, 10) .cf("test_cf") .key_only() .wait() .expect("Could not batch scan"); + + // Cleanly exit. + Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index f67b3bb2..057052d4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -228,24 +228,32 @@ where K: Into, V: Into { } } -/// The configuration of either a [`raw::Client`](raw/struct.Client.html) or a [`transaction::Client`](transaction/struct.Client.html). +/// The configuration for either a [`raw::Client`](raw/struct.Client.html) or a +/// [`transaction::Client`](transaction/struct.Client.html). /// -/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster the endpoints for PD must be provided, **not** the TiKV nodes. +/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for PD +/// must be provided, **not** the TiKV nodes. /// /// It's important to **include more than one PD endpoint** (include all, if possible!) This helps avoid having a *single point of failure*. +/// +/// By default, this client will use an insecure connection over encryption-on-the-wire. Your +/// deployment may have chosen to rely on security measures such as a private network, or a VPN +/// layer providing secure transmission. TiKV does not currently offer encryption-at-rest. #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] #[serde(default)] #[serde(rename_all = "kebab-case")] pub struct Config { - pub pd_endpoints: Vec, - pub ca_path: Option, - pub cert_path: Option, - pub key_path: Option, + pd_endpoints: Vec, + ca_path: Option, + cert_path: Option, + key_path: Option, } impl Config { /// Create a new [`Config`](struct.Config.html) which coordinates with the given PD endpoints. /// + /// It's important to **include more than one PD endpoint** (include all, if possible!) This helps avoid having a *single point of failure*. + /// /// ```rust /// # use tikv_client::Config; /// let config = Config::new(vec!["192.168.0.100:2379", "192.168.0.101:2379"]); @@ -281,6 +289,14 @@ impl Config { } } +/// Unfortunately due to the API limitations of `RangeBound` we can only ever get an `&` (not a +/// `&mut` or owned value) of a `Bound`. So in order to coherce the value into the bound we want +/// we must clone the data. +/// +/// The only way to avoid this would be to only accept two `Bound` arguments, not one +/// `RangeBound` argument. This would mean `"abc"..="xyz"` would not be possible. +/// +// TODO: If you feel tricky please try to remove this clone! fn transmute_bound(b: Bound<&K>) -> Bound where K: Into + Clone, diff --git a/src/raw.rs b/src/raw.rs index f22d2908..a871569d 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -23,7 +23,259 @@ oracle, while the transactional interface does. use crate::{transmute_bound, Config, Error, Key, KvPair, Value}; use futures::{Future, Poll}; -use std::{u32, ops::{RangeBounds, Bound}}; +use std::{u32, ops::{Deref, RangeBounds, Bound}}; + +/// The TiKV raw [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. +pub struct Client; + +impl Client { + /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves. + /// + /// ```rust,no_run + /// use tikv_client::{Config, raw::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait(); + /// ``` + #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] + pub fn new(config: &Config) -> Connect { + Connect::new(config.clone()) + } + + /// Create a new [`Get`](struct.Get.html) request. + /// + /// Once resolved this request will result in the fetching of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let req = connected_client.get(key); + /// let result: Value = req.wait().unwrap(); + /// ``` + pub fn get(&self, key: impl Into) -> Get { + Get::new(self, key.into()) + } + + /// Create a new [`BatchGet`](struct.BatchGet.html) request. + /// + /// Once resolved this request will result in the fetching of the values associated with the given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let keys = vec!["TiKV", "TiDB"]; + /// let req = connected_client.batch_get(keys); + /// let result: Vec = req.wait().unwrap(); + /// ``` + pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { + BatchGet::new(self, keys.into_iter().map(|v| v.into()).collect()) + } + + /// Create a new [`Put`](struct.Put.html) request. + /// + /// Once resolved this request will result in the setting of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let val = "TiKV"; + /// let req = connected_client.put(key, val); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn put(&self, key: impl Into, value: impl Into) -> Put { + Put::new(self, key.into(), value.into()) + } + + /// Create a new [`BatchPut`](struct.BatchPut.html) request. + /// + /// Once resolved this request will result in the setting of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Error, Result, KvPair, Key, Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let kvpair1 = ("PD", "Go"); + /// let kvpair2 = ("TiKV", "Rust"); + /// let iterable = vec![kvpair1, kvpair2]; + /// let req = connected_client.batch_put(iterable); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { + BatchPut::new(self, pairs.into_iter().map(Into::into).collect()) + } + + /// Create a new [`Delete`](struct.Delete.html) request. + /// + /// Once resolved this request will result in the deletion of the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let req = connected_client.delete(key); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn delete(&self, key: impl Into) -> Delete { + Delete::new(self, key.into()) + } + + /// Create a new [`BatchDelete`](struct.BatchDelete.html) request. + /// + /// Once resolved this request will result in the deletion of the given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let keys = vec!["TiKV", "TiDB"]; + /// let req = connected_client.batch_delete(keys); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn batch_delete(&self, keys: impl IntoIterator>) -> BatchDelete { + BatchDelete::new(self, keys.into_iter().map(Into::into).collect()) + } + + /// Create a new [`Scan`](struct.Scan.html) request. + /// + /// Once resolved this request will result in a scanner over the given keys. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range = "TiKV"..="TiDB"; + /// let req = connected_client.scan(inclusive_range, 2); + /// let result: Vec = req.wait().unwrap(); + /// ``` + pub fn scan(&self, range: impl RangeBounds, limit: impl Into>) -> Scan + where + K: Into + Clone, + { + Scan::new( + self, + ( + transmute_bound(range.start_bound()), + transmute_bound(range.end_bound()), + ), + limit.into().unwrap_or(u32::MAX), + ) + } + + /// Create a new [`BatchScan`](struct.BatchScan.html) request. + /// + /// Once resolved this request will result in a set of scanners over the given keys. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range1 = "TiDB"..="TiKV"; + /// let inclusive_range2 = "TiKV"..="TiSpark"; + /// let iterable = vec![inclusive_range1, inclusive_range2]; + /// let req = connected_client.batch_scan(iterable, 2); + /// let result = req.wait(); + /// ``` + pub fn batch_scan( + &self, + ranges: impl IntoIterator>, + each_limit: impl Into>, + ) -> BatchScan + where + K: Into + Clone, + { + BatchScan::new( + self, + ranges + .into_iter() + .map(|v| { + ( + transmute_bound(v.start_bound()), + transmute_bound(v.end_bound()), + ) + }) + .collect(), + each_limit.into().unwrap_or(u32::MAX), + ) + } + + /// Create a new [`DeleteRange`](struct.DeleteRange.html) request. + /// + /// Once resolved this request will result in the deletion of all keys over the given range. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range = "TiKV"..="TiDB"; + /// let req = connected_client.delete_range(inclusive_range); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn delete_range(&self, range: impl RangeBounds) -> DeleteRange + where + K: Into + Clone, + { + DeleteRange::new( + self, + ( + transmute_bound(range.start_bound()), + transmute_bound(range.end_bound()), + ), + ) + } +} + +/// An unresolved [`Client`](struct.Client.html) connection to a TiKV cluster. +/// +/// Once resolved it will result in a connected [`Client`](struct.Client.html). +/// +/// ```rust,no_run +/// use tikv_client::{Config, raw::{Client, Connect}}; +/// use futures::Future; +/// +/// let connect: Connect = Client::new(&Config::default()); +/// let client: Client = connect.wait().unwrap(); +/// ``` +pub struct Connect { + config: Config, +} + +impl Connect { + fn new(config: Config) -> Self { + Connect { config } + } +} + +impl Future for Connect { + type Item = Client; + type Error = Error; + + fn poll(&mut self) -> Poll { + let _config = &self.config; + unimplemented!() + } +} + /// A [`ColumnFamily`](struct.ColumnFamily.html) is an optional parameter for [`raw::Client`](struct.Client.html) requests. /// @@ -41,10 +293,17 @@ use std::{u32, ops::{RangeBounds, Bound}}; /// /// ```rust /// # use tikv_client::raw::ColumnFamily; +/// /// let cf = ColumnFamily::from("write"); /// let cf = ColumnFamily::from(String::from("write")); /// let cf = ColumnFamily::from(&String::from("write")); /// ``` +/// +/// This is a *wrapper type* that implements `Deref` so it can be used like one transparently. +/// +/// **But, you should not need to worry about all this:** Many functions which accept a +/// `ColumnFamily` accept an `Into`, which means all of the above types can be passed +/// directly to those functions. #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct ColumnFamily(String); @@ -57,7 +316,16 @@ where } } -/// A raw request to get the [`Value`](struct.Value.html) of a given [`Key`](struct.key.html). +impl Deref for ColumnFamily { + type Target = String; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// An unresolved [`Client::get`](struct.Client.html#method.get) request. +/// +/// Once resolved this request will result in the fetching of the value associated with the given key. pub struct Get<'a> { client: &'a Client, key: Key, @@ -92,6 +360,9 @@ impl<'a> Future for Get<'a> { } } +/// An unresolved [`Client::batch_get`](struct.Client.html#method.batch_get) request. +/// +/// Once resolved this request will result in the fetching of the values associated with the given keys. pub struct BatchGet<'a> { client: &'a Client, keys: Vec, @@ -115,7 +386,7 @@ impl<'a> BatchGet<'a> { impl<'a> Future for BatchGet<'a> { type Item = Vec; - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -125,6 +396,9 @@ impl<'a> Future for BatchGet<'a> { } } +/// An unresolved [`Client::put`](struct.Client.html#method.put) request. +/// +/// Once resolved this request will result in the setting of the value associated with the given key. pub struct Put<'a> { client: &'a Client, key: Key, @@ -151,7 +425,7 @@ impl<'a> Put<'a> { impl<'a> Future for Put<'a> { type Item = (); - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -162,6 +436,9 @@ impl<'a> Future for Put<'a> { } } +/// An unresolved [`Client::batch_put`](struct.Client.html#method.batch_put) request. +/// +/// Once resolved this request will result in the setting of the value associated with the given key. pub struct BatchPut<'a> { client: &'a Client, pairs: Vec, @@ -186,7 +463,7 @@ impl<'a> BatchPut<'a> { impl<'a> Future for BatchPut<'a> { type Item = (); - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -196,19 +473,9 @@ impl<'a> Future for BatchPut<'a> { } } -/// An unresolved delete request. +/// An unresolved [`Client::delete`](struct.Client.html#method.delete) request. /// /// Once resolved this request will result in the deletion of the given key. -/// -/// ```rust,no_run -/// use tikv_client::{Config, raw::Client}; -/// use futures::Future; -/// let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); -/// let connected_client = connecting_client.wait().unwrap(); -/// let key = "TiKV"; -/// let delete_req = connected_client.delete(key); -/// delete_req.wait(); -/// ``` pub struct Delete<'a> { client: &'a Client, key: Key, @@ -233,7 +500,7 @@ impl<'a> Delete<'a> { impl<'a> Future for Delete<'a> { type Item = (); - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -243,6 +510,7 @@ impl<'a> Future for Delete<'a> { } } +/// An unresolved [`Client::batch_delete`](struct.Client.html#method.batch_delete) request. pub struct BatchDelete<'a> { client: &'a Client, keys: Vec, @@ -267,7 +535,7 @@ impl<'a> BatchDelete<'a> { impl<'a> Future for BatchDelete<'a> { type Item = (); - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -277,6 +545,9 @@ impl<'a> Future for BatchDelete<'a> { } } +/// An unresolved [`Client::scan`](struct.Client.html#method.scan) request. +/// +/// Once resolved this request will result in a scanner over the given keys. pub struct Scan<'a> { client: &'a Client, range: (Bound, Bound), @@ -317,7 +588,7 @@ impl<'a> Scan<'a> { impl<'a> Future for Scan<'a> { type Item = Vec; - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -329,6 +600,9 @@ impl<'a> Future for Scan<'a> { } } +/// An unresolved [`Client::batch_scan`](struct.Client.html#method.batch_scan) request. +/// +/// Once resolved this request will result in a set of scanners over the given keys. pub struct BatchScan<'a> { client: &'a Client, ranges: Vec<(Bound, Bound)>, @@ -369,7 +643,7 @@ impl<'a> BatchScan<'a> { impl<'a> Future for BatchScan<'a> { type Item = Vec; - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -381,6 +655,9 @@ impl<'a> Future for BatchScan<'a> { } } +/// An unresolved [`Client::delete_range`](struct.Client.html#method.delete_range) request. +/// +/// Once resolved this request will result in the deletion of all keys over the given range. pub struct DeleteRange<'a> { client: &'a Client, range: (Bound, Bound), @@ -405,7 +682,7 @@ impl<'a> DeleteRange<'a> { impl<'a> Future for DeleteRange<'a> { type Item = (); - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -414,241 +691,3 @@ impl<'a> Future for DeleteRange<'a> { unimplemented!() } } - -/// A future which resolves the initial connection between the [`Client`](struct.Client.html) and the TiKV cluster. -/// -/// ```rust,no_run -/// # use tikv_client::{Config, raw::{Client, Connect}}; -/// # use futures::Future; -/// let connect = Client::new(&Config::default()); -/// let client = connect.wait(); -/// ``` -pub struct Connect { - config: Config, -} - -impl Connect { - fn new(config: Config) -> Self { - Connect { config } - } -} - -impl Future for Connect { - type Item = Client; - type Error = Error; - - fn poll(&mut self) -> Poll { - let _config = &self.config; - unimplemented!() - } -} - -/// The TiKV raw [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. -pub struct Client; - -impl Client { - /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves. - /// - /// ```rust,no_run - /// use tikv_client::{Config, raw::{Client, Connect}}; - /// use futures::Future; - /// let connect = Client::new(&Config::default()); - /// let client = connect.wait(); - /// ``` - #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] - pub fn new(config: &Config) -> Connect { - Connect::new(config.clone()) - } - - /// Create a new [`Get`](struct.Get.html) request. - /// - /// Once resolved this request will result in the fetching of the value associated with the given key. - /// - /// ```rust,no_run - /// # use tikv_client::{Value, Config, raw::Client}; - /// # use futures::Future; - /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); - /// # let connected_client = connecting_client.wait().unwrap(); - /// let key = "TiKV"; - /// let req = connected_client.get(key); - /// let result: Value = req.wait().unwrap(); - /// ``` - pub fn get(&self, key: impl Into) -> Get { - Get::new(self, key.into()) - } - - /// Create a new [`BatchGet`](struct.BatchGet.html) request. - /// - /// Once resolved this request will result in the fetching of the values associated with the given keys. - /// - /// ```rust,no_run - /// # use tikv_client::{KvPair, Config, raw::Client}; - /// # use futures::Future; - /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); - /// # let connected_client = connecting_client.wait().unwrap(); - /// let keys = vec!["TiKV", "TiDB"]; - /// let req = connected_client.batch_get(keys); - /// let result: Vec = req.wait().unwrap(); - /// ``` - pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { - BatchGet::new(self, keys.into_iter().map(|v| v.into()).collect()) - } - - /// Create a new [`Put`](struct.Put.html) request. - /// - /// Once resolved this request will result in the setting of the value associated with the given key. - /// - /// ```rust,no_run - /// # use tikv_client::{Key, Value, Config, raw::Client}; - /// # use futures::Future; - /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); - /// # let connected_client = connecting_client.wait().unwrap(); - /// let key = "TiKV"; - /// let val = "TiKV"; - /// let req = connected_client.put(key, val); - /// let result: () = req.wait().unwrap(); - /// ``` - pub fn put(&self, key: impl Into, value: impl Into) -> Put { - Put::new(self, key.into(), value.into()) - } - - /// Create a new [`BatchPut`](struct.BatchPut.html) request. - /// - /// Once resolved this request will result in the setting of the value associated with the given key. - /// - /// ```rust,no_run - /// # use tikv_client::{Error, Result, KvPair, Key, Value, Config, raw::Client}; - /// # use futures::Future; - /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); - /// # let connected_client = connecting_client.wait().unwrap(); - /// let kvpair1 = ("PD", "Go"); - /// let kvpair2 = ("TiKV", "Rust"); - /// let iterable = vec![kvpair1, kvpair2]; - /// let req = connected_client.batch_put(iterable); - /// let result: () = req.wait().unwrap(); - /// ``` - pub fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { - BatchPut::new(self, pairs.into_iter().map(Into::into).collect()) - } - - /// Create a new [`Delete`](struct.Delete.html) request. - /// - /// Once resolved this request will result in the deletion of the given key. - /// - /// ```rust,no_run - /// # use tikv_client::{Key, Config, raw::Client}; - /// # use futures::Future; - /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); - /// # let connected_client = connecting_client.wait().unwrap(); - /// let key = "TiKV"; - /// let req = connected_client.delete(key); - /// let result: () = req.wait().unwrap(); - /// ``` - pub fn delete(&self, key: impl Into) -> Delete { - Delete::new(self, key.into()) - } - - /// Create a new [`BatchDelete`](struct.BatchDelete.html) request. - /// - /// Once resolved this request will result in the deletion of the given keys. - /// - /// ```rust,no_run - /// # use tikv_client::{Config, raw::Client}; - /// # use futures::Future; - /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); - /// # let connected_client = connecting_client.wait().unwrap(); - /// let keys = vec!["TiKV", "TiDB"]; - /// let req = connected_client.batch_delete(keys); - /// let result: () = req.wait().unwrap(); - /// ``` - pub fn batch_delete(&self, keys: impl IntoIterator>) -> BatchDelete { - BatchDelete::new(self, keys.into_iter().map(Into::into).collect()) - } - - /// Create a new [`Scan`](struct.Scan.html) request. - /// - /// Once resolved this request will result in a scanner over the given keys. - /// - /// If not passed a `limit` parameter, it will default to `u32::MAX`. - /// - /// ```rust,no_run - /// # use tikv_client::{KvPair, Config, raw::Client}; - /// # use futures::Future; - /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); - /// # let connected_client = connecting_client.wait().unwrap(); - /// let inclusive_range = "TiKV"..="TiDB"; - /// let req = connected_client.scan(inclusive_range, 2); - /// let result: Vec = req.wait().unwrap(); - /// ``` - pub fn scan(&self, range: impl RangeBounds, limit: impl Into>) -> Scan - where - K: Into + Clone, - { - Scan::new(self, - (transmute_bound(range.start_bound()), transmute_bound(range.end_bound())), - limit.into().unwrap_or(u32::MAX) - ) - } - - /// Create a new [`BatchScan`](struct.BatchScan.html) request. - /// - /// Once resolved this request will result in a set of scanners over the given keys. - /// - /// If not passed a `limit` parameter, it will default to `u32::MAX`. - /// - /// ```rust,no_run - /// # use tikv_client::{Key, Config, raw::Client}; - /// # use futures::Future; - /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); - /// # let connected_client = connecting_client.wait().unwrap(); - /// let inclusive_range1 = "TiDB"..="TiKV"; - /// let inclusive_range2 = "TiKV"..="TiSpark"; - /// let iterable = vec![inclusive_range1, inclusive_range2]; - /// let req = connected_client.batch_scan(iterable, 2); - /// let result = req.wait(); - /// ``` - pub fn batch_scan( - &self, - ranges: impl IntoIterator>, - each_limit: impl Into>, - ) -> BatchScan - where - K: Into + Clone, - { - BatchScan::new( - self, - ranges.into_iter().map(|v| - (transmute_bound(v.start_bound()), transmute_bound(v.end_bound())) - ).collect(), - each_limit.into().unwrap_or(u32::MAX), - ) - } - - /// Create a new [`DeleteRange`](struct.DeleteRange.html) request. - /// - /// Once resolved this request will result in the deletion of all keys over the given range. - /// - /// If not passed a `limit` parameter, it will default to `u32::MAX`. - /// - /// ```rust,no_run - /// # use tikv_client::{Key, Config, raw::Client}; - /// # use futures::Future; - /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); - /// # let connected_client = connecting_client.wait().unwrap(); - /// let inclusive_range = "TiKV"..="TiDB"; - /// let req = connected_client.delete_range(inclusive_range); - /// let result: () = req.wait().unwrap(); - /// ``` - pub fn delete_range(&self, range: impl RangeBounds) -> DeleteRange - where - K: Into + Clone, - { - DeleteRange::new( - self, - ( - transmute_bound(range.start_bound()), - transmute_bound(range.end_bound()), - ), - ) - } -} diff --git a/src/transaction.rs b/src/transaction.rs index 8cfc39f3..24c115ef 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -286,6 +286,17 @@ impl Snapshot { } } +/// An unresolved [`Client`](struct.Client.html) connection to a TiKV cluster. +/// +/// Once resolved it will result in a connected [`Client`](struct.Client.html). +/// +/// ```rust,no_run +/// use tikv_client::{Config, transaction::{Client, Connect}}; +/// use futures::Future; +/// +/// let connect: Connect = Client::new(&Config::default()); +/// let client: Client = connect.wait().unwrap(); +/// ``` pub struct Connect { config: Config, } @@ -306,26 +317,81 @@ impl Future for Connect { } } +/// The TiKV transactional [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. pub struct Client {} impl Client { - #![cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] + /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves. + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait(); + /// ``` + #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] pub fn new(config: &Config) -> Connect { Connect::new(config.clone()) } + /// Create a new [`Transaction`](struct.Transaction.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp). + /// + /// Using the transaction you can issue commands like [`get`](struct.Transaction.html#method.get) or [`set`](file:///home/hoverbear/git/client-rust/target/doc/tikv_client/transaction/struct.Transaction.html#method.set). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let transaction = client.begin(); + /// // ... Issue some commands. + /// let commit = transaction.commit(); + /// let result: () = commit.wait().unwrap(); + /// ``` pub fn begin(&self) -> Transaction { unimplemented!() } + /// Create a new [`Transaction`](struct.Transaction.html) at the provded timestamp. + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let timestamp = client.current_timestamp(); + /// let transaction = client.begin_with_timestamp(timestamp); + /// // ... Issue some commands. + /// let commit = transaction.commit(); + /// let result: () = commit.wait().unwrap(); + /// ``` pub fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction { unimplemented!() } + /// Get a [`Snapshot`](struct.Snapshot.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let snapshot = client.snapshot(); + /// // ... Issue some commands. + /// ``` pub fn snapshot(&self) -> Snapshot { unimplemented!() } + /// Retrieve the current [`Timestamp`](struct.Timestamp.html). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let timestamp = client.current_timestamp(); + /// ``` pub fn current_timestamp(&self) -> Timestamp { unimplemented!() } From faeffb3a423d3b2e813bee7192b29dde8d7a7821 Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Thu, 27 Dec 2018 14:58:28 -0800 Subject: [PATCH 8/9] More docs, sticnarf feedback. --- src/errors.rs | 5 +- src/lib.rs | 121 ++++++++++++++++++++++++++++++++++++++++++--- src/raw.rs | 50 ++++--------------- src/transaction.rs | 48 ++++++++++++++++++ 4 files changed, 176 insertions(+), 48 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index e2da0547..4ac32d41 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -16,7 +16,7 @@ use quick_error::quick_error; use std::{error, result}; quick_error!{ - /// An error originating from Raft or dependencies. + /// An error originating from the TiKV client or dependencies. /// /// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/) /// for errors. *This may change in future versions.* @@ -42,7 +42,7 @@ quick_error!{ } /// An unknown error. /// - /// Generally, this is not an expected error. + /// Generally, this is not an expected error. Please report it if encountered. Other(err: Box) { from() cause(err.as_ref()) @@ -92,4 +92,5 @@ quick_error!{ } } +/// A result holding an [`Error`](enum.Error.html). pub type Result = result::Result; diff --git a/src/lib.rs b/src/lib.rs index 057052d4..563104c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,10 @@ // limitations under the License. use serde_derive::*; -use std::{ops::{Deref, Bound}, path::PathBuf}; +use std::{ + ops::{Deref, Bound, Range, RangeFrom, RangeFull, RangeInclusive, RangeTo, RangeToInclusive}, + path::PathBuf, +}; mod errors; pub mod raw; @@ -289,6 +292,113 @@ impl Config { } } +/// A convenience trait for expressing ranges. +/// +/// In TiKV, keys are an ordered sequence of bytes. This means we can have ranges over those +/// bytes. Eg `001` is before `010`. +/// +/// This trait has implementations for common range types like `a..b`, `a..=b` where `a` and `b` +/// `impl Into`. You could implement this for your own types. +/// +/// ```rust +/// use tikv_client::{KeyRange, Key}; +/// use std::ops::{Range, RangeInclusive, RangeTo, RangeToInclusive, RangeFrom, RangeFull, Bound}; +/// +/// let explict_range: Range = Range { start: Key::from("Rust"), end: Key::from("TiKV") }; +/// let from_explict_range = explict_range.into_bounds(); +/// +/// let range: Range<&str> = "Rust".."TiKV"; +/// let from_range = range.into_bounds(); +/// assert_eq!(from_explict_range, from_range); +/// +/// let range: RangeInclusive<&str> = "Rust"..="TiKV"; +/// let from_range = range.into_bounds(); +/// assert_eq!( +/// (Bound::Included(Key::from("Rust")), Bound::Included(Key::from("TiKV"))), +/// from_range +/// ); +/// +/// let range_from: RangeFrom<&str> = "Rust"..; +/// let from_range_from = range_from.into_bounds(); +/// assert_eq!( +/// (Bound::Included(Key::from("Rust")), Bound::Unbounded), +/// from_range_from, +/// ); +/// +/// let range_to: RangeTo<&str> = .."TiKV"; +/// let from_range_to = range_to.into_bounds(); +/// assert_eq!( +/// (Bound::Unbounded, Bound::Excluded(Key::from("TiKV"))), +/// from_range_to, +/// ); +/// +/// let range_to_inclusive: RangeToInclusive<&str> = ..="TiKV"; +/// let from_range_to_inclusive = range_to_inclusive.into_bounds(); +/// assert_eq!( +/// (Bound::Unbounded, Bound::Included(Key::from("TiKV"))), +/// from_range_to_inclusive, +/// ); +/// +/// let range_full: RangeFull = ..; +/// let from_range_full = range_full.into_bounds(); +/// assert_eq!( +/// (Bound::Unbounded, Bound::Unbounded), +/// from_range_full +/// ); +/// ``` +/// +/// **But, you should not need to worry about all this:** Many functions accept a `impl KeyRange` +/// which means all of the above types can be passed directly to those functions. +pub trait KeyRange { + fn into_bounds(self) -> (Bound, Bound); +} + +impl> KeyRange for Range { + fn into_bounds(self) -> (Bound, Bound) { + ( + Bound::Included(self.start.into()), + Bound::Excluded(self.end.into()), + ) + } +} + +impl> KeyRange for RangeFrom { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Included(self.start.into()), Bound::Unbounded) + } +} + +impl KeyRange for RangeFull { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Unbounded, Bound::Unbounded) + } +} + +impl> KeyRange for RangeInclusive { + fn into_bounds(self) -> (Bound, Bound) { + let (start, end) = self.into_inner(); + (Bound::Included(start.into()), Bound::Included(end.into())) + } +} + +impl> KeyRange for RangeTo { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Unbounded, Bound::Excluded(self.end.into())) + } +} + +impl> KeyRange for RangeToInclusive { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Unbounded, Bound::Included(self.end.into())) + } +} + +impl> KeyRange for (Bound, Bound) { + fn into_bounds(self) -> (Bound, Bound) { + (transmute_bound(self.0), transmute_bound(self.1)) + } +} + /// Unfortunately due to the API limitations of `RangeBound` we can only ever get an `&` (not a /// `&mut` or owned value) of a `Bound`. So in order to coherce the value into the bound we want /// we must clone the data. @@ -296,15 +406,14 @@ impl Config { /// The only way to avoid this would be to only accept two `Bound` arguments, not one /// `RangeBound` argument. This would mean `"abc"..="xyz"` would not be possible. /// -// TODO: If you feel tricky please try to remove this clone! -fn transmute_bound(b: Bound<&K>) -> Bound +fn transmute_bound(b: Bound) -> Bound where - K: Into + Clone, + K: Into, { use std::ops::Bound::*; match b { - Included(k) => Included(k.clone().into()), - Excluded(k) => Excluded(k.clone().into()), + Included(k) => Included(k.into()), + Excluded(k) => Excluded(k.into()), Unbounded => Unbounded, } } diff --git a/src/raw.rs b/src/raw.rs index a871569d..730ee013 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -21,9 +21,9 @@ oracle, while the transactional interface does. **Warning:** It is not advisible to use the both raw and transactional functionality in the same keyspace. */ -use crate::{transmute_bound, Config, Error, Key, KvPair, Value}; +use crate::{Config, Error, Key, KeyRange, KvPair, Value}; use futures::{Future, Poll}; -use std::{u32, ops::{Deref, RangeBounds, Bound}}; +use std::{u32, ops::{Bound, Deref}}; /// The TiKV raw [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. pub struct Client; @@ -162,18 +162,8 @@ impl Client { /// let req = connected_client.scan(inclusive_range, 2); /// let result: Vec = req.wait().unwrap(); /// ``` - pub fn scan(&self, range: impl RangeBounds, limit: impl Into>) -> Scan - where - K: Into + Clone, - { - Scan::new( - self, - ( - transmute_bound(range.start_bound()), - transmute_bound(range.end_bound()), - ), - limit.into().unwrap_or(u32::MAX), - ) + pub fn scan(&self, range: impl KeyRange, limit: impl Into>) -> Scan { + Scan::new(self, range.into_bounds(), limit.into().unwrap_or(u32::MAX)) } /// Create a new [`BatchScan`](struct.BatchScan.html) request. @@ -193,25 +183,14 @@ impl Client { /// let req = connected_client.batch_scan(iterable, 2); /// let result = req.wait(); /// ``` - pub fn batch_scan( + pub fn batch_scan( &self, - ranges: impl IntoIterator>, + ranges: impl IntoIterator, each_limit: impl Into>, - ) -> BatchScan - where - K: Into + Clone, - { + ) -> BatchScan { BatchScan::new( self, - ranges - .into_iter() - .map(|v| { - ( - transmute_bound(v.start_bound()), - transmute_bound(v.end_bound()), - ) - }) - .collect(), + ranges.into_iter().map(KeyRange::into_bounds).collect(), each_limit.into().unwrap_or(u32::MAX), ) } @@ -231,17 +210,8 @@ impl Client { /// let req = connected_client.delete_range(inclusive_range); /// let result: () = req.wait().unwrap(); /// ``` - pub fn delete_range(&self, range: impl RangeBounds) -> DeleteRange - where - K: Into + Clone, - { - DeleteRange::new( - self, - ( - transmute_bound(range.start_bound()), - transmute_bound(range.end_bound()), - ), - ) + pub fn delete_range(&self, range: impl KeyRange) -> DeleteRange { + DeleteRange::new(self, range.into_bounds()) } } diff --git a/src/transaction.rs b/src/transaction.rs index 24c115ef..f61a6ebd 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -58,9 +58,34 @@ impl Stream for Scanner { } } +/// The isolation level guarantees provided by the transaction. #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum IsolationLevel { + /// Consistent reads and conflict free writes. + /// + /// Snapshot isolation guarantees: + /// * All reads will see the last committed value of the data at the snapshot timestamp. + /// * The transaction will only successfully commit if no updates to the data have created a + /// conflict with concurrent updates made sine the snapshot. + /// + /// Using this level means: + /// * Lost updates don't occur. + /// * Dirty reads don't occur. + /// * Non-repeatable reads don't occur. + /// * Phantom reads don't occur. SnapshotIsolation, + /// Reads may not be consistent, but writes are conflict free. + /// + /// Read committed guarantees: + /// * All reads are committed at the moment it is read. + /// not repeatable. + /// * Write locks are only released at the end of the transaction. + /// + /// Using this level means: + /// * Lost updates don't occur. + /// * Dirty reads don't occur. + /// * Non-repeatable reads may occur. + /// * Phantom reads may occur. ReadCommitted, } @@ -206,15 +231,32 @@ impl Future for Delete { } } +/// A undo-able set of actions on the dataset. +/// +/// Using a transaction you can prepare a set of actions (such as `get`, or `set`) on data at a +/// particular timestamp obtained from the placement driver. +/// +/// Once a transaction is commited, a new commit timestamp is obtained from the placement driver. pub struct Transaction { snapshot: Snapshot, } impl Transaction { + /// Create a new transaction operating on the given snapshot. + pub fn new(snapshot: Snapshot) -> Self { + Self { + snapshot + } + } + + /// Commit the actions of the transaction. + /// + /// Once committed, it is no longer possible to `rollback` the actions in the transaction. pub fn commit(self) -> Commit { Commit::new(self) } + /// Rollback the actions of the transaction. pub fn rollback(self) -> Rollback { Rollback::new(self) } @@ -227,14 +269,19 @@ impl Transaction { unimplemented!() } + /// Returns the timestamp which the transaction started at. + /// + /// pub fn start_ts(&self) -> Timestamp { unimplemented!() } + /// Get the `Snapshot` the transaction is operating on. pub fn snapshot(&self) -> Snapshot { unimplemented!() } + /// Set the isolation level of the transaction. pub fn set_isolation_level(&mut self, _level: IsolationLevel) { unimplemented!() } @@ -264,6 +311,7 @@ impl Transaction { } } +/// A snapshot of dataset at a particular point in time. pub struct Snapshot; impl Snapshot { From 2bbe7e116a6f6ca37d1d9918c066aea5b33c6dc9 Mon Sep 17 00:00:00 2001 From: Hoverbear Date: Thu, 27 Dec 2018 17:05:06 -0800 Subject: [PATCH 9/9] Document transaction module --- src/raw.rs | 19 +- src/transaction.rs | 591 ++++++++++++++++++++++++++++----------------- 2 files changed, 389 insertions(+), 221 deletions(-) diff --git a/src/raw.rs b/src/raw.rs index 730ee013..954bb8ae 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -44,7 +44,8 @@ impl Client { /// Create a new [`Get`](struct.Get.html) request. /// - /// Once resolved this request will result in the fetching of the value associated with the given key. + /// Once resolved this request will result in the fetching of the value associated with the + /// given key. /// /// ```rust,no_run /// # use tikv_client::{Value, Config, raw::Client}; @@ -61,7 +62,8 @@ impl Client { /// Create a new [`BatchGet`](struct.BatchGet.html) request. /// - /// Once resolved this request will result in the fetching of the values associated with the given keys. + /// Once resolved this request will result in the fetching of the values associated with the + /// given keys. /// /// ```rust,no_run /// # use tikv_client::{KvPair, Config, raw::Client}; @@ -295,7 +297,8 @@ impl Deref for ColumnFamily { /// An unresolved [`Client::get`](struct.Client.html#method.get) request. /// -/// Once resolved this request will result in the fetching of the value associated with the given key. +/// Once resolved this request will result in the fetching of the value associated with the given +/// key. pub struct Get<'a> { client: &'a Client, key: Key, @@ -332,7 +335,8 @@ impl<'a> Future for Get<'a> { /// An unresolved [`Client::batch_get`](struct.Client.html#method.batch_get) request. /// -/// Once resolved this request will result in the fetching of the values associated with the given keys. +/// Once resolved this request will result in the fetching of the values associated with the given +/// keys. pub struct BatchGet<'a> { client: &'a Client, keys: Vec, @@ -368,7 +372,8 @@ impl<'a> Future for BatchGet<'a> { /// An unresolved [`Client::put`](struct.Client.html#method.put) request. /// -/// Once resolved this request will result in the setting of the value associated with the given key. +/// Once resolved this request will result in the setting of the value associated with the given +/// key. pub struct Put<'a> { client: &'a Client, key: Key, @@ -481,6 +486,8 @@ impl<'a> Future for Delete<'a> { } /// An unresolved [`Client::batch_delete`](struct.Client.html#method.batch_delete) request. +/// +/// Once resolved this request will result in the deletion of the given keys. pub struct BatchDelete<'a> { client: &'a Client, keys: Vec, @@ -516,7 +523,7 @@ impl<'a> Future for BatchDelete<'a> { } /// An unresolved [`Client::scan`](struct.Client.html#method.scan) request. -/// +/// /// Once resolved this request will result in a scanner over the given keys. pub struct Scan<'a> { client: &'a Client, diff --git a/src/transaction.rs b/src/transaction.rs index f61a6ebd..2f97cf66 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -23,6 +23,117 @@ use crate::{Config, Error, Key, KvPair, Value}; use futures::{Future, Poll, Stream}; use std::ops::RangeBounds; +/// The TiKV transactional [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. +pub struct Client; + +impl Client { + /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves. + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait(); + /// ``` + #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] + pub fn new(config: &Config) -> Connect { + Connect::new(config.clone()) + } + + /// Create a new [`Transaction`](struct.Transaction.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp). + /// + /// Using the transaction you can issue commands like [`get`](struct.Transaction.html#method.get) or [`set`](file:///home/hoverbear/git/client-rust/target/doc/tikv_client/transaction/struct.Transaction.html#method.set). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let transaction = client.begin(); + /// // ... Issue some commands. + /// let commit = transaction.commit(); + /// let result: () = commit.wait().unwrap(); + /// ``` + pub fn begin(&self) -> Transaction { + unimplemented!() + } + + /// Create a new [`Transaction`](struct.Transaction.html) at the provded timestamp. + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let timestamp = client.current_timestamp(); + /// let transaction = client.begin_with_timestamp(timestamp); + /// // ... Issue some commands. + /// let commit = transaction.commit(); + /// let result: () = commit.wait().unwrap(); + /// ``` + pub fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction { + unimplemented!() + } + + /// Get a [`Snapshot`](struct.Snapshot.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let snapshot = client.snapshot(); + /// // ... Issue some commands. + /// ``` + pub fn snapshot(&self) -> Snapshot { + unimplemented!() + } + + /// Retrieve the current [`Timestamp`](struct.Timestamp.html). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let timestamp = client.current_timestamp(); + /// ``` + pub fn current_timestamp(&self) -> Timestamp { + unimplemented!() + } +} + +/// An unresolved [`Client`](struct.Client.html) connection to a TiKV cluster. +/// +/// Once resolved it will result in a connected [`Client`](struct.Client.html). +/// +/// ```rust,no_run +/// use tikv_client::{Config, transaction::{Client, Connect}}; +/// use futures::Future; +/// +/// let connect: Connect = Client::new(&Config::default()); +/// let client: Client = connect.wait().unwrap(); +/// ``` +pub struct Connect { + config: Config, +} + +impl Connect { + fn new(config: Config) -> Self { + Connect { config } + } +} + +impl Future for Connect { + type Item = Client; + type Error = Error; + + fn poll(&mut self) -> Poll { + let _config = &self.config; + unimplemented!() + } +} + /// A logical timestamp produced by PD. #[derive(Copy, Clone)] pub struct Timestamp(u64); @@ -47,6 +158,246 @@ impl Timestamp { } } +/// A undo-able set of actions on the dataset. +/// +/// Using a transaction you can prepare a set of actions (such as `get`, or `set`) on data at a +/// particular timestamp obtained from the placement driver. +/// +/// Once a transaction is commited, a new commit timestamp is obtained from the placement driver. +pub struct Transaction { + snapshot: Snapshot, +} + +impl Transaction { + /// Create a new transaction operating on the given snapshot. + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let txn = client.begin(); + /// ``` + pub fn new(snapshot: Snapshot) -> Self { + Self { snapshot } + } + + /// Commit the actions of the transaction. + /// + /// Once committed, it is no longer possible to `rollback` the actions in the transaction. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::Client}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let req = txn.commit(); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn commit(self) -> Commit { + Commit::new(self) + } + + /// Rollback the actions of the transaction. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::Client}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let req = txn.rollback(); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn rollback(self) -> Rollback { + Rollback::new(self) + } + + /// Lock the given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::Client}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// // ... Do some actions. + /// let req = txn.lock_keys(vec!["TiKV", "Rust"]); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn lock_keys(&mut self, keys: impl IntoIterator>) -> LockKeys { + LockKeys::new(keys.into_iter().map(|v| v.into()).collect()) + } + + pub fn is_readonly(&self) -> bool { + unimplemented!() + } + + /// Returns the timestamp which the transaction started at. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::{Client, Timestamp}}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let ts: Timestamp = txn.start_ts(); + /// ``` + pub fn start_ts(&self) -> Timestamp { + unimplemented!() + } + + /// Get the `Snapshot` the transaction is operating on. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::{Client, Snapshot}}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let snap: Snapshot = txn.snapshot(); + /// ``` + pub fn snapshot(&self) -> Snapshot { + unimplemented!() + } + + /// Set the isolation level of the transaction. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::{Client, IsolationLevel}}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// txn.set_isolation_level(IsolationLevel::SnapshotIsolation); + /// ``` + pub fn set_isolation_level(&mut self, _level: IsolationLevel) { + unimplemented!() + } + + /// Create a new [`Get`](struct.Get.html) request. + /// + /// Once resolved this request will result in the fetching of the value associated with the + /// given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Value, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let key = "TiKV"; + /// let req = txn.get(key); + /// let result: Value = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn get(&self, key: impl Into) -> Get { + self.snapshot.get(key.into()) + } + + /// Create a new [`BatchGet`](struct.BatchGet.html) request. + /// + /// Once resolved this request will result in the fetching of the values associated with the + /// given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let keys = vec!["TiKV", "TiDB"]; + /// let req = txn.batch_get(keys); + /// let result: Vec = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { + self.snapshot.batch_get(keys) + } + + pub fn scan(&self, range: impl RangeBounds) -> Scanner { + self.snapshot.scan(range) + } + + pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { + self.snapshot.scan_reverse(range) + } + + /// Create a new [`Set`](struct.Set.html) request. + /// + /// Once resolved this request will result in the setting of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Value, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let key = "TiKV"; + /// let val = "TiKV"; + /// let req = txn.set(key, val); + /// let result: () = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn set(&mut self, key: impl Into, value: impl Into) -> Set { + Set::new(key.into(), value.into()) + } + + /// Create a new [`Delete`](struct.Delete.html) request. + /// + /// Once resolved this request will result in the deletion of the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let key = "TiKV"; + /// let req = txn.delete(key); + /// let result: () = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn delete(&mut self, key: impl Into) -> Delete { + Delete::new(key.into()) + } +} + +/// A snapshot of dataset at a particular point in time. +pub struct Snapshot; + +impl Snapshot { + pub fn get(&self, key: impl Into) -> Get { + Get::new(key.into()) + } + + pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { + BatchGet::new(keys.into_iter().map(|v| v.into()).collect()) + } + + pub fn scan(&self, range: impl RangeBounds) -> Scanner { + drop(range); + unimplemented!() + } + + pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { + drop(range); + unimplemented!() + } +} + +/// An unresolved [`Transaction::scan`](struct.Transaction.html#method.scan) request. +/// +/// Once resolved this request will result in a scanner over the given keys. pub struct Scanner; impl Stream for Scanner { @@ -89,6 +440,10 @@ pub enum IsolationLevel { ReadCommitted, } +/// An unresolved [`Transaction::get`](struct.Transaction.html#method.get) request. +/// +/// Once resolved this request will result in the fetching of the value associated with the given +/// key. pub struct Get { key: Key, } @@ -109,6 +464,10 @@ impl Future for Get { } } +/// An unresolved [`Transaction::batch_get`](struct.Transaction.html#method.batch_get) request. +/// +/// Once resolved this request will result in the fetching of the values associated with the given +/// keys. pub struct BatchGet { keys: Vec, } @@ -120,7 +479,7 @@ impl BatchGet { } impl Future for BatchGet { - type Item = Value; + type Item = Vec; type Error = Error; fn poll(&mut self) -> Poll { @@ -129,6 +488,9 @@ impl Future for BatchGet { } } +/// An unresolved [`Transaction::commit`](struct.Transaction.html#method.commit) request. +/// +/// Once resolved this request will result in the committing of the transaction. pub struct Commit { txn: Transaction, } @@ -149,6 +511,9 @@ impl Future for Commit { } } +/// An unresolved [`Transaction::rollback`](struct.Transaction.html#method.rollback) request. +/// +/// Once resolved this request will result in the rolling back of the transaction. pub struct Rollback { txn: Transaction, } @@ -169,6 +534,9 @@ impl Future for Rollback { } } +/// An unresolved [`Transaction::lock_keys`](struct.Transaction.html#method.lock_keys) request. +/// +/// Once resolved this request will result in the locking of the given keys. pub struct LockKeys { keys: Vec, } @@ -189,6 +557,10 @@ impl Future for LockKeys { } } +/// An unresolved [`Transaction::set`](struct.Transaction.html#method.set) request. +/// +/// Once resolved this request will result in the setting of the value associated with the given +/// key. pub struct Set { key: Key, value: Value, @@ -211,6 +583,9 @@ impl Future for Set { } } +/// An unresolved [`Transaction::delete`](struct.Transaction.html#method.delete) request. +/// +/// Once resolved this request will result in the deletion of the given key. pub struct Delete { key: Key, } @@ -230,217 +605,3 @@ impl Future for Delete { unimplemented!() } } - -/// A undo-able set of actions on the dataset. -/// -/// Using a transaction you can prepare a set of actions (such as `get`, or `set`) on data at a -/// particular timestamp obtained from the placement driver. -/// -/// Once a transaction is commited, a new commit timestamp is obtained from the placement driver. -pub struct Transaction { - snapshot: Snapshot, -} - -impl Transaction { - /// Create a new transaction operating on the given snapshot. - pub fn new(snapshot: Snapshot) -> Self { - Self { - snapshot - } - } - - /// Commit the actions of the transaction. - /// - /// Once committed, it is no longer possible to `rollback` the actions in the transaction. - pub fn commit(self) -> Commit { - Commit::new(self) - } - - /// Rollback the actions of the transaction. - pub fn rollback(self) -> Rollback { - Rollback::new(self) - } - - pub fn lock_keys(&mut self, keys: impl AsRef<[Key]>) -> LockKeys { - LockKeys::new(keys.as_ref().to_vec().clone()) - } - - pub fn is_readonly(&self) -> bool { - unimplemented!() - } - - /// Returns the timestamp which the transaction started at. - /// - /// - pub fn start_ts(&self) -> Timestamp { - unimplemented!() - } - - /// Get the `Snapshot` the transaction is operating on. - pub fn snapshot(&self) -> Snapshot { - unimplemented!() - } - - /// Set the isolation level of the transaction. - pub fn set_isolation_level(&mut self, _level: IsolationLevel) { - unimplemented!() - } - - pub fn get(&self, key: impl AsRef) -> Get { - self.snapshot.get(key) - } - - pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { - self.snapshot.batch_get(keys) - } - - pub fn scan(&self, range: impl RangeBounds) -> Scanner { - self.snapshot.scan(range) - } - - pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { - self.snapshot.scan_reverse(range) - } - - pub fn set(&mut self, key: impl Into, value: impl Into) -> Set { - Set::new(key.into(), value.into()) - } - - pub fn delete(&mut self, key: impl AsRef) -> Delete { - Delete::new(key.as_ref().clone()) - } -} - -/// A snapshot of dataset at a particular point in time. -pub struct Snapshot; - -impl Snapshot { - pub fn get(&self, key: impl AsRef) -> Get { - Get::new(key.as_ref().clone()) - } - - pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { - BatchGet::new(keys.as_ref().to_vec().clone()) - } - - pub fn scan(&self, range: impl RangeBounds) -> Scanner { - drop(range); - unimplemented!() - } - - pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { - drop(range); - unimplemented!() - } -} - -/// An unresolved [`Client`](struct.Client.html) connection to a TiKV cluster. -/// -/// Once resolved it will result in a connected [`Client`](struct.Client.html). -/// -/// ```rust,no_run -/// use tikv_client::{Config, transaction::{Client, Connect}}; -/// use futures::Future; -/// -/// let connect: Connect = Client::new(&Config::default()); -/// let client: Client = connect.wait().unwrap(); -/// ``` -pub struct Connect { - config: Config, -} - -impl Connect { - fn new(config: Config) -> Self { - Connect { config } - } -} - -impl Future for Connect { - type Item = Client; - type Error = Error; - - fn poll(&mut self) -> Poll { - let _config = &self.config; - unimplemented!() - } -} - -/// The TiKV transactional [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. -pub struct Client {} - -impl Client { - /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves. - /// - /// ```rust,no_run - /// use tikv_client::{Config, transaction::Client}; - /// use futures::Future; - /// let connect = Client::new(&Config::default()); - /// let client = connect.wait(); - /// ``` - #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] - pub fn new(config: &Config) -> Connect { - Connect::new(config.clone()) - } - - /// Create a new [`Transaction`](struct.Transaction.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp). - /// - /// Using the transaction you can issue commands like [`get`](struct.Transaction.html#method.get) or [`set`](file:///home/hoverbear/git/client-rust/target/doc/tikv_client/transaction/struct.Transaction.html#method.set). - /// - /// ```rust,no_run - /// use tikv_client::{Config, transaction::Client}; - /// use futures::Future; - /// let connect = Client::new(&Config::default()); - /// let client = connect.wait().unwrap(); - /// let transaction = client.begin(); - /// // ... Issue some commands. - /// let commit = transaction.commit(); - /// let result: () = commit.wait().unwrap(); - /// ``` - pub fn begin(&self) -> Transaction { - unimplemented!() - } - - /// Create a new [`Transaction`](struct.Transaction.html) at the provded timestamp. - /// - /// ```rust,no_run - /// use tikv_client::{Config, transaction::Client}; - /// use futures::Future; - /// let connect = Client::new(&Config::default()); - /// let client = connect.wait().unwrap(); - /// let timestamp = client.current_timestamp(); - /// let transaction = client.begin_with_timestamp(timestamp); - /// // ... Issue some commands. - /// let commit = transaction.commit(); - /// let result: () = commit.wait().unwrap(); - /// ``` - pub fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction { - unimplemented!() - } - - /// Get a [`Snapshot`](struct.Snapshot.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp). - /// - /// ```rust,no_run - /// use tikv_client::{Config, transaction::Client}; - /// use futures::Future; - /// let connect = Client::new(&Config::default()); - /// let client = connect.wait().unwrap(); - /// let snapshot = client.snapshot(); - /// // ... Issue some commands. - /// ``` - pub fn snapshot(&self) -> Snapshot { - unimplemented!() - } - - /// Retrieve the current [`Timestamp`](struct.Timestamp.html). - /// - /// ```rust,no_run - /// use tikv_client::{Config, transaction::Client}; - /// use futures::Future; - /// let connect = Client::new(&Config::default()); - /// let client = connect.wait().unwrap(); - /// let timestamp = client.current_timestamp(); - /// ``` - pub fn current_timestamp(&self) -> Timestamp { - unimplemented!() - } -}