00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "asterisk.h"
00027
00028 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 176675 $");
00029
00030 #include <math.h>
00031 #include <sys/select.h>
00032
00033 #include "asterisk/module.h"
00034 #include "asterisk/timing.h"
00035 #include "asterisk/utils.h"
00036 #include "asterisk/astobj2.h"
00037 #include "asterisk/time.h"
00038 #include "asterisk/lock.h"
00039
/*!
 * Handle returned by ast_register_timing_interface() in load_module() and
 * handed back to ast_unregister_timing_interface() in unload_module().
 */
static void *timing_funcs_handle;
00041
/*
 * Forward declarations of the timing interface callbacks; they are
 * referenced by the pthread_timing table before their definitions appear.
 */
static int pthread_timer_open(void);
static void pthread_timer_close(int handle);
static int pthread_timer_set_rate(int handle, unsigned int rate);
static void pthread_timer_ack(int handle, unsigned int quantity);
static int pthread_timer_enable_continuous(int handle);
static int pthread_timer_disable_continuous(int handle);
static enum ast_timer_event pthread_timer_get_event(int handle);
static unsigned int pthread_timer_get_max_rate(int handle);
00050
00051 static struct ast_timing_interface pthread_timing = {
00052 .name = "pthread",
00053 .priority = 0,
00054 .timer_open = pthread_timer_open,
00055 .timer_close = pthread_timer_close,
00056 .timer_set_rate = pthread_timer_set_rate,
00057 .timer_ack = pthread_timer_ack,
00058 .timer_enable_continuous = pthread_timer_enable_continuous,
00059 .timer_disable_continuous = pthread_timer_disable_continuous,
00060 .timer_get_event = pthread_timer_get_event,
00061 .timer_get_max_rate = pthread_timer_get_max_rate,
00062 };
00063
00064
/*! Highest rate, in ticks per second, accepted by pthread_timer_set_rate(). */
#define MAX_RATE 100

/*! Bucket count passed to ao2_container_alloc() for pthread_timers. */
#define PTHREAD_TIMER_BUCKETS 563

/*! Container holding every open timer; keyed on the read end of its pipe. */
static struct ao2_container *pthread_timers;
00069
/*! Indexes into pthread_timer.pipe[], matching the order filled in by pipe(). */
enum {
	PIPE_READ  = 0,	/*!< end that is read from; also used as the timer handle */
	PIPE_WRITE = 1,	/*!< end that write_byte() writes to */
};
00074
/*! Operating mode of a single timer. */
enum pthread_timer_state {
	TIMER_STATE_IDLE = 0,	/*!< rate is zero; check_timer() never reports a tick */
	TIMER_STATE_TICKING,	/*!< ticks are produced at the configured rate */
	TIMER_STATE_CONTINUOUS,	/*!< continuous mode; acks leave the pipe untouched */
};
00080
00081 struct pthread_timer {
00082 int pipe[2];
00083 enum pthread_timer_state state;
00084 unsigned int rate;
00085
00086 unsigned int interval;
00087 unsigned int tick_count;
00088 struct timeval start;
00089 };
00090
/* Forward declarations of internal helpers defined further down. */
static void pthread_timer_destructor(void *obj);
static struct pthread_timer *find_timer(int handle, int unlinkobj);
static void write_byte(int wr_fd);
static void read_pipe(int rd_fd, unsigned int quantity, int clear);
00095
00096
00097
00098
00099 static struct {
00100 pthread_t thread;
00101 ast_mutex_t lock;
00102 ast_cond_t cond;
00103 unsigned int stop:1;
00104 } timing_thread;
00105
00106 static int pthread_timer_open(void)
00107 {
00108 struct pthread_timer *timer;
00109 int fd;
00110
00111 if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
00112 errno = ENOMEM;
00113 return -1;
00114 }
00115
00116 timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
00117 timer->state = TIMER_STATE_IDLE;
00118
00119 if (pipe(timer->pipe)) {
00120 ao2_ref(timer, -1);
00121 return -1;
00122 }
00123
00124 ao2_lock(pthread_timers);
00125 if (!ao2_container_count(pthread_timers)) {
00126 ast_mutex_lock(&timing_thread.lock);
00127 ast_cond_signal(&timing_thread.cond);
00128 ast_mutex_unlock(&timing_thread.lock);
00129 }
00130 ao2_link(pthread_timers, timer);
00131 ao2_unlock(pthread_timers);
00132
00133 fd = timer->pipe[PIPE_READ];
00134
00135 ao2_ref(timer, -1);
00136
00137 return fd;
00138 }
00139
/*!
 * \brief Close a timer.
 *
 * Looks the timer up with unlinking requested, so it is removed from
 * pthread_timers, then releases the reference returned by the lookup.
 * Does nothing when the handle is unknown.
 *
 * \param handle value previously returned by pthread_timer_open()
 */
static void pthread_timer_close(int handle)
{
	struct pthread_timer *timer = find_timer(handle, 1);

	if (timer) {
		ao2_ref(timer, -1);
	}
}
00150
00151 static void set_state(struct pthread_timer *timer)
00152 {
00153 unsigned int rate = timer->rate;
00154
00155 if (rate) {
00156 timer->state = TIMER_STATE_TICKING;
00157 timer->interval = roundf(1000.0 / ((float) rate));
00158 timer->start = ast_tvnow();
00159 } else {
00160 timer->state = TIMER_STATE_IDLE;
00161 timer->interval = 0;
00162 timer->start = ast_tv(0, 0);
00163 }
00164
00165 timer->tick_count = 0;
00166 }
00167
00168 static int pthread_timer_set_rate(int handle, unsigned int rate)
00169 {
00170 struct pthread_timer *timer;
00171
00172 if (!(timer = find_timer(handle, 0))) {
00173 errno = EINVAL;
00174 return -1;
00175 }
00176
00177 if (rate > MAX_RATE) {
00178 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n",
00179 MAX_RATE);
00180 errno = EINVAL;
00181 return -1;
00182 }
00183
00184 ao2_lock(timer);
00185 timer->rate = rate;
00186 if (timer->state != TIMER_STATE_CONTINUOUS) {
00187 set_state(timer);
00188 }
00189
00190 ao2_unlock(timer);
00191
00192 ao2_ref(timer, -1);
00193
00194 return 0;
00195 }
00196
00197 static void pthread_timer_ack(int handle, unsigned int quantity)
00198 {
00199 struct pthread_timer *timer;
00200
00201 ast_assert(quantity > 0);
00202
00203 if (!(timer = find_timer(handle, 0))) {
00204 return;
00205 }
00206
00207 if (timer->state == TIMER_STATE_CONTINUOUS) {
00208
00209 return;
00210 }
00211
00212 read_pipe(timer->pipe[PIPE_READ], quantity, 0);
00213
00214 ao2_ref(timer, -1);
00215 }
00216
00217 static int pthread_timer_enable_continuous(int handle)
00218 {
00219 struct pthread_timer *timer;
00220
00221 if (!(timer = find_timer(handle, 0))) {
00222 errno = EINVAL;
00223 return -1;
00224 }
00225
00226 ao2_lock(timer);
00227 timer->state = TIMER_STATE_CONTINUOUS;
00228 write_byte(timer->pipe[PIPE_WRITE]);
00229 ao2_unlock(timer);
00230
00231 ao2_ref(timer, -1);
00232
00233 return 0;
00234 }
00235
00236 static int pthread_timer_disable_continuous(int handle)
00237 {
00238 struct pthread_timer *timer;
00239
00240 if (!(timer = find_timer(handle, 0))) {
00241 errno = EINVAL;
00242 return -1;
00243 }
00244
00245 ao2_lock(timer);
00246 set_state(timer);
00247 read_pipe(timer->pipe[PIPE_READ], 0, 1);
00248 ao2_unlock(timer);
00249
00250 ao2_ref(timer, -1);
00251
00252 return 0;
00253 }
00254
00255 static enum ast_timer_event pthread_timer_get_event(int handle)
00256 {
00257 struct pthread_timer *timer;
00258 enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
00259
00260 if (!(timer = find_timer(handle, 0))) {
00261 return res;
00262 }
00263
00264 if (timer->state == TIMER_STATE_CONTINUOUS) {
00265 res = AST_TIMING_EVENT_CONTINUOUS;
00266 }
00267
00268 ao2_ref(timer, -1);
00269
00270 return res;
00271 }
00272
00273 static unsigned int pthread_timer_get_max_rate(int handle)
00274 {
00275 return MAX_RATE;
00276 }
00277
00278 static struct pthread_timer *find_timer(int handle, int unlinkobj)
00279 {
00280 struct pthread_timer *timer;
00281 struct pthread_timer tmp_timer;
00282 int flags = OBJ_POINTER;
00283
00284 tmp_timer.pipe[PIPE_READ] = handle;
00285
00286 if (unlinkobj) {
00287 flags |= OBJ_UNLINK;
00288 }
00289
00290 if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
00291 ast_assert(timer != NULL);
00292 return NULL;
00293 }
00294
00295 return timer;
00296 }
00297
00298 static void pthread_timer_destructor(void *obj)
00299 {
00300 struct pthread_timer *timer = obj;
00301
00302 if (timer->pipe[PIPE_READ] > -1) {
00303 close(timer->pipe[PIPE_READ]);
00304 timer->pipe[PIPE_READ] = -1;
00305 }
00306
00307 if (timer->pipe[PIPE_WRITE] > -1) {
00308 close(timer->pipe[PIPE_WRITE]);
00309 timer->pipe[PIPE_WRITE] = -1;
00310 }
00311 }
00312
00313
00314
00315
00316 static int pthread_timer_hash(const void *obj, const int flags)
00317 {
00318 const struct pthread_timer *timer = obj;
00319
00320 return timer->pipe[PIPE_READ];
00321 }
00322
00323
00324
00325
00326 static int pthread_timer_cmp(void *obj, void *arg, int flags)
00327 {
00328 struct pthread_timer *timer1 = obj, *timer2 = arg;
00329
00330 return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
00331 }
00332
00333
00334
00335
00336
00337 static int check_timer(struct pthread_timer *timer)
00338 {
00339 struct timeval now;
00340
00341 if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
00342 return 0;
00343 }
00344
00345 now = ast_tvnow();
00346
00347 if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
00348 timer->tick_count++;
00349 if (!timer->tick_count) {
00350 timer->start = now;
00351 }
00352 return 1;
00353 }
00354
00355 return 0;
00356 }
00357
00358 static void read_pipe(int rd_fd, unsigned int quantity, int clear)
00359 {
00360 ast_assert(quantity || clear);
00361
00362 if (!quantity && clear) {
00363 quantity = 1;
00364 }
00365
00366 do {
00367 unsigned char buf[1024];
00368 ssize_t res;
00369 fd_set rfds;
00370 struct timeval timeout = {
00371 .tv_sec = 0,
00372 };
00373
00374
00375 FD_ZERO(&rfds);
00376 FD_SET(rd_fd, &rfds);
00377
00378 if (select(rd_fd + 1, &rfds, NULL, NULL, &timeout) != 1) {
00379 break;
00380 }
00381
00382 res = read(rd_fd, buf,
00383 (quantity < sizeof(buf)) ? quantity : sizeof(buf));
00384
00385 if (res == -1) {
00386 if (errno == EAGAIN) {
00387 continue;
00388 }
00389 ast_log(LOG_ERROR, "read failed on timing pipe: %s\n", strerror(errno));
00390 break;
00391 }
00392
00393 if (clear) {
00394 continue;
00395 }
00396
00397 quantity -= res;
00398 } while (quantity);
00399 }
00400
00401 static void write_byte(int wr_fd)
00402 {
00403 do {
00404 ssize_t res;
00405 unsigned char x = 42;
00406
00407 res = write(wr_fd, &x, 1);
00408
00409 if (res == -1) {
00410 if (errno == EAGAIN) {
00411 continue;
00412 }
00413 ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
00414 }
00415 } while (0);
00416 }
00417
00418 static int run_timer(void *obj, void *arg, int flags)
00419 {
00420 struct pthread_timer *timer = obj;
00421
00422 if (timer->state == TIMER_STATE_IDLE) {
00423 return 0;
00424 }
00425
00426 ao2_lock(timer);
00427
00428 if (check_timer(timer)) {
00429 write_byte(timer->pipe[PIPE_WRITE]);
00430 }
00431
00432 ao2_unlock(timer);
00433
00434 return 0;
00435 }
00436
00437 static void *do_timing(void *arg)
00438 {
00439 struct timeval next_wakeup = ast_tvnow();
00440
00441 while (!timing_thread.stop) {
00442 struct timespec ts = { 0, };
00443
00444 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
00445
00446 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
00447
00448 ts.tv_sec = next_wakeup.tv_sec;
00449 ts.tv_nsec = next_wakeup.tv_usec * 1000;
00450
00451 ast_mutex_lock(&timing_thread.lock);
00452 if (!timing_thread.stop) {
00453 if (ao2_container_count(pthread_timers)) {
00454 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
00455 } else {
00456 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
00457 }
00458 }
00459 ast_mutex_unlock(&timing_thread.lock);
00460 }
00461
00462 return NULL;
00463 }
00464
00465 static int init_timing_thread(void)
00466 {
00467 ast_mutex_init(&timing_thread.lock);
00468 ast_cond_init(&timing_thread.cond, NULL);
00469
00470 if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
00471 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
00472 return -1;
00473 }
00474
00475 return 0;
00476 }
00477
00478 static int load_module(void)
00479 {
00480 if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
00481 pthread_timer_hash, pthread_timer_cmp))) {
00482 return AST_MODULE_LOAD_DECLINE;
00483 }
00484
00485 if (init_timing_thread()) {
00486 ao2_ref(pthread_timers, -1);
00487 pthread_timers = NULL;
00488 return AST_MODULE_LOAD_DECLINE;
00489 }
00490
00491 return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
00492 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
00493 }
00494
00495 static int unload_module(void)
00496 {
00497 int res;
00498
00499 ast_mutex_lock(&timing_thread.lock);
00500 timing_thread.stop = 1;
00501 ast_cond_signal(&timing_thread.cond);
00502 ast_mutex_unlock(&timing_thread.lock);
00503 pthread_join(timing_thread.thread, NULL);
00504
00505 if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
00506 ao2_ref(pthread_timers, -1);
00507 pthread_timers = NULL;
00508 }
00509
00510 return res;
00511 }
00512
00513 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "pthread Timing Interface");