Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
312 changes: 160 additions & 152 deletions Cargo.lock

Large diffs are not rendered by default.

26 changes: 10 additions & 16 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,15 @@ codegen-units = 1
lto = true
opt-level = "z"
panic = "abort"
strip = "none"
debug = 1

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[patch."https://github.com/DataDog/libdatadog.git"]
libdd-trace-utils = { path = "/Users/bryan.english/libdatadog-local/libdd-trace-utils" }
libdd-capabilities = { path = "/Users/bryan.english/libdatadog-local/libdd-capabilities" }
libdd-data-pipeline = { path = "/Users/bryan.english/libdatadog-local/libdd-data-pipeline" }
libdd-trace-protobuf = { path = "/Users/bryan.english/libdatadog-local/libdd-trace-protobuf" }
libdd-trace-stats = { path = "/Users/bryan.english/libdatadog-local/libdd-trace-stats" }
libdd-common = { path = "/Users/bryan.english/libdatadog-local/libdd-common" }
libdd-tinybytes = { path = "/Users/bryan.english/libdatadog-local/libdd-tinybytes" }
libdd-trace-normalization = { path = "/Users/bryan.english/libdatadog-local/libdd-trace-normalization" }
libdd-dogstatsd-client = { path = "/Users/bryan.english/libdatadog-local/libdd-dogstatsd-client" }
libdd-ddsketch = { path = "/Users/bryan.english/libdatadog-local/libdd-ddsketch" }
# strip = "none"
debug = true


[patch.'https://github.com/DataDog/libdatadog.git']
libdd-capabilities = { path = "../libdatadog_node_native_spans_worktree/libdd-capabilities" }
libdd-capabilities-impl = { path = "../libdatadog_node_native_spans_worktree/libdd-capabilities-impl" }
libdd-data-pipeline = { path = "../libdatadog_node_native_spans_worktree/libdd-data-pipeline" }
libdd-trace-utils = { path = "../libdatadog_node_native_spans_worktree/libdd-trace-utils" }
libdd-trace-stats = { path = "../libdatadog_node_native_spans_worktree/libdd-trace-stats" }
libdd-trace-protobuf = { path = "../libdatadog_node_native_spans_worktree/libdd-trace-protobuf" }
libdd-library-config = { path = "../libdatadog_node_native_spans_worktree/libdd-library-config" }
4 changes: 4 additions & 0 deletions change-buffer-build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
rm -f Cargo.lock
rm -rf prebuilds/*
npm run build-wasm
npx napi build --platform -p pipeline-native -o prebuilds/fastline --release
1 change: 0 additions & 1 deletion crates/capabilities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ crate-type = ["rlib"]
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
js-sys = "0.3"
serde_json = "1.0"
http = "1"
bytes = "1.4"
futures-core = "0.3"
Expand Down
165 changes: 109 additions & 56 deletions crates/capabilities/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,43 @@
//! The JS transport is imported via `wasm_bindgen(module = ...)` from
//! `http_transport.js`, which ships alongside the wasm output.

use std::collections::HashMap;
use std::future::Future;
use std::io::Write as _;
use std::sync::LazyLock;

use bytes::Bytes;
use js_sys;
use http::{HeaderMap, HeaderName, HeaderValue};
use js_sys::{self, Array, JsString, Number, Uint8Array};
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;

use libdd_capabilities::http::{HttpClientTrait, HttpError};
use libdd_capabilities::maybe_send::MaybeSend;

static WASM_MEMORY: LazyLock<JsValue> = LazyLock::new(|| wasm_bindgen::memory());

#[wasm_bindgen(module = "/src/http_transport.js")]
extern "C" {
#[wasm_bindgen(js_name = "httpRequest")]
fn http_request(
method: &str,
url: &str,
headers_json: &str,
body: &[u8],
host: &str,
port: u16,
is_https: bool,
head_ptr: *const u8,
head_len: u32,
body_ptr: *const u8,
body_len: u32,
wasm_memory: &JsValue,
) -> js_sys::Promise;

#[wasm_bindgen(js_name = "setStorage")]
pub fn set_storage(new_storage: &JsValue);
}

/// Wasm [`HttpClientTrait`] implementation that delegates to Node.js HTTP.
///
/// Named `DefaultHttpClient` to match the native version's public API.
#[derive(Debug, Clone)]
pub struct DefaultHttpClient;

impl HttpClientTrait for DefaultHttpClient {
Expand All @@ -44,77 +56,118 @@ impl HttpClientTrait for DefaultHttpClient {
req: http::Request<Bytes>,
) -> impl Future<Output = Result<http::Response<Bytes>, HttpError>> + MaybeSend {
async move {
let method = req.method().as_str().to_owned();
let url = req.uri().to_string();
let headers_json = serialize_headers(req.headers())?;
let scheme = req.uri().scheme_str().unwrap_or("http");
let is_https = scheme == "https";
let host = req
.uri()
.host()
.ok_or_else(|| HttpError::InvalidRequest(anyhow::anyhow!("missing host in URI")))?
.to_owned();
let port = req
.uri()
.port_u16()
.unwrap_or(if is_https { 443 } else { 80 });

let head = serialize_request_head(&req, &host, port, is_https)?;
let body = req.into_body();

let result = JsFuture::from(http_request(&method, &url, &headers_json, &body))
.await
.map_err(|e| HttpError::Network(anyhow::anyhow!("{:?}", e)))?;

let status = js_sys::Reflect::get(&result, &JsValue::from_str("status"))
.map_err(|_| HttpError::Other(anyhow::anyhow!("missing status in response")))?
let result = JsFuture::from(http_request(
&host,
port,
is_https,
head.as_ptr(),
head.len() as u32,
body.as_ptr(),
body.len() as u32,
WASM_MEMORY.as_ref(),
))
.await
.map_err(|e| HttpError::Network(anyhow::anyhow!("{:?}", e)))?;

let result: js_sys::ArrayTuple<(Number, Array<JsString>, Uint8Array)> =
js_sys::ArrayTuple::unchecked_from_js(result);

let status = result
.get0()
.as_f64()
.ok_or_else(|| HttpError::Other(anyhow::anyhow!("status is not a number")))?
as u16;

let headers = parse_response_headers(&result)?;

let body_js = js_sys::Reflect::get(&result, &JsValue::from_str("body"))
.map_err(|_| HttpError::Other(anyhow::anyhow!("missing body in response")))?;
let headers = parse_response_headers(result.get1())?;

let body = if body_js.is_undefined() || body_js.is_null() {
Bytes::new()
} else {
Bytes::from(js_sys::Uint8Array::new(&body_js).to_vec())
};
let body = Bytes::from(result.get2().to_vec());

let mut builder = http::Response::builder().status(status);
for (name, value) in &headers {
builder = builder.header(name.as_str(), value.as_str());
}
builder
.body(body)
.map_err(|e| HttpError::Other(e.into()))
*builder.headers_mut().unwrap() = headers;
builder.body(body).map_err(|e| HttpError::Other(e.into()))
}
}

}

/// Parse response headers from a JS object `{ "header-name": "value", ... }`.
///
/// Node.js `res.headers` returns lowercased header names with string values.
fn parse_response_headers(result: &JsValue) -> Result<Vec<(String, String)>, HttpError> {
let headers_js = js_sys::Reflect::get(result, &JsValue::from_str("headers"))
.map_err(|_| HttpError::Other(anyhow::anyhow!("missing headers in response")))?;

if headers_js.is_undefined() || headers_js.is_null() {
return Ok(Vec::new());
}

let entries = js_sys::Object::entries(&js_sys::Object::unchecked_from_js(headers_js));
let mut headers = Vec::with_capacity(entries.length() as usize);
for i in 0..entries.length() {
let entry = js_sys::Array::from(&entries.get(i));
if let (Some(key), Some(value)) = (entry.get(0).as_string(), entry.get(1).as_string()) {
headers.push((key, value));
fn parse_response_headers(header_js: Array<JsString>) -> Result<HeaderMap, HttpError> {
let len = header_js.length() as usize;
let mut headers = HeaderMap::with_capacity(len / 2);
for i in 0..(len / 2) {
let key = header_js.get((i * 2) as u32).as_string();
let val = header_js.get((i * 2 + 1) as u32).as_string();
if let (Some(key), Some(val)) = (key, val) {
headers.insert(
HeaderName::from_bytes(key.as_bytes()).unwrap(),
HeaderValue::from_maybe_shared(Bytes::from(val)).unwrap(),
Comment on lines +118 to +119
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Replace header parsing unwraps with fallible handling

Response header parsing now calls unwrap() on both HeaderName::from_bytes(...) and HeaderValue::from_maybe_shared(...); any non-conformant upstream header (invalid token/value bytes) will panic this request path instead of returning an HttpError. This is a regression from the previous non-panicking behavior and can crash or trap the WASM execution when talking to unexpected or malformed HTTP servers.

Useful? React with 👍 / 👎.

);
}
}
Ok(headers)
}

fn serialize_headers(headers: &http::HeaderMap) -> Result<String, HttpError> {
let mut map: HashMap<&str, Vec<&str>> = HashMap::new();
/// Serialize the full HTTP/1.1 request head (request line + Host + Content-Length
/// + user headers + terminating CRLF) into a contiguous byte buffer.
///
/// The buffer is handed to JS by pointer; JS assigns it to
/// `req._header`, bypassing Node's `_storeHeader` serialization.
fn serialize_request_head(
req: &http::Request<Bytes>,
host: &str,
port: u16,
is_https: bool,
) -> Result<Vec<u8>, HttpError> {
let method = req.method().as_str();
let path_and_query = req
.uri()
.path_and_query()
.map(|pq| pq.as_str())
.unwrap_or("/");
let body_len = req.body().len();
let headers = req.headers();

let mut buf = Vec::with_capacity(256 + headers.len() * 64);

buf.extend_from_slice(method.as_bytes());
buf.push(b' ');
buf.extend_from_slice(path_and_query.as_bytes());
buf.extend_from_slice(b" HTTP/1.1\r\n");

buf.extend_from_slice(b"Host: ");
buf.extend_from_slice(host.as_bytes());
let default_port = if is_https { 443 } else { 80 };
if port != default_port {
write!(&mut buf, ":{port}").map_err(|e| HttpError::Other(e.into()))?;
}
buf.extend_from_slice(b"\r\n");

write!(&mut buf, "Content-Length: {body_len}\r\n").map_err(|e| HttpError::Other(e.into()))?;

for (name, value) in headers.iter() {
map.entry(name.as_str())
.or_default()
.push(value.to_str().unwrap_or(""));
buf.extend_from_slice(name.as_str().as_bytes());
buf.extend_from_slice(b": ");
buf.extend_from_slice(value.as_bytes());
buf.extend_from_slice(b"\r\n");
}
let flat: HashMap<&str, String> = map
.into_iter()
.map(|(k, v)| (k, v.join(", ")))
.collect();
serde_json::to_string(&flat)
.map_err(|e| HttpError::InvalidRequest(e.into()))

buf.extend_from_slice(b"\r\n");

Ok(buf)
}
88 changes: 60 additions & 28 deletions crates/capabilities/src/http_transport.js
Original file line number Diff line number Diff line change
@@ -1,35 +1,67 @@
const http = require('http');
const https = require('https');

module.exports.httpRequest = function (method, url, headersJson, body) {
const headers = JSON.parse(headersJson || '{}');
headers['Content-Length'] = body.length;
const parsed = new URL(url);
const transport = parsed.protocol === 'https:' ? https : http;

return new Promise((resolve, reject) => {
const req = transport.request(
{
hostname: parsed.hostname,
port: parsed.port,
path: parsed.pathname + parsed.search,
method,
headers,
},
(res) => {
const chunks = [];
res.on('data', (chunk) => chunks.push(chunk));
res.on('end', () => {
resolve({
status: res.statusCode,
headers: res.headers,
body: new Uint8Array(Buffer.concat(chunks)),
let storage = (f) => f();

module.exports.setStorage = function (new_storage) {
storage = new_storage;
}

module.exports.httpRequest = function (host, port, isHttps, head_ptr, head_len, body_ptr, body_len, wasm_memory) {
const transport = isHttps ? https : http;

function isDetachedBufferError(err) {
return err instanceof TypeError && /detached/i.test(err.message);
}

function attempt() {
return new Promise((resolve, reject) => {
storage(() => {
// wasm_memory.buffer is replaced each time WebAssembly.Memory grows, so
// the views must be recreated on every attempt against the current buffer.
const headView = new Uint8Array(wasm_memory.buffer, head_ptr, head_len);
const bodyView = new Uint8Array(wasm_memory.buffer, body_ptr, body_len);

// host/port drive socket selection; method/path/headers are placeholders
// because we replace the rendered head below.
const req = transport.request({ host, port, method: 'POST', path: '/' }, (res) => {
const chunks = [];
res.on('data', (chunk) => chunks.push(chunk));
res.on('end', () => {
const body = Buffer.concat(chunks)
resolve([
res.statusCode,
res.rawHeaders,
new Uint8Array(body.buffer),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve Buffer bounds when returning response body

Buffer.concat(chunks) can return a pooled Buffer whose byteOffset/byteLength cover only part of its underlying ArrayBuffer; constructing new Uint8Array(body.buffer) exposes the entire backing store instead of just the response bytes. In those cases the Rust side receives extra stale bytes (and potentially unrelated memory), corrupting HTTP response payloads for small responses. Return a view that respects the buffer slice (e.g., with offset/length) before passing it across WASM.

Useful? React with 👍 / 👎.

]);
});
});
req.on('error', reject);

// Bypass Node's headers: the Rust side has already produced the full
// request head in HTTP/1.1 wire format. Setting _header before write()
// makes write/end skip _implicitHeader and _send prepends our bytes.

try {
req._header = Buffer.from(headView);
req.write(bodyView);
req.end();
} catch (err) {
reject(err);
}
})
});
}

function attemptWithRetry() {
return attempt().catch((err) => {
process.stderr.write("httpRequest error: " + err + "\n")
if (isDetachedBufferError(err)) {
return attemptWithRetry();
}
);
req.on('error', reject);
req.write(body);
req.end();
});
throw err;
});
}

return attemptWithRetry();
};
8 changes: 7 additions & 1 deletion crates/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ impl WasmSpanState {
.set_tracer_version(tracer_version)
.set_language(lang)
.set_language_version(lang_version)
.set_language_interpreter(lang_interpreter);
.set_language_interpreter(lang_interpreter)
.enable_agent_rates_payload_version();

let mut change_queue = vec![0u8; change_queue_size as usize];
let change_buffer =
Expand Down Expand Up @@ -475,3 +476,8 @@ pub fn get_op_codes() -> JsValue {
}
obj.into()
}

#[wasm_bindgen(js_name = "setStorage")]
pub fn set_storage(new_storage: &JsValue) {
libdatadog_nodejs_capabilities::http::set_storage(new_storage);
}
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"build-debug": "mkdir -p target && yarn -s cargo-build > ./target/out.ndjson && yarn -s copy-artifacts",
"build-release": "mkdir -p target && yarn -s cargo-build-release > ./target/out.ndjson && yarn -s copy-artifacts",
"build-all": "mkdir -p target && yarn -s cargo-build -- --workspace > ./target/out.ndjson && yarn -s copy-artifacts && yarn -s build-wasm",
"build-wasm": "yarn -s install-wasm-pack && node scripts/build-wasm.js library_config && node scripts/build-wasm.js datadog-js-zstd && node scripts/build-wasm.js trace_exporter && node scripts/build-wasm.js pipeline",
"build-wasm": "node scripts/build-wasm.js library_config && node scripts/build-wasm.js datadog-js-zstd && node scripts/build-wasm.js trace_exporter && node scripts/build-wasm.js pipeline",
"cargo-build-release": "yarn -s cargo-build -- --release",
"cargo-build": "cargo build --message-format=json-render-diagnostics",
"copy-artifacts": "node ./scripts/copy-artifacts",
Expand All @@ -27,5 +27,8 @@
"homepage": "https://github.com/DataDog/libdatadog-nodejs#readme",
"publishConfig": {
"access": "public"
},
"dependencies": {
"@napi-rs/cli": "^3.6.0"
}
}
Loading