00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "asterisk.h"
00036
00037 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 369001 $");
00038
00039 #include <stdlib.h>
00040 #include <stdio.h>
00041 #include <string.h>
00042 #include <unistd.h>
00043 #include <errno.h>
00044
00045 #include "ais.h"
00046
00047 #include "asterisk/module.h"
00048 #include "asterisk/utils.h"
00049 #include "asterisk/cli.h"
00050 #include "asterisk/logger.h"
00051 #include "asterisk/event.h"
00052 #include "asterisk/config.h"
00053 #include "asterisk/linkedlists.h"
00054 #include "asterisk/devicestate.h"
00055
00056 #ifndef AST_MODULE
00057
00058 #define AST_MODULE "res_ais"
00059 #endif
00060
00061 SaEvtHandleT evt_handle;
00062 static SaAisErrorT evt_init_res;
00063
00064 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00065 SaAisErrorT error);
00066 void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
00067 const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);
00068
00069 static const SaEvtCallbacksT evt_callbacks = {
00070 .saEvtChannelOpenCallback = evt_channel_open_cb,
00071 .saEvtEventDeliverCallback = evt_event_deliver_cb,
00072 };
00073
00074 static const struct {
00075 const char *str;
00076 enum ast_event_type type;
00077 } supported_event_types[] = {
00078 { "mwi", AST_EVENT_MWI },
00079 { "device_state", AST_EVENT_DEVICE_STATE_CHANGE },
00080 };
00081
00082
00083 static int unique_id;
00084
00085 struct subscribe_event {
00086 AST_LIST_ENTRY(subscribe_event) entry;
00087
00088
00089
00090 SaEvtSubscriptionIdT id;
00091 enum ast_event_type type;
00092 };
00093
00094 struct publish_event {
00095 AST_LIST_ENTRY(publish_event) entry;
00096
00097
00098 struct ast_event_sub *sub;
00099 enum ast_event_type type;
00100 };
00101
00102 struct event_channel {
00103 AST_RWLIST_ENTRY(event_channel) entry;
00104 AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
00105 AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
00106 SaEvtChannelHandleT handle;
00107 char name[1];
00108 };
00109
00110 static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
00111
00112 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00113 SaAisErrorT error)
00114 {
00115
00116 }
00117
/*!
 * \brief Hand a received event over to the Asterisk core.
 *
 * Thin wrapper around ast_event_queue_and_cache().
 *
 * \param ast_event heap-allocated event to queue
 */
static void queue_event(struct ast_event *ast_event)
{
	(void) ast_event_queue_and_cache(ast_event);
}
00122
00123 void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
00124 const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
00125 {
00126
00127
00128
00129
00130 static unsigned char buf[4096];
00131 struct ast_event *event_dup, *event = (void *) buf;
00132 SaAisErrorT ais_res;
00133 SaSizeT len = sizeof(buf);
00134
00135 if (event_datalen > len) {
00136 ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
00137 "for the allocated size %u. Change the code to increase the size.\n",
00138 (unsigned int) event_datalen, (unsigned int) len);
00139 return;
00140 }
00141
00142 if (event_datalen < ast_event_minimum_length()) {
00143 ast_debug(1, "Ignoring event that's too small. %u < %u\n",
00144 (unsigned int) event_datalen,
00145 (unsigned int) ast_event_minimum_length());
00146 return;
00147 }
00148
00149 ais_res = saEvtEventDataGet(event_handle, event, &len);
00150 if (ais_res != SA_AIS_OK) {
00151 ast_log(LOG_ERROR, "Error retrieving event payload: %s\n",
00152 ais_err2str(ais_res));
00153 return;
00154 }
00155
00156 if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
00157
00158 return;
00159 }
00160
00161 if (!(event_dup = ast_malloc(len)))
00162 return;
00163
00164 memcpy(event_dup, event, len);
00165
00166 queue_event(event_dup);
00167 }
00168
00169 static const char *type_to_filter_str(enum ast_event_type type)
00170 {
00171 const char *filter_str = NULL;
00172 int i;
00173
00174 for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00175 if (supported_event_types[i].type == type) {
00176 filter_str = supported_event_types[i].str;
00177 break;
00178 }
00179 }
00180
00181 return filter_str;
00182 }
00183
00184 static void ast_event_cb(const struct ast_event *ast_event, void *data)
00185 {
00186 SaEvtEventHandleT event_handle;
00187 SaAisErrorT ais_res;
00188 struct event_channel *event_channel = data;
00189 SaClmClusterNodeT local_node;
00190 SaEvtEventPatternArrayT pattern_array;
00191 SaEvtEventPatternT pattern;
00192 SaSizeT len;
00193 const char *filter_str;
00194 SaEvtEventIdT event_id;
00195
00196 ast_log(LOG_DEBUG, "Got an event to forward\n");
00197
00198 if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
00199
00200 ast_log(LOG_DEBUG, "Returning here\n");
00201 return;
00202 }
00203
00204 ais_res = saEvtEventAllocate(event_channel->handle, &event_handle);
00205 if (ais_res != SA_AIS_OK) {
00206 ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(ais_res));
00207 ast_log(LOG_DEBUG, "Returning here\n");
00208 return;
00209 }
00210
00211 ais_res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID,
00212 SA_TIME_ONE_SECOND, &local_node);
00213 if (ais_res != SA_AIS_OK) {
00214 ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(ais_res));
00215 goto return_event_free;
00216 }
00217
00218 filter_str = type_to_filter_str(ast_event_get_type(ast_event));
00219 len = strlen(filter_str) + 1;
00220 pattern.pattern = (SaUint8T *) filter_str;
00221 pattern.patternSize = len;
00222 pattern.allocatedSize = len;
00223
00224 pattern_array.allocatedNumber = 1;
00225 pattern_array.patternsNumber = 1;
00226 pattern_array.patterns = &pattern;
00227
00228
00229
00230
00231
00232 ais_res = saEvtEventAttributesSet(event_handle, &pattern_array,
00233 SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
00234 if (ais_res != SA_AIS_OK) {
00235 ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(ais_res));
00236 goto return_event_free;
00237 }
00238
00239 for (;;) {
00240 ais_res = saEvtEventPublish(event_handle,
00241 ast_event, ast_event_get_size(ast_event), &event_id);
00242 if (ais_res != SA_AIS_ERR_TRY_AGAIN) {
00243 break;
00244 }
00245 sched_yield();
00246 }
00247
00248 if (ais_res != SA_AIS_OK) {
00249 ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
00250 goto return_event_free;
00251 }
00252
00253 return_event_free:
00254 ais_res = saEvtEventFree(event_handle);
00255 if (ais_res != SA_AIS_OK) {
00256 ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(ais_res));
00257 }
00258 ast_log(LOG_DEBUG, "Returning here (event_free)\n");
00259 }
00260
00261 static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00262 {
00263 struct event_channel *event_channel;
00264
00265 switch (cmd) {
00266 case CLI_INIT:
00267 e->command = "ais evt show event channels";
00268 e->usage =
00269 "Usage: ais evt show event channels\n"
00270 " List configured event channels for the (EVT) Eventing service.\n";
00271 return NULL;
00272
00273 case CLI_GENERATE:
00274 return NULL;
00275 }
00276
00277 if (a->argc != e->args)
00278 return CLI_SHOWUSAGE;
00279
00280 ast_cli(a->fd, "\n"
00281 "=============================================================\n"
00282 "=== Event Channels ==========================================\n"
00283 "=============================================================\n"
00284 "===\n");
00285
00286 AST_RWLIST_RDLOCK(&event_channels);
00287 AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00288 struct publish_event *publish_event;
00289 struct subscribe_event *subscribe_event;
00290
00291 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00292 "=== Event Channel Name: %s\n", event_channel->name);
00293
00294 AST_LIST_TRAVERSE(&event_channel->publish_events, publish_event, entry) {
00295 ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
00296 type_to_filter_str(publish_event->type));
00297 }
00298
00299 AST_LIST_TRAVERSE(&event_channel->subscribe_events, subscribe_event, entry) {
00300 ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
00301 type_to_filter_str(subscribe_event->type));
00302 }
00303
00304 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00305 "===\n");
00306 }
00307 AST_RWLIST_UNLOCK(&event_channels);
00308
00309 ast_cli(a->fd, "=============================================================\n"
00310 "\n");
00311
00312 return CLI_SUCCESS;
00313 }
00314
00315 static struct ast_cli_entry ais_cli[] = {
00316 AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
00317 };
00318
00319 void ast_ais_evt_membership_changed(void)
00320 {
00321 struct event_channel *ec;
00322
00323 AST_RWLIST_RDLOCK(&event_channels);
00324 AST_RWLIST_TRAVERSE(&event_channels, ec, entry) {
00325 struct publish_event *pe;
00326
00327 AST_LIST_TRAVERSE(&ec->publish_events, pe, entry) {
00328 ast_debug(1, "Dumping cache for event channel '%s'\n", ec->name);
00329 ast_event_dump_cache(pe->sub);
00330 }
00331 }
00332 AST_RWLIST_UNLOCK(&event_channels);
00333 }
00334
00335 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
00336 {
00337 int i;
00338 enum ast_event_type type = -1;
00339 struct publish_event *publish_event;
00340
00341 for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00342 if (!strcasecmp(event_type, supported_event_types[i].str)) {
00343 type = supported_event_types[i].type;
00344 break;
00345 }
00346 }
00347
00348 if (type == -1) {
00349 ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type);
00350 return;
00351 }
00352
00353 if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00354 return;
00355 }
00356
00357 if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) {
00358 return;
00359 }
00360
00361 publish_event->type = type;
00362 ast_log(LOG_DEBUG, "Subscribing to event type %d\n", type);
00363 publish_event->sub = ast_event_subscribe(type, ast_event_cb, "AIS", event_channel,
00364 AST_EVENT_IE_END);
00365 ast_event_dump_cache(publish_event->sub);
00366
00367 AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry);
00368 }
00369
00370 static SaAisErrorT set_egress_subscription(struct event_channel *event_channel,
00371 struct subscribe_event *subscribe_event)
00372 {
00373 SaAisErrorT ais_res;
00374 SaEvtEventFilterArrayT filter_array;
00375 SaEvtEventFilterT filter;
00376 const char *filter_str = NULL;
00377 SaSizeT len;
00378
00379
00380 filter_str = type_to_filter_str(subscribe_event->type);
00381
00382 filter.filterType = SA_EVT_EXACT_FILTER;
00383 len = strlen(filter_str) + 1;
00384 filter.filter.allocatedSize = len;
00385 filter.filter.patternSize = len;
00386 filter.filter.pattern = (SaUint8T *) filter_str;
00387
00388 filter_array.filtersNumber = 1;
00389 filter_array.filters = &filter;
00390
00391 ais_res = saEvtEventSubscribe(event_channel->handle, &filter_array,
00392 subscribe_event->id);
00393
00394 return ais_res;
00395 }
00396
00397 static void add_subscribe_event(struct event_channel *event_channel, const char *event_type)
00398 {
00399 int i;
00400 enum ast_event_type type = -1;
00401 struct subscribe_event *subscribe_event;
00402 SaAisErrorT ais_res;
00403
00404 for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00405 if (!strcasecmp(event_type, supported_event_types[i].str)) {
00406 type = supported_event_types[i].type;
00407 break;
00408 }
00409 }
00410
00411 if (type == -1) {
00412 ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type);
00413 return;
00414 }
00415
00416 if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00417 return;
00418 }
00419
00420 if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) {
00421 return;
00422 }
00423
00424 subscribe_event->type = type;
00425 subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1);
00426
00427 ais_res = set_egress_subscription(event_channel, subscribe_event);
00428 if (ais_res != SA_AIS_OK) {
00429 ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n",
00430 ais_err2str(ais_res));
00431 free(subscribe_event);
00432 return;
00433 }
00434
00435 AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, subscribe_event, entry);
00436 }
00437
00438 static void build_event_channel(struct ast_config *cfg, const char *cat)
00439 {
00440 struct ast_variable *var;
00441 struct event_channel *event_channel;
00442 SaAisErrorT ais_res;
00443 SaNameT sa_name = { 0, };
00444
00445 AST_RWLIST_WRLOCK(&event_channels);
00446 AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00447 if (!strcasecmp(event_channel->name, cat))
00448 break;
00449 }
00450 AST_RWLIST_UNLOCK(&event_channels);
00451 if (event_channel) {
00452 ast_log(LOG_WARNING, "Event channel '%s' was specified twice in "
00453 "configuration. Second instance ignored.\n", cat);
00454 return;
00455 }
00456
00457 if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat))))
00458 return;
00459
00460 strcpy(event_channel->name, cat);
00461 ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value));
00462 sa_name.length = strlen((char *) sa_name.value);
00463 ais_res = saEvtChannelOpen(evt_handle, &sa_name,
00464 SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE,
00465 SA_TIME_MAX, &event_channel->handle);
00466 if (ais_res != SA_AIS_OK) {
00467 ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(ais_res));
00468 free(event_channel);
00469 return;
00470 }
00471
00472 for (var = ast_variable_browse(cfg, cat); var; var = var->next) {
00473 if (!strcasecmp(var->name, "type")) {
00474 continue;
00475 } else if (!strcasecmp(var->name, "publish_event")) {
00476 add_publish_event(event_channel, var->value);
00477 } else if (!strcasecmp(var->name, "subscribe_event")) {
00478 add_subscribe_event(event_channel, var->value);
00479 } else {
00480 ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n",
00481 event_channel->name, var->name);
00482 }
00483 }
00484
00485 AST_RWLIST_WRLOCK(&event_channels);
00486 AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry);
00487 AST_RWLIST_UNLOCK(&event_channels);
00488 }
00489
00490 static void load_config(void)
00491 {
00492 static const char filename[] = "ais.conf";
00493 struct ast_config *cfg;
00494 const char *cat = NULL;
00495 struct ast_flags config_flags = { 0 };
00496
00497 if (!(cfg = ast_config_load(filename, config_flags)) || cfg == CONFIG_STATUS_FILEINVALID)
00498 return;
00499
00500 while ((cat = ast_category_browse(cfg, cat))) {
00501 const char *type;
00502
00503 if (!strcasecmp(cat, "general"))
00504 continue;
00505
00506 if (!(type = ast_variable_retrieve(cfg, cat, "type"))) {
00507 ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n",
00508 filename);
00509 continue;
00510 }
00511
00512 if (!strcasecmp(type, "event_channel")) {
00513 build_event_channel(cfg, cat);
00514 } else {
00515 ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n",
00516 filename, type);
00517 }
00518 }
00519
00520 ast_config_destroy(cfg);
00521 }
00522
00523 static void publish_event_destroy(struct publish_event *publish_event)
00524 {
00525 ast_event_unsubscribe(publish_event->sub);
00526
00527 free(publish_event);
00528 }
00529
00530 static void subscribe_event_destroy(const struct event_channel *event_channel,
00531 struct subscribe_event *subscribe_event)
00532 {
00533 SaAisErrorT ais_res;
00534
00535
00536
00537 ais_res = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id);
00538 if (ais_res != SA_AIS_OK) {
00539 ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(ais_res));
00540 }
00541
00542 free(subscribe_event);
00543 }
00544
00545 static void event_channel_destroy(struct event_channel *event_channel)
00546 {
00547 struct publish_event *publish_event;
00548 struct subscribe_event *subscribe_event;
00549 SaAisErrorT ais_res;
00550
00551 while ((publish_event = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry)))
00552 publish_event_destroy(publish_event);
00553 while ((subscribe_event = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry)))
00554 subscribe_event_destroy(event_channel, subscribe_event);
00555
00556 ais_res = saEvtChannelClose(event_channel->handle);
00557 if (ais_res != SA_AIS_OK) {
00558 ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n",
00559 event_channel->name, ais_err2str(ais_res));
00560 }
00561
00562 free(event_channel);
00563 }
00564
00565 static void destroy_event_channels(void)
00566 {
00567 struct event_channel *event_channel;
00568
00569 AST_RWLIST_WRLOCK(&event_channels);
00570 while ((event_channel = AST_RWLIST_REMOVE_HEAD(&event_channels, entry))) {
00571 event_channel_destroy(event_channel);
00572 }
00573 AST_RWLIST_UNLOCK(&event_channels);
00574 }
00575
00576 int ast_ais_evt_load_module(void)
00577 {
00578 evt_init_res = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version);
00579 if (evt_init_res != SA_AIS_OK) {
00580 ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n",
00581 ais_err2str(evt_init_res));
00582 return -1;
00583 }
00584
00585 load_config();
00586
00587 ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
00588
00589 return 0;
00590 }
00591
00592 int ast_ais_evt_unload_module(void)
00593 {
00594 SaAisErrorT ais_res;
00595
00596 if (evt_init_res != SA_AIS_OK) {
00597 return 0;
00598 }
00599
00600 destroy_event_channels();
00601
00602 ais_res = saEvtFinalize(evt_handle);
00603 if (ais_res != SA_AIS_OK) {
00604 ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n",
00605 ais_err2str(ais_res));
00606 return -1;
00607 }
00608
00609 return 0;
00610 }