34 #include "asterisk/_private.h"
87 #define TPS_MAX_BUCKETS 7
98 static int tps_hash_cb(
const void *obj,
const int flags);
100 static int tps_cmp_cb(
void *obj,
void *arg,
int flags);
181 tklen = strlen(a->
word);
184 if (!strncasecmp(a->
word, p->
name, tklen) && ++wordnum > a->
n) {
207 struct timeval begin, end, delta;
215 e->
command =
"core ping taskprocessor";
217 "Usage: core ping taskprocessor <taskprocessor>\n"
218 " Displays the time required for a task to be processed\n";
229 ast_cli(a->
fd,
"\nping failed: %s not found\n\n", name);
232 ast_cli(a->
fd,
"\npinging %s ...", name);
234 ts.tv_sec = when.tv_sec;
235 ts.tv_nsec = when.tv_usec * 1000;
238 ast_cli(a->
fd,
"\nping failed: could not push task to %s\n\n", name);
246 ast_cli(a->
fd,
"\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (
long)delta.tv_sec, (
long int)delta.tv_usec);
256 unsigned long maxqsize;
257 unsigned long processed;
263 e->
command =
"core show taskprocessors";
265 "Usage: core show taskprocessors\n"
266 " Shows a list of instantiated task processors and their statistics\n";
275 ast_cli(a->
fd,
"\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
282 ast_cli(a->
fd,
"\n%24s %17lu %12lu %12lu", name, processed, qsize, maxqsize);
287 ast_cli(a->
fd,
"\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
299 ast_log(
LOG_ERROR,
"cannot start thread_function loop without a ast_taskprocessor structure.\n");
319 ast_log(
LOG_ERROR,
"Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
503 if (!tps || !task_exe) {
504 ast_log(
LOG_ERROR,
"%s is missing!!\n", (tps) ?
"task callback" :
"taskprocessor");
#define ao2_t_ref(o, delta, tag)
Reference/unreference an object and return the old refcount.
static ast_cond_t cli_ping_cond
CLI taskprocessor ping <blah>operation requires a ping condition.
#define AST_CLI_DEFINE(fn, txt,...)
Asterisk main include file. File version handling, generic pbx functions.
#define ao2_link(arg1, arg2)
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
const char * name
Friendly name of the taskprocessor.
static void tps_shutdown(void)
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Time-related functions and macros.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int tps_cmp_cb(void *obj, void *arg, int flags)
The astobj2 compare callback for taskprocessors.
#define ao2_iterator_next(arg1)
descriptor for a cli entry.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
static int tps_hash_cb(const void *obj, const int flags)
The astobj2 hash callback for taskprocessors.
ast_cond_t poll_cond
Thread poll condition.
static struct ao2_container * tps_singletons
tps_singletons is the astobj2 container for taskprocessor singletons
#define ast_cond_wait(cond, mutex)
#define ast_cond_init(cond, attr)
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
#define ast_mutex_lock(a)
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags)
Create an iterator for a container.
void ast_cli(int fd, const char *fmt,...)
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
Remove the front task off the taskprocessor queue.
#define ast_cond_signal(cond)
int args
This gets set in ast_cli_register()
pthread_cond_t ast_cond_t
static void * tps_processing_function(void *data)
The task processing function executed by a taskprocessor.
void * datap
The data pointer for the task execute() function.
ast_mutex_t taskprocessor_lock
Taskprocessor lock.
unsigned long _tasks_processed_count
This is the current number of tasks processed.
static void tps_taskprocessor_destroy(void *tps)
Destroy the taskprocessor when its refcount reaches zero.
#define AST_PTHREADT_NULL
static force_inline int attribute_pure ast_strlen_zero(const char *s)
#define ao2_ref(o, delta)
int ast_register_atexit(void(*func)(void))
Register a function to be executed before Asterisk exits.
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
static ast_mutex_t cli_ping_cond_lock
CLI taskprocessor ping <blah>operation requires a ping condition lock.
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
#define AST_LIST_HEAD_NOLOCK(name, type)
Defines a structure to be used to hold a list of specified type (with no lock).
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
ast_tps_options
ast_tps_options for specification of taskprocessor options
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
void ast_log(int level, const char *file, int line, const char *function, const char *fmt,...)
Used for sending a log message This is the standard logger function. Probably the only way you will i...
#define ast_cond_destroy(cond)
#define ao2_alloc(data_size, destructor_fn)
struct tps_task::@303 list
AST_LIST_ENTRY overhead.
#define AST_LIST_ENTRY(type)
Declare a forward link structure inside a list entry.
unsigned char poll_thread_run
Taskprocesor thread run flag.
#define ao2_find(arg1, arg2, arg3)
#define ast_pthread_create(a, b, c, d)
An API for managing task processing threads that can be shared across modules.
int(* execute)(void *datap)
The execute() task callback function pointer.
void ao2_iterator_destroy(struct ao2_iterator *i)
Destroy a container iterator.
struct ast_taskprocessor::tps_queue tps_queue
tps_taskprocessor_stats maintain statistics for a taskprocessor.
tps_task structure is queued to a taskprocessor
pthread_t poll_thread
Taskprocessor thread.
return a reference to a taskprocessor ONLY if it already exists
static char * tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
long tps_queue_size
Taskprocessor current queue size.
A ast_taskprocessor structure is a singleton by name.
static struct ast_cli_entry taskprocessor_clis[]
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap)
Standard Command Line Interface.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
#define ao2_container_alloc(arg1, arg2, arg3)
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
int ast_cli_register_multiple(struct ast_cli_entry *e, int len)
Register multiple commands.
static int tps_ping_handler(void *datap)
CLI taskprocessor ping <blah>handler function.
static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
Return the size of the taskprocessor queue.
static char * cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
#define ast_mutex_init(pmutex)
#define ast_mutex_destroy(a)
Asterisk module definitions.
struct tps_taskprocessor_stats * stats
Taskprocessor statistics.
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
#define ao2_unlink(arg1, arg2)
#define ast_cond_timedwait(cond, mutex, time)
static void * tps_task_free(struct tps_task *task)
#define AST_MUTEX_DEFINE_STATIC(mutex)
Structure for mutex and tracking information.
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
static char * cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define ASTERISK_FILE_VERSION(file, version)
Register/unregister a source code file with the core.
#define ast_mutex_unlock(a)