Thu Sep 7 01:03:04 2017

Asterisk developer's documentation


sched.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 1999 - 2008, Digium, Inc.
00005  *
00006  * Mark Spencer <markster@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*! \file
00020  *
00021  * \brief Scheduler Routines (from cheops-NG)
00022  *
00023  * \author Mark Spencer <markster@digium.com>
00024  */
00025 
00026 /*** MODULEINFO
00027    <support_level>core</support_level>
00028  ***/
00029 
00030 #include "asterisk.h"
00031 
00032 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 413586 $")
00033 
00034 #ifdef DEBUG_SCHEDULER
00035 #define DEBUG(a) do { \
00036    if (option_debug) \
00037       DEBUG_M(a) \
00038    } while (0)
00039 #else
00040 #define DEBUG(a) 
00041 #endif
00042 
00043 #include <sys/time.h>
00044 
00045 #include "asterisk/sched.h"
00046 #include "asterisk/channel.h"
00047 #include "asterisk/lock.h"
00048 #include "asterisk/utils.h"
00049 #include "asterisk/linkedlists.h"
00050 #include "asterisk/dlinkedlists.h"
00051 #include "asterisk/hashtab.h"
00052 #include "asterisk/heap.h"
00053 #include "asterisk/threadstorage.h"
00054 
00055 AST_THREADSTORAGE(last_del_id);
00056 
00057 struct sched {
00058    AST_LIST_ENTRY(sched) list;
00059    int id;                       /*!< ID number of event */
00060    struct timeval when;          /*!< Absolute time event should take place */
00061    int resched;                  /*!< When to reschedule */
00062    int variable;                 /*!< Use return value from callback to reschedule */
00063    const void *data;             /*!< Data */
00064    ast_sched_cb callback;        /*!< Callback */
00065    ssize_t __heap_index;
00066 };
00067 
00068 struct sched_context {
00069    ast_mutex_t lock;
00070    unsigned int eventcnt;                  /*!< Number of events processed */
00071    unsigned int schedcnt;                  /*!< Number of outstanding schedule events */
00072    unsigned int highwater;             /*!< highest count so far */
00073    struct ast_hashtab *schedq_ht;             /*!< hash table for fast searching */
00074    struct ast_heap *sched_heap;
00075 
00076 #ifdef SCHED_MAX_CACHE
00077    AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
00078    unsigned int schedccnt;
00079 #endif
00080 };
00081 
00082 struct ast_sched_thread {
00083    pthread_t thread;
00084    ast_mutex_t lock;
00085    ast_cond_t cond;
00086    struct sched_context *context;
00087    unsigned int stop:1;
00088 };
00089 
00090 static void *sched_run(void *data)
00091 {
00092    struct ast_sched_thread *st = data;
00093 
00094    while (!st->stop) {
00095       int ms;
00096       struct timespec ts = {
00097          .tv_sec = 0,   
00098       };
00099 
00100       ast_mutex_lock(&st->lock);
00101 
00102       if (st->stop) {
00103          ast_mutex_unlock(&st->lock);
00104          return NULL;
00105       }
00106 
00107       ms = ast_sched_wait(st->context);
00108 
00109       if (ms == -1) {
00110          ast_cond_wait(&st->cond, &st->lock);
00111       } else { 
00112          struct timeval tv;
00113          tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
00114          ts.tv_sec = tv.tv_sec;
00115          ts.tv_nsec = tv.tv_usec * 1000;
00116          ast_cond_timedwait(&st->cond, &st->lock, &ts);
00117       }
00118 
00119       ast_mutex_unlock(&st->lock);
00120 
00121       if (st->stop) {
00122          return NULL;
00123       }
00124 
00125       ast_sched_runq(st->context);
00126    }
00127 
00128    return NULL;
00129 }
00130 
00131 void ast_sched_thread_poke(struct ast_sched_thread *st)
00132 {
00133    ast_mutex_lock(&st->lock);
00134    ast_cond_signal(&st->cond);
00135    ast_mutex_unlock(&st->lock);
00136 }
00137 
00138 struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st)
00139 {
00140    return st->context;
00141 }
00142 
00143 struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st)
00144 {
00145    if (st->thread != AST_PTHREADT_NULL) {
00146       ast_mutex_lock(&st->lock);
00147       st->stop = 1;
00148       ast_cond_signal(&st->cond);
00149       ast_mutex_unlock(&st->lock);
00150       pthread_join(st->thread, NULL);
00151       st->thread = AST_PTHREADT_NULL;
00152    }
00153 
00154    ast_mutex_destroy(&st->lock);
00155    ast_cond_destroy(&st->cond);
00156 
00157    if (st->context) {
00158       sched_context_destroy(st->context);
00159       st->context = NULL;
00160    }
00161 
00162    ast_free(st);
00163 
00164    return NULL;
00165 }
00166 
00167 struct ast_sched_thread *ast_sched_thread_create(void)
00168 {
00169    struct ast_sched_thread *st;
00170 
00171    if (!(st = ast_calloc(1, sizeof(*st)))) {
00172       return NULL;
00173    }
00174 
00175    ast_mutex_init(&st->lock);
00176    ast_cond_init(&st->cond, NULL);
00177 
00178    st->thread = AST_PTHREADT_NULL;
00179 
00180    if (!(st->context = sched_context_create())) {
00181       ast_log(LOG_ERROR, "Failed to create scheduler\n");
00182       ast_sched_thread_destroy(st);
00183       return NULL;
00184    }
00185    
00186    if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) {
00187       ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
00188       ast_sched_thread_destroy(st);
00189       return NULL;
00190    }
00191 
00192    return st;
00193 }
00194 
00195 int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
00196       const void *data, int variable)
00197 {
00198    int res;
00199 
00200    ast_mutex_lock(&st->lock);
00201    res = ast_sched_add_variable(st->context, when, cb, data, variable);
00202    if (res != -1) {
00203       ast_cond_signal(&st->cond);
00204    }
00205    ast_mutex_unlock(&st->lock);
00206 
00207    return res;
00208 }
00209 
00210 int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
00211       const void *data)
00212 {
00213    int res;
00214 
00215    ast_mutex_lock(&st->lock);
00216    res = ast_sched_add(st->context, when, cb, data);
00217    if (res != -1) {
00218       ast_cond_signal(&st->cond);
00219    }
00220    ast_mutex_unlock(&st->lock);
00221 
00222    return res;
00223 }
00224 
00225 /* hash routines for sched */
00226 
00227 static int sched_cmp(const void *a, const void *b)
00228 {
00229    const struct sched *as = a;
00230    const struct sched *bs = b;
00231    return as->id != bs->id; /* return 0 on a match like strcmp would */
00232 }
00233 
00234 static unsigned int sched_hash(const void *obj)
00235 {
00236    const struct sched *s = obj;
00237    unsigned int h = s->id;
00238    return h;
00239 }
00240 
00241 static int sched_time_cmp(void *a, void *b)
00242 {
00243    return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
00244 }
00245 
00246 struct sched_context *sched_context_create(void)
00247 {
00248    struct sched_context *tmp;
00249 
00250    if (!(tmp = ast_calloc(1, sizeof(*tmp))))
00251       return NULL;
00252 
00253    ast_mutex_init(&tmp->lock);
00254    tmp->eventcnt = 1;
00255 
00256    tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
00257 
00258    if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
00259          offsetof(struct sched, __heap_index)))) {
00260       sched_context_destroy(tmp);
00261       return NULL;
00262    }
00263 
00264    return tmp;
00265 }
00266 
00267 void sched_context_destroy(struct sched_context *con)
00268 {
00269    struct sched *s;
00270 
00271    ast_mutex_lock(&con->lock);
00272 
00273 #ifdef SCHED_MAX_CACHE
00274    /* Eliminate the cache */
00275    while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
00276       ast_free(s);
00277 #endif
00278 
00279    if (con->sched_heap) {
00280       while ((s = ast_heap_pop(con->sched_heap))) {
00281          ast_free(s);
00282       }
00283       ast_heap_destroy(con->sched_heap);
00284       con->sched_heap = NULL;
00285    }
00286 
00287    ast_hashtab_destroy(con->schedq_ht, NULL);
00288    con->schedq_ht = NULL;
00289    
00290    /* And the context */
00291    ast_mutex_unlock(&con->lock);
00292    ast_mutex_destroy(&con->lock);
00293    ast_free(con);
00294 }
00295 
00296 static struct sched *sched_alloc(struct sched_context *con)
00297 {
00298    struct sched *tmp;
00299 
00300    /*
00301     * We keep a small cache of schedule entries
00302     * to minimize the number of necessary malloc()'s
00303     */
00304 #ifdef SCHED_MAX_CACHE
00305    if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
00306       con->schedccnt--;
00307    else
00308 #endif
00309       tmp = ast_calloc(1, sizeof(*tmp));
00310 
00311    return tmp;
00312 }
00313 
00314 static void sched_release(struct sched_context *con, struct sched *tmp)
00315 {
00316    /*
00317     * Add to the cache, or just free() if we
00318     * already have too many cache entries
00319     */
00320 
00321 #ifdef SCHED_MAX_CACHE   
00322    if (con->schedccnt < SCHED_MAX_CACHE) {
00323       AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
00324       con->schedccnt++;
00325    } else
00326 #endif
00327       ast_free(tmp);
00328 }
00329 
00330 /*! \brief
00331  * Return the number of milliseconds 
00332  * until the next scheduled event
00333  */
00334 int ast_sched_wait(struct sched_context *con)
00335 {
00336    int ms;
00337    struct sched *s;
00338 
00339    DEBUG(ast_debug(1, "ast_sched_wait()\n"));
00340 
00341    ast_mutex_lock(&con->lock);
00342    if ((s = ast_heap_peek(con->sched_heap, 1))) {
00343       ms = ast_tvdiff_ms(s->when, ast_tvnow());
00344       if (ms < 0) {
00345          ms = 0;
00346       }
00347    } else {
00348       ms = -1;
00349    }
00350    ast_mutex_unlock(&con->lock);
00351 
00352    return ms;
00353 }
00354 
00355 
00356 /*! \brief
00357  * Take a sched structure and put it in the
00358  * queue, such that the soonest event is
00359  * first in the list. 
00360  */
00361 static void schedule(struct sched_context *con, struct sched *s)
00362 {
00363    ast_heap_push(con->sched_heap, s);
00364 
00365    if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
00366       ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n", s->id);
00367    }
00368 
00369    con->schedcnt++;
00370 
00371    if (con->schedcnt > con->highwater) {
00372       con->highwater = con->schedcnt;
00373    }
00374 }
00375 
00376 /*! \brief
00377  * given the last event *tv and the offset in milliseconds 'when',
00378  * computes the next value,
00379  */
00380 static int sched_settime(struct timeval *t, int when)
00381 {
00382    struct timeval now = ast_tvnow();
00383 
00384    /*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
00385    if (ast_tvzero(*t))  /* not supplied, default to now */
00386       *t = now;
00387    *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
00388    if (ast_tvcmp(*t, now) < 0) {
00389       *t = now;
00390    }
00391    return 0;
00392 }
00393 
00394 int ast_sched_replace_variable(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00395 {
00396    /* 0 means the schedule item is new; do not delete */
00397    if (old_id > 0) {
00398       AST_SCHED_DEL(con, old_id);
00399    }
00400    return ast_sched_add_variable(con, when, callback, data, variable);
00401 }
00402 
00403 /*! \brief
00404  * Schedule callback(data) to happen when ms into the future
00405  */
00406 int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00407 {
00408    struct sched *tmp;
00409    int res = -1;
00410 
00411    DEBUG(ast_debug(1, "ast_sched_add()\n"));
00412 
00413    ast_mutex_lock(&con->lock);
00414    if ((tmp = sched_alloc(con))) {
00415       tmp->id = con->eventcnt++;
00416       tmp->callback = callback;
00417       tmp->data = data;
00418       tmp->resched = when;
00419       tmp->variable = variable;
00420       tmp->when = ast_tv(0, 0);
00421       if (sched_settime(&tmp->when, when)) {
00422          sched_release(con, tmp);
00423       } else {
00424          schedule(con, tmp);
00425          res = tmp->id;
00426       }
00427    }
00428 #ifdef DUMP_SCHEDULER
00429    /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
00430    if (option_debug)
00431       ast_sched_dump(con);
00432 #endif
00433    ast_mutex_unlock(&con->lock);
00434 
00435    return res;
00436 }
00437 
00438 int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data)
00439 {
00440    if (old_id > -1) {
00441       AST_SCHED_DEL(con, old_id);
00442    }
00443    return ast_sched_add(con, when, callback, data);
00444 }
00445 
00446 int ast_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data)
00447 {
00448    return ast_sched_add_variable(con, when, callback, data, 0);
00449 }
00450 
00451 const void *ast_sched_find_data(struct sched_context *con, int id)
00452 {
00453    struct sched tmp,*res;
00454    tmp.id = id;
00455    res = ast_hashtab_lookup(con->schedq_ht, &tmp);
00456    if (res)
00457       return res->data;
00458    return NULL;
00459 }
00460 
00461 /*! \brief
00462  * Delete the schedule entry with number
00463  * "id".  It's nearly impossible that there
00464  * would be two or more in the list with that
00465  * id.
00466  */
00467 #ifndef AST_DEVMODE
00468 int ast_sched_del(struct sched_context *con, int id)
00469 #else
00470 int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function)
00471 #endif
00472 {
00473    struct sched *s, tmp = {
00474       .id = id,
00475    };
00476    int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));
00477 
00478    DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
00479 
00480    if (id < 0) {
00481       return 0;
00482    }
00483 
00484    ast_mutex_lock(&con->lock);
00485    s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00486    if (s) {
00487       if (!ast_heap_remove(con->sched_heap, s)) {
00488          ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
00489       }
00490 
00491       if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
00492          ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
00493       }
00494 
00495       con->schedcnt--;
00496 
00497       sched_release(con, s);
00498    }
00499 
00500 #ifdef DUMP_SCHEDULER
00501    /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
00502    if (option_debug)
00503       ast_sched_dump(con);
00504 #endif
00505    ast_mutex_unlock(&con->lock);
00506 
00507    if (!s && *last_id != id) {
00508       ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
00509 #ifndef AST_DEVMODE
00510       ast_assert(s != NULL);
00511 #else
00512       {
00513       char buf[100];
00514       snprintf(buf, sizeof(buf), "s != NULL, id=%d", id);
00515       _ast_assert(0, buf, file, line, function);
00516       }
00517 #endif
00518       *last_id = id;
00519       return -1;
00520    } else if (!s) {
00521       return -1;
00522    }
00523 
00524    return 0;
00525 }
00526 
00527 void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
00528 {
00529    int i, x;
00530    struct sched *cur;
00531    int countlist[cbnames->numassocs + 1];
00532    size_t heap_size;
00533    
00534    memset(countlist, 0, sizeof(countlist));
00535    ast_str_set(buf, 0, " Highwater = %u\n schedcnt = %u\n", con->highwater, con->schedcnt);
00536 
00537    ast_mutex_lock(&con->lock);
00538 
00539    heap_size = ast_heap_size(con->sched_heap);
00540    for (x = 1; x <= heap_size; x++) {
00541       cur = ast_heap_peek(con->sched_heap, x);
00542       /* match the callback to the cblist */
00543       for (i = 0; i < cbnames->numassocs; i++) {
00544          if (cur->callback == cbnames->cblist[i]) {
00545             break;
00546          }
00547       }
00548       if (i < cbnames->numassocs) {
00549          countlist[i]++;
00550       } else {
00551          countlist[cbnames->numassocs]++;
00552       }
00553    }
00554 
00555    ast_mutex_unlock(&con->lock);
00556 
00557    for (i = 0; i < cbnames->numassocs; i++) {
00558       ast_str_append(buf, 0, "    %s : %d\n", cbnames->list[i], countlist[i]);
00559    }
00560 
00561    ast_str_append(buf, 0, "   <unknown> : %d\n", countlist[cbnames->numassocs]);
00562 }
00563    
00564 /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
00565 void ast_sched_dump(struct sched_context *con)
00566 {
00567    struct sched *q;
00568    struct timeval when = ast_tvnow();
00569    int x;
00570    size_t heap_size;
00571 #ifdef SCHED_MAX_CACHE
00572    ast_debug(1, "Asterisk Schedule Dump (%u in Q, %u Total, %u Cache, %u high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
00573 #else
00574    ast_debug(1, "Asterisk Schedule Dump (%u in Q, %u Total, %u high-water)\n", con->schedcnt, con->eventcnt - 1, con->highwater);
00575 #endif
00576 
00577    ast_debug(1, "=============================================================\n");
00578    ast_debug(1, "|ID    Callback          Data              Time  (sec:ms)   |\n");
00579    ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
00580    ast_mutex_lock(&con->lock);
00581    heap_size = ast_heap_size(con->sched_heap);
00582    for (x = 1; x <= heap_size; x++) {
00583       struct timeval delta;
00584       q = ast_heap_peek(con->sched_heap, x);
00585       delta = ast_tvsub(q->when, when);
00586       ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n", 
00587          q->id,
00588          q->callback,
00589          q->data,
00590          (long)delta.tv_sec,
00591          (long int)delta.tv_usec);
00592    }
00593    ast_mutex_unlock(&con->lock);
00594    ast_debug(1, "=============================================================\n");
00595 }
00596 
00597 /*! \brief
00598  * Launch all events which need to be run at this time.
00599  */
00600 int ast_sched_runq(struct sched_context *con)
00601 {
00602    struct sched *current;
00603    struct timeval when;
00604    int numevents;
00605    int res;
00606 
00607    DEBUG(ast_debug(1, "ast_sched_runq()\n"));
00608       
00609    ast_mutex_lock(&con->lock);
00610 
00611    when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
00612    for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
00613       /* schedule all events which are going to expire within 1ms.
00614        * We only care about millisecond accuracy anyway, so this will
00615        * help us get more than one event at one time if they are very
00616        * close together.
00617        */
00618       if (ast_tvcmp(current->when, when) != -1) {
00619          break;
00620       }
00621       
00622       current = ast_heap_pop(con->sched_heap);
00623 
00624       if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
00625          ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
00626       }
00627 
00628       con->schedcnt--;
00629 
00630       /*
00631        * At this point, the schedule queue is still intact.  We
00632        * have removed the first event and the rest is still there,
00633        * so it's permissible for the callback to add new events, but
00634        * trying to delete itself won't work because it isn't in
00635        * the schedule queue.  If that's what it wants to do, it 
00636        * should return 0.
00637        */
00638          
00639       ast_mutex_unlock(&con->lock);
00640       res = current->callback(current->data);
00641       ast_mutex_lock(&con->lock);
00642          
00643       if (res) {
00644          /*
00645           * If they return non-zero, we should schedule them to be
00646           * run again.
00647           */
00648          if (sched_settime(&current->when, current->variable? res : current->resched)) {
00649             sched_release(con, current);
00650          } else {
00651             schedule(con, current);
00652          }
00653       } else {
00654          /* No longer needed, so release it */
00655          sched_release(con, current);
00656       }
00657    }
00658 
00659    ast_mutex_unlock(&con->lock);
00660    
00661    return numevents;
00662 }
00663 
00664 long ast_sched_when(struct sched_context *con,int id)
00665 {
00666    struct sched *s, tmp;
00667    long secs = -1;
00668    DEBUG(ast_debug(1, "ast_sched_when()\n"));
00669 
00670    ast_mutex_lock(&con->lock);
00671    
00672    /* these next 2 lines replace a lookup loop */
00673    tmp.id = id;
00674    s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00675    
00676    if (s) {
00677       struct timeval now = ast_tvnow();
00678       secs = s->when.tv_sec - now.tv_sec;
00679    }
00680    ast_mutex_unlock(&con->lock);
00681    
00682    return secs;
00683 }

Generated on 7 Sep 2017 for Asterisk - The Open Source Telephony Project by  doxygen 1.6.1