-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathThreadPool.cpp
More file actions
106 lines (99 loc) · 2.69 KB
/
ThreadPool.cpp
File metadata and controls
106 lines (99 loc) · 2.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <iostream>
#include "RequestInfo.h"
#include "ThreadPool.h"
#include "UDPResponse.h"
#include "commons.h"
#include "utils.h"
/**
 * Builds an idle thread pool around the task object `t`.
 *
 * No worker is launched here: `running` starts out false and the threads
 * are only spawned by start().
 *
 * @param t Task object used to initialise myTask; the worker loop invokes
 *          it once for every dequeued request.
 */
template <typename T>
ThreadPool<T>::ThreadPool(T& t)
    : myTask(t)
    , myCondVar()
    , myMutex()
    , myQueue()
    , myThreads(NUM_THREADS)  // presumably one slot per worker thread
    , running(false)
    , myThrdCounter(0)
{
#if defined(DEBUG) && VERBOSENESS > 2
    std::cout << getUTCTime()
              << " [DEBUG] Constructing ThreadPool class..."
              << '\n';
#endif
}
/**
 * Destroys the pool, shutting the workers down first via stop().
 */
template <typename T>
ThreadPool<T>::~ThreadPool()
{
#if defined(DEBUG) && VERBOSENESS > 2
    std::cout << getUTCTime()
              << " [DEBUG] Destructing ThreadPool class..."
              << '\n';
#endif
    // stop() does nothing when the pool is not running, so this is safe
    // whether or not start() was ever called.
    this->stop();
}
/**
 * Launches the worker threads, one per slot of myThreads.
 *
 * Calling start() on a pool that is already running is a no-op.
 *
 * The mutex is held for the whole spawn sequence, so no worker can inspect
 * the pool state until `running` has been set (or the spawn has failed).
 *
 * @throws Whatever std::thread's constructor throws (e.g. std::system_error
 *         when the system cannot create another thread). In that case the
 *         workers that were already spawned are joined before the exception
 *         is re-thrown, and the pool is left in the stopped state.
 */
template <typename T>
void ThreadPool<T>::start() {
    std::unique_lock<std::mutex> l(myMutex);
    if (running) {
        return;
    }

    try {
        for (auto& thrd : myThreads) {
            thrd = std::thread(&ThreadPool<T>::thread, this);
        }
    } catch (...) {
        // Thread creation failed part-way. `running` is still false, so each
        // worker spawned so far leaves its loop as soon as it acquires the
        // mutex and finds the queue empty. They must be joined here: stop()
        // skips the join loop when the pool is not running, and destroying a
        // joinable std::thread calls std::terminate.
        l.unlock();
        for (auto& thrd : myThreads) {
            if (thrd.joinable()) {
                thrd.join();
            }
        }
        throw;
    }

    running = true;
}
/**
 * Stops the pool and blocks until every worker thread has exited.
 *
 * If the pool is not running this returns immediately without touching the
 * threads. Otherwise `running` is cleared and all waiting workers are woken
 * while the mutex is held; the joins happen after the mutex is released so
 * the workers can re-acquire it. Workers only exit once the queue is empty,
 * so requests still queued at this point are processed before the join
 * completes.
 */
template <typename T>
void ThreadPool<T>::stop() {
    {
        std::lock_guard<std::mutex> guard(myMutex);
        if (!running) {
            // Nothing was started (or it was already stopped): no join needed.
            return;
        }
        running = false;
        myCondVar.notify_all();
    }

    for (auto& worker : myThreads) {
        if (worker.joinable()) {
            worker.join();
        }
    }
}
/**
 * Enqueues a request and wakes one waiting worker.
 *
 * The request is copied into the queue under the pool mutex; the queue is
 * currently unbounded.
 *
 * @param r Request to be handed to a worker thread.
 */
template <typename T>
void ThreadPool<T>::append(const RequestInfo& r) {
    // Snapshot of the queue length taken under the lock; only the debug
    // trace below reads it.
    size_t pending = 0;
    {
        std::lock_guard<std::mutex> guard(myMutex);
        // TODO Add here a proper code block if a queue limit is necessary!
        myQueue.emplace(r);
        pending = myQueue.size();
        myCondVar.notify_one();
    }
#if defined(DEBUG) && VERBOSENESS > 2
    std::cout << getUTCTime()
              << " [DEBUG] Appending request to thread pool"
              << '\n';
    std::cout << "Current queue size: "
              << pending
              << '\n';
#endif
}
/**
 * Worker loop executed by every pool thread.
 *
 * Each iteration waits until there is a queued request or the pool has been
 * stopped. A request, if present, is copied out of the queue under the mutex
 * and then passed to myTask with the mutex released. The loop ends only when
 * the queue is empty and `running` is false, so requests that were queued
 * before stop() are still processed.
 */
template <typename T>
void ThreadPool<T>::thread() {
    RequestInfo request(UDP_BUFFER_SIZE);
    // NOTE(review): the counter is incremented without holding myMutex; this
    // is only race-free if myThrdCounter is an atomic type - confirm in the
    // class declaration.
    unsigned int workerId = ++myThrdCounter;

    for (;;) {
        {
            std::unique_lock<std::mutex> guard(myMutex);
            // Sleep until there is work to do or the pool is shutting down;
            // the predicate also absorbs spurious wake-ups.
            myCondVar.wait(guard, [this] { return !myQueue.empty() || !running; });

            if (myQueue.empty()) {
                // Woken with nothing queued: the pool has been stopped.
#if defined(DEBUG) && VERBOSENESS > 1
                std::cout << getUTCTime() << " [DEBUG] Exiting thread n. " << workerId << "..." << '\n';
#endif
                break;
            }

            request = myQueue.front();
            myQueue.pop();
            // TODO Add here a proper code block if a queue limit is necessary!
        }
#if defined(DEBUG) && VERBOSENESS > 2
        std::cout << getUTCTime() << " [DEBUG] Thread n." << workerId << " is computing request " << request.referDatagram().token() << "-" << request.referDatagram().tid() << '\n';
#endif
        // Run the task outside the critical section so other workers can
        // dequeue concurrently.
        myTask(request);
    }
}
// Explicit instantiation: the member functions are defined in this .cpp
// rather than in the header, so the ThreadPool<UDPResponse> specialisation is
// instantiated here for other translation units to link against. Any other
// task type would need its own instantiation line.
template class ThreadPool<UDPResponse>;