Mon Mar 19 11:30:30 2012

Asterisk developer's documentation


taskprocessor.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2007-2008, Digium, Inc.
00005  *
00006  * Dwayne M. Hubbard <dhubbard@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*!
00020  * \file
00021  * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
00022  *
00023  * \author Dwayne Hubbard <dhubbard@digium.com>
00024  */
00025 
00026 #include "asterisk.h"
00027 
00028 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 352955 $")
00029 
00030 #include "asterisk/_private.h"
00031 #include "asterisk/module.h"
00032 #include "asterisk/time.h"
00033 #include "asterisk/astobj2.h"
00034 #include "asterisk/cli.h"
00035 #include "asterisk/taskprocessor.h"
00036 
00037 
00038 /*!
00039  * \brief tps_task structure is queued to a taskprocessor
00040  *
00041  * tps_tasks are processed in FIFO order and freed by the taskprocessing
00042  * thread after the task handler returns.  The callback function that is assigned
00043  * to the execute() function pointer is responsible for releasing datap resources if necessary.
00044  */
00045 struct tps_task {
00046    /*! \brief The execute() task callback function pointer */
00047    int (*execute)(void *datap);
00048    /*! \brief The data pointer for the task execute() function */
00049    void *datap;
00050    /*! \brief AST_LIST_ENTRY overhead */
00051    AST_LIST_ENTRY(tps_task) list;
00052 };
00053 
00054 /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
00055 struct tps_taskprocessor_stats {
00056    /*! \brief This is the maximum number of tasks queued at any one time */
00057    unsigned long max_qsize;
00058    /*! \brief This is the current number of tasks processed */
00059    unsigned long _tasks_processed_count;
00060 };
00061 
00062 /*! \brief A ast_taskprocessor structure is a singleton by name */
00063 struct ast_taskprocessor {
00064    /*! \brief Friendly name of the taskprocessor */
00065    const char *name;
00066    /*! \brief Thread poll condition */
00067    ast_cond_t poll_cond;
00068    /*! \brief Taskprocessor thread */
00069    pthread_t poll_thread;
00070    /*! \brief Taskprocessor lock */
00071    ast_mutex_t taskprocessor_lock;
00072    /*! \brief Taskprocesor thread run flag */
00073    unsigned char poll_thread_run;
00074    /*! \brief Taskprocessor statistics */
00075    struct tps_taskprocessor_stats *stats;
00076    /*! \brief Taskprocessor current queue size */
00077    long tps_queue_size;
00078    /*! \brief Taskprocessor queue */
00079    AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
00080    /*! \brief Taskprocessor singleton list entry */
00081    AST_LIST_ENTRY(ast_taskprocessor) list;
00082 };
00083 #define TPS_MAX_BUCKETS 7
00084 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
00085 static struct ao2_container *tps_singletons;
00086 
00087 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
00088 static ast_cond_t cli_ping_cond;
00089 
00090 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
00091 AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
00092 
00093 /*! \brief The astobj2 hash callback for taskprocessors */
00094 static int tps_hash_cb(const void *obj, const int flags);
00095 /*! \brief The astobj2 compare callback for taskprocessors */
00096 static int tps_cmp_cb(void *obj, void *arg, int flags);
00097 
00098 /*! \brief The task processing function executed by a taskprocessor */
00099 static void *tps_processing_function(void *data);
00100 
00101 /*! \brief Destroy the taskprocessor when its refcount reaches zero */
00102 static void tps_taskprocessor_destroy(void *tps);
00103 
00104 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
00105 static int tps_ping_handler(void *datap);
00106 
00107 /*! \brief Remove the front task off the taskprocessor queue */
00108 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
00109 
00110 /*! \brief Return the size of the taskprocessor queue */
00111 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
00112 
00113 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
00114 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
00115 
00116 static struct ast_cli_entry taskprocessor_clis[] = {
00117    AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
00118    AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
00119 };
00120 
00121 /* initialize the taskprocessor container and register CLI operations */
00122 int ast_tps_init(void)
00123 {
00124    if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
00125       ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
00126       return -1;
00127    }
00128 
00129    ast_cond_init(&cli_ping_cond, NULL);
00130 
00131    ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
00132    return 0;
00133 }
00134 
00135 /* allocate resources for the task */
00136 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
00137 {
00138    struct tps_task *t;
00139    if ((t = ast_calloc(1, sizeof(*t)))) {
00140       t->execute = task_exe;
00141       t->datap = datap;
00142    }
00143    return t;
00144 }
00145 
00146 /* release task resources */  
00147 static void *tps_task_free(struct tps_task *task)
00148 {
00149    if (task) {
00150       ast_free(task);
00151    }
00152    return NULL;
00153 }
00154 
00155 /* taskprocessor tab completion */
00156 static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a) 
00157 {
00158    int tklen;
00159    int wordnum = 0;
00160    char *name = NULL;
00161    struct ao2_iterator i;
00162 
00163    if (a->pos != 3)
00164       return NULL;
00165 
00166    tklen = strlen(a->word);
00167    i = ao2_iterator_init(tps_singletons, 0);
00168    while ((p = ao2_iterator_next(&i))) {
00169       if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
00170          name = ast_strdup(p->name);
00171          ao2_ref(p, -1);
00172          break;
00173       }
00174       ao2_ref(p, -1);
00175    }
00176    ao2_iterator_destroy(&i);
00177    return name;
00178 }
00179 
00180 /* ping task handling function */
00181 static int tps_ping_handler(void *datap)
00182 {
00183    ast_mutex_lock(&cli_ping_cond_lock);
00184    ast_cond_signal(&cli_ping_cond);
00185    ast_mutex_unlock(&cli_ping_cond_lock);
00186    return 0;
00187 }
00188 
00189 /* ping the specified taskprocessor and display the ping time on the CLI */
00190 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00191 {
00192    struct timeval begin, end, delta;
00193    const char *name;
00194    struct timeval when;
00195    struct timespec ts;
00196    struct ast_taskprocessor *tps = NULL;
00197 
00198    switch (cmd) {
00199    case CLI_INIT:
00200       e->command = "core ping taskprocessor";
00201       e->usage = 
00202          "Usage: core ping taskprocessor <taskprocessor>\n"
00203          "  Displays the time required for a task to be processed\n";
00204       return NULL;
00205    case CLI_GENERATE:
00206       return tps_taskprocessor_tab_complete(tps, a);
00207    }
00208 
00209    if (a->argc != 4)
00210       return CLI_SHOWUSAGE;
00211 
00212    name = a->argv[3];
00213    if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
00214       ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
00215       return CLI_SUCCESS;
00216    }
00217    ast_cli(a->fd, "\npinging %s ...", name);
00218    when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
00219    ts.tv_sec = when.tv_sec;
00220    ts.tv_nsec = when.tv_usec * 1000;
00221    ast_mutex_lock(&cli_ping_cond_lock);
00222    if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
00223       ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
00224       ao2_ref(tps, -1);
00225       return CLI_FAILURE;
00226    }
00227    ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
00228    ast_mutex_unlock(&cli_ping_cond_lock);
00229    end = ast_tvnow();
00230    delta = ast_tvsub(end, begin);
00231    ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
00232    ao2_ref(tps, -1);
00233    return CLI_SUCCESS;  
00234 }
00235 
00236 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00237 {
00238    char name[256];
00239    int tcount;
00240    unsigned long qsize;
00241    unsigned long maxqsize;
00242    unsigned long processed;
00243    struct ast_taskprocessor *p;
00244    struct ao2_iterator i;
00245 
00246    switch (cmd) {
00247    case CLI_INIT:
00248       e->command = "core show taskprocessors";
00249       e->usage = 
00250          "Usage: core show taskprocessors\n"
00251          "  Shows a list of instantiated task processors and their statistics\n";
00252       return NULL;
00253    case CLI_GENERATE:
00254       return NULL;   
00255    }
00256 
00257    if (a->argc != e->args)
00258       return CLI_SHOWUSAGE;
00259 
00260    ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
00261    i = ao2_iterator_init(tps_singletons, 0);
00262    while ((p = ao2_iterator_next(&i))) {
00263       ast_copy_string(name, p->name, sizeof(name));
00264       qsize = p->tps_queue_size;
00265       maxqsize = p->stats->max_qsize;
00266       processed = p->stats->_tasks_processed_count;
00267       ast_cli(a->fd, "\n%24s   %17ld %12ld %12ld", name, processed, qsize, maxqsize);
00268       ao2_ref(p, -1);
00269    }
00270    ao2_iterator_destroy(&i);
00271    tcount = ao2_container_count(tps_singletons); 
00272    ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
00273    return CLI_SUCCESS;  
00274 }
00275 
00276 /* this is the task processing worker function */
00277 static void *tps_processing_function(void *data)
00278 {
00279    struct ast_taskprocessor *i = data;
00280    struct tps_task *t;
00281    int size;
00282 
00283    if (!i) {
00284       ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
00285       return NULL;
00286    }
00287 
00288    while (i->poll_thread_run) {
00289       ast_mutex_lock(&i->taskprocessor_lock);
00290       if (!i->poll_thread_run) {
00291          ast_mutex_unlock(&i->taskprocessor_lock);
00292          break;
00293       }
00294       if (!(size = tps_taskprocessor_depth(i))) {
00295          ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
00296          if (!i->poll_thread_run) {
00297             ast_mutex_unlock(&i->taskprocessor_lock);
00298             break;
00299          }
00300       }
00301       ast_mutex_unlock(&i->taskprocessor_lock);
00302       /* stuff is in the queue */
00303       if (!(t = tps_taskprocessor_pop(i))) {
00304          ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
00305          continue;
00306       }
00307       if (!t->execute) {
00308          ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
00309          tps_task_free(t);
00310          continue;
00311       }
00312       t->execute(t->datap);
00313  
00314       ast_mutex_lock(&i->taskprocessor_lock);
00315       if (i->stats) {
00316          i->stats->_tasks_processed_count++;
00317          if (size > i->stats->max_qsize) {
00318             i->stats->max_qsize = size;
00319          }
00320       }
00321       ast_mutex_unlock(&i->taskprocessor_lock);
00322  
00323       tps_task_free(t);
00324    }
00325    while ((t = tps_taskprocessor_pop(i))) {
00326       tps_task_free(t);
00327    }
00328    return NULL;
00329 }
00330 
00331 /* hash callback for astobj2 */
00332 static int tps_hash_cb(const void *obj, const int flags)
00333 {
00334    const struct ast_taskprocessor *tps = obj;
00335 
00336    return ast_str_case_hash(tps->name);
00337 }
00338 
00339 /* compare callback for astobj2 */
00340 static int tps_cmp_cb(void *obj, void *arg, int flags)
00341 {
00342    struct ast_taskprocessor *lhs = obj, *rhs = arg;
00343 
00344    return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
00345 }
00346 
00347 /* destroy the taskprocessor */
00348 static void tps_taskprocessor_destroy(void *tps)
00349 {
00350    struct ast_taskprocessor *t = tps;
00351    
00352    if (!tps) {
00353       ast_log(LOG_ERROR, "missing taskprocessor\n");
00354       return;
00355    }
00356    ast_log(LOG_DEBUG, "destroying taskprocessor '%s'\n", t->name);
00357    /* kill it */  
00358    ast_mutex_lock(&t->taskprocessor_lock);
00359    t->poll_thread_run = 0;
00360    ast_cond_signal(&t->poll_cond);
00361    ast_mutex_unlock(&t->taskprocessor_lock);
00362    pthread_join(t->poll_thread, NULL);
00363    t->poll_thread = AST_PTHREADT_NULL;
00364    ast_mutex_destroy(&t->taskprocessor_lock);
00365    ast_cond_destroy(&t->poll_cond);
00366    /* free it */
00367    if (t->stats) {
00368       ast_free(t->stats);
00369       t->stats = NULL;
00370    }
00371    ast_free((char *) t->name);
00372 }
00373 
00374 /* pop the front task and return it */
00375 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
00376 {
00377    struct tps_task *task;
00378 
00379    if (!tps) {
00380       ast_log(LOG_ERROR, "missing taskprocessor\n");
00381       return NULL;
00382    }
00383    ast_mutex_lock(&tps->taskprocessor_lock);
00384    if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
00385       tps->tps_queue_size--;
00386    }
00387    ast_mutex_unlock(&tps->taskprocessor_lock);
00388    return task;
00389 }
00390 
00391 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
00392 {
00393    return (tps) ? tps->tps_queue_size : -1;
00394 }
00395 
00396 /* taskprocessor name accessor */
00397 const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
00398 {
00399    if (!tps) {
00400       ast_log(LOG_ERROR, "no taskprocessor specified!\n");
00401       return NULL;
00402    }
00403    return tps->name;
00404 }
00405 
00406 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
00407  * create the taskprocessor if we were told via ast_tps_options to return a reference only 
00408  * if it already exists */
00409 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
00410 {
00411    struct ast_taskprocessor *p, tmp_tps = {
00412       .name = name,
00413    };
00414       
00415    if (ast_strlen_zero(name)) {
00416       ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
00417       return NULL;
00418    }
00419    ao2_lock(tps_singletons);
00420    p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
00421    if (p) {
00422       ao2_unlock(tps_singletons);
00423       return p;
00424    }
00425    if (create & TPS_REF_IF_EXISTS) {
00426       /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
00427       ao2_unlock(tps_singletons);
00428       return NULL;
00429    }
00430    /* create a new taskprocessor */
00431    if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
00432       ao2_unlock(tps_singletons);
00433       ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
00434       return NULL;
00435    }
00436 
00437    ast_cond_init(&p->poll_cond, NULL);
00438    ast_mutex_init(&p->taskprocessor_lock);
00439 
00440    if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
00441       ao2_unlock(tps_singletons);
00442       ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
00443       ao2_ref(p, -1);
00444       return NULL;
00445    }
00446    if (!(p->name = ast_strdup(name))) {
00447       ao2_unlock(tps_singletons);
00448       ao2_ref(p, -1);
00449       return NULL;
00450    }
00451    p->poll_thread_run = 1;
00452    p->poll_thread = AST_PTHREADT_NULL;
00453    if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) {
00454       ao2_unlock(tps_singletons);
00455       ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
00456       ao2_ref(p, -1);
00457       return NULL;
00458    }
00459    if (!(ao2_link(tps_singletons, p))) {
00460       ao2_unlock(tps_singletons);
00461       ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
00462       ao2_ref(p, -1);
00463       return NULL;
00464    }
00465    ao2_unlock(tps_singletons);
00466    return p;
00467 }
00468 
00469 /* decrement the taskprocessor reference count and unlink from the container if necessary */
00470 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
00471 {
00472    if (tps) {
00473       ao2_lock(tps_singletons);
00474       ao2_unlink(tps_singletons, tps);
00475       if (ao2_ref(tps, -1) > 1) {
00476          ao2_link(tps_singletons, tps);
00477       }
00478       ao2_unlock(tps_singletons);
00479    }
00480    return NULL;
00481 }
00482 
00483 /* push the task into the taskprocessor queue */   
00484 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
00485 {
00486    struct tps_task *t;
00487 
00488    if (!tps || !task_exe) {
00489       ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
00490       return -1;
00491    }
00492    if (!(t = tps_task_alloc(task_exe, datap))) {
00493       ast_log(LOG_ERROR, "failed to allocate task!  Can't push to '%s'\n", tps->name);
00494       return -1;
00495    }
00496    ast_mutex_lock(&tps->taskprocessor_lock);
00497    AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
00498    tps->tps_queue_size++;
00499    ast_cond_signal(&tps->poll_cond);
00500    ast_mutex_unlock(&tps->taskprocessor_lock);
00501    return 0;
00502 }
00503 

Generated on Mon Mar 19 11:30:30 2012 for Asterisk - The Open Source Telephony Project by  doxygen 1.4.7