From d8d7bcdb29de471c1d8f33a81a47f77932a931ac Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 29 Mar 2021 11:30:08 -1000 Subject: [PATCH 1/4] ARROW-12040: Switch to using task group instead of manually tracking tasks in flight. This makes the walker thread safe so we can get rid of the mutex ARROW-12040: Slight cleanup --- cpp/src/arrow/filesystem/s3fs.cc | 74 +++++++++++--------------------- 1 file changed, 25 insertions(+), 49 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 1940f4dd4df..28afe296011 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -79,11 +79,13 @@ #include "arrow/util/future.h" #include "arrow/util/logging.h" #include "arrow/util/optional.h" +#include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" #include "arrow/util/windows_fixup.h" namespace arrow { +using internal::TaskGroup; using internal::Uri; namespace fs { @@ -1142,26 +1144,17 @@ struct TreeWalker : public std::enable_shared_from_this { recursion_handler_(std::move(recursion_handler)) {} private: - std::mutex mutex_; - Future<> future_; - std::atomic num_in_flight_; + std::shared_ptr task_group_; Status DoWalk() { - future_ = decltype(future_)::Make(); - num_in_flight_ = 0; + task_group_ = + TaskGroup::MakeThreaded(io_context_.executor(), io_context_.stop_token()); WalkChild(base_dir_, /*nesting_depth=*/0); // When this returns, ListObjectsV2 tasks either have finished or will exit early - return future_.status(); + return task_group_->Finish(); } - bool is_finished() const { return future_.is_finished(); } - - void ListObjectsFinished(Status st) { - const auto in_flight = --num_in_flight_; - if (!st.ok() || !in_flight) { - future_.MarkFinished(std::move(st)); - } - } + bool ok() const { return task_group_->ok(); } struct ListObjectsV2Handler { std::shared_ptr walker; @@ -1169,56 +1162,41 @@ struct TreeWalker : public std::enable_shared_from_this { int32_t nesting_depth; S3Model::ListObjectsV2Request req; - void operator()(const Result& result) { + Status operator()(const Result& result) { // Serialize calls to operation-specific handlers - std::unique_lock guard(walker->mutex_); - if (walker->is_finished()) { + if (!walker->ok()) { // Early exit: avoid executing handlers if DoWalk() returned - return; + return Status::OK(); } if (!result.ok()) { - HandleError(result.status()); - return; + return result.status(); } const auto& outcome = *result; if (!outcome.IsSuccess()) { - Status st = walker->error_handler_(outcome.GetError()); - HandleError(std::move(st)); - return; + return walker->error_handler_(outcome.GetError()); } - HandleResult(outcome.GetResult()); + return HandleResult(outcome.GetResult()); } void SpawnListObjectsV2() { auto walker = this->walker; auto req = this->req; - auto maybe_fut = walker->io_context_.executor()->Submit( - walker->io_context_.stop_token(), - [walker, req]() { return walker->client_->ListObjectsV2(req); }); - if (!maybe_fut.ok()) { - HandleError(maybe_fut.status()); - return; - } - maybe_fut->AddCallback(*this); + auto cb = *this; + walker->task_group_->Append([walker, req, cb]() mutable { + Result result = + walker->client_->ListObjectsV2(req); + return cb(result); + }); } - void HandleError(Status status) { walker->ListObjectsFinished(std::move(status)); } - - void HandleResult(const S3Model::ListObjectsV2Result& result) { + Status HandleResult(const S3Model::ListObjectsV2Result& result) { bool recurse = result.GetCommonPrefixes().size() > 0; if (recurse) { - auto maybe_recurse = walker->recursion_handler_(nesting_depth + 1); - if (!maybe_recurse.ok()) { - walker->ListObjectsFinished(maybe_recurse.status()); - return; - } - recurse &= *maybe_recurse; - } - Status st = walker->result_handler_(prefix, result); - if (!st.ok()) { - walker->ListObjectsFinished(std::move(st)); - return; + ARROW_ASSIGN_OR_RAISE(auto maybe_recurse, + walker->recursion_handler_(nesting_depth + 1)); + recurse &= maybe_recurse; } + RETURN_NOT_OK(walker->result_handler_(prefix, result)); if (recurse) { walker->WalkChildren(result, nesting_depth + 1); } @@ -1228,9 +1206,8 @@ struct TreeWalker : public std::enable_shared_from_this { DCHECK(!result.GetNextContinuationToken().empty()); req.SetContinuationToken(result.GetNextContinuationToken()); SpawnListObjectsV2(); - } else { - walker->ListObjectsFinished(Status::OK()); } + return Status::OK(); } void Start() { @@ -1246,7 +1223,6 @@ struct TreeWalker : public std::enable_shared_from_this { void WalkChild(std::string key, int32_t nesting_depth) { ListObjectsV2Handler handler{shared_from_this(), std::move(key), nesting_depth, {}}; - ++num_in_flight_; handler.Start(); } From d81ebc264c5ca24786532058178b6e8cb6344763 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 5 Apr 2021 13:31:33 -1000 Subject: [PATCH 2/4] ARROW-12040: The mutex is still necessary to guard the calls to the various handlers as they are not thread safe --- cpp/src/arrow/filesystem/s3fs.cc | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 28afe296011..63811d882e4 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -78,6 +78,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" +#include "arrow/util/mutex.h" #include "arrow/util/optional.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" @@ -1145,6 +1146,7 @@ struct TreeWalker : public std::enable_shared_from_this { private: std::shared_ptr task_group_; + util::Mutex mutex_; Status DoWalk() { task_group_ = @@ -1173,7 +1175,10 @@ struct TreeWalker : public std::enable_shared_from_this { } const auto& outcome = *result; if (!outcome.IsSuccess()) { - return walker->error_handler_(outcome.GetError()); + { + auto guard = walker->mutex_.Lock(); + return walker->error_handler_(outcome.GetError()); + } } return HandleResult(outcome.GetResult()); } @@ -1190,13 +1195,18 @@ struct TreeWalker : public std::enable_shared_from_this { } Status HandleResult(const S3Model::ListObjectsV2Result& result) { - bool recurse = result.GetCommonPrefixes().size() > 0; - if (recurse) { - ARROW_ASSIGN_OR_RAISE(auto maybe_recurse, - walker->recursion_handler_(nesting_depth + 1)); - recurse &= maybe_recurse; + bool recurse; + { + // Only one thread should be running result_handler_/recursion_handler_ at a time + auto guard = walker->mutex_.Lock(); + recurse = result.GetCommonPrefixes().size() > 0; + if (recurse) { + ARROW_ASSIGN_OR_RAISE(auto maybe_recurse, + walker->recursion_handler_(nesting_depth + 1)); + recurse &= maybe_recurse; + } + RETURN_NOT_OK(walker->result_handler_(prefix, result)); } - RETURN_NOT_OK(walker->result_handler_(prefix, result)); if (recurse) { walker->WalkChildren(result, nesting_depth + 1); } From e3620296a443d4ebe1f6f9230f23754a289d1324 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 6 Apr 2021 11:50:06 -1000 Subject: [PATCH 3/4] ARROW-12040: Addressing comments from PR --- cpp/src/arrow/filesystem/s3fs.cc | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 63811d882e4..958b4b84fb5 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -1146,7 +1146,7 @@ struct TreeWalker : public std::enable_shared_from_this { private: std::shared_ptr task_group_; - util::Mutex mutex_; + std::mutex mutex_; Status DoWalk() { task_group_ = @@ -1176,7 +1176,7 @@ struct TreeWalker : public std::enable_shared_from_this { const auto& outcome = *result; if (!outcome.IsSuccess()) { { - auto guard = walker->mutex_.Lock(); + std::lock_guard guard(walker->mutex_); return walker->error_handler_(outcome.GetError()); } } @@ -1184,12 +1184,10 @@ struct TreeWalker : public std::enable_shared_from_this { } void SpawnListObjectsV2() { - auto walker = this->walker; - auto req = this->req; auto cb = *this; - walker->task_group_->Append([walker, req, cb]() mutable { + walker->task_group_->Append([cb]() mutable { Result result = - walker->client_->ListObjectsV2(req); + cb.walker->client_->ListObjectsV2(cb.req); return cb(result); }); } @@ -1198,7 +1196,7 @@ struct TreeWalker : public std::enable_shared_from_this { bool recurse; { // Only one thread should be running result_handler_/recursion_handler_ at a time - auto guard = walker->mutex_.Lock(); + std::lock_guard guard(walker->mutex_); recurse = result.GetCommonPrefixes().size() > 0; if (recurse) { ARROW_ASSIGN_OR_RAISE(auto maybe_recurse, From df50ffed3e5f1d0cd081b23827184caa7025a50a Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 7 Apr 2021 10:18:20 +0200 Subject: [PATCH 4/4] Nit: remove unused include --- cpp/src/arrow/filesystem/s3fs.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 958b4b84fb5..599e64be87d 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -78,7 +78,6 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" -#include "arrow/util/mutex.h" #include "arrow/util/optional.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h"