From b6d15773c83e57c5781662d56e3763ab93be31d8 Mon Sep 17 00:00:00 2001 From: KCarretto Date: Sun, 11 Feb 2024 21:35:58 +0000 Subject: [PATCH] refactor eldritch runtime --- implants/golem/Cargo.toml | 2 +- implants/golem/src/main.rs | 57 +--- implants/imix/src/agent.rs | 6 +- implants/imix/src/install.rs | 25 +- implants/imix/src/task.rs | 22 +- implants/lib/eldritch/Cargo.toml | 1 + implants/lib/eldritch/src/assets/copy_impl.rs | 46 ++- implants/lib/eldritch/src/lib.rs | 2 +- implants/lib/eldritch/src/report/mod.rs | 14 +- .../eldritch/src/report/process_list_impl.rs | 6 +- implants/lib/eldritch/src/runtime/broker.rs | 110 ------- implants/lib/eldritch/src/runtime/drain.rs | 38 +++ .../src/runtime/{client.rs => environment.rs} | 57 ++-- implants/lib/eldritch/src/runtime/exec.rs | 304 ++++++++++-------- implants/lib/eldritch/src/runtime/mod.rs | 43 ++- 15 files changed, 335 insertions(+), 398 deletions(-) delete mode 100644 implants/lib/eldritch/src/runtime/broker.rs create mode 100644 implants/lib/eldritch/src/runtime/drain.rs rename implants/lib/eldritch/src/runtime/{client.rs => environment.rs} (53%) diff --git a/implants/golem/Cargo.toml b/implants/golem/Cargo.toml index af9326984..2d855b883 100644 --- a/implants/golem/Cargo.toml +++ b/implants/golem/Cargo.toml @@ -4,7 +4,7 @@ version = "0.0.5" edition = "2021" [dependencies] -eldritch = { workspace = true } +eldritch = { workspace = true, features = ["print_stdout"] } tokio = { workspace = true, features = ["macros"] } clap = { workspace = true } starlark = { workspace = true } diff --git a/implants/golem/src/main.rs b/implants/golem/src/main.rs index 108da5b26..bd6c0d1a6 100644 --- a/implants/golem/src/main.rs +++ b/implants/golem/src/main.rs @@ -1,61 +1,36 @@ extern crate eldritch; extern crate golem; +mod inter; + use anyhow::{anyhow, Result}; use clap::{Arg, Command}; use eldritch::pb::Tome; -use eldritch::Runtime; use std::collections::HashMap; use std::fs; use std::process; -use tokio::task::JoinHandle; - -mod inter; struct ParsedTome { - pub path: String, pub eldritch: String, } -struct Handle { - handle: JoinHandle<()>, - path: String, - broker: eldritch::Broker, -} - async fn run_tomes(tomes: Vec) -> Result> { - let mut handles = Vec::new(); + let mut runtimes = Vec::new(); for tome in tomes { - let (mut runtime, broker) = Runtime::new(); - runtime.with_stdout_reporting(); - let handle = tokio::task::spawn_blocking(move || { - runtime.run(Tome { - eldritch: tome.eldritch, - parameters: HashMap::new(), - file_names: Vec::new(), - }); - }); - handles.push(Handle { - handle, - path: tome.path, - broker, - }); + let runtime = eldritch::start(Tome { + eldritch: tome.eldritch, + parameters: HashMap::new(), + file_names: Vec::new(), + }) + .await; + runtimes.push(runtime); } let mut result = Vec::new(); - for handle in handles { - match handle.handle.await { - Ok(_) => {} - Err(err) => { - eprintln!( - "error waiting for tome to complete: {} {}", - handle.path, err - ); - continue; - } - }; - let mut out = handle.broker.collect_text(); - let errors = handle.broker.collect_errors(); + for runtime in &mut runtimes { + runtime.finish().await; + let mut out = runtime.collect_text(); + let errors = runtime.collect_errors(); if !errors.is_empty() { return Err(anyhow!("tome execution failed: {:?}", errors)); } @@ -90,7 +65,6 @@ fn main() -> anyhow::Result<()> { let tome_path = tome.to_string().clone(); let tome_contents = fs::read_to_string(tome_path.clone())?; parsed_tomes.push(ParsedTome { - path: tome_path, eldritch: tome_contents, }); } @@ -117,7 +91,6 @@ fn main() -> anyhow::Result<()> { let filename = embedded_file_path.split('/').last().unwrap_or(""); println!("{}", embedded_file_path); if filename == "main.eldritch" { - let tome_path = embedded_file_path.to_string().clone(); let tome_contents_extraction_result = match eldritch::assets::Asset::get(embedded_file_path.as_ref()) { Some(local_tome_content) => { @@ -137,7 +110,6 @@ fn main() -> anyhow::Result<()> { } }; parsed_tomes.push(ParsedTome { - path: tome_path, eldritch: tome_contents, }); } @@ -170,7 +142,6 @@ mod tests { #[tokio::test] async fn test_golem_execute_tomes_in_parallel() -> anyhow::Result<()> { let parsed_tomes = Vec::from([ParsedTome { - path: "test_hello.eldritch".to_string(), eldritch: r#"print("hello world")"#.to_string(), }]); diff --git a/implants/imix/src/agent.rs b/implants/imix/src/agent.rs index 30916d505..a483a9e48 100644 --- a/implants/imix/src/agent.rs +++ b/implants/imix/src/agent.rs @@ -4,7 +4,6 @@ use c2::{ pb::{Beacon, ClaimTasksRequest}, Transport, GRPC, }; -use eldritch::Runtime; use std::time::{Duration, Instant}; /* @@ -52,9 +51,8 @@ impl Agent { } }; - let (runtime, output) = Runtime::new(); - let handle = tokio::task::spawn_blocking(move || runtime.run(tome)); - self.handles.push(TaskHandle::new(task.id, output, handle)); + let runtime = eldritch::start(tome).await; + self.handles.push(TaskHandle::new(task.id, runtime)); #[cfg(debug_assertions)] log::info!("spawned task execution for id={}", task.id); diff --git a/implants/imix/src/install.rs b/implants/imix/src/install.rs index 6fcaf8d1d..ee35ecab6 100644 --- a/implants/imix/src/install.rs +++ b/implants/imix/src/install.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use eldritch::{pb::Tome, Runtime}; +use eldritch::pb::Tome; use std::collections::HashMap; pub async fn install() { @@ -31,24 +31,15 @@ pub async fn install() { // Run tome #[cfg(debug_assertions)] log::info!("running tome {embedded_file_path}"); - let (runtime, handle) = Runtime::new(); - match tokio::task::spawn_blocking(move || { - runtime.run(Tome { - eldritch, - parameters: HashMap::new(), - file_names: Vec::new(), - }); + let mut runtime = eldritch::start(Tome { + eldritch, + parameters: HashMap::new(), + file_names: Vec::new(), }) - .await - { - Ok(_) => {} - Err(_err) => { - #[cfg(debug_assertions)] - log::error!("failed waiting for tome execution: {}", _err); - } - } + .await; + runtime.finish().await; - let _output = handle.collect_text().join(""); + let _output = runtime.collect_text().join(""); #[cfg(debug_assertions)] log::info!("{_output}"); } diff --git a/implants/imix/src/task.rs b/implants/imix/src/task.rs index 55085a5b4..cac89256d 100644 --- a/implants/imix/src/task.rs +++ b/implants/imix/src/task.rs @@ -15,18 +15,16 @@ use tokio::task::JoinHandle; */ pub struct TaskHandle { id: i64, - task: JoinHandle<()>, - eldritch: eldritch::Broker, + runtime: eldritch::Runtime, download_handles: Vec>, } impl TaskHandle { // Track a new task handle. - pub fn new(id: i64, eldritch: eldritch::Broker, task: JoinHandle<()>) -> TaskHandle { + pub fn new(id: i64, runtime: eldritch::Runtime) -> TaskHandle { TaskHandle { id, - task, - eldritch, + runtime, download_handles: Vec::new(), } } @@ -41,16 +39,16 @@ impl TaskHandle { } // Check Task - self.task.is_finished() + self.runtime.is_finished() } // Report any available task output. // Also responsible for downloading any files requested by the eldritch runtime. pub async fn report(&mut self, tavern: &mut impl Transport) -> Result<()> { - let exec_started_at = self.eldritch.get_exec_started_at(); - let exec_finished_at = self.eldritch.get_exec_finished_at(); - let text = self.eldritch.collect_text(); - let err = self.eldritch.collect_errors().pop().map(|err| TaskError { + let exec_started_at = self.runtime.get_exec_started_at(); + let exec_finished_at = self.runtime.get_exec_finished_at(); + let text = self.runtime.collect_text(); + let err = self.runtime.collect_errors().pop().map(|err| TaskError { msg: err.to_string(), }); @@ -95,7 +93,7 @@ impl TaskHandle { } // Report Process Lists - let process_lists = self.eldritch.collect_process_lists(); + let process_lists = self.runtime.collect_process_lists(); for list in process_lists { #[cfg(debug_assertions)] log::info!("reporting process list: len={}", list.list.len()); @@ -120,7 +118,7 @@ impl TaskHandle { } // Download Files - let file_reqs = self.eldritch.collect_file_requests(); + let file_reqs = self.runtime.collect_file_requests(); for req in file_reqs { let name = req.name(); match self.start_file_download(tavern, req).await { diff --git a/implants/lib/eldritch/Cargo.toml b/implants/lib/eldritch/Cargo.toml index 28afd0306..9998bc950 100644 --- a/implants/lib/eldritch/Cargo.toml +++ b/implants/lib/eldritch/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [features] # Check if compiled by imix imix = [] +print_stdout = [] [dependencies] aes = { workspace = true } diff --git a/implants/lib/eldritch/src/assets/copy_impl.rs b/implants/lib/eldritch/src/assets/copy_impl.rs index 36c178db9..08d15a2e4 100644 --- a/implants/lib/eldritch/src/assets/copy_impl.rs +++ b/implants/lib/eldritch/src/assets/copy_impl.rs @@ -1,4 +1,4 @@ -use crate::runtime::Client; +use crate::runtime::Environment; use anyhow::{Context, Result}; use starlark::{eval::Evaluator, values::list::ListRef}; use std::fs::OpenOptions; @@ -60,8 +60,8 @@ pub fn copy(starlark_eval: &Evaluator<'_, '_>, src: String, dst: String) -> Resu let src_value = starlark_eval.module().heap().alloc_str(&src); if tmp_list.contains(&src_value.to_value()) { - let client = Client::from_extra(starlark_eval.extra)?; - let file_reciever = client.request_file(src)?; + let env = Environment::from_extra(starlark_eval.extra)?; + let file_reciever = env.request_file(src)?; return copy_remote(file_reciever, dst); } @@ -72,8 +72,6 @@ pub fn copy(starlark_eval: &Evaluator<'_, '_>, src: String, dst: String) -> Resu #[cfg(test)] mod tests { use crate::assets::copy_impl::copy_remote; - use crate::Runtime; - use std::sync::mpsc::channel; use std::{collections::HashMap, io::prelude::*}; use tempfile::NamedTempFile; @@ -110,25 +108,21 @@ mod tests { let mut tmp_file_dst = NamedTempFile::new()?; let path_dst = String::from(tmp_file_dst.path().to_str().unwrap()); - // Create a runtime - let (runtime, broker) = Runtime::new(); - - // Execute eldritch in it's own thread - let handle = tokio::task::spawn_blocking(move || { - runtime.run(crate::pb::Tome { - eldritch: r#"assets.copy("test_tome/test_file.txt", input_params['test_output'])"# - .to_owned(), - parameters: HashMap::from([("test_output".to_string(), path_dst)]), - file_names: Vec::from(["test_tome/test_file.txt".to_string()]), - }) - }); + // Run Eldritch (in it's own thread) + let mut runtime = crate::start(crate::pb::Tome { + eldritch: r#"assets.copy("test_tome/test_file.txt", input_params['test_output'])"# + .to_owned(), + parameters: HashMap::from([("test_output".to_string(), path_dst)]), + file_names: Vec::from(["test_tome/test_file.txt".to_string()]), + }) + .await; // We now mock the agent, looping until eldritch requests a file // We omit the sleep performed by the agent, just to save test time loop { - // The broker only returns the data that is currently available + // The runtime only returns the data that is currently available // So this may return an empty vec if our eldritch tokio task has not yet been scheduled - let mut reqs = broker.collect_file_requests(); + let mut reqs = runtime.collect_file_requests(); // If no file request is yet available, just continue looping if reqs.is_empty() { @@ -154,7 +148,7 @@ mod tests { } // Now that we've finished writing data, we wait for eldritch to finish - handle.await?; + runtime.finish().await; // Lastly, assert the file was written correctly let mut contents = String::new(); @@ -164,8 +158,8 @@ mod tests { Ok(()) } - #[test] - fn test_embedded_copy() -> anyhow::Result<()> { + #[tokio::test] + async fn test_embedded_copy() -> anyhow::Result<()> { // Create files let mut tmp_file_dst = NamedTempFile::new()?; let path_dst = String::from(tmp_file_dst.path().to_str().unwrap()); @@ -175,8 +169,7 @@ mod tests { #[cfg(target_os = "windows")] let path_src = "exec_script/hello_world.bat".to_string(); - let (runtime, broker) = Runtime::new(); - runtime.run(crate::pb::Tome { + let runtime = crate::start(crate::pb::Tome { eldritch: r#"assets.copy(input_params['src_file'], input_params['test_output'])"# .to_owned(), parameters: HashMap::from([ @@ -184,9 +177,10 @@ mod tests { ("test_output".to_string(), path_dst), ]), file_names: Vec::from(["test_tome/test_file.txt".to_string()]), - }); + }) + .await; - assert!(broker.collect_errors().is_empty()); // No errors even though the remote asset is inaccessible + assert!(runtime.collect_errors().is_empty()); // No errors even though the remote asset is inaccessible let mut contents = String::new(); tmp_file_dst.read_to_string(&mut contents)?; diff --git a/implants/lib/eldritch/src/lib.rs b/implants/lib/eldritch/src/lib.rs index f46295115..f55975ad7 100644 --- a/implants/lib/eldritch/src/lib.rs +++ b/implants/lib/eldritch/src/lib.rs @@ -12,7 +12,7 @@ pub mod pb { include!("eldritch.rs"); } -pub use runtime::{Broker, FileRequest, Runtime}; +pub use runtime::{start, FileRequest, Runtime}; #[allow(unused_imports)] use starlark::const_frozen_string; diff --git a/implants/lib/eldritch/src/report/mod.rs b/implants/lib/eldritch/src/report/mod.rs index 1e3ff24f7..1d3a8900a 100644 --- a/implants/lib/eldritch/src/report/mod.rs +++ b/implants/lib/eldritch/src/report/mod.rs @@ -63,7 +63,6 @@ mod test { use crate::pb::process::Status; use crate::pb::{Process, ProcessList, Tome}; - use crate::Runtime; use anyhow::Error; macro_rules! process_list_tests { @@ -72,23 +71,20 @@ mod test { #[tokio::test] async fn $name() { let tc: TestCase = $value; - let (runtime, broker) = Runtime::new(); - let handle = tokio::task::spawn_blocking(move || { - runtime.run(tc.tome); - }); + let mut runtime = crate::start(tc.tome).await; + runtime.finish().await; let want_err_str = match tc.want_error { Some(err) => err.to_string(), None => "".to_string(), }; - let err_str = match broker.collect_errors().pop() { + let err_str = match runtime.collect_errors().pop() { Some(err) => err.to_string(), None => "".to_string(), }; assert_eq!(want_err_str, err_str); - assert_eq!(tc.want_output, broker.collect_text().join("")); - assert_eq!(Some(tc.want_proc_list), broker.collect_process_lists().pop()); - handle.await.unwrap(); + assert_eq!(tc.want_output, runtime.collect_text().join("")); + assert_eq!(Some(tc.want_proc_list), runtime.collect_process_lists().pop()); } )* } diff --git a/implants/lib/eldritch/src/report/process_list_impl.rs b/implants/lib/eldritch/src/report/process_list_impl.rs index 52e83fa0b..8957cd7a2 100644 --- a/implants/lib/eldritch/src/report/process_list_impl.rs +++ b/implants/lib/eldritch/src/report/process_list_impl.rs @@ -4,14 +4,14 @@ use starlark::{collections::SmallMap, eval::Evaluator}; use crate::{ pb::{process::Status, Process, ProcessList}, - runtime::Client, + runtime::Environment, }; pub fn process_list( starlark_eval: &Evaluator<'_, '_>, process_list: Vec>, ) -> Result<()> { - let client = Client::from_extra(starlark_eval.extra)?; + let env = Environment::from_extra(starlark_eval.extra)?; let mut pb_process_list = ProcessList { list: Vec::new() }; for proc in process_list { @@ -28,7 +28,7 @@ pub fn process_list( }) } - client.report_process_list(pb_process_list)?; + env.report_process_list(pb_process_list)?; Ok(()) } diff --git a/implants/lib/eldritch/src/runtime/broker.rs b/implants/lib/eldritch/src/runtime/broker.rs deleted file mode 100644 index fe8edaa49..000000000 --- a/implants/lib/eldritch/src/runtime/broker.rs +++ /dev/null @@ -1,110 +0,0 @@ -use super::FileRequest; -use crate::pb::{File, ProcessList}; -use anyhow::Error; -use prost_types::Timestamp; -use std::sync::mpsc::Receiver; -use std::time::Duration; - -/* - * A Broker to the runtime enables callers to interact with in-progress eldritch execution. - * This enables the realtime collection of output and structured output, - * as well as providing resources (such as files) to the runtime. - * Each of the `collect` methods will return lists of all currently available data. - */ -pub struct Broker { - pub(super) exec_started_at: Receiver, - pub(super) exec_finished_at: Receiver, - pub(super) outputs: Receiver, - pub(super) errors: Receiver, - pub(super) process_lists: Receiver, - pub(super) files: Receiver, - pub(super) file_requests: Receiver, -} - -impl Broker { - /* - * Returns the timestamp of when execution started, if available. - */ - pub fn get_exec_started_at(&self) -> Option { - drain_last(&self.exec_started_at) - } - - /* - * Returns the timestamp of when execution finished, if available. - */ - pub fn get_exec_finished_at(&self) -> Option { - drain_last(&self.exec_finished_at) - } - - /* - * Collects all currently available reported text output. - */ - pub fn collect_text(&self) -> Vec { - drain(&self.outputs) - } - - /* - * Collects all currently available reported errors, if any. - */ - pub fn collect_errors(&self) -> Vec { - drain(&self.errors) - } - - /* - * Returns all currently available reported process lists, if any. - */ - pub fn collect_process_lists(&self) -> Vec { - drain(&self.process_lists) - } - - /* - * Returns all currently available reported files, if any. - */ - pub fn collect_files(&self) -> Vec { - drain(&self.files) - } - - /* - * Returns all FileRequests that the eldritch runtime has requested, if any. - */ - pub fn collect_file_requests(&self) -> Vec { - drain(&self.file_requests) - } -} - -/* - * Drain a receiver, returning only the last currently available result. - */ -fn drain_last(receiver: &Receiver) -> Option { - drain(receiver).pop() -} - -/* - * Drain a receiver, returning all currently available results as a Vec. - */ -fn drain(reciever: &Receiver) -> Vec { - let mut result: Vec = Vec::new(); - loop { - let val = match reciever.recv_timeout(Duration::from_millis(100)) { - Ok(v) => v, - Err(err) => { - match err.to_string().as_str() { - "channel is empty and sending half is closed" => { - break; - } - "timed out waiting on channel" => { - break; - } - _ => { - #[cfg(debug_assertions)] - eprint!("failed to drain channel: {}", err) - } - } - break; - } - }; - // let appended_line = format!("{}{}", res.to_owned(), new_res_line); - result.push(val); - } - result -} diff --git a/implants/lib/eldritch/src/runtime/drain.rs b/implants/lib/eldritch/src/runtime/drain.rs new file mode 100644 index 000000000..5a38c45da --- /dev/null +++ b/implants/lib/eldritch/src/runtime/drain.rs @@ -0,0 +1,38 @@ +use std::sync::mpsc::Receiver; +use std::time::Duration; + +/* + * Drain a receiver, returning only the last currently available result. + */ +pub fn drain_last(receiver: &Receiver) -> Option { + drain(receiver).pop() +} + +/* + * Drain a receiver, returning all currently available results as a Vec. + */ +pub fn drain(reciever: &Receiver) -> Vec { + let mut result: Vec = Vec::new(); + loop { + let val = match reciever.recv_timeout(Duration::from_millis(100)) { + Ok(v) => v, + Err(err) => { + match err.to_string().as_str() { + "channel is empty and sending half is closed" => { + break; + } + "timed out waiting on channel" => { + break; + } + _ => { + #[cfg(debug_assertions)] + eprint!("failed to drain channel: {}", err) + } + } + break; + } + }; + result.push(val); + } + result +} diff --git a/implants/lib/eldritch/src/runtime/client.rs b/implants/lib/eldritch/src/runtime/environment.rs similarity index 53% rename from implants/lib/eldritch/src/runtime/client.rs rename to implants/lib/eldritch/src/runtime/environment.rs index ef38e5171..bfafd045e 100644 --- a/implants/lib/eldritch/src/runtime/client.rs +++ b/implants/lib/eldritch/src/runtime/environment.rs @@ -1,11 +1,14 @@ use crate::pb::{File, ProcessList}; use anyhow::{Context, Error, Result}; -use starlark::values::{AnyLifetime, ProvidesStaticType}; +use starlark::{ + values::{AnyLifetime, ProvidesStaticType}, + PrintHandler, +}; use std::sync::mpsc::{channel, Receiver, Sender}; pub struct FileRequest { name: String, - ch_data: Sender>, + tx_data: Sender>, } impl FileRequest { @@ -14,28 +17,28 @@ impl FileRequest { } pub fn send_chunk(&self, chunk: Vec) -> Result<()> { - self.ch_data.send(chunk)?; + self.tx_data.send(chunk)?; Ok(()) } } #[derive(ProvidesStaticType)] -pub struct Client { - pub(super) ch_output: Sender, - pub(super) ch_error: Sender, - pub(super) ch_process_list: Sender, - pub(super) ch_file: Sender, - pub(super) ch_file_requests: Sender, +pub struct Environment { + pub(super) tx_output: Sender, + pub(super) tx_error: Sender, + pub(super) tx_process_list: Sender, + pub(super) tx_file: Sender, + pub(super) tx_file_request: Sender, } -impl Client { +impl Environment { /* - * Extract an existing runtime client from the starlark evaluator extra field. + * Extract an existing runtime environment from the starlark evaluator extra field. */ - pub fn from_extra<'a>(extra: Option<&'a dyn AnyLifetime<'a>>) -> Result<&'a Client> { + pub fn from_extra<'a>(extra: Option<&'a dyn AnyLifetime<'a>>) -> Result<&'a Environment> { extra .context("no extra field present in evaluator")? - .downcast_ref::() + .downcast_ref::() .context("no runtime client present in evaluator") } @@ -43,15 +46,15 @@ impl Client { * Report output of the tome execution. */ pub fn report_output(&self, output: String) -> Result<()> { - self.ch_output.send(output)?; + self.tx_output.send(output)?; Ok(()) } /* - * Report error of the tome execution. + * Report error during tome execution. */ pub fn report_error(&self, err: anyhow::Error) -> Result<()> { - self.ch_error.send(err)?; + self.tx_error.send(err)?; Ok(()) } @@ -59,7 +62,7 @@ impl Client { * Report a process list that was collected by the tome. */ pub fn report_process_list(&self, processes: ProcessList) -> Result<()> { - self.ch_process_list.send(processes)?; + self.tx_process_list.send(processes)?; Ok(()) } @@ -67,7 +70,7 @@ impl Client { * Report a file that was collected by the tome. */ pub fn report_file(&self, f: File) -> Result<()> { - self.ch_file.send(f)?; + self.tx_file.send(f)?; Ok(()) } @@ -76,8 +79,22 @@ impl Client { * This will return a channel of file chunks. */ pub fn request_file(&self, name: String) -> Result>> { - let (ch_data, data) = channel::>(); - self.ch_file_requests.send(FileRequest { name, ch_data })?; + let (tx_data, data) = channel::>(); + self.tx_file_request.send(FileRequest { name, tx_data })?; Ok(data) } } + +/* + * Enables Environment to be used as a starlark print handler. + */ +impl PrintHandler for Environment { + fn println(&self, text: &str) -> Result<()> { + self.report_output(text.to_string())?; + + #[cfg(feature = "print_stdout")] + print!("{}", text); + + Ok(()) + } +} diff --git a/implants/lib/eldritch/src/runtime/exec.rs b/implants/lib/eldritch/src/runtime/exec.rs index f4513c1b3..2213ee1e6 100644 --- a/implants/lib/eldritch/src/runtime/exec.rs +++ b/implants/lib/eldritch/src/runtime/exec.rs @@ -1,13 +1,12 @@ -use super::{Broker, Client, FileRequest}; +use super::{drain::drain, drain::drain_last, Environment, FileRequest}; use crate::pb::{File, ProcessList}; use crate::{ assets::AssetsLibrary, crypto::CryptoLibrary, file::FileLibrary, pb::Tome, pivot::PivotLibrary, process::ProcessLibrary, report::ReportLibrary, sys::SysLibrary, time::TimeLibrary, }; -use anyhow::{Error, Result}; +use anyhow::{Context, Error, Result}; use chrono::Utc; use prost_types::Timestamp; -use starlark::PrintHandler; use starlark::{ collections::SmallMap, environment::{Globals, GlobalsBuilder, LibraryExtension, Module}, @@ -17,113 +16,134 @@ use starlark::{ values::dict::Dict, values::AllocValue, }; -use std::sync::mpsc::{channel, Sender}; +use std::sync::mpsc::{channel, Receiver}; +use tokio::task::JoinHandle; -/* - * Eldritch Runtime - * - * This runtime is responsible for executing Tomes and reporting their output. - * It acts as an interface between callers and starlark, exposing our standard libraries to the starlark interpreter. - * It is also used to provide dependency injection for eldritch library functions (using `Runtime::from_extra(starlark_interpreter.extra)`). - */ -pub struct Runtime { - stdout_reporting: bool, +pub async fn start(tome: Tome) -> Runtime { + let (tx_exec_started_at, rx_exec_started_at) = channel::(); + let (tx_exec_finished_at, rx_exec_finished_at) = channel::(); + let (tx_error, rx_error) = channel::(); + let (tx_output, rx_output) = channel::(); + let (tx_process_list, rx_process_list) = channel::(); + let (tx_file, rx_file) = channel::(); + let (tx_file_request, rx_file_request) = channel::(); - ch_exec_started_at: Sender, - ch_exec_finished_at: Sender, + let env = Environment { + tx_output, + tx_error: tx_error.clone(), + tx_process_list, + tx_file, + tx_file_request, + }; - client: Client, -} - -impl Runtime { - /* - * Prepare a new Runtime for execution of a single tome. - */ - pub fn new() -> (Runtime, Broker) { - let (ch_exec_started_at, exec_started_at) = channel::(); - let (ch_exec_finished_at, exec_finished_at) = channel::(); - let (ch_error, errors) = channel::(); - let (ch_output, outputs) = channel::(); - let (ch_process_list, process_lists) = channel::(); - let (ch_file, files) = channel::(); - let (ch_file_requests, file_requests) = channel::(); - ( - Runtime { - stdout_reporting: false, - ch_exec_started_at, - ch_exec_finished_at, - client: Client { - ch_output, - ch_error, - ch_process_list, - ch_file, - ch_file_requests, - }, - }, - Broker { - exec_started_at, - exec_finished_at, - outputs, - errors, - process_lists, - files, - file_requests, - }, - ) - } - - /* - * Run an Eldritch tome, returning an error if it fails. - * Output from the tome is exposed via channels, see `reported_output`, `reported_process_list`, and `reported_files`. - */ - pub fn run(&self, tome: Tome) { - match self.report_exec_started_at() { + let handle = tokio::task::spawn_blocking(move || { + // Send exec_started_at + let start = Utc::now(); + match tx_exec_started_at.send(Timestamp { + seconds: start.timestamp(), + nanos: start.timestamp_subsec_nanos() as i32, + }) { Ok(_) => {} Err(_err) => { #[cfg(debug_assertions)] - log::error!("failed to send exec_started_at: {_err}"); + log::error!("failed to send exec_started_at (tome={:?}): {}", tome, _err); } } - match self.run_impl(tome) { - Ok(_) => {} - Err(err) => match self.client.report_error(err) { - Ok(_) => {} - Err(_send_err) => { - #[cfg(debug_assertions)] - log::error!("failed to send error: {_send_err}"); + #[cfg(debug_assertions)] + log::info!("evaluating tome: {:?}", tome); + + // Run Tome + match run_impl(env, &tome) { + Ok(_) => { + #[cfg(debug_assertions)] + log::info!("tome evaluation successful (tome={:?})", tome); + } + Err(err) => { + #[cfg(debug_assertions)] + log::info!("tome evaluation failed (tome={:?}): {}", tome, err); + + // Report evaluation errors + match tx_error.send(err) { + Ok(_) => {} + Err(_send_err) => { + #[cfg(debug_assertions)] + log::error!( + "failed to report tome evaluation error (tome={:?}): {}", + tome, + _send_err + ); + } } - }, - } + } + }; - match self.report_exec_finished_at() { + // Send exec_finished_at + let end = Utc::now(); + match tx_exec_finished_at.send(Timestamp { + seconds: end.timestamp(), + nanos: end.timestamp_subsec_nanos() as i32, + }) { Ok(_) => {} Err(_err) => { #[cfg(debug_assertions)] - log::error!("failed to send exec_finished_at: {_err}"); + log::error!( + "failed to send exec_finished_at (tome={:?}): {}", + tome, + _err + ); } } + }); + + Runtime { + handle: Some(handle), + rx_exec_started_at, + rx_exec_finished_at, + rx_error, + rx_output, + rx_process_list, + rx_file, + rx_file_request, } +} - fn run_impl(&self, tome: Tome) -> Result<()> { - let ast = Runtime::parse(&tome)?; - let module = Runtime::alloc_module(&tome)?; - let globals = Runtime::globals(); +fn run_impl(env: Environment, tome: &Tome) -> Result<()> { + let ast = Runtime::parse(tome).context("failed to parse tome")?; + let module = Runtime::alloc_module(tome).context("failed to allocate module")?; + let globals = Runtime::globals(); + let mut eval: Evaluator = Evaluator::new(&module); + eval.extra = Some(&env); + eval.set_print_handler(&env); - let mut eval: Evaluator = Evaluator::new(&module); - eval.extra = Some(&self.client); - eval.set_print_handler(self); + eval.eval_module(ast, &globals) + .context("failed to evaluate tome")?; - match eval.eval_module(ast, &globals) { - Ok(_) => Ok(()), - Err(_err) => { - #[cfg(debug_assertions)] - log::error!("tome execution failed: {_err}"); - Err(_err) - } - } - } + Ok(()) +} +/* + * Eldritch Runtime + * + * This runtime is responsible for executing Tomes and reporting their output. + * It acts as an interface between callers and starlark, exposing our standard libraries to the starlark interpreter. + * It is also used to provide dependency injection for eldritch library functions (using `Runtime::from_extra(starlark_interpreter.extra)`). + */ +pub struct Runtime { + handle: Option>, + rx_exec_started_at: Receiver, + // stdout_reporting: bool, + // exec_started_at: Receiver, + rx_exec_finished_at: Receiver, + rx_output: Receiver, + rx_error: Receiver, + rx_process_list: Receiver, + rx_file: Receiver, + rx_file_request: Receiver, + // client: Client, +} + +impl Runtime { /* * Globals available to eldritch code. * This provides all of our starlark standard libraries. @@ -163,15 +183,8 @@ impl Runtime { /* * Parse an Eldritch tome into a starlark Abstract Syntax Tree (AST) Module. */ - fn parse(tome: &Tome) -> Result { - match AstModule::parse("main", tome.eldritch.to_string(), &Dialect::Extended) { - Ok(res) => Ok(res), - Err(err) => Err(anyhow::anyhow!( - "[eldritch] Unable to parse eldritch tome: {}: {}", - err.to_string(), - tome.eldritch.to_string(), - )), - } + fn parse(tome: &Tome) -> anyhow::Result { + AstModule::parse("main", tome.eldritch.to_string(), &Dialect::Extended) } /* @@ -206,47 +219,80 @@ impl Runtime { } /* - * Print execution results to stdout as they become available. + * Returns true if the tome has completed execution, false otherwise. */ - pub fn with_stdout_reporting(&mut self) -> &mut Self { - self.stdout_reporting = true; - self + pub fn is_finished(&self) -> bool { + match &self.handle { + Some(handle) => handle.is_finished(), + None => true, + } } /* - * Send exec_started_at timestamp. + * finish() yields until the tome has finished. */ - fn report_exec_started_at(&self) -> Result<()> { - let now = Utc::now(); - self.ch_exec_started_at.send(Timestamp { - seconds: now.timestamp(), - nanos: now.timestamp_subsec_nanos() as i32, - })?; - Ok(()) + pub async fn finish(&mut self) { + match self.handle.take() { + Some(handle) => match handle.await { + Ok(_) => {} + Err(_err) => { + #[cfg(debug_assertions)] + log::error!("failed to join runtime handle: {}", _err); + } + }, + None => { + #[cfg(debug_assertions)] + log::error!("attempted to join runtime handle which has already finished"); + } + }; } /* - * Send exec_finished_at timestamp. + * Returns the timestamp of when execution started, if available. */ - fn report_exec_finished_at(&self) -> Result<()> { - let now = Utc::now(); - self.ch_exec_finished_at.send(Timestamp { - seconds: now.timestamp(), - nanos: now.timestamp_subsec_nanos() as i32, - })?; - Ok(()) + pub fn get_exec_started_at(&self) -> Option { + drain_last(&self.rx_exec_started_at) } -} -/* - * Enables Runtime to be used as a starlark print handler. - */ -impl PrintHandler for Runtime { - fn println(&self, text: &str) -> anyhow::Result<()> { - self.client.report_output(text.to_string())?; - if self.stdout_reporting { - print!("{}", text); - } - Ok(()) + /* + * Returns the timestamp of when execution finished, if available. + */ + pub fn get_exec_finished_at(&self) -> Option { + drain_last(&self.rx_exec_finished_at) + } + + /* + * Collects all currently available reported text output. + */ + pub fn collect_text(&self) -> Vec { + drain(&self.rx_output) + } + + /* + * Collects all currently available reported errors, if any. + */ + pub fn collect_errors(&self) -> Vec { + drain(&self.rx_error) + } + + /* + * Returns all currently available reported process lists, if any. + */ + pub fn collect_process_lists(&self) -> Vec { + drain(&self.rx_process_list) + } + + /* + * Returns all currently available reported files, if any. + */ + pub fn collect_files(&self) -> Vec { + drain(&self.rx_file) + } + + /* + * Returns all FileRequests that the eldritch runtime has requested, if any. + */ + pub fn collect_file_requests(&self) -> Vec { + drain(&self.rx_file_request) } } diff --git a/implants/lib/eldritch/src/runtime/mod.rs b/implants/lib/eldritch/src/runtime/mod.rs index 1b6700cd6..e5c164cdd 100644 --- a/implants/lib/eldritch/src/runtime/mod.rs +++ b/implants/lib/eldritch/src/runtime/mod.rs @@ -1,14 +1,13 @@ -mod broker; -mod client; +mod drain; +mod environment; mod exec; -pub use broker::Broker; -pub use client::{Client, FileRequest}; -pub use exec::Runtime; +pub use environment::{Environment, FileRequest}; +pub use exec::{start, Runtime}; #[cfg(test)] mod tests { - use crate::{pb::Tome, Runtime}; + use crate::pb::Tome; use anyhow::Error; use std::collections::HashMap; use tempfile::NamedTempFile; @@ -16,22 +15,22 @@ mod tests { macro_rules! runtime_tests { ($($name:ident: $value:expr,)*) => { $( - #[test] - fn $name() { + #[tokio::test] + async fn $name() { let tc: TestCase = $value; - let (runtime, broker) = Runtime::new(); - runtime.run(tc.tome); + let mut runtime = crate::start(tc.tome).await; + runtime.finish().await; let want_err_str = match tc.want_error { Some(err) => err.to_string(), None => "".to_string(), }; - let err_str = match broker.collect_errors().pop() { + let err_str = match runtime.collect_errors().pop() { Some(err) => err.to_string(), None => "".to_string(), }; assert_eq!(want_err_str, err_str); - assert_eq!(tc.want_output, broker.collect_text().join("")); + assert_eq!(tc.want_output, runtime.collect_text().join("")); } )* } @@ -149,18 +148,16 @@ mod tests { .replace('\\', "\\\\"); let eldritch = format!(r#"file.download("https://www.google.com/", "{path}"); print("ok")"#); - let (runtime, broker) = Runtime::new(); - let t = tokio::task::spawn_blocking(move || { - runtime.run(Tome { - eldritch, - parameters: HashMap::new(), - file_names: Vec::new(), - }); - }); - assert!(t.await.is_ok()); + let mut runtime = crate::start(Tome { + eldritch, + parameters: HashMap::new(), + file_names: Vec::new(), + }) + .await; + runtime.finish().await; - let out = broker.collect_text(); - let err = broker.collect_errors().pop(); + let out = runtime.collect_text(); + let err = runtime.collect_errors().pop(); assert!( err.is_none(), "failed with err {}",