00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "asterisk.h"
00027
00028 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 199745 $");
00029
00030 #include <math.h>
00031 #include <sys/select.h>
00032
00033 #include "asterisk/module.h"
00034 #include "asterisk/timing.h"
00035 #include "asterisk/utils.h"
00036 #include "asterisk/astobj2.h"
00037 #include "asterisk/time.h"
00038 #include "asterisk/lock.h"
00039
/*!
 * \brief Value returned by ast_register_timing_interface() in load_module();
 *        handed back to ast_unregister_timing_interface() in unload_module().
 */
static void *timing_funcs_handle;

/*
 * Callbacks installed in the pthread_timing interface table.  Every "handle"
 * is the read-side pipe descriptor returned by pthread_timer_open().
 */
static int pthread_timer_open(void);
static void pthread_timer_close(int handle);
static int pthread_timer_set_rate(int handle, unsigned int rate);
static void pthread_timer_ack(int handle, unsigned int quantity);
static int pthread_timer_enable_continuous(int handle);
static int pthread_timer_disable_continuous(int handle);
static enum ast_timer_event pthread_timer_get_event(int handle);
static unsigned int pthread_timer_get_max_rate(int handle);
00050
00051 static struct ast_timing_interface pthread_timing = {
00052 .name = "pthread",
00053 .priority = 0,
00054 .timer_open = pthread_timer_open,
00055 .timer_close = pthread_timer_close,
00056 .timer_set_rate = pthread_timer_set_rate,
00057 .timer_ack = pthread_timer_ack,
00058 .timer_enable_continuous = pthread_timer_enable_continuous,
00059 .timer_disable_continuous = pthread_timer_disable_continuous,
00060 .timer_get_event = pthread_timer_get_event,
00061 .timer_get_max_rate = pthread_timer_get_max_rate,
00062 };
00063
00064
/*! \brief Highest rate (ticks per second) accepted by pthread_timer_set_rate(). */
#define MAX_RATE 100

/*! \brief Bucket count given to ao2_container_alloc() for pthread_timers. */
#define PTHREAD_TIMER_BUCKETS 563

/*!
 * \brief Container holding every open timer; objects are hashed and compared
 *        by their read-side pipe descriptor.
 */
static struct ao2_container *pthread_timers;
00069
/*! \brief Indexes into pthread_timer.pipe, in the order filled in by pipe(). */
enum {
	PIPE_READ  = 0,
	PIPE_WRITE = 1
};

/*! \brief Whether the timing thread should generate ticks for a timer. */
enum pthread_timer_state {
	/*! No rate configured; run_timer()/check_timer() skip the timer. */
	TIMER_STATE_IDLE,
	/*! A non-zero rate is configured and ticks are being produced. */
	TIMER_STATE_TICKING,
};
00079
00080 struct pthread_timer {
00081 int pipe[2];
00082 enum pthread_timer_state state;
00083 unsigned int rate;
00084
00085 unsigned int interval;
00086 unsigned int tick_count;
00087 unsigned int pending_ticks;
00088 struct timeval start;
00089 unsigned int continuous:1;
00090 };
00091
/* Internal helpers defined further down in this file. */
static void pthread_timer_destructor(void *obj);
static struct pthread_timer *find_timer(int handle, int unlinkobj);
static void write_byte(struct pthread_timer *timer);
static void read_pipe(struct pthread_timer *timer, unsigned int quantity);
00096
00097
00098
00099
00100 static struct {
00101 pthread_t thread;
00102 ast_mutex_t lock;
00103 ast_cond_t cond;
00104 unsigned int stop:1;
00105 } timing_thread;
00106
00107 static int pthread_timer_open(void)
00108 {
00109 struct pthread_timer *timer;
00110 int fd;
00111
00112 if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
00113 errno = ENOMEM;
00114 return -1;
00115 }
00116
00117 timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
00118 timer->state = TIMER_STATE_IDLE;
00119
00120 if (pipe(timer->pipe)) {
00121 ao2_ref(timer, -1);
00122 return -1;
00123 }
00124
00125 ao2_lock(pthread_timers);
00126 if (!ao2_container_count(pthread_timers)) {
00127 ast_mutex_lock(&timing_thread.lock);
00128 ast_cond_signal(&timing_thread.cond);
00129 ast_mutex_unlock(&timing_thread.lock);
00130 }
00131 ao2_link(pthread_timers, timer);
00132 ao2_unlock(pthread_timers);
00133
00134 fd = timer->pipe[PIPE_READ];
00135
00136 ao2_ref(timer, -1);
00137
00138 return fd;
00139 }
00140
/*!
 * \brief Remove the timer identified by \a handle from the container.
 *
 * \param handle read-side pipe descriptor returned by pthread_timer_open()
 *
 * The lookup unlinks the timer; the reference returned by the lookup is then
 * released.  Unknown handles are ignored.
 */
static void pthread_timer_close(int handle)
{
	struct pthread_timer *timer = find_timer(handle, 1);

	if (timer) {
		ao2_ref(timer, -1);
	}
}
00151
00152 static int pthread_timer_set_rate(int handle, unsigned int rate)
00153 {
00154 struct pthread_timer *timer;
00155
00156 if (!(timer = find_timer(handle, 0))) {
00157 errno = EINVAL;
00158 return -1;
00159 }
00160
00161 if (rate > MAX_RATE) {
00162 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
00163 "max rate of %d / sec\n", MAX_RATE);
00164 errno = EINVAL;
00165 return -1;
00166 }
00167
00168 ao2_lock(timer);
00169
00170 if ((timer->rate = rate)) {
00171 timer->interval = roundf(1000.0 / ((float) rate));
00172 timer->start = ast_tvnow();
00173 timer->state = TIMER_STATE_TICKING;
00174 } else {
00175 timer->interval = 0;
00176 timer->start = ast_tv(0, 0);
00177 timer->state = TIMER_STATE_IDLE;
00178 }
00179 timer->tick_count = 0;
00180
00181 ao2_unlock(timer);
00182
00183 ao2_ref(timer, -1);
00184
00185 return 0;
00186 }
00187
/*!
 * \brief Acknowledge up to \a quantity ticks by draining them from the pipe.
 *
 * \param handle   read-side pipe descriptor returned by pthread_timer_open()
 * \param quantity number of ticks to acknowledge; asserted to be non-zero
 *
 * Unknown handles are ignored.
 */
static void pthread_timer_ack(int handle, unsigned int quantity)
{
	struct pthread_timer *timer;

	ast_assert(quantity > 0);

	timer = find_timer(handle, 0);
	if (timer) {
		ao2_lock(timer);
		read_pipe(timer, quantity);
		ao2_unlock(timer);

		/* Give back the reference returned by the lookup. */
		ao2_ref(timer, -1);
	}
}
00204
00205 static int pthread_timer_enable_continuous(int handle)
00206 {
00207 struct pthread_timer *timer;
00208
00209 if (!(timer = find_timer(handle, 0))) {
00210 errno = EINVAL;
00211 return -1;
00212 }
00213
00214 ao2_lock(timer);
00215 if (!timer->continuous) {
00216 timer->continuous = 1;
00217 write_byte(timer);
00218 }
00219 ao2_unlock(timer);
00220
00221 ao2_ref(timer, -1);
00222
00223 return 0;
00224 }
00225
00226 static int pthread_timer_disable_continuous(int handle)
00227 {
00228 struct pthread_timer *timer;
00229
00230 if (!(timer = find_timer(handle, 0))) {
00231 errno = EINVAL;
00232 return -1;
00233 }
00234
00235 ao2_lock(timer);
00236 if (timer->continuous) {
00237 timer->continuous = 0;
00238 read_pipe(timer, 1);
00239 }
00240 ao2_unlock(timer);
00241
00242 ao2_ref(timer, -1);
00243
00244 return 0;
00245 }
00246
00247 static enum ast_timer_event pthread_timer_get_event(int handle)
00248 {
00249 struct pthread_timer *timer;
00250 enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
00251
00252 if (!(timer = find_timer(handle, 0))) {
00253 return res;
00254 }
00255
00256 ao2_lock(timer);
00257 if (timer->continuous && timer->pending_ticks == 1) {
00258 res = AST_TIMING_EVENT_CONTINUOUS;
00259 }
00260 ao2_unlock(timer);
00261
00262 ao2_ref(timer, -1);
00263
00264 return res;
00265 }
00266
00267 static unsigned int pthread_timer_get_max_rate(int handle)
00268 {
00269 return MAX_RATE;
00270 }
00271
00272 static struct pthread_timer *find_timer(int handle, int unlinkobj)
00273 {
00274 struct pthread_timer *timer;
00275 struct pthread_timer tmp_timer;
00276 int flags = OBJ_POINTER;
00277
00278 tmp_timer.pipe[PIPE_READ] = handle;
00279
00280 if (unlinkobj) {
00281 flags |= OBJ_UNLINK;
00282 }
00283
00284 if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
00285 ast_assert(timer != NULL);
00286 return NULL;
00287 }
00288
00289 return timer;
00290 }
00291
00292 static void pthread_timer_destructor(void *obj)
00293 {
00294 struct pthread_timer *timer = obj;
00295
00296 if (timer->pipe[PIPE_READ] > -1) {
00297 close(timer->pipe[PIPE_READ]);
00298 timer->pipe[PIPE_READ] = -1;
00299 }
00300
00301 if (timer->pipe[PIPE_WRITE] > -1) {
00302 close(timer->pipe[PIPE_WRITE]);
00303 timer->pipe[PIPE_WRITE] = -1;
00304 }
00305 }
00306
00307
00308
00309
00310 static int pthread_timer_hash(const void *obj, const int flags)
00311 {
00312 const struct pthread_timer *timer = obj;
00313
00314 return timer->pipe[PIPE_READ];
00315 }
00316
00317
00318
00319
00320 static int pthread_timer_cmp(void *obj, void *arg, int flags)
00321 {
00322 struct pthread_timer *timer1 = obj, *timer2 = arg;
00323
00324 return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
00325 }
00326
00327
00328
00329
00330
00331 static int check_timer(struct pthread_timer *timer)
00332 {
00333 struct timeval now;
00334
00335 if (timer->state == TIMER_STATE_IDLE) {
00336 return 0;
00337 }
00338
00339 now = ast_tvnow();
00340
00341 if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
00342 timer->tick_count++;
00343 if (!timer->tick_count) {
00344
00345 timer->start = now;
00346 }
00347 return 1;
00348 }
00349
00350 return 0;
00351 }
00352
00353
00354
00355
00356
00357 static void read_pipe(struct pthread_timer *timer, unsigned int quantity)
00358 {
00359 int rd_fd = timer->pipe[PIPE_READ];
00360 int pending_ticks = timer->pending_ticks;
00361
00362 ast_assert(quantity);
00363
00364 if (timer->continuous && pending_ticks) {
00365 pending_ticks--;
00366 }
00367
00368 if (quantity > pending_ticks) {
00369 quantity = pending_ticks;
00370 }
00371
00372 if (!quantity) {
00373 return;
00374 }
00375
00376 do {
00377 unsigned char buf[1024];
00378 ssize_t res;
00379 fd_set rfds;
00380 struct timeval timeout = {
00381 .tv_sec = 0,
00382 };
00383
00384
00385 FD_ZERO(&rfds);
00386 FD_SET(rd_fd, &rfds);
00387
00388 if (select(rd_fd + 1, &rfds, NULL, NULL, &timeout) != 1) {
00389 ast_debug(1, "Reading not available on timing pipe, "
00390 "quantity: %u\n", quantity);
00391 break;
00392 }
00393
00394 res = read(rd_fd, buf,
00395 (quantity < sizeof(buf)) ? quantity : sizeof(buf));
00396
00397 if (res == -1) {
00398 if (errno == EAGAIN) {
00399 continue;
00400 }
00401 ast_log(LOG_ERROR, "read failed on timing pipe: %s\n",
00402 strerror(errno));
00403 break;
00404 }
00405
00406 quantity -= res;
00407 timer->pending_ticks -= res;
00408 } while (quantity);
00409 }
00410
00411
00412
00413
00414
00415 static void write_byte(struct pthread_timer *timer)
00416 {
00417 ssize_t res;
00418 unsigned char x = 42;
00419
00420 do {
00421 res = write(timer->pipe[PIPE_WRITE], &x, 1);
00422 } while (res == -1 && errno == EAGAIN);
00423
00424 if (res == -1) {
00425 ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
00426 strerror(errno));
00427 } else {
00428 timer->pending_ticks++;
00429 }
00430 }
00431
00432 static int run_timer(void *obj, void *arg, int flags)
00433 {
00434 struct pthread_timer *timer = obj;
00435
00436 if (timer->state == TIMER_STATE_IDLE) {
00437 return 0;
00438 }
00439
00440 ao2_lock(timer);
00441 if (check_timer(timer)) {
00442 write_byte(timer);
00443 }
00444 ao2_unlock(timer);
00445
00446 return 0;
00447 }
00448
00449 static void *do_timing(void *arg)
00450 {
00451 struct timeval next_wakeup = ast_tvnow();
00452
00453 while (!timing_thread.stop) {
00454 struct timespec ts = { 0, };
00455
00456 ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
00457
00458 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
00459
00460 ts.tv_sec = next_wakeup.tv_sec;
00461 ts.tv_nsec = next_wakeup.tv_usec * 1000;
00462
00463 ast_mutex_lock(&timing_thread.lock);
00464 if (!timing_thread.stop) {
00465 if (ao2_container_count(pthread_timers)) {
00466 ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
00467 } else {
00468 ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
00469 }
00470 }
00471 ast_mutex_unlock(&timing_thread.lock);
00472 }
00473
00474 return NULL;
00475 }
00476
00477 static int init_timing_thread(void)
00478 {
00479 ast_mutex_init(&timing_thread.lock);
00480 ast_cond_init(&timing_thread.cond, NULL);
00481
00482 if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
00483 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
00484 return -1;
00485 }
00486
00487 return 0;
00488 }
00489
00490 static int load_module(void)
00491 {
00492 if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
00493 pthread_timer_hash, pthread_timer_cmp))) {
00494 return AST_MODULE_LOAD_DECLINE;
00495 }
00496
00497 if (init_timing_thread()) {
00498 ao2_ref(pthread_timers, -1);
00499 pthread_timers = NULL;
00500 return AST_MODULE_LOAD_DECLINE;
00501 }
00502
00503 return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
00504 AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
00505 }
00506
00507 static int unload_module(void)
00508 {
00509 int res;
00510
00511 ast_mutex_lock(&timing_thread.lock);
00512 timing_thread.stop = 1;
00513 ast_cond_signal(&timing_thread.cond);
00514 ast_mutex_unlock(&timing_thread.lock);
00515 pthread_join(timing_thread.thread, NULL);
00516
00517 if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
00518 ao2_ref(pthread_timers, -1);
00519 pthread_timers = NULL;
00520 }
00521
00522 return res;
00523 }
00524 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
00525 .load = load_module,
00526 .unload = unload_module,
00527 .load_pri = 10,
00528 );