diff --git a/examples/pd-client.rs b/examples/pd-client.rs index a96d5f34..10e9fbbc 100644 --- a/examples/pd-client.rs +++ b/examples/pd-client.rs @@ -7,7 +7,7 @@ extern crate tikv_client; use std::env; use std::sync::Arc; -use futures::future::Future; +use futures::Future; use simplelog::*; use tikv_client::pd::*; @@ -26,9 +26,11 @@ fn main() { let pd_client = PdRpcClient::new(&addr, Arc::clone(&security_manager)) .unwrap_or_else(|e| panic!("failed to create rpc client: {:?}", e)); - println!("Cluster ID: {}", pd_client.get_cluster_id().unwrap()); - let store = pd_client.get_store(1).wait(); - println!("Store: {:?}", store); - let region = pd_client.get_region(b"abc").wait(); - println!("Region: {:?}", region); + println!("Cluster ID: {:?}", pd_client.get_cluster_id()); + println!("Store: {:?}", pd_client.get_store(1).wait()); + println!("All Stores: {:?}", pd_client.get_all_stores().wait()); + println!("Region: {:?}", pd_client.get_region(b"abc").wait()); + for _ in 0..10 { + println!("TSO: {:?}", pd_client.get_ts().wait()); + } } diff --git a/src/pd/errors.rs b/src/errors.rs similarity index 100% rename from src/pd/errors.rs rename to src/errors.rs diff --git a/src/lib.rs b/src/lib.rs index 339e14ac..0a9bec75 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ extern crate lazy_static; #[cfg(target_os = "linux")] extern crate libc; -pub mod pd; #[macro_use] pub mod util; +pub mod errors; +pub mod pd; diff --git a/src/pd/client.rs b/src/pd/client.rs index 00a6e5f5..63d12975 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -15,14 +15,16 @@ use std::fmt; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; -use futures::sync::oneshot; use futures::Future; use grpc::{CallOption, EnvBuilder}; use kvproto::metapb; -use kvproto::pdpb::{self, Member}; +use kvproto::pdpb::{ + GetAllStoresRequest, GetRegionByIDRequest, GetRegionRequest, GetStoreRequest, Member, +}; -use super::util::{check_resp_header, validate_endpoints, LeaderClient, Request}; +use super::leader::{check_resp_header, validate_endpoints, LeaderClient, Request}; use super::{Error, PdClient, RegionInfo, Result, PD_REQUEST_HISTOGRAM_VEC, REQUEST_TIMEOUT}; +use futures::sync::oneshot; use pd::{PdFuture, PdTimestamp}; use util::security::SecurityManager; use util::time::duration_to_sec; @@ -32,29 +34,11 @@ const CQ_COUNT: usize = 1; const CLIENT_PREFIX: &str = "pd"; const LEADER_CHANGE_RETRY: usize = 10; -macro_rules! thd_name { - ($name:expr) => {{ - $crate::util::get_tag_from_thread_name() - .map(|tag| format!("{}::{}", $name, tag)) - .unwrap_or_else(|| $name.to_owned()) - }}; -} - pub struct PdRpcClient { cluster_id: u64, leader: Arc>, } -macro_rules! 
request { - ($cluster_id:expr, $type:ty) => {{ - let mut request = <$type>::new(); - let mut header = pdpb::RequestHeader::new(); - header.set_cluster_id($cluster_id); - request.set_header(header); - request - }}; -} - impl PdRpcClient { pub fn new(endpoints: &[&str], security_mgr: Arc) -> Result { let env = Arc::new( @@ -67,34 +51,29 @@ impl PdRpcClient { Ok(PdRpcClient { cluster_id: members.get_header().get_cluster_id(), - leader: Arc::new(RwLock::new(LeaderClient::new( - env, - security_mgr, - client, - members, - ))), + leader: LeaderClient::new(env, security_mgr, client, members), }) } + fn get_leader(&self) -> Member { + self.leader.rl().members.get_leader().clone() + } + #[inline] fn call_option() -> CallOption { CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)) } - pub fn get_leader(&self) -> Member { - self.leader.rl().members.get_leader().clone() - } - fn get_region_and_leader_async( &self, key: &[u8], ) -> impl Future), Error = Error> { let timer = Instant::now(); - let mut req = request!(self.cluster_id, pdpb::GetRegionRequest); + let mut req = request!(self.cluster_id, GetRegionRequest); req.set_region_key(key.to_owned()); - let executor = move |client: &RwLock, req: pdpb::GetRegionRequest| { + let executor = move |client: &RwLock, req: GetRegionRequest| { let receiver = client .rl() .client @@ -119,84 +98,86 @@ impl PdRpcClient { Ok((region, leader)) })) as PdFuture<_> }; - self.request(req, executor, LEADER_CHANGE_RETRY).execute() + self.request(req, executor, LEADER_CHANGE_RETRY) } - fn get_store_async(&self, store_id: u64) -> impl Future { - let timer = Instant::now(); + fn request(&self, req: Req, func: F, retry: usize) -> PdFuture + where + Req: Clone + Send + 'static, + Resp: Send + fmt::Debug + 'static, + F: FnMut(&RwLock, Req) -> PdFuture + Send + 'static, + { + let future = Request::new(req, func, Arc::clone(&self.leader), retry).execute(); + let (tx, rx) = oneshot::channel(); + let future = Box::new( + future + .and_then(move |resp| { + tx.send(resp).unwrap(); + Ok(()) + }).map_err(|e| panic!("{}", e)), + ); + self.leader.wl().schedule(future); + Box::new(rx.map_err(Error::Canceled).and_then(Ok)) + } +} - let mut req = request!(self.cluster_id, pdpb::GetStoreRequest); - req.set_store_id(store_id); +impl PdClient for PdRpcClient { + fn get_cluster_id(&self) -> Result { + Ok(self.cluster_id) + } + + fn handle_reconnect(&self, f: F) { + let mut leader = self.leader.wl(); + leader.on_reconnect = Some(Box::new(f)); + } - let executor = move |client: &RwLock, req: pdpb::GetStoreRequest| { + fn get_all_stores(&self) -> PdFuture> { + let timer = Instant::now(); + + let executor = move |client: &RwLock, req: GetAllStoresRequest| { let receiver = client .rl() .client - .get_store_async_opt(&req, Self::call_option()) + .get_all_stores_async_opt(&req, Self::call_option()) .unwrap(); Box::new(receiver.map_err(Error::Grpc).and_then(move |mut resp| { PD_REQUEST_HISTOGRAM_VEC - .with_label_values(&["get_store"]) + .with_label_values(&["get_all_stores"]) .observe(duration_to_sec(timer.elapsed())); check_resp_header(resp.get_header())?; - Ok(resp.take_store()) + Ok(resp.take_stores().into_vec()) })) as PdFuture<_> }; - self.request(req, executor, LEADER_CHANGE_RETRY).execute() - } - - pub fn get_cluster_id(&self) -> Result { - Ok(self.cluster_id) - } - - pub fn get_ts(&self) -> Result { - self.get_ts_async().wait() + self.request( + request!(self.cluster_id, GetAllStoresRequest), + executor, + LEADER_CHANGE_RETRY, + ) } - pub fn get_ts_async(&self) -> PdFuture { + 
fn get_store(&self, store_id: u64) -> PdFuture { let timer = Instant::now(); - let mut req = request!(self.cluster_id, pdpb::TsoRequest); - req.set_count(1); - - let (tx, rx) = oneshot::channel::(); - let leader = self.leader.wl(); - leader.tso_requests_sender.unbounded_send(tx).unwrap(); - Box::new(rx.map_err(Error::Canceled).and_then(move |ts| { - PD_REQUEST_HISTOGRAM_VEC - .with_label_values(&["get_ts"]) - .observe(duration_to_sec(timer.elapsed())); - Ok(ts) - })) - } - - pub fn on_reconnect(&self, f: Box) { - let mut leader = self.leader.wl(); - leader.on_reconnect = Some(f); - } - - pub fn request(&self, req: Req, func: F, retry: usize) -> Request - where - Req: Clone + Send + 'static, - Resp: Send + 'static, - F: FnMut(&RwLock, Req) -> PdFuture + Send + 'static, - { - Request::new(req, func, Arc::clone(&self.leader), retry) - } -} - -impl PdClient for PdRpcClient { - fn get_cluster_id(&self) -> Result { - Ok(self.cluster_id) - } + let mut req = request!(self.cluster_id, GetStoreRequest); + req.set_store_id(store_id); - fn get_cluster_config(&self) -> PdFuture { - unimplemented!() - } + let executor = move |client: &RwLock, req: GetStoreRequest| { + let receiver = client + .rl() + .client + .get_store_async_opt(&req, Self::call_option()) + .unwrap(); + Box::new(receiver.map_err(Error::Grpc).and_then(move |mut resp| { + PD_REQUEST_HISTOGRAM_VEC + .with_label_values(&["get_store"]) + .observe(duration_to_sec(timer.elapsed())); + check_resp_header(resp.get_header())?; + Ok(resp.take_store()) + })) as PdFuture<_> + }; - fn get_store(&self, store_id: u64) -> PdFuture { - Box::new(self.get_store_async(store_id).and_then(Ok)) + self.request(req, executor, LEADER_CHANGE_RETRY) } fn get_region(&self, key: &[u8]) -> PdFuture { @@ -213,10 +194,10 @@ impl PdClient for PdRpcClient { fn get_region_by_id(&self, region_id: u64) -> PdFuture> { let timer = Instant::now(); - let mut req = request!(self.cluster_id, pdpb::GetRegionByIDRequest); + let mut req = request!(self.cluster_id, GetRegionByIDRequest); req.set_region_id(region_id); - let executor = move |client: &RwLock, req: pdpb::GetRegionByIDRequest| { + let executor = move |client: &RwLock, req: GetRegionByIDRequest| { let handler = client .rl() .client @@ -235,7 +216,11 @@ impl PdClient for PdRpcClient { })) as PdFuture<_> }; - self.request(req, executor, LEADER_CHANGE_RETRY).execute() + self.request(req, executor, LEADER_CHANGE_RETRY) + } + + fn get_ts(&self) -> PdFuture { + Box::new(self.leader.wl().get_ts()) } } diff --git a/src/pd/util.rs b/src/pd/leader.rs similarity index 56% rename from src/pd/util.rs rename to src/pd/leader.rs index 26d9c6be..329a1445 100644 --- a/src/pd/util.rs +++ b/src/pd/leader.rs @@ -1,51 +1,223 @@ use fxhash::FxHashSet as HashSet; use std::result; -use std::sync::Arc; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; +use std::thread::{self, JoinHandle}; use std::time::Duration; use std::time::Instant; use futures::future::{loop_fn, ok, Loop}; use futures::sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::sync::oneshot; -use futures::task::Task; -use futures::Future; -use grpc::{CallOption, ChannelBuilder, ClientDuplexReceiver, ClientDuplexSender, Environment}; +use futures::{Future, Sink, Stream}; +use grpc::{CallOption, ChannelBuilder, Environment, WriteFlags}; use kvproto::pdpb::{ GetMembersRequest, GetMembersResponse, ResponseHeader, TsoRequest, TsoResponse, }; use kvproto::pdpb_grpc::PdClient; use tokio_timer::timer::Handle; -use super::{Error, PdFuture, PdTimestamp, Result, 
REQUEST_TIMEOUT}; +use super::{Error, PdFuture, PdTimestamp, Result, PD_REQUEST_HISTOGRAM_VEC, REQUEST_TIMEOUT}; +use tokio_core::reactor::{Core, Handle as OtherHandle}; use util::security::SecurityManager; +use util::time::duration_to_sec; use util::timer::GLOBAL_TIMER_HANDLE; -use util::{Either, HandyRwLock}; - -macro_rules! box_err { - ($e:expr) => ({ - use std::error::Error; - let e: Box = format!("[{}:{}]: {}", file!(), line!(), $e).into(); - e.into() - }); - ($f:tt, $($arg:expr),+) => ({ - box_err!(format!($f, $($arg),+)) - }); +use util::HandyRwLock; + +macro_rules! request { + ($cluster_id:expr, $type:ty) => {{ + let mut request = <$type>::new(); + let mut header = ::kvproto::pdpb::RequestHeader::new(); + header.set_cluster_id($cluster_id); + request.set_header(header); + request + }}; +} + +type TsoChannel = oneshot::Sender; +type PdRequest = Box + Send>; + +pub enum PdTask { + TsoInit, + TsoRequest, + TsoResponse(Vec>, TsoResponse), + Request(PdRequest), +} + +struct PdReactor { + task_tx: Option>>, + tso_tx: UnboundedSender, + tso_rx: Option>, + + handle: Option>, + tso_pending: Option>, + tso_buffer: Option>, + tso_batch: Vec, +} + +impl Drop for PdReactor { + fn drop(&mut self) { + if let Some(handle) = self.handle.take() { + handle.join().unwrap(); + } + } +} + +impl PdReactor { + fn new() -> PdReactor { + let (tso_tx, tso_rx) = unbounded(); + PdReactor { + task_tx: None, + tso_tx, + tso_rx: Some(tso_rx), + handle: None, + tso_buffer: Some(Vec::with_capacity(8)), + tso_batch: Vec::with_capacity(8), + tso_pending: None, + } + } + + fn start(&mut self, client: Arc>) { + if self.handle.is_none() { + info!("starting pd reactor thread"); + let (task_tx, task_rx) = unbounded(); + task_tx.unbounded_send(Some(PdTask::TsoInit)).unwrap(); + self.task_tx = Some(task_tx); + self.handle = Some( + thread::Builder::new() + .name("dispatcher thread".to_owned()) + .spawn(move || Self::poll(&client, task_rx)) + .unwrap(), + ) + } else { + warn!("tso sender and receiver are stale, refreshing.."); + let (tso_tx, tso_rx) = unbounded(); + self.tso_tx = tso_tx; + self.tso_rx = Some(tso_rx); + self.schedule(PdTask::TsoInit); + } + } + + fn schedule(&self, task: PdTask) { + self.task_tx + .as_ref() + .unwrap() + .unbounded_send(Some(task)) + .expect("unbounded send should never fail"); + } + + fn poll(client: &Arc>, rx: UnboundedReceiver>) { + let mut core = Core::new().unwrap(); + let handle = core.handle(); + { + let f = rx.take_while(|t| Ok(t.is_some())).for_each(|t| { + Self::dispatch(&client, t.unwrap(), &handle); + Ok(()) + }); + core.run(f).unwrap(); + } + } + + fn init(client: &Arc>, handle: &OtherHandle) { + let client = Arc::clone(client); + let (tx, rx) = client.wl().client.tso().unwrap(); + let tso_rx = client.wl().reactor.tso_rx.take().unwrap(); + handle.spawn( + tx.sink_map_err(Error::Grpc) + .send_all(tso_rx.then(|r| match r { + Ok(r) => Ok((r, WriteFlags::default())), + Err(()) => Err(Error::Other(box_err!("failed to recv tso requests"))), + })).then(|r| match r { + Ok((mut sender, _)) => { + sender.get_mut().cancel(); + Ok(()) + } + Err(e) => { + error!("failed to send tso requests: {:?}", e); + Err(()) + } + }), + ); + handle.spawn( + rx.for_each(move |resp| { + let mut client = client.wl(); + let reactor = &mut client.reactor; + let tso_pending = reactor.tso_pending.take().unwrap(); + reactor.schedule(PdTask::TsoResponse(tso_pending, resp)); + if !reactor.tso_batch.is_empty() { + /* schedule another tso_batch of request */ + reactor.schedule(PdTask::TsoRequest); + } + Ok(()) + 
}).map_err(|e| panic!("unexpected error: {:?}", e)), + ); + } + + fn tso_request(client: &Arc>) { + let mut client = client.wl(); + let cluster_id = client.cluster_id; + let reactor = &mut client.reactor; + let mut tso_batch = reactor.tso_buffer.take().unwrap(); + tso_batch.extend(reactor.tso_batch.drain(..)); + let mut request = request!(cluster_id, TsoRequest); + request.set_count(tso_batch.len() as u32); + reactor.tso_pending = Some(tso_batch); + reactor.tso_tx.unbounded_send(request).unwrap(); + } + + fn tso_response( + client: &Arc>, + mut requests: Vec, + response: &TsoResponse, + ) { + let timestamp = response.get_timestamp(); + for (offset, request) in requests.drain(..).enumerate() { + request + .send(PdTimestamp { + physical: timestamp.physical, + logical: timestamp.logical + offset as i64, + }).unwrap(); + } + client.wl().reactor.tso_buffer = Some(requests); + } + + fn dispatch(client: &Arc>, task: PdTask, handle: &OtherHandle) { + match task { + PdTask::TsoRequest => Self::tso_request(client), + PdTask::TsoResponse(requests, response) => { + Self::tso_response(client, requests, &response) + } + PdTask::TsoInit => Self::init(client, handle), + PdTask::Request(task) => handle.spawn(task), + } + } + + fn get_ts(&mut self) -> impl Future { + let timer = Instant::now(); + let (tx, rx) = oneshot::channel::(); + self.tso_batch.push(tx); + if self.tso_pending.is_none() { + /* schedule tso request to run */ + self.schedule(PdTask::TsoRequest); + } + rx.map_err(Error::Canceled).and_then(move |ts| { + PD_REQUEST_HISTOGRAM_VEC + .with_label_values(&["get_ts"]) + .observe(duration_to_sec(timer.elapsed())); + Ok(ts) + }) + } } pub struct LeaderClient { - env: Arc, pub client: PdClient, pub members: GetMembersResponse, - security_mgr: Arc, pub on_reconnect: Option>, - pub tso_sender: Either>, UnboundedSender>, - pub tso_receiver: Either>, Task>, - - pub tso_requests: UnboundedReceiver>, - pub tso_requests_sender: UnboundedSender>, + env: Arc, + cluster_id: u64, + security_mgr: Arc, last_update: Instant, + reactor: PdReactor, } impl LeaderClient { @@ -54,23 +226,29 @@ impl LeaderClient { security_mgr: Arc, client: PdClient, members: GetMembersResponse, - ) -> LeaderClient { - let (tx, rx) = client.tso().unwrap(); - let (tso_sender, tso_receiver) = unbounded(); - LeaderClient { + ) -> Arc> { + let cluster_id = members.get_header().get_cluster_id(); + let client = Arc::new(RwLock::new(LeaderClient { env, - tso_sender: Either::Left(Some(tx)), - tso_receiver: Either::Left(Some(rx)), client, members, security_mgr, on_reconnect: None, - last_update: Instant::now(), + reactor: PdReactor::new(), + cluster_id, + })); - tso_requests: tso_receiver, - tso_requests_sender: tso_sender, - } + client.wl().reactor.start(Arc::clone(&client)); + client + } + + pub fn get_ts(&mut self) -> impl Future { + self.reactor.get_ts() + } + + pub fn schedule(&self, task: PdRequest) { + self.reactor.schedule(PdTask::Request(task)); } } @@ -337,8 +515,7 @@ pub fn check_resp_header(header: &ResponseHeader) -> Result<()> { } // Re-establish connection with PD leader in synchronized fashion. 
-pub fn reconnect(leader: &RwLock) -> Result<()> { - println!("try reconnect"); +pub fn reconnect(leader: &Arc>) -> Result<()> { let ((client, members), start) = { let leader = leader.rl(); if leader.last_update.elapsed() < Duration::from_secs(RECONNECT_INTERVAL_SEC) { @@ -354,26 +531,15 @@ pub fn reconnect(leader: &RwLock) -> Result<()> { }; { + let leader_clone = Arc::clone(leader); let mut leader = leader.wl(); - let (tx, rx) = client.tso().unwrap(); - warn!("tso sender and receiver are stale, refreshing.."); - - // Try to cancel an unused tso sender. - if let Either::Left(Some(ref mut r)) = leader.tso_sender { - info!("cancel tso sender"); - r.cancel(); - } - leader.tso_sender = Either::Left(Some(tx)); - if let Either::Right(ref mut task) = leader.tso_receiver { - task.notify(); - } - leader.tso_receiver = Either::Left(Some(rx)); leader.client = client; leader.members = members; leader.last_update = Instant::now(); if let Some(ref on_reconnect) = leader.on_reconnect { on_reconnect(); } + leader.reactor.start(leader_clone); } warn!("updating PD client done, spent {:?}", start.elapsed()); Ok(()) diff --git a/src/pd/mod.rs b/src/pd/mod.rs index 07afad72..6c9203d7 100644 --- a/src/pd/mod.rs +++ b/src/pd/mod.rs @@ -1,11 +1,9 @@ +#[macro_use] +mod leader; mod client; -mod util; -pub mod errors; pub use self::client::PdRpcClient; -pub use self::errors::{Error, Result}; -pub use self::util::validate_endpoints; -pub use self::util::RECONNECT_INTERVAL_SEC; +pub use errors::{Error, Result}; use std::ops::Deref; @@ -39,6 +37,7 @@ impl Deref for RegionInfo { pub const INVALID_ID: u64 = 0; const REQUEST_TIMEOUT: u64 = 2; // 2s +#[derive(Debug)] pub struct PdTimestamp { pub physical: i64, pub logical: i64, @@ -60,12 +59,7 @@ pub trait PdClient: Send + Sync { fn get_store(&self, store_id: u64) -> PdFuture; // Get all stores information. - fn get_all_stores(&self) -> PdFuture> { - unimplemented!(); - } - - // Get cluster meta information. - fn get_cluster_config(&self) -> PdFuture; + fn get_all_stores(&self) -> PdFuture>; // For route. // Get region which the key belong to. @@ -81,4 +75,7 @@ pub trait PdClient: Send + Sync { // // Please note that this method should only be called once. fn handle_reconnect(&self, _: F) {} + + // get a timestamp from PD + fn get_ts(&self) -> PdFuture; }
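
Usage sketch (an assumption-laden illustration, not part of the patch above): with this change, PdClient::get_ts is driven by the new PdReactor thread; get_ts calls whose responses have not yet been awaited land in tso_batch and can be coalesced into a single TsoRequest, so timestamps obtained together share one physical component and receive consecutive logical offsets. The snippet below sketches a caller exercising that path, assuming a PdRpcClient constructed as in examples/pd-client.rs; the batched_timestamps helper name is hypothetical.

    // Sketch of caller-side usage of the batched TSO path; `batched_timestamps`
    // is an illustrative helper, not an API added by this patch.
    extern crate futures;
    extern crate tikv_client;

    use futures::future::join_all;
    use futures::Future;
    use tikv_client::pd::{Error, PdClient, PdRpcClient, PdTimestamp};

    // Queue `count` TSO requests before waiting on any of them, so the PdReactor
    // can coalesce the pending oneshot senders into as few TsoRequests as possible.
    fn batched_timestamps(client: &PdRpcClient, count: usize) -> Result<Vec<PdTimestamp>, Error> {
        let pending: Vec<_> = (0..count).map(|_| client.get_ts()).collect();
        // Timestamps resolved from one batch share `physical` and differ in `logical`.
        join_all(pending).wait()
    }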