From 33c8bfac244942d417e869774ab48ad4f0f7d5c0 Mon Sep 17 00:00:00 2001
From: Alberto Soragna
Date: Fri, 5 May 2023 17:54:43 +0100
Subject: [PATCH 1/3] add mutex to protect events_executor current entity collection and unit-test

Signed-off-by: Alberto Soragna
---
 .../events_executor/events_executor.hpp          |  7 +-
 .../events_executor/events_executor.cpp          |  4 ++
 .../test/rclcpp/executors/test_executors.cpp     | 70 +++++++++++++++++++
 3 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp
index 2365b67697..65650ff2e4 100644
--- a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp
+++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp
@@ -253,6 +253,8 @@ class EventsExecutor : public rclcpp::Executor
   typename CollectionType::EntitySharedPtr
   retrieve_entity(typename CollectionType::Key entity_id, CollectionType & collection)
   {
+    std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+
     // Check if the entity_id is in the collection
     auto it = collection.find(entity_id);
     if (it == collection.end()) {
@@ -274,9 +276,12 @@ class EventsExecutor : public rclcpp::Executor
   rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;
 
   std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
-  std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
   std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
 
+  /// Mutex to protect the current_entities_collection_
+  std::recursive_mutex collection_mutex_;
+  std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
+
   /// Flag used to reduce the number of unnecessary waitable events
   std::atomic<bool> notify_waitable_event_pushed_ {false};
 
diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
index 33e9cb67bd..33400ddd51 100644
--- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
+++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
@@ -386,11 +386,15 @@ EventsExecutor::get_automatically_added_callback_groups_from_nodes()
 void
 EventsExecutor::refresh_current_collection_from_callback_groups()
 {
+  // Build the new collection
   this->entities_collector_->update_collections();
   auto callback_groups = this->entities_collector_->get_all_callback_groups();
   rclcpp::executors::ExecutorEntitiesCollection new_collection;
   rclcpp::executors::build_entities_collection(callback_groups, new_collection);
 
+  // Acquire lock before modifying the current collection
+  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+
   // TODO(alsora): this may be implemented in a better way.
   // We need the notify waitable to be included in the executor "current_collection"
   // because we need to be able to retrieve events for it.
diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp
index 232baaace3..4b79f39a4e 100644
--- a/rclcpp/test/rclcpp/executors/test_executors.cpp
+++ b/rclcpp/test/rclcpp/executors/test_executors.cpp
@@ -796,6 +796,76 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode)
   }
 }
 
+// This test verifies the thread-safety of adding and removing a node
+// while the executor is spinning and events are ready.
+// This test does not contain expectations, but rather it verifies that
+// we can run a "stressful routine" without crashing.
+TYPED_TEST(TestExecutors, stressAddRemoveNode)
+{
+  using ExecutorType = TypeParam;
+  // rmw_connextdds doesn't support events-executor
+  if (
+    std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
+    std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
+  {
+    GTEST_SKIP();
+  }
+
+  // Spawn some threads to do some heavy work
+  std::atomic<bool> should_cancel = false;
+  std::vector<std::thread> stress_threads;
+  for (size_t i = 0; i < 5 * std::thread::hardware_concurrency(); i++) {
+    stress_threads.emplace_back(
+      [&should_cancel, i]() {
+        // This is just some arbitrary heavy work
+        volatile size_t total = 0;
+        for (size_t k = 0; k < 549528914167; k++) {
+          if (should_cancel) {
+            break;
+          }
+          total += k * (i + 42);
+        }
+      });
+  }
+
+  ExecutorType executor;
+
+  // A timer that is "always" ready (the timer callback doesn't do anything)
+  auto timer = this->node->create_wall_timer(std::chrono::nanoseconds(1), []() {});
+
+  // This thread spins the executor until it's cancelled
+  std::thread spinner_thread([&]() {
+      executor.spin();
+    });
+
+  // This thread publishes data in a busy loop (the node has a subscription)
+  std::thread publisher_thread([&]() {
+      for (size_t i = 0; i < 10000; i++) {
+        this->publisher->publish(test_msgs::msg::Empty());
+      }
+    });
+
+  // This thread adds/removes the node that contains the entities in a busy loop
+  std::thread add_remove_thread([&]() {
+      for (size_t i = 0; i < 10000; i++) {
+        executor.add_node(this->node);
+        executor.remove_node(this->node);
+      }
+    });
+
+  // Wait for the threads that do real work to finish
+  publisher_thread.join();
+  add_remove_thread.join();
+
+  // The test is now completed: we can join the threads
+  should_cancel = true;
+  for (auto & t : stress_threads) {
+    t.join();
+  }
+  executor.cancel();
+  spinner_thread.join();
+}
+
 // Check spin_until_future_complete with node base pointer (instantiates its own executor)
 TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
 {

From 4f1032114df0620af51b44a0d93877d5538f5a25 Mon Sep 17 00:00:00 2001
From: Alberto Soragna
Date: Thu, 11 May 2023 23:15:50 +0100
Subject: [PATCH 2/3] be more precise with mutex locks; make stress test less stressful

Signed-off-by: Alberto Soragna
---
 .../events_executor/events_executor.hpp          |  2 -
 .../events_executor/events_executor.cpp          | 51 ++++++++++++-------
 .../test/rclcpp/executors/test_executors.cpp     | 36 ++++---------
 3 files changed, 44 insertions(+), 45 deletions(-)

diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp
index 65650ff2e4..dd5b1ebe63 100644
--- a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp
+++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp
@@ -253,8 +253,6 @@ class EventsExecutor : public rclcpp::Executor
   typename CollectionType::EntitySharedPtr
   retrieve_entity(typename CollectionType::Key entity_id, CollectionType & collection)
   {
-    std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
-
     // Check if the entity_id is in the collection
     auto it = collection.find(entity_id);
     if (it == collection.end()) {
diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
index 33400ddd51..6579407b79 100644
--- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
+++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
@@ -273,10 +273,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
   switch (event.type) {
     case ExecutorEventType::CLIENT_EVENT:
       {
-        auto client = this->retrieve_entity(
-          static_cast<const rcl_client_t *>(event.entity_key),
-          current_entities_collection_->clients);
-
+        rclcpp::ClientBase::SharedPtr client;
+        {
+          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+          client = this->retrieve_entity(
+            static_cast<const rcl_client_t *>(event.entity_key),
+            current_entities_collection_->clients);
+        }
         if (client) {
           for (size_t i = 0; i < event.num_events; i++) {
             execute_client(client);
@@ -287,9 +290,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
       }
     case ExecutorEventType::SUBSCRIPTION_EVENT:
       {
-        auto subscription = this->retrieve_entity(
-          static_cast<const rcl_subscription_t *>(event.entity_key),
-          current_entities_collection_->subscriptions);
+        rclcpp::SubscriptionBase::SharedPtr subscription;
+        {
+          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+          subscription = this->retrieve_entity(
+            static_cast<const rcl_subscription_t *>(event.entity_key),
+            current_entities_collection_->subscriptions);
+      }
         if (subscription) {
           for (size_t i = 0; i < event.num_events; i++) {
             execute_subscription(subscription);
@@ -299,10 +306,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
       }
     case ExecutorEventType::SERVICE_EVENT:
       {
-        auto service = this->retrieve_entity(
-          static_cast<const rcl_service_t *>(event.entity_key),
-          current_entities_collection_->services);
-
+        rclcpp::ServiceBase::SharedPtr service;
+        {
+          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+          service = this->retrieve_entity(
+            static_cast<const rcl_service_t *>(event.entity_key),
+            current_entities_collection_->services);
+        }
         if (service) {
           for (size_t i = 0; i < event.num_events; i++) {
             execute_service(service);
@@ -319,9 +329,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
       }
     case ExecutorEventType::WAITABLE_EVENT:
       {
-        auto waitable = this->retrieve_entity(
-          static_cast<const rclcpp::Waitable *>(event.entity_key),
-          current_entities_collection_->waitables);
+        rclcpp::Waitable::SharedPtr waitable;
+        {
+          std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+          waitable = this->retrieve_entity(
+            static_cast<const rclcpp::Waitable *>(event.entity_key),
+            current_entities_collection_->waitables);
+        }
         if (waitable) {
           for (size_t i = 0; i < event.num_events; i++) {
             auto data = waitable->take_data_by_entity_id(event.waitable_data);
@@ -392,9 +406,6 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
   rclcpp::executors::ExecutorEntitiesCollection new_collection;
   rclcpp::executors::build_entities_collection(callback_groups, new_collection);
 
-  // Acquire lock before modifying the current collection
-  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
-
   // TODO(alsora): this may be implemented in a better way.
   // We need the notify waitable to be included in the executor "current_collection"
   // because we need to be able to retrieve events for it.
@@ -404,6 +415,9 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
   // To do it, we need to add the notify waitable as an entry in both the new and
   // current collections such that it's neither added or removed.
   this->add_notify_waitable_to_collection(new_collection.waitables);
+
+  // Acquire lock before modifying the current collection
+  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
   this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
 
   this->refresh_current_collection(new_collection);
@@ -413,6 +427,9 @@ void
 EventsExecutor::refresh_current_collection(
   const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
 {
+  // Acquire lock before modifying the current collection
+  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
+
   current_entities_collection_->timers.update(
     new_collection.timers,
     [this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp
index 4b79f39a4e..e9aad61685 100644
--- a/rclcpp/test/rclcpp/executors/test_executors.cpp
+++ b/rclcpp/test/rclcpp/executors/test_executors.cpp
@@ -811,23 +811,6 @@ TYPED_TEST(TestExecutors, stressAddRemoveNode)
     GTEST_SKIP();
   }
 
-  // Spawn some threads to do some heavy work
-  std::atomic<bool> should_cancel = false;
-  std::vector<std::thread> stress_threads;
-  for (size_t i = 0; i < 5 * std::thread::hardware_concurrency(); i++) {
-    stress_threads.emplace_back(
-      [&should_cancel, i]() {
-        // This is just some arbitrary heavy work
-        volatile size_t total = 0;
-        for (size_t k = 0; k < 549528914167; k++) {
-          if (should_cancel) {
-            break;
-          }
-          total += k * (i + 42);
-        }
-      });
-  }
-
   ExecutorType executor;
 
   // A timer that is "always" ready (the timer callback doesn't do anything)
@@ -839,29 +822,30 @@ TYPED_TEST(TestExecutors, stressAddRemoveNode)
     });
 
   // This thread publishes data in a busy loop (the node has a subscription)
-  std::thread publisher_thread([&]() {
-      for (size_t i = 0; i < 10000; i++) {
+  std::thread publisher_thread1([&]() {
+      for (size_t i = 0; i < 100000; i++) {
+        this->publisher->publish(test_msgs::msg::Empty());
+      }
+    });
+  std::thread publisher_thread2([&]() {
+      for (size_t i = 0; i < 100000; i++) {
         this->publisher->publish(test_msgs::msg::Empty());
       }
     });
 
   // This thread adds/removes the node that contains the entities in a busy loop
   std::thread add_remove_thread([&]() {
-      for (size_t i = 0; i < 10000; i++) {
+      for (size_t i = 0; i < 100000; i++) {
         executor.add_node(this->node);
         executor.remove_node(this->node);
       }
     });
 
   // Wait for the threads that do real work to finish
-  publisher_thread.join();
+  publisher_thread1.join();
+  publisher_thread2.join();
   add_remove_thread.join();
 
-  // The test is now completed: we can join the threads
-  should_cancel = true;
-  for (auto & t : stress_threads) {
-    t.join();
-  }
   executor.cancel();
   spinner_thread.join();
 }

From 42c119cee2ab0b8ae96a218c3fa7e1b64a428290 Mon Sep 17 00:00:00 2001
From: Alberto Soragna
Date: Sun, 14 May 2023 08:04:06 +0100
Subject: [PATCH 3/3] fix uncrustify error

Signed-off-by: Alberto Soragna
---
 .../experimental/executors/events_executor/events_executor.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
index 6579407b79..64b07c0814 100644
--- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
+++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
@@ -296,7 +296,7 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
           subscription = this->retrieve_entity(
             static_cast<const rcl_subscription_t *>(event.entity_key),
             current_entities_collection_->subscriptions);
-      }
+        }
         if (subscription) {
           for (size_t i = 0; i < event.num_events; i++) {
             execute_subscription(subscription);
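
Aside (not part of the patch series above): the pattern these commits converge on is to hold collection_mutex_ only while copying the entity's shared_ptr out of current_entities_collection_, and to run the callback after the lock is released, so that add_node/remove_node can rebuild the collection concurrently without blocking callback execution. The following is a minimal, self-contained sketch of that pattern; the names (Entity, EntityMap, lookup_entity, rebuild) are illustrative and are not rclcpp APIs.

#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

struct Entity
{
  std::string name;
};

using EntityMap = std::map<int, std::shared_ptr<Entity>>;

std::recursive_mutex map_mutex;
EntityMap current_entities;

// Analogous to retrieve_entity(): copy the shared_ptr out while holding the lock.
std::shared_ptr<Entity> lookup_entity(int key)
{
  std::lock_guard<std::recursive_mutex> lock(map_mutex);
  auto it = current_entities.find(key);
  return it != current_entities.end() ? it->second : nullptr;
}

// Analogous to refresh_current_collection(): replace the map under the same lock.
void rebuild(EntityMap new_entities)
{
  std::lock_guard<std::recursive_mutex> lock(map_mutex);
  current_entities = std::move(new_entities);
}

int main()
{
  rebuild({{1, std::make_shared<Entity>(Entity{"timer"})}});

  // One thread repeatedly looks an entity up and "executes" it (the executor thread),
  // while another repeatedly rebuilds the collection (add_node/remove_node callers).
  std::thread executor([]() {
      for (int i = 0; i < 100000; i++) {
        if (auto entity = lookup_entity(1)) {
          // The copied shared_ptr keeps the entity alive even if rebuild()
          // drops it from the map while it is still being used here.
          std::string name_copy = entity->name;
          (void)name_copy;
        }
      }
    });
  std::thread updater([]() {
      for (int i = 0; i < 100000; i++) {
        rebuild({{1, std::make_shared<Entity>(Entity{"timer"})}});
        rebuild({});
      }
    });

  executor.join();
  updater.join();
  return 0;
}

A plain std::mutex would be enough for this sketch; the patches use std::recursive_mutex, which also tolerates re-locking from code paths that may already hold the lock.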