diff --git a/Development/cmake/NmosCppTest.cmake b/Development/cmake/NmosCppTest.cmake index e3048c14a..e22a395bc 100644 --- a/Development/cmake/NmosCppTest.cmake +++ b/Development/cmake/NmosCppTest.cmake @@ -52,6 +52,12 @@ set(NMOS_CPP_TEST_NMOS_TEST_SOURCES set(NMOS_CPP_TEST_NMOS_TEST_HEADERS ) +set(NMOS_CPP_TEST_PPLX_TEST_SOURCES + pplx/test/pplx_utils_test.cpp + ) +set(NMOS_CPP_TEST_PPLX_TEST_HEADERS + ) + set(NMOS_CPP_TEST_RQL_TEST_SOURCES rql/test/rql_test.cpp ) @@ -78,6 +84,8 @@ add_executable( ${NMOS_CPP_TEST_MDNS_TEST_HEADERS} ${NMOS_CPP_TEST_NMOS_TEST_SOURCES} ${NMOS_CPP_TEST_NMOS_TEST_HEADERS} + ${NMOS_CPP_TEST_PPLX_TEST_SOURCES} + ${NMOS_CPP_TEST_PPLX_TEST_HEADERS} ${NMOS_CPP_TEST_RQL_TEST_SOURCES} ${NMOS_CPP_TEST_RQL_TEST_HEADERS} ${NMOS_CPP_TEST_SDP_TEST_SOURCES} @@ -90,6 +98,7 @@ source_group("cpprest\\test\\Source Files" FILES ${NMOS_CPP_TEST_CPPREST_TEST_SO source_group("lldp\\test\\Source Files" FILES ${NMOS_CPP_TEST_LLDP_TEST_SOURCES}) source_group("mdns\\test\\Source Files" FILES ${NMOS_CPP_TEST_MDNS_TEST_SOURCES}) source_group("nmos\\test\\Source Files" FILES ${NMOS_CPP_TEST_NMOS_TEST_SOURCES}) +source_group("pplx\\test\\Source Files" FILES ${NMOS_CPP_TEST_PPLX_TEST_SOURCES}) source_group("rql\\test\\Source Files" FILES ${NMOS_CPP_TEST_RQL_TEST_SOURCES}) source_group("sdp\\test\\Source Files" FILES ${NMOS_CPP_TEST_SDP_TEST_SOURCES}) @@ -99,6 +108,7 @@ source_group("cpprest\\test\\Header Files" FILES ${NMOS_CPP_TEST_CPPREST_TEST_HE source_group("lldp\\test\\Header Files" FILES ${NMOS_CPP_TEST_LLDP_TEST_HEADERS}) source_group("mdns\\test\\Header Files" FILES ${NMOS_CPP_TEST_MDNS_TEST_HEADERS}) source_group("nmos\\test\\Header Files" FILES ${NMOS_CPP_TEST_NMOS_TEST_HEADERS}) +source_group("pplx\\test\\Header Files" FILES ${NMOS_CPP_TEST_PPLX_TEST_HEADERS}) source_group("rql\\test\\Header Files" FILES ${NMOS_CPP_TEST_RQL_TEST_HEADERS}) source_group("sdp\\test\\Header Files" FILES ${NMOS_CPP_TEST_SDP_TEST_HEADERS}) diff --git a/Development/nmos/connection_events_activation.cpp b/Development/nmos/connection_events_activation.cpp index 72cc99e18..17b014332 100644 --- a/Development/nmos/connection_events_activation.cpp +++ b/Development/nmos/connection_events_activation.cpp @@ -34,12 +34,12 @@ namespace nmos if (active && !connection_uri_or_null.is_null() && !ext_is_07_source_id_or_null.is_null()) { events_ws_client->subscribe(connection_resource.id, connection_uri_or_null.as_string(), ext_is_07_source_id_or_null.as_string()) - .then(pplx::observe_exception()); + .then(pplx::observe_exception()); } else { events_ws_client->unsubscribe(connection_resource.id) - .then(pplx::observe_exception()); + .then(pplx::observe_exception()); } }; } diff --git a/Development/nmos/events_ws_client.cpp b/Development/nmos/events_ws_client.cpp index 822941afd..6e0486bb2 100644 --- a/Development/nmos/events_ws_client.cpp +++ b/Development/nmos/events_ws_client.cpp @@ -91,8 +91,6 @@ namespace nmos nmos::read_lock read_lock() const { return nmos::read_lock{ mutex }; } nmos::write_lock write_lock() const { return nmos::write_lock{ mutex }; } - static void wait_nothrow(pplx::task t) { try { t.wait(); } catch (...) {} } - static nmos::details::omanip_gate make_gate(slog::base_gate& gate) { return{ gate, nmos::stash_category(nmos::categories::send_events_ws_commands) }; } }; @@ -283,7 +281,7 @@ namespace nmos .then([] { return true; }); }); }, token); - }).then(pplx::observe_exception()); + }).then(pplx::observe_exception()); connection = connections.insert({ connection_uri, client }).first; } @@ -335,11 +333,7 @@ namespace nmos subscriptions.clear(); connections.clear(); - return pplx::when_all(tasks.begin(), tasks.end()).then([tasks](pplx::task finally) - { - for (auto& task : tasks) wait_nothrow(task); - finally.wait(); - }); + return pplx::ranges::when_all(tasks).then(pplx::observe_exceptions(tasks)); } } diff --git a/Development/nmos/mdns.cpp b/Development/nmos/mdns.cpp index 8aaede97c..d69ab7511 100644 --- a/Development/nmos/mdns.cpp +++ b/Development/nmos/mdns.cpp @@ -524,7 +524,7 @@ namespace nmos // when either task is completed, cancel and wait for the other to be completed // and then merge the two sets of results - resolve_task = pplx::when_any(both_tasks.begin(), both_tasks.end()).then([both_results, linked_source, both_tasks](std::pair first_result) + resolve_task = pplx::ranges::when_any(both_tasks).then([both_results, linked_source, both_tasks](std::pair first_result) { if (!both_results[first_result.second]->empty()) linked_source.cancel(); diff --git a/Development/nmos/mdns_api.cpp b/Development/nmos/mdns_api.cpp index ac2e5c6a6..15bc7806b 100644 --- a/Development/nmos/mdns_api.cpp +++ b/Development/nmos/mdns_api.cpp @@ -206,13 +206,10 @@ namespace nmos return mdns_result(browsed1, resolved); })); } - return pplx::when_all(tasks.begin(), tasks.end()).then([res, version, tasks](pplx::task> finally) mutable + return pplx::ranges::when_all(tasks).then([res, version, tasks](pplx::task> finally) mutable { // to ensure an exception from one doesn't leave other tasks' exceptions unobserved - for (auto& task : tasks) - { - try { task.wait(); } catch (...) {} - } + for (auto& task : tasks) pplx::details::wait_nothrow(task); // merge results that have the same host_target, port and txt records // and only differ in the resolved addresses diff --git a/Development/nmos/server.cpp b/Development/nmos/server.cpp index 52ff269cd..909112470 100644 --- a/Development/nmos/server.cpp +++ b/Development/nmos/server.cpp @@ -7,11 +7,6 @@ namespace nmos : model(model) {} - namespace details - { - void wait_nothrow(pplx::task t) { try { t.wait(); } catch (...) {} } - } - pplx::task server::open() { return pplx::create_task([&] @@ -70,11 +65,7 @@ namespace nmos if (0 <= ws_listener.uri().port()) tasks.push_back(ws_listener.open()); } - return pplx::when_all(tasks.begin(), tasks.end()).then([tasks](pplx::task finally) - { - for (auto& task : tasks) details::wait_nothrow(task); - finally.wait(); - }); + return pplx::ranges::when_all(tasks).then(pplx::observe_exceptions(tasks)); }); } @@ -95,11 +86,7 @@ namespace nmos if (0 <= ws_listener.uri().port()) tasks.push_back(ws_listener.close()); } - return pplx::when_all(tasks.begin(), tasks.end()).then([tasks](pplx::task finally) - { - for (auto& task : tasks) details::wait_nothrow(task); - finally.wait(); - }); + return pplx::ranges::when_all(tasks).then(pplx::observe_exceptions(tasks)); }); } diff --git a/Development/pplx/pplx_utils.h b/Development/pplx/pplx_utils.h index d7987e67a..3b90a0ce3 100644 --- a/Development/pplx/pplx_utils.h +++ b/Development/pplx/pplx_utils.h @@ -2,6 +2,7 @@ #define PPLX_PPLX_UTILS_H #include +#include #include "pplx/pplxtasks.h" #if (defined(_MSC_VER) && (_MSC_VER >= 1800)) && !CPPREST_FORCE_PPLX @@ -86,6 +87,28 @@ namespace pplx return pplx::task() == task; } + namespace details + { + template + void wait_nothrow(const pplx::task& task) + { + try + { + task.wait(); + } + catch (...) {} + } + } + + struct exception_observer + { + template + void operator()(pplx::task finally) const + { + details::wait_nothrow(finally); + } + }; + /// /// Silently 'observe' any exception thrown from a task. /// @@ -93,18 +116,133 @@ namespace pplx /// Exceptions that are unobserved when a task is destructed will terminate the process. /// Add this as a continuation to silently swallow all exceptions. /// + inline exception_observer observe_exception() + { + return exception_observer(); + } + + namespace details + { + // see http://ericniebler.com/2013/08/07/universal-references-and-the-copy-constructo/ + template + using disable_if_same_or_derived = + typename std::enable_if< + !std::is_base_of::type + >::value + >::type; + } + template - struct observe_exception + struct exceptions_observer { - void operator()(pplx::task finally) const + template > + explicit exceptions_observer(InputRange&& tasks) : tasks(tasks.begin(), tasks.end()) {} + + template + exceptions_observer(InputIterator&& first, InputIterator&& last) : tasks(std::forward(first), std::forward(last)) {} + + template + void operator()(pplx::task finally) const { - try + for (auto& task : tasks) details::wait_nothrow(task); + details::wait_nothrow(finally); + } + + private: + std::vector> tasks; + }; + + /// + /// Silently 'observe' all exceptions thrown from a range of tasks. + /// + /// + /// Exceptions that are unobserved when a task is destructed will terminate the process. + /// Add this as a continuation to silently swallow all exceptions. + /// + template ().begin())>::value_type::result_type> + inline exceptions_observer observe_exceptions(InputRange&& tasks) + { + return exceptions_observer(std::forward(tasks)); + } + + /// + /// Silently 'observe' all exceptions thrown from a range of tasks. + /// + /// + /// Exceptions that are unobserved when a task is destructed will terminate the process. + /// Add this as a continuation to silently swallow all exceptions. + /// + template ::value_type::result_type> + inline exceptions_observer observe_exceptions(InputIterator&& first, InputIterator&& last) + { + return exceptions_observer(std::forward(first), std::forward(last)); + } + + namespace details + { + template + struct workaround_default_task + { + pplx::task operator()(pplx::task task) const { - finally.wait(); + if (!pplx::empty(task)) return task; + // convert default constructed tasks into ones that pplx::when_{all,any} can handle + // see https://github.com/microsoft/cpprestsdk/issues/1701 + try + { + task.wait(); + // unreachable code + return task; + } + catch (const pplx::invalid_operation& e) + { + auto workaround = pplx::task_from_exception(e); + details::wait_nothrow(workaround); + return workaround; + } + } + }; + } + + namespace ranges + { + namespace details + { + template + auto when_all(InputRange&& tasks, const pplx::task_options& options = pplx::task_options()) + -> decltype(pplx::when_all(tasks.begin(), tasks.end(), options)) + { + return pplx::when_all(tasks.begin(), tasks.end(), options); } - catch (...) {} } - }; + + template + auto when_all(InputRange&& tasks, const pplx::task_options& options = pplx::task_options()) + -> decltype(pplx::when_all(tasks.begin(), tasks.end(), options)) + { + using ReturnType = typename std::iterator_traits::value_type::result_type; + return pplx::ranges::details::when_all(tasks | boost::adaptors::transformed(pplx::details::workaround_default_task())); + } + + namespace details + { + template + auto when_any(InputRange&& tasks, const pplx::task_options& options = pplx::task_options()) + -> decltype(pplx::when_any(tasks.begin(), tasks.end(), options)) + { + return pplx::when_any(tasks.begin(), tasks.end(), options); + } + } + + template + auto when_any(InputRange&& tasks, const pplx::task_options& options = pplx::task_options()) + -> decltype(pplx::when_any(tasks.begin(), tasks.end(), options)) + { + using ReturnType = typename std::iterator_traits::value_type::result_type; + return pplx::ranges::details::when_any(tasks | boost::adaptors::transformed(pplx::details::workaround_default_task())); + } + } /// /// RAII helper for classes that have asynchronous open/close member functions. diff --git a/Development/pplx/test/pplx_utils_test.cpp b/Development/pplx/test/pplx_utils_test.cpp new file mode 100644 index 000000000..a20a02162 --- /dev/null +++ b/Development/pplx/test/pplx_utils_test.cpp @@ -0,0 +1,109 @@ +// The first "test" is of course whether the header compiles standalone +#include "pplx/pplx_utils.h" + +#include "bst/test/test.h" + +namespace +{ + template + inline pplx::task task_from_default() { return pplx::task_from_result(ReturnType()); } + + template <> + inline pplx::task task_from_default() { return pplx::task_from_result(); } +} + +//////////////////////////////////////////////////////////////////////////////////////////// +BST_TEMPLATE_TEST_CASE_2(testPplxWhenAll, ReturnType, void, int) +{ + pplx::task default_task; + pplx::task successful_task1 = task_from_default(); + pplx::task successful_task2 = task_from_default(); + pplx::task failed_task = pplx::task_from_exception(std::runtime_error("failed")); + pplx::task_completion_event tce; + pplx::task incomplete_task(tce); + using final_task = decltype(pplx::ranges::when_all(std::declval>>())); + + BST_REQUIRE_THROW(failed_task.wait(), std::runtime_error); + BST_REQUIRE_THROW(default_task.wait(), pplx::invalid_operation); + + { + auto tasks = { successful_task1, successful_task2 }; + bool continuation = false; + pplx::ranges::when_all(tasks).then([&](final_task finally) + { + finally.wait(); + continuation = true; + }).wait(); + BST_REQUIRE(continuation); + } + + { + auto tasks = { successful_task1, failed_task, incomplete_task }; + bool continuation = false; + pplx::ranges::when_all(tasks).then([&](final_task finally) + { + BST_REQUIRE_THROW(finally.wait(), std::runtime_error); + continuation = true; + }).wait(); + BST_REQUIRE(continuation); + } + + { + auto tasks = { default_task, incomplete_task }; + bool continuation = false; + pplx::ranges::when_all(tasks).then([&](final_task finally) + { + BST_REQUIRE_THROW(finally.wait(), pplx::invalid_operation); + continuation = true; + }).wait(); + BST_REQUIRE(continuation); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////// +BST_TEMPLATE_TEST_CASE_2(testPplxWhenAny, ReturnType, void, int) +{ + pplx::task default_task; + pplx::task successful_task1 = task_from_default(); + pplx::task successful_task2 = task_from_default(); + pplx::task failed_task = pplx::task_from_exception(std::runtime_error("failed")); + pplx::task_completion_event tce; + pplx::task incomplete_task(tce); + using final_task = decltype(pplx::ranges::when_any(std::declval>>())); + + BST_REQUIRE_THROW(failed_task.wait(), std::runtime_error); + BST_REQUIRE_THROW(default_task.wait(), pplx::invalid_operation); + + { + auto tasks = { successful_task1, successful_task2 }; + bool continuation = false; + pplx::ranges::when_any(tasks).then([&](final_task finally) + { + finally.wait(); + continuation = true; + }).wait(); + BST_REQUIRE(continuation); + } + + { + auto tasks = { successful_task1, failed_task, incomplete_task }; + bool continuation = false; + pplx::ranges::when_any(tasks).then([&](final_task finally) + { + finally.wait(); + continuation = true; + }).wait(); + BST_REQUIRE(continuation); + } + + { + auto tasks = { default_task }; + bool continuation = false; + pplx::ranges::when_any(tasks).then([&](final_task finally) + { + BST_REQUIRE_THROW(finally.wait(), pplx::invalid_operation); + continuation = true; + }).wait(); + BST_REQUIRE(continuation); + } +}