#include "asterisk.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "asterisk/module.h"
#include "asterisk/channel.h"
#include "asterisk/bridging.h"
#include "asterisk/bridging_technology.h"
#include "asterisk/frame.h"
#include "asterisk/astobj2.h"
Go to the source code of this file.
Data Structures | |
struct | multiplexed_thread |
Structure which represents a single thread handling multiple two-channel bridges. More... | |
Defines | |
#define | MULTIPLEXED_BUCKETS 53 |
Number of buckets our multiplexed thread container can have. | |
#define | MULTIPLEXED_MAX_CHANNELS 8 |
Number of channels we handle in a single thread. | |
Functions | |
static void | __reg_module (void) |
static void | __unreg_module (void) |
static void | destroy_multiplexed_thread (void *obj) |
Destroy callback for a multiplexed thread structure. | |
static int | find_multiplexed_thread (void *obj, void *arg, int flags) |
Callback function for finding a free multiplexed thread. | |
static int | load_module (void) |
static void | multiplexed_add_or_remove (struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add) |
Helper function which adds or removes a channel and nudges the thread. | |
static int | multiplexed_bridge_create (struct ast_bridge *bridge) |
Create function which finds/reserves/references a multiplexed thread structure. | |
static int | multiplexed_bridge_destroy (struct ast_bridge *bridge) |
Destroy function which unreserves/unreferences/removes a multiplexed thread structure. | |
static int | multiplexed_bridge_join (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
Join function which actually adds the channel into the array to be monitored. | |
static int | multiplexed_bridge_leave (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
Leave function which actually removes the channel from the array. | |
static void | multiplexed_bridge_suspend (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
Suspend function which means control of the channel is going elsewhere. | |
static void | multiplexed_bridge_unsuspend (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel) |
Unsuspend function which means control of the channel is coming back to us. | |
static enum ast_bridge_write_result | multiplexed_bridge_write (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame) |
Write function for writing frames into the bridge. | |
static void | multiplexed_nudge (struct multiplexed_thread *multiplexed_thread) |
Internal function which nudges the thread. | |
static void * | multiplexed_thread_function (void *data) |
Thread function that executes for multiplexed threads. | |
static int | unload_module (void) |
Variables | |
static struct ast_module_info | __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Multiplexed two channel bridging module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "8586c2a7d357cb591cc3a6607a8f62d1" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } |
static struct ast_module_info * | ast_module_info = &__mod_info |
static struct ast_bridge_technology | multiplexed_bridge |
static struct ao2_container * | multiplexed_threads |
Container of all operating multiplexed threads. |
Definition in file bridge_multiplexed.c.
#define MULTIPLEXED_BUCKETS 53 |
Number of buckets our multiplexed thread container can have.
Definition at line 47 of file bridge_multiplexed.c.
Referenced by load_module().
#define MULTIPLEXED_MAX_CHANNELS 8 |
Number of channels we handle in a single thread.
Definition at line 50 of file bridge_multiplexed.c.
Referenced by find_multiplexed_thread(), and multiplexed_add_or_remove().
static void __reg_module | ( | void | ) | [static] |
Definition at line 404 of file bridge_multiplexed.c.
static void __unreg_module | ( | void | ) | [static] |
Definition at line 404 of file bridge_multiplexed.c.
static void destroy_multiplexed_thread | ( | void * | obj | ) | [static] |
Destroy callback for a multiplexed thread structure.
Definition at line 79 of file bridge_multiplexed.c.
References multiplexed_thread::pipe.
Referenced by multiplexed_bridge_create().
00080 { 00081 struct multiplexed_thread *multiplexed_thread = obj; 00082 00083 if (multiplexed_thread->pipe[0] > -1) { 00084 close(multiplexed_thread->pipe[0]); 00085 } 00086 if (multiplexed_thread->pipe[1] > -1) { 00087 close(multiplexed_thread->pipe[1]); 00088 } 00089 00090 return; 00091 }
static int find_multiplexed_thread | ( | void * | obj, | |
void * | arg, | |||
int | flags | |||
) | [static] |
Callback function for finding a free multiplexed thread.
Definition at line 72 of file bridge_multiplexed.c.
References CMP_MATCH, CMP_STOP, multiplexed_thread::count, and MULTIPLEXED_MAX_CHANNELS.
Referenced by multiplexed_bridge_create().
00073 { 00074 struct multiplexed_thread *multiplexed_thread = obj; 00075 return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0; 00076 }
static int load_module | ( | void | ) | [static] |
Definition at line 395 of file bridge_multiplexed.c.
References ao2_container_alloc, ast_bridge_technology_register, AST_MODULE_LOAD_DECLINE, multiplexed_bridge, MULTIPLEXED_BUCKETS, and multiplexed_threads.
00396 { 00397 if (!(multiplexed_threads = ao2_container_alloc(MULTIPLEXED_BUCKETS, NULL, NULL))) { 00398 return AST_MODULE_LOAD_DECLINE; 00399 } 00400 00401 return ast_bridge_technology_register(&multiplexed_bridge); 00402 }
static void multiplexed_add_or_remove | ( | struct multiplexed_thread * | multiplexed_thread, | |
struct ast_channel * | chan, | |||
int | add | |||
) | [static] |
Helper function which adds or removes a channel and nudges the thread.
Definition at line 248 of file bridge_multiplexed.c.
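References ao2_lock, ao2_ref, ao2_unlock, ast_debug, ast_pthread_create, AST_PTHREADT_NULL, AST_PTHREADT_STOP, multiplexed_thread::chans, MULTIPLEXED_MAX_CHANNELS, multiplexed_nudge(), multiplexed_thread_function(), multiplexed_thread::service_count, and multiplexed_thread::thread.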
Referenced by multiplexed_bridge_join(), multiplexed_bridge_leave(), multiplexed_bridge_suspend(), and multiplexed_bridge_unsuspend().
00249 { 00250 int i, removed = 0; 00251 pthread_t thread = AST_PTHREADT_NULL; 00252 00253 ao2_lock(multiplexed_thread); 00254 00255 multiplexed_nudge(multiplexed_thread); 00256 00257 for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) { 00258 if (multiplexed_thread->chans[i] == chan) { 00259 if (!add) { 00260 multiplexed_thread->chans[i] = NULL; 00261 multiplexed_thread->service_count--; 00262 removed = 1; 00263 } 00264 break; 00265 } else if (!multiplexed_thread->chans[i] && add) { 00266 multiplexed_thread->chans[i] = chan; 00267 multiplexed_thread->service_count++; 00268 break; 00269 } 00270 } 00271 00272 if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) { 00273 ao2_ref(multiplexed_thread, +1); 00274 if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) { 00275 ao2_ref(multiplexed_thread, -1); 00276 ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread); 00277 } 00278 } else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) { 00279 thread = multiplexed_thread->thread; 00280 multiplexed_thread->thread = AST_PTHREADT_STOP; 00281 } else if (!add && removed) { 00282 memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1))); 00283 } 00284 00285 ao2_unlock(multiplexed_thread); 00286 00287 if (thread != AST_PTHREADT_NULL) { 00288 pthread_join(thread, NULL); 00289 } 00290 00291 return; 00292 }
static int multiplexed_bridge_create | ( | struct ast_bridge * | bridge | ) | [static] |
Create function which finds/reserves/references a multiplexed thread structure.
Definition at line 94 of file bridge_multiplexed.c.
References ao2_alloc, ao2_callback, ao2_link, ao2_lock, ao2_ref, ao2_unlock, ast_debug, ast_log(), AST_PTHREADT_NULL, ast_bridge::bridge_pvt, multiplexed_thread::count, destroy_multiplexed_thread(), errno, find_multiplexed_thread(), LOG_WARNING, multiplexed_threads, and multiplexed_thread::pipe.
00095 { 00096 struct multiplexed_thread *multiplexed_thread; 00097 00098 ao2_lock(multiplexed_threads); 00099 00100 /* Try to find an existing thread to handle our additional channels */ 00101 if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) { 00102 int flags; 00103 00104 /* If we failed we will have to create a new one from scratch */ 00105 if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) { 00106 ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge); 00107 ao2_unlock(multiplexed_threads); 00108 return -1; 00109 } 00110 00111 multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1; 00112 /* Setup a pipe so we can poke the thread itself when needed */ 00113 if (pipe(multiplexed_thread->pipe)) { 00114 ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge); 00115 ao2_ref(multiplexed_thread, -1); 00116 ao2_unlock(multiplexed_threads); 00117 return -1; 00118 } 00119 00120 /* Setup each pipe for non-blocking operation */ 00121 flags = fcntl(multiplexed_thread->pipe[0], F_GETFL); 00122 if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) { 00123 ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno)); 00124 ao2_ref(multiplexed_thread, -1); 00125 ao2_unlock(multiplexed_threads); 00126 return -1; 00127 } 00128 flags = fcntl(multiplexed_thread->pipe[1], F_GETFL); 00129 if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) { 00130 ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno)); 00131 ao2_ref(multiplexed_thread, -1); 00132 ao2_unlock(multiplexed_threads); 00133 return -1; 00134 } 00135 00136 /* Set up default parameters */ 00137 multiplexed_thread->thread = AST_PTHREADT_NULL; 00138 00139 /* Finally 
link us into the container so others may find us */ 00140 ao2_link(multiplexed_threads, multiplexed_thread); 00141 ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge); 00142 } else { 00143 ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge); 00144 } 00145 00146 /* Bump the count of the thread structure up by two since the channels for this bridge will be joining shortly */ 00147 multiplexed_thread->count += 2; 00148 00149 ao2_unlock(multiplexed_threads); 00150 00151 bridge->bridge_pvt = multiplexed_thread; 00152 00153 return 0; 00154 }
static int multiplexed_bridge_destroy | ( | struct ast_bridge * | bridge | ) | [static] |
Destroy function which unreserves/unreferences/removes a multiplexed thread structure.
Definition at line 177 of file bridge_multiplexed.c.
References ao2_lock, ao2_ref, ao2_unlink, ao2_unlock, ast_debug, ast_bridge::bridge_pvt, multiplexed_thread::count, multiplexed_nudge(), and multiplexed_threads.
00178 { 00179 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt; 00180 00181 ao2_lock(multiplexed_threads); 00182 00183 multiplexed_thread->count -= 2; 00184 00185 if (!multiplexed_thread->count) { 00186 ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", multiplexed_thread); 00187 ao2_unlink(multiplexed_threads, multiplexed_thread); 00188 } 00189 00190 multiplexed_nudge(multiplexed_thread); 00191 00192 ao2_unlock(multiplexed_threads); 00193 00194 ao2_ref(multiplexed_thread, -1); 00195 00196 return 0; 00197 }
static int multiplexed_bridge_join | ( | struct ast_bridge * | bridge, | |
struct ast_bridge_channel * | bridge_channel | |||
) | [static] |
Join function which actually adds the channel into the array to be monitored.
Definition at line 295 of file bridge_multiplexed.c.
References ast_channel_make_compatible(), ast_debug, AST_LIST_FIRST, AST_LIST_LAST, ast_channel::bridge, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), ast_channel::name, ast_channel::nativeformats, ast_channel::readformat, and ast_channel::writeformat.
00296 { 00297 struct ast_channel *c0 = AST_LIST_FIRST(&bridge->channels)->chan, *c1 = AST_LIST_LAST(&bridge->channels)->chan; 00298 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt; 00299 00300 ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", bridge_channel->chan->name, multiplexed_thread); 00301 00302 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1); 00303 00304 /* If the second channel has not yet joined do not make things compatible */ 00305 if (c0 == c1) { 00306 return 0; 00307 } 00308 00309 if (((c0->writeformat == c1->readformat) && (c0->readformat == c1->writeformat) && (c0->nativeformats == c1->nativeformats))) { 00310 return 0; 00311 } 00312 00313 return ast_channel_make_compatible(c0, c1); 00314 }
static int multiplexed_bridge_leave | ( | struct ast_bridge * | bridge, | |
struct ast_bridge_channel * | bridge_channel | |||
) | [static] |
Leave function which actually removes the channel from the array.
Definition at line 317 of file bridge_multiplexed.c.
References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), and ast_channel::name.
00318 { 00319 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt; 00320 00321 ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread); 00322 00323 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0); 00324 00325 return 0; 00326 }
static void multiplexed_bridge_suspend | ( | struct ast_bridge * | bridge, | |
struct ast_bridge_channel * | bridge_channel | |||
) | [static] |
Suspend function which means control of the channel is going elsewhere.
Definition at line 329 of file bridge_multiplexed.c.
References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), and ast_channel::name.
00330 { 00331 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt; 00332 00333 ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread); 00334 00335 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0); 00336 00337 return; 00338 }
static void multiplexed_bridge_unsuspend | ( | struct ast_bridge * | bridge, | |
struct ast_bridge_channel * | bridge_channel | |||
) | [static] |
Unsuspend function which means control of the channel is coming back to us.
Definition at line 341 of file bridge_multiplexed.c.
References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), and ast_channel::name.
00342 { 00343 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt; 00344 00345 ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread); 00346 00347 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1); 00348 00349 return; 00350 }
static enum ast_bridge_write_result multiplexed_bridge_write | ( | struct ast_bridge * | bridge, | |
struct ast_bridge_channel * | bridge_channel, | |||
struct ast_frame * | frame | |||
) | [static] |
Write function for writing frames into the bridge.
Definition at line 353 of file bridge_multiplexed.c.
References AST_BRIDGE_CHANNEL_STATE_WAIT, AST_BRIDGE_WRITE_FAILED, AST_BRIDGE_WRITE_SUCCESS, AST_LIST_FIRST, AST_LIST_LAST, ast_write(), ast_bridge_channel::bridge, ast_bridge_channel::chan, and ast_bridge_channel::state.
00354 { 00355 struct ast_bridge_channel *other; 00356 00357 if (AST_LIST_FIRST(&bridge->channels) == AST_LIST_LAST(&bridge->channels)) { 00358 return AST_BRIDGE_WRITE_FAILED; 00359 } 00360 00361 if (!(other = (AST_LIST_FIRST(&bridge->channels) == bridge_channel ? AST_LIST_LAST(&bridge->channels) : AST_LIST_FIRST(&bridge->channels)))) { 00362 return AST_BRIDGE_WRITE_FAILED; 00363 } 00364 00365 if (other->state == AST_BRIDGE_CHANNEL_STATE_WAIT) { 00366 ast_write(other->chan, frame); 00367 } 00368 00369 return AST_BRIDGE_WRITE_SUCCESS; 00370 }
static void multiplexed_nudge | ( | struct multiplexed_thread * | multiplexed_thread | ) | [static] |
Internal function which nudges the thread.
Definition at line 157 of file bridge_multiplexed.c.
References ast_log(), AST_PTHREADT_NULL, LOG_ERROR, multiplexed_thread::pipe, multiplexed_thread::thread, and multiplexed_thread::waiting.
Referenced by multiplexed_add_or_remove(), and multiplexed_bridge_destroy().
00158 { 00159 int nudge = 0; 00160 00161 if (multiplexed_thread->thread == AST_PTHREADT_NULL) { 00162 return; 00163 } 00164 00165 if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) { 00166 ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread); 00167 } 00168 00169 while (multiplexed_thread->waiting) { 00170 sched_yield(); 00171 } 00172 00173 return; 00174 }
static void* multiplexed_thread_function | ( | void * | data | ) | [static] |
Thread function that executes for multiplexed threads.
Definition at line 200 of file bridge_multiplexed.c.
References ao2_lock, ao2_ref, ao2_unlock, ast_bridge_handle_trip(), ast_debug, ast_log(), AST_PTHREADT_NULL, AST_PTHREADT_STOP, ast_waitfor_nandfds(), ast_channel::bridge, multiplexed_thread::chans, errno, first, LOG_WARNING, multiplexed_thread::pipe, multiplexed_thread::service_count, multiplexed_thread::thread, and multiplexed_thread::waiting.
00201 { 00202 struct multiplexed_thread *multiplexed_thread = data; 00203 int fds = multiplexed_thread->pipe[0]; 00204 00205 ao2_lock(multiplexed_thread); 00206 00207 ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread); 00208 00209 while (multiplexed_thread->thread != AST_PTHREADT_STOP) { 00210 struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0]; 00211 int to = -1, outfd = -1; 00212 00213 /* Move channels around so not just the first one gets priority */ 00214 memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1)); 00215 multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first; 00216 00217 multiplexed_thread->waiting = 1; 00218 ao2_unlock(multiplexed_thread); 00219 winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to); 00220 multiplexed_thread->waiting = 0; 00221 ao2_lock(multiplexed_thread); 00222 00223 if (outfd > -1) { 00224 int nudge; 00225 00226 if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) { 00227 if (errno != EINTR && errno != EAGAIN) { 00228 ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno)); 00229 } 00230 } 00231 } 00232 if (winner && winner->bridge) { 00233 ast_bridge_handle_trip(winner->bridge, NULL, winner, -1); 00234 } 00235 } 00236 00237 multiplexed_thread->thread = AST_PTHREADT_NULL; 00238 00239 ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread); 00240 00241 ao2_unlock(multiplexed_thread); 00242 ao2_ref(multiplexed_thread, -1); 00243 00244 return NULL; 00245 }
static int unload_module | ( | void | ) | [static] |
Definition at line 386 of file bridge_multiplexed.c.
References ao2_ref, ast_bridge_technology_unregister(), multiplexed_bridge, and multiplexed_threads.
00387 { 00388 int res = ast_bridge_technology_unregister(&multiplexed_bridge); 00389 00390 ao2_ref(multiplexed_threads, -1); 00391 00392 return res; 00393 }
struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Multiplexed two channel bridging module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "8586c2a7d357cb591cc3a6607a8f62d1" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } [static] |
Definition at line 404 of file bridge_multiplexed.c.
struct ast_module_info* ast_module_info = &__mod_info [static] |
Definition at line 404 of file bridge_multiplexed.c.
struct ast_bridge_technology multiplexed_bridge [static] |
Definition at line 372 of file bridge_multiplexed.c.
Referenced by load_module(), and unload_module().
struct ao2_container* multiplexed_threads [static] |
Container of all operating multiplexed threads.
Definition at line 69 of file bridge_multiplexed.c.
Referenced by load_module(), multiplexed_bridge_create(), multiplexed_bridge_destroy(), and unload_module().