Tue Aug 20 16:34:38 2013

Asterisk developer's documentation


res_timing_pthread.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2008, Digium, Inc.
00005  *
00006  * Russell Bryant <russell@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*!
00020  * \file
00021  * \author Russell Bryant <russell@digium.com>
00022  *
00023  * \brief pthread timing interface
00024  */
00025 
00026 /*** MODULEINFO
00027    <support_level>extended</support_level>
00028  ***/
00029 
00030 #include "asterisk.h"
00031 
00032 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 388195 $");
00033 
00034 #include <stdbool.h>
00035 #include <math.h>
00036 #include <unistd.h>
00037 #include <fcntl.h>
00038 
00039 #include "asterisk/module.h"
00040 #include "asterisk/timing.h"
00041 #include "asterisk/utils.h"
00042 #include "asterisk/astobj2.h"
00043 #include "asterisk/time.h"
00044 #include "asterisk/lock.h"
00045 
00046 static void *timing_funcs_handle;
00047 
00048 static int pthread_timer_open(void);
00049 static void pthread_timer_close(int handle);
00050 static int pthread_timer_set_rate(int handle, unsigned int rate);
00051 static int pthread_timer_ack(int handle, unsigned int quantity);
00052 static int pthread_timer_enable_continuous(int handle);
00053 static int pthread_timer_disable_continuous(int handle);
00054 static enum ast_timer_event pthread_timer_get_event(int handle);
00055 static unsigned int pthread_timer_get_max_rate(int handle);
00056 
00057 static struct ast_timing_interface pthread_timing = {
00058    .name = "pthread",
00059    .priority = 0, /* use this as a last resort */
00060    .timer_open = pthread_timer_open,
00061    .timer_close = pthread_timer_close,
00062    .timer_set_rate = pthread_timer_set_rate,
00063    .timer_ack = pthread_timer_ack,
00064    .timer_enable_continuous = pthread_timer_enable_continuous,
00065    .timer_disable_continuous = pthread_timer_disable_continuous,
00066    .timer_get_event = pthread_timer_get_event,
00067    .timer_get_max_rate = pthread_timer_get_max_rate,
00068 };
00069 
00070 /* 1 tick / 10 ms */
00071 #define MAX_RATE 100
00072 
00073 static struct ao2_container *pthread_timers;
00074 #define PTHREAD_TIMER_BUCKETS 563
00075 
00076 enum {
00077    PIPE_READ =  0,
00078    PIPE_WRITE = 1
00079 };
00080 
00081 enum pthread_timer_state {
00082    TIMER_STATE_IDLE,
00083    TIMER_STATE_TICKING,
00084 };
00085 
00086 struct pthread_timer {
00087    int pipe[2];
00088    enum pthread_timer_state state;
00089    unsigned int rate;
00090    /*! Interval in ms for current rate */
00091    unsigned int interval;
00092    unsigned int tick_count;
00093    unsigned int pending_ticks;
00094    struct timeval start;
00095    bool continuous:1;
00096    bool pipe_signaled:1;
00097 };
00098 
00099 static void pthread_timer_destructor(void *obj);
00100 static struct pthread_timer *find_timer(int handle, int unlinkobj);
00101 static void signal_pipe(struct pthread_timer *timer);
00102 static void unsignal_pipe(struct pthread_timer *timer);
00103 static void ack_ticks(struct pthread_timer *timer, unsigned int num);
00104 
00105 /*!
00106  * \brief Data for the timing thread
00107  */
00108 static struct {
00109    pthread_t thread;
00110    ast_mutex_t lock;
00111    ast_cond_t cond;
00112    unsigned int stop:1;
00113 } timing_thread;
00114 
00115 static int pthread_timer_open(void)
00116 {
00117    struct pthread_timer *timer;
00118    int fd;
00119    int i;
00120 
00121    if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
00122       errno = ENOMEM;
00123       return -1;
00124    }
00125 
00126    timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
00127    timer->state = TIMER_STATE_IDLE;
00128 
00129    if (pipe(timer->pipe)) {
00130       ao2_ref(timer, -1);
00131       return -1;
00132    }
00133 
00134    for (i = 0; i < ARRAY_LEN(timer->pipe); ++i) {
00135       int flags = fcntl(timer->pipe[i], F_GETFL);
00136       flags |= O_NONBLOCK;
00137       fcntl(timer->pipe[i], F_SETFL, flags);
00138    }
00139    
00140    ao2_lock(pthread_timers);
00141    if (!ao2_container_count(pthread_timers)) {
00142       ast_mutex_lock(&timing_thread.lock);
00143       ast_cond_signal(&timing_thread.cond);
00144       ast_mutex_unlock(&timing_thread.lock);
00145    }
00146    ao2_link(pthread_timers, timer);
00147    ao2_unlock(pthread_timers);
00148 
00149    fd = timer->pipe[PIPE_READ];
00150 
00151    ao2_ref(timer, -1);
00152 
00153    return fd;
00154 }
00155 
00156 static void pthread_timer_close(int handle)
00157 {
00158    struct pthread_timer *timer;
00159 
00160    if (!(timer = find_timer(handle, 1))) {
00161       return;
00162    }
00163 
00164    ao2_ref(timer, -1);
00165 }
00166 
00167 static int pthread_timer_set_rate(int handle, unsigned int rate)
00168 {
00169    struct pthread_timer *timer;
00170 
00171    if (!(timer = find_timer(handle, 0))) {
00172       errno = EINVAL;
00173       return -1;
00174    }
00175 
00176    if (rate > MAX_RATE) {
00177       ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
00178             "max rate of %d / sec\n", MAX_RATE);
00179       errno = EINVAL;
00180       return -1;
00181    }
00182 
00183    ao2_lock(timer);
00184 
00185    if ((timer->rate = rate)) {
00186       timer->interval = roundf(1000.0 / ((float) rate));
00187       timer->start = ast_tvnow();
00188       timer->state = TIMER_STATE_TICKING;
00189    } else {
00190       timer->interval = 0;
00191       timer->start = ast_tv(0, 0);
00192       timer->state = TIMER_STATE_IDLE;
00193    }
00194    timer->tick_count = 0;
00195 
00196    ao2_unlock(timer);
00197 
00198    ao2_ref(timer, -1);
00199 
00200    return 0;
00201 }
00202 
00203 static int pthread_timer_ack(int handle, unsigned int quantity)
00204 {
00205    struct pthread_timer *timer;
00206 
00207    ast_assert(quantity > 0);
00208 
00209    if (!(timer = find_timer(handle, 0))) {
00210       return -1;
00211    }
00212 
00213    ao2_lock(timer);
00214    ack_ticks(timer, quantity);
00215    ao2_unlock(timer);
00216 
00217    ao2_ref(timer, -1);
00218 
00219    return 0;
00220 }
00221 
00222 static int pthread_timer_enable_continuous(int handle)
00223 {
00224    struct pthread_timer *timer;
00225 
00226    if (!(timer = find_timer(handle, 0))) {
00227       errno = EINVAL;
00228       return -1;
00229    }
00230 
00231    ao2_lock(timer);
00232    if (!timer->continuous) {
00233       timer->continuous = true;
00234       signal_pipe(timer);
00235    }
00236    ao2_unlock(timer);
00237 
00238    ao2_ref(timer, -1);
00239 
00240    return 0;
00241 }
00242 
00243 static int pthread_timer_disable_continuous(int handle)
00244 {
00245    struct pthread_timer *timer;
00246 
00247    if (!(timer = find_timer(handle, 0))) {
00248       errno = EINVAL;
00249       return -1;
00250    }
00251 
00252    ao2_lock(timer);
00253    if (timer->continuous) {
00254       timer->continuous = false;
00255       unsignal_pipe(timer);
00256    }
00257    ao2_unlock(timer);
00258 
00259    ao2_ref(timer, -1);
00260 
00261    return 0;
00262 }
00263 
00264 static enum ast_timer_event pthread_timer_get_event(int handle)
00265 {
00266    struct pthread_timer *timer;
00267    enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
00268 
00269    if (!(timer = find_timer(handle, 0))) {
00270       return res;
00271    }
00272 
00273    ao2_lock(timer);
00274    if (timer->continuous) {
00275       res = AST_TIMING_EVENT_CONTINUOUS;
00276    }
00277    ao2_unlock(timer);
00278 
00279    ao2_ref(timer, -1);
00280 
00281    return res;
00282 }
00283 
00284 static unsigned int pthread_timer_get_max_rate(int handle)
00285 {
00286    return MAX_RATE;
00287 }
00288 
00289 static struct pthread_timer *find_timer(int handle, int unlinkobj)
00290 {
00291    struct pthread_timer *timer;
00292    struct pthread_timer tmp_timer;
00293    int flags = OBJ_POINTER;
00294 
00295    tmp_timer.pipe[PIPE_READ] = handle;
00296 
00297    if (unlinkobj) {
00298       flags |= OBJ_UNLINK;
00299    }
00300 
00301    if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
00302       ast_assert(timer != NULL);
00303       return NULL;
00304    }
00305 
00306    return timer;
00307 }
00308 
00309 static void pthread_timer_destructor(void *obj)
00310 {
00311    struct pthread_timer *timer = obj;
00312 
00313    if (timer->pipe[PIPE_READ] > -1) {
00314       close(timer->pipe[PIPE_READ]);
00315       timer->pipe[PIPE_READ] = -1;
00316    }
00317 
00318    if (timer->pipe[PIPE_WRITE] > -1) {
00319       close(timer->pipe[PIPE_WRITE]);
00320       timer->pipe[PIPE_WRITE] = -1;
00321    }
00322 }
00323 
00324 /*!
00325  * \note only PIPE_READ is guaranteed valid
00326  */
00327 static int pthread_timer_hash(const void *obj, const int flags)
00328 {
00329    const struct pthread_timer *timer = obj;
00330 
00331    return timer->pipe[PIPE_READ];
00332 }
00333 
00334 /*!
00335  * \note only PIPE_READ is guaranteed valid
00336  */
00337 static int pthread_timer_cmp(void *obj, void *arg, int flags)
00338 {
00339    struct pthread_timer *timer1 = obj, *timer2 = arg;
00340 
00341    return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
00342 }
00343 
00344 /*!
00345  * \retval 0 no timer tick needed
00346  * \retval non-zero write to the timing pipe needed
00347  */
00348 static int check_timer(struct pthread_timer *timer)
00349 {
00350    struct timeval now;
00351 
00352    if (timer->state == TIMER_STATE_IDLE) {
00353       return 0;
00354    }
00355 
00356    now = ast_tvnow();
00357 
00358    if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
00359       timer->tick_count++;
00360       if (!timer->tick_count) {
00361          /* Handle overflow. */
00362          timer->start = now;
00363       }
00364       return 1;
00365    }
00366 
00367    return 0;
00368 }
00369 
00370 /*!
00371  * \internal
00372  * \pre timer is locked
00373  */
00374 static void ack_ticks(struct pthread_timer *timer, unsigned int quantity)
00375 {
00376    int pending_ticks = timer->pending_ticks;
00377 
00378    ast_assert(quantity);
00379 
00380    if (quantity > pending_ticks) {
00381       quantity = pending_ticks;
00382    }
00383 
00384    if (!quantity) {
00385       return;
00386    }
00387 
00388    timer->pending_ticks -= quantity;
00389 
00390    if ((0 == timer->pending_ticks) && !timer->continuous) {
00391       unsignal_pipe(timer);
00392    }
00393 }
00394 
00395 /*!
00396  * \internal
00397  * \pre timer is locked
00398  */
00399 static void signal_pipe(struct pthread_timer *timer)
00400 {
00401    ssize_t res;
00402    unsigned char x = 42;
00403 
00404    if (timer->pipe_signaled) {
00405       return;
00406    }
00407 
00408    res = write(timer->pipe[PIPE_WRITE], &x, 1);
00409    if (-1 == res) {
00410       ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
00411             strerror(errno));
00412    } else {
00413       timer->pipe_signaled = true;
00414    }
00415 }
00416 
00417 /*!
00418  * \internal
00419  * \pre timer is locked
00420  */
00421 static void unsignal_pipe(struct pthread_timer *timer)
00422 {
00423    ssize_t res;
00424    unsigned long buffer;
00425 
00426    if (!timer->pipe_signaled) {
00427       return;
00428    }
00429 
00430    res = read(timer->pipe[PIPE_READ], &buffer, sizeof(buffer));
00431    if (-1 == res) {
00432       ast_log(LOG_ERROR, "Error reading from pipe: %s\n",
00433             strerror(errno));
00434    } else {
00435       timer->pipe_signaled = false;
00436    }
00437 }
00438 
00439 static int run_timer(void *obj, void *arg, int flags)
00440 {
00441    struct pthread_timer *timer = obj;
00442 
00443    if (timer->state == TIMER_STATE_IDLE) {
00444       return 0;
00445    }
00446 
00447    ao2_lock(timer);
00448    if (check_timer(timer)) {
00449       timer->pending_ticks++;
00450       signal_pipe(timer);
00451    }
00452    ao2_unlock(timer);
00453 
00454    return 0;
00455 }
00456 
00457 static void *do_timing(void *arg)
00458 {
00459    struct timeval next_wakeup = ast_tvnow();
00460 
00461    while (!timing_thread.stop) {
00462       struct timespec ts = { 0, };
00463 
00464       ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
00465 
00466       next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
00467 
00468       ts.tv_sec = next_wakeup.tv_sec;
00469       ts.tv_nsec = next_wakeup.tv_usec * 1000;
00470 
00471       ast_mutex_lock(&timing_thread.lock);
00472       if (!timing_thread.stop) {
00473          if (ao2_container_count(pthread_timers)) {
00474             ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
00475          } else {
00476             ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
00477          }
00478       }
00479       ast_mutex_unlock(&timing_thread.lock);
00480    }
00481 
00482    return NULL;
00483 }
00484 
00485 static int init_timing_thread(void)
00486 {
00487    ast_mutex_init(&timing_thread.lock);
00488    ast_cond_init(&timing_thread.cond, NULL);
00489 
00490    if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
00491       ast_log(LOG_ERROR, "Unable to start timing thread.\n");
00492       return -1;
00493    }
00494 
00495    return 0;
00496 }
00497 
00498 static int load_module(void)
00499 {
00500    if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
00501       pthread_timer_hash, pthread_timer_cmp))) {
00502       return AST_MODULE_LOAD_DECLINE;
00503    }
00504 
00505    if (init_timing_thread()) {
00506       ao2_ref(pthread_timers, -1);
00507       pthread_timers = NULL;
00508       return AST_MODULE_LOAD_DECLINE;
00509    }
00510 
00511    return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
00512       AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
00513 }
00514 
00515 static int unload_module(void)
00516 {
00517    int res;
00518 
00519    ast_mutex_lock(&timing_thread.lock);
00520    timing_thread.stop = 1;
00521    ast_cond_signal(&timing_thread.cond);
00522    ast_mutex_unlock(&timing_thread.lock);
00523    pthread_join(timing_thread.thread, NULL);
00524 
00525    if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
00526       ao2_ref(pthread_timers, -1);
00527       pthread_timers = NULL;
00528    }
00529 
00530    return res;
00531 }
00532 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
00533       .load = load_module,
00534       .unload = unload_module,
00535       .load_pri = AST_MODPRI_TIMING,
00536       );

Generated on 20 Aug 2013 for Asterisk - The Open Source Telephony Project by  doxygen 1.6.1