Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Development/cmake/NmosCppTest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ set(NMOS_CPP_TEST_NMOS_TEST_SOURCES
set(NMOS_CPP_TEST_NMOS_TEST_HEADERS
)

set(NMOS_CPP_TEST_PPLX_TEST_SOURCES
pplx/test/pplx_utils_test.cpp
)
set(NMOS_CPP_TEST_PPLX_TEST_HEADERS
)

set(NMOS_CPP_TEST_RQL_TEST_SOURCES
rql/test/rql_test.cpp
)
Expand All @@ -78,6 +84,8 @@ add_executable(
${NMOS_CPP_TEST_MDNS_TEST_HEADERS}
${NMOS_CPP_TEST_NMOS_TEST_SOURCES}
${NMOS_CPP_TEST_NMOS_TEST_HEADERS}
${NMOS_CPP_TEST_PPLX_TEST_SOURCES}
${NMOS_CPP_TEST_PPLX_TEST_HEADERS}
${NMOS_CPP_TEST_RQL_TEST_SOURCES}
${NMOS_CPP_TEST_RQL_TEST_HEADERS}
${NMOS_CPP_TEST_SDP_TEST_SOURCES}
Expand All @@ -90,6 +98,7 @@ source_group("cpprest\\test\\Source Files" FILES ${NMOS_CPP_TEST_CPPREST_TEST_SO
source_group("lldp\\test\\Source Files" FILES ${NMOS_CPP_TEST_LLDP_TEST_SOURCES})
source_group("mdns\\test\\Source Files" FILES ${NMOS_CPP_TEST_MDNS_TEST_SOURCES})
source_group("nmos\\test\\Source Files" FILES ${NMOS_CPP_TEST_NMOS_TEST_SOURCES})
source_group("pplx\\test\\Source Files" FILES ${NMOS_CPP_TEST_PPLX_TEST_SOURCES})
source_group("rql\\test\\Source Files" FILES ${NMOS_CPP_TEST_RQL_TEST_SOURCES})
source_group("sdp\\test\\Source Files" FILES ${NMOS_CPP_TEST_SDP_TEST_SOURCES})

Expand All @@ -99,6 +108,7 @@ source_group("cpprest\\test\\Header Files" FILES ${NMOS_CPP_TEST_CPPREST_TEST_HE
source_group("lldp\\test\\Header Files" FILES ${NMOS_CPP_TEST_LLDP_TEST_HEADERS})
source_group("mdns\\test\\Header Files" FILES ${NMOS_CPP_TEST_MDNS_TEST_HEADERS})
source_group("nmos\\test\\Header Files" FILES ${NMOS_CPP_TEST_NMOS_TEST_HEADERS})
source_group("pplx\\test\\Header Files" FILES ${NMOS_CPP_TEST_PPLX_TEST_HEADERS})
source_group("rql\\test\\Header Files" FILES ${NMOS_CPP_TEST_RQL_TEST_HEADERS})
source_group("sdp\\test\\Header Files" FILES ${NMOS_CPP_TEST_SDP_TEST_HEADERS})

Expand Down
4 changes: 2 additions & 2 deletions Development/nmos/connection_events_activation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ namespace nmos
if (active && !connection_uri_or_null.is_null() && !ext_is_07_source_id_or_null.is_null())
{
events_ws_client->subscribe(connection_resource.id, connection_uri_or_null.as_string(), ext_is_07_source_id_or_null.as_string())
.then(pplx::observe_exception<void>());
.then(pplx::observe_exception());
}
else
{
events_ws_client->unsubscribe(connection_resource.id)
.then(pplx::observe_exception<void>());
.then(pplx::observe_exception());
}
};
}
Expand Down
10 changes: 2 additions & 8 deletions Development/nmos/events_ws_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ namespace nmos
nmos::read_lock read_lock() const { return nmos::read_lock{ mutex }; }
nmos::write_lock write_lock() const { return nmos::write_lock{ mutex }; }

static void wait_nothrow(pplx::task<void> t) { try { t.wait(); } catch (...) {} }

static nmos::details::omanip_gate make_gate(slog::base_gate& gate) { return{ gate, nmos::stash_category(nmos::categories::send_events_ws_commands) }; }
};

Expand Down Expand Up @@ -283,7 +281,7 @@ namespace nmos
.then([] { return true; });
});
}, token);
}).then(pplx::observe_exception<void>());
}).then(pplx::observe_exception());

connection = connections.insert({ connection_uri, client }).first;
}
Expand Down Expand Up @@ -335,11 +333,7 @@ namespace nmos
subscriptions.clear();
connections.clear();

return pplx::when_all(tasks.begin(), tasks.end()).then([tasks](pplx::task<void> finally)
{
for (auto& task : tasks) wait_nothrow(task);
finally.wait();
});
return pplx::ranges::when_all(tasks).then(pplx::observe_exceptions(tasks));
}
}

Expand Down
2 changes: 1 addition & 1 deletion Development/nmos/mdns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ namespace nmos

// when either task is completed, cancel and wait for the other to be completed
// and then merge the two sets of results
resolve_task = pplx::when_any(both_tasks.begin(), both_tasks.end()).then([both_results, linked_source, both_tasks](std::pair<bool, size_t> first_result)
resolve_task = pplx::ranges::when_any(both_tasks).then([both_results, linked_source, both_tasks](std::pair<bool, size_t> first_result)
{
if (!both_results[first_result.second]->empty()) linked_source.cancel();

Expand Down
7 changes: 2 additions & 5 deletions Development/nmos/mdns_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,10 @@ namespace nmos
return mdns_result(browsed1, resolved);
}));
}
return pplx::when_all(tasks.begin(), tasks.end()).then([res, version, tasks](pplx::task<std::vector<mdns_result>> finally) mutable
return pplx::ranges::when_all(tasks).then([res, version, tasks](pplx::task<std::vector<mdns_result>> finally) mutable
{
// to ensure an exception from one doesn't leave other tasks' exceptions unobserved
for (auto& task : tasks)
{
try { task.wait(); } catch (...) {}
}
for (auto& task : tasks) pplx::details::wait_nothrow(task);

// merge results that have the same host_target, port and txt records
// and only differ in the resolved addresses
Expand Down
17 changes: 2 additions & 15 deletions Development/nmos/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ namespace nmos
: model(model)
{}

namespace details
{
void wait_nothrow(pplx::task<void> t) { try { t.wait(); } catch (...) {} }
}

pplx::task<void> server::open()
{
return pplx::create_task([&]
Expand Down Expand Up @@ -70,11 +65,7 @@ namespace nmos
if (0 <= ws_listener.uri().port()) tasks.push_back(ws_listener.open());
}

return pplx::when_all(tasks.begin(), tasks.end()).then([tasks](pplx::task<void> finally)
{
for (auto& task : tasks) details::wait_nothrow(task);
finally.wait();
});
return pplx::ranges::when_all(tasks).then(pplx::observe_exceptions(tasks));
});
}

Expand All @@ -95,11 +86,7 @@ namespace nmos
if (0 <= ws_listener.uri().port()) tasks.push_back(ws_listener.close());
}

return pplx::when_all(tasks.begin(), tasks.end()).then([tasks](pplx::task<void> finally)
{
for (auto& task : tasks) details::wait_nothrow(task);
finally.wait();
});
return pplx::ranges::when_all(tasks).then(pplx::observe_exceptions(tasks));
});
}

Expand Down
150 changes: 144 additions & 6 deletions Development/pplx/pplx_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define PPLX_PPLX_UTILS_H

#include <chrono>
#include <boost/range/adaptor/transformed.hpp>
#include "pplx/pplxtasks.h"

#if (defined(_MSC_VER) && (_MSC_VER >= 1800)) && !CPPREST_FORCE_PPLX
Expand Down Expand Up @@ -86,25 +87,162 @@ namespace pplx
return pplx::task<ReturnType>() == task;
}

namespace details
{
template <typename ReturnType>
void wait_nothrow(const pplx::task<ReturnType>& task)
{
try
{
task.wait();
}
catch (...) {}
}
}

struct exception_observer
{
template <typename ContinuationReturnType>
void operator()(pplx::task<ContinuationReturnType> finally) const
{
details::wait_nothrow(finally);
}
};

/// <summary>
/// Silently 'observe' any exception thrown from a task.
/// </summary>
/// <remarks>
/// Exceptions that are unobserved when a task is destructed will terminate the process.
/// Add this as a continuation to silently swallow all exceptions.
/// </remarks>
inline exception_observer observe_exception()
{
return exception_observer();
}

namespace details
{
// see http://ericniebler.com/2013/08/07/universal-references-and-the-copy-constructo/
template<typename A, typename B>
using disable_if_same_or_derived =
typename std::enable_if<
!std::is_base_of<A,typename
std::remove_reference<B>::type
>::value
>::type;
}

template <typename ReturnType>
struct observe_exception
struct exceptions_observer
{
void operator()(pplx::task<ReturnType> finally) const
template <typename InputRange, typename X = pplx::details::disable_if_same_or_derived<exceptions_observer, InputRange>>
explicit exceptions_observer(InputRange&& tasks) : tasks(tasks.begin(), tasks.end()) {}

template <typename InputIterator>
exceptions_observer(InputIterator&& first, InputIterator&& last) : tasks(std::forward<InputIterator>(first), std::forward<InputIterator>(last)) {}

template <typename ContinuationReturnType>
void operator()(pplx::task<ContinuationReturnType> finally) const
{
try
for (auto& task : tasks) details::wait_nothrow(task);
details::wait_nothrow(finally);
}

private:
std::vector<pplx::task<ReturnType>> tasks;
};

/// <summary>
/// Silently 'observe' all exceptions thrown from a range of tasks.
/// </summary>
/// <remarks>
/// Exceptions that are unobserved when a task is destructed will terminate the process.
/// Add this as a continuation to silently swallow all exceptions.
/// </remarks>
template <typename InputRange, typename ReturnType = typename std::iterator_traits<decltype(std::declval<InputRange>().begin())>::value_type::result_type>
inline exceptions_observer<ReturnType> observe_exceptions(InputRange&& tasks)
{
return exceptions_observer<ReturnType>(std::forward<InputRange>(tasks));
}

/// <summary>
/// Silently 'observe' all exceptions thrown from a range of tasks.
/// </summary>
/// <remarks>
/// Exceptions that are unobserved when a task is destructed will terminate the process.
/// Add this as a continuation to silently swallow all exceptions.
/// </remarks>
template <typename InputIterator, typename ReturnType = typename std::iterator_traits<InputIterator>::value_type::result_type>
inline exceptions_observer<ReturnType> observe_exceptions(InputIterator&& first, InputIterator&& last)
{
return exceptions_observer<ReturnType>(std::forward<InputIterator>(first), std::forward<InputIterator>(last));
}

namespace details
{
template <typename ReturnType>
struct workaround_default_task
{
pplx::task<ReturnType> operator()(pplx::task<ReturnType> task) const
{
finally.wait();
if (!pplx::empty(task)) return task;
// convert default constructed tasks into ones that pplx::when_{all,any} can handle
// see https://github.com/microsoft/cpprestsdk/issues/1701
try
{
task.wait();
// unreachable code
return task;
}
catch (const pplx::invalid_operation& e)
{
auto workaround = pplx::task_from_exception<ReturnType>(e);
details::wait_nothrow(workaround);
return workaround;
}
}
};
}

namespace ranges
{
namespace details
{
template <typename InputRange>
auto when_all(InputRange&& tasks, const pplx::task_options& options = pplx::task_options())
-> decltype(pplx::when_all(tasks.begin(), tasks.end(), options))
{
return pplx::when_all(tasks.begin(), tasks.end(), options);
}
catch (...) {}
}
};

template <typename InputRange>
auto when_all(InputRange&& tasks, const pplx::task_options& options = pplx::task_options())
-> decltype(pplx::when_all(tasks.begin(), tasks.end(), options))
{
using ReturnType = typename std::iterator_traits<decltype(tasks.begin())>::value_type::result_type;
return pplx::ranges::details::when_all(tasks | boost::adaptors::transformed(pplx::details::workaround_default_task<ReturnType>()));
}

namespace details
{
template <typename InputRange>
auto when_any(InputRange&& tasks, const pplx::task_options& options = pplx::task_options())
-> decltype(pplx::when_any(tasks.begin(), tasks.end(), options))
{
return pplx::when_any(tasks.begin(), tasks.end(), options);
}
}

template <typename InputRange>
auto when_any(InputRange&& tasks, const pplx::task_options& options = pplx::task_options())
-> decltype(pplx::when_any(tasks.begin(), tasks.end(), options))
{
using ReturnType = typename std::iterator_traits<decltype(tasks.begin())>::value_type::result_type;
return pplx::ranges::details::when_any(tasks | boost::adaptors::transformed(pplx::details::workaround_default_task<ReturnType>()));
}
}

/// <summary>
/// RAII helper for classes that have asynchronous open/close member functions.
Expand Down
Loading