From eeda42e8d36f432feb4de8a14ae0592a987d7fd1 Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Sat, 8 Mar 2025 13:38:26 +0800 Subject: [PATCH 1/2] feat(decompression): support HTTP responses containing multiple ZSTD frames --- tower-http/src/decompression/body.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tower-http/src/decompression/body.rs b/tower-http/src/decompression/body.rs index 9378e5ca..a35036de 100644 --- a/tower-http/src/decompression/body.rs +++ b/tower-http/src/decompression/body.rs @@ -397,7 +397,9 @@ where type Output = ZstdDecoder; fn apply(input: Self::Input, _quality: CompressionLevel) -> Self::Output { - ZstdDecoder::new(input) + let decoder = ZstdDecoder::new(input); + decoder.multiple_members(true); + decoder } fn get_pin_mut(pinned: Pin<&mut Self::Output>) -> Pin<&mut Self::Input> { From 21828b25fb2e3a8a64840cee97e42f99cfc6ac17 Mon Sep 17 00:00:00 2001 From: gngpp Date: Sat, 8 Mar 2025 13:56:01 +0800 Subject: [PATCH 2/2] feat(decompression): support HTTP responses containing multiple ZSTD frames --- tower-http/src/decompression/body.rs | 2 +- tower-http/src/decompression/mod.rs | 34 ++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/tower-http/src/decompression/body.rs b/tower-http/src/decompression/body.rs index a35036de..a2970d65 100644 --- a/tower-http/src/decompression/body.rs +++ b/tower-http/src/decompression/body.rs @@ -397,7 +397,7 @@ where type Output = ZstdDecoder; fn apply(input: Self::Input, _quality: CompressionLevel) -> Self::Output { - let decoder = ZstdDecoder::new(input); + let mut decoder = ZstdDecoder::new(input); decoder.multiple_members(true); decoder } diff --git a/tower-http/src/decompression/mod.rs b/tower-http/src/decompression/mod.rs index 708df439..50d4d5fa 100644 --- a/tower-http/src/decompression/mod.rs +++ b/tower-http/src/decompression/mod.rs @@ -168,6 +168,24 @@ mod tests { assert_eq!(decompressed_data, "Hello, World!"); } + #[tokio::test] + async fn decompress_multi_zstd() { + let mut client = Decompression::new(service_fn(handle_multi_zstd)); + + let req = Request::builder() + .header("accept-encoding", "zstd") + .body(Body::empty()) + .unwrap(); + let res = client.ready().await.unwrap().call(req).await.unwrap(); + + // read the body, it will be decompressed automatically + let body = res.into_body(); + let decompressed_data = + String::from_utf8(body.collect().await.unwrap().to_bytes().to_vec()).unwrap(); + + assert_eq!(decompressed_data, "Hello, World!"); + } + async fn handle_multi_gz(_req: Request) -> Result, Infallible> { let mut buf = Vec::new(); let mut enc1 = GzEncoder::new(&mut buf, Default::default()); @@ -184,6 +202,22 @@ mod tests { Ok(res) } + async fn handle_multi_zstd(_req: Request) -> Result, Infallible> { + let mut buf = Vec::new(); + let mut enc1 = zstd::Encoder::new(&mut buf, Default::default()).unwrap(); + enc1.write_all(b"Hello, ").unwrap(); + enc1.finish().unwrap(); + + let mut enc2 = zstd::Encoder::new(&mut buf, Default::default()).unwrap(); + enc2.write_all(b"World!").unwrap(); + enc2.finish().unwrap(); + + let mut res = Response::new(Body::from(buf)); + res.headers_mut() + .insert("content-encoding", "zstd".parse().unwrap()); + Ok(res) + } + #[allow(dead_code)] async fn is_compatible_with_hyper() { let client =