00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "asterisk.h"
00032
00033 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 335497 $");
00034
00035 #include <stdlib.h>
00036 #include <stdio.h>
00037 #include <string.h>
00038 #include <unistd.h>
00039 #include <errno.h>
00040
00041 #include "ais.h"
00042
00043 #include "asterisk/module.h"
00044 #include "asterisk/utils.h"
00045 #include "asterisk/cli.h"
00046 #include "asterisk/logger.h"
00047 #include "asterisk/event.h"
00048 #include "asterisk/config.h"
00049 #include "asterisk/linkedlists.h"
00050 #include "asterisk/devicestate.h"
00051
00052 #ifndef AST_MODULE
00053
00054 #define AST_MODULE "res_ais"
00055 #endif
00056
00057 SaEvtHandleT evt_handle;
00058 static SaAisErrorT evt_init_res;
00059
00060 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00061 SaAisErrorT error);
00062 void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
00063 const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);
00064
00065 static const SaEvtCallbacksT evt_callbacks = {
00066 .saEvtChannelOpenCallback = evt_channel_open_cb,
00067 .saEvtEventDeliverCallback = evt_event_deliver_cb,
00068 };
00069
00070 static const struct {
00071 const char *str;
00072 enum ast_event_type type;
00073 } supported_event_types[] = {
00074 { "mwi", AST_EVENT_MWI },
00075 { "device_state", AST_EVENT_DEVICE_STATE_CHANGE },
00076 };
00077
00078
00079 static int unique_id;
00080
00081 struct subscribe_event {
00082 AST_LIST_ENTRY(subscribe_event) entry;
00083
00084
00085
00086 SaEvtSubscriptionIdT id;
00087 enum ast_event_type type;
00088 };
00089
00090 struct publish_event {
00091 AST_LIST_ENTRY(publish_event) entry;
00092
00093
00094 struct ast_event_sub *sub;
00095 enum ast_event_type type;
00096 };
00097
00098 struct event_channel {
00099 AST_RWLIST_ENTRY(event_channel) entry;
00100 AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
00101 AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
00102 SaEvtChannelHandleT handle;
00103 char name[1];
00104 };
00105
00106 static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
00107
00108 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00109 SaAisErrorT error)
00110 {
00111
00112 }
00113
00114 static void queue_event(struct ast_event *ast_event)
00115 {
00116 ast_event_queue_and_cache(ast_event);
00117 }
00118
00119 void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
00120 const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
00121 {
00122
00123
00124
00125
00126 static unsigned char buf[4096];
00127 struct ast_event *event_dup, *event = (void *) buf;
00128 SaAisErrorT ais_res;
00129 SaSizeT len = sizeof(buf);
00130
00131 if (event_datalen > len) {
00132 ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
00133 "for the allocated size %u. Change the code to increase the size.\n",
00134 (unsigned int) event_datalen, (unsigned int) len);
00135 return;
00136 }
00137
00138 if (event_datalen < ast_event_minimum_length()) {
00139 ast_debug(1, "Ignoring event that's too small. %u < %u\n",
00140 (unsigned int) event_datalen,
00141 (unsigned int) ast_event_minimum_length());
00142 return;
00143 }
00144
00145 ais_res = saEvtEventDataGet(event_handle, event, &len);
00146 if (ais_res != SA_AIS_OK) {
00147 ast_log(LOG_ERROR, "Error retrieving event payload: %s\n",
00148 ais_err2str(ais_res));
00149 return;
00150 }
00151
00152 if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
00153
00154 return;
00155 }
00156
00157 if (!(event_dup = ast_malloc(len)))
00158 return;
00159
00160 memcpy(event_dup, event, len);
00161
00162 queue_event(event_dup);
00163 }
00164
00165 static const char *type_to_filter_str(enum ast_event_type type)
00166 {
00167 const char *filter_str = NULL;
00168 int i;
00169
00170 for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00171 if (supported_event_types[i].type == type) {
00172 filter_str = supported_event_types[i].str;
00173 break;
00174 }
00175 }
00176
00177 return filter_str;
00178 }
00179
00180 static void ast_event_cb(const struct ast_event *ast_event, void *data)
00181 {
00182 SaEvtEventHandleT event_handle;
00183 SaAisErrorT ais_res;
00184 struct event_channel *event_channel = data;
00185 SaClmClusterNodeT local_node;
00186 SaEvtEventPatternArrayT pattern_array;
00187 SaEvtEventPatternT pattern;
00188 SaSizeT len;
00189 const char *filter_str;
00190 SaEvtEventIdT event_id;
00191
00192 ast_log(LOG_DEBUG, "Got an event to forward\n");
00193
00194 if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
00195
00196 ast_log(LOG_DEBUG, "Returning here\n");
00197 return;
00198 }
00199
00200 ais_res = saEvtEventAllocate(event_channel->handle, &event_handle);
00201 if (ais_res != SA_AIS_OK) {
00202 ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(ais_res));
00203 ast_log(LOG_DEBUG, "Returning here\n");
00204 return;
00205 }
00206
00207 ais_res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID,
00208 SA_TIME_ONE_SECOND, &local_node);
00209 if (ais_res != SA_AIS_OK) {
00210 ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(ais_res));
00211 goto return_event_free;
00212 }
00213
00214 filter_str = type_to_filter_str(ast_event_get_type(ast_event));
00215 len = strlen(filter_str) + 1;
00216 pattern.pattern = (SaUint8T *) filter_str;
00217 pattern.patternSize = len;
00218 pattern.allocatedSize = len;
00219
00220 pattern_array.allocatedNumber = 1;
00221 pattern_array.patternsNumber = 1;
00222 pattern_array.patterns = &pattern;
00223
00224
00225
00226
00227
00228 ais_res = saEvtEventAttributesSet(event_handle, &pattern_array,
00229 SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
00230 if (ais_res != SA_AIS_OK) {
00231 ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(ais_res));
00232 goto return_event_free;
00233 }
00234
00235 ais_res = saEvtEventPublish(event_handle,
00236 ast_event, ast_event_get_size(ast_event), &event_id);
00237 if (ais_res != SA_AIS_OK) {
00238 ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
00239 goto return_event_free;
00240 }
00241
00242 return_event_free:
00243 ais_res = saEvtEventFree(event_handle);
00244 if (ais_res != SA_AIS_OK) {
00245 ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(ais_res));
00246 }
00247 ast_log(LOG_DEBUG, "Returning here (event_free)\n");
00248 }
00249
00250 static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00251 {
00252 struct event_channel *event_channel;
00253
00254 switch (cmd) {
00255 case CLI_INIT:
00256 e->command = "ais evt show event channels";
00257 e->usage =
00258 "Usage: ais evt show event channels\n"
00259 " List configured event channels for the (EVT) Eventing service.\n";
00260 return NULL;
00261
00262 case CLI_GENERATE:
00263 return NULL;
00264 }
00265
00266 if (a->argc != e->args)
00267 return CLI_SHOWUSAGE;
00268
00269 ast_cli(a->fd, "\n"
00270 "=============================================================\n"
00271 "=== Event Channels ==========================================\n"
00272 "=============================================================\n"
00273 "===\n");
00274
00275 AST_RWLIST_RDLOCK(&event_channels);
00276 AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00277 struct publish_event *publish_event;
00278 struct subscribe_event *subscribe_event;
00279
00280 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00281 "=== Event Channel Name: %s\n", event_channel->name);
00282
00283 AST_LIST_TRAVERSE(&event_channel->publish_events, publish_event, entry) {
00284 ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
00285 type_to_filter_str(publish_event->type));
00286 }
00287
00288 AST_LIST_TRAVERSE(&event_channel->subscribe_events, subscribe_event, entry) {
00289 ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
00290 type_to_filter_str(subscribe_event->type));
00291 }
00292
00293 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00294 "===\n");
00295 }
00296 AST_RWLIST_UNLOCK(&event_channels);
00297
00298 ast_cli(a->fd, "=============================================================\n"
00299 "\n");
00300
00301 return CLI_SUCCESS;
00302 }
00303
00304 static struct ast_cli_entry ais_cli[] = {
00305 AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
00306 };
00307
00308 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
00309 {
00310 int i;
00311 enum ast_event_type type = -1;
00312 struct publish_event *publish_event;
00313
00314 for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00315 if (!strcasecmp(event_type, supported_event_types[i].str)) {
00316 type = supported_event_types[i].type;
00317 break;
00318 }
00319 }
00320
00321 if (type == -1) {
00322 ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type);
00323 return;
00324 }
00325
00326 if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00327 return;
00328 }
00329
00330 if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) {
00331 return;
00332 }
00333
00334 publish_event->type = type;
00335 ast_log(LOG_DEBUG, "Subscribing to event type %d\n", type);
00336 publish_event->sub = ast_event_subscribe(type, ast_event_cb, "AIS", event_channel,
00337 AST_EVENT_IE_END);
00338 ast_event_dump_cache(publish_event->sub);
00339
00340 AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry);
00341 }
00342
00343 static SaAisErrorT set_egress_subscription(struct event_channel *event_channel,
00344 struct subscribe_event *subscribe_event)
00345 {
00346 SaAisErrorT ais_res;
00347 SaEvtEventFilterArrayT filter_array;
00348 SaEvtEventFilterT filter;
00349 const char *filter_str = NULL;
00350 SaSizeT len;
00351
00352
00353 filter_str = type_to_filter_str(subscribe_event->type);
00354
00355 filter.filterType = SA_EVT_EXACT_FILTER;
00356 len = strlen(filter_str) + 1;
00357 filter.filter.allocatedSize = len;
00358 filter.filter.patternSize = len;
00359 filter.filter.pattern = (SaUint8T *) filter_str;
00360
00361 filter_array.filtersNumber = 1;
00362 filter_array.filters = &filter;
00363
00364 ais_res = saEvtEventSubscribe(event_channel->handle, &filter_array,
00365 subscribe_event->id);
00366
00367 return ais_res;
00368 }
00369
00370 static void add_subscribe_event(struct event_channel *event_channel, const char *event_type)
00371 {
00372 int i;
00373 enum ast_event_type type = -1;
00374 struct subscribe_event *subscribe_event;
00375 SaAisErrorT ais_res;
00376
00377 for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00378 if (!strcasecmp(event_type, supported_event_types[i].str)) {
00379 type = supported_event_types[i].type;
00380 break;
00381 }
00382 }
00383
00384 if (type == -1) {
00385 ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type);
00386 return;
00387 }
00388
00389 if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00390 return;
00391 }
00392
00393 if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) {
00394 return;
00395 }
00396
00397 subscribe_event->type = type;
00398 subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1);
00399
00400 ais_res = set_egress_subscription(event_channel, subscribe_event);
00401 if (ais_res != SA_AIS_OK) {
00402 ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n",
00403 ais_err2str(ais_res));
00404 free(subscribe_event);
00405 return;
00406 }
00407
00408 AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, subscribe_event, entry);
00409 }
00410
00411 static void build_event_channel(struct ast_config *cfg, const char *cat)
00412 {
00413 struct ast_variable *var;
00414 struct event_channel *event_channel;
00415 SaAisErrorT ais_res;
00416 SaNameT sa_name = { 0, };
00417
00418 AST_RWLIST_WRLOCK(&event_channels);
00419 AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00420 if (!strcasecmp(event_channel->name, cat))
00421 break;
00422 }
00423 AST_RWLIST_UNLOCK(&event_channels);
00424 if (event_channel) {
00425 ast_log(LOG_WARNING, "Event channel '%s' was specified twice in "
00426 "configuration. Second instance ignored.\n", cat);
00427 return;
00428 }
00429
00430 if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat))))
00431 return;
00432
00433 strcpy(event_channel->name, cat);
00434 ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value));
00435 sa_name.length = strlen((char *) sa_name.value);
00436 ais_res = saEvtChannelOpen(evt_handle, &sa_name,
00437 SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE,
00438 SA_TIME_MAX, &event_channel->handle);
00439 if (ais_res != SA_AIS_OK) {
00440 ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(ais_res));
00441 free(event_channel);
00442 return;
00443 }
00444
00445 for (var = ast_variable_browse(cfg, cat); var; var = var->next) {
00446 if (!strcasecmp(var->name, "type")) {
00447 continue;
00448 } else if (!strcasecmp(var->name, "publish_event")) {
00449 add_publish_event(event_channel, var->value);
00450 } else if (!strcasecmp(var->name, "subscribe_event")) {
00451 add_subscribe_event(event_channel, var->value);
00452 } else {
00453 ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n",
00454 event_channel->name, var->name);
00455 }
00456 }
00457
00458 AST_RWLIST_WRLOCK(&event_channels);
00459 AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry);
00460 AST_RWLIST_UNLOCK(&event_channels);
00461 }
00462
00463 static void load_config(void)
00464 {
00465 static const char filename[] = "ais.conf";
00466 struct ast_config *cfg;
00467 const char *cat = NULL;
00468 struct ast_flags config_flags = { 0 };
00469
00470 if (!(cfg = ast_config_load(filename, config_flags)) || cfg == CONFIG_STATUS_FILEINVALID)
00471 return;
00472
00473 while ((cat = ast_category_browse(cfg, cat))) {
00474 const char *type;
00475
00476 if (!strcasecmp(cat, "general"))
00477 continue;
00478
00479 if (!(type = ast_variable_retrieve(cfg, cat, "type"))) {
00480 ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n",
00481 filename);
00482 continue;
00483 }
00484
00485 if (!strcasecmp(type, "event_channel")) {
00486 build_event_channel(cfg, cat);
00487 } else {
00488 ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n",
00489 filename, type);
00490 }
00491 }
00492
00493 ast_config_destroy(cfg);
00494 }
00495
00496 static void publish_event_destroy(struct publish_event *publish_event)
00497 {
00498 ast_event_unsubscribe(publish_event->sub);
00499
00500 free(publish_event);
00501 }
00502
00503 static void subscribe_event_destroy(const struct event_channel *event_channel,
00504 struct subscribe_event *subscribe_event)
00505 {
00506 SaAisErrorT ais_res;
00507
00508
00509
00510 ais_res = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id);
00511 if (ais_res != SA_AIS_OK) {
00512 ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(ais_res));
00513 }
00514
00515 free(subscribe_event);
00516 }
00517
00518 static void event_channel_destroy(struct event_channel *event_channel)
00519 {
00520 struct publish_event *publish_event;
00521 struct subscribe_event *subscribe_event;
00522 SaAisErrorT ais_res;
00523
00524 while ((publish_event = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry)))
00525 publish_event_destroy(publish_event);
00526 while ((subscribe_event = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry)))
00527 subscribe_event_destroy(event_channel, subscribe_event);
00528
00529 ais_res = saEvtChannelClose(event_channel->handle);
00530 if (ais_res != SA_AIS_OK) {
00531 ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n",
00532 event_channel->name, ais_err2str(ais_res));
00533 }
00534
00535 free(event_channel);
00536 }
00537
00538 static void destroy_event_channels(void)
00539 {
00540 struct event_channel *event_channel;
00541
00542 AST_RWLIST_WRLOCK(&event_channels);
00543 while ((event_channel = AST_RWLIST_REMOVE_HEAD(&event_channels, entry))) {
00544 event_channel_destroy(event_channel);
00545 }
00546 AST_RWLIST_UNLOCK(&event_channels);
00547 }
00548
00549 int ast_ais_evt_load_module(void)
00550 {
00551 evt_init_res = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version);
00552 if (evt_init_res != SA_AIS_OK) {
00553 ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n",
00554 ais_err2str(evt_init_res));
00555 return -1;
00556 }
00557
00558 load_config();
00559
00560 ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
00561
00562 return 0;
00563 }
00564
00565 int ast_ais_evt_unload_module(void)
00566 {
00567 SaAisErrorT ais_res;
00568
00569 if (evt_init_res != SA_AIS_OK) {
00570 return 0;
00571 }
00572
00573 destroy_event_channels();
00574
00575 ais_res = saEvtFinalize(evt_handle);
00576 if (ais_res != SA_AIS_OK) {
00577 ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n",
00578 ais_err2str(ais_res));
00579 return -1;
00580 }
00581
00582 return 0;
00583 }