diff --git a/dali/benchmark/displacement_cpu_bench.cc b/dali/benchmark/displacement_cpu_bench.cc index cd890dab293..cb19fd35441 100644 --- a/dali/benchmark/displacement_cpu_bench.cc +++ b/dali/benchmark/displacement_cpu_bench.cc @@ -94,7 +94,7 @@ void DisplacementBench(benchmark::State& st) {//NOLINT } // We need a thread pool - ThreadPool tp(4, 0, false, "DisplacementBench"); + OldThreadPool tp(4, 0, false, "DisplacementBench"); // Create workspace and set input and output Workspace ws; diff --git a/dali/benchmark/operator_bench.h b/dali/benchmark/operator_bench.h index a7a002897dc..3b46fe71b26 100644 --- a/dali/benchmark/operator_bench.h +++ b/dali/benchmark/operator_bench.h @@ -68,7 +68,7 @@ class OperatorBench : public DALIBenchmark { // Create workspace and set input and output Workspace ws; ws.AddInput(data_in); - ThreadPool tp(num_threads, 0, false, "OperatorBench"); + OldThreadPool tp(num_threads, 0, false, "OperatorBench"); ws.SetThreadPool(&tp); Setup>(op_ptr, op_spec, ws, batch_size); diff --git a/dali/benchmark/thread_pool_bench.cc b/dali/benchmark/thread_pool_bench.cc index 86ea6fb017c..66c5e5033ad 100644 --- a/dali/benchmark/thread_pool_bench.cc +++ b/dali/benchmark/thread_pool_bench.cc @@ -33,7 +33,7 @@ BENCHMARK_DEFINE_F(ThreadPoolBench, AddWorkDeferred)(benchmark::State& st) { int work_size_max = st.range(2); int nthreads = st.range(3); - ThreadPool thread_pool(nthreads, 0, false, "ThreadPoolBench"); + OldThreadPool thread_pool(nthreads, 0, false, "ThreadPoolBench"); std::vector data(2000, 0xFF); std::atomic total_count(0); diff --git a/dali/kernels/test/scatter_gather_test.cc b/dali/kernels/test/scatter_gather_test.cc index 67a47059d3e..ad386810dfa 100644 --- a/dali/kernels/test/scatter_gather_test.cc +++ b/dali/kernels/test/scatter_gather_test.cc @@ -142,7 +142,7 @@ class ScatterGatherTest : public testing::Test { this->template Memset(out_ptr.get(), 0, out.size()); T sg(max_block); - ThreadPool tp(4, 0, false, "test TP"); + OldThreadPool tp(4, 
0, false, "test TP"); // copy for (auto &r : ranges) sg.AddCopy(r.dst, r.src, r.size); diff --git a/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api.h b/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api.h index 878577db0e5..0df79370fe7 100644 --- a/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api.h +++ b/dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api.h @@ -1150,8 +1150,8 @@ class nvJPEGDecoder : public StatelessOperator, CachedDecoderImpl nvjpegDevAllocator_t device_allocator_; nvjpegPinnedAllocator_t pinned_allocator_; - ThreadPool thread_pool_; - ThreadPool nvjpeg2k_thread_; + OldThreadPool thread_pool_; + OldThreadPool nvjpeg2k_thread_; static constexpr int kOutputDim = 3; TensorList hw_decoder_images_staging_; diff --git a/dali/operators/imgcodec/image_decoder.h b/dali/operators/imgcodec/image_decoder.h index 14c031fa36c..d449a6723ac 100644 --- a/dali/operators/imgcodec/image_decoder.h +++ b/dali/operators/imgcodec/image_decoder.h @@ -229,7 +229,7 @@ class ImageDecoder : public StatelessOperator { GetDecoderSpecificArguments(spec); if (std::is_same::value) { - thread_pool_ = std::make_unique(num_threads_, device_id_, + thread_pool_ = std::make_unique(num_threads_, device_id_, spec.GetArgument("affine"), "MixedDecoder"); if (spec_.HasArgument("cache_size")) cache_ = std::make_unique(spec_); diff --git a/dali/operators/reader/fits_reader_op.cc b/dali/operators/reader/fits_reader_op.cc index cb600781ed3..539d33d9105 100644 --- a/dali/operators/reader/fits_reader_op.cc +++ b/dali/operators/reader/fits_reader_op.cc @@ -43,7 +43,7 @@ This operator can be used in the following modes: 2. Read file names from a text file indicated in `file_list` argument. 3. Read files listed in `files` argument. 4. Number of outputs per sample corresponds to the length of `hdu_indices` argument. By default, -first HDU with data is read from each file, so the number of outputs defaults to 1. 
+first HDU with data is read from each file, so the number of outputs defaults to 1. )") .NumInput(0) .OutputFn(detail::FitsReaderOutputFn) @@ -94,7 +94,7 @@ If `file_root` is provided, the paths are treated as being relative to it. This argument is mutually exclusive with `file_list`.)", nullptr) .AddOptionalArg("hdu_indices", - R"(HDU indices to read. If not provided, the first HDU after the primary + R"(HDU indices to read. If not provided, the first HDU after the primary will be yielded. Since HDUs are indexed starting from 1, the default value is as follows: hdu_indices = [2]. Size of the provided list hdu_indices defines number of outputs per sample.)", std::vector{2}) @@ -114,7 +114,7 @@ void FitsReaderCPU::RunImpl(Workspace &ws) { auto &output = ws.Output(output_idx); for (int file_idx = 0; file_idx < num_samples; file_idx++) { auto &sample = GetSample(file_idx); - ThreadPool::Work copy_task = [output_idx = output_idx, data_idx = file_idx, &output, + auto copy_task = [output_idx = output_idx, data_idx = file_idx, &output, &sample](int) { std::memcpy(output.raw_mutable_tensor(data_idx), sample.data[output_idx].raw_data(), sample.data[output_idx].nbytes()); diff --git a/dali/operators/reader/nemo_asr_reader_op.h b/dali/operators/reader/nemo_asr_reader_op.h index 02c94b4c8c4..3d2dcf9f8a4 100644 --- a/dali/operators/reader/nemo_asr_reader_op.h +++ b/dali/operators/reader/nemo_asr_reader_op.h @@ -46,7 +46,7 @@ class NemoAsrReader : public DataReader DALIDataType dtype_; int num_threads_; - ThreadPool thread_pool_; + OldThreadPool thread_pool_; // prefetch_depth * batch_size set of buffers that we reuse to decode audio using TensorListPtr = std::unique_ptr>; diff --git a/dali/operators/reader/numpy_reader_gpu_op.h b/dali/operators/reader/numpy_reader_gpu_op.h index 860d8ae832f..4810df0723c 100644 --- a/dali/operators/reader/numpy_reader_gpu_op.h +++ b/dali/operators/reader/numpy_reader_gpu_op.h @@ -55,7 +55,7 @@ class NumpyReaderGPU : gds::GDSLazyInit, public 
NumpyReader> prefetched_batch_tensors_; diff --git a/dali/operators/reader/numpy_reader_op.h b/dali/operators/reader/numpy_reader_op.h index 6dd2df49468..e04f98033d5 100644 --- a/dali/operators/reader/numpy_reader_op.h +++ b/dali/operators/reader/numpy_reader_op.h @@ -196,8 +196,8 @@ class NumpyReaderCPU : public NumpyReader { */ size_t o_direct_alignm_ = 0; size_t o_direct_read_len_alignm_ = 0; - // ThreadPool for prefetch which is a separate thread - ThreadPool thread_pool_; + // Thread Pool for prefetch which is a separate thread + OldThreadPool thread_pool_; }; } // namespace dali diff --git a/dali/operators/reader/tfrecord_reader_op.h b/dali/operators/reader/tfrecord_reader_op.h index 4c2cbd453b1..6cf9e29167b 100644 --- a/dali/operators/reader/tfrecord_reader_op.h +++ b/dali/operators/reader/tfrecord_reader_op.h @@ -79,8 +79,8 @@ class TFRecordReader bool dont_use_mmap_ = false; bool use_o_direct_ = false; size_t o_direct_chunk_size_ = 0; - // ThreadPool for prefetch which is a separate thread - ThreadPool thread_pool_; + // Thread Pool for prefetch which is a separate thread + OldThreadPool thread_pool_; }; } // namespace dali diff --git a/dali/operators/reader/webdataset_reader_op.cc b/dali/operators/reader/webdataset_reader_op.cc index 55b14eee77d..f3fc0179939 100644 --- a/dali/operators/reader/webdataset_reader_op.cc +++ b/dali/operators/reader/webdataset_reader_op.cc @@ -50,7 +50,7 @@ void WebdatasetReader::RunImpl(Workspace &ws) { auto& output = ws.Output(output_idx); for (int data_idx = 0; data_idx < num_samples; data_idx++) { auto& sample = GetSample(data_idx); - ThreadPool::Work copy_task = [output_idx = output_idx, data_idx = data_idx, &output, + auto copy_task = [output_idx = output_idx, data_idx = data_idx, &output, &sample](int) { output.SetMeta(data_idx, sample[output_idx].GetMeta()); std::memcpy(output.raw_mutable_tensor(data_idx), sample[output_idx].raw_data(), diff --git a/dali/operators/video/decoder/video_decoder_base.h 
b/dali/operators/video/decoder/video_decoder_base.h index 608486006e5..8c7284b4682 100644 --- a/dali/operators/video/decoder/video_decoder_base.h +++ b/dali/operators/video/decoder/video_decoder_base.h @@ -48,7 +48,7 @@ class DLL_PUBLIC VideoDecoderBase : public Operator { if (spec_.HasArgument("device")) { auto device_str = spec_.template GetArgument("device"); if (device_str == "mixed") { - thread_pool_ = std::make_unique( + thread_pool_ = std::make_unique( spec.GetArgument("num_threads"), spec.GetArgument("device_id"), spec.GetArgument("affine"), "VideoDecoder"); } diff --git a/dali/operators/video/input/video_input.h b/dali/operators/video/input/video_input.h index 210ee4dd779..8891bb93e76 100644 --- a/dali/operators/video/input/video_input.h +++ b/dali/operators/video/input/video_input.h @@ -99,8 +99,11 @@ class VideoInput : public InputOperator { last_sequence_policy_ == "partial" || last_sequence_policy_ == "pad", make_string("Provided `last_sequence_policy` is not supported: ", last_sequence_policy_)); if constexpr (!is_cpu) { - thread_pool_.emplace(this->num_threads_, spec.GetArgument("device_id"), - spec.GetArgument("affine"), "VideoInput"); + thread_pool_ = std::make_unique( + this->num_threads_, + spec.GetArgument("device_id"), + spec.GetArgument("affine"), + "VideoInput"); } } @@ -208,7 +211,7 @@ class VideoInput : public InputOperator { if constexpr (is_cpu) { return ws.GetThreadPool(); } else { - assert(thread_pool_.has_value()); + assert(thread_pool_); return *thread_pool_; } } @@ -243,7 +246,7 @@ class VideoInput : public InputOperator { uint8_t pad_frame_value_ = 0; /// CPU operators have default Thread Pool inside Workspace. Mixed and GPU ops don't. 
- std::optional thread_pool_ = std::nullopt; + std::unique_ptr thread_pool_; std::vector> frames_decoders_; diff --git a/dali/pipeline/executor/executor2/exec2.cc b/dali/pipeline/executor/executor2/exec2.cc index 1367d4ecd66..d7a91106bba 100644 --- a/dali/pipeline/executor/executor2/exec2.cc +++ b/dali/pipeline/executor/executor2/exec2.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ #include "dali/pipeline/executor/executor2/exec_graph.h" #include "dali/pipeline/executor/executor2/stream_assignment.h" #include "dali/pipeline/operator/builtin/input_operator.h" +#include "dali/pipeline/util/new_thread_pool.h" namespace dali { namespace exec2 { @@ -42,9 +43,9 @@ void LimitBackendConcurrency(ExecGraph &graph, OpType backend, int max_concurren void ApplyConcurrencyLimit(ExecGraph &graph, OperatorConcurrency concurrency) { switch (concurrency) { case OperatorConcurrency::Full: - // TODO(michalz): Fix ThreadPool. - LimitBackendConcurrency(graph, OpType::CPU); - break; // other operators have no restrictions + if (!UseNewThreadPool()) // old thread pool is not thread safe + LimitBackendConcurrency(graph, OpType::CPU); + break; case OperatorConcurrency::Backend: LimitBackendConcurrency(graph, OpType::CPU); LimitBackendConcurrency(graph, OpType::GPU); @@ -345,18 +346,37 @@ class Executor2::Impl { } void SetupThreadPool() { - if (graph_info_.num_cpu > 0) { - tp_ = std::make_unique( - config_.thread_pool_threads, - config_.device.value_or(CPU_ONLY_DEVICE_ID), - config_.set_affinity, - "Executorv_v2"); + thread_pool_wrappers_.clear(); + new_tp_.reset(); + old_tp_.reset(); + + if (UseNewThreadPool()) { + std::cerr << "\n!!! Forced use of NewThreadPool !!!" 
<< std::endl; + if (graph_info_.num_cpu > 0) { + new_tp_ = std::make_unique( + config_.thread_pool_threads, + config_.device.value_or(CPU_ONLY_DEVICE_ID), + config_.set_affinity, + "Executor_v2"); + } + for (auto &n : graph_.Nodes()) { + if (n.backend == OpType::CPU) { + thread_pool_wrappers_.push_back(std::make_unique(new_tp_.get())); + n.env.thread_pool = thread_pool_wrappers_.back().get(); + } + } } else { - tp_.reset(); - } - for (auto &n : graph_.Nodes()) { - if (n.backend == OpType::CPU) - n.env.thread_pool = tp_.get(); + if (graph_info_.num_cpu > 0) { + old_tp_ = std::make_unique( + config_.thread_pool_threads, + config_.device.value_or(CPU_ONLY_DEVICE_ID), + config_.set_affinity, + "Executor_v2"); + } + for (auto &n : graph_.Nodes()) { + if (n.backend == OpType::CPU) + n.env.thread_pool = old_tp_.get(); + } } } @@ -421,7 +441,9 @@ class Executor2::Impl { // Runtime environment - std::unique_ptr tp_; + std::unique_ptr old_tp_; + std::unique_ptr new_tp_; + std::vector> thread_pool_wrappers_; std::queue pending_outputs_; std::vector streams_; std::map> node_map_; diff --git a/dali/pipeline/executor/executor2/exec_graph_test.cc b/dali/pipeline/executor/executor2/exec_graph_test.cc index 44989a12a4a..ca49ce4e7ea 100644 --- a/dali/pipeline/executor/executor2/exec_graph_test.cc +++ b/dali/pipeline/executor/executor2/exec_graph_test.cc @@ -84,7 +84,7 @@ TEST(ExecGraphTest, SimpleGraph) { LimitBackendConcurrency(g, OpType::CPU); WorkspaceParams params = {}; - auto tp = std::make_unique(std::thread::hardware_concurrency(), 0, false, "test"); + auto tp = std::make_unique(std::thread::hardware_concurrency(), 0, false, "test"); ExecEnv env; env.thread_pool = tp.get(); params.env = &env; @@ -144,7 +144,7 @@ TEST(ExecGraphTest, SimpleGraphRepeat) { g.Link(n1, 0, n2, 1); g.Link(n2, 0, no, 0); LimitBackendConcurrency(g, OpType::CPU); - ThreadPool tp(4, 0, false, "test"); + OldThreadPool tp(4, 0, false, "test"); WorkspaceParams params = {}; ExecEnv env; env.thread_pool = 
&tp; @@ -212,7 +212,7 @@ TEST(ExecGraphTest, SimpleGraphScheduleAheadCPU) { g.Link(n2, 0, no, 0); LimitBackendConcurrency(g, OpType::CPU); - ThreadPool tp(4, 0, false, "test"); + OldThreadPool tp(4, 0, false, "test"); WorkspaceParams params = {}; ExecEnv env; env.thread_pool = &tp; @@ -306,7 +306,7 @@ TEST(ExecGraphTest, GraphScheduleAheadGPU) { n2->env.order = s2; no->env.order = s3; - ThreadPool tp(4, 0, false, "test"); + OldThreadPool tp(4, 0, false, "test"); n1->env.thread_pool = &tp; @@ -379,7 +379,7 @@ TEST(ExecGraphTest, Exception) { g.Link(n1, 0, n2, 1); g.Link(n2, 0, no, 0); LimitBackendConcurrency(g, OpType::CPU); - ThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test"); + OldThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test"); WorkspaceParams params = {}; ExecEnv env; env.thread_pool = &tp; @@ -460,7 +460,7 @@ TEST(ExecGraphTest, LoweredExec) { g.Lower(def); LimitBackendConcurrency(g, OpType::CPU); - ThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test"); + OldThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test"); WorkspaceParams params = {}; ExecEnv env; env.thread_pool = &tp; diff --git a/dali/pipeline/executor/executor_impl.h b/dali/pipeline/executor/executor_impl.h index 1b919ff8913..0fa5d5fd2f8 100644 --- a/dali/pipeline/executor/executor_impl.h +++ b/dali/pipeline/executor/executor_impl.h @@ -318,7 +318,7 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy { OpGraph *graph_ = nullptr; EventPool event_pool_; - ThreadPool thread_pool_; + OldThreadPool thread_pool_; std::vector errors_; mutable std::mutex errors_mutex_; bool exec_error_; diff --git a/dali/pipeline/operator/eager_operator.h b/dali/pipeline/operator/eager_operator.h index 2abaa9da147..e636bd4fe7d 100644 --- a/dali/pipeline/operator/eager_operator.h +++ b/dali/pipeline/operator/eager_operator.h @@ -140,8 +140,8 @@ class DLL_PUBLIC EagerOperator { DLL_PUBLIC inline static void UpdateThreadPool(int num_threads) 
{ std::lock_guard lock(shared_thread_pool_mutex_); - SharedThreadPoolInstance().reset( - new ThreadPool(num_threads, CPU_ONLY_DEVICE_ID, false, "EagerOperator")); + SharedThreadPoolInstance() = std::make_unique( + num_threads, CPU_ONLY_DEVICE_ID, false, "EagerOperator"); } // Update shared CUDA stream used for all direct operators. @@ -170,7 +170,7 @@ class DLL_PUBLIC EagerOperator { } static inline std::shared_ptr &SharedThreadPoolInstance() { - static std::shared_ptr thread_pool = std::make_shared( + static std::shared_ptr thread_pool = std::make_shared( GetDefaultNumThreads(), CPU_ONLY_DEVICE_ID, false, "EagerOperator"); return thread_pool; diff --git a/dali/pipeline/operator/false_gpu_operator.h b/dali/pipeline/operator/false_gpu_operator.h index 41ae260e871..f61f2220025 100644 --- a/dali/pipeline/operator/false_gpu_operator.h +++ b/dali/pipeline/operator/false_gpu_operator.h @@ -116,7 +116,7 @@ class FalseGPUOperator : public Operator { private: CPUOperator cpu_impl_; - ThreadPool thread_pool_; + OldThreadPool thread_pool_; Workspace cpu_ws_; // Keep it here so that we can modify (ws gives only const ref to inputs) diff --git a/dali/pipeline/pipeline_debug.h b/dali/pipeline/pipeline_debug.h index d8005188027..0bc6decef0b 100644 --- a/dali/pipeline/pipeline_debug.h +++ b/dali/pipeline/pipeline_debug.h @@ -95,7 +95,7 @@ class DLL_PUBLIC PipelineDebug { int device_id_; int num_threads_; CUDAStreamLease cuda_stream_; - ThreadPool thread_pool_; + OldThreadPool thread_pool_; std::unordered_map> cpu_operators_; std::unordered_map> gpu_operators_; std::unordered_map> mixed_operators_; diff --git a/dali/pipeline/util/new_thread_pool.cc b/dali/pipeline/util/new_thread_pool.cc new file mode 100644 index 00000000000..4fbb0b7c4d1 --- /dev/null +++ b/dali/pipeline/util/new_thread_pool.cc @@ -0,0 +1,170 @@ +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include "dali/core/call_at_exit.h" +#include "dali/core/small_vector.h" +#include "dali/pipeline/util/new_thread_pool.h" +#include "dali/core/device_guard.h" +#include "dali/util/nvml.h" +#include "dali/core/nvtx.h" + +namespace dali { + +NewThreadPool::NewThreadPool( + int num_threads, + std::optional device_id, + bool set_affinity, + std::string name) + : name_(name) { + if (device_id.has_value() && *device_id == CPU_ONLY_DEVICE_ID) + device_id = std::nullopt; + device_id_ = device_id; +#if NVML_ENABLED + // We use NVML only for setting thread affinity + if (device_id.has_value() && set_affinity) { + nvml_handle_ = nvml::NvmlInstance::CreateNvmlInstance(); + } +#endif + Init(num_threads, [=, this](int thread_idx) { + return OnThreadStart(thread_idx, set_affinity); + }); +} + +std::any NewThreadPool::OnThreadStart(int thread_idx, bool set_affinity) { + std::string name = make_string("[DALI][NT", thread_idx, "]", name_); + SetThreadName(name.c_str()); + std::any dg; + if (device_id_.has_value()) + dg.emplace(*device_id_); +#if NVML_ENABLED + try { + if (set_affinity) { + const char *env_affinity = std::getenv("DALI_AFFINITY_MASK"); + int core = -1; + if (env_affinity) { + const auto &vec = string_split(env_affinity, ','); + if ((size_t)thread_idx < vec.size()) { + core = std::stoi(vec[thread_idx]); + } else { + DALI_WARN("DALI_AFFINITY_MASK environment variable is 
set, " + "but does not have enough entries: thread_id (", thread_idx, + ") vs #entries (", vec.size(), "). Ignoring..."); + } + } + nvml::SetCPUAffinity(core); + } + } catch (const std::exception &e) { + DALI_WARN("Couldn't set thread affinity in thread ", thread_idx, " of thread pool \"", + name_, "\". Exception ", typeid(e).name(), ": ", e.what()); + } catch (...) { + DALI_WARN("Couldn't set thread affinity in thread ", thread_idx, " of thread pool \"", + name_, "\". Unknown error."); + } +#endif + return dg; +} + +ThreadPoolFacade::~ThreadPoolFacade() noexcept { + RunAll(); +} + +void ThreadPoolFacade::AddWork(std::function work, int64_t priority) { + if (jobs_.empty() || jobs_.front().Started()) + jobs_.emplace_front(); + jobs_.front().AddTask(std::move(work), priority); +} + +void ThreadPoolFacade::AddWork(std::function work, int64_t priority) { + if (jobs_.empty() || jobs_.front().Started()) + jobs_.emplace_front(); + jobs_.front().AddTask([w = std::move(work)]() { + w(ThreadPoolBase::this_thread_idx()); + }, priority); +} + +void ThreadPoolFacade::RunAll(bool wait) { + if (!jobs_.empty()) { + if (!wait) { + if (!jobs_.front().Started()) // all subsequent jobs_ must be started + jobs_.front().Run(*tp_, false); + } else { + if (jobs_.size() == 1) { // fast path for the common case + auto atexit = AtScopeExit([&]() { + jobs_.clear(); + }); + if (jobs_.front().Started()) + jobs_.front().Wait(); + else + jobs_.front().Run(*tp_, true); + } else { + if (!jobs_.front().Started()) + jobs_.front().Run(*tp_, false); + WaitForWork(); + } + } + } +} + +void ThreadPoolFacade::WaitForWork() { + if (!jobs_.empty()) { + if (!jobs_.front().Started()) + throw std::logic_error("WaitForWork called without Run"); + auto atexit = AtScopeExit([&]() { + jobs_.clear(); + }); + // This won't be allocated unless an exception was thrown + std::vector errs; + // The jobs in jobs_ are ordered from latest to oldest, so theres little chance that more than + // one Wait would block. 
+ for (auto &job : jobs_) { + try { + job.Wait(); + } catch (MultipleErrors &e) { + // unwrap MultipleErrors to avoid nesting + errs.insert(errs.end(), e.errors().begin(), e.errors().end()); + } catch (...) { + errs.push_back(std::current_exception()); + } + } + if (errs.size() == 1) { + std::rethrow_exception(std::move(errs[0])); + } else if (errs.size() > 1) { + std::reverse(errs.begin(), errs.end()); + throw MultipleErrors(std::move(errs)); + } // else no error + } +} + +int ThreadPoolFacade::NumThreads() const { + return tp_->NumThreads(); +} + +std::vector ThreadPoolFacade::GetThreadIds() const { + return tp_->GetThreadIds(); +} + +bool UseNewThreadPool() { + static bool use_new_thread_pool = []() { + const char *new_tp = getenv("DALI_USE_NEW_THREAD_POOL"); + return new_tp && atoi(new_tp); + }(); + return use_new_thread_pool; +} + + +} // namespace dali diff --git a/dali/pipeline/util/new_thread_pool.h b/dali/pipeline/util/new_thread_pool.h new file mode 100644 index 00000000000..0d011c682e1 --- /dev/null +++ b/dali/pipeline/util/new_thread_pool.h @@ -0,0 +1,105 @@ +// Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_ +#define DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_ + +#include +#include +#include +#include +#include "dali/core/exec/thread_pool_base.h" +#if NVML_ENABLED +#include "dali/util/nvml.h" +#endif +#include "dali/pipeline/util/thread_pool_interface.h" + +namespace dali { + +class DLL_PUBLIC NewThreadPool : public ThreadPoolBase { + public: + NewThreadPool(int num_threads, std::optional device_id, bool set_affinity, std::string name); + + private: + std::any OnThreadStart(int thread_idx, bool set_affinity); + std::optional device_id_; + std::string name_; +#if NVML_ENABLED + nvml::NvmlInstance nvml_handle_; +#endif +}; + +/** Combines an existing ThreadPoolBase and Jobs to provide a backward-compatible interface. + * + * This class wraps a (non-owning) pointer to a ThreadPoolBase object and an owning list of Jobs. + * The jobs are created as needed when AddWork is called and destroyed when Wait is called. + * + * There's a subtle difference in priority handling between old thread pool and this wrapper + * when adding more work while the previous work is already started. + */ +class DLL_PUBLIC ThreadPoolFacade : public ThreadPool { + public: + /** Constructs a ThreadPool facade for a ThreadPoolBase object + * + * @param thread_pool A pointer to an existing thread pool. The caller must keep it alive + * until all work items submitted to the facade have been waited for. + */ + explicit ThreadPoolFacade(ThreadPoolBase *thread_pool) : tp_(thread_pool) {} + ~ThreadPoolFacade() noexcept override; + + /** Adds a new work item, with a priority. Higher priority items are picked up first. + * + * After work has been added, it must be run and waited for. + * + * @param work A parameterless function representing the work item. + */ + void AddWork(std::function work, int64_t priority = 0) override; + /** Adds a new work item, with a priority. Higher priority items are picked up first. 
+ * + * After work has been added, it must be run and waited for. + * + * @param work A function representing the work item, parameterized with a thread index. + */ + void AddWork(std::function work, int64_t priority = 0) override; + + /** Submits all work added for execution. + * + * Adding more work after this call requires calling RunAll again. + */ + void RunAll(bool wait = true) override; + + /** Waits for the work to complete. + * + * This function waits until all work items are complete. + * If any of them throws an exception, the function will rethrow it. + * If multiple items throw, the exceptions are wrapped into MultipleErrors exception. + */ + void WaitForWork() override; + + /** Returns the number of threads in the underlying thread pool */ + int NumThreads() const override; + + /** Returns the system-specific thread ids (not indices) of the threads in the thread pool */ + std::vector GetThreadIds() const override; + + private: + ThreadPoolBase *tp_ = nullptr; + std::list jobs_; +}; + +DLL_PUBLIC bool UseNewThreadPool(); + +} // namespace dali + +#endif // DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_ diff --git a/dali/pipeline/util/thread_pool.cc b/dali/pipeline/util/thread_pool.cc index d30a5f64837..e1a7fe7d248 100644 --- a/dali/pipeline/util/thread_pool.cc +++ b/dali/pipeline/util/thread_pool.cc @@ -27,7 +27,7 @@ namespace dali { -ThreadPool::ThreadPool(int num_thread, int device_id, bool set_affinity, const char* name) +OldThreadPool::OldThreadPool(int num_thread, int device_id, bool set_affinity, const char* name) : threads_(num_thread) { DALI_ENFORCE(num_thread > 0, "Thread pool must have non-zero size"); #if NVML_ENABLED @@ -38,13 +38,14 @@ ThreadPool::ThreadPool(int num_thread, int device_id, bool set_affinity, const c #endif // Start the threads in the main loop for (int i = 0; i < num_thread; ++i) { - threads_[i] = std::thread(std::bind(&ThreadPool::ThreadMain, this, i, device_id, set_affinity, + threads_[i] = 
std::thread(std::bind(&OldThreadPool::ThreadMain, this, + i, device_id, set_affinity, make_string("[DALI][TP", i, "]", name))); } tl_errors_.resize(num_thread); } -ThreadPool::~ThreadPool() { +OldThreadPool::~OldThreadPool() { WaitForWork(false); std::unique_lock lock(queue_lock_); @@ -58,7 +59,11 @@ ThreadPool::~ThreadPool() { } } -void ThreadPool::AddWork(Work work, int64_t priority) { +void OldThreadPool::AddWork(Work work, int64_t priority) { + AddWork([w = std::move(work)](int) { w(); }, priority); +} + +void OldThreadPool::AddWork(WorkWithThreadIdx work, int64_t priority) { outstanding_work_.fetch_add(1); if (started_) { { @@ -71,8 +76,12 @@ void ThreadPool::AddWork(Work work, int64_t priority) { } } +void OldThreadPool::WaitForWork() { + WaitForWork(true); +} + // Blocks until all work issued to the thread pool is complete -void ThreadPool::WaitForWork(bool checkForErrors) { +void OldThreadPool::WaitForWork(bool checkForErrors) { if (outstanding_work_.load()) { std::unique_lock lock(completed_mutex_); completed_.wait(lock, [&, this] { @@ -95,7 +104,7 @@ void ThreadPool::WaitForWork(bool checkForErrors) { } } -void ThreadPool::RunAll(bool wait) { +void OldThreadPool::RunAll(bool wait) { if (!started_) { { std::lock_guard lock(queue_lock_); @@ -108,11 +117,11 @@ void ThreadPool::RunAll(bool wait) { } } -int ThreadPool::NumThreads() const { +int OldThreadPool::NumThreads() const { return threads_.size(); } -std::vector ThreadPool::GetThreadIds() const { +std::vector OldThreadPool::GetThreadIds() const { std::vector tids; tids.reserve(threads_.size()); for (const auto &thread : threads_) @@ -121,8 +130,9 @@ std::vector ThreadPool::GetThreadIds() const { } -void ThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity, +void OldThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity, const std::string &name) { + this_thread_idx_ = thread_id; SetThreadName(name.c_str()); DeviceGuard g(device_id); try { @@ -158,7 +168,7 @@ void 
ThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity, break; // Get work from the queue. - Work work = std::move(work_queue_.top().second); + WorkWithThreadIdx work = std::move(work_queue_.top().second); work_queue_.pop(); // Unlock the lock lock.unlock(); diff --git a/dali/pipeline/util/thread_pool.h b/dali/pipeline/util/thread_pool.h index 3e32d759f62..2dd9a087aff 100644 --- a/dali/pipeline/util/thread_pool.h +++ b/dali/pipeline/util/thread_pool.h @@ -32,21 +32,22 @@ #endif #include "dali/core/semaphore.h" #include "dali/core/spinlock.h" - +#include "dali/core/exec/thread_idx.h" +#include "dali/pipeline/util/thread_pool_interface.h" namespace dali { -class DLL_PUBLIC ThreadPool { +class DLL_PUBLIC OldThreadPool : public ThreadPool { public: - // Basic unit of work that our threads do - typedef std::function Work; + using Work = std::function; + using WorkWithThreadIdx = std::function; - DLL_PUBLIC ThreadPool(int num_thread, int device_id, bool set_affinity, const char* name); + OldThreadPool(int num_thread, int device_id, bool set_affinity, const char* name); - DLL_PUBLIC ThreadPool(int num_thread, int device_id, bool set_affinity, const std::string& name) - : ThreadPool(num_thread, device_id, set_affinity, name.c_str()) {} + OldThreadPool(int num_thread, int device_id, bool set_affinity, const std::string& name) + : OldThreadPool(num_thread, device_id, set_affinity, name.c_str()) {} - DLL_PUBLIC ~ThreadPool(); + ~OldThreadPool(); /** * @brief Adds work to the queue with optional priority, and optionally starts processing @@ -56,33 +57,36 @@ class DLL_PUBLIC ThreadPool { * Once work is started, the threads will continue to pick up whatever work is scheduled * until WaitForWork is called. 
*/ - DLL_PUBLIC void AddWork(Work work, int64_t priority = 0); + void AddWork(WorkWithThreadIdx work, int64_t priority = 0) override; + + void AddWork(Work work, int64_t priority = 0) override; /** * @brief Wakes up all the threads to complete all the queued work, * optionally not waiting for the work to be finished before return * (the default wait=true is equivalent to invoking WaitForWork after RunAll). */ - DLL_PUBLIC void RunAll(bool wait = true); + void RunAll(bool wait = true) override; /** * @brief Waits until all work issued to the thread pool is complete */ - DLL_PUBLIC void WaitForWork(bool checkForErrors = true); + void WaitForWork() override; - DLL_PUBLIC int NumThreads() const; + int NumThreads() const override; - DLL_PUBLIC std::vector GetThreadIds() const; + std::vector GetThreadIds() const override; - DISABLE_COPY_MOVE_ASSIGN(ThreadPool); + DISABLE_COPY_MOVE_ASSIGN(OldThreadPool); private: - DLL_PUBLIC void ThreadMain(int thread_id, int device_id, bool set_affinity, - const std::string &name); + void WaitForWork(bool checkForErrors); + + void ThreadMain(int thread_id, int device_id, bool set_affinity, const std::string &name); vector threads_; - using PrioritizedWork = std::pair; + using PrioritizedWork = std::pair>; struct SortByPriority { bool operator() (const PrioritizedWork &a, const PrioritizedWork &b) { return a.first < b.first; diff --git a/dali/pipeline/util/thread_pool_interface.h b/dali/pipeline/util/thread_pool_interface.h new file mode 100644 index 00000000000..7d351101135 --- /dev/null +++ b/dali/pipeline/util/thread_pool_interface.h @@ -0,0 +1,44 @@ +// Copyright (c) 2017-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DALI_PIPELINE_UTIL_THREAD_POOL_INTERFACE_H_ +#define DALI_PIPELINE_UTIL_THREAD_POOL_INTERFACE_H_ + +#include +#include +#include +#include "dali/core/exec/thread_idx.h" + +namespace dali { + +class DLL_PUBLIC ThreadPool : public ThisThreadIdx { + public: + virtual ~ThreadPool() = default; + + virtual void AddWork(std::function work, int64_t priority = 0) = 0; + + virtual void AddWork(std::function work, int64_t priority = 0) = 0; + + virtual void RunAll(bool wait = true) = 0; + + virtual void WaitForWork() = 0; + + virtual int NumThreads() const = 0; + + virtual std::vector GetThreadIds() const = 0; +}; + +} // namespace dali + +#endif // DALI_PIPELINE_UTIL_THREAD_POOL_INTERFACE_H_ diff --git a/dali/pipeline/util/thread_pool_test.cc b/dali/pipeline/util/thread_pool_test.cc index b5ff8e240f9..685118b3cdb 100644 --- a/dali/pipeline/util/thread_pool_test.cc +++ b/dali/pipeline/util/thread_pool_test.cc @@ -21,7 +21,7 @@ namespace dali { namespace test { TEST(ThreadPool, AddWork) { - ThreadPool tp(16, 0, false, "ThreadPool test"); + OldThreadPool tp(16, 0, false, "OldThreadPool test"); std::atomic count{0}; auto increase = [&count](int thread_id) { count++; }; for (int i = 0; i < 64; i++) { @@ -35,7 +35,7 @@ TEST(ThreadPool, AddWork) { TEST(ThreadPool, AddWorkWithPriority) { // only one thread to ensure deterministic behavior - ThreadPool tp(1, 0, false, "ThreadPool test"); + OldThreadPool tp(1, 0, false, "OldThreadPool test"); std::atomic count{0}; auto set_to_1 = [&count](int thread_id) { count = 1; @@ -60,12 +60,12 @@ TEST(ThreadPool, 
AddWorkWithPriority) { TEST(ThreadPool, CheckName) { - const char given_thread_pool_name[] = "ThreadPool test"; - const char full_thread_pool_name[] = "[DALI][TP0]ThreadPool test"; + const char given_thread_pool_name[] = "OldThreadPool test"; + const char full_thread_pool_name[] = "[DALI][TP0]OldThreadPool test"; // max len supported by pthread_getname_np is 16 char read_thread_pool_name[16] = {0, }; // only one thread to ensure deterministic behavior - ThreadPool tp(1, 0, false, given_thread_pool_name); + OldThreadPool tp(1, 0, false, given_thread_pool_name); auto set_name = [&read_thread_pool_name](int thread_id) { pthread_getname_np(pthread_self(), read_thread_pool_name, sizeof(read_thread_pool_name)); }; diff --git a/dali/python/backend_impl.cc b/dali/python/backend_impl.cc index 2fd0cf11759..5437c06cdf9 100644 --- a/dali/python/backend_impl.cc +++ b/dali/python/backend_impl.cc @@ -2411,7 +2411,7 @@ void ExposeThreadPool(py::module &m) { CUDA_CALL(cudaGetDevice(&dev)); device_id = dev; } - return std::make_shared(num_threads, *device_id, set_affinity, name.data()); + return std::make_shared(num_threads, *device_id, set_affinity, name.data()); }), "num_threads"_a, "device_id"_a = py::none(), diff --git a/dali/test/operators/identity_input.h b/dali/test/operators/identity_input.h index 8bdaf7dee70..813ede465f4 100644 --- a/dali/test/operators/identity_input.h +++ b/dali/test/operators/identity_input.h @@ -36,7 +36,7 @@ class IdentityInput : public InputOperator { InputOperator(spec), cpu_input_(spec.GetArgument("cpu_input")) { if constexpr (std::is_same_v) { - tp_ = std::make_unique(this->num_threads_, this->device_id_, false, + tp_ = std::make_unique(this->num_threads_, this->device_id_, false, "PassthroughInput thread pool"); } } diff --git a/dali/test/python/auto_aug/test_auto_augment.py b/dali/test/python/auto_aug/test_auto_augment.py index b6ba9e44b5a..6fb51de2840 100644 --- a/dali/test/python/auto_aug/test_auto_augment.py +++ 
b/dali/test/python/auto_aug/test_auto_augment.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2023-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ from nose2.tools import params from nvidia.dali import fn, types -from nvidia.dali import pipeline_def +from nvidia.dali import pipeline_def, OperatorConcurrency from nvidia.dali.auto_aug import auto_augment, augmentations as a from nvidia.dali.auto_aug.core import augmentation, Policy @@ -35,6 +35,8 @@ vid_files = ["sintel_trailer-720p_2.mp4"] vid_filenames = [os.path.join(vid_dir, vid_file) for vid_file in vid_files] +concurrency = OperatorConcurrency.FULL + def mag_to_param_with_op_id(op_id): def mag_to_param(magnitude): @@ -98,7 +100,12 @@ def test_run_auto_aug(i, args): batch_size = batch_sizes[i % len(batch_sizes)] @pipeline_def( - enable_conditionals=True, batch_size=batch_size, num_threads=4, device_id=0, seed=43 + enable_conditionals=True, + batch_size=batch_size, + num_threads=4, + device_id=0, + seed=43, + concurrency=concurrency, ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) @@ -134,7 +141,7 @@ def setUpClass(cls): size_1 = (215, 128) size_2 = (215, 220) - @pipeline_def(batch_size=6, device_id=0, num_threads=4, seed=42) + @pipeline_def(batch_size=6, device_id=0, num_threads=4, seed=42, concurrency=concurrency) def pipeline(size): video = fn.readers.video_resize( filenames=vid_filenames, @@ -177,7 +184,12 @@ def test_uniform(self, i, args): assert device in ("gpu", "cpu") @pipeline_def( - batch_size=batch_size, device_id=0, num_threads=4, seed=205, enable_conditionals=True + batch_size=batch_size, + device_id=0, + num_threads=4, + seed=205, + enable_conditionals=True, + concurrency=concurrency, ) def pipeline(): rng = random.Random(42 + 
i) @@ -464,7 +476,14 @@ def yet_another_aug(data, _): def test_unused_arg_fail(): - @pipeline_def(enable_conditionals=True, batch_size=5, num_threads=4, device_id=0, seed=43) + @pipeline_def( + enable_conditionals=True, + batch_size=5, + num_threads=4, + device_id=0, + seed=43, + concurrency=concurrency, + ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) image = fn.decoders.image(encoded_image, device="mixed") @@ -477,7 +496,14 @@ def pipeline(): def test_empty_policy_fail(): - @pipeline_def(enable_conditionals=True, batch_size=5, num_threads=4, device_id=0, seed=43) + @pipeline_def( + enable_conditionals=True, + batch_size=5, + num_threads=4, + device_id=0, + seed=43, + concurrency=concurrency, + ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) image = fn.decoders.image(encoded_image, device="mixed") @@ -492,7 +518,14 @@ def pipeline(): def test_missing_shape_fail(): - @pipeline_def(enable_conditionals=True, batch_size=5, num_threads=4, device_id=0, seed=43) + @pipeline_def( + enable_conditionals=True, + batch_size=5, + num_threads=4, + device_id=0, + seed=43, + concurrency=concurrency, + ) def pipeline(): encoded_image, _ = fn.readers.file(name="Reader", file_root=images_dir) image = fn.decoders.image(encoded_image, device="mixed") diff --git a/dali/util/s3_client_manager.h b/dali/util/s3_client_manager.h index 2620d795aaa..bc9dde600ee 100644 --- a/dali/util/s3_client_manager.h +++ b/dali/util/s3_client_manager.h @@ -49,7 +49,7 @@ struct S3ClientManager { // thread if necessary). This avoids problems in initializing the dependent Common RunTime C // libraries. 
static void RunInitOrShutdown(std::function work) { - static ThreadPool s_thread_pool_(1, CPU_ONLY_DEVICE_ID, false, "S3ClientManager"); + static OldThreadPool s_thread_pool_(1, CPU_ONLY_DEVICE_ID, false, "S3ClientManager"); s_thread_pool_.AddWork(std::move(work)); s_thread_pool_.RunAll(); } diff --git a/include/dali/core/exec/thread_pool_base.h b/include/dali/core/exec/thread_pool_base.h index 87a90fee969..4e2b94bc905 100644 --- a/include/dali/core/exec/thread_pool_base.h +++ b/include/dali/core/exec/thread_pool_base.h @@ -58,6 +58,13 @@ class DLL_PUBLIC JobBaseFields { */ template class DLL_PUBLIC JobBase : public JobBaseFields { + public: + bool Started() const noexcept { return executor_ != nullptr; } + bool WaitStarted() const noexcept { return wait_started_; } + bool WaitCompleted() const noexcept { return wait_completed_; } + + static constexpr bool IsCooperative() { return cooperative; } + protected: JobBase() = default; ~JobBase() noexcept(false); @@ -76,8 +83,6 @@ class DLL_PUBLIC JobBase : public JobBaseFields { */ void DoNotify(); - static constexpr bool IsCooperative() { return cooperative; } - std::atomic_int num_pending_tasks_{0}; std::atomic_bool running_{false}; int total_tasks_ = 0; diff --git a/qa/TL0_python-self-test-core-newtp/test.sh b/qa/TL0_python-self-test-core-newtp/test.sh new file mode 100644 index 00000000000..f27eee0d12f --- /dev/null +++ b/qa/TL0_python-self-test-core-newtp/test.sh @@ -0,0 +1,3 @@ +#!/bin/bash -e +export DALI_USE_NEW_THREAD_POOL=1 +source ../TL0_python-self-test-core/test.sh diff --git a/qa/TL0_python-self-test-core/test.sh b/qa/TL0_python-self-test-core/test.sh index 4fa84258a0d..dd1f504f839 100644 --- a/qa/TL0_python-self-test-core/test.sh +++ b/qa/TL0_python-self-test-core/test.sh @@ -1,4 +1,3 @@ #!/bin/bash -e -export DALI_USE_EXEC2=0 bash -e ./test_nofw.sh bash -e ./test_pytorch.sh diff --git a/qa/TL1_decoder_perf/test.sh b/qa/TL1_decoder_perf/test.sh index 36cb292245d..c2077535120 100644 --- 
a/qa/TL1_decoder_perf/test.sh +++ b/qa/TL1_decoder_perf/test.sh @@ -5,11 +5,15 @@ target_dir=./internal_tools LOG1="dali_legacy.log" LOG2="dali_nvimgcodec.log" +LOG1_TP="dali_legacy_new_tp.log" +LOG2_TP="dali_nvimgcodec_new_tp.log" LOG1_NDD="dali_ndd_legacy.log" LOG2_NDD="dali_ndd_nvimgcodec.log" function CLEAN_AND_EXIT { rm -rf ${LOG1} rm -rf ${LOG2} + rm -rf ${LOG1_TP} + rm -rf ${LOG2_TP} rm -rf ${LOG1_NDD} rm -rf ${LOG2_NDD} exit $1 @@ -26,6 +30,8 @@ test_body() { # use taskset to avoid inefficient data migration between cores we don't want to use taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 70 --hw_load 0.12 | tee ${LOG1} taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 70 --hw_load 0.12 --experimental_decoder | tee ${LOG2} + DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 70 --hw_load 0.12 | tee ${LOG1_TP} + DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 70 --hw_load 0.12 --experimental_decoder | tee ${LOG2_TP} taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p ndd_rn50 -j 70 --hw_load 0.12 | tee ${LOG1_NDD} taskset --cpu-list 0-127 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p ndd_rn50 -j 70 --hw_load 0.12 --experimental_decoder | tee ${LOG2_NDD} @@ -38,6 +44,8 @@ test_body() { # use taskset to avoid 
inefficient data migration between cores we don't want to use taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 | tee ${LOG1} taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2} + DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 | tee ${LOG1_TP} + DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2_TP} taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p ndd_rn50 -j 72 --hw_load 0.11 | tee ${LOG1_NDD} taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p ndd_rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2_NDD} fi @@ -82,17 +90,21 @@ test_body() { PERF_RESULT2_NDD=$(perf_check "${LOG2_NDD}" "$MIN_PERF2_NDD") PERF_RESULT3=$(perf_check "${LOG2}" "$(extract_perf "${LOG1}")" 5) PERF_RESULT3_NDD=$(perf_check "${LOG2_NDD}" "$(extract_perf "${LOG1_NDD}")" 5) + PERF_RESULT1_TP=$(perf_check "${LOG1_TP}" "$(extract_perf "${LOG1}")" 2) + PERF_RESULT2_TP=$(perf_check "${LOG2_TP}" "$(extract_perf "${LOG2}")" 2) echo "PERF_RESULT1=${PERF_RESULT1}" echo "PERF_RESULT2=${PERF_RESULT2}" echo "PERF_RESULT3=${PERF_RESULT3}" + echo "PERF_RESULT1_TP=${PERF_RESULT1_TP}" + echo 
"PERF_RESULT2_TP=${PERF_RESULT2_TP}" echo "PERF_RESULT1_NDD=${PERF_RESULT1_NDD}" echo "PERF_RESULT2_NDD=${PERF_RESULT2_NDD}" echo "PERF_RESULT3_NDD=${PERF_RESULT3_NDD}" # if [[ "$PERF_RESULT1" == "OK" && "$PERF_RESULT2" == "OK" && "$PERF_RESULT3" == "OK" && "$PERF_RESULT1_NDD" == "OK" && "$PERF_RESULT2_NDD" == "OK" && "$PERF_RESULT3_NDD" == "OK" ]]; then # don't check experimental decoder performance with dynamic mode - if [[ "$PERF_RESULT1" == "OK" && "$PERF_RESULT2" == "OK" && "$PERF_RESULT3" == "OK" && "$PERF_RESULT1_NDD" == "OK" ]]; then + if [[ "$PERF_RESULT1" == "OK" && "$PERF_RESULT2" == "OK" && "$PERF_RESULT1_TP" == "OK" && "$PERF_RESULT2_TP" == "OK" && "$PERF_RESULT3" == "OK" && "$PERF_RESULT1_NDD" == "OK" ]]; then CLEAN_AND_EXIT 0 else CLEAN_AND_EXIT 1