Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ fn main() -> Result<()> {
// It is best to pass a `Vec<u8>` in terms of explictness and speed. `String`s and a few other
// types are supported as well, but it all ends up as `Vec<u8>` in the end.
let key: String = String::from(KEY);
let value: Value = client.get(key.clone()).wait()?;
let value: Value = client.get(key.clone()).wait()?.expect("value must exist");
assert_eq!(value.as_ref(), VALUE.as_bytes());
println!("Get key \"{:?}\" returned value \"{:?}\".", value, KEY);

Expand Down
6 changes: 3 additions & 3 deletions src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Client {
/// # let connected_client = connecting_client.wait().unwrap();
/// let key = "TiKV";
/// let req = connected_client.get(key);
/// let result: Value = req.wait().unwrap();
/// let result: Option<Value> = req.wait().unwrap();
/// ```
pub fn get(&self, key: impl Into<Key>) -> Get {
Get::new(self.rpc(), GetInner::new(key.into()))
Expand Down Expand Up @@ -388,7 +388,7 @@ impl Get {
}

impl Future for Get {
type Item = Value;
type Item = Option<Value>;
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Expand All @@ -407,7 +407,7 @@ impl GetInner {
}

impl RequestInner for GetInner {
type Resp = Value;
type Resp = Option<Value>;

fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> KvFuture<Self::Resp> {
Box::new(client.raw_get(self.key, cf))
Expand Down
135 changes: 60 additions & 75 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,11 @@ impl RpcClient {
key: &Key,
cf: Option<ColumnFamily>,
) -> impl Future<Item = RawContext, Error = Error> {
Self::region_context(inner, key)
.map(move |(region, client)| RawContext::new(region, client, cf))
Self::region_context(inner, key).map(|(region, client)| RawContext::new(region, client, cf))
}

fn txn(inner: Arc<RpcClientInner>, key: &Key) -> impl Future<Item = TxnContext, Error = Error> {
Self::region_context(inner, key).map(move |(region, _client)| TxnContext::new(region))
Self::region_context(inner, key).map(|(region, _client)| TxnContext::new(region))
}

#[inline]
Expand All @@ -248,16 +247,10 @@ impl RpcClient {
&self,
key: Key,
cf: Option<ColumnFamily>,
) -> impl Future<Item = Value, Error = Error> {
) -> impl Future<Item = Option<Value>, Error = Error> {
Self::raw(self.inner(), &key, cf)
.and_then(move |context| context.client().raw_get(context, key))
.and_then(move |value| {
if value.is_empty() {
Err(Error::NoSuchKey)
} else {
Ok(value)
}
})
.and_then(|context| context.client().raw_get(context, key))
.map(|value| if value.is_empty() { None } else { Some(value) })
}

pub fn raw_batch_get(
Expand All @@ -274,8 +267,8 @@ impl RpcClient {
let inner = Arc::clone(&inner);
let cf = cf.clone();
let task = Self::region_context_by_id(inner, region.id)
.map(move |(region, client)| RawContext::new(region, client, cf))
.and_then(move |context| {
.map(|(region, client)| RawContext::new(region, client, cf))
.and_then(|context| {
context.client().raw_batch_get(context, keys.into_iter())
});
tasks.push(task);
Expand All @@ -296,7 +289,7 @@ impl RpcClient {
} else {
Either::B(
Self::raw(self.inner(), &key, cf)
.and_then(move |context| context.client().raw_put(context, key, value)),
.and_then(|context| context.client().raw_put(context, key, value)),
)
}
}
Expand All @@ -319,10 +312,8 @@ impl RpcClient {
let inner = Arc::clone(&inner);
let cf = cf.clone();
let task = Self::region_context_by_id(inner, region.id)
.map(move |(region, client)| RawContext::new(region, client, cf))
.and_then(move |context| {
context.client().raw_batch_put(context, pairs)
});
.map(|(region, client)| RawContext::new(region, client, cf))
.and_then(|context| context.client().raw_batch_put(context, pairs));
tasks.push(task);
}
future::join_all(tasks)
Expand All @@ -338,7 +329,7 @@ impl RpcClient {
cf: Option<ColumnFamily>,
) -> impl Future<Item = (), Error = Error> {
Self::raw(self.inner(), &key, cf)
.and_then(move |context| context.client().raw_delete(context, key))
.and_then(|context| context.client().raw_delete(context, key))
}

pub fn raw_batch_delete(
Expand All @@ -355,8 +346,8 @@ impl RpcClient {
let inner = Arc::clone(&inner);
let cf = cf.clone();
let task = Self::region_context_by_id(inner, region.id)
.map(move |(region, client)| RawContext::new(region, client, cf))
.and_then(move |context| context.client().raw_batch_delete(context, keys));
.map(|(region, client)| RawContext::new(region, client, cf))
.and_then(|context| context.client().raw_batch_delete(context, keys));
tasks.push(task);
}
future::join_all(tasks)
Expand Down Expand Up @@ -386,35 +377,33 @@ impl RpcClient {
);
let inner = Arc::clone(&self.inner);
loop_fn((inner, scan), |(inner, scan)| {
inner
.locate_key(scan.start_key())
.and_then(move |location| {
let region = location.into_inner();
let cf = scan.cf.clone();
Self::region_context_by_id(Arc::clone(&inner), region.id)
.map(move |(region, client)| {
(scan, region.range(), RawContext::new(region, client, cf))
})
.and_then(move |(mut scan, region_range, context)| {
let (start_key, end_key) = scan.range();
context
.client()
.raw_scan(context, start_key, end_key, scan.limit, scan.key_only)
.map(move |pairs| (scan, region_range, pairs))
})
.map(move |(mut scan, region_range, mut pairs)| {
let limit = scan.limit;
scan.result_mut().append(&mut pairs);
if scan.result().len() as u32 >= limit {
Loop::Break(scan.into_inner())
} else {
match scan.next(region_range) {
ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
ScanRegionsStatus::Break => Loop::Break(scan.into_inner()),
}
inner.locate_key(scan.start_key()).and_then(|location| {
let region = location.into_inner();
let cf = scan.cf.clone();
Self::region_context_by_id(Arc::clone(&inner), region.id)
.map(|(region, client)| {
(scan, region.range(), RawContext::new(region, client, cf))
})
.and_then(|(mut scan, region_range, context)| {
let (start_key, end_key) = scan.range();
context
.client()
.raw_scan(context, start_key, end_key, scan.limit, scan.key_only)
.map(|pairs| (scan, region_range, pairs))
})
.map(|(mut scan, region_range, mut pairs)| {
let limit = scan.limit;
scan.result_mut().append(&mut pairs);
if scan.result().len() as u32 >= limit {
Loop::Break(scan.into_inner())
} else {
match scan.next(region_range) {
ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
ScanRegionsStatus::Break => Loop::Break(scan.into_inner()),
}
})
})
}
})
})
})
}

Expand All @@ -438,31 +427,27 @@ impl RpcClient {
let scan: ScanRegionsContext<(), Option<ColumnFamily>> = ScanRegionsContext::new(range, cf);
let inner = Arc::clone(&self.inner);
loop_fn((inner, scan), |(inner, scan)| {
inner
.locate_key(scan.start_key())
.and_then(move |location| {
let region = location.into_inner();
let cf = scan.clone();
Self::region_context_by_id(Arc::clone(&inner), region.id)
.map(move |(region, client)| {
(scan, region.range(), RawContext::new(region, client, cf))
})
.and_then(move |(mut scan, region_range, context)| {
let (start_key, end_key) = scan.range();
let start_key = start_key.expect("start key must be specified");
let end_key = end_key.expect("end key must be specified");
context
.client()
.raw_delete_range(context, start_key, end_key)
.map(move |_| (scan, region_range))
})
.map(
move |(mut scan, region_range)| match scan.next(region_range) {
ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
ScanRegionsStatus::Break => Loop::Break(()),
},
)
})
inner.locate_key(scan.start_key()).and_then(|location| {
let region = location.into_inner();
let cf = scan.clone();
Self::region_context_by_id(Arc::clone(&inner), region.id)
.map(|(region, client)| {
(scan, region.range(), RawContext::new(region, client, cf))
})
.and_then(|(mut scan, region_range, context)| {
let (start_key, end_key) = scan.range();
let start_key = start_key.expect("start key must be specified");
let end_key = end_key.expect("end key must be specified");
context
.client()
.raw_delete_range(context, start_key, end_key)
.map(|_| (scan, region_range))
})
.map(|(mut scan, region_range)| match scan.next(region_range) {
ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
ScanRegionsStatus::Break => Loop::Break(()),
})
})
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/pd/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ pub fn try_connect_leader(
let mut resp = None;
// Try to connect to other members, then the previous leader.
'outer: for m in members
.into_iter()
.iter()
.filter(|m| *m != previous_leader)
.chain(&[previous_leader.clone()])
{
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ mod tests {
let example_cert = temp.path().join("cert");
let example_pem = temp.path().join("key");
for (id, f) in (&[&example_ca, &example_cert, &example_pem])
.into_iter()
.iter()
.enumerate()
{
File::create(f).unwrap().write_all(&[id as u8]).unwrap();
Expand Down
10 changes: 7 additions & 3 deletions tests/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,18 @@ fn test_existence(client: &Client, existing_pairs: &[KvPair], not_existing_keys:
for pair in existing_pairs.iter().map(Clone::clone) {
let (key, value) = pair.into_inner();
assert_eq!(
client.get(key).wait().expect("Could not get value"),
client
.get(key)
.wait()
.expect("Could not get value")
.expect("key doesn't exist"),
value.clone(),
);
}

for key in not_existing_keys.clone().into_iter() {
let r = client.get(key).wait();
assert!(r.is_err());
let r = client.get(key).wait().expect("Cound not get value");
assert!(r.is_none());
}

let mut existing_keys = Vec::with_capacity(existing_pairs.len());
Expand Down