
New ThreadPool + thread pool facade#6224

Open
mzient wants to merge 12 commits into NVIDIA:main from mzient:ThreadPoolFacade

Conversation

@mzient
Contributor

@mzient mzient commented Feb 23, 2026

Category:

Refactoring (Redesign of existing code that doesn't affect functionality)

Description:

This change does the following:

  • it extracts an abstract interface from the old ThreadPool class; the interface now carries the name ThreadPool
  • it renames the old concrete ThreadPool to OldThreadPool
  • it adds NewThreadPool and a ThreadPoolFacade that aggregates a pointer to a NewThreadPool and a Job object
  • it enables the use of either OldThreadPool or (via the facade) NewThreadPool in the new executor
  • it adds an environment variable, DALI_USE_NEW_THREAD_POOL, which can be used to enable the new thread pool
  • it extends the qa tests to run the Python core tests with the new thread pool
  • it extends the decoder perf test - there's a 2% margin for the new thread pool vs. the old one in the respective decoders (legacy and nvimgcodec)

Additional information:

Affected modules and functionalities:

Key points relevant for the review:

Tests:

New qa tests script

  • Existing tests apply
  • New tests added
    • Python tests
    • GTests
    • Benchmark
    • Other
  • N/A

Checklist

Documentation

  • Existing documentation applies
  • Documentation updated
    • Docstring
    • Doxygen
    • RST
    • Jupyter
    • Other
  • N/A

DALI team only

Requirements

  • Implements new requirements
  • Affects existing requirements
  • N/A

REQ IDs: N/A

JIRA TASK: N/A

@dali-automaton
CI MESSAGE: [44656169]: BUILD STARTED

@dali-automaton
CI MESSAGE: [44667089]: BUILD STARTED

@dali-automaton
CI MESSAGE: [44667089]: BUILD FAILED

@dali-automaton
CI MESSAGE: [44656169]: BUILD PASSED

@dali-automaton
CI MESSAGE: [44698015]: BUILD STARTED

@dali-automaton
CI MESSAGE: [44698471]: BUILD STARTED

@dali-automaton
CI MESSAGE: [44717682]: BUILD STARTED

@dali-automaton
CI MESSAGE: [44717682]: BUILD FAILED

@dali-automaton
CI MESSAGE: [44719838]: BUILD STARTED

@dali-automaton
CI MESSAGE: [44698471]: BUILD FAILED

@dali-automaton
CI MESSAGE: [44719838]: BUILD FAILED

@dali-automaton
CI MESSAGE: [44719838]: BUILD PASSED

@mzient mzient force-pushed the ThreadPoolFacade branch 4 times, most recently from 193e754 to e485702, on March 5, 2026 14:47
@mzient mzient changed the title Thread pool facade New ThreadPool + thread pool facade Mar 5, 2026
@dali-automaton
Copy link
Collaborator

CI MESSAGE: [45440403]: BUILD STARTED

@mzient mzient marked this pull request as ready for review March 5, 2026 16:34
@greptile-apps
Contributor

greptile-apps bot commented Mar 5, 2026

Greptile Summary

This PR introduces a ThreadPool abstract interface, renames the old concrete class to OldThreadPool, adds a new NewThreadPool (backed by ThreadPoolBase) alongside a ThreadPoolFacade adapter, and wires them into the exec2 executor behind the DALI_USE_NEW_THREAD_POOL environment variable. The structural refactoring is sound, but the ThreadPoolFacade has a correctness gap in its RunAll(wait=false) path.

Key findings:

  • Logic bug — ThreadPoolFacade::RunAll(false) leaves job_ in an invalid state: When RunAll(false) is called, the underlying Job is submitted to the thread pool (setting its internal executor_ pointer to non-null), but job_ is NOT reset because the AtScopeExit guard only fires on wait=true. Any subsequent AddWork call will then hit Job::AddTask's guard (executor_ != nullptr) and throw std::logic_error. More critically, if the facade is ever destroyed without a following WaitForWork(), the destructor's own RunAll() also calls job_->Run() on the already-started job, which throws inside a noexcept destructor and invokes std::terminate(). The RunAll(false) pattern is actively used by image_decoder.h (lines 423 and 772) via the abstract ThreadPool* interface, so a ThreadPoolFacade will receive these calls at runtime.
  • Style — typo "Executorv_v2" in thread pool name: Present on both the NewThreadPool and OldThreadPool creation paths in exec2.cc; should be "Executor_v2".
  • The std::cerr debug print introduced in exec2.cc (line 354) has been noted in a prior review thread and should also be removed.

Confidence Score: 2/5

  • Not safe to merge without fixing the ThreadPoolFacade::RunAll(false) logic bug, which can cause std::terminate() in a destructor and breaks the ThreadPool interface contract.
  • The core RunAll(wait=false) path in ThreadPoolFacade does not properly reset job_ after launching it, which violates the ThreadPool interface contract and creates a std::terminate() risk in the noexcept destructor. The RunAll(false) pattern is exercised at runtime via image_decoder.h when DALI_USE_NEW_THREAD_POOL=1.
  • dali/pipeline/util/new_thread_pool.cc (RunAll logic) and dali/pipeline/executor/executor2/exec2.cc (debug cerr, typo).

Important Files Changed

Filename Overview
dali/pipeline/util/new_thread_pool.h Introduces NewThreadPool (wraps ThreadPoolBase) and ThreadPoolFacade (adapts ThreadPoolBase to the ThreadPool interface); RunAll(false) path in the facade has a correctness issue.
dali/pipeline/util/new_thread_pool.cc Implements NewThreadPool and ThreadPoolFacade; RunAll(wait=false) leaves job_ pointing at an already-started Job, causing AddWork and the destructor to throw, with the latter resulting in std::terminate() since the destructor is noexcept.
dali/pipeline/util/thread_pool_interface.h New abstract ThreadPool interface extracted from the old concrete class; clean and straightforward.
dali/pipeline/util/thread_pool.h Old ThreadPool renamed to OldThreadPool and made to implement the new ThreadPool interface; mechanical rename with no logic changes.
dali/pipeline/executor/executor2/exec2.cc Adds SetupThreadPool() to switch between OldThreadPool and NewThreadPool+ThreadPoolFacade based on env var; contains a debug std::cerr (flagged by prior review) and a typo "Executorv_v2" in thread pool names on both paths.
dali/pipeline/util/thread_pool_test.cc Updated to use OldThreadPool directly; existing tests unchanged in semantics.
qa/TL1_decoder_perf/test.sh Adds new-thread-pool benchmark runs and corresponding PERF_RESULT_TP checks; log variable assignments and echo labels are correct in the final version.
qa/TL0_python-self-test-core-newtp/test.sh New QA test that re-runs the core Python self-tests with DALI_USE_NEW_THREAD_POOL=1; trivial wrapper script, looks correct.
dali/operators/imgcodec/image_decoder.h Uses tp_->RunAll(false) + tp_->WaitForWork() pattern via the abstract ThreadPool* interface; currently safe in practice, but will silently break if a ThreadPoolFacade is used and AddWork is called between the two calls.

Sequence Diagram

sequenceDiagram
    participant Op as Operator (image_decoder.h)
    participant Facade as ThreadPoolFacade
    participant Job as Job (std::optional)
    participant TP as NewThreadPool (ThreadPoolBase)

    Op->>Facade: AddWork(task1)
    Facade->>Job: emplace() if empty
    Facade->>Job: AddTask(task1)

    Op->>Facade: RunAll(wait=false)
    Facade->>Job: Run(*tp_, false)
    Note over Job: executor_ set to &tp<br/>tasks submitted to TP
    Job-->>TP: BeginBulkAdd / Submit tasks
    Note over Facade: ⚠️ job_ NOT reset (wait=false)<br/>executor_ is now non-null

    Op->>Op: do other work (e.g. last task in caller thread)

    Op->>Facade: WaitForWork()
    Facade->>Job: Wait()
    Note over Job: Blocks until all tasks done
    Facade->>Job: job_.reset()  ✅ safe here

    Note over Facade: --- BUG SCENARIO ---
    Op->>Facade: AddWork(task2)  [after RunAll(false), before WaitForWork]
    Facade->>Job: AddTask(task2)
    Note over Job: ❌ throws std::logic_error<br/>"job already started"

    Note over Facade: --- DESTRUCTOR BUG ---
    Facade->>Facade: ~ThreadPoolFacade() → RunAll(true)
    Facade->>Job: Run(*tp_, true)  [job_ still set from RunAll(false)]
    Note over Job: ❌ throws in noexcept destructor<br/>→ std::terminate()

Last reviewed commit: 8709e96

Comment on lines +353 to +355
if (UseNewThreadPool()) {
  std::cerr << "\n!!! Forced use of NewThreadPool !!!" << std::endl;
  if (graph_info_.num_cpu > 0) {
Contributor

Debug std::cerr left in production code

This diagnostic print was presumably added during development but should not be shipped:

Suggested change
-if (UseNewThreadPool()) {
-  std::cerr << "\n!!! Forced use of NewThreadPool !!!" << std::endl;
-  if (graph_info_.num_cpu > 0) {
+if (UseNewThreadPool()) {
+  if (graph_info_.num_cpu > 0) {

Remove the std::cerr line entirely — it will spam every user's stderr whenever they set DALI_USE_NEW_THREAD_POOL=1.

Contributor Author

@greptileai This will be removed before merging, but is necessary to validate that the flag is properly set and used in CI.

Contributor

Got it! That makes sense for CI validation. Consider adding a TODO comment on that line so it's clear it needs to be removed:

Suggested change
 if (UseNewThreadPool()) {
-  std::cerr << "\n!!! Forced use of NewThreadPool !!!" << std::endl;
+  // TODO: Remove before merge - only for CI validation
+  std::cerr << "\n!!! Forced use of NewThreadPool !!!" << std::endl;
   if (graph_info_.num_cpu > 0) {

This way it won't accidentally slip through code review.

@dali-automaton

CI MESSAGE: [45440403]: BUILD FAILED

mzient added 5 commits March 9, 2026 11:17
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
mzient and others added 6 commits March 9, 2026 11:17
----
Signed-off-by: Michał Zientkiewicz <michalz@nvidia.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
@mzient mzient force-pushed the ThreadPoolFacade branch from 85e5c27 to a9ec7fa on March 9, 2026 10:17
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
@dali-automaton

CI MESSAGE: [45700206]: BUILD STARTED

Comment on lines +98 to +106
void ThreadPoolFacade::RunAll(bool wait) {
  if (job_) {
    auto atexit = AtScopeExit([&]() {
      if (wait)
        job_.reset();
    });
    job_->Run(*tp_, wait);
  }
}
Contributor

RunAll(false) leaves job_ in an unusable state

When RunAll(wait=false) is called, the AtScopeExit lambda only resets job_ when wait=true (line 101–102). This means job_ is left pointing to a Job whose executor_ has been set by the call to job_->Run(*tp_, false) on line 104.

This causes two downstream failures:

  1. Subsequent AddWork calls throw: Job::AddTask (in thread_pool_base.cc:114) unconditionally throws std::logic_error("This job has already been started - cannot add more tasks to it") when executor_ != nullptr. The OldThreadPool contract explicitly allows more work to be enqueued after RunAll(false), so callers that rely on this (a legitimate use of the ThreadPool interface) will get an unexpected exception.

  2. Destructor calls std::terminate(): The destructor calls RunAll() (wait=true). If job_ is still set from a previous RunAll(false), this calls job_->Run(*tp_, true) on an already-started job, which throws std::logic_error("This job has already been started.") (see thread_pool_base.cc:85–86). Since the destructor is declared noexcept, the exception propagates to std::terminate().

This is actively relevant: image_decoder.h calls tp_->RunAll(false) at lines 423 and 772, where tp_ is ThreadPool* — which will be a ThreadPoolFacade when DALI_USE_NEW_THREAD_POOL=1 is set.

The fix is to track the in-flight job separately (e.g. in a std::optional<Job> running_job_) and reset job_ unconditionally after Run, so that new work always goes into a fresh Job:

void ThreadPoolFacade::RunAll(bool wait) {
  if (job_) {
    // Move the current job out so that new AddWork calls get a fresh Job.
    // Note: moving from a std::optional leaves it engaged (holding a
    // moved-from value), so reset it explicitly.
    std::optional<Job> running = std::move(job_);
    job_.reset();
    running->Run(*tp_, wait);  // submit the tasks to the thread pool
    // When wait == true, Run() has already waited - nothing more to do.
    // When wait == false, the in-flight job must be retained (e.g. in a
    // running_job_ member) until WaitForWork() is called.
  }
}

A complete solution would need a running_job_ member to support WaitForWork() after RunAll(false).

Contributor Author

@greptileai RunAll(false) must be followed by a call to WaitForWork.
When calling RunAll(false), the job is retained (see the condition in the lambda passed to AtScopeExit) for use in a subsequent call to WaitForWork. Destroying the facade without first calling WaitForWork is an error and rightfully throws an exception.
When calling RunAll(true), job_ is destroyed at scope exit, enabling more work to be scheduled afterwards.

Contributor

Thank you for the clarification. I see now - the contract requires WaitForWork() after RunAll(false), and the existing code in image_decoder.h already follows this correctly (lines 423-426: RunAll(false) → setup_task(-1) → WaitForWork()). The destructor exception is intentional to catch misuse.

My original comment was based on incorrectly assuming the new thread pool should have the same fire-and-forget semantics as OldThreadPool for RunAll(false). The explicit WaitForWork() requirement makes the async contract clearer and is working as designed.

config_.thread_pool_threads,
config_.device.value_or(CPU_ONLY_DEVICE_ID),
config_.set_affinity,
"Executorv_v2");
Contributor

Typo in thread pool name — double v

The thread pool is named "Executorv_v2" in both the NewThreadPool and OldThreadPool creation paths, which contains a double v. This name appears in thread names visible in profiling tools (nsys, perf, etc.) and log messages, so a typo here is noticeable. It should be "Executor_v2".

Suggested change
-"Executorv_v2");
+"Executor_v2");

config_.thread_pool_threads,
config_.device.value_or(CPU_ONLY_DEVICE_ID),
config_.set_affinity,
"Executorv_v2");
Contributor

Same typo on the OldThreadPool path

Suggested change
-"Executorv_v2");
+"Executor_v2");

@dali-automaton

CI MESSAGE: [45700206]: BUILD FAILED
