This source file includes following definitions.
- _sir_threadpool_create
- _sir_threadpool_add_job
- _sir_threadpool_destroy
- thread_pool_proc
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 
  30 
  31 
  32 
  33 #include "sir/condition.h"
  34 #include "sir/threadpool.h"
  35 #include "sir/internal.h"
  36 #include "sir/queue.h"
  37 #include "sir/mutex.h"
  38 
  39 #if !defined(__WIN__)
  40 static void* thread_pool_proc(void* arg);
  41 #else
  42 static unsigned __stdcall thread_pool_proc(void* arg);
  43 #endif
  44 
  45 bool _sir_threadpool_create(sir_threadpool** pool, size_t num_threads) {
     
  46     if (!pool || !num_threads || num_threads > SIR_THREADPOOL_MAX_THREADS)
  47         return _sir_seterror(_SIR_E_INVALID);
  48 
  49     *pool = calloc(1, sizeof(sir_threadpool));
  50     if (!*pool)
  51         return _sir_handleerr(errno);
  52 
  53     (*pool)->threads = calloc(num_threads, sizeof(sir_thread));
  54     if (!(*pool)->threads) {
  55         int err = errno;
  56         _sir_safefree(pool);
  57         return _sir_handleerr(err);
  58     }
  59 
  60     (*pool)->num_threads = num_threads;
  61 
  62     if (!_sir_queue_create(&(*pool)->jobs) || !_sir_condcreate(&(*pool)->cond) ||
  63         !_sir_mutexcreate(&(*pool)->mutex)) {
  64         bool destroy = _sir_threadpool_destroy(pool);
  65         SIR_ASSERT_UNUSED(destroy, destroy);
  66         return false;
  67     }
  68 
  69 #if !defined(__WIN__)
  70     pthread_attr_t attr = {0};
  71     int op = pthread_attr_init(&attr);
  72     if (0 != op) {
  73         bool destroy = _sir_threadpool_destroy(pool);
  74         SIR_ASSERT_UNUSED(destroy, destroy);
  75         return _sir_handleerr(op);
  76     }
  77 #endif
  78 
  79     int thrd_err     = 0;
  80     bool thrd_create = true;
  81     for (size_t n = 0; n < num_threads; n++) {
  82 #if !defined(__WIN__)
  83         op = pthread_create(&(*pool)->threads[n], &attr, &thread_pool_proc, *pool);
  84         if (0 != op) {
  85             (*pool)->threads[n] = 0;
  86             thrd_err    = op;
  87             thrd_create = false;
  88             break;
  89         }
  90 #else 
  91         (*pool)->threads[n] = (HANDLE)_beginthreadex(NULL, 0, &thread_pool_proc,
  92             *pool, 0, NULL);
  93         if (!(*pool)->threads[n]) {
  94             thrd_err    = errno;
  95             thrd_create = false;
  96             break;
  97         }
  98 #endif
  99     }
 100 
 101 #if !defined(__WIN__)
 102     op = pthread_attr_destroy(&attr);
 103     SIR_ASSERT_UNUSED(0 == op, op);
 104 #endif
 105 
 106     if (!thrd_create) {
 107         bool destroy = _sir_threadpool_destroy(pool);
 108         SIR_ASSERT_UNUSED(destroy, destroy);
 109         return _sir_handleerr(thrd_err);
 110     }
 111 
 112     return !!*pool;
 113 }
 114 
 115 bool _sir_threadpool_add_job(sir_threadpool* pool, sir_threadpool_job* job) {
     
 116     bool retval = false;
 117 
 118     if (pool && pool->jobs && job && job->fn && job->data) {
 119         bool locked = _sir_mutexlock(&pool->mutex);
 120         SIR_ASSERT(locked);
 121 
 122         if (locked) {
 123             if (_sir_queue_push(pool->jobs, job)) {
 124                 retval = _sir_condbroadcast(&pool->cond);
 125                 _sir_selflog("added job; new size: %zu", _sir_queue_size(pool->jobs));
 126             }
 127 
 128             bool unlocked = _sir_mutexunlock(&pool->mutex);
 129             SIR_ASSERT_UNUSED(unlocked, unlocked);
 130         }
 131     }
 132 
 133     return retval;
 134 }
 135 
 136 bool _sir_threadpool_destroy(sir_threadpool** pool) {
     
 137     if (!pool || !*pool)
 138         return _sir_seterror(_SIR_E_INVALID);
 139 
 140     bool locked = _sir_mutexlock(&(*pool)->mutex);
 141     SIR_ASSERT(locked);
 142 
 143     if (locked) {
 144         _sir_selflog("broadcasting signal to condition var...");
 145         (*pool)->cancel = true;
 146 
 147         bool bcast = _sir_condbroadcast(&(*pool)->cond);
 148         SIR_ASSERT_UNUSED(bcast, bcast);
 149 
 150         bool unlock = _sir_mutexunlock(&(*pool)->mutex);
 151         SIR_ASSERT_UNUSED(unlock, unlock);
 152     }
 153 
 154     bool destroy = true;
 155     for (size_t n = 0; n < (*pool)->num_threads; n++) {
 156         SIR_ASSERT(0 != (*pool)->threads[n]);
 157         if (0 == (*pool)->threads[n])
 158             continue;
 159         _sir_selflog("joining thread %zu of %zu...", n + 1, (*pool)->num_threads);
 160 #if !defined(__WIN__)
 161         int join = pthread_join((*pool)->threads[n], NULL);
 162         SIR_ASSERT(0 == join);
 163         _sir_eqland(destroy, 0 == join);
 164 #else 
 165         DWORD join = WaitForSingleObject((*pool)->threads[n], INFINITE);
 166         SIR_ASSERT(WAIT_OBJECT_0 == join);
 167         _sir_eqland(destroy, WAIT_OBJECT_0 == join);
 168 #endif
 169     }
 170 
 171     _sir_eqland(destroy, _sir_queue_destroy(&(*pool)->jobs));
 172     SIR_ASSERT(destroy);
 173 
 174     _sir_eqland(destroy, _sir_conddestroy(&(*pool)->cond));
 175     SIR_ASSERT(destroy);
 176 
 177     _sir_eqland(destroy, _sir_mutexdestroy(&(*pool)->mutex));
 178     SIR_ASSERT(destroy);
 179 
 180     _sir_safefree(&(*pool)->threads);
 181     _sir_safefree(pool);
 182 
 183     return destroy;
 184 }
 185 
 186 #if !defined(__WIN__)
 187 static void* thread_pool_proc(void* arg)
     
 188 #else
 189 static unsigned __stdcall thread_pool_proc(void* arg)
 190 #endif
 191 {
 192     sir_threadpool* pool = (sir_threadpool*)arg;
 193     while (true) {
 194         bool locked = _sir_mutexlock(&pool->mutex);
 195         SIR_ASSERT_UNUSED(locked, locked);
 196 
 197         while (_sir_queue_isempty(pool->jobs) && !pool->cancel) {
 198 #if !defined(__WIN__)
 199             
 200             sir_wait wait = {time(NULL) + 2, 0};
 201 #else
 202             
 203             sir_wait wait = 2000;
 204 #endif
 205             (void)_sir_condwait_timeout(&pool->cond, &pool->mutex, &wait);
 206         }
 207 
 208         if (!pool->cancel) {
 209             sir_threadpool_job* job = NULL;
 210             bool job_popped         = _sir_queue_pop(pool->jobs, (void**)&job);
 211 
 212             bool unlocked = _sir_mutexunlock(&pool->mutex);
 213             SIR_ASSERT_UNUSED(unlocked, unlocked);
 214 
 215             if (job_popped) {
 216                 _sir_selflog("picked up job (fn: %"PRIxPTR", data: %p)",
 217                     (uintptr_t)job->fn, job->data);
 218                 job->fn(job->data);
 219                 _sir_safefree(&job);
 220             }
 221         } else {
 222             _sir_selflog("cancel flag is set; exiting");
 223             bool unlocked = _sir_mutexunlock(&pool->mutex);
 224             SIR_ASSERT_UNUSED(unlocked, unlocked);
 225             break;
 226         }
 227     }
 228 
 229 #if !defined(__WIN__)
 230     return NULL;
 231 #else 
 232     return 0U;
 233 #endif
 234 }