Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ env:
cache: cargo

rust:
os:
- linux
- windows
- osx

matrix:
include:
Expand All @@ -33,6 +37,8 @@ matrix:


script:
- docker run -d --net=host --name pd --rm pingcap/pd --name "pd" --data-dir "pd" --client-urls "http://127.0.0.1:2379" --advertise-client-urls "http://127.0.0.1:2379"
- docker run -d --net=host --name kv --rm pingcap/tikv --pd-endpoints "127.0.0.1:2379" --addr "127.0.0.1:2378" --data-dir "kv"
- cargo test --all -- --nocapture
# Validate benches still work.
- cargo bench --all -- --test
17 changes: 17 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,20 @@ serde = "1.0"
serde_derive = "1.0"
quick-error = "1.2"
grpcio = { version = "0.4", features = [ "secure" ] }
protobuf = "~2.0"
tokio-core = "0.1"
tokio-timer = "0.2"
fxhash = "0.2"
lazy_static = "0.2.1"
log = "0.3.9"

[dependencies.kvproto]
git = "https://github.com/pingcap/kvproto.git"

[dependencies.prometheus]
version = "0.4.2"
default-features = false
features = ["push", "process"]

[dev-dependencies]
tempdir = "0.3"
102 changes: 70 additions & 32 deletions examples/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,66 +13,104 @@

use futures::future::Future;
use std::path::PathBuf;
use tikv_client::*;
use tikv_client::{raw::Client, Config, Key, KvPair, Result, Value};

fn main() {
let config = Config::new(vec!["127.0.0.1:3379"]).with_security(
const KEY: &str = "TiKV";
const VALUE: &str = "Rust";
const CUSTOM_CF: &str = "default";

fn main() -> Result<()> {
// Create a configuration to use for the example.
// Optionally encrypt the traffic.
let config = Config::new(vec![
"192.168.0.100:3379", // Avoid a single point of failure,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use clap to pass all as command args? So we don't need to rebuild it when we want to communicate with another cluster.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #16 :) Let's leave that for another PR, this PR is already far too large.

"192.168.0.101:3379", // use more than one PD endpoint.
"192.168.0.102:3379",
])
.with_security(
PathBuf::from("/path/to/ca.pem"),
PathBuf::from("/path/to/client.pem"),
PathBuf::from("/path/to/client-key.pem"),
);
let raw = raw::Client::new(&config)
.wait()
.expect("Could not connect to tikv");

let key: Key = b"Company".to_vec().into();
let value: Value = b"PingCAP".to_vec().into();
// When we first create a client we recieve a `Connect` structure which must be resolved before
// the client is actually connected and usable.
let unconnnected_client = Client::new(&config);
let client = unconnnected_client.wait()?;

raw.put(key.clone(), value.clone())
.cf("test_cf")
.wait()
.expect("Could not put kv pair to tikv");
println!("Successfully put {:?}:{:?} to tikv", key, value);
// Requests are created from the connected client. These calls return structures which
// implement `Future`. This means the `Future` must be resolved before the action ever takes
// place.
//
// Here we set the key `TiKV` to have the value `Rust` associated with it.
let put_request = client.put(KEY, VALUE);
put_request.wait()?; // Returns a `tikv_client::Error` on failure.
println!("Put key \"{}\", value \"{}\".", KEY, VALUE);

let value = raw
.get(&key)
.cf("test_cf")
.wait()
.expect("Could not get value");
println!("Found val: {:?} for key: {:?}", value, key);
//
// Unlike a standard Rust HashMap all calls take owned values. This is because under the hood
// protobufs must take ownership of the data. If we only took a borrow we'd need to internally
// clone it. This is against Rust API guidelines, so you must manage this yourself.
//
// Above, you saw we can use a `&'static str`, this is primarily for making examples short.
// This type is practical to use for real things, and usage forces an internal copy.
//
// It is best to pass a `Vec<u8>` in terms of explictness and speed. `String`s and a few other
// types are supported as well, but it all ends up as `Vec<u8>` in the end.
let key: String = String::from(KEY);
let value: Value = client.get(key.clone()).wait()?;
assert_eq!(value.as_ref(), VALUE.as_bytes());
println!("Get key \"{:?}\" returned value \"{:?}\".", value, KEY);

raw.delete(&key)
.cf("test_cf")
// You can also set the `ColumnFamily` used by the request.
// This is *advanced usage* and should have some special considerations.
client
.delete(key.clone())
.cf(CUSTOM_CF)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, I suggest removing cf here because now we can't let the user create CF dynamically. Providing cf here may confuse the users if they want to use other cf.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we change cf to private and remove it from example at this time?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sunxiaoguang Maybe you can just remove it from the example for now? If people configure TiKV with custom CFs we must allow them to use them, but perhaps this part of the API can change later.

.wait()
.expect("Could not delete value");
println!("Key: {:?} deleted", key);

raw.get(&key)
.cf("test_cf")
client
.get(key)
.cf(CUSTOM_CF)
.wait()
.expect_err("Get returned value for not existing key");

let keys = vec![b"k1".to_vec().into(), b"k2".to_vec().into()];
let pairs: Vec<KvPair> = (1..3)
.map(|i| KvPair::from((Key::from(format!("k{}", i)), Value::from(format!("v{}", i)))))
.collect();
client
.batch_put(pairs.clone())
.wait()
.expect("Could not put pairs");

let keys = vec![Key::from(b"k1".to_vec()), Key::from(b"k2".to_vec())];
Comment thread
Hoverbear marked this conversation as resolved.

let values = raw
.batch_get(&keys)
.cf("test_cf")
let values = client
.batch_get(keys.clone())
.cf(CUSTOM_CF)
.wait()
.expect("Could not get values");
println!("Found values: {:?} for keys: {:?}", values, keys);

let start: Key = b"k1".to_vec().into();
let end: Key = b"k2".to_vec().into();
raw.scan(&start..&end, 10)
.cf("test_cf")
client
.scan(start.clone()..end.clone(), 10)
.cf(CUSTOM_CF)
.key_only()
.wait()
.expect("Could not scan");

let ranges = [&start..&end, &start..&end];
raw.batch_scan(&ranges, 10)
.cf("test_cf")
let ranges = vec![start.clone()..end.clone(), start.clone()..end.clone()];
client
.batch_scan(ranges, 10)
.cf(CUSTOM_CF)
.key_only()
.wait()
.expect("Could not batch scan");

// Cleanly exit.
Ok(())
}
6 changes: 3 additions & 3 deletions examples/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
txn.commit().wait().expect("Could not commit transaction");
}

fn get(client: &Client, key: &Key) -> Value {
fn get(client: &Client, key: Key) -> Value {
let txn = client.begin();
txn.get(key).wait().expect("Could not get value")
}
Expand Down Expand Up @@ -70,7 +70,7 @@ fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
}

fn main() {
let config = Config::new(vec!["127.0.0.1:3379"]).with_security(
let config = Config::new(vec!["127.0.0.1:2379"]).with_security(
PathBuf::from("/path/to/ca.pem"),
PathBuf::from("/path/to/client.pem"),
PathBuf::from("/path/to/client-key.pem"),
Expand All @@ -88,7 +88,7 @@ fn main() {

// get
let key1: Key = b"key1".to_vec().into();
let value1 = get(&txn, &key1);
let value1 = get(&txn, key1.clone());
println!("{:?}", (key1, value1));

// scan
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
stable
stable
Comment thread
brson marked this conversation as resolved.
83 changes: 71 additions & 12 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,62 +16,121 @@ use quick_error::quick_error;
use std::{error, result};

quick_error! {
/// An error originating from the TiKV client or dependencies.
///
/// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/)
/// for errors. *This may change in future versions.*
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big deal, but mentioning the library that implements the error type doesn't seem to me to provide useful end-user info, particularly because quick_error doesn't (I think) provide any unique user-facing capabilities on top of std Error. I'd imagine that whatever this enum ends up looking like will be reflected in the API documentation.

I'm not saying to take this line out, but I am curious if you have ideas about how clients would use this info.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we intend to move to failure, I think it's more of a warning that it will change.

#[derive(Debug)]
pub enum Error {
/// Wraps a `std::io::Error`.
Io(err: ::std::io::Error) {
from()
cause(err)
description(err.description())
}
/// Wraps a `grpcio::Error`.
Grpc(err: grpcio::Error) {
from()
cause(err)
description(err.description())
}
/// Represents that a futures oneshot channel was cancelled.
Canceled(err: ::futures::sync::oneshot::Canceled) {
from()
cause(err)
description(err.description())
}
/// An unknown error.
///
/// Generally, this is not an expected error. Please report it if encountered.
Other(err: Box<error::Error + Sync + Send>) {
from()
cause(err.as_ref())
description(err.description())
display("unknown error {:?}", err)
}
/// A region was not found for the given key.
RegionForKeyNotFound(key: Vec<u8>) {
description("region is not found")
display("region is not found for key {:?}", key)
}
RegionNotFound(id: u64) {
/// A region was not found.
RegionNotFound(region_id: u64, message: Option<String>) {
description("region is not found")
display("region {:?} is not found", id)
display("region {:?} is not found. {}", region_id, message.as_ref().unwrap_or(&"".to_owned()))
}
NotLeader(region_id: u64) {
/// The peer is not a leader of the given region.
NotLeader(region_id: u64, message: Option<String>) {
description("peer is not leader")
display("peer is not leader for region {:?}.", region_id)
display("peer is not leader for region {}. {}", region_id, message.as_ref().unwrap_or(&"".to_owned()))
}
StoreNotMatch {
/// The store does not match.
StoreNotMatch(request_store_id: u64, actual_store_id: u64, message: String) {
description("store not match")
display("store not match")
display("requesting store '{}' when actual store is '{}'. {}", request_store_id, actual_store_id, message)
}
/// The given key is not within the given region.
KeyNotInRegion(key: Vec<u8>, region_id: u64, start_key: Vec<u8>, end_key: Vec<u8>) {
description("region is not found")
display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key)
}
StaleEpoch {
/// A stale epoch.
StaleEpoch(message: Option<String>) {
description("stale epoch")
display("stale epoch")
display("{}", message.as_ref().unwrap_or(&"".to_owned()))
}
ServerIsBusy(reason: String) {
StaleCommand(message: String) {
description("stale command")
display("{}", message)
}
/// The server is too busy.
ServerIsBusy(reason: String, backoff: u64) {
description("server is busy")
display("server is busy: {:?}", reason)
display("server is busy: {:?}. Backoff {} ms", reason, backoff)
}
RaftEntryTooLarge(region_id: u64, entry_size: u64) {
/// The given raft entry is too large for the region.
RaftEntryTooLarge(region_id: u64, entry_size: u64, message: String) {
description("raft entry too large")
display("{:?} bytes raft entry of region {:?} is too large", entry_size, region_id)
display("{:?} bytes raft entry of region {:?} is too large. {}", entry_size, region_id, message)
}
KeyError(message: String) {
description("key error")
display("{}", message)
}
KVError(message: String) {
description("kv error")
display("{}", message)
}
InternalError(message: String) {
description("internal error")
display("{}", message)
}
InvalidKeyRange {
description("invalid key range")
display("Only left closed intervals are supported")
}
Unimplemented {
description("unimplemented feature")
display("Unimplemented feature")
}
EmptyValue {
description("can not set empty value")
display("Can not set empty value")
}
NoSuchKey {
description("key does not exist")
display("Key doest not exist")
}
InvalidOverlappingRanges {
description("ranges can not be overlapping")
display("Ranges can not be overlapping")
Comment thread
Hoverbear marked this conversation as resolved.
}
MaxScanLimitExceeded(limit: u32, max_limit: u32) {
description("limit exceeds max scan limit")
display("Limit {} excceds max scan limit {}", limit, max_limit)
}
}
}

/// A result holding an [`Error`](enum.Error.html).
pub type Result<T> = result::Result<T, Error>;
Loading