Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,607 changes: 1,149 additions & 458 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[workspace]
resolver = "2"
default-members = [
"crates/crashtracker",
"crates/process_discovery",
Expand Down
16 changes: 16 additions & 0 deletions crates/capabilities-test/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "capabilities-test"
version = "0.1.0"
edition = "2021"
description = "Wasm test harness for SpawnCapability and SleepCapability"

[lib]
crate-type = ["cdylib", "rlib"]

[dependencies]
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
js-sys = "0.3"
libdatadog-nodejs-capabilities = { path = "../capabilities" }
libdd-capabilities = { git = "https://github.com/DataDog/libdatadog.git", branch = "jwiriath/runtime-capability" }
console_error_panic_hook = "0.1"
90 changes: 90 additions & 0 deletions crates/capabilities-test/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Wasm test harness for `SpawnCapability` and `SleepCapability`.
//!
//! Each `#[wasm_bindgen]` function exercises a specific capability and returns
//! a value the JS test can assert on.

use std::time::Duration;

use libdd_capabilities::sleep::SleepCapability;
use libdd_capabilities::spawn::SpawnCapability;
use libdatadog_nodejs_capabilities::{WasmSleepCapability, WasmSpawnCapability};
use wasm_bindgen::prelude::*;

#[wasm_bindgen(start)]
fn init() {
console_error_panic_hook::set_once();
}

/// Spawn a task that returns a fixed value and await it.
#[wasm_bindgen]
pub async fn test_spawn_returns_value() -> u32 {
let spawner = WasmSpawnCapability;
let handle = spawner.spawn(async { 42u32 });
handle.await
}

/// Sleep for `ms` milliseconds and return the actual elapsed time (ms).
/// The JS side can assert this is >= `ms`.
#[wasm_bindgen]
pub async fn test_sleep_duration_ms(ms: u32) -> f64 {
let sleeper = WasmSleepCapability;
let start = js_sys::Date::now();
sleeper.sleep(Duration::from_millis(ms as u64)).await;
js_sys::Date::now() - start
}

/// Spawn a task that sleeps then returns a greeting. Exercises both
/// capabilities together.
#[wasm_bindgen]
pub async fn test_spawn_with_sleep(ms: u32) -> String {
let spawner = WasmSpawnCapability;
let handle = spawner.spawn(async move {
let sleeper = WasmSleepCapability;
sleeper.sleep(Duration::from_millis(ms as u64)).await;
format!("slept {}ms", ms)
});
handle.await
}

/// Spawn multiple concurrent tasks and collect all results.
#[wasm_bindgen]
pub async fn test_spawn_concurrent() -> JsValue {
let spawner = WasmSpawnCapability;
let h1 = spawner.spawn(async { 1u32 });
let h2 = spawner.spawn(async { 2u32 });
let h3 = spawner.spawn(async { 3u32 });
let (r1, r2, r3) = futures_join(h1, h2, h3).await;
JsValue::from(r1 + r2 + r3)
}

async fn futures_join<A, B, C>(
a: impl core::future::Future<Output = A>,
b: impl core::future::Future<Output = B>,
c: impl core::future::Future<Output = C>,
) -> (A, B, C) {
// Manual join: spawn b and c, await a inline, then collect.
// We can't use tokio::join! here (no runtime), so we just await
// sequentially -- the tasks are already running on the event loop via
// spawn_local, so their futures resolve as soon as polled.
let a = a.await;
let b = b.await;
let c = c.await;
(a, b, c)
}

/// Spawn a task and drop the handle (cancel). Verify the main thread
/// is not blocked and we get back control immediately.
#[wasm_bindgen]
pub async fn test_spawn_cancel() -> bool {
let spawner = WasmSpawnCapability;
let handle = spawner.spawn(async {
let sleeper = WasmSleepCapability;
sleeper.sleep(Duration::from_secs(60)).await;
panic!("should have been cancelled");
});
drop(handle);
true
}
23 changes: 23 additions & 0 deletions crates/capabilities/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "libdatadog-nodejs-capabilities"
version = "0.1.0"
edition = "2021"
description = "Wasm capability implementations for libdatadog-nodejs (backed by JS transports)"

[lib]
crate-type = ["rlib"]

[dependencies]
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
js-sys = "0.3"
serde_json = "1.0"
http = "1"
bytes = "1.4"
futures-core = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["alloc", "channel"] }
anyhow = "1"
libdd-capabilities = { git = "https://github.com/DataDog/libdatadog.git", branch = "jwiriath/runtime-capability" }

[dev-dependencies]
wasm-bindgen-test = "0.3"
119 changes: 119 additions & 0 deletions crates/capabilities/src/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Wasm implementation of [`HttpClientCapability`] backed by Node.js `http.request`.
//!
//! The JS transport is imported via `wasm_bindgen(module = ...)` from
//! `http_transport.js`, which ships alongside the wasm output.

use std::collections::HashMap;
use std::future::Future;

use bytes::Bytes;
use js_sys;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;

use libdd_capabilities::http::{HttpClientCapability, HttpError};
use libdd_capabilities::maybe_send::MaybeSend;

#[wasm_bindgen(module = "/src/http_transport.js")]
extern "C" {
#[wasm_bindgen(js_name = "httpRequest")]
fn http_request(
method: &str,
url: &str,
headers_json: &str,
body: &[u8],
) -> js_sys::Promise;
}

/// Wasm [`HttpClientCapability`] implementation that delegates to Node.js HTTP.
#[derive(Clone, Debug)]
pub struct WasmHttpClient;

impl HttpClientCapability for WasmHttpClient {
fn new_client() -> Self {
Self
}

#[allow(clippy::manual_async_fn)]
fn request(
&self,
req: http::Request<Bytes>,
) -> impl Future<Output = Result<http::Response<Bytes>, HttpError>> + MaybeSend {
async move {
let method = req.method().as_str().to_owned();
let url = req.uri().to_string();
let headers_json = serialize_headers(req.headers())?;
let body = req.into_body();

let result = JsFuture::from(http_request(&method, &url, &headers_json, &body))
.await
.map_err(|e| HttpError::Network(anyhow::anyhow!("{:?}", e)))?;

let status = js_sys::Reflect::get(&result, &JsValue::from_str("status"))
.map_err(|_| HttpError::Other(anyhow::anyhow!("missing status in response")))?
.as_f64()
.ok_or_else(|| HttpError::Other(anyhow::anyhow!("status is not a number")))?
as u16;

let headers = parse_response_headers(&result)?;

let body_js = js_sys::Reflect::get(&result, &JsValue::from_str("body"))
.map_err(|_| HttpError::Other(anyhow::anyhow!("missing body in response")))?;

let body = if body_js.is_undefined() || body_js.is_null() {
Bytes::new()
} else {
Bytes::from(js_sys::Uint8Array::new(&body_js).to_vec())
};

let mut builder = http::Response::builder().status(status);
for (name, value) in &headers {
builder = builder.header(name.as_str(), value.as_str());
}
builder
.body(body)
.map_err(|e| HttpError::Other(e.into()))
}
}

}

/// Parse response headers from a JS object `{ "header-name": "value", ... }`.
///
/// Node.js `res.headers` returns lowercased header names with string values.
fn parse_response_headers(result: &JsValue) -> Result<Vec<(String, String)>, HttpError> {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dealing w/ JsValue directly can sometimes be a perf bottleneck. It's fine for now though. We can benchmark later.

Alternatives would be sending un-parsed headers (i.e. raw packet data) in here directly. Yep, that's a thing we can do.

We can also consider the network socket interface as a potential replacement layer here. Regardless, we have tons of room to figure that out and make improvements as we go, so this is all fine for now.

let headers_js = js_sys::Reflect::get(result, &JsValue::from_str("headers"))
.map_err(|_| HttpError::Other(anyhow::anyhow!("missing headers in response")))?;

if headers_js.is_undefined() || headers_js.is_null() {
return Ok(Vec::new());
}

let entries = js_sys::Object::entries(&js_sys::Object::unchecked_from_js(headers_js));
let mut headers = Vec::with_capacity(entries.length() as usize);
for i in 0..entries.length() {
let entry = js_sys::Array::from(&entries.get(i));
if let (Some(key), Some(value)) = (entry.get(0).as_string(), entry.get(1).as_string()) {
headers.push((key, value));
}
}
Ok(headers)
}

fn serialize_headers(headers: &http::HeaderMap) -> Result<String, HttpError> {
let mut map: HashMap<&str, Vec<&str>> = HashMap::new();
for (name, value) in headers.iter() {
map.entry(name.as_str())
.or_default()
.push(value.to_str().unwrap_or(""));
}
let flat: HashMap<&str, String> = map
.into_iter()
.map(|(k, v)| (k, v.join(", ")))
.collect();
serde_json::to_string(&flat)
.map_err(|e| HttpError::InvalidRequest(e.into()))
}
35 changes: 35 additions & 0 deletions crates/capabilities/src/http_transport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
const http = require('http');
const https = require('https');

module.exports.httpRequest = function (method, url, headersJson, body) {
const headers = JSON.parse(headersJson || '{}');
headers['Content-Length'] = body.length;
const parsed = new URL(url);
const transport = parsed.protocol === 'https:' ? https : http;

return new Promise((resolve, reject) => {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a promise here is a bit of a code smell. That said, it's the easiest to map to Futures, so I'm fine with keeping it here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean like using fetch directly that returns "natively" a Promise ? Or not using promises at all ?

const req = transport.request(
{
hostname: parsed.hostname,
port: parsed.port,
path: parsed.pathname + parsed.search,
method,
headers,
},
(res) => {
const chunks = [];
res.on('data', (chunk) => chunks.push(chunk));
res.on('end', () => {
resolve({
status: res.statusCode,
headers: res.headers,
body: new Uint8Array(Buffer.concat(chunks)),
});
});
}
);
req.on('error', reject);
req.write(body);
req.end();
});
};
63 changes: 63 additions & 0 deletions crates/capabilities/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Wasm capability implementations for libdatadog-nodejs.
//!
//! `WasmCapabilities` is the bundle struct that implements all capability
//! traits using wasm_bindgen + JS transports. The wasm binding crate pins
//! this type as the generic parameter for libdatadog structs.

pub mod http;
pub mod sleep;
pub mod spawn;

use core::future::Future;
use std::time::Duration;

use futures_util::future::RemoteHandle;

pub use http::WasmHttpClient;
use libdd_capabilities::http::{HttpClientCapability, HttpError};
use libdd_capabilities::sleep::SleepCapability;
use libdd_capabilities::spawn::SpawnCapability;
use libdd_capabilities::MaybeSend;
pub use sleep::WasmSleepCapability;
pub use spawn::WasmSpawnCapability;

/// Bundle struct for wasm platform capabilities.
///
/// Delegates to [`WasmHttpClient`] for HTTP, [`WasmSleepCapability`] for
/// sleep, and [`WasmSpawnCapability`] for task spawning.
#[derive(Clone, Debug)]
pub struct WasmCapabilities;

impl HttpClientCapability for WasmCapabilities {
fn new_client() -> Self {
Self
}

fn request(
&self,
req: ::http::Request<bytes::Bytes>,
) -> impl Future<Output = Result<::http::Response<bytes::Bytes>, HttpError>> + MaybeSend {
WasmHttpClient.request(req)
}
}

impl SleepCapability for WasmCapabilities {
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + MaybeSend {
WasmSleepCapability.sleep(duration)
}
}

impl SpawnCapability for WasmCapabilities {
type JoinHandle<T: MaybeSend + 'static> = RemoteHandle<T>;

fn spawn<F, T>(&self, future: F) -> RemoteHandle<T>
where
F: Future<Output = T> + MaybeSend + 'static,
T: MaybeSend + 'static,
{
WasmSpawnCapability.spawn(future)
}
}
32 changes: 32 additions & 0 deletions crates/capabilities/src/sleep.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Wasm sleep implementation backed by JS `setTimeout`.

use core::future::Future;
use std::time::Duration;

use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;

use libdd_capabilities::maybe_send::MaybeSend;
use libdd_capabilities::sleep::SleepCapability;

#[wasm_bindgen(module = "/src/sleep_transport.js")]
extern "C" {
#[wasm_bindgen(js_name = "sleep")]
fn js_sleep(ms: f64) -> js_sys::Promise;
}

#[derive(Clone, Debug)]
pub struct WasmSleepCapability;

impl SleepCapability for WasmSleepCapability {
#[allow(clippy::manual_async_fn)]
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + MaybeSend {
async move {
let ms = duration.as_millis() as f64;
let _ = JsFuture::from(js_sleep(ms)).await;
}
}
}
3 changes: 3 additions & 0 deletions crates/capabilities/src/sleep_transport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module.exports.sleep = function(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
};
Loading