From 624612b0e28c0935325ec7dc376f251a5b694b75 Mon Sep 17 00:00:00 2001
From: mwish
Date: Thu, 14 Sep 2023 11:28:23 +0800
Subject: [PATCH 1/6] GH-37670: [C++] IO Output extend from enable_shared_from_this

---
 cpp/src/arrow/filesystem/s3fs.cc | 27 +++++++++++++++------------
 cpp/src/arrow/io/interfaces.h    |  4 +++-
 2 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 29f8882225a..3da6b922ac8 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -1472,21 +1472,23 @@ class ObjectOutputStream final : public io::OutputStream {
       RETURN_NOT_OK(UploadPart("", 0));
     }
 
+    auto self =
+        ::arrow::internal::checked_pointer_cast<ObjectOutputStream>(shared_from_this());
     // Wait for in-progress uploads to finish (if async writes are enabled)
-    return FlushAsync().Then([this]() {
-      ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+    return FlushAsync().Then([self]() {
+      ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock());
 
       // At this point, all part uploads have finished successfully
-      DCHECK_GT(part_number_, 1);
-      DCHECK_EQ(upload_state_->completed_parts.size(),
-                static_cast<size_t>(part_number_ - 1));
+      DCHECK_GT(self->part_number_, 1);
+      DCHECK_EQ(self->upload_state_->completed_parts.size(),
+                static_cast<size_t>(self->part_number_ - 1));
 
       S3Model::CompletedMultipartUpload completed_upload;
-      completed_upload.SetParts(upload_state_->completed_parts);
+      completed_upload.SetParts(self->upload_state_->completed_parts);
       S3Model::CompleteMultipartUploadRequest req;
-      req.SetBucket(ToAwsString(path_.bucket));
-      req.SetKey(ToAwsString(path_.key));
-      req.SetUploadId(upload_id_);
+      req.SetBucket(ToAwsString(self->path_.bucket));
+      req.SetKey(ToAwsString(self->path_.key));
+      req.SetUploadId(self->upload_id_);
       req.SetMultipartUpload(std::move(completed_upload));
 
       auto outcome =
@@ -1494,12 +1496,13 @@ class ObjectOutputStream final : public io::OutputStream {
       if (!outcome.IsSuccess()) {
         return ErrorToStatus(
             std::forward_as_tuple("When completing multiple part upload for key '",
-                                  path_.key, "' in bucket '", path_.bucket, "': "),
+                                  self->path_.key, "' in bucket '", self->path_.bucket,
+                                  "': "),
             "CompleteMultipartUpload", outcome.GetError());
       }
 
-      holder_ = nullptr;
-      closed_ = true;
+      self->holder_ = nullptr;
+      self->closed_ = true;
       return Status::OK();
     });
   }
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index dcbe4feb261..d6eb9bdbf66 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -200,7 +200,9 @@ class ARROW_EXPORT Readable {
   virtual const IOContext& io_context() const;
 };
 
-class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable {
+class ARROW_EXPORT OutputStream : virtual public FileInterface,
+                                  public Writable,
+                                  public std::enable_shared_from_this<OutputStream> {
  protected:
   OutputStream() = default;
 };

From 6915cdb705e2fb541bd76bbeb7c59a0755023ce5 Mon Sep 17 00:00:00 2001
From: mwish
Date: Thu, 14 Sep 2023 13:52:16 +0800
Subject: [PATCH 2/6] fix multi-derived problem

---
 cpp/src/arrow/filesystem/s3fs.cc | 6 +++---
 cpp/src/arrow/io/interfaces.h    | 4 +---
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 3da6b922ac8..a34e4858ada 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -1372,7 +1372,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
 static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024;
 
 // An OutputStream that writes to a S3 object
-class ObjectOutputStream final : public io::OutputStream {
+class ObjectOutputStream final : public io::OutputStream,
+                                 public std::enable_shared_from_this<ObjectOutputStream> {
  protected:
   struct UploadState;
@@ -1472,8 +1473,7 @@ class ObjectOutputStream final : public io::OutputStream {
       RETURN_NOT_OK(UploadPart("", 0));
     }
 
-    auto self =
-        ::arrow::internal::checked_pointer_cast<ObjectOutputStream>(shared_from_this());
+    auto self = shared_from_this();
     // Wait for in-progress uploads to finish (if async writes are enabled)
     return FlushAsync().Then([self]() {
       ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock());
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index d6eb9bdbf66..dcbe4feb261 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -200,9 +200,7 @@ class ARROW_EXPORT Readable {
   virtual const IOContext& io_context() const;
 };
 
-class ARROW_EXPORT OutputStream : virtual public FileInterface,
-                                  public Writable,
-                                  public std::enable_shared_from_this<OutputStream> {
+class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable {
  protected:
   OutputStream() = default;
 };

From da1d1da5cc6b8b9bc4ccc2408d614a7ffccdadf0 Mon Sep 17 00:00:00 2001
From: mwish
Date: Fri, 15 Sep 2023 18:42:18 +0800
Subject: [PATCH 3/6] Fix S3Test

---
 cpp/src/arrow/filesystem/s3fs.cc | 77 +++++++++++++++++++-------------
 1 file changed, 45 insertions(+), 32 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index a34e4858ada..cd125aec64d 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -1455,9 +1455,51 @@ class ObjectOutputStream final : public io::OutputStream,
   // OutputStream interface
 
+  Status FinishPartUploadAfterFlush() {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
+    // At this point, all part uploads have finished successfully
+    DCHECK_GT(part_number_, 1);
+    DCHECK_EQ(upload_state_->completed_parts.size(),
+              static_cast<size_t>(part_number_ - 1));
+
+    S3Model::CompletedMultipartUpload completed_upload;
+    completed_upload.SetParts(upload_state_->completed_parts);
+    S3Model::CompleteMultipartUploadRequest req;
+    req.SetBucket(ToAwsString(path_.bucket));
+    req.SetKey(ToAwsString(path_.key));
+    req.SetUploadId(upload_id_);
+    req.SetMultipartUpload(std::move(completed_upload));
+
+    auto outcome =
+        client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
+    if (!outcome.IsSuccess()) {
+      return ErrorToStatus(
+          std::forward_as_tuple("When completing multiple part upload for key '",
+                                path_.key, "' in bucket '", path_.bucket, "': "),
+          "CompleteMultipartUpload", outcome.GetError());
+    }
+
+    holder_ = nullptr;
+    closed_ = true;
+    return Status::OK();
+  }
+
   Status Close() override {
-    auto fut = CloseAsync();
-    return fut.status();
+    if (closed_) return Status::OK();
+
+    if (current_part_) {
+      // Upload last part
+      RETURN_NOT_OK(CommitCurrentPart());
+    }
+
+    // S3 mandates at least one part, upload an empty one if necessary
+    if (part_number_ == 1) {
+      RETURN_NOT_OK(UploadPart("", 0));
+    }
+
+    RETURN_NOT_OK(Flush());
+    return FinishPartUploadAfterFlush();
   }
 
   Future<> CloseAsync() override {
@@ -1475,36 +1517,7 @@
     auto self = shared_from_this();
     // Wait for in-progress uploads to finish (if async writes are enabled)
-    return FlushAsync().Then([self]() {
-      ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock());
-
-      // At this point, all part uploads have finished successfully
-      DCHECK_GT(self->part_number_, 1);
-      DCHECK_EQ(self->upload_state_->completed_parts.size(),
-                static_cast<size_t>(self->part_number_ - 1));
-
-      S3Model::CompletedMultipartUpload completed_upload;
-      completed_upload.SetParts(self->upload_state_->completed_parts);
-      S3Model::CompleteMultipartUploadRequest req;
-      req.SetBucket(ToAwsString(self->path_.bucket));
-      req.SetKey(ToAwsString(self->path_.key));
-      req.SetUploadId(self->upload_id_);
-      req.SetMultipartUpload(std::move(completed_upload));
-
-      auto outcome =
-          client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
-      if (!outcome.IsSuccess()) {
-        return ErrorToStatus(
-            std::forward_as_tuple("When completing multiple part upload for key '",
-                                  self->path_.key, "' in bucket '", self->path_.bucket,
-                                  "': "),
-            "CompleteMultipartUpload", outcome.GetError());
-      }
-
-      self->holder_ = nullptr;
-      self->closed_ = true;
-      return Status::OK();
-    });
+    return FlushAsync().Then([self]() { return self->FinishPartUploadAfterFlush(); });
   }
 
   bool closed() const override { return closed_; }

From 035b753db5faac50fbbc0ea1358489d8b1397d28 Mon Sep 17 00:00:00 2001
From: mwish
Date: Tue, 19 Sep 2023 13:57:21 +0800
Subject: [PATCH 4/6] add tests

---
 cpp/src/arrow/filesystem/s3fs_test.cc | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index e9f14fde723..c48701886cc 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -590,6 +590,17 @@ class TestS3FS : public S3TestMixin {
     AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
   }
 
+  void TestOpenOutputStreamCloseAsyncDestructor() {
+    std::shared_ptr<io::OutputStream> stream;
+    ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
+    ASSERT_OK(stream->Write("new data"));
+    // Destructor implicitly closes stream and completes the multipart upload.
+    auto closeAsyncFut = stream->CloseAsync();
+    stream.reset();
+    ASSERT_OK(closeAsyncFut.MoveResult());
+    AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
+  }
+
 protected:
   S3Options options_;
   std::shared_ptr<S3FileSystem> fs_;
@@ -1177,6 +1188,16 @@ TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) {
   TestOpenOutputStreamDestructor();
 }
 
+TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorBackgroundWrites) {
+  TestOpenOutputStreamCloseAsyncDestructor();
+}
+
+TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) {
+  options_.background_writes = false;
+  MakeFileSystem();
+  TestOpenOutputStreamCloseAsyncDestructor();
+}
+
 TEST_F(TestS3FS, OpenOutputStreamMetadata) {
   std::shared_ptr<io::OutputStream> stream;

From d87f09bf28568a2bff605545948af76874cabf1e Mon Sep 17 00:00:00 2001
From: mwish
Date: Tue, 19 Sep 2023 21:01:41 +0800
Subject: [PATCH 5/6] fix comment

---
 cpp/src/arrow/filesystem/s3fs.cc      | 34 +++++++++++++--------------
 cpp/src/arrow/filesystem/s3fs_test.cc |  2 ++
 2 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 04a74be12f5..08fbcde6fd9 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -1454,6 +1454,20 @@ class ObjectOutputStream final : public io::OutputStream {
   // OutputStream interface
 
+  Status EnsureReadyToFlushFromClose() {
+    if (current_part_) {
+      // Upload last part
+      RETURN_NOT_OK(CommitCurrentPart());
+    }
+
+    // S3 mandates at least one part, upload an empty one if necessary
+    if (part_number_ == 1) {
+      RETURN_NOT_OK(UploadPart("", 0));
+    }
+
+    return Status::OK();
+  }
+
   Status FinishPartUploadAfterFlush() {
     ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
@@ -1487,15 +1501,7 @@ Status Close() override {
     if (closed_) return Status::OK();
 
-    if (current_part_) {
-      // Upload last part
-      RETURN_NOT_OK(CommitCurrentPart());
-    }
-
-    // S3 mandates at least one part, upload an empty one if necessary
-    if (part_number_ == 1) {
-      RETURN_NOT_OK(UploadPart("", 0));
-    }
+    RETURN_NOT_OK(EnsureReadyToFlushFromClose());
 
     RETURN_NOT_OK(Flush());
     return FinishPartUploadAfterFlush();
@@ -1504,15 +1510,7 @@ Future<> CloseAsync() override {
     if (closed_) return Status::OK();
 
-    if (current_part_) {
-      // Upload last part
-      RETURN_NOT_OK(CommitCurrentPart());
-    }
-
-    // S3 mandates at least one part, upload an empty one if necessary
-    if (part_number_ == 1) {
-      RETURN_NOT_OK(UploadPart("", 0));
-    }
+    RETURN_NOT_OK(EnsureReadyToFlushFromClose());
 
     auto self = std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
     // Wait for in-progress uploads to finish (if async writes are enabled)
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index c48701886cc..61489b37ef4 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -595,6 +595,8 @@ class TestS3FS : public S3TestMixin {
     ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
     ASSERT_OK(stream->Write("new data"));
     // Destructor implicitly closes stream and completes the multipart upload.
+    // GH-37670: Testing it doesn't matter whether flush is triggered asynchronously
+    // after CloseAsync or synchronously after stream.reset().
     auto closeAsyncFut = stream->CloseAsync();
     stream.reset();
     ASSERT_OK(closeAsyncFut.MoveResult());

From 56c111878da0e8a9ba2c0a5bb2e6ce13327b6a77 Mon Sep 17 00:00:00 2001
From: mwish <1506118561@qq.com>
Date: Tue, 19 Sep 2023 22:33:16 +0800
Subject: [PATCH 6/6] Update cpp/src/arrow/filesystem/s3fs_test.cc

Co-authored-by: Benjamin Kietzman
---
 cpp/src/arrow/filesystem/s3fs_test.cc | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index 61489b37ef4..f88ee7eef93 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -596,7 +596,9 @@ class TestS3FS : public S3TestMixin {
     ASSERT_OK(stream->Write("new data"));
     // Destructor implicitly closes stream and completes the multipart upload.
     // GH-37670: Testing it doesn't matter whether flush is triggered asynchronously
-    // after CloseAsync or synchronously after stream.reset().
+    // after CloseAsync or synchronously after stream.reset() since we're just
+    // checking that `closeAsyncFut` keeps the stream alive until completion
+    // rather than segfaulting on a dangling stream
     auto closeAsyncFut = stream->CloseAsync();
     stream.reset();
     ASSERT_OK(closeAsyncFut.MoveResult());
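
Note on the pattern this series relies on: io::OutputStream now derives from std::enable_shared_from_this, and ObjectOutputStream::CloseAsync() captures shared_from_this() in the continuation chained onto FlushAsync(), so the continuation itself owns the stream and keeps it alive even after the caller drops its last reference, which is exactly what the new OpenOutputStreamAsyncDestructor tests exercise via stream.reset(). The standalone sketch below illustrates only that lifetime mechanism; it is not Arrow code, and Stream, its CloseAsync, and the std::async stand-in for Arrow's Future/FlushAsync machinery are hypothetical names used for illustration.

// Not Arrow code: a minimal sketch of keeping an object alive through an
// asynchronous close by capturing shared_from_this() in the continuation.
#include <future>
#include <iostream>
#include <memory>
#include <string>

class Stream : public std::enable_shared_from_this<Stream> {
 public:
  explicit Stream(std::string name) : name_(std::move(name)) {}

  // The continuation owns `self`, so the object stays alive until the
  // asynchronous close finishes, even if the caller resets its shared_ptr.
  std::future<void> CloseAsync() {
    auto self = shared_from_this();
    return std::async(std::launch::async, [self]() {
      self->closed_ = true;  // safe: `self` keeps the object alive
      std::cout << "closed " << self->name_ << "\n";
    });
  }

 private:
  std::string name_;
  bool closed_ = false;
};

int main() {
  auto stream = std::make_shared<Stream>("bucket/somefile");
  auto fut = stream->CloseAsync();
  stream.reset();  // caller lets go, mirroring the new test
  fut.wait();      // no dangling stream: the lambda's `self` still owns it
  return 0;
}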