SphinxBase 0.6

src/libsphinxbase/util/sbthread.c

Go to the documentation of this file.
00001 /* -*- c-basic-offset: 4; indent-tabs-mode: nil -*- */
00002 /* ====================================================================
00003  * Copyright (c) 2008 Carnegie Mellon University.  All rights 
00004  * reserved.
00005  *
00006  * Redistribution and use in source and binary forms, with or without
00007  * modification, are permitted provided that the following conditions
00008  * are met:
00009  *
00010  * 1. Redistributions of source code must retain the above copyright
00011  *    notice, this list of conditions and the following disclaimer. 
00012  *
00013  * 2. Redistributions in binary form must reproduce the above copyright
00014  *    notice, this list of conditions and the following disclaimer in
00015  *    the documentation and/or other materials provided with the
00016  *    distribution.
00017  *
00018  * This work was supported in part by funding from the Defense Advanced 
00019  * Research Projects Agency and the National Science Foundation of the 
00020  * United States of America, and the CMU Sphinx Speech Consortium.
00021  *
00022  * THIS SOFTWARE IS PROVIDED BY CARNEGIE MELLON UNIVERSITY ``AS IS'' AND 
00023  * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 
00024  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
00025  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY
00026  * NOR ITS EMPLOYEES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00027  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
00028  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 
00029  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 
00030  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
00031  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 
00032  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00033  *
00034  * ====================================================================
00035  *
00036  */
00037 
00044 #include <string.h>
00045 
00046 #include "sphinxbase/sbthread.h"
00047 #include "sphinxbase/ckd_alloc.h"
00048 #include "sphinxbase/err.h"
00049 
00050 /*
00051  * Platform-specific parts: threads, mutexes, and signals.
00052  */
00053 #if (defined(_WIN32) || defined(__CYGWIN__)) && !defined(__SYMBIAN32__)
00054 #define _WIN32_WINNT 0x0400
00055 #include <windows.h>
00056 
00057 struct sbthread_s {
00058     cmd_ln_t *config;
00059     sbmsgq_t *msgq;
00060     sbthread_main func;
00061     void *arg;
00062     HANDLE th;
00063     DWORD tid;
00064 };
00065 
00066 struct sbmsgq_s {
00067     /* Ringbuffer for passing messages. */
00068     char *data;
00069     size_t depth;
00070     size_t out;
00071     size_t nbytes;
00072 
00073     /* Current message is stored here. */
00074     char *msg;
00075     size_t msglen;
00076     CRITICAL_SECTION mtx;
00077     HANDLE evt;
00078 };
00079 
00080 struct sbevent_s {
00081     HANDLE evt;
00082 };
00083 
00084 struct sbmtx_s {
00085     CRITICAL_SECTION mtx;
00086 };
00087 
00088 DWORD WINAPI
00089 sbthread_internal_main(LPVOID arg)
00090 {
00091     sbthread_t *th = (sbthread_t *)arg;
00092     int rv;
00093 
00094     rv = (*th->func)(th);
00095     return (DWORD)rv;
00096 }
00097 
00098 sbthread_t *
00099 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
00100 {
00101     sbthread_t *th;
00102 
00103     th = ckd_calloc(1, sizeof(*th));
00104     th->config = config;
00105     th->func = func;
00106     th->arg = arg;
00107     th->msgq = sbmsgq_init(256);
00108     th->th = CreateThread(NULL, 0, sbthread_internal_main, th, 0, &th->tid);
00109     if (th->th == NULL) {
00110         sbthread_free(th);
00111         return NULL;
00112     }
00113     return th;
00114 }
00115 
00116 int
00117 sbthread_wait(sbthread_t *th)
00118 {
00119     DWORD rv, exit;
00120 
00121     /* It has already been joined. */
00122     if (th->th == NULL)
00123         return -1;
00124 
00125     rv = WaitForSingleObject(th->th, INFINITE);
00126     if (rv == WAIT_FAILED) {
00127         E_ERROR("Failed to join thread: WAIT_FAILED\n");
00128         return -1;
00129     }
00130     GetExitCodeThread(th->th, &exit);
00131     CloseHandle(th->th);
00132     th->th = NULL;
00133     return (int)exit;
00134 }
00135 
00136 static DWORD
00137 cond_timed_wait(HANDLE cond, int sec, int nsec)
00138 {
00139     DWORD rv;
00140     if (sec == -1) {
00141         rv = WaitForSingleObject(cond, INFINITE);
00142     }
00143     else {
00144         DWORD ms;
00145 
00146         ms = sec * 1000 + nsec / (1000*1000);
00147         rv = WaitForSingleObject(cond, ms);
00148     }
00149     return rv;
00150 }
00151 
00152 /* Silvio Moioli: updated to use Unicode */
00153 sbevent_t *
00154 sbevent_init(void)
00155 {
00156     sbevent_t *evt;
00157 
00158     evt = ckd_calloc(1, sizeof(*evt));
00159     evt->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
00160     if (evt->evt == NULL) {
00161         ckd_free(evt);
00162         return NULL;
00163     }
00164     return evt;
00165 }
00166 
00167 void
00168 sbevent_free(sbevent_t *evt)
00169 {
00170     CloseHandle(evt->evt);
00171     ckd_free(evt);
00172 }
00173 
00174 int
00175 sbevent_signal(sbevent_t *evt)
00176 {
00177     return SetEvent(evt->evt) ? 0 : -1;
00178 }
00179 
00180 int
00181 sbevent_wait(sbevent_t *evt, int sec, int nsec)
00182 {
00183     DWORD rv;
00184 
00185     rv = cond_timed_wait(evt->evt, sec, nsec);
00186     return rv;
00187 }
00188 
00189 sbmtx_t *
00190 sbmtx_init(void)
00191 {
00192     sbmtx_t *mtx;
00193 
00194     mtx = ckd_calloc(1, sizeof(*mtx));
00195     InitializeCriticalSection(&mtx->mtx);
00196     return mtx;
00197 }
00198 
00199 int
00200 sbmtx_trylock(sbmtx_t *mtx)
00201 {
00202     return TryEnterCriticalSection(&mtx->mtx) ? 0 : -1;
00203 }
00204 
00205 int
00206 sbmtx_lock(sbmtx_t *mtx)
00207 {
00208     EnterCriticalSection(&mtx->mtx);
00209     return 0;
00210 }
00211 
00212 int
00213 sbmtx_unlock(sbmtx_t *mtx)
00214 {
00215     LeaveCriticalSection(&mtx->mtx);
00216     return 0;
00217 }
00218 
00219 void
00220 sbmtx_free(sbmtx_t *mtx)
00221 {
00222     DeleteCriticalSection(&mtx->mtx);
00223     ckd_free(mtx);
00224 }
00225 
00226 sbmsgq_t *
00227 sbmsgq_init(size_t depth)
00228 {
00229     sbmsgq_t *msgq;
00230 
00231     msgq = ckd_calloc(1, sizeof(*msgq));
00232     msgq->depth = depth;
00233     msgq->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
00234     if (msgq->evt == NULL) {
00235         ckd_free(msgq);
00236         return NULL;
00237     }
00238     InitializeCriticalSection(&msgq->mtx);
00239     msgq->data = ckd_calloc(depth, 1);
00240     msgq->msg = ckd_calloc(depth, 1);
00241     return msgq;
00242 }
00243 
00244 void
00245 sbmsgq_free(sbmsgq_t *msgq)
00246 {
00247     CloseHandle(msgq->evt);
00248     ckd_free(msgq->data);
00249     ckd_free(msgq->msg);
00250     ckd_free(msgq);
00251 }
00252 
00253 int
00254 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
00255 {
00256     char const *cdata = (char const *)data;
00257     size_t in;
00258 
00259     /* Don't allow things bigger than depth to be sent! */
00260     if (len + sizeof(len) > q->depth)
00261         return -1;
00262 
00263     if (q->nbytes + len + sizeof(len) > q->depth)
00264         WaitForSingleObject(q->evt, INFINITE);
00265 
00266     /* Lock things while we manipulate the buffer (FIXME: this
00267        actually should have been atomic with the wait above ...) */
00268     EnterCriticalSection(&q->mtx);
00269     in = (q->out + q->nbytes) % q->depth;
00270     /* First write the size of the message. */
00271     if (in + sizeof(len) > q->depth) {
00272         /* Handle the annoying case where the size field gets wrapped around. */
00273         size_t len1 = q->depth - in;
00274         memcpy(q->data + in, &len, len1);
00275         memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
00276         q->nbytes += sizeof(len);
00277         in = sizeof(len) - len1;
00278     }
00279     else {
00280         memcpy(q->data + in, &len, sizeof(len));
00281         q->nbytes += sizeof(len);
00282         in += sizeof(len);
00283     }
00284 
00285     /* Now write the message body. */
00286     if (in + len > q->depth) {
00287         /* Handle wraparound. */
00288         size_t len1 = q->depth - in;
00289         memcpy(q->data + in, cdata, len1);
00290         q->nbytes += len1;
00291         cdata += len1;
00292         len -= len1;
00293         in = 0;
00294     }
00295     memcpy(q->data + in, cdata, len);
00296     q->nbytes += len;
00297 
00298     /* Signal the condition variable. */
00299     SetEvent(q->evt);
00300     /* Unlock. */
00301     LeaveCriticalSection(&q->mtx);
00302 
00303     return 0;
00304 }
00305 
00306 void *
00307 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
00308 {
00309     char *outptr;
00310     size_t len;
00311 
00312     /* Wait for data to be available. */
00313     if (q->nbytes == 0) {
00314         if (cond_timed_wait(q->evt, sec, nsec) == WAIT_FAILED)
00315             /* Timed out or something... */
00316             return NULL;
00317     }
00318     /* Lock to manipulate the queue (FIXME) */
00319     EnterCriticalSection(&q->mtx);
00320     /* Get the message size. */
00321     if (q->out + sizeof(q->msglen) > q->depth) {
00322         /* Handle annoying wraparound case. */
00323         size_t len1 = q->depth - q->out;
00324         memcpy(&q->msglen, q->data + q->out, len1);
00325         memcpy(((char *)&q->msglen) + len1, q->data,
00326                sizeof(q->msglen) - len1);
00327         q->out = sizeof(q->msglen) - len1;
00328     }
00329     else {
00330         memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
00331         q->out += sizeof(q->msglen);
00332     }
00333     q->nbytes -= sizeof(q->msglen);
00334     /* Get the message body. */
00335     outptr = q->msg;
00336     len = q->msglen;
00337     if (q->out + q->msglen > q->depth) {
00338         /* Handle wraparound. */
00339         size_t len1 = q->depth - q->out;
00340         memcpy(outptr, q->data + q->out, len1);
00341         outptr += len1;
00342         len -= len1;
00343         q->nbytes -= len1;
00344         q->out = 0;
00345     }
00346     memcpy(outptr, q->data + q->out, len);
00347     q->nbytes -= len;
00348     q->out += len;
00349 
00350     /* Signal the condition variable. */
00351     SetEvent(q->evt);
00352     /* Unlock. */
00353     LeaveCriticalSection(&q->mtx);
00354     if (out_len)
00355         *out_len = q->msglen;
00356     return q->msg;
00357 }
00358 
00359 #else /* POSIX */
00360 #include <pthread.h>
00361 #include <sys/time.h>
00362 
00363 struct sbthread_s {
00364     cmd_ln_t *config;
00365     sbmsgq_t *msgq;
00366     sbthread_main func;
00367     void *arg;
00368     pthread_t th;
00369 };
00370 
/* Message queue (POSIX implementation). */
struct sbmsgq_s {
    /* Ring buffer of length-prefixed messages. */
    char *data;      /* Storage of `depth` bytes. */
    size_t depth;    /* Size in bytes of `data` and of `msg`. */
    size_t out;      /* Offset in `data` where the next read starts. */
    size_t nbytes;   /* Number of bytes currently queued in `data`. */

    /* The message most recently dequeued is copied here. */
    char *msg;
    size_t msglen;
    pthread_mutex_t mtx;   /* Held while the ring buffer is manipulated. */
    pthread_cond_t cond;   /* Signalled after every send and receive. */
};
00384 
/* Event object (POSIX implementation): a flag guarded by a mutex/condvar pair. */
struct sbevent_s {
    pthread_mutex_t mtx;   /* Protects `signalled`. */
    pthread_cond_t cond;   /* Signalled by sbevent_signal(). */
    int signalled;         /* Set by sbevent_signal(), cleared by a successful sbevent_wait(). */
};
00390 
/* Mutex object (POSIX implementation): wraps a pthread mutex. */
struct sbmtx_s {
    pthread_mutex_t mtx;  /* Initialised in sbmtx_init(), destroyed in sbmtx_free(). */
};
00394 
00395 static void *
00396 sbthread_internal_main(void *arg)
00397 {
00398     sbthread_t *th = (sbthread_t *)arg;
00399     int rv;
00400 
00401     rv = (*th->func)(th);
00402     return (void *)(long)rv;
00403 }
00404 
00405 sbthread_t *
00406 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
00407 {
00408     sbthread_t *th;
00409     int rv;
00410 
00411     th = ckd_calloc(1, sizeof(*th));
00412     th->config = config;
00413     th->func = func;
00414     th->arg = arg;
00415     th->msgq = sbmsgq_init(1024);
00416     if ((rv = pthread_create(&th->th, NULL, &sbthread_internal_main, th)) != 0) {
00417         E_ERROR("Failed to create thread: %d\n", rv);
00418         sbthread_free(th);
00419         return NULL;
00420     }
00421     return th;
00422 }
00423 
00424 int
00425 sbthread_wait(sbthread_t *th)
00426 {
00427     void *exit;
00428     int rv;
00429 
00430     /* It has already been joined. */
00431     if (th->th == (pthread_t)-1)
00432         return -1;
00433 
00434     rv = pthread_join(th->th, &exit);
00435     if (rv != 0) {
00436         E_ERROR("Failed to join thread: %d\n", rv);
00437         return -1;
00438     }
00439     th->th = (pthread_t)-1;
00440     return (int)(long)exit;
00441 }
00442 
00443 sbmsgq_t *
00444 sbmsgq_init(size_t depth)
00445 {
00446     sbmsgq_t *msgq;
00447 
00448     msgq = ckd_calloc(1, sizeof(*msgq));
00449     msgq->depth = depth;
00450     if (pthread_cond_init(&msgq->cond, NULL) != 0) {
00451         ckd_free(msgq);
00452         return NULL;
00453     }
00454     if (pthread_mutex_init(&msgq->mtx, NULL) != 0) {
00455         pthread_cond_destroy(&msgq->cond);
00456         ckd_free(msgq);
00457         return NULL;
00458     }
00459     msgq->data = ckd_calloc(depth, 1);
00460     msgq->msg = ckd_calloc(depth, 1);
00461     return msgq;
00462 }
00463 
00464 void
00465 sbmsgq_free(sbmsgq_t *msgq)
00466 {
00467     pthread_mutex_destroy(&msgq->mtx);
00468     pthread_cond_destroy(&msgq->cond);
00469     ckd_free(msgq->data);
00470     ckd_free(msgq->msg);
00471     ckd_free(msgq);
00472 }
00473 
00474 int
00475 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
00476 {
00477     size_t in;
00478 
00479     /* Don't allow things bigger than depth to be sent! */
00480     if (len + sizeof(len) > q->depth)
00481         return -1;
00482 
00483     /* Lock the condition variable while we manipulate the buffer. */
00484     pthread_mutex_lock(&q->mtx);
00485     if (q->nbytes + len + sizeof(len) > q->depth) {
00486         /* Unlock and wait for space to be available. */
00487         if (pthread_cond_wait(&q->cond, &q->mtx) != 0) {
00488             /* Timed out, don't send anything. */
00489             pthread_mutex_unlock(&q->mtx);
00490             return -1;
00491         }
00492         /* Condition is now locked again. */
00493     }
00494     in = (q->out + q->nbytes) % q->depth;
00495 
00496     /* First write the size of the message. */
00497     if (in + sizeof(len) > q->depth) {
00498         /* Handle the annoying case where the size field gets wrapped around. */
00499         size_t len1 = q->depth - in;
00500         memcpy(q->data + in, &len, len1);
00501         memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
00502         q->nbytes += sizeof(len);
00503         in = sizeof(len) - len1;
00504     }
00505     else {
00506         memcpy(q->data + in, &len, sizeof(len));
00507         q->nbytes += sizeof(len);
00508         in += sizeof(len);
00509     }
00510 
00511     /* Now write the message body. */
00512     if (in + len > q->depth) {
00513         /* Handle wraparound. */
00514         size_t len1 = q->depth - in;
00515         memcpy(q->data + in, data, len1);
00516         q->nbytes += len1;
00517         data = (char const *)data + len1;
00518         len -= len1;
00519         in = 0;
00520     }
00521     memcpy(q->data + in, data, len);
00522     q->nbytes += len;
00523 
00524     /* Signal the condition variable. */
00525     pthread_cond_signal(&q->cond);
00526     /* Unlock it, we have nothing else to do. */
00527     pthread_mutex_unlock(&q->mtx);
00528     return 0;
00529 }
00530 
/*
 * Wait on a condition variable with an optional relative timeout.
 *
 * cond, mtx: condition variable and the mutex the caller already holds.
 * sec, nsec: timeout relative to now; sec == -1 means wait forever.
 *
 * Returns the result of pthread_cond_wait() or
 * pthread_cond_timedwait().
 */
static int
cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mtx, int sec, int nsec)
{
    struct timeval now;
    struct timespec end;
    long long total_nsec;
    long const nsec_per_sec = 1000L*1000L*1000L;

    if (sec == -1)
        return pthread_cond_wait(cond, mtx);

    /* Turn the relative timeout into an absolute deadline. */
    gettimeofday(&now, NULL);
    end.tv_sec = now.tv_sec + sec;
    total_nsec = (long long)now.tv_usec * 1000 + nsec;
    if (total_nsec >= nsec_per_sec) {
        /* Carry whole seconds into the deadline itself, so that
         * tv_nsec always ends up below one second. */
        end.tv_sec += total_nsec / nsec_per_sec;
        total_nsec %= nsec_per_sec;
    }
    end.tv_nsec = (long)total_nsec;
    return pthread_cond_timedwait(cond, mtx, &end);
}
00553 
00554 void *
00555 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
00556 {
00557     char *outptr;
00558     size_t len;
00559 
00560     /* Lock the condition variable while we manipulate nmsg. */
00561     pthread_mutex_lock(&q->mtx);
00562     if (q->nbytes == 0) {
00563         /* Unlock the condition variable and wait for a signal. */
00564         if (cond_timed_wait(&q->cond, &q->mtx, sec, nsec) != 0) {
00565             /* Timed out or something... */
00566             pthread_mutex_unlock(&q->mtx);
00567             return NULL;
00568         }
00569         /* Condition variable is now locked again. */
00570     }
00571     /* Get the message size. */
00572     if (q->out + sizeof(q->msglen) > q->depth) {
00573         /* Handle annoying wraparound case. */
00574         size_t len1 = q->depth - q->out;
00575         memcpy(&q->msglen, q->data + q->out, len1);
00576         memcpy(((char *)&q->msglen) + len1, q->data,
00577                sizeof(q->msglen) - len1);
00578         q->out = sizeof(q->msglen) - len1;
00579     }
00580     else {
00581         memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
00582         q->out += sizeof(q->msglen);
00583     }
00584     q->nbytes -= sizeof(q->msglen);
00585     /* Get the message body. */
00586     outptr = q->msg;
00587     len = q->msglen;
00588     if (q->out + q->msglen > q->depth) {
00589         /* Handle wraparound. */
00590         size_t len1 = q->depth - q->out;
00591         memcpy(outptr, q->data + q->out, len1);
00592         outptr += len1;
00593         len -= len1;
00594         q->nbytes -= len1;
00595         q->out = 0;
00596     }
00597     memcpy(outptr, q->data + q->out, len);
00598     q->nbytes -= len;
00599     q->out += len;
00600 
00601     /* Signal the condition variable. */
00602     pthread_cond_signal(&q->cond);
00603     /* Unlock the condition variable, we are done. */
00604     pthread_mutex_unlock(&q->mtx);
00605     if (out_len)
00606         *out_len = q->msglen;
00607     return q->msg;
00608 }
00609 
00610 sbevent_t *
00611 sbevent_init(void)
00612 {
00613     sbevent_t *evt;
00614     int rv;
00615 
00616     evt = ckd_calloc(1, sizeof(*evt));
00617     if ((rv = pthread_mutex_init(&evt->mtx, NULL)) != 0) {
00618         E_ERROR("Failed to initialize mutex: %d\n", rv);
00619         ckd_free(evt);
00620         return NULL;
00621     }
00622     if ((rv = pthread_cond_init(&evt->cond, NULL)) != 0) {
00623         E_ERROR_SYSTEM("Failed to initialize mutex: %d\n", rv);
00624         pthread_mutex_destroy(&evt->mtx);
00625         ckd_free(evt);
00626         return NULL;
00627     }
00628     return evt;
00629 }
00630 
00631 void
00632 sbevent_free(sbevent_t *evt)
00633 {
00634     pthread_mutex_destroy(&evt->mtx);
00635     pthread_cond_destroy(&evt->cond);
00636     ckd_free(evt);
00637 }
00638 
00639 int
00640 sbevent_signal(sbevent_t *evt)
00641 {
00642     int rv;
00643 
00644     pthread_mutex_lock(&evt->mtx);
00645     evt->signalled = TRUE;
00646     rv = pthread_cond_signal(&evt->cond);
00647     pthread_mutex_unlock(&evt->mtx);
00648     return rv;
00649 }
00650 
00651 int
00652 sbevent_wait(sbevent_t *evt, int sec, int nsec)
00653 {
00654     int rv = 0;
00655 
00656     /* Lock the mutex before we check its signalled state. */
00657     pthread_mutex_lock(&evt->mtx);
00658     /* If it's not signalled, then wait until it is. */
00659     if (!evt->signalled)
00660         rv = cond_timed_wait(&evt->cond, &evt->mtx, sec, nsec);
00661     /* Set its state to unsignalled if we were successful. */
00662     if (rv == 0)
00663         evt->signalled = FALSE;
00664     /* And unlock its mutex. */
00665     pthread_mutex_unlock(&evt->mtx);
00666 
00667     return rv;
00668 }
00669 
00670 sbmtx_t *
00671 sbmtx_init(void)
00672 {
00673     sbmtx_t *mtx;
00674 
00675     mtx = ckd_calloc(1, sizeof(*mtx));
00676     if (pthread_mutex_init(&mtx->mtx, NULL) != 0) {
00677         ckd_free(mtx);
00678         return NULL;
00679     }
00680     return mtx;
00681 }
00682 
00683 int
00684 sbmtx_trylock(sbmtx_t *mtx)
00685 {
00686     return pthread_mutex_trylock(&mtx->mtx);
00687 }
00688 
00689 int
00690 sbmtx_lock(sbmtx_t *mtx)
00691 {
00692     return pthread_mutex_lock(&mtx->mtx);
00693 }
00694 
00695 int
00696 sbmtx_unlock(sbmtx_t *mtx)
00697 {
00698     return pthread_mutex_unlock(&mtx->mtx);
00699 }
00700 
00701 void
00702 sbmtx_free(sbmtx_t *mtx)
00703 {
00704     pthread_mutex_destroy(&mtx->mtx);
00705     ckd_free(mtx);
00706 }
00707 #endif /* not WIN32 */
00708 
00709 cmd_ln_t *
00710 sbthread_config(sbthread_t *th)
00711 {
00712     return th->config;
00713 }
00714 
00715 void *
00716 sbthread_arg(sbthread_t *th)
00717 {
00718     return th->arg;
00719 }
00720 
00721 sbmsgq_t *
00722 sbthread_msgq(sbthread_t *th)
00723 {
00724     return th->msgq;
00725 }
00726 
00727 int
00728 sbthread_send(sbthread_t *th, size_t len, void const *data)
00729 {
00730     return sbmsgq_send(th->msgq, len, data);
00731 }
00732 
00733 void
00734 sbthread_free(sbthread_t *th)
00735 {
00736     sbthread_wait(th);
00737     sbmsgq_free(th->msgq);
00738     ckd_free(th);
00739 }