Wed Jan 27 20:02:16 2016

Asterisk developer's documentation


taskprocessor.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2007-2008, Digium, Inc.
00005  *
00006  * Dwayne M. Hubbard <dhubbard@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*!
00020  * \file
00021  * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
00022  *
00023  * \author Dwayne Hubbard <dhubbard@digium.com>
00024  */
00025 
00026 /*** MODULEINFO
00027    <support_level>core</support_level>
00028  ***/
00029 
00030 #include "asterisk.h"
00031 
00032 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 413586 $")
00033 
00034 #include "asterisk/_private.h"
00035 #include "asterisk/module.h"
00036 #include "asterisk/time.h"
00037 #include "asterisk/astobj2.h"
00038 #include "asterisk/cli.h"
00039 #include "asterisk/taskprocessor.h"
00040 
00041 
00042 /*!
00043  * \brief tps_task structure is queued to a taskprocessor
00044  *
00045  * tps_tasks are processed in FIFO order and freed by the taskprocessing
00046  * thread after the task handler returns.  The callback function that is assigned
00047  * to the execute() function pointer is responsible for releasing datap resources if necessary.
00048  */
00049 struct tps_task {
00050    /*! \brief The execute() task callback function pointer */
00051    int (*execute)(void *datap);
00052    /*! \brief The data pointer for the task execute() function */
00053    void *datap;
00054    /*! \brief AST_LIST_ENTRY overhead */
00055    AST_LIST_ENTRY(tps_task) list;
00056 };
00057 
00058 /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
00059 struct tps_taskprocessor_stats {
00060    /*! \brief This is the maximum number of tasks queued at any one time */
00061    unsigned long max_qsize;
00062    /*! \brief This is the current number of tasks processed */
00063    unsigned long _tasks_processed_count;
00064 };
00065 
00066 /*! \brief A ast_taskprocessor structure is a singleton by name */
00067 struct ast_taskprocessor {
00068    /*! \brief Friendly name of the taskprocessor */
00069    const char *name;
00070    /*! \brief Thread poll condition */
00071    ast_cond_t poll_cond;
00072    /*! \brief Taskprocessor thread */
00073    pthread_t poll_thread;
00074    /*! \brief Taskprocessor lock */
00075    ast_mutex_t taskprocessor_lock;
00076    /*! \brief Taskprocesor thread run flag */
00077    unsigned char poll_thread_run;
00078    /*! \brief Taskprocessor statistics */
00079    struct tps_taskprocessor_stats *stats;
00080    /*! \brief Taskprocessor current queue size */
00081    long tps_queue_size;
00082    /*! \brief Taskprocessor queue */
00083    AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
00084    /*! \brief Taskprocessor singleton list entry */
00085    AST_LIST_ENTRY(ast_taskprocessor) list;
00086 };
00087 #define TPS_MAX_BUCKETS 7
00088 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
00089 static struct ao2_container *tps_singletons;
00090 
00091 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
00092 static ast_cond_t cli_ping_cond;
00093 
00094 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
00095 AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
00096 
00097 /*! \brief The astobj2 hash callback for taskprocessors */
00098 static int tps_hash_cb(const void *obj, const int flags);
00099 /*! \brief The astobj2 compare callback for taskprocessors */
00100 static int tps_cmp_cb(void *obj, void *arg, int flags);
00101 
00102 /*! \brief The task processing function executed by a taskprocessor */
00103 static void *tps_processing_function(void *data);
00104 
00105 /*! \brief Destroy the taskprocessor when its refcount reaches zero */
00106 static void tps_taskprocessor_destroy(void *tps);
00107 
00108 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
00109 static int tps_ping_handler(void *datap);
00110 
00111 /*! \brief Remove the front task off the taskprocessor queue */
00112 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
00113 
00114 /*! \brief Return the size of the taskprocessor queue */
00115 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
00116 
00117 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
00118 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
00119 
00120 static struct ast_cli_entry taskprocessor_clis[] = {
00121    AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
00122    AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
00123 };
00124 
00125 /*! \internal \brief Clean up resources on Asterisk shutdown */
00126 static void tps_shutdown(void)
00127 {
00128    ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
00129    ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
00130    tps_singletons = NULL;
00131 }
00132 
00133 /* initialize the taskprocessor container and register CLI operations */
00134 int ast_tps_init(void)
00135 {
00136    if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
00137       ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
00138       return -1;
00139    }
00140 
00141    ast_cond_init(&cli_ping_cond, NULL);
00142 
00143    ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
00144 
00145    ast_register_atexit(tps_shutdown);
00146 
00147    return 0;
00148 }
00149 
00150 /* allocate resources for the task */
00151 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
00152 {
00153    struct tps_task *t;
00154    if ((t = ast_calloc(1, sizeof(*t)))) {
00155       t->execute = task_exe;
00156       t->datap = datap;
00157    }
00158    return t;
00159 }
00160 
00161 /* release task resources */  
00162 static void *tps_task_free(struct tps_task *task)
00163 {
00164    if (task) {
00165       ast_free(task);
00166    }
00167    return NULL;
00168 }
00169 
00170 /* taskprocessor tab completion */
00171 static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a) 
00172 {
00173    int tklen;
00174    int wordnum = 0;
00175    char *name = NULL;
00176    struct ao2_iterator i;
00177 
00178    if (a->pos != 3)
00179       return NULL;
00180 
00181    tklen = strlen(a->word);
00182    i = ao2_iterator_init(tps_singletons, 0);
00183    while ((p = ao2_iterator_next(&i))) {
00184       if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
00185          name = ast_strdup(p->name);
00186          ao2_ref(p, -1);
00187          break;
00188       }
00189       ao2_ref(p, -1);
00190    }
00191    ao2_iterator_destroy(&i);
00192    return name;
00193 }
00194 
00195 /* ping task handling function */
00196 static int tps_ping_handler(void *datap)
00197 {
00198    ast_mutex_lock(&cli_ping_cond_lock);
00199    ast_cond_signal(&cli_ping_cond);
00200    ast_mutex_unlock(&cli_ping_cond_lock);
00201    return 0;
00202 }
00203 
00204 /* ping the specified taskprocessor and display the ping time on the CLI */
00205 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00206 {
00207    struct timeval begin, end, delta;
00208    const char *name;
00209    struct timeval when;
00210    struct timespec ts;
00211    struct ast_taskprocessor *tps = NULL;
00212 
00213    switch (cmd) {
00214    case CLI_INIT:
00215       e->command = "core ping taskprocessor";
00216       e->usage = 
00217          "Usage: core ping taskprocessor <taskprocessor>\n"
00218          "  Displays the time required for a task to be processed\n";
00219       return NULL;
00220    case CLI_GENERATE:
00221       return tps_taskprocessor_tab_complete(tps, a);
00222    }
00223 
00224    if (a->argc != 4)
00225       return CLI_SHOWUSAGE;
00226 
00227    name = a->argv[3];
00228    if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
00229       ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
00230       return CLI_SUCCESS;
00231    }
00232    ast_cli(a->fd, "\npinging %s ...", name);
00233    when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
00234    ts.tv_sec = when.tv_sec;
00235    ts.tv_nsec = when.tv_usec * 1000;
00236    ast_mutex_lock(&cli_ping_cond_lock);
00237    if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
00238       ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
00239       ao2_ref(tps, -1);
00240       return CLI_FAILURE;
00241    }
00242    ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
00243    ast_mutex_unlock(&cli_ping_cond_lock);
00244    end = ast_tvnow();
00245    delta = ast_tvsub(end, begin);
00246    ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
00247    ao2_ref(tps, -1);
00248    return CLI_SUCCESS;  
00249 }
00250 
00251 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00252 {
00253    char name[256];
00254    int tcount;
00255    unsigned long qsize;
00256    unsigned long maxqsize;
00257    unsigned long processed;
00258    struct ast_taskprocessor *p;
00259    struct ao2_iterator i;
00260 
00261    switch (cmd) {
00262    case CLI_INIT:
00263       e->command = "core show taskprocessors";
00264       e->usage = 
00265          "Usage: core show taskprocessors\n"
00266          "  Shows a list of instantiated task processors and their statistics\n";
00267       return NULL;
00268    case CLI_GENERATE:
00269       return NULL;   
00270    }
00271 
00272    if (a->argc != e->args)
00273       return CLI_SHOWUSAGE;
00274 
00275    ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
00276    i = ao2_iterator_init(tps_singletons, 0);
00277    while ((p = ao2_iterator_next(&i))) {
00278       ast_copy_string(name, p->name, sizeof(name));
00279       qsize = p->tps_queue_size;
00280       maxqsize = p->stats->max_qsize;
00281       processed = p->stats->_tasks_processed_count;
00282       ast_cli(a->fd, "\n%24s   %17lu %12lu %12lu", name, processed, qsize, maxqsize);
00283       ao2_ref(p, -1);
00284    }
00285    ao2_iterator_destroy(&i);
00286    tcount = ao2_container_count(tps_singletons); 
00287    ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
00288    return CLI_SUCCESS;  
00289 }
00290 
00291 /* this is the task processing worker function */
00292 static void *tps_processing_function(void *data)
00293 {
00294    struct ast_taskprocessor *i = data;
00295    struct tps_task *t;
00296    int size;
00297 
00298    if (!i) {
00299       ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
00300       return NULL;
00301    }
00302 
00303    while (i->poll_thread_run) {
00304       ast_mutex_lock(&i->taskprocessor_lock);
00305       if (!i->poll_thread_run) {
00306          ast_mutex_unlock(&i->taskprocessor_lock);
00307          break;
00308       }
00309       if (!(size = tps_taskprocessor_depth(i))) {
00310          ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
00311          if (!i->poll_thread_run) {
00312             ast_mutex_unlock(&i->taskprocessor_lock);
00313             break;
00314          }
00315       }
00316       ast_mutex_unlock(&i->taskprocessor_lock);
00317       /* stuff is in the queue */
00318       if (!(t = tps_taskprocessor_pop(i))) {
00319          ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
00320          continue;
00321       }
00322       if (!t->execute) {
00323          ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
00324          tps_task_free(t);
00325          continue;
00326       }
00327       t->execute(t->datap);
00328  
00329       ast_mutex_lock(&i->taskprocessor_lock);
00330       if (i->stats) {
00331          i->stats->_tasks_processed_count++;
00332          if (size > i->stats->max_qsize) {
00333             i->stats->max_qsize = size;
00334          }
00335       }
00336       ast_mutex_unlock(&i->taskprocessor_lock);
00337  
00338       tps_task_free(t);
00339    }
00340    while ((t = tps_taskprocessor_pop(i))) {
00341       tps_task_free(t);
00342    }
00343    return NULL;
00344 }
00345 
00346 /* hash callback for astobj2 */
00347 static int tps_hash_cb(const void *obj, const int flags)
00348 {
00349    const struct ast_taskprocessor *tps = obj;
00350 
00351    return ast_str_case_hash(tps->name);
00352 }
00353 
00354 /* compare callback for astobj2 */
00355 static int tps_cmp_cb(void *obj, void *arg, int flags)
00356 {
00357    struct ast_taskprocessor *lhs = obj, *rhs = arg;
00358 
00359    return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
00360 }
00361 
00362 /* destroy the taskprocessor */
00363 static void tps_taskprocessor_destroy(void *tps)
00364 {
00365    struct ast_taskprocessor *t = tps;
00366    
00367    if (!tps) {
00368       ast_log(LOG_ERROR, "missing taskprocessor\n");
00369       return;
00370    }
00371    ast_log(LOG_DEBUG, "destroying taskprocessor '%s'\n", t->name);
00372    /* kill it */  
00373    ast_mutex_lock(&t->taskprocessor_lock);
00374    t->poll_thread_run = 0;
00375    ast_cond_signal(&t->poll_cond);
00376    ast_mutex_unlock(&t->taskprocessor_lock);
00377    pthread_join(t->poll_thread, NULL);
00378    t->poll_thread = AST_PTHREADT_NULL;
00379    ast_mutex_destroy(&t->taskprocessor_lock);
00380    ast_cond_destroy(&t->poll_cond);
00381    /* free it */
00382    if (t->stats) {
00383       ast_free(t->stats);
00384       t->stats = NULL;
00385    }
00386    ast_free((char *) t->name);
00387 }
00388 
00389 /* pop the front task and return it */
00390 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
00391 {
00392    struct tps_task *task;
00393 
00394    if (!tps) {
00395       ast_log(LOG_ERROR, "missing taskprocessor\n");
00396       return NULL;
00397    }
00398    ast_mutex_lock(&tps->taskprocessor_lock);
00399    if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
00400       tps->tps_queue_size--;
00401    }
00402    ast_mutex_unlock(&tps->taskprocessor_lock);
00403    return task;
00404 }
00405 
00406 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
00407 {
00408    return (tps) ? tps->tps_queue_size : -1;
00409 }
00410 
00411 /* taskprocessor name accessor */
00412 const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
00413 {
00414    if (!tps) {
00415       ast_log(LOG_ERROR, "no taskprocessor specified!\n");
00416       return NULL;
00417    }
00418    return tps->name;
00419 }
00420 
00421 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
00422  * create the taskprocessor if we were told via ast_tps_options to return a reference only 
00423  * if it already exists */
00424 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
00425 {
00426    struct ast_taskprocessor *p, tmp_tps = {
00427       .name = name,
00428    };
00429       
00430    if (ast_strlen_zero(name)) {
00431       ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
00432       return NULL;
00433    }
00434    ao2_lock(tps_singletons);
00435    p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
00436    if (p) {
00437       ao2_unlock(tps_singletons);
00438       return p;
00439    }
00440    if (create & TPS_REF_IF_EXISTS) {
00441       /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
00442       ao2_unlock(tps_singletons);
00443       return NULL;
00444    }
00445    /* create a new taskprocessor */
00446    if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
00447       ao2_unlock(tps_singletons);
00448       ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
00449       return NULL;
00450    }
00451 
00452    ast_cond_init(&p->poll_cond, NULL);
00453    ast_mutex_init(&p->taskprocessor_lock);
00454 
00455    if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
00456       ao2_unlock(tps_singletons);
00457       ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
00458       ao2_ref(p, -1);
00459       return NULL;
00460    }
00461    if (!(p->name = ast_strdup(name))) {
00462       ao2_unlock(tps_singletons);
00463       ao2_ref(p, -1);
00464       return NULL;
00465    }
00466    p->poll_thread_run = 1;
00467    p->poll_thread = AST_PTHREADT_NULL;
00468    if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) {
00469       ao2_unlock(tps_singletons);
00470       ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
00471       ao2_ref(p, -1);
00472       return NULL;
00473    }
00474    if (!(ao2_link(tps_singletons, p))) {
00475       ao2_unlock(tps_singletons);
00476       ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
00477       ao2_ref(p, -1);
00478       return NULL;
00479    }
00480    ao2_unlock(tps_singletons);
00481    return p;
00482 }
00483 
00484 /* decrement the taskprocessor reference count and unlink from the container if necessary */
00485 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
00486 {
00487    if (tps) {
00488       ao2_lock(tps_singletons);
00489       ao2_unlink(tps_singletons, tps);
00490       if (ao2_ref(tps, -1) > 1) {
00491          ao2_link(tps_singletons, tps);
00492       }
00493       ao2_unlock(tps_singletons);
00494    }
00495    return NULL;
00496 }
00497 
00498 /* push the task into the taskprocessor queue */   
00499 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
00500 {
00501    struct tps_task *t;
00502 
00503    if (!tps || !task_exe) {
00504       ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
00505       return -1;
00506    }
00507    if (!(t = tps_task_alloc(task_exe, datap))) {
00508       ast_log(LOG_ERROR, "failed to allocate task!  Can't push to '%s'\n", tps->name);
00509       return -1;
00510    }
00511    ast_mutex_lock(&tps->taskprocessor_lock);
00512    AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
00513    tps->tps_queue_size++;
00514    ast_cond_signal(&tps->poll_cond);
00515    ast_mutex_unlock(&tps->taskprocessor_lock);
00516    return 0;
00517 }
00518 

Generated on 27 Jan 2016 for Asterisk - The Open Source Telephony Project by  doxygen 1.6.1