From a8ae4d8050485250565a5c4d7b45d3d5ffffc500 Mon Sep 17 00:00:00 2001 From: Acelogic Date: Wed, 11 Mar 2026 16:32:32 -0400 Subject: [PATCH 1/3] fix: stream multipart uploads to avoid OOM on large files (#244) Replace the buffered file read + body copy with a streaming multipart/related body that reads the file in 64 KB chunks. Previously, uploading a file required ~4x the file size in RAM (tokio::fs::read allocates a Vec, then build_multipart_body copies it into a second growing Vec). A 5 GB upload would request ~20 GB of contiguous memory, crashing the process. The new build_multipart_stream function yields preamble, file chunks, and postamble through a futures_util::stream::unfold state machine, keeping memory usage constant regardless of file size. Content-Length is computed from file metadata so Google APIs still receive the correct header. Closes #244 --- .changeset/stream-multipart-uploads.md | 5 + src/executor.rs | 166 +++++++++++++++++++++++-- 2 files changed, 162 insertions(+), 9 deletions(-) create mode 100644 .changeset/stream-multipart-uploads.md diff --git a/.changeset/stream-multipart-uploads.md b/.changeset/stream-multipart-uploads.md new file mode 100644 index 00000000..868f7ace --- /dev/null +++ b/.changeset/stream-multipart-uploads.md @@ -0,0 +1,5 @@ +--- +"@googleworkspace/cli": patch +--- + +Stream file uploads instead of buffering entire file in memory, fixing OOM crashes on large files diff --git a/src/executor.rs b/src/executor.rs index 49101ece..2b6211b6 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -24,7 +24,7 @@ use std::path::PathBuf; use anyhow::Context; use futures_util::StreamExt; use serde_json::{json, Map, Value}; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::discovery::{RestDescription, RestMethod}; use crate::error::GwsError; @@ -182,17 +182,22 @@ async fn build_http_request( if input.is_upload { let upload_path = upload_path.expect("upload_path must be Some when is_upload is true"); - let file_bytes = tokio::fs::read(upload_path).await.map_err(|e| { - GwsError::Validation(format!( - "Failed to read upload file '{}': {}", - upload_path, e - )) - })?; + let file_size = tokio::fs::metadata(upload_path) + .await + .map_err(|e| { + GwsError::Validation(format!( + "Failed to read upload file '{}': {}", + upload_path, e + )) + })? + .len(); request = request.query(&[("uploadType", "multipart")]); - let (multipart_body, content_type) = build_multipart_body(&input.body, &file_bytes)?; + let (body, content_type, content_length) = + build_multipart_stream(&input.body, upload_path, file_size); request = request.header("Content-Type", content_type); - request = request.body(multipart_body); + request = request.header("Content-Length", content_length); + request = request.body(body); } else if let Some(ref body_val) = input.body { request = request.header("Content-Type", "application/json"); request = request.json(body_val); @@ -731,6 +736,7 @@ fn handle_error_response( /// Builds a multipart/related body for media upload requests. /// /// Returns the body bytes and the Content-Type header value (with boundary). +#[cfg(test)] fn build_multipart_body( metadata: &Option, file_bytes: &[u8], @@ -768,6 +774,97 @@ fn build_multipart_body( Ok((body, content_type)) } +/// Build a streaming multipart/related body for file uploads. +/// +/// Instead of reading the entire file into memory, this streams the file +/// contents from disk in 64 KB chunks, keeping memory usage constant +/// regardless of file size. Returns `(body, content_type, content_length)`. +fn build_multipart_stream( + metadata: &Option, + file_path: &str, + file_size: u64, +) -> (reqwest::Body, String, u64) { + let boundary = format!("gws_boundary_{:016x}", rand::random::()); + + let media_mime = metadata + .as_ref() + .and_then(|m| m.get("mimeType")) + .and_then(|v| v.as_str()) + .unwrap_or("application/octet-stream") + .to_string(); + + let metadata_json = metadata + .as_ref() + .map(|m| serde_json::to_string(m).unwrap_or_else(|_| "{}".to_string())) + .unwrap_or_else(|| "{}".to_string()); + + let preamble = format!( + "--{boundary}\r\n\ + Content-Type: application/json; charset=UTF-8\r\n\r\n\ + {metadata_json}\r\n\ + --{boundary}\r\n\ + Content-Type: {media_mime}\r\n\r\n" + ); + let postamble = format!("\r\n--{boundary}--\r\n"); + + let content_length = preamble.len() as u64 + file_size + postamble.len() as u64; + let content_type = format!("multipart/related; boundary={boundary}"); + + // State machine for the streaming body: preamble -> file chunks -> postamble + enum State { + Preamble { + preamble: Vec, + file_path: String, + postamble: Vec, + }, + Streaming { + file: tokio::fs::File, + postamble: Vec, + }, + Done, + } + + let initial = State::Preamble { + preamble: preamble.into_bytes(), + file_path: file_path.to_owned(), + postamble: postamble.into_bytes(), + }; + + let stream = futures_util::stream::unfold(initial, |state| async move { + match state { + State::Preamble { + preamble, + file_path, + postamble, + } => match tokio::fs::File::open(&file_path).await { + Ok(file) => Some((Ok(preamble), State::Streaming { file, postamble })), + Err(e) => Some((Err(e), State::Done)), + }, + State::Streaming { + mut file, + postamble, + } => { + let mut buf = vec![0u8; 64 * 1024]; + match file.read(&mut buf).await { + Ok(0) => Some((Ok(postamble), State::Done)), + Ok(n) => { + buf.truncate(n); + Some((Ok(buf), State::Streaming { file, postamble })) + } + Err(e) => Some((Err(e), State::Done)), + } + } + State::Done => None, + } + }); + + ( + reqwest::Body::wrap_stream(stream), + content_type, + content_length, + ) +} + /// Validates a JSON body against a Discovery Document schema. fn validate_body_against_schema( body: &Value, @@ -1218,6 +1315,57 @@ mod tests { assert!(body_str.contains("Binary data")); } + #[tokio::test] + async fn test_build_multipart_stream_content_length() { + use std::io::Write; + let metadata = Some(json!({ "name": "test.txt", "mimeType": "text/plain" })); + let content = b"Hello streaming world"; + + let mut tmp = tempfile::NamedTempFile::new().unwrap(); + tmp.write_all(content).unwrap(); + let path = tmp.path().to_str().unwrap().to_string(); + + let (_, content_type, content_length) = + build_multipart_stream(&metadata, &path, content.len() as u64); + + assert!(content_type.starts_with("multipart/related; boundary=")); + let boundary = content_type.split("boundary=").nth(1).unwrap(); + assert!(boundary.starts_with("gws_boundary_")); + + // Verify content_length matches the expected structure: + // preamble + file_size + postamble + let metadata_json = serde_json::to_string(metadata.as_ref().unwrap()).unwrap(); + let preamble_len = format!( + "--{boundary}\r\nContent-Type: application/json; charset=UTF-8\r\n\r\n{metadata_json}\r\n--{boundary}\r\nContent-Type: text/plain\r\n\r\n" + ).len() as u64; + let postamble_len = format!("\r\n--{boundary}--\r\n").len() as u64; + assert_eq!(content_length, preamble_len + content.len() as u64 + postamble_len); + } + + #[tokio::test] + async fn test_build_multipart_stream_large_file() { + use std::io::Write; + let metadata = Some(json!({ "name": "big.bin", "mimeType": "application/octet-stream" })); + let content = vec![0xABu8; 256 * 1024]; // 256KB — larger than the 64KB chunk size + + let mut tmp = tempfile::NamedTempFile::new().unwrap(); + tmp.write_all(&content).unwrap(); + let path = tmp.path().to_str().unwrap().to_string(); + + let (_, _, content_length) = + build_multipart_stream(&metadata, &path, content.len() as u64); + + // Verify the declared content_length is consistent + let metadata_json = serde_json::to_string(metadata.as_ref().unwrap()).unwrap(); + let boundary_example = "gws_boundary_0000000000000000"; + // Structural overhead is: preamble + postamble (boundary length is fixed at 29 chars) + let expected_overhead = format!( + "--{boundary_example}\r\nContent-Type: application/json; charset=UTF-8\r\n\r\n{metadata_json}\r\n--{boundary_example}\r\nContent-Type: application/octet-stream\r\n\r\n" + ).len() as u64 + + format!("\r\n--{boundary_example}--\r\n").len() as u64; + assert_eq!(content_length, expected_overhead + content.len() as u64); + } + #[test] fn test_build_url_basic() { let doc = RestDescription { From 5941eef42585619e74e5fc1a19a148b2024591f4 Mon Sep 17 00:00:00 2001 From: Acelogic Date: Wed, 11 Mar 2026 16:38:42 -0400 Subject: [PATCH 2/3] refactor: use ReaderStream instead of manual unfold state machine Address review feedback: replace the hand-rolled futures_util::stream::unfold state machine with tokio_util::io::ReaderStream chained between preamble and postamble streams. This is more idiomatic, avoids per-chunk buffer re-allocation, and is easier to read. --- Cargo.lock | 1 + Cargo.toml | 1 + src/executor.rs | 67 +++++++++++++------------------------------------ 3 files changed, 20 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 68931d42..35743d37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -872,6 +872,7 @@ dependencies = [ "tempfile", "thiserror 2.0.18", "tokio", + "tokio-util", "yup-oauth2", "zeroize", ] diff --git a/Cargo.toml b/Cargo.toml index 44bf0235..8791d9cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ serde_json = "1" sha2 = "0.10" thiserror = "2" tokio = { version = "1", features = ["full"] } +tokio-util = { version = "0.7", features = ["io"] } yup-oauth2 = "12" futures-util = "0.3" base64 = "0.22.1" diff --git a/src/executor.rs b/src/executor.rs index 2b6211b6..0e95d466 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -22,9 +22,9 @@ use std::collections::{HashMap, HashSet}; use std::path::PathBuf; use anyhow::Context; -use futures_util::StreamExt; +use futures_util::{StreamExt, TryStreamExt}; use serde_json::{json, Map, Value}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; use crate::discovery::{RestDescription, RestMethod}; use crate::error::GwsError; @@ -810,53 +810,22 @@ fn build_multipart_stream( let content_length = preamble.len() as u64 + file_size + postamble.len() as u64; let content_type = format!("multipart/related; boundary={boundary}"); - // State machine for the streaming body: preamble -> file chunks -> postamble - enum State { - Preamble { - preamble: Vec, - file_path: String, - postamble: Vec, - }, - Streaming { - file: tokio::fs::File, - postamble: Vec, - }, - Done, - } - - let initial = State::Preamble { - preamble: preamble.into_bytes(), - file_path: file_path.to_owned(), - postamble: postamble.into_bytes(), - }; - - let stream = futures_util::stream::unfold(initial, |state| async move { - match state { - State::Preamble { - preamble, - file_path, - postamble, - } => match tokio::fs::File::open(&file_path).await { - Ok(file) => Some((Ok(preamble), State::Streaming { file, postamble })), - Err(e) => Some((Err(e), State::Done)), - }, - State::Streaming { - mut file, - postamble, - } => { - let mut buf = vec![0u8; 64 * 1024]; - match file.read(&mut buf).await { - Ok(0) => Some((Ok(postamble), State::Done)), - Ok(n) => { - buf.truncate(n); - Some((Ok(buf), State::Streaming { file, postamble })) - } - Err(e) => Some((Err(e), State::Done)), - } - } - State::Done => None, - } - }); + // Chain: preamble bytes -> file chunks (via ReaderStream) -> postamble bytes + let file_path = file_path.to_owned(); + let preamble_bytes = preamble.into_bytes(); + let postamble_bytes = postamble.into_bytes(); + + let file_stream = + futures_util::stream::once(async move { tokio::fs::File::open(file_path).await }) + .map_ok(|f| tokio_util::io::ReaderStream::new(f).map_ok(|b| b.to_vec())) + .try_flatten(); + + let stream = + futures_util::stream::once(async { Ok::<_, std::io::Error>(preamble_bytes) }) + .chain(file_stream) + .chain(futures_util::stream::once(async { + Ok::<_, std::io::Error>(postamble_bytes) + })); ( reqwest::Body::wrap_stream(stream), From 813dd280ff13cfffef75c785495e0b4d65f7d4f9 Mon Sep 17 00:00:00 2001 From: Acelogic Date: Wed, 11 Mar 2026 16:52:38 -0400 Subject: [PATCH 3/3] refactor: use bytes::Bytes for zero-copy streaming Remove the per-chunk .to_vec() copy by using bytes::Bytes throughout the stream chain. ReaderStream already yields Bytes, so the preamble and postamble just need to match the type. --- Cargo.lock | 1 + Cargo.toml | 1 + src/executor.rs | 7 ++++--- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 35743d37..29d54997 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -851,6 +851,7 @@ dependencies = [ "anyhow", "async-trait", "base64", + "bytes", "chrono", "clap", "crossterm", diff --git a/Cargo.toml b/Cargo.toml index 8791d9cb..ebd547f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ async-trait = "0.1.89" serde_yaml = "0.9.34" percent-encoding = "2.3.2" zeroize = { version = "1.8.2", features = ["derive"] } +bytes = "1.11.1" # The profile that 'cargo dist' will build with diff --git a/src/executor.rs b/src/executor.rs index 0e95d466..78982d59 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -811,13 +811,14 @@ fn build_multipart_stream( let content_type = format!("multipart/related; boundary={boundary}"); // Chain: preamble bytes -> file chunks (via ReaderStream) -> postamble bytes + // All parts use bytes::Bytes for zero-copy streaming. let file_path = file_path.to_owned(); - let preamble_bytes = preamble.into_bytes(); - let postamble_bytes = postamble.into_bytes(); + let preamble_bytes = bytes::Bytes::from(preamble.into_bytes()); + let postamble_bytes = bytes::Bytes::from(postamble.into_bytes()); let file_stream = futures_util::stream::once(async move { tokio::fs::File::open(file_path).await }) - .map_ok(|f| tokio_util::io::ReaderStream::new(f).map_ok(|b| b.to_vec())) + .map_ok(tokio_util::io::ReaderStream::new) .try_flatten(); let stream =