diff --git a/Cargo.toml b/Cargo.toml index 1f481829..344150b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ license = "Apache-2.0" authors = ["The TiKV Project Authors"] repository = "https://github.com/tikv/client-rust" description = "The rust language implementation of TiKV client." +edition = "2018" [lib] name = "tikv_client" diff --git a/examples/raw.rs b/examples/raw.rs index c8bfbdfc..0be637a9 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -11,42 +11,58 @@ // See the License for the specific language governing permissions and // limitations under the License. -extern crate futures; -extern crate tikv_client; - +use futures::future::Future; use std::path::PathBuf; +use tikv_client::{Result, Config, raw::Client, Key, Value}; -use futures::future::Future; -use tikv_client::*; +const KEY: &str = "TiKV"; +const VALUE: &str = "Rust"; +const CUSTOM_CF: &str = "custom_cf"; -fn main() { - let config = Config::new(vec!["127.0.0.1:3379"]).with_security( +fn main() -> Result<()> { + // Create a configuration to use for the example. + // Optionally encrypt the traffic. + let config = Config::new(vec![ + "192.168.0.101:3379", // Avoid a single point of failure, + "192.168.0.100:3379", // use at least two PD endpoints. + ]).with_security( PathBuf::from("/path/to/ca.pem"), PathBuf::from("/path/to/client.pem"), PathBuf::from("/path/to/client-key.pem"), ); - let raw = raw::Client::new(&config) - .wait() - .expect("Could not connect to tikv"); - let key: Key = b"Company".to_vec().into(); - let value: Value = b"PingCAP".to_vec().into(); + // When we first create a client we recieve a `Connect` structure which must be resolved before + // the client is actually connected and usable. + let unconnnected_client = Client::new(&config); + let client = unconnnected_client.wait()?; - raw.put(key.clone(), value.clone()) - .cf("test_cf") - .wait() - .expect("Could not put kv pair to tikv"); - println!("Successfully put {:?}:{:?} to tikv", key, value); + // Requests are created from the connected client. These calls return structures which + // implement `Future`. This means the `Future` must be resolved before the action ever takes + // place. + // + // Here we set the key `TiKV` to have the value `Rust` associated with it. + let put_request = client.put(KEY, VALUE); + let put_result: () = put_request.wait()?; // Returns a `tikv_client::Error` on failure. + println!("Put key \"{}\", value \"{}\".", KEY, VALUE); - let value = raw - .get(&key) - .cf("test_cf") - .wait() - .expect("Could not get value"); - println!("Found val: {:?} for key: {:?}", value, key); + // + // Unlike a standard Rust HashMap all calls take owned values. This is because under the hood + // protobufs must take ownership of the data. If we only took a borrow we'd need to internally // clone it. This is against Rust API guidelines, so you must manage this yourself. + // + // Above, you saw we can use a `&'static str`, this is primarily for making examples short. + // This type is practical to use for real things, and usage forces an internal copy. + // + // It is best to pass a `Vec` in terms of explictness and speed. `String`s and a few other + // types are supported as well, but it all ends up as `Vec` in the end. + let key: String = String::from(KEY); + let value: Value = client.get(key).wait()?; + assert_eq!(value.as_ref(), VALUE); + println!("Get key \"{:?}\" returned value \"{:?}\".", value, KEY); - raw.delete(&key) - .cf("test_cf") + // You can also set the `ColumnFamily` used by the request. + // This is *advanced usage* and should have some special considerations. + let req = raw.delete(&key) + .cf(CUSTOM_CF) .wait() .expect("Could not delete value"); println!("Key: {:?} deleted", key); @@ -74,9 +90,12 @@ fn main() { .expect("Could not scan"); let ranges = [&start..&end, &start..&end]; - raw.batch_scan(&ranges, 10) + raw.batch_scan(ranges, 10) .cf("test_cf") .key_only() .wait() .expect("Could not batch scan"); + + // Cleanly exit. + Ok(()) } diff --git a/examples/transaction.rs b/examples/transaction.rs index 3c77c070..23a1e285 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -11,15 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -extern crate futures; -extern crate tikv_client; - +use futures::{future, Future, Stream}; use std::ops::RangeBounds; use std::path::PathBuf; - -use futures::{future, Future, Stream}; -use tikv_client::transaction::{Client, IsolationLevel}; -use tikv_client::*; +use tikv_client::{ + transaction::{Client, IsolationLevel}, + Config, Key, KvPair, Value, +}; fn puts(client: &Client, pairs: impl IntoIterator>) { let mut txn = client.begin(); @@ -28,7 +26,8 @@ fn puts(client: &Client, pairs: impl IntoIterator>) { .into_iter() .map(Into::into) .map(|p| txn.set(p.key().clone(), p.value().clone())), - ).wait() + ) + .wait() .expect("Could not set key value pairs"); txn.commit().wait().expect("Could not commit transaction"); } @@ -49,10 +48,12 @@ fn scan(client: &Client, range: impl RangeBounds, mut limit: usize) { limit -= 1; true }) - }).for_each(|pair| { + }) + .for_each(|pair| { println!("{:?}", pair); Ok(()) - }).wait() + }) + .wait() .expect("Could not scan keys"); } @@ -63,7 +64,8 @@ fn dels(client: &Client, keys: impl IntoIterator) { .into_iter() .map(|p| { txn.delete(p).wait().expect("Could not delete key"); - }).collect(); + }) + .collect(); txn.commit().wait().expect("Could not commit transaction"); } diff --git a/rust-toolchain b/rust-toolchain new file mode 100644 index 00000000..f22b1d64 --- /dev/null +++ b/rust-toolchain @@ -0,0 +1,4 @@ +nightly + +# We explicitly use nightly for the moment even though we target stable so we +# can develop for the 2018 edition. \ No newline at end of file diff --git a/src/errors.rs b/src/errors.rs index a8d81488..4ac32d41 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -11,61 +11,80 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::error; -use std::result; +use grpcio; +use quick_error::quick_error; +use std::{error, result}; quick_error!{ + /// An error originating from the TiKV client or dependencies. + /// + /// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/) + /// for errors. *This may change in future versions.* #[derive(Debug)] pub enum Error { + /// Wraps a a `std::io::Error`. Io(err: ::std::io::Error) { from() cause(err) description(err.description()) } - Grpc(err: ::grpc::Error) { + /// Wraps a `grpcio::Error`. + Grpc(err: grpcio::Error) { from() cause(err) description(err.description()) } + /// Represents that a futures oneshot channel was cancelled. Canceled(err: ::futures::sync::oneshot::Canceled) { from() cause(err) description(err.description()) } + /// An unknown error. + /// + /// Generally, this is not an expected error. Please report it if encountered. Other(err: Box) { from() cause(err.as_ref()) description(err.description()) display("unknown error {:?}", err) } + /// A region was not found for the given key. RegionForKeyNotFound(key: Vec) { description("region is not found") display("region is not found for key {:?}", key) } + /// A region was not found. RegionNotFound(id: u64) { description("region is not found") display("region {:?} is not found", id) } + /// The peer is not a leader of the given region. NotLeader(region_id: u64) { description("peer is not leader") display("peer is not leader for region {:?}.", region_id) } + /// The store does not match. StoreNotMatch { description("store not match") display("store not match") } + /// The given key is not eithin the given region. KeyNotInRegion(key: Vec, region_id: u64, start_key: Vec, end_key: Vec) { description("region is not found") display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key) } + /// A stale epoch. StaleEpoch { description("stale epoch") display("stale epoch") } + /// The server is too busy. ServerIsBusy(reason: String) { description("server is busy") display("server is busy: {:?}", reason) } + /// The given raft entry is too large for the region. RaftEntryTooLarge(region_id: u64, entry_size: u64) { description("raft entry too large") display("{:?} bytes raft entry of region {:?} is too large", entry_size, region_id) @@ -73,4 +92,5 @@ quick_error!{ } } +/// A result holding an [`Error`](enum.Error.html). pub type Result = result::Result; diff --git a/src/lib.rs b/src/lib.rs index 1827b26d..563104c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,34 +11,76 @@ // See the License for the specific language governing permissions and // limitations under the License. -extern crate futures; -extern crate serde; -#[macro_use] -extern crate serde_derive; -#[macro_use] -extern crate quick_error; -extern crate grpcio as grpc; - -pub mod errors; +use serde_derive::*; +use std::{ + ops::{Deref, Bound, Range, RangeFrom, RangeFull, RangeInclusive, RangeTo, RangeToInclusive}, + path::PathBuf, +}; + +mod errors; pub mod raw; pub mod transaction; -use std::ops::Deref; -use std::path::PathBuf; - -pub use errors::Error; -pub use errors::Result; +#[doc(inline)] +pub use crate::errors::Error; +#[doc(inline)] +pub use crate::errors::Result; +/// The key part of a key/value pair. +/// +/// In TiKV, keys are an ordered sequence of bytes. This has an advantage over choosing `String` as valid `UTF-8` is not required. This means that the user is permitted to store any data they wish, +/// as long as it can be represented by bytes. (Which is to say, pretty much anything!) +/// +/// This is a *wrapper type* that implements `Deref>` so it can be used like one transparently. +/// +/// This type also implements `From` for many types. With one exception, these are all done without reallocation. Using a `&'static str`, like many examples do for simplicity, has an internal +/// allocation cost. +/// +/// This type wraps around an owned value, so it should be treated it like `String` or `Vec` +/// over a `&str` or `&[u8]`. +/// +/// ```rust +/// use tikv_client::Key; +/// +/// let static_str: &'static str = "TiKV"; +/// let from_static_str = Key::from(static_str); +/// +/// let string: String = String::from(static_str); +/// let from_string = Key::from(string); +/// assert_eq!(from_static_str, from_string); +/// +/// let vec: Vec = static_str.as_bytes().to_vec(); +/// let from_vec = Key::from(vec); +/// assert_eq!(from_static_str, from_vec); +/// +/// let bytes = static_str.as_bytes().to_vec(); +/// let from_bytes = Key::from(bytes); +/// assert_eq!(from_static_str, from_bytes); +/// ``` +/// +/// **But, you should not need to worry about all this:** Many functions which accept a `Key` +/// accept an `Into`, which means all of the above types can be passed directly to those +/// functions. #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct Key(Vec); -#[derive(Default, Clone, Eq, PartialEq, Hash, Debug)] -pub struct Value(Vec); -#[derive(Default, Clone, Eq, PartialEq, Debug)] -pub struct KvPair(Key, Value); -impl Into for Vec { - fn into(self) -> Key { - Key(self) +impl From> for Key { + fn from(v: Vec) -> Self { + Key(v) + } +} + +impl From for Key { + fn from(v: String) -> Key { + Key(v.into_bytes()) + } +} + +impl<'a> From<&'static str> for Key { + fn from(v: &'static str) -> Key { + let mut vec = Vec::new(); + vec.extend_from_slice(v.as_bytes()); + Key(vec) } } @@ -56,9 +98,67 @@ impl Deref for Key { } } -impl Into for Vec { - fn into(self) -> Value { - Value(self) +impl Key { + pub fn new(value: Vec) -> Self { + Key(value) + } +} + +/// The value part of a key/value pair. +/// +/// In TiKV, values are an ordered sequence of bytes. This has an advantage over choosing `String` as valid `UTF-8` is not required. This means that the user is permitted to store any data they wish, +/// as long as it can be represented by bytes. (Which is to say, pretty much anything!) +/// +/// This is a *wrapper type* that implements `Deref>` so it can be used like one transparently. +/// +/// This type also implements `From` for many types. With one exception, these are all done without reallocation. Using a `&'static str`, like many examples do for simplicity, has an internal +/// allocation cost. +/// +/// This type wraps around an owned value, so it should be treated it like `String` or `Vec` +/// over a `&str` or `&[u8]`. +/// +/// ```rust +/// use tikv_client::Value; +/// +/// let static_str: &'static str = "TiKV"; +/// let from_static_str = Value::from(static_str); +/// +/// let string: String = String::from(static_str); +/// let from_string = Value::from(string); +/// assert_eq!(from_static_str, from_string); +/// +/// let vec: Vec = static_str.as_bytes().to_vec(); +/// let from_vec = Value::from(vec); +/// assert_eq!(from_static_str, from_vec); +/// +/// let bytes = static_str.as_bytes().to_vec(); +/// let from_bytes = Value::from(bytes); +/// assert_eq!(from_static_str, from_bytes); +/// ``` +/// +/// **But, you should not need to worry about all this:** Many functions which accept a `Value` +/// accept an `Into`, which means all of the above types can be passed directly to those +/// functions. +#[derive(Default, Clone, Eq, PartialEq, Hash, Debug)] +pub struct Value(Vec); + +impl From> for Value { + fn from(v: Vec) -> Self { + Value(v) + } +} + +impl From for Value { + fn from(v: String) -> Value { + Value(v.into_bytes()) + } +} + +impl From<&'static str> for Value { + fn from(v: &'static str) -> Value { + let mut vec = Vec::new(); + vec.extend_from_slice(v.as_bytes()); + Value(vec) } } @@ -70,37 +170,97 @@ impl Deref for Value { } } +/// A key/value pair. +/// +/// ```rust +/// # use tikv_client::{Key, Value, KvPair}; +/// let key = "key"; +/// let value = "value"; +/// let constructed = KvPair::new(key, value); +/// let from_tuple = KvPair::from((key, value)); +/// assert_eq!(constructed, from_tuple); +/// ``` +/// +/// **But, you should not need to worry about all this:** Many functions which accept a `KvPair` +/// accept an `Into`, which means all of the above types can be passed directly to those +/// functions. +#[derive(Default, Clone, Eq, PartialEq, Debug)] +pub struct KvPair(Key, Value); + impl KvPair { - pub fn new(key: Key, value: Value) -> Self { - KvPair(key, value) + /// Create a new `KvPair`. + pub fn new(key: impl Into, value: impl Into) -> Self { + KvPair(key.into(), value.into()) } + /// Immutably borrow the `Key` part of the `KvPair`. pub fn key(&self) -> &Key { &self.0 } + /// Immutably borrow the `Value` part of the `KvPair`. pub fn value(&self) -> &Value { &self.1 } + + /// Mutably borrow the `Key` part of the `KvPair`. + pub fn key_mut(&mut self) -> &mut Key { + &mut self.0 + } + + /// Mutably borrow the `Value` part of the `KvPair`. + pub fn value_mut(&mut self) -> &mut Value { + &mut self.1 + } + + /// Set the `Key` part of the `KvPair`. + pub fn set_key(&mut self, k: impl Into) { + self.0 = k.into(); + } + + /// Set the `Value` part of the `KvPair`. + pub fn set_value(&mut self, v: impl Into) { + self.1 = v.into(); + } } -impl Into for (Key, Value) { - fn into(self) -> KvPair { - KvPair(self.0, self.1) +impl From<(K, V)> for KvPair +where K: Into, V: Into { + fn from((k, v): (K, V)) -> Self { + KvPair(k.into(), v.into()) } } +/// The configuration for either a [`raw::Client`](raw/struct.Client.html) or a +/// [`transaction::Client`](transaction/struct.Client.html). +/// +/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for PD +/// must be provided, **not** the TiKV nodes. +/// +/// It's important to **include more than one PD endpoint** (include all, if possible!) This helps avoid having a *single point of failure*. +/// +/// By default, this client will use an insecure connection over encryption-on-the-wire. Your +/// deployment may have chosen to rely on security measures such as a private network, or a VPN +/// layer providing secure transmission. TiKV does not currently offer encryption-at-rest. #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] #[serde(default)] #[serde(rename_all = "kebab-case")] pub struct Config { - pub pd_endpoints: Vec, - pub ca_path: Option, - pub cert_path: Option, - pub key_path: Option, + pd_endpoints: Vec, + ca_path: Option, + cert_path: Option, + key_path: Option, } impl Config { + /// Create a new [`Config`](struct.Config.html) which coordinates with the given PD endpoints. + /// + /// It's important to **include more than one PD endpoint** (include all, if possible!) This helps avoid having a *single point of failure*. + /// + /// ```rust + /// # use tikv_client::Config; + /// let config = Config::new(vec!["192.168.0.100:2379", "192.168.0.101:2379"]); + /// ``` pub fn new(pd_endpoints: impl IntoIterator>) -> Self { Config { pd_endpoints: pd_endpoints.into_iter().map(Into::into).collect(), @@ -110,6 +270,15 @@ impl Config { } } + /// Set the certificate authority, certificate, and key locations for the [`Config`](struct.Config.html). + /// + /// By default, TiKV connections do not have utilize transport layer security. Enable it by setting these values. + /// + /// ```rust + /// # use tikv_client::Config; + /// let config = Config::new(vec!["192.168.0.100:2379", "192.168.0.101:2379"]) + /// .with_security("root.ca", "internal.cert", "internal.key"); + /// ``` pub fn with_security( mut self, ca_path: impl Into, @@ -122,3 +291,129 @@ impl Config { self } } + +/// A convenience trait for expressing ranges. +/// +/// In TiKV, keys are an ordered sequence of bytes. This means we can have ranges over those +/// bytes. Eg `001` is before `010`. +/// +/// This trait has implementations for common range types like `a..b`, `a..=b` where `a` and `b` +/// `impl Into`. You could implement this for your own types. +/// +/// ```rust +/// use tikv_client::{KeyRange, Key}; +/// use std::ops::{Range, RangeInclusive, RangeTo, RangeToInclusive, RangeFrom, RangeFull, Bound}; +/// +/// let explict_range: Range = Range { start: Key::from("Rust"), end: Key::from("TiKV") }; +/// let from_explict_range = explict_range.into_bounds(); +/// +/// let range: Range<&str> = "Rust".."TiKV"; +/// let from_range = range.into_bounds(); +/// assert_eq!(from_explict_range, from_range); +/// +/// let range: RangeInclusive<&str> = "Rust"..="TiKV"; +/// let from_range = range.into_bounds(); +/// assert_eq!( +/// (Bound::Included(Key::from("Rust")), Bound::Included(Key::from("TiKV"))), +/// from_range +/// ); +/// +/// let range_from: RangeFrom<&str> = "Rust"..; +/// let from_range_from = range_from.into_bounds(); +/// assert_eq!( +/// (Bound::Included(Key::from("Rust")), Bound::Unbounded), +/// from_range_from, +/// ); +/// +/// let range_to: RangeTo<&str> = .."TiKV"; +/// let from_range_to = range_to.into_bounds(); +/// assert_eq!( +/// (Bound::Unbounded, Bound::Excluded(Key::from("TiKV"))), +/// from_range_to, +/// ); +/// +/// let range_to_inclusive: RangeToInclusive<&str> = ..="TiKV"; +/// let from_range_to_inclusive = range_to_inclusive.into_bounds(); +/// assert_eq!( +/// (Bound::Unbounded, Bound::Included(Key::from("TiKV"))), +/// from_range_to_inclusive, +/// ); +/// +/// let range_full: RangeFull = ..; +/// let from_range_full = range_full.into_bounds(); +/// assert_eq!( +/// (Bound::Unbounded, Bound::Unbounded), +/// from_range_full +/// ); +/// ``` +/// +/// **But, you should not need to worry about all this:** Many functions accept a `impl KeyRange` +/// which means all of the above types can be passed directly to those functions. +pub trait KeyRange { + fn into_bounds(self) -> (Bound, Bound); +} + +impl> KeyRange for Range { + fn into_bounds(self) -> (Bound, Bound) { + ( + Bound::Included(self.start.into()), + Bound::Excluded(self.end.into()), + ) + } +} + +impl> KeyRange for RangeFrom { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Included(self.start.into()), Bound::Unbounded) + } +} + +impl KeyRange for RangeFull { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Unbounded, Bound::Unbounded) + } +} + +impl> KeyRange for RangeInclusive { + fn into_bounds(self) -> (Bound, Bound) { + let (start, end) = self.into_inner(); + (Bound::Included(start.into()), Bound::Included(end.into())) + } +} + +impl> KeyRange for RangeTo { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Unbounded, Bound::Excluded(self.end.into())) + } +} + +impl> KeyRange for RangeToInclusive { + fn into_bounds(self) -> (Bound, Bound) { + (Bound::Unbounded, Bound::Included(self.end.into())) + } +} + +impl> KeyRange for (Bound, Bound) { + fn into_bounds(self) -> (Bound, Bound) { + (transmute_bound(self.0), transmute_bound(self.1)) + } +} + +/// Unfortunately due to the API limitations of `RangeBound` we can only ever get an `&` (not a +/// `&mut` or owned value) of a `Bound`. So in order to coherce the value into the bound we want +/// we must clone the data. +/// +/// The only way to avoid this would be to only accept two `Bound` arguments, not one +/// `RangeBound` argument. This would mean `"abc"..="xyz"` would not be possible. +/// +fn transmute_bound(b: Bound) -> Bound +where + K: Into, +{ + use std::ops::Bound::*; + match b { + Included(k) => Included(k.into()), + Excluded(k) => Excluded(k.into()), + Unbounded => Unbounded, + } +} diff --git a/src/raw.rs b/src/raw.rs index 0bad042d..954bb8ae 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -11,12 +11,271 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::RangeBounds; +/*! Raw related functionality. +Using the [`raw::Client`](struct.Client.html) you can utilize TiKV's raw interface. + +This interface offers optimal performance as it does not require coordination with a timestamp +oracle, while the transactional interface does. + +**Warning:** It is not advisible to use the both raw and transactional functionality in the same keyspace. + */ + +use crate::{Config, Error, Key, KeyRange, KvPair, Value}; use futures::{Future, Poll}; +use std::{u32, ops::{Bound, Deref}}; + +/// The TiKV raw [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. +pub struct Client; + +impl Client { + /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves. + /// + /// ```rust,no_run + /// use tikv_client::{Config, raw::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait(); + /// ``` + #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] + pub fn new(config: &Config) -> Connect { + Connect::new(config.clone()) + } + + /// Create a new [`Get`](struct.Get.html) request. + /// + /// Once resolved this request will result in the fetching of the value associated with the + /// given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let req = connected_client.get(key); + /// let result: Value = req.wait().unwrap(); + /// ``` + pub fn get(&self, key: impl Into) -> Get { + Get::new(self, key.into()) + } + + /// Create a new [`BatchGet`](struct.BatchGet.html) request. + /// + /// Once resolved this request will result in the fetching of the values associated with the + /// given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let keys = vec!["TiKV", "TiDB"]; + /// let req = connected_client.batch_get(keys); + /// let result: Vec = req.wait().unwrap(); + /// ``` + pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { + BatchGet::new(self, keys.into_iter().map(|v| v.into()).collect()) + } + + /// Create a new [`Put`](struct.Put.html) request. + /// + /// Once resolved this request will result in the setting of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let val = "TiKV"; + /// let req = connected_client.put(key, val); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn put(&self, key: impl Into, value: impl Into) -> Put { + Put::new(self, key.into(), value.into()) + } + + /// Create a new [`BatchPut`](struct.BatchPut.html) request. + /// + /// Once resolved this request will result in the setting of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Error, Result, KvPair, Key, Value, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let kvpair1 = ("PD", "Go"); + /// let kvpair2 = ("TiKV", "Rust"); + /// let iterable = vec![kvpair1, kvpair2]; + /// let req = connected_client.batch_put(iterable); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { + BatchPut::new(self, pairs.into_iter().map(Into::into).collect()) + } + + /// Create a new [`Delete`](struct.Delete.html) request. + /// + /// Once resolved this request will result in the deletion of the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let key = "TiKV"; + /// let req = connected_client.delete(key); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn delete(&self, key: impl Into) -> Delete { + Delete::new(self, key.into()) + } + + /// Create a new [`BatchDelete`](struct.BatchDelete.html) request. + /// + /// Once resolved this request will result in the deletion of the given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let keys = vec!["TiKV", "TiDB"]; + /// let req = connected_client.batch_delete(keys); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn batch_delete(&self, keys: impl IntoIterator>) -> BatchDelete { + BatchDelete::new(self, keys.into_iter().map(Into::into).collect()) + } + + /// Create a new [`Scan`](struct.Scan.html) request. + /// + /// Once resolved this request will result in a scanner over the given keys. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range = "TiKV"..="TiDB"; + /// let req = connected_client.scan(inclusive_range, 2); + /// let result: Vec = req.wait().unwrap(); + /// ``` + pub fn scan(&self, range: impl KeyRange, limit: impl Into>) -> Scan { + Scan::new(self, range.into_bounds(), limit.into().unwrap_or(u32::MAX)) + } + + /// Create a new [`BatchScan`](struct.BatchScan.html) request. + /// + /// Once resolved this request will result in a set of scanners over the given keys. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range1 = "TiDB"..="TiKV"; + /// let inclusive_range2 = "TiKV"..="TiSpark"; + /// let iterable = vec![inclusive_range1, inclusive_range2]; + /// let req = connected_client.batch_scan(iterable, 2); + /// let result = req.wait(); + /// ``` + pub fn batch_scan( + &self, + ranges: impl IntoIterator, + each_limit: impl Into>, + ) -> BatchScan { + BatchScan::new( + self, + ranges.into_iter().map(KeyRange::into_bounds).collect(), + each_limit.into().unwrap_or(u32::MAX), + ) + } + + /// Create a new [`DeleteRange`](struct.DeleteRange.html) request. + /// + /// Once resolved this request will result in the deletion of all keys over the given range. + /// + /// If not passed a `limit` parameter, it will default to `u32::MAX`. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, raw::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let inclusive_range = "TiKV"..="TiDB"; + /// let req = connected_client.delete_range(inclusive_range); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn delete_range(&self, range: impl KeyRange) -> DeleteRange { + DeleteRange::new(self, range.into_bounds()) + } +} + +/// An unresolved [`Client`](struct.Client.html) connection to a TiKV cluster. +/// +/// Once resolved it will result in a connected [`Client`](struct.Client.html). +/// +/// ```rust,no_run +/// use tikv_client::{Config, raw::{Client, Connect}}; +/// use futures::Future; +/// +/// let connect: Connect = Client::new(&Config::default()); +/// let client: Client = connect.wait().unwrap(); +/// ``` +pub struct Connect { + config: Config, +} + +impl Connect { + fn new(config: Config) -> Self { + Connect { config } + } +} + +impl Future for Connect { + type Item = Client; + type Error = Error; + + fn poll(&mut self) -> Poll { + let _config = &self.config; + unimplemented!() + } +} -use {Config, Error, Key, KvPair, Value}; +/// A [`ColumnFamily`](struct.ColumnFamily.html) is an optional parameter for [`raw::Client`](struct.Client.html) requests. +/// +/// TiKV uses RocksDB's `ColumnFamily` support. You can learn more about RocksDB's `ColumnFamily`s [on their wiki](https://github.com/facebook/rocksdb/wiki/Column-Families). +/// +/// By default in TiKV data is stored in three different `ColumnFamily` values, configurable in the TiKV server's configuration: +/// +/// * Default: Where real user data is stored. Set by `[rocksdb.defaultcf]`. +/// * Write: Where MVCC and index related data are stored. Set by `[rocksdb.writecf]`. +/// * Lock: Where lock information is stored. Set by `[rocksdb.lockcf]`. +/// +/// Not providing a call a `ColumnFamily` means it will use the default value of `default`. +/// +/// The best (and only) way to create a [`ColumnFamily`](struct.ColumnFamily.html) is via the `From` implementation: +/// +/// ```rust +/// # use tikv_client::raw::ColumnFamily; +/// +/// let cf = ColumnFamily::from("write"); +/// let cf = ColumnFamily::from(String::from("write")); +/// let cf = ColumnFamily::from(&String::from("write")); +/// ``` +/// +/// This is a *wrapper type* that implements `Deref` so it can be used like one transparently. +/// +/// **But, you should not need to worry about all this:** Many functions which accept a +/// `ColumnFamily` accept an `Into`, which means all of the above types can be passed +/// directly to those functions. #[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] pub struct ColumnFamily(String); @@ -29,6 +288,17 @@ where } } +impl Deref for ColumnFamily { + type Target = String; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// An unresolved [`Client::get`](struct.Client.html#method.get) request. +/// +/// Once resolved this request will result in the fetching of the value associated with the given +/// key. pub struct Get<'a> { client: &'a Client, key: Key, @@ -44,6 +314,7 @@ impl<'a> Get<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -62,6 +333,10 @@ impl<'a> Future for Get<'a> { } } +/// An unresolved [`Client::batch_get`](struct.Client.html#method.batch_get) request. +/// +/// Once resolved this request will result in the fetching of the values associated with the given +/// keys. pub struct BatchGet<'a> { client: &'a Client, keys: Vec, @@ -76,7 +351,7 @@ impl<'a> BatchGet<'a> { cf: None, } } - + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -85,7 +360,7 @@ impl<'a> BatchGet<'a> { impl<'a> Future for BatchGet<'a> { type Item = Vec; - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -95,6 +370,10 @@ impl<'a> Future for BatchGet<'a> { } } +/// An unresolved [`Client::put`](struct.Client.html#method.put) request. +/// +/// Once resolved this request will result in the setting of the value associated with the given +/// key. pub struct Put<'a> { client: &'a Client, key: Key, @@ -112,6 +391,7 @@ impl<'a> Put<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -120,7 +400,7 @@ impl<'a> Put<'a> { impl<'a> Future for Put<'a> { type Item = (); - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -131,6 +411,9 @@ impl<'a> Future for Put<'a> { } } +/// An unresolved [`Client::batch_put`](struct.Client.html#method.batch_put) request. +/// +/// Once resolved this request will result in the setting of the value associated with the given key. pub struct BatchPut<'a> { client: &'a Client, pairs: Vec, @@ -146,6 +429,7 @@ impl<'a> BatchPut<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -154,7 +438,7 @@ impl<'a> BatchPut<'a> { impl<'a> Future for BatchPut<'a> { type Item = (); - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -164,6 +448,9 @@ impl<'a> Future for BatchPut<'a> { } } +/// An unresolved [`Client::delete`](struct.Client.html#method.delete) request. +/// +/// Once resolved this request will result in the deletion of the given key. pub struct Delete<'a> { client: &'a Client, key: Key, @@ -179,6 +466,7 @@ impl<'a> Delete<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -187,7 +475,7 @@ impl<'a> Delete<'a> { impl<'a> Future for Delete<'a> { type Item = (); - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -197,6 +485,9 @@ impl<'a> Future for Delete<'a> { } } +/// An unresolved [`Client::batch_delete`](struct.Client.html#method.batch_delete) request. +/// +/// Once resolved this request will result in the deletion of the given keys. pub struct BatchDelete<'a> { client: &'a Client, keys: Vec, @@ -212,6 +503,7 @@ impl<'a> BatchDelete<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -220,7 +512,7 @@ impl<'a> BatchDelete<'a> { impl<'a> Future for BatchDelete<'a> { type Item = (); - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -230,9 +522,12 @@ impl<'a> Future for BatchDelete<'a> { } } +/// An unresolved [`Client::scan`](struct.Client.html#method.scan) request. +/// +/// Once resolved this request will result in a scanner over the given keys. pub struct Scan<'a> { client: &'a Client, - range: (Key, Key), + range: (Bound, Bound), limit: u32, key_only: bool, cf: Option, @@ -240,7 +535,7 @@ pub struct Scan<'a> { } impl<'a> Scan<'a> { - fn new(client: &'a Client, range: (Key, Key), limit: u32) -> Self { + fn new(client: &'a Client, range: (Bound, Bound), limit: u32) -> Self { Scan { client, range, @@ -256,6 +551,7 @@ impl<'a> Scan<'a> { self } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -269,7 +565,7 @@ impl<'a> Scan<'a> { impl<'a> Future for Scan<'a> { type Item = Vec; - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -281,9 +577,12 @@ impl<'a> Future for Scan<'a> { } } +/// An unresolved [`Client::batch_scan`](struct.Client.html#method.batch_scan) request. +/// +/// Once resolved this request will result in a set of scanners over the given keys. pub struct BatchScan<'a> { client: &'a Client, - ranges: Vec<(Key, Key)>, + ranges: Vec<(Bound, Bound)>, each_limit: u32, key_only: bool, cf: Option, @@ -291,7 +590,7 @@ pub struct BatchScan<'a> { } impl<'a> BatchScan<'a> { - fn new(client: &'a Client, ranges: Vec<(Key, Key)>, each_limit: u32) -> Self { + fn new(client: &'a Client, ranges: Vec<(Bound, Bound)>, each_limit: u32) -> Self { BatchScan { client, ranges, @@ -307,6 +606,7 @@ impl<'a> BatchScan<'a> { self } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -320,7 +620,7 @@ impl<'a> BatchScan<'a> { impl<'a> Future for BatchScan<'a> { type Item = Vec; - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -332,14 +632,17 @@ impl<'a> Future for BatchScan<'a> { } } +/// An unresolved [`Client::delete_range`](struct.Client.html#method.delete_range) request. +/// +/// Once resolved this request will result in the deletion of all keys over the given range. pub struct DeleteRange<'a> { client: &'a Client, - range: (Key, Key), + range: (Bound, Bound), cf: Option, } impl<'a> DeleteRange<'a> { - fn new(client: &'a Client, range: (Key, Key)) -> Self { + fn new(client: &'a Client, range: (Bound, Bound)) -> Self { DeleteRange { client, range, @@ -347,6 +650,7 @@ impl<'a> DeleteRange<'a> { } } + /// Set the (optional) [`ColumnFamily`](struct.ColumnFamily.html). pub fn cf(mut self, cf: impl Into) -> Self { self.cf = Some(cf.into()); self @@ -355,7 +659,7 @@ impl<'a> DeleteRange<'a> { impl<'a> Future for DeleteRange<'a> { type Item = (); - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { let _ = &self.client; @@ -364,80 +668,3 @@ impl<'a> Future for DeleteRange<'a> { unimplemented!() } } - -pub struct Connect { - config: Config, -} - -impl Connect { - fn new(config: Config) -> Self { - Connect { config } - } -} - -impl Future for Connect { - type Item = Client; - type Error = Error; - - fn poll(&mut self) -> Poll { - let _config = &self.config; - unimplemented!() - } -} - -pub struct Client; - -impl Client { - #![cfg_attr(feature = "cargo-clippy", allow(new_ret_no_self))] - pub fn new(config: &Config) -> Connect { - Connect::new(config.clone()) - } - - pub fn get(&self, key: impl AsRef) -> Get { - Get::new(self, key.as_ref().clone()) - } - - pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { - BatchGet::new(self, keys.as_ref().to_vec()) - } - - pub fn put(&self, key: impl Into, value: impl Into) -> Put { - Put::new(self, key.into(), value.into()) - } - - pub fn batch_put(&self, pairs: impl IntoIterator>) -> BatchPut { - BatchPut::new(self, pairs.into_iter().map(Into::into).collect()) - } - - pub fn delete(&self, key: impl AsRef) -> Delete { - Delete::new(self, key.as_ref().clone()) - } - - pub fn batch_delete(&self, keys: impl AsRef<[Key]>) -> BatchDelete { - BatchDelete::new(self, keys.as_ref().to_vec()) - } - - pub fn scan(&self, range: impl RangeBounds, limit: u32) -> Scan { - Scan::new(self, Self::extract_range(&range), limit) - } - - pub fn batch_scan(&self, ranges: Ranges, each_limit: u32) -> BatchScan - where - Ranges: AsRef<[Bounds]>, - Bounds: RangeBounds, - { - BatchScan::new( - self, - ranges.as_ref().iter().map(Self::extract_range).collect(), - each_limit, - ) - } - - pub fn delete_range(&self, range: impl RangeBounds) -> DeleteRange { - DeleteRange::new(self, Self::extract_range(&range)) - } - - fn extract_range(_range: &impl RangeBounds) -> (Key, Key) { - unimplemented!() - } -} diff --git a/src/transaction.rs b/src/transaction.rs index ffa4021b..2f97cf66 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -11,18 +11,136 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::RangeBounds; +/*! Transactional related functionality. + +Using the [`transaction::Client`](struct.Client.html) you can utilize TiKV's transactional interface. +This interface offers SQL-like transactions on top of the raw interface. + +**Warning:** It is not advisible to use the both raw and transactional functionality in the same keyspace. + */ +use crate::{Config, Error, Key, KvPair, Value}; use futures::{Future, Poll, Stream}; +use std::ops::RangeBounds; + +/// The TiKV transactional [`Client`](struct.Client.html) is used to issue requests to the TiKV server and PD cluster. +pub struct Client; + +impl Client { + /// Create a new [`Client`](struct.Client.html) once the [`Connect`](struct.Connect.html) resolves. + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait(); + /// ``` + #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] + pub fn new(config: &Config) -> Connect { + Connect::new(config.clone()) + } + + /// Create a new [`Transaction`](struct.Transaction.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp). + /// + /// Using the transaction you can issue commands like [`get`](struct.Transaction.html#method.get) or [`set`](file:///home/hoverbear/git/client-rust/target/doc/tikv_client/transaction/struct.Transaction.html#method.set). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let transaction = client.begin(); + /// // ... Issue some commands. + /// let commit = transaction.commit(); + /// let result: () = commit.wait().unwrap(); + /// ``` + pub fn begin(&self) -> Transaction { + unimplemented!() + } + + /// Create a new [`Transaction`](struct.Transaction.html) at the provded timestamp. + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let timestamp = client.current_timestamp(); + /// let transaction = client.begin_with_timestamp(timestamp); + /// // ... Issue some commands. + /// let commit = transaction.commit(); + /// let result: () = commit.wait().unwrap(); + /// ``` + pub fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction { + unimplemented!() + } + + /// Get a [`Snapshot`](struct.Snapshot.html) using the timestamp from [`current_timestamp`](struct.Client.html#method.current_timestamp). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let snapshot = client.snapshot(); + /// // ... Issue some commands. + /// ``` + pub fn snapshot(&self) -> Snapshot { + unimplemented!() + } + + /// Retrieve the current [`Timestamp`](struct.Timestamp.html). + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let timestamp = client.current_timestamp(); + /// ``` + pub fn current_timestamp(&self) -> Timestamp { + unimplemented!() + } +} + +/// An unresolved [`Client`](struct.Client.html) connection to a TiKV cluster. +/// +/// Once resolved it will result in a connected [`Client`](struct.Client.html). +/// +/// ```rust,no_run +/// use tikv_client::{Config, transaction::{Client, Connect}}; +/// use futures::Future; +/// +/// let connect: Connect = Client::new(&Config::default()); +/// let client: Client = connect.wait().unwrap(); +/// ``` +pub struct Connect { + config: Config, +} + +impl Connect { + fn new(config: Config) -> Self { + Connect { config } + } +} -use {Config, Error, Key, KvPair, Value}; +impl Future for Connect { + type Item = Client; + type Error = Error; + + fn poll(&mut self) -> Poll { + let _config = &self.config; + unimplemented!() + } +} +/// A logical timestamp produced by PD. #[derive(Copy, Clone)] pub struct Timestamp(u64); -impl Into for u64 { - fn into(self) -> Timestamp { - Timestamp(self) +impl From for Timestamp { + fn from(v: u64) -> Self { + Timestamp(v) } } @@ -40,6 +158,246 @@ impl Timestamp { } } +/// A undo-able set of actions on the dataset. +/// +/// Using a transaction you can prepare a set of actions (such as `get`, or `set`) on data at a +/// particular timestamp obtained from the placement driver. +/// +/// Once a transaction is commited, a new commit timestamp is obtained from the placement driver. +pub struct Transaction { + snapshot: Snapshot, +} + +impl Transaction { + /// Create a new transaction operating on the given snapshot. + /// + /// ```rust,no_run + /// use tikv_client::{Config, transaction::Client}; + /// use futures::Future; + /// let connect = Client::new(&Config::default()); + /// let client = connect.wait().unwrap(); + /// let txn = client.begin(); + /// ``` + pub fn new(snapshot: Snapshot) -> Self { + Self { snapshot } + } + + /// Commit the actions of the transaction. + /// + /// Once committed, it is no longer possible to `rollback` the actions in the transaction. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::Client}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let req = txn.commit(); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn commit(self) -> Commit { + Commit::new(self) + } + + /// Rollback the actions of the transaction. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::Client}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let req = txn.rollback(); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn rollback(self) -> Rollback { + Rollback::new(self) + } + + /// Lock the given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::Client}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// // ... Do some actions. + /// let req = txn.lock_keys(vec!["TiKV", "Rust"]); + /// let result: () = req.wait().unwrap(); + /// ``` + pub fn lock_keys(&mut self, keys: impl IntoIterator>) -> LockKeys { + LockKeys::new(keys.into_iter().map(|v| v.into()).collect()) + } + + pub fn is_readonly(&self) -> bool { + unimplemented!() + } + + /// Returns the timestamp which the transaction started at. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::{Client, Timestamp}}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let ts: Timestamp = txn.start_ts(); + /// ``` + pub fn start_ts(&self) -> Timestamp { + unimplemented!() + } + + /// Get the `Snapshot` the transaction is operating on. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::{Client, Snapshot}}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let txn = connected_client.begin(); + /// // ... Do some actions. + /// let snap: Snapshot = txn.snapshot(); + /// ``` + pub fn snapshot(&self) -> Snapshot { + unimplemented!() + } + + /// Set the isolation level of the transaction. + /// + /// ```rust,no_run + /// # use tikv_client::{Config, transaction::{Client, IsolationLevel}}; + /// # use futures::Future; + /// # let connect = Client::new(&Config::default()); + /// # let connected_client = connect.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// txn.set_isolation_level(IsolationLevel::SnapshotIsolation); + /// ``` + pub fn set_isolation_level(&mut self, _level: IsolationLevel) { + unimplemented!() + } + + /// Create a new [`Get`](struct.Get.html) request. + /// + /// Once resolved this request will result in the fetching of the value associated with the + /// given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Value, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let key = "TiKV"; + /// let req = txn.get(key); + /// let result: Value = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn get(&self, key: impl Into) -> Get { + self.snapshot.get(key.into()) + } + + /// Create a new [`BatchGet`](struct.BatchGet.html) request. + /// + /// Once resolved this request will result in the fetching of the values associated with the + /// given keys. + /// + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let keys = vec!["TiKV", "TiDB"]; + /// let req = txn.batch_get(keys); + /// let result: Vec = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { + self.snapshot.batch_get(keys) + } + + pub fn scan(&self, range: impl RangeBounds) -> Scanner { + self.snapshot.scan(range) + } + + pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { + self.snapshot.scan_reverse(range) + } + + /// Create a new [`Set`](struct.Set.html) request. + /// + /// Once resolved this request will result in the setting of the value associated with the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Value, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let key = "TiKV"; + /// let val = "TiKV"; + /// let req = txn.set(key, val); + /// let result: () = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn set(&mut self, key: impl Into, value: impl Into) -> Set { + Set::new(key.into(), value.into()) + } + + /// Create a new [`Delete`](struct.Delete.html) request. + /// + /// Once resolved this request will result in the deletion of the given key. + /// + /// ```rust,no_run + /// # use tikv_client::{Key, Config, transaction::Client}; + /// # use futures::Future; + /// # let connecting_client = Client::new(&Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.wait().unwrap(); + /// let mut txn = connected_client.begin(); + /// let key = "TiKV"; + /// let req = txn.delete(key); + /// let result: () = req.wait().unwrap(); + /// // Finish the transaction... + /// txn.commit().wait().unwrap(); + /// ``` + pub fn delete(&mut self, key: impl Into) -> Delete { + Delete::new(key.into()) + } +} + +/// A snapshot of dataset at a particular point in time. +pub struct Snapshot; + +impl Snapshot { + pub fn get(&self, key: impl Into) -> Get { + Get::new(key.into()) + } + + pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { + BatchGet::new(keys.into_iter().map(|v| v.into()).collect()) + } + + pub fn scan(&self, range: impl RangeBounds) -> Scanner { + drop(range); + unimplemented!() + } + + pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { + drop(range); + unimplemented!() + } +} + +/// An unresolved [`Transaction::scan`](struct.Transaction.html#method.scan) request. +/// +/// Once resolved this request will result in a scanner over the given keys. pub struct Scanner; impl Stream for Scanner { @@ -51,12 +409,41 @@ impl Stream for Scanner { } } +/// The isolation level guarantees provided by the transaction. #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum IsolationLevel { + /// Consistent reads and conflict free writes. + /// + /// Snapshot isolation guarantees: + /// * All reads will see the last committed value of the data at the snapshot timestamp. + /// * The transaction will only successfully commit if no updates to the data have created a + /// conflict with concurrent updates made sine the snapshot. + /// + /// Using this level means: + /// * Lost updates don't occur. + /// * Dirty reads don't occur. + /// * Non-repeatable reads don't occur. + /// * Phantom reads don't occur. SnapshotIsolation, + /// Reads may not be consistent, but writes are conflict free. + /// + /// Read committed guarantees: + /// * All reads are committed at the moment it is read. + /// not repeatable. + /// * Write locks are only released at the end of the transaction. + /// + /// Using this level means: + /// * Lost updates don't occur. + /// * Dirty reads don't occur. + /// * Non-repeatable reads may occur. + /// * Phantom reads may occur. ReadCommitted, } +/// An unresolved [`Transaction::get`](struct.Transaction.html#method.get) request. +/// +/// Once resolved this request will result in the fetching of the value associated with the given +/// key. pub struct Get { key: Key, } @@ -77,6 +464,10 @@ impl Future for Get { } } +/// An unresolved [`Transaction::batch_get`](struct.Transaction.html#method.batch_get) request. +/// +/// Once resolved this request will result in the fetching of the values associated with the given +/// keys. pub struct BatchGet { keys: Vec, } @@ -88,7 +479,7 @@ impl BatchGet { } impl Future for BatchGet { - type Item = Value; + type Item = Vec; type Error = Error; fn poll(&mut self) -> Poll { @@ -97,6 +488,9 @@ impl Future for BatchGet { } } +/// An unresolved [`Transaction::commit`](struct.Transaction.html#method.commit) request. +/// +/// Once resolved this request will result in the committing of the transaction. pub struct Commit { txn: Transaction, } @@ -117,6 +511,9 @@ impl Future for Commit { } } +/// An unresolved [`Transaction::rollback`](struct.Transaction.html#method.rollback) request. +/// +/// Once resolved this request will result in the rolling back of the transaction. pub struct Rollback { txn: Transaction, } @@ -137,6 +534,9 @@ impl Future for Rollback { } } +/// An unresolved [`Transaction::lock_keys`](struct.Transaction.html#method.lock_keys) request. +/// +/// Once resolved this request will result in the locking of the given keys. pub struct LockKeys { keys: Vec, } @@ -157,6 +557,10 @@ impl Future for LockKeys { } } +/// An unresolved [`Transaction::set`](struct.Transaction.html#method.set) request. +/// +/// Once resolved this request will result in the setting of the value associated with the given +/// key. pub struct Set { key: Key, value: Value, @@ -179,6 +583,9 @@ impl Future for Set { } } +/// An unresolved [`Transaction::delete`](struct.Transaction.html#method.delete) request. +/// +/// Once resolved this request will result in the deletion of the given key. pub struct Delete { key: Key, } @@ -198,128 +605,3 @@ impl Future for Delete { unimplemented!() } } - -pub struct Transaction { - snapshot: Snapshot, -} - -impl Transaction { - pub fn commit(self) -> Commit { - Commit::new(self) - } - - pub fn rollback(self) -> Rollback { - Rollback::new(self) - } - - pub fn lock_keys(&mut self, keys: impl AsRef<[Key]>) -> LockKeys { - LockKeys::new(keys.as_ref().to_vec().clone()) - } - - pub fn is_readonly(&self) -> bool { - unimplemented!() - } - - pub fn start_ts(&self) -> Timestamp { - unimplemented!() - } - - pub fn snapshot(&self) -> Snapshot { - unimplemented!() - } - - pub fn set_isolation_level(&mut self, _level: IsolationLevel) { - unimplemented!() - } - - pub fn get(&self, key: impl AsRef) -> Get { - self.snapshot.get(key) - } - - pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { - self.snapshot.batch_get(keys) - } - - pub fn scan(&self, range: impl RangeBounds) -> Scanner { - self.snapshot.scan(range) - } - - pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { - self.snapshot.scan_reverse(range) - } - - pub fn set(&mut self, key: impl Into, value: impl Into) -> Set { - Set::new(key.into(), value.into()) - } - - pub fn delete(&mut self, key: impl AsRef) -> Delete { - Delete::new(key.as_ref().clone()) - } -} - -pub struct Snapshot; - -impl Snapshot { - pub fn get(&self, key: impl AsRef) -> Get { - Get::new(key.as_ref().clone()) - } - - pub fn batch_get(&self, keys: impl AsRef<[Key]>) -> BatchGet { - BatchGet::new(keys.as_ref().to_vec().clone()) - } - - pub fn scan(&self, range: impl RangeBounds) -> Scanner { - drop(range); - unimplemented!() - } - - pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { - drop(range); - unimplemented!() - } -} - -pub struct Connect { - config: Config, -} - -impl Connect { - fn new(config: Config) -> Self { - Connect { config } - } -} - -impl Future for Connect { - type Item = Client; - type Error = Error; - - fn poll(&mut self) -> Poll { - let _config = &self.config; - unimplemented!() - } -} - -pub struct Client {} - -impl Client { - #![cfg_attr(feature = "cargo-clippy", allow(new_ret_no_self))] - pub fn new(config: &Config) -> Connect { - Connect::new(config.clone()) - } - - pub fn begin(&self) -> Transaction { - unimplemented!() - } - - pub fn begin_with_timestamp(&self, _timestamp: Timestamp) -> Transaction { - unimplemented!() - } - - pub fn snapshot(&self) -> Snapshot { - unimplemented!() - } - - pub fn current_timestamp(&self) -> Timestamp { - unimplemented!() - } -}