From efade7a836b4b38c66e578f4116301c8cd743927 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 9 Jan 2024 21:12:28 +0800 Subject: [PATCH 1/6] Fix concurrent retry Signed-off-by: Xuanwo --- core/src/raw/futures_util.rs | 22 +++- core/src/raw/oio/write/block_write.rs | 26 +++-- .../raw/oio/write/multipart_upload_write.rs | 105 ++++++++++++------ core/src/raw/oio/write/range_write.rs | 19 ++-- core/src/types/list.rs | 5 +- 5 files changed, 117 insertions(+), 60 deletions(-) diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs index d7a1168b6340..390d263ccb4f 100644 --- a/core/src/raw/futures_util.rs +++ b/core/src/raw/futures_util.rs @@ -134,8 +134,8 @@ where self.remaining() > 0 } - /// Push new future into the queue. - pub fn push(&mut self, f: F) { + /// Push new future into the end of queue. + pub fn push_back(&mut self, f: F) { debug_assert!( self.has_remaining(), "concurrent futures must have remaining space" @@ -149,6 +149,22 @@ where Tasks::Large(v) => v.push_back(f), } } + + /// Push new future into the start of queue, this task will be exactly the next to poll. + pub fn push_front(&mut self, f: F) { + debug_assert!( + self.has_remaining(), + "concurrent futures must have remaining space" + ); + + match &mut self.tasks { + Tasks::Once(fut) => { + *fut = Some(f); + } + Tasks::Small(v) => v.push_front(TaskResult::Polling(f)), + Tasks::Large(v) => v.push_front(f), + } + } } impl futures::Stream for ConcurrentFutures @@ -230,7 +246,7 @@ mod tests { idx }; self.idx += 1; - self.tasks.push(Box::pin(fut)); + self.tasks.push_back(Box::pin(fut)); } if let Some(v) = ready!(self.tasks.poll_next_unpin(cx)) { diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs index 8665d50c00e8..fdcb9dc8c80b 100644 --- a/core/src/raw/oio/write/block_write.rs +++ b/core/src/raw/oio/write/block_write.rs @@ -164,10 +164,11 @@ where self.block_ids.push(block_id.clone()); let w = self.w.clone(); let size = cache.len(); - self.futures.push(WriteBlockFuture(Box::pin(async move { - w.write_block(size as u64, block_id, AsyncBody::ChunkedBytes(cache)) - .await - }))); + self.futures + .push_back(WriteBlockFuture(Box::pin(async move { + w.write_block(size as u64, block_id, AsyncBody::ChunkedBytes(cache)) + .await + }))); let size = self.fill_cache(bs); return Poll::Ready(Ok(size)); } else { @@ -222,14 +223,15 @@ where self.block_ids.push(block_id.clone()); let size = cache.len(); let w = self.w.clone(); - self.futures.push(WriteBlockFuture(Box::pin(async move { - w.write_block( - size as u64, - block_id, - AsyncBody::ChunkedBytes(cache), - ) - .await - }))); + self.futures + .push_back(WriteBlockFuture(Box::pin(async move { + w.write_block( + size as u64, + block_id, + AsyncBody::ChunkedBytes(cache), + ) + .await + }))); } } while ready!(self.futures.poll_next_unpin(cx)).is_some() {} diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 3a3d81f891ac..0b24cd2981f3 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -107,7 +107,12 @@ pub struct MultipartUploadPart { pub etag: String, } -struct WritePartFuture(BoxedFuture>); +/// WritePartResult is the result returned by [`WritePartFuture`]. +/// +/// The error part will carries inout `(part_number, bytes, err)` so caller can retry them. +type WritePartResult = std::result::Result; + +struct WritePartFuture(BoxedFuture); /// # Safety /// @@ -120,12 +125,34 @@ unsafe impl Send for WritePartFuture {} unsafe impl Sync for WritePartFuture {} impl Future for WritePartFuture { - type Output = Result; + type Output = WritePartResult; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.get_mut().0.poll_unpin(cx) } } +impl WritePartFuture { + pub fn new( + w: Arc, + upload_id: Arc, + part_number: usize, + bytes: oio::ChunkedBytes, + ) -> Self { + let fut = async move { + w.write_part( + &upload_id, + part_number, + bytes.len() as u64, + AsyncBody::ChunkedBytes(bytes.clone()), + ) + .await + .map_err(|err| (part_number, bytes, err)) + }; + + WritePartFuture(Box::pin(fut)) + } +} + /// MultipartUploadWriter will implements [`Write`] based on multipart /// uploads. pub struct MultipartUploadWriter { @@ -133,7 +160,6 @@ pub struct MultipartUploadWriter { w: Arc, upload_id: Option>, - concurrent: usize, parts: Vec, cache: Option, futures: ConcurrentFutures, @@ -164,7 +190,6 @@ impl MultipartUploadWriter { w: Arc::new(inner), upload_id: None, - concurrent, parts: Vec::new(), cache: None, futures: ConcurrentFutures::new(1.max(concurrent)), @@ -196,21 +221,30 @@ where let cache = self.cache.take().expect("pending write must exist"); let part_number = self.next_part_number; self.next_part_number += 1; - let w = self.w.clone(); - let size = cache.len(); - self.futures.push(WritePartFuture(Box::pin(async move { - w.write_part( - &upload_id, - part_number, - size as u64, - AsyncBody::ChunkedBytes(cache), - ) - .await - }))); + + self.futures.push_back(WritePartFuture::new( + self.w.clone(), + upload_id, + part_number, + cache, + )); let size = self.fill_cache(bs); return Poll::Ready(Ok(size)); } else if let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { - self.parts.push(part?); + match part { + Ok(part) => { + self.parts.push(part); + } + Err((part_number, bytes, err)) => { + self.futures.push_front(WritePartFuture::new( + self.w.clone(), + upload_id, + part_number, + bytes, + )); + return Poll::Ready(Err(err)); + } + } } } None => { @@ -264,27 +298,30 @@ where let upload_id = upload_id.clone(); let part_number = self.next_part_number; self.next_part_number += 1; - let size = cache.len(); - let w = self.w.clone(); - self.futures.push(WritePartFuture(Box::pin(async move { - w.write_part( - &upload_id, - part_number, - size as u64, - AsyncBody::ChunkedBytes(cache), - ) - .await - }))); + + self.futures.push_back(WritePartFuture::new( + self.w.clone(), + upload_id, + part_number, + cache, + )); } } - while let Some(mut part) = ready!(self.futures.poll_next_unpin(cx)) - { - // Don't retry close if concurrent write failed. - // TODO: Remove this after addressed. - if self.concurrent > 1 { - part = part.map_err(|err| err.set_permanent()); + while let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { + match part { + Ok(part) => { + self.parts.push(part); + } + Err((part_number, bytes, err)) => { + self.futures.push_front(WritePartFuture::new( + self.w.clone(), + upload_id, + part_number, + bytes, + )); + return Poll::Ready(Err(err)); + } } - self.parts.push(part?); } } } diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 285f4818fe9b..ec0e4a7077b8 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -171,15 +171,16 @@ impl oio::Write for RangeWriter { let offset = self.next_offset; self.next_offset += size; let w = self.w.clone(); - self.futures.push(WriteRangeFuture(Box::pin(async move { - w.write_range( - &location, - offset, - size, - AsyncBody::ChunkedBytes(cache), - ) - .await - }))); + self.futures + .push_back(WriteRangeFuture(Box::pin(async move { + w.write_range( + &location, + offset, + size, + AsyncBody::ChunkedBytes(cache), + ) + .await + }))); let size = self.fill_cache(bs); return Poll::Ready(Ok(size)); } else if let Some(result) = ready!(self.futures.poll_next_unpin(cx)) { diff --git a/core/src/types/list.rs b/core/src/types/list.rs index 2f386093019b..295726556a32 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -157,14 +157,15 @@ impl Stream for Lister { Poll::Ready(Ok(Some(oe))) => { let (path, metadata) = oe.into_entry().into_parts(); if metadata.contains_metakey(self.required_metakey) { - self.tasks.push(StatTask::Known(Some((path, metadata)))); + self.tasks + .push_back(StatTask::Known(Some((path, metadata)))); } else { let acc = self.acc.clone(); let fut = async move { let res = acc.stat(&path, OpStat::default()).await; (path, res.map(|rp| rp.into_metadata())) }; - self.tasks.push(StatTask::Stating(Box::pin(fut))); + self.tasks.push_back(StatTask::Stating(Box::pin(fut))); } } Poll::Ready(Ok(None)) => { From c7c68fa81574ec74c126210ebd186405056c898b Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 9 Jan 2024 21:24:37 +0800 Subject: [PATCH 2/6] FIx retry for range write Signed-off-by: Xuanwo --- core/src/raw/oio/write/range_write.rs | 89 ++++++++++++++++++--------- 1 file changed, 59 insertions(+), 30 deletions(-) diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index ec0e4a7077b8..f4d797d368a7 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -86,7 +86,12 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static { async fn abort_range(&self, location: &str) -> Result<()>; } -struct WriteRangeFuture(BoxedFuture>); +/// WritePartResult is the result returned by [`WriteRangeFuture`]. +/// +/// The error part will carries inout `(offset, bytes, err)` so caller can retry them. +type WriteRangeResult = std::result::Result<(), (u64, oio::ChunkedBytes, Error)>; + +struct WriteRangeFuture(BoxedFuture); /// # Safety /// @@ -99,19 +104,40 @@ unsafe impl Send for WriteRangeFuture {} unsafe impl Sync for WriteRangeFuture {} impl Future for WriteRangeFuture { - type Output = Result<()>; + type Output = WriteRangeResult; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.get_mut().0.poll_unpin(cx) } } +impl WriteRangeFuture { + pub fn new( + w: Arc, + location: Arc, + offset: u64, + bytes: oio::ChunkedBytes, + ) -> Self { + let fut = async move { + w.write_range( + &location, + offset, + bytes.len() as u64, + AsyncBody::ChunkedBytes(bytes.clone()), + ) + .await + .map_err(|err| (offset, bytes, err)) + }; + + WriteRangeFuture(Box::pin(fut)) + } +} + /// RangeWriter will implements [`Write`] based on range write. pub struct RangeWriter { - location: Option, + location: Option>, next_offset: u64, buffer: Option, futures: ConcurrentFutures, - concurrent: usize, w: Arc, state: State, @@ -145,7 +171,6 @@ impl RangeWriter { buffer: None, location: None, next_offset: 0, - concurrent, } } @@ -167,24 +192,27 @@ impl oio::Write for RangeWriter { Some(location) => { if self.futures.has_remaining() { let cache = self.buffer.take().expect("cache must be valid"); - let size = cache.len() as u64; let offset = self.next_offset; - self.next_offset += size; - let w = self.w.clone(); - self.futures - .push_back(WriteRangeFuture(Box::pin(async move { - w.write_range( - &location, - offset, - size, - AsyncBody::ChunkedBytes(cache), - ) - .await - }))); + self.next_offset += cache.len() as u64; + self.futures.push_back(WriteRangeFuture::new( + self.w.clone(), + location, + offset, + cache, + )); + let size = self.fill_cache(bs); return Poll::Ready(Ok(size)); - } else if let Some(result) = ready!(self.futures.poll_next_unpin(cx)) { - result?; + } else if let Some(Err((offset, bytes, err))) = + ready!(self.futures.poll_next_unpin(cx)) + { + self.futures.push_front(WriteRangeFuture::new( + self.w.clone(), + location, + offset, + bytes, + )); + return Poll::Ready(Err(err)); } } None => { @@ -203,7 +231,7 @@ impl oio::Write for RangeWriter { State::Init(fut) => { let res = ready!(fut.poll_unpin(cx)); self.state = State::Idle; - self.location = Some(res?); + self.location = Some(Arc::new(res?)); } State::Complete(_) => { unreachable!("RangeWriter must not go into State::Complete during poll_write") @@ -223,15 +251,16 @@ impl oio::Write for RangeWriter { match self.location.clone() { Some(location) => { if !self.futures.is_empty() { - while let Some(mut result) = - ready!(self.futures.poll_next_unpin(cx)) - { - // Don't retry close if concurrent write failed. - // TODO: Remove this after addressed. - if self.concurrent > 1 { - result = result.map_err(|err| err.set_permanent()); - } - result?; + while let Some(result) = ready!(self.futures.poll_next_unpin(cx)) { + if let Err((offset, bytes, err)) = result { + self.futures.push_front(WriteRangeFuture::new( + self.w.clone(), + location.clone(), + offset, + bytes, + )); + return Poll::Ready(Err(err)); + }; } } match self.buffer.take() { From 0e953403d0a36b63e552c9d447cb46d8d1655475 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 9 Jan 2024 21:26:12 +0800 Subject: [PATCH 3/6] Fix typo Signed-off-by: Xuanwo --- core/src/raw/oio/write/multipart_upload_write.rs | 2 +- core/src/raw/oio/write/range_write.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 0b24cd2981f3..27a4cc4b9f56 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -109,7 +109,7 @@ pub struct MultipartUploadPart { /// WritePartResult is the result returned by [`WritePartFuture`]. /// -/// The error part will carries inout `(part_number, bytes, err)` so caller can retry them. +/// The error part will carries input `(part_number, bytes, err)` so caller can retry them. type WritePartResult = std::result::Result; struct WritePartFuture(BoxedFuture); diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index f4d797d368a7..1ab461c5c122 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -88,7 +88,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static { /// WritePartResult is the result returned by [`WriteRangeFuture`]. /// -/// The error part will carries inout `(offset, bytes, err)` so caller can retry them. +/// The error part will carries input `(offset, bytes, err)` so caller can retry them. type WriteRangeResult = std::result::Result<(), (u64, oio::ChunkedBytes, Error)>; struct WriteRangeFuture(BoxedFuture); From f5c751713b3bb91b5c24bc4e75fd7443cbd50815 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 10 Jan 2024 15:13:28 +0800 Subject: [PATCH 4/6] Add test for retry in concurrent Signed-off-by: Xuanwo --- .../raw/oio/write/multipart_upload_write.rs | 220 ++++++++++++++---- core/src/raw/oio/write/range_write.rs | 122 +++++++++- 2 files changed, 291 insertions(+), 51 deletions(-) diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 27a4cc4b9f56..3749dcb3f5dd 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -216,7 +216,6 @@ where State::Idle => { match self.upload_id.as_ref() { Some(upload_id) => { - let upload_id = upload_id.clone(); if self.futures.has_remaining() { let cache = self.cache.take().expect("pending write must exist"); let part_number = self.next_part_number; @@ -224,13 +223,15 @@ where self.futures.push_back(WritePartFuture::new( self.w.clone(), - upload_id, + upload_id.clone(), part_number, cache, )); let size = self.fill_cache(bs); return Poll::Ready(Ok(size)); - } else if let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { + } + + if let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { match part { Ok(part) => { self.parts.push(part); @@ -238,7 +239,7 @@ where Err((part_number, bytes, err)) => { self.futures.push_front(WritePartFuture::new( self.w.clone(), - upload_id, + upload_id.clone(), part_number, bytes, )); @@ -262,8 +263,8 @@ where } State::Init(fut) => { let upload_id = ready!(fut.as_mut().poll(cx)); - self.state = State::Idle; self.upload_id = Some(Arc::new(upload_id?)); + self.state = State::Idle; } State::Close(_) => { unreachable!( @@ -283,65 +284,63 @@ where loop { match &mut self.state { State::Idle => { - match self.upload_id.clone() { + match self.upload_id.as_ref() { Some(upload_id) => { - let w = self.w.clone(); + // futures queue is empty and cache is consumed, we can complete the upload. if self.futures.is_empty() && self.cache.is_none() { + let w = self.w.clone(); let upload_id = upload_id.clone(); let parts = self.parts.clone(); + self.state = State::Close(Box::pin(async move { w.complete_part(&upload_id, &parts).await })); - } else { - if self.futures.has_remaining() { - if let Some(cache) = self.cache.take() { - let upload_id = upload_id.clone(); - let part_number = self.next_part_number; - self.next_part_number += 1; - - self.futures.push_back(WritePartFuture::new( + continue; + } + + if self.futures.has_remaining() { + // This must be the final task. + if let Some(cache) = self.cache.take() { + let part_number = self.next_part_number; + self.next_part_number += 1; + + self.futures.push_back(WritePartFuture::new( + self.w.clone(), + upload_id.clone(), + part_number, + cache, + )); + } + } + + if let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { + match part { + Ok(part) => { + self.parts.push(part); + } + Err((part_number, bytes, err)) => { + self.futures.push_front(WritePartFuture::new( self.w.clone(), - upload_id, + upload_id.clone(), part_number, - cache, + bytes, )); - } - } - while let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { - match part { - Ok(part) => { - self.parts.push(part); - } - Err((part_number, bytes, err)) => { - self.futures.push_front(WritePartFuture::new( - self.w.clone(), - upload_id, - part_number, - bytes, - )); - return Poll::Ready(Err(err)); - } + return Poll::Ready(Err(err)); } } } } - None => match &self.cache { - Some(cache) => { - let w = self.w.clone(); - let bs = cache.clone(); - self.state = State::Close(Box::pin(async move { - let size = bs.len(); - w.write_once(size as u64, AsyncBody::ChunkedBytes(bs)).await - })); - } - None => { - let w = self.w.clone(); - // Call write_once if there is no data in cache and no upload_id. - self.state = State::Close(Box::pin(async move { - w.write_once(0, AsyncBody::Empty).await - })); - } - }, + None => { + let w = self.w.clone(); + let (size, body) = match self.cache.clone() { + Some(cache) => (cache.len(), AsyncBody::ChunkedBytes(cache)), + None => (0, AsyncBody::Empty), + }; + // Call write_once if there is no upload_id. + self.state = State::Close(Box::pin(async move { + w.write_once(size as u64, body).await + })); + } } } State::Close(fut) => { @@ -396,3 +395,124 @@ where } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::raw::oio::WriteExt; + use pretty_assertions::assert_eq; + use rand::{thread_rng, Rng, RngCore}; + use std::sync::Mutex; + + struct TestWrite { + upload_id: String, + part_numbers: Vec, + length: u64, + } + + impl TestWrite { + pub fn new() -> Arc> { + let v = Self { + upload_id: uuid::Uuid::new_v4().to_string(), + part_numbers: Vec::new(), + length: 0, + }; + + Arc::new(Mutex::new(v)) + } + } + + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] + #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] + impl MultipartUploadWrite for Arc> { + async fn write_once(&self, size: u64, _: AsyncBody) -> Result<()> { + self.lock().unwrap().length += size; + Ok(()) + } + + async fn initiate_part(&self) -> Result { + let upload_id = self.lock().unwrap().upload_id.clone(); + Ok(upload_id) + } + + async fn write_part( + &self, + upload_id: &str, + part_number: usize, + size: u64, + _: AsyncBody, + ) -> Result { + let mut test = self.lock().unwrap(); + assert_eq!(upload_id, test.upload_id); + + // We will have 50% percent rate for write part to fail. + if thread_rng().gen_bool(5.0 / 10.0) { + return Err(Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!")); + } + + test.part_numbers.push(part_number); + test.length += size; + + Ok(MultipartUploadPart { + part_number, + etag: "etag".to_string(), + }) + } + + async fn complete_part( + &self, + upload_id: &str, + parts: &[MultipartUploadPart], + ) -> Result<()> { + let test = self.lock().unwrap(); + assert_eq!(upload_id, test.upload_id); + assert_eq!(parts.len(), test.part_numbers.len()); + + Ok(()) + } + + async fn abort_part(&self, upload_id: &str) -> Result<()> { + let test = self.lock().unwrap(); + assert_eq!(upload_id, test.upload_id); + + Ok(()) + } + } + + #[tokio::test] + async fn test_multipart_upload_writer_with_concurrent_errors() { + let mut rng = thread_rng(); + + let mut w = MultipartUploadWriter::new(TestWrite::new(), 8); + let mut total_size = 0u64; + + for _ in 0..1000 { + let size = rng.gen_range(1..1024); + total_size += size as u64; + + let mut bs = vec![0; size]; + rng.fill_bytes(&mut bs); + + loop { + match w.write(&bs.as_slice()).await { + Ok(_) => break, + Err(_) => continue, + } + } + } + + loop { + match w.close().await { + Ok(_) => break, + Err(_) => continue, + } + } + + let actual_parts: Vec<_> = w.parts.into_iter().map(|v| v.part_number).collect(); + let expected_parts: Vec<_> = (0..1000).collect(); + assert_eq!(actual_parts, expected_parts); + + let actual_size = w.w.lock().unwrap().length; + assert_eq!(actual_size, total_size); + } +} diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 1ab461c5c122..1d67a5cbc11d 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -203,7 +203,9 @@ impl oio::Write for RangeWriter { let size = self.fill_cache(bs); return Poll::Ready(Ok(size)); - } else if let Some(Err((offset, bytes, err))) = + } + + if let Some(Err((offset, bytes, err))) = ready!(self.futures.poll_next_unpin(cx)) { self.futures.push_front(WriteRangeFuture::new( @@ -343,3 +345,121 @@ impl oio::Write for RangeWriter { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::raw::oio::WriteExt; + use pretty_assertions::assert_eq; + use rand::{thread_rng, Rng, RngCore}; + use std::collections::HashSet; + use std::sync::Mutex; + + struct TestWrite { + length: u64, + bytes: HashSet, + } + + impl TestWrite { + pub fn new() -> Arc> { + let v = Self { + bytes: HashSet::new(), + length: 0, + }; + + Arc::new(Mutex::new(v)) + } + } + + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] + #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] + impl RangeWrite for Arc> { + async fn write_once(&self, size: u64, _: AsyncBody) -> Result<()> { + let mut test = self.lock().unwrap(); + test.length += size; + test.bytes.extend(0..size); + + Ok(()) + } + + async fn initiate_range(&self) -> Result { + Ok("test".to_string()) + } + + async fn write_range(&self, _: &str, offset: u64, size: u64, _: AsyncBody) -> Result<()> { + let mut test = self.lock().unwrap(); + test.length += size; + + let input = (offset..offset + size).collect::>(); + + assert!( + test.bytes.is_disjoint(&input), + "input should not have overlap" + ); + test.bytes.extend(input); + + Ok(()) + } + + async fn complete_range( + &self, + _: &str, + offset: u64, + size: u64, + _: AsyncBody, + ) -> Result<()> { + let mut test = self.lock().unwrap(); + test.length += size; + + let input = (offset..offset + size).collect::>(); + assert!( + test.bytes.is_disjoint(&input), + "input should not have overlap" + ); + test.bytes.extend(input); + + Ok(()) + } + + async fn abort_range(&self, _: &str) -> Result<()> { + Ok(()) + } + } + + #[tokio::test] + async fn test_range_writer_with_concurrent_errors() { + let mut rng = thread_rng(); + + let mut w = RangeWriter::new(TestWrite::new(), 8); + let mut total_size = 0u64; + + for _ in 0..1000 { + let size = rng.gen_range(1..1024); + total_size += size as u64; + + let mut bs = vec![0; size]; + rng.fill_bytes(&mut bs); + + loop { + match w.write(&bs.as_slice()).await { + Ok(_) => break, + Err(_) => continue, + } + } + } + + loop { + match w.close().await { + Ok(_) => break, + Err(_) => continue, + } + } + + let actual_bytes = w.w.lock().unwrap().bytes.clone(); + let expected_bytes: HashSet<_> = (0..total_size).collect(); + assert_eq!(actual_bytes, expected_bytes); + + let actual_size = w.w.lock().unwrap().length; + assert_eq!(actual_size, total_size); + } +} From f8a12b01a1ff31d6ced539d7c8d68783eb1875bb Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 10 Jan 2024 15:15:54 +0800 Subject: [PATCH 5/6] FIx Signed-off-by: Xuanwo --- core/src/raw/oio/write/range_write.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 1d67a5cbc11d..ffc7432a4d49 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -387,6 +387,11 @@ mod tests { } async fn write_range(&self, _: &str, offset: u64, size: u64, _: AsyncBody) -> Result<()> { + // We will have 50% percent rate for write part to fail. + if thread_rng().gen_bool(5.0 / 10.0) { + return Err(Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!")); + } + let mut test = self.lock().unwrap(); test.length += size; From fa573c60b099b6049d5477b06377855a2b298feb Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 10 Jan 2024 15:34:01 +0800 Subject: [PATCH 6/6] Fix build Signed-off-by: Xuanwo --- core/src/raw/oio/write/range_write.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index ffc7432a4d49..13a7b2fb3e9a 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -257,7 +257,7 @@ impl oio::Write for RangeWriter { if let Err((offset, bytes, err)) = result { self.futures.push_front(WriteRangeFuture::new( self.w.clone(), - location.clone(), + location, offset, bytes, ));