From 82f4821b6bbb548f53f63135715ccd115be43080 Mon Sep 17 00:00:00 2001 From: hulto <7121375+hulto@users.noreply.github.com> Date: Tue, 23 Jan 2024 02:28:12 +0000 Subject: [PATCH 1/4] Working on testing. --- implants/imix/src/exec.rs | 66 +++++++++++++++++++------------------- implants/imix/src/tasks.rs | 56 +++++++++++++++++++++++++++++--- 2 files changed, 84 insertions(+), 38 deletions(-) diff --git a/implants/imix/src/exec.rs b/implants/imix/src/exec.rs index ca34d445c..26fabc8b5 100644 --- a/implants/imix/src/exec.rs +++ b/implants/imix/src/exec.rs @@ -13,6 +13,7 @@ pub struct AsyncTask { pub start_time: DateTime, pub grpc_task: Task, pub print_reciever: Receiver, + pub error_reciever: Receiver, } async fn handle_exec_tome( @@ -54,6 +55,7 @@ async fn handle_exec_tome( pub async fn handle_exec_timeout_and_response( task: Task, print_channel_sender: Sender, + error_channel_sender: Sender, timeout: Option, ) -> Result<(), Error> { // Tasks will be forcebly stopped after 1 week. @@ -80,7 +82,10 @@ pub async fn handle_exec_timeout_and_response( print_channel_sender .clone() .send(format!("---[RESULT]----\n{}\n---------", tome_result.0))?; - print_channel_sender + print_channel_sender // Temporary - pending UI updates + .clone() + .send(format!("---[ERROR]----\n{}\n--------", tome_result.1))?; + error_channel_sender .clone() .send(format!("---[ERROR]----\n{}\n--------", tome_result.1))?; Ok(()) @@ -88,11 +93,11 @@ pub async fn handle_exec_timeout_and_response( #[cfg(test)] mod tests { - use super::handle_exec_tome; + use super::{handle_exec_timeout_and_response, handle_exec_tome}; use anyhow::Result; use c2::pb::Task; use std::collections::HashMap; - use std::sync::mpsc::channel; + use std::sync::mpsc::{channel, Sender}; use std::time::Duration; #[test] @@ -125,51 +130,46 @@ print(sys.shell(input_params["cmd"])["stdout"]) Ok(()) } - #[test] - fn imix_handle_exec_tome_error() -> Result<()> { - let test_tome_input = Task { - id: 123, - eldritch: r#" -aoeu -"# - .to_string(), - parameters: HashMap::new(), - file_names: Vec::new(), - quest_name: "test_quest".to_string(), - }; - - let runtime: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - - let (sender, receiver) = channel::(); - - let exec_future = handle_exec_tome(test_tome_input, sender.clone()); - let (eld_output, eld_error) = runtime.block_on(exec_future)?; + #[tokio::test] + async fn imix_handle_exec_tome_error() -> Result<()> { + let mut task_channel_error: Vec = Vec::new(); + + let (print_sender, print_reciever) = channel::(); + let (error_sender, error_reciever) = channel::(); + let res = handle_exec_timeout_and_response( + Task { + id: 123, + eldritch: r#""#.to_string(), + parameters: HashMap::from([]), + file_names: Vec::from([]), + quest_name: "Yeet Yeet".to_string(), + }, + print_sender, + error_sender, + None, + ) + .await?; - let mut index = 0; loop { - let cmd_output = match receiver.recv_timeout(Duration::from_millis(500)) { + let new_res_line = match error_reciever.recv_timeout(Duration::from_millis(100)) { Ok(local_res_string) => local_res_string, Err(local_err) => { match local_err.to_string().as_str() { "channel is empty and sending half is closed" => { break; } - "timed out waiting on channel" => break, + "timed out waiting on channel" => { + break; + } _ => eprint!("Error: {}", local_err), } break; } }; - assert_eq!(cmd_output, "".to_string()); - - index = index + 1; + // let appended_line = format!("{}{}", res.to_owned(), new_res_line); + task_channel_error.push(new_res_line); } - assert_eq!(eld_output, "".to_string()); - assert_eq!(eld_error, "[eldritch] Eldritch eval_module failed:\nerror: Variable `aoeu` not found\n --> 123:2:1\n |\n2 | aoeu\n | ^^^^\n |\n".to_string()); Ok(()) } diff --git a/implants/imix/src/tasks.rs b/implants/imix/src/tasks.rs index 772020afb..5d8c61ace 100644 --- a/implants/imix/src/tasks.rs +++ b/implants/imix/src/tasks.rs @@ -8,7 +8,7 @@ use anyhow::{Context, Result}; use c2::pb::c2_manual_client::TavernClient; use c2::pb::{ Agent, Beacon, ClaimTasksRequest, Host, ReportTaskOutputRequest, ReportTaskOutputResponse, - Task, TaskOutput, + Task, TaskError, TaskOutput, }; use chrono::Utc; use std::sync::mpsc::channel; @@ -63,8 +63,13 @@ pub async fn start_new_tasks( eprintln!("Launching:\n{:?}", task.clone().eldritch); let (sender, receiver) = channel::(); - let exec_with_timeout = - handle_exec_timeout_and_response(task.clone(), sender.clone(), None); + let (error_sender, error_receiver) = channel::(); + let exec_with_timeout = handle_exec_timeout_and_response( + task.clone(), + sender.clone(), + error_sender.clone(), + None, + ); #[cfg(debug_assertions)] eprintln!( @@ -80,6 +85,7 @@ pub async fn start_new_tasks( start_time: Utc::now(), grpc_task: task.clone(), print_reciever: receiver, + error_reciever: error_receiver, }, ) { Some(_old_task) => { @@ -109,6 +115,7 @@ fn queue_task_output( loop_start_time: Instant, ) { let mut task_channel_output: Vec = Vec::new(); + let mut task_channel_error: Vec = Vec::new(); loop { #[cfg(debug_assertions)] @@ -139,6 +146,35 @@ fn queue_task_output( task_channel_output.push(new_res_line); } + loop { + #[cfg(debug_assertions)] + eprintln!( + "[{}]: Task # {} recieving output", + (Instant::now() - loop_start_time).as_millis(), + task_id + ); + let new_res_line = match async_task + .error_reciever + .recv_timeout(Duration::from_millis(100)) + { + Ok(local_res_string) => local_res_string, + Err(local_err) => { + match local_err.to_string().as_str() { + "channel is empty and sending half is closed" => { + break; + } + "timed out waiting on channel" => { + break; + } + _ => eprint!("Error: {}", local_err), + } + break; + } + }; + // let appended_line = format!("{}{}", res.to_owned(), new_res_line); + task_channel_error.push(new_res_line); + } + let task_is_finished = async_task.future_join_handle.is_finished(); let task_response_exec_finished_at = match task_is_finished { true => Some(Utc::now()), @@ -147,6 +183,14 @@ fn queue_task_output( // If the task is finished or there's new data queue a new task result. if task_is_finished || task_channel_output.len() > 0 { + let task_error = if task_channel_error.len() > 0 { + Some(TaskError { + msg: task_channel_error.join(""), + }) + } else { + None + }; + let task_response = TaskOutput { id: async_task.grpc_task.id.clone(), exec_started_at: Some(prost_types::Timestamp { @@ -161,12 +205,14 @@ fn queue_task_output( None => None, }, output: task_channel_output.join(""), - error: None, + error: task_error, }; running_task_res_map .entry(task_id) - .and_modify(|cur_list| cur_list.push(task_response.clone())) + .and_modify(|cur_list| { + cur_list.push(task_response.clone()); + }) .or_insert(vec![task_response]); } } From 885b7c40c9eb9c9f2a65d6a1a7e7a69d1fb762e8 Mon Sep 17 00:00:00 2001 From: hulto <7121375+hulto@users.noreply.github.com> Date: Tue, 23 Jan 2024 02:33:43 +0000 Subject: [PATCH 2/4] Test works. --- implants/imix/src/exec.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/implants/imix/src/exec.rs b/implants/imix/src/exec.rs index 26fabc8b5..5709d6c19 100644 --- a/implants/imix/src/exec.rs +++ b/implants/imix/src/exec.rs @@ -85,9 +85,7 @@ pub async fn handle_exec_timeout_and_response( print_channel_sender // Temporary - pending UI updates .clone() .send(format!("---[ERROR]----\n{}\n--------", tome_result.1))?; - error_channel_sender - .clone() - .send(format!("---[ERROR]----\n{}\n--------", tome_result.1))?; + error_channel_sender.clone().send(tome_result.1)?; Ok(()) } @@ -139,10 +137,10 @@ print(sys.shell(input_params["cmd"])["stdout"]) let res = handle_exec_timeout_and_response( Task { id: 123, - eldritch: r#""#.to_string(), + eldritch: r#"print('okay')"#.to_string(), parameters: HashMap::from([]), file_names: Vec::from([]), - quest_name: "Yeet Yeet".to_string(), + quest_name: "Poggers".to_string(), }, print_sender, error_sender, @@ -170,6 +168,7 @@ print(sys.shell(input_params["cmd"])["stdout"]) task_channel_error.push(new_res_line); } + assert_eq!(task_channel_error.join(""), "".to_string()); Ok(()) } From 9d2f2cfdf08d2dd5b01aa763b8cb51c81414c2e5 Mon Sep 17 00:00:00 2001 From: hulto <7121375+hulto@users.noreply.github.com> Date: Tue, 23 Jan 2024 02:52:23 +0000 Subject: [PATCH 3/4] Lil refactor. --- implants/imix/src/exec.rs | 37 +++++--------- implants/imix/src/tasks.rs | 98 +++++++++++++------------------------- 2 files changed, 43 insertions(+), 92 deletions(-) diff --git a/implants/imix/src/exec.rs b/implants/imix/src/exec.rs index 5709d6c19..80a7d544a 100644 --- a/implants/imix/src/exec.rs +++ b/implants/imix/src/exec.rs @@ -91,11 +91,13 @@ pub async fn handle_exec_timeout_and_response( #[cfg(test)] mod tests { + use crate::tasks::drain_sender; + use super::{handle_exec_timeout_and_response, handle_exec_tome}; use anyhow::Result; use c2::pb::Task; use std::collections::HashMap; - use std::sync::mpsc::{channel, Sender}; + use std::sync::mpsc::channel; use std::time::Duration; #[test] @@ -130,14 +132,14 @@ print(sys.shell(input_params["cmd"])["stdout"]) #[tokio::test] async fn imix_handle_exec_tome_error() -> Result<()> { - let mut task_channel_error: Vec = Vec::new(); - let (print_sender, print_reciever) = channel::(); let (error_sender, error_reciever) = channel::(); - let res = handle_exec_timeout_and_response( + let _res = handle_exec_timeout_and_response( Task { id: 123, - eldritch: r#"print('okay')"#.to_string(), + eldritch: r#"print(no_var) +"# + .to_string(), parameters: HashMap::from([]), file_names: Vec::from([]), quest_name: "Poggers".to_string(), @@ -148,27 +150,10 @@ print(sys.shell(input_params["cmd"])["stdout"]) ) .await?; - loop { - let new_res_line = match error_reciever.recv_timeout(Duration::from_millis(100)) { - Ok(local_res_string) => local_res_string, - Err(local_err) => { - match local_err.to_string().as_str() { - "channel is empty and sending half is closed" => { - break; - } - "timed out waiting on channel" => { - break; - } - _ => eprint!("Error: {}", local_err), - } - break; - } - }; - // let appended_line = format!("{}{}", res.to_owned(), new_res_line); - task_channel_error.push(new_res_line); - } - - assert_eq!(task_channel_error.join(""), "".to_string()); + let task_channel_error = drain_sender(&error_reciever)?; + let _task_channel_output = drain_sender(&print_reciever)?; + + assert!(task_channel_error.contains(&"Variable `no_var` not found".to_string())); Ok(()) } diff --git a/implants/imix/src/tasks.rs b/implants/imix/src/tasks.rs index 5d8c61ace..b645a2768 100644 --- a/implants/imix/src/tasks.rs +++ b/implants/imix/src/tasks.rs @@ -11,10 +11,34 @@ use c2::pb::{ Task, TaskError, TaskOutput, }; use chrono::Utc; -use std::sync::mpsc::channel; +use std::sync::mpsc::{channel, Receiver}; use tokio::task; use tonic::Status; +pub fn drain_sender(reciever: &Receiver) -> Result { + let mut channel_res: Vec = Vec::new(); + loop { + let new_res_line = match reciever.recv_timeout(Duration::from_millis(100)) { + Ok(local_res_string) => local_res_string, + Err(local_err) => { + match local_err.to_string().as_str() { + "channel is empty and sending half is closed" => { + break; + } + "timed out waiting on channel" => { + break; + } + _ => eprint!("Error: {}", local_err), + } + break; + } + }; + // let appended_line = format!("{}{}", res.to_owned(), new_res_line); + channel_res.push(new_res_line); + } + Ok(channel_res.join("")) +} + pub async fn get_new_tasks( agent_properties: AgentProperties, imix_config: Config, @@ -112,68 +136,9 @@ fn queue_task_output( async_task: &AsyncTask, task_id: TaskID, running_task_res_map: &mut HashMap>, - loop_start_time: Instant, -) { - let mut task_channel_output: Vec = Vec::new(); - let mut task_channel_error: Vec = Vec::new(); - - loop { - #[cfg(debug_assertions)] - eprintln!( - "[{}]: Task # {} recieving output", - (Instant::now() - loop_start_time).as_millis(), - task_id - ); - let new_res_line = match async_task - .print_reciever - .recv_timeout(Duration::from_millis(100)) - { - Ok(local_res_string) => local_res_string, - Err(local_err) => { - match local_err.to_string().as_str() { - "channel is empty and sending half is closed" => { - break; - } - "timed out waiting on channel" => { - break; - } - _ => eprint!("Error: {}", local_err), - } - break; - } - }; - // let appended_line = format!("{}{}", res.to_owned(), new_res_line); - task_channel_output.push(new_res_line); - } - - loop { - #[cfg(debug_assertions)] - eprintln!( - "[{}]: Task # {} recieving output", - (Instant::now() - loop_start_time).as_millis(), - task_id - ); - let new_res_line = match async_task - .error_reciever - .recv_timeout(Duration::from_millis(100)) - { - Ok(local_res_string) => local_res_string, - Err(local_err) => { - match local_err.to_string().as_str() { - "channel is empty and sending half is closed" => { - break; - } - "timed out waiting on channel" => { - break; - } - _ => eprint!("Error: {}", local_err), - } - break; - } - }; - // let appended_line = format!("{}{}", res.to_owned(), new_res_line); - task_channel_error.push(new_res_line); - } +) -> Result<()> { + let task_channel_output = drain_sender(&async_task.print_reciever)?; + let task_channel_error = drain_sender(&async_task.error_reciever)?; let task_is_finished = async_task.future_join_handle.is_finished(); let task_response_exec_finished_at = match task_is_finished { @@ -185,7 +150,7 @@ fn queue_task_output( if task_is_finished || task_channel_output.len() > 0 { let task_error = if task_channel_error.len() > 0 { Some(TaskError { - msg: task_channel_error.join(""), + msg: task_channel_error, }) } else { None @@ -204,7 +169,7 @@ fn queue_task_output( }), None => None, }, - output: task_channel_output.join(""), + output: task_channel_output, error: task_error, }; @@ -215,6 +180,7 @@ fn queue_task_output( }) .or_insert(vec![task_response]); } + Ok(()) } pub async fn submit_task_output( @@ -235,7 +201,7 @@ pub async fn submit_task_output( ); // Loop over each line of output from the task and append it the the channel output. - queue_task_output(async_task, *task_id, running_task_res_map, loop_start_time); + let _ = queue_task_output(async_task, *task_id, running_task_res_map); } // Iterate over queued task results and send them back to the server From 7d4bc274aaeee2f6d8ed2025ab99a20cea083789 Mon Sep 17 00:00:00 2001 From: hulto <7121375+hulto@users.noreply.github.com> Date: Tue, 23 Jan 2024 03:04:18 +0000 Subject: [PATCH 4/4] Fix sus thing --- implants/imix/src/tasks.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/implants/imix/src/tasks.rs b/implants/imix/src/tasks.rs index b645a2768..085b23f8c 100644 --- a/implants/imix/src/tasks.rs +++ b/implants/imix/src/tasks.rs @@ -201,7 +201,7 @@ pub async fn submit_task_output( ); // Loop over each line of output from the task and append it the the channel output. - let _ = queue_task_output(async_task, *task_id, running_task_res_map); + queue_task_output(async_task, *task_id, running_task_res_map)?; } // Iterate over queued task results and send them back to the server