Mon Mar 19 11:30:27 2012

Asterisk developer's documentation


evt.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2007, Digium, Inc.
00005  *
00006  * Russell Bryant <russell@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*!
00020  * \file
00021  * \author Russell Bryant <russell@digium.com>
00022  *
00023  * \brief Usage of the SAForum AIS (Application Interface Specification)
00024  *
00025  * \arg http://www.openais.org/
00026  *
00027  * This file contains the code specific to the use of the EVT
00028  * (Event) Service.
00029  */
00030 
00031 #include "asterisk.h"
00032 
00033 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 335497 $");
00034 
00035 #include <stdlib.h>
00036 #include <stdio.h>
00037 #include <string.h>
00038 #include <unistd.h>
00039 #include <errno.h>
00040 
00041 #include "ais.h"
00042 
00043 #include "asterisk/module.h"
00044 #include "asterisk/utils.h"
00045 #include "asterisk/cli.h"
00046 #include "asterisk/logger.h"
00047 #include "asterisk/event.h"
00048 #include "asterisk/config.h"
00049 #include "asterisk/linkedlists.h"
00050 #include "asterisk/devicestate.h"
00051 
00052 #ifndef AST_MODULE
00053 /* XXX HACK */
00054 #define AST_MODULE "res_ais"
00055 #endif
00056 
00057 SaEvtHandleT evt_handle;
00058 static SaAisErrorT evt_init_res;
00059 
00060 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00061    SaAisErrorT error);
00062 void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
00063    const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);
00064 
00065 static const SaEvtCallbacksT evt_callbacks = {
00066    .saEvtChannelOpenCallback  = evt_channel_open_cb,
00067    .saEvtEventDeliverCallback = evt_event_deliver_cb,
00068 };
00069 
00070 static const struct {
00071    const char *str;
00072    enum ast_event_type type;
00073 } supported_event_types[] = {
00074    { "mwi", AST_EVENT_MWI },
00075    { "device_state", AST_EVENT_DEVICE_STATE_CHANGE },
00076 };
00077 
00078 /*! Used to provide unique id's to egress subscriptions */
00079 static int unique_id;
00080 
00081 struct subscribe_event {
00082    AST_LIST_ENTRY(subscribe_event) entry;
00083    /*! This is a unique identifier to identify this subscription in the event
00084     *  channel through the different API calls, subscribe, unsubscribe, and
00085     *  the event deliver callback. */
00086    SaEvtSubscriptionIdT id;
00087    enum ast_event_type type;
00088 };
00089 
00090 struct publish_event {
00091    AST_LIST_ENTRY(publish_event) entry;
00092    /*! We subscribe to events internally so that we can publish them
00093     *  on this event channel. */
00094    struct ast_event_sub *sub;
00095    enum ast_event_type type;
00096 };
00097 
00098 struct event_channel {
00099    AST_RWLIST_ENTRY(event_channel) entry;
00100    AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
00101    AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
00102    SaEvtChannelHandleT handle;
00103    char name[1];
00104 };
00105 
00106 static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
00107 
00108 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00109    SaAisErrorT error)
00110 {
00111 
00112 }
00113 
00114 static void queue_event(struct ast_event *ast_event)
00115 {
00116    ast_event_queue_and_cache(ast_event);
00117 }
00118 
00119 void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
00120    const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
00121 {
00122    /* It is important to note that this works because we *know* that this
00123     * function will only be called by a single thread, the dispatch_thread.
00124     * If this module gets changed such that this is no longer the case, this
00125     * should get changed to a thread-local buffer, instead. */
00126    static unsigned char buf[4096];
00127    struct ast_event *event_dup, *event = (void *) buf;
00128    SaAisErrorT ais_res;
00129    SaSizeT len = sizeof(buf);
00130 
00131    if (event_datalen > len) {
00132       ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
00133          "for the allocated size %u. Change the code to increase the size.\n",
00134          (unsigned int) event_datalen, (unsigned int) len);
00135       return;
00136    }
00137 
00138    if (event_datalen < ast_event_minimum_length()) {
00139       ast_debug(1, "Ignoring event that's too small. %u < %u\n",
00140          (unsigned int) event_datalen,
00141          (unsigned int) ast_event_minimum_length());
00142       return;
00143    }
00144 
00145    ais_res = saEvtEventDataGet(event_handle, event, &len);
00146    if (ais_res != SA_AIS_OK) {
00147       ast_log(LOG_ERROR, "Error retrieving event payload: %s\n",
00148          ais_err2str(ais_res));
00149       return;
00150    }
00151 
00152    if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
00153       /* Don't feed events back in that originated locally. */
00154       return;
00155    }
00156 
00157    if (!(event_dup = ast_malloc(len)))
00158       return;
00159 
00160    memcpy(event_dup, event, len);
00161 
00162    queue_event(event_dup);
00163 }
00164 
00165 static const char *type_to_filter_str(enum ast_event_type type)
00166 {
00167    const char *filter_str = NULL;
00168    int i;
00169 
00170    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00171       if (supported_event_types[i].type == type) {
00172          filter_str = supported_event_types[i].str;
00173          break;
00174       }
00175    }
00176 
00177    return filter_str;
00178 }
00179 
00180 static void ast_event_cb(const struct ast_event *ast_event, void *data)
00181 {
00182    SaEvtEventHandleT event_handle;
00183    SaAisErrorT ais_res;
00184    struct event_channel *event_channel = data;
00185    SaClmClusterNodeT local_node;
00186    SaEvtEventPatternArrayT pattern_array;
00187    SaEvtEventPatternT pattern;
00188    SaSizeT len;
00189    const char *filter_str;
00190    SaEvtEventIdT event_id;
00191 
00192    ast_log(LOG_DEBUG, "Got an event to forward\n");
00193 
00194    if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
00195       /* If the event didn't originate from this server, don't send it back out. */
00196       ast_log(LOG_DEBUG, "Returning here\n");
00197       return;
00198    }
00199 
00200    ais_res = saEvtEventAllocate(event_channel->handle, &event_handle);
00201    if (ais_res != SA_AIS_OK) {
00202       ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(ais_res));
00203       ast_log(LOG_DEBUG, "Returning here\n");
00204       return;
00205    }
00206 
00207    ais_res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID,
00208       SA_TIME_ONE_SECOND, &local_node);
00209    if (ais_res != SA_AIS_OK) {
00210       ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(ais_res));
00211       goto return_event_free;
00212    }
00213 
00214    filter_str = type_to_filter_str(ast_event_get_type(ast_event));
00215    len = strlen(filter_str) + 1;
00216    pattern.pattern = (SaUint8T *) filter_str;
00217    pattern.patternSize = len;
00218    pattern.allocatedSize = len;
00219 
00220    pattern_array.allocatedNumber = 1;
00221    pattern_array.patternsNumber = 1;
00222    pattern_array.patterns = &pattern;
00223 
00224    /*!
00225     * /todo Make retention time configurable
00226     * /todo Make event priorities configurable
00227     */
00228    ais_res = saEvtEventAttributesSet(event_handle, &pattern_array,
00229       SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
00230    if (ais_res != SA_AIS_OK) {
00231       ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(ais_res));
00232       goto return_event_free;
00233    }
00234 
00235    ais_res = saEvtEventPublish(event_handle,
00236       ast_event, ast_event_get_size(ast_event), &event_id);
00237    if (ais_res != SA_AIS_OK) {
00238       ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
00239       goto return_event_free;
00240    }
00241 
00242 return_event_free:
00243    ais_res = saEvtEventFree(event_handle);
00244    if (ais_res != SA_AIS_OK) {
00245       ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(ais_res));
00246    }
00247    ast_log(LOG_DEBUG, "Returning here (event_free)\n");
00248 }
00249 
00250 static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00251 {
00252    struct event_channel *event_channel;
00253 
00254    switch (cmd) {
00255    case CLI_INIT:
00256       e->command = "ais evt show event channels";
00257       e->usage =
00258          "Usage: ais evt show event channels\n"
00259          "       List configured event channels for the (EVT) Eventing service.\n";
00260       return NULL;
00261 
00262    case CLI_GENERATE:
00263       return NULL;   /* no completion */
00264    }
00265 
00266    if (a->argc != e->args)
00267       return CLI_SHOWUSAGE;
00268 
00269    ast_cli(a->fd, "\n"
00270                "=============================================================\n"
00271                "=== Event Channels ==========================================\n"
00272                "=============================================================\n"
00273                "===\n");
00274 
00275    AST_RWLIST_RDLOCK(&event_channels);
00276    AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00277       struct publish_event *publish_event;
00278       struct subscribe_event *subscribe_event;
00279 
00280       ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00281                      "=== Event Channel Name: %s\n", event_channel->name);
00282 
00283       AST_LIST_TRAVERSE(&event_channel->publish_events, publish_event, entry) {
00284          ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
00285             type_to_filter_str(publish_event->type));
00286       }
00287 
00288       AST_LIST_TRAVERSE(&event_channel->subscribe_events, subscribe_event, entry) {
00289          ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
00290             type_to_filter_str(subscribe_event->type));
00291       }
00292 
00293       ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00294                      "===\n");
00295    }
00296    AST_RWLIST_UNLOCK(&event_channels);
00297 
00298    ast_cli(a->fd, "=============================================================\n"
00299                   "\n");
00300 
00301    return CLI_SUCCESS;
00302 }
00303 
00304 static struct ast_cli_entry ais_cli[] = {
00305    AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
00306 };
00307 
00308 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
00309 {
00310    int i;
00311    enum ast_event_type type = -1;
00312    struct publish_event *publish_event;
00313 
00314    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00315       if (!strcasecmp(event_type, supported_event_types[i].str)) {
00316          type = supported_event_types[i].type;
00317          break;
00318       }
00319    }
00320 
00321    if (type == -1) {
00322       ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type);
00323       return;
00324    }
00325 
00326    if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00327       return;
00328    }
00329 
00330    if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) {
00331       return;
00332    }
00333 
00334    publish_event->type = type;
00335    ast_log(LOG_DEBUG, "Subscribing to event type %d\n", type);
00336    publish_event->sub = ast_event_subscribe(type, ast_event_cb, "AIS", event_channel,
00337       AST_EVENT_IE_END);
00338    ast_event_dump_cache(publish_event->sub);
00339 
00340    AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry);
00341 }
00342 
00343 static SaAisErrorT set_egress_subscription(struct event_channel *event_channel,
00344    struct subscribe_event *subscribe_event)
00345 {
00346    SaAisErrorT ais_res;
00347    SaEvtEventFilterArrayT filter_array;
00348    SaEvtEventFilterT filter;
00349    const char *filter_str = NULL;
00350    SaSizeT len;
00351 
00352    /* We know it's going to be valid.  It was checked earlier. */
00353    filter_str = type_to_filter_str(subscribe_event->type);
00354 
00355    filter.filterType = SA_EVT_EXACT_FILTER;
00356    len = strlen(filter_str) + 1;
00357    filter.filter.allocatedSize = len;
00358    filter.filter.patternSize = len;
00359    filter.filter.pattern = (SaUint8T *) filter_str;
00360 
00361    filter_array.filtersNumber = 1;
00362    filter_array.filters = &filter;
00363 
00364    ais_res = saEvtEventSubscribe(event_channel->handle, &filter_array,
00365       subscribe_event->id);
00366 
00367    return ais_res;
00368 }
00369 
00370 static void add_subscribe_event(struct event_channel *event_channel, const char *event_type)
00371 {
00372    int i;
00373    enum ast_event_type type = -1;
00374    struct subscribe_event *subscribe_event;
00375    SaAisErrorT ais_res;
00376 
00377    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00378       if (!strcasecmp(event_type, supported_event_types[i].str)) {
00379          type = supported_event_types[i].type;
00380          break;
00381       }
00382    }
00383 
00384    if (type == -1) {
00385       ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type);
00386       return;
00387    }
00388 
00389    if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00390       return;
00391    }
00392 
00393    if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) {
00394       return;
00395    }
00396 
00397    subscribe_event->type = type;
00398    subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1);
00399 
00400    ais_res = set_egress_subscription(event_channel, subscribe_event);
00401    if (ais_res != SA_AIS_OK) {
00402       ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n",
00403          ais_err2str(ais_res));
00404       free(subscribe_event);
00405       return;
00406    }
00407 
00408    AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, subscribe_event, entry);
00409 }
00410 
00411 static void build_event_channel(struct ast_config *cfg, const char *cat)
00412 {
00413    struct ast_variable *var;
00414    struct event_channel *event_channel;
00415    SaAisErrorT ais_res;
00416    SaNameT sa_name = { 0, };
00417 
00418    AST_RWLIST_WRLOCK(&event_channels);
00419    AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00420       if (!strcasecmp(event_channel->name, cat))
00421          break;
00422    }
00423    AST_RWLIST_UNLOCK(&event_channels);
00424    if (event_channel) {
00425       ast_log(LOG_WARNING, "Event channel '%s' was specified twice in "
00426          "configuration.  Second instance ignored.\n", cat);
00427       return;
00428    }
00429 
00430    if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat))))
00431       return;
00432 
00433    strcpy(event_channel->name, cat);
00434    ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value));
00435    sa_name.length = strlen((char *) sa_name.value);
00436    ais_res = saEvtChannelOpen(evt_handle, &sa_name,
00437       SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE,
00438       SA_TIME_MAX, &event_channel->handle);
00439    if (ais_res != SA_AIS_OK) {
00440       ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(ais_res));
00441       free(event_channel);
00442       return;
00443    }
00444 
00445    for (var = ast_variable_browse(cfg, cat); var; var = var->next) {
00446       if (!strcasecmp(var->name, "type")) {
00447          continue;
00448       } else if (!strcasecmp(var->name, "publish_event")) {
00449          add_publish_event(event_channel, var->value);
00450       } else if (!strcasecmp(var->name, "subscribe_event")) {
00451          add_subscribe_event(event_channel, var->value);
00452       } else {
00453          ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n",
00454             event_channel->name, var->name);
00455       }
00456    }
00457 
00458    AST_RWLIST_WRLOCK(&event_channels);
00459    AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry);
00460    AST_RWLIST_UNLOCK(&event_channels);
00461 }
00462 
00463 static void load_config(void)
00464 {
00465    static const char filename[] = "ais.conf";
00466    struct ast_config *cfg;
00467    const char *cat = NULL;
00468    struct ast_flags config_flags = { 0 };
00469 
00470    if (!(cfg = ast_config_load(filename, config_flags)) || cfg == CONFIG_STATUS_FILEINVALID)
00471       return;
00472 
00473    while ((cat = ast_category_browse(cfg, cat))) {
00474       const char *type;
00475 
00476       if (!strcasecmp(cat, "general"))
00477          continue;
00478 
00479       if (!(type = ast_variable_retrieve(cfg, cat, "type"))) {
00480          ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n",
00481             filename);
00482          continue;
00483       }
00484 
00485       if (!strcasecmp(type, "event_channel")) {
00486          build_event_channel(cfg, cat);
00487       } else {
00488          ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n",
00489             filename, type);
00490       }
00491    }
00492 
00493    ast_config_destroy(cfg);
00494 }
00495 
00496 static void publish_event_destroy(struct publish_event *publish_event)
00497 {
00498    ast_event_unsubscribe(publish_event->sub);
00499 
00500    free(publish_event);
00501 }
00502 
00503 static void subscribe_event_destroy(const struct event_channel *event_channel,
00504    struct subscribe_event *subscribe_event)
00505 {
00506    SaAisErrorT ais_res;
00507 
00508    /* saEvtChannelClose() will actually do this automatically, but it just
00509     * feels cleaner to go ahead and do it manually ... */
00510    ais_res = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id);
00511    if (ais_res != SA_AIS_OK) {
00512       ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(ais_res));
00513    }
00514 
00515    free(subscribe_event);
00516 }
00517 
00518 static void event_channel_destroy(struct event_channel *event_channel)
00519 {
00520    struct publish_event *publish_event;
00521    struct subscribe_event *subscribe_event;
00522    SaAisErrorT ais_res;
00523 
00524    while ((publish_event = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry)))
00525       publish_event_destroy(publish_event);
00526    while ((subscribe_event = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry)))
00527       subscribe_event_destroy(event_channel, subscribe_event);
00528 
00529    ais_res = saEvtChannelClose(event_channel->handle);
00530    if (ais_res != SA_AIS_OK) {
00531       ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n",
00532          event_channel->name, ais_err2str(ais_res));
00533    }
00534 
00535    free(event_channel);
00536 }
00537 
00538 static void destroy_event_channels(void)
00539 {
00540    struct event_channel *event_channel;
00541 
00542    AST_RWLIST_WRLOCK(&event_channels);
00543    while ((event_channel = AST_RWLIST_REMOVE_HEAD(&event_channels, entry))) {
00544       event_channel_destroy(event_channel);
00545    }
00546    AST_RWLIST_UNLOCK(&event_channels);
00547 }
00548 
00549 int ast_ais_evt_load_module(void)
00550 {
00551    evt_init_res = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version);
00552    if (evt_init_res != SA_AIS_OK) {
00553       ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n",
00554          ais_err2str(evt_init_res));
00555       return -1;
00556    }
00557 
00558    load_config();
00559 
00560    ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
00561 
00562    return 0;
00563 }
00564 
00565 int ast_ais_evt_unload_module(void)
00566 {
00567    SaAisErrorT ais_res;
00568 
00569    if (evt_init_res != SA_AIS_OK) {
00570       return 0;
00571    }
00572 
00573    destroy_event_channels();
00574 
00575    ais_res = saEvtFinalize(evt_handle);
00576    if (ais_res != SA_AIS_OK) {
00577       ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n",
00578          ais_err2str(ais_res));
00579       return -1;
00580    }
00581 
00582    return 0;
00583 }

Generated on Mon Mar 19 11:30:27 2012 for Asterisk - The Open Source Telephony Project by  doxygen 1.4.7