Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/runtime/minidump.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ bool Minidump::_minidump_cb(const google_breakpad::MinidumpDescriptor& descripto
}

void Minidump::stop() {
if (_stop) {
if (config::disable_minidump || _stop) {
return;
}
_stop = true;
Expand Down
19 changes: 9 additions & 10 deletions be/src/util/priority_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class PriorityThreadPool {
// -- queue_size: the maximum size of the queue on which work items are offered. If the
// queue exceeds this size, subsequent calls to Offer will block until there is
// capacity available.
// -- work_function: the function to run every time an item is consumed from the queue
PriorityThreadPool(uint32_t num_threads, uint32_t queue_size)
: _work_queue(queue_size), _shutdown(false) {
for (int i = 0; i < num_threads; ++i) {
Expand Down Expand Up @@ -118,6 +117,15 @@ class PriorityThreadPool {
protected:
virtual bool is_shutdown() { return _shutdown; }

// Collection of worker threads that process work from the queue.
ThreadGroup _threads;

// Guards _empty_cv
std::mutex _lock;

// Signalled when the queue becomes empty
std::condition_variable _empty_cv;

private:
// Driver method for each thread in the pool. Continues to read work from the queue
// until the pool is shutdown.
Expand All @@ -137,17 +145,8 @@ class PriorityThreadPool {
// FIFO order.
BlockingPriorityQueue<Task> _work_queue;

// Collection of worker threads that process work from the queue.
ThreadGroup _threads;

// Guards _empty_cv
std::mutex _lock;

// Set to true when threads should stop doing work and terminate.
std::atomic<bool> _shutdown;

// Signalled when the queue becomes empty
std::condition_variable _empty_cv;
};

} // namespace doris
19 changes: 5 additions & 14 deletions be/src/util/priority_work_stealing_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class PriorityWorkStealingThreadPool : public PriorityThreadPool {
// -- queue_size: the maximum size of the queue on which work items are offered. If the
// queue exceeds this size, subsequent calls to Offer will block until there is
// capacity available.
// -- work_function: the function to run every time an item is consumed from the queue
PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, uint32_t queue_size)
: PriorityThreadPool(0, 0) {
DCHECK_GT(num_queues, 0);
Expand All @@ -50,6 +49,11 @@ class PriorityWorkStealingThreadPool : public PriorityThreadPool {
}
}

virtual ~PriorityWorkStealingThreadPool() {
shutdown();
join();
}

// Blocking operation that puts a work item on the queue. If the queue is full, blocks
// until there is capacity available.
//
Expand Down Expand Up @@ -79,10 +83,6 @@ class PriorityWorkStealingThreadPool : public PriorityThreadPool {
}
}

// Blocks until all threads are finished. shutdown does not need to have been called,
// since it may be called on a separate thread.
void join() override { _threads.join_all(); }

uint32_t get_queue_size() const override {
uint32_t size = 0;
for (auto work_queue : _work_queues) {
Expand Down Expand Up @@ -141,15 +141,6 @@ class PriorityWorkStealingThreadPool : public PriorityThreadPool {
// Queue on which work items are held until a thread is available to process them in
// FIFO order.
std::vector<std::shared_ptr<BlockingPriorityQueue<Task>>> _work_queues;

// Collection of worker threads that process work from the queues.
ThreadGroup _threads;

// Guards _empty_cv
std::mutex _lock;

// Signalled when the queue becomes empty
std::condition_variable _empty_cv;
};

} // namespace doris