From 0a8e7230a00d5128fbb698abd4686c07325a1c44 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Wed, 16 Jan 2019 23:04:54 +0800 Subject: [PATCH 1/3] Use failure for error management. Signed-off-by: Yilin Chen --- Cargo.toml | 2 +- src/errors.rs | 242 +++++++++++++++++++++-------------------- src/lib.rs | 9 +- src/raw.rs | 28 +++-- src/rpc/client.rs | 10 +- src/rpc/pd/client.rs | 11 +- src/rpc/pd/leader.rs | 17 +-- src/rpc/pd/mod.rs | 17 ++- src/rpc/tikv/client.rs | 82 ++++++++------ src/rpc/util.rs | 5 +- 10 files changed, 239 insertions(+), 184 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2a46c2ef..09f60afe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ name = "tikv_client" futures = "0.1" serde = "1.0" serde_derive = "1.0" -quick-error = "1.2" grpcio = { version = "0.4", features = [ "secure" ] } protobuf = "~2.0" tokio-core = "0.1" @@ -23,6 +22,7 @@ tokio-timer = "0.2" fxhash = "0.2" lazy_static = "0.2.1" log = "0.3.9" +failure = "0.1" [dependencies.kvproto] git = "https://github.com/pingcap/kvproto.git" diff --git a/src/errors.rs b/src/errors.rs index 118ba202..ada5b064 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -10,127 +10,137 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. - +use failure::{Backtrace, Context, Fail}; use grpcio; -use quick_error::quick_error; -use std::{error, result}; +use std::fmt::{self, Display}; +use std::result; -quick_error! { - /// An error originating from the TiKV client or dependencies. - /// - /// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/) - /// for errors. *This may change in future versions.* - #[derive(Debug)] - pub enum Error { - /// Wraps a `std::io::Error`. - Io(err: ::std::io::Error) { - from() - cause(err) - description(err.description()) - } - /// Wraps a `grpcio::Error`. - Grpc(err: grpcio::Error) { - from() - cause(err) - description(err.description()) - } - /// Represents that a futures oneshot channel was cancelled. - Canceled(err: ::futures::sync::oneshot::Canceled) { - from() - cause(err) - description(err.description()) - } - /// An unknown error. - /// - /// Generally, this is not an expected error. Please report it if encountered. - Other(err: Box) { - from() - cause(err.as_ref()) - description(err.description()) - display("unknown error {:?}", err) - } - /// A region was not found for the given key. - RegionForKeyNotFound(key: Vec) { - description("region is not found") - display("region is not found for key {:?}", key) - } - /// A region was not found. - RegionNotFound(region_id: u64, message: Option) { - description("region is not found") - display("region {:?} is not found. {}", region_id, message.as_ref().unwrap_or(&"".to_owned())) - } - /// The peer is not a leader of the given region. - NotLeader(region_id: u64, message: Option) { - description("peer is not leader") - display("peer is not leader for region {}. {}", region_id, message.as_ref().unwrap_or(&"".to_owned())) - } - /// The store does not match. - StoreNotMatch(request_store_id: u64, actual_store_id: u64, message: String) { - description("store not match") - display("requesting store '{}' when actual store is '{}'. {}", request_store_id, actual_store_id, message) - } - /// The given key is not within the given region. - KeyNotInRegion(key: Vec, region_id: u64, start_key: Vec, end_key: Vec) { - description("region is not found") - display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key) - } - /// A stale epoch. - StaleEpoch(message: Option) { - description("stale epoch") - display("{}", message.as_ref().unwrap_or(&"".to_owned())) - } - StaleCommand(message: String) { - description("stale command") - display("{}", message) - } - /// The server is too busy. - ServerIsBusy(reason: String, backoff: u64) { - description("server is busy") - display("server is busy: {:?}. Backoff {} ms", reason, backoff) - } - /// The given raft entry is too large for the region. - RaftEntryTooLarge(region_id: u64, entry_size: u64, message: String) { - description("raft entry too large") - display("{:?} bytes raft entry of region {:?} is too large. {}", entry_size, region_id, message) - } - KeyError(message: String) { - description("key error") - display("{}", message) - } - KVError(message: String) { - description("kv error") - display("{}", message) - } - InternalError(message: String) { - description("internal error") - display("{}", message) - } - InvalidKeyRange { - description("invalid key range") - display("Only left closed intervals are supported") - } - Unimplemented { - description("unimplemented feature") - display("Unimplemented feature") - } - EmptyValue { - description("can not set empty value") - display("Can not set empty value") - } - NoSuchKey { - description("key does not exist") - display("Key doest not exist") - } - InvalidOverlappingRanges { - description("ranges can not be overlapping") - display("Ranges can not be overlapping") - } - MaxScanLimitExceeded(limit: u32, max_limit: u32) { - description("limit exceeds max scan limit") - display("Limit {} excceds max scan limit {}", limit, max_limit) +#[derive(Debug)] +pub struct Error { + inner: Context, +} + +/// An error originating from the TiKV client or dependencies. +#[derive(Debug, Fail)] +pub enum ErrorKind { + /// Wraps a `std::io::Error`. + #[fail(display = "IO error: {}", _0)] + Io(#[fail(cause)] std::io::Error), + /// Wraps a `grpcio::Error`. + #[fail(display = "gRPC error: {}", _0)] + Grpc(#[fail(cause)] grpcio::Error), + /// Represents that a futures oneshot channel was cancelled. + #[fail(display = "A futures oneshot channel was canceled. {}", _0)] + Canceled(#[fail(cause)] futures::sync::oneshot::Canceled), + /// Feature is not implemented. + #[fail(display = "Unimplemented feature")] + Unimplemented, + // No region is found for the given key. + #[fail(display = "Region is not found for key: {:?}", key)] + RegionForKeyNotFound { key: Vec }, + /// The peer is not the leader for the region. + #[fail(display = "Peer is not leader for region {}. {}", region_id, message)] + NotLeader { region_id: u64, message: String }, + /// Stale epoch + #[fail(display = "Stale epoch. {}", message)] + StaleEpoch { message: String }, + /// No region is found for the given id. + #[fail(display = "Region {} is not found. {}", region_id, message)] + RegionNotFound { region_id: u64, message: String }, + /// Invalid key range to scan. Only left bounded intervals are supported. + #[fail(display = "Only left bounded intervals are supported")] + InvalidKeyRange, + /// No such key + #[fail(display = "Key does not exist")] + NoSuchKey, + /// Cannot set an empty value + #[fail(display = "Cannot set an empty value")] + EmptyValue, + /// Scan limit exceeds the maximum + #[fail(display = "Limit {} exceeds max scan limit {}", limit, max_limit)] + MaxScanLimitExceeded { limit: u32, max_limit: u32 }, + /// Wraps `kvproto::kvrpcpb::KeyError` + #[fail(display = "{:?}", _0)] + KeyError(kvproto::kvrpcpb::KeyError), + /// A string error returned by TiKV server + #[fail(display = "Kv error. {}", message)] + KvError { message: String }, + /// Reconstructed `kvproto::errorpb::KeyNotInRegion` + #[fail( + display = "Key {:?} is not in region {}: [{:?}, {:?})", + key, region_id, start_key, end_key + )] + KeyNotInRegion { + key: Vec, + region_id: u64, + start_key: Vec, + end_key: Vec, + }, + /// Reconstructed `kvproto::errorpb::ServerIsBusy` + #[fail(display = "Server is busy: {}. Backoff {} ms", reason, backoff_ms)] + ServerIsBusy { reason: String, backoff_ms: u64 }, + /// Reconstructed `kvproto::errorpb::StaleCommand` + #[fail(display = "Stale command. {}", message)] + StaleCommand { message: String }, + /// Reconstructed `kvproto::errorpb::StoreNotMatch` + #[fail( + display = "Requesting store {} when actual store is {}. {}", + request_store_id, actual_store_id, message + )] + StoreNotMatch { + request_store_id: u64, + actual_store_id: u64, + message: String, + }, + /// Reconstructed `kvproto::errorpb::RaftEntryTooLarge` + #[fail( + display = "{} bytes raft entry of region {} is too large. {}", + entry_size, region_id, message + )] + RaftEntryTooLarge { + region_id: u64, + entry_size: u64, + message: String, + }, + #[fail(display = "{}", message)] + InternalError { message: String }, +} + +impl Fail for Error { + fn cause(&self) -> Option<&Fail> { + self.inner.cause() + } + + fn backtrace(&self) -> Option<&Backtrace> { + self.inner.backtrace() + } +} + +impl Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Display::fmt(&self.inner, f) + } +} + +impl Error { + pub fn kind(&self) -> &ErrorKind { + self.inner.get_context() + } +} + +impl From for Error { + fn from(kind: ErrorKind) -> Error { + Error { + inner: Context::new(kind), } } } +impl From> for Error { + fn from(inner: Context) -> Error { + Error { inner } + } +} + /// A result holding an [`Error`](enum.Error.html). pub type Result = result::Result; diff --git a/src/lib.rs b/src/lib.rs index 58d026ee..c2907aa0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,9 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. -#![recursion_limit = "128"] -#![type_length_limit = "1572864"] + +// Long and nested future chains can quickly result in large generic types. +#![type_length_limit = "16777216"] use futures::Future; use serde_derive::*; @@ -33,6 +34,8 @@ pub mod transaction; #[doc(inline)] pub use crate::errors::Error; #[doc(inline)] +pub use crate::errors::ErrorKind; +#[doc(inline)] pub use crate::errors::Result; /// The key part of a key/value pair. @@ -462,7 +465,7 @@ fn range_to_keys(range: (Bound, Bound)) -> Result<(Key, Option)> } v } - Bound::Unbounded => return Err(Error::InvalidKeyRange), + Bound::Unbounded => Err(ErrorKind::InvalidKeyRange)?, }; let end = match range.1 { Bound::Included(v) => Some(v), diff --git a/src/raw.rs b/src/raw.rs index 3e3a432e..f5fd73f3 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -20,7 +20,9 @@ /// /// **Warning:** It is not advisible to use the both raw and transactional functionality in the same keyspace. /// -use crate::{rpc::RpcClient, Config, Error, Key, KeyRange, KvFuture, KvPair, Result, Value}; +use crate::{ + rpc::RpcClient, Config, Error, ErrorKind, Key, KeyRange, KvFuture, KvPair, Result, Value, +}; use futures::{future, Async, Future, Poll}; use std::{ ops::{Bound, Deref}, @@ -679,10 +681,13 @@ impl RequestInner for ScanInner { fn execute(self, client: Arc, cf: Option) -> KvFuture { if self.limit > MAX_RAW_KV_SCAN_LIMIT { - Box::new(future::err(Error::MaxScanLimitExceeded( - self.limit, - MAX_RAW_KV_SCAN_LIMIT, - ))) + Box::new(future::err( + ErrorKind::MaxScanLimitExceeded { + limit: self.limit, + max_limit: MAX_RAW_KV_SCAN_LIMIT, + } + .into(), + )) } else { let keys = match self.range.into_keys() { Err(e) => return Box::new(future::err(e)), @@ -751,13 +756,16 @@ impl RequestInner for BatchScanInner { fn execute(self, client: Arc, cf: Option) -> KvFuture { if self.each_limit > MAX_RAW_KV_SCAN_LIMIT { - Box::new(future::err(Error::MaxScanLimitExceeded( - self.each_limit, - MAX_RAW_KV_SCAN_LIMIT, - ))) + Box::new(future::err( + ErrorKind::MaxScanLimitExceeded { + limit: self.each_limit, + max_limit: MAX_RAW_KV_SCAN_LIMIT, + } + .into(), + )) } else if self.ranges.iter().any(Result::is_err) { // All errors must be InvalidKeyRange so we can simply return a new InvalidKeyRange - Box::new(future::err(Error::InvalidKeyRange)) + Box::new(future::err(ErrorKind::InvalidKeyRange.into())) } else { Box::new(client.raw_batch_scan( self.ranges.into_iter().map(Result::unwrap).collect(), diff --git a/src/rpc/client.rs b/src/rpc/client.rs index c6e15083..e24f039c 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -35,7 +35,7 @@ use crate::{ tikv::KvClient, util::HandyRwLock, }, - Config, Error, Key, KvPair, Result, Value, + Config, Error, ErrorKind, Key, KvPair, Result, Value, }; const CQ_COUNT: usize = 1; @@ -253,7 +253,7 @@ impl RpcClient { .and_then(move |context| context.client().raw_get(context, key)) .and_then(move |value| { if value.is_empty() { - Err(Error::NoSuchKey) + Err(ErrorKind::NoSuchKey)? } else { Ok(value) } @@ -292,7 +292,7 @@ impl RpcClient { cf: Option, ) -> impl Future { if value.is_empty() { - Either::A(future::err(Error::EmptyValue)) + Either::A(future::err(ErrorKind::EmptyValue.into())) } else { Either::B( Self::raw(self.inner(), &key, cf) @@ -307,7 +307,7 @@ impl RpcClient { cf: Option, ) -> impl Future { if pairs.iter().any(|p| p.value().is_empty()) { - Either::A(future::err(Error::EmptyValue)) + Either::A(future::err(ErrorKind::EmptyValue.into())) } else { let inner = self.inner(); Either::B( @@ -427,7 +427,7 @@ impl RpcClient { ) -> impl Future, Error = Error> { drop(ranges); drop(cf); - future::err(Error::Unimplemented) + future::err(ErrorKind::Unimplemented.into()) } pub fn raw_delete_range( diff --git a/src/rpc/pd/client.rs b/src/rpc/pd/client.rs index 5f3a25bd..64481693 100644 --- a/src/rpc/pd/client.rs +++ b/src/rpc/pd/client.rs @@ -32,7 +32,7 @@ use crate::{ security::SecurityManager, util::HandyRwLock, }, - Error, Result, + Error, ErrorKind, Result, }; const LEADER_CHANGE_RETRY: usize = 10; @@ -102,7 +102,7 @@ impl PdClient { let region = if resp.has_region() { resp.take_region() } else { - return Err(Error::RegionForKeyNotFound(key)); + Err(ErrorKind::RegionForKeyNotFound { key })? }; let leader = if resp.has_leader() { Some(resp.take_leader()) @@ -128,7 +128,10 @@ impl PdClient { let region = if resp.has_region() { resp.take_region() } else { - return Err(Error::RegionNotFound(region_id, None)); + Err(ErrorKind::RegionNotFound { + region_id, + message: String::default(), + })? }; let leader = if resp.has_leader() { Some(resp.take_leader()) @@ -155,7 +158,7 @@ impl PdClient { let cli = &cli.rl().client; executor(cli, option) .unwrap() - .map_err(Error::Grpc) + .map_err(|e| ErrorKind::Grpc(e).into()) .and_then(|r| { { let header = r.header(); diff --git a/src/rpc/pd/leader.rs b/src/rpc/pd/leader.rs index 5a1a9a31..1c759935 100644 --- a/src/rpc/pd/leader.rs +++ b/src/rpc/pd/leader.rs @@ -39,7 +39,7 @@ use crate::{ security::SecurityManager, util::HandyRwLock, }, - Error, Result, + Error, ErrorKind, Result, }; macro_rules! pd_request { @@ -139,12 +139,12 @@ impl PdReactor { let (tx, rx) = client.wl().client.tso().unwrap(); let tso_rx = client.wl().reactor.tso_rx.take().unwrap(); handle.spawn( - tx.sink_map_err(Error::Grpc) + tx.sink_map_err(|e| ErrorKind::Grpc(e).into()) .send_all(tso_rx.then(|r| match r { Ok(r) => Ok((r, WriteFlags::default())), Err(()) => Err(internal_err!("failed to recv tso requests")), })) - .then(|r| match r { + .then(|r: Result<_>| match r { Ok((mut sender, _)) => { sender.get_mut().cancel(); Ok(()) @@ -220,7 +220,8 @@ impl PdReactor { // Schedule tso request to run. self.schedule(PdTask::Request); } - rx.map_err(Error::Canceled).then(move |r| context.done(r)) + rx.map_err(|e| ErrorKind::Canceled(e).into()) + .then(move |r| context.done(r)) } } @@ -362,10 +363,10 @@ fn connect( ) -> Result<(pdpb_grpc::PdClient, pdpb::GetMembersResponse)> { let client = security_mgr.connect(env, addr, pdpb_grpc::PdClient::new)?; let option = CallOption::default().timeout(timeout); - match client.get_members_opt(&pdpb::GetMembersRequest::new(), option) { - Ok(resp) => Ok((client, resp)), - Err(e) => Err(Error::Grpc(e)), - } + let resp = client + .get_members_opt(&pdpb::GetMembersRequest::new(), option) + .map_err(ErrorKind::Grpc)?; + Ok((client, resp)) } fn try_connect( diff --git a/src/rpc/pd/mod.rs b/src/rpc/pd/mod.rs index f1e95f27..fd429ee3 100644 --- a/src/rpc/pd/mod.rs +++ b/src/rpc/pd/mod.rs @@ -16,7 +16,7 @@ use std::ops::{Deref, DerefMut}; use kvproto::{kvrpcpb, metapb}; pub use crate::rpc::pd::client::PdClient; -use crate::{Error, Key, Result}; +use crate::{ErrorKind, Key, Result}; #[macro_use] mod leader; @@ -76,7 +76,13 @@ impl Region { pub fn context(&self) -> Result { self.leader .as_ref() - .ok_or_else(|| Error::NotLeader(self.region.get_id(), None)) + .ok_or_else(|| { + ErrorKind::NotLeader { + region_id: self.region.get_id(), + message: String::default(), + } + .into() + }) .map(|l| { let mut ctx = kvrpcpb::Context::default(); ctx.set_region_id(self.region.get_id()); @@ -113,7 +119,12 @@ impl Region { .as_ref() .map(Clone::clone) .map(Into::into) - .ok_or_else(|| Error::StaleEpoch(None)) + .ok_or_else(|| { + ErrorKind::StaleEpoch { + message: String::default(), + } + .into() + }) } pub fn meta(&self) -> metapb::Region { diff --git a/src/rpc/tikv/client.rs b/src/rpc/tikv/client.rs index 4fa81e26..fc674e5e 100644 --- a/src/rpc/tikv/client.rs +++ b/src/rpc/tikv/client.rs @@ -25,7 +25,7 @@ use crate::{ tikv::context::{request_context, RequestContext}, }, transaction::{Mutation, TxnInfo}, - Error, Key, KvPair, Result, Value, + Error, ErrorKind, Key, KvPair, Result, Value, }; trait HasRegionError { @@ -39,42 +39,59 @@ trait HasError { impl From for Error { fn from(mut e: errorpb::Error) -> Error { let message = e.take_message(); - if e.has_not_leader() { + let kind = if e.has_not_leader() { let e = e.get_not_leader(); - Error::NotLeader( - e.get_region_id(), - Some(format!("{}. Leader: {:?}", message, e.get_leader())), - ) + ErrorKind::NotLeader { + region_id: e.get_region_id(), + message: format!("{}. Leader: {:?}", message, e.get_leader()), + } } else if e.has_region_not_found() { - Error::RegionNotFound(e.get_region_not_found().get_region_id(), Some(message)) + ErrorKind::RegionNotFound { + region_id: e.get_region_not_found().get_region_id(), + message, + } } else if e.has_key_not_in_region() { let mut e = e.take_key_not_in_region(); - Error::KeyNotInRegion( - e.take_key(), - e.get_region_id(), - e.take_start_key(), - e.take_end_key(), - ) + ErrorKind::KeyNotInRegion { + key: e.take_key(), + region_id: e.get_region_id(), + start_key: e.take_start_key(), + end_key: e.take_end_key(), + } } else if e.has_stale_epoch() { - Error::StaleEpoch(Some(format!( - "{}. New epoch: {:?}", - message, - e.get_stale_epoch().get_new_regions() - ))) + ErrorKind::StaleEpoch { + message: format!( + "{}. New epoch: {:?}", + message, + e.get_stale_epoch().get_new_regions() + ), + } } else if e.has_server_is_busy() { let mut e = e.take_server_is_busy(); - Error::ServerIsBusy(e.take_reason(), e.get_backoff_ms()) + ErrorKind::ServerIsBusy { + reason: e.take_reason(), + backoff_ms: e.get_backoff_ms(), + } } else if e.has_stale_command() { - Error::StaleCommand(message) + ErrorKind::StaleCommand { message } } else if e.has_store_not_match() { let e = e.get_store_not_match(); - Error::StoreNotMatch(e.get_request_store_id(), e.get_actual_store_id(), message) + ErrorKind::StoreNotMatch { + request_store_id: e.get_request_store_id(), + actual_store_id: e.get_actual_store_id(), + message, + } } else if e.has_raft_entry_too_large() { let e = e.get_raft_entry_too_large(); - Error::RaftEntryTooLarge(e.get_region_id(), e.get_entry_size(), message) + ErrorKind::RaftEntryTooLarge { + region_id: e.get_region_id(), + entry_size: e.get_entry_size(), + message, + } } else { - Error::InternalError(message) - } + ErrorKind::InternalError { message } + }; + kind.into() } } @@ -114,18 +131,12 @@ has_region_error!(kvrpcpb::RawDeleteRangeResponse); has_region_error!(kvrpcpb::RawScanResponse); has_region_error!(kvrpcpb::RawBatchScanResponse); -impl From for Error { - fn from(e: kvrpcpb::KeyError) -> Error { - Error::KeyError(format!("{:?}", e)) - } -} - macro_rules! has_key_error { ($type:ty) => { impl HasError for $type { fn error(&mut self) -> Option { if self.has_error() { - Some(self.take_error().into()) + Some(ErrorKind::KeyError(self.take_error()).into()) } else { None } @@ -149,7 +160,12 @@ macro_rules! has_str_error { if self.get_error().is_empty() { None } else { - Some(Error::KVError(self.take_error())) + Some( + ErrorKind::KvError { + message: self.take_error(), + } + .into(), + ) } } } @@ -632,7 +648,7 @@ impl KvClient { ) .unwrap() .then(|r| match r { - Err(e) => Err(Error::Grpc(e)), + Err(e) => Err(ErrorKind::Grpc(e))?, Ok(mut r) => { if let Some(e) = r.region_error() { Err(e) diff --git a/src/rpc/util.rs b/src/rpc/util.rs index 42c0c5fe..470b3a20 100644 --- a/src/rpc/util.rs +++ b/src/rpc/util.rs @@ -22,7 +22,10 @@ use tokio_timer::{self, timer::Handle}; macro_rules! internal_err { ($e:expr) => ({ - $crate::Error::InternalError(format!("[{}:{}]: {}", file!(), line!(), $e)) + let kind = $crate::ErrorKind::InternalError { + message: format!("[{}:{}]: {}", file!(), line!(), $e) + }; + $crate::Error::from(kind) }); ($f:tt, $($arg:expr),+) => ({ internal_err!(format!($f, $($arg),+)) From d4ea0c7d45b023f1355a73eba1d387a2c8b863fa Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Sat, 19 Jan 2019 11:12:48 +0800 Subject: [PATCH 2/3] Add error constructors Signed-off-by: Yilin Chen --- src/errors.rs | 121 ++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 2 +- src/raw.rs | 26 ++++----- src/rpc/client.rs | 10 ++-- src/rpc/pd/client.rs | 11 ++-- src/rpc/pd/leader.rs | 9 ++- src/rpc/pd/mod.rs | 17 +----- src/rpc/tikv/client.rs | 70 +++++++----------------- src/rpc/util.rs | 6 +- 9 files changed, 167 insertions(+), 105 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index ada5b064..16e2a714 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -79,10 +79,10 @@ pub enum ErrorKind { /// Reconstructed `kvproto::errorpb::ServerIsBusy` #[fail(display = "Server is busy: {}. Backoff {} ms", reason, backoff_ms)] ServerIsBusy { reason: String, backoff_ms: u64 }, - /// Reconstructed `kvproto::errorpb::StaleCommand` + /// Represents `kvproto::errorpb::StaleCommand` with additional error message #[fail(display = "Stale command. {}", message)] StaleCommand { message: String }, - /// Reconstructed `kvproto::errorpb::StoreNotMatch` + /// Represents `kvproto::errorpb::StoreNotMatch` with additional error message #[fail( display = "Requesting store {} when actual store is {}. {}", request_store_id, actual_store_id, message @@ -92,7 +92,7 @@ pub enum ErrorKind { actual_store_id: u64, message: String, }, - /// Reconstructed `kvproto::errorpb::RaftEntryTooLarge` + /// Represents `kvproto::errorpb::RaftEntryTooLarge` with additional error message #[fail( display = "{} bytes raft entry of region {} is too large. {}", entry_size, region_id, message @@ -126,6 +126,97 @@ impl Error { pub fn kind(&self) -> &ErrorKind { self.inner.get_context() } + + pub(crate) fn unimplemented() -> Self { + Error::from(ErrorKind::Unimplemented) + } + + pub(crate) fn region_for_key_not_found(key: Vec) -> Self { + Error::from(ErrorKind::RegionForKeyNotFound { key }) + } + + pub(crate) fn not_leader(region_id: u64, message: Option) -> Self { + Error::from(ErrorKind::NotLeader { + region_id, + message: message.unwrap_or_default(), + }) + } + + pub(crate) fn stale_epoch(message: Option) -> Self { + Error::from(ErrorKind::StaleEpoch { + message: message.unwrap_or_default(), + }) + } + + pub(crate) fn region_not_found(region_id: u64, message: Option) -> Self { + Error::from(ErrorKind::RegionNotFound { + region_id, + message: message.unwrap_or_default(), + }) + } + + pub(crate) fn invalid_key_range() -> Self { + Error::from(ErrorKind::InvalidKeyRange) + } + + pub(crate) fn no_such_key() -> Self { + Error::from(ErrorKind::NoSuchKey) + } + + pub(crate) fn empty_value() -> Self { + Error::from(ErrorKind::EmptyValue) + } + + pub(crate) fn max_scan_limit_exceeded(limit: u32, max_limit: u32) -> Self { + Error::from(ErrorKind::MaxScanLimitExceeded { limit, max_limit }) + } + + pub(crate) fn kv_error(message: String) -> Self { + Error::from(ErrorKind::KvError { message }) + } + + pub(crate) fn key_not_in_region(mut e: kvproto::errorpb::KeyNotInRegion) -> Self { + Error::from(ErrorKind::KeyNotInRegion { + key: e.take_key(), + region_id: e.get_region_id(), + start_key: e.take_start_key(), + end_key: e.take_end_key(), + }) + } + + pub(crate) fn server_is_busy(mut e: kvproto::errorpb::ServerIsBusy) -> Self { + Error::from(ErrorKind::ServerIsBusy { + reason: e.take_reason(), + backoff_ms: e.get_backoff_ms(), + }) + } + + pub(crate) fn stale_command(message: String) -> Self { + Error::from(ErrorKind::StaleCommand { message }) + } + + pub(crate) fn store_not_match(e: kvproto::errorpb::StoreNotMatch, message: String) -> Self { + Error::from(ErrorKind::StoreNotMatch { + request_store_id: e.get_request_store_id(), + actual_store_id: e.get_actual_store_id(), + message, + }) + } + + pub(crate) fn raft_entry_too_large( + e: kvproto::errorpb::RaftEntryTooLarge, + message: String, + ) -> Self { + Error::from(ErrorKind::RaftEntryTooLarge { + region_id: e.get_region_id(), + entry_size: e.get_entry_size(), + message, + }) + } + + pub(crate) fn internal_error(message: String) -> Self { + Error::from(ErrorKind::InternalError { message }) + } } impl From for Error { @@ -142,5 +233,29 @@ impl From> for Error { } } +impl From for Error { + fn from(err: std::io::Error) -> Self { + Error::from(ErrorKind::Io(err)) + } +} + +impl From for Error { + fn from(err: grpcio::Error) -> Self { + Error::from(ErrorKind::Grpc(err)) + } +} + +impl From for Error { + fn from(err: futures::sync::oneshot::Canceled) -> Self { + Error::from(ErrorKind::Canceled(err)) + } +} + +impl From for Error { + fn from(err: kvproto::kvrpcpb::KeyError) -> Self { + Error::from(ErrorKind::KeyError(err)) + } +} + /// A result holding an [`Error`](enum.Error.html). pub type Result = result::Result; diff --git a/src/lib.rs b/src/lib.rs index c2907aa0..962dd5ed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -465,7 +465,7 @@ fn range_to_keys(range: (Bound, Bound)) -> Result<(Key, Option)> } v } - Bound::Unbounded => Err(ErrorKind::InvalidKeyRange)?, + Bound::Unbounded => Err(Error::invalid_key_range())?, }; let end = match range.1 { Bound::Included(v) => Some(v), diff --git a/src/raw.rs b/src/raw.rs index f5fd73f3..6fdcf3e5 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -21,7 +21,7 @@ /// **Warning:** It is not advisible to use the both raw and transactional functionality in the same keyspace. /// use crate::{ - rpc::RpcClient, Config, Error, ErrorKind, Key, KeyRange, KvFuture, KvPair, Result, Value, + rpc::RpcClient, Config, Error, Key, KeyRange, KvFuture, KvPair, Result, Value, }; use futures::{future, Async, Future, Poll}; use std::{ @@ -681,13 +681,10 @@ impl RequestInner for ScanInner { fn execute(self, client: Arc, cf: Option) -> KvFuture { if self.limit > MAX_RAW_KV_SCAN_LIMIT { - Box::new(future::err( - ErrorKind::MaxScanLimitExceeded { - limit: self.limit, - max_limit: MAX_RAW_KV_SCAN_LIMIT, - } - .into(), - )) + Box::new(future::err(Error::max_scan_limit_exceeded( + self.limit, + MAX_RAW_KV_SCAN_LIMIT, + ))) } else { let keys = match self.range.into_keys() { Err(e) => return Box::new(future::err(e)), @@ -756,16 +753,13 @@ impl RequestInner for BatchScanInner { fn execute(self, client: Arc, cf: Option) -> KvFuture { if self.each_limit > MAX_RAW_KV_SCAN_LIMIT { - Box::new(future::err( - ErrorKind::MaxScanLimitExceeded { - limit: self.each_limit, - max_limit: MAX_RAW_KV_SCAN_LIMIT, - } - .into(), - )) + Box::new(future::err(Error::max_scan_limit_exceeded( + self.each_limit, + MAX_RAW_KV_SCAN_LIMIT, + ))) } else if self.ranges.iter().any(Result::is_err) { // All errors must be InvalidKeyRange so we can simply return a new InvalidKeyRange - Box::new(future::err(ErrorKind::InvalidKeyRange.into())) + Box::new(future::err(Error::invalid_key_range())) } else { Box::new(client.raw_batch_scan( self.ranges.into_iter().map(Result::unwrap).collect(), diff --git a/src/rpc/client.rs b/src/rpc/client.rs index e24f039c..d87ec99c 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -35,7 +35,7 @@ use crate::{ tikv::KvClient, util::HandyRwLock, }, - Config, Error, ErrorKind, Key, KvPair, Result, Value, + Config, Error, Key, KvPair, Result, Value, }; const CQ_COUNT: usize = 1; @@ -253,7 +253,7 @@ impl RpcClient { .and_then(move |context| context.client().raw_get(context, key)) .and_then(move |value| { if value.is_empty() { - Err(ErrorKind::NoSuchKey)? + Err(Error::no_such_key())? } else { Ok(value) } @@ -292,7 +292,7 @@ impl RpcClient { cf: Option, ) -> impl Future { if value.is_empty() { - Either::A(future::err(ErrorKind::EmptyValue.into())) + Either::A(future::err(Error::empty_value())) } else { Either::B( Self::raw(self.inner(), &key, cf) @@ -307,7 +307,7 @@ impl RpcClient { cf: Option, ) -> impl Future { if pairs.iter().any(|p| p.value().is_empty()) { - Either::A(future::err(ErrorKind::EmptyValue.into())) + Either::A(future::err(Error::empty_value())) } else { let inner = self.inner(); Either::B( @@ -427,7 +427,7 @@ impl RpcClient { ) -> impl Future, Error = Error> { drop(ranges); drop(cf); - future::err(ErrorKind::Unimplemented.into()) + future::err(Error::unimplemented()) } pub fn raw_delete_range( diff --git a/src/rpc/pd/client.rs b/src/rpc/pd/client.rs index 64481693..fc374b83 100644 --- a/src/rpc/pd/client.rs +++ b/src/rpc/pd/client.rs @@ -32,7 +32,7 @@ use crate::{ security::SecurityManager, util::HandyRwLock, }, - Error, ErrorKind, Result, + Error, Result, }; const LEADER_CHANGE_RETRY: usize = 10; @@ -102,7 +102,7 @@ impl PdClient { let region = if resp.has_region() { resp.take_region() } else { - Err(ErrorKind::RegionForKeyNotFound { key })? + Err(Error::region_for_key_not_found(key))? }; let leader = if resp.has_leader() { Some(resp.take_leader()) @@ -128,10 +128,7 @@ impl PdClient { let region = if resp.has_region() { resp.take_region() } else { - Err(ErrorKind::RegionNotFound { - region_id, - message: String::default(), - })? + Err(Error::region_not_found(region_id, None))? }; let leader = if resp.has_leader() { Some(resp.take_leader()) @@ -158,7 +155,7 @@ impl PdClient { let cli = &cli.rl().client; executor(cli, option) .unwrap() - .map_err(|e| ErrorKind::Grpc(e).into()) + .map_err(Into::into) .and_then(|r| { { let header = r.header(); diff --git a/src/rpc/pd/leader.rs b/src/rpc/pd/leader.rs index 1c759935..fbb8f9a0 100644 --- a/src/rpc/pd/leader.rs +++ b/src/rpc/pd/leader.rs @@ -39,7 +39,7 @@ use crate::{ security::SecurityManager, util::HandyRwLock, }, - Error, ErrorKind, Result, + Error, Result, }; macro_rules! pd_request { @@ -139,7 +139,7 @@ impl PdReactor { let (tx, rx) = client.wl().client.tso().unwrap(); let tso_rx = client.wl().reactor.tso_rx.take().unwrap(); handle.spawn( - tx.sink_map_err(|e| ErrorKind::Grpc(e).into()) + tx.sink_map_err(Into::into) .send_all(tso_rx.then(|r| match r { Ok(r) => Ok((r, WriteFlags::default())), Err(()) => Err(internal_err!("failed to recv tso requests")), @@ -220,8 +220,7 @@ impl PdReactor { // Schedule tso request to run. self.schedule(PdTask::Request); } - rx.map_err(|e| ErrorKind::Canceled(e).into()) - .then(move |r| context.done(r)) + rx.map_err(Into::into).then(move |r| context.done(r)) } } @@ -365,7 +364,7 @@ fn connect( let option = CallOption::default().timeout(timeout); let resp = client .get_members_opt(&pdpb::GetMembersRequest::new(), option) - .map_err(ErrorKind::Grpc)?; + .map_err(Error::from)?; Ok((client, resp)) } diff --git a/src/rpc/pd/mod.rs b/src/rpc/pd/mod.rs index fd429ee3..0fc3ab36 100644 --- a/src/rpc/pd/mod.rs +++ b/src/rpc/pd/mod.rs @@ -16,7 +16,7 @@ use std::ops::{Deref, DerefMut}; use kvproto::{kvrpcpb, metapb}; pub use crate::rpc::pd::client::PdClient; -use crate::{ErrorKind, Key, Result}; +use crate::{Error, Key, Result}; #[macro_use] mod leader; @@ -76,13 +76,7 @@ impl Region { pub fn context(&self) -> Result { self.leader .as_ref() - .ok_or_else(|| { - ErrorKind::NotLeader { - region_id: self.region.get_id(), - message: String::default(), - } - .into() - }) + .ok_or_else(|| Error::not_leader(self.region.get_id(), None)) .map(|l| { let mut ctx = kvrpcpb::Context::default(); ctx.set_region_id(self.region.get_id()); @@ -119,12 +113,7 @@ impl Region { .as_ref() .map(Clone::clone) .map(Into::into) - .ok_or_else(|| { - ErrorKind::StaleEpoch { - message: String::default(), - } - .into() - }) + .ok_or_else(|| Error::stale_epoch(None)) } pub fn meta(&self) -> metapb::Region { diff --git a/src/rpc/tikv/client.rs b/src/rpc/tikv/client.rs index fc674e5e..49be25cb 100644 --- a/src/rpc/tikv/client.rs +++ b/src/rpc/tikv/client.rs @@ -39,59 +39,32 @@ trait HasError { impl From for Error { fn from(mut e: errorpb::Error) -> Error { let message = e.take_message(); - let kind = if e.has_not_leader() { + if e.has_not_leader() { let e = e.get_not_leader(); - ErrorKind::NotLeader { - region_id: e.get_region_id(), - message: format!("{}. Leader: {:?}", message, e.get_leader()), - } + let message = format!("{}. Leader: {:?}", message, e.get_leader()); + Error::not_leader(e.get_region_id(), Some(message)) } else if e.has_region_not_found() { - ErrorKind::RegionNotFound { - region_id: e.get_region_not_found().get_region_id(), - message, - } + Error::region_not_found(e.get_region_not_found().get_region_id(), Some(message)) } else if e.has_key_not_in_region() { - let mut e = e.take_key_not_in_region(); - ErrorKind::KeyNotInRegion { - key: e.take_key(), - region_id: e.get_region_id(), - start_key: e.take_start_key(), - end_key: e.take_end_key(), - } + Error::key_not_in_region(e.take_key_not_in_region()) } else if e.has_stale_epoch() { - ErrorKind::StaleEpoch { - message: format!( - "{}. New epoch: {:?}", - message, - e.get_stale_epoch().get_new_regions() - ), - } + let message = format!( + "{}. New epoch: {:?}", + message, + e.get_stale_epoch().get_new_regions() + ); + Error::stale_epoch(Some(message)) } else if e.has_server_is_busy() { - let mut e = e.take_server_is_busy(); - ErrorKind::ServerIsBusy { - reason: e.take_reason(), - backoff_ms: e.get_backoff_ms(), - } + Error::server_is_busy(e.take_server_is_busy()) } else if e.has_stale_command() { - ErrorKind::StaleCommand { message } + Error::stale_command(message) } else if e.has_store_not_match() { - let e = e.get_store_not_match(); - ErrorKind::StoreNotMatch { - request_store_id: e.get_request_store_id(), - actual_store_id: e.get_actual_store_id(), - message, - } + Error::store_not_match(e.take_store_not_match(), message) } else if e.has_raft_entry_too_large() { - let e = e.get_raft_entry_too_large(); - ErrorKind::RaftEntryTooLarge { - region_id: e.get_region_id(), - entry_size: e.get_entry_size(), - message, - } + Error::raft_entry_too_large(e.take_raft_entry_too_large(), message) } else { - ErrorKind::InternalError { message } - }; - kind.into() + Error::internal_error(message) + } } } @@ -136,7 +109,7 @@ macro_rules! has_key_error { impl HasError for $type { fn error(&mut self) -> Option { if self.has_error() { - Some(ErrorKind::KeyError(self.take_error()).into()) + Some(self.take_error().into()) } else { None } @@ -160,12 +133,7 @@ macro_rules! has_str_error { if self.get_error().is_empty() { None } else { - Some( - ErrorKind::KvError { - message: self.take_error(), - } - .into(), - ) + Some(Error::kv_error(self.take_error()) ) } } } diff --git a/src/rpc/util.rs b/src/rpc/util.rs index 470b3a20..9fddb775 100644 --- a/src/rpc/util.rs +++ b/src/rpc/util.rs @@ -22,9 +22,9 @@ use tokio_timer::{self, timer::Handle}; macro_rules! internal_err { ($e:expr) => ({ - let kind = $crate::ErrorKind::InternalError { - message: format!("[{}:{}]: {}", file!(), line!(), $e) - }; + let kind = $crate::Error::internal_error( + format!("[{}:{}]: {}", file!(), line!(), $e) + ); $crate::Error::from(kind) }); ($f:tt, $($arg:expr),+) => ({ From 166058b2c477f2e265c152f7501b6f1a103122ea Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 21 Jan 2019 14:15:28 +0800 Subject: [PATCH 3/3] Remove NoSuchKey error Signed-off-by: Yilin Chen --- examples/raw.rs | 6 ++---- src/errors.rs | 7 ------- src/rpc/client.rs | 6 +++--- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/examples/raw.rs b/examples/raw.rs index eebe6249..79d1f0b5 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -70,10 +70,8 @@ fn main() -> Result<()> { .expect("Could not delete value"); println!("Key: {:?} deleted", key); - client - .get(key) - .wait() - .expect_err("Get returned value for not existing key"); + // Get returns None for non-existing key + assert!(client.get(key).wait()?.is_none()); let pairs: Vec = (1..3) .map(|i| KvPair::from((Key::from(format!("k{}", i)), Value::from(format!("v{}", i))))) diff --git a/src/errors.rs b/src/errors.rs index 16e2a714..920264d8 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -50,9 +50,6 @@ pub enum ErrorKind { /// Invalid key range to scan. Only left bounded intervals are supported. #[fail(display = "Only left bounded intervals are supported")] InvalidKeyRange, - /// No such key - #[fail(display = "Key does not exist")] - NoSuchKey, /// Cannot set an empty value #[fail(display = "Cannot set an empty value")] EmptyValue, @@ -159,10 +156,6 @@ impl Error { Error::from(ErrorKind::InvalidKeyRange) } - pub(crate) fn no_such_key() -> Self { - Error::from(ErrorKind::NoSuchKey) - } - pub(crate) fn empty_value() -> Self { Error::from(ErrorKind::EmptyValue) } diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 89a19a12..fc3db470 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -285,7 +285,7 @@ impl RpcClient { cf: Option, ) -> impl Future { if value.is_empty() { - Either::A(future::err(Error::EmptyValue)) + Either::A(future::err(Error::empty_value())) } else { Either::B( Self::raw(self.inner(), &key, cf) @@ -300,7 +300,7 @@ impl RpcClient { cf: Option, ) -> impl Future { if pairs.iter().any(|p| p.value().is_empty()) { - Either::A(future::err(Error::EmptyValue)) + Either::A(future::err(Error::empty_value())) } else { let inner = self.inner(); Either::B( @@ -416,7 +416,7 @@ impl RpcClient { ) -> impl Future, Error = Error> { drop(ranges); drop(cf); - future::err(Error::Unimplemented) + future::err(Error::unimplemented()) } pub fn raw_delete_range(