Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ before_install:
fi
- if [ "$TRAVIS_OS_NAME" = "linux" ] && [ "$BUILD_ANDROID" != "true" ]; then
echo "Installing coverage tools";
cargo install cargo-kcov;
cargo install --force cargo-kcov;
cargo kcov --print-install-kcov-sh | sh;
fi

Expand Down
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ name = "aw_server"
version = "0.1.0"
authors = ["Johan Bjäreholt <johan@bjareho.lt>", "Erik Bjäreholt <erik@bjareho.lt>"]
edition = "2018"
default-run = "aw-server-rust"

[lib]
name = "aw_server"
crate-type = ["lib", "cdylib"]
path = "src/lib.rs"

[[bin]]
name = "aw-server-rust"
path = "src/main.rs"

[[bin]]
name = "aw-sync-rust"
path = "src/sync/main.rs"

[dependencies]
rocket = "0.4"
rocket_contrib = { version = "*", default-features = false, features = ["json"] }
Expand Down
37 changes: 32 additions & 5 deletions src/datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub enum Responses {
Count(i64)
}

#[derive(Debug,Clone)]
pub enum DatastoreMethod {
Memory(),
File(String),
Expand All @@ -60,19 +61,21 @@ pub enum Commands {
Heartbeat(String, Event, f64),
GetEvents(String, Option<DateTime<Utc>>, Option<DateTime<Utc>>, Option<u64>),
GetEventCount(String, Option<DateTime<Utc>>, Option<DateTime<Utc>>),
ForceCommit(),
}

type RequestSender = crossbeam_requests::RequestSender<Commands, Result<Responses, DatastoreError>>;
type RequestReceiver = crossbeam_requests::RequestReceiver<Commands, Result<Responses, DatastoreError>>;

#[derive(Clone)]
pub struct Datastore {
method: DatastoreMethod,
requester: RequestSender,
}

impl fmt::Debug for Datastore {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Datastore()")
write!(f, "Datastore({:?})", self.method)
}
}

Expand Down Expand Up @@ -213,35 +216,40 @@ impl DatastoreWorker {
loop {
let (request, response_sender) = match self.responder.poll() {
Ok((req, res_sender)) => (req, res_sender),
Err(_) => { // All references to responder is gone, quit
info!("DB worker quitting");
Err(e) => { // All references to responder is gone, quit
info!("DB worker quitting due to {:?}", e);
self.quit = true;
break;
}
};
let response = match request {
Commands::CreateBucket(bucket) => {
ds.commit = true;
match ds.create_bucket(&transaction, bucket) {
Ok(_) => Ok(Responses::Empty()),
Err(e) => Err(e)
}
},
Commands::DeleteBucket(bucketname) => {
ds.commit = true;
match ds.delete_bucket(&transaction, &bucketname) {
Ok(_) => Ok(Responses::Empty()),
Err(e) => Err(e)
}
},
Commands::GetBucket(bucketname) => {
ds.commit = true;
match ds.get_bucket(&bucketname) {
Ok(b) => Ok(Responses::Bucket(b)),
Err(e) => Err(e)
}
},
Commands::GetBuckets() => {
ds.commit = true;
Ok(Responses::BucketMap(ds.get_buckets()))
},
Commands::InsertEvents(bucketname, events) => {
ds.commit = true;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

Copy link
Copy Markdown
Member Author

@ErikBjare ErikBjare Sep 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I won't until we've fixed the commit-before-get. If I do I have to put ds.force_commit() after practically every ds.insert_events() which is really annoying and hard to debug.

match ds.insert_events(&transaction, &bucketname, events) {
Ok(events) => {
last_heartbeat.insert(bucketname.to_string(), None); // invalidate last_heartbeat cache
Expand All @@ -257,17 +265,23 @@ impl DatastoreWorker {
}
},
Commands::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt) => {
ds.commit = true;
match ds.get_events(&transaction, &bucketname, starttime_opt, endtime_opt, limit_opt) {
Ok(el) => Ok(Responses::EventList(el)),
Err(e) => Err(e)
}
},
Commands::GetEventCount(bucketname, starttime_opt, endtime_opt) => {
ds.commit = true;
match ds.get_event_count(&transaction, &bucketname, starttime_opt, endtime_opt) {
Ok(n) => Ok(Responses::Count(n)),
Err(e) => Err(e)
}
},
Commands::ForceCommit() => {
ds.commit = true;
Ok(Responses::Empty())
},
};
response_sender.respond(response);
if ds.commit || ds.uncommited_events > 100 { break };
Expand Down Expand Up @@ -413,7 +427,6 @@ impl DatastoreInstance {
match conn.execute("DELETE FROM buckets WHERE id = ?1", &[&bucket.bid]) {
Ok(_) => {
self.buckets_cache.remove(bucket_id);
self.commit = true;
return Ok(());
},
Err(err) => match err {
Expand Down Expand Up @@ -686,11 +699,13 @@ impl Datastore {

fn _new_internal(method: DatastoreMethod) -> Self {
let (requester, responder) = crossbeam_requests::channel::<Commands, Result<Responses, DatastoreError>>();
let method_clone = method.clone();
let _thread = thread::spawn(move || {
let mut di = DatastoreWorker::new(responder);
di.work_loop(method);
di.work_loop(method_clone);
Comment thread
ErikBjare marked this conversation as resolved.
});
Datastore {
method,
requester,
}
}
Expand Down Expand Up @@ -787,4 +802,16 @@ impl Datastore {
Err(e) => Err(e)
}
}

pub fn force_commit(&self) -> Result<(), DatastoreError> {
let cmd = Commands::ForceCommit();
let receiver = self.requester.request(cmd).unwrap();
match receiver.collect().unwrap() {
Ok(r) => match r {
Responses::Empty() => Ok(()),
_ => panic!("Invalid response")
},
Err(e) => Err(e)
}
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern crate plex;

extern crate appdirs;

#[cfg(target_os="android")]
#[macro_use] extern crate lazy_static;

#[macro_use] extern crate log;
Expand All @@ -40,3 +41,5 @@ pub mod config;

#[cfg(target_os="android")]
pub mod android;

pub mod sync;
2 changes: 1 addition & 1 deletion src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn setup_logger() -> Result<(), fern::InitError> {
})
.chain(std::io::stdout())
.level(log::LevelFilter::Info)
.level_for("aw_server", log::LevelFilter::Debug)
//.level_for("aw_server", log::LevelFilter::Debug)
)
// No color and lower log levels to logfile
.chain(fern::Dispatch::new()
Expand Down
39 changes: 2 additions & 37 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,6 @@
#![feature(plugin)]
#![feature(proc_macro_hygiene)]
#![feature(custom_attribute)]
#![feature(decl_macro)]
#[macro_use] extern crate rocket;
#[macro_use] extern crate rocket_contrib;
extern crate rocket_cors;
extern crate multipart;

extern crate serde;
extern crate serde_json;
#[macro_use] extern crate serde_derive;

extern crate rusqlite;

extern crate crossbeam_requests;

extern crate chrono;

extern crate plex;

extern crate appdirs;

#[macro_use] extern crate lazy_static;

#[macro_use] extern crate log;
extern crate fern;

extern crate toml;

pub mod models;
pub mod transform;
pub mod datastore;
pub mod query;
pub mod endpoints;
pub mod dirs;
pub mod logging;
pub mod config;

use aw_server::*;

fn main() {
use std::sync::Mutex;
Expand Down
21 changes: 21 additions & 0 deletions src/sync/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#[macro_use] extern crate log;

use aw_server::*;

fn main() {
// What needs to be done:
// - [x] Setup local sync bucket
// - Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore)
// - Import buckets and sync events from remotes

println!("Started aw-sync-rust...");
logging::setup_logger().expect("Failed to setup logging");

aw_server::sync::sync_run();
info!("Finished successfully, exiting...");

// Needed to give the datastores some time to commit before program is shut down.
// 100ms isn't actually needed, seemed to work fine with as little as 10ms, but I'd rather give
// it some wiggleroom.
std::thread::sleep(std::time::Duration::from_millis(100));
}
Loading