Wed Jan 8 2020 09:50:21

Asterisk developer's documentation


taskprocessor.c File Reference

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules. More...

#include "asterisk.h"
#include "asterisk/_private.h"
#include "asterisk/module.h"
#include "asterisk/time.h"
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"

Go to the source code of this file.

Data Structures

struct  ast_taskprocessor
 An ast_taskprocessor structure is a singleton by name. More...
 
struct  ast_taskprocessor::tps_queue
 Taskprocessor queue. More...
 
struct  tps_task
 tps_task structure is queued to a taskprocessor More...
 
struct  tps_taskprocessor_stats
 tps_taskprocessor_stats maintain statistics for a taskprocessor. More...
 

Macros

#define TPS_MAX_BUCKETS   7
 

Functions

struct ast_taskprocessor * ast_taskprocessor_get (const char *name, enum ast_tps_options create)
 Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary. More...
 
const char * ast_taskprocessor_name (struct ast_taskprocessor *tps)
 Return the name of the taskprocessor singleton. More...
 
int ast_taskprocessor_push (struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
 Push a task into the specified taskprocessor queue and signal the taskprocessor thread. More...
 
void * ast_taskprocessor_unreference (struct ast_taskprocessor *tps)
 Unreference the specified taskprocessor and its reference count will decrement. More...
 
int ast_tps_init (void)
 
static char * cli_tps_ping (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_report (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static int tps_cmp_cb (void *obj, void *arg, int flags)
 The astobj2 compare callback for taskprocessors. More...
 
static int tps_hash_cb (const void *obj, const int flags)
 The astobj2 hash callback for taskprocessors. More...
 
static int tps_ping_handler (void *datap)
 CLI taskprocessor ping <blah> handler function. More...
 
static void * tps_processing_function (void *data)
 The task processing function executed by a taskprocessor. More...
 
static void tps_shutdown (void)
 
static struct tps_task * tps_task_alloc (int(*task_exe)(void *datap), void *datap)
 
static void * tps_task_free (struct tps_task *task)
 
static int tps_taskprocessor_depth (struct ast_taskprocessor *tps)
 Return the size of the taskprocessor queue. More...
 
static void tps_taskprocessor_destroy (void *tps)
 Destroy the taskprocessor when its refcount reaches zero. More...
 
static struct tps_task * tps_taskprocessor_pop (struct ast_taskprocessor *tps)
 Remove the front task off the taskprocessor queue. More...
 
static char * tps_taskprocessor_tab_complete (struct ast_taskprocessor *p, struct ast_cli_args *a)
 

Variables

static ast_cond_t cli_ping_cond
 CLI taskprocessor ping <blah> operation requires a ping condition. More...
 
static ast_mutex_t cli_ping_cond_lock = { PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP , NULL, 1 }
 CLI taskprocessor ping <blah> operation requires a ping condition lock. More...
 
static struct ast_cli_entry taskprocessor_clis []
 
static struct ao2_container * tps_singletons
 tps_singletons is the astobj2 container for taskprocessor singletons More...
 

Detailed Description

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.

Author
Dwayne Hubbard dhubb.nosp@m.ard@.nosp@m.digiu.nosp@m.m.co.nosp@m.m

Definition in file taskprocessor.c.

Macro Definition Documentation

#define TPS_MAX_BUCKETS   7

Definition at line 87 of file taskprocessor.c.

Referenced by ast_tps_init().

Function Documentation

struct ast_taskprocessor* ast_taskprocessor_get ( const char *  name,
enum ast_tps_options  create 
)

Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.

The default behavior of instantiating a taskprocessor if one does not already exist can be disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskprocessor_get().

Parameters
name: The name of the taskprocessor
create: Use 0 by default or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does not already exist
Returns
A pointer to a reference counted taskprocessor under normal conditions, or NULL if the TPS_REF_IF_EXISTS reference type is specified and the taskprocessor does not exist
Since
1.6.1

Definition at line 424 of file taskprocessor.c.

References ao2_alloc, ao2_find, ao2_link, ao2_lock, ao2_ref, ao2_unlock, ast_calloc, ast_cond_init, ast_log(), ast_mutex_init, ast_pthread_create, AST_PTHREADT_NULL, ast_strdup, ast_strlen_zero(), LOG_ERROR, LOG_WARNING, name, ast_taskprocessor::name, OBJ_POINTER, ast_taskprocessor::poll_cond, ast_taskprocessor::poll_thread, ast_taskprocessor::poll_thread_run, ast_taskprocessor::stats, ast_taskprocessor::taskprocessor_lock, tps_processing_function(), TPS_REF_IF_EXISTS, tps_singletons, and tps_taskprocessor_destroy().

Referenced by ast_cc_init(), ast_event_init(), cli_tps_ping(), load_module(), load_objects(), and load_pbx().

425 {
426  struct ast_taskprocessor *p, tmp_tps = {
427  .name = name,
428  };
429 
430  if (ast_strlen_zero(name)) {
431  ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
432  return NULL;
433  }
435  p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
436  if (p) {
438  return p;
439  }
440  if (create & TPS_REF_IF_EXISTS) {
441  /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
443  return NULL;
444  }
445  /* create a new taskprocessor */
446  if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
448  ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
449  return NULL;
450  }
451 
452  ast_cond_init(&p->poll_cond, NULL);
454 
455  if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
457  ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
458  ao2_ref(p, -1);
459  return NULL;
460  }
461  if (!(p->name = ast_strdup(name))) {
463  ao2_ref(p, -1);
464  return NULL;
465  }
466  p->poll_thread_run = 1;
468  if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) {
470  ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
471  ao2_ref(p, -1);
472  return NULL;
473  }
474  if (!(ao2_link(tps_singletons, p))) {
476  ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
477  ao2_ref(p, -1);
478  return NULL;
479  }
481  return p;
482 }
#define ao2_link(arg1, arg2)
Definition: astobj2.h:785
const char * name
Friendly name of the taskprocessor.
Definition: taskprocessor.c:69
#define ast_strdup(a)
Definition: astmm.h:109
#define LOG_WARNING
Definition: logger.h:144
ast_cond_t poll_cond
Thread poll condition.
Definition: taskprocessor.c:71
static struct ao2_container * tps_singletons
tps_singletons is the astobj2 container for taskprocessor singletons
Definition: taskprocessor.c:89
#define ast_cond_init(cond, attr)
Definition: lock.h:167
#define ao2_unlock(a)
Definition: astobj2.h:497
static void * tps_processing_function(void *data)
The task processing function executed by a taskprocessor.
ast_mutex_t taskprocessor_lock
Taskprocessor lock.
Definition: taskprocessor.c:75
static void tps_taskprocessor_destroy(void *tps)
Destroy the taskprocessor when its refcount reaches zero.
#define AST_PTHREADT_NULL
Definition: lock.h:65
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:63
#define ao2_ref(o, delta)
Definition: astobj2.h:472
#define ao2_lock(a)
Definition: astobj2.h:488
#define LOG_ERROR
Definition: logger.h:155
void ast_log(int level, const char *file, int line, const char *function, const char *fmt,...)
Used for sending a log message This is the standard logger function. Probably the only way you will i...
Definition: logger.c:1207
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:430
unsigned char poll_thread_run
Taskprocessor thread run flag.
Definition: taskprocessor.c:77
#define ao2_find(arg1, arg2, arg3)
Definition: astobj2.h:964
static const char name[]
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:418
pthread_t poll_thread
Taskprocessor thread.
Definition: taskprocessor.c:73
return a reference to a taskprocessor ONLY if it already exists
Definition: taskprocessor.h:60
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:67
#define ast_calloc(a, b)
Definition: astmm.h:82
#define ast_mutex_init(pmutex)
Definition: lock.h:152
struct tps_taskprocessor_stats * stats
Taskprocessor statistics.
Definition: taskprocessor.c:79
const char* ast_taskprocessor_name ( struct ast_taskprocessor *  tps)

Return the name of the taskprocessor singleton.

Since
1.6.1

Definition at line 412 of file taskprocessor.c.

References ast_log(), LOG_ERROR, and ast_taskprocessor::name.

413 {
414  if (!tps) {
415  ast_log(LOG_ERROR, "no taskprocessor specified!\n");
416  return NULL;
417  }
418  return tps->name;
419 }
const char * name
Friendly name of the taskprocessor.
Definition: taskprocessor.c:69
#define LOG_ERROR
Definition: logger.h:155
void ast_log(int level, const char *file, int line, const char *function, const char *fmt,...)
Used for sending a log message This is the standard logger function. Probably the only way you will i...
Definition: logger.c:1207
int ast_taskprocessor_push ( struct ast_taskprocessor *  tps,
int(*)(void *datap)  task_exe,
void *  datap 
)

Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

Parameters
tps: The taskprocessor structure
task_exe: The task handling function to push into the taskprocessor queue
datap: The data to be used by the task handling function
Return values
0: success
-1: failure
Since
1.6.1

Definition at line 499 of file taskprocessor.c.

References ast_cond_signal, AST_LIST_INSERT_TAIL, ast_log(), ast_mutex_lock, ast_mutex_unlock, LOG_ERROR, ast_taskprocessor::name, ast_taskprocessor::poll_cond, ast_taskprocessor::taskprocessor_lock, ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_size, and tps_task_alloc().

Referenced by ast_cc_agent_status_response(), ast_cc_monitor_failed(), ast_cc_monitor_party_b_free(), ast_cc_monitor_status_request(), ast_cc_monitor_stop_ringing(), ast_event_queue(), cc_request_state_change(), cli_tps_ping(), device_state_cb(), generic_agent_devstate_cb(), generic_monitor_devstate_cb(), handle_cc_status(), iax2_transmit(), mwi_sub_event_cb(), and mwi_unsub_event_cb().

500 {
501  struct tps_task *t;
502 
503  if (!tps || !task_exe) {
504  ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
505  return -1;
506  }
507  if (!(t = tps_task_alloc(task_exe, datap))) {
508  ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
509  return -1;
510  }
513  tps->tps_queue_size++;
514  ast_cond_signal(&tps->poll_cond);
516  return 0;
517 }
const char * name
Friendly name of the taskprocessor.
Definition: taskprocessor.c:69
ast_cond_t poll_cond
Thread poll condition.
Definition: taskprocessor.c:71
#define ast_mutex_lock(a)
Definition: lock.h:155
#define ast_cond_signal(cond)
Definition: lock.h:169
void * datap
The data pointer for the task execute() function.
Definition: taskprocessor.c:53
ast_mutex_t taskprocessor_lock
Taskprocessor lock.
Definition: taskprocessor.c:75
#define LOG_ERROR
Definition: logger.h:155
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
Definition: linkedlists.h:716
void ast_log(int level, const char *file, int line, const char *function, const char *fmt,...)
Used for sending a log message This is the standard logger function. Probably the only way you will i...
Definition: logger.c:1207
struct tps_task::@303 list
AST_LIST_ENTRY overhead.
struct ast_taskprocessor::tps_queue tps_queue
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:49
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:81
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap)
#define ast_mutex_unlock(a)
Definition: lock.h:156
void* ast_taskprocessor_unreference ( struct ast_taskprocessor *  tps)

Unreference the specified taskprocessor and its reference count will decrement.

Taskprocessors use astobj2 and will unlink from the taskprocessor singleton container and destroy themselves when the taskprocessor reference count reaches zero.

Parameters
tps: taskprocessor to unreference
Returns
NULL
Since
1.6.1

Definition at line 485 of file taskprocessor.c.

References ao2_link, ao2_lock, ao2_ref, ao2_unlink, ao2_unlock, and tps_singletons.

Referenced by __unload_module(), cc_shutdown(), event_shutdown(), unload_module(), and unload_pbx().

486 {
487  if (tps) {
490  if (ao2_ref(tps, -1) > 1) {
491  ao2_link(tps_singletons, tps);
492  }
494  }
495  return NULL;
496 }
#define ao2_link(arg1, arg2)
Definition: astobj2.h:785
static struct ao2_container * tps_singletons
tps_singletons is the astobj2 container for taskprocessor singletons
Definition: taskprocessor.c:89
#define ao2_unlock(a)
Definition: astobj2.h:497
#define ao2_ref(o, delta)
Definition: astobj2.h:472
#define ao2_lock(a)
Definition: astobj2.h:488
#define ao2_unlink(arg1, arg2)
Definition: astobj2.h:817
int ast_tps_init ( void  )

Provided by taskprocessor.c

Definition at line 134 of file taskprocessor.c.

References ao2_container_alloc, ARRAY_LEN, ast_cli_register_multiple(), ast_cond_init, ast_log(), ast_register_atexit(), cli_ping_cond, LOG_ERROR, taskprocessor_clis, tps_cmp_cb(), tps_hash_cb(), TPS_MAX_BUCKETS, tps_shutdown(), and tps_singletons.

Referenced by main().

135 {
137  ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
138  return -1;
139  }
140 
142 
144 
146 
147  return 0;
148 }
static ast_cond_t cli_ping_cond
CLI taskprocessor ping <blah> operation requires a ping condition.
Definition: taskprocessor.c:92
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
static void tps_shutdown(void)
static int tps_cmp_cb(void *obj, void *arg, int flags)
The astobj2 compare callback for taskprocessors.
static int tps_hash_cb(const void *obj, const int flags)
The astobj2 hash callback for taskprocessors.
static struct ao2_container * tps_singletons
tps_singletons is the astobj2 container for taskprocessor singletons
Definition: taskprocessor.c:89
#define ast_cond_init(cond, attr)
Definition: lock.h:167
#define TPS_MAX_BUCKETS
Definition: taskprocessor.c:87
int ast_register_atexit(void(*func)(void))
Register a function to be executed before Asterisk exits.
Definition: asterisk.c:998
#define LOG_ERROR
Definition: logger.h:155
void ast_log(int level, const char *file, int line, const char *function, const char *fmt,...)
Used for sending a log message This is the standard logger function. Probably the only way you will i...
Definition: logger.c:1207
static struct ast_cli_entry taskprocessor_clis[]
#define ao2_container_alloc(arg1, arg2, arg3)
Definition: astobj2.h:734
int ast_cli_register_multiple(struct ast_cli_entry *e, int len)
Register multiple commands.
Definition: cli.c:2167
static char * cli_tps_ping ( struct ast_cli_entry *  e,
int  cmd,
struct ast_cli_args *  a 
)
static

Definition at line 205 of file taskprocessor.c.

References ao2_ref, ast_cli_args::argc, ast_cli_args::argv, ast_cli(), ast_cond_timedwait, ast_mutex_lock, ast_mutex_unlock, ast_samp2tv(), ast_taskprocessor_get(), ast_taskprocessor_push(), ast_tvadd(), ast_tvnow(), ast_tvsub(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, cli_ping_cond, cli_ping_cond_lock, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, name, tps_ping_handler(), TPS_REF_IF_EXISTS, tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

206 {
207  struct timeval begin, end, delta;
208  const char *name;
209  struct timeval when;
210  struct timespec ts;
211  struct ast_taskprocessor *tps = NULL;
212 
213  switch (cmd) {
214  case CLI_INIT:
215  e->command = "core ping taskprocessor";
216  e->usage =
217  "Usage: core ping taskprocessor <taskprocessor>\n"
218  " Displays the time required for a task to be processed\n";
219  return NULL;
220  case CLI_GENERATE:
221  return tps_taskprocessor_tab_complete(tps, a);
222  }
223 
224  if (a->argc != 4)
225  return CLI_SHOWUSAGE;
226 
227  name = a->argv[3];
228  if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
229  ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
230  return CLI_SUCCESS;
231  }
232  ast_cli(a->fd, "\npinging %s ...", name);
233  when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
234  ts.tv_sec = when.tv_sec;
235  ts.tv_nsec = when.tv_usec * 1000;
237  if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
238  ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
239  ao2_ref(tps, -1);
240  return CLI_FAILURE;
241  }
244  end = ast_tvnow();
245  delta = ast_tvsub(end, begin);
246  ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
247  ao2_ref(tps, -1);
248  return CLI_SUCCESS;
249 }
static ast_cond_t cli_ping_cond
CLI taskprocessor ping <blah> operation requires a ping condition.
Definition: taskprocessor.c:92
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
const int argc
Definition: cli.h:154
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
Definition: cli.h:146
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:142
#define ast_mutex_lock(a)
Definition: lock.h:155
void ast_cli(int fd, const char *fmt,...)
Definition: cli.c:105
const int fd
Definition: cli.h:153
#define ao2_ref(o, delta)
Definition: astobj2.h:472
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
Definition: time.h:191
static ast_mutex_t cli_ping_cond_lock
CLI taskprocessor ping <blah> operation requires a ping condition lock.
Definition: taskprocessor.c:95
const char *const * argv
Definition: cli.h:155
#define CLI_SHOWUSAGE
Definition: cli.h:44
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: utils.c:1587
#define CLI_FAILURE
Definition: cli.h:45
static const char name[]
char * command
Definition: cli.h:180
const char * usage
Definition: cli.h:171
return a reference to a taskprocessor ONLY if it already exists
Definition: taskprocessor.h:60
static char * tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
#define CLI_SUCCESS
Definition: cli.h:43
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:67
static int tps_ping_handler(void *datap)
CLI taskprocessor ping <blah> handler function.
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
Definition: utils.c:1601
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:172
#define ast_mutex_unlock(a)
Definition: lock.h:156
static char * cli_tps_report ( struct ast_cli_entry *  e,
int  cmd,
struct ast_cli_args *  a 
)
static

Definition at line 251 of file taskprocessor.c.

References tps_taskprocessor_stats::_tasks_processed_count, ao2_container_count(), ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_cli_args::argc, ast_cli_entry::args, ast_cli(), ast_copy_string(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, tps_taskprocessor_stats::max_qsize, name, ast_taskprocessor::name, ast_taskprocessor::stats, ast_taskprocessor::tps_queue_size, tps_singletons, and ast_cli_entry::usage.

252 {
253  char name[256];
254  int tcount;
255  unsigned long qsize;
256  unsigned long maxqsize;
257  unsigned long processed;
258  struct ast_taskprocessor *p;
259  struct ao2_iterator i;
260 
261  switch (cmd) {
262  case CLI_INIT:
263  e->command = "core show taskprocessors";
264  e->usage =
265  "Usage: core show taskprocessors\n"
266  " Shows a list of instantiated task processors and their statistics\n";
267  return NULL;
268  case CLI_GENERATE:
269  return NULL;
270  }
271 
272  if (a->argc != e->args)
273  return CLI_SHOWUSAGE;
274 
275  ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
277  while ((p = ao2_iterator_next(&i))) {
278  ast_copy_string(name, p->name, sizeof(name));
279  qsize = p->tps_queue_size;
280  maxqsize = p->stats->max_qsize;
281  processed = p->stats->_tasks_processed_count;
282  ast_cli(a->fd, "\n%24s %17lu %12lu %12lu", name, processed, qsize, maxqsize);
283  ao2_ref(p, -1);
284  }
287  ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
288  return CLI_SUCCESS;
289 }
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
Definition: astobj2.c:470
const char * name
Friendly name of the taskprocessor.
Definition: taskprocessor.c:69
#define ao2_iterator_next(arg1)
Definition: astobj2.h:1126
const int argc
Definition: cli.h:154
static struct ao2_container * tps_singletons
tps_singletons is the astobj2 container for taskprocessor singletons
Definition: taskprocessor.c:89
Definition: cli.h:146
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags)
Create an iterator for a container.
Definition: astobj2.c:818
void ast_cli(int fd, const char *fmt,...)
Definition: cli.c:105
int args
This gets set in ast_cli_register()
Definition: cli.h:179
unsigned long _tasks_processed_count
This is the current number of tasks processed.
Definition: taskprocessor.c:63
const int fd
Definition: cli.h:153
#define ao2_ref(o, delta)
Definition: astobj2.h:472
#define CLI_SHOWUSAGE
Definition: cli.h:44
static const char name[]
char * command
Definition: cli.h:180
void ao2_iterator_destroy(struct ao2_iterator *i)
Destroy a container iterator.
Definition: astobj2.c:833
const char * usage
Definition: cli.h:171
#define CLI_SUCCESS
Definition: cli.h:43
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:81
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:67
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1053
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:223
struct tps_taskprocessor_stats * stats
Taskprocessor statistics.
Definition: taskprocessor.c:79
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
Definition: taskprocessor.c:61
static int tps_cmp_cb ( void *  obj,
void *  arg,
int  flags 
)
static

The astobj2 compare callback for taskprocessors.

Definition at line 355 of file taskprocessor.c.

References CMP_MATCH, CMP_STOP, and ast_taskprocessor::name.

Referenced by ast_tps_init().

356 {
357  struct ast_taskprocessor *lhs = obj, *rhs = arg;
358 
359  return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
360 }
const char * name
Friendly name of the taskprocessor.
Definition: taskprocessor.c:69
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:67
static int tps_hash_cb ( const void *  obj,
const int  flags 
)
static

The astobj2 hash callback for taskprocessors.

Definition at line 347 of file taskprocessor.c.

References ast_str_case_hash(), and ast_taskprocessor::name.

Referenced by ast_tps_init().

348 {
349  const struct ast_taskprocessor *tps = obj;
350 
351  return ast_str_case_hash(tps->name);
352 }
const char * name
Friendly name of the taskprocessor.
Definition: taskprocessor.c:69
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:67
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:989
static int tps_ping_handler ( void *  datap)
static

CLI taskprocessor ping <blah> handler function.

Definition at line 196 of file taskprocessor.c.

References ast_cond_signal, ast_mutex_lock, ast_mutex_unlock, cli_ping_cond, and cli_ping_cond_lock.

Referenced by cli_tps_ping().

197 {
201  return 0;
202 }
static ast_cond_t cli_ping_cond
CLI taskprocessor ping <blah> operation requires a ping condition.
Definition: taskprocessor.c:92
#define ast_mutex_lock(a)
Definition: lock.h:155
#define ast_cond_signal(cond)
Definition: lock.h:169
static ast_mutex_t cli_ping_cond_lock
CLI taskprocessor ping <blah> operation requires a ping condition lock.
Definition: taskprocessor.c:95
#define ast_mutex_unlock(a)
Definition: lock.h:156
static void * tps_processing_function ( void *  data)
static

The task processing function executed by a taskprocessor.

Definition at line 292 of file taskprocessor.c.

References tps_taskprocessor_stats::_tasks_processed_count, ast_cond_wait, ast_log(), ast_mutex_lock, ast_mutex_unlock, tps_task::datap, tps_task::execute, LOG_ERROR, LOG_WARNING, tps_taskprocessor_stats::max_qsize, ast_taskprocessor::poll_cond, ast_taskprocessor::poll_thread_run, ast_taskprocessor::stats, ast_taskprocessor::taskprocessor_lock, tps_task_free(), tps_taskprocessor_depth(), and tps_taskprocessor_pop().

Referenced by ast_taskprocessor_get().

293 {
294  struct ast_taskprocessor *i = data;
295  struct tps_task *t;
296  int size;
297 
298  if (!i) {
299  ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
300  return NULL;
301  }
302 
303  while (i->poll_thread_run) {
305  if (!i->poll_thread_run) {
307  break;
308  }
309  if (!(size = tps_taskprocessor_depth(i))) {
311  if (!i->poll_thread_run) {
313  break;
314  }
315  }
317  /* stuff is in the queue */
318  if (!(t = tps_taskprocessor_pop(i))) {
319  ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
320  continue;
321  }
322  if (!t->execute) {
323  ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
324  tps_task_free(t);
325  continue;
326  }
327  t->execute(t->datap);
328 
330  if (i->stats) {
332  if (size > i->stats->max_qsize) {
333  i->stats->max_qsize = size;
334  }
335  }
337 
338  tps_task_free(t);
339  }
340  while ((t = tps_taskprocessor_pop(i))) {
341  tps_task_free(t);
342  }
343  return NULL;
344 }
#define LOG_WARNING
Definition: logger.h:144
ast_cond_t poll_cond
Thread poll condition.
Definition: taskprocessor.c:71
#define ast_cond_wait(cond, mutex)
Definition: lock.h:171
#define ast_mutex_lock(a)
Definition: lock.h:155
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
Remove the front task off the taskprocessor queue.
void * datap
The data pointer for the task execute() function.
Definition: taskprocessor.c:53
ast_mutex_t taskprocessor_lock
Taskprocessor lock.
Definition: taskprocessor.c:75
unsigned long _tasks_processed_count
This is the current number of tasks processed.
Definition: taskprocessor.c:63
#define LOG_ERROR
Definition: logger.h:155
void ast_log(int level, const char *file, int line, const char *function, const char *fmt,...)
Used for sending a log message This is the standard logger function. Probably the only way you will i...
Definition: logger.c:1207
unsigned char poll_thread_run
Taskprocessor thread run flag.
Definition: taskprocessor.c:77
int(* execute)(void *datap)
The execute() task callback function pointer.
Definition: taskprocessor.c:51
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:49
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:67
static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
Return the size of the taskprocessor queue.
struct tps_taskprocessor_stats * stats
Taskprocessor statistics.
Definition: taskprocessor.c:79
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
Definition: taskprocessor.c:61
static void * tps_task_free(struct tps_task *task)
#define ast_mutex_unlock(a)
Definition: lock.h:156
static void tps_shutdown ( void  )
static

Definition at line 126 of file taskprocessor.c.

References ao2_t_ref, ARRAY_LEN, ast_cli_unregister_multiple(), taskprocessor_clis, and tps_singletons.

Referenced by ast_tps_init().

127 {
129  ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
130  tps_singletons = NULL;
131 }
#define ao2_t_ref(o, delta, tag)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:471
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: cli.c:2177
static struct ao2_container * tps_singletons
tps_singletons is the astobj2 container for taskprocessor singletons
Definition: taskprocessor.c:89
static struct ast_cli_entry taskprocessor_clis[]
static struct tps_task* tps_task_alloc ( int(*)(void *datap)  task_exe,
void *  datap 
)
static

Definition at line 151 of file taskprocessor.c.

References ast_calloc, tps_task::datap, and tps_task::execute.

Referenced by ast_taskprocessor_push().

152 {
153  struct tps_task *t;
154  if ((t = ast_calloc(1, sizeof(*t)))) {
155  t->execute = task_exe;
156  t->datap = datap;
157  }
158  return t;
159 }
void * datap
The data pointer for the task execute() function.
Definition: taskprocessor.c:53
int(* execute)(void *datap)
The execute() task callback function pointer.
Definition: taskprocessor.c:51
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:49
#define ast_calloc(a, b)
Definition: astmm.h:82
static void* tps_task_free ( struct tps_task *  task)
static

Definition at line 162 of file taskprocessor.c.

References ast_free.

Referenced by tps_processing_function().

163 {
164  if (task) {
165  ast_free(task);
166  }
167  return NULL;
168 }
#define ast_free(a)
Definition: astmm.h:97
static int tps_taskprocessor_depth ( struct ast_taskprocessor *  tps)
static

Return the size of the taskprocessor queue.

Definition at line 406 of file taskprocessor.c.

References ast_taskprocessor::tps_queue_size.

Referenced by tps_processing_function().

407 {
408  return (tps) ? tps->tps_queue_size : -1;
409 }
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:81
static void tps_taskprocessor_destroy ( void *  tps)
static

Destroy the taskprocessor when its refcount reaches zero.

Definition at line 363 of file taskprocessor.c.

References ast_cond_destroy, ast_cond_signal, ast_free, ast_log(), ast_mutex_destroy, ast_mutex_lock, ast_mutex_unlock, AST_PTHREADT_NULL, LOG_DEBUG, LOG_ERROR, ast_taskprocessor::name, ast_taskprocessor::poll_cond, ast_taskprocessor::poll_thread, ast_taskprocessor::poll_thread_run, ast_taskprocessor::stats, and ast_taskprocessor::taskprocessor_lock.

Referenced by ast_taskprocessor_get().

364 {
365  struct ast_taskprocessor *t = tps;
366 
367  if (!tps) {
368  ast_log(LOG_ERROR, "missing taskprocessor\n");
369  return;
370  }
371  ast_log(LOG_DEBUG, "destroying taskprocessor '%s'\n", t->name);
372  /* kill it */
374  t->poll_thread_run = 0;
377  pthread_join(t->poll_thread, NULL);
381  /* free it */
382  if (t->stats) {
383  ast_free(t->stats);
384  t->stats = NULL;
385  }
386  ast_free((char *) t->name);
387 }
const char * name
Friendly name of the taskprocessor.
Definition: taskprocessor.c:69
ast_cond_t poll_cond
Thread poll condition.
Definition: taskprocessor.c:71
#define ast_mutex_lock(a)
Definition: lock.h:155
#define LOG_DEBUG
Definition: logger.h:122
#define ast_cond_signal(cond)
Definition: lock.h:169
ast_mutex_t taskprocessor_lock
Taskprocessor lock.
Definition: taskprocessor.c:75
#define AST_PTHREADT_NULL
Definition: lock.h:65
#define LOG_ERROR
Definition: logger.h:155
void ast_log(int level, const char *file, int line, const char *function, const char *fmt,...)
Used for sending a log message. This is the standard logger function. Probably the only way you will i...
Definition: logger.c:1207
#define ast_cond_destroy(cond)
Definition: lock.h:168
unsigned char poll_thread_run
Taskprocessor thread run flag.
Definition: taskprocessor.c:77
#define ast_free(a)
Definition: astmm.h:97
pthread_t poll_thread
Taskprocessor thread.
Definition: taskprocessor.c:73
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:67
#define ast_mutex_destroy(a)
Definition: lock.h:154
struct tps_taskprocessor_stats * stats
Taskprocessor statistics.
Definition: taskprocessor.c:79
#define ast_mutex_unlock(a)
Definition: lock.h:156
static struct tps_task * tps_taskprocessor_pop ( struct ast_taskprocessor * tps)
static

Remove the front task off the taskprocessor queue.

Definition at line 390 of file taskprocessor.c.

References AST_LIST_REMOVE_HEAD, ast_log(), ast_mutex_lock, ast_mutex_unlock, LOG_ERROR, ast_taskprocessor::taskprocessor_lock, ast_taskprocessor::tps_queue, and ast_taskprocessor::tps_queue_size.

Referenced by tps_processing_function().

391 {
392  struct tps_task *task;
393 
394  if (!tps) {
395  ast_log(LOG_ERROR, "missing taskprocessor\n");
396  return NULL;
397  }
399  if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
400  tps->tps_queue_size--;
401  }
403  return task;
404 }
#define ast_mutex_lock(a)
Definition: lock.h:155
ast_mutex_t taskprocessor_lock
Taskprocessor lock.
Definition: taskprocessor.c:75
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
Definition: linkedlists.h:818
#define LOG_ERROR
Definition: logger.h:155
void ast_log(int level, const char *file, int line, const char *function, const char *fmt,...)
Used for sending a log message. This is the standard logger function. Probably the only way you will i...
Definition: logger.c:1207
struct tps_task::@303 list
AST_LIST_ENTRY overhead.
struct ast_taskprocessor::tps_queue tps_queue
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:49
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:81
#define ast_mutex_unlock(a)
Definition: lock.h:156
static char* tps_taskprocessor_tab_complete ( struct ast_taskprocessor * p,
struct ast_cli_args * a
)
static

Definition at line 171 of file taskprocessor.c.

References ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_strdup, ast_cli_args::n, name, ast_taskprocessor::name, ast_cli_args::pos, tps_singletons, and ast_cli_args::word.

Referenced by cli_tps_ping().

172 {
173  int tklen;
174  int wordnum = 0;
175  char *name = NULL;
176  struct ao2_iterator i;
177 
178  if (a->pos != 3)
179  return NULL;
180 
181  tklen = strlen(a->word);
183  while ((p = ao2_iterator_next(&i))) {
184  if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
185  name = ast_strdup(p->name);
186  ao2_ref(p, -1);
187  break;
188  }
189  ao2_ref(p, -1);
190  }
192  return name;
193 }
const char * name
Friendly name of the taskprocessor.
Definition: taskprocessor.c:69
#define ast_strdup(a)
Definition: astmm.h:109
#define ao2_iterator_next(arg1)
Definition: astobj2.h:1126
static struct ao2_container * tps_singletons
tps_singletons is the astobj2 container for taskprocessor singletons
Definition: taskprocessor.c:89
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags)
Create an iterator for a container.
Definition: astobj2.c:818
const int n
Definition: cli.h:159
#define ao2_ref(o, delta)
Definition: astobj2.h:472
static const char name[]
const char * word
Definition: cli.h:157
void ao2_iterator_destroy(struct ao2_iterator *i)
Destroy a container iterator.
Definition: astobj2.c:833
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1053
const int pos
Definition: cli.h:158

Variable Documentation

ast_cond_t cli_ping_cond
static

CLI taskprocessor ping <blah> operation requires a ping condition.

Definition at line 92 of file taskprocessor.c.

Referenced by ast_tps_init(), cli_tps_ping(), and tps_ping_handler().

ast_mutex_t cli_ping_cond_lock = { PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP , NULL, 1 }
static

CLI taskprocessor ping <blah> operation requires a ping condition lock.

Definition at line 95 of file taskprocessor.c.

Referenced by cli_tps_ping(), and tps_ping_handler().

struct ast_cli_entry taskprocessor_clis[]
static
Initial value:
= {
AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
}
#define AST_CLI_DEFINE(fn, txt,...)
Definition: cli.h:191
static char * cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)

Definition at line 120 of file taskprocessor.c.

Referenced by ast_tps_init(), and tps_shutdown().

struct ao2_container* tps_singletons
static

tps_singletons is the astobj2 container for taskprocessor singletons

Definition at line 89 of file taskprocessor.c.

Referenced by ast_taskprocessor_get(), ast_taskprocessor_unreference(), ast_tps_init(), cli_tps_report(), tps_shutdown(), and tps_taskprocessor_tab_complete().