root/src/libsir/src/sirthreadpool.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. _sir_threadpool_create
  2. _sir_threadpool_add_job
  3. _sir_threadpool_destroy
  4. thread_pool_proc

   1 /*
   2  * sirthreadpool.c
   3  *
   4  * Version: 2.2.5
   5  *
   6  * -----------------------------------------------------------------------------
   7  *
   8  * SPDX-License-Identifier: MIT
   9  *
  10  * Copyright (c) 2018-2024 Ryan M. Lederman <lederman@gmail.com>
  11  * Copyright (c) 2018-2024 Jeffrey H. Johnson <trnsz@pobox.com>
  12  *
  13  * Permission is hereby granted, free of charge, to any person obtaining a copy of
  14  * this software and associated documentation files (the "Software"), to deal in
  15  * the Software without restriction, including without limitation the rights to
  16  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  17  * the Software, and to permit persons to whom the Software is furnished to do so,
  18  * subject to the following conditions:
  19  *
  20  * The above copyright notice and this permission notice shall be included in all
  21  * copies or substantial portions of the Software.
  22  *
  23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  24  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  25  * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  26  * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  27  * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  28  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29  *
  30  * -----------------------------------------------------------------------------
  31  */
  32 
  33 #include "sir/condition.h"
  34 #include "sir/threadpool.h"
  35 #include "sir/internal.h"
  36 #include "sir/queue.h"
  37 #include "sir/mutex.h"
  38 
  39 #if !defined(__WIN__)
  40 static void* thread_pool_proc(void* arg);
  41 #else
  42 static unsigned __stdcall thread_pool_proc(void* arg);
  43 #endif
  44 
  45 bool _sir_threadpool_create(sir_threadpool** pool, size_t num_threads) {
     /* [previous][next][first][last][top][bottom][index][help] */
  46     if (!pool || !num_threads || num_threads > SIR_THREADPOOL_MAX_THREADS)
  47         return _sir_seterror(_SIR_E_INVALID);
  48 
  49     *pool = calloc(1, sizeof(sir_threadpool));
  50     if (!*pool)
  51         return _sir_handleerr(errno);
  52 
  53     (*pool)->threads = calloc(num_threads, sizeof(sir_thread));
  54     if (!(*pool)->threads) {
  55         int err = errno;
  56         _sir_safefree(pool);
  57         return _sir_handleerr(err);
  58     }
  59 
  60     (*pool)->num_threads = num_threads;
  61 
  62     if (!_sir_queue_create(&(*pool)->jobs) || !_sir_condcreate(&(*pool)->cond) ||
  63         !_sir_mutexcreate(&(*pool)->mutex)) {
  64         bool destroy = _sir_threadpool_destroy(pool);
  65         SIR_ASSERT_UNUSED(destroy, destroy);
  66         return false;
  67     }
  68 
  69 #if !defined(__WIN__)
  70     pthread_attr_t attr = {0};
  71     int op = pthread_attr_init(&attr);
  72     if (0 != op) {
  73         bool destroy = _sir_threadpool_destroy(pool);
  74         SIR_ASSERT_UNUSED(destroy, destroy);
  75         return _sir_handleerr(op);
  76     }
  77 #endif
  78 
  79     int thrd_err     = 0;
  80     bool thrd_create = true;
  81     for (size_t n = 0; n < num_threads; n++) {
  82 #if !defined(__WIN__)
  83         op = pthread_create(&(*pool)->threads[n], &attr, &thread_pool_proc, *pool);
  84         if (0 != op) {
  85             (*pool)->threads[n] = 0;
  86             thrd_err    = op;
  87             thrd_create = false;
  88             break;
  89         }
  90 #else /* __WIN__ */
  91         (*pool)->threads[n] = (HANDLE)_beginthreadex(NULL, 0, &thread_pool_proc,
  92             *pool, 0, NULL);
  93         if (!(*pool)->threads[n]) {
  94             thrd_err    = errno;
  95             thrd_create = false;
  96             break;
  97         }
  98 #endif
  99     }
 100 
 101 #if !defined(__WIN__)
 102     op = pthread_attr_destroy(&attr);
 103     SIR_ASSERT_UNUSED(0 == op, op);
 104 #endif
 105 
 106     if (!thrd_create) {
 107         bool destroy = _sir_threadpool_destroy(pool);
 108         SIR_ASSERT_UNUSED(destroy, destroy);
 109         return _sir_handleerr(thrd_err);
 110     }
 111 
 112     return !!*pool;
 113 }
 114 
 115 bool _sir_threadpool_add_job(sir_threadpool* pool, sir_threadpool_job* job) {
     /* [previous][next][first][last][top][bottom][index][help] */
 116     bool retval = false;
 117 
 118     if (pool && pool->jobs && job && job->fn && job->data) {
 119         bool locked = _sir_mutexlock(&pool->mutex);
 120         SIR_ASSERT(locked);
 121 
 122         if (locked) {
 123             if (_sir_queue_push(pool->jobs, job)) {
 124                 retval = _sir_condbroadcast(&pool->cond);
 125                 _sir_selflog("added job; new size: %zu", _sir_queue_size(pool->jobs));
 126             }
 127 
 128             bool unlocked = _sir_mutexunlock(&pool->mutex);
 129             SIR_ASSERT_UNUSED(unlocked, unlocked);
 130         }
 131     }
 132 
 133     return retval;
 134 }
 135 
 136 bool _sir_threadpool_destroy(sir_threadpool** pool) {
     /* [previous][next][first][last][top][bottom][index][help] */
 137     if (!pool || !*pool)
 138         return _sir_seterror(_SIR_E_INVALID);
 139 
 140     bool locked = _sir_mutexlock(&(*pool)->mutex);
 141     SIR_ASSERT(locked);
 142 
 143     if (locked) {
 144         _sir_selflog("broadcasting signal to condition var...");
 145         (*pool)->cancel = true;
 146 
 147         bool bcast = _sir_condbroadcast(&(*pool)->cond);
 148         SIR_ASSERT_UNUSED(bcast, bcast);
 149 
 150         bool unlock = _sir_mutexunlock(&(*pool)->mutex);
 151         SIR_ASSERT_UNUSED(unlock, unlock);
 152     }
 153 
 154     bool destroy = true;
 155     for (size_t n = 0; n < (*pool)->num_threads; n++) {
 156         SIR_ASSERT(0 != (*pool)->threads[n]);
 157         if (0 == (*pool)->threads[n])
 158             continue;
 159         _sir_selflog("joining thread %zu of %zu...", n + 1, (*pool)->num_threads);
 160 #if !defined(__WIN__)
 161         int join = pthread_join((*pool)->threads[n], NULL);
 162         SIR_ASSERT(0 == join);
 163         _sir_eqland(destroy, 0 == join);
 164 #else /* __WIN__ */
 165         DWORD join = WaitForSingleObject((*pool)->threads[n], INFINITE);
 166         SIR_ASSERT(WAIT_OBJECT_0 == join);
 167         _sir_eqland(destroy, WAIT_OBJECT_0 == join);
 168 #endif
 169     }
 170 
 171     _sir_eqland(destroy, _sir_queue_destroy(&(*pool)->jobs));
 172     SIR_ASSERT(destroy);
 173 
 174     _sir_eqland(destroy, _sir_conddestroy(&(*pool)->cond));
 175     SIR_ASSERT(destroy);
 176 
 177     _sir_eqland(destroy, _sir_mutexdestroy(&(*pool)->mutex));
 178     SIR_ASSERT(destroy);
 179 
 180     _sir_safefree(&(*pool)->threads);
 181     _sir_safefree(pool);
 182 
 183     return destroy;
 184 }
 185 
 186 #if !defined(__WIN__)
 187 static void* thread_pool_proc(void* arg)
     /* [previous][next][first][last][top][bottom][index][help] */
 188 #else
 189 static unsigned __stdcall thread_pool_proc(void* arg)
 190 #endif
 191 {
 192     sir_threadpool* pool = (sir_threadpool*)arg;
 193     while (true) {
 194         bool locked = _sir_mutexlock(&pool->mutex);
 195         SIR_ASSERT_UNUSED(locked, locked);
 196 
 197         while (_sir_queue_isempty(pool->jobs) && !pool->cancel) {
 198 #if !defined(__WIN__)
 199             /* seconds; absolute fixed time. */
 200             sir_wait wait = {time(NULL) + 2, 0};
 201 #else
 202             /* msec; relative from now. */
 203             sir_wait wait = 2000;
 204 #endif
 205             (void)_sir_condwait_timeout(&pool->cond, &pool->mutex, &wait);
 206         }
 207 
 208         if (!pool->cancel) {
 209             sir_threadpool_job* job = NULL;
 210             bool job_popped         = _sir_queue_pop(pool->jobs, (void**)&job);
 211 
 212             bool unlocked = _sir_mutexunlock(&pool->mutex);
 213             SIR_ASSERT_UNUSED(unlocked, unlocked);
 214 
 215             if (job_popped) {
 216                 _sir_selflog("picked up job (fn: %"PRIxPTR", data: %p)",
 217                     (uintptr_t)job->fn, job->data);
 218                 job->fn(job->data);
 219                 _sir_safefree(&job);
 220             }
 221         } else {
 222             _sir_selflog("cancel flag is set; exiting");
 223             bool unlocked = _sir_mutexunlock(&pool->mutex);
 224             SIR_ASSERT_UNUSED(unlocked, unlocked);
 225             break;
 226         }
 227     }
 228 
 229 #if !defined(__WIN__)
 230     return NULL;
 231 #else /* __WIN__ */
 232     return 0U;
 233 #endif
 234 }

/* [previous][next][first][last][top][bottom][index][help] */