This source file includes following definitions.
- _sir_threadpool_create
- _sir_threadpool_add_job
- _sir_threadpool_destroy
- thread_pool_proc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 #include "sir/condition.h"
34 #include "sir/threadpool.h"
35 #include "sir/internal.h"
36 #include "sir/queue.h"
37 #include "sir/mutex.h"
38
39 #if !defined(__WIN__)
40 static void* thread_pool_proc(void* arg);
41 #else
42 static unsigned __stdcall thread_pool_proc(void* arg);
43 #endif
44
45 bool _sir_threadpool_create(sir_threadpool** pool, size_t num_threads) {
46 if (!pool || !num_threads || num_threads > SIR_THREADPOOL_MAX_THREADS)
47 return _sir_seterror(_SIR_E_INVALID);
48
49 *pool = calloc(1, sizeof(sir_threadpool));
50 if (!*pool)
51 return _sir_handleerr(errno);
52
53 (*pool)->threads = calloc(num_threads, sizeof(sir_thread));
54 if (!(*pool)->threads) {
55 int err = errno;
56 _sir_safefree(pool);
57 return _sir_handleerr(err);
58 }
59
60 (*pool)->num_threads = num_threads;
61
62 if (!_sir_queue_create(&(*pool)->jobs) || !_sir_condcreate(&(*pool)->cond) ||
63 !_sir_mutexcreate(&(*pool)->mutex)) {
64 bool destroy = _sir_threadpool_destroy(pool);
65 SIR_ASSERT_UNUSED(destroy, destroy);
66 return false;
67 }
68
69 #if !defined(__WIN__)
70 pthread_attr_t attr = {0};
71 int op = pthread_attr_init(&attr);
72 if (0 != op) {
73 bool destroy = _sir_threadpool_destroy(pool);
74 SIR_ASSERT_UNUSED(destroy, destroy);
75 return _sir_handleerr(op);
76 }
77 #endif
78
79 int thrd_err = 0;
80 bool thrd_create = true;
81 for (size_t n = 0; n < num_threads; n++) {
82 #if !defined(__WIN__)
83 op = pthread_create(&(*pool)->threads[n], &attr, &thread_pool_proc, *pool);
84 if (0 != op) {
85 (*pool)->threads[n] = 0;
86 thrd_err = op;
87 thrd_create = false;
88 break;
89 }
90 #else
91 (*pool)->threads[n] = (HANDLE)_beginthreadex(NULL, 0, &thread_pool_proc,
92 *pool, 0, NULL);
93 if (!(*pool)->threads[n]) {
94 thrd_err = errno;
95 thrd_create = false;
96 break;
97 }
98 #endif
99 }
100
101 #if !defined(__WIN__)
102 op = pthread_attr_destroy(&attr);
103 SIR_ASSERT_UNUSED(0 == op, op);
104 #endif
105
106 if (!thrd_create) {
107 bool destroy = _sir_threadpool_destroy(pool);
108 SIR_ASSERT_UNUSED(destroy, destroy);
109 return _sir_handleerr(thrd_err);
110 }
111
112 return !!*pool;
113 }
114
115 bool _sir_threadpool_add_job(sir_threadpool* pool, sir_threadpool_job* job) {
116 bool retval = false;
117
118 if (pool && pool->jobs && job && job->fn && job->data) {
119 bool locked = _sir_mutexlock(&pool->mutex);
120 SIR_ASSERT(locked);
121
122 if (locked) {
123 if (_sir_queue_push(pool->jobs, job)) {
124 retval = _sir_condbroadcast(&pool->cond);
125 _sir_selflog("added job; new size: %zu", _sir_queue_size(pool->jobs));
126 }
127
128 bool unlocked = _sir_mutexunlock(&pool->mutex);
129 SIR_ASSERT_UNUSED(unlocked, unlocked);
130 }
131 }
132
133 return retval;
134 }
135
136 bool _sir_threadpool_destroy(sir_threadpool** pool) {
137 if (!pool || !*pool)
138 return _sir_seterror(_SIR_E_INVALID);
139
140 bool locked = _sir_mutexlock(&(*pool)->mutex);
141 SIR_ASSERT(locked);
142
143 if (locked) {
144 _sir_selflog("broadcasting signal to condition var...");
145 (*pool)->cancel = true;
146
147 bool bcast = _sir_condbroadcast(&(*pool)->cond);
148 SIR_ASSERT_UNUSED(bcast, bcast);
149
150 bool unlock = _sir_mutexunlock(&(*pool)->mutex);
151 SIR_ASSERT_UNUSED(unlock, unlock);
152 }
153
154 bool destroy = true;
155 for (size_t n = 0; n < (*pool)->num_threads; n++) {
156 SIR_ASSERT(0 != (*pool)->threads[n]);
157 if (0 == (*pool)->threads[n])
158 continue;
159 _sir_selflog("joining thread %zu of %zu...", n + 1, (*pool)->num_threads);
160 #if !defined(__WIN__)
161 int join = pthread_join((*pool)->threads[n], NULL);
162 SIR_ASSERT(0 == join);
163 _sir_eqland(destroy, 0 == join);
164 #else
165 DWORD join = WaitForSingleObject((*pool)->threads[n], INFINITE);
166 SIR_ASSERT(WAIT_OBJECT_0 == join);
167 _sir_eqland(destroy, WAIT_OBJECT_0 == join);
168 #endif
169 }
170
171 _sir_eqland(destroy, _sir_queue_destroy(&(*pool)->jobs));
172 SIR_ASSERT(destroy);
173
174 _sir_eqland(destroy, _sir_conddestroy(&(*pool)->cond));
175 SIR_ASSERT(destroy);
176
177 _sir_eqland(destroy, _sir_mutexdestroy(&(*pool)->mutex));
178 SIR_ASSERT(destroy);
179
180 _sir_safefree(&(*pool)->threads);
181 _sir_safefree(pool);
182
183 return destroy;
184 }
185
186 #if !defined(__WIN__)
187 static void* thread_pool_proc(void* arg)
188 #else
189 static unsigned __stdcall thread_pool_proc(void* arg)
190 #endif
191 {
192 sir_threadpool* pool = (sir_threadpool*)arg;
193 while (true) {
194 bool locked = _sir_mutexlock(&pool->mutex);
195 SIR_ASSERT_UNUSED(locked, locked);
196
197 while (_sir_queue_isempty(pool->jobs) && !pool->cancel) {
198 #if !defined(__WIN__)
199
200 sir_wait wait = {time(NULL) + 2, 0};
201 #else
202
203 sir_wait wait = 2000;
204 #endif
205 (void)_sir_condwait_timeout(&pool->cond, &pool->mutex, &wait);
206 }
207
208 if (!pool->cancel) {
209 sir_threadpool_job* job = NULL;
210 bool job_popped = _sir_queue_pop(pool->jobs, (void**)&job);
211
212 bool unlocked = _sir_mutexunlock(&pool->mutex);
213 SIR_ASSERT_UNUSED(unlocked, unlocked);
214
215 if (job_popped) {
216 _sir_selflog("picked up job (fn: %"PRIxPTR", data: %p)",
217 (uintptr_t)job->fn, job->data);
218 job->fn(job->data);
219 _sir_safefree(&job);
220 }
221 } else {
222 _sir_selflog("cancel flag is set; exiting");
223 bool unlocked = _sir_mutexunlock(&pool->mutex);
224 SIR_ASSERT_UNUSED(unlocked, unlocked);
225 break;
226 }
227 }
228
229 #if !defined(__WIN__)
230 return NULL;
231 #else
232 return 0U;
233 #endif
234 }