Fri Jun 19 12:09:52 2009

Asterisk developer's documentation


res_timing_pthread.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2008, Digium, Inc.
00005  *
00006  * Russell Bryant <russell@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*! 
00020  * \file
00021  * \author Russell Bryant <russell@digium.com>
00022  *
00023  * \brief pthread timing interface 
00024  */
00025 
00026 #include "asterisk.h"
00027 
00028 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 176675 $");
00029 
00030 #include <math.h>
00031 #include <sys/select.h>
00032 
00033 #include "asterisk/module.h"
00034 #include "asterisk/timing.h"
00035 #include "asterisk/utils.h"
00036 #include "asterisk/astobj2.h"
00037 #include "asterisk/time.h"
00038 #include "asterisk/lock.h"
00039 
/*!
 * \brief Handle returned by ast_register_timing_interface() in load_module().
 *
 * Kept so that unload_module() can hand it back to
 * ast_unregister_timing_interface().
 */
static void *timing_funcs_handle;
00041 
/*
 * Forward declarations of the timing callbacks that are plugged into the
 * pthread_timing interface table below.  In every callback, 'handle' is the
 * value returned by pthread_timer_open(), i.e. the read end of the timer's
 * pipe.
 */

/* Timer lifetime */
static int pthread_timer_open(void);
static void pthread_timer_close(int handle);

/* Periodic ticking */
static int pthread_timer_set_rate(int handle, unsigned int rate);
static void pthread_timer_ack(int handle, unsigned int quantity);

/* Continuous mode */
static int pthread_timer_enable_continuous(int handle);
static int pthread_timer_disable_continuous(int handle);

/* Queries */
static enum ast_timer_event pthread_timer_get_event(int handle);
static unsigned int pthread_timer_get_max_rate(int handle);
00050 
00051 static struct ast_timing_interface pthread_timing = {
00052    .name = "pthread",
00053    .priority = 0, /* use this as a last resort */
00054    .timer_open = pthread_timer_open,
00055    .timer_close = pthread_timer_close,
00056    .timer_set_rate = pthread_timer_set_rate,
00057    .timer_ack = pthread_timer_ack,
00058    .timer_enable_continuous = pthread_timer_enable_continuous,
00059    .timer_disable_continuous = pthread_timer_disable_continuous,
00060    .timer_get_event = pthread_timer_get_event,
00061    .timer_get_max_rate = pthread_timer_get_max_rate,
00062 };
00063 
/*!
 * Highest tick rate accepted by pthread_timer_set_rate(), in ticks per
 * second: 1 tick / 10 ms.
 */
#define MAX_RATE 100
00066 
/*!
 * Container holding every open timer.  It is created in load_module() with
 * pthread_timer_hash()/pthread_timer_cmp(), which both key on the read end
 * of the timer's pipe.
 */
static struct ao2_container *pthread_timers;

/*! Bucket count handed to ao2_container_alloc() for pthread_timers. */
#define PTHREAD_TIMER_BUCKETS 563
00069 
/*! Indexes into pthread_timer.pipe[], in the order filled in by pipe(). */
enum {
   /*! Read end; doubles as the timer handle given to callers. */
   PIPE_READ = 0,
   /*! Write end; written to by write_byte(). */
   PIPE_WRITE = 1
};
00074 
/*! Operating mode of a single timer. */
enum pthread_timer_state {
   /*! Rate is zero: check_timer() never requests a tick. */
   TIMER_STATE_IDLE,
   /*! A non-zero rate is set: ticks are written at the configured interval. */
   TIMER_STATE_TICKING,
   /*! Continuous mode: a byte is left in the pipe and acks do not drain it. */
   TIMER_STATE_CONTINUOUS,
};
00080 
00081 struct pthread_timer {
00082    int pipe[2];
00083    enum pthread_timer_state state;
00084    unsigned int rate;
00085    /*! Interval in ms for current rate */
00086    unsigned int interval;
00087    unsigned int tick_count;
00088    struct timeval start;
00089 };
00090 
/*
 * Forward declarations of internal helpers defined further down.
 * The read_pipe() parameter is named 'quantity' here to agree with its
 * definition and with pthread_timer_ack().
 */
static void pthread_timer_destructor(void *obj);
static struct pthread_timer *find_timer(int handle, int unlinkobj);
static void write_byte(int wr_fd);
static void read_pipe(int rd_fd, unsigned int quantity, int clear);
00095 
00096 /*!
00097  * \brief Data for the timing thread
00098  */
00099 static struct {
00100    pthread_t thread;
00101    ast_mutex_t lock;
00102    ast_cond_t cond;
00103    unsigned int stop:1;
00104 } timing_thread;
00105 
00106 static int pthread_timer_open(void)
00107 {
00108    struct pthread_timer *timer;
00109    int fd;
00110 
00111    if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
00112       errno = ENOMEM;
00113       return -1;
00114    }
00115 
00116    timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
00117    timer->state = TIMER_STATE_IDLE;
00118 
00119    if (pipe(timer->pipe)) {
00120       ao2_ref(timer, -1);
00121       return -1;
00122    }
00123 
00124    ao2_lock(pthread_timers);
00125    if (!ao2_container_count(pthread_timers)) {
00126       ast_mutex_lock(&timing_thread.lock);
00127       ast_cond_signal(&timing_thread.cond);
00128       ast_mutex_unlock(&timing_thread.lock);
00129    }
00130    ao2_link(pthread_timers, timer);
00131    ao2_unlock(pthread_timers);
00132 
00133    fd = timer->pipe[PIPE_READ];
00134 
00135    ao2_ref(timer, -1);
00136 
00137    return fd;
00138 }
00139 
/*!
 * \brief Close a timer.
 *
 * Looks the timer up with the unlink option, so it is removed from
 * pthread_timers, and then releases the reference returned by the lookup.
 *
 * \param handle value returned by pthread_timer_open()
 */
static void pthread_timer_close(int handle)
{
   struct pthread_timer *timer = find_timer(handle, 1);

   if (timer) {
      ao2_ref(timer, -1);
   }
}
00150 
00151 static void set_state(struct pthread_timer *timer)
00152 {
00153    unsigned int rate = timer->rate;
00154 
00155    if (rate) {
00156       timer->state = TIMER_STATE_TICKING;
00157       timer->interval = roundf(1000.0 / ((float) rate));
00158       timer->start = ast_tvnow();
00159    } else {
00160       timer->state = TIMER_STATE_IDLE;
00161       timer->interval = 0;
00162       timer->start = ast_tv(0, 0);
00163    }
00164 
00165    timer->tick_count = 0;
00166 }
00167 
00168 static int pthread_timer_set_rate(int handle, unsigned int rate)
00169 {
00170    struct pthread_timer *timer;
00171 
00172    if (!(timer = find_timer(handle, 0))) {
00173       errno = EINVAL;
00174       return -1;
00175    }
00176 
00177    if (rate > MAX_RATE) {
00178       ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n",
00179          MAX_RATE);
00180       errno = EINVAL;
00181       return -1;
00182    }
00183 
00184    ao2_lock(timer);
00185    timer->rate = rate;
00186    if (timer->state != TIMER_STATE_CONTINUOUS) {
00187       set_state(timer);
00188    }
00189    
00190    ao2_unlock(timer);
00191 
00192    ao2_ref(timer, -1);
00193 
00194    return 0;
00195 }
00196 
00197 static void pthread_timer_ack(int handle, unsigned int quantity)
00198 {
00199    struct pthread_timer *timer;
00200 
00201    ast_assert(quantity > 0);
00202 
00203    if (!(timer = find_timer(handle, 0))) {
00204       return;
00205    }
00206 
00207    if (timer->state == TIMER_STATE_CONTINUOUS) {
00208       /* Leave the pipe alone, please! */
00209       return;
00210    }
00211 
00212    read_pipe(timer->pipe[PIPE_READ], quantity, 0);
00213 
00214    ao2_ref(timer, -1);
00215 }
00216 
00217 static int pthread_timer_enable_continuous(int handle)
00218 {
00219    struct pthread_timer *timer;
00220 
00221    if (!(timer = find_timer(handle, 0))) {
00222       errno = EINVAL;
00223       return -1;
00224    }
00225 
00226    ao2_lock(timer);
00227    timer->state = TIMER_STATE_CONTINUOUS;
00228    write_byte(timer->pipe[PIPE_WRITE]);
00229    ao2_unlock(timer);
00230 
00231    ao2_ref(timer, -1);
00232 
00233    return 0;
00234 }
00235 
00236 static int pthread_timer_disable_continuous(int handle)
00237 {
00238    struct pthread_timer *timer;
00239 
00240    if (!(timer = find_timer(handle, 0))) {
00241       errno = EINVAL;
00242       return -1;
00243    }
00244 
00245    ao2_lock(timer);
00246    set_state(timer);
00247    read_pipe(timer->pipe[PIPE_READ], 0, 1);
00248    ao2_unlock(timer);
00249 
00250    ao2_ref(timer, -1);
00251 
00252    return 0;
00253 }
00254 
00255 static enum ast_timer_event pthread_timer_get_event(int handle)
00256 {
00257    struct pthread_timer *timer;
00258    enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
00259 
00260    if (!(timer = find_timer(handle, 0))) {
00261       return res;
00262    }
00263 
00264    if (timer->state == TIMER_STATE_CONTINUOUS) {
00265       res = AST_TIMING_EVENT_CONTINUOUS;
00266    }
00267 
00268    ao2_ref(timer, -1);
00269 
00270    return res;
00271 }
00272 
00273 static unsigned int pthread_timer_get_max_rate(int handle)
00274 {
00275    return MAX_RATE;
00276 }
00277 
00278 static struct pthread_timer *find_timer(int handle, int unlinkobj)
00279 {
00280    struct pthread_timer *timer;
00281    struct pthread_timer tmp_timer;
00282    int flags = OBJ_POINTER;
00283 
00284    tmp_timer.pipe[PIPE_READ] = handle;
00285 
00286    if (unlinkobj) {
00287       flags |= OBJ_UNLINK;
00288    }
00289 
00290    if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
00291       ast_assert(timer != NULL);
00292       return NULL;
00293    }
00294 
00295    return timer;
00296 }
00297 
00298 static void pthread_timer_destructor(void *obj)
00299 {
00300    struct pthread_timer *timer = obj;
00301 
00302    if (timer->pipe[PIPE_READ] > -1) {
00303       close(timer->pipe[PIPE_READ]);
00304       timer->pipe[PIPE_READ] = -1;
00305    }
00306 
00307    if (timer->pipe[PIPE_WRITE] > -1) {
00308       close(timer->pipe[PIPE_WRITE]);
00309       timer->pipe[PIPE_WRITE] = -1;
00310    }
00311 }
00312 
00313 /*!
00314  * \note only PIPE_READ is guaranteed valid 
00315  */
00316 static int pthread_timer_hash(const void *obj, const int flags)
00317 {
00318    const struct pthread_timer *timer = obj;
00319 
00320    return timer->pipe[PIPE_READ];
00321 }
00322 
00323 /*!
00324  * \note only PIPE_READ is guaranteed valid 
00325  */
00326 static int pthread_timer_cmp(void *obj, void *arg, int flags)
00327 {
00328    struct pthread_timer *timer1 = obj, *timer2 = arg;
00329 
00330    return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
00331 }
00332 
00333 /*!
00334  * \retval 0 no timer tick needed
00335  * \retval non-zero write to the timing pipe needed
00336  */
00337 static int check_timer(struct pthread_timer *timer)
00338 {
00339    struct timeval now;
00340 
00341    if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
00342       return 0;   
00343    }
00344    
00345    now = ast_tvnow();
00346 
00347    if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
00348       timer->tick_count++;
00349       if (!timer->tick_count) {
00350          timer->start = now;
00351       }
00352       return 1;
00353    }
00354 
00355    return 0;
00356 }
00357 
00358 static void read_pipe(int rd_fd, unsigned int quantity, int clear)
00359 {
00360    ast_assert(quantity || clear);
00361 
00362    if (!quantity && clear) {
00363       quantity = 1;
00364    }
00365 
00366    do {
00367       unsigned char buf[1024];
00368       ssize_t res;
00369       fd_set rfds;
00370       struct timeval timeout = {
00371          .tv_sec = 0,
00372       };
00373 
00374       /* Make sure there is data to read */
00375       FD_ZERO(&rfds);
00376       FD_SET(rd_fd, &rfds);
00377 
00378       if (select(rd_fd + 1, &rfds, NULL, NULL, &timeout) != 1) {
00379          break;
00380       }
00381 
00382       res = read(rd_fd, buf, 
00383          (quantity < sizeof(buf)) ? quantity : sizeof(buf));
00384 
00385       if (res == -1) {
00386          if (errno == EAGAIN) {
00387             continue;
00388          }
00389          ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno));
00390          break;
00391       }
00392 
00393       if (clear) {
00394          continue;
00395       }
00396 
00397       quantity -= res;
00398    } while (quantity);
00399 }
00400 
00401 static void write_byte(int wr_fd)
00402 {
00403    do {
00404       ssize_t res;
00405       unsigned char x = 42;
00406 
00407       res = write(wr_fd, &x, 1); 
00408 
00409       if (res == -1) {
00410          if (errno == EAGAIN) {
00411             continue;
00412          }
00413          ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
00414       }
00415    } while (0);
00416 }
00417 
00418 static int run_timer(void *obj, void *arg, int flags)
00419 {
00420    struct pthread_timer *timer = obj;
00421 
00422    if (timer->state == TIMER_STATE_IDLE) {
00423       return 0;
00424    }
00425 
00426    ao2_lock(timer);
00427 
00428    if (check_timer(timer)) {
00429       write_byte(timer->pipe[PIPE_WRITE]);
00430    }
00431    
00432    ao2_unlock(timer);
00433 
00434    return 0;
00435 }
00436 
00437 static void *do_timing(void *arg)
00438 {
00439    struct timeval next_wakeup = ast_tvnow();
00440 
00441    while (!timing_thread.stop) {
00442       struct timespec ts = { 0, };
00443 
00444       ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
00445 
00446       next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
00447 
00448       ts.tv_sec = next_wakeup.tv_sec;
00449       ts.tv_nsec = next_wakeup.tv_usec * 1000;
00450 
00451       ast_mutex_lock(&timing_thread.lock);
00452       if (!timing_thread.stop) {
00453          if (ao2_container_count(pthread_timers)) {
00454             ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
00455          } else {
00456             ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
00457          }
00458       }
00459       ast_mutex_unlock(&timing_thread.lock);
00460    }
00461 
00462    return NULL;
00463 }
00464 
00465 static int init_timing_thread(void)
00466 {
00467    ast_mutex_init(&timing_thread.lock);
00468    ast_cond_init(&timing_thread.cond, NULL);
00469 
00470    if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
00471       ast_log(LOG_ERROR, "Unable to start timing thread.\n");
00472       return -1;
00473    }
00474 
00475    return 0;
00476 }
00477 
00478 static int load_module(void)
00479 {
00480    if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS, 
00481       pthread_timer_hash, pthread_timer_cmp))) {
00482       return AST_MODULE_LOAD_DECLINE;
00483    }
00484 
00485    if (init_timing_thread()) {
00486       ao2_ref(pthread_timers, -1);
00487       pthread_timers = NULL;
00488       return AST_MODULE_LOAD_DECLINE;
00489    }
00490 
00491    return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
00492       AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
00493 }
00494 
00495 static int unload_module(void)
00496 {
00497    int res;
00498 
00499    ast_mutex_lock(&timing_thread.lock);
00500    timing_thread.stop = 1;
00501    ast_cond_signal(&timing_thread.cond);
00502    ast_mutex_unlock(&timing_thread.lock);
00503    pthread_join(timing_thread.thread, NULL);
00504 
00505    if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
00506       ao2_ref(pthread_timers, -1);
00507       pthread_timers = NULL;
00508    }
00509 
00510    return res;
00511 }
00512 
00513 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface");

Generated on Fri Jun 19 12:09:52 2009 for Asterisk - the Open Source PBX by  doxygen 1.4.7