00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "asterisk.h"
00027
00028 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 278465 $");
00029
00030 #include <math.h>
00031 #include <sys/select.h>
00032
00033 #include "asterisk/module.h"
00034 #include "asterisk/timing.h"
00035 #include "asterisk/utils.h"
00036 #include "asterisk/astobj2.h"
00037 #include "asterisk/time.h"
00038 #include "asterisk/lock.h"
00039 #include "asterisk/poll-compat.h"
00040
/*! \brief Handle returned by ast_register_timing_interface() in load_module();
 *         passed back to ast_unregister_timing_interface() in unload_module(). */
static void *timing_funcs_handle;
00042
/*
 * Forward declarations of the callbacks that are plugged into the
 * pthread_timing interface table below.  Every callback identifies its
 * timer by an integer handle, which is the read end of the timer's pipe.
 */
static int pthread_timer_open(void);
static void pthread_timer_close(int handle);
static int pthread_timer_set_rate(int handle, unsigned int rate);
static void pthread_timer_ack(int handle, unsigned int quantity);
static int pthread_timer_enable_continuous(int handle);
static int pthread_timer_disable_continuous(int handle);
static enum ast_timer_event pthread_timer_get_event(int handle);
static unsigned int pthread_timer_get_max_rate(int handle);
00051
/*! \brief Callback table describing this timing implementation.
 *
 * Registered with the core by load_module() through
 * ast_register_timing_interface().
 */
static struct ast_timing_interface pthread_timing = {
	.name = "pthread",
	.priority = 0,
	.timer_open = pthread_timer_open,
	.timer_close = pthread_timer_close,
	.timer_set_rate = pthread_timer_set_rate,
	.timer_ack = pthread_timer_ack,
	.timer_enable_continuous = pthread_timer_enable_continuous,
	.timer_disable_continuous = pthread_timer_disable_continuous,
	.timer_get_event = pthread_timer_get_event,
	.timer_get_max_rate = pthread_timer_get_max_rate,
};
00064
00065
/*! \brief Highest rate (ticks per second) accepted by pthread_timer_set_rate()
 *         and the value reported by pthread_timer_get_max_rate(). */
#define MAX_RATE 100
00067
/*! \brief Container of all open timers, hashed and compared by the read end
 *         of each timer's pipe (see pthread_timer_hash / pthread_timer_cmp). */
static struct ao2_container *pthread_timers;
/*! \brief Bucket count handed to ao2_container_alloc() for pthread_timers. */
#define PTHREAD_TIMER_BUCKETS 563
00070
/*! \brief Indexes into pthread_timer.pipe[], matching the order filled in by pipe(). */
enum {
	PIPE_READ = 0,  /*!< End handed out as the timer handle and read on ack */
	PIPE_WRITE = 1  /*!< End written by write_byte() */
};
00075
/*! \brief Whether a timer currently generates ticks. */
enum pthread_timer_state {
	TIMER_STATE_IDLE,     /*!< Rate is 0; check_timer()/run_timer() skip the timer */
	TIMER_STATE_TICKING,  /*!< A non-zero rate has been set */
};
00080
/*! \brief Per-timer state; allocated as an ao2 object in pthread_timer_open(). */
struct pthread_timer {
	/*! Pipe descriptors, indexed by PIPE_READ / PIPE_WRITE */
	int pipe[2];
	enum pthread_timer_state state;
	/*! Rate most recently passed to pthread_timer_set_rate() */
	unsigned int rate;
	/*! Computed as roundf(1000.0 / rate); compared against an
	 *  ast_tvdiff_ms() result in check_timer().  0 while idle. */
	unsigned int interval;
	/*! Ticks counted by check_timer() since \c start */
	unsigned int tick_count;
	/*! Bytes written to the pipe by write_byte() and not yet removed by read_pipe() */
	unsigned int pending_ticks;
	/*! Reference time for tick_count; set when a rate is applied */
	struct timeval start;
	/*! Set while continuous mode is enabled */
	unsigned int continuous:1;
};
00092
/* Forward declarations of internal helpers defined further down in this file. */
static void pthread_timer_destructor(void *obj);
static struct pthread_timer *find_timer(int handle, int unlinkobj);
static void write_byte(struct pthread_timer *timer);
static void read_pipe(struct pthread_timer *timer, unsigned int num);
00097
00098
00099
00100
/*! \brief State of the single background thread (do_timing) that drives all timers. */
static struct {
	pthread_t thread;
	/*! Protects \c stop and is the mutex paired with \c cond */
	ast_mutex_t lock;
	/*! Signalled when the first timer is opened and when the module unloads */
	ast_cond_t cond;
	/*! Set by unload_module() to make do_timing() exit */
	unsigned int stop:1;
} timing_thread;
00107
00108 static int pthread_timer_open(void)
00109 {
00110 struct pthread_timer *timer;
00111 int fd;
00112
00113 if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
00114 errno = ENOMEM;
00115 return -1;
00116 }
00117
00118 timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
00119 timer->state = TIMER_STATE_IDLE;
00120
00121 if (pipe(timer->pipe)) {
00122 ao2_ref(timer, -1);
00123 return -1;
00124 }
00125
00126 ao2_lock(pthread_timers);
00127 if (!ao2_container_count(pthread_timers)) {
00128 ast_mutex_lock(&timing_thread.lock);
00129 ast_cond_signal(&timing_thread.cond);
00130 ast_mutex_unlock(&timing_thread.lock);
00131 }
00132 ao2_link(pthread_timers, timer);
00133 ao2_unlock(pthread_timers);
00134
00135 fd = timer->pipe[PIPE_READ];
00136
00137 ao2_ref(timer, -1);
00138
00139 return fd;
00140 }
00141
/*!
 * \brief Close a timer.
 *
 * Looks the timer up with unlinking enabled, which removes it from
 * pthread_timers, then releases the reference returned by the lookup.
 * Unknown handles are ignored.
 *
 * \param handle timer handle returned by pthread_timer_open()
 */
static void pthread_timer_close(int handle)
{
	struct pthread_timer *doomed = find_timer(handle, 1);

	if (doomed) {
		ao2_ref(doomed, -1);
	}
}
00152
00153 static int pthread_timer_set_rate(int handle, unsigned int rate)
00154 {
00155 struct pthread_timer *timer;
00156
00157 if (!(timer = find_timer(handle, 0))) {
00158 errno = EINVAL;
00159 return -1;
00160 }
00161
00162 if (rate > MAX_RATE) {
00163 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
00164 "max rate of %d / sec\n", MAX_RATE);
00165 errno = EINVAL;
00166 return -1;
00167 }
00168
00169 ao2_lock(timer);
00170
00171 if ((timer->rate = rate)) {
00172 timer->interval = roundf(1000.0 / ((float) rate));
00173 timer->start = ast_tvnow();
00174 timer->state = TIMER_STATE_TICKING;
00175 } else {
00176 timer->interval = 0;
00177 timer->start = ast_tv(0, 0);
00178 timer->state = TIMER_STATE_IDLE;
00179 }
00180 timer->tick_count = 0;
00181
00182 ao2_unlock(timer);
00183
00184 ao2_ref(timer, -1);
00185
00186 return 0;
00187 }
00188
/*!
 * \brief Acknowledge ticks on a timer.
 *
 * Removes up to \a quantity pending ticks from the timer's pipe via
 * read_pipe().  Unknown handles are ignored.
 *
 * \param handle   timer handle returned by pthread_timer_open()
 * \param quantity number of ticks to acknowledge; must be non-zero
 */
static void pthread_timer_ack(int handle, unsigned int quantity)
{
	struct pthread_timer *acked;

	ast_assert(quantity > 0);

	acked = find_timer(handle, 0);
	if (!acked) {
		return;
	}

	ao2_lock(acked);
	read_pipe(acked, quantity);
	ao2_unlock(acked);

	/* Drop the reference handed out by find_timer(). */
	ao2_ref(acked, -1);
}
00205
00206 static int pthread_timer_enable_continuous(int handle)
00207 {
00208 struct pthread_timer *timer;
00209
00210 if (!(timer = find_timer(handle, 0))) {
00211 errno = EINVAL;
00212 return -1;
00213 }
00214
00215 ao2_lock(timer);
00216 if (!timer->continuous) {
00217 timer->continuous = 1;
00218 write_byte(timer);
00219 }
00220 ao2_unlock(timer);
00221
00222 ao2_ref(timer, -1);
00223
00224 return 0;
00225 }
00226
00227 static int pthread_timer_disable_continuous(int handle)
00228 {
00229 struct pthread_timer *timer;
00230
00231 if (!(timer = find_timer(handle, 0))) {
00232 errno = EINVAL;
00233 return -1;
00234 }
00235
00236 ao2_lock(timer);
00237 if (timer->continuous) {
00238 timer->continuous = 0;
00239 read_pipe(timer, 1);
00240 }
00241 ao2_unlock(timer);
00242
00243 ao2_ref(timer, -1);
00244
00245 return 0;
00246 }
00247
00248 static enum ast_timer_event pthread_timer_get_event(int handle)
00249 {
00250 struct pthread_timer *timer;
00251 enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
00252
00253 if (!(timer = find_timer(handle, 0))) {
00254 return res;
00255 }
00256
00257 ao2_lock(timer);
00258 if (timer->continuous && timer->pending_ticks == 1) {
00259 res = AST_TIMING_EVENT_CONTINUOUS;
00260 }
00261 ao2_unlock(timer);
00262
00263 ao2_ref(timer, -1);
00264
00265 return res;
00266 }
00267
00268 static unsigned int pthread_timer_get_max_rate(int handle)
00269 {
00270 return MAX_RATE;
00271 }
00272
00273 static struct pthread_timer *find_timer(int handle, int unlinkobj)
00274 {
00275 struct pthread_timer *timer;
00276 struct pthread_timer tmp_timer;
00277 int flags = OBJ_POINTER;
00278
00279 tmp_timer.pipe[PIPE_READ] = handle;
00280
00281 if (unlinkobj) {
00282 flags |= OBJ_UNLINK;
00283 }
00284
00285 if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
00286 ast_assert(timer != NULL);
00287 return NULL;
00288 }
00289
00290 return timer;
00291 }
00292
00293 static void pthread_timer_destructor(void *obj)
00294 {
00295 struct pthread_timer *timer = obj;
00296
00297 if (timer->pipe[PIPE_READ] > -1) {
00298 close(timer->pipe[PIPE_READ]);
00299 timer->pipe[PIPE_READ] = -1;
00300 }
00301
00302 if (timer->pipe[PIPE_WRITE] > -1) {
00303 close(timer->pipe[PIPE_WRITE]);
00304 timer->pipe[PIPE_WRITE] = -1;
00305 }
00306 }
00307
00308
00309
00310
00311 static int pthread_timer_hash(const void *obj, const int flags)
00312 {
00313 const struct pthread_timer *timer = obj;
00314
00315 return timer->pipe[PIPE_READ];
00316 }
00317
00318
00319
00320
00321 static int pthread_timer_cmp(void *obj, void *arg, int flags)
00322 {
00323 struct pthread_timer *timer1 = obj, *timer2 = arg;
00324
00325 return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
00326 }
00327
00328
00329
00330
00331
00332 static int check_timer(struct pthread_timer *timer)
00333 {
00334 struct timeval now;
00335
00336 if (timer->state == TIMER_STATE_IDLE) {
00337 return 0;
00338 }
00339
00340 now = ast_tvnow();
00341
00342 if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
00343 timer->tick_count++;
00344 if (!timer->tick_count) {
00345
00346 timer->start = now;
00347 }
00348 return 1;
00349 }
00350
00351 return 0;
00352 }
00353
00354
00355
00356
00357
/*!
 * \brief Remove up to \a quantity tick bytes from a timer's pipe.
 *
 * Callers in this file invoke this with the timer locked.  The request is
 * clamped to the number of pending ticks; while the timer is in continuous
 * mode one pending byte is held back and never read here.
 *
 * \param timer    timer whose pipe is drained
 * \param quantity number of bytes to remove; must be non-zero
 */
static void read_pipe(struct pthread_timer *timer, unsigned int quantity)
{
	int rd_fd = timer->pipe[PIPE_READ];
	int pending_ticks = timer->pending_ticks;

	ast_assert(quantity);

	/* In continuous mode, reserve one byte so it stays in the pipe. */
	if (timer->continuous && pending_ticks) {
		pending_ticks--;
	}

	/* Never try to read more than is available to acknowledge. */
	if (quantity > pending_ticks) {
		quantity = pending_ticks;
	}

	if (!quantity) {
		return;
	}

	do {
		unsigned char buf[1024];
		ssize_t res;
		struct pollfd pfd = {
			.fd = rd_fd,
			.events = POLLIN,
		};

		/* Zero timeout: only read when data is already there. */
		if (ast_poll(&pfd, 1, 0) != 1) {
			ast_debug(1, "Reading not available on timing pipe, "
					"quantity: %u\n", quantity);
			break;
		}

		/* Read at most one buffer's worth per iteration. */
		res = read(rd_fd, buf,
			(quantity < sizeof(buf)) ? quantity : sizeof(buf));

		if (res == -1) {
			if (errno == EAGAIN) {
				/* Retry; 'continue' re-evaluates the loop condition. */
				continue;
			}
			ast_log(LOG_ERROR, "read failed on timing pipe: %s\n",
					strerror(errno));
			break;
		}

		/* NOTE(review): a read() result of 0 makes no progress here and the
		 * loop would repeat — confirm that cannot happen for this pipe. */
		quantity -= res;
		timer->pending_ticks -= res;
	} while (quantity);
}
00407
00408
00409
00410
00411
00412 static void write_byte(struct pthread_timer *timer)
00413 {
00414 ssize_t res;
00415 unsigned char x = 42;
00416
00417 do {
00418 res = write(timer->pipe[PIPE_WRITE], &x, 1);
00419 } while (res == -1 && errno == EAGAIN);
00420
00421 if (res == -1) {
00422 ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
00423 strerror(errno));
00424 } else {
00425 timer->pending_ticks++;
00426 }
00427 }
00428
00429 static int run_timer(void *obj, void *arg, int flags)
00430 {
00431 struct pthread_timer *timer = obj;
00432
00433 if (timer->state == TIMER_STATE_IDLE) {
00434 return 0;
00435 }
00436
00437 ao2_lock(timer);
00438 if (check_timer(timer)) {
00439 write_byte(timer);
00440 }
00441 ao2_unlock(timer);
00442
00443 return 0;
00444 }
00445
00446 static void *do_timing(void *arg)
00447 {
00448 struct timeval next_wakeup = ast_tvnow();
00449
00450 while (!timing_thread.stop) {
00451 struct timespec ts = { 0, };
00452
00453 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
00454
00455 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
00456
00457 ts.tv_sec = next_wakeup.tv_sec;
00458 ts.tv_nsec = next_wakeup.tv_usec * 1000;
00459
00460 ast_mutex_lock(&timing_thread.lock);
00461 if (!timing_thread.stop) {
00462 if (ao2_container_count(pthread_timers)) {
00463 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
00464 } else {
00465 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
00466 }
00467 }
00468 ast_mutex_unlock(&timing_thread.lock);
00469 }
00470
00471 return NULL;
00472 }
00473
00474 static int init_timing_thread(void)
00475 {
00476 ast_mutex_init(&timing_thread.lock);
00477 ast_cond_init(&timing_thread.cond, NULL);
00478
00479 if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
00480 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
00481 return -1;
00482 }
00483
00484 return 0;
00485 }
00486
00487 static int load_module(void)
00488 {
00489 if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
00490 pthread_timer_hash, pthread_timer_cmp))) {
00491 return AST_MODULE_LOAD_DECLINE;
00492 }
00493
00494 if (init_timing_thread()) {
00495 ao2_ref(pthread_timers, -1);
00496 pthread_timers = NULL;
00497 return AST_MODULE_LOAD_DECLINE;
00498 }
00499
00500 return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
00501 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
00502 }
00503
00504 static int unload_module(void)
00505 {
00506 int res;
00507
00508 ast_mutex_lock(&timing_thread.lock);
00509 timing_thread.stop = 1;
00510 ast_cond_signal(&timing_thread.cond);
00511 ast_mutex_unlock(&timing_thread.lock);
00512 pthread_join(timing_thread.thread, NULL);
00513
00514 if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
00515 ao2_ref(pthread_timers, -1);
00516 pthread_timers = NULL;
00517 }
00518
00519 return res;
00520 }
/* Module descriptor: names the module and wires up its load/unload entry points. */
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
	.load = load_module,
	.unload = unload_module,
	.load_pri = AST_MODPRI_CHANNEL_DEPEND,
	);