00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "asterisk.h"
00029
00030 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
00031
00032 #include <stdio.h>
00033 #include <stdlib.h>
00034 #include <string.h>
00035 #include <sys/types.h>
00036 #include <sys/stat.h>
00037 #include <fcntl.h>
00038
00039 #include "asterisk/module.h"
00040 #include "asterisk/channel.h"
00041 #include "asterisk/bridging.h"
00042 #include "asterisk/bridging_technology.h"
00043 #include "asterisk/frame.h"
00044 #include "asterisk/astobj2.h"
00045
00046 /*! \brief Bucket count handed to ao2_container_alloc() for the multiplexed_threads container */
00047 #define MULTIPLEXED_BUCKETS 53
00048
00049 /*! \brief Size of the per-thread channel array; find_multiplexed_thread() only accepts a thread with room for two more */
00050 #define MULTIPLEXED_MAX_CHANNELS 8
00051
00052 /*! \brief State shared between one servicing thread and the bridges that use it */
00053 struct multiplexed_thread {
00054 /*! Servicing thread handle; AST_PTHREADT_NULL when none is running, AST_PTHREADT_STOP when it has been asked to exit */
00055 pthread_t thread;
00056 /*! Nudge pipe: [0] is waited on and read by the servicing thread, [1] is written by multiplexed_nudge() */
00057 int pipe[2];
00058 /*! Channels handed to ast_waitfor_nandfds() by the servicing thread */
00059 struct ast_channel *chans[MULTIPLEXED_MAX_CHANNELS];
00060 /*! Raised by 2 for every bridge created on this thread and lowered by 2 when one is destroyed */
00061 unsigned int count;
00062 /*! Set while the servicing thread is inside ast_waitfor_nandfds(); multiplexed_nudge() spins until it clears */
00063 unsigned int waiting:1;
00064 /*! Number of leading entries of chans passed to ast_waitfor_nandfds() */
00065 unsigned int service_count;
00066 };
00067
00068 /*! \brief Container of every multiplexed_thread currently linked; allocated in load_module() and released in unload_module() */
00069 static struct ao2_container *multiplexed_threads;
00070
00071
00072 static int find_multiplexed_thread(void *obj, void *arg, int flags)
00073 {
00074 struct multiplexed_thread *multiplexed_thread = obj;
00075 return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0;
00076 }
00077
00078
00079 static void destroy_multiplexed_thread(void *obj)
00080 {
00081 struct multiplexed_thread *multiplexed_thread = obj;
00082
00083 if (multiplexed_thread->pipe[0] > -1) {
00084 close(multiplexed_thread->pipe[0]);
00085 }
00086 if (multiplexed_thread->pipe[1] > -1) {
00087 close(multiplexed_thread->pipe[1]);
00088 }
00089
00090 return;
00091 }
00092
00093
00094 static int multiplexed_bridge_create(struct ast_bridge *bridge)
00095 {
00096 struct multiplexed_thread *multiplexed_thread;
00097
00098 ao2_lock(multiplexed_threads);
00099
00100
00101 if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) {
00102 int flags;
00103
00104
00105 if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) {
00106 ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
00107 ao2_unlock(multiplexed_threads);
00108 return -1;
00109 }
00110
00111 multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1;
00112
00113 if (pipe(multiplexed_thread->pipe)) {
00114 ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
00115 ao2_ref(multiplexed_thread, -1);
00116 ao2_unlock(multiplexed_threads);
00117 return -1;
00118 }
00119
00120
00121 flags = fcntl(multiplexed_thread->pipe[0], F_GETFL);
00122 if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) {
00123 ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00124 ao2_ref(multiplexed_thread, -1);
00125 ao2_unlock(multiplexed_threads);
00126 return -1;
00127 }
00128 flags = fcntl(multiplexed_thread->pipe[1], F_GETFL);
00129 if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) {
00130 ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00131 ao2_ref(multiplexed_thread, -1);
00132 ao2_unlock(multiplexed_threads);
00133 return -1;
00134 }
00135
00136
00137 multiplexed_thread->thread = AST_PTHREADT_NULL;
00138
00139
00140 ao2_link(multiplexed_threads, multiplexed_thread);
00141 ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00142 } else {
00143 ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00144 }
00145
00146
00147 multiplexed_thread->count += 2;
00148
00149 ao2_unlock(multiplexed_threads);
00150
00151 bridge->bridge_pvt = multiplexed_thread;
00152
00153 return 0;
00154 }
00155
00156
00157 static void multiplexed_nudge(struct multiplexed_thread *multiplexed_thread)
00158 {
00159 int nudge = 0;
00160
00161 if (multiplexed_thread->thread == AST_PTHREADT_NULL) {
00162 return;
00163 }
00164
00165 if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
00166 ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
00167 }
00168
00169 while (multiplexed_thread->waiting) {
00170 sched_yield();
00171 }
00172
00173 return;
00174 }
00175
00176
00177 static int multiplexed_bridge_destroy(struct ast_bridge *bridge)
00178 {
00179 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00180
00181 ao2_lock(multiplexed_threads);
00182
00183 multiplexed_thread->count -= 2;
00184
00185 if (!multiplexed_thread->count) {
00186 ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", multiplexed_thread);
00187 ao2_unlink(multiplexed_threads, multiplexed_thread);
00188 }
00189
00190 multiplexed_nudge(multiplexed_thread);
00191
00192 ao2_unlock(multiplexed_threads);
00193
00194 ao2_ref(multiplexed_thread, -1);
00195
00196 return 0;
00197 }
00198
00199
00200 static void *multiplexed_thread_function(void *data)
00201 {
00202 struct multiplexed_thread *multiplexed_thread = data;
00203 int fds = multiplexed_thread->pipe[0];
00204
00205 ao2_lock(multiplexed_thread);
00206
00207 ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00208
00209 while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
00210 struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
00211 int to = -1, outfd = -1;
00212
00213
00214 memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
00215 multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first;
00216
00217 multiplexed_thread->waiting = 1;
00218 ao2_unlock(multiplexed_thread);
00219 winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
00220 multiplexed_thread->waiting = 0;
00221 ao2_lock(multiplexed_thread);
00222
00223 if (outfd > -1) {
00224 int nudge;
00225
00226 if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
00227 if (errno != EINTR && errno != EAGAIN) {
00228 ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
00229 }
00230 }
00231 }
00232 if (winner && winner->bridge) {
00233 ast_bridge_handle_trip(winner->bridge, NULL, winner, -1);
00234 }
00235 }
00236
00237 multiplexed_thread->thread = AST_PTHREADT_NULL;
00238
00239 ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00240
00241 ao2_unlock(multiplexed_thread);
00242 ao2_ref(multiplexed_thread, -1);
00243
00244 return NULL;
00245 }
00246
00247
00248 static void multiplexed_add_or_remove(struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add)
00249 {
00250 int i, removed = 0;
00251 pthread_t thread = AST_PTHREADT_NULL;
00252
00253 ao2_lock(multiplexed_thread);
00254
00255 multiplexed_nudge(multiplexed_thread);
00256
00257 for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) {
00258 if (multiplexed_thread->chans[i] == chan) {
00259 if (!add) {
00260 multiplexed_thread->chans[i] = NULL;
00261 multiplexed_thread->service_count--;
00262 removed = 1;
00263 }
00264 break;
00265 } else if (!multiplexed_thread->chans[i] && add) {
00266 multiplexed_thread->chans[i] = chan;
00267 multiplexed_thread->service_count++;
00268 break;
00269 }
00270 }
00271
00272 if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) {
00273 ao2_ref(multiplexed_thread, +1);
00274 if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) {
00275 ao2_ref(multiplexed_thread, -1);
00276 ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread);
00277 }
00278 } else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) {
00279 thread = multiplexed_thread->thread;
00280 multiplexed_thread->thread = AST_PTHREADT_STOP;
00281 } else if (!add && removed) {
00282 memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1)));
00283 }
00284
00285 ao2_unlock(multiplexed_thread);
00286
00287 if (thread != AST_PTHREADT_NULL) {
00288 pthread_join(thread, NULL);
00289 }
00290
00291 return;
00292 }
00293
00294
00295 static int multiplexed_bridge_join(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00296 {
00297 struct ast_channel *c0 = AST_LIST_FIRST(&bridge->channels)->chan, *c1 = AST_LIST_LAST(&bridge->channels)->chan;
00298 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00299
00300 ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", bridge_channel->chan->name, multiplexed_thread);
00301
00302 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00303
00304
00305 if (c0 == c1) {
00306 return 0;
00307 }
00308
00309 if (((c0->writeformat == c1->readformat) && (c0->readformat == c1->writeformat) && (c0->nativeformats == c1->nativeformats))) {
00310 return 0;
00311 }
00312
00313 return ast_channel_make_compatible(c0, c1);
00314 }
00315
00316
00317 static int multiplexed_bridge_leave(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00318 {
00319 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00320
00321 ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00322
00323 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00324
00325 return 0;
00326 }
00327
00328
00329 static void multiplexed_bridge_suspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00330 {
00331 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00332
00333 ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00334
00335 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00336
00337 return;
00338 }
00339
00340
00341 static void multiplexed_bridge_unsuspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00342 {
00343 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00344
00345 ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00346
00347 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00348
00349 return;
00350 }
00351
00352
00353 static enum ast_bridge_write_result multiplexed_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
00354 {
00355 struct ast_bridge_channel *other;
00356
00357 if (AST_LIST_FIRST(&bridge->channels) == AST_LIST_LAST(&bridge->channels)) {
00358 return AST_BRIDGE_WRITE_FAILED;
00359 }
00360
00361 if (!(other = (AST_LIST_FIRST(&bridge->channels) == bridge_channel ? AST_LIST_LAST(&bridge->channels) : AST_LIST_FIRST(&bridge->channels)))) {
00362 return AST_BRIDGE_WRITE_FAILED;
00363 }
00364
00365 if (other->state == AST_BRIDGE_CHANNEL_STATE_WAIT) {
00366 ast_write(other->chan, frame);
00367 }
00368
00369 return AST_BRIDGE_WRITE_SUCCESS;
00370 }
00371 /*! \brief Bridge technology descriptor wiring this file's callbacks; registered in load_module() and unregistered in unload_module() */
00372 static struct ast_bridge_technology multiplexed_bridge = {
00373 .name = "multiplexed_bridge",
00374 .capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX,
00375 .preference = AST_BRIDGE_PREFERENCE_HIGH,
00376 .formats = AST_FORMAT_AUDIO_MASK | AST_FORMAT_VIDEO_MASK | AST_FORMAT_TEXT_MASK,
00377 .create = multiplexed_bridge_create,
00378 .destroy = multiplexed_bridge_destroy,
00379 .join = multiplexed_bridge_join,
00380 .leave = multiplexed_bridge_leave,
00381 .suspend = multiplexed_bridge_suspend,
00382 .unsuspend = multiplexed_bridge_unsuspend,
00383 .write = multiplexed_bridge_write,
00384 };
00385
00386 static int unload_module(void)
00387 {
00388 int res = ast_bridge_technology_unregister(&multiplexed_bridge);
00389
00390 ao2_ref(multiplexed_threads, -1);
00391
00392 return res;
00393 }
00394
00395 static int load_module(void)
00396 {
00397 if (!(multiplexed_threads = ao2_container_alloc(MULTIPLEXED_BUCKETS, NULL, NULL))) {
00398 return AST_MODULE_LOAD_DECLINE;
00399 }
00400
00401 return ast_bridge_technology_register(&multiplexed_bridge);
00402 }
00403 /* Module declaration under ASTERISK_GPL_KEY; presumably ties in load_module()/unload_module() above (macro is defined outside this file, confirm in asterisk/module.h) */
00404 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Multiplexed two channel bridging module");