diff --git a/Cargo.toml b/Cargo.toml index 2a46c2ef..09f60afe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ name = "tikv_client" futures = "0.1" serde = "1.0" serde_derive = "1.0" -quick-error = "1.2" grpcio = { version = "0.4", features = [ "secure" ] } protobuf = "~2.0" tokio-core = "0.1" @@ -23,6 +22,7 @@ tokio-timer = "0.2" fxhash = "0.2" lazy_static = "0.2.1" log = "0.3.9" +failure = "0.1" [dependencies.kvproto] git = "https://github.com/pingcap/kvproto.git" diff --git a/examples/raw.rs b/examples/raw.rs index eebe6249..79d1f0b5 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -70,10 +70,8 @@ fn main() -> Result<()> { .expect("Could not delete value"); println!("Key: {:?} deleted", key); - client - .get(key) - .wait() - .expect_err("Get returned value for not existing key"); + // Get returns None for non-existing key + assert!(client.get(key).wait()?.is_none()); let pairs: Vec = (1..3) .map(|i| KvPair::from((Key::from(format!("k{}", i)), Value::from(format!("v{}", i))))) diff --git a/src/errors.rs b/src/errors.rs index 118ba202..920264d8 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -10,127 +10,245 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. - +use failure::{Backtrace, Context, Fail}; use grpcio; -use quick_error::quick_error; -use std::{error, result}; - -quick_error! { - /// An error originating from the TiKV client or dependencies. - /// - /// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/) - /// for errors. *This may change in future versions.* - #[derive(Debug)] - pub enum Error { - /// Wraps a `std::io::Error`. - Io(err: ::std::io::Error) { - from() - cause(err) - description(err.description()) - } - /// Wraps a `grpcio::Error`. - Grpc(err: grpcio::Error) { - from() - cause(err) - description(err.description()) - } - /// Represents that a futures oneshot channel was cancelled. - Canceled(err: ::futures::sync::oneshot::Canceled) { - from() - cause(err) - description(err.description()) - } - /// An unknown error. - /// - /// Generally, this is not an expected error. Please report it if encountered. - Other(err: Box) { - from() - cause(err.as_ref()) - description(err.description()) - display("unknown error {:?}", err) - } - /// A region was not found for the given key. - RegionForKeyNotFound(key: Vec) { - description("region is not found") - display("region is not found for key {:?}", key) - } - /// A region was not found. - RegionNotFound(region_id: u64, message: Option) { - description("region is not found") - display("region {:?} is not found. {}", region_id, message.as_ref().unwrap_or(&"".to_owned())) - } - /// The peer is not a leader of the given region. - NotLeader(region_id: u64, message: Option) { - description("peer is not leader") - display("peer is not leader for region {}. {}", region_id, message.as_ref().unwrap_or(&"".to_owned())) - } - /// The store does not match. - StoreNotMatch(request_store_id: u64, actual_store_id: u64, message: String) { - description("store not match") - display("requesting store '{}' when actual store is '{}'. {}", request_store_id, actual_store_id, message) - } - /// The given key is not within the given region. - KeyNotInRegion(key: Vec, region_id: u64, start_key: Vec, end_key: Vec) { - description("region is not found") - display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key) - } - /// A stale epoch. - StaleEpoch(message: Option) { - description("stale epoch") - display("{}", message.as_ref().unwrap_or(&"".to_owned())) - } - StaleCommand(message: String) { - description("stale command") - display("{}", message) - } - /// The server is too busy. - ServerIsBusy(reason: String, backoff: u64) { - description("server is busy") - display("server is busy: {:?}. Backoff {} ms", reason, backoff) - } - /// The given raft entry is too large for the region. - RaftEntryTooLarge(region_id: u64, entry_size: u64, message: String) { - description("raft entry too large") - display("{:?} bytes raft entry of region {:?} is too large. {}", entry_size, region_id, message) - } - KeyError(message: String) { - description("key error") - display("{}", message) - } - KVError(message: String) { - description("kv error") - display("{}", message) - } - InternalError(message: String) { - description("internal error") - display("{}", message) - } - InvalidKeyRange { - description("invalid key range") - display("Only left closed intervals are supported") - } - Unimplemented { - description("unimplemented feature") - display("Unimplemented feature") - } - EmptyValue { - description("can not set empty value") - display("Can not set empty value") - } - NoSuchKey { - description("key does not exist") - display("Key doest not exist") - } - InvalidOverlappingRanges { - description("ranges can not be overlapping") - display("Ranges can not be overlapping") - } - MaxScanLimitExceeded(limit: u32, max_limit: u32) { - description("limit exceeds max scan limit") - display("Limit {} excceds max scan limit {}", limit, max_limit) +use std::fmt::{self, Display}; +use std::result; + +#[derive(Debug)] +pub struct Error { + inner: Context, +} + +/// An error originating from the TiKV client or dependencies. +#[derive(Debug, Fail)] +pub enum ErrorKind { + /// Wraps a `std::io::Error`. + #[fail(display = "IO error: {}", _0)] + Io(#[fail(cause)] std::io::Error), + /// Wraps a `grpcio::Error`. + #[fail(display = "gRPC error: {}", _0)] + Grpc(#[fail(cause)] grpcio::Error), + /// Represents that a futures oneshot channel was cancelled. + #[fail(display = "A futures oneshot channel was canceled. {}", _0)] + Canceled(#[fail(cause)] futures::sync::oneshot::Canceled), + /// Feature is not implemented. + #[fail(display = "Unimplemented feature")] + Unimplemented, + // No region is found for the given key. + #[fail(display = "Region is not found for key: {:?}", key)] + RegionForKeyNotFound { key: Vec }, + /// The peer is not the leader for the region. + #[fail(display = "Peer is not leader for region {}. {}", region_id, message)] + NotLeader { region_id: u64, message: String }, + /// Stale epoch + #[fail(display = "Stale epoch. {}", message)] + StaleEpoch { message: String }, + /// No region is found for the given id. + #[fail(display = "Region {} is not found. {}", region_id, message)] + RegionNotFound { region_id: u64, message: String }, + /// Invalid key range to scan. Only left bounded intervals are supported. + #[fail(display = "Only left bounded intervals are supported")] + InvalidKeyRange, + /// Cannot set an empty value + #[fail(display = "Cannot set an empty value")] + EmptyValue, + /// Scan limit exceeds the maximum + #[fail(display = "Limit {} exceeds max scan limit {}", limit, max_limit)] + MaxScanLimitExceeded { limit: u32, max_limit: u32 }, + /// Wraps `kvproto::kvrpcpb::KeyError` + #[fail(display = "{:?}", _0)] + KeyError(kvproto::kvrpcpb::KeyError), + /// A string error returned by TiKV server + #[fail(display = "Kv error. {}", message)] + KvError { message: String }, + /// Reconstructed `kvproto::errorpb::KeyNotInRegion` + #[fail( + display = "Key {:?} is not in region {}: [{:?}, {:?})", + key, region_id, start_key, end_key + )] + KeyNotInRegion { + key: Vec, + region_id: u64, + start_key: Vec, + end_key: Vec, + }, + /// Reconstructed `kvproto::errorpb::ServerIsBusy` + #[fail(display = "Server is busy: {}. Backoff {} ms", reason, backoff_ms)] + ServerIsBusy { reason: String, backoff_ms: u64 }, + /// Represents `kvproto::errorpb::StaleCommand` with additional error message + #[fail(display = "Stale command. {}", message)] + StaleCommand { message: String }, + /// Represents `kvproto::errorpb::StoreNotMatch` with additional error message + #[fail( + display = "Requesting store {} when actual store is {}. {}", + request_store_id, actual_store_id, message + )] + StoreNotMatch { + request_store_id: u64, + actual_store_id: u64, + message: String, + }, + /// Represents `kvproto::errorpb::RaftEntryTooLarge` with additional error message + #[fail( + display = "{} bytes raft entry of region {} is too large. {}", + entry_size, region_id, message + )] + RaftEntryTooLarge { + region_id: u64, + entry_size: u64, + message: String, + }, + #[fail(display = "{}", message)] + InternalError { message: String }, +} + +impl Fail for Error { + fn cause(&self) -> Option<&Fail> { + self.inner.cause() + } + + fn backtrace(&self) -> Option<&Backtrace> { + self.inner.backtrace() + } +} + +impl Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Display::fmt(&self.inner, f) + } +} + +impl Error { + pub fn kind(&self) -> &ErrorKind { + self.inner.get_context() + } + + pub(crate) fn unimplemented() -> Self { + Error::from(ErrorKind::Unimplemented) + } + + pub(crate) fn region_for_key_not_found(key: Vec) -> Self { + Error::from(ErrorKind::RegionForKeyNotFound { key }) + } + + pub(crate) fn not_leader(region_id: u64, message: Option) -> Self { + Error::from(ErrorKind::NotLeader { + region_id, + message: message.unwrap_or_default(), + }) + } + + pub(crate) fn stale_epoch(message: Option) -> Self { + Error::from(ErrorKind::StaleEpoch { + message: message.unwrap_or_default(), + }) + } + + pub(crate) fn region_not_found(region_id: u64, message: Option) -> Self { + Error::from(ErrorKind::RegionNotFound { + region_id, + message: message.unwrap_or_default(), + }) + } + + pub(crate) fn invalid_key_range() -> Self { + Error::from(ErrorKind::InvalidKeyRange) + } + + pub(crate) fn empty_value() -> Self { + Error::from(ErrorKind::EmptyValue) + } + + pub(crate) fn max_scan_limit_exceeded(limit: u32, max_limit: u32) -> Self { + Error::from(ErrorKind::MaxScanLimitExceeded { limit, max_limit }) + } + + pub(crate) fn kv_error(message: String) -> Self { + Error::from(ErrorKind::KvError { message }) + } + + pub(crate) fn key_not_in_region(mut e: kvproto::errorpb::KeyNotInRegion) -> Self { + Error::from(ErrorKind::KeyNotInRegion { + key: e.take_key(), + region_id: e.get_region_id(), + start_key: e.take_start_key(), + end_key: e.take_end_key(), + }) + } + + pub(crate) fn server_is_busy(mut e: kvproto::errorpb::ServerIsBusy) -> Self { + Error::from(ErrorKind::ServerIsBusy { + reason: e.take_reason(), + backoff_ms: e.get_backoff_ms(), + }) + } + + pub(crate) fn stale_command(message: String) -> Self { + Error::from(ErrorKind::StaleCommand { message }) + } + + pub(crate) fn store_not_match(e: kvproto::errorpb::StoreNotMatch, message: String) -> Self { + Error::from(ErrorKind::StoreNotMatch { + request_store_id: e.get_request_store_id(), + actual_store_id: e.get_actual_store_id(), + message, + }) + } + + pub(crate) fn raft_entry_too_large( + e: kvproto::errorpb::RaftEntryTooLarge, + message: String, + ) -> Self { + Error::from(ErrorKind::RaftEntryTooLarge { + region_id: e.get_region_id(), + entry_size: e.get_entry_size(), + message, + }) + } + + pub(crate) fn internal_error(message: String) -> Self { + Error::from(ErrorKind::InternalError { message }) + } +} + +impl From for Error { + fn from(kind: ErrorKind) -> Error { + Error { + inner: Context::new(kind), } } } +impl From> for Error { + fn from(inner: Context) -> Error { + Error { inner } + } +} + +impl From for Error { + fn from(err: std::io::Error) -> Self { + Error::from(ErrorKind::Io(err)) + } +} + +impl From for Error { + fn from(err: grpcio::Error) -> Self { + Error::from(ErrorKind::Grpc(err)) + } +} + +impl From for Error { + fn from(err: futures::sync::oneshot::Canceled) -> Self { + Error::from(ErrorKind::Canceled(err)) + } +} + +impl From for Error { + fn from(err: kvproto::kvrpcpb::KeyError) -> Self { + Error::from(ErrorKind::KeyError(err)) + } +} + /// A result holding an [`Error`](enum.Error.html). pub type Result = result::Result; diff --git a/src/lib.rs b/src/lib.rs index 1745d129..3b6b7123 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,9 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. -#![recursion_limit = "128"] -#![type_length_limit = "1572864"] + +// Long and nested future chains can quickly result in large generic types. +#![type_length_limit = "16777216"] //! TiKV Client for Rust. //! @@ -131,6 +132,8 @@ pub mod transaction; #[doc(inline)] pub use crate::errors::Error; #[doc(inline)] +pub use crate::errors::ErrorKind; +#[doc(inline)] pub use crate::errors::Result; struct HexRepr<'a>(pub &'a [u8]); @@ -596,7 +599,7 @@ fn range_to_keys(range: (Bound, Bound)) -> Result<(Key, Option)> } v } - Bound::Unbounded => return Err(Error::InvalidKeyRange), + Bound::Unbounded => Err(Error::invalid_key_range())?, }; let end = match range.1 { Bound::Included(v) => Some(v), diff --git a/src/raw.rs b/src/raw.rs index b5de288e..d489f6c3 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -679,7 +679,7 @@ impl RequestInner for ScanInner { fn execute(self, client: Arc, cf: Option) -> KvFuture { if self.limit > MAX_RAW_KV_SCAN_LIMIT { - Box::new(future::err(Error::MaxScanLimitExceeded( + Box::new(future::err(Error::max_scan_limit_exceeded( self.limit, MAX_RAW_KV_SCAN_LIMIT, ))) @@ -751,13 +751,13 @@ impl RequestInner for BatchScanInner { fn execute(self, client: Arc, cf: Option) -> KvFuture { if self.each_limit > MAX_RAW_KV_SCAN_LIMIT { - Box::new(future::err(Error::MaxScanLimitExceeded( + Box::new(future::err(Error::max_scan_limit_exceeded( self.each_limit, MAX_RAW_KV_SCAN_LIMIT, ))) } else if self.ranges.iter().any(Result::is_err) { // All errors must be InvalidKeyRange so we can simply return a new InvalidKeyRange - Box::new(future::err(Error::InvalidKeyRange)) + Box::new(future::err(Error::invalid_key_range())) } else { Box::new(client.raw_batch_scan( self.ranges.into_iter().map(Result::unwrap).collect(), diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 89a19a12..fc3db470 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -285,7 +285,7 @@ impl RpcClient { cf: Option, ) -> impl Future { if value.is_empty() { - Either::A(future::err(Error::EmptyValue)) + Either::A(future::err(Error::empty_value())) } else { Either::B( Self::raw(self.inner(), &key, cf) @@ -300,7 +300,7 @@ impl RpcClient { cf: Option, ) -> impl Future { if pairs.iter().any(|p| p.value().is_empty()) { - Either::A(future::err(Error::EmptyValue)) + Either::A(future::err(Error::empty_value())) } else { let inner = self.inner(); Either::B( @@ -416,7 +416,7 @@ impl RpcClient { ) -> impl Future, Error = Error> { drop(ranges); drop(cf); - future::err(Error::Unimplemented) + future::err(Error::unimplemented()) } pub fn raw_delete_range( diff --git a/src/rpc/pd/client.rs b/src/rpc/pd/client.rs index 5f3a25bd..fc374b83 100644 --- a/src/rpc/pd/client.rs +++ b/src/rpc/pd/client.rs @@ -102,7 +102,7 @@ impl PdClient { let region = if resp.has_region() { resp.take_region() } else { - return Err(Error::RegionForKeyNotFound(key)); + Err(Error::region_for_key_not_found(key))? }; let leader = if resp.has_leader() { Some(resp.take_leader()) @@ -128,7 +128,7 @@ impl PdClient { let region = if resp.has_region() { resp.take_region() } else { - return Err(Error::RegionNotFound(region_id, None)); + Err(Error::region_not_found(region_id, None))? }; let leader = if resp.has_leader() { Some(resp.take_leader()) @@ -155,7 +155,7 @@ impl PdClient { let cli = &cli.rl().client; executor(cli, option) .unwrap() - .map_err(Error::Grpc) + .map_err(Into::into) .and_then(|r| { { let header = r.header(); diff --git a/src/rpc/pd/leader.rs b/src/rpc/pd/leader.rs index 0261d594..773d2add 100644 --- a/src/rpc/pd/leader.rs +++ b/src/rpc/pd/leader.rs @@ -139,12 +139,12 @@ impl PdReactor { let (tx, rx) = client.wl().client.tso().unwrap(); let tso_rx = client.wl().reactor.tso_rx.take().unwrap(); handle.spawn( - tx.sink_map_err(Error::Grpc) + tx.sink_map_err(Into::into) .send_all(tso_rx.then(|r| match r { Ok(r) => Ok((r, WriteFlags::default())), Err(()) => Err(internal_err!("failed to recv tso requests")), })) - .then(|r| match r { + .then(|r: Result<_>| match r { Ok((mut sender, _)) => { sender.get_mut().cancel(); Ok(()) @@ -220,7 +220,7 @@ impl PdReactor { // Schedule tso request to run. self.schedule(PdTask::Request); } - rx.map_err(Error::Canceled).then(move |r| context.done(r)) + rx.map_err(Into::into).then(move |r| context.done(r)) } } @@ -362,10 +362,10 @@ fn connect( ) -> Result<(pdpb_grpc::PdClient, pdpb::GetMembersResponse)> { let client = security_mgr.connect(env, addr, pdpb_grpc::PdClient::new)?; let option = CallOption::default().timeout(timeout); - match client.get_members_opt(&pdpb::GetMembersRequest::new(), option) { - Ok(resp) => Ok((client, resp)), - Err(e) => Err(Error::Grpc(e)), - } + let resp = client + .get_members_opt(&pdpb::GetMembersRequest::new(), option) + .map_err(Error::from)?; + Ok((client, resp)) } fn try_connect( diff --git a/src/rpc/pd/mod.rs b/src/rpc/pd/mod.rs index f1e95f27..0fc3ab36 100644 --- a/src/rpc/pd/mod.rs +++ b/src/rpc/pd/mod.rs @@ -76,7 +76,7 @@ impl Region { pub fn context(&self) -> Result { self.leader .as_ref() - .ok_or_else(|| Error::NotLeader(self.region.get_id(), None)) + .ok_or_else(|| Error::not_leader(self.region.get_id(), None)) .map(|l| { let mut ctx = kvrpcpb::Context::default(); ctx.set_region_id(self.region.get_id()); @@ -113,7 +113,7 @@ impl Region { .as_ref() .map(Clone::clone) .map(Into::into) - .ok_or_else(|| Error::StaleEpoch(None)) + .ok_or_else(|| Error::stale_epoch(None)) } pub fn meta(&self) -> metapb::Region { diff --git a/src/rpc/tikv/client.rs b/src/rpc/tikv/client.rs index 4fa81e26..49be25cb 100644 --- a/src/rpc/tikv/client.rs +++ b/src/rpc/tikv/client.rs @@ -25,7 +25,7 @@ use crate::{ tikv::context::{request_context, RequestContext}, }, transaction::{Mutation, TxnInfo}, - Error, Key, KvPair, Result, Value, + Error, ErrorKind, Key, KvPair, Result, Value, }; trait HasRegionError { @@ -41,39 +41,29 @@ impl From for Error { let message = e.take_message(); if e.has_not_leader() { let e = e.get_not_leader(); - Error::NotLeader( - e.get_region_id(), - Some(format!("{}. Leader: {:?}", message, e.get_leader())), - ) + let message = format!("{}. Leader: {:?}", message, e.get_leader()); + Error::not_leader(e.get_region_id(), Some(message)) } else if e.has_region_not_found() { - Error::RegionNotFound(e.get_region_not_found().get_region_id(), Some(message)) + Error::region_not_found(e.get_region_not_found().get_region_id(), Some(message)) } else if e.has_key_not_in_region() { - let mut e = e.take_key_not_in_region(); - Error::KeyNotInRegion( - e.take_key(), - e.get_region_id(), - e.take_start_key(), - e.take_end_key(), - ) + Error::key_not_in_region(e.take_key_not_in_region()) } else if e.has_stale_epoch() { - Error::StaleEpoch(Some(format!( + let message = format!( "{}. New epoch: {:?}", message, e.get_stale_epoch().get_new_regions() - ))) + ); + Error::stale_epoch(Some(message)) } else if e.has_server_is_busy() { - let mut e = e.take_server_is_busy(); - Error::ServerIsBusy(e.take_reason(), e.get_backoff_ms()) + Error::server_is_busy(e.take_server_is_busy()) } else if e.has_stale_command() { - Error::StaleCommand(message) + Error::stale_command(message) } else if e.has_store_not_match() { - let e = e.get_store_not_match(); - Error::StoreNotMatch(e.get_request_store_id(), e.get_actual_store_id(), message) + Error::store_not_match(e.take_store_not_match(), message) } else if e.has_raft_entry_too_large() { - let e = e.get_raft_entry_too_large(); - Error::RaftEntryTooLarge(e.get_region_id(), e.get_entry_size(), message) + Error::raft_entry_too_large(e.take_raft_entry_too_large(), message) } else { - Error::InternalError(message) + Error::internal_error(message) } } } @@ -114,12 +104,6 @@ has_region_error!(kvrpcpb::RawDeleteRangeResponse); has_region_error!(kvrpcpb::RawScanResponse); has_region_error!(kvrpcpb::RawBatchScanResponse); -impl From for Error { - fn from(e: kvrpcpb::KeyError) -> Error { - Error::KeyError(format!("{:?}", e)) - } -} - macro_rules! has_key_error { ($type:ty) => { impl HasError for $type { @@ -149,7 +133,7 @@ macro_rules! has_str_error { if self.get_error().is_empty() { None } else { - Some(Error::KVError(self.take_error())) + Some(Error::kv_error(self.take_error()) ) } } } @@ -632,7 +616,7 @@ impl KvClient { ) .unwrap() .then(|r| match r { - Err(e) => Err(Error::Grpc(e)), + Err(e) => Err(ErrorKind::Grpc(e))?, Ok(mut r) => { if let Some(e) = r.region_error() { Err(e) diff --git a/src/rpc/util.rs b/src/rpc/util.rs index 42c0c5fe..9fddb775 100644 --- a/src/rpc/util.rs +++ b/src/rpc/util.rs @@ -22,7 +22,10 @@ use tokio_timer::{self, timer::Handle}; macro_rules! internal_err { ($e:expr) => ({ - $crate::Error::InternalError(format!("[{}:{}]: {}", file!(), line!(), $e)) + let kind = $crate::Error::internal_error( + format!("[{}:{}]: {}", file!(), line!(), $e) + ); + $crate::Error::from(kind) }); ($f:tt, $($arg:expr),+) => ({ internal_err!(format!($f, $($arg),+))