Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stream-multipart-uploads.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@googleworkspace/cli": patch
---

Stream file uploads instead of buffering entire file in memory, fixing OOM crashes on large files
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ serde_json = "1"
sha2 = "0.10"
thiserror = "2"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["io"] }
yup-oauth2 = "12"
futures-util = "0.3"
base64 = "0.22.1"
Expand All @@ -57,6 +58,7 @@ async-trait = "0.1.89"
serde_yaml = "0.9.34"
percent-encoding = "2.3.2"
zeroize = { version = "1.8.2", features = ["derive"] }
bytes = "1.11.1"


# The profile that 'cargo dist' will build with
Expand Down
136 changes: 127 additions & 9 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::collections::{HashMap, HashSet};
use std::path::PathBuf;

use anyhow::Context;
use futures_util::StreamExt;
use futures_util::{StreamExt, TryStreamExt};
use serde_json::{json, Map, Value};
use tokio::io::AsyncWriteExt;

Expand Down Expand Up @@ -182,17 +182,22 @@ async fn build_http_request(
if input.is_upload {
let upload_path = upload_path.expect("upload_path must be Some when is_upload is true");

let file_bytes = tokio::fs::read(upload_path).await.map_err(|e| {
GwsError::Validation(format!(
"Failed to read upload file '{}': {}",
upload_path, e
))
})?;
let file_size = tokio::fs::metadata(upload_path)
.await
.map_err(|e| {
GwsError::Validation(format!(
"Failed to read upload file '{}': {}",
upload_path, e
))
})?
.len();

request = request.query(&[("uploadType", "multipart")]);
let (multipart_body, content_type) = build_multipart_body(&input.body, &file_bytes)?;
let (body, content_type, content_length) =
build_multipart_stream(&input.body, upload_path, file_size);
request = request.header("Content-Type", content_type);
request = request.body(multipart_body);
request = request.header("Content-Length", content_length);
request = request.body(body);
} else if let Some(ref body_val) = input.body {
request = request.header("Content-Type", "application/json");
request = request.json(body_val);
Expand Down Expand Up @@ -731,6 +736,7 @@ fn handle_error_response<T>(
/// Builds a multipart/related body for media upload requests.
///
/// Returns the body bytes and the Content-Type header value (with boundary).
#[cfg(test)]
fn build_multipart_body(
metadata: &Option<Value>,
file_bytes: &[u8],
Expand Down Expand Up @@ -768,6 +774,67 @@ fn build_multipart_body(
Ok((body, content_type))
}

/// Build a streaming multipart/related body for file uploads.
///
/// Instead of reading the entire file into memory, this streams the file
/// contents from disk in 64 KiB chunks, keeping memory usage constant
/// regardless of file size. Returns `(body, content_type, content_length)`.
fn build_multipart_stream(
    metadata: &Option<Value>,
    file_path: &str,
    file_size: u64,
) -> (reqwest::Body, String, u64) {
    // Random boundary, vanishingly unlikely to collide with file contents.
    let boundary = format!("gws_boundary_{:016x}", rand::random::<u64>());

    // Media part Content-Type comes from the metadata when present.
    let media_mime = metadata
        .as_ref()
        .and_then(|m| m.get("mimeType"))
        .and_then(|v| v.as_str())
        .unwrap_or("application/octet-stream")
        .to_string();

    // Serializing a serde_json::Value cannot realistically fail (its map keys
    // are always strings); fall back to an empty object rather than aborting.
    let metadata_json = metadata
        .as_ref()
        .map(|m| serde_json::to_string(m).unwrap_or_else(|_| "{}".to_string()))
        .unwrap_or_else(|| "{}".to_string());

    let preamble = format!(
        "--{boundary}\r\n\
         Content-Type: application/json; charset=UTF-8\r\n\r\n\
         {metadata_json}\r\n\
         --{boundary}\r\n\
         Content-Type: {media_mime}\r\n\r\n"
    );
    let postamble = format!("\r\n--{boundary}--\r\n");

    // An exact Content-Length lets us avoid chunked transfer encoding even
    // though the body itself is streamed.
    let content_length = preamble.len() as u64 + file_size + postamble.len() as u64;
    let content_type = format!("multipart/related; boundary={boundary}");

    // Chain: preamble bytes -> file chunks (via ReaderStream) -> postamble bytes.
    // All parts use bytes::Bytes for zero-copy streaming.
    let file_path = file_path.to_owned();
    let preamble_bytes = bytes::Bytes::from(preamble.into_bytes());
    let postamble_bytes = bytes::Bytes::from(postamble.into_bytes());

    // The file is opened lazily when the body is first polled. Read in 64 KiB
    // chunks: ReaderStream::new defaults to a 4 KiB buffer, which would mean
    // ~16x more read syscalls for large files.
    let file_stream =
        futures_util::stream::once(async move { tokio::fs::File::open(file_path).await })
            .map_ok(|f| tokio_util::io::ReaderStream::with_capacity(f, 64 * 1024))
            .try_flatten();

    let stream =
        futures_util::stream::once(async { Ok::<_, std::io::Error>(preamble_bytes) })
            .chain(file_stream)
            .chain(futures_util::stream::once(async {
                Ok::<_, std::io::Error>(postamble_bytes)
            }));

    (
        reqwest::Body::wrap_stream(stream),
        content_type,
        content_length,
    )
}
Comment on lines +782 to +836
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The use of unwrap_or_else on lines 798 and 799 can hide a JSON serialization error, causing the upload to proceed with empty metadata ({}). This could lead to silent failures or hard-to-debug server-side errors.

To make this more robust, I recommend changing build_multipart_stream to return a Result and propagating any serialization errors. This aligns with the error handling pattern used elsewhere in the file.

Here's how you could refactor the function:

fn build_multipart_stream(
    metadata: &Option<Value>,
    file_path: &str,
    file_size: u64,
) -> Result<(reqwest::Body, String, u64), GwsError> {
    let boundary = format!("gws_boundary_{:016x}", rand::random::<u64>());

    let media_mime = metadata
        .as_ref()
        .and_then(|m| m.get("mimeType"))
        .and_then(|v| v.as_str())
        .unwrap_or("application/octet-stream")
        .to_string();

    let metadata_json = match metadata {
        Some(m) => serde_json::to_string(m).map_err(|e| {
            GwsError::Validation(format!("Failed to serialize upload metadata: {e}"))
        })?,
        None => "{}".to_string(),
    };

    let preamble = format!(
        "--{boundary}\r\n\
         Content-Type: application/json; charset=UTF-8\r\n\r\n\
         {metadata_json}\r\n\
         --{boundary}\r\n\
         Content-Type: {media_mime}\r\n\r\n"
    );
    let postamble = format!("\r\n--{boundary}--\r\n");

    let content_length = preamble.len() as u64 + file_size + postamble.len() as u64;
    let content_type = format!("multipart/related; boundary={boundary}");

    // ... (rest of the function is fine)

    let file_path = file_path.to_owned();
    let preamble_bytes = bytes::Bytes::from(preamble.into_bytes());
    let postamble_bytes = bytes::Bytes::from(postamble.into_bytes());

    let file_stream =
        futures_util::stream::once(async move { tokio::fs::File::open(file_path).await })
            .map_ok(tokio_util::io::ReaderStream::new)
            .try_flatten();

    let stream =
        futures_util::stream::once(async { Ok::<_, std::io::Error>(preamble_bytes) })
            .chain(file_stream)
            .chain(futures_util::stream::once(async {
                Ok::<_, std::io::Error>(postamble_bytes)
            }));

    Ok((
        reqwest::Body::wrap_stream(stream),
        content_type,
        content_length,
    ))
}

You would also need to update the call site in build_http_request to use the ? operator:

// line 196
let (body, content_type, content_length) =
    build_multipart_stream(&input.body, upload_path, file_size)?;


/// Validates a JSON body against a Discovery Document schema.
fn validate_body_against_schema(
body: &Value,
Expand Down Expand Up @@ -1218,6 +1285,57 @@ mod tests {
assert!(body_str.contains("Binary data"));
}

#[tokio::test]
async fn test_build_multipart_stream_content_length() {
    use std::io::Write;

    let meta = Some(json!({ "name": "test.txt", "mimeType": "text/plain" }));
    let payload = b"Hello streaming world";

    // Write the payload to a real temp file so the stream has something to read.
    let mut tmp_file = tempfile::NamedTempFile::new().unwrap();
    tmp_file.write_all(payload).unwrap();
    let tmp_path = tmp_file.path().to_str().unwrap().to_string();

    let (_body, content_type, content_length) =
        build_multipart_stream(&meta, &tmp_path, payload.len() as u64);

    // The Content-Type must advertise the multipart boundary.
    assert!(content_type.starts_with("multipart/related; boundary="));
    let boundary = content_type.split("boundary=").nth(1).unwrap();
    assert!(boundary.starts_with("gws_boundary_"));

    // Recompute the expected length: preamble + file bytes + postamble.
    let meta_json = serde_json::to_string(meta.as_ref().unwrap()).unwrap();
    let preamble = format!(
        "--{boundary}\r\nContent-Type: application/json; charset=UTF-8\r\n\r\n{meta_json}\r\n--{boundary}\r\nContent-Type: text/plain\r\n\r\n"
    );
    let postamble = format!("\r\n--{boundary}--\r\n");
    let expected = preamble.len() as u64 + payload.len() as u64 + postamble.len() as u64;
    assert_eq!(content_length, expected);
}

#[tokio::test]
async fn test_build_multipart_stream_large_file() {
    use std::io::Write;

    let meta = Some(json!({ "name": "big.bin", "mimeType": "application/octet-stream" }));
    // 256 KiB of data — several times larger than a single read chunk.
    let payload = vec![0xABu8; 256 * 1024];

    let mut tmp_file = tempfile::NamedTempFile::new().unwrap();
    tmp_file.write_all(&payload).unwrap();
    let tmp_path = tmp_file.path().to_str().unwrap().to_string();

    let (_body, _content_type, content_length) =
        build_multipart_stream(&meta, &tmp_path, payload.len() as u64);

    // The boundary is "gws_boundary_" plus exactly 16 hex digits, so its
    // length is fixed; any placeholder of the same length yields the same
    // structural overhead (preamble + postamble).
    let placeholder = "gws_boundary_0000000000000000";
    let meta_json = serde_json::to_string(meta.as_ref().unwrap()).unwrap();
    let overhead = format!(
        "--{placeholder}\r\nContent-Type: application/json; charset=UTF-8\r\n\r\n{meta_json}\r\n--{placeholder}\r\nContent-Type: application/octet-stream\r\n\r\n"
    )
    .len() as u64
        + format!("\r\n--{placeholder}--\r\n").len() as u64;

    // The declared content_length must be consistent with the actual payload.
    assert_eq!(content_length, overhead + payload.len() as u64);
}

#[test]
fn test_build_url_basic() {
let doc = RestDescription {
Expand Down
Loading