From cf9f22680feb94142871b2abaf0757c75cecba34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zientkiewicz?= Date: Mon, 23 Feb 2026 10:56:32 +0100 Subject: [PATCH 01/19] [WIP] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Zientkiewicz --- dali/pipeline/util/thread_pool.cc | 1 + dali/pipeline/util/thread_pool.h | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/dali/pipeline/util/thread_pool.cc b/dali/pipeline/util/thread_pool.cc index d30a5f64837..b58b5fbf09a 100644 --- a/dali/pipeline/util/thread_pool.cc +++ b/dali/pipeline/util/thread_pool.cc @@ -123,6 +123,7 @@ std::vector ThreadPool::GetThreadIds() const { void ThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity, const std::string &name) { + this_thread_idx_ = thread_id; SetThreadName(name.c_str()); DeviceGuard g(device_id); try { diff --git a/dali/pipeline/util/thread_pool.h b/dali/pipeline/util/thread_pool.h index 3e32d759f62..dcdbc031d36 100644 --- a/dali/pipeline/util/thread_pool.h +++ b/dali/pipeline/util/thread_pool.h @@ -32,11 +32,25 @@ #endif #include "dali/core/semaphore.h" #include "dali/core/spinlock.h" - +#include "dali/core/exec/thread_idx.h" namespace dali { -class DLL_PUBLIC ThreadPool { +class SingleJobThreadPool : public ThisThreadIdx { + virtual ~SingleJobThreadPool() = default; + + virtual void AddWork(std::function work, int64_t priority = 0) = 0; + + virtual void AddWork(std::function work, int64_t priority = 0) = 0; + + virtual void Run(bool wait) = 0; + + virtual void WaitForWork() = 0; + + virtual std::vector GetThreadIds() const = 0; +}; + +class DLL_PUBLIC ThreadPool : public SingleJobThreadPool { public: // Basic unit of work that our threads do typedef std::function Work; From 443659b177c933815e2cbb3f190487290a9b1045 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zientkiewicz?= Date: Mon, 23 Feb 2026 21:44:59 +0100 Subject: [PATCH 02/19] Extract 
common interface. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Zientkiewicz --- dali/benchmark/displacement_cpu_bench.cc | 2 +- dali/benchmark/operator_bench.h | 2 +- dali/benchmark/thread_pool_bench.cc | 2 +- dali/kernels/test/scatter_gather_test.cc | 2 +- .../nvjpeg/nvjpeg_decoder_decoupled_api.h | 4 +- dali/operators/imgcodec/image_decoder.h | 2 +- dali/operators/reader/fits_reader_op.cc | 6 +-- dali/operators/reader/nemo_asr_reader_op.h | 2 +- dali/operators/reader/numpy_reader_gpu_op.h | 2 +- dali/operators/reader/numpy_reader_op.h | 4 +- dali/operators/reader/tfrecord_reader_op.h | 4 +- dali/operators/reader/webdataset_reader_op.cc | 2 +- .../video/decoder/video_decoder_base.h | 2 +- dali/operators/video/input/video_input.h | 11 +++-- dali/pipeline/executor/executor2/exec2.cc | 2 +- .../executor/executor2/exec_graph_test.cc | 12 ++--- dali/pipeline/executor/executor_impl.h | 2 +- dali/pipeline/operator/eager_operator.h | 6 +-- dali/pipeline/operator/false_gpu_operator.h | 2 +- dali/pipeline/pipeline_debug.h | 2 +- dali/pipeline/util/thread_pool.cc | 29 +++++++----- dali/pipeline/util/thread_pool.h | 44 +++++++++++-------- dali/pipeline/util/thread_pool_test.cc | 10 ++--- dali/python/backend_impl.cc | 2 +- dali/test/operators/identity_input.h | 2 +- dali/util/s3_client_manager.h | 2 +- 26 files changed, 90 insertions(+), 72 deletions(-) diff --git a/dali/benchmark/displacement_cpu_bench.cc b/dali/benchmark/displacement_cpu_bench.cc index cd890dab293..cb19fd35441 100644 --- a/dali/benchmark/displacement_cpu_bench.cc +++ b/dali/benchmark/displacement_cpu_bench.cc @@ -94,7 +94,7 @@ void DisplacementBench(benchmark::State& st) {//NOLINT } // We need a thread pool - ThreadPool tp(4, 0, false, "DisplacementBench"); + OldThreadPool tp(4, 0, false, "DisplacementBench"); // Create workspace and set input and output Workspace ws; diff --git a/dali/benchmark/operator_bench.h 
b/dali/benchmark/operator_bench.h index a7a002897dc..3b46fe71b26 100644 --- a/dali/benchmark/operator_bench.h +++ b/dali/benchmark/operator_bench.h @@ -68,7 +68,7 @@ class OperatorBench : public DALIBenchmark { // Create workspace and set input and output Workspace ws; ws.AddInput(data_in); - ThreadPool tp(num_threads, 0, false, "OperatorBench"); + OldThreadPool tp(num_threads, 0, false, "OperatorBench"); ws.SetThreadPool(&tp); Setup>(op_ptr, op_spec, ws, batch_size); diff --git a/dali/benchmark/thread_pool_bench.cc b/dali/benchmark/thread_pool_bench.cc index 86ea6fb017c..66c5e5033ad 100644 --- a/dali/benchmark/thread_pool_bench.cc +++ b/dali/benchmark/thread_pool_bench.cc @@ -33,7 +33,7 @@ BENCHMARK_DEFINE_F(ThreadPoolBench, AddWorkDeferred)(benchmark::State& st) { int work_size_max = st.range(2); int nthreads = st.range(3); - ThreadPool thread_pool(nthreads, 0, false, "ThreadPoolBench"); + OldThreadPool thread_pool(nthreads, 0, false, "ThreadPoolBench"); std::vector data(2000, 0xFF); std::atomic total_count(0); diff --git a/dali/kernels/test/scatter_gather_test.cc b/dali/kernels/test/scatter_gather_test.cc index 67a47059d3e..ad386810dfa 100644 --- a/dali/kernels/test/scatter_gather_test.cc +++ b/dali/kernels/test/scatter_gather_test.cc @@ -142,7 +142,7 @@ class ScatterGatherTest : public testing::Test { this->template Memset(out_ptr.get(), 0, out.size()); T sg(max_block); - ThreadPool tp(4, 0, false, "test TP"); + OldThreadPool tp(4, 0, false, "test TP"); // copy for (auto &r : ranges) sg.AddCopy(r.dst, r.src, r.size); diff --git a/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api.h b/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api.h index 878577db0e5..0df79370fe7 100644 --- a/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api.h +++ b/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api.h @@ -1150,8 +1150,8 @@ class nvJPEGDecoder : public StatelessOperator, CachedDecoderImpl nvjpegDevAllocator_t device_allocator_; 
nvjpegPinnedAllocator_t pinned_allocator_; - ThreadPool thread_pool_; - ThreadPool nvjpeg2k_thread_; + OldThreadPool thread_pool_; + OldThreadPool nvjpeg2k_thread_; static constexpr int kOutputDim = 3; TensorList hw_decoder_images_staging_; diff --git a/dali/operators/imgcodec/image_decoder.h b/dali/operators/imgcodec/image_decoder.h index 14c031fa36c..d449a6723ac 100644 --- a/dali/operators/imgcodec/image_decoder.h +++ b/dali/operators/imgcodec/image_decoder.h @@ -229,7 +229,7 @@ class ImageDecoder : public StatelessOperator { GetDecoderSpecificArguments(spec); if (std::is_same::value) { - thread_pool_ = std::make_unique(num_threads_, device_id_, + thread_pool_ = std::make_unique(num_threads_, device_id_, spec.GetArgument("affine"), "MixedDecoder"); if (spec_.HasArgument("cache_size")) cache_ = std::make_unique(spec_); diff --git a/dali/operators/reader/fits_reader_op.cc b/dali/operators/reader/fits_reader_op.cc index cb600781ed3..539d33d9105 100644 --- a/dali/operators/reader/fits_reader_op.cc +++ b/dali/operators/reader/fits_reader_op.cc @@ -43,7 +43,7 @@ This operator can be used in the following modes: 2. Read file names from a text file indicated in `file_list` argument. 3. Read files listed in `files` argument. 4. Number of outputs per sample corresponds to the length of `hdu_indices` argument. By default, -first HDU with data is read from each file, so the number of outputs defaults to 1. +first HDU with data is read from each file, so the number of outputs defaults to 1. )") .NumInput(0) .OutputFn(detail::FitsReaderOutputFn) @@ -94,7 +94,7 @@ If `file_root` is provided, the paths are treated as being relative to it. This argument is mutually exclusive with `file_list`.)", nullptr) .AddOptionalArg("hdu_indices", - R"(HDU indices to read. If not provided, the first HDU after the primary + R"(HDU indices to read. If not provided, the first HDU after the primary will be yielded. 
Since HDUs are indexed starting from 1, the default value is as follows: hdu_indices = [2]. Size of the provided list hdu_indices defines number of outputs per sample.)", std::vector{2}) @@ -114,7 +114,7 @@ void FitsReaderCPU::RunImpl(Workspace &ws) { auto &output = ws.Output(output_idx); for (int file_idx = 0; file_idx < num_samples; file_idx++) { auto &sample = GetSample(file_idx); - ThreadPool::Work copy_task = [output_idx = output_idx, data_idx = file_idx, &output, + auto copy_task = [output_idx = output_idx, data_idx = file_idx, &output, &sample](int) { std::memcpy(output.raw_mutable_tensor(data_idx), sample.data[output_idx].raw_data(), sample.data[output_idx].nbytes()); diff --git a/dali/operators/reader/nemo_asr_reader_op.h b/dali/operators/reader/nemo_asr_reader_op.h index 02c94b4c8c4..3d2dcf9f8a4 100644 --- a/dali/operators/reader/nemo_asr_reader_op.h +++ b/dali/operators/reader/nemo_asr_reader_op.h @@ -46,7 +46,7 @@ class NemoAsrReader : public DataReader DALIDataType dtype_; int num_threads_; - ThreadPool thread_pool_; + OldThreadPool thread_pool_; // prefetch_depth * batch_size set of buffers that we reuse to decode audio using TensorListPtr = std::unique_ptr>; diff --git a/dali/operators/reader/numpy_reader_gpu_op.h b/dali/operators/reader/numpy_reader_gpu_op.h index 860d8ae832f..4810df0723c 100644 --- a/dali/operators/reader/numpy_reader_gpu_op.h +++ b/dali/operators/reader/numpy_reader_gpu_op.h @@ -55,7 +55,7 @@ class NumpyReaderGPU : gds::GDSLazyInit, public NumpyReader> prefetched_batch_tensors_; diff --git a/dali/operators/reader/numpy_reader_op.h b/dali/operators/reader/numpy_reader_op.h index 6dd2df49468..d2725b8d15e 100644 --- a/dali/operators/reader/numpy_reader_op.h +++ b/dali/operators/reader/numpy_reader_op.h @@ -196,8 +196,8 @@ class NumpyReaderCPU : public NumpyReader { */ size_t o_direct_alignm_ = 0; size_t o_direct_read_len_alignm_ = 0; - // ThreadPool for prefetch which is a separate thread - ThreadPool thread_pool_; + // OldThreadPool 
for prefetch which is a separate thread + OldThreadPool thread_pool_; }; } // namespace dali diff --git a/dali/operators/reader/tfrecord_reader_op.h b/dali/operators/reader/tfrecord_reader_op.h index 4c2cbd453b1..b7b40817cd8 100644 --- a/dali/operators/reader/tfrecord_reader_op.h +++ b/dali/operators/reader/tfrecord_reader_op.h @@ -79,8 +79,8 @@ class TFRecordReader bool dont_use_mmap_ = false; bool use_o_direct_ = false; size_t o_direct_chunk_size_ = 0; - // ThreadPool for prefetch which is a separate thread - ThreadPool thread_pool_; + // OldThreadPool for prefetch which is a separate thread + OldThreadPool thread_pool_; }; } // namespace dali diff --git a/dali/operators/reader/webdataset_reader_op.cc b/dali/operators/reader/webdataset_reader_op.cc index 55b14eee77d..f3fc0179939 100644 --- a/dali/operators/reader/webdataset_reader_op.cc +++ b/dali/operators/reader/webdataset_reader_op.cc @@ -50,7 +50,7 @@ void WebdatasetReader::RunImpl(Workspace &ws) { auto& output = ws.Output(output_idx); for (int data_idx = 0; data_idx < num_samples; data_idx++) { auto& sample = GetSample(data_idx); - ThreadPool::Work copy_task = [output_idx = output_idx, data_idx = data_idx, &output, + auto copy_task = [output_idx = output_idx, data_idx = data_idx, &output, &sample](int) { output.SetMeta(data_idx, sample[output_idx].GetMeta()); std::memcpy(output.raw_mutable_tensor(data_idx), sample[output_idx].raw_data(), diff --git a/dali/operators/video/decoder/video_decoder_base.h b/dali/operators/video/decoder/video_decoder_base.h index 608486006e5..8c7284b4682 100644 --- a/dali/operators/video/decoder/video_decoder_base.h +++ b/dali/operators/video/decoder/video_decoder_base.h @@ -48,7 +48,7 @@ class DLL_PUBLIC VideoDecoderBase : public Operator { if (spec_.HasArgument("device")) { auto device_str = spec_.template GetArgument("device"); if (device_str == "mixed") { - thread_pool_ = std::make_unique( + thread_pool_ = std::make_unique( spec.GetArgument("num_threads"), 
spec.GetArgument("device_id"), spec.GetArgument("affine"), "VideoDecoder"); } diff --git a/dali/operators/video/input/video_input.h b/dali/operators/video/input/video_input.h index 210ee4dd779..8891bb93e76 100644 --- a/dali/operators/video/input/video_input.h +++ b/dali/operators/video/input/video_input.h @@ -99,8 +99,11 @@ class VideoInput : public InputOperator { last_sequence_policy_ == "partial" || last_sequence_policy_ == "pad", make_string("Provided `last_sequence_policy` is not supported: ", last_sequence_policy_)); if constexpr (!is_cpu) { - thread_pool_.emplace(this->num_threads_, spec.GetArgument("device_id"), - spec.GetArgument("affine"), "VideoInput"); + thread_pool_ = std::make_unique( + this->num_threads_, + spec.GetArgument("device_id"), + spec.GetArgument("affine"), + "VideoInput"); } } @@ -208,7 +211,7 @@ class VideoInput : public InputOperator { if constexpr (is_cpu) { return ws.GetThreadPool(); } else { - assert(thread_pool_.has_value()); + assert(thread_pool_); return *thread_pool_; } } @@ -243,7 +246,7 @@ class VideoInput : public InputOperator { uint8_t pad_frame_value_ = 0; /// CPU operators have default Thread Pool inside Workspace. Mixed and GPU ops don't. 
- std::optional thread_pool_ = std::nullopt; + std::unique_ptr thread_pool_; std::vector> frames_decoders_; diff --git a/dali/pipeline/executor/executor2/exec2.cc b/dali/pipeline/executor/executor2/exec2.cc index 1367d4ecd66..cd49da7cce3 100644 --- a/dali/pipeline/executor/executor2/exec2.cc +++ b/dali/pipeline/executor/executor2/exec2.cc @@ -346,7 +346,7 @@ class Executor2::Impl { void SetupThreadPool() { if (graph_info_.num_cpu > 0) { - tp_ = std::make_unique( + tp_ = std::make_unique( config_.thread_pool_threads, config_.device.value_or(CPU_ONLY_DEVICE_ID), config_.set_affinity, diff --git a/dali/pipeline/executor/executor2/exec_graph_test.cc b/dali/pipeline/executor/executor2/exec_graph_test.cc index 44989a12a4a..ca49ce4e7ea 100644 --- a/dali/pipeline/executor/executor2/exec_graph_test.cc +++ b/dali/pipeline/executor/executor2/exec_graph_test.cc @@ -84,7 +84,7 @@ TEST(ExecGraphTest, SimpleGraph) { LimitBackendConcurrency(g, OpType::CPU); WorkspaceParams params = {}; - auto tp = std::make_unique(std::thread::hardware_concurrency(), 0, false, "test"); + auto tp = std::make_unique(std::thread::hardware_concurrency(), 0, false, "test"); ExecEnv env; env.thread_pool = tp.get(); params.env = &env; @@ -144,7 +144,7 @@ TEST(ExecGraphTest, SimpleGraphRepeat) { g.Link(n1, 0, n2, 1); g.Link(n2, 0, no, 0); LimitBackendConcurrency(g, OpType::CPU); - ThreadPool tp(4, 0, false, "test"); + OldThreadPool tp(4, 0, false, "test"); WorkspaceParams params = {}; ExecEnv env; env.thread_pool = &tp; @@ -212,7 +212,7 @@ TEST(ExecGraphTest, SimpleGraphScheduleAheadCPU) { g.Link(n2, 0, no, 0); LimitBackendConcurrency(g, OpType::CPU); - ThreadPool tp(4, 0, false, "test"); + OldThreadPool tp(4, 0, false, "test"); WorkspaceParams params = {}; ExecEnv env; env.thread_pool = &tp; @@ -306,7 +306,7 @@ TEST(ExecGraphTest, GraphScheduleAheadGPU) { n2->env.order = s2; no->env.order = s3; - ThreadPool tp(4, 0, false, "test"); + OldThreadPool tp(4, 0, false, "test"); n1->env.thread_pool = &tp; @@ 
-379,7 +379,7 @@ TEST(ExecGraphTest, Exception) { g.Link(n1, 0, n2, 1); g.Link(n2, 0, no, 0); LimitBackendConcurrency(g, OpType::CPU); - ThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test"); + OldThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test"); WorkspaceParams params = {}; ExecEnv env; env.thread_pool = &tp; @@ -460,7 +460,7 @@ TEST(ExecGraphTest, LoweredExec) { g.Lower(def); LimitBackendConcurrency(g, OpType::CPU); - ThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test"); + OldThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test"); WorkspaceParams params = {}; ExecEnv env; env.thread_pool = &tp; diff --git a/dali/pipeline/executor/executor_impl.h b/dali/pipeline/executor/executor_impl.h index 1b919ff8913..0fa5d5fd2f8 100644 --- a/dali/pipeline/executor/executor_impl.h +++ b/dali/pipeline/executor/executor_impl.h @@ -318,7 +318,7 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy { OpGraph *graph_ = nullptr; EventPool event_pool_; - ThreadPool thread_pool_; + OldThreadPool thread_pool_; std::vector errors_; mutable std::mutex errors_mutex_; bool exec_error_; diff --git a/dali/pipeline/operator/eager_operator.h b/dali/pipeline/operator/eager_operator.h index 2abaa9da147..e636bd4fe7d 100644 --- a/dali/pipeline/operator/eager_operator.h +++ b/dali/pipeline/operator/eager_operator.h @@ -140,8 +140,8 @@ class DLL_PUBLIC EagerOperator { DLL_PUBLIC inline static void UpdateThreadPool(int num_threads) { std::lock_guard lock(shared_thread_pool_mutex_); - SharedThreadPoolInstance().reset( - new ThreadPool(num_threads, CPU_ONLY_DEVICE_ID, false, "EagerOperator")); + SharedThreadPoolInstance() = std::make_unique( + num_threads, CPU_ONLY_DEVICE_ID, false, "EagerOperator"); } // Update shared CUDA stream used for all direct operators. 
@@ -170,7 +170,7 @@ class DLL_PUBLIC EagerOperator { } static inline std::shared_ptr &SharedThreadPoolInstance() { - static std::shared_ptr thread_pool = std::make_shared( + static std::shared_ptr thread_pool = std::make_shared( GetDefaultNumThreads(), CPU_ONLY_DEVICE_ID, false, "EagerOperator"); return thread_pool; diff --git a/dali/pipeline/operator/false_gpu_operator.h b/dali/pipeline/operator/false_gpu_operator.h index 41ae260e871..f61f2220025 100644 --- a/dali/pipeline/operator/false_gpu_operator.h +++ b/dali/pipeline/operator/false_gpu_operator.h @@ -116,7 +116,7 @@ class FalseGPUOperator : public Operator { private: CPUOperator cpu_impl_; - ThreadPool thread_pool_; + OldThreadPool thread_pool_; Workspace cpu_ws_; // Keep it here so that we can modify (ws gives only const ref to inputs) diff --git a/dali/pipeline/pipeline_debug.h b/dali/pipeline/pipeline_debug.h index d8005188027..0bc6decef0b 100644 --- a/dali/pipeline/pipeline_debug.h +++ b/dali/pipeline/pipeline_debug.h @@ -95,7 +95,7 @@ class DLL_PUBLIC PipelineDebug { int device_id_; int num_threads_; CUDAStreamLease cuda_stream_; - ThreadPool thread_pool_; + OldThreadPool thread_pool_; std::unordered_map> cpu_operators_; std::unordered_map> gpu_operators_; std::unordered_map> mixed_operators_; diff --git a/dali/pipeline/util/thread_pool.cc b/dali/pipeline/util/thread_pool.cc index b58b5fbf09a..e1a7fe7d248 100644 --- a/dali/pipeline/util/thread_pool.cc +++ b/dali/pipeline/util/thread_pool.cc @@ -27,7 +27,7 @@ namespace dali { -ThreadPool::ThreadPool(int num_thread, int device_id, bool set_affinity, const char* name) +OldThreadPool::OldThreadPool(int num_thread, int device_id, bool set_affinity, const char* name) : threads_(num_thread) { DALI_ENFORCE(num_thread > 0, "Thread pool must have non-zero size"); #if NVML_ENABLED @@ -38,13 +38,14 @@ ThreadPool::ThreadPool(int num_thread, int device_id, bool set_affinity, const c #endif // Start the threads in the main loop for (int i = 0; i < num_thread; ++i) { - 
threads_[i] = std::thread(std::bind(&ThreadPool::ThreadMain, this, i, device_id, set_affinity, + threads_[i] = std::thread(std::bind(&OldThreadPool::ThreadMain, this, + i, device_id, set_affinity, make_string("[DALI][TP", i, "]", name))); } tl_errors_.resize(num_thread); } -ThreadPool::~ThreadPool() { +OldThreadPool::~OldThreadPool() { WaitForWork(false); std::unique_lock lock(queue_lock_); @@ -58,7 +59,11 @@ ThreadPool::~ThreadPool() { } } -void ThreadPool::AddWork(Work work, int64_t priority) { +void OldThreadPool::AddWork(Work work, int64_t priority) { + AddWork([w = std::move(work)](int) { w(); }, priority); +} + +void OldThreadPool::AddWork(WorkWithThreadIdx work, int64_t priority) { outstanding_work_.fetch_add(1); if (started_) { { @@ -71,8 +76,12 @@ void ThreadPool::AddWork(Work work, int64_t priority) { } } +void OldThreadPool::WaitForWork() { + WaitForWork(true); +} + // Blocks until all work issued to the thread pool is complete -void ThreadPool::WaitForWork(bool checkForErrors) { +void OldThreadPool::WaitForWork(bool checkForErrors) { if (outstanding_work_.load()) { std::unique_lock lock(completed_mutex_); completed_.wait(lock, [&, this] { @@ -95,7 +104,7 @@ void ThreadPool::WaitForWork(bool checkForErrors) { } } -void ThreadPool::RunAll(bool wait) { +void OldThreadPool::RunAll(bool wait) { if (!started_) { { std::lock_guard lock(queue_lock_); @@ -108,11 +117,11 @@ void ThreadPool::RunAll(bool wait) { } } -int ThreadPool::NumThreads() const { +int OldThreadPool::NumThreads() const { return threads_.size(); } -std::vector ThreadPool::GetThreadIds() const { +std::vector OldThreadPool::GetThreadIds() const { std::vector tids; tids.reserve(threads_.size()); for (const auto &thread : threads_) @@ -121,7 +130,7 @@ std::vector ThreadPool::GetThreadIds() const { } -void ThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity, +void OldThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity, const std::string &name) { 
this_thread_idx_ = thread_id; SetThreadName(name.c_str()); @@ -159,7 +168,7 @@ void ThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity, break; // Get work from the queue. - Work work = std::move(work_queue_.top().second); + WorkWithThreadIdx work = std::move(work_queue_.top().second); work_queue_.pop(); // Unlock the lock lock.unlock(); diff --git a/dali/pipeline/util/thread_pool.h b/dali/pipeline/util/thread_pool.h index dcdbc031d36..530a372eff8 100644 --- a/dali/pipeline/util/thread_pool.h +++ b/dali/pipeline/util/thread_pool.h @@ -36,31 +36,34 @@ namespace dali { -class SingleJobThreadPool : public ThisThreadIdx { - virtual ~SingleJobThreadPool() = default; +class DLL_PUBLIC ThreadPool : public ThisThreadIdx { + public: + virtual ~ThreadPool() = default; virtual void AddWork(std::function work, int64_t priority = 0) = 0; virtual void AddWork(std::function work, int64_t priority = 0) = 0; - virtual void Run(bool wait) = 0; + virtual void RunAll(bool wait = true) = 0; virtual void WaitForWork() = 0; + virtual int NumThreads() const = 0; + virtual std::vector GetThreadIds() const = 0; }; -class DLL_PUBLIC ThreadPool : public SingleJobThreadPool { +class DLL_PUBLIC OldThreadPool : public ThreadPool { public: - // Basic unit of work that our threads do - typedef std::function Work; + using Work = std::function; + using WorkWithThreadIdx = std::function; - DLL_PUBLIC ThreadPool(int num_thread, int device_id, bool set_affinity, const char* name); + OldThreadPool(int num_thread, int device_id, bool set_affinity, const char* name); - DLL_PUBLIC ThreadPool(int num_thread, int device_id, bool set_affinity, const std::string& name) - : ThreadPool(num_thread, device_id, set_affinity, name.c_str()) {} + OldThreadPool(int num_thread, int device_id, bool set_affinity, const std::string& name) + : OldThreadPool(num_thread, device_id, set_affinity, name.c_str()) {} - DLL_PUBLIC ~ThreadPool(); + ~OldThreadPool(); /** * @brief Adds work to the queue with 
optional priority, and optionally starts processing @@ -70,33 +73,36 @@ class DLL_PUBLIC ThreadPool : public SingleJobThreadPool { * Once work is started, the threads will continue to pick up whatever work is scheduled * until WaitForWork is called. */ - DLL_PUBLIC void AddWork(Work work, int64_t priority = 0); + void AddWork(WorkWithThreadIdx work, int64_t priority = 0) override; + + void AddWork(Work work, int64_t priority = 0) override; /** * @brief Wakes up all the threads to complete all the queued work, * optionally not waiting for the work to be finished before return * (the default wait=true is equivalent to invoking WaitForWork after RunAll). */ - DLL_PUBLIC void RunAll(bool wait = true); + void RunAll(bool wait = true) override; /** * @brief Waits until all work issued to the thread pool is complete */ - DLL_PUBLIC void WaitForWork(bool checkForErrors = true); + void WaitForWork() override; - DLL_PUBLIC int NumThreads() const; + int NumThreads() const override; - DLL_PUBLIC std::vector GetThreadIds() const; + std::vector GetThreadIds() const override; - DISABLE_COPY_MOVE_ASSIGN(ThreadPool); + DISABLE_COPY_MOVE_ASSIGN(OldThreadPool); private: - DLL_PUBLIC void ThreadMain(int thread_id, int device_id, bool set_affinity, - const std::string &name); + void WaitForWork(bool checkForErrors); + + void ThreadMain(int thread_id, int device_id, bool set_affinity, const std::string &name); vector threads_; - using PrioritizedWork = std::pair; + using PrioritizedWork = std::pair>; struct SortByPriority { bool operator() (const PrioritizedWork &a, const PrioritizedWork &b) { return a.first < b.first; diff --git a/dali/pipeline/util/thread_pool_test.cc b/dali/pipeline/util/thread_pool_test.cc index b5ff8e240f9..685118b3cdb 100644 --- a/dali/pipeline/util/thread_pool_test.cc +++ b/dali/pipeline/util/thread_pool_test.cc @@ -21,7 +21,7 @@ namespace dali { namespace test { TEST(ThreadPool, AddWork) { - ThreadPool tp(16, 0, false, "ThreadPool test"); + OldThreadPool tp(16, 
0, false, "OldThreadPool test"); std::atomic count{0}; auto increase = [&count](int thread_id) { count++; }; for (int i = 0; i < 64; i++) { @@ -35,7 +35,7 @@ TEST(ThreadPool, AddWork) { TEST(ThreadPool, AddWorkWithPriority) { // only one thread to ensure deterministic behavior - ThreadPool tp(1, 0, false, "ThreadPool test"); + OldThreadPool tp(1, 0, false, "OldThreadPool test"); std::atomic count{0}; auto set_to_1 = [&count](int thread_id) { count = 1; @@ -60,12 +60,12 @@ TEST(ThreadPool, AddWorkWithPriority) { TEST(ThreadPool, CheckName) { - const char given_thread_pool_name[] = "ThreadPool test"; - const char full_thread_pool_name[] = "[DALI][TP0]ThreadPool test"; + const char given_thread_pool_name[] = "OldThreadPool test"; + const char full_thread_pool_name[] = "[DALI][TP0]OldThreadPool test"; // max len supported by pthread_getname_np is 16 char read_thread_pool_name[16] = {0, }; // only one thread to ensure deterministic behavior - ThreadPool tp(1, 0, false, given_thread_pool_name); + OldThreadPool tp(1, 0, false, given_thread_pool_name); auto set_name = [&read_thread_pool_name](int thread_id) { pthread_getname_np(pthread_self(), read_thread_pool_name, sizeof(read_thread_pool_name)); }; diff --git a/dali/python/backend_impl.cc b/dali/python/backend_impl.cc index 2fd0cf11759..5437c06cdf9 100644 --- a/dali/python/backend_impl.cc +++ b/dali/python/backend_impl.cc @@ -2411,7 +2411,7 @@ void ExposeThreadPool(py::module &m) { CUDA_CALL(cudaGetDevice(&dev)); device_id = dev; } - return std::make_shared(num_threads, *device_id, set_affinity, name.data()); + return std::make_shared(num_threads, *device_id, set_affinity, name.data()); }), "num_threads"_a, "device_id"_a = py::none(), diff --git a/dali/test/operators/identity_input.h b/dali/test/operators/identity_input.h index 8bdaf7dee70..813ede465f4 100644 --- a/dali/test/operators/identity_input.h +++ b/dali/test/operators/identity_input.h @@ -36,7 +36,7 @@ class IdentityInput : public InputOperator { 
InputOperator(spec), cpu_input_(spec.GetArgument("cpu_input")) { if constexpr (std::is_same_v) { - tp_ = std::make_unique(this->num_threads_, this->device_id_, false, + tp_ = std::make_unique(this->num_threads_, this->device_id_, false, "PassthroughInput thread pool"); } } diff --git a/dali/util/s3_client_manager.h b/dali/util/s3_client_manager.h index 2620d795aaa..bc9dde600ee 100644 --- a/dali/util/s3_client_manager.h +++ b/dali/util/s3_client_manager.h @@ -49,7 +49,7 @@ struct S3ClientManager { // thread if necessary). This avoids problems in initializing the dependent Common RunTime C // libraries. static void RunInitOrShutdown(std::function work) { - static ThreadPool s_thread_pool_(1, CPU_ONLY_DEVICE_ID, false, "S3ClientManager"); + static OldThreadPool s_thread_pool_(1, CPU_ONLY_DEVICE_ID, false, "S3ClientManager"); s_thread_pool_.AddWork(std::move(work)); s_thread_pool_.RunAll(); } From 3d9e2210dd73e85cb01fa9e2ffc20eecfb9a2470 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zientkiewicz?= Date: Tue, 24 Feb 2026 00:09:28 +0100 Subject: [PATCH 03/19] Facade working and hooked up to exec2. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Zientkiewicz --- dali/pipeline/executor/executor2/exec2.cc | 60 ++++++--- dali/pipeline/util/new_thread_pool.cc | 119 ++++++++++++++++++ dali/pipeline/util/new_thread_pool.h | 61 +++++++++ dali/pipeline/util/thread_pool.h | 18 +-- .../test/python/auto_aug/test_auto_augment.py | 49 ++++++-- 5 files changed, 268 insertions(+), 39 deletions(-) create mode 100644 dali/pipeline/util/new_thread_pool.cc create mode 100644 dali/pipeline/util/new_thread_pool.h diff --git a/dali/pipeline/executor/executor2/exec2.cc b/dali/pipeline/executor/executor2/exec2.cc index cd49da7cce3..2692d213aa8 100644 --- a/dali/pipeline/executor/executor2/exec2.cc +++ b/dali/pipeline/executor/executor2/exec2.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. 
All rights reserved. +// Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,12 +24,22 @@ #include "dali/pipeline/executor/executor2/exec_graph.h" #include "dali/pipeline/executor/executor2/stream_assignment.h" #include "dali/pipeline/operator/builtin/input_operator.h" +#include "dali/pipeline/util/new_thread_pool.h" namespace dali { namespace exec2 { namespace { + +bool UseNewThreadPool() { + static bool use_new_thread_pool = []() { + const char *new_tp = getenv("DALI_USE_NEW_THREAD_POOL"); + return new_tp && atoi(new_tp); + }(); + return use_new_thread_pool; +} + void LimitBackendConcurrency(ExecGraph &graph, OpType backend, int max_concurrency = 1) { auto sem = std::make_shared(max_concurrency); for (auto &n : graph.Nodes()) { @@ -43,7 +53,8 @@ void ApplyConcurrencyLimit(ExecGraph &graph, OperatorConcurrency concurrency) { switch (concurrency) { case OperatorConcurrency::Full: // TODO(michalz): Fix ThreadPool. 
- LimitBackendConcurrency(graph, OpType::CPU); + if (!UseNewThreadPool()) + LimitBackendConcurrency(graph, OpType::CPU); break; // other operators have no restrictions case OperatorConcurrency::Backend: LimitBackendConcurrency(graph, OpType::CPU); @@ -345,18 +356,37 @@ class Executor2::Impl { } void SetupThreadPool() { - if (graph_info_.num_cpu > 0) { - tp_ = std::make_unique( - config_.thread_pool_threads, - config_.device.value_or(CPU_ONLY_DEVICE_ID), - config_.set_affinity, - "Executorv_v2"); + new_tp_.reset(); + old_tp_.reset(); + thread_pool_wrappers_.clear(); + + if (UseNewThreadPool()) { + std::cerr << "DEBUG: Using new thread pool" << std::endl; + if (graph_info_.num_cpu > 0) { + new_tp_ = std::make_unique( + config_.thread_pool_threads, + config_.device.value_or(CPU_ONLY_DEVICE_ID), + config_.set_affinity, + "Executorv_v2"); + } + for (auto &n : graph_.Nodes()) { + if (n.backend == OpType::CPU) { + thread_pool_wrappers_.push_back(std::make_unique(new_tp_.get())); + n.env.thread_pool = thread_pool_wrappers_.back().get(); + } + } } else { - tp_.reset(); - } - for (auto &n : graph_.Nodes()) { - if (n.backend == OpType::CPU) - n.env.thread_pool = tp_.get(); + if (graph_info_.num_cpu > 0) { + old_tp_ = std::make_unique( + config_.thread_pool_threads, + config_.device.value_or(CPU_ONLY_DEVICE_ID), + config_.set_affinity, + "Executorv_v2"); + } + for (auto &n : graph_.Nodes()) { + if (n.backend == OpType::CPU) + n.env.thread_pool = old_tp_.get(); + } } } @@ -421,7 +451,9 @@ class Executor2::Impl { // Runtime environment - std::unique_ptr tp_; + std::unique_ptr old_tp_; + std::unique_ptr new_tp_; + std::vector> thread_pool_wrappers_; std::queue pending_outputs_; std::vector streams_; std::map> node_map_; diff --git a/dali/pipeline/util/new_thread_pool.cc b/dali/pipeline/util/new_thread_pool.cc new file mode 100644 index 00000000000..6a110c8148d --- /dev/null +++ b/dali/pipeline/util/new_thread_pool.cc @@ -0,0 +1,119 @@ +// Copyright (c) 2025-2026, NVIDIA 
CORPORATION & AFFILIATES. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include "dali/pipeline/util/new_thread_pool.h" +#include "dali/core/device_guard.h" +#include "dali/util/nvml.h" +#include "dali/core/nvtx.h" + +namespace dali { + +NewThreadPool::NewThreadPool( + int num_threads, + std::optional device_id, + bool set_affinity, + std::string name) + : name_(name) { + if (device_id.has_value() && *device_id == CPU_ONLY_DEVICE_ID) + device_id = std::nullopt; +#if NVML_ENABLED + // We use NVML only for setting thread affinity + if (device_id.has_value() && set_affinity) { + nvml_handle_ = nvml::NvmlInstance::CreateNvmlInstance(); + } +#endif + Init(num_threads, [=, this](int thread_idx) { + return OnThreadStart(thread_idx, set_affinity); + }); +} + +std::any NewThreadPool::OnThreadStart(int thread_idx, bool set_affinity) { + std::string name = make_string("[DALI][NT", thread_idx, "]", name_); + SetThreadName(name.c_str()); + std::any dg; + if (device_id_.has_value()) + dg.emplace(*device_id_); +#if NVML_ENABLED + try { + if (set_affinity) { + const char *env_affinity = std::getenv("DALI_AFFINITY_MASK"); + int core = -1; + if (env_affinity) { + const auto &vec = string_split(env_affinity, ','); + if ((size_t)thread_idx < vec.size()) { + core = std::stoi(vec[thread_idx]); + } else { + DALI_WARN("DALI_AFFINITY_MASK environment variable is set, " + "but does not have enough entries: thread_id (", 
thread_idx, + ") vs #entries (", vec.size(), "). Ignoring..."); + } + } + nvml::SetCPUAffinity(core); + } + } catch (const std::exception &e) { + DALI_WARN("Couldn't set thread affinity in thread ", thread_idx, " of thread pool \"", + name_, "\". Exception ", typeid(e).name(), ": ", e.what()); + } catch (...) { + DALI_WARN("Couldn't set thread affinity in thread ", thread_idx, " of thread pool \"", + name_, "\". Unknown error."); + } +#endif + return dg; +} + +ThreadPoolFacade::~ThreadPoolFacade() { + RunAll(); +} + +void ThreadPoolFacade::AddWork(std::function work, int64_t priority) { + if (!job_) + job_.emplace(); + job_->AddTask(work, priority); +} + +void ThreadPoolFacade::AddWork(std::function work, int64_t priority) { + if (!job_) + job_.emplace(); + job_->AddTask([w = std::move(work)]() { + w(ThreadPoolBase::this_thread_idx()); + }, priority); +} + +void ThreadPoolFacade::RunAll(bool wait) { + if (job_) { + job_->Run(*tp_, wait); + if (wait) + job_.reset(); + } +} + +void ThreadPoolFacade::WaitForWork() { + if (job_) { + job_->Wait(); + job_.reset(); + } +} + +int ThreadPoolFacade::NumThreads() const { + return tp_->NumThreads(); +} + +std::vector ThreadPoolFacade::GetThreadIds() const { + return tp_->GetThreadIds(); +} + +} // namespace dali diff --git a/dali/pipeline/util/new_thread_pool.h b/dali/pipeline/util/new_thread_pool.h new file mode 100644 index 00000000000..0bdbb5518f4 --- /dev/null +++ b/dali/pipeline/util/new_thread_pool.h @@ -0,0 +1,61 @@ +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_ +#define DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_ + +#include +#include +#include +#include "dali/core/exec/thread_pool_base.h" +#if NVML_ENABLED +#include "dali/util/nvml.h" +#endif +#include "dali/pipeline/util/thread_pool_interface.h" + +namespace dali { + +class DLL_PUBLIC NewThreadPool : public ThreadPoolBase { + public: + NewThreadPool(int num_threads, std::optional device_id, bool set_affinity, std::string name); + + private: + std::any OnThreadStart(int thread_idx, bool set_affinity); + std::optional device_id_; + std::string name_; +#if NVML_ENABLED + nvml::NvmlInstance nvml_handle_; +#endif +}; + +class DLL_PUBLIC ThreadPoolFacade : public ThreadPool { + public: + explicit ThreadPoolFacade(ThreadPoolBase *thread_pool) : tp_(thread_pool) {} + ~ThreadPoolFacade() override; + + void AddWork(std::function work, int64_t priority = 0) override; + void AddWork(std::function work, int64_t priority = 0) override; + void RunAll(bool wait = true) override; + void WaitForWork() override; + int NumThreads() const; + std::vector GetThreadIds() const; + + private: + ThreadPoolBase *tp_ = nullptr; + std::optional job_; +}; + +} // namespace dali + +#endif // DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_ diff --git a/dali/pipeline/util/thread_pool.h b/dali/pipeline/util/thread_pool.h index 530a372eff8..2dd9a087aff 100644 --- a/dali/pipeline/util/thread_pool.h +++ b/dali/pipeline/util/thread_pool.h @@ -33,26 +33,10 @@ #include "dali/core/semaphore.h" #include "dali/core/spinlock.h" #include 
"dali/core/exec/thread_idx.h" +#include "dali/pipeline/util/thread_pool_interface.h" namespace dali { -class DLL_PUBLIC ThreadPool : public ThisThreadIdx { - public: - virtual ~ThreadPool() = default; - - virtual void AddWork(std::function work, int64_t priority = 0) = 0; - - virtual void AddWork(std::function work, int64_t priority = 0) = 0; - - virtual void RunAll(bool wait = true) = 0; - - virtual void WaitForWork() = 0; - - virtual int NumThreads() const = 0; - - virtual std::vector GetThreadIds() const = 0; -}; - class DLL_PUBLIC OldThreadPool : public ThreadPool { public: using Work = std::function; diff --git a/dali/test/python/auto_aug/test_auto_augment.py b/dali/test/python/auto_aug/test_auto_augment.py index b6ba9e44b5a..e69a08eaf16 100644 --- a/dali/test/python/auto_aug/test_auto_augment.py +++ b/dali/test/python/auto_aug/test_auto_augment.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2023-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -22,7 +22,7 @@ from nose2.tools import params from nvidia.dali import fn, types -from nvidia.dali import pipeline_def +from nvidia.dali import pipeline_def, OperatorConcurrency from nvidia.dali.auto_aug import auto_augment, augmentations as a from nvidia.dali.auto_aug.core import augmentation, Policy @@ -98,7 +98,12 @@ def test_run_auto_aug(i, args): batch_size = batch_sizes[i % len(batch_sizes)] @pipeline_def( - enable_conditionals=True, batch_size=batch_size, num_threads=4, device_id=0, seed=43 + enable_conditionals=True, + batch_size=batch_size, + num_threads=4, + device_id=0, + seed=43, + concurrency=OperatorConcurrency.FULL, ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) @@ -134,7 +139,9 @@ def setUpClass(cls): size_1 = (215, 128) size_2 = (215, 220) - @pipeline_def(batch_size=6, device_id=0, num_threads=4, seed=42) + @pipeline_def( + batch_size=6, device_id=0, num_threads=4, seed=42, concurrency=OperatorConcurrency.FULL + ) def pipeline(size): video = fn.readers.video_resize( filenames=vid_filenames, @@ -177,7 +184,12 @@ def test_uniform(self, i, args): assert device in ("gpu", "cpu") @pipeline_def( - batch_size=batch_size, device_id=0, num_threads=4, seed=205, enable_conditionals=True + batch_size=batch_size, + device_id=0, + num_threads=4, + seed=205, + enable_conditionals=True, + concurrency=OperatorConcurrency.FULL, ) def pipeline(): rng = random.Random(42 + i) @@ -464,7 +476,14 @@ def yet_another_aug(data, _): def test_unused_arg_fail(): - @pipeline_def(enable_conditionals=True, batch_size=5, num_threads=4, device_id=0, seed=43) + @pipeline_def( + enable_conditionals=True, + batch_size=5, + num_threads=4, + device_id=0, + seed=43, + concurrency=OperatorConcurrency.FULL, + ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) image = fn.decoders.image(encoded_image, device="mixed") @@ -477,7 +496,14 @@ def pipeline(): def test_empty_policy_fail(): - 
@pipeline_def(enable_conditionals=True, batch_size=5, num_threads=4, device_id=0, seed=43) + @pipeline_def( + enable_conditionals=True, + batch_size=5, + num_threads=4, + device_id=0, + seed=43, + concurrency=OperatorConcurrency.FULL, + ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) image = fn.decoders.image(encoded_image, device="mixed") @@ -492,7 +518,14 @@ def pipeline(): def test_missing_shape_fail(): - @pipeline_def(enable_conditionals=True, batch_size=5, num_threads=4, device_id=0, seed=43) + @pipeline_def( + enable_conditionals=True, + batch_size=5, + num_threads=4, + device_id=0, + seed=43, + concurrency=OperatorConcurrency.FULL, + ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) image = fn.decoders.image(encoded_image, device="mixed") From f877ef31f258f8ca006e609f5c05caddfde78986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zientkiewicz?= Date: Tue, 24 Feb 2026 00:15:02 +0100 Subject: [PATCH 04/19] Fix comment. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Zientkiewicz --- dali/pipeline/executor/executor2/exec2.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dali/pipeline/executor/executor2/exec2.cc b/dali/pipeline/executor/executor2/exec2.cc index 2692d213aa8..95c6b734fa9 100644 --- a/dali/pipeline/executor/executor2/exec2.cc +++ b/dali/pipeline/executor/executor2/exec2.cc @@ -52,10 +52,9 @@ void LimitBackendConcurrency(ExecGraph &graph, OpType backend, int max_concurren void ApplyConcurrencyLimit(ExecGraph &graph, OperatorConcurrency concurrency) { switch (concurrency) { case OperatorConcurrency::Full: - // TODO(michalz): Fix ThreadPool. 
- if (!UseNewThreadPool()) + if (!UseNewThreadPool()) // old thread pool is not thread safe LimitBackendConcurrency(graph, OpType::CPU); - break; // other operators have no restrictions + break; case OperatorConcurrency::Backend: LimitBackendConcurrency(graph, OpType::CPU); LimitBackendConcurrency(graph, OpType::GPU); From 81cd10ac4524a092ddbdf1c392c5ea6c6643f895 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zientkiewicz?= Date: Tue, 24 Feb 2026 00:16:55 +0100 Subject: [PATCH 05/19] Temporarily enable new thread pool. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Zientkiewicz --- dali/pipeline/executor/executor2/exec2.cc | 5 ++- dali/pipeline/util/thread_pool_interface.h | 44 ++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) create mode 100644 dali/pipeline/util/thread_pool_interface.h diff --git a/dali/pipeline/executor/executor2/exec2.cc b/dali/pipeline/executor/executor2/exec2.cc index 95c6b734fa9..a95b4c1ff94 100644 --- a/dali/pipeline/executor/executor2/exec2.cc +++ b/dali/pipeline/executor/executor2/exec2.cc @@ -33,11 +33,12 @@ namespace { bool UseNewThreadPool() { - static bool use_new_thread_pool = []() { + return true; // FIXME + /*static bool use_new_thread_pool = []() { const char *new_tp = getenv("DALI_USE_NEW_THREAD_POOL"); return new_tp && atoi(new_tp); }(); - return use_new_thread_pool; + return use_new_thread_pool;*/ } void LimitBackendConcurrency(ExecGraph &graph, OpType backend, int max_concurrency = 1) { diff --git a/dali/pipeline/util/thread_pool_interface.h b/dali/pipeline/util/thread_pool_interface.h new file mode 100644 index 00000000000..7d351101135 --- /dev/null +++ b/dali/pipeline/util/thread_pool_interface.h @@ -0,0 +1,44 @@ +// Copyright (c) 2017-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DALI_PIPELINE_UTIL_THREAD_POOL_INTERFACE_H_ +#define DALI_PIPELINE_UTIL_THREAD_POOL_INTERFACE_H_ + +#include +#include +#include +#include "dali/core/exec/thread_idx.h" + +namespace dali { + +class DLL_PUBLIC ThreadPool : public ThisThreadIdx { + public: + virtual ~ThreadPool() = default; + + virtual void AddWork(std::function work, int64_t priority = 0) = 0; + + virtual void AddWork(std::function work, int64_t priority = 0) = 0; + + virtual void RunAll(bool wait = true) = 0; + + virtual void WaitForWork() = 0; + + virtual int NumThreads() const = 0; + + virtual std::vector GetThreadIds() const = 0; +}; + +} // namespace dali + +#endif // DALI_PIPELINE_UTIL_THREAD_POOL_INTERFACE_H_ From 3dab38e9deb7220a44c3144c5f869b89fa17082b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zientkiewicz?= Date: Tue, 24 Feb 2026 09:10:30 +0100 Subject: [PATCH 06/19] Adjust exception specification for clang. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ---- Signed-off-by: Michał Zientkiewicz --- dali/pipeline/util/new_thread_pool.cc | 2 +- dali/pipeline/util/new_thread_pool.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dali/pipeline/util/new_thread_pool.cc b/dali/pipeline/util/new_thread_pool.cc index 6a110c8148d..21b0dfe551d 100644 --- a/dali/pipeline/util/new_thread_pool.cc +++ b/dali/pipeline/util/new_thread_pool.cc @@ -75,7 +75,7 @@ std::any NewThreadPool::OnThreadStart(int thread_idx, bool set_affinity) { return dg; } -ThreadPoolFacade::~ThreadPoolFacade() { +ThreadPoolFacade::~ThreadPoolFacade() noexcept { RunAll(); } diff --git a/dali/pipeline/util/new_thread_pool.h b/dali/pipeline/util/new_thread_pool.h index 0bdbb5518f4..9b370420127 100644 --- a/dali/pipeline/util/new_thread_pool.h +++ b/dali/pipeline/util/new_thread_pool.h @@ -42,7 +42,7 @@ class DLL_PUBLIC NewThreadPool : public ThreadPoolBase { class DLL_PUBLIC ThreadPoolFacade : public ThreadPool { public: explicit ThreadPoolFacade(ThreadPoolBase *thread_pool) : tp_(thread_pool) {} - ~ThreadPoolFacade() override; + ~ThreadPoolFacade() noexcept override; void AddWork(std::function work, int64_t priority = 0) override; void AddWork(std::function work, int64_t priority = 0) override; From b9d5f8d5f4e6cd9bb9a20e76a6ff28d97c3052fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zientkiewicz?= Date: Tue, 24 Feb 2026 09:11:40 +0100 Subject: [PATCH 07/19] Add override specifiers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Zientkiewicz --- dali/pipeline/util/new_thread_pool.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dali/pipeline/util/new_thread_pool.h b/dali/pipeline/util/new_thread_pool.h index 9b370420127..92030c3fe9f 100644 --- a/dali/pipeline/util/new_thread_pool.h +++ b/dali/pipeline/util/new_thread_pool.h @@ -48,8 +48,8 @@ class 
DLL_PUBLIC ThreadPoolFacade : public ThreadPool { void AddWork(std::function work, int64_t priority = 0) override; void RunAll(bool wait = true) override; void WaitForWork() override; - int NumThreads() const; - std::vector GetThreadIds() const; + int NumThreads() const override; + std::vector GetThreadIds() const override; private: ThreadPoolBase *tp_ = nullptr; From 98a7e65b5b337e766aba8924480332f42bf78e14 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Tue, 24 Feb 2026 15:09:45 +0100 Subject: [PATCH 08/19] Use non-reentrant jobs in new thread pool facade. Signed-off-by: Michal Zientkiewicz --- dali/pipeline/executor/executor2/exec2.cc | 5 ++--- dali/pipeline/util/new_thread_pool.cc | 11 ++++++++--- dali/test/python/auto_aug/test_auto_augment.py | 16 ++++++++-------- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/dali/pipeline/executor/executor2/exec2.cc b/dali/pipeline/executor/executor2/exec2.cc index a95b4c1ff94..95c6b734fa9 100644 --- a/dali/pipeline/executor/executor2/exec2.cc +++ b/dali/pipeline/executor/executor2/exec2.cc @@ -33,12 +33,11 @@ namespace { bool UseNewThreadPool() { - return true; // FIXME - /*static bool use_new_thread_pool = []() { + static bool use_new_thread_pool = []() { const char *new_tp = getenv("DALI_USE_NEW_THREAD_POOL"); return new_tp && atoi(new_tp); }(); - return use_new_thread_pool;*/ + return use_new_thread_pool; } void LimitBackendConcurrency(ExecGraph &graph, OpType backend, int max_concurrency = 1) { diff --git a/dali/pipeline/util/new_thread_pool.cc b/dali/pipeline/util/new_thread_pool.cc index 21b0dfe551d..6d1ec346036 100644 --- a/dali/pipeline/util/new_thread_pool.cc +++ b/dali/pipeline/util/new_thread_pool.cc @@ -15,6 +15,7 @@ #include #include #include +#include "dali/core/call_at_exit.h" #include "dali/pipeline/util/new_thread_pool.h" #include "dali/core/device_guard.h" #include "dali/util/nvml.h" @@ -95,16 +96,20 @@ void ThreadPoolFacade::AddWork(std::function work, int64_t priority) void 
ThreadPoolFacade::RunAll(bool wait) { if (job_) { + auto atexit = AtScopeExit([&]() { + if (wait) + job_.reset(); + }); job_->Run(*tp_, wait); - if (wait) - job_.reset(); } } void ThreadPoolFacade::WaitForWork() { if (job_) { + auto atexit = AtScopeExit([&]() { + job_.reset(); + }); job_->Wait(); - job_.reset(); } } diff --git a/dali/test/python/auto_aug/test_auto_augment.py b/dali/test/python/auto_aug/test_auto_augment.py index e69a08eaf16..6fb51de2840 100644 --- a/dali/test/python/auto_aug/test_auto_augment.py +++ b/dali/test/python/auto_aug/test_auto_augment.py @@ -35,6 +35,8 @@ vid_files = ["sintel_trailer-720p_2.mp4"] vid_filenames = [os.path.join(vid_dir, vid_file) for vid_file in vid_files] +concurrency = OperatorConcurrency.FULL + def mag_to_param_with_op_id(op_id): def mag_to_param(magnitude): @@ -103,7 +105,7 @@ def test_run_auto_aug(i, args): num_threads=4, device_id=0, seed=43, - concurrency=OperatorConcurrency.FULL, + concurrency=concurrency, ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) @@ -139,9 +141,7 @@ def setUpClass(cls): size_1 = (215, 128) size_2 = (215, 220) - @pipeline_def( - batch_size=6, device_id=0, num_threads=4, seed=42, concurrency=OperatorConcurrency.FULL - ) + @pipeline_def(batch_size=6, device_id=0, num_threads=4, seed=42, concurrency=concurrency) def pipeline(size): video = fn.readers.video_resize( filenames=vid_filenames, @@ -189,7 +189,7 @@ def test_uniform(self, i, args): num_threads=4, seed=205, enable_conditionals=True, - concurrency=OperatorConcurrency.FULL, + concurrency=concurrency, ) def pipeline(): rng = random.Random(42 + i) @@ -482,7 +482,7 @@ def test_unused_arg_fail(): num_threads=4, device_id=0, seed=43, - concurrency=OperatorConcurrency.FULL, + concurrency=concurrency, ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) @@ -502,7 +502,7 @@ def test_empty_policy_fail(): num_threads=4, device_id=0, seed=43, - 
concurrency=OperatorConcurrency.FULL, + concurrency=concurrency, ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) @@ -524,7 +524,7 @@ def test_missing_shape_fail(): num_threads=4, device_id=0, seed=43, - concurrency=OperatorConcurrency.FULL, + concurrency=concurrency, ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) From d1c7962073cc2e77836bf45b50e49c3035397091 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Mon, 2 Mar 2026 17:28:22 +0100 Subject: [PATCH 09/19] Move UseNewThreadPool to NewThreadPool. Signed-off-by: Michal Zientkiewicz --- dali/pipeline/executor/executor2/exec2.cc | 9 --------- dali/pipeline/util/new_thread_pool.cc | 9 +++++++++ dali/pipeline/util/new_thread_pool.h | 2 ++ 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/dali/pipeline/executor/executor2/exec2.cc b/dali/pipeline/executor/executor2/exec2.cc index 95c6b734fa9..180c8a9d5bb 100644 --- a/dali/pipeline/executor/executor2/exec2.cc +++ b/dali/pipeline/executor/executor2/exec2.cc @@ -31,15 +31,6 @@ namespace exec2 { namespace { - -bool UseNewThreadPool() { - static bool use_new_thread_pool = []() { - const char *new_tp = getenv("DALI_USE_NEW_THREAD_POOL"); - return new_tp && atoi(new_tp); - }(); - return use_new_thread_pool; -} - void LimitBackendConcurrency(ExecGraph &graph, OpType backend, int max_concurrency = 1) { auto sem = std::make_shared(max_concurrency); for (auto &n : graph.Nodes()) { diff --git a/dali/pipeline/util/new_thread_pool.cc b/dali/pipeline/util/new_thread_pool.cc index 6d1ec346036..7c700494cac 100644 --- a/dali/pipeline/util/new_thread_pool.cc +++ b/dali/pipeline/util/new_thread_pool.cc @@ -121,4 +121,13 @@ std::vector ThreadPoolFacade::GetThreadIds() const { return tp_->GetThreadIds(); } +bool UseNewThreadPool() { + static bool use_new_thread_pool = []() { + const char *new_tp = getenv("DALI_USE_NEW_THREAD_POOL"); + return new_tp && atoi(new_tp); + }(); + return 
use_new_thread_pool; +} + + } // namespace dali diff --git a/dali/pipeline/util/new_thread_pool.h b/dali/pipeline/util/new_thread_pool.h index 92030c3fe9f..f93e0ec3f82 100644 --- a/dali/pipeline/util/new_thread_pool.h +++ b/dali/pipeline/util/new_thread_pool.h @@ -56,6 +56,8 @@ class DLL_PUBLIC ThreadPoolFacade : public ThreadPool { std::optional job_; }; +DLL_PUBLIC bool UseNewThreadPool(); + } // namespace dali #endif // DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_ From 1eca9456193aa9e216c58cde3b7db456ad12a72a Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Thu, 5 Mar 2026 17:27:05 +0100 Subject: [PATCH 10/19] Add tests with new thread pool. Signed-off-by: Michal Zientkiewicz --- dali/operators/reader/numpy_reader_op.h | 2 +- dali/operators/reader/tfrecord_reader_op.h | 2 +- dali/pipeline/executor/executor2/exec2.cc | 2 +- qa/TL0_python-self-test-core-newtp/test.sh | 3 +++ qa/TL0_python-self-test-core/test.sh | 1 - qa/TL1_decoder_perf/test.sh | 16 ++++++++++++++-- 6 files changed, 20 insertions(+), 6 deletions(-) create mode 100644 qa/TL0_python-self-test-core-newtp/test.sh diff --git a/dali/operators/reader/numpy_reader_op.h b/dali/operators/reader/numpy_reader_op.h index d2725b8d15e..e04f98033d5 100644 --- a/dali/operators/reader/numpy_reader_op.h +++ b/dali/operators/reader/numpy_reader_op.h @@ -196,7 +196,7 @@ class NumpyReaderCPU : public NumpyReader { */ size_t o_direct_alignm_ = 0; size_t o_direct_read_len_alignm_ = 0; - // OldThreadPool for prefetch which is a separate thread + // Thread Pool for prefetch which is a separate thread OldThreadPool thread_pool_; }; diff --git a/dali/operators/reader/tfrecord_reader_op.h b/dali/operators/reader/tfrecord_reader_op.h index b7b40817cd8..6cf9e29167b 100644 --- a/dali/operators/reader/tfrecord_reader_op.h +++ b/dali/operators/reader/tfrecord_reader_op.h @@ -79,7 +79,7 @@ class TFRecordReader bool dont_use_mmap_ = false; bool use_o_direct_ = false; size_t o_direct_chunk_size_ = 0; - // OldThreadPool for prefetch 
which is a separate thread + // Thread Pool for prefetch which is a separate thread OldThreadPool thread_pool_; }; diff --git a/dali/pipeline/executor/executor2/exec2.cc b/dali/pipeline/executor/executor2/exec2.cc index 180c8a9d5bb..27399ff987c 100644 --- a/dali/pipeline/executor/executor2/exec2.cc +++ b/dali/pipeline/executor/executor2/exec2.cc @@ -351,7 +351,7 @@ class Executor2::Impl { thread_pool_wrappers_.clear(); if (UseNewThreadPool()) { - std::cerr << "DEBUG: Using new thread pool" << std::endl; + std::cerr << "\n!!! Forced use of NewThreadPool !!!" << std::endl; if (graph_info_.num_cpu > 0) { new_tp_ = std::make_unique( config_.thread_pool_threads, diff --git a/qa/TL0_python-self-test-core-newtp/test.sh b/qa/TL0_python-self-test-core-newtp/test.sh new file mode 100644 index 00000000000..f27eee0d12f --- /dev/null +++ b/qa/TL0_python-self-test-core-newtp/test.sh @@ -0,0 +1,3 @@ +#!/bin/bash -e +export DALI_USE_NEW_THREAD_POOL=1 +source ../TL0_python-self-test-core/test.sh diff --git a/qa/TL0_python-self-test-core/test.sh b/qa/TL0_python-self-test-core/test.sh index 4fa84258a0d..dd1f504f839 100644 --- a/qa/TL0_python-self-test-core/test.sh +++ b/qa/TL0_python-self-test-core/test.sh @@ -1,4 +1,3 @@ #!/bin/bash -e -export DALI_USE_EXEC2=0 bash -e ./test_nofw.sh bash -e ./test_pytorch.sh diff --git a/qa/TL1_decoder_perf/test.sh b/qa/TL1_decoder_perf/test.sh index 36cb292245d..e200d0e0e0f 100644 --- a/qa/TL1_decoder_perf/test.sh +++ b/qa/TL1_decoder_perf/test.sh @@ -5,6 +5,8 @@ target_dir=./internal_tools LOG1="dali_legacy.log" LOG2="dali_nvimgcodec.log" +LOG1_TP="dali_legacy_new_tp.log" +LOG2_TP="dali_nvimgcodec_new_tp.log" LOG1_NDD="dali_ndd_legacy.log" LOG2_NDD="dali_ndd_nvimgcodec.log" function CLEAN_AND_EXIT { @@ -26,6 +28,8 @@ test_body() { # use taskset to avoid inefficient data migration between cores we don't want to use taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i 
${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 70 --hw_load 0.12 | tee ${LOG1} taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 70 --hw_load 0.12 --experimental_decoder | tee ${LOG2} + DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 70 --hw_load 0.12 | tee ${LOG1_TP} + DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 70 --hw_load 0.12 --experimental_decoder | tee ${LOG2_TP} taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p ndd_rn50 -j 70 --hw_load 0.12 | tee ${LOG1_NDD} taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p ndd_rn50 -j 70 --hw_load 0.12 --experimental_decoder | tee ${LOG2_NDD} @@ -37,7 +41,9 @@ test_body() { MIN_PERF2_NDD=20000; # TODO(janton): remove this second value. 
# use taskset to avoid inefficient data migration between cores we don't want to use taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 | tee ${LOG1} - taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2} + taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2_TP} + DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 | tee ${LOG1} + DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2_TP} taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p ndd_rn50 -j 72 --hw_load 0.11 | tee ${LOG1_NDD} taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p ndd_rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2_NDD} fi @@ -78,21 +84,27 @@ test_body() { } PERF_RESULT1=$(perf_check "${LOG1}" "$MIN_PERF") PERF_RESULT2=$(perf_check "${LOG2}" "$MIN_PERF2") + PERF_RESULT1_TP=$(perf_check "${LOG1_TP}" "$MIN_PERF") + PERF_RESULT2_TP=$(perf_check "${LOG2_TP}" "$MIN_PERF2") PERF_RESULT1_NDD=$(perf_check "${LOG1_NDD}" 
"$MIN_PERF_NDD") PERF_RESULT2_NDD=$(perf_check "${LOG2_NDD}" "$MIN_PERF2_NDD") PERF_RESULT3=$(perf_check "${LOG2}" "$(extract_perf "${LOG1}")" 5) PERF_RESULT3_NDD=$(perf_check "${LOG2_NDD}" "$(extract_perf "${LOG1_NDD}")" 5) + PERF_RESULT1_TP=$(perf_check "${LOG1_TP}" "$(extract_perf "${LOG1}")" 2) + PERF_RESULT2_TP=$(perf_check "${LOG2_TP}" "$(extract_perf "${LOG2}")" 2) echo "PERF_RESULT1=${PERF_RESULT1}" echo "PERF_RESULT2=${PERF_RESULT2}" echo "PERF_RESULT3=${PERF_RESULT3}" + echo "PERF_RESULT1=${PERF_RESULT1_TP}" + echo "PERF_RESULT2=${PERF_RESULT2_TP}" echo "PERF_RESULT1_NDD=${PERF_RESULT1_NDD}" echo "PERF_RESULT2_NDD=${PERF_RESULT2_NDD}" echo "PERF_RESULT3_NDD=${PERF_RESULT3_NDD}" # if [[ "$PERF_RESULT1" == "OK" && "$PERF_RESULT2" == "OK" && "$PERF_RESULT3" == "OK" && "$PERF_RESULT1_NDD" == "OK" && "$PERF_RESULT2_NDD" == "OK" && "$PERF_RESULT3_NDD" == "OK" ]]; then # don't check experimental decoder performance with dynamic mode - if [[ "$PERF_RESULT1" == "OK" && "$PERF_RESULT2" == "OK" && "$PERF_RESULT3" == "OK" && "$PERF_RESULT1_NDD" == "OK" ]]; then + if [[ "$PERF_RESULT1" == "OK" && "$PERF_RESULT2" == "OK" && "$PERF_RESULT1_TP" == "OK" && "$PERF_RESULT2_TP" == "OK" && "$PERF_RESULT3" == "OK" && "$PERF_RESULT1_NDD" == "OK" ]]; then CLEAN_AND_EXIT 0 else CLEAN_AND_EXIT 1 From a9ec7fa954bee5ab6b75e1d4f4c9692dd43b0600 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Mon, 9 Mar 2026 11:16:34 +0100 Subject: [PATCH 11/19] Fix test invocation. Assign device_id in NewThreadPool constructor. 
Signed-off-by: Michal Zientkiewicz --- dali/pipeline/util/new_thread_pool.cc | 1 + qa/TL1_decoder_perf/test.sh | 10 ++++------ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/dali/pipeline/util/new_thread_pool.cc b/dali/pipeline/util/new_thread_pool.cc index 7c700494cac..172ba4b5566 100644 --- a/dali/pipeline/util/new_thread_pool.cc +++ b/dali/pipeline/util/new_thread_pool.cc @@ -31,6 +31,7 @@ NewThreadPool::NewThreadPool( : name_(name) { if (device_id.has_value() && *device_id == CPU_ONLY_DEVICE_ID) device_id = std::nullopt; + device_id_ = device_id; #if NVML_ENABLED // We use NVML only for setting thread affinity if (device_id.has_value() && set_affinity) { diff --git a/qa/TL1_decoder_perf/test.sh b/qa/TL1_decoder_perf/test.sh index e200d0e0e0f..fde847c0590 100644 --- a/qa/TL1_decoder_perf/test.sh +++ b/qa/TL1_decoder_perf/test.sh @@ -41,8 +41,8 @@ test_body() { MIN_PERF2_NDD=20000; # TODO(janton): remove this second value. # use taskset to avoid inefficient data migration between cores we don't want to use taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 | tee ${LOG1} - taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2_TP} - DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 | tee ${LOG1} + taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2} + DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-71 python 
hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 | tee ${LOG1_TP} DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2_TP} taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p ndd_rn50 -j 72 --hw_load 0.11 | tee ${LOG1_NDD} taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p ndd_rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2_NDD} @@ -84,8 +84,6 @@ test_body() { } PERF_RESULT1=$(perf_check "${LOG1}" "$MIN_PERF") PERF_RESULT2=$(perf_check "${LOG2}" "$MIN_PERF2") - PERF_RESULT1_TP=$(perf_check "${LOG1_TP}" "$MIN_PERF") - PERF_RESULT2_TP=$(perf_check "${LOG2_TP}" "$MIN_PERF2") PERF_RESULT1_NDD=$(perf_check "${LOG1_NDD}" "$MIN_PERF_NDD") PERF_RESULT2_NDD=$(perf_check "${LOG2_NDD}" "$MIN_PERF2_NDD") PERF_RESULT3=$(perf_check "${LOG2}" "$(extract_perf "${LOG1}")" 5) @@ -96,8 +94,8 @@ test_body() { echo "PERF_RESULT1=${PERF_RESULT1}" echo "PERF_RESULT2=${PERF_RESULT2}" echo "PERF_RESULT3=${PERF_RESULT3}" - echo "PERF_RESULT1=${PERF_RESULT1_TP}" - echo "PERF_RESULT2=${PERF_RESULT2_TP}" + echo "PERF_RESULT1_TP=${PERF_RESULT1_TP}" + echo "PERF_RESULT2_TP=${PERF_RESULT2_TP}" echo "PERF_RESULT1_NDD=${PERF_RESULT1_NDD}" echo "PERF_RESULT2_NDD=${PERF_RESULT2_NDD}" echo "PERF_RESULT3_NDD=${PERF_RESULT3_NDD}" From 8709e9630545d838aff410e97a4eb9e21b4f5d4d Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Mon, 9 Mar 2026 11:18:45 +0100 Subject: [PATCH 12/19] Update perf test cleanup function. 
Signed-off-by: Michal Zientkiewicz --- qa/TL1_decoder_perf/test.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/qa/TL1_decoder_perf/test.sh b/qa/TL1_decoder_perf/test.sh index fde847c0590..c2077535120 100644 --- a/qa/TL1_decoder_perf/test.sh +++ b/qa/TL1_decoder_perf/test.sh @@ -12,6 +12,8 @@ LOG2_NDD="dali_ndd_nvimgcodec.log" function CLEAN_AND_EXIT { rm -rf ${LOG1} rm -rf ${LOG2} + rm -rf ${LOG1_TP} + rm -rf ${LOG2_TP} rm -rf ${LOG1_NDD} rm -rf ${LOG2_NDD} exit $1 From dc92497af789fe91e00bdd1b681dd5048031137c Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Mon, 9 Mar 2026 16:48:49 +0100 Subject: [PATCH 13/19] Allow multiple runs before wait in ThreadPoolFacade. Signed-off-by: Michal Zientkiewicz --- dali/pipeline/util/new_thread_pool.cc | 66 +++++++++++++++++------ dali/pipeline/util/new_thread_pool.h | 3 +- include/dali/core/exec/thread_pool_base.h | 9 +++- 3 files changed, 60 insertions(+), 18 deletions(-) diff --git a/dali/pipeline/util/new_thread_pool.cc b/dali/pipeline/util/new_thread_pool.cc index 172ba4b5566..c90b191fce9 100644 --- a/dali/pipeline/util/new_thread_pool.cc +++ b/dali/pipeline/util/new_thread_pool.cc @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include #include #include #include "dali/core/call_at_exit.h" +#include "dali/core/small_vector.h" #include "dali/pipeline/util/new_thread_pool.h" #include "dali/core/device_guard.h" #include "dali/util/nvml.h" @@ -82,35 +84,69 @@ ThreadPoolFacade::~ThreadPoolFacade() noexcept { } void ThreadPoolFacade::AddWork(std::function work, int64_t priority) { - if (!job_) - job_.emplace(); - job_->AddTask(work, priority); + if (jobs_.empty() || jobs_.front().Started()) + jobs_.emplace_front(); + jobs_.front().AddTask(work, priority); } void ThreadPoolFacade::AddWork(std::function work, int64_t priority) { - if (!job_) - job_.emplace(); - job_->AddTask([w = std::move(work)]() { + if (jobs_.empty() || jobs_.front().Started()) + jobs_.emplace_front(); + jobs_.front().AddTask([w = std::move(work)]() { w(ThreadPoolBase::this_thread_idx()); }, priority); } void ThreadPoolFacade::RunAll(bool wait) { - if (job_) { - auto atexit = AtScopeExit([&]() { - if (wait) - job_.reset(); - }); - job_->Run(*tp_, wait); + if (!jobs_.empty()) { + if (!wait) { + if (!jobs_.front().Started()) // all subsequent jobs_ must be started + jobs_.front().Run(*tp_, false); + } else { + if (jobs_.size() == 1) { // fast path for the common case + auto atexit = AtScopeExit([&]() { + jobs_.clear(); + }); + if (jobs_.front().Started()) + jobs_.front().Wait(); + else + jobs_.front().Run(*tp_, true); + } else { + if (jobs_.front().Started()) + jobs_.front().Wait(); + WaitForWork(); + } + } } } void ThreadPoolFacade::WaitForWork() { - if (job_) { + if (!jobs_.empty()) { + if (!jobs_.front().Started()) + throw std::logic_error("WaitForWork called without Run"); auto atexit = AtScopeExit([&]() { - job_.reset(); + jobs_.clear(); }); - job_->Wait(); + // This won't be allocated unless an exception was thrown + std::vector errs; + // The jobs in jobs_ are ordered from latest to oldest, so there's little chance that more than + one Wait would block. 
+ for (auto &job : jobs_) { + try { + job.Wait(); + } catch (MultipleErrors &e) { + // unwrap MultipleErrors to avoid nesting + errs.insert(errs.end(), e.errors().begin(), e.errors().end()); + } catch (...) { + errs.push_back(std::current_exception()); + } + } + if (errs.size() == 1) { + std::rethrow_exception(std::move(errs[0])); + } else if (errs.size() > 1) { + std::reverse(errs.begin(), errs.end()); + MultipleErrors(std::move(errs)); + } // else no error } } diff --git a/dali/pipeline/util/new_thread_pool.h b/dali/pipeline/util/new_thread_pool.h index f93e0ec3f82..0bd04859fe1 100644 --- a/dali/pipeline/util/new_thread_pool.h +++ b/dali/pipeline/util/new_thread_pool.h @@ -15,6 +15,7 @@ #ifndef DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_ #define DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_ +#include #include #include #include @@ -53,7 +54,7 @@ class DLL_PUBLIC ThreadPoolFacade : public ThreadPool { private: ThreadPoolBase *tp_ = nullptr; - std::optional job_; + std::list jobs_; }; DLL_PUBLIC bool UseNewThreadPool(); diff --git a/include/dali/core/exec/thread_pool_base.h b/include/dali/core/exec/thread_pool_base.h index 87a90fee969..3627eaa7cc8 100644 --- a/include/dali/core/exec/thread_pool_base.h +++ b/include/dali/core/exec/thread_pool_base.h @@ -58,6 +58,13 @@ class DLL_PUBLIC JobBaseFields { */ template class DLL_PUBLIC JobBase : public JobBaseFields { + public: + bool Started() const noexcept { return executor_ != nullptr; } + bool WaitStarted() const noexcept { return wait_started_; } + bool WaitCompleted() const noexcept { return wait_started_; } + + static constexpr bool IsCooperative() { return cooperative; } + protected: JobBase() = default; ~JobBase() noexcept(false); @@ -76,8 +83,6 @@ class DLL_PUBLIC JobBase : public JobBaseFields { */ void DoNotify(); - static constexpr bool IsCooperative() { return cooperative; } - std::atomic_int num_pending_tasks_{0}; std::atomic_bool running_{false}; int total_tasks_ = 0; From 783d100246cc84a886ead502f6c059023a5c2b0c Mon Sep 
17 00:00:00 2001 From: Michal Zientkiewicz Date: Mon, 9 Mar 2026 17:08:33 +0100 Subject: [PATCH 14/19] Fixes. Signed-off-by: Michal Zientkiewicz --- dali/pipeline/util/new_thread_pool.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dali/pipeline/util/new_thread_pool.cc b/dali/pipeline/util/new_thread_pool.cc index c90b191fce9..1dec6545263 100644 --- a/dali/pipeline/util/new_thread_pool.cc +++ b/dali/pipeline/util/new_thread_pool.cc @@ -112,8 +112,8 @@ void ThreadPoolFacade::RunAll(bool wait) { else jobs_.front().Run(*tp_, true); } else { - if (jobs_.front().Started()) - jobs_.front().Wait(); + if (!jobs_.front().Started()) + jobs_.front().Run(*tp_, false); WaitForWork(); } } @@ -145,7 +145,7 @@ void ThreadPoolFacade::WaitForWork() { std::rethrow_exception(std::move(errs[0])); } else if (errs.size() > 1) { std::reverse(errs.begin(), errs.end()); - MultipleErrors(std::move(errs)); + throw MultipleErrors(std::move(errs)); } // else no error } } From d09c10ec3e00648d8aab7f8365337e87324f5968 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Mon, 9 Mar 2026 17:10:01 +0100 Subject: [PATCH 15/19] Typo. 
Signed-off-by: Michal Zientkiewicz --- dali/pipeline/executor/executor2/exec2.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dali/pipeline/executor/executor2/exec2.cc b/dali/pipeline/executor/executor2/exec2.cc index 27399ff987c..280e3e61778 100644 --- a/dali/pipeline/executor/executor2/exec2.cc +++ b/dali/pipeline/executor/executor2/exec2.cc @@ -357,7 +357,7 @@ class Executor2::Impl { config_.thread_pool_threads, config_.device.value_or(CPU_ONLY_DEVICE_ID), config_.set_affinity, - "Executorv_v2"); + "Executor_v2"); } for (auto &n : graph_.Nodes()) { if (n.backend == OpType::CPU) { @@ -371,7 +371,7 @@ class Executor2::Impl { config_.thread_pool_threads, config_.device.value_or(CPU_ONLY_DEVICE_ID), config_.set_affinity, - "Executorv_v2"); + "Executor_v2"); } for (auto &n : graph_.Nodes()) { if (n.backend == OpType::CPU) From 7dd4d8d7b16560014210d359024720f746714c97 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Mon, 9 Mar 2026 17:17:56 +0100 Subject: [PATCH 16/19] Fixes. Signed-off-by: Michal Zientkiewicz --- dali/pipeline/executor/executor2/exec2.cc | 2 +- dali/pipeline/util/new_thread_pool.cc | 2 +- include/dali/core/exec/thread_pool_base.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dali/pipeline/executor/executor2/exec2.cc b/dali/pipeline/executor/executor2/exec2.cc index 280e3e61778..d7a91106bba 100644 --- a/dali/pipeline/executor/executor2/exec2.cc +++ b/dali/pipeline/executor/executor2/exec2.cc @@ -346,9 +346,9 @@ class Executor2::Impl { } void SetupThreadPool() { + thread_pool_wrappers_.clear(); new_tp_.reset(); old_tp_.reset(); - thread_pool_wrappers_.clear(); if (UseNewThreadPool()) { std::cerr << "\n!!! Forced use of NewThreadPool !!!" 
<< std::endl; diff --git a/dali/pipeline/util/new_thread_pool.cc b/dali/pipeline/util/new_thread_pool.cc index 1dec6545263..4fbb0b7c4d1 100644 --- a/dali/pipeline/util/new_thread_pool.cc +++ b/dali/pipeline/util/new_thread_pool.cc @@ -86,7 +86,7 @@ ThreadPoolFacade::~ThreadPoolFacade() noexcept { void ThreadPoolFacade::AddWork(std::function work, int64_t priority) { if (jobs_.empty() || jobs_.front().Started()) jobs_.emplace_front(); - jobs_.front().AddTask(work, priority); + jobs_.front().AddTask(std::move(work), priority); } void ThreadPoolFacade::AddWork(std::function work, int64_t priority) { diff --git a/include/dali/core/exec/thread_pool_base.h b/include/dali/core/exec/thread_pool_base.h index 3627eaa7cc8..4e2b94bc905 100644 --- a/include/dali/core/exec/thread_pool_base.h +++ b/include/dali/core/exec/thread_pool_base.h @@ -61,7 +61,7 @@ class DLL_PUBLIC JobBase : public JobBaseFields { public: bool Started() const noexcept { return executor_ != nullptr; } bool WaitStarted() const noexcept { return wait_started_; } - bool WaitCompleted() const noexcept { return wait_started_; } + bool WaitCompleted() const noexcept { return wait_completed_; } static constexpr bool IsCooperative() { return cooperative; } From 5d9cf4fb319b2d3e604e2367a7efded7400e0d88 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Mon, 9 Mar 2026 17:42:43 +0100 Subject: [PATCH 17/19] Documentation. Signed-off-by: Michal Zientkiewicz --- dali/pipeline/util/new_thread_pool.h | 41 ++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/dali/pipeline/util/new_thread_pool.h b/dali/pipeline/util/new_thread_pool.h index 0bd04859fe1..0d011c682e1 100644 --- a/dali/pipeline/util/new_thread_pool.h +++ b/dali/pipeline/util/new_thread_pool.h @@ -40,16 +40,57 @@ class DLL_PUBLIC NewThreadPool : public ThreadPoolBase { #endif }; +/** Combines an existing ThreadPoolBase and Jobs to provide a backward-compatible interface. 
+ * + * This class wraps a (non-owning) pointer to a ThreadPoolBase object and an owning list of Jobs. + * The jobs are created as needed when AddWork is called and destroyed when Wait is called. + * + * There's a subtle difference in priority handling between old thread pool and this wrapper + * when adding more work while the previous work is already started. + */ class DLL_PUBLIC ThreadPoolFacade : public ThreadPool { public: + /** Constructs a ThreadPool facade for a ThreadPoolBase object + * + * @param thread_pool A pointer to an existing thread pool. The caller must keep it alive + * until all work items submitted to the facade have been waited for. + */ explicit ThreadPoolFacade(ThreadPoolBase *thread_pool) : tp_(thread_pool) {} ~ThreadPoolFacade() noexcept override; + /** Adds a new wokr item, with a priority. Higher priority items are picked up first. + * + * After work has been added, it must be run and waited for. + * + * @param work A parameterless function representing the work item. + */ void AddWork(std::function work, int64_t priority = 0) override; + /** Adds a new wokr item, with a priority. Higher priority items are picked up first. + * + * After work has been added, it must be run and waited for. + * + * @param work A function representing the work item, parameterized with a thread index. + */ void AddWork(std::function work, int64_t priority = 0) override; + + /** Sumbits all work added for execution. + * + * Adding more work after this call requires calling RunAll again. + */ void RunAll(bool wait = true) override; + + /** Waits for the work to complete. + * + * This function waits until all work items are complete. + * If any of them throws an exception, the function will rethrow it. + * If multiple items throw, the exceptions are wrapped into MultiplErrors exception. 
+ */ void WaitForWork() override; + + /** Returns the number of threads in the underlying thread pool */ int NumThreads() const override; + + /** Returns the system-specific thread ids (not indices) of the threads in the thread pool */ std::vector GetThreadIds() const override; private: From c499fa5d6711400ecf90700f48d296a174453a87 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Tue, 10 Mar 2026 11:07:20 +0100 Subject: [PATCH 18/19] Fix -newtp launch script. Signed-off-by: Michal Zientkiewicz --- qa/TL0_python-self-test-core-newtp/test.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/qa/TL0_python-self-test-core-newtp/test.sh b/qa/TL0_python-self-test-core-newtp/test.sh index f27eee0d12f..5873669832e 100644 --- a/qa/TL0_python-self-test-core-newtp/test.sh +++ b/qa/TL0_python-self-test-core-newtp/test.sh @@ -1,3 +1,5 @@ #!/bin/bash -e export DALI_USE_NEW_THREAD_POOL=1 -source ../TL0_python-self-test-core/test.sh +pushd ../TL0_python-self-test-core +bash -e ../TL0_python-self-test-core/test.sh +popd From 01a53c01677a5fe18d8f90239d1dae3c413afba1 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Tue, 10 Mar 2026 13:18:17 +0100 Subject: [PATCH 19/19] Review issues. Signed-off-by: Michal Zientkiewicz --- dali/pipeline/util/new_thread_pool.h | 8 ++++---- qa/TL0_python-self-test-core-newtp/test.sh | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dali/pipeline/util/new_thread_pool.h b/dali/pipeline/util/new_thread_pool.h index 0d011c682e1..1a8660b9568 100644 --- a/dali/pipeline/util/new_thread_pool.h +++ b/dali/pipeline/util/new_thread_pool.h @@ -58,14 +58,14 @@ class DLL_PUBLIC ThreadPoolFacade : public ThreadPool { explicit ThreadPoolFacade(ThreadPoolBase *thread_pool) : tp_(thread_pool) {} ~ThreadPoolFacade() noexcept override; - /** Adds a new wokr item, with a priority. Higher priority items are picked up first. + /** Adds a new work item, with a priority. Higher priority items are picked up first. 
* * After work has been added, it must be run and waited for. * * @param work A parameterless function representing the work item. */ void AddWork(std::function work, int64_t priority = 0) override; - /** Adds a new wokr item, with a priority. Higher priority items are picked up first. + /** Adds a new work item, with a priority. Higher priority items are picked up first. * * After work has been added, it must be run and waited for. * @@ -73,7 +73,7 @@ class DLL_PUBLIC ThreadPoolFacade : public ThreadPool { */ void AddWork(std::function work, int64_t priority = 0) override; - /** Sumbits all work added for execution. + /** Submits all work added for execution. * * Adding more work after this call requires calling RunAll again. */ @@ -83,7 +83,7 @@ class DLL_PUBLIC ThreadPoolFacade : public ThreadPool { * * This function waits until all work items are complete. * If any of them throws an exception, the function will rethrow it. - * If multiple items throw, the exceptions are wrapped into MultiplErrors exception. + * If multiple items throw, the exceptions are wrapped into MultipleErrors exception. */ void WaitForWork() override; diff --git a/qa/TL0_python-self-test-core-newtp/test.sh b/qa/TL0_python-self-test-core-newtp/test.sh index 5873669832e..a55d1407f74 100644 --- a/qa/TL0_python-self-test-core-newtp/test.sh +++ b/qa/TL0_python-self-test-core-newtp/test.sh @@ -1,5 +1,5 @@ #!/bin/bash -e export DALI_USE_NEW_THREAD_POOL=1 pushd ../TL0_python-self-test-core -bash -e ../TL0_python-self-test-core/test.sh +bash -e ./test.sh popd