From 58904ba170c1aaf1bb3201f623896c7b354672a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Wed, 28 Aug 2019 13:24:15 +0200 Subject: [PATCH 01/12] binary now uses the library (removing duplicate imports), added basic entrypoint for syncing binary --- Cargo.toml | 6 ++++++ src/main.rs | 37 +------------------------------------ src/sync.rs | 9 +++++++++ 3 files changed, 16 insertions(+), 36 deletions(-) create mode 100644 src/sync.rs diff --git a/Cargo.toml b/Cargo.toml index c1acfe62..2ddb3056 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,8 +3,10 @@ name = "aw_server" version = "0.1.0" authors = ["Johan Bjäreholt ", "Erik Bjäreholt "] edition = "2018" +default-run = "aw-server-rust" [lib] +name = "activitywatch" crate-type = ["lib", "cdylib"] path = "src/lib.rs" @@ -12,6 +14,10 @@ path = "src/lib.rs" name = "aw-server-rust" path = "src/main.rs" +[[bin]] +name = "aw-sync-rust" +path = "src/sync.rs" + [dependencies] rocket = "0.4" rocket_contrib = { version = "*", default-features = false, features = ["json"] } diff --git a/src/main.rs b/src/main.rs index 0643adbf..0badd7cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,41 +1,6 @@ -#![feature(plugin)] -#![feature(proc_macro_hygiene)] -#![feature(custom_attribute)] -#![feature(decl_macro)] -#[macro_use] extern crate rocket; -#[macro_use] extern crate rocket_contrib; -extern crate rocket_cors; -extern crate multipart; - -extern crate serde; -extern crate serde_json; -#[macro_use] extern crate serde_derive; - -extern crate rusqlite; - -extern crate crossbeam_requests; - -extern crate chrono; - -extern crate plex; - -extern crate appdirs; - -#[macro_use] extern crate lazy_static; - #[macro_use] extern crate log; -extern crate fern; - -extern crate toml; -pub mod models; -pub mod transform; -pub mod datastore; -pub mod query; -pub mod endpoints; -pub mod dirs; -pub mod logging; -pub mod config; +use activitywatch::*; fn main() { use std::path::PathBuf; diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 00000000..3d00ef9f --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,9 @@ +#[macro_use] extern crate log; + +use activitywatch::*; + +fn main() { + println!("Started aw-sync-rust..."); + datastore::Datastore::new("/tmp/test.db".to_string()); + info!("Not implemented yet, exiting"); +} From 7494895c1247361cd0e970f58bf55d294d473b48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Wed, 28 Aug 2019 14:01:50 +0200 Subject: [PATCH 02/12] fixed tests --- Cargo.toml | 2 +- src/main.rs | 2 +- src/sync.rs | 22 ++++++++++++++++++---- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2ddb3056..abe71990 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" default-run = "aw-server-rust" [lib] -name = "activitywatch" +name = "aw_server" crate-type = ["lib", "cdylib"] path = "src/lib.rs" diff --git a/src/main.rs b/src/main.rs index 0badd7cd..0580df84 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ #[macro_use] extern crate log; -use activitywatch::*; +use aw_server::*; fn main() { use std::path::PathBuf; diff --git a/src/sync.rs b/src/sync.rs index 3d00ef9f..3e325464 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,9 +1,23 @@ #[macro_use] extern crate log; -use activitywatch::*; +use aw_server::*; -fn main() { +use std::fs; +use std::path::Path; + +fn main() -> std::io::Result<()> { println!("Started aw-sync-rust..."); - datastore::Datastore::new("/tmp/test.db".to_string()); - info!("Not implemented yet, exiting"); + 
logging::setup_logger().expect("Failed to setup logging"); + setup()?; + warn!("Not implemented yet, exiting"); + Ok(()) +} + +fn setup() -> std::io::Result<()> { + // TODO: Get path using dirs module + let p = Path::new("/tmp/aw-sync-rust/testing"); + fs::create_dir_all(p)?; + info!("Created syncing directory"); - datastore::Datastore::new("/tmp/test.db".to_string()); - info!("Not implemented yet, exiting"); + datastore::Datastore::new(p.join("test.db").to_str().unwrap().to_string()); + Ok(()) } From c93c8a0a6241e2714fd37c6f367d1098e0373315 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Thu, 5 Sep 2019 15:50:13 +0200 Subject: [PATCH 03/12] progress on syncing --- src/sync.rs | 132 ++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 122 insertions(+), 10 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index 3e325464..a904542f 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,23 +1,135 @@ +// Basic syncing for ActivityWatch +// Based on: https://github.com/ActivityWatch/aw-server/pull/50 +// +// This does not handle any direct peer interaction/connections/networking, it works as a "bring your own folder synchronizer". +// +// It manages a sync-folder by syncing the aw-server datastore with a copy/staging datastore in the folder (one for each host). +// The sync folder is then synced with remotes using Syncthing/Dropbox/whatever. + #[macro_use] extern crate log; -use aw_server::*; use std::fs; use std::path::Path; +use chrono::{DateTime, Utc}; +use serde_json; + +use aw_server::*; +use aw_server::datastore::{Datastore, DatastoreError}; +use aw_server::models::{Event, Bucket}; + + fn main() -> std::io::Result<()> { + // What needs to be done: + // - [x] Setup local sync bucket + // - Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore) + // - Import buckets and sync events from remotes + println!("Started aw-sync-rust..."); logging::setup_logger().expect("Failed to setup logging"); - setup()?; - warn!("Not implemented yet, exiting"); - Ok(()) -} -fn setup() -> std::io::Result<()> { // TODO: Get path using dirs module - let p = Path::new("/tmp/aw-sync-rust/testing"); - fs::create_dir_all(p)?; + let sync_directory = Path::new("/tmp/aw-sync-rust/testing"); + fs::create_dir_all(sync_directory)?; info!("Created syncing directory"); - datastore::Datastore::new(p.join("test.db").to_str().unwrap().to_string()); + + let ds_local = setup(sync_directory)?; + log_buckets(&ds_local)?; + + let ds_remotes = setup_test(sync_directory)?; + log_buckets(ds_remotes.first().unwrap())?; + + // FIXME: These are not the datastores that should actually be synced, I'm just testing + for ds_from in ds_remotes { + sync_datastores(&ds_from, &ds_local); + } + + log_buckets(&ds_local)?; + + info!("Finished successfully, exiting..."); Ok(()) } +fn setup(sync_directory: &Path) -> std::io::Result<Datastore> { + // Setup the local sync db + + // TODO: better filename + let ds = Datastore::new(sync_directory.join("test-local.db").to_str().unwrap().to_string()); + Ok(ds) +} + +fn setup_test(sync_directory: &Path) -> std::io::Result<Vec<Datastore>> { + let mut datastores: Vec<Datastore> = Vec::new(); + for n in 0..2 { + let ds = Datastore::new(sync_directory.join(format!("test-remote-{}.db", n)).to_str().unwrap().to_string()); + + // Create a bucket + let bucket_jsonstr = format!(r#"{{ + "id": "bucket-{}", + "type": "test", + "hostname": "device-{}", + "client": "test" + }}"#, n, n); + let bucket: Bucket = serde_json::from_str(&bucket_jsonstr)?; + match ds.create_bucket(&bucket) { + Ok(()) => (), + Err(e) => match e {
DatastoreError::BucketAlreadyExists => { + debug!("bucket already exists, skipping"); + } + e => panic!("woops! {:?}", e), + } + }; + + // Insert some testing events into the bucket + let events: Vec<Event> = (0..3).map(|i| { + let timestamp: DateTime<Utc> = Utc::now(); + let event_jsonstr = format!(r#"{{ + "timestamp": "{}", + "duration": 0, + "data": {{"test": {} }} + }}"#, timestamp.to_rfc3339(), i); + let event = serde_json::from_str(&event_jsonstr).unwrap(); + event + }).collect::<Vec<Event>>(); + ds.insert_events(bucket.id.as_str(), &events[..]).unwrap(); + info!("Eventcount: {:?}", ds.get_event_count(bucket.id.as_str(), None, None).unwrap()); + datastores.push(ds); + }; + Ok(datastores) +} + +fn sync_datastores(ds_from: &Datastore, ds_to: &Datastore) -> () { + info!("Syncing {:?} to {:?}", ds_from, ds_to); + + let buckets_from = ds_from.get_buckets().unwrap(); + for bucket in buckets_from.values() { + // Check if bucket exists in destination, if not then create + let buckets_to = ds_to.get_buckets().unwrap(); + let new_id = format!("{}-synced", bucket.id); + if !buckets_to.contains_key(new_id.as_str()) { + let mut bucket_new = bucket.clone(); + bucket_new.id = new_id.clone(); + ds_to.create_bucket(&bucket_new).unwrap(); + } + + // Sync events + // FIXME: Events are not being saved, does the datastore worker need more time before exit? + let events: Vec<Event> = ds_from.get_events(bucket.id.as_str(), None, None, None).unwrap(); + info!("Syncing events: {:?}", events.len()); + ds_to.insert_events(new_id.as_str(), &events[..]).unwrap(); + } + () +} + +fn log_buckets(ds: &Datastore) -> std::io::Result<()> { + // Logs all buckets and some associated data for a given datastore + let buckets = ds.get_buckets().unwrap(); + info!("Buckets: {:?}", buckets.keys()); + for bucket in buckets.values() { + info!("Bucket '{}'", bucket.id.as_str()); + info!(" - eventcount: {:?}", ds.get_event_count(bucket.id.as_str(), None, None).unwrap()); + } + Ok(()) } From dee8ee51c09eb1fc88dcc73f808e2e85cddf3850 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Bj=C3=A4reholt?= Date: Tue, 27 Aug 2019 21:48:08 +0200 Subject: [PATCH 04/12] datastore: fix blinking test --- tests/datastore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/datastore.rs b/tests/datastore.rs index 5bec3508..9b1d447d 100644 --- a/tests/datastore.rs +++ b/tests/datastore.rs @@ -54,7 +54,7 @@ mod datastore_tests { None => panic!("Expected 'None' in bucket to be replaced with current time"), Some(created) => { let now = Utc::now(); - assert!(created < now); + assert!(created <= now); assert!(created > now - Duration::seconds(10)); } }; From e09717e8c7f53d4284377e3e2b42de8cbb05c394 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Thu, 5 Sep 2019 19:20:13 +0200 Subject: [PATCH 05/12] improved logging output, added minor test --- src/datastore/datastore.rs | 8 ++++++-- src/sync.rs | 34 +++++++++++++++++++++++++++++----- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/datastore/datastore.rs b/src/datastore/datastore.rs index 8b0849c9..648af1b7 100644 --- a/src/datastore/datastore.rs +++ b/src/datastore/datastore.rs @@ -37,6 +37,7 @@ pub enum Responses { Count(i64) } +#[derive(Debug,Clone)] pub enum DatastoreMethod { Memory(), File(String), @@ -67,12 +68,13 @@ type Requester = crossbeam_requests::Requester<Commands, Result<Responses, DatastoreError>>; type Responder = crossbeam_requests::Responder<Commands, Result<Responses, DatastoreError>>; pub struct Datastore { + method: DatastoreMethod, requester: Requester, } impl fmt::Debug for Datastore { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Datastore()") + write!(f, "Datastore({:?})", self.method) } } @@ -677,11 +679,13 @@ impl Datastore { fn _new_internal(method: DatastoreMethod) -> Self { let (responder, requester)
= crossbeam_requests::channel::<Commands, Result<Responses, DatastoreError>>(); + let method_clone = method.clone(); let _thread = thread::spawn(move || { let mut di = DatastoreWorker::new(responder); - di.work_loop(method); + di.work_loop(method_clone); }); Datastore { + method: method, requester: requester, } } diff --git a/src/sync.rs b/src/sync.rs index a904542f..05d6c22d 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -34,22 +34,41 @@ fn main() -> std::io::Result<()> { info!("Created syncing directory"); let ds_local = setup(sync_directory)?; + info!("Set up local datastore"); log_buckets(&ds_local)?; let ds_remotes = setup_test(sync_directory)?; + info!("Set up remote datastores"); log_buckets(ds_remotes.first().unwrap())?; // FIXME: These are not the datastores that should actually be synced, I'm just testing - for ds_from in ds_remotes { + for ds_from in &ds_remotes { sync_datastores(&ds_from, &ds_local); } + std::thread::sleep(std::time::Duration::from_millis(1000)); + log_buckets(&ds_local)?; + test(&ds_local, &ds_remotes); + info!("Finished successfully, exiting..."); + + std::thread::sleep(std::time::Duration::from_millis(1000)); + Ok(()) } +fn test(ds_local: &Datastore, ds_remotes: &Vec<Datastore>) { + // Post-sync test + let n_remote_buckets: usize = ds_remotes.iter().map(|x| { + x.get_buckets().unwrap().len() + }).sum(); + assert!(ds_local.get_buckets().unwrap().len() == n_remote_buckets); + + // TODO: Check that number of events are equal across source and destination buckets +} + fn setup(sync_directory: &Path) -> std::io::Result<Datastore> { // Setup the local sync db @@ -115,7 +134,12 @@ fn sync_datastores(ds_from: &Datastore, ds_to: &Datastore) -> () { // Sync events // FIXME: Events are not being saved, does the datastore worker need more time before exit? - let events: Vec<Event> = ds_from.get_events(bucket.id.as_str(), None, None, None).unwrap(); + let events: Vec<Event> = ds_from.get_events(bucket.id.as_str(), None, None, None).unwrap().iter().map(|e| { + let mut new_e = e.clone(); + new_e.id = None; + //info!("{:?}", new_e); + new_e + }).collect(); info!("Syncing events: {:?}", events.len()); ds_to.insert_events(new_id.as_str(), &events[..]).unwrap(); } @@ -125,10 +149,10 @@ fn sync_datastores(ds_from: &Datastore, ds_to: &Datastore) -> () { fn log_buckets(ds: &Datastore) -> std::io::Result<()> { // Logs all buckets and some associated data for a given datastore let buckets = ds.get_buckets().unwrap(); - info!("Buckets: {:?}", buckets.keys()); + info!("Buckets in {:?}:", ds); for bucket in buckets.values() { - info!("Bucket '{}'", bucket.id.as_str()); - info!(" - eventcount: {:?}", ds.get_event_count(bucket.id.as_str(), None, None).unwrap()); + info!(" - {}", bucket.id.as_str()); + info!(" eventcount: {:?}", ds.get_event_count(bucket.id.as_str(), None, None).unwrap()); } Ok(()) From 7159b77fe8c9688f935b2b01b6b2973a78fec5c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Mon, 9 Sep 2019 16:17:53 +0200 Subject: [PATCH 06/12] major progress on syncing, added tests --- .travis.yml | 2 +- Cargo.toml | 2 +- src/datastore/datastore.rs | 28 ++++++- src/lib.rs | 2 + src/logging.rs | 2 +- src/sync.rs | 161 ++++++++++++++++++++++++------------- src/sync_main.rs | 21 +++++ tests/sync.rs | 128 +++++++++++++++++++++++++++++ 8 files changed, 284 insertions(+), 62 deletions(-) create mode 100644 src/sync_main.rs create mode 100644 tests/sync.rs diff --git a/.travis.yml b/.travis.yml index 21d47b4a..2a186885 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,7 +31,7 @@ before_install: fi - if [ "$TRAVIS_OS_NAME" = "linux" ] && [
"$BUILD_ANDROID" != "true" ]; then echo "Installing coverage tools"; - cargo install cargo-kcov; + cargo install --force cargo-kcov; cargo kcov --print-install-kcov-sh | sh; fi diff --git a/Cargo.toml b/Cargo.toml index abe71990..d9aed00c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ path = "src/main.rs" [[bin]] name = "aw-sync-rust" -path = "src/sync.rs" +path = "src/sync_main.rs" [dependencies] rocket = "0.4" diff --git a/src/datastore/datastore.rs b/src/datastore/datastore.rs index 648af1b7..f5776b9a 100644 --- a/src/datastore/datastore.rs +++ b/src/datastore/datastore.rs @@ -61,6 +61,7 @@ pub enum Commands { Heartbeat(String, Event, f64), GetEvents(String, Option>, Option>, Option), GetEventCount(String, Option>, Option>), + ForceCommit(), } type Requester = crossbeam_requests::Requester>; @@ -215,35 +216,40 @@ impl DatastoreWorker { loop { let mut request = match self.responder.poll() { Ok(r) => r, - Err(_) => { // All references to responder is gone, quit - info!("DB worker quitting"); + Err(e) => { // All references to responder is gone, quit + info!("DB worker quitting due to {:?}", e); self.quit = true; break; } }; let response = match request.body() { Commands::CreateBucket(bucket) => { + ds.commit = true; match ds.create_bucket(&transaction, bucket.clone()) { Ok(_) => Ok(Responses::Empty()), Err(e) => Err(e) } }, Commands::DeleteBucket(bucketname) => { + ds.commit = true; match ds.delete_bucket(&transaction, bucketname) { Ok(_) => Ok(Responses::Empty()), Err(e) => Err(e) } }, Commands::GetBucket(bucketname) => { + ds.commit = true; match ds.get_bucket(bucketname) { Ok(b) => Ok(Responses::Bucket(b)), Err(e) => Err(e) } }, Commands::GetBuckets() => { + ds.commit = true; Ok(Responses::BucketMap(ds.get_buckets())) }, Commands::InsertEvents(bucketname, events) => { + ds.commit = true; match ds.insert_events(&transaction, bucketname, events) { Ok(_) => { last_heartbeat.insert(bucketname.to_string(), None); // invalidate last_heartbeat cache @@ -259,17 +265,23 @@ impl DatastoreWorker { } }, Commands::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt) => { + ds.commit = true; match ds.get_events(&transaction, bucketname, *starttime_opt, *endtime_opt, *limit_opt) { Ok(el) => Ok(Responses::EventList(el)), Err(e) => Err(e) } }, Commands::GetEventCount(bucketname, starttime_opt, endtime_opt) => { + ds.commit = true; match ds.get_event_count(&transaction, bucketname, *starttime_opt, *endtime_opt) { Ok(n) => Ok(Responses::Count(n)), Err(e) => Err(e) } }, + Commands::ForceCommit() => { + ds.commit = true; + Ok(Responses::Empty()) + }, }; request.respond(response); if ds.commit || ds.uncommited_events > 100 { break }; @@ -382,7 +394,6 @@ impl DatastoreInstance { info!("Created bucket {}", inserted_bucket.id); self.buckets_cache.insert(bucket.id.clone(), inserted_bucket); - self.commit = true; }, // FIXME: This match is ugly, is it possible to write it in a cleaner way? 
Err(err) => match err { @@ -410,7 +421,6 @@ impl DatastoreInstance { match conn.execute("DELETE FROM buckets WHERE id = ?1", &[&bucket.bid]) { Ok(_) => { self.buckets_cache.remove(bucket_id); - self.commit = true; return Ok(()); }, Err(err) => match err { @@ -766,4 +776,14 @@ impl Datastore { Err(e) => Err(e) } } + + pub fn commit(&self) -> Result<(), DatastoreError> { + match self.requester.request(Commands::ForceCommit()) { + Ok(r) => match r { + Responses::Empty() => Ok(()), + _ => panic!("Invalid response") + }, + Err(e) => Err(e) + } + } } diff --git a/src/lib.rs b/src/lib.rs index 898685de..dbf4e65e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,3 +40,5 @@ pub mod config; #[cfg(target_os="android")] pub mod android; + +pub mod sync; diff --git a/src/logging.rs b/src/logging.rs index 2a2d5130..36d91128 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -38,7 +38,7 @@ pub fn setup_logger() -> Result<(), fern::InitError> { }) .chain(std::io::stdout()) .level(log::LevelFilter::Info) - .level_for("aw_server", log::LevelFilter::Debug) + //.level_for("aw_server", log::LevelFilter::Debug) ) // No color and lower log levels to logfile .chain(fern::Dispatch::new() diff --git a/src/sync.rs b/src/sync.rs index 05d6c22d..94f09402 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,79 +1,98 @@ -// Basic syncing for ActivityWatch -// Based on: https://github.com/ActivityWatch/aw-server/pull/50 -// -// This does not handle any direct peer interaction/connections/networking, it works as a "bring your own folder synchronizer". -// -// It manages a sync-folder by syncing the aw-server datastore with a copy/staging datastore in the folder (one for each host). -// The sync folder is then synced with remotes using Syncthing/Dropbox/whatever. - -#[macro_use] extern crate log; +/// Basic syncing for ActivityWatch +/// Based on: https://github.com/ActivityWatch/aw-server/pull/50 +/// +/// This does not handle any direct peer interaction/connections/networking, it works as a "bring your own folder synchronizer". +/// +/// It manages a sync-folder by syncing the aw-server datastore with a copy/staging datastore in the folder (one for each host). +/// The sync folder is then synced with remotes using Syncthing/Dropbox/whatever. 
use std::fs; use std::path::Path; +use std::collections::HashMap; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Utc, Duration}; use serde_json; -use aw_server::*; -use aw_server::datastore::{Datastore, DatastoreError}; -use aw_server::models::{Event, Bucket}; - -fn main() -> std::io::Result<()> { - // What needs to be done: - // - [x] Setup local sync bucket - // - Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore) - // - Import buckets and sync events from remotes - - println!("Started aw-sync-rust..."); - logging::setup_logger().expect("Failed to setup logging"); +use crate::datastore::{Datastore, DatastoreError}; +use crate::models::{Event, Bucket}; +pub fn sync_run() -> std::io::Result<()> { // TODO: Get path using dirs module let sync_directory = Path::new("/tmp/aw-sync-rust/testing"); fs::create_dir_all(sync_directory)?; - info!("Created syncing directory"); let ds_local = setup(sync_directory)?; info!("Set up local datastore"); - log_buckets(&ds_local)?; + //log_buckets(&ds_local)?; let ds_remotes = setup_test(sync_directory)?; info!("Set up remote datastores"); - log_buckets(ds_remotes.first().unwrap())?; // FIXME: These are not the datastores that should actually be synced, I'm just testing for ds_from in &ds_remotes { sync_datastores(&ds_from, &ds_local); } - std::thread::sleep(std::time::Duration::from_millis(1000)); - + ds_local.commit().unwrap(); log_buckets(&ds_local)?; + for ds_from in &ds_remotes { + ds_from.commit().unwrap(); + log_buckets(&ds_from)?; + } test(&ds_local, &ds_remotes); - info!("Finished successfully, exiting..."); - - std::thread::sleep(std::time::Duration::from_millis(1000)); - Ok(()) } fn test(ds_local: &Datastore, ds_remotes: &Vec<Datastore>) { // Post-sync test + info!("Running tests..."); + + let buckets_local: HashMap<String, Bucket> = ds_local.get_buckets().unwrap(); + let buckets_remote: HashMap<String, Bucket> = ds_remotes.iter().fold(HashMap::new(), |mut acc, x| { + let buckets = x.get_buckets().unwrap(); + for bucket in buckets.values() { + let bucket_copy = bucket.clone(); + assert!(!acc.contains_key(&bucket.id)); + acc.insert(bucket_copy.id.clone(), bucket_copy.clone()); + }; + acc + }); + assert!(buckets_local.len() == buckets_remote.len()); - let n_remote_buckets: usize = ds_remotes.iter().map(|x| { - x.get_buckets().unwrap().len() - }).sum(); - assert!(ds_local.get_buckets().unwrap().len() == n_remote_buckets); - // TODO: Check that number of events are equal across source and destination buckets + + let mut all_buckets: Vec<(&Datastore, Bucket)> = Vec::new(); + for bucket in buckets_local.values() { + all_buckets.push((ds_local, bucket.clone())); + } + for ds_remote in ds_remotes { + let buckets = ds_remote.get_buckets().unwrap(); + for bucket in buckets.values() { + all_buckets.push((ds_remote, bucket.clone())); + } + } + + let all_buckets_map: HashMap<String, (&Datastore, Bucket)> = all_buckets.iter().cloned().map(|(ds, b)| (b.id.clone(), (ds, b))).collect(); + + // Check that all synced buckets are identical to source bucket + for (ds, bucket) in all_buckets { + if bucket.id.contains("-synced") { + let bucket_src_id = bucket.id.replace("-synced", ""); + let (ds_src, bucket_src) = all_buckets_map.get(&bucket_src_id).unwrap(); + let events_synced = ds.get_events(bucket.id.as_str(), None, None, None).unwrap(); + let events_src = ds_src.get_events(bucket_src.id.as_str(), None, None, None).unwrap(); + //info!("{:?}", events_synced); + //info!("{:?}", events_src); + assert!(events_synced == events_src); + } + } } fn setup(sync_directory: &Path)
-> std::io::Result<Datastore> { // Setup the local sync db // TODO: better filename - let ds = Datastore::new(sync_directory.join("test-local.db").to_str().unwrap().to_string()); + let ds = Datastore::new(sync_directory.join("test-local.db").into_os_string().into_string().unwrap()); Ok(ds) } @@ -102,7 +121,7 @@ fn setup_test(sync_directory: &Path) -> std::io::Result<Vec<Datastore>> { // Insert some testing events into the bucket let events: Vec<Event> = (0..3).map(|i| { - let timestamp: DateTime<Utc> = Utc::now(); + let timestamp: DateTime<Utc> = Utc::now() + Duration::milliseconds(i*10); let event_jsonstr = format!(r#"{{ "timestamp": "{}", "duration": 0, "data": {{"test": {} }} }}"#, timestamp.to_rfc3339(), i); let event = serde_json::from_str(&event_jsonstr).unwrap(); event }).collect::<Vec<Event>>(); + ds.insert_events(bucket.id.as_str(), &events[..]).unwrap(); + ds.commit().unwrap(); + //let new_eventcount = ds.get_event_count(bucket.id.as_str(), None, None).unwrap(); + //info!("Eventcount: {:?} ({} new)", new_eventcount, events.len()); datastores.push(ds); }; Ok(datastores) } +fn get_or_create_sync_bucket(bucket_from: &Bucket, ds_to: &Datastore) -> Bucket { + // Check if bucket exists in destination, if not then create + let new_id = format!("{}-synced", bucket_from.id); + match ds_to.get_bucket(new_id.as_str()) { + Ok(bucket) => bucket, + Err(DatastoreError::NoSuchBucket) => { + let mut bucket_new = bucket_from.clone(); + bucket_new.id = new_id.clone(); + ds_to.create_bucket(&bucket_new).unwrap(); + ds_to.get_bucket(new_id.as_str()).unwrap() + }, + Err(e) => panic!(e), + } +} + +pub fn sync_datastores(ds_from: &Datastore, ds_to: &Datastore) -> () { info!("Syncing {:?} to {:?}", ds_from, ds_to); let buckets_from = ds_from.get_buckets().unwrap(); - for bucket in buckets_from.values() { - // Check if bucket exists in destination, if not then create - let buckets_to = ds_to.get_buckets().unwrap(); - let new_id = format!("{}-synced", bucket.id); - if !buckets_to.contains_key(new_id.as_str()) { - let mut bucket_new = bucket.clone(); - bucket_new.id = new_id.clone(); - ds_to.create_bucket(&bucket_new).unwrap(); - } + for bucket_from in buckets_from.values() { + let bucket_to = get_or_create_sync_bucket(bucket_from, ds_to); + ds_to.commit().unwrap(); + let eventcount_to_old = ds_to.get_event_count(bucket_to.id.as_str(), None, None).unwrap(); + //info!("{:?}", bucket_to); // Sync events - // FIXME: Events are not being saved, does the datastore worker need more time before exit?
- let events: Vec<Event> = ds_from.get_events(bucket.id.as_str(), None, None, None).unwrap().iter().map(|e| { + // FIXME: This should use bucket_to.metadata.end, but it doesn't because it doesn't work + // for empty buckets (Should be None, is Some(unknown_time)) + // let resume_sync_at = bucket_to.metadata.end; + let most_recent_events = ds_to.get_events(bucket_to.id.as_str(), None, None, Some(1)).unwrap(); + let resume_sync_at = match most_recent_events.first() { + Some(e) => Some(e.timestamp + e.duration), + None => None, + }; + + info!("Resumed at: {:?}", resume_sync_at); + let mut events: Vec<Event> = ds_from.get_events(bucket_from.id.as_str(), resume_sync_at, None, None).unwrap().iter().map(|e| { let mut new_e = e.clone(); new_e.id = None; //info!("{:?}", new_e); new_e }).collect(); + + // Sort ascending + events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)); + //info!("{:?}", events); + for event in events { + ds_to.heartbeat(bucket_to.id.as_str(), event, 1.0).unwrap(); + } + + ds_to.commit().unwrap(); + let eventcount_to_new = ds_to.get_event_count(bucket_to.id.as_str(), None, None).unwrap(); + info!("Synced {} new events", eventcount_to_new - eventcount_to_old); } () } fn log_buckets(ds: &Datastore) -> std::io::Result<()> { - // Logs all buckets and some associated data for a given datastore + // Logs all buckets and some metadata for a given datastore let buckets = ds.get_buckets().unwrap(); info!("Buckets in {:?}:", ds); for bucket in buckets.values() { diff --git a/src/sync_main.rs b/src/sync_main.rs new file mode 100644 index 00000000..eea5ea01 --- /dev/null +++ b/src/sync_main.rs @@ -0,0 +1,21 @@ +#[macro_use] extern crate log; + +use aw_server::*; + +fn main() { + // What needs to be done: + // - [x] Setup local sync bucket + // - Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore) + // - Import buckets and sync events from remotes + + println!("Started aw-sync-rust..."); + logging::setup_logger().expect("Failed to setup logging"); + + aw_server::sync::sync_run().unwrap(); + info!("Finished successfully, exiting..."); + + // Needed to give the datastores some time to commit before program is shut down. + // 100ms isn't actually needed, seemed to work fine with as little as 10ms, but I'd rather give + // it some wiggleroom.
+ std::thread::sleep(std::time::Duration::from_millis(100)); +} diff --git a/tests/sync.rs b/tests/sync.rs new file mode 100644 index 00000000..704d1a2d --- /dev/null +++ b/tests/sync.rs @@ -0,0 +1,128 @@ +#[macro_use] extern crate log; + +#[cfg(test)] +mod sync_tests { + use std::collections::{HashMap}; + use chrono::{DateTime, Utc, Duration}; + + use aw_server::models::{Bucket, Event}; + use aw_server::datastore::{Datastore, DatastoreError}; + + struct TestState { + ds_src: Datastore, + ds_dest: Datastore, + } + + fn init_teststate() -> TestState { + return TestState { + ds_src: Datastore::new_in_memory(), + ds_dest: Datastore::new_in_memory(), + } + } + + fn create_bucket(ds: &Datastore, n: i32) -> String { + // Create a bucket + let bucket_id = format!("bucket-{}", n); + let bucket_jsonstr = format!(r#"{{ + "id": "{}", + "type": "test", + "hostname": "device-{}", + "client": "test" + }}"#, bucket_id, n); + let bucket: Bucket = serde_json::from_str(&bucket_jsonstr).unwrap(); + match ds.create_bucket(&bucket) { + Ok(()) => (), + Err(e) => match e { + DatastoreError::BucketAlreadyExists => { + debug!("bucket already exists, skipping"); + } + e => panic!("woops! {:?}", e), + } + }; + bucket_id + } + + fn create_events(ds: &Datastore, bucket_id: &str, n: i64) { + let events: Vec<Event> = (0..n).map(|i| { + let timestamp: DateTime<Utc> = Utc::now(); + let event_jsonstr = format!(r#"{{ + "timestamp": "{}", + "duration": 0, + "data": {{"test": {} }} + }}"#, timestamp.to_rfc3339(), i); + let event = serde_json::from_str(&event_jsonstr).unwrap(); + event + }).collect::<Vec<Event>>(); + + ds.insert_events(bucket_id, &events[..]).unwrap(); + ds.commit().unwrap(); + } + + fn get_all_buckets(datastores: Vec<&Datastore>) -> Vec<(&Datastore, Bucket)> { + let mut all_buckets: Vec<(&Datastore, Bucket)> = Vec::new(); + for ds in datastores { + let buckets = ds.get_buckets().unwrap(); + for bucket in buckets.values() { + all_buckets.push((ds, bucket.clone())); + } + } + all_buckets + } + + fn get_all_buckets_map(datastores: Vec<&Datastore>) -> HashMap<String, (&Datastore, Bucket)> { + let all_buckets = get_all_buckets(datastores); + all_buckets.iter().cloned().map(|(ds, b)| (b.id.clone(), (ds, b))).collect() + } + + + #[test] + fn test_buckets_created() { + // TODO: Split up this test + let state = init_teststate(); + create_bucket(&state.ds_src, 0); + + aw_server::sync::sync_datastores(&state.ds_src, &state.ds_dest); + + let buckets_src: HashMap<String, Bucket> = state.ds_src.get_buckets().unwrap(); + let buckets_dest: HashMap<String, Bucket> = state.ds_dest.get_buckets().unwrap(); + assert!(buckets_src.len() == buckets_dest.len()); + } + + fn check_synced_buckets_equal_to_src(all_buckets_map: &HashMap<String, (&Datastore, Bucket)>) { + for (ds, bucket) in all_buckets_map.values() { + if bucket.id.contains("-synced") { + let bucket_src_id = bucket.id.replace("-synced", ""); + let (ds_src, bucket_src) = all_buckets_map.get(&bucket_src_id).unwrap(); + let events_synced = ds.get_events(bucket.id.as_str(), None, None, None).unwrap(); + let events_src = ds_src.get_events(bucket_src.id.as_str(), None, None, None).unwrap(); + println!("{:?}", events_synced); + println!("{:?}", events_src); + assert!(events_synced == events_src); + } + } + } + + #[test] + fn test_events() { + // TODO: Split up this test + let state = init_teststate(); + + let bucket_id = create_bucket(&state.ds_src, 0); + create_events(&state.ds_src, bucket_id.as_str(), 10); + + aw_server::sync::sync_datastores(&state.ds_src, &state.ds_dest); + + let all_datastores: Vec<&Datastore> = [&state.ds_src, &state.ds_dest].iter().cloned().collect(); + let
all_buckets_map = get_all_buckets_map(all_datastores); + + // Check that all synced buckets are identical to source bucket + check_synced_buckets_equal_to_src(&all_buckets_map); + + // Add some more events + create_events(&state.ds_src, bucket_id.as_str(), 10); + aw_server::sync::sync_datastores(&state.ds_src, &state.ds_dest); + + check_synced_buckets_equal_to_src(&all_buckets_map); + } +} From c143fd5bbf069bbbcddcfe3f600c4d6fe7800857 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Mon, 9 Sep 2019 16:36:25 +0200 Subject: [PATCH 07/12] cleaned up code, added comments --- src/sync.rs | 91 ++++++++++-------------------------------- src/sync_main.rs | 2 +- tests/sync.rs | 47 ++++++++++++++++++++----- 3 files changed, 58 insertions(+), 82 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index 94f09402..f47ae8b0 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -8,7 +8,6 @@ use std::fs; use std::path::Path; -use std::collections::HashMap; use chrono::{DateTime, Utc, Duration}; use serde_json; @@ -16,16 +15,18 @@ use serde_json; use crate::datastore::{Datastore, DatastoreError}; use crate::models::{Event, Bucket}; -pub fn sync_run() -> std::io::Result<()> { +/// Performs a single sync pass +pub fn sync_run() { // TODO: Get path using dirs module let sync_directory = Path::new("/tmp/aw-sync-rust/testing"); - fs::create_dir_all(sync_directory)?; + fs::create_dir_all(sync_directory).unwrap(); - let ds_local = setup(sync_directory)?; + // TODO: Use the local datastore here, preferably passed from main + let ds_local = Datastore::new(sync_directory.join("test-local.db").into_os_string().into_string().unwrap()); info!("Set up local datastore"); //log_buckets(&ds_local)?; - let ds_remotes = setup_test(sync_directory)?; + let ds_remotes = setup_test(sync_directory).unwrap(); info!("Set up remote datastores"); // FIXME: These are not the datastores that should actually be synced, I'm just testing for ds_from in &ds_remotes { sync_datastores(&ds_from, &ds_local); } - ds_local.commit().unwrap(); - log_buckets(&ds_local)?; + log_buckets(&ds_local); for ds_from in &ds_remotes { ds_from.commit().unwrap(); - log_buckets(&ds_from)?; + log_buckets(&ds_from); } - - test(&ds_local, &ds_remotes); - - Ok(()) -} - -fn test(ds_local: &Datastore, ds_remotes: &Vec<Datastore>) { - // Post-sync test - info!("Running tests..."); - - let buckets_local: HashMap<String, Bucket> = ds_local.get_buckets().unwrap(); - let buckets_remote: HashMap<String, Bucket> = ds_remotes.iter().fold(HashMap::new(), |mut acc, x| { - let buckets = x.get_buckets().unwrap(); - for bucket in buckets.values() { - let bucket_copy = bucket.clone(); - assert!(!acc.contains_key(&bucket.id)); - acc.insert(bucket_copy.id.clone(), bucket_copy.clone()); - }; - acc - }); - assert!(buckets_local.len() == buckets_remote.len()); - - let mut all_buckets: Vec<(&Datastore, Bucket)> = Vec::new(); - for bucket in buckets_local.values() { - all_buckets.push((ds_local, bucket.clone())); - } - for ds_remote in ds_remotes { - let buckets = ds_remote.get_buckets().unwrap(); - for bucket in buckets.values() { - all_buckets.push((ds_remote, bucket.clone())); - } - } - - let all_buckets_map: HashMap<String, (&Datastore, Bucket)> = all_buckets.iter().cloned().map(|(ds, b)| (b.id.clone(), (ds, b))).collect(); - - // Check that all synced buckets are identical to source bucket - for (ds, bucket) in all_buckets { - if bucket.id.contains("-synced") { - let bucket_src_id = bucket.id.replace("-synced", ""); - let (ds_src, bucket_src) = all_buckets_map.get(&bucket_src_id).unwrap(); - let
events_synced = ds.get_events(bucket.id.as_str(), None, None, None).unwrap(); - let events_src = ds_src.get_events(bucket_src.id.as_str(), None, None, None).unwrap(); - //info!("{:?}", events_synced); - //info!("{:?}", events_src); - assert!(events_synced == events_src); - } - } -} - -fn setup(sync_directory: &Path) -> std::io::Result<Datastore> { - // Setup the local sync db - - // TODO: better filename - let ds = Datastore::new(sync_directory.join("test-local.db").into_os_string().into_string().unwrap()); - Ok(ds) -} fn setup_test(sync_directory: &Path) -> std::io::Result<Vec<Datastore>> { let mut datastores: Vec<Datastore> = Vec::new(); for n in 0..2 { +/// Returns the sync-destination bucket for a given bucket, creates it if it doesn't exist. fn get_or_create_sync_bucket(bucket_from: &Bucket, ds_to: &Datastore) -> Bucket { - // Check if bucket exists in destination, if not then create - let new_id = format!("{}-synced", bucket_from.id); + // Ensure the bucket ID ends in "-synced" + let new_id = format!("{}-synced", bucket_from.id.replace("-synced", "")); + match ds_to.get_bucket(new_id.as_str()) { Ok(bucket) => bucket, Err(DatastoreError::NoSuchBucket) => { let mut bucket_new = bucket_from.clone(); bucket_new.id = new_id.clone(); ds_to.create_bucket(&bucket_new).unwrap(); ds_to.get_bucket(new_id.as_str()).unwrap() }, Err(e) => panic!(e), } } +/// Syncs all buckets from `ds_from` to `ds_to` with `-synced` appended to the ID of the destination bucket. -pub fn sync_datastores(ds_from: &Datastore, ds_to: &Datastore) -> () { +pub fn sync_datastores(ds_from: &Datastore, ds_to: &Datastore) { + // FIXME: "-synced" should only be appended when synced to the local database, not to the + // staging area for local buckets. info!("Syncing {:?} to {:?}", ds_from, ds_to); let buckets_from = ds_from.get_buckets().unwrap(); for bucket_from in buckets_from.values() { for event in events { - ds_to.heartbeat(bucket_to.id.as_str(), event, 1.0).unwrap(); + ds_to.heartbeat(bucket_to.id.as_str(), event, 0.0).unwrap(); } ds_to.commit().unwrap(); let eventcount_to_new = ds_to.get_event_count(bucket_to.id.as_str(), None, None).unwrap(); info!("Synced {} new events", eventcount_to_new - eventcount_to_old); } -} -fn log_buckets(ds: &Datastore) -> std::io::Result<()> { - // Logs all buckets and some metadata for a given datastore +fn log_buckets(ds: &Datastore) { + // Logs all buckets and some metadata for a given datastore let buckets = ds.get_buckets().unwrap(); info!("Buckets in {:?}:", ds); for bucket in buckets.values() { info!(" - {}", bucket.id.as_str()); info!(" eventcount: {:?}", ds.get_event_count(bucket.id.as_str(), None, None).unwrap()); - } - - Ok(()) + }; } diff --git a/src/sync_main.rs b/src/sync_main.rs index eea5ea01..7d8a2326 100644 --- a/src/sync_main.rs +++ b/src/sync_main.rs @@ -11,7 +11,7 @@ fn main() { println!("Started aw-sync-rust..."); logging::setup_logger().expect("Failed to setup logging"); - aw_server::sync::sync_run().unwrap(); + aw_server::sync::sync_run(); info!("Finished successfully, exiting..."); // Needed to give the datastores some time to commit before program is shut down.
diff --git a/tests/sync.rs b/tests/sync.rs index 704d1a2d..2708cc96 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -42,16 +42,20 @@ mod sync_tests { bucket_id } + fn create_event(data_str: &str) -> Event { + let timestamp: DateTime<Utc> = Utc::now(); + let event_jsonstr = format!(r#"{{ + "timestamp": "{}", + "duration": 0, + "data": {{"test": {} }} + }}"#, timestamp.to_rfc3339(), data_str); + let event = serde_json::from_str(&event_jsonstr).unwrap(); + event + } + fn create_events(ds: &Datastore, bucket_id: &str, n: i64) { let events: Vec<Event> = (0..n).map(|i| { - let timestamp: DateTime<Utc> = Utc::now(); - let event_jsonstr = format!(r#"{{ - "timestamp": "{}", - "duration": 0, - "data": {{"test": {} }} - }}"#, timestamp.to_rfc3339(), i); - let event = serde_json::from_str(&event_jsonstr).unwrap(); - event + create_event(format!("{}", i).as_str()) }).collect::<Vec<Event>>(); ds.insert_events(bucket_id, &events[..]).unwrap(); @@ -102,9 +106,33 @@ mod sync_tests { } } + #[test] + fn test_one_updated_event() { + // This tests the syncing of one single event that is then updated by a heartbeat after the + // first sync pass. + let state = init_teststate(); + + let bucket_id = create_bucket(&state.ds_src, 0); + state.ds_src.heartbeat(bucket_id.as_str(), create_event("1"), 1.0).unwrap(); + + aw_server::sync::sync_datastores(&state.ds_src, &state.ds_dest); + + let all_datastores: Vec<&Datastore> = [&state.ds_src, &state.ds_dest].iter().cloned().collect(); + let all_buckets_map = get_all_buckets_map(all_datastores); + + // Check that all synced buckets are identical to source bucket + check_synced_buckets_equal_to_src(&all_buckets_map); + + // Add some more events + state.ds_src.heartbeat(bucket_id.as_str(), create_event("1"), 1.0).unwrap(); + aw_server::sync::sync_datastores(&state.ds_src, &state.ds_dest); + + // Check again that new events were indeed synced + check_synced_buckets_equal_to_src(&all_buckets_map); + } + #[test] fn test_events() { - // TODO: Split up this test let state = init_teststate(); let bucket_id = create_bucket(&state.ds_src, 0); create_events(&state.ds_src, bucket_id.as_str(), 10); aw_server::sync::sync_datastores(&state.ds_src, &state.ds_dest); + // Check again that new events were indeed synced check_synced_buckets_equal_to_src(&all_buckets_map); } } From 94cb00d1bca8fcc5701caea45b79390de557d8dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Mon, 9 Sep 2019 16:45:26 +0200 Subject: [PATCH 08/12] added workaround for low-precision clocks --- tests/sync.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/sync.rs b/tests/sync.rs index 2708cc96..336b06c6 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -43,14 +43,17 @@ mod sync_tests { } fn create_event(data_str: &str) -> Event { + // A workaround needed because otherwise events might get same timestamp if + // call is repeated quickly on platforms with a low-precision clock.
+ std::thread::sleep(std::time::Duration::from_millis(5)); + let timestamp: DateTime<Utc> = Utc::now(); let event_jsonstr = format!(r#"{{ "timestamp": "{}", "duration": 0, "data": {{"test": {} }} }}"#, timestamp.to_rfc3339(), data_str); - let event = serde_json::from_str(&event_jsonstr).unwrap(); - event + serde_json::from_str(&event_jsonstr).unwrap() } From dc9d88dbb3e85864232d72d23c89cb22d9db5160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Wed, 11 Sep 2019 09:27:09 +0200 Subject: [PATCH 09/12] moved sync-files into subfolder --- Cargo.toml | 2 +- src/{sync_main.rs => sync/main.rs} | 0 src/{sync.rs => sync/mod.rs} | 0 3 files changed, 1 insertion(+), 1 deletion(-) rename src/{sync_main.rs => sync/main.rs} (100%) rename src/{sync.rs => sync/mod.rs} (100%) diff --git a/Cargo.toml b/Cargo.toml index d9aed00c..0bb0d36f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ path = "src/main.rs" [[bin]] name = "aw-sync-rust" -path = "src/sync_main.rs" +path = "src/sync/main.rs" [dependencies] rocket = "0.4" diff --git a/src/sync_main.rs b/src/sync/main.rs similarity index 100% rename from src/sync_main.rs rename to src/sync/main.rs diff --git a/src/sync.rs b/src/sync/mod.rs similarity index 100% rename from src/sync.rs rename to src/sync/mod.rs From 7c16e894105fd4ee4685c75f79a04789685a6bd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Mon, 16 Sep 2019 10:54:40 +0200 Subject: [PATCH 10/12] renamed commit to force_commit --- src/datastore/datastore.rs | 2 +- src/sync/mod.rs | 8 ++++---- tests/sync.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/datastore/datastore.rs b/src/datastore/datastore.rs index 0d100e8b..9efcb2e6 100644 --- a/src/datastore/datastore.rs +++ b/src/datastore/datastore.rs @@ -803,7 +803,7 @@ impl Datastore { } } - pub fn commit(&self) -> Result<(), DatastoreError> { + pub fn force_commit(&self) -> Result<(), DatastoreError> { let cmd = Commands::ForceCommit(); let receiver = self.requester.request(cmd).unwrap(); match receiver.collect().unwrap() { diff --git a/src/sync/mod.rs b/src/sync/mod.rs index f47ae8b0..81e2de62 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -36,7 +36,7 @@ pub fn sync_run() { log_buckets(&ds_local); for ds_from in &ds_remotes { - ds_from.commit().unwrap(); + ds_from.force_commit().unwrap(); log_buckets(&ds_from); } } @@ -77,7 +77,7 @@ fn setup_test(sync_directory: &Path) -> std::io::Result<Vec<Datastore>> { }).collect::<Vec<Event>>(); ds.insert_events(bucket.id.as_str(), &events[..]).unwrap(); - ds.commit().unwrap(); + ds.force_commit().unwrap(); //let new_eventcount = ds.get_event_count(bucket.id.as_str(), None, None).unwrap(); //info!("Eventcount: {:?} ({} new)", new_eventcount, events.len()); datastores.push(ds); @@ -111,7 +111,7 @@ pub fn sync_datastores(ds_from: &Datastore, ds_to: &Datastore) { let buckets_from = ds_from.get_buckets().unwrap(); for bucket_from in buckets_from.values() { let bucket_to = get_or_create_sync_bucket(bucket_from, ds_to); - ds_to.commit().unwrap(); + ds_to.force_commit().unwrap(); let eventcount_to_old = ds_to.get_event_count(bucket_to.id.as_str(), None, None).unwrap(); //info!("{:?}", bucket_to); @@ -140,7 +140,7 @@ pub fn sync_datastores(ds_from: &Datastore, ds_to: &Datastore) { ds_to.heartbeat(bucket_to.id.as_str(), event, 0.0).unwrap(); } - ds_to.commit().unwrap(); + ds_to.force_commit().unwrap(); let eventcount_to_new = ds_to.get_event_count(bucket_to.id.as_str(), None, None).unwrap(); info!("Synced
{} new events", eventcount_to_new - eventcount_to_old); } diff --git a/tests/sync.rs b/tests/sync.rs index 336b06c6..693a7116 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -62,7 +62,7 @@ mod sync_tests { }).collect::>(); ds.insert_events(bucket_id, &events[..]).unwrap(); - ds.commit().unwrap(); + ds.force_commit().unwrap(); } fn get_all_buckets(datastores: Vec<&Datastore>) -> Vec<(&Datastore, Bucket)> { From 672e20dcb65c2f5c31157011b7978c6d5c677b21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Mon, 16 Sep 2019 14:35:06 +0200 Subject: [PATCH 11/12] added sync origin to bucket data --- src/sync/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 81e2de62..d474d276 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -95,6 +95,9 @@ fn get_or_create_sync_bucket(bucket_from: &Bucket, ds_to: &Datastore) -> Bucket Err(DatastoreError::NoSuchBucket) => { let mut bucket_new = bucket_from.clone(); bucket_new.id = new_id.clone(); + // TODO: Replace sync origin with hostname/GUID and discuss how we will treat the data + // attributes for internal use. + bucket_new.data.insert("$aw.sync.origin".to_string(), json!("test")); ds_to.create_bucket(&bucket_new).unwrap(); ds_to.get_bucket(new_id.as_str()).unwrap() }, From 6280ada12f17c0d20bb15c859d16c9751a44a403 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Fri, 20 Sep 2019 11:04:03 +0200 Subject: [PATCH 12/12] fixed type error and got cfg'd out lazy_static on non-Android platforms --- src/lib.rs | 1 + src/sync/mod.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index dbf4e65e..5dce0272 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ extern crate plex; extern crate appdirs; +#[cfg(target_os="android")] #[macro_use] extern crate lazy_static; #[macro_use] extern crate log; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index d474d276..1f0ff1ab 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -97,7 +97,7 @@ fn get_or_create_sync_bucket(bucket_from: &Bucket, ds_to: &Datastore) -> Bucket bucket_new.id = new_id.clone(); // TODO: Replace sync origin with hostname/GUID and discuss how we will treat the data // attributes for internal use. - bucket_new.data.insert("$aw.sync.origin".to_string(), json!("test")); + bucket_new.data.insert("$aw.sync.origin".to_string(), serde_json::json!("test")); ds_to.create_bucket(&bucket_new).unwrap(); ds_to.get_bucket(new_id.as_str()).unwrap() },