39 #include <sys/types.h>
51 #define MULTIPLEXED_BUCKETS 53
54 #define MULTIPLEXED_MAX_CHANNELS 8
87 if (multiplexed_thread->
pipe[0] > -1) {
88 close(multiplexed_thread->
pipe[0]);
90 if (multiplexed_thread->
pipe[1] > -1) {
91 close(multiplexed_thread->
pipe[1]);
110 ast_debug(1,
"Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
115 multiplexed_thread->
pipe[0] = multiplexed_thread->
pipe[1] = -1;
117 if (
pipe(multiplexed_thread->
pipe)) {
118 ast_debug(1,
"Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
119 ao2_ref(multiplexed_thread, -1);
125 flags = fcntl(multiplexed_thread->
pipe[0], F_GETFL);
126 if (fcntl(multiplexed_thread->
pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) {
128 ao2_ref(multiplexed_thread, -1);
132 flags = fcntl(multiplexed_thread->
pipe[1], F_GETFL);
133 if (fcntl(multiplexed_thread->
pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) {
135 ao2_ref(multiplexed_thread, -1);
144 ao2_link(multiplexed_threads, multiplexed_thread);
145 ast_debug(1,
"Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
147 ast_debug(1,
"Found multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
151 multiplexed_thread->
count += 2;
169 if (write(multiplexed_thread->
pipe[1], &nudge,
sizeof(nudge)) !=
sizeof(nudge)) {
170 ast_log(
LOG_ERROR,
"We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
173 while (multiplexed_thread->
waiting) {
187 multiplexed_thread->
count -= 2;
189 if (!multiplexed_thread->
count) {
190 ast_debug(1,
"Unlinking multiplexed thread '%p' since nobody is using it anymore\n", multiplexed_thread);
191 ao2_unlink(multiplexed_threads, multiplexed_thread);
198 ao2_ref(multiplexed_thread, -1);
207 int fds = multiplexed_thread->
pipe[0];
211 ast_debug(1,
"Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread);
215 int to = -1, outfd = -1;
221 multiplexed_thread->
waiting = 1;
224 multiplexed_thread->
waiting = 0;
233 if (read(multiplexed_thread->
pipe[0], &nudge,
sizeof(nudge)) < 0) {
235 ast_log(
LOG_WARNING,
"read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(
errno));
239 if (winner && winner->
bridge) {
250 if (!stop && bridge) {
260 ast_debug(1,
"Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread);
263 ao2_ref(multiplexed_thread, -1);
279 if (multiplexed_thread->
chans[i] == chan) {
281 multiplexed_thread->
chans[i] = NULL;
286 }
else if (!multiplexed_thread->
chans[i] && add) {
287 multiplexed_thread->
chans[i] = chan;
294 ao2_ref(multiplexed_thread, +1);
296 ao2_ref(multiplexed_thread, -1);
297 ast_debug(1,
"Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread);
300 thread = multiplexed_thread->
thread;
302 }
else if (!add && removed) {
303 memmove(multiplexed_thread->
chans + i, multiplexed_thread->
chans + i + 1,
sizeof(
struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1)));
309 pthread_join(thread, NULL);
321 ast_debug(1,
"Adding channel '%s' to multiplexed thread '%p' for monitoring\n", bridge_channel->
chan->
name, multiplexed_thread);
342 ast_debug(1,
"Removing channel '%s' from multiplexed thread '%p'\n", bridge_channel->
chan->
name, multiplexed_thread);
354 ast_debug(1,
"Suspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->
chan->
name, multiplexed_thread);
366 ast_debug(1,
"Unsuspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->
chan->
name, multiplexed_thread);
394 .
name =
"multiplexed_bridge",
411 ao2_ref(multiplexed_threads, -1);
static int multiplexed_bridge_destroy(struct ast_bridge *bridge)
Destroy function which unreserves/unreferences/removes a multiplexed thread structure.
static void destroy_multiplexed_thread(void *obj)
Destroy callback for a multiplexed thread structure.
Main Channel structure associated with a channel.
#define AST_MODULE_INFO_STANDARD(keystr, desc)
#define MULTIPLEXED_BUCKETS
Number of buckets our multiplexed thread container can have.
Asterisk main include file. File version handling, generic pbx functions.
#define ao2_link(arg1, arg2)
#define AST_LIST_FIRST(head)
Returns the first entry contained in a list.
static int multiplexed_bridge_join(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
Join function which actually adds the channel into the array to be monitored.
Structure which represents a single thread handling multiple two-channel bridges.
#define MULTIPLEXED_MAX_CHANNELS
Number of channels we handle in a single thread.
#define ao2_callback(c, flags, cb_fn, arg)
static int multiplexed_bridge_leave(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
Leave function which actually removes the channel from the array.
struct ast_channel * chans[MULTIPLEXED_MAX_CHANNELS]
static void multiplexed_bridge_suspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
Suspend function which means control of the channel is going elsewhere.
static struct ao2_container * multiplexed_threads
Container of all operating multiplexed threads.
static void multiplexed_bridge_unsuspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
Unsuspend function which means control of the channel is coming back to us.
static int multiplexed_bridge_create(struct ast_bridge *bridge)
Create function which finds/reserves/references a multiplexed thread structure.
#define ast_bridge_technology_register(technology)
See __ast_bridge_technology_register()
struct ast_channel * ast_waitfor_nandfds(struct ast_channel **chan, int n, int *fds, int nfds, int *exception, int *outfd, int *ms)
Waits for activity on a group of channels.
static enum ast_bridge_write_result multiplexed_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
Write function for writing frames into the bridge.
#define ast_debug(level,...)
Log a DEBUG message.
General Asterisk PBX channel definitions.
#define AST_PTHREADT_NULL
Asterisk internal frame definitions.
#define AST_FORMAT_TEXT_MASK
#define ao2_ref(o, delta)
unsigned int service_count
static int unload_module(void)
int ast_bridge_technology_unregister(struct ast_bridge_technology *technology)
Unregister a bridge technology from use.
void ast_bridge_handle_trip(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_channel *chan, int outfd)
Feed notification that a frame is waiting on a channel into the bridging core.
static int load_module(void)
Structure that contains information about a bridge.
static void * multiplexed_thread_function(void *data)
Thread function that executes for multiplexed threads.
const ast_string_field name
struct sla_ringing_trunk * first
void ast_log(int level, const char *file, int line, const char *function, const char *fmt,...)
Used for sending a log message. This is the standard logger function. Probably the only way you will i...
static struct ast_bridge_technology multiplexed_bridge
#define ao2_alloc(data_size, destructor_fn)
#define AST_LIST_LAST(head)
Returns the last entry contained in a list.
struct ast_bridge * bridge
#define AST_FORMAT_VIDEO_MASK
#define ast_pthread_create(a, b, c, d)
#define AST_FORMAT_AUDIO_MASK
int ast_write(struct ast_channel *chan, struct ast_frame *frame)
Write a frame to a channel. This function writes the given frame to the indicated channel.
struct ast_channel * chan
Structure that contains information regarding a channel in a bridge.
int ast_channel_make_compatible(struct ast_channel *c0, struct ast_channel *c1)
Makes two channel formats compatible.
#define ao2_container_alloc(arg1, arg2, arg3)
Structure that is the essence of a bridge technology.
#define AST_PTHREADT_STOP
ast_bridge_write_result
Return values for bridge technology write function.
Data structure associated with a single frame of data.
static int find_multiplexed_thread(void *obj, void *arg, int flags)
Callback function for finding a free multiplexed thread.
#define ASTERISK_GPL_KEY
The text the key() function should return.
Asterisk module definitions.
enum ast_bridge_channel_state state
#define ao2_unlink(arg1, arg2)
static void multiplexed_nudge(struct multiplexed_thread *multiplexed_thread)
Internal function which nudges the thread.
#define ASTERISK_FILE_VERSION(file, version)
Register/unregister a source code file with the core.
static void multiplexed_add_or_remove(struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add)
Helper function which adds or removes a channel and nudges the thread.