Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dali/benchmark/displacement_cpu_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ void DisplacementBench(benchmark::State& st) {//NOLINT
}

// We need a thread pool
ThreadPool tp(4, 0, false, "DisplacementBench");
OldThreadPool tp(4, 0, false, "DisplacementBench");

// Create workspace and set input and output
Workspace ws;
Expand Down
2 changes: 1 addition & 1 deletion dali/benchmark/operator_bench.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class OperatorBench : public DALIBenchmark {
// Create workspace and set input and output
Workspace ws;
ws.AddInput(data_in);
ThreadPool tp(num_threads, 0, false, "OperatorBench");
OldThreadPool tp(num_threads, 0, false, "OperatorBench");
ws.SetThreadPool(&tp);

Setup<TensorList<CPUBackend>>(op_ptr, op_spec, ws, batch_size);
Expand Down
2 changes: 1 addition & 1 deletion dali/benchmark/thread_pool_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ BENCHMARK_DEFINE_F(ThreadPoolBench, AddWorkDeferred)(benchmark::State& st) {
int work_size_max = st.range(2);
int nthreads = st.range(3);

ThreadPool thread_pool(nthreads, 0, false, "ThreadPoolBench");
OldThreadPool thread_pool(nthreads, 0, false, "ThreadPoolBench");
std::vector<uint8_t> data(2000, 0xFF);

std::atomic<int64_t> total_count(0);
Expand Down
2 changes: 1 addition & 1 deletion dali/kernels/test/scatter_gather_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class ScatterGatherTest : public testing::Test {
this->template Memset<kind>(out_ptr.get(), 0, out.size());

T sg(max_block);
ThreadPool tp(4, 0, false, "test TP");
OldThreadPool tp(4, 0, false, "test TP");
// copy
for (auto &r : ranges)
sg.AddCopy(r.dst, r.src, r.size);
Expand Down
4 changes: 2 additions & 2 deletions dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -1150,8 +1150,8 @@ class nvJPEGDecoder : public StatelessOperator<MixedBackend>, CachedDecoderImpl
nvjpegDevAllocator_t device_allocator_;
nvjpegPinnedAllocator_t pinned_allocator_;

ThreadPool thread_pool_;
ThreadPool nvjpeg2k_thread_;
OldThreadPool thread_pool_;
OldThreadPool nvjpeg2k_thread_;
static constexpr int kOutputDim = 3;

TensorList<CPUBackend> hw_decoder_images_staging_;
Expand Down
2 changes: 1 addition & 1 deletion dali/operators/imgcodec/image_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class ImageDecoder : public StatelessOperator<Backend> {
GetDecoderSpecificArguments(spec);

if (std::is_same<MixedBackend, Backend>::value) {
thread_pool_ = std::make_unique<ThreadPool>(num_threads_, device_id_,
thread_pool_ = std::make_unique<OldThreadPool>(num_threads_, device_id_,
spec.GetArgument<bool>("affine"), "MixedDecoder");
if (spec_.HasArgument("cache_size"))
cache_ = std::make_unique<CachedDecoderImpl>(spec_);
Expand Down
6 changes: 3 additions & 3 deletions dali/operators/reader/fits_reader_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ This operator can be used in the following modes:
2. Read file names from a text file indicated in `file_list` argument.
3. Read files listed in `files` argument.
4. Number of outputs per sample corresponds to the length of `hdu_indices` argument. By default,
first HDU with data is read from each file, so the number of outputs defaults to 1.
first HDU with data is read from each file, so the number of outputs defaults to 1.
)")
.NumInput(0)
.OutputFn(detail::FitsReaderOutputFn)
Expand Down Expand Up @@ -94,7 +94,7 @@ If `file_root` is provided, the paths are treated as being relative to it.
This argument is mutually exclusive with `file_list`.)",
nullptr)
.AddOptionalArg("hdu_indices",
R"(HDU indices to read. If not provided, the first HDU after the primary
R"(HDU indices to read. If not provided, the first HDU after the primary
will be yielded. Since HDUs are indexed starting from 1, the default value is as follows: hdu_indices = [2].
Size of the provided list hdu_indices defines number of outputs per sample.)",
std::vector<int>{2})
Expand All @@ -114,7 +114,7 @@ void FitsReaderCPU::RunImpl(Workspace &ws) {
auto &output = ws.Output<CPUBackend>(output_idx);
for (int file_idx = 0; file_idx < num_samples; file_idx++) {
auto &sample = GetSample(file_idx);
ThreadPool::Work copy_task = [output_idx = output_idx, data_idx = file_idx, &output,
auto copy_task = [output_idx = output_idx, data_idx = file_idx, &output,
&sample](int) {
std::memcpy(output.raw_mutable_tensor(data_idx), sample.data[output_idx].raw_data(),
sample.data[output_idx].nbytes());
Expand Down
2 changes: 1 addition & 1 deletion dali/operators/reader/nemo_asr_reader_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class NemoAsrReader : public DataReader<CPUBackend, AsrSample, AsrSample, true>
DALIDataType dtype_;

int num_threads_;
ThreadPool thread_pool_;
OldThreadPool thread_pool_;

// prefetch_depth * batch_size set of buffers that we reuse to decode audio
using TensorListPtr = std::unique_ptr<TensorList<CPUBackend>>;
Expand Down
2 changes: 1 addition & 1 deletion dali/operators/reader/numpy_reader_gpu_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class NumpyReaderGPU : gds::GDSLazyInit, public NumpyReader<GPUBackend, NumpyFil
protected:
// we need to do the threading manually because gpu workspaces
// do not have a thread pool
ThreadPool thread_pool_;
OldThreadPool thread_pool_;

vector<TensorList<GPUBackend>> prefetched_batch_tensors_;

Expand Down
4 changes: 2 additions & 2 deletions dali/operators/reader/numpy_reader_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ class NumpyReaderCPU : public NumpyReader<CPUBackend, NumpyFileWrapper> {
*/
size_t o_direct_alignm_ = 0;
size_t o_direct_read_len_alignm_ = 0;
// ThreadPool for prefetch which is a separate thread
ThreadPool thread_pool_;
// Thread Pool for prefetch which is a separate thread
OldThreadPool thread_pool_;
};

} // namespace dali
Expand Down
4 changes: 2 additions & 2 deletions dali/operators/reader/tfrecord_reader_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ class TFRecordReader
bool dont_use_mmap_ = false;
bool use_o_direct_ = false;
size_t o_direct_chunk_size_ = 0;
// ThreadPool for prefetch which is a separate thread
ThreadPool thread_pool_;
// Thread Pool for prefetch which is a separate thread
OldThreadPool thread_pool_;
};

} // namespace dali
Expand Down
2 changes: 1 addition & 1 deletion dali/operators/reader/webdataset_reader_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void WebdatasetReader::RunImpl(Workspace &ws) {
auto& output = ws.Output<CPUBackend>(output_idx);
for (int data_idx = 0; data_idx < num_samples; data_idx++) {
auto& sample = GetSample(data_idx);
ThreadPool::Work copy_task = [output_idx = output_idx, data_idx = data_idx, &output,
auto copy_task = [output_idx = output_idx, data_idx = data_idx, &output,
&sample](int) {
output.SetMeta(data_idx, sample[output_idx].GetMeta());
std::memcpy(output.raw_mutable_tensor(data_idx), sample[output_idx].raw_data(),
Expand Down
2 changes: 1 addition & 1 deletion dali/operators/video/decoder/video_decoder_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DLL_PUBLIC VideoDecoderBase : public Operator<Backend> {
if (spec_.HasArgument("device")) {
auto device_str = spec_.template GetArgument<std::string>("device");
if (device_str == "mixed") {
thread_pool_ = std::make_unique<ThreadPool>(
thread_pool_ = std::make_unique<OldThreadPool>(
spec.GetArgument<int>("num_threads"), spec.GetArgument<int>("device_id"),
spec.GetArgument<bool>("affine"), "VideoDecoder");
}
Expand Down
11 changes: 7 additions & 4 deletions dali/operators/video/input/video_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ class VideoInput : public InputOperator<Backend> {
last_sequence_policy_ == "partial" || last_sequence_policy_ == "pad",
make_string("Provided `last_sequence_policy` is not supported: ", last_sequence_policy_));
if constexpr (!is_cpu) {
thread_pool_.emplace(this->num_threads_, spec.GetArgument<int>("device_id"),
spec.GetArgument<bool>("affine"), "VideoInput<MixedBackend>");
thread_pool_ = std::make_unique<OldThreadPool>(
this->num_threads_,
spec.GetArgument<int>("device_id"),
spec.GetArgument<bool>("affine"),
"VideoInput<MixedBackend>");
}
}

Expand Down Expand Up @@ -208,7 +211,7 @@ class VideoInput : public InputOperator<Backend> {
if constexpr (is_cpu) {
return ws.GetThreadPool();
} else {
assert(thread_pool_.has_value());
assert(thread_pool_);
return *thread_pool_;
}
}
Expand Down Expand Up @@ -243,7 +246,7 @@ class VideoInput : public InputOperator<Backend> {
uint8_t pad_frame_value_ = 0;

/// CPU operators have default Thread Pool inside Workspace. Mixed and GPU ops don't.
std::optional<ThreadPool> thread_pool_ = std::nullopt;
std::unique_ptr<ThreadPool> thread_pool_;

std::vector<std::unique_ptr<FramesDecoderImpl>> frames_decoders_;

Expand Down
54 changes: 38 additions & 16 deletions dali/pipeline/executor/executor2/exec2.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
#include "dali/pipeline/executor/executor2/exec_graph.h"
#include "dali/pipeline/executor/executor2/stream_assignment.h"
#include "dali/pipeline/operator/builtin/input_operator.h"
#include "dali/pipeline/util/new_thread_pool.h"

namespace dali {
namespace exec2 {
Expand All @@ -42,9 +43,9 @@ void LimitBackendConcurrency(ExecGraph &graph, OpType backend, int max_concurren
void ApplyConcurrencyLimit(ExecGraph &graph, OperatorConcurrency concurrency) {
switch (concurrency) {
case OperatorConcurrency::Full:
// TODO(michalz): Fix ThreadPool.
LimitBackendConcurrency(graph, OpType::CPU);
break; // other operators have no restrictions
if (!UseNewThreadPool()) // old thread pool is not thread safe
LimitBackendConcurrency(graph, OpType::CPU);
Comment on lines 45 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify why the concurrency policy and the thread pool setup both call UseNewThreadPool()

Both ApplyConcurrencyLimit() (line 117) and SetupThreadPool() (line 119) call UseNewThreadPool(), relying on the fact that it evaluates its static bool only once, so the two calls always return the same value. This subtle coupling is worth documenting:

Suggested change
case OperatorConcurrency::Full:
// TODO(michalz): Fix ThreadPool.
LimitBackendConcurrency(graph, OpType::CPU);
break; // other operators have no restrictions
if (!UseNewThreadPool()) // old thread pool is not thread safe
LimitBackendConcurrency(graph, OpType::CPU);
case OperatorConcurrency::Full:
// NewThreadPool's Job-based scheduling is thread-safe, so concurrent CPU operators
// are safe. OldThreadPool is not thread-safe and must be serialized. Because
// UseNewThreadPool() evaluates its static bool only once, both this check and the
// call to SetupThreadPool() will always agree on which pool to use.
if (!UseNewThreadPool())
LimitBackendConcurrency(graph, OpType::CPU);
break;

break;
case OperatorConcurrency::Backend:
LimitBackendConcurrency(graph, OpType::CPU);
LimitBackendConcurrency(graph, OpType::GPU);
Expand Down Expand Up @@ -345,18 +346,37 @@ class Executor2::Impl {
}

void SetupThreadPool() {
if (graph_info_.num_cpu > 0) {
tp_ = std::make_unique<ThreadPool>(
config_.thread_pool_threads,
config_.device.value_or(CPU_ONLY_DEVICE_ID),
config_.set_affinity,
"Executorv_v2");
thread_pool_wrappers_.clear();
new_tp_.reset();
old_tp_.reset();

if (UseNewThreadPool()) {
std::cerr << "\n!!! Forced use of NewThreadPool !!!" << std::endl;
if (graph_info_.num_cpu > 0) {
Comment on lines +353 to +355
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug std::cerr left in production code

This diagnostic print was presumably added during development but should not be shipped:

Suggested change
if (UseNewThreadPool()) {
std::cerr << "\n!!! Forced use of NewThreadPool !!!" << std::endl;
if (graph_info_.num_cpu > 0) {
if (UseNewThreadPool()) {
if (graph_info_.num_cpu > 0) {

Remove the std::cerr line entirely — it will spam every user's stderr whenever they set DALI_USE_NEW_THREAD_POOL=1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@greptileai This will be removed before merging, but it is necessary to validate that the flag is properly set and used in CI.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! That makes sense for CI validation. Consider adding a TODO comment on that line so it's clear it needs to be removed:

Suggested change
if (UseNewThreadPool()) {
std::cerr << "\n!!! Forced use of NewThreadPool !!!" << std::endl;
if (graph_info_.num_cpu > 0) {
// TODO: Remove before merge - only for CI validation
std::cerr << "\n!!! Forced use of NewThreadPool !!!" << std::endl;

This way it won't accidentally slip through code review.

new_tp_ = std::make_unique<NewThreadPool>(
config_.thread_pool_threads,
config_.device.value_or(CPU_ONLY_DEVICE_ID),
config_.set_affinity,
"Executor_v2");
}
for (auto &n : graph_.Nodes()) {
if (n.backend == OpType::CPU) {
thread_pool_wrappers_.push_back(std::make_unique<ThreadPoolFacade>(new_tp_.get()));
n.env.thread_pool = thread_pool_wrappers_.back().get();
}
}
} else {
tp_.reset();
}
for (auto &n : graph_.Nodes()) {
if (n.backend == OpType::CPU)
n.env.thread_pool = tp_.get();
if (graph_info_.num_cpu > 0) {
old_tp_ = std::make_unique<OldThreadPool>(
config_.thread_pool_threads,
config_.device.value_or(CPU_ONLY_DEVICE_ID),
config_.set_affinity,
"Executor_v2");
}
for (auto &n : graph_.Nodes()) {
if (n.backend == OpType::CPU)
n.env.thread_pool = old_tp_.get();
}
}
}

Expand Down Expand Up @@ -421,7 +441,9 @@ class Executor2::Impl {

// Runtime environment

std::unique_ptr<ThreadPool> tp_;
std::unique_ptr<OldThreadPool> old_tp_;
std::unique_ptr<NewThreadPool> new_tp_;
std::vector<std::unique_ptr<ThreadPool>> thread_pool_wrappers_;
std::queue<tasking::TaskFuture> pending_outputs_;
std::vector<CUDAStreamLease> streams_;
std::map<std::string, ExecNode *, std::less<>> node_map_;
Expand Down
12 changes: 6 additions & 6 deletions dali/pipeline/executor/executor2/exec_graph_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ TEST(ExecGraphTest, SimpleGraph) {
LimitBackendConcurrency(g, OpType::CPU);

WorkspaceParams params = {};
auto tp = std::make_unique<ThreadPool>(std::thread::hardware_concurrency(), 0, false, "test");
auto tp = std::make_unique<OldThreadPool>(std::thread::hardware_concurrency(), 0, false, "test");
ExecEnv env;
env.thread_pool = tp.get();
params.env = &env;
Expand Down Expand Up @@ -144,7 +144,7 @@ TEST(ExecGraphTest, SimpleGraphRepeat) {
g.Link(n1, 0, n2, 1);
g.Link(n2, 0, no, 0);
LimitBackendConcurrency(g, OpType::CPU);
ThreadPool tp(4, 0, false, "test");
OldThreadPool tp(4, 0, false, "test");
WorkspaceParams params = {};
ExecEnv env;
env.thread_pool = &tp;
Expand Down Expand Up @@ -212,7 +212,7 @@ TEST(ExecGraphTest, SimpleGraphScheduleAheadCPU) {
g.Link(n2, 0, no, 0);
LimitBackendConcurrency(g, OpType::CPU);

ThreadPool tp(4, 0, false, "test");
OldThreadPool tp(4, 0, false, "test");
WorkspaceParams params = {};
ExecEnv env;
env.thread_pool = &tp;
Expand Down Expand Up @@ -306,7 +306,7 @@ TEST(ExecGraphTest, GraphScheduleAheadGPU) {
n2->env.order = s2;
no->env.order = s3;

ThreadPool tp(4, 0, false, "test");
OldThreadPool tp(4, 0, false, "test");

n1->env.thread_pool = &tp;

Expand Down Expand Up @@ -379,7 +379,7 @@ TEST(ExecGraphTest, Exception) {
g.Link(n1, 0, n2, 1);
g.Link(n2, 0, no, 0);
LimitBackendConcurrency(g, OpType::CPU);
ThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test");
OldThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test");
WorkspaceParams params = {};
ExecEnv env;
env.thread_pool = &tp;
Expand Down Expand Up @@ -460,7 +460,7 @@ TEST(ExecGraphTest, LoweredExec) {
g.Lower(def);
LimitBackendConcurrency(g, OpType::CPU);

ThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test");
OldThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test");
WorkspaceParams params = {};
ExecEnv env;
env.thread_pool = &tp;
Expand Down
2 changes: 1 addition & 1 deletion dali/pipeline/executor/executor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy {

OpGraph *graph_ = nullptr;
EventPool event_pool_;
ThreadPool thread_pool_;
OldThreadPool thread_pool_;
std::vector<ErrorInfo> errors_;
mutable std::mutex errors_mutex_;
bool exec_error_;
Expand Down
6 changes: 3 additions & 3 deletions dali/pipeline/operator/eager_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ class DLL_PUBLIC EagerOperator {
DLL_PUBLIC inline static void UpdateThreadPool(int num_threads) {
std::lock_guard lock(shared_thread_pool_mutex_);

SharedThreadPoolInstance().reset(
new ThreadPool(num_threads, CPU_ONLY_DEVICE_ID, false, "EagerOperator"));
SharedThreadPoolInstance() = std::make_unique<OldThreadPool>(
num_threads, CPU_ONLY_DEVICE_ID, false, "EagerOperator");
}

// Update shared CUDA stream used for all direct operators.
Expand Down Expand Up @@ -170,7 +170,7 @@ class DLL_PUBLIC EagerOperator {
}

static inline std::shared_ptr<ThreadPool> &SharedThreadPoolInstance() {
static std::shared_ptr<ThreadPool> thread_pool = std::make_shared<ThreadPool>(
static std::shared_ptr<ThreadPool> thread_pool = std::make_shared<OldThreadPool>(
GetDefaultNumThreads(), CPU_ONLY_DEVICE_ID, false, "EagerOperator");

return thread_pool;
Expand Down
2 changes: 1 addition & 1 deletion dali/pipeline/operator/false_gpu_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class FalseGPUOperator : public Operator<GPUBackend> {

private:
CPUOperator cpu_impl_;
ThreadPool thread_pool_;
OldThreadPool thread_pool_;
Workspace cpu_ws_;

// Keep it here so that we can modify (ws gives only const ref to inputs)
Expand Down
2 changes: 1 addition & 1 deletion dali/pipeline/pipeline_debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class DLL_PUBLIC PipelineDebug {
int device_id_;
int num_threads_;
CUDAStreamLease cuda_stream_;
ThreadPool thread_pool_;
OldThreadPool thread_pool_;
std::unordered_map<int, EagerOperator<CPUBackend>> cpu_operators_;
std::unordered_map<int, EagerOperator<GPUBackend>> gpu_operators_;
std::unordered_map<int, EagerOperator<MixedBackend>> mixed_operators_;
Expand Down
Loading