00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "asterisk.h"
00027
00028 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 352955 $")
00029
00030 #include "asterisk/_private.h"
00031 #include "asterisk/module.h"
00032 #include "asterisk/time.h"
00033 #include "asterisk/astobj2.h"
00034 #include "asterisk/cli.h"
00035 #include "asterisk/taskprocessor.h"
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045 struct tps_task {
00046
00047 int (*execute)(void *datap);
00048
00049 void *datap;
00050
00051 AST_LIST_ENTRY(tps_task) list;
00052 };
00053
00054
/*! \brief Counters kept per taskprocessor and shown by the CLI report. */
struct tps_taskprocessor_stats {
	/*! Largest queue depth sampled by the processing thread so far. */
	unsigned long max_qsize;
	/*! Number of tasks whose callback has been executed. */
	unsigned long _tasks_processed_count;
};
00061
00062
00063 struct ast_taskprocessor {
00064
00065 const char *name;
00066
00067 ast_cond_t poll_cond;
00068
00069 pthread_t poll_thread;
00070
00071 ast_mutex_t taskprocessor_lock;
00072
00073 unsigned char poll_thread_run;
00074
00075 struct tps_taskprocessor_stats *stats;
00076
00077 long tps_queue_size;
00078
00079 AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
00080
00081 AST_LIST_ENTRY(ast_taskprocessor) list;
00082 };
00083 #define TPS_MAX_BUCKETS 7
00084
00085 static struct ao2_container *tps_singletons;
00086
00087
00088 static ast_cond_t cli_ping_cond;
00089
00090
00091 AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
00092
00093
00094 static int tps_hash_cb(const void *obj, const int flags);
00095
00096 static int tps_cmp_cb(void *obj, void *arg, int flags);
00097
00098
00099 static void *tps_processing_function(void *data);
00100
00101
00102 static void tps_taskprocessor_destroy(void *tps);
00103
00104
00105 static int tps_ping_handler(void *datap);
00106
00107
00108 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
00109
00110
00111 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
00112
00113 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
00114 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
00115
00116 static struct ast_cli_entry taskprocessor_clis[] = {
00117 AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
00118 AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
00119 };
00120
00121
00122 int ast_tps_init(void)
00123 {
00124 if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
00125 ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
00126 return -1;
00127 }
00128
00129 ast_cond_init(&cli_ping_cond, NULL);
00130
00131 ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
00132 return 0;
00133 }
00134
00135
00136 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
00137 {
00138 struct tps_task *t;
00139 if ((t = ast_calloc(1, sizeof(*t)))) {
00140 t->execute = task_exe;
00141 t->datap = datap;
00142 }
00143 return t;
00144 }
00145
00146
00147 static void *tps_task_free(struct tps_task *task)
00148 {
00149 if (task) {
00150 ast_free(task);
00151 }
00152 return NULL;
00153 }
00154
00155
00156 static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
00157 {
00158 int tklen;
00159 int wordnum = 0;
00160 char *name = NULL;
00161 struct ao2_iterator i;
00162
00163 if (a->pos != 3)
00164 return NULL;
00165
00166 tklen = strlen(a->word);
00167 i = ao2_iterator_init(tps_singletons, 0);
00168 while ((p = ao2_iterator_next(&i))) {
00169 if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
00170 name = ast_strdup(p->name);
00171 ao2_ref(p, -1);
00172 break;
00173 }
00174 ao2_ref(p, -1);
00175 }
00176 ao2_iterator_destroy(&i);
00177 return name;
00178 }
00179
00180
00181 static int tps_ping_handler(void *datap)
00182 {
00183 ast_mutex_lock(&cli_ping_cond_lock);
00184 ast_cond_signal(&cli_ping_cond);
00185 ast_mutex_unlock(&cli_ping_cond_lock);
00186 return 0;
00187 }
00188
00189
00190 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00191 {
00192 struct timeval begin, end, delta;
00193 const char *name;
00194 struct timeval when;
00195 struct timespec ts;
00196 struct ast_taskprocessor *tps = NULL;
00197
00198 switch (cmd) {
00199 case CLI_INIT:
00200 e->command = "core ping taskprocessor";
00201 e->usage =
00202 "Usage: core ping taskprocessor <taskprocessor>\n"
00203 " Displays the time required for a task to be processed\n";
00204 return NULL;
00205 case CLI_GENERATE:
00206 return tps_taskprocessor_tab_complete(tps, a);
00207 }
00208
00209 if (a->argc != 4)
00210 return CLI_SHOWUSAGE;
00211
00212 name = a->argv[3];
00213 if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
00214 ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
00215 return CLI_SUCCESS;
00216 }
00217 ast_cli(a->fd, "\npinging %s ...", name);
00218 when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
00219 ts.tv_sec = when.tv_sec;
00220 ts.tv_nsec = when.tv_usec * 1000;
00221 ast_mutex_lock(&cli_ping_cond_lock);
00222 if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
00223 ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
00224 ao2_ref(tps, -1);
00225 return CLI_FAILURE;
00226 }
00227 ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
00228 ast_mutex_unlock(&cli_ping_cond_lock);
00229 end = ast_tvnow();
00230 delta = ast_tvsub(end, begin);
00231 ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
00232 ao2_ref(tps, -1);
00233 return CLI_SUCCESS;
00234 }
00235
00236 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00237 {
00238 char name[256];
00239 int tcount;
00240 unsigned long qsize;
00241 unsigned long maxqsize;
00242 unsigned long processed;
00243 struct ast_taskprocessor *p;
00244 struct ao2_iterator i;
00245
00246 switch (cmd) {
00247 case CLI_INIT:
00248 e->command = "core show taskprocessors";
00249 e->usage =
00250 "Usage: core show taskprocessors\n"
00251 " Shows a list of instantiated task processors and their statistics\n";
00252 return NULL;
00253 case CLI_GENERATE:
00254 return NULL;
00255 }
00256
00257 if (a->argc != e->args)
00258 return CLI_SHOWUSAGE;
00259
00260 ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
00261 i = ao2_iterator_init(tps_singletons, 0);
00262 while ((p = ao2_iterator_next(&i))) {
00263 ast_copy_string(name, p->name, sizeof(name));
00264 qsize = p->tps_queue_size;
00265 maxqsize = p->stats->max_qsize;
00266 processed = p->stats->_tasks_processed_count;
00267 ast_cli(a->fd, "\n%24s %17ld %12ld %12ld", name, processed, qsize, maxqsize);
00268 ao2_ref(p, -1);
00269 }
00270 ao2_iterator_destroy(&i);
00271 tcount = ao2_container_count(tps_singletons);
00272 ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
00273 return CLI_SUCCESS;
00274 }
00275
00276
00277 static void *tps_processing_function(void *data)
00278 {
00279 struct ast_taskprocessor *i = data;
00280 struct tps_task *t;
00281 int size;
00282
00283 if (!i) {
00284 ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
00285 return NULL;
00286 }
00287
00288 while (i->poll_thread_run) {
00289 ast_mutex_lock(&i->taskprocessor_lock);
00290 if (!i->poll_thread_run) {
00291 ast_mutex_unlock(&i->taskprocessor_lock);
00292 break;
00293 }
00294 if (!(size = tps_taskprocessor_depth(i))) {
00295 ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
00296 if (!i->poll_thread_run) {
00297 ast_mutex_unlock(&i->taskprocessor_lock);
00298 break;
00299 }
00300 }
00301 ast_mutex_unlock(&i->taskprocessor_lock);
00302
00303 if (!(t = tps_taskprocessor_pop(i))) {
00304 ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
00305 continue;
00306 }
00307 if (!t->execute) {
00308 ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
00309 tps_task_free(t);
00310 continue;
00311 }
00312 t->execute(t->datap);
00313
00314 ast_mutex_lock(&i->taskprocessor_lock);
00315 if (i->stats) {
00316 i->stats->_tasks_processed_count++;
00317 if (size > i->stats->max_qsize) {
00318 i->stats->max_qsize = size;
00319 }
00320 }
00321 ast_mutex_unlock(&i->taskprocessor_lock);
00322
00323 tps_task_free(t);
00324 }
00325 while ((t = tps_taskprocessor_pop(i))) {
00326 tps_task_free(t);
00327 }
00328 return NULL;
00329 }
00330
00331
00332 static int tps_hash_cb(const void *obj, const int flags)
00333 {
00334 const struct ast_taskprocessor *tps = obj;
00335
00336 return ast_str_case_hash(tps->name);
00337 }
00338
00339
00340 static int tps_cmp_cb(void *obj, void *arg, int flags)
00341 {
00342 struct ast_taskprocessor *lhs = obj, *rhs = arg;
00343
00344 return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
00345 }
00346
00347
00348 static void tps_taskprocessor_destroy(void *tps)
00349 {
00350 struct ast_taskprocessor *t = tps;
00351
00352 if (!tps) {
00353 ast_log(LOG_ERROR, "missing taskprocessor\n");
00354 return;
00355 }
00356 ast_log(LOG_DEBUG, "destroying taskprocessor '%s'\n", t->name);
00357
00358 ast_mutex_lock(&t->taskprocessor_lock);
00359 t->poll_thread_run = 0;
00360 ast_cond_signal(&t->poll_cond);
00361 ast_mutex_unlock(&t->taskprocessor_lock);
00362 pthread_join(t->poll_thread, NULL);
00363 t->poll_thread = AST_PTHREADT_NULL;
00364 ast_mutex_destroy(&t->taskprocessor_lock);
00365 ast_cond_destroy(&t->poll_cond);
00366
00367 if (t->stats) {
00368 ast_free(t->stats);
00369 t->stats = NULL;
00370 }
00371 ast_free((char *) t->name);
00372 }
00373
00374
00375 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
00376 {
00377 struct tps_task *task;
00378
00379 if (!tps) {
00380 ast_log(LOG_ERROR, "missing taskprocessor\n");
00381 return NULL;
00382 }
00383 ast_mutex_lock(&tps->taskprocessor_lock);
00384 if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
00385 tps->tps_queue_size--;
00386 }
00387 ast_mutex_unlock(&tps->taskprocessor_lock);
00388 return task;
00389 }
00390
00391 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
00392 {
00393 return (tps) ? tps->tps_queue_size : -1;
00394 }
00395
00396
00397 const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
00398 {
00399 if (!tps) {
00400 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
00401 return NULL;
00402 }
00403 return tps->name;
00404 }
00405
00406
00407
00408
00409 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
00410 {
00411 struct ast_taskprocessor *p, tmp_tps = {
00412 .name = name,
00413 };
00414
00415 if (ast_strlen_zero(name)) {
00416 ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
00417 return NULL;
00418 }
00419 ao2_lock(tps_singletons);
00420 p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
00421 if (p) {
00422 ao2_unlock(tps_singletons);
00423 return p;
00424 }
00425 if (create & TPS_REF_IF_EXISTS) {
00426
00427 ao2_unlock(tps_singletons);
00428 return NULL;
00429 }
00430
00431 if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
00432 ao2_unlock(tps_singletons);
00433 ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
00434 return NULL;
00435 }
00436
00437 ast_cond_init(&p->poll_cond, NULL);
00438 ast_mutex_init(&p->taskprocessor_lock);
00439
00440 if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
00441 ao2_unlock(tps_singletons);
00442 ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
00443 ao2_ref(p, -1);
00444 return NULL;
00445 }
00446 if (!(p->name = ast_strdup(name))) {
00447 ao2_unlock(tps_singletons);
00448 ao2_ref(p, -1);
00449 return NULL;
00450 }
00451 p->poll_thread_run = 1;
00452 p->poll_thread = AST_PTHREADT_NULL;
00453 if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) {
00454 ao2_unlock(tps_singletons);
00455 ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
00456 ao2_ref(p, -1);
00457 return NULL;
00458 }
00459 if (!(ao2_link(tps_singletons, p))) {
00460 ao2_unlock(tps_singletons);
00461 ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
00462 ao2_ref(p, -1);
00463 return NULL;
00464 }
00465 ao2_unlock(tps_singletons);
00466 return p;
00467 }
00468
00469
00470 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
00471 {
00472 if (tps) {
00473 ao2_lock(tps_singletons);
00474 ao2_unlink(tps_singletons, tps);
00475 if (ao2_ref(tps, -1) > 1) {
00476 ao2_link(tps_singletons, tps);
00477 }
00478 ao2_unlock(tps_singletons);
00479 }
00480 return NULL;
00481 }
00482
00483
00484 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
00485 {
00486 struct tps_task *t;
00487
00488 if (!tps || !task_exe) {
00489 ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
00490 return -1;
00491 }
00492 if (!(t = tps_task_alloc(task_exe, datap))) {
00493 ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
00494 return -1;
00495 }
00496 ast_mutex_lock(&tps->taskprocessor_lock);
00497 AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
00498 tps->tps_queue_size++;
00499 ast_cond_signal(&tps->poll_cond);
00500 ast_mutex_unlock(&tps->taskprocessor_lock);
00501 return 0;
00502 }
00503