diff --git a/implants/imix/src/exec.rs b/implants/imix/src/exec.rs index ca34d445c..80a7d544a 100644 --- a/implants/imix/src/exec.rs +++ b/implants/imix/src/exec.rs @@ -13,6 +13,7 @@ pub struct AsyncTask { pub start_time: DateTime, pub grpc_task: Task, pub print_reciever: Receiver, + pub error_reciever: Receiver, } async fn handle_exec_tome( @@ -54,6 +55,7 @@ async fn handle_exec_tome( pub async fn handle_exec_timeout_and_response( task: Task, print_channel_sender: Sender, + error_channel_sender: Sender, timeout: Option, ) -> Result<(), Error> { // Tasks will be forcebly stopped after 1 week. @@ -80,15 +82,18 @@ pub async fn handle_exec_timeout_and_response( print_channel_sender .clone() .send(format!("---[RESULT]----\n{}\n---------", tome_result.0))?; - print_channel_sender + print_channel_sender // Temporary - pending UI updates .clone() .send(format!("---[ERROR]----\n{}\n--------", tome_result.1))?; + error_channel_sender.clone().send(tome_result.1)?; Ok(()) } #[cfg(test)] mod tests { - use super::handle_exec_tome; + use crate::tasks::drain_sender; + + use super::{handle_exec_timeout_and_response, handle_exec_tome}; use anyhow::Result; use c2::pb::Task; use std::collections::HashMap; @@ -125,51 +130,30 @@ print(sys.shell(input_params["cmd"])["stdout"]) Ok(()) } - #[test] - fn imix_handle_exec_tome_error() -> Result<()> { - let test_tome_input = Task { - id: 123, - eldritch: r#" -aoeu + #[tokio::test] + async fn imix_handle_exec_tome_error() -> Result<()> { + let (print_sender, print_reciever) = channel::(); + let (error_sender, error_reciever) = channel::(); + let _res = handle_exec_timeout_and_response( + Task { + id: 123, + eldritch: r#"print(no_var) "# - .to_string(), - parameters: HashMap::new(), - file_names: Vec::new(), - quest_name: "test_quest".to_string(), - }; - - let runtime: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - - let (sender, receiver) = channel::(); + .to_string(), + parameters: HashMap::from([]), + file_names: Vec::from([]), + quest_name: "Poggers".to_string(), + }, + print_sender, + error_sender, + None, + ) + .await?; - let exec_future = handle_exec_tome(test_tome_input, sender.clone()); - let (eld_output, eld_error) = runtime.block_on(exec_future)?; + let task_channel_error = drain_sender(&error_reciever)?; + let _task_channel_output = drain_sender(&print_reciever)?; - let mut index = 0; - loop { - let cmd_output = match receiver.recv_timeout(Duration::from_millis(500)) { - Ok(local_res_string) => local_res_string, - Err(local_err) => { - match local_err.to_string().as_str() { - "channel is empty and sending half is closed" => { - break; - } - "timed out waiting on channel" => break, - _ => eprint!("Error: {}", local_err), - } - break; - } - }; - assert_eq!(cmd_output, "".to_string()); - - index = index + 1; - } - - assert_eq!(eld_output, "".to_string()); - assert_eq!(eld_error, "[eldritch] Eldritch eval_module failed:\nerror: Variable `aoeu` not found\n --> 123:2:1\n |\n2 | aoeu\n | ^^^^\n |\n".to_string()); + assert!(task_channel_error.contains(&"Variable `no_var` not found".to_string())); Ok(()) } diff --git a/implants/imix/src/tasks.rs b/implants/imix/src/tasks.rs index 772020afb..085b23f8c 100644 --- a/implants/imix/src/tasks.rs +++ b/implants/imix/src/tasks.rs @@ -8,13 +8,37 @@ use anyhow::{Context, Result}; use c2::pb::c2_manual_client::TavernClient; use c2::pb::{ Agent, Beacon, ClaimTasksRequest, Host, ReportTaskOutputRequest, ReportTaskOutputResponse, - Task, TaskOutput, + Task, TaskError, TaskOutput, }; use chrono::Utc; -use std::sync::mpsc::channel; +use std::sync::mpsc::{channel, Receiver}; use tokio::task; use tonic::Status; +pub fn drain_sender(reciever: &Receiver) -> Result { + let mut channel_res: Vec = Vec::new(); + loop { + let new_res_line = match reciever.recv_timeout(Duration::from_millis(100)) { + Ok(local_res_string) => local_res_string, + Err(local_err) => { + match local_err.to_string().as_str() { + "channel is empty and sending half is closed" => { + break; + } + "timed out waiting on channel" => { + break; + } + _ => eprint!("Error: {}", local_err), + } + break; + } + }; + // let appended_line = format!("{}{}", res.to_owned(), new_res_line); + channel_res.push(new_res_line); + } + Ok(channel_res.join("")) +} + pub async fn get_new_tasks( agent_properties: AgentProperties, imix_config: Config, @@ -63,8 +87,13 @@ pub async fn start_new_tasks( eprintln!("Launching:\n{:?}", task.clone().eldritch); let (sender, receiver) = channel::(); - let exec_with_timeout = - handle_exec_timeout_and_response(task.clone(), sender.clone(), None); + let (error_sender, error_receiver) = channel::(); + let exec_with_timeout = handle_exec_timeout_and_response( + task.clone(), + sender.clone(), + error_sender.clone(), + None, + ); #[cfg(debug_assertions)] eprintln!( @@ -80,6 +109,7 @@ pub async fn start_new_tasks( start_time: Utc::now(), grpc_task: task.clone(), print_reciever: receiver, + error_reciever: error_receiver, }, ) { Some(_old_task) => { @@ -106,38 +136,9 @@ fn queue_task_output( async_task: &AsyncTask, task_id: TaskID, running_task_res_map: &mut HashMap>, - loop_start_time: Instant, -) { - let mut task_channel_output: Vec = Vec::new(); - - loop { - #[cfg(debug_assertions)] - eprintln!( - "[{}]: Task # {} recieving output", - (Instant::now() - loop_start_time).as_millis(), - task_id - ); - let new_res_line = match async_task - .print_reciever - .recv_timeout(Duration::from_millis(100)) - { - Ok(local_res_string) => local_res_string, - Err(local_err) => { - match local_err.to_string().as_str() { - "channel is empty and sending half is closed" => { - break; - } - "timed out waiting on channel" => { - break; - } - _ => eprint!("Error: {}", local_err), - } - break; - } - }; - // let appended_line = format!("{}{}", res.to_owned(), new_res_line); - task_channel_output.push(new_res_line); - } +) -> Result<()> { + let task_channel_output = drain_sender(&async_task.print_reciever)?; + let task_channel_error = drain_sender(&async_task.error_reciever)?; let task_is_finished = async_task.future_join_handle.is_finished(); let task_response_exec_finished_at = match task_is_finished { @@ -147,6 +148,14 @@ fn queue_task_output( // If the task is finished or there's new data queue a new task result. if task_is_finished || task_channel_output.len() > 0 { + let task_error = if task_channel_error.len() > 0 { + Some(TaskError { + msg: task_channel_error, + }) + } else { + None + }; + let task_response = TaskOutput { id: async_task.grpc_task.id.clone(), exec_started_at: Some(prost_types::Timestamp { @@ -160,15 +169,18 @@ fn queue_task_output( }), None => None, }, - output: task_channel_output.join(""), - error: None, + output: task_channel_output, + error: task_error, }; running_task_res_map .entry(task_id) - .and_modify(|cur_list| cur_list.push(task_response.clone())) + .and_modify(|cur_list| { + cur_list.push(task_response.clone()); + }) .or_insert(vec![task_response]); } + Ok(()) } pub async fn submit_task_output( @@ -189,7 +201,7 @@ pub async fn submit_task_output( ); // Loop over each line of output from the task and append it the the channel output. - queue_task_output(async_task, *task_id, running_task_res_map, loop_start_time); + queue_task_output(async_task, *task_id, running_task_res_map)?; } // Iterate over queued task results and send them back to the server