diff --git a/.travis.yml b/.travis.yml
index 21d47b4a..2a186885 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,7 +31,7 @@ before_install:
       fi
     - if [ "$TRAVIS_OS_NAME" = "linux" ] && [ "$BUILD_ANDROID" != "true" ]; then
        echo "Installing coverage tools";
-       cargo install cargo-kcov;
+       cargo install --force cargo-kcov;
        cargo kcov --print-install-kcov-sh | sh;
      fi
diff --git a/Cargo.toml b/Cargo.toml
index 0105fd1a..3d950feb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,8 +3,10 @@ name = "aw_server"
 version = "0.1.0"
 authors = ["Johan Bjäreholt ", "Erik Bjäreholt "]
 edition = "2018"
+default-run = "aw-server-rust"
 
 [lib]
+name = "aw_server"
 crate-type = ["lib", "cdylib"]
 path = "src/lib.rs"
 
@@ -12,6 +14,10 @@ path = "src/lib.rs"
 name = "aw-server-rust"
 path = "src/main.rs"
 
+[[bin]]
+name = "aw-sync-rust"
+path = "src/sync/main.rs"
+
 [dependencies]
 rocket = "0.4"
 rocket_contrib = { version = "*", default-features = false, features = ["json"] }
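Note (not part of the patch): with a second `[[bin]]` target in the manifest, a bare `cargo run` becomes ambiguous, which is what the new `default-run = "aw-server-rust"` key resolves; the sync binary is then run explicitly with `cargo run --bin aw-sync-rust`.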
diff --git a/src/datastore/datastore.rs b/src/datastore/datastore.rs
index 44651c79..9efcb2e6 100644
--- a/src/datastore/datastore.rs
+++ b/src/datastore/datastore.rs
@@ -37,6 +37,7 @@ pub enum Responses {
     Count(i64)
 }
 
+#[derive(Debug,Clone)]
 pub enum DatastoreMethod {
     Memory(),
     File(String),
@@ -60,6 +61,7 @@ pub enum Commands {
     Heartbeat(String, Event, f64),
     GetEvents(String, Option<DateTime<Utc>>, Option<DateTime<Utc>>, Option<u64>),
     GetEventCount(String, Option<DateTime<Utc>>, Option<DateTime<Utc>>),
+    ForceCommit(),
 }
 
 type RequestSender = crossbeam_requests::RequestSender<Commands, Result<Responses, DatastoreError>>;
@@ -67,12 +69,13 @@
 type RequestReceiver = crossbeam_requests::RequestReceiver<Commands, Result<Responses, DatastoreError>>;
 
 pub struct Datastore {
+    method: DatastoreMethod,
     requester: RequestSender,
 }
 
 impl fmt::Debug for Datastore {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "Datastore()")
+        write!(f, "Datastore({:?})", self.method)
     }
 }
@@ -213,35 +216,40 @@ impl DatastoreWorker {
         loop {
             let (request, response_sender) = match self.responder.poll() {
                 Ok((req, res_sender)) => (req, res_sender),
-                Err(_) => { // All references to responder is gone, quit
-                    info!("DB worker quitting");
+                Err(e) => { // All references to the responder are gone, quit
+                    info!("DB worker quitting due to {:?}", e);
                     self.quit = true;
                     break;
                 }
             };
             let response = match request {
                 Commands::CreateBucket(bucket) => {
+                    ds.commit = true;
                     match ds.create_bucket(&transaction, bucket) {
                         Ok(_) => Ok(Responses::Empty()),
                         Err(e) => Err(e)
                     }
                 },
                 Commands::DeleteBucket(bucketname) => {
+                    ds.commit = true;
                     match ds.delete_bucket(&transaction, &bucketname) {
                         Ok(_) => Ok(Responses::Empty()),
                         Err(e) => Err(e)
                     }
                 },
                 Commands::GetBucket(bucketname) => {
+                    ds.commit = true;
                     match ds.get_bucket(&bucketname) {
                         Ok(b) => Ok(Responses::Bucket(b)),
                         Err(e) => Err(e)
                     }
                 },
                 Commands::GetBuckets() => {
+                    ds.commit = true;
                     Ok(Responses::BucketMap(ds.get_buckets()))
                 },
                 Commands::InsertEvents(bucketname, events) => {
+                    ds.commit = true;
                     match ds.insert_events(&transaction, &bucketname, events) {
                         Ok(events) => {
                             last_heartbeat.insert(bucketname.to_string(), None); // invalidate last_heartbeat cache
@@ -257,17 +265,23 @@ impl DatastoreWorker {
                     }
                 },
                 Commands::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt) => {
+                    ds.commit = true;
                     match ds.get_events(&transaction, &bucketname, starttime_opt, endtime_opt, limit_opt) {
                         Ok(el) => Ok(Responses::EventList(el)),
                         Err(e) => Err(e)
                     }
                 },
                 Commands::GetEventCount(bucketname, starttime_opt, endtime_opt) => {
+                    ds.commit = true;
                     match ds.get_event_count(&transaction, &bucketname, starttime_opt, endtime_opt) {
                         Ok(n) => Ok(Responses::Count(n)),
                         Err(e) => Err(e)
                     }
                 },
+                Commands::ForceCommit() => {
+                    ds.commit = true;
+                    Ok(Responses::Empty())
+                },
             };
             response_sender.respond(response);
             if ds.commit || ds.uncommited_events > 100 { break };
@@ -413,7 +427,6 @@
         match conn.execute("DELETE FROM buckets WHERE id = ?1", &[&bucket.bid]) {
             Ok(_) => {
                 self.buckets_cache.remove(bucket_id);
-                self.commit = true;
                 return Ok(());
             },
             Err(err) => match err {
@@ -686,11 +699,13 @@ impl Datastore {
     fn _new_internal(method: DatastoreMethod) -> Self {
         let (requester, responder) = crossbeam_requests::channel::<Commands, Result<Responses, DatastoreError>>();
+        let method_clone = method.clone();
         let _thread = thread::spawn(move || {
             let mut di = DatastoreWorker::new(responder);
-            di.work_loop(method);
+            di.work_loop(method_clone);
         });
         Datastore {
+            method,
             requester,
         }
     }
@@ -787,4 +802,16 @@ impl Datastore {
             Err(e) => Err(e)
         }
     }
+
+    pub fn force_commit(&self) -> Result<(), DatastoreError> {
+        let cmd = Commands::ForceCommit();
+        let receiver = self.requester.request(cmd).unwrap();
+        match receiver.collect().unwrap() {
+            Ok(r) => match r {
+                Responses::Empty() => Ok(()),
+                _ => panic!("Invalid response")
+            },
+            Err(e) => Err(e)
+        }
+    }
 }
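Note (not part of the patch): a minimal sketch of how the new ForceCommit command is meant to be used from caller code, assuming the in-memory constructor `Datastore::new_in_memory()` that the tests below use:

    use aw_server::datastore::Datastore;

    fn main() {
        let ds = Datastore::new_in_memory();
        // ... create buckets, insert events, send heartbeats ...
        // Heartbeats are batched in the worker's open transaction; force_commit()
        // sends Commands::ForceCommit(), which sets the commit flag so the worker
        // breaks out of its poll loop and commits right away instead of waiting
        // for the >100 uncommitted-events threshold.
        ds.force_commit().unwrap();
    }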
diff --git a/src/lib.rs b/src/lib.rs
index 898685de..5dce0272 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -21,6 +21,7 @@ extern crate plex;
 
 extern crate appdirs;
 
+#[cfg(target_os="android")]
 #[macro_use] extern crate lazy_static;
 
 #[macro_use] extern crate log;
@@ -40,3 +41,5 @@ pub mod config;
 
 #[cfg(target_os="android")]
 pub mod android;
+
+pub mod sync;
diff --git a/src/logging.rs b/src/logging.rs
index 2a2d5130..36d91128 100644
--- a/src/logging.rs
+++ b/src/logging.rs
@@ -38,7 +38,7 @@ pub fn setup_logger() -> Result<(), fern::InitError> {
             })
             .chain(std::io::stdout())
             .level(log::LevelFilter::Info)
-            .level_for("aw_server", log::LevelFilter::Debug)
+            //.level_for("aw_server", log::LevelFilter::Debug)
         )
         // No color and lower log levels to logfile
         .chain(fern::Dispatch::new()
diff --git a/src/main.rs b/src/main.rs
index c944a70a..7fa693cf 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,41 +1,6 @@
-#![feature(plugin)]
-#![feature(proc_macro_hygiene)]
-#![feature(custom_attribute)]
-#![feature(decl_macro)]
-#[macro_use] extern crate rocket;
-#[macro_use] extern crate rocket_contrib;
-extern crate rocket_cors;
-extern crate multipart;
-
-extern crate serde;
-extern crate serde_json;
-#[macro_use] extern crate serde_derive;
-
-extern crate rusqlite;
-
-extern crate crossbeam_requests;
-
-extern crate chrono;
-
-extern crate plex;
-
-extern crate appdirs;
-
-#[macro_use] extern crate lazy_static;
-
 #[macro_use] extern crate log;
-extern crate fern;
-
-extern crate toml;
-
-pub mod models;
-pub mod transform;
-pub mod datastore;
-pub mod query;
-pub mod endpoints;
-pub mod dirs;
-pub mod logging;
-pub mod config;
+
+use aw_server::*;
 
 fn main() {
     use std::sync::Mutex;
diff --git a/src/sync/main.rs b/src/sync/main.rs
new file mode 100644
index 00000000..7d8a2326
--- /dev/null
+++ b/src/sync/main.rs
@@ -0,0 +1,21 @@
+#[macro_use] extern crate log;
+
+use aw_server::*;
+
+fn main() {
+    // What needs to be done:
+    // - [x] Setup local sync bucket
+    // - Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore)
+    // - Import buckets and sync events from remotes
+
+    println!("Started aw-sync-rust...");
+    logging::setup_logger().expect("Failed to setup logging");
+
+    aw_server::sync::sync_run();
+    info!("Finished successfully, exiting...");
+
+    // Needed to give the datastores some time to commit before the program shuts down.
+    // 100ms isn't actually needed; it seemed to work fine with as little as 10ms, but I'd
+    // rather give it some wiggle room.
+    std::thread::sleep(std::time::Duration::from_millis(100));
+}
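Design note (not part of the patch): the sleep appears to be needed because, in the work_loop above, the worker replies to a command (`response_sender.respond(response)`) before it breaks out of the loop to commit the transaction, and the worker thread is detached (`let _thread = thread::spawn(...)`), so nothing joins it on shutdown. Joining the worker thread, or responding to ForceCommit only after the commit has happened, would make the sleep unnecessary.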
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
new file mode 100644
index 00000000..1f0ff1ab
--- /dev/null
+++ b/src/sync/mod.rs
@@ -0,0 +1,160 @@
+/// Basic syncing for ActivityWatch
+/// Based on: https://github.com/ActivityWatch/aw-server/pull/50
+///
+/// This does not handle any direct peer interaction/connections/networking; it works as a
+/// "bring your own folder synchronizer".
+///
+/// It manages a sync folder by syncing the aw-server datastore with a copy/staging datastore
+/// in the folder (one for each host). The sync folder is then synced with remotes using
+/// Syncthing/Dropbox/whatever.
+
+use std::fs;
+use std::path::Path;
+
+use chrono::{DateTime, Utc, Duration};
+use serde_json;
+
+use crate::datastore::{Datastore, DatastoreError};
+use crate::models::{Event, Bucket};
+
+/// Performs a single sync pass
+pub fn sync_run() {
+    // TODO: Get path using dirs module
+    let sync_directory = Path::new("/tmp/aw-sync-rust/testing");
+    fs::create_dir_all(sync_directory).unwrap();
+
+    // TODO: Use the local datastore here, preferably passed from main
+    let ds_local = Datastore::new(sync_directory.join("test-local.db").into_os_string().into_string().unwrap());
+    info!("Set up local datastore");
+    //log_buckets(&ds_local)?;
+
+    let ds_remotes = setup_test(sync_directory).unwrap();
+    info!("Set up remote datastores");
+
+    // FIXME: These are not the datastores that should actually be synced, I'm just testing
+    for ds_from in &ds_remotes {
+        sync_datastores(&ds_from, &ds_local);
+    }
+
+    log_buckets(&ds_local);
+    for ds_from in &ds_remotes {
+        ds_from.force_commit().unwrap();
+        log_buckets(&ds_from);
+    }
+}
+
+fn setup_test(sync_directory: &Path) -> std::io::Result<Vec<Datastore>> {
+    let mut datastores: Vec<Datastore> = Vec::new();
+    for n in 0..2 {
+        let ds = Datastore::new(sync_directory.join(format!("test-remote-{}.db", n)).to_str().unwrap().to_string());
+
+        // Create a bucket
+        let bucket_jsonstr = format!(r#"{{
+            "id": "bucket-{}",
+            "type": "test",
+            "hostname": "device-{}",
+            "client": "test"
+        }}"#, n, n);
+        let bucket: Bucket = serde_json::from_str(&bucket_jsonstr)?;
+        match ds.create_bucket(&bucket) {
+            Ok(()) => (),
+            Err(e) => match e {
+                DatastoreError::BucketAlreadyExists => {
+                    debug!("bucket already exists, skipping");
+                }
+                e => panic!("woops! {:?}", e),
+            }
+        };
+
+        // Insert some testing events into the bucket
+        let events: Vec<Event> = (0..3).map(|i| {
+            let timestamp: DateTime<Utc> = Utc::now() + Duration::milliseconds(i * 10);
+            let event_jsonstr = format!(r#"{{
+                "timestamp": "{}",
+                "duration": 0,
+                "data": {{"test": {} }}
+            }}"#, timestamp.to_rfc3339(), i);
+            serde_json::from_str(&event_jsonstr).unwrap()
+        }).collect();
+
+        ds.insert_events(bucket.id.as_str(), &events[..]).unwrap();
+        ds.force_commit().unwrap();
+        //let new_eventcount = ds.get_event_count(bucket.id.as_str(), None, None).unwrap();
+        //info!("Eventcount: {:?} ({} new)", new_eventcount, events.len());
+        datastores.push(ds);
+    }
+    Ok(datastores)
+}
+
+/// Returns the sync-destination bucket for a given bucket; creates it if it doesn't exist.
+fn get_or_create_sync_bucket(bucket_from: &Bucket, ds_to: &Datastore) -> Bucket {
+    // Ensure the bucket ID ends in "-synced"
+    let new_id = format!("{}-synced", bucket_from.id.replace("-synced", ""));
+
+    match ds_to.get_bucket(new_id.as_str()) {
+        Ok(bucket) => bucket,
+        Err(DatastoreError::NoSuchBucket) => {
+            let mut bucket_new = bucket_from.clone();
+            bucket_new.id = new_id.clone();
+            // TODO: Replace the sync origin with hostname/GUID and discuss how we will treat
+            // the data attributes for internal use.
+            bucket_new.data.insert("$aw.sync.origin".to_string(), serde_json::json!("test"));
+            ds_to.create_bucket(&bucket_new).unwrap();
+            ds_to.get_bucket(new_id.as_str()).unwrap()
+        },
+        Err(e) => panic!("{:?}", e),
+    }
+}
+
+/// Syncs all buckets from `ds_from` to `ds_to`, with "-synced" appended to the ID of the
+/// destination bucket.
+pub fn sync_datastores(ds_from: &Datastore, ds_to: &Datastore) {
+    // FIXME: "-synced" should only be appended when syncing to the local database, not to the
+    // staging area for local buckets.
+    info!("Syncing {:?} to {:?}", ds_from, ds_to);
+
+    let buckets_from = ds_from.get_buckets().unwrap();
+    for bucket_from in buckets_from.values() {
+        let bucket_to = get_or_create_sync_bucket(bucket_from, ds_to);
+        ds_to.force_commit().unwrap();
+        let eventcount_to_old = ds_to.get_event_count(bucket_to.id.as_str(), None, None).unwrap();
+
+        // Sync events
+        // FIXME: This should use bucket_to.metadata.end, but it doesn't, because that doesn't
+        // work for empty buckets (it should be None, but is Some(unknown_time)).
+        // let resume_sync_at = bucket_to.metadata.end;
+        let most_recent_events = ds_to.get_events(bucket_to.id.as_str(), None, None, Some(1)).unwrap();
+        let resume_sync_at = most_recent_events.first().map(|e| e.timestamp + e.duration);
+
+        info!("Resuming sync at: {:?}", resume_sync_at);
+        let mut events: Vec<Event> = ds_from.get_events(bucket_from.id.as_str(), resume_sync_at, None, None).unwrap().iter().map(|e| {
+            let mut new_e = e.clone();
+            new_e.id = None;
+            new_e
+        }).collect();
+
+        // Sort ascending by timestamp
+        events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
+        for event in events {
+            ds_to.heartbeat(bucket_to.id.as_str(), event, 0.0).unwrap();
+        }
+
+        ds_to.force_commit().unwrap();
+        let eventcount_to_new = ds_to.get_event_count(bucket_to.id.as_str(), None, None).unwrap();
+        info!("Synced {} new events", eventcount_to_new - eventcount_to_old);
+    }
+}
+
+/// Logs all buckets and some metadata for a given datastore
+fn log_buckets(ds: &Datastore) {
+    let buckets = ds.get_buckets().unwrap();
+    info!("Buckets in {:?}:", ds);
+    for bucket in buckets.values() {
+        info!(" - {}", bucket.id.as_str());
+        info!("   eventcount: {:?}", ds.get_event_count(bucket.id.as_str(), None, None).unwrap());
+    }
+}
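For illustration (not part of the patch), a minimal sketch of driving sync_datastores directly with two in-memory datastores, mirroring what the tests below do:

    use aw_server::datastore::Datastore;
    use aw_server::sync::sync_datastores;

    fn main() {
        let ds_src = Datastore::new_in_memory();   // stand-in for a remote
        let ds_dest = Datastore::new_in_memory();  // stand-in for the local store
        // ... create "bucket-0" in ds_src and insert some events ...
        sync_datastores(&ds_src, &ds_dest);
        // ds_dest now contains "bucket-0-synced" with copies of the events
        // (ids cleared, replayed via heartbeat in ascending timestamp order).
    }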
diff --git a/tests/sync.rs b/tests/sync.rs
new file mode 100644
index 00000000..693a7116
--- /dev/null
+++ b/tests/sync.rs
@@ -0,0 +1,160 @@
+#[macro_use] extern crate log;
+
+#[cfg(test)]
+mod sync_tests {
+    use std::collections::HashMap;
+    use chrono::{DateTime, Utc};
+
+    use aw_server::models::{Bucket, Event};
+    use aw_server::datastore::{Datastore, DatastoreError};
+
+    struct TestState {
+        ds_src: Datastore,
+        ds_dest: Datastore,
+    }
+
+    fn init_teststate() -> TestState {
+        TestState {
+            ds_src: Datastore::new_in_memory(),
+            ds_dest: Datastore::new_in_memory(),
+        }
+    }
+
+    fn create_bucket(ds: &Datastore, n: i32) -> String {
+        let bucket_id = format!("bucket-{}", n);
+        let bucket_jsonstr = format!(r#"{{
+            "id": "{}",
+            "type": "test",
+            "hostname": "device-{}",
+            "client": "test"
+        }}"#, bucket_id, n);
+        let bucket: Bucket = serde_json::from_str(&bucket_jsonstr).unwrap();
+        match ds.create_bucket(&bucket) {
+            Ok(()) => (),
+            Err(e) => match e {
+                DatastoreError::BucketAlreadyExists => {
+                    debug!("bucket already exists, skipping");
+                }
+                e => panic!("woops! {:?}", e),
+            }
+        };
+        bucket_id
+    }
+
+    fn create_event(data_str: &str) -> Event {
+        // Workaround: without this sleep, consecutive calls might produce events with the
+        // same timestamp on platforms with a low-precision clock.
+        std::thread::sleep(std::time::Duration::from_millis(5));
+
+        let timestamp: DateTime<Utc> = Utc::now();
+        let event_jsonstr = format!(r#"{{
+            "timestamp": "{}",
+            "duration": 0,
+            "data": {{"test": {} }}
+        }}"#, timestamp.to_rfc3339(), data_str);
+        serde_json::from_str(&event_jsonstr).unwrap()
+    }
+
+    fn create_events(ds: &Datastore, bucket_id: &str, n: i64) {
+        let events: Vec<Event> = (0..n).map(|i| {
+            create_event(format!("{}", i).as_str())
+        }).collect();
+
+        ds.insert_events(bucket_id, &events[..]).unwrap();
+        ds.force_commit().unwrap();
+    }
+
+    fn get_all_buckets(datastores: Vec<&Datastore>) -> Vec<(&Datastore, Bucket)> {
+        let mut all_buckets: Vec<(&Datastore, Bucket)> = Vec::new();
+        for ds in datastores {
+            let buckets = ds.get_buckets().unwrap();
+            for bucket in buckets.values() {
+                all_buckets.push((ds, bucket.clone()));
+            }
+        }
+        all_buckets
+    }
+
+    fn get_all_buckets_map(datastores: Vec<&Datastore>) -> HashMap<String, (&Datastore, Bucket)> {
+        let all_buckets = get_all_buckets(datastores);
+        all_buckets.iter().cloned().map(|(ds, b)| (b.id.clone(), (ds, b))).collect()
+    }
+
+    #[test]
+    fn test_buckets_created() {
+        // TODO: Split up this test
+        let state = init_teststate();
+        create_bucket(&state.ds_src, 0);
+
+        aw_server::sync::sync_datastores(&state.ds_src, &state.ds_dest);
+
+        let buckets_src: HashMap<String, Bucket> = state.ds_src.get_buckets().unwrap();
+        let buckets_dest: HashMap<String, Bucket> = state.ds_dest.get_buckets().unwrap();
+        assert!(buckets_src.len() == buckets_dest.len());
+    }
+
+    fn check_synced_buckets_equal_to_src(all_buckets_map: &HashMap<String, (&Datastore, Bucket)>) {
+        for (ds, bucket) in all_buckets_map.values() {
+            if bucket.id.contains("-synced") {
+                let bucket_src_id = bucket.id.replace("-synced", "");
+                let (ds_src, bucket_src) = all_buckets_map.get(&bucket_src_id).unwrap();
+                let events_synced = ds.get_events(bucket.id.as_str(), None, None, None).unwrap();
+                let events_src = ds_src.get_events(bucket_src.id.as_str(), None, None, None).unwrap();
+                println!("{:?}", events_synced);
+                println!("{:?}", events_src);
+                assert!(events_synced == events_src);
+            }
+        }
+    }
+
+    #[test]
+    fn test_one_updated_event() {
+        // Tests the syncing of a single event that is then updated by a heartbeat after the
+        // first sync pass.
+        let state = init_teststate();
+
+        let bucket_id = create_bucket(&state.ds_src, 0);
+        state.ds_src.heartbeat(bucket_id.as_str(), create_event("1"), 1.0).unwrap();
+
+        aw_server::sync::sync_datastores(&state.ds_src, &state.ds_dest);
+
+        let all_datastores: Vec<&Datastore> = [&state.ds_src, &state.ds_dest].iter().cloned().collect();
+        let all_buckets_map = get_all_buckets_map(all_datastores);
+
+        // Check that all synced buckets are identical to the source bucket
+        check_synced_buckets_equal_to_src(&all_buckets_map);
+
+        // Add some more events
+        state.ds_src.heartbeat(bucket_id.as_str(), create_event("1"), 1.0).unwrap();
+        aw_server::sync::sync_datastores(&state.ds_src, &state.ds_dest);
+
+        // Check again that the new events were indeed synced
+        check_synced_buckets_equal_to_src(&all_buckets_map);
+    }
+
+    #[test]
+    fn test_events() {
+        let state = init_teststate();
+
+        let bucket_id = create_bucket(&state.ds_src, 0);
+        create_events(&state.ds_src, bucket_id.as_str(), 10);
+
+        aw_server::sync::sync_datastores(&state.ds_src, &state.ds_dest);
+
+        let all_datastores: Vec<&Datastore> = [&state.ds_src, &state.ds_dest].iter().cloned().collect();
+        let all_buckets_map = get_all_buckets_map(all_datastores);
+
+        // Check that all synced buckets are identical to the source bucket
+        check_synced_buckets_equal_to_src(&all_buckets_map);
+
+        // Add some more events
+        create_events(&state.ds_src, bucket_id.as_str(), 10);
+        aw_server::sync::sync_datastores(&state.ds_src, &state.ds_dest);
+
+        // Check again that the new events were indeed synced
+        check_synced_buckets_equal_to_src(&all_buckets_map);
+    }
+}
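Note (not part of the patch): the new suite can be run in isolation with `cargo test --test sync`. Since `create_event` sleeps 5 ms per event to guarantee distinct timestamps, `test_events` (20 events across its two passes) runs a little slower than the other tests.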