Sat Mar 10 01:54:25 2012

Asterisk developer's documentation


res_timing_pthread.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2008, Digium, Inc.
00005  *
00006  * Russell Bryant <russell@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*!
00020  * \file
00021  * \author Russell Bryant <russell@digium.com>
00022  *
00023  * \brief pthread timing interface
00024  */
00025 
00026 /*** MODULEINFO
00027    <support_level>extended</support_level>
00028  ***/
00029 
00030 #include "asterisk.h"
00031 
00032 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 349194 $");
00033 
00034 #include <math.h>
00035 #include <sys/select.h>
00036 
00037 #include "asterisk/module.h"
00038 #include "asterisk/timing.h"
00039 #include "asterisk/utils.h"
00040 #include "asterisk/astobj2.h"
00041 #include "asterisk/time.h"
00042 #include "asterisk/lock.h"
00043 #include "asterisk/poll-compat.h"
00044 
00045 static void *timing_funcs_handle;
00046 
00047 static int pthread_timer_open(void);
00048 static void pthread_timer_close(int handle);
00049 static int pthread_timer_set_rate(int handle, unsigned int rate);
00050 static void pthread_timer_ack(int handle, unsigned int quantity);
00051 static int pthread_timer_enable_continuous(int handle);
00052 static int pthread_timer_disable_continuous(int handle);
00053 static enum ast_timer_event pthread_timer_get_event(int handle);
00054 static unsigned int pthread_timer_get_max_rate(int handle);
00055 
00056 static struct ast_timing_interface pthread_timing = {
00057    .name = "pthread",
00058    .priority = 0, /* use this as a last resort */
00059    .timer_open = pthread_timer_open,
00060    .timer_close = pthread_timer_close,
00061    .timer_set_rate = pthread_timer_set_rate,
00062    .timer_ack = pthread_timer_ack,
00063    .timer_enable_continuous = pthread_timer_enable_continuous,
00064    .timer_disable_continuous = pthread_timer_disable_continuous,
00065    .timer_get_event = pthread_timer_get_event,
00066    .timer_get_max_rate = pthread_timer_get_max_rate,
00067 };
00068 
00069 /* 1 tick / 10 ms */
00070 #define MAX_RATE 100
00071 
00072 static struct ao2_container *pthread_timers;
00073 #define PTHREAD_TIMER_BUCKETS 563
00074 
00075 enum {
00076    PIPE_READ =  0,
00077    PIPE_WRITE = 1
00078 };
00079 
00080 enum pthread_timer_state {
00081    TIMER_STATE_IDLE,
00082    TIMER_STATE_TICKING,
00083 };
00084 
00085 struct pthread_timer {
00086    int pipe[2];
00087    enum pthread_timer_state state;
00088    unsigned int rate;
00089    /*! Interval in ms for current rate */
00090    unsigned int interval;
00091    unsigned int tick_count;
00092    unsigned int pending_ticks;
00093    struct timeval start;
00094    unsigned int continuous:1;
00095 };
00096 
00097 static void pthread_timer_destructor(void *obj);
00098 static struct pthread_timer *find_timer(int handle, int unlinkobj);
00099 static void write_byte(struct pthread_timer *timer);
00100 static void read_pipe(struct pthread_timer *timer, unsigned int num);
00101 
00102 /*!
00103  * \brief Data for the timing thread
00104  */
00105 static struct {
00106    pthread_t thread;
00107    ast_mutex_t lock;
00108    ast_cond_t cond;
00109    unsigned int stop:1;
00110 } timing_thread;
00111 
00112 static int pthread_timer_open(void)
00113 {
00114    struct pthread_timer *timer;
00115    int fd;
00116 
00117    if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
00118       errno = ENOMEM;
00119       return -1;
00120    }
00121 
00122    timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
00123    timer->state = TIMER_STATE_IDLE;
00124 
00125    if (pipe(timer->pipe)) {
00126       ao2_ref(timer, -1);
00127       return -1;
00128    }
00129 
00130    ao2_lock(pthread_timers);
00131    if (!ao2_container_count(pthread_timers)) {
00132       ast_mutex_lock(&timing_thread.lock);
00133       ast_cond_signal(&timing_thread.cond);
00134       ast_mutex_unlock(&timing_thread.lock);
00135    }
00136    ao2_link(pthread_timers, timer);
00137    ao2_unlock(pthread_timers);
00138 
00139    fd = timer->pipe[PIPE_READ];
00140 
00141    ao2_ref(timer, -1);
00142 
00143    return fd;
00144 }
00145 
00146 static void pthread_timer_close(int handle)
00147 {
00148    struct pthread_timer *timer;
00149 
00150    if (!(timer = find_timer(handle, 1))) {
00151       return;
00152    }
00153 
00154    ao2_ref(timer, -1);
00155 }
00156 
00157 static int pthread_timer_set_rate(int handle, unsigned int rate)
00158 {
00159    struct pthread_timer *timer;
00160 
00161    if (!(timer = find_timer(handle, 0))) {
00162       errno = EINVAL;
00163       return -1;
00164    }
00165 
00166    if (rate > MAX_RATE) {
00167       ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
00168             "max rate of %d / sec\n", MAX_RATE);
00169       errno = EINVAL;
00170       return -1;
00171    }
00172 
00173    ao2_lock(timer);
00174 
00175    if ((timer->rate = rate)) {
00176       timer->interval = roundf(1000.0 / ((float) rate));
00177       timer->start = ast_tvnow();
00178       timer->state = TIMER_STATE_TICKING;
00179    } else {
00180       timer->interval = 0;
00181       timer->start = ast_tv(0, 0);
00182       timer->state = TIMER_STATE_IDLE;
00183    }
00184    timer->tick_count = 0;
00185 
00186    ao2_unlock(timer);
00187 
00188    ao2_ref(timer, -1);
00189 
00190    return 0;
00191 }
00192 
00193 static void pthread_timer_ack(int handle, unsigned int quantity)
00194 {
00195    struct pthread_timer *timer;
00196 
00197    ast_assert(quantity > 0);
00198 
00199    if (!(timer = find_timer(handle, 0))) {
00200       return;
00201    }
00202 
00203    ao2_lock(timer);
00204    read_pipe(timer, quantity);
00205    ao2_unlock(timer);
00206 
00207    ao2_ref(timer, -1);
00208 }
00209 
00210 static int pthread_timer_enable_continuous(int handle)
00211 {
00212    struct pthread_timer *timer;
00213 
00214    if (!(timer = find_timer(handle, 0))) {
00215       errno = EINVAL;
00216       return -1;
00217    }
00218 
00219    ao2_lock(timer);
00220    if (!timer->continuous) {
00221       timer->continuous = 1;
00222       write_byte(timer);
00223    }
00224    ao2_unlock(timer);
00225 
00226    ao2_ref(timer, -1);
00227 
00228    return 0;
00229 }
00230 
00231 static int pthread_timer_disable_continuous(int handle)
00232 {
00233    struct pthread_timer *timer;
00234 
00235    if (!(timer = find_timer(handle, 0))) {
00236       errno = EINVAL;
00237       return -1;
00238    }
00239 
00240    ao2_lock(timer);
00241    if (timer->continuous) {
00242       timer->continuous = 0;
00243       read_pipe(timer, 1);
00244    }
00245    ao2_unlock(timer);
00246 
00247    ao2_ref(timer, -1);
00248 
00249    return 0;
00250 }
00251 
00252 static enum ast_timer_event pthread_timer_get_event(int handle)
00253 {
00254    struct pthread_timer *timer;
00255    enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
00256 
00257    if (!(timer = find_timer(handle, 0))) {
00258       return res;
00259    }
00260 
00261    ao2_lock(timer);
00262    if (timer->continuous && timer->pending_ticks == 1) {
00263       res = AST_TIMING_EVENT_CONTINUOUS;
00264    }
00265    ao2_unlock(timer);
00266 
00267    ao2_ref(timer, -1);
00268 
00269    return res;
00270 }
00271 
00272 static unsigned int pthread_timer_get_max_rate(int handle)
00273 {
00274    return MAX_RATE;
00275 }
00276 
00277 static struct pthread_timer *find_timer(int handle, int unlinkobj)
00278 {
00279    struct pthread_timer *timer;
00280    struct pthread_timer tmp_timer;
00281    int flags = OBJ_POINTER;
00282 
00283    tmp_timer.pipe[PIPE_READ] = handle;
00284 
00285    if (unlinkobj) {
00286       flags |= OBJ_UNLINK;
00287    }
00288 
00289    if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
00290       ast_assert(timer != NULL);
00291       return NULL;
00292    }
00293 
00294    return timer;
00295 }
00296 
00297 static void pthread_timer_destructor(void *obj)
00298 {
00299    struct pthread_timer *timer = obj;
00300 
00301    if (timer->pipe[PIPE_READ] > -1) {
00302       close(timer->pipe[PIPE_READ]);
00303       timer->pipe[PIPE_READ] = -1;
00304    }
00305 
00306    if (timer->pipe[PIPE_WRITE] > -1) {
00307       close(timer->pipe[PIPE_WRITE]);
00308       timer->pipe[PIPE_WRITE] = -1;
00309    }
00310 }
00311 
00312 /*!
00313  * \note only PIPE_READ is guaranteed valid
00314  */
00315 static int pthread_timer_hash(const void *obj, const int flags)
00316 {
00317    const struct pthread_timer *timer = obj;
00318 
00319    return timer->pipe[PIPE_READ];
00320 }
00321 
00322 /*!
00323  * \note only PIPE_READ is guaranteed valid
00324  */
00325 static int pthread_timer_cmp(void *obj, void *arg, int flags)
00326 {
00327    struct pthread_timer *timer1 = obj, *timer2 = arg;
00328 
00329    return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
00330 }
00331 
00332 /*!
00333  * \retval 0 no timer tick needed
00334  * \retval non-zero write to the timing pipe needed
00335  */
00336 static int check_timer(struct pthread_timer *timer)
00337 {
00338    struct timeval now;
00339 
00340    if (timer->state == TIMER_STATE_IDLE) {
00341       return 0;
00342    }
00343 
00344    now = ast_tvnow();
00345 
00346    if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
00347       timer->tick_count++;
00348       if (!timer->tick_count) {
00349          /* Handle overflow. */
00350          timer->start = now;
00351       }
00352       return 1;
00353    }
00354 
00355    return 0;
00356 }
00357 
00358 /*!
00359  * \internal
00360  * \pre timer is locked
00361  */
00362 static void read_pipe(struct pthread_timer *timer, unsigned int quantity)
00363 {
00364    int rd_fd = timer->pipe[PIPE_READ];
00365    int pending_ticks = timer->pending_ticks;
00366 
00367    ast_assert(quantity);
00368 
00369    if (timer->continuous && pending_ticks) {
00370       pending_ticks--;
00371    }
00372 
00373    if (quantity > pending_ticks) {
00374       quantity = pending_ticks;
00375    }
00376 
00377    if (!quantity) {
00378       return;
00379    }
00380 
00381    do {
00382       unsigned char buf[1024];
00383       ssize_t res;
00384       struct pollfd pfd = {
00385          .fd = rd_fd,
00386          .events = POLLIN,
00387       };
00388 
00389       if (ast_poll(&pfd, 1, 0) != 1) {
00390          ast_debug(1, "Reading not available on timing pipe, "
00391                "quantity: %u\n", quantity);
00392          break;
00393       }
00394 
00395       res = read(rd_fd, buf,
00396          (quantity < sizeof(buf)) ? quantity : sizeof(buf));
00397 
00398       if (res == -1) {
00399          if (errno == EAGAIN) {
00400             continue;
00401          }
00402          ast_log(LOG_ERROR, "read failed on timing pipe: %s\n",
00403                strerror(errno));
00404          break;
00405       }
00406 
00407       quantity -= res;
00408       timer->pending_ticks -= res;
00409    } while (quantity);
00410 }
00411 
00412 /*!
00413  * \internal
00414  * \pre timer is locked
00415  */
00416 static void write_byte(struct pthread_timer *timer)
00417 {
00418    ssize_t res;
00419    unsigned char x = 42;
00420 
00421    do {
00422       res = write(timer->pipe[PIPE_WRITE], &x, 1);
00423    } while (res == -1 && errno == EAGAIN);
00424 
00425    if (res == -1) {
00426       ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
00427             strerror(errno));
00428    } else {
00429       timer->pending_ticks++;
00430    }
00431 }
00432 
00433 static int run_timer(void *obj, void *arg, int flags)
00434 {
00435    struct pthread_timer *timer = obj;
00436 
00437    if (timer->state == TIMER_STATE_IDLE) {
00438       return 0;
00439    }
00440 
00441    ao2_lock(timer);
00442    if (check_timer(timer)) {
00443       write_byte(timer);
00444    }
00445    ao2_unlock(timer);
00446 
00447    return 0;
00448 }
00449 
00450 static void *do_timing(void *arg)
00451 {
00452    struct timeval next_wakeup = ast_tvnow();
00453 
00454    while (!timing_thread.stop) {
00455       struct timespec ts = { 0, };
00456 
00457       ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
00458 
00459       next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
00460 
00461       ts.tv_sec = next_wakeup.tv_sec;
00462       ts.tv_nsec = next_wakeup.tv_usec * 1000;
00463 
00464       ast_mutex_lock(&timing_thread.lock);
00465       if (!timing_thread.stop) {
00466          if (ao2_container_count(pthread_timers)) {
00467             ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
00468          } else {
00469             ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
00470          }
00471       }
00472       ast_mutex_unlock(&timing_thread.lock);
00473    }
00474 
00475    return NULL;
00476 }
00477 
00478 static int init_timing_thread(void)
00479 {
00480    ast_mutex_init(&timing_thread.lock);
00481    ast_cond_init(&timing_thread.cond, NULL);
00482 
00483    if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
00484       ast_log(LOG_ERROR, "Unable to start timing thread.\n");
00485       return -1;
00486    }
00487 
00488    return 0;
00489 }
00490 
00491 static int load_module(void)
00492 {
00493    if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
00494       pthread_timer_hash, pthread_timer_cmp))) {
00495       return AST_MODULE_LOAD_DECLINE;
00496    }
00497 
00498    if (init_timing_thread()) {
00499       ao2_ref(pthread_timers, -1);
00500       pthread_timers = NULL;
00501       return AST_MODULE_LOAD_DECLINE;
00502    }
00503 
00504    return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
00505       AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
00506 }
00507 
00508 static int unload_module(void)
00509 {
00510    int res;
00511 
00512    ast_mutex_lock(&timing_thread.lock);
00513    timing_thread.stop = 1;
00514    ast_cond_signal(&timing_thread.cond);
00515    ast_mutex_unlock(&timing_thread.lock);
00516    pthread_join(timing_thread.thread, NULL);
00517 
00518    if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
00519       ao2_ref(pthread_timers, -1);
00520       pthread_timers = NULL;
00521    }
00522 
00523    return res;
00524 }
00525 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
00526       .load = load_module,
00527       .unload = unload_module,
00528       .load_pri = AST_MODPRI_TIMING,
00529       );

Generated on Sat Mar 10 01:54:25 2012 for Asterisk - The Open Source Telephony Project by  doxygen 1.4.7