00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include "asterisk.h"
00033
00034 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
00035
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <string.h>
00039 #include <sys/types.h>
00040 #include <sys/stat.h>
00041 #include <fcntl.h>
00042
00043 #include "asterisk/module.h"
00044 #include "asterisk/channel.h"
00045 #include "asterisk/bridging.h"
00046 #include "asterisk/bridging_technology.h"
00047 #include "asterisk/frame.h"
00048 #include "asterisk/astobj2.h"
00049
00050 /*! \brief Bucket count handed to ao2_container_alloc() when load_module() creates the multiplexed_threads container */
00051 #define MULTIPLEXED_BUCKETS 53
00052
00053 /*! \brief Size of the chans[] array in struct multiplexed_thread; multiplexed_bridge_create() reserves two of these slots per bridge */
00054 #define MULTIPLEXED_MAX_CHANNELS 8
00055
00056 /*! \brief State shared between the bridges assigned to one servicing thread and that thread itself */
00057 struct multiplexed_thread {
00058 /*! Servicing thread id; AST_PTHREADT_NULL while no thread runs, AST_PTHREADT_STOP asks the running thread to exit */
00059 pthread_t thread;
00060 /*! Nudge pipe: multiplexed_nudge() writes to pipe[1], the servicing thread waits on and reads pipe[0] */
00061 int pipe[2];
00062 /*! Channels to service; the first service_count entries are handed to ast_waitfor_nandfds() */
00063 struct ast_channel *chans[MULTIPLEXED_MAX_CHANNELS];
00064 /*! Channel slots reserved by bridges (changed by 2 per bridge in the create/destroy callbacks) */
00065 unsigned int count;
00066 /*! Set by the servicing thread around its ast_waitfor_nandfds() call; multiplexed_nudge() spins until it clears */
00067 unsigned int waiting:1;
00068 /*! Number of channels currently stored in chans[] */
00069 unsigned int service_count;
00070 };
00071
00072 /*! \brief Container holding every multiplexed_thread object; the create/destroy callbacks lock it while changing an object's count */
00073 static struct ao2_container *multiplexed_threads;
00074
00075
00076 static int find_multiplexed_thread(void *obj, void *arg, int flags)
00077 {
00078 struct multiplexed_thread *multiplexed_thread = obj;
00079 return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0;
00080 }
00081
00082
00083 static void destroy_multiplexed_thread(void *obj)
00084 {
00085 struct multiplexed_thread *multiplexed_thread = obj;
00086
00087 if (multiplexed_thread->pipe[0] > -1) {
00088 close(multiplexed_thread->pipe[0]);
00089 }
00090 if (multiplexed_thread->pipe[1] > -1) {
00091 close(multiplexed_thread->pipe[1]);
00092 }
00093
00094 return;
00095 }
00096
00097
00098 static int multiplexed_bridge_create(struct ast_bridge *bridge)
00099 {
00100 struct multiplexed_thread *multiplexed_thread;
00101
00102 ao2_lock(multiplexed_threads);
00103
00104
00105 if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) {
00106 int flags;
00107
00108
00109 if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) {
00110 ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
00111 ao2_unlock(multiplexed_threads);
00112 return -1;
00113 }
00114
00115 multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1;
00116
00117 if (pipe(multiplexed_thread->pipe)) {
00118 ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
00119 ao2_ref(multiplexed_thread, -1);
00120 ao2_unlock(multiplexed_threads);
00121 return -1;
00122 }
00123
00124
00125 flags = fcntl(multiplexed_thread->pipe[0], F_GETFL);
00126 if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) {
00127 ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00128 ao2_ref(multiplexed_thread, -1);
00129 ao2_unlock(multiplexed_threads);
00130 return -1;
00131 }
00132 flags = fcntl(multiplexed_thread->pipe[1], F_GETFL);
00133 if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) {
00134 ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00135 ao2_ref(multiplexed_thread, -1);
00136 ao2_unlock(multiplexed_threads);
00137 return -1;
00138 }
00139
00140
00141 multiplexed_thread->thread = AST_PTHREADT_NULL;
00142
00143
00144 ao2_link(multiplexed_threads, multiplexed_thread);
00145 ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00146 } else {
00147 ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00148 }
00149
00150
00151 multiplexed_thread->count += 2;
00152
00153 ao2_unlock(multiplexed_threads);
00154
00155 bridge->bridge_pvt = multiplexed_thread;
00156
00157 return 0;
00158 }
00159
00160
00161 static void multiplexed_nudge(struct multiplexed_thread *multiplexed_thread)
00162 {
00163 int nudge = 0;
00164
00165 if (multiplexed_thread->thread == AST_PTHREADT_NULL) {
00166 return;
00167 }
00168
00169 if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
00170 ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
00171 }
00172
00173 while (multiplexed_thread->waiting) {
00174 sched_yield();
00175 }
00176
00177 return;
00178 }
00179
00180
00181 static int multiplexed_bridge_destroy(struct ast_bridge *bridge)
00182 {
00183 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00184
00185 ao2_lock(multiplexed_threads);
00186
00187 multiplexed_thread->count -= 2;
00188
00189 if (!multiplexed_thread->count) {
00190 ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", multiplexed_thread);
00191 ao2_unlink(multiplexed_threads, multiplexed_thread);
00192 }
00193
00194 multiplexed_nudge(multiplexed_thread);
00195
00196 ao2_unlock(multiplexed_threads);
00197
00198 ao2_ref(multiplexed_thread, -1);
00199
00200 return 0;
00201 }
00202
00203
00204 static void *multiplexed_thread_function(void *data)
00205 {
00206 struct multiplexed_thread *multiplexed_thread = data;
00207 int fds = multiplexed_thread->pipe[0];
00208
00209 ao2_lock(multiplexed_thread);
00210
00211 ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00212
00213 while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
00214 struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
00215 int to = -1, outfd = -1;
00216
00217
00218 memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
00219 multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first;
00220
00221 multiplexed_thread->waiting = 1;
00222 ao2_unlock(multiplexed_thread);
00223 winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
00224 multiplexed_thread->waiting = 0;
00225 ao2_lock(multiplexed_thread);
00226 if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
00227 break;
00228 }
00229
00230 if (outfd > -1) {
00231 int nudge;
00232
00233 if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
00234 if (errno != EINTR && errno != EAGAIN) {
00235 ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
00236 }
00237 }
00238 }
00239 if (winner && winner->bridge) {
00240 struct ast_bridge *bridge = winner->bridge;
00241 int stop = 0;
00242 ao2_unlock(multiplexed_thread);
00243 while ((bridge = winner->bridge) && ao2_trylock(bridge)) {
00244 sched_yield();
00245 if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
00246 stop = 1;
00247 break;
00248 }
00249 }
00250 if (!stop && bridge) {
00251 ast_bridge_handle_trip(bridge, NULL, winner, -1);
00252 ao2_unlock(bridge);
00253 }
00254 ao2_lock(multiplexed_thread);
00255 }
00256 }
00257
00258 multiplexed_thread->thread = AST_PTHREADT_NULL;
00259
00260 ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00261
00262 ao2_unlock(multiplexed_thread);
00263 ao2_ref(multiplexed_thread, -1);
00264
00265 return NULL;
00266 }
00267
00268
00269 static void multiplexed_add_or_remove(struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add)
00270 {
00271 int i, removed = 0;
00272 pthread_t thread = AST_PTHREADT_NULL;
00273
00274 ao2_lock(multiplexed_thread);
00275
00276 multiplexed_nudge(multiplexed_thread);
00277
00278 for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) {
00279 if (multiplexed_thread->chans[i] == chan) {
00280 if (!add) {
00281 multiplexed_thread->chans[i] = NULL;
00282 multiplexed_thread->service_count--;
00283 removed = 1;
00284 }
00285 break;
00286 } else if (!multiplexed_thread->chans[i] && add) {
00287 multiplexed_thread->chans[i] = chan;
00288 multiplexed_thread->service_count++;
00289 break;
00290 }
00291 }
00292
00293 if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) {
00294 ao2_ref(multiplexed_thread, +1);
00295 if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) {
00296 ao2_ref(multiplexed_thread, -1);
00297 ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread);
00298 }
00299 } else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) {
00300 thread = multiplexed_thread->thread;
00301 multiplexed_thread->thread = AST_PTHREADT_STOP;
00302 } else if (!add && removed) {
00303 memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1)));
00304 }
00305
00306 ao2_unlock(multiplexed_thread);
00307
00308 if (thread != AST_PTHREADT_NULL) {
00309 pthread_join(thread, NULL);
00310 }
00311
00312 return;
00313 }
00314
00315
00316 static int multiplexed_bridge_join(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00317 {
00318 struct ast_channel *c0 = AST_LIST_FIRST(&bridge->channels)->chan, *c1 = AST_LIST_LAST(&bridge->channels)->chan;
00319 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00320
00321 ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", bridge_channel->chan->name, multiplexed_thread);
00322
00323 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00324
00325
00326 if (c0 == c1) {
00327 return 0;
00328 }
00329
00330 if (((c0->writeformat == c1->readformat) && (c0->readformat == c1->writeformat) && (c0->nativeformats == c1->nativeformats))) {
00331 return 0;
00332 }
00333
00334 return ast_channel_make_compatible(c0, c1);
00335 }
00336
00337
00338 static int multiplexed_bridge_leave(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00339 {
00340 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00341
00342 ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00343
00344 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00345
00346 return 0;
00347 }
00348
00349
00350 static void multiplexed_bridge_suspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00351 {
00352 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00353
00354 ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00355
00356 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00357
00358 return;
00359 }
00360
00361
00362 static void multiplexed_bridge_unsuspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00363 {
00364 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00365
00366 ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00367
00368 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00369
00370 return;
00371 }
00372
00373
00374 static enum ast_bridge_write_result multiplexed_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
00375 {
00376 struct ast_bridge_channel *other;
00377
00378 if (AST_LIST_FIRST(&bridge->channels) == AST_LIST_LAST(&bridge->channels)) {
00379 return AST_BRIDGE_WRITE_FAILED;
00380 }
00381
00382 if (!(other = (AST_LIST_FIRST(&bridge->channels) == bridge_channel ? AST_LIST_LAST(&bridge->channels) : AST_LIST_FIRST(&bridge->channels)))) {
00383 return AST_BRIDGE_WRITE_FAILED;
00384 }
00385
00386 if (other->state == AST_BRIDGE_CHANNEL_STATE_WAIT) {
00387 ast_write(other->chan, frame);
00388 }
00389
00390 return AST_BRIDGE_WRITE_SUCCESS;
00391 }
00392 /*! \brief Bridge technology descriptor that load_module() registers and unload_module() unregisters; it wires up the callbacks defined in this file */
00393 static struct ast_bridge_technology multiplexed_bridge = {
00394 .name = "multiplexed_bridge",
00395 .capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX,
00396 .preference = AST_BRIDGE_PREFERENCE_HIGH,
00397 .formats = AST_FORMAT_AUDIO_MASK | AST_FORMAT_VIDEO_MASK | AST_FORMAT_TEXT_MASK,
00398 .create = multiplexed_bridge_create,
00399 .destroy = multiplexed_bridge_destroy,
00400 .join = multiplexed_bridge_join,
00401 .leave = multiplexed_bridge_leave,
00402 .suspend = multiplexed_bridge_suspend,
00403 .unsuspend = multiplexed_bridge_unsuspend,
00404 .write = multiplexed_bridge_write,
00405 };
00406
00407 static int unload_module(void)
00408 {
00409 int res = ast_bridge_technology_unregister(&multiplexed_bridge);
00410
00411 ao2_ref(multiplexed_threads, -1);
00412
00413 return res;
00414 }
00415
00416 static int load_module(void)
00417 {
00418 if (!(multiplexed_threads = ao2_container_alloc(MULTIPLEXED_BUCKETS, NULL, NULL))) {
00419 return AST_MODULE_LOAD_DECLINE;
00420 }
00421
00422 return ast_bridge_technology_register(&multiplexed_bridge);
00423 }
00424 /*! \brief Module declaration with licence key and description; presumably this is what binds load_module()/unload_module() above to the loader -- confirm against asterisk/module.h */
00425 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Multiplexed two channel bridging module");