00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "asterisk.h"
00031
00032 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 349194 $");
00033
00034 #include <math.h>
00035 #include <sys/select.h>
00036
00037 #include "asterisk/module.h"
00038 #include "asterisk/timing.h"
00039 #include "asterisk/utils.h"
00040 #include "asterisk/astobj2.h"
00041 #include "asterisk/time.h"
00042 #include "asterisk/lock.h"
00043 #include "asterisk/poll-compat.h"
00044
00045 static void *timing_funcs_handle;
00046
00047 static int pthread_timer_open(void);
00048 static void pthread_timer_close(int handle);
00049 static int pthread_timer_set_rate(int handle, unsigned int rate);
00050 static void pthread_timer_ack(int handle, unsigned int quantity);
00051 static int pthread_timer_enable_continuous(int handle);
00052 static int pthread_timer_disable_continuous(int handle);
00053 static enum ast_timer_event pthread_timer_get_event(int handle);
00054 static unsigned int pthread_timer_get_max_rate(int handle);
00055
00056 static struct ast_timing_interface pthread_timing = {
00057 .name = "pthread",
00058 .priority = 0,
00059 .timer_open = pthread_timer_open,
00060 .timer_close = pthread_timer_close,
00061 .timer_set_rate = pthread_timer_set_rate,
00062 .timer_ack = pthread_timer_ack,
00063 .timer_enable_continuous = pthread_timer_enable_continuous,
00064 .timer_disable_continuous = pthread_timer_disable_continuous,
00065 .timer_get_event = pthread_timer_get_event,
00066 .timer_get_max_rate = pthread_timer_get_max_rate,
00067 };
00068
00069
00070 #define MAX_RATE 100
00071
00072 static struct ao2_container *pthread_timers;
00073 #define PTHREAD_TIMER_BUCKETS 563
00074
00075 enum {
00076 PIPE_READ = 0,
00077 PIPE_WRITE = 1
00078 };
00079
00080 enum pthread_timer_state {
00081 TIMER_STATE_IDLE,
00082 TIMER_STATE_TICKING,
00083 };
00084
00085 struct pthread_timer {
00086 int pipe[2];
00087 enum pthread_timer_state state;
00088 unsigned int rate;
00089
00090 unsigned int interval;
00091 unsigned int tick_count;
00092 unsigned int pending_ticks;
00093 struct timeval start;
00094 unsigned int continuous:1;
00095 };
00096
00097 static void pthread_timer_destructor(void *obj);
00098 static struct pthread_timer *find_timer(int handle, int unlinkobj);
00099 static void write_byte(struct pthread_timer *timer);
00100 static void read_pipe(struct pthread_timer *timer, unsigned int num);
00101
00102
00103
00104
00105 static struct {
00106 pthread_t thread;
00107 ast_mutex_t lock;
00108 ast_cond_t cond;
00109 unsigned int stop:1;
00110 } timing_thread;
00111
00112 static int pthread_timer_open(void)
00113 {
00114 struct pthread_timer *timer;
00115 int fd;
00116
00117 if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
00118 errno = ENOMEM;
00119 return -1;
00120 }
00121
00122 timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
00123 timer->state = TIMER_STATE_IDLE;
00124
00125 if (pipe(timer->pipe)) {
00126 ao2_ref(timer, -1);
00127 return -1;
00128 }
00129
00130 ao2_lock(pthread_timers);
00131 if (!ao2_container_count(pthread_timers)) {
00132 ast_mutex_lock(&timing_thread.lock);
00133 ast_cond_signal(&timing_thread.cond);
00134 ast_mutex_unlock(&timing_thread.lock);
00135 }
00136 ao2_link(pthread_timers, timer);
00137 ao2_unlock(pthread_timers);
00138
00139 fd = timer->pipe[PIPE_READ];
00140
00141 ao2_ref(timer, -1);
00142
00143 return fd;
00144 }
00145
00146 static void pthread_timer_close(int handle)
00147 {
00148 struct pthread_timer *timer;
00149
00150 if (!(timer = find_timer(handle, 1))) {
00151 return;
00152 }
00153
00154 ao2_ref(timer, -1);
00155 }
00156
00157 static int pthread_timer_set_rate(int handle, unsigned int rate)
00158 {
00159 struct pthread_timer *timer;
00160
00161 if (!(timer = find_timer(handle, 0))) {
00162 errno = EINVAL;
00163 return -1;
00164 }
00165
00166 if (rate > MAX_RATE) {
00167 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
00168 "max rate of %d / sec\n", MAX_RATE);
00169 errno = EINVAL;
00170 return -1;
00171 }
00172
00173 ao2_lock(timer);
00174
00175 if ((timer->rate = rate)) {
00176 timer->interval = roundf(1000.0 / ((float) rate));
00177 timer->start = ast_tvnow();
00178 timer->state = TIMER_STATE_TICKING;
00179 } else {
00180 timer->interval = 0;
00181 timer->start = ast_tv(0, 0);
00182 timer->state = TIMER_STATE_IDLE;
00183 }
00184 timer->tick_count = 0;
00185
00186 ao2_unlock(timer);
00187
00188 ao2_ref(timer, -1);
00189
00190 return 0;
00191 }
00192
00193 static void pthread_timer_ack(int handle, unsigned int quantity)
00194 {
00195 struct pthread_timer *timer;
00196
00197 ast_assert(quantity > 0);
00198
00199 if (!(timer = find_timer(handle, 0))) {
00200 return;
00201 }
00202
00203 ao2_lock(timer);
00204 read_pipe(timer, quantity);
00205 ao2_unlock(timer);
00206
00207 ao2_ref(timer, -1);
00208 }
00209
00210 static int pthread_timer_enable_continuous(int handle)
00211 {
00212 struct pthread_timer *timer;
00213
00214 if (!(timer = find_timer(handle, 0))) {
00215 errno = EINVAL;
00216 return -1;
00217 }
00218
00219 ao2_lock(timer);
00220 if (!timer->continuous) {
00221 timer->continuous = 1;
00222 write_byte(timer);
00223 }
00224 ao2_unlock(timer);
00225
00226 ao2_ref(timer, -1);
00227
00228 return 0;
00229 }
00230
00231 static int pthread_timer_disable_continuous(int handle)
00232 {
00233 struct pthread_timer *timer;
00234
00235 if (!(timer = find_timer(handle, 0))) {
00236 errno = EINVAL;
00237 return -1;
00238 }
00239
00240 ao2_lock(timer);
00241 if (timer->continuous) {
00242 timer->continuous = 0;
00243 read_pipe(timer, 1);
00244 }
00245 ao2_unlock(timer);
00246
00247 ao2_ref(timer, -1);
00248
00249 return 0;
00250 }
00251
00252 static enum ast_timer_event pthread_timer_get_event(int handle)
00253 {
00254 struct pthread_timer *timer;
00255 enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
00256
00257 if (!(timer = find_timer(handle, 0))) {
00258 return res;
00259 }
00260
00261 ao2_lock(timer);
00262 if (timer->continuous && timer->pending_ticks == 1) {
00263 res = AST_TIMING_EVENT_CONTINUOUS;
00264 }
00265 ao2_unlock(timer);
00266
00267 ao2_ref(timer, -1);
00268
00269 return res;
00270 }
00271
00272 static unsigned int pthread_timer_get_max_rate(int handle)
00273 {
00274 return MAX_RATE;
00275 }
00276
00277 static struct pthread_timer *find_timer(int handle, int unlinkobj)
00278 {
00279 struct pthread_timer *timer;
00280 struct pthread_timer tmp_timer;
00281 int flags = OBJ_POINTER;
00282
00283 tmp_timer.pipe[PIPE_READ] = handle;
00284
00285 if (unlinkobj) {
00286 flags |= OBJ_UNLINK;
00287 }
00288
00289 if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
00290 ast_assert(timer != NULL);
00291 return NULL;
00292 }
00293
00294 return timer;
00295 }
00296
00297 static void pthread_timer_destructor(void *obj)
00298 {
00299 struct pthread_timer *timer = obj;
00300
00301 if (timer->pipe[PIPE_READ] > -1) {
00302 close(timer->pipe[PIPE_READ]);
00303 timer->pipe[PIPE_READ] = -1;
00304 }
00305
00306 if (timer->pipe[PIPE_WRITE] > -1) {
00307 close(timer->pipe[PIPE_WRITE]);
00308 timer->pipe[PIPE_WRITE] = -1;
00309 }
00310 }
00311
00312
00313
00314
00315 static int pthread_timer_hash(const void *obj, const int flags)
00316 {
00317 const struct pthread_timer *timer = obj;
00318
00319 return timer->pipe[PIPE_READ];
00320 }
00321
00322
00323
00324
00325 static int pthread_timer_cmp(void *obj, void *arg, int flags)
00326 {
00327 struct pthread_timer *timer1 = obj, *timer2 = arg;
00328
00329 return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
00330 }
00331
00332
00333
00334
00335
00336 static int check_timer(struct pthread_timer *timer)
00337 {
00338 struct timeval now;
00339
00340 if (timer->state == TIMER_STATE_IDLE) {
00341 return 0;
00342 }
00343
00344 now = ast_tvnow();
00345
00346 if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
00347 timer->tick_count++;
00348 if (!timer->tick_count) {
00349
00350 timer->start = now;
00351 }
00352 return 1;
00353 }
00354
00355 return 0;
00356 }
00357
00358
00359
00360
00361
00362 static void read_pipe(struct pthread_timer *timer, unsigned int quantity)
00363 {
00364 int rd_fd = timer->pipe[PIPE_READ];
00365 int pending_ticks = timer->pending_ticks;
00366
00367 ast_assert(quantity);
00368
00369 if (timer->continuous && pending_ticks) {
00370 pending_ticks--;
00371 }
00372
00373 if (quantity > pending_ticks) {
00374 quantity = pending_ticks;
00375 }
00376
00377 if (!quantity) {
00378 return;
00379 }
00380
00381 do {
00382 unsigned char buf[1024];
00383 ssize_t res;
00384 struct pollfd pfd = {
00385 .fd = rd_fd,
00386 .events = POLLIN,
00387 };
00388
00389 if (ast_poll(&pfd, 1, 0) != 1) {
00390 ast_debug(1, "Reading not available on timing pipe, "
00391 "quantity: %u\n", quantity);
00392 break;
00393 }
00394
00395 res = read(rd_fd, buf,
00396 (quantity < sizeof(buf)) ? quantity : sizeof(buf));
00397
00398 if (res == -1) {
00399 if (errno == EAGAIN) {
00400 continue;
00401 }
00402 ast_log(LOG_ERROR, "read failed on timing pipe: %s\n",
00403 strerror(errno));
00404 break;
00405 }
00406
00407 quantity -= res;
00408 timer->pending_ticks -= res;
00409 } while (quantity);
00410 }
00411
00412
00413
00414
00415
00416 static void write_byte(struct pthread_timer *timer)
00417 {
00418 ssize_t res;
00419 unsigned char x = 42;
00420
00421 do {
00422 res = write(timer->pipe[PIPE_WRITE], &x, 1);
00423 } while (res == -1 && errno == EAGAIN);
00424
00425 if (res == -1) {
00426 ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
00427 strerror(errno));
00428 } else {
00429 timer->pending_ticks++;
00430 }
00431 }
00432
00433 static int run_timer(void *obj, void *arg, int flags)
00434 {
00435 struct pthread_timer *timer = obj;
00436
00437 if (timer->state == TIMER_STATE_IDLE) {
00438 return 0;
00439 }
00440
00441 ao2_lock(timer);
00442 if (check_timer(timer)) {
00443 write_byte(timer);
00444 }
00445 ao2_unlock(timer);
00446
00447 return 0;
00448 }
00449
00450 static void *do_timing(void *arg)
00451 {
00452 struct timeval next_wakeup = ast_tvnow();
00453
00454 while (!timing_thread.stop) {
00455 struct timespec ts = { 0, };
00456
00457 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
00458
00459 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
00460
00461 ts.tv_sec = next_wakeup.tv_sec;
00462 ts.tv_nsec = next_wakeup.tv_usec * 1000;
00463
00464 ast_mutex_lock(&timing_thread.lock);
00465 if (!timing_thread.stop) {
00466 if (ao2_container_count(pthread_timers)) {
00467 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
00468 } else {
00469 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
00470 }
00471 }
00472 ast_mutex_unlock(&timing_thread.lock);
00473 }
00474
00475 return NULL;
00476 }
00477
00478 static int init_timing_thread(void)
00479 {
00480 ast_mutex_init(&timing_thread.lock);
00481 ast_cond_init(&timing_thread.cond, NULL);
00482
00483 if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
00484 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
00485 return -1;
00486 }
00487
00488 return 0;
00489 }
00490
00491 static int load_module(void)
00492 {
00493 if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
00494 pthread_timer_hash, pthread_timer_cmp))) {
00495 return AST_MODULE_LOAD_DECLINE;
00496 }
00497
00498 if (init_timing_thread()) {
00499 ao2_ref(pthread_timers, -1);
00500 pthread_timers = NULL;
00501 return AST_MODULE_LOAD_DECLINE;
00502 }
00503
00504 return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
00505 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
00506 }
00507
00508 static int unload_module(void)
00509 {
00510 int res;
00511
00512 ast_mutex_lock(&timing_thread.lock);
00513 timing_thread.stop = 1;
00514 ast_cond_signal(&timing_thread.cond);
00515 ast_mutex_unlock(&timing_thread.lock);
00516 pthread_join(timing_thread.thread, NULL);
00517
00518 if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
00519 ao2_ref(pthread_timers, -1);
00520 pthread_timers = NULL;
00521 }
00522
00523 return res;
00524 }
00525 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
00526 .load = load_module,
00527 .unload = unload_module,
00528 .load_pri = AST_MODPRI_TIMING,
00529 );