00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "asterisk.h"
00031
00032 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 413586 $")
00033
00034 #ifdef DEBUG_SCHEDULER
00035 #define DEBUG(a) do { \
00036 if (option_debug) \
00037 DEBUG_M(a) \
00038 } while (0)
00039 #else
00040 #define DEBUG(a)
00041 #endif
00042
00043 #include <sys/time.h>
00044
00045 #include "asterisk/sched.h"
00046 #include "asterisk/channel.h"
00047 #include "asterisk/lock.h"
00048 #include "asterisk/utils.h"
00049 #include "asterisk/linkedlists.h"
00050 #include "asterisk/dlinkedlists.h"
00051 #include "asterisk/hashtab.h"
00052 #include "asterisk/heap.h"
00053 #include "asterisk/threadstorage.h"
00054
00055 AST_THREADSTORAGE(last_del_id);
00056
00057 struct sched {
00058 AST_LIST_ENTRY(sched) list;
00059 int id;
00060 struct timeval when;
00061 int resched;
00062 int variable;
00063 const void *data;
00064 ast_sched_cb callback;
00065 ssize_t __heap_index;
00066 };
00067
00068 struct sched_context {
00069 ast_mutex_t lock;
00070 unsigned int eventcnt;
00071 unsigned int schedcnt;
00072 unsigned int highwater;
00073 struct ast_hashtab *schedq_ht;
00074 struct ast_heap *sched_heap;
00075
00076 #ifdef SCHED_MAX_CACHE
00077 AST_LIST_HEAD_NOLOCK(, sched) schedc;
00078 unsigned int schedccnt;
00079 #endif
00080 };
00081
00082 struct ast_sched_thread {
00083 pthread_t thread;
00084 ast_mutex_t lock;
00085 ast_cond_t cond;
00086 struct sched_context *context;
00087 unsigned int stop:1;
00088 };
00089
00090 static void *sched_run(void *data)
00091 {
00092 struct ast_sched_thread *st = data;
00093
00094 while (!st->stop) {
00095 int ms;
00096 struct timespec ts = {
00097 .tv_sec = 0,
00098 };
00099
00100 ast_mutex_lock(&st->lock);
00101
00102 if (st->stop) {
00103 ast_mutex_unlock(&st->lock);
00104 return NULL;
00105 }
00106
00107 ms = ast_sched_wait(st->context);
00108
00109 if (ms == -1) {
00110 ast_cond_wait(&st->cond, &st->lock);
00111 } else {
00112 struct timeval tv;
00113 tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
00114 ts.tv_sec = tv.tv_sec;
00115 ts.tv_nsec = tv.tv_usec * 1000;
00116 ast_cond_timedwait(&st->cond, &st->lock, &ts);
00117 }
00118
00119 ast_mutex_unlock(&st->lock);
00120
00121 if (st->stop) {
00122 return NULL;
00123 }
00124
00125 ast_sched_runq(st->context);
00126 }
00127
00128 return NULL;
00129 }
00130
00131 void ast_sched_thread_poke(struct ast_sched_thread *st)
00132 {
00133 ast_mutex_lock(&st->lock);
00134 ast_cond_signal(&st->cond);
00135 ast_mutex_unlock(&st->lock);
00136 }
00137
00138 struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st)
00139 {
00140 return st->context;
00141 }
00142
00143 struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st)
00144 {
00145 if (st->thread != AST_PTHREADT_NULL) {
00146 ast_mutex_lock(&st->lock);
00147 st->stop = 1;
00148 ast_cond_signal(&st->cond);
00149 ast_mutex_unlock(&st->lock);
00150 pthread_join(st->thread, NULL);
00151 st->thread = AST_PTHREADT_NULL;
00152 }
00153
00154 ast_mutex_destroy(&st->lock);
00155 ast_cond_destroy(&st->cond);
00156
00157 if (st->context) {
00158 sched_context_destroy(st->context);
00159 st->context = NULL;
00160 }
00161
00162 ast_free(st);
00163
00164 return NULL;
00165 }
00166
00167 struct ast_sched_thread *ast_sched_thread_create(void)
00168 {
00169 struct ast_sched_thread *st;
00170
00171 if (!(st = ast_calloc(1, sizeof(*st)))) {
00172 return NULL;
00173 }
00174
00175 ast_mutex_init(&st->lock);
00176 ast_cond_init(&st->cond, NULL);
00177
00178 st->thread = AST_PTHREADT_NULL;
00179
00180 if (!(st->context = sched_context_create())) {
00181 ast_log(LOG_ERROR, "Failed to create scheduler\n");
00182 ast_sched_thread_destroy(st);
00183 return NULL;
00184 }
00185
00186 if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) {
00187 ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
00188 ast_sched_thread_destroy(st);
00189 return NULL;
00190 }
00191
00192 return st;
00193 }
00194
00195 int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
00196 const void *data, int variable)
00197 {
00198 int res;
00199
00200 ast_mutex_lock(&st->lock);
00201 res = ast_sched_add_variable(st->context, when, cb, data, variable);
00202 if (res != -1) {
00203 ast_cond_signal(&st->cond);
00204 }
00205 ast_mutex_unlock(&st->lock);
00206
00207 return res;
00208 }
00209
00210 int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
00211 const void *data)
00212 {
00213 int res;
00214
00215 ast_mutex_lock(&st->lock);
00216 res = ast_sched_add(st->context, when, cb, data);
00217 if (res != -1) {
00218 ast_cond_signal(&st->cond);
00219 }
00220 ast_mutex_unlock(&st->lock);
00221
00222 return res;
00223 }
00224
00225
00226
00227 static int sched_cmp(const void *a, const void *b)
00228 {
00229 const struct sched *as = a;
00230 const struct sched *bs = b;
00231 return as->id != bs->id;
00232 }
00233
00234 static unsigned int sched_hash(const void *obj)
00235 {
00236 const struct sched *s = obj;
00237 unsigned int h = s->id;
00238 return h;
00239 }
00240
00241 static int sched_time_cmp(void *a, void *b)
00242 {
00243 return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
00244 }
00245
00246 struct sched_context *sched_context_create(void)
00247 {
00248 struct sched_context *tmp;
00249
00250 if (!(tmp = ast_calloc(1, sizeof(*tmp))))
00251 return NULL;
00252
00253 ast_mutex_init(&tmp->lock);
00254 tmp->eventcnt = 1;
00255
00256 tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
00257
00258 if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
00259 offsetof(struct sched, __heap_index)))) {
00260 sched_context_destroy(tmp);
00261 return NULL;
00262 }
00263
00264 return tmp;
00265 }
00266
00267 void sched_context_destroy(struct sched_context *con)
00268 {
00269 struct sched *s;
00270
00271 ast_mutex_lock(&con->lock);
00272
00273 #ifdef SCHED_MAX_CACHE
00274
00275 while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
00276 ast_free(s);
00277 #endif
00278
00279 if (con->sched_heap) {
00280 while ((s = ast_heap_pop(con->sched_heap))) {
00281 ast_free(s);
00282 }
00283 ast_heap_destroy(con->sched_heap);
00284 con->sched_heap = NULL;
00285 }
00286
00287 ast_hashtab_destroy(con->schedq_ht, NULL);
00288 con->schedq_ht = NULL;
00289
00290
00291 ast_mutex_unlock(&con->lock);
00292 ast_mutex_destroy(&con->lock);
00293 ast_free(con);
00294 }
00295
00296 static struct sched *sched_alloc(struct sched_context *con)
00297 {
00298 struct sched *tmp;
00299
00300
00301
00302
00303
00304 #ifdef SCHED_MAX_CACHE
00305 if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
00306 con->schedccnt--;
00307 else
00308 #endif
00309 tmp = ast_calloc(1, sizeof(*tmp));
00310
00311 return tmp;
00312 }
00313
00314 static void sched_release(struct sched_context *con, struct sched *tmp)
00315 {
00316
00317
00318
00319
00320
00321 #ifdef SCHED_MAX_CACHE
00322 if (con->schedccnt < SCHED_MAX_CACHE) {
00323 AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
00324 con->schedccnt++;
00325 } else
00326 #endif
00327 ast_free(tmp);
00328 }
00329
00330
00331
00332
00333
00334 int ast_sched_wait(struct sched_context *con)
00335 {
00336 int ms;
00337 struct sched *s;
00338
00339 DEBUG(ast_debug(1, "ast_sched_wait()\n"));
00340
00341 ast_mutex_lock(&con->lock);
00342 if ((s = ast_heap_peek(con->sched_heap, 1))) {
00343 ms = ast_tvdiff_ms(s->when, ast_tvnow());
00344 if (ms < 0) {
00345 ms = 0;
00346 }
00347 } else {
00348 ms = -1;
00349 }
00350 ast_mutex_unlock(&con->lock);
00351
00352 return ms;
00353 }
00354
00355
00356
00357
00358
00359
00360
00361 static void schedule(struct sched_context *con, struct sched *s)
00362 {
00363 ast_heap_push(con->sched_heap, s);
00364
00365 if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
00366 ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n", s->id);
00367 }
00368
00369 con->schedcnt++;
00370
00371 if (con->schedcnt > con->highwater) {
00372 con->highwater = con->schedcnt;
00373 }
00374 }
00375
00376
00377
00378
00379
00380 static int sched_settime(struct timeval *t, int when)
00381 {
00382 struct timeval now = ast_tvnow();
00383
00384
00385 if (ast_tvzero(*t))
00386 *t = now;
00387 *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
00388 if (ast_tvcmp(*t, now) < 0) {
00389 *t = now;
00390 }
00391 return 0;
00392 }
00393
00394 int ast_sched_replace_variable(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00395 {
00396
00397 if (old_id > 0) {
00398 AST_SCHED_DEL(con, old_id);
00399 }
00400 return ast_sched_add_variable(con, when, callback, data, variable);
00401 }
00402
00403
00404
00405
00406 int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00407 {
00408 struct sched *tmp;
00409 int res = -1;
00410
00411 DEBUG(ast_debug(1, "ast_sched_add()\n"));
00412
00413 ast_mutex_lock(&con->lock);
00414 if ((tmp = sched_alloc(con))) {
00415 tmp->id = con->eventcnt++;
00416 tmp->callback = callback;
00417 tmp->data = data;
00418 tmp->resched = when;
00419 tmp->variable = variable;
00420 tmp->when = ast_tv(0, 0);
00421 if (sched_settime(&tmp->when, when)) {
00422 sched_release(con, tmp);
00423 } else {
00424 schedule(con, tmp);
00425 res = tmp->id;
00426 }
00427 }
00428 #ifdef DUMP_SCHEDULER
00429
00430 if (option_debug)
00431 ast_sched_dump(con);
00432 #endif
00433 ast_mutex_unlock(&con->lock);
00434
00435 return res;
00436 }
00437
00438 int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data)
00439 {
00440 if (old_id > -1) {
00441 AST_SCHED_DEL(con, old_id);
00442 }
00443 return ast_sched_add(con, when, callback, data);
00444 }
00445
00446 int ast_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data)
00447 {
00448 return ast_sched_add_variable(con, when, callback, data, 0);
00449 }
00450
00451 const void *ast_sched_find_data(struct sched_context *con, int id)
00452 {
00453 struct sched tmp,*res;
00454 tmp.id = id;
00455 res = ast_hashtab_lookup(con->schedq_ht, &tmp);
00456 if (res)
00457 return res->data;
00458 return NULL;
00459 }
00460
00461
00462
00463
00464
00465
00466
00467 #ifndef AST_DEVMODE
00468 int ast_sched_del(struct sched_context *con, int id)
00469 #else
00470 int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function)
00471 #endif
00472 {
00473 struct sched *s, tmp = {
00474 .id = id,
00475 };
00476 int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));
00477
00478 DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
00479
00480 if (id < 0) {
00481 return 0;
00482 }
00483
00484 ast_mutex_lock(&con->lock);
00485 s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00486 if (s) {
00487 if (!ast_heap_remove(con->sched_heap, s)) {
00488 ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
00489 }
00490
00491 if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
00492 ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
00493 }
00494
00495 con->schedcnt--;
00496
00497 sched_release(con, s);
00498 }
00499
00500 #ifdef DUMP_SCHEDULER
00501
00502 if (option_debug)
00503 ast_sched_dump(con);
00504 #endif
00505 ast_mutex_unlock(&con->lock);
00506
00507 if (!s && *last_id != id) {
00508 ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
00509 #ifndef AST_DEVMODE
00510 ast_assert(s != NULL);
00511 #else
00512 {
00513 char buf[100];
00514 snprintf(buf, sizeof(buf), "s != NULL, id=%d", id);
00515 _ast_assert(0, buf, file, line, function);
00516 }
00517 #endif
00518 *last_id = id;
00519 return -1;
00520 } else if (!s) {
00521 return -1;
00522 }
00523
00524 return 0;
00525 }
00526
00527 void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
00528 {
00529 int i, x;
00530 struct sched *cur;
00531 int countlist[cbnames->numassocs + 1];
00532 size_t heap_size;
00533
00534 memset(countlist, 0, sizeof(countlist));
00535 ast_str_set(buf, 0, " Highwater = %u\n schedcnt = %u\n", con->highwater, con->schedcnt);
00536
00537 ast_mutex_lock(&con->lock);
00538
00539 heap_size = ast_heap_size(con->sched_heap);
00540 for (x = 1; x <= heap_size; x++) {
00541 cur = ast_heap_peek(con->sched_heap, x);
00542
00543 for (i = 0; i < cbnames->numassocs; i++) {
00544 if (cur->callback == cbnames->cblist[i]) {
00545 break;
00546 }
00547 }
00548 if (i < cbnames->numassocs) {
00549 countlist[i]++;
00550 } else {
00551 countlist[cbnames->numassocs]++;
00552 }
00553 }
00554
00555 ast_mutex_unlock(&con->lock);
00556
00557 for (i = 0; i < cbnames->numassocs; i++) {
00558 ast_str_append(buf, 0, " %s : %d\n", cbnames->list[i], countlist[i]);
00559 }
00560
00561 ast_str_append(buf, 0, " <unknown> : %d\n", countlist[cbnames->numassocs]);
00562 }
00563
00564
00565 void ast_sched_dump(struct sched_context *con)
00566 {
00567 struct sched *q;
00568 struct timeval when = ast_tvnow();
00569 int x;
00570 size_t heap_size;
00571 #ifdef SCHED_MAX_CACHE
00572 ast_debug(1, "Asterisk Schedule Dump (%u in Q, %u Total, %u Cache, %u high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
00573 #else
00574 ast_debug(1, "Asterisk Schedule Dump (%u in Q, %u Total, %u high-water)\n", con->schedcnt, con->eventcnt - 1, con->highwater);
00575 #endif
00576
00577 ast_debug(1, "=============================================================\n");
00578 ast_debug(1, "|ID Callback Data Time (sec:ms) |\n");
00579 ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
00580 ast_mutex_lock(&con->lock);
00581 heap_size = ast_heap_size(con->sched_heap);
00582 for (x = 1; x <= heap_size; x++) {
00583 struct timeval delta;
00584 q = ast_heap_peek(con->sched_heap, x);
00585 delta = ast_tvsub(q->when, when);
00586 ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
00587 q->id,
00588 q->callback,
00589 q->data,
00590 (long)delta.tv_sec,
00591 (long int)delta.tv_usec);
00592 }
00593 ast_mutex_unlock(&con->lock);
00594 ast_debug(1, "=============================================================\n");
00595 }
00596
00597
00598
00599
00600 int ast_sched_runq(struct sched_context *con)
00601 {
00602 struct sched *current;
00603 struct timeval when;
00604 int numevents;
00605 int res;
00606
00607 DEBUG(ast_debug(1, "ast_sched_runq()\n"));
00608
00609 ast_mutex_lock(&con->lock);
00610
00611 when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
00612 for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
00613
00614
00615
00616
00617
00618 if (ast_tvcmp(current->when, when) != -1) {
00619 break;
00620 }
00621
00622 current = ast_heap_pop(con->sched_heap);
00623
00624 if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
00625 ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
00626 }
00627
00628 con->schedcnt--;
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639 ast_mutex_unlock(&con->lock);
00640 res = current->callback(current->data);
00641 ast_mutex_lock(&con->lock);
00642
00643 if (res) {
00644
00645
00646
00647
00648 if (sched_settime(¤t->when, current->variable? res : current->resched)) {
00649 sched_release(con, current);
00650 } else {
00651 schedule(con, current);
00652 }
00653 } else {
00654
00655 sched_release(con, current);
00656 }
00657 }
00658
00659 ast_mutex_unlock(&con->lock);
00660
00661 return numevents;
00662 }
00663
00664 long ast_sched_when(struct sched_context *con,int id)
00665 {
00666 struct sched *s, tmp;
00667 long secs = -1;
00668 DEBUG(ast_debug(1, "ast_sched_when()\n"));
00669
00670 ast_mutex_lock(&con->lock);
00671
00672
00673 tmp.id = id;
00674 s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00675
00676 if (s) {
00677 struct timeval now = ast_tvnow();
00678 secs = s->when.tv_sec - now.tv_sec;
00679 }
00680 ast_mutex_unlock(&con->lock);
00681
00682 return secs;
00683 }