Wed Jan 8 2020 09:49:50

Asterisk developer's documentation


res_timing_pthread.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*!
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface
24  */
25 
26 /*** MODULEINFO
27  <support_level>extended</support_level>
28  ***/
29 
30 #include "asterisk.h"
31 
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 386109 $");
33 
34 #include <stdbool.h>
35 #include <math.h>
36 #include <unistd.h>
37 #include <fcntl.h>
38 
39 #include "asterisk/module.h"
40 #include "asterisk/timing.h"
41 #include "asterisk/utils.h"
42 #include "asterisk/astobj2.h"
43 #include "asterisk/time.h"
44 #include "asterisk/lock.h"
45 
46 static void *timing_funcs_handle;
47 
48 static int pthread_timer_open(void);
49 static void pthread_timer_close(int handle);
50 static int pthread_timer_set_rate(int handle, unsigned int rate);
51 static int pthread_timer_ack(int handle, unsigned int quantity);
52 static int pthread_timer_enable_continuous(int handle);
53 static int pthread_timer_disable_continuous(int handle);
54 static enum ast_timer_event pthread_timer_get_event(int handle);
55 static unsigned int pthread_timer_get_max_rate(int handle);
56 
58  .name = "pthread",
59  .priority = 0, /* use this as a last resort */
60  .timer_open = pthread_timer_open,
61  .timer_close = pthread_timer_close,
62  .timer_set_rate = pthread_timer_set_rate,
63  .timer_ack = pthread_timer_ack,
64  .timer_enable_continuous = pthread_timer_enable_continuous,
65  .timer_disable_continuous = pthread_timer_disable_continuous,
66  .timer_get_event = pthread_timer_get_event,
67  .timer_get_max_rate = pthread_timer_get_max_rate,
68 };
69 
70 /* 1 tick / 10 ms */
71 #define MAX_RATE 100
72 
74 #define PTHREAD_TIMER_BUCKETS 563
75 
76 enum {
77  PIPE_READ = 0,
79 };
80 
84 };
85 
86 struct pthread_timer {
87  int pipe[2];
89  unsigned int rate;
90  /*! Interval in ms for current rate */
91  unsigned int interval;
92  unsigned int tick_count;
93  unsigned int pending_ticks;
94  struct timeval start;
95  bool continuous:1;
96  bool pipe_signaled:1;
97 };
98 
99 static void pthread_timer_destructor(void *obj);
100 static struct pthread_timer *find_timer(int handle, int unlinkobj);
101 static void signal_pipe(struct pthread_timer *timer);
102 static void unsignal_pipe(struct pthread_timer *timer);
103 static void ack_ticks(struct pthread_timer *timer, unsigned int num);
104 
105 /*!
106  * \brief Data for the timing thread
107  */
108 static struct {
109  pthread_t thread;
112  unsigned int stop:1;
113 } timing_thread;
114 
115 static int pthread_timer_open(void)
116 {
117  struct pthread_timer *timer;
118  int fd;
119  int i;
120 
121  if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
122  errno = ENOMEM;
123  return -1;
124  }
125 
126  timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
127  timer->state = TIMER_STATE_IDLE;
128 
129  if (pipe(timer->pipe)) {
130  ao2_ref(timer, -1);
131  return -1;
132  }
133 
134  for (i = 0; i < ARRAY_LEN(timer->pipe); ++i) {
135  int flags = fcntl(timer->pipe[i], F_GETFL);
136  flags |= O_NONBLOCK;
137  fcntl(timer->pipe[i], F_SETFL, flags);
138  }
139 
140  ao2_lock(pthread_timers);
141  if (!ao2_container_count(pthread_timers)) {
145  }
146  ao2_link(pthread_timers, timer);
147  ao2_unlock(pthread_timers);
148 
149  fd = timer->pipe[PIPE_READ];
150 
151  ao2_ref(timer, -1);
152 
153  return fd;
154 }
155 
156 static void pthread_timer_close(int handle)
157 {
158  struct pthread_timer *timer;
159 
160  if (!(timer = find_timer(handle, 1))) {
161  return;
162  }
163 
164  ao2_ref(timer, -1);
165 }
166 
167 static int pthread_timer_set_rate(int handle, unsigned int rate)
168 {
169  struct pthread_timer *timer;
170 
171  if (!(timer = find_timer(handle, 0))) {
172  errno = EINVAL;
173  return -1;
174  }
175 
176  if (rate > MAX_RATE) {
177  ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
178  "max rate of %d / sec\n", MAX_RATE);
179  errno = EINVAL;
180  return -1;
181  }
182 
183  ao2_lock(timer);
184 
185  if ((timer->rate = rate)) {
186  timer->interval = roundf(1000.0 / ((float) rate));
187  timer->start = ast_tvnow();
188  timer->state = TIMER_STATE_TICKING;
189  } else {
190  timer->interval = 0;
191  timer->start = ast_tv(0, 0);
192  timer->state = TIMER_STATE_IDLE;
193  }
194  timer->tick_count = 0;
195 
196  ao2_unlock(timer);
197 
198  ao2_ref(timer, -1);
199 
200  return 0;
201 }
202 
203 static int pthread_timer_ack(int handle, unsigned int quantity)
204 {
205  struct pthread_timer *timer;
206 
207  ast_assert(quantity > 0);
208 
209  if (!(timer = find_timer(handle, 0))) {
210  return -1;
211  }
212 
213  ao2_lock(timer);
214  ack_ticks(timer, quantity);
215  ao2_unlock(timer);
216 
217  ao2_ref(timer, -1);
218 
219  return 0;
220 }
221 
222 static int pthread_timer_enable_continuous(int handle)
223 {
224  struct pthread_timer *timer;
225 
226  if (!(timer = find_timer(handle, 0))) {
227  errno = EINVAL;
228  return -1;
229  }
230 
231  ao2_lock(timer);
232  if (!timer->continuous) {
233  timer->continuous = true;
234  signal_pipe(timer);
235  }
236  ao2_unlock(timer);
237 
238  ao2_ref(timer, -1);
239 
240  return 0;
241 }
242 
243 static int pthread_timer_disable_continuous(int handle)
244 {
245  struct pthread_timer *timer;
246 
247  if (!(timer = find_timer(handle, 0))) {
248  errno = EINVAL;
249  return -1;
250  }
251 
252  ao2_lock(timer);
253  if (timer->continuous) {
254  timer->continuous = false;
255  unsignal_pipe(timer);
256  }
257  ao2_unlock(timer);
258 
259  ao2_ref(timer, -1);
260 
261  return 0;
262 }
263 
265 {
266  struct pthread_timer *timer;
268 
269  if (!(timer = find_timer(handle, 0))) {
270  return res;
271  }
272 
273  ao2_lock(timer);
274  if (timer->continuous) {
276  }
277  ao2_unlock(timer);
278 
279  ao2_ref(timer, -1);
280 
281  return res;
282 }
283 
284 static unsigned int pthread_timer_get_max_rate(int handle)
285 {
286  return MAX_RATE;
287 }
288 
289 static struct pthread_timer *find_timer(int handle, int unlinkobj)
290 {
291  struct pthread_timer *timer;
292  struct pthread_timer tmp_timer;
293  int flags = OBJ_POINTER;
294 
295  tmp_timer.pipe[PIPE_READ] = handle;
296 
297  if (unlinkobj) {
298  flags |= OBJ_UNLINK;
299  }
300 
301  if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
302  ast_assert(timer != NULL);
303  return NULL;
304  }
305 
306  return timer;
307 }
308 
309 static void pthread_timer_destructor(void *obj)
310 {
311  struct pthread_timer *timer = obj;
312 
313  if (timer->pipe[PIPE_READ] > -1) {
314  close(timer->pipe[PIPE_READ]);
315  timer->pipe[PIPE_READ] = -1;
316  }
317 
318  if (timer->pipe[PIPE_WRITE] > -1) {
319  close(timer->pipe[PIPE_WRITE]);
320  timer->pipe[PIPE_WRITE] = -1;
321  }
322 }
323 
324 /*!
325  * \note only PIPE_READ is guaranteed valid
326  */
327 static int pthread_timer_hash(const void *obj, const int flags)
328 {
329  const struct pthread_timer *timer = obj;
330 
331  return timer->pipe[PIPE_READ];
332 }
333 
334 /*!
335  * \note only PIPE_READ is guaranteed valid
336  */
337 static int pthread_timer_cmp(void *obj, void *arg, int flags)
338 {
339  struct pthread_timer *timer1 = obj, *timer2 = arg;
340 
341  return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
342 }
343 
344 /*!
345  * \retval 0 no timer tick needed
346  * \retval non-zero write to the timing pipe needed
347  */
348 static int check_timer(struct pthread_timer *timer)
349 {
350  struct timeval now;
351 
352  if (timer->state == TIMER_STATE_IDLE) {
353  return 0;
354  }
355 
356  now = ast_tvnow();
357 
358  if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
359  timer->tick_count++;
360  if (!timer->tick_count) {
361  /* Handle overflow. */
362  timer->start = now;
363  }
364  return 1;
365  }
366 
367  return 0;
368 }
369 
370 /*!
371  * \internal
372  * \pre timer is locked
373  */
374 static void ack_ticks(struct pthread_timer *timer, unsigned int quantity)
375 {
376  int pending_ticks = timer->pending_ticks;
377 
378  ast_assert(quantity);
379 
380  if (quantity > pending_ticks) {
381  quantity = pending_ticks;
382  }
383 
384  if (!quantity) {
385  return;
386  }
387 
388  timer->pending_ticks -= quantity;
389 
390  if ((0 == timer->pending_ticks) && !timer->continuous) {
391  unsignal_pipe(timer);
392  }
393 }
394 
395 /*!
396  * \internal
397  * \pre timer is locked
398  */
399 static void signal_pipe(struct pthread_timer *timer)
400 {
401  ssize_t res;
402  unsigned char x = 42;
403 
404  if (timer->pipe_signaled) {
405  return;
406  }
407 
408  res = write(timer->pipe[PIPE_WRITE], &x, 1);
409  if (-1 == res) {
410  ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
411  strerror(errno));
412  } else {
413  timer->pipe_signaled = true;
414  }
415 }
416 
417 /*!
418  * \internal
419  * \pre timer is locked
420  */
421 static void unsignal_pipe(struct pthread_timer *timer)
422 {
423  ssize_t res;
424  unsigned long buffer;
425 
426  if (!timer->pipe_signaled) {
427  return;
428  }
429 
430  res = read(timer->pipe[PIPE_READ], &buffer, sizeof(buffer));
431  if (-1 == res) {
432  ast_log(LOG_ERROR, "Error reading from pipe: %s\n",
433  strerror(errno));
434  } else {
435  timer->pipe_signaled = false;
436  }
437 }
438 
439 static int run_timer(void *obj, void *arg, int flags)
440 {
441  struct pthread_timer *timer = obj;
442 
443  if (timer->state == TIMER_STATE_IDLE) {
444  return 0;
445  }
446 
447  ao2_lock(timer);
448  if (check_timer(timer)) {
449  timer->pending_ticks++;
450  signal_pipe(timer);
451  }
452  ao2_unlock(timer);
453 
454  return 0;
455 }
456 
457 static void *do_timing(void *arg)
458 {
459  struct timeval next_wakeup = ast_tvnow();
460 
461  while (!timing_thread.stop) {
462  struct timespec ts = { 0, };
463 
464  ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
465 
466  next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
467 
468  ts.tv_sec = next_wakeup.tv_sec;
469  ts.tv_nsec = next_wakeup.tv_usec * 1000;
470 
472  if (!timing_thread.stop) {
473  if (ao2_container_count(pthread_timers)) {
475  } else {
477  }
478  }
480  }
481 
482  return NULL;
483 }
484 
485 static int init_timing_thread(void)
486 {
488  ast_cond_init(&timing_thread.cond, NULL);
489 
490  if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
491  ast_log(LOG_ERROR, "Unable to start timing thread.\n");
492  return -1;
493  }
494 
495  return 0;
496 }
497 
498 static int load_module(void)
499 {
500  if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
503  }
504 
505  if (init_timing_thread()) {
506  ao2_ref(pthread_timers, -1);
507  pthread_timers = NULL;
509  }
510 
511  return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
513 }
514 
515 static int unload_module(void)
516 {
517  int res;
518 
520  timing_thread.stop = 1;
523  pthread_join(timing_thread.thread, NULL);
524 
526  ao2_ref(pthread_timers, -1);
527  pthread_timers = NULL;
528  }
529 
530  return res;
531 }
532 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
533  .load = load_module,
534  .unload = unload_module,
535  .load_pri = AST_MODPRI_TIMING,
536  );
const char * name
Definition: timing.h:70
Timing module interface.
Definition: timing.h:69
pthread_t thread
Definition: app_meetme.c:962
static struct pthread_timer * find_timer(int handle, int unlinkobj)
static void pthread_timer_close(int handle)
static struct ast_timing_interface pthread_timing
Asterisk locking-related definitions:
Asterisk main include file. File version handling, generic pbx functions.
#define ao2_link(arg1, arg2)
Definition: astobj2.h:785
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
Definition: astobj2.c:470
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
static void ack_ticks(struct pthread_timer *timer, unsigned int num)
unsigned int rate
struct timeval start
static struct ao2_container * pthread_timers
static int check_timer(struct pthread_timer *timer)
Time-related functions and macros.
int ast_unregister_timing_interface(void *handle)
Unregister a previously registered timing interface.
Definition: timing.c:105
static int pthread_timer_ack(int handle, unsigned int quantity)
unsigned int tick_count
unsigned int pending_ticks
ast_timer_event
Definition: timing.h:57
static void pthread_timer_destructor(void *obj)
unsigned int interval
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:910
#define PTHREAD_TIMER_BUCKETS
unsigned int stop
Definition: app_meetme.c:969
#define ast_cond_wait(cond, mutex)
Definition: lock.h:171
#define ast_cond_init(cond, attr)
Definition: lock.h:167
static struct @352 timing_thread
Data for the timing thread.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:142
#define ast_assert(a)
Definition: utils.h:738
#define ast_mutex_lock(a)
Definition: lock.h:155
#define ao2_unlock(a)
Definition: astobj2.h:497
static int pthread_timer_open(void)
float roundf(float x)
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
Definition: module.h:374
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:90
static int unload_module(void)
static void signal_pipe(struct pthread_timer *timer)
#define ast_cond_signal(cond)
Definition: lock.h:169
Utility functions.
pthread_cond_t ast_cond_t
Definition: lock.h:144
#define ast_pthread_create_background(a, b, c, d)
Definition: utils.h:426
static int run_timer(void *obj, void *arg, int flags)
static unsigned int pthread_timer_get_max_rate(int handle)
static int pthread_timer_disable_continuous(int handle)
ast_mutex_t lock
Definition: app_meetme.c:964
ast_cond_t cond
Definition: app_meetme.c:963
#define ao2_ref(o, delta)
Definition: astobj2.h:472
#define ao2_lock(a)
Definition: astobj2.h:488
static int pthread_timer_cmp(void *obj, void *arg, int flags)
static void * do_timing(void *arg)
static int load_module(void)
enum pthread_timer_state state
#define LOG_ERROR
Definition: logger.h:155
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: utils.c:1587
void ast_log(int level, const char *file, int line, const char *function, const char *fmt,...)
Used for sending a log message This is the standard logger function. Probably the only way you will i...
Definition: logger.c:1207
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:430
#define ao2_find(arg1, arg2, arg3)
Definition: astobj2.h:964
int errno
static void unsignal_pipe(struct pthread_timer *timer)
#define MAX_RATE
static enum ast_timer_event pthread_timer_get_event(int handle)
#define ao2_container_alloc(arg1, arg2, arg3)
Definition: astobj2.h:734
struct timeval ast_tv(ast_time_t sec, ast_suseconds_t usec)
Returns a timeval from sec, usec.
Definition: time.h:179
static void * timing_funcs_handle
#define ast_register_timing_interface(i)
Register a set of timing functions.
Definition: timing.h:94
static int init_timing_thread(void)
static int pthread_timer_set_rate(int handle, unsigned int rate)
#define ast_mutex_init(pmutex)
Definition: lock.h:152
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:38
static struct ast_timer * timer
Definition: chan_iax2.c:313
Asterisk module definitions.
static int pthread_timer_enable_continuous(int handle)
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:172
Timing source management.
Structure for mutex and tracking information.
Definition: lock.h:121
#define ASTERISK_FILE_VERSION(file, version)
Register/unregister a source code file with the core.
Definition: asterisk.h:180
#define ast_mutex_unlock(a)
Definition: lock.h:156
pthread_timer_state
static int pthread_timer_hash(const void *obj, const int flags)