Mon Oct 8 12:38:58 2012

Asterisk developer's documentation


bridge_multiplexed.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2008, Digium, Inc.
00005  *
00006  * Joshua Colp <jcolp@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*! \file
00020  *
00021  * \brief Two channel bridging module which groups bridges into batches of threads
00022  *
00023  * \author Joshua Colp <jcolp@digium.com>
00024  *
00025  * \ingroup bridges
00026  */
00027 
00028 /*** MODULEINFO
00029    <support_level>core</support_level>
00030  ***/
00031 
00032 #include "asterisk.h"
00033 
00034 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 361403 $")
00035 
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <string.h>
00039 #include <sys/types.h>
00040 #include <sys/stat.h>
00041 #include <fcntl.h>
00042 
00043 #include "asterisk/module.h"
00044 #include "asterisk/channel.h"
00045 #include "asterisk/bridging.h"
00046 #include "asterisk/bridging_technology.h"
00047 #include "asterisk/frame.h"
00048 #include "asterisk/astobj2.h"
00049 
00050 /*! \brief Number of buckets our multiplexed thread container can have */
00051 #define MULTIPLEXED_BUCKETS 53
00052 
00053 /*! \brief Number of channels we handle in a single thread */
00054 #define MULTIPLEXED_MAX_CHANNELS 8
00055 
00056 /*! \brief Structure which represents a single thread handling multiple 2 channel bridges */
00057 struct multiplexed_thread {
00058    /*! Thread itself */
00059    pthread_t thread;
00060    /*! Pipe used to wake up the multiplexed thread */
00061    int pipe[2];
00062    /*! Channels in this thread */
00063    struct ast_channel *chans[MULTIPLEXED_MAX_CHANNELS];
00064    /*! Number of channels in this thread */
00065    unsigned int count;
00066    /*! Bit used to indicate that the thread is waiting on channels */
00067    unsigned int waiting:1;
00068    /*! Number of channels actually being serviced by this thread */
00069    unsigned int service_count;
00070 };
00071 
00072 /*! \brief Container of all operating multiplexed threads */
00073 static struct ao2_container *multiplexed_threads;
00074 
00075 /*! \brief Callback function for finding a free multiplexed thread */
00076 static int find_multiplexed_thread(void *obj, void *arg, int flags)
00077 {
00078    struct multiplexed_thread *multiplexed_thread = obj;
00079    return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0;
00080 }
00081 
00082 /*! \brief Destroy callback for a multiplexed thread structure */
00083 static void destroy_multiplexed_thread(void *obj)
00084 {
00085    struct multiplexed_thread *multiplexed_thread = obj;
00086 
00087    if (multiplexed_thread->pipe[0] > -1) {
00088       close(multiplexed_thread->pipe[0]);
00089    }
00090    if (multiplexed_thread->pipe[1] > -1) {
00091       close(multiplexed_thread->pipe[1]);
00092    }
00093 
00094    return;
00095 }
00096 
00097 /*! \brief Create function which finds/reserves/references a multiplexed thread structure */
00098 static int multiplexed_bridge_create(struct ast_bridge *bridge)
00099 {
00100    struct multiplexed_thread *multiplexed_thread;
00101 
00102    ao2_lock(multiplexed_threads);
00103 
00104    /* Try to find an existing thread to handle our additional channels */
00105    if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) {
00106       int flags;
00107 
00108       /* If we failed we will have to create a new one from scratch */
00109       if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) {
00110          ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
00111          ao2_unlock(multiplexed_threads);
00112          return -1;
00113       }
00114 
00115       multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1;
00116       /* Setup a pipe so we can poke the thread itself when needed */
00117       if (pipe(multiplexed_thread->pipe)) {
00118          ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
00119          ao2_ref(multiplexed_thread, -1);
00120          ao2_unlock(multiplexed_threads);
00121          return -1;
00122       }
00123 
00124       /* Setup each pipe for non-blocking operation */
00125       flags = fcntl(multiplexed_thread->pipe[0], F_GETFL);
00126       if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) {
00127          ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00128          ao2_ref(multiplexed_thread, -1);
00129          ao2_unlock(multiplexed_threads);
00130          return -1;
00131       }
00132       flags = fcntl(multiplexed_thread->pipe[1], F_GETFL);
00133       if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) {
00134          ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00135          ao2_ref(multiplexed_thread, -1);
00136          ao2_unlock(multiplexed_threads);
00137          return -1;
00138       }
00139 
00140       /* Set up default parameters */
00141       multiplexed_thread->thread = AST_PTHREADT_NULL;
00142 
00143       /* Finally link us into the container so others may find us */
00144       ao2_link(multiplexed_threads, multiplexed_thread);
00145       ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00146    } else {
00147       ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00148    }
00149 
00150    /* Bump the count of the thread structure up by two since the channels for this bridge will be joining shortly */
00151    multiplexed_thread->count += 2;
00152 
00153    ao2_unlock(multiplexed_threads);
00154 
00155    bridge->bridge_pvt = multiplexed_thread;
00156 
00157    return 0;
00158 }
00159 
00160 /*! \brief Internal function which nudges the thread */
00161 static void multiplexed_nudge(struct multiplexed_thread *multiplexed_thread)
00162 {
00163    int nudge = 0;
00164 
00165    if (multiplexed_thread->thread == AST_PTHREADT_NULL) {
00166       return;
00167    }
00168 
00169    if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
00170       ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
00171    }
00172 
00173    while (multiplexed_thread->waiting) {
00174       sched_yield();
00175    }
00176 
00177    return;
00178 }
00179 
00180 /*! \brief Destroy function which unreserves/unreferences/removes a multiplexed thread structure */
00181 static int multiplexed_bridge_destroy(struct ast_bridge *bridge)
00182 {
00183    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00184 
00185    ao2_lock(multiplexed_threads);
00186 
00187    multiplexed_thread->count -= 2;
00188 
00189    if (!multiplexed_thread->count) {
00190       ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", multiplexed_thread);
00191       ao2_unlink(multiplexed_threads, multiplexed_thread);
00192    }
00193 
00194    multiplexed_nudge(multiplexed_thread);
00195 
00196    ao2_unlock(multiplexed_threads);
00197 
00198    ao2_ref(multiplexed_thread, -1);
00199 
00200    return 0;
00201 }
00202 
00203 /*! \brief Thread function that executes for multiplexed threads */
00204 static void *multiplexed_thread_function(void *data)
00205 {
00206    struct multiplexed_thread *multiplexed_thread = data;
00207    int fds = multiplexed_thread->pipe[0];
00208 
00209    ao2_lock(multiplexed_thread);
00210 
00211    ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00212 
00213    while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
00214       struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
00215       int to = -1, outfd = -1;
00216 
00217       /* Move channels around so not just the first one gets priority */
00218       memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
00219       multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first;
00220 
00221       multiplexed_thread->waiting = 1;
00222       ao2_unlock(multiplexed_thread);
00223       winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
00224       multiplexed_thread->waiting = 0;
00225       ao2_lock(multiplexed_thread);
00226       if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
00227          break;
00228       }
00229 
00230       if (outfd > -1) {
00231          int nudge;
00232 
00233          if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
00234             if (errno != EINTR && errno != EAGAIN) {
00235                ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
00236             }
00237          }
00238       }
00239       if (winner && winner->bridge) {
00240          struct ast_bridge *bridge = winner->bridge;
00241          int stop = 0;
00242          ao2_unlock(multiplexed_thread);
00243          while ((bridge = winner->bridge) && ao2_trylock(bridge)) {
00244             sched_yield();
00245             if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
00246                stop = 1;
00247                break;
00248             }
00249          }
00250          if (!stop && bridge) {
00251             ast_bridge_handle_trip(bridge, NULL, winner, -1);
00252             ao2_unlock(bridge);
00253          }
00254          ao2_lock(multiplexed_thread);
00255       }
00256    }
00257 
00258    multiplexed_thread->thread = AST_PTHREADT_NULL;
00259 
00260    ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00261 
00262    ao2_unlock(multiplexed_thread);
00263    ao2_ref(multiplexed_thread, -1);
00264 
00265    return NULL;
00266 }
00267 
00268 /*! \brief Helper function which adds or removes a channel and nudges the thread */
00269 static void multiplexed_add_or_remove(struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add)
00270 {
00271    int i, removed = 0;
00272    pthread_t thread = AST_PTHREADT_NULL;
00273 
00274    ao2_lock(multiplexed_thread);
00275 
00276    multiplexed_nudge(multiplexed_thread);
00277 
00278    for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) {
00279       if (multiplexed_thread->chans[i] == chan) {
00280          if (!add) {
00281             multiplexed_thread->chans[i] = NULL;
00282             multiplexed_thread->service_count--;
00283             removed = 1;
00284          }
00285          break;
00286       } else if (!multiplexed_thread->chans[i] && add) {
00287          multiplexed_thread->chans[i] = chan;
00288          multiplexed_thread->service_count++;
00289          break;
00290       }
00291    }
00292 
00293    if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) {
00294       ao2_ref(multiplexed_thread, +1);
00295       if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) {
00296          ao2_ref(multiplexed_thread, -1);
00297          ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread);
00298       }
00299    } else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) {
00300       thread = multiplexed_thread->thread;
00301       multiplexed_thread->thread = AST_PTHREADT_STOP;
00302    } else if (!add && removed) {
00303       memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1)));
00304    }
00305 
00306    ao2_unlock(multiplexed_thread);
00307 
00308    if (thread != AST_PTHREADT_NULL) {
00309       pthread_join(thread, NULL);
00310    }
00311 
00312    return;
00313 }
00314 
00315 /*! \brief Join function which actually adds the channel into the array to be monitored */
00316 static int multiplexed_bridge_join(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00317 {
00318    struct ast_channel *c0 = AST_LIST_FIRST(&bridge->channels)->chan, *c1 = AST_LIST_LAST(&bridge->channels)->chan;
00319    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00320 
00321    ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", bridge_channel->chan->name, multiplexed_thread);
00322 
00323    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00324 
00325    /* If the second channel has not yet joined do not make things compatible */
00326    if (c0 == c1) {
00327       return 0;
00328    }
00329 
00330    if (((c0->writeformat == c1->readformat) && (c0->readformat == c1->writeformat) && (c0->nativeformats == c1->nativeformats))) {
00331       return 0;
00332    }
00333 
00334    return ast_channel_make_compatible(c0, c1);
00335 }
00336 
00337 /*! \brief Leave function which actually removes the channel from the array */
00338 static int multiplexed_bridge_leave(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00339 {
00340    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00341 
00342    ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00343 
00344    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00345 
00346    return 0;
00347 }
00348 
00349 /*! \brief Suspend function which means control of the channel is going elsewhere */
00350 static void multiplexed_bridge_suspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00351 {
00352    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00353 
00354    ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00355 
00356    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00357 
00358    return;
00359 }
00360 
00361 /*! \brief Unsuspend function which means control of the channel is coming back to us */
00362 static void multiplexed_bridge_unsuspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00363 {
00364    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00365 
00366    ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00367 
00368    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00369 
00370    return;
00371 }
00372 
00373 /*! \brief Write function for writing frames into the bridge */
00374 static enum ast_bridge_write_result multiplexed_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
00375 {
00376    struct ast_bridge_channel *other;
00377 
00378    if (AST_LIST_FIRST(&bridge->channels) == AST_LIST_LAST(&bridge->channels)) {
00379       return AST_BRIDGE_WRITE_FAILED;
00380    }
00381 
00382    if (!(other = (AST_LIST_FIRST(&bridge->channels) == bridge_channel ? AST_LIST_LAST(&bridge->channels) : AST_LIST_FIRST(&bridge->channels)))) {
00383       return AST_BRIDGE_WRITE_FAILED;
00384    }
00385 
00386    if (other->state == AST_BRIDGE_CHANNEL_STATE_WAIT) {
00387       ast_write(other->chan, frame);
00388    }
00389 
00390    return AST_BRIDGE_WRITE_SUCCESS;
00391 }
00392 
00393 static struct ast_bridge_technology multiplexed_bridge = {
00394    .name = "multiplexed_bridge",
00395    .capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX,
00396    .preference = AST_BRIDGE_PREFERENCE_HIGH,
00397    .formats = AST_FORMAT_AUDIO_MASK | AST_FORMAT_VIDEO_MASK | AST_FORMAT_TEXT_MASK,
00398    .create = multiplexed_bridge_create,
00399    .destroy = multiplexed_bridge_destroy,
00400    .join = multiplexed_bridge_join,
00401    .leave = multiplexed_bridge_leave,
00402    .suspend = multiplexed_bridge_suspend,
00403    .unsuspend = multiplexed_bridge_unsuspend,
00404    .write = multiplexed_bridge_write,
00405 };
00406 
00407 static int unload_module(void)
00408 {
00409    int res = ast_bridge_technology_unregister(&multiplexed_bridge);
00410 
00411    ao2_ref(multiplexed_threads, -1);
00412 
00413    return res;
00414 }
00415 
00416 static int load_module(void)
00417 {
00418    if (!(multiplexed_threads = ao2_container_alloc(MULTIPLEXED_BUCKETS, NULL, NULL))) {
00419       return AST_MODULE_LOAD_DECLINE;
00420    }
00421 
00422    return ast_bridge_technology_register(&multiplexed_bridge);
00423 }
00424 
00425 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Multiplexed two channel bridging module");

Generated on Mon Oct 8 12:38:58 2012 for Asterisk - The Open Source Telephony Project by  doxygen 1.4.7