Wed Apr 6 11:29:53 2011

Asterisk developer's documentation


bridge_multiplexed.c File Reference

Two channel bridging module which groups bridges into batches of threads. More...

#include "asterisk.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "asterisk/module.h"
#include "asterisk/channel.h"
#include "asterisk/bridging.h"
#include "asterisk/bridging_technology.h"
#include "asterisk/frame.h"
#include "asterisk/astobj2.h"

Go to the source code of this file.

Data Structures

struct  multiplexed_thread
 Structure which represents a single thread handling multiple 2 channel bridges. More...

Defines

#define MULTIPLEXED_BUCKETS   53
 Number of buckets our multiplexed thread container can have.
#define MULTIPLEXED_MAX_CHANNELS   8
 Number of channels we handle in a single thread.

Functions

static void __reg_module (void)
static void __unreg_module (void)
static void destroy_multiplexed_thread (void *obj)
 Destroy callback for a multiplexed thread structure.
static int find_multiplexed_thread (void *obj, void *arg, int flags)
 Callback function for finding a free multiplexed thread.
static int load_module (void)
static void multiplexed_add_or_remove (struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add)
 Helper function which adds or removes a channel and nudges the thread.
static int multiplexed_bridge_create (struct ast_bridge *bridge)
 Create function which finds/reserves/references a multiplexed thread structure.
static int multiplexed_bridge_destroy (struct ast_bridge *bridge)
 Destroy function which unreserves/unreferences/removes a multiplexed thread structure.
static int multiplexed_bridge_join (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
 Join function which actually adds the channel into the array to be monitored.
static int multiplexed_bridge_leave (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
 Leave function which actually removes the channel from the array.
static void multiplexed_bridge_suspend (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
 Suspend function which means control of the channel is going elsewhere.
static void multiplexed_bridge_unsuspend (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
 Unsuspend function which means control of the channel is coming back to us.
static enum ast_bridge_write_result multiplexed_bridge_write (struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
 Write function for writing frames into the bridge.
static void multiplexed_nudge (struct multiplexed_thread *multiplexed_thread)
 Internal function which nudges the thread.
static void * multiplexed_thread_function (void *data)
 Thread function that executes for multiplexed threads.
static int unload_module (void)

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Multiplexed two channel bridging module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "8586c2a7d357cb591cc3a6607a8f62d1" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, }
static struct ast_module_infoast_module_info = &__mod_info
static struct ast_bridge_technology multiplexed_bridge
static struct ao2_containermultiplexed_threads
 Container of all operating multiplexed threads.


Detailed Description

Two channel bridging module which groups bridges into batches of threads.

Author:
Joshua Colp <jcolp@digium.com>

Definition in file bridge_multiplexed.c.


Define Documentation

#define MULTIPLEXED_BUCKETS   53

Number of buckets our multiplexed thread container can have.

Definition at line 47 of file bridge_multiplexed.c.

Referenced by load_module().

#define MULTIPLEXED_MAX_CHANNELS   8

Number of channels we handle in a single thread.

Definition at line 50 of file bridge_multiplexed.c.

Referenced by find_multiplexed_thread(), and multiplexed_add_or_remove().


Function Documentation

static void __reg_module ( void   )  [static]

Definition at line 404 of file bridge_multiplexed.c.

static void __unreg_module ( void   )  [static]

Definition at line 404 of file bridge_multiplexed.c.

static void destroy_multiplexed_thread ( void *  obj  )  [static]

Destroy callback for a multiplexed thread structure.

Definition at line 79 of file bridge_multiplexed.c.

References multiplexed_thread::pipe.

Referenced by multiplexed_bridge_create().

00080 {
00081    struct multiplexed_thread *multiplexed_thread = obj;
00082 
00083    if (multiplexed_thread->pipe[0] > -1) {
00084       close(multiplexed_thread->pipe[0]);
00085    }
00086    if (multiplexed_thread->pipe[1] > -1) {
00087       close(multiplexed_thread->pipe[1]);
00088    }
00089 
00090    return;
00091 }

static int find_multiplexed_thread ( void *  obj,
void *  arg,
int  flags 
) [static]

Callback function for finding a free multiplexed thread.

Definition at line 72 of file bridge_multiplexed.c.

References CMP_MATCH, CMP_STOP, multiplexed_thread::count, and MULTIPLEXED_MAX_CHANNELS.

Referenced by multiplexed_bridge_create().

00073 {
00074    struct multiplexed_thread *multiplexed_thread = obj;
00075    return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0;
00076 }

static int load_module ( void   )  [static]

Definition at line 395 of file bridge_multiplexed.c.

References ao2_container_alloc, ast_bridge_technology_register, AST_MODULE_LOAD_DECLINE, multiplexed_bridge, MULTIPLEXED_BUCKETS, and multiplexed_threads.

00396 {
00397    if (!(multiplexed_threads = ao2_container_alloc(MULTIPLEXED_BUCKETS, NULL, NULL))) {
00398       return AST_MODULE_LOAD_DECLINE;
00399    }
00400 
00401    return ast_bridge_technology_register(&multiplexed_bridge);
00402 }

static void multiplexed_add_or_remove ( struct multiplexed_thread multiplexed_thread,
struct ast_channel chan,
int  add 
) [static]

Helper function which adds or removes a channel and nudges the thread.

Definition at line 248 of file bridge_multiplexed.c.

References ao2_lock, AST_PTHREADT_NULL, multiplexed_thread::chans, MULTIPLEXED_MAX_CHANNELS, multiplexed_nudge(), multiplexed_thread::service_count, and thread.

Referenced by multiplexed_bridge_join(), multiplexed_bridge_leave(), multiplexed_bridge_suspend(), and multiplexed_bridge_unsuspend().

00249 {
00250    int i, removed = 0;
00251    pthread_t thread = AST_PTHREADT_NULL;
00252 
00253    ao2_lock(multiplexed_thread);
00254 
00255    multiplexed_nudge(multiplexed_thread);
00256 
00257    for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) {
00258       if (multiplexed_thread->chans[i] == chan) {
00259          if (!add) {
00260             multiplexed_thread->chans[i] = NULL;
00261             multiplexed_thread->service_count--;
00262             removed = 1;
00263          }
00264          break;
00265       } else if (!multiplexed_thread->chans[i] && add) {
00266          multiplexed_thread->chans[i] = chan;
00267          multiplexed_thread->service_count++;
00268          break;
00269       }
00270    }
00271 
00272    if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) {
00273       ao2_ref(multiplexed_thread, +1);
00274       if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) {
00275          ao2_ref(multiplexed_thread, -1);
00276          ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread);
00277       }
00278    } else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) {
00279       thread = multiplexed_thread->thread;
00280       multiplexed_thread->thread = AST_PTHREADT_STOP;
00281    } else if (!add && removed) {
00282       memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1)));
00283    }
00284 
00285    ao2_unlock(multiplexed_thread);
00286 
00287    if (thread != AST_PTHREADT_NULL) {
00288       pthread_join(thread, NULL);
00289    }
00290 
00291    return;
00292 }

static int multiplexed_bridge_create ( struct ast_bridge bridge  )  [static]

Create function which finds/reserves/references a multiplexed thread structure.

Definition at line 94 of file bridge_multiplexed.c.

References ao2_alloc, ao2_callback, ao2_link, ao2_lock, ao2_ref, ao2_unlock, ast_debug, ast_log(), AST_PTHREADT_NULL, ast_bridge::bridge_pvt, multiplexed_thread::count, destroy_multiplexed_thread(), errno, find_multiplexed_thread(), LOG_WARNING, multiplexed_threads, and multiplexed_thread::pipe.

00095 {
00096    struct multiplexed_thread *multiplexed_thread;
00097 
00098    ao2_lock(multiplexed_threads);
00099 
00100    /* Try to find an existing thread to handle our additional channels */
00101    if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) {
00102       int flags;
00103 
00104       /* If we failed we will have to create a new one from scratch */
00105       if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) {
00106          ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
00107          ao2_unlock(multiplexed_threads);
00108          return -1;
00109       }
00110 
00111       multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1;
00112       /* Setup a pipe so we can poke the thread itself when needed */
00113       if (pipe(multiplexed_thread->pipe)) {
00114          ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
00115          ao2_ref(multiplexed_thread, -1);
00116          ao2_unlock(multiplexed_threads);
00117          return -1;
00118       }
00119 
00120       /* Setup each pipe for non-blocking operation */
00121       flags = fcntl(multiplexed_thread->pipe[0], F_GETFL);
00122       if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) {
00123          ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00124          ao2_ref(multiplexed_thread, -1);
00125          ao2_unlock(multiplexed_threads);
00126          return -1;
00127       }
00128       flags = fcntl(multiplexed_thread->pipe[1], F_GETFL);
00129       if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) {
00130          ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00131          ao2_ref(multiplexed_thread, -1);
00132          ao2_unlock(multiplexed_threads);
00133          return -1;
00134       }
00135 
00136       /* Set up default parameters */
00137       multiplexed_thread->thread = AST_PTHREADT_NULL;
00138 
00139       /* Finally link us into the container so others may find us */
00140       ao2_link(multiplexed_threads, multiplexed_thread);
00141       ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00142    } else {
00143       ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00144    }
00145 
00146    /* Bump the count of the thread structure up by two since the channels for this bridge will be joining shortly */
00147    multiplexed_thread->count += 2;
00148 
00149    ao2_unlock(multiplexed_threads);
00150 
00151    bridge->bridge_pvt = multiplexed_thread;
00152 
00153    return 0;
00154 }

static int multiplexed_bridge_destroy ( struct ast_bridge bridge  )  [static]

Destroy function which unreserves/unreferences/removes a multiplexed thread structure.

Definition at line 177 of file bridge_multiplexed.c.

References ao2_lock, ao2_ref, ao2_unlink, ao2_unlock, ast_debug, ast_bridge::bridge_pvt, multiplexed_thread::count, multiplexed_nudge(), and multiplexed_threads.

00178 {
00179    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00180 
00181    ao2_lock(multiplexed_threads);
00182 
00183    multiplexed_thread->count -= 2;
00184 
00185    if (!multiplexed_thread->count) {
00186       ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", multiplexed_thread);
00187       ao2_unlink(multiplexed_threads, multiplexed_thread);
00188    }
00189 
00190    multiplexed_nudge(multiplexed_thread);
00191 
00192    ao2_unlock(multiplexed_threads);
00193 
00194    ao2_ref(multiplexed_thread, -1);
00195 
00196    return 0;
00197 }

static int multiplexed_bridge_join ( struct ast_bridge bridge,
struct ast_bridge_channel bridge_channel 
) [static]

Join function which actually adds the channel into the array to be monitored.

Definition at line 295 of file bridge_multiplexed.c.

References ast_channel_make_compatible(), ast_debug, AST_LIST_FIRST, AST_LIST_LAST, ast_channel::bridge, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), ast_channel::name, ast_channel::nativeformats, ast_channel::readformat, and ast_channel::writeformat.

00296 {
00297    struct ast_channel *c0 = AST_LIST_FIRST(&bridge->channels)->chan, *c1 = AST_LIST_LAST(&bridge->channels)->chan;
00298    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00299 
00300    ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", bridge_channel->chan->name, multiplexed_thread);
00301 
00302    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00303 
00304    /* If the second channel has not yet joined do not make things compatible */
00305    if (c0 == c1) {
00306       return 0;
00307    }
00308 
00309    if (((c0->writeformat == c1->readformat) && (c0->readformat == c1->writeformat) && (c0->nativeformats == c1->nativeformats))) {
00310       return 0;
00311    }
00312 
00313    return ast_channel_make_compatible(c0, c1);
00314 }

static int multiplexed_bridge_leave ( struct ast_bridge bridge,
struct ast_bridge_channel bridge_channel 
) [static]

Leave function which actually removes the channel from the array.

Definition at line 317 of file bridge_multiplexed.c.

References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), and ast_channel::name.

00318 {
00319    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00320 
00321    ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00322 
00323    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00324 
00325    return 0;
00326 }

static void multiplexed_bridge_suspend ( struct ast_bridge bridge,
struct ast_bridge_channel bridge_channel 
) [static]

Suspend function which means control of the channel is going elsewhere.

Definition at line 329 of file bridge_multiplexed.c.

References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), and ast_channel::name.

00330 {
00331    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00332 
00333    ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00334 
00335    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00336 
00337    return;
00338 }

static void multiplexed_bridge_unsuspend ( struct ast_bridge bridge,
struct ast_bridge_channel bridge_channel 
) [static]

Unsuspend function which means control of the channel is coming back to us.

Definition at line 341 of file bridge_multiplexed.c.

References ast_debug, ast_bridge::bridge_pvt, ast_bridge_channel::chan, multiplexed_add_or_remove(), and ast_channel::name.

00342 {
00343    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00344 
00345    ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00346 
00347    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00348 
00349    return;
00350 }

static enum ast_bridge_write_result multiplexed_bridge_write ( struct ast_bridge bridge,
struct ast_bridge_channel bridge_channel,
struct ast_frame frame 
) [static]

Write function for writing frames into the bridge.

Definition at line 353 of file bridge_multiplexed.c.

References AST_BRIDGE_CHANNEL_STATE_WAIT, AST_BRIDGE_WRITE_FAILED, AST_BRIDGE_WRITE_SUCCESS, AST_LIST_FIRST, AST_LIST_LAST, ast_write(), ast_bridge_channel::bridge, ast_bridge_channel::chan, and ast_bridge_channel::state.

00354 {
00355    struct ast_bridge_channel *other;
00356 
00357    if (AST_LIST_FIRST(&bridge->channels) == AST_LIST_LAST(&bridge->channels)) {
00358       return AST_BRIDGE_WRITE_FAILED;
00359    }
00360 
00361    if (!(other = (AST_LIST_FIRST(&bridge->channels) == bridge_channel ? AST_LIST_LAST(&bridge->channels) : AST_LIST_FIRST(&bridge->channels)))) {
00362       return AST_BRIDGE_WRITE_FAILED;
00363    }
00364 
00365    if (other->state == AST_BRIDGE_CHANNEL_STATE_WAIT) {
00366       ast_write(other->chan, frame);
00367    }
00368 
00369    return AST_BRIDGE_WRITE_SUCCESS;
00370 }

static void multiplexed_nudge ( struct multiplexed_thread multiplexed_thread  )  [static]

Internal function which nudges the thread.

Definition at line 157 of file bridge_multiplexed.c.

References ast_log(), AST_PTHREADT_NULL, LOG_ERROR, multiplexed_thread::pipe, multiplexed_thread::thread, and multiplexed_thread::waiting.

Referenced by multiplexed_add_or_remove(), and multiplexed_bridge_destroy().

00158 {
00159    int nudge = 0;
00160 
00161    if (multiplexed_thread->thread == AST_PTHREADT_NULL) {
00162       return;
00163    }
00164 
00165    if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
00166       ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
00167    }
00168 
00169    while (multiplexed_thread->waiting) {
00170       sched_yield();
00171    }
00172 
00173    return;
00174 }

static void* multiplexed_thread_function ( void *  data  )  [static]

Thread function that executes for multiplexed threads.

Definition at line 200 of file bridge_multiplexed.c.

References ao2_lock, ao2_ref, ao2_unlock, ast_bridge_handle_trip(), ast_debug, ast_log(), AST_PTHREADT_NULL, AST_PTHREADT_STOP, ast_waitfor_nandfds(), ast_channel::bridge, multiplexed_thread::chans, errno, first, LOG_WARNING, multiplexed_thread::pipe, multiplexed_thread::service_count, multiplexed_thread::thread, and multiplexed_thread::waiting.

00201 {
00202    struct multiplexed_thread *multiplexed_thread = data;
00203    int fds = multiplexed_thread->pipe[0];
00204 
00205    ao2_lock(multiplexed_thread);
00206 
00207    ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00208 
00209    while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
00210       struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
00211       int to = -1, outfd = -1;
00212 
00213       /* Move channels around so not just the first one gets priority */
00214       memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
00215       multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first;
00216 
00217       multiplexed_thread->waiting = 1;
00218       ao2_unlock(multiplexed_thread);
00219       winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
00220       multiplexed_thread->waiting = 0;
00221       ao2_lock(multiplexed_thread);
00222 
00223       if (outfd > -1) {
00224          int nudge;
00225 
00226          if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
00227             if (errno != EINTR && errno != EAGAIN) {
00228                ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
00229             }
00230          }
00231       }
00232       if (winner && winner->bridge) {
00233          ast_bridge_handle_trip(winner->bridge, NULL, winner, -1);
00234       }
00235    }
00236 
00237    multiplexed_thread->thread = AST_PTHREADT_NULL;
00238 
00239    ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00240 
00241    ao2_unlock(multiplexed_thread);
00242    ao2_ref(multiplexed_thread, -1);
00243 
00244    return NULL;
00245 }

static int unload_module ( void   )  [static]

Definition at line 386 of file bridge_multiplexed.c.

References ao2_ref, ast_bridge_technology_unregister(), multiplexed_bridge, and multiplexed_threads.

00387 {
00388    int res = ast_bridge_technology_unregister(&multiplexed_bridge);
00389 
00390    ao2_ref(multiplexed_threads, -1);
00391 
00392    return res;
00393 }


Variable Documentation

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Multiplexed two channel bridging module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "8586c2a7d357cb591cc3a6607a8f62d1" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } [static]

Definition at line 404 of file bridge_multiplexed.c.

struct ast_module_info* ast_module_info = &__mod_info [static]

Definition at line 404 of file bridge_multiplexed.c.

struct ast_bridge_technology multiplexed_bridge [static]

Definition at line 372 of file bridge_multiplexed.c.

Referenced by load_module(), and unload_module().

struct ao2_container* multiplexed_threads [static]

Container of all operating multiplexed threads.

Definition at line 69 of file bridge_multiplexed.c.

Referenced by load_module(), multiplexed_bridge_create(), multiplexed_bridge_destroy(), and unload_module().


Generated on Wed Apr 6 11:29:53 2011 for Asterisk - The Open Source Telephony Project by  doxygen 1.4.7