Wed Jan 8 2020 09:49:42

Asterisk developer's documentation


bridge_multiplexed.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*! \file
20  *
21  * \brief Two channel bridging module which groups bridges into batches of threads
22  *
23  * \author Joshua Colp <jcolp@digium.com>
24  *
25  * \ingroup bridges
26  */
27 
28 /*** MODULEINFO
29  <support_level>core</support_level>
30  ***/
31 
32 #include "asterisk.h"
33 
34 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 361403 $")
35 
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <fcntl.h>
42 
43 #include "asterisk/module.h"
44 #include "asterisk/channel.h"
45 #include "asterisk/bridging.h"
47 #include "asterisk/frame.h"
48 #include "asterisk/astobj2.h"
49 
50 /*! \brief Number of buckets our multiplexed thread container can have
 *
 * Passed as the bucket count to ao2_container_alloc() when the module loads.
 */
51 #define MULTIPLEXED_BUCKETS 53
52 
53 /*! \brief Number of channels we handle in a single thread
 *
 * Bounds the slot scan in multiplexed_add_or_remove() and, minus two, is the
 * threshold find_multiplexed_thread() uses to decide whether a thread can
 * take the two channels of another bridge.
 */
54 #define MULTIPLEXED_MAX_CHANNELS 8
55 
56 /*! \brief Structure which represents a single thread handling multiple 2 channel bridges
 *
 * NOTE(review): listing lines 57 and 63 are absent from this extract. They
 * presumably hold the opening "struct multiplexed_thread {" and the chans
 * array member that the functions below index -- confirm against the
 * original source file.
 */
58  /*! Thread itself; AST_PTHREADT_NULL while no thread is running and AST_PTHREADT_STOP once a stop has been requested */
59  pthread_t thread;
60  /*! Pipe used to wake up the multiplexed thread: [0] is read by the thread, [1] is written by multiplexed_nudge() */
61  int pipe[2];
62  /*! Channels in this thread */
64  /*! Number of channels in this thread; raised by two per bridge in multiplexed_bridge_create() and lowered by two in multiplexed_bridge_destroy() */
65  unsigned int count;
66  /*! Bit used to indicate that the thread is waiting on channels; set around ast_waitfor_nandfds() and polled by multiplexed_nudge() */
67  unsigned int waiting:1;
68  /*! Number of channels actually being serviced by this thread, i.e. how many chans entries are handed to ast_waitfor_nandfds() */
69  unsigned int service_count;
70 };
71 
72 /*! \brief Container of all operating multiplexed threads */
74 
75 /*! \brief Callback function for finding a free multiplexed thread
 *
 * Used as the ao2_callback() matcher in multiplexed_bridge_create().
 *
 * \param obj Candidate container object (presumably a struct multiplexed_thread -- confirm)
 * \param arg Not used in the visible code
 * \param flags Not used in the visible code
 *
 * \return CMP_MATCH | CMP_STOP when the candidate's count leaves room for two
 *         more channels (count <= MULTIPLEXED_MAX_CHANNELS - 2), otherwise 0
 *
 * NOTE(review): listing line 78 is absent from this extract; multiplexed_thread
 * is used below without a visible declaration, so it is presumably derived
 * from obj on that line -- confirm against the original source file.
 */
76 static int find_multiplexed_thread(void *obj, void *arg, int flags)
77 {
79  return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0;
80 }
81 
82 /*! \brief Destroy callback for a multiplexed thread structure
 *
 * Registered as the destructor in the ao2_alloc() call made by
 * multiplexed_bridge_create(). Closes whichever ends of the nudge pipe were
 * actually opened; an end still holding -1 is left alone.
 *
 * \param obj Object being destroyed (presumably a struct multiplexed_thread -- confirm)
 *
 * NOTE(review): listing line 85 is absent from this extract; multiplexed_thread
 * is used below without a visible declaration, so it is presumably derived
 * from obj on that line -- confirm against the original source file.
 */
83 static void destroy_multiplexed_thread(void *obj)
84 {
86 
87  if (multiplexed_thread->pipe[0] > -1) {
88  close(multiplexed_thread->pipe[0]);
89  }
90  if (multiplexed_thread->pipe[1] > -1) {
91  close(multiplexed_thread->pipe[1]);
92  }
93 
94  return;
95 }
96 
97 /*! \brief Create function which finds/reserves/references a multiplexed thread structure
 *
 * With the multiplexed_threads container locked, looks for an existing thread
 * structure that still has room (find_multiplexed_thread()). If none is found
 * a new one is allocated, its nudge pipe is created, both pipe ends are made
 * non-blocking and the structure is linked into the container. The chosen
 * structure's count is then raised by two and the structure is stored in
 * bridge->bridge_pvt.
 *
 * \param bridge Bridge being created
 *
 * \retval 0 on success
 * \retval -1 if allocation, pipe creation or either F_SETFL call fails
 *
 * NOTE(review): the F_GETFL results are handed to F_SETFL without being
 * checked for failure.
 * NOTE(review): listing line 100 is absent from this extract; it presumably
 * declares the local multiplexed_thread pointer used below -- confirm against
 * the original source file.
 */
98 static int multiplexed_bridge_create(struct ast_bridge *bridge)
99 {
101 
102  ao2_lock(multiplexed_threads);
103 
104  /* Try to find an existing thread to handle our additional channels */
105  if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) {
106  int flags;
107 
108  /* If we failed we will have to create a new one from scratch */
109  if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) {
110  ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
111  ao2_unlock(multiplexed_threads);
112  return -1;
113  }
114 
 /* Mark both ends as unopened so the destructor does not close descriptors that were never created */
115  multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1;
116  /* Setup a pipe so we can poke the thread itself when needed */
117  if (pipe(multiplexed_thread->pipe)) {
118  ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
119  ao2_ref(multiplexed_thread, -1);
120  ao2_unlock(multiplexed_threads);
121  return -1;
122  }
123 
124  /* Setup each pipe for non-blocking operation */
125  flags = fcntl(multiplexed_thread->pipe[0], F_GETFL);
126  if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) {
127  ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
128  ao2_ref(multiplexed_thread, -1);
129  ao2_unlock(multiplexed_threads);
130  return -1;
131  }
132  flags = fcntl(multiplexed_thread->pipe[1], F_GETFL);
133  if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) {
134  ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
135  ao2_ref(multiplexed_thread, -1);
136  ao2_unlock(multiplexed_threads);
137  return -1;
138  }
139 
140  /* Set up default parameters */
 /* No OS thread yet: multiplexed_add_or_remove() starts one once a channel is being serviced */
141  multiplexed_thread->thread = AST_PTHREADT_NULL;
142 
143  /* Finally link us into the container so others may find us */
144  ao2_link(multiplexed_threads, multiplexed_thread);
145  ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
146  } else {
147  ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
148  }
149 
150  /* Bump the count of the thread structure up by two since the channels for this bridge will be joining shortly */
151  multiplexed_thread->count += 2;
152 
153  ao2_unlock(multiplexed_threads);
154 
 /* multiplexed_bridge_destroy() later drops one reference on this structure */
155  bridge->bridge_pvt = multiplexed_thread;
156 
157  return 0;
158 }
159 
160 /*! \brief Internal function which nudges the thread
 *
 * Does nothing when no thread is running (thread == AST_PTHREADT_NULL).
 * Otherwise writes one int to the write end of the nudge pipe and then
 * yields the CPU in a loop until the thread has cleared its waiting bit.
 *
 * NOTE(review): listing line 161 (the function signature) is absent from
 * this extract -- confirm it against the original source file.
 */
162 {
163  int nudge = 0;
164 
165  if (multiplexed_thread->thread == AST_PTHREADT_NULL) {
166  return;
167  }
168 
 /* A short or failed write is only logged; the wait below still runs */
169  if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
170  ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
171  }
172 
 /* Busy-wait (yielding each pass) until the thread is out of ast_waitfor_nandfds() */
173  while (multiplexed_thread->waiting) {
174  sched_yield();
175  }
176 
177  return;
178 }
179 
180 /*! \brief Destroy function which unreserves/unreferences/removes a multiplexed thread structure
 *
 * With the multiplexed_threads container locked, lowers the structure's count
 * by two, unlinks the structure from the container once the count reaches
 * zero, and nudges its thread. After unlocking, one reference on the
 * structure is released.
 *
 * \param bridge Bridge being destroyed
 *
 * \return Always 0
 *
 * NOTE(review): listing line 183 is absent from this extract; it presumably
 * declares multiplexed_thread from bridge->bridge_pvt (the field set by
 * multiplexed_bridge_create()) -- confirm against the original source file.
 */
181 static int multiplexed_bridge_destroy(struct ast_bridge *bridge)
182 {
184 
185  ao2_lock(multiplexed_threads);
186 
 /* Give back the two channel slots reserved by multiplexed_bridge_create() */
187  multiplexed_thread->count -= 2;
188 
189  if (!multiplexed_thread->count) {
190  ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", multiplexed_thread);
191  ao2_unlink(multiplexed_threads, multiplexed_thread);
192  }
193 
194  multiplexed_nudge(multiplexed_thread);
195 
196  ao2_unlock(multiplexed_threads);
197 
198  ao2_ref(multiplexed_thread, -1);
199 
200  return 0;
201 }
202 
203 /*! \brief Thread function that executes for multiplexed threads
 *
 * Runs until the structure's thread field is set to AST_PTHREADT_STOP. Each
 * pass rotates the serviced channels, waits on them together with the read
 * end of the nudge pipe, consumes one nudge if the pipe fired, and hands a
 * winning channel that is still in a bridge to ast_bridge_handle_trip().
 * The structure's lock is held except while waiting and while handling the
 * winning channel. On exit the thread field is reset to AST_PTHREADT_NULL
 * and one reference on the structure is released.
 *
 * \param data The struct multiplexed_thread this thread services
 *
 * \return Always NULL
 */
204 static void *multiplexed_thread_function(void *data)
205 {
206  struct multiplexed_thread *multiplexed_thread = data;
207  int fds = multiplexed_thread->pipe[0];
208 
209  ao2_lock(multiplexed_thread);
210 
211  ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread);
212 
213  while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
214  struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
215  int to = -1, outfd = -1;
216 
217  /* Move channels around so not just the first one gets priority */
 /* NOTE(review): service_count is unsigned, so the "service_count - 1" expressions below rely on it being at least 1 here -- confirm the callers guarantee that */
218  memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
219  multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first;
220 
 /* Advertise that we are about to block so multiplexed_nudge() knows to wait for us */
221  multiplexed_thread->waiting = 1;
222  ao2_unlock(multiplexed_thread);
223  winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
 /* Cleared before the lock is re-taken: a nudger spinning on this bit may itself be holding that lock */
224  multiplexed_thread->waiting = 0;
225  ao2_lock(multiplexed_thread);
 /* A stop may have been requested while we were waiting */
226  if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
227  break;
228  }
229 
 /* The nudge pipe fired: consume one nudge value; EINTR and EAGAIN are not reported */
230  if (outfd > -1) {
231  int nudge;
232 
233  if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
234  if (errno != EINTR && errno != EAGAIN) {
235  ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
236  }
237  }
238  }
239  if (winner && winner->bridge) {
240  struct ast_bridge *bridge = winner->bridge;
241  int stop = 0;
242  ao2_unlock(multiplexed_thread);
 /* Retry until the bridge lock is obtained, the channel has left its bridge, or a stop is requested */
243  while ((bridge = winner->bridge) && ao2_trylock(bridge)) {
244  sched_yield();
245  if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
246  stop = 1;
247  break;
248  }
249  }
 /* Only here is the bridge lock held, so only here is it released */
250  if (!stop && bridge) {
251  ast_bridge_handle_trip(bridge, NULL, winner, -1);
252  ao2_unlock(bridge);
253  }
254  ao2_lock(multiplexed_thread);
255  }
256  }
257 
 /* Record that no thread is running any more so a later add can start a new one */
258  multiplexed_thread->thread = AST_PTHREADT_NULL;
259 
260  ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread);
261 
262  ao2_unlock(multiplexed_thread);
 /* Release the reference multiplexed_add_or_remove() took before creating this thread */
263  ao2_ref(multiplexed_thread, -1);
264 
265  return NULL;
266 }
267 
268 /*! \brief Helper function which adds or removes a channel and nudges the thread */
270 {
271  int i, removed = 0;
272  pthread_t thread = AST_PTHREADT_NULL;
273 
274  ao2_lock(multiplexed_thread);
275 
276  multiplexed_nudge(multiplexed_thread);
277 
278  for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) {
279  if (multiplexed_thread->chans[i] == chan) {
280  if (!add) {
281  multiplexed_thread->chans[i] = NULL;
282  multiplexed_thread->service_count--;
283  removed = 1;
284  }
285  break;
286  } else if (!multiplexed_thread->chans[i] && add) {
287  multiplexed_thread->chans[i] = chan;
288  multiplexed_thread->service_count++;
289  break;
290  }
291  }
292 
293  if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) {
294  ao2_ref(multiplexed_thread, +1);
295  if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) {
296  ao2_ref(multiplexed_thread, -1);
297  ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread);
298  }
299  } else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) {
300  thread = multiplexed_thread->thread;
301  multiplexed_thread->thread = AST_PTHREADT_STOP;
302  } else if (!add && removed) {
303  memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1)));
304  }
305 
306  ao2_unlock(multiplexed_thread);
307 
308  if (thread != AST_PTHREADT_NULL) {
309  pthread_join(thread, NULL);
310  }
311 
312  return;
313 }
314 
315 /*! \brief Join function which actually adds the channel into the array to be monitored
 *
 * Adds the joining channel to the multiplexed thread and, once the bridge's
 * channel list holds two different channels, makes them compatible unless
 * their read/write/native formats already line up.
 *
 * \param bridge Bridge being joined
 * \param bridge_channel Bridge channel that is joining
 *
 * \return 0 when only one channel is present or the formats already match,
 *         otherwise the result of ast_channel_make_compatible()
 *
 * NOTE(review): listing line 319 is absent from this extract; it presumably
 * declares multiplexed_thread from bridge->bridge_pvt -- confirm against the
 * original source file.
 */
316 static int multiplexed_bridge_join(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
317 {
318  struct ast_channel *c0 = AST_LIST_FIRST(&bridge->channels)->chan, *c1 = AST_LIST_LAST(&bridge->channels)->chan;
320 
321  ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", bridge_channel->chan->name, multiplexed_thread);
322 
323  multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
324 
325  /* If the second channel has not yet joined do not make things compatible */
326  if (c0 == c1) {
327  return 0;
328  }
329 
 /* Each side already writes what the other reads and the native formats are equal: nothing to set up */
330  if (((c0->writeformat == c1->readformat) && (c0->readformat == c1->writeformat) && (c0->nativeformats == c1->nativeformats))) {
331  return 0;
332  }
333 
334  return ast_channel_make_compatible(c0, c1);
335 }
336 
337 /*! \brief Leave function which actually removes the channel from the array
 *
 * \param bridge Bridge being left
 * \param bridge_channel Bridge channel that is leaving
 *
 * \return Always 0
 *
 * NOTE(review): listing line 340 is absent from this extract; it presumably
 * declares multiplexed_thread from bridge->bridge_pvt -- confirm against the
 * original source file.
 */
338 static int multiplexed_bridge_leave(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
339 {
341 
342  ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
343 
344  multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
345 
346  return 0;
347 }
348 
349 /*! \brief Suspend function which means control of the channel is going elsewhere
 *
 * Removes the channel from the multiplexed thread, exactly as
 * multiplexed_bridge_leave() does.
 *
 * \param bridge Bridge the channel belongs to
 * \param bridge_channel Bridge channel being suspended
 *
 * NOTE(review): listing line 352 is absent from this extract; it presumably
 * declares multiplexed_thread from bridge->bridge_pvt -- confirm against the
 * original source file.
 */
350 static void multiplexed_bridge_suspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
351 {
353 
354  ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
355 
356  multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
357 
358  return;
359 }
360 
361 /*! \brief Unsuspend function which means control of the channel is coming back to us
 *
 * Adds the channel back to the multiplexed thread so it is serviced again.
 *
 * \param bridge Bridge the channel belongs to
 * \param bridge_channel Bridge channel being unsuspended
 *
 * NOTE(review): listing line 364 is absent from this extract; it presumably
 * declares multiplexed_thread from bridge->bridge_pvt -- confirm against the
 * original source file.
 */
362 static void multiplexed_bridge_unsuspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
363 {
365 
366  ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
367 
368  multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
369 
370  return;
371 }
372 
373 /*! \brief Write function for writing frames into the bridge
 *
 * Picks the bridge channel on the other side of \a bridge_channel (the last
 * list entry when the writer is the first, otherwise the first) and writes
 * the frame to it, but only while that channel is in
 * AST_BRIDGE_CHANNEL_STATE_WAIT.
 *
 * \param bridge Bridge the frame is written into
 * \param bridge_channel Bridge channel the frame came from
 * \param frame Frame to pass to the other side
 *
 * NOTE(review): listing lines 379, 383 and 390 are absent from this extract.
 * They presumably hold the return statements for the two early-exit checks
 * and for the end of the function, so the returned values cannot be
 * documented from here -- confirm against the original source file.
 */
374 static enum ast_bridge_write_result multiplexed_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
375 {
376  struct ast_bridge_channel *other;
377 
 /* First and last entries are the same: there is no second channel to write to */
378  if (AST_LIST_FIRST(&bridge->channels) == AST_LIST_LAST(&bridge->channels)) {
380  }
381 
382  if (!(other = (AST_LIST_FIRST(&bridge->channels) == bridge_channel ? AST_LIST_LAST(&bridge->channels) : AST_LIST_FIRST(&bridge->channels)))) {
384  }
385 
386  if (other->state == AST_BRIDGE_CHANNEL_STATE_WAIT) {
387  ast_write(other->chan, frame);
388  }
389 
391 }
392 
/*! \brief Bridge technology description: name, capabilities, preference and the callbacks defined above
 *
 * This is the object load_module() registers and unload_module() unregisters.
 *
 * NOTE(review): listing lines 393 and 397 are absent from this extract, so
 * the declaration line of this initializer and one of its members are not
 * visible here -- confirm against the original source file.
 */
394  .name = "multiplexed_bridge",
395  .capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX,
396  .preference = AST_BRIDGE_PREFERENCE_HIGH,
398  .create = multiplexed_bridge_create,
399  .destroy = multiplexed_bridge_destroy,
400  .join = multiplexed_bridge_join,
401  .leave = multiplexed_bridge_leave,
402  .suspend = multiplexed_bridge_suspend,
403  .unsuspend = multiplexed_bridge_unsuspend,
404  .write = multiplexed_bridge_write,
405 };
406 
407 static int unload_module(void)
408 {
409  int res = ast_bridge_technology_unregister(&multiplexed_bridge);
410 
411  ao2_ref(multiplexed_threads, -1);
412 
413  return res;
414 }
415 
/*! \brief Module load hook
 *
 * Allocates the multiplexed_threads container with MULTIPLEXED_BUCKETS
 * buckets and then registers the multiplexed bridge technology.
 *
 * \return The result of ast_bridge_technology_register() when the container
 *         was allocated
 *
 * NOTE(review): listing line 419 is absent from this extract; it presumably
 * holds the return statement taken when the container allocation fails, so
 * that return value cannot be documented from here -- confirm against the
 * original source file.
 */
416 static int load_module(void)
417 {
418  if (!(multiplexed_threads = ao2_container_alloc(MULTIPLEXED_BUCKETS, NULL, NULL))) {
420  }
421 
422  return ast_bridge_technology_register(&multiplexed_bridge);
423 }
424 
/* Module information: ASTERISK_GPL_KEY is the key string and the literal is the module description.
 * NOTE(review): presumably this macro is also what ties load_module()/unload_module() to the module loader -- confirm in module.h. */
425 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Multiplexed two channel bridging module");
static int multiplexed_bridge_destroy(struct ast_bridge *bridge)
Destroy function which unreserves/unreferences/removes a multiplexed thread structure.
static void destroy_multiplexed_thread(void *obj)
Destroy callback for a multiplexed thread structure.
pthread_t thread
Definition: app_meetme.c:962
Main Channel structure associated with a channel.
Definition: channel.h:742
#define AST_MODULE_INFO_STANDARD(keystr, desc)
Definition: module.h:396
#define MULTIPLEXED_BUCKETS
Number of buckets our multiplexed thread container can have.
Asterisk main include file. File version handling, generic pbx functions.
#define ao2_link(arg1, arg2)
Definition: astobj2.h:785
#define AST_LIST_FIRST(head)
Returns the first entry contained in a list.
Definition: linkedlists.h:420
static int multiplexed_bridge_join(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
Join function which actually adds the channel into the array to be monitored.
format_t writeformat
Definition: channel.h:854
Structure which represents a single thread handling multiple 2 channel bridges.
#define MULTIPLEXED_MAX_CHANNELS
Number of channels we handle in a single thread.
#define LOG_WARNING
Definition: logger.h:144
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:910
static int multiplexed_bridge_leave(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
Leave function which actually removes the channel from the array.
struct ast_channel * chans[MULTIPLEXED_MAX_CHANNELS]
static void multiplexed_bridge_suspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
Suspend function which means control of the channel is going elsewhere.
static struct ao2_container * multiplexed_threads
Container of all operating multiplexed threads.
unsigned int stop
Definition: app_meetme.c:969
static void multiplexed_bridge_unsuspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
Unsuspend function which means control of the channel is coming back to us.
#define ao2_unlock(a)
Definition: astobj2.h:497
format_t nativeformats
Definition: channel.h:852
static int multiplexed_bridge_create(struct ast_bridge *bridge)
Create function which finds/reserves/references a multiplexed thread structure.
#define ast_bridge_technology_register(technology)
See __ast_bridge_technology_register()
struct ast_channel * ast_waitfor_nandfds(struct ast_channel **chan, int n, int *fds, int nfds, int *exception, int *outfd, int *ms)
Waits for activity on a group of channels.
Definition: channel.c:3188
static enum ast_bridge_write_result multiplexed_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
Write function for writing frames into the bridge.
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:236
General Asterisk PBX channel definitions.
#define AST_PTHREADT_NULL
Definition: lock.h:65
Asterisk internal frame definitions.
Channel Bridging API.
#define AST_FORMAT_TEXT_MASK
Definition: frame.h:297
Channel Bridging API.
#define ao2_ref(o, delta)
Definition: astobj2.h:472
#define ao2_lock(a)
Definition: astobj2.h:488
unsigned int service_count
static int unload_module(void)
int ast_bridge_technology_unregister(struct ast_bridge_technology *technology)
Unregister a bridge technology from use.
Definition: bridging.c:99
void ast_bridge_handle_trip(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_channel *chan, int outfd)
Feed notification that a frame is waiting on a channel into the bridging core.
Definition: bridging.c:277
static int load_module(void)
Structure that contains information about a bridge.
Definition: bridging.h:149
#define LOG_ERROR
Definition: logger.h:155
#define ao2_trylock(a)
Definition: astobj2.h:506
static void * multiplexed_thread_function(void *data)
Thread function that executes for multiplexed threads.
const ast_string_field name
Definition: channel.h:787
struct sla_ringing_trunk * first
Definition: app_meetme.c:965
void ast_log(int level, const char *file, int line, const char *function, const char *fmt,...)
Used for sending a log message. This is the standard logger function. Probably the only way you will i...
Definition: logger.c:1207
static struct ast_bridge_technology multiplexed_bridge
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:430
#define AST_LIST_LAST(head)
Returns the last entry contained in a list.
Definition: linkedlists.h:428
struct ast_bridge * bridge
Definition: channel.h:865
#define AST_FORMAT_VIDEO_MASK
Definition: frame.h:290
int errno
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:418
#define AST_FORMAT_AUDIO_MASK
Definition: frame.h:274
int ast_write(struct ast_channel *chan, struct ast_frame *frame)
Write a frame to a channel This function writes the given frame to the indicated channel.
Definition: channel.c:4916
struct ast_channel * chan
Definition: bridging.h:125
Structure that contains information regarding a channel in a bridge.
Definition: bridging.h:117
int ast_channel_make_compatible(struct ast_channel *c0, struct ast_channel *c1)
Makes two channel formats compatible.
Definition: channel.c:5970
format_t readformat
Definition: channel.h:853
#define ao2_container_alloc(arg1, arg2, arg3)
Definition: astobj2.h:734
Structure that is the essence of a bridge technology.
#define AST_PTHREADT_STOP
Definition: lock.h:66
ast_bridge_write_result
Return values for bridge technology write function.
Definition: bridging.h:102
Data structure associated with a single frame of data.
Definition: frame.h:142
static int find_multiplexed_thread(void *obj, void *arg, int flags)
Callback function for finding a free multiplexed thread.
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:38
Asterisk module definitions.
enum ast_bridge_channel_state state
Definition: bridging.h:123
#define ao2_unlink(arg1, arg2)
Definition: astobj2.h:817
void * bridge_pvt
Definition: bridging.h:163
static void multiplexed_nudge(struct multiplexed_thread *multiplexed_thread)
Internal function which nudges the thread.
#define ASTERISK_FILE_VERSION(file, version)
Register/unregister a source code file with the core.
Definition: asterisk.h:180
static void multiplexed_add_or_remove(struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add)
Helper function which adds or removes a channel and nudges the thread.