diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 637f3d1a54f..e06fad9a1de 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -190,6 +190,7 @@ set(ARROW_SRCS io/slow.cc io/stdio.cc io/transform.cc + util/async_util.cc util/basic_decimal.cc util/bit_block_counter.cc util/bit_run_reader.cc diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index aa875ab6bee..1b14215ddd8 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -42,6 +42,7 @@ add_arrow_test(utility-test SOURCES align_util_test.cc async_generator_test.cc + async_util_test.cc bit_block_counter_test.cc bit_util_test.cc cache_test.cc diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc new file mode 100644 index 00000000000..76c971f576e --- /dev/null +++ b/cpp/src/arrow/util/async_util.cc @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/async_util.h" + +#include "arrow/util/future.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace util { + +AsyncDestroyable::AsyncDestroyable() : on_closed_(Future<>::Make()) {} + +#ifndef NDEBUG +AsyncDestroyable::~AsyncDestroyable() { + DCHECK(constructed_correctly_) << "An instance of AsyncDestroyable must be created by " + "MakeSharedAsync or MakeUniqueAsync"; +} +#else +AsyncDestroyable::~AsyncDestroyable() = default; +#endif + +void AsyncDestroyable::Destroy() { + DoDestroy().AddCallback([this](const Status& st) { + on_closed_.MarkFinished(st); + delete this; + }); +} + +Status AsyncTaskGroup::AddTask(const Future<>& task) { + auto guard = mutex_.Lock(); + if (all_tasks_done_.is_finished()) { + return Status::Invalid("Attempt to add a task after the task group has completed"); + } + if (!err_.ok()) { + return err_; + } + // If the task is already finished there is nothing to track so lets save + // some work and return early + if (task.is_finished()) { + err_ &= task.status(); + return Status::OK(); + } + running_tasks_++; + guard.Unlock(); + task.AddCallback([this](const Status& st) { + auto guard = mutex_.Lock(); + err_ &= st; + if (--running_tasks_ == 0 && finished_adding_) { + guard.Unlock(); + all_tasks_done_.MarkFinished(err_); + } + }); + return Status::OK(); +} + +Future<> AsyncTaskGroup::WaitForTasksToFinish() { + auto guard = mutex_.Lock(); + finished_adding_ = true; + if (running_tasks_ == 0) { + all_tasks_done_.MarkFinished(err_); + return all_tasks_done_; + } + return all_tasks_done_; +} + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h new file mode 100644 index 00000000000..31e5d09a86c --- /dev/null +++ b/cpp/src/arrow/util/async_util.h @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/future.h" +#include "arrow/util/mutex.h" + +namespace arrow { +namespace util { + +/// Custom deleter for AsyncDestroyable objects +template +struct DestroyingDeleter { + void operator()(T* p) { p->Destroy(); } +}; + +/// An object which should be asynchronously closed before it is destroyed +/// +/// Classes can extend this to ensure that the close method is called and completed +/// before the instance is deleted. This provides smart_ptr / delete semantics for +/// objects with an asynchronous destructor. +/// +/// Classes which extend this must be constructed using MakeSharedAsync or MakeUniqueAsync +class ARROW_EXPORT AsyncDestroyable { + public: + AsyncDestroyable(); + virtual ~AsyncDestroyable(); + + /// A future which will complete when the AsyncDestroyable has finished and is ready + /// to be deleted. + /// + /// This can be used to ensure all work done by this object has been completed before + /// proceeding. + Future<> on_closed() { return on_closed_; } + + protected: + /// Subclasses should override this and perform any cleanup. Once the future returned + /// by this method finishes then this object is eligible for destruction and any + /// reference to `this` will be invalid + virtual Future<> DoDestroy() = 0; + + private: + void Destroy(); + + Future<> on_closed_; +#ifndef NDEBUG + bool constructed_correctly_ = false; +#endif + + template + friend struct DestroyingDeleter; + template + friend std::shared_ptr MakeSharedAsync(Args&&... args); + template + friend std::unique_ptr> MakeUniqueAsync(Args&&... args); +}; + +template +std::shared_ptr MakeSharedAsync(Args&&... args) { + static_assert(std::is_base_of::value, + "Nursery::MakeSharedCloseable only works with AsyncDestroyable types"); + std::shared_ptr ptr(new T(std::forward(args)...), DestroyingDeleter()); +#ifndef NDEBUG + ptr->constructed_correctly_ = true; +#endif + return ptr; +} + +template +std::unique_ptr> MakeUniqueAsync(Args&&... args) { + static_assert(std::is_base_of::value, + "Nursery::MakeUniqueCloseable only works with AsyncDestroyable types"); + std::unique_ptr> ptr(new T(std::forward(args)...), + DestroyingDeleter()); +#ifndef NDEBUG + ptr->constructed_correctly_ = true; +#endif + return ptr; +} + +/// A utility which keeps track of a collection of asynchronous tasks +/// +/// This can be used to provide structured concurrency for asynchronous development. +/// A task group created at a high level can be distributed amongst low level components +/// which register work to be completed. The high level job can then wait for all work +/// to be completed before cleaning up. +class ARROW_EXPORT AsyncTaskGroup { + public: + /// Add a task to be tracked by this task group + /// + /// If a previous task has failed then adding a task will fail + /// + /// If WaitForTasksToFinish has been called and the returned future has been marked + /// completed then adding a task will fail. + Status AddTask(const Future<>& task); + /// A future that will be completed when all running tasks are finished. + /// + /// It is allowed for tasks to be added after this call provided the future has not yet + /// completed. This should be safe as long as the tasks being added are added as part + /// of a task that is tracked. As soon as the count of running tasks reaches 0 this + /// future will be marked complete. + /// + /// Any attempt to add a task after the returned future has completed will fail. + Future<> WaitForTasksToFinish(); + + private: + bool finished_adding_ = false; + int running_tasks_ = 0; + Status err_; + Future<> all_tasks_done_ = Future<>::Make(); + util::Mutex mutex_; +}; + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc new file mode 100644 index 00000000000..f263ee548cf --- /dev/null +++ b/cpp/src/arrow/util/async_util_test.cc @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/async_util.h" + +#include + +#include "arrow/result.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" + +namespace arrow { +namespace util { + +class GatingDestroyable : public AsyncDestroyable { + public: + GatingDestroyable(Future<> close_future, bool* destroyed) + : close_future_(std::move(close_future)), destroyed_(destroyed) {} + ~GatingDestroyable() override { *destroyed_ = true; } + + protected: + Future<> DoDestroy() override { return close_future_; } + + private: + Future<> close_future_; + bool* destroyed_; +}; + +template +void TestAsyncDestroyable(Factory factory) { + Future<> gate = Future<>::Make(); + bool destroyed = false; + bool on_closed = false; + { + auto obj = factory(gate, &destroyed); + obj->on_closed().AddCallback([&](const Status& st) { on_closed = true; }); + ASSERT_FALSE(destroyed); + } + ASSERT_FALSE(destroyed); + ASSERT_FALSE(on_closed); + gate.MarkFinished(); + ASSERT_TRUE(destroyed); + ASSERT_TRUE(on_closed); +} + +TEST(AsyncDestroyable, MakeShared) { + TestAsyncDestroyable([](Future<> gate, bool* destroyed) { + return MakeSharedAsync(gate, destroyed); + }); +} + +TEST(AsyncDestroyable, MakeUnique) { + TestAsyncDestroyable([](Future<> gate, bool* destroyed) { + return MakeUniqueAsync(gate, destroyed); + }); +} + +TEST(AsyncTaskGroup, Basic) { + AsyncTaskGroup task_group; + Future<> fut1 = Future<>::Make(); + Future<> fut2 = Future<>::Make(); + ASSERT_OK(task_group.AddTask(fut1)); + ASSERT_OK(task_group.AddTask(fut2)); + Future<> all_done = task_group.WaitForTasksToFinish(); + AssertNotFinished(all_done); + fut1.MarkFinished(); + AssertNotFinished(all_done); + fut2.MarkFinished(); + ASSERT_FINISHES_OK(all_done); +} + +TEST(AsyncTaskGroup, NoTasks) { + AsyncTaskGroup task_group; + ASSERT_FINISHES_OK(task_group.WaitForTasksToFinish()); +} + +TEST(AsyncTaskGroup, AddAfterDone) { + AsyncTaskGroup task_group; + ASSERT_FINISHES_OK(task_group.WaitForTasksToFinish()); + ASSERT_RAISES(Invalid, task_group.AddTask(Future<>::Make())); +} + +TEST(AsyncTaskGroup, AddAfterWaitButBeforeFinish) { + AsyncTaskGroup task_group; + Future<> task_one = Future<>::Make(); + ASSERT_OK(task_group.AddTask(task_one)); + Future<> finish_fut = task_group.WaitForTasksToFinish(); + AssertNotFinished(finish_fut); + Future<> task_two = Future<>::Make(); + ASSERT_OK(task_group.AddTask(task_two)); + AssertNotFinished(finish_fut); + task_one.MarkFinished(); + AssertNotFinished(finish_fut); + task_two.MarkFinished(); + AssertFinished(finish_fut); + ASSERT_FINISHES_OK(finish_fut); +} + +TEST(AsyncTaskGroup, Error) { + AsyncTaskGroup task_group; + Future<> failed_task = Future<>::MakeFinished(Status::Invalid("XYZ")); + ASSERT_OK(task_group.AddTask(failed_task)); + ASSERT_FINISHES_AND_RAISES(Invalid, task_group.WaitForTasksToFinish()); +} + +TEST(AsyncTaskGroup, TaskFinishesAfterError) { + AsyncTaskGroup task_group; + Future<> fut1 = Future<>::Make(); + ASSERT_OK(task_group.AddTask(fut1)); + ASSERT_OK(task_group.AddTask(Future<>::MakeFinished(Status::Invalid("XYZ")))); + Future<> finished_fut = task_group.WaitForTasksToFinish(); + AssertNotFinished(finished_fut); + fut1.MarkFinished(); + ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut); +} + +TEST(AsyncTaskGroup, AddAfterFailed) { + AsyncTaskGroup task_group; + ASSERT_OK(task_group.AddTask(Future<>::MakeFinished(Status::Invalid("XYZ")))); + ASSERT_RAISES(Invalid, task_group.AddTask(Future<>::Make())); + ASSERT_FINISHES_AND_RAISES(Invalid, task_group.WaitForTasksToFinish()); +} + +TEST(AsyncTaskGroup, FailAfterAdd) { + AsyncTaskGroup task_group; + Future<> will_fail = Future<>::Make(); + ASSERT_OK(task_group.AddTask(will_fail)); + Future<> added_later_and_passes = Future<>::Make(); + ASSERT_OK(task_group.AddTask(added_later_and_passes)); + will_fail.MarkFinished(Status::Invalid("XYZ")); + ASSERT_RAISES(Invalid, task_group.AddTask(Future<>::Make())); + Future<> finished_fut = task_group.WaitForTasksToFinish(); + AssertNotFinished(finished_fut); + added_later_and_passes.MarkFinished(); + AssertFinished(finished_fut); + ASSERT_FINISHES_AND_RAISES(Invalid, finished_fut); +} + +} // namespace util +} // namespace arrow