From a1617cf4a08c315bee69f30406d1c83eccae70d7 Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Mon, 8 May 2023 23:52:01 +0800 Subject: [PATCH 1/4] feat(services/gcs): writer abort Signed-off-by: suyanhanx --- core/src/services/gcs/core.rs | 14 ++++++++++++++ core/src/services/gcs/writer.rs | 20 ++++++++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs index 5ac9dc207681..7ebe25dbe311 100644 --- a/core/src/services/gcs/core.rs +++ b/core/src/services/gcs/core.rs @@ -538,4 +538,18 @@ impl GcsCore { self.send(req).await } + + pub async fn gcs_abort_resumable_upload( + &self, + location: &str, + ) -> Result> { + let mut req = Request::delete(location) + .header(CONTENT_LENGTH, 0) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } } diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index e853de83b2b5..36e851f68ff3 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -163,9 +163,25 @@ impl oio::Write for GcsWriter { } } - // TODO: we can cancel the upload by sending a DELETE request to the location async fn abort(&mut self) -> Result<()> { - Ok(()) + let location = if let Some(location) = &self.location { + location + } else { + return Ok(()); + }; + + let resp = self.core.gcs_abort_resumable_upload(location).await?; + + match resp.status() { + // gcs returns 204 if the upload aborted successfully + StatusCode::NO_CONTENT => { + resp.into_body().consume().await?; + self.location = None; + self.buffer.clear(); + Ok(()) + } + _ => Err(parse_error(resp).await?), + } } async fn close(&mut self) -> Result<()> { From 63a95ffe31bfcdb47aa591a29fff16ef31315579 Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Tue, 9 May 2023 00:38:24 +0800 Subject: [PATCH 2/4] fix, success resp status code must be 499 Signed-off-by: suyanhanx --- core/src/services/gcs/writer.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 36e851f68ff3..e23f7b34c0ec 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -173,8 +173,9 @@ impl oio::Write for GcsWriter { let resp = self.core.gcs_abort_resumable_upload(location).await?; match resp.status() { - // gcs returns 204 if the upload aborted successfully - StatusCode::NO_CONTENT => { + // gcs returns 499 if the upload aborted successfully + // reference: https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload-json + n if n == StatusCode::from_u16(499).unwrap() => { resp.into_body().consume().await?; self.location = None; self.buffer.clear(); From 6888d338ce3ee800980ad9dd20cf65a4b31e679f Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Tue, 9 May 2023 00:50:31 +0800 Subject: [PATCH 3/4] remove unwrap Signed-off-by: suyanhanx --- core/src/services/gcs/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index e23f7b34c0ec..5d5d20192cd5 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -175,7 +175,7 @@ impl oio::Write for GcsWriter { match resp.status() { // gcs returns 499 if the upload aborted successfully // reference: https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload-json - n if n == StatusCode::from_u16(499).unwrap() => { + n if n.as_u16() == 499 => { resp.into_body().consume().await?; self.location = None; self.buffer.clear(); From 49bbc353b3d82b8f8841cd0cfce1985ce0e5ede4 Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Tue, 9 May 2023 00:58:00 +0800 Subject: [PATCH 4/4] simplify Signed-off-by: suyanhanx --- core/src/services/gcs/writer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 5d5d20192cd5..3bde86fad991 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -172,10 +172,10 @@ impl oio::Write for GcsWriter { let resp = self.core.gcs_abort_resumable_upload(location).await?; - match resp.status() { + match resp.status().as_u16() { // gcs returns 499 if the upload aborted successfully // reference: https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload-json - n if n.as_u16() == 499 => { + 499 => { resp.into_body().consume().await?; self.location = None; self.buffer.clear();