Wed Jan 27 20:02:10 2016

Asterisk developer's documentation


evt.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2007, Digium, Inc.
00005  *
00006  * Russell Bryant <russell@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*!
00020  * \file
00021  * \author Russell Bryant <russell@digium.com>
00022  *
00023  * \brief Usage of the SAForum AIS (Application Interface Specification)
00024  *
00025  * \arg http://www.openais.org/
00026  *
00027  * This file contains the code specific to the use of the EVT
00028  * (Event) Service.
00029  */
00030 
00031 /*** MODULEINFO
00032    <support_level>extended</support_level>
00033  ***/
00034 
00035 #include "asterisk.h"
00036 
00037 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 369001 $");
00038 
00039 #include <stdlib.h>
00040 #include <stdio.h>
00041 #include <string.h>
00042 #include <unistd.h>
00043 #include <errno.h>
00044 
00045 #include "ais.h"
00046 
00047 #include "asterisk/module.h"
00048 #include "asterisk/utils.h"
00049 #include "asterisk/cli.h"
00050 #include "asterisk/logger.h"
00051 #include "asterisk/event.h"
00052 #include "asterisk/config.h"
00053 #include "asterisk/linkedlists.h"
00054 #include "asterisk/devicestate.h"
00055 
00056 #ifndef AST_MODULE
00057 /* XXX HACK */
00058 #define AST_MODULE "res_ais"
00059 #endif
00060 
/*! Handle for this module's session with the EVT service.  It is filled in
 *  by saEvtInitialize() in ast_ais_evt_load_module() and is not static, so
 *  it has external linkage. */
SaEvtHandleT evt_handle;
/*! Result of the saEvtInitialize() call made at load time.  The unload
 *  function consults it so that it only tears down what was set up. */
static SaAisErrorT evt_init_res;

/* Prototypes for the EVT callbacks; they are needed before the callback
 * table below and are defined further down in this file. */
void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
   SaAisErrorT error);
void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
   const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);

/*! Callback table handed to saEvtInitialize(). */
static const SaEvtCallbacksT evt_callbacks = {
   .saEvtChannelOpenCallback  = evt_channel_open_cb,
   .saEvtEventDeliverCallback = evt_event_deliver_cb,
};
00073 
/*! Event types that may be named in the publish_event / subscribe_event
 *  configuration options.  The string is matched case-insensitively against
 *  the configured value and is also used verbatim as the EVT pattern/filter
 *  string for events of that type. */
static const struct {
   const char *str;
   enum ast_event_type type;
} supported_event_types[] = {
   { "mwi", AST_EVENT_MWI },
   { "device_state", AST_EVENT_DEVICE_STATE_CHANGE },
};

/*! Used to provide unique id's to egress subscriptions.  Only ever modified
 *  through ast_atomic_fetchadd_int(). */
static int unique_id;
00084 
/*! One event type that an event channel subscribes to on the EVT service. */
struct subscribe_event {
   /*! Linkage in event_channel::subscribe_events */
   AST_LIST_ENTRY(subscribe_event) entry;
   /*! This is a unique identifier to identify this subscription in the event
    *  channel through the different API calls, subscribe, unsubscribe, and
    *  the event deliver callback. */
   SaEvtSubscriptionIdT id;
   /*! The Asterisk event type this subscription is for */
   enum ast_event_type type;
};
00093 
/*! One event type that an event channel publishes to the EVT service. */
struct publish_event {
   /*! Linkage in event_channel::publish_events */
   AST_LIST_ENTRY(publish_event) entry;
   /*! We subscribe to events internally so that we can publish them
    *  on this event channel. */
   struct ast_event_sub *sub;
   /*! The Asterisk event type being published */
   enum ast_event_type type;
};
00101 
/*! A configured event channel, built from one category of the config file. */
struct event_channel {
   /*! Linkage in the global event_channels list */
   AST_RWLIST_ENTRY(event_channel) entry;
   /*! Event types received from this channel */
   AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
   /*! Event types sent out on this channel */
   AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
   /*! Handle returned by saEvtChannelOpen() */
   SaEvtChannelHandleT handle;
   /*! Channel name.  Declared with one element (room for the terminator);
    *  the allocation in build_event_channel() adds strlen(name) bytes so
    *  the whole string fits.  Must remain the last member. */
   char name[1];
};

/*! All event channels created from the configuration. */
static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
00111 
00112 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00113    SaAisErrorT error)
00114 {
00115 
00116 }
00117 
/*!
 * \brief Hand an event received from the EVT service to the Asterisk core
 *
 * \param ast_event heap-allocated event, passed on to
 *        ast_event_queue_and_cache()
 */
static void queue_event(struct ast_event *ast_event)
{
   struct ast_event *incoming = ast_event;

   ast_event_queue_and_cache(incoming);
}
00122 
00123 void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
00124    const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
00125 {
00126    /* It is important to note that this works because we *know* that this
00127     * function will only be called by a single thread, the dispatch_thread.
00128     * If this module gets changed such that this is no longer the case, this
00129     * should get changed to a thread-local buffer, instead. */
00130    static unsigned char buf[4096];
00131    struct ast_event *event_dup, *event = (void *) buf;
00132    SaAisErrorT ais_res;
00133    SaSizeT len = sizeof(buf);
00134 
00135    if (event_datalen > len) {
00136       ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
00137          "for the allocated size %u. Change the code to increase the size.\n",
00138          (unsigned int) event_datalen, (unsigned int) len);
00139       return;
00140    }
00141 
00142    if (event_datalen < ast_event_minimum_length()) {
00143       ast_debug(1, "Ignoring event that's too small. %u < %u\n",
00144          (unsigned int) event_datalen,
00145          (unsigned int) ast_event_minimum_length());
00146       return;
00147    }
00148 
00149    ais_res = saEvtEventDataGet(event_handle, event, &len);
00150    if (ais_res != SA_AIS_OK) {
00151       ast_log(LOG_ERROR, "Error retrieving event payload: %s\n",
00152          ais_err2str(ais_res));
00153       return;
00154    }
00155 
00156    if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
00157       /* Don't feed events back in that originated locally. */
00158       return;
00159    }
00160 
00161    if (!(event_dup = ast_malloc(len)))
00162       return;
00163 
00164    memcpy(event_dup, event, len);
00165 
00166    queue_event(event_dup);
00167 }
00168 
00169 static const char *type_to_filter_str(enum ast_event_type type)
00170 {
00171    const char *filter_str = NULL;
00172    int i;
00173 
00174    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00175       if (supported_event_types[i].type == type) {
00176          filter_str = supported_event_types[i].str;
00177          break;
00178       }
00179    }
00180 
00181    return filter_str;
00182 }
00183 
00184 static void ast_event_cb(const struct ast_event *ast_event, void *data)
00185 {
00186    SaEvtEventHandleT event_handle;
00187    SaAisErrorT ais_res;
00188    struct event_channel *event_channel = data;
00189    SaClmClusterNodeT local_node;
00190    SaEvtEventPatternArrayT pattern_array;
00191    SaEvtEventPatternT pattern;
00192    SaSizeT len;
00193    const char *filter_str;
00194    SaEvtEventIdT event_id;
00195 
00196    ast_log(LOG_DEBUG, "Got an event to forward\n");
00197 
00198    if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
00199       /* If the event didn't originate from this server, don't send it back out. */
00200       ast_log(LOG_DEBUG, "Returning here\n");
00201       return;
00202    }
00203 
00204    ais_res = saEvtEventAllocate(event_channel->handle, &event_handle);
00205    if (ais_res != SA_AIS_OK) {
00206       ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(ais_res));
00207       ast_log(LOG_DEBUG, "Returning here\n");
00208       return;
00209    }
00210 
00211    ais_res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID,
00212       SA_TIME_ONE_SECOND, &local_node);
00213    if (ais_res != SA_AIS_OK) {
00214       ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(ais_res));
00215       goto return_event_free;
00216    }
00217 
00218    filter_str = type_to_filter_str(ast_event_get_type(ast_event));
00219    len = strlen(filter_str) + 1;
00220    pattern.pattern = (SaUint8T *) filter_str;
00221    pattern.patternSize = len;
00222    pattern.allocatedSize = len;
00223 
00224    pattern_array.allocatedNumber = 1;
00225    pattern_array.patternsNumber = 1;
00226    pattern_array.patterns = &pattern;
00227 
00228    /*!
00229     * /todo Make retention time configurable
00230     * /todo Make event priorities configurable
00231     */
00232    ais_res = saEvtEventAttributesSet(event_handle, &pattern_array,
00233       SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
00234    if (ais_res != SA_AIS_OK) {
00235       ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(ais_res));
00236       goto return_event_free;
00237    }
00238 
00239    for (;;) {
00240       ais_res = saEvtEventPublish(event_handle,
00241          ast_event, ast_event_get_size(ast_event), &event_id);
00242       if (ais_res != SA_AIS_ERR_TRY_AGAIN) {
00243          break;
00244       }
00245       sched_yield();
00246    }
00247 
00248    if (ais_res != SA_AIS_OK) {
00249       ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
00250       goto return_event_free;
00251    }
00252 
00253 return_event_free:
00254    ais_res = saEvtEventFree(event_handle);
00255    if (ais_res != SA_AIS_OK) {
00256       ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(ais_res));
00257    }
00258    ast_log(LOG_DEBUG, "Returning here (event_free)\n");
00259 }
00260 
00261 static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00262 {
00263    struct event_channel *event_channel;
00264 
00265    switch (cmd) {
00266    case CLI_INIT:
00267       e->command = "ais evt show event channels";
00268       e->usage =
00269          "Usage: ais evt show event channels\n"
00270          "       List configured event channels for the (EVT) Eventing service.\n";
00271       return NULL;
00272 
00273    case CLI_GENERATE:
00274       return NULL;   /* no completion */
00275    }
00276 
00277    if (a->argc != e->args)
00278       return CLI_SHOWUSAGE;
00279 
00280    ast_cli(a->fd, "\n"
00281                "=============================================================\n"
00282                "=== Event Channels ==========================================\n"
00283                "=============================================================\n"
00284                "===\n");
00285 
00286    AST_RWLIST_RDLOCK(&event_channels);
00287    AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00288       struct publish_event *publish_event;
00289       struct subscribe_event *subscribe_event;
00290 
00291       ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00292                      "=== Event Channel Name: %s\n", event_channel->name);
00293 
00294       AST_LIST_TRAVERSE(&event_channel->publish_events, publish_event, entry) {
00295          ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
00296             type_to_filter_str(publish_event->type));
00297       }
00298 
00299       AST_LIST_TRAVERSE(&event_channel->subscribe_events, subscribe_event, entry) {
00300          ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
00301             type_to_filter_str(subscribe_event->type));
00302       }
00303 
00304       ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00305                      "===\n");
00306    }
00307    AST_RWLIST_UNLOCK(&event_channels);
00308 
00309    ast_cli(a->fd, "=============================================================\n"
00310                   "\n");
00311 
00312    return CLI_SUCCESS;
00313 }
00314 
/*! CLI commands provided by this file; registered in
 *  ast_ais_evt_load_module(). */
static struct ast_cli_entry ais_cli[] = {
   AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
};
00318 
00319 void ast_ais_evt_membership_changed(void)
00320 {
00321    struct event_channel *ec;
00322 
00323    AST_RWLIST_RDLOCK(&event_channels);
00324    AST_RWLIST_TRAVERSE(&event_channels, ec, entry) {
00325       struct publish_event *pe;
00326 
00327       AST_LIST_TRAVERSE(&ec->publish_events, pe, entry) {
00328          ast_debug(1, "Dumping cache for event channel '%s'\n", ec->name);
00329          ast_event_dump_cache(pe->sub);
00330       }
00331    }
00332    AST_RWLIST_UNLOCK(&event_channels);
00333 }
00334 
00335 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
00336 {
00337    int i;
00338    enum ast_event_type type = -1;
00339    struct publish_event *publish_event;
00340 
00341    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00342       if (!strcasecmp(event_type, supported_event_types[i].str)) {
00343          type = supported_event_types[i].type;
00344          break;
00345       }
00346    }
00347 
00348    if (type == -1) {
00349       ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type);
00350       return;
00351    }
00352 
00353    if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00354       return;
00355    }
00356 
00357    if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) {
00358       return;
00359    }
00360 
00361    publish_event->type = type;
00362    ast_log(LOG_DEBUG, "Subscribing to event type %d\n", type);
00363    publish_event->sub = ast_event_subscribe(type, ast_event_cb, "AIS", event_channel,
00364       AST_EVENT_IE_END);
00365    ast_event_dump_cache(publish_event->sub);
00366 
00367    AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry);
00368 }
00369 
00370 static SaAisErrorT set_egress_subscription(struct event_channel *event_channel,
00371    struct subscribe_event *subscribe_event)
00372 {
00373    SaAisErrorT ais_res;
00374    SaEvtEventFilterArrayT filter_array;
00375    SaEvtEventFilterT filter;
00376    const char *filter_str = NULL;
00377    SaSizeT len;
00378 
00379    /* We know it's going to be valid.  It was checked earlier. */
00380    filter_str = type_to_filter_str(subscribe_event->type);
00381 
00382    filter.filterType = SA_EVT_EXACT_FILTER;
00383    len = strlen(filter_str) + 1;
00384    filter.filter.allocatedSize = len;
00385    filter.filter.patternSize = len;
00386    filter.filter.pattern = (SaUint8T *) filter_str;
00387 
00388    filter_array.filtersNumber = 1;
00389    filter_array.filters = &filter;
00390 
00391    ais_res = saEvtEventSubscribe(event_channel->handle, &filter_array,
00392       subscribe_event->id);
00393 
00394    return ais_res;
00395 }
00396 
00397 static void add_subscribe_event(struct event_channel *event_channel, const char *event_type)
00398 {
00399    int i;
00400    enum ast_event_type type = -1;
00401    struct subscribe_event *subscribe_event;
00402    SaAisErrorT ais_res;
00403 
00404    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00405       if (!strcasecmp(event_type, supported_event_types[i].str)) {
00406          type = supported_event_types[i].type;
00407          break;
00408       }
00409    }
00410 
00411    if (type == -1) {
00412       ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type);
00413       return;
00414    }
00415 
00416    if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00417       return;
00418    }
00419 
00420    if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) {
00421       return;
00422    }
00423 
00424    subscribe_event->type = type;
00425    subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1);
00426 
00427    ais_res = set_egress_subscription(event_channel, subscribe_event);
00428    if (ais_res != SA_AIS_OK) {
00429       ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n",
00430          ais_err2str(ais_res));
00431       free(subscribe_event);
00432       return;
00433    }
00434 
00435    AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, subscribe_event, entry);
00436 }
00437 
00438 static void build_event_channel(struct ast_config *cfg, const char *cat)
00439 {
00440    struct ast_variable *var;
00441    struct event_channel *event_channel;
00442    SaAisErrorT ais_res;
00443    SaNameT sa_name = { 0, };
00444 
00445    AST_RWLIST_WRLOCK(&event_channels);
00446    AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00447       if (!strcasecmp(event_channel->name, cat))
00448          break;
00449    }
00450    AST_RWLIST_UNLOCK(&event_channels);
00451    if (event_channel) {
00452       ast_log(LOG_WARNING, "Event channel '%s' was specified twice in "
00453          "configuration.  Second instance ignored.\n", cat);
00454       return;
00455    }
00456 
00457    if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat))))
00458       return;
00459 
00460    strcpy(event_channel->name, cat);
00461    ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value));
00462    sa_name.length = strlen((char *) sa_name.value);
00463    ais_res = saEvtChannelOpen(evt_handle, &sa_name,
00464       SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE,
00465       SA_TIME_MAX, &event_channel->handle);
00466    if (ais_res != SA_AIS_OK) {
00467       ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(ais_res));
00468       free(event_channel);
00469       return;
00470    }
00471 
00472    for (var = ast_variable_browse(cfg, cat); var; var = var->next) {
00473       if (!strcasecmp(var->name, "type")) {
00474          continue;
00475       } else if (!strcasecmp(var->name, "publish_event")) {
00476          add_publish_event(event_channel, var->value);
00477       } else if (!strcasecmp(var->name, "subscribe_event")) {
00478          add_subscribe_event(event_channel, var->value);
00479       } else {
00480          ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n",
00481             event_channel->name, var->name);
00482       }
00483    }
00484 
00485    AST_RWLIST_WRLOCK(&event_channels);
00486    AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry);
00487    AST_RWLIST_UNLOCK(&event_channels);
00488 }
00489 
00490 static void load_config(void)
00491 {
00492    static const char filename[] = "ais.conf";
00493    struct ast_config *cfg;
00494    const char *cat = NULL;
00495    struct ast_flags config_flags = { 0 };
00496 
00497    if (!(cfg = ast_config_load(filename, config_flags)) || cfg == CONFIG_STATUS_FILEINVALID)
00498       return;
00499 
00500    while ((cat = ast_category_browse(cfg, cat))) {
00501       const char *type;
00502 
00503       if (!strcasecmp(cat, "general"))
00504          continue;
00505 
00506       if (!(type = ast_variable_retrieve(cfg, cat, "type"))) {
00507          ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n",
00508             filename);
00509          continue;
00510       }
00511 
00512       if (!strcasecmp(type, "event_channel")) {
00513          build_event_channel(cfg, cat);
00514       } else {
00515          ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n",
00516             filename, type);
00517       }
00518    }
00519 
00520    ast_config_destroy(cfg);
00521 }
00522 
00523 static void publish_event_destroy(struct publish_event *publish_event)
00524 {
00525    ast_event_unsubscribe(publish_event->sub);
00526 
00527    free(publish_event);
00528 }
00529 
00530 static void subscribe_event_destroy(const struct event_channel *event_channel,
00531    struct subscribe_event *subscribe_event)
00532 {
00533    SaAisErrorT ais_res;
00534 
00535    /* saEvtChannelClose() will actually do this automatically, but it just
00536     * feels cleaner to go ahead and do it manually ... */
00537    ais_res = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id);
00538    if (ais_res != SA_AIS_OK) {
00539       ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(ais_res));
00540    }
00541 
00542    free(subscribe_event);
00543 }
00544 
00545 static void event_channel_destroy(struct event_channel *event_channel)
00546 {
00547    struct publish_event *publish_event;
00548    struct subscribe_event *subscribe_event;
00549    SaAisErrorT ais_res;
00550 
00551    while ((publish_event = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry)))
00552       publish_event_destroy(publish_event);
00553    while ((subscribe_event = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry)))
00554       subscribe_event_destroy(event_channel, subscribe_event);
00555 
00556    ais_res = saEvtChannelClose(event_channel->handle);
00557    if (ais_res != SA_AIS_OK) {
00558       ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n",
00559          event_channel->name, ais_err2str(ais_res));
00560    }
00561 
00562    free(event_channel);
00563 }
00564 
00565 static void destroy_event_channels(void)
00566 {
00567    struct event_channel *event_channel;
00568 
00569    AST_RWLIST_WRLOCK(&event_channels);
00570    while ((event_channel = AST_RWLIST_REMOVE_HEAD(&event_channels, entry))) {
00571       event_channel_destroy(event_channel);
00572    }
00573    AST_RWLIST_UNLOCK(&event_channels);
00574 }
00575 
00576 int ast_ais_evt_load_module(void)
00577 {
00578    evt_init_res = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version);
00579    if (evt_init_res != SA_AIS_OK) {
00580       ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n",
00581          ais_err2str(evt_init_res));
00582       return -1;
00583    }
00584 
00585    load_config();
00586 
00587    ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
00588 
00589    return 0;
00590 }
00591 
00592 int ast_ais_evt_unload_module(void)
00593 {
00594    SaAisErrorT ais_res;
00595 
00596    if (evt_init_res != SA_AIS_OK) {
00597       return 0;
00598    }
00599 
00600    destroy_event_channels();
00601 
00602    ais_res = saEvtFinalize(evt_handle);
00603    if (ais_res != SA_AIS_OK) {
00604       ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n",
00605          ais_err2str(ais_res));
00606       return -1;
00607    }
00608 
00609    return 0;
00610 }

Generated on 27 Jan 2016 for Asterisk - The Open Source Telephony Project by  doxygen 1.6.1