diff --git a/src/datastore/datastore.rs b/src/datastore/datastore.rs index 9efcb2e6..611a9224 100644 --- a/src/datastore/datastore.rs +++ b/src/datastore/datastore.rs @@ -1,5 +1,3 @@ -use std::fmt; -use std::thread; use std::collections::HashMap; use chrono::DateTime; @@ -8,13 +6,9 @@ use chrono::Utc; use chrono::Duration; use rusqlite::Connection; -use rusqlite::DropBehavior; -use rusqlite::TransactionBehavior; use serde_json::value::Value; -use crossbeam_requests; - use crate::models::Bucket; use crate::models::BucketMetadata; use crate::models::Event; @@ -22,67 +16,7 @@ use crate::transform; use rusqlite::types::ToSql; -/* TODO - * - Replace some unwraps with MpscError - * - Improve DatastoreError - */ - -#[derive(Debug,Clone)] -pub enum Responses { - Empty(), - Bucket(Bucket), - BucketMap(HashMap), - Event(Event), - EventList(Vec), - Count(i64) -} - -#[derive(Debug,Clone)] -pub enum DatastoreMethod { - Memory(), - File(String), -} - -#[derive(Debug,Clone)] -pub enum DatastoreError { - NoSuchBucket, - BucketAlreadyExists, - MpscError, - InternalError(String), -} - -#[derive(Debug,Clone)] -pub enum Commands { - CreateBucket(Bucket), - DeleteBucket(String), - GetBucket(String), - GetBuckets(), - InsertEvents(String, Vec), - Heartbeat(String, Event, f64), - GetEvents(String, Option>, Option>, Option), - GetEventCount(String, Option>, Option>), - ForceCommit(), -} - -type RequestSender = crossbeam_requests::RequestSender>; -type RequestReceiver = crossbeam_requests::RequestReceiver>; - -#[derive(Clone)] -pub struct Datastore { - method: DatastoreMethod, - requester: RequestSender, -} - -impl fmt::Debug for Datastore { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Datastore({:?})", self.method) - } -} - -struct DatastoreWorker { - responder: RequestReceiver, - quit: bool -} +use crate::datastore::DatastoreError; /* * ### Database version changelog ### @@ -175,131 +109,20 @@ fn _migrate_v2_to_v3(conn: &Connection) { conn.pragma_update(None, "user_version", &3).expect("Failed to update database version!"); } -struct DatastoreInstance { - commit: bool, - uncommited_events: u64, +pub struct DatastoreInstance { buckets_cache: HashMap, } -impl DatastoreWorker { - pub fn new(responder: crossbeam_requests::RequestReceiver>) -> Self { - DatastoreWorker { - responder: responder, - quit: false - } - } - - fn work_loop(&mut self, method: DatastoreMethod) -> () { - let mut conn = match method { - DatastoreMethod::Memory() => Connection::open_in_memory() - .expect("Failed to create in-memory datastore"), - DatastoreMethod::File(path) => Connection::open(path) - .expect("Failed to create datastore") - }; - _create_tables(&conn); +impl DatastoreInstance { + pub fn new(conn: &Connection) -> Result { let mut ds = DatastoreInstance { - commit: false, - uncommited_events: 0, - buckets_cache: HashMap::new() + buckets_cache: HashMap::new(), }; - let mut last_heartbeat = HashMap::new(); - match ds.get_stored_buckets(&conn) { - Ok(_) => (), - Err(err) => panic!("Failed to initially load buckets from datastore! {:?}", err) - } - loop { - let mut transaction = match conn.transaction_with_behavior(TransactionBehavior::Exclusive) { - Ok(transaction) => transaction, - Err(err) => panic!("Unable to take exclusive lock on SQLite database! {}", err) - }; - transaction.set_drop_behavior(DropBehavior::Commit); - loop { - let (request, response_sender) = match self.responder.poll() { - Ok((req, res_sender)) => (req, res_sender), - Err(e) => { // All references to responder is gone, quit - info!("DB worker quitting due to {:?}", e); - self.quit = true; - break; - } - }; - let response = match request { - Commands::CreateBucket(bucket) => { - ds.commit = true; - match ds.create_bucket(&transaction, bucket) { - Ok(_) => Ok(Responses::Empty()), - Err(e) => Err(e) - } - }, - Commands::DeleteBucket(bucketname) => { - ds.commit = true; - match ds.delete_bucket(&transaction, &bucketname) { - Ok(_) => Ok(Responses::Empty()), - Err(e) => Err(e) - } - }, - Commands::GetBucket(bucketname) => { - ds.commit = true; - match ds.get_bucket(&bucketname) { - Ok(b) => Ok(Responses::Bucket(b)), - Err(e) => Err(e) - } - }, - Commands::GetBuckets() => { - ds.commit = true; - Ok(Responses::BucketMap(ds.get_buckets())) - }, - Commands::InsertEvents(bucketname, events) => { - ds.commit = true; - match ds.insert_events(&transaction, &bucketname, events) { - Ok(events) => { - last_heartbeat.insert(bucketname.to_string(), None); // invalidate last_heartbeat cache - Ok(Responses::EventList(events)) - }, - Err(e) => Err(e) - } - }, - Commands::Heartbeat(bucketname, event, pulsetime) => { - match ds.heartbeat(&transaction, &bucketname, event, pulsetime, &mut last_heartbeat) { - Ok(e) => Ok(Responses::Event(e)), - Err(e) => Err(e) - } - }, - Commands::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt) => { - ds.commit = true; - match ds.get_events(&transaction, &bucketname, starttime_opt, endtime_opt, limit_opt) { - Ok(el) => Ok(Responses::EventList(el)), - Err(e) => Err(e) - } - }, - Commands::GetEventCount(bucketname, starttime_opt, endtime_opt) => { - ds.commit = true; - match ds.get_event_count(&transaction, &bucketname, starttime_opt, endtime_opt) { - Ok(n) => Ok(Responses::Count(n)), - Err(e) => Err(e) - } - }, - Commands::ForceCommit() => { - ds.commit = true; - Ok(Responses::Empty()) - }, - }; - response_sender.respond(response); - if ds.commit || ds.uncommited_events > 100 { break }; - } - debug!("Commiting DB! Force commit {}, {} uncommited events", ds.commit, ds.uncommited_events); - match transaction.commit() { - Ok(_) => (), - Err(err) => panic!("Failed to commit datastore transaction! {}", err) - } - ds.commit = false; - ds.uncommited_events = 0; - if self.quit { break }; - } - info!("DB Worker thread finished"); + _create_tables(&conn); + ds.get_stored_buckets(&conn)?; + Ok(ds) } -} -impl DatastoreInstance { fn get_stored_buckets(&mut self, conn: &Connection) -> Result <(), DatastoreError> { let mut stmt = match conn.prepare(" SELECT buckets.id, buckets.name, buckets.type, buckets.client, @@ -372,7 +195,7 @@ impl DatastoreInstance { Ok(()) } - fn create_bucket(&mut self, conn: &Connection, mut bucket: Bucket) -> Result<(), DatastoreError> { + pub fn create_bucket(&mut self, conn: &Connection, mut bucket: Bucket) -> Result<(), DatastoreError> { bucket.created = match bucket.created { Some(created) => Some(created), None => Some(Utc::now()) @@ -397,7 +220,6 @@ impl DatastoreInstance { bucket.events = None; // Cache bucket self.buckets_cache.insert(bucket.id.clone(), bucket.clone()); - self.commit = true; // Insert events if let Some(events) = events { self.insert_events(conn, &bucket.id, events)?; @@ -416,7 +238,7 @@ impl DatastoreInstance { } } - fn delete_bucket(&mut self, conn: &Connection, bucket_id: &str) -> Result<(), DatastoreError>{ + pub fn delete_bucket(&mut self, conn: &Connection, bucket_id: &str) -> Result<(), DatastoreError>{ let bucket = (self.get_bucket(&bucket_id))?; // Delete all events in bucket match conn.execute("DELETE FROM events WHERE bucketrow = ?1", &[&bucket.bid]) { @@ -439,7 +261,7 @@ impl DatastoreInstance { } } - fn get_bucket(&self, bucket_id: &str) -> Result { + pub fn get_bucket(&self, bucket_id: &str) -> Result { let cached_bucket = self.buckets_cache.get(bucket_id); match cached_bucket { Some(bucket) => Ok(bucket.clone()), @@ -447,7 +269,7 @@ impl DatastoreInstance { } } - fn get_buckets(&self) -> HashMap { + pub fn get_buckets(&self) -> HashMap { return self.buckets_cache.clone(); } @@ -542,7 +364,6 @@ impl DatastoreInstance { Ok(_) => self.update_endtime(&mut bucket, event), Err(err) => return Err(DatastoreError::InternalError(format!("Failed to execute replace_last_event SQL statement: {}", err))) }; - self.uncommited_events += 1; Ok(()) } @@ -684,134 +505,3 @@ impl DatastoreInstance { return Ok(count); } } - -impl Datastore { - - pub fn new(dbpath: String) -> Self { - let method = DatastoreMethod::File(dbpath); - Datastore::_new_internal(method) - } - - pub fn new_in_memory() -> Self { - let method = DatastoreMethod::Memory(); - Datastore::_new_internal(method) - } - - fn _new_internal(method: DatastoreMethod) -> Self { - let (requester, responder) = crossbeam_requests::channel::>(); - let method_clone = method.clone(); - let _thread = thread::spawn(move || { - let mut di = DatastoreWorker::new(responder); - di.work_loop(method_clone); - }); - Datastore { - method, - requester, - } - } - - pub fn create_bucket(&self, bucket: &Bucket) -> Result<(), DatastoreError> { - let cmd = Commands::CreateBucket(bucket.clone()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(_) => Ok(()), - Err(e) => Err(e) - } - } - - pub fn delete_bucket(&self, bucket_id: &str) -> Result<(), DatastoreError>{ - let cmd = Commands::DeleteBucket(bucket_id.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Responses::Empty() => Ok(()), - _ => panic!("Invalid response") - }, - Err(e) => Err(e) - } - } - - pub fn get_bucket(&self, bucket_id: &str) -> Result { - let cmd = Commands::GetBucket(bucket_id.to_string()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Responses::Bucket(b) => Ok(b), - _ => panic!("Invalid response") - }, - Err(e) => Err(e) - } - } - - pub fn get_buckets(&self) -> Result, DatastoreError> { - let cmd = Commands::GetBuckets(); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Responses::BucketMap(bm) => Ok(bm), - e => Err(DatastoreError::InternalError(format!("Invalid response: {:?}", e))) - }, - Err(e) => Err(e) - } - } - - pub fn insert_events(&self, bucket_id: &str, events: &[Event]) -> Result, DatastoreError> { - let cmd = Commands::InsertEvents(bucket_id.to_string(), events.to_vec()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Responses::EventList(events) => Ok(events), - _ => panic!("Invalid response") - }, - Err(e) => Err(e) - } - } - - pub fn heartbeat(&self, bucket_id: &str, heartbeat: Event, pulsetime: f64) -> Result { - let cmd = Commands::Heartbeat(bucket_id.to_string(), heartbeat, pulsetime); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Responses::Event(e) => return Ok(e), - _ => panic!("Invalid response") - }, - Err(e) => Err(e) - } - } - - pub fn get_events(&self, bucket_id: &str, starttime_opt: Option>, endtime_opt: Option>, limit_opt: Option) -> Result, DatastoreError> { - let cmd = Commands::GetEvents(bucket_id.to_string(), starttime_opt.clone(), endtime_opt.clone(), limit_opt.clone()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Responses::EventList(el) => Ok(el), - _ => panic!("Invalid response") - }, - Err(e) => Err(e) - } - } - - pub fn get_event_count(&self, bucket_id: &str, starttime_opt: Option>, endtime_opt: Option>) -> Result { - let cmd = Commands::GetEventCount(bucket_id.to_string(), starttime_opt.clone(), endtime_opt.clone()); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Responses::Count(n) => Ok(n), - _ => panic!("Invalid response") - }, - Err(e) => Err(e) - } - } - - pub fn force_commit(&self) -> Result<(), DatastoreError> { - let cmd = Commands::ForceCommit(); - let receiver = self.requester.request(cmd).unwrap(); - match receiver.collect().unwrap() { - Ok(r) => match r { - Responses::Empty() => Ok(()), - _ => panic!("Invalid response") - }, - Err(e) => Err(e) - } - } -} diff --git a/src/datastore/mod.rs b/src/datastore/mod.rs index 8f0b8461..2f9754df 100644 --- a/src/datastore/mod.rs +++ b/src/datastore/mod.rs @@ -1,4 +1,20 @@ mod datastore; +mod worker; -pub use self::datastore::Datastore; -pub use self::datastore::DatastoreError; +pub use self::datastore::DatastoreInstance; +pub use self::worker::Datastore; + +pub enum DatastoreMethod { + Memory(), + File(String), +} + +/* TODO: Implement this as a proper error */ +#[derive(Debug,Clone)] +pub enum DatastoreError { + NoSuchBucket, + BucketAlreadyExists, + MpscError, + ReadOnly, + InternalError(String), +} diff --git a/src/datastore/worker.rs b/src/datastore/worker.rs new file mode 100644 index 00000000..55c7e068 --- /dev/null +++ b/src/datastore/worker.rs @@ -0,0 +1,310 @@ +use std::fmt; +use std::thread; +use std::collections::HashMap; + +use chrono::DateTime; +use chrono::Utc; + +use rusqlite::Connection; +use rusqlite::DropBehavior; +use rusqlite::TransactionBehavior; + +use crate::datastore::DatastoreError; +use crate::datastore::DatastoreMethod; +use crate::datastore::DatastoreInstance; + +use crate::models::Bucket; +use crate::models::Event; + +use crossbeam_requests; + +type RequestSender = crossbeam_requests::RequestSender>; +type RequestReceiver = crossbeam_requests::RequestReceiver>; + + +#[derive(Clone)] +pub struct Datastore { + requester: RequestSender, +} + +impl fmt::Debug for Datastore { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Datastore()") + } +} + +/* + * TODO: + * - Allow read requests to go straight through a read-only db connection instead of requesting the + * worker thread for better performance? + */ + +#[derive(Debug,Clone)] +pub enum Responses { + Empty(), + Bucket(Bucket), + BucketMap(HashMap), + Event(Event), + EventList(Vec), + Count(i64) +} + +#[derive(Debug,Clone)] +pub enum Commands { + CreateBucket(Bucket), + DeleteBucket(String), + GetBucket(String), + GetBuckets(), + InsertEvents(String, Vec), + Heartbeat(String, Event, f64), + GetEvents(String, Option>, Option>, Option), + GetEventCount(String, Option>, Option>), + ForceCommit(), +} + +struct DatastoreWorker { + responder: RequestReceiver, + quit: bool +} + +impl DatastoreWorker { + pub fn new(responder: crossbeam_requests::RequestReceiver>) -> Self { + DatastoreWorker { + responder, + quit: false, + } + } + + fn work_loop(&mut self, method: DatastoreMethod) -> () { + // Open SQLite connection + let mut conn = match method { + DatastoreMethod::Memory() => Connection::open_in_memory() + .expect("Failed to create in-memory datastore"), + DatastoreMethod::File(path) => Connection::open(path) + .expect("Failed to create datastore") + }; + let mut ds = DatastoreInstance::new(&conn).unwrap(); + let mut last_heartbeat = HashMap::new(); + loop { + let mut transaction = match conn.transaction_with_behavior(TransactionBehavior::Immediate) { + Ok(transaction) => transaction, + Err(err) => panic!("Unable to start immediate transaction on SQLite database! {}", err) + }; + let mut commit = false; + let mut uncommited_events = 0; + transaction.set_drop_behavior(DropBehavior::Commit); + loop { + let (request, response_sender) = match self.responder.poll() { + Ok((req, res_sender)) => (req, res_sender), + Err(_) => { // All references to responder is gone, quit + info!("DB worker quitting"); + self.quit = true; + break; + } + }; + let response = match request { + Commands::CreateBucket(bucket) => { + match ds.create_bucket(&transaction, bucket) { + Ok(_) => { + commit = true; + Ok(Responses::Empty()) + }, + Err(e) => Err(e) + } + }, + Commands::DeleteBucket(bucketname) => { + match ds.delete_bucket(&transaction, &bucketname) { + Ok(_) => { + commit = true; + Ok(Responses::Empty()) + }, + Err(e) => Err(e) + } + }, + Commands::GetBucket(bucketname) => { + match ds.get_bucket(&bucketname) { + Ok(b) => Ok(Responses::Bucket(b)), + Err(e) => Err(e) + } + }, + Commands::GetBuckets() => { + Ok(Responses::BucketMap(ds.get_buckets())) + }, + Commands::InsertEvents(bucketname, events) => { + match ds.insert_events(&transaction, &bucketname, events) { + Ok(events) => { + uncommited_events += events.len(); + last_heartbeat.insert(bucketname.to_string(), None); // invalidate last_heartbeat cache + Ok(Responses::EventList(events)) + }, + Err(e) => Err(e) + } + }, + Commands::Heartbeat(bucketname, event, pulsetime) => { + match ds.heartbeat(&transaction, &bucketname, event, pulsetime, &mut last_heartbeat) { + Ok(e) => { + uncommited_events += 1; + Ok(Responses::Event(e)) + }, + Err(e) => Err(e) + } + }, + Commands::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt) => { + match ds.get_events(&transaction, &bucketname, starttime_opt, endtime_opt, limit_opt) { + Ok(el) => Ok(Responses::EventList(el)), + Err(e) => Err(e) + } + }, + Commands::GetEventCount(bucketname, starttime_opt, endtime_opt) => { + match ds.get_event_count(&transaction, &bucketname, starttime_opt, endtime_opt) { + Ok(n) => Ok(Responses::Count(n)), + Err(e) => Err(e) + } + }, + Commands::ForceCommit() => { + commit = true; + Ok(Responses::Empty()) + }, + }; + response_sender.respond(response); + if commit || uncommited_events > 100 { break }; + } + debug!("Commiting DB! Force commit {}, {} uncommited events", commit, uncommited_events); + match transaction.commit() { + Ok(_) => (), + Err(err) => panic!("Failed to commit datastore transaction! {}", err) + } + if self.quit { break }; + } + info!("DB Worker thread finished"); + } +} + +impl Datastore { + + pub fn new(dbpath: String) -> Self { + let method = DatastoreMethod::File(dbpath); + Datastore::_new_internal(method) + } + + pub fn new_in_memory() -> Self { + let method = DatastoreMethod::Memory(); + Datastore::_new_internal(method) + } + + fn _new_internal(method: DatastoreMethod) -> Self { + let (requester, responder) = crossbeam_requests::channel::>(); + let _thread = thread::spawn(move || { + let mut di = DatastoreWorker::new(responder); + di.work_loop(method); + }); + Datastore { + requester, + } + } + + pub fn create_bucket(&self, bucket: &Bucket) -> Result<(), DatastoreError> { + let cmd = Commands::CreateBucket(bucket.clone()); + let receiver = self.requester.request(cmd).unwrap(); + match receiver.collect().unwrap() { + Ok(_) => Ok(()), + Err(e) => Err(e) + } + } + + pub fn delete_bucket(&self, bucket_id: &str) -> Result<(), DatastoreError>{ + let cmd = Commands::DeleteBucket(bucket_id.to_string()); + let receiver = self.requester.request(cmd).unwrap(); + match receiver.collect().unwrap() { + Ok(r) => match r { + Responses::Empty() => Ok(()), + _ => panic!("Invalid response") + }, + Err(e) => Err(e) + } + } + + pub fn get_bucket(&self, bucket_id: &str) -> Result { + let cmd = Commands::GetBucket(bucket_id.to_string()); + let receiver = self.requester.request(cmd).unwrap(); + match receiver.collect().unwrap() { + Ok(r) => match r { + Responses::Bucket(b) => Ok(b), + _ => panic!("Invalid response") + }, + Err(e) => Err(e) + } + } + + pub fn get_buckets(&self) -> Result, DatastoreError> { + let cmd = Commands::GetBuckets(); + let receiver = self.requester.request(cmd).unwrap(); + match receiver.collect().unwrap() { + Ok(r) => match r { + Responses::BucketMap(bm) => Ok(bm), + e => Err(DatastoreError::InternalError(format!("Invalid response: {:?}", e))) + }, + Err(e) => Err(e) + } + } + + pub fn insert_events(&self, bucket_id: &str, events: &[Event]) -> Result, DatastoreError> { + let cmd = Commands::InsertEvents(bucket_id.to_string(), events.to_vec()); + let receiver = self.requester.request(cmd).unwrap(); + match receiver.collect().unwrap() { + Ok(r) => match r { + Responses::EventList(events) => Ok(events), + _ => panic!("Invalid response") + }, + Err(e) => Err(e) + } + } + + pub fn heartbeat(&self, bucket_id: &str, heartbeat: Event, pulsetime: f64) -> Result { + let cmd = Commands::Heartbeat(bucket_id.to_string(), heartbeat, pulsetime); + let receiver = self.requester.request(cmd).unwrap(); + match receiver.collect().unwrap() { + Ok(r) => match r { + Responses::Event(e) => return Ok(e), + _ => panic!("Invalid response") + }, + Err(e) => Err(e) + } + } + + pub fn get_events(&self, bucket_id: &str, starttime_opt: Option>, endtime_opt: Option>, limit_opt: Option) -> Result, DatastoreError> { + let cmd = Commands::GetEvents(bucket_id.to_string(), starttime_opt.clone(), endtime_opt.clone(), limit_opt.clone()); + let receiver = self.requester.request(cmd).unwrap(); + match receiver.collect().unwrap() { + Ok(r) => match r { + Responses::EventList(el) => Ok(el), + _ => panic!("Invalid response") + }, + Err(e) => Err(e) + } + } + + pub fn get_event_count(&self, bucket_id: &str, starttime_opt: Option>, endtime_opt: Option>) -> Result { + let cmd = Commands::GetEventCount(bucket_id.to_string(), starttime_opt.clone(), endtime_opt.clone()); + let receiver = self.requester.request(cmd).unwrap(); + match receiver.collect().unwrap() { + Ok(r) => match r { + Responses::Count(n) => Ok(n), + _ => panic!("Invalid response") + }, + Err(e) => Err(e) + } + } + + pub fn force_commit(&self) -> Result<(), DatastoreError> { + let cmd = Commands::ForceCommit(); + let receiver = self.requester.request(cmd).unwrap(); + match receiver.collect().unwrap() { + Ok(r) => match r { + Responses::Empty() => Ok(()), + _ => panic!("Invalid response") + }, + Err(e) => Err(e) + } + } +}