Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ name = "tikv_client"
futures = "0.1"
serde = "1.0"
serde_derive = "1.0"
quick-error = "1.2"
grpcio = { version = "0.4", features = [ "secure" ] }
protobuf = "~2.0"
tokio-core = "0.1"
tokio-timer = "0.2"
fxhash = "0.2"
lazy_static = "0.2.1"
log = "0.3.9"
failure = "0.1"

[dependencies.kvproto]
git = "https://github.com/pingcap/kvproto.git"
Expand Down
6 changes: 2 additions & 4 deletions examples/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,8 @@ fn main() -> Result<()> {
.expect("Could not delete value");
println!("Key: {:?} deleted", key);

client
.get(key)
.wait()
.expect_err("Get returned value for not existing key");
// Get returns None for non-existing key
assert!(client.get(key).wait()?.is_none());

let pairs: Vec<KvPair> = (1..3)
.map(|i| KvPair::from((Key::from(format!("k{}", i)), Value::from(format!("v{}", i)))))
Expand Down
352 changes: 235 additions & 117 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,127 +10,245 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

use failure::{Backtrace, Context, Fail};
use grpcio;
use quick_error::quick_error;
use std::{error, result};

quick_error! {
/// An error originating from the TiKV client or dependencies.
///
/// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/)
/// for errors. *This may change in future versions.*
#[derive(Debug)]
pub enum Error {
/// Wraps a `std::io::Error`.
Io(err: ::std::io::Error) {
from()
cause(err)
description(err.description())
}
/// Wraps a `grpcio::Error`.
Grpc(err: grpcio::Error) {
from()
cause(err)
description(err.description())
}
/// Represents that a futures oneshot channel was cancelled.
Canceled(err: ::futures::sync::oneshot::Canceled) {
from()
cause(err)
description(err.description())
}
/// An unknown error.
///
/// Generally, this is not an expected error. Please report it if encountered.
Other(err: Box<error::Error + Sync + Send>) {
from()
cause(err.as_ref())
description(err.description())
display("unknown error {:?}", err)
}
/// A region was not found for the given key.
RegionForKeyNotFound(key: Vec<u8>) {
description("region is not found")
display("region is not found for key {:?}", key)
}
/// A region was not found.
RegionNotFound(region_id: u64, message: Option<String>) {
description("region is not found")
display("region {:?} is not found. {}", region_id, message.as_ref().unwrap_or(&"".to_owned()))
}
/// The peer is not a leader of the given region.
NotLeader(region_id: u64, message: Option<String>) {
description("peer is not leader")
display("peer is not leader for region {}. {}", region_id, message.as_ref().unwrap_or(&"".to_owned()))
}
/// The store does not match.
StoreNotMatch(request_store_id: u64, actual_store_id: u64, message: String) {
description("store not match")
display("requesting store '{}' when actual store is '{}'. {}", request_store_id, actual_store_id, message)
}
/// The given key is not within the given region.
KeyNotInRegion(key: Vec<u8>, region_id: u64, start_key: Vec<u8>, end_key: Vec<u8>) {
description("region is not found")
display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key)
}
/// A stale epoch.
StaleEpoch(message: Option<String>) {
description("stale epoch")
display("{}", message.as_ref().unwrap_or(&"".to_owned()))
}
StaleCommand(message: String) {
description("stale command")
display("{}", message)
}
/// The server is too busy.
ServerIsBusy(reason: String, backoff: u64) {
description("server is busy")
display("server is busy: {:?}. Backoff {} ms", reason, backoff)
}
/// The given raft entry is too large for the region.
RaftEntryTooLarge(region_id: u64, entry_size: u64, message: String) {
description("raft entry too large")
display("{:?} bytes raft entry of region {:?} is too large. {}", entry_size, region_id, message)
}
KeyError(message: String) {
description("key error")
display("{}", message)
}
KVError(message: String) {
description("kv error")
display("{}", message)
}
InternalError(message: String) {
description("internal error")
display("{}", message)
}
InvalidKeyRange {
description("invalid key range")
display("Only left closed intervals are supported")
}
Unimplemented {
description("unimplemented feature")
display("Unimplemented feature")
}
EmptyValue {
description("can not set empty value")
display("Can not set empty value")
}
NoSuchKey {
description("key does not exist")
display("Key doest not exist")
}
InvalidOverlappingRanges {
description("ranges can not be overlapping")
display("Ranges can not be overlapping")
}
MaxScanLimitExceeded(limit: u32, max_limit: u32) {
description("limit exceeds max scan limit")
display("Limit {} excceds max scan limit {}", limit, max_limit)
use std::fmt::{self, Display};
use std::result;

#[derive(Debug)]
pub struct Error {
inner: Context<ErrorKind>,
}

/// An error originating from the TiKV client or dependencies.
#[derive(Debug, Fail)]
pub enum ErrorKind {
/// Wraps a `std::io::Error`.
#[fail(display = "IO error: {}", _0)]
Io(#[fail(cause)] std::io::Error),
/// Wraps a `grpcio::Error`.
#[fail(display = "gRPC error: {}", _0)]
Grpc(#[fail(cause)] grpcio::Error),
/// Represents that a futures oneshot channel was cancelled.
#[fail(display = "A futures oneshot channel was canceled. {}", _0)]
Canceled(#[fail(cause)] futures::sync::oneshot::Canceled),
/// Feature is not implemented.
#[fail(display = "Unimplemented feature")]
Unimplemented,
// No region is found for the given key.
#[fail(display = "Region is not found for key: {:?}", key)]
RegionForKeyNotFound { key: Vec<u8> },
/// The peer is not the leader for the region.
#[fail(display = "Peer is not leader for region {}. {}", region_id, message)]
NotLeader { region_id: u64, message: String },
/// Stale epoch
#[fail(display = "Stale epoch. {}", message)]
StaleEpoch { message: String },
/// No region is found for the given id.
#[fail(display = "Region {} is not found. {}", region_id, message)]
RegionNotFound { region_id: u64, message: String },
/// Invalid key range to scan. Only left bounded intervals are supported.
#[fail(display = "Only left bounded intervals are supported")]
InvalidKeyRange,
/// Cannot set an empty value
#[fail(display = "Cannot set an empty value")]
EmptyValue,
/// Scan limit exceeds the maximum
#[fail(display = "Limit {} exceeds max scan limit {}", limit, max_limit)]
MaxScanLimitExceeded { limit: u32, max_limit: u32 },
/// Wraps `kvproto::kvrpcpb::KeyError`
#[fail(display = "{:?}", _0)]
KeyError(kvproto::kvrpcpb::KeyError),
/// A string error returned by TiKV server
#[fail(display = "Kv error. {}", message)]
KvError { message: String },
/// Reconstructed `kvproto::errorpb::KeyNotInRegion`
#[fail(
display = "Key {:?} is not in region {}: [{:?}, {:?})",
key, region_id, start_key, end_key
)]
KeyNotInRegion {
key: Vec<u8>,
region_id: u64,
start_key: Vec<u8>,
end_key: Vec<u8>,
},
/// Reconstructed `kvproto::errorpb::ServerIsBusy`
#[fail(display = "Server is busy: {}. Backoff {} ms", reason, backoff_ms)]
ServerIsBusy { reason: String, backoff_ms: u64 },
/// Represents `kvproto::errorpb::StaleCommand` with additional error message
#[fail(display = "Stale command. {}", message)]
StaleCommand { message: String },
/// Represents `kvproto::errorpb::StoreNotMatch` with additional error message
#[fail(
display = "Requesting store {} when actual store is {}. {}",
request_store_id, actual_store_id, message
)]
StoreNotMatch {
request_store_id: u64,
actual_store_id: u64,
message: String,
},
/// Represents `kvproto::errorpb::RaftEntryTooLarge` with additional error message
#[fail(
display = "{} bytes raft entry of region {} is too large. {}",
entry_size, region_id, message
)]
RaftEntryTooLarge {
region_id: u64,
entry_size: u64,
message: String,
},
#[fail(display = "{}", message)]
InternalError { message: String },
}

impl Fail for Error {
fn cause(&self) -> Option<&Fail> {
self.inner.cause()
}

fn backtrace(&self) -> Option<&Backtrace> {
self.inner.backtrace()
}
}

impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Display::fmt(&self.inner, f)
}
}

impl Error {
pub fn kind(&self) -> &ErrorKind {
self.inner.get_context()
}

pub(crate) fn unimplemented() -> Self {
Error::from(ErrorKind::Unimplemented)
}

pub(crate) fn region_for_key_not_found(key: Vec<u8>) -> Self {
Error::from(ErrorKind::RegionForKeyNotFound { key })
}

pub(crate) fn not_leader(region_id: u64, message: Option<String>) -> Self {
Error::from(ErrorKind::NotLeader {
region_id,
message: message.unwrap_or_default(),
})
}

pub(crate) fn stale_epoch(message: Option<String>) -> Self {
Error::from(ErrorKind::StaleEpoch {
message: message.unwrap_or_default(),
})
}

pub(crate) fn region_not_found(region_id: u64, message: Option<String>) -> Self {
Error::from(ErrorKind::RegionNotFound {
region_id,
message: message.unwrap_or_default(),
})
}

pub(crate) fn invalid_key_range() -> Self {
Error::from(ErrorKind::InvalidKeyRange)
}

pub(crate) fn empty_value() -> Self {
Error::from(ErrorKind::EmptyValue)
}

pub(crate) fn max_scan_limit_exceeded(limit: u32, max_limit: u32) -> Self {
Error::from(ErrorKind::MaxScanLimitExceeded { limit, max_limit })
}

pub(crate) fn kv_error(message: String) -> Self {
Error::from(ErrorKind::KvError { message })
}

pub(crate) fn key_not_in_region(mut e: kvproto::errorpb::KeyNotInRegion) -> Self {
Error::from(ErrorKind::KeyNotInRegion {
key: e.take_key(),
region_id: e.get_region_id(),
start_key: e.take_start_key(),
end_key: e.take_end_key(),
})
}

pub(crate) fn server_is_busy(mut e: kvproto::errorpb::ServerIsBusy) -> Self {
Error::from(ErrorKind::ServerIsBusy {
reason: e.take_reason(),
backoff_ms: e.get_backoff_ms(),
})
}

pub(crate) fn stale_command(message: String) -> Self {
Error::from(ErrorKind::StaleCommand { message })
}

pub(crate) fn store_not_match(e: kvproto::errorpb::StoreNotMatch, message: String) -> Self {
Error::from(ErrorKind::StoreNotMatch {
request_store_id: e.get_request_store_id(),
actual_store_id: e.get_actual_store_id(),
message,
})
}

pub(crate) fn raft_entry_too_large(
e: kvproto::errorpb::RaftEntryTooLarge,
message: String,
) -> Self {
Error::from(ErrorKind::RaftEntryTooLarge {
region_id: e.get_region_id(),
entry_size: e.get_entry_size(),
message,
})
}

pub(crate) fn internal_error(message: String) -> Self {
Error::from(ErrorKind::InternalError { message })
}
}

impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Error {
Error {
inner: Context::new(kind),
}
}
}

impl From<Context<ErrorKind>> for Error {
fn from(inner: Context<ErrorKind>) -> Error {
Error { inner }
}
}

impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Error::from(ErrorKind::Io(err))
}
}

impl From<grpcio::Error> for Error {
fn from(err: grpcio::Error) -> Self {
Error::from(ErrorKind::Grpc(err))
}
}

impl From<futures::sync::oneshot::Canceled> for Error {
fn from(err: futures::sync::oneshot::Canceled) -> Self {
Error::from(ErrorKind::Canceled(err))
}
}

impl From<kvproto::kvrpcpb::KeyError> for Error {
fn from(err: kvproto::kvrpcpb::KeyError) -> Self {
Error::from(ErrorKind::KeyError(err))
}
}

/// A result holding an [`Error`](enum.Error.html).
pub type Result<T> = result::Result<T, Error>;
Loading