Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
751 changes: 491 additions & 260 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ authors = ["Johan Bjäreholt <johan@bjareho.lt>", "Erik Bjäreholt <erik@bjareho
rocket = "0.4"
rocket_contrib = { version = "*", default-features = false, features = ["json"] }
rocket_cors = "0.4"
multipart = "0.16"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ A reimplementation of aw-server in Rust

Primary features missing:
- Config
- Export HTTP api
- Export HTTP API for single buckets
- Safe CORS (not allow all, fix dynamic CORS in rocket_cors upstream)

Caveats:
Expand Down
2 changes: 1 addition & 1 deletion src/datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl DatastoreInstance {
let mut inserted_bucket = bucket.clone();
inserted_bucket.bid = Some(rowid);

info!("Created bucket {:?}", inserted_bucket);
info!("Created bucket {}", inserted_bucket.id);
self.buckets_cache.insert(bucket.id.clone(), inserted_bucket);
self.commit = true;
},
Expand Down
33 changes: 33 additions & 0 deletions src/endpoints/bucket.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use std::collections::HashMap;
use std::io::Cursor;

use rocket_contrib::json::Json;

use chrono::DateTime;
use chrono::Utc;

use models::Bucket;
use models::BucketsExport;
use models::Event;

use rocket::State;
use rocket::response::status;
use rocket::response::Response;
use rocket::http::Header;
use rocket::http::Status;

use endpoints::ServerState;
Expand Down Expand Up @@ -162,6 +166,35 @@ pub fn bucket_event_count(bucket_id: String, state: State<ServerState>) -> Resul
}
}

/// Export a single bucket, including its full event history, as a JSON
/// file download.
///
/// Responds 404 if the bucket does not exist and 500 on any other
/// datastore error. The `Content-Disposition` header makes browsers save
/// the body as `aw-bucket-export_<bucket_id>.json`.
#[get("/<bucket_id>/export")]
pub fn bucket_export(bucket_id: String, state: State<ServerState>) -> Result<Response, Status> {
    let datastore = endpoints_get_lock!(state.datastore);
    let mut export = BucketsExport {
        buckets: HashMap::new()
    };
    let mut bucket = match datastore.get_bucket(&bucket_id) {
        Ok(bucket) => bucket,
        Err(err) => match err {
            DatastoreError::NoSuchBucket => return Err(Status::NotFound),
            e => {
                warn!("Failed to fetch bucket: {:?}", e);
                return Err(Status::InternalServerError);
            }
        }
    };
    // A failure here is a server-side problem, not a client error:
    // answer 500 instead of panicking the request thread.
    bucket.events = Some(match datastore.get_events(&bucket_id, None, None, None) {
        Ok(events) => events,
        Err(e) => {
            warn!("Failed to fetch events: {:?}", e);
            return Err(Status::InternalServerError);
        }
    });
    export.buckets.insert(bucket_id.clone(), bucket);
    let filename = format!("aw-bucket-export_{}.json", bucket_id);

    let header_content = format!("attachment; filename={}", filename);
    let response = Response::build()
        .status(Status::Ok)
        .header(Header::new("Content-Disposition", header_content))
        // Serializing our own freshly-built struct cannot reasonably fail.
        .sized_body(Cursor::new(serde_json::to_string(&export).expect("Failed to serialize")))
        .finalize();
    Ok(response)
}

#[delete("/<bucket_id>")]
pub fn bucket_delete(bucket_id: String, state: State<ServerState>) -> Result<(), Status> {
let datastore = endpoints_get_lock!(state.datastore);
Expand Down
30 changes: 30 additions & 0 deletions src/endpoints/export.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::collections::HashMap;
use std::io::Cursor;

use rocket::http::Header;
use rocket::response::Response;
use rocket::http::Status;
use rocket::State;

use endpoints::ServerState;
use models::BucketsExport;

#[get("/")]
pub fn buckets_export(state: State<ServerState>) -> Result<Response, Status> {
let datastore = endpoints_get_lock!(state.datastore);
let mut export = BucketsExport {
buckets: HashMap::new()
};
let mut buckets = datastore.get_buckets().unwrap();
for (bid, mut bucket) in buckets.drain() {
bucket.events = Some(datastore.get_events(&bid, None, None, None).expect("Failed to get events for bucket"));
export.buckets.insert(bid, bucket);
}

let response = Response::build()
.status(Status::Ok)
.header(Header::new("Content-Disposition", "attachment; filename=aw-buckets-export.json"))
.sized_body(Cursor::new(serde_json::to_string(&export).expect("Failed to serialize")))
.finalize();
return Ok(response);
}
76 changes: 50 additions & 26 deletions src/endpoints/import.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,65 @@
use rocket::State;
use rocket::Data;
use rocket::http::Status;
use rocket::http::ContentType;
use rocket_contrib::json::Json;

use std::collections::HashMap;
use multipart::server::Multipart;

use endpoints::ServerState;
use models::Bucket;
use std::io::Read;
use std::sync::Mutex;

/// Accepted JSON shapes for an import request.
///
/// `#[serde(untagged)]` makes deserialization try each variant in order:
/// first a bare bucket object, then a map of bucket-id -> bucket.
#[derive(Clone,Serialize,Deserialize)]
#[serde(untagged)]
pub enum ImportFormat {
    /// A single bucket object.
    Single(Bucket),
    /// Multiple buckets keyed by bucket id.
    Multiple(HashMap<String, Bucket>),
}
use models::BucketsExport;
use datastore::Datastore;
use endpoints::ServerState;

#[post("/", data = "<json_data>", format = "application/json")]
pub fn bucket_import(state: State<ServerState>, json_data: Json<ImportFormat>) -> Result<(), Status> {
match json_data.into_inner() {
ImportFormat::Single(bucket) => match endpoints_get_lock!(state.datastore).create_bucket(&bucket) {
fn import(datastore_mutex: &Mutex<Datastore>, import: BucketsExport) -> Result<(), Status> {
let datastore = endpoints_get_lock!(datastore_mutex);
for (_bucketname, bucket) in import.buckets {
match datastore.create_bucket(&bucket) {
Ok(_) => (),
Err(e) => {
warn!("Failed to import bucket: {:?}", e);
return Err(Status::InternalServerError)
}
},
ImportFormat::Multiple(buckets) => {
for (_bucketname, bucket) in buckets {
match endpoints_get_lock!(state.datastore).create_bucket(&bucket) {
Ok(_) => (),
Err(e) => {
warn!("Failed to import bucket: {:?}", e);
return Err(Status::InternalServerError)
},
}
}
return Err(Status::InternalServerError);
},
}
}
Ok(())
}

/// Import buckets from a plain `application/json` request body.
#[post("/", data = "<json_data>", format = "application/json")]
pub fn bucket_import_json(state: State<ServerState>, json_data: Json<BucketsExport>) -> Result<(), Status> {
    let payload = json_data.into_inner();
    import(&state.datastore, payload)
}

// FIXME: This eats a lot of RAM (double the amount of the size of the file imported)
// In Rocket 0.5 this will likely be improved when native multipart support is added
#[post("/", data = "<data>", format = "multipart/form-data")]
pub fn bucket_import_form(state: State<ServerState>, cont_type: &ContentType, data: Data) -> Result<(), Status> {
let (_, boundary) = cont_type.params().find(|&(k, _)| k == "boundary").ok_or_else( || {
warn!("`Content-Type: multipart/form-data` boundary param not provided");
return Status::BadRequest;
)?;

let string = process_multipart_packets(boundary, data);

let import_data : BucketsExport = serde_json::from_str(&string)
.expect("Failed to deserialize import data as JSON to bucket format");

import(&state.datastore, import_data)

}

// NOTE: this is far from an optimal way of parsing multipart packets: it
// does not inspect part headers, and because there is no size limit and
// everything is buffered in RAM it can be abused for denial-of-service.
fn process_multipart_packets(boundary: &str, data: Data) -> String {
    let mut combined = String::new();
    Multipart::with_body(data.open(), boundary)
        .foreach_entry(|mut entry| {
            // Append each form entry's body, decoded as UTF-8.
            let mut part = String::new();
            entry.data.read_to_string(&mut part)
                .expect("Failed to parse multipart data to utf-8");
            combined.push_str(&part);
        })
        .expect("Failed to retrieve multipart upload");
    combined
}
12 changes: 9 additions & 3 deletions src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod bucket;
mod query;
mod import;
mod cors;
mod export;

use datastore::Datastore;

Expand Down Expand Up @@ -107,13 +108,18 @@ pub fn rocket(server_state: ServerState, config: Option<Config>) -> rocket::Rock
.mount("/api/0/info", routes![server_info])
.mount("/api/0/buckets", routes![
bucket::bucket_new, bucket::bucket_delete, bucket::buckets_get, bucket::bucket_get,
bucket::bucket_events_get, bucket::bucket_events_create, bucket::bucket_events_heartbeat, bucket::bucket_event_count
bucket::bucket_events_get, bucket::bucket_events_create, bucket::bucket_events_heartbeat, bucket::bucket_event_count,
bucket::bucket_export
])
.mount("/api/0/query", routes![
query::query
])
.mount("/api/0/import/", routes![
import::bucket_import
.mount("/api/0/import", routes![
import::bucket_import_json,
import::bucket_import_form
])
.mount("/api/0/export", routes![
export::buckets_export
])
.attach(cors::cors())
.register(catchers![not_modified, not_found])
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#[macro_use] extern crate rocket;
#[macro_use] extern crate rocket_contrib;
extern crate rocket_cors;
extern crate multipart;

extern crate serde;
extern crate serde_json;
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#[macro_use] extern crate rocket;
#[macro_use] extern crate rocket_contrib;
extern crate rocket_cors;
extern crate multipart;

extern crate serde;
extern crate serde_json;
Expand Down
6 changes: 6 additions & 0 deletions src/models/bucket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use chrono::DateTime;
use chrono::Utc;
use std::collections::HashMap;

use models::Event;

Expand All @@ -16,3 +17,8 @@ pub struct Bucket {
pub created: Option<DateTime<Utc>>,
pub events: Option<Vec<Event>>,
}

/// Serialization wrapper for the export/import payload format:
/// a map from bucket id to the bucket itself (with events attached
/// when produced by the export endpoints).
#[derive(Clone,Serialize,Deserialize)]
pub struct BucketsExport {
    pub buckets: HashMap<String, Bucket>,
}
1 change: 1 addition & 0 deletions src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod timeinterval;
mod query;

pub use self::bucket::Bucket;
pub use self::bucket::BucketsExport;
pub use self::event::Event;
pub use self::timeinterval::TimeInterval;
pub use self::query::Query;
Expand Down
65 changes: 44 additions & 21 deletions tests/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ extern crate aw_server;
mod api_tests {
use std::path::PathBuf;
use std::sync::Mutex;
use rocket::http::Header;
use rocket::http::ContentType;

use aw_server::datastore;
use aw_server::endpoints;

use aw_server::models::BucketsExport;

fn setup_testserver() -> rocket::Rocket {
let state = endpoints::ServerState {
datastore: Mutex::new(datastore::Datastore::new_in_memory()),
Expand Down Expand Up @@ -176,59 +179,79 @@ mod api_tests {
}

/// End-to-end exercise of the import/export API:
/// JSON import -> single-bucket export -> delete -> multipart re-import
/// -> full export, verifying the round-tripped bucket and its one event.
#[test]
fn test_import_export() {
    let server = setup_testserver();
    let client = rocket::local::Client::new(server).expect("valid instance");

    // Import bucket
    let mut res = client.post("/api/0/import")
        .header(ContentType::JSON)
        .body(r#"{"buckets":
        {"id1": {
            "id": "id1",
            "type": "type",
            "client": "client",
            "hostname": "hostname",
            "events": [{
                "timestamp":"2000-01-01T00:00:00Z",
                "duration":1.0,
                "data": {}
            }]
        }}}"#)
        .dispatch();
    debug!("{:?}", res.body_string());
    assert_eq!(res.status(), rocket::http::Status::Ok);

    // Export single created bucket.
    // Read the body exactly once: `body_string()` consumes it, so logging it
    // and then parsing it would fail whenever debug logging is enabled.
    let mut res = client.get("/api/0/buckets/id1/export")
        .header(ContentType::JSON)
        .dispatch();
    assert_eq!(res.status(), rocket::http::Status::Ok);
    let body = res.body_string().unwrap();
    debug!("{:?}", body);
    let export: BucketsExport = serde_json::from_str(&body).unwrap();

    // Delete bucket so we can import it again
    res = client.delete("/api/0/buckets/id1")
        .header(ContentType::JSON)
        .dispatch();
    debug!("{:?}", res.body_string());
    assert_eq!(res.status(), rocket::http::Status::Ok);

    // Import exported bucket again but with multipart/form-data
    // NOTE: DO NOT TOUCH START AND END VARS!
    // They are byte-perfect and this was really cumbersome to fix, modifying them will most
    // likely break the test
    let start = b"--a\r\nContent-Disposition: form-data; name=\"test\"\r\n\r\n";
    let content = serde_json::to_vec(&export).unwrap();
    let end = b"--a--";
    let sum = [&start[..], &content[..], &end[..]].concat();
    let mut res = client.post("/api/0/import")
        .header(Header::new("Content-Type", "multipart/form-data; boundary=a"))
        .body(&sum[..])
        .dispatch();
    debug!("{:?}", res.body_string());
    assert_eq!(res.status(), rocket::http::Status::Ok);

    // Get created bucket
    let mut res = client.get("/api/0/buckets/id1")
        .header(ContentType::JSON)
        .dispatch();
    println!("{:?}", res.body_string());
    assert_eq!(res.status(), rocket::http::Status::Ok);

    // Export all buckets and verify the round-tripped content
    let mut res = client.get("/api/0/export")
        .header(ContentType::JSON)
        .dispatch();
    assert_eq!(res.status(), rocket::http::Status::Ok);
    let body = res.body_string().unwrap();
    debug!("{:?}", body);
    let export: BucketsExport = serde_json::from_str(&body).unwrap();
    let mut buckets = export.buckets;
    assert_eq!(buckets.len(), 1);
    let b = buckets.remove("id1").unwrap();
    assert_eq!(b.events.unwrap().len(), 1);
    assert_eq!(buckets.len(), 0);
}

#[test]
Expand Down