Two channel bridging module which groups bridges into batches of threads. More...
#include "asterisk.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "asterisk/module.h"
#include "asterisk/channel.h"
#include "asterisk/bridging.h"
#include "asterisk/bridging_technology.h"
#include "asterisk/frame.h"
#include "asterisk/astobj2.h"
Go to the source code of this file.
Data Structures | |
struct | multiplexed_thread |
Structure which represents a single thread handling multiple 2 channel bridges. More... | |
Defines | |
#define | MULTIPLEXED_BUCKETS 53 |
Number of buckets our multiplexed thread container can have. | |
#define | MULTIPLEXED_MAX_CHANNELS 8 |
Number of channels we handle in a single thread. | |
Functions | |
AST_MODULE_INFO_STANDARD (ASTERISK_GPL_KEY,"Multiplexed two channel bridging module") | |
static void | destroy_multiplexed_thread (void *obj) |
Destroy callback for a multiplexed thread structure. | |
static int | find_multiplexed_thread (void *obj, void *arg, int flags) |
Callback function for finding a free multiplexed thread. | |
static int | load_module (void) |
static void | multiplexed_add_or_remove (struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add) |
Helper function which adds or removes a channel and nudges the thread. | |
static int | multiplexed_bridge_create (struct ast_bridge *bridge) |
Create function which finds/reserves/references a multiplexed thread structure. | |
static int | multiplexed_bridge_destroy (struct ast_bridge *bridge) |
Destroy function which unreserves/unreferences/removes a multiplexed thread structure. | |
static int | multiplexed_bridge_join (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
Join function which actually adds the channel into the array to be monitored. | |
static int | multiplexed_bridge_leave (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
Leave function which actually removes the channel from the array. | |
static void | multiplexed_bridge_suspend (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
Suspend function which means control of the channel is going elsewhere. | |
static void | multiplexed_bridge_unsuspend (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
Unsuspend function which means control of the channel is coming back to us. | |
static enum ast_bridge_write_result | multiplexed_bridge_write (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame) |
Write function for writing frames into the bridge. | |
static void | multiplexed_nudge (struct multiplexed_thread *multiplexed_thread) |
Internal function which nudges the thread. | |
static void * | multiplexed_thread_function (void *data) |
Thread function that executes for multiplexed threads. | |
static int | unload_module (void) |
Variables | |
static struct ast_bridge_technology | multiplexed_bridge |
static struct ao2_container * | multiplexed_threads |
Container of all operating multiplexed threads. |
Two-channel bridging module which groups bridges into batches, with each batch serviced by a single shared thread.
Definition in file bridge_multiplexed.c.
#define MULTIPLEXED_BUCKETS 53 |
Number of buckets our multiplexed thread container can have.
Definition at line 51 of file bridge_multiplexed.c.
Referenced by load_module().
#define MULTIPLEXED_MAX_CHANNELS 8 |
Number of channels we handle in a single thread.
Definition at line 54 of file bridge_multiplexed.c.
Referenced by find_multiplexed_thread(), and multiplexed_add_or_remove().
AST_MODULE_INFO_STANDARD | ( | ASTERISK_GPL_KEY | , | |
"Multiplexed two channel bridging module" | ||||
) |
static void destroy_multiplexed_thread | ( | void * | obj | ) | [static] |
Destroy callback for a multiplexed thread structure.
Definition at line 83 of file bridge_multiplexed.c.
References multiplexed_thread::pipe.
Referenced by multiplexed_bridge_create().
00084 { 00085 struct multiplexed_thread *multiplexed_thread = obj; 00086 00087 if (multiplexed_thread->pipe[0] > -1) { 00088 close(multiplexed_thread->pipe[0]); 00089 } 00090 if (multiplexed_thread->pipe[1] > -1) { 00091 close(multiplexed_thread->pipe[1]); 00092 } 00093 00094 return; 00095 }
static int find_multiplexed_thread | ( | void * | obj, | |
void * | arg, | |||
int | flags | |||
) | [static] |
Callback function for finding a free multiplexed thread.
Definition at line 76 of file bridge_multiplexed.c.
References CMP_MATCH, CMP_STOP, multiplexed_thread::count, and MULTIPLEXED_MAX_CHANNELS.
Referenced by multiplexed_bridge_create().
00077 { 00078 struct multiplexed_thread *multiplexed_thread = obj; 00079 return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0; 00080 }
static int load_module | ( | void | ) | [static] |
Definition at line 416 of file bridge_multiplexed.c.
References ao2_container_alloc, ast_bridge_technology_register, AST_MODULE_LOAD_DECLINE, and MULTIPLEXED_BUCKETS.
00417 { 00418 if (!(multiplexed_threads = ao2_container_alloc(MULTIPLEXED_BUCKETS, NULL, NULL))) { 00419 return AST_MODULE_LOAD_DECLINE; 00420 } 00421 00422 return ast_bridge_technology_register(&multiplexed_bridge); 00423 }
static void multiplexed_add_or_remove | ( | struct multiplexed_thread * | multiplexed_thread, | |
struct ast_channel * | chan, | |||
int | add | |||
) | [static] |
Helper function which adds or removes a channel and nudges the thread.
Definition at line 269 of file bridge_multiplexed.c.
References ao2_lock, ao2_ref, ao2_unlock, ast_debug, ast_pthread_create, AST_PTHREADT_NULL, AST_PTHREADT_STOP, multiplexed_thread::chans, MULTIPLEXED_MAX_CHANNELS, multiplexed_nudge(), multiplexed_thread_function(), multiplexed_thread::service_count, multiplexed_thread::thread, and thread.
Referenced by multiplexed_bridge_join(), multiplexed_bridge_leave(), multiplexed_bridge_suspend(), and multiplexed_bridge_unsuspend().
00270 { 00271 int i, removed = 0; 00272 pthread_t thread = AST_PTHREADT_NULL; 00273 00274 ao2_lock(multiplexed_thread); 00275 00276 multiplexed_nudge(multiplexed_thread); 00277 00278 for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) { 00279 if (multiplexed_thread->chans[i] == chan) { 00280 if (!add) { 00281 multiplexed_thread->chans[i] = NULL; 00282 multiplexed_thread->service_count--; 00283 removed = 1; 00284 } 00285 break; 00286 } else if (!multiplexed_thread->chans[i] && add) { 00287 multiplexed_thread->chans[i] = chan; 00288 multiplexed_thread->service_count++; 00289 break; 00290 } 00291 } 00292 00293 if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) { 00294 ao2_ref(multiplexed_thread, +1); 00295 if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) { 00296 ao2_ref(multiplexed_thread, -1); 00297 ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread); 00298 } 00299 } else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) { 00300 thread = multiplexed_thread->thread; 00301 multiplexed_thread->thread = AST_PTHREADT_STOP; 00302 } else if (!add && removed) { 00303 memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1))); 00304 } 00305 00306 ao2_unlock(multiplexed_thread); 00307 00308 if (thread != AST_PTHREADT_NULL) { 00309 pthread_join(thread, NULL); 00310 } 00311 00312 return; 00313 }
static int multiplexed_bridge_create | ( | struct ast_bridge * | bridge | ) | [static] |
Create function which finds/reserves/references a multiplexed thread structure.
Definition at line 98 of file bridge_multiplexed.c.
References ao2_alloc, ao2_callback, ao2_link, ao2_lock, ao2_ref, ao2_unlock, ast_debug, ast_log(), AST_PTHREADT_NULL, ast_bridge::bridge_pvt, multiplexed_thread::count, destroy_multiplexed_thread(), errno, find_multiplexed_thread(), LOG_WARNING, multiplexed_thread::pipe, and multiplexed_thread::thread.
00099 { 00100 struct multiplexed_thread *multiplexed_thread; 00101 00102 ao2_lock(multiplexed_threads); 00103 00104 /* Try to find an existing thread to handle our additional channels */ 00105 if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) { 00106 int flags; 00107 00108 /* If we failed we will have to create a new one from scratch */ 00109 if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) { 00110 ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge); 00111 ao2_unlock(multiplexed_threads); 00112 return -1; 00113 } 00114 00115 multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1; 00116 /* Setup a pipe so we can poke the thread itself when needed */ 00117 if (pipe(multiplexed_thread->pipe)) { 00118 ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge); 00119 ao2_ref(multiplexed_thread, -1); 00120 ao2_unlock(multiplexed_threads); 00121 return -1; 00122 } 00123 00124 /* Setup each pipe for non-blocking operation */ 00125 flags = fcntl(multiplexed_thread->pipe[0], F_GETFL); 00126 if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) { 00127 ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno)); 00128 ao2_ref(multiplexed_thread, -1); 00129 ao2_unlock(multiplexed_threads); 00130 return -1; 00131 } 00132 flags = fcntl(multiplexed_thread->pipe[1], F_GETFL); 00133 if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) { 00134 ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno)); 00135 ao2_ref(multiplexed_thread, -1); 00136 ao2_unlock(multiplexed_threads); 00137 return -1; 00138 } 00139 00140 /* Set up default parameters */ 00141 multiplexed_thread->thread = AST_PTHREADT_NULL; 00142 00143 /* Finally 
link us into the container so others may find us */ 00144 ao2_link(multiplexed_threads, multiplexed_thread); 00145 ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge); 00146 } else { 00147 ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge); 00148 } 00149 00150 /* Bump the count of the thread structure up by two since the channels for this bridge will be joining shortly */ 00151 multiplexed_thread->count += 2; 00152 00153 ao2_unlock(multiplexed_threads); 00154 00155 bridge->bridge_pvt = multiplexed_thread; 00156 00157 return 0; 00158 }
static int multiplexed_bridge_destroy | ( | struct ast_bridge * | bridge | ) | [static] |
Destroy function which unreserves/unreferences/removes a multiplexed thread structure.
Definition at line 181 of file bridge_multiplexed.c.
References ao2_lock, ao2_ref, ao2_unlink, ao2_unlock, ast_debug, ast_bridge::bridge_pvt, multiplexed_thread::count, and multiplexed_nudge().
00182 { 00183 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt; 00184 00185 ao2_lock(multiplexed_threads); 00186 00187 multiplexed_thread->count -= 2; 00188 00189 if (!multiplexed_thread->count) { 00190 ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", multiplexed_thread); 00191 ao2_unlink(multiplexed_threads, multiplexed_thread); 00192 } 00193 00194 multiplexed_nudge(multiplexed_thread); 00195 00196 ao2_unlock(multiplexed_threads); 00197 00198 ao2_ref(multiplexed_thread, -1); 00199 00200 return 0; 00201 }
static int multiplexed_bridge_join | ( | struct ast_bridge * | bridge, | |
struct ast_bridge_channel * | bridge_channel | |||
) | [static] |
Join function which actually adds the channel into the array to be monitored.
Definition at line 316 of file bridge_multiplexed.c.
References ast_channel_make_compatible(), ast_debug, AST_LIST_FIRST, AST_LIST_LAST, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), ast_channel::nativeformats, ast_channel::readformat, and ast_channel::writeformat.
00317 { 00318 struct ast_channel *c0 = AST_LIST_FIRST(&bridge->channels)->chan, *c1 = AST_LIST_LAST(&bridge->channels)->chan; 00319 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt; 00320 00321 ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", bridge_channel->chan->name, multiplexed_thread); 00322 00323 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1); 00324 00325 /* If the second channel has not yet joined do not make things compatible */ 00326 if (c0 == c1) { 00327 return 0; 00328 } 00329 00330 if (((c0->writeformat == c1->readformat) && (c0->readformat == c1->writeformat) && (c0->nativeformats == c1->nativeformats))) { 00331 return 0; 00332 } 00333 00334 return ast_channel_make_compatible(c0, c1); 00335 }
static int multiplexed_bridge_leave | ( | struct ast_bridge * | bridge, | |
struct ast_bridge_channel * | bridge_channel | |||
) | [static] |
Leave function which actually removes the channel from the array.
Definition at line 338 of file bridge_multiplexed.c.
References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, and multiplexed_add_or_remove().
00339 { 00340 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt; 00341 00342 ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread); 00343 00344 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0); 00345 00346 return 0; 00347 }
static void multiplexed_bridge_suspend | ( | struct ast_bridge * | bridge, | |
struct ast_bridge_channel * | bridge_channel | |||
) | [static] |
Suspend function which means control of the channel is going elsewhere.
Definition at line 350 of file bridge_multiplexed.c.
References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, and multiplexed_add_or_remove().
00351 { 00352 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt; 00353 00354 ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread); 00355 00356 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0); 00357 00358 return; 00359 }
static void multiplexed_bridge_unsuspend | ( | struct ast_bridge * | bridge, | |
struct ast_bridge_channel * | bridge_channel | |||
) | [static] |
Unsuspend function which means control of the channel is coming back to us.
Definition at line 362 of file bridge_multiplexed.c.
References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, and multiplexed_add_or_remove().
00363 { 00364 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt; 00365 00366 ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread); 00367 00368 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1); 00369 00370 return; 00371 }
static enum ast_bridge_write_result multiplexed_bridge_write | ( | struct ast_bridge * | bridge, | |
struct ast_bridge_channel * | bridge_channel, | |||
struct ast_frame * | frame | |||
) | [static] |
Write function for writing frames into the bridge.
Definition at line 374 of file bridge_multiplexed.c.
References AST_BRIDGE_CHANNEL_STATE_WAIT, AST_BRIDGE_WRITE_FAILED, AST_BRIDGE_WRITE_SUCCESS, AST_LIST_FIRST, AST_LIST_LAST, ast_write(), ast_bridge_channel::chan, and ast_bridge_channel::state.
00375 { 00376 struct ast_bridge_channel *other; 00377 00378 if (AST_LIST_FIRST(&bridge->channels) == AST_LIST_LAST(&bridge->channels)) { 00379 return AST_BRIDGE_WRITE_FAILED; 00380 } 00381 00382 if (!(other = (AST_LIST_FIRST(&bridge->channels) == bridge_channel ? AST_LIST_LAST(&bridge->channels) : AST_LIST_FIRST(&bridge->channels)))) { 00383 return AST_BRIDGE_WRITE_FAILED; 00384 } 00385 00386 if (other->state == AST_BRIDGE_CHANNEL_STATE_WAIT) { 00387 ast_write(other->chan, frame); 00388 } 00389 00390 return AST_BRIDGE_WRITE_SUCCESS; 00391 }
static void multiplexed_nudge | ( | struct multiplexed_thread * | multiplexed_thread | ) | [static] |
Internal function which nudges the thread.
Definition at line 161 of file bridge_multiplexed.c.
References ast_log(), AST_PTHREADT_NULL, LOG_ERROR, multiplexed_thread::pipe, multiplexed_thread::thread, and multiplexed_thread::waiting.
Referenced by multiplexed_add_or_remove(), and multiplexed_bridge_destroy().
00162 { 00163 int nudge = 0; 00164 00165 if (multiplexed_thread->thread == AST_PTHREADT_NULL) { 00166 return; 00167 } 00168 00169 if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) { 00170 ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread); 00171 } 00172 00173 while (multiplexed_thread->waiting) { 00174 sched_yield(); 00175 } 00176 00177 return; 00178 }
static void* multiplexed_thread_function | ( | void * | data | ) | [static] |
Thread function that executes for multiplexed threads.
Definition at line 204 of file bridge_multiplexed.c.
References ao2_lock, ao2_ref, ao2_trylock, ao2_unlock, ast_bridge_handle_trip(), ast_debug, ast_log(), AST_PTHREADT_NULL, AST_PTHREADT_STOP, ast_waitfor_nandfds(), ast_channel::bridge, multiplexed_thread::chans, errno, first, LOG_WARNING, multiplexed_thread::pipe, multiplexed_thread::service_count, stop, multiplexed_thread::thread, and multiplexed_thread::waiting.
Referenced by multiplexed_add_or_remove().
00205 { 00206 struct multiplexed_thread *multiplexed_thread = data; 00207 int fds = multiplexed_thread->pipe[0]; 00208 00209 ao2_lock(multiplexed_thread); 00210 00211 ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread); 00212 00213 while (multiplexed_thread->thread != AST_PTHREADT_STOP) { 00214 struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0]; 00215 int to = -1, outfd = -1; 00216 00217 /* Move channels around so not just the first one gets priority */ 00218 memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1)); 00219 multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first; 00220 00221 multiplexed_thread->waiting = 1; 00222 ao2_unlock(multiplexed_thread); 00223 winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to); 00224 multiplexed_thread->waiting = 0; 00225 ao2_lock(multiplexed_thread); 00226 if (multiplexed_thread->thread == AST_PTHREADT_STOP) { 00227 break; 00228 } 00229 00230 if (outfd > -1) { 00231 int nudge; 00232 00233 if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) { 00234 if (errno != EINTR && errno != EAGAIN) { 00235 ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno)); 00236 } 00237 } 00238 } 00239 if (winner && winner->bridge) { 00240 struct ast_bridge *bridge = winner->bridge; 00241 int stop = 0; 00242 ao2_unlock(multiplexed_thread); 00243 while ((bridge = winner->bridge) && ao2_trylock(bridge)) { 00244 sched_yield(); 00245 if (multiplexed_thread->thread == AST_PTHREADT_STOP) { 00246 stop = 1; 00247 break; 00248 } 00249 } 00250 if (!stop && bridge) { 00251 ast_bridge_handle_trip(bridge, NULL, winner, -1); 00252 ao2_unlock(bridge); 00253 } 00254 ao2_lock(multiplexed_thread); 00255 } 00256 } 00257 00258 multiplexed_thread->thread = AST_PTHREADT_NULL; 
00259 00260 ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread); 00261 00262 ao2_unlock(multiplexed_thread); 00263 ao2_ref(multiplexed_thread, -1); 00264 00265 return NULL; 00266 }
static int unload_module | ( | void | ) | [static] |
Definition at line 407 of file bridge_multiplexed.c.
References ao2_ref, and ast_bridge_technology_unregister().
00408 { 00409 int res = ast_bridge_technology_unregister(&multiplexed_bridge); 00410 00411 ao2_ref(multiplexed_threads, -1); 00412 00413 return res; 00414 }
struct ast_bridge_technology multiplexed_bridge [static] |
Definition at line 393 of file bridge_multiplexed.c.
struct ao2_container* multiplexed_threads [static] |
Container of all operating multiplexed threads.
Definition at line 73 of file bridge_multiplexed.c.