diff --git a/interface/cuvis.hpp b/interface/cuvis.hpp index 5bb0323..62aafa9 100644 --- a/interface/cuvis.hpp +++ b/interface/cuvis.hpp @@ -397,7 +397,6 @@ namespace cuvis * */ std::chrono::milliseconds max_buftime; - /** The frame is saved including all results from processing, e.g. the cube. * * @copydoc cuvis_save_args_t.full_export @@ -493,35 +492,26 @@ namespace cuvis */ operator cuvis_worker_settings_t() const; - /** - * @copydoc cuvis_worker_settings_t.worker_count - */ - unsigned worker_count; + /** @copydoc cuvis_worker_settings_t.input_queue_size */ + size_t input_queue_size; - /** - * @copydoc cuvis_worker_settings_t.poll_interval - */ - std::chrono::milliseconds poll_interval; + /** @copydoc cuvis_worker_settings_t.mandatory_queue_size */ + size_t mandatory_queue_size; - /** - * @copydoc cuvis_worker_settings_t.worker_queue_size - */ - int worker_queue_size; + /** @copydoc cuvis_worker_settings_t.supplementary_queue_size */ + size_t supplementary_queue_size; - /** - * @copydoc cuvis_worker_settings_t.worker_queue_hard_limit - */ - int worker_queue_hard_limit; + /** @copydoc cuvis_worker_settings_t.output_queue_size */ + size_t output_queue_size; - /** - * @copydoc cuvis_worker_settings_t.worker_queue_soft_limit - */ - int worker_queue_soft_limit; + /** @copydoc cuvis_worker_settings_t.can_skip_measurements */ + bool can_skip_measurements; - /** - * @copydoc cuvis_worker_settings_t.can_drop - */ - bool can_drop; + /** @copydoc cuvis_worker_settings_t.can_skip_supplementary_steps */ + bool can_skip_supplementary_steps; + + /** @copydoc cuvis_worker_settings_t.can_drop_results */ + bool can_drop_results; }; /* @@ -1287,8 +1277,18 @@ namespace cuvis std::exception_ptr exception; }; - using worker_callback_t = std::function; + struct worker_state_t + { + size_t measurementsInQueue; + size_t sessionFilesInQueue; + size_t framesInQueue; + size_t measurementsBeingProcessed; + size_t resultsInQueue; + bool hasAcquisitionContext; + bool isProcessing; + }; + using worker_callback_t = std::function; public: Worker(WorkerArgs const& args); @@ -1297,23 +1297,38 @@ namespace cuvis void set_proc_cont(ProcessingContext const* procCont); void set_exporter(Exporter const* exporter); void set_viewer(Viewer const* viewer); - std::future set_session_file(SessionFile const* session, bool skip_dropped_frames = false); - std::pair query_session_progress(); - bool get_drop_behavior(); - void set_drop_behavior(bool canDrop); + + void ingest_measurement(Measurement const* measurement); + void ingest_session_file(SessionFile const* session, std::string frame_selection); + double query_session_progress(); + + bool get_can_drop_results(); + bool get_can_skip_measurements(); + bool get_can_skip_supplementary(); bool has_next_result() const; - worker_return_t get_next_result() const; + worker_return_t get_next_result(std::chrono::milliseconds timeout) const; + size_t get_queue_used() const; + size_t get_input_queue_limit() const; + size_t get_mandatory_queue_limit() const; + size_t get_supplementary_queue_limit() const; + size_t get_output_queue_limit() const; - void register_worker_callback(worker_callback_t callback, unsigned concurrency = 1); + void start_processing(); + void stop_processing(); + void drop_all_queued(); - void reset_worker_callback(); + bool is_processing_mandatory() const; + bool is_processing() const; + int32_t get_threads_busy() const; - std::pair get_queue_limits() const; - size_t get_queue_used() const; - void set_queue_limits(int32_t hard_limit, int32_t soft_limit); + worker_state_t get_state() const; + + void register_worker_callback(worker_callback_t callback, unsigned concurrency = 1); + + void reset_worker_callback(); private: std::shared_ptr _worker; @@ -1538,13 +1553,13 @@ namespace cuvis } break; case cuvis_data_type_t::data_type_string: { - CUVIS_SIZE buffer_length; - chk(cuvis_measurement_get_data_string_length(*_mesu, key, &buffer_length)); - + CUVIS_SIZE buffer_length; + chk(cuvis_measurement_get_data_string_length(*_mesu, key, &buffer_length)); + CUVIS_CHAR* value = new CUVIS_CHAR[buffer_length]; chk(cuvis_measurement_get_data_string(*_mesu, key, buffer_length, value)); _string_data->emplace(std::string(key), std::string(value)); - delete[] value; + delete[] value; } break; default: // unknown or unsupported @@ -1755,55 +1770,42 @@ namespace cuvis } } - // Returns the number of frames processed in total via the future - inline std::future Worker::set_session_file(SessionFile const* session, bool skip_dropped_frames) + inline void Worker::ingest_session_file(SessionFile const* session, std::string frame_selection) { if (session != nullptr) { - chk(cuvis_worker_set_session_file(*_worker, *session->_session, skip_dropped_frames ? 1 : 0)); - CUVIS_WORKER this_worker = *_worker; - - auto wait_on_session_done = [this_worker]() { - CUVIS_INT session_total; - CUVIS_INT session_current; - CUVIS_INT status; - do - { - status = cuvis_worker_query_session_progress(this_worker, &session_current, &session_total); - } while (status == status_ok); - return session_current; - }; - - return std::async(std::launch::deferred, wait_on_session_done); - } - else - { - chk(cuvis_worker_set_session_file(*_worker, CUVIS_HANDLE_NULL, skip_dropped_frames ? 1 : 0)); - auto dummy = []() { return -1; }; - return std::async(std::launch::deferred, dummy); + chk(cuvis_worker_ingest_session_file(*_worker, *session->_session, frame_selection.c_str())); } } - inline std::pair Worker::query_session_progress() + inline double Worker::query_session_progress() { - CUVIS_INT read = 0; - CUVIS_INT total = 0; - if (cuvis_worker_query_session_progress(*_worker, &read, &total) == status_ok) - { - return std::make_pair(read, total); - } - return std::make_pair(-1, -1); + double out = -1; + chk(cuvis_worker_query_session_progress(*_worker, &out)); + return out; } - inline bool Worker::get_drop_behavior() + inline bool Worker::get_can_drop_results() { CUVIS_INT canDrop; - chk(cuvis_worker_get_drop_behavior(*_worker, &canDrop)); + chk(cuvis_worker_get_can_drop_results(*_worker, &canDrop)); return bool(canDrop); } - inline void Worker::set_drop_behavior(bool canDrop) { chk(cuvis_worker_set_drop_behavior(*_worker, canDrop ? 1 : 0)); } + inline bool Worker::get_can_skip_measurements() + { + CUVIS_INT canSkip; + chk(cuvis_worker_get_can_skip_measurements(*_worker, &canSkip)); + return bool(canSkip); + } + + inline bool Worker::get_can_skip_supplementary() + { + CUVIS_INT canSkip; + chk(cuvis_worker_get_can_skip_supplementary(*_worker, &canSkip)); + return bool(canSkip); + } inline void Worker::set_proc_cont(ProcessingContext const* procCont) { @@ -1847,29 +1849,46 @@ namespace cuvis return hasNext != 0; } - inline void Worker::set_queue_limits(int32_t hard_limit, int32_t soft_limit) + inline size_t Worker::get_input_queue_limit() const { - chk(cuvis_worker_set_queue_limits(*_worker, hard_limit, soft_limit)); - return; + CUVIS_SIZE limit; + chk(cuvis_worker_get_input_queue_limit(*_worker, &limit)); + return limit; + } + + inline size_t Worker::get_mandatory_queue_limit() const + { + CUVIS_SIZE limit; + chk(cuvis_worker_get_mandatory_queue_limit(*_worker, &limit)); + return limit; } - inline std::pair Worker::get_queue_limits() const + inline size_t Worker::get_supplementary_queue_limit() const { - int32_t hard, soft; - chk(cuvis_worker_get_queue_limits(*_worker, &hard, &soft)); - return std::make_pair(hard, soft); + CUVIS_SIZE limit; + chk(cuvis_worker_get_supplementary_queue_limit(*_worker, &limit)); + return limit; } + + inline size_t Worker::get_output_queue_limit() const + { + CUVIS_SIZE limit; + chk(cuvis_worker_get_output_queue_limit(*_worker, &limit)); + return limit; + } + + inline size_t Worker::get_queue_used() const { int_t size; chk(cuvis_worker_get_queue_used(*_worker, &size)); return size; } - inline Worker::worker_return_t Worker::get_next_result() const + inline Worker::worker_return_t Worker::get_next_result(std::chrono::milliseconds timeout) const { CUVIS_VIEW current_view; CUVIS_MESU current_mesu; - auto code = cuvis_worker_get_next_result(*_worker, ¤t_mesu, ¤t_view, 0); + auto code = cuvis_worker_get_next_result(*_worker, ¤t_mesu, ¤t_view, timeout.count()); std::optional mesu; std::optional view; @@ -1895,6 +1914,53 @@ namespace cuvis return {std::move(mesu), view, except}; } + inline void Worker::ingest_measurement(Measurement const* measurement) { chk(cuvis_worker_ingest_mesu(*_worker, *measurement->_mesu)); } + + inline void Worker::start_processing() { chk(cuvis_worker_start(*_worker)); } + + inline void Worker::stop_processing() { chk(cuvis_worker_stop(*_worker)); } + + inline void Worker::drop_all_queued() { chk(cuvis_worker_drop_all_queued(*_worker)); } + + inline bool Worker::is_processing_mandatory() const + { + CUVIS_INT state; + chk(cuvis_worker_is_processing_mandatory(*_worker, &state)); + return state != 0; + } + + inline bool Worker::is_processing() const + { + CUVIS_INT state; + chk(cuvis_worker_is_processing(*_worker, &state)); + return state != 0; + } + + inline int32_t Worker::get_threads_busy() const + { + CUVIS_INT count; + chk(cuvis_worker_get_threads_busy(*_worker, &count)); + return count; + } + + inline Worker::worker_state_t Worker::get_state() const + { + CUVIS_WORKER_STATE state; + chk(cuvis_worker_get_state(*_worker, &state)); + + Worker::worker_state_t out; + out.measurementsInQueue = state.measurementsInQueue; + out.sessionFilesInQueue = state.sessionFilesInQueue; + out.framesInQueue = state.framesInQueue; + out.measurementsBeingProcessed = state.measurementsBeingProcessed; + out.resultsInQueue = state.resultsInQueue; + out.hasAcquisitionContext = state.hasAcquisitionContext != 0; + out.isProcessing = state.isProcessing != 0; + + return out; + } + + inline void Worker::register_worker_callback(worker_callback_t callback, unsigned concurrency) { reset_worker_callback(); @@ -1907,10 +1973,10 @@ namespace cuvis std::deque> async_tasks; while (_worker_poll_thread_run.load()) { - if (has_next_result()) - { - auto ret = get_next_result(); + auto ret = get_next_result(std::chrono::milliseconds(1000)); + if (ret.mesu.has_value()) + { async_tasks.push_back(std::async(std::launch::async, callback, std::move(ret))); if (async_tasks.size() >= concurrency) @@ -1924,7 +1990,6 @@ namespace cuvis } if (!_worker_poll_thread_run.load()) - { return; } @@ -1939,11 +2004,6 @@ namespace cuvis async_tasks.pop_front(); } } - - else - { - std::this_thread::sleep_for(poll_time); - } } }); } @@ -2551,6 +2611,7 @@ namespace cuvis save_args.soft_limit = static_cast(soft_limit); save_args.hard_limit = static_cast(hard_limit); save_args.max_buftime = static_cast(max_buftime.count()); + save_args.full_export = full_export ? 1 : 0; return save_args; } @@ -2595,22 +2656,27 @@ namespace cuvis return args; } + inline WorkerArgs::WorkerArgs() - : worker_count(std::thread::hardware_concurrency()), - poll_interval(std::chrono::milliseconds(5)), - worker_queue_hard_limit(100), - worker_queue_soft_limit(90), - can_drop(false) + : input_queue_size(10), + mandatory_queue_size(4), + supplementary_queue_size(4), + output_queue_size(10), + can_skip_measurements(false), + can_skip_supplementary_steps(true), + can_drop_results(true) {} inline WorkerArgs::operator cuvis_worker_settings_t() const { cuvis_worker_settings_t args; - args.worker_count = worker_count; - args.poll_interval = (std::int32_t)(poll_interval.count()); - args.worker_queue_hard_limit = worker_queue_hard_limit; - args.worker_queue_soft_limit = worker_queue_soft_limit; - args.can_drop = can_drop; + args.input_queue_size = input_queue_size; + args.mandatory_queue_size = mandatory_queue_size; + args.supplementary_queue_size = supplementary_queue_size; + args.output_queue_size = output_queue_size; + args.can_skip_measurements = can_skip_measurements ? 1 : 0; + args.can_skip_supplementary_steps = can_skip_supplementary_steps ? 1 : 0; + args.can_drop_results = can_drop_results ? 1 : 0; return args; }