Wed Apr 6 11:30:08 2011

Asterisk developer's documentation


pbx_spool.c File Reference

Full-featured outgoing call spool support. More...

#include "asterisk.h"
#include <sys/stat.h>
#include <time.h>
#include <utime.h>
#include <dirent.h>
#include <sys/inotify.h>
#include "asterisk/paths.h"
#include "asterisk/lock.h"
#include "asterisk/file.h"
#include "asterisk/logger.h"
#include "asterisk/channel.h"
#include "asterisk/callerid.h"
#include "asterisk/pbx.h"
#include "asterisk/module.h"
#include "asterisk/utils.h"
#include "asterisk/options.h"

Go to the source code of this file.

Data Structures

struct  createlist
struct  direntry
struct  dirlist
struct  outgoing

Enumerations

enum  { SPOOL_FLAG_ALWAYS_DELETE = (1 << 0), SPOOL_FLAG_ARCHIVE = (1 << 1) }

Functions

static void __reg_module (void)
static void __unreg_module (void)
static int apply_outgoing (struct outgoing *o, const char *fn, FILE *f)
static void * attempt_thread (void *data)
static void free_outgoing (struct outgoing *o)
static int init_outgoing (struct outgoing *o)
static void launch_service (struct outgoing *o)
static int load_module (void)
static void queue_file (const char *filename, time_t when)
static void queue_file_create (const char *filename)
static void queue_file_write (const char *filename)
static int remove_from_queue (struct outgoing *o, const char *status)
 Remove a call file from the outgoing queue, optionally moving it into the archive directory.
static void safe_append (struct outgoing *o, time_t now, char *s)
static int scan_service (const char *fn, time_t now)
static void * scan_thread (void *unused)
static int unload_module (void)

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. In order for your module to load, it must return this key via a function called \"key\". Any code which includes this paragraph must be licensed under the GNU General Public License version 2 or later (at your option). In addition to Digium's general reservations of rights, Digium expressly reserves the right to allow other parties to license this paragraph under different terms. Any use of Digium, Inc. trademarks or logos (including \"Asterisk\" or \"Digium\") without express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "8586c2a7d357cb591cc3a6607a8f62d1" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, }
static struct ast_module_info * ast_module_info = &__mod_info
static char qdir [255]
static char qdonedir [255]


Detailed Description

Full-featured outgoing call spool support.

Definition in file pbx_spool.c.


Enumeration Type Documentation

anonymous enum

Enumerator:
SPOOL_FLAG_ALWAYS_DELETE  Always delete the call file after a call succeeds or the maximum number of retries is exceeded, even if the modification time of the call file is in the future.
SPOOL_FLAG_ARCHIVE  Do not unlink the call file after processing; move it into qdonedir instead.

Definition at line 58 of file pbx_spool.c.

00058      {
00059    /*! Always delete the call file after a call succeeds or the
00060     * maximum number of retries is exceeded, even if the
00061     * modification time of the call file is in the future.
00062     */
00063    SPOOL_FLAG_ALWAYS_DELETE = (1 << 0),
00064    /* Don't unlink the call file after processing, move in qdonedir */
00065    SPOOL_FLAG_ARCHIVE = (1 << 1),
00066 };


Function Documentation

static void __reg_module ( void   )  [static]

Definition at line 781 of file pbx_spool.c.

static void __unreg_module ( void   )  [static]

Definition at line 781 of file pbx_spool.c.

static int apply_outgoing ( struct outgoing *  o,
const char *  fn,
FILE *  f 
) [static]

Definition at line 122 of file pbx_spool.c.

References outgoing::app, app, ast_callerid_split(), ast_log(), ast_parse_allow_disallow(), ast_set2_flag, ast_string_field_set, ast_strlen_zero(), ast_true(), ast_variable_new(), outgoing::callingpid, cid_name, cid_num, context, outgoing::dest, outgoing::exten, exten, outgoing::format, last, LOG_NOTICE, LOG_WARNING, outgoing::maxretries, sla_ringing_trunk::next, outgoing::options, outgoing::priority, outgoing::retries, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, SPOOL_FLAG_ARCHIVE, strsep(), outgoing::tech, var, outgoing::vars, and outgoing::waittime.

Referenced by scan_service().

00123 {
00124    char buf[256];
00125    char *c, *c2;
00126    int lineno = 0;
00127    struct ast_variable *var, *last = o->vars;
00128 
00129    while (last && last->next) {
00130       last = last->next;
00131    }
00132 
00133    while(fgets(buf, sizeof(buf), f)) {
00134       lineno++;
00135       /* Trim comments */
00136       c = buf;
00137       while ((c = strchr(c, '#'))) {
00138          if ((c == buf) || (*(c-1) == ' ') || (*(c-1) == '\t'))
00139             *c = '\0';
00140          else
00141             c++;
00142       }
00143 
00144       c = buf;
00145       while ((c = strchr(c, ';'))) {
00146          if ((c > buf) && (c[-1] == '\\')) {
00147             memmove(c - 1, c, strlen(c) + 1);
00148             c++;
00149          } else {
00150             *c = '\0';
00151             break;
00152          }
00153       }
00154 
00155       /* Trim trailing white space */
00156       while(!ast_strlen_zero(buf) && buf[strlen(buf) - 1] < 33)
00157          buf[strlen(buf) - 1] = '\0';
00158       if (!ast_strlen_zero(buf)) {
00159          c = strchr(buf, ':');
00160          if (c) {
00161             *c = '\0';
00162             c++;
00163             while ((*c) && (*c < 33))
00164                c++;
00165 #if 0
00166             printf("'%s' is '%s' at line %d\n", buf, c, lineno);
00167 #endif
00168             if (!strcasecmp(buf, "channel")) {
00169                if ((c2 = strchr(c, '/'))) {
00170                   *c2 = '\0';
00171                   c2++;
00172                   ast_string_field_set(o, tech, c);
00173                   ast_string_field_set(o, dest, c2);
00174                } else {
00175                   ast_log(LOG_NOTICE, "Channel should be in form Tech/Dest at line %d of %s\n", lineno, fn);
00176                }
00177             } else if (!strcasecmp(buf, "callerid")) {
00178                char cid_name[80] = {0}, cid_num[80] = {0};
00179                ast_callerid_split(c, cid_name, sizeof(cid_name), cid_num, sizeof(cid_num));
00180                ast_string_field_set(o, cid_num, cid_num);
00181                ast_string_field_set(o, cid_name, cid_name);
00182             } else if (!strcasecmp(buf, "application")) {
00183                ast_string_field_set(o, app, c);
00184             } else if (!strcasecmp(buf, "data")) {
00185                ast_string_field_set(o, data, c);
00186             } else if (!strcasecmp(buf, "maxretries")) {
00187                if (sscanf(c, "%30d", &o->maxretries) != 1) {
00188                   ast_log(LOG_WARNING, "Invalid max retries at line %d of %s\n", lineno, fn);
00189                   o->maxretries = 0;
00190                }
00191             } else if (!strcasecmp(buf, "codecs")) {
00192                ast_parse_allow_disallow(NULL, &o->format, c, 1);
00193             } else if (!strcasecmp(buf, "context")) {
00194                ast_string_field_set(o, context, c);
00195             } else if (!strcasecmp(buf, "extension")) {
00196                ast_string_field_set(o, exten, c);
00197             } else if (!strcasecmp(buf, "priority")) {
00198                if ((sscanf(c, "%30d", &o->priority) != 1) || (o->priority < 1)) {
00199                   ast_log(LOG_WARNING, "Invalid priority at line %d of %s\n", lineno, fn);
00200                   o->priority = 1;
00201                }
00202             } else if (!strcasecmp(buf, "retrytime")) {
00203                if ((sscanf(c, "%30d", &o->retrytime) != 1) || (o->retrytime < 1)) {
00204                   ast_log(LOG_WARNING, "Invalid retrytime at line %d of %s\n", lineno, fn);
00205                   o->retrytime = 300;
00206                }
00207             } else if (!strcasecmp(buf, "waittime")) {
00208                if ((sscanf(c, "%30d", &o->waittime) != 1) || (o->waittime < 1)) {
00209                   ast_log(LOG_WARNING, "Invalid waittime at line %d of %s\n", lineno, fn);
00210                   o->waittime = 45;
00211                }
00212             } else if (!strcasecmp(buf, "retry")) {
00213                o->retries++;
00214             } else if (!strcasecmp(buf, "startretry")) {
00215                if (sscanf(c, "%30ld", &o->callingpid) != 1) {
00216                   ast_log(LOG_WARNING, "Unable to retrieve calling PID!\n");
00217                   o->callingpid = 0;
00218                }
00219             } else if (!strcasecmp(buf, "endretry") || !strcasecmp(buf, "abortretry")) {
00220                o->callingpid = 0;
00221                o->retries++;
00222             } else if (!strcasecmp(buf, "delayedretry")) {
00223             } else if (!strcasecmp(buf, "setvar") || !strcasecmp(buf, "set")) {
00224                c2 = c;
00225                strsep(&c2, "=");
00226                if (c2) {
00227                   var = ast_variable_new(c, c2, fn);
00228                   if (var) {
00229                      /* Always insert at the end, because some people want to treat the spool file as a script */
00230                      if (last) {
00231                         last->next = var;
00232                      } else {
00233                         o->vars = var;
00234                      }
00235                      last = var;
00236                   }
00237                } else
00238                   ast_log(LOG_WARNING, "Malformed \"%s\" argument.  Should be \"%s: variable=value\"\n", buf, buf);
00239             } else if (!strcasecmp(buf, "account")) {
00240                ast_string_field_set(o, account, c);
00241             } else if (!strcasecmp(buf, "alwaysdelete")) {
00242                ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ALWAYS_DELETE);
00243             } else if (!strcasecmp(buf, "archive")) {
00244                ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ARCHIVE);
00245             } else {
00246                ast_log(LOG_WARNING, "Unknown keyword '%s' at line %d of %s\n", buf, lineno, fn);
00247             }
00248          } else
00249             ast_log(LOG_NOTICE, "Syntax error at line %d of %s\n", lineno, fn);
00250       }
00251    }
00252    ast_string_field_set(o, fn, fn);
00253    if (ast_strlen_zero(o->tech) || ast_strlen_zero(o->dest) || (ast_strlen_zero(o->app) && ast_strlen_zero(o->exten))) {
00254       ast_log(LOG_WARNING, "At least one of app or extension must be specified, along with tech and dest in file %s\n", fn);
00255       return -1;
00256    }
00257    return 0;
00258 }

static void* attempt_thread ( void *  data  )  [static]

Definition at line 335 of file pbx_spool.c.

References outgoing::account, outgoing::app, ast_channel_reason2str(), ast_log(), ast_pbx_outgoing_app(), ast_pbx_outgoing_exten(), ast_strlen_zero(), ast_verb, outgoing::cid_name, outgoing::cid_num, outgoing::context, outgoing::data, outgoing::dest, outgoing::exten, outgoing::fn, outgoing::format, free_outgoing(), LOG_NOTICE, outgoing::maxretries, outgoing::priority, queue_file(), remove_from_queue(), outgoing::retries, outgoing::retrytime, safe_append(), outgoing::tech, outgoing::vars, and outgoing::waittime.

Referenced by launch_service().

00336 {
00337    struct outgoing *o = data;
00338    int res, reason;
00339    if (!ast_strlen_zero(o->app)) {
00340       ast_verb(3, "Attempting call on %s/%s for application %s(%s) (Retry %d)\n", o->tech, o->dest, o->app, o->data, o->retries);
00341       res = ast_pbx_outgoing_app(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->app, o->data, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL);
00342       o->vars = NULL;
00343    } else {
00344       ast_verb(3, "Attempting call on %s/%s for %s@%s:%d (Retry %d)\n", o->tech, o->dest, o->exten, o->context,o->priority, o->retries);
00345       res = ast_pbx_outgoing_exten(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->context, o->exten, o->priority, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL);
00346       o->vars = NULL;
00347    }
00348    if (res) {
00349       ast_log(LOG_NOTICE, "Call failed to go through, reason (%d) %s\n", reason, ast_channel_reason2str(reason));
00350       if (o->retries >= o->maxretries + 1) {
00351          /* Max retries exceeded */
00352          ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
00353          remove_from_queue(o, "Expired");
00354       } else {
00355          /* Notate that the call is still active */
00356          safe_append(o, time(NULL), "EndRetry");
00357 #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE)
00358          queue_file(o->fn, time(NULL) + o->retrytime);
00359 #endif
00360       }
00361    } else {
00362       ast_log(LOG_NOTICE, "Call completed to %s/%s\n", o->tech, o->dest);
00363       remove_from_queue(o, "Completed");
00364    }
00365    free_outgoing(o);
00366    return NULL;
00367 }

static void free_outgoing ( struct outgoing *  o  )  [static]

Definition at line 113 of file pbx_spool.c.

References ast_free, ast_string_field_free_memory, ast_variables_destroy(), and outgoing::vars.

Referenced by attempt_thread(), launch_service(), and scan_service().

00114 {
00115    if (o->vars) {
00116       ast_variables_destroy(o->vars);
00117    }
00118    ast_string_field_free_memory(o);
00119    ast_free(o);
00120 }

static int init_outgoing ( struct outgoing *  o  )  [static]

Definition at line 100 of file pbx_spool.c.

References AST_FORMAT_SLINEAR, ast_set_flag, ast_string_field_init, outgoing::format, outgoing::options, outgoing::priority, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, and outgoing::waittime.

Referenced by scan_service().

00101 {
00102    o->priority = 1;
00103    o->retrytime = 300;
00104    o->waittime = 45;
00105    o->format = AST_FORMAT_SLINEAR;
00106    ast_set_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE);
00107    if (ast_string_field_init(o, 128)) {
00108       return -1;
00109    }
00110    return 0;
00111 }

static void launch_service ( struct outgoing *  o  )  [static]

Definition at line 369 of file pbx_spool.c.

References ast_log(), ast_pthread_create_detached, attempt_thread(), free_outgoing(), and LOG_WARNING.

Referenced by scan_service().

00370 {
00371    pthread_t t;
00372    int ret;
00373 
00374    if ((ret = ast_pthread_create_detached(&t, NULL, attempt_thread, o))) {
00375       ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
00376       free_outgoing(o);
00377    }
00378 }

static int load_module ( void   )  [static]

Definition at line 762 of file pbx_spool.c.

References ast_config_AST_SPOOL_DIR, ast_log(), ast_mkdir(), AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_FAILURE, AST_MODULE_LOAD_SUCCESS, ast_pthread_create_detached_background, LOG_WARNING, scan_thread(), and thread.

00763 {
00764    pthread_t thread;
00765    int ret;
00766    snprintf(qdir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing");
00767    if (ast_mkdir(qdir, 0777)) {
00768       ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool disabled\n", qdir);
00769       return AST_MODULE_LOAD_DECLINE;
00770    }
00771    snprintf(qdonedir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing_done");
00772 
00773    if ((ret = ast_pthread_create_detached_background(&thread, NULL, scan_thread, NULL))) {
00774       ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
00775       return AST_MODULE_LOAD_FAILURE;
00776    }
00777 
00778    return AST_MODULE_LOAD_SUCCESS;
00779 }

static void queue_file ( const char *  filename,
time_t  when 
) [static]

Definition at line 465 of file pbx_spool.c.

References ast_calloc, AST_LIST_EMPTY, AST_LIST_INSERT_BEFORE_CURRENT, AST_LIST_INSERT_HEAD, AST_LIST_INSERT_TAIL, AST_LIST_LOCK, AST_LIST_TRAVERSE, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, AST_LIST_UNLOCK, ast_log(), errno, direntry::list, LOG_WARNING, direntry::mtime, direntry::name, and scan_service().

Referenced by attempt_thread(), queue_file_write(), and scan_thread().

00466 {
00467    struct stat st;
00468    struct direntry *cur, *new;
00469    int res;
00470    time_t now = time(NULL);
00471 
00472    if (filename[0] != '/') {
00473       char *fn = alloca(strlen(qdir) + strlen(filename) + 2);
00474       sprintf(fn, "%s/%s", qdir, filename); /* SAFE */
00475       filename = fn;
00476    }
00477 
00478    if (when == 0) {
00479       if (stat(filename, &st)) {
00480          ast_log(LOG_WARNING, "Unable to stat %s: %s\n", filename, strerror(errno));
00481          return;
00482       }
00483 
00484       if (!S_ISREG(st.st_mode)) {
00485          return;
00486       }
00487 
00488       when = st.st_mtime;
00489    }
00490 
00491    /* Need to check the existing list in order to avoid duplicates. */
00492    AST_LIST_LOCK(&dirlist);
00493    AST_LIST_TRAVERSE(&dirlist, cur, list) {
00494       if (cur->mtime == when && !strcmp(filename, cur->name)) {
00495          AST_LIST_UNLOCK(&dirlist);
00496          return;
00497       }
00498    }
00499 
00500    if ((res = when) > now || (res = scan_service(filename, now)) > 0) {
00501       if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) {
00502          AST_LIST_UNLOCK(&dirlist);
00503          return;
00504       }
00505       new->mtime = res;
00506       strcpy(new->name, filename);
00507       /* List is ordered by mtime */
00508       if (AST_LIST_EMPTY(&dirlist)) {
00509          AST_LIST_INSERT_HEAD(&dirlist, new, list);
00510       } else {
00511          int found = 0;
00512          AST_LIST_TRAVERSE_SAFE_BEGIN(&dirlist, cur, list) {
00513             if (cur->mtime > new->mtime) {
00514                AST_LIST_INSERT_BEFORE_CURRENT(new, list);
00515                found = 1;
00516                break;
00517             }
00518          }
00519          AST_LIST_TRAVERSE_SAFE_END
00520          if (!found) {
00521             AST_LIST_INSERT_TAIL(&dirlist, new, list);
00522          }
00523       }
00524    }
00525    AST_LIST_UNLOCK(&dirlist);
00526 }

static void queue_file_create ( const char *  filename  )  [static]

Definition at line 529 of file pbx_spool.c.

References ast_calloc, AST_LIST_INSERT_TAIL, AST_LIST_TRAVERSE, direntry::list, and direntry::name.

Referenced by scan_thread().

00530 {
00531    struct direntry *cur;
00532 
00533    AST_LIST_TRAVERSE(&createlist, cur, list) {
00534       if (!strcmp(cur->name, filename)) {
00535          return;
00536       }
00537    }
00538 
00539    if (!(cur = ast_calloc(1, sizeof(*cur) + strlen(filename) + 1))) {
00540       return;
00541    }
00542    strcpy(cur->name, filename);
00543    AST_LIST_INSERT_TAIL(&createlist, cur, list);
00544 }

static void queue_file_write ( const char *  filename  )  [static]

Definition at line 546 of file pbx_spool.c.

References ast_free, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, direntry::list, direntry::name, and queue_file().

Referenced by scan_thread().

00547 {
00548    struct direntry *cur;
00549    /* Only queue entries where an IN_CREATE preceded the IN_CLOSE_WRITE */
00550    AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) {
00551       if (!strcmp(cur->name, filename)) {
00552          AST_LIST_REMOVE_CURRENT(list);
00553          ast_free(cur);
00554          queue_file(filename, 0);
00555          break;
00556       }
00557    }
00558    AST_LIST_TRAVERSE_SAFE_END
00559 }

static int remove_from_queue ( struct outgoing *  o,
const char *  status 
) [static]

Remove a call file from the outgoing queue, optionally moving it into the archive directory.

Parameters:
o the pointer to outgoing struct
status the exit status of the call. Can be "Completed", "Failed" or "Expired"

Definition at line 284 of file pbx_spool.c.

References ast_log(), ast_mkdir(), ast_test_flag, f, outgoing::fn, LOG_WARNING, outgoing::options, SPOOL_FLAG_ALWAYS_DELETE, and SPOOL_FLAG_ARCHIVE.

00285 {
00286    FILE *f;
00287    char newfn[256];
00288    const char *bname;
00289 
00290    if (!ast_test_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE)) {
00291       struct stat current_file_status;
00292 
00293       if (!stat(o->fn, &current_file_status)) {
00294          if (time(NULL) < current_file_status.st_mtime) {
00295             return 0;
00296          }
00297       }
00298    }
00299 
00300    if (!ast_test_flag(&o->options, SPOOL_FLAG_ARCHIVE)) {
00301       unlink(o->fn);
00302       return 0;
00303    }
00304 
00305    if (ast_mkdir(qdonedir, 0777)) {
00306       ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool archiving disabled\n", qdonedir);
00307       unlink(o->fn);
00308       return -1;
00309    }
00310 
00311    if (!(bname = strrchr(o->fn, '/'))) {
00312       bname = o->fn;
00313    } else {
00314       bname++;
00315    }
00316 
00317    snprintf(newfn, sizeof(newfn), "%s/%s", qdonedir, bname);
00318    /* a existing call file the archive dir is overwritten */
00319    unlink(newfn);
00320    if (rename(o->fn, newfn) != 0) {
00321       unlink(o->fn);
00322       return -1;
00323    }
00324 
00325    /* Only append to the file AFTER we move it out of the watched directory,
00326     * otherwise the fclose() causes another event for inotify(7) */
00327    if ((f = fopen(newfn, "a"))) {
00328       fprintf(f, "Status: %s\n", status);
00329       fclose(f);
00330    }
00331 
00332    return 0;
00333 }

static void safe_append ( struct outgoing *  o,
time_t  now,
char *  s 
) [static]

Definition at line 260 of file pbx_spool.c.

References ast_debug, ast_log(), ast_mainpid, outgoing::dest, errno, f, outgoing::fn, LOG_WARNING, outgoing::retries, outgoing::retrytime, and outgoing::tech.

Referenced by attempt_thread(), and scan_service().

00261 {
00262    FILE *f;
00263    struct utimbuf tbuf = { .actime = now, .modtime = now + o->retrytime };
00264 
00265    ast_debug(1, "Outgoing %s/%s: %s\n", o->tech, o->dest, s);
00266 
00267    if ((f = fopen(o->fn, "a"))) {
00268       fprintf(f, "\n%s: %ld %d (%ld)\n", s, (long)ast_mainpid, o->retries, (long) now);
00269       fclose(f);
00270    }
00271 
00272    /* Update the file time */
00273    if (utime(o->fn, &tbuf)) {
00274       ast_log(LOG_WARNING, "Unable to set utime on %s: %s\n", o->fn, strerror(errno));
00275    }
00276 }

static int scan_service ( const char *  fn,
time_t  now 
) [static]

Definition at line 381 of file pbx_spool.c.

References apply_outgoing(), ast_calloc, ast_free, ast_log(), ast_mainpid, errno, f, free_outgoing(), init_outgoing(), launch_service(), LOG_DEBUG, LOG_NOTICE, LOG_WARNING, remove_from_queue(), and safe_append().

Referenced by queue_file().

00382 {
00383    struct outgoing *o = NULL;
00384    FILE *f;
00385    int res = 0;
00386 
00387    if (!(o = ast_calloc(1, sizeof(*o)))) {
00388       ast_log(LOG_WARNING, "Out of memory ;(\n");
00389       return -1;
00390    }
00391 
00392    if (init_outgoing(o)) {
00393       /* No need to call free_outgoing here since we know the failure
00394        * was to allocate string fields and no variables have been allocated
00395        * yet.
00396        */
00397       ast_free(o);
00398       return -1;
00399    }
00400 
00401    /* Attempt to open the file */
00402    if (!(f = fopen(fn, "r"))) {
00403       remove_from_queue(o, "Failed");
00404       free_outgoing(o);
00405 #if !defined(HAVE_INOTIFY) && !defined(HAVE_KQUEUE)
00406       ast_log(LOG_WARNING, "Unable to open %s: %s, deleting\n", fn, strerror(errno));
00407 #endif
00408       return -1;
00409    }
00410 
00411    /* Read in and verify the contents */
00412    if (apply_outgoing(o, fn, f)) {
00413       remove_from_queue(o, "Failed");
00414       free_outgoing(o);
00415       ast_log(LOG_WARNING, "Invalid file contents in %s, deleting\n", fn);
00416       fclose(f);
00417       return -1;
00418    }
00419 
00420 #if 0
00421    printf("Filename: %s, Retries: %d, max: %d\n", fn, o->retries, o->maxretries);
00422 #endif
00423    fclose(f);
00424    if (o->retries <= o->maxretries) {
00425       now += o->retrytime;
00426       if (o->callingpid && (o->callingpid == ast_mainpid)) {
00427          safe_append(o, time(NULL), "DelayedRetry");
00428          ast_log(LOG_DEBUG, "Delaying retry since we're currently running '%s'\n", o->fn);
00429          free_outgoing(o);
00430       } else {
00431          /* Increment retries */
00432          o->retries++;
00433          /* If someone else was calling, they're presumably gone now
00434             so abort their retry and continue as we were... */
00435          if (o->callingpid)
00436             safe_append(o, time(NULL), "AbortRetry");
00437 
00438          safe_append(o, now, "StartRetry");
00439          launch_service(o);
00440       }
00441       res = now;
00442    } else {
00443       ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
00444       remove_from_queue(o, "Expired");
00445       free_outgoing(o);
00446    }
00447 
00448    return res;
00449 }

static void* scan_thread ( void *  unused  )  [static]

Definition at line 562 of file pbx_spool.c.

References ast_debug, ast_free, ast_fully_booted, AST_LIST_EMPTY, AST_LIST_FIRST, AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_log(), errno, HAVE_INOTIFY, inotify_fd, len(), direntry::list, LOG_ERROR, direntry::mtime, direntry::name, direntry::next, queue_file(), queue_file_create(), and queue_file_write().

Referenced by load_module().

00563 {
00564    DIR *dir;
00565    struct dirent *de;
00566    time_t now;
00567    struct timespec ts = { .tv_sec = 1 };
00568 #ifdef HAVE_INOTIFY
00569    ssize_t res;
00570    int inotify_fd = inotify_init();
00571    struct inotify_event *iev;
00572    char buf[8192] __attribute__((aligned (sizeof(int))));
00573    struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN };
00574 #else
00575    struct timespec nowait = { 0, 1 };
00576    int inotify_fd = kqueue();
00577    struct kevent kev;
00578 #endif
00579    struct direntry *cur;
00580 
00581    while (!ast_fully_booted) {
00582       nanosleep(&ts, NULL);
00583    }
00584 
00585    if (inotify_fd < 0) {
00586       ast_log(LOG_ERROR, "Unable to initialize "
00587 #ifdef HAVE_INOTIFY
00588          "inotify(7)"
00589 #else
00590          "kqueue(2)"
00591 #endif
00592          "\n");
00593       return NULL;
00594    }
00595 
00596 #ifdef HAVE_INOTIFY
00597    inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_CLOSE_WRITE | IN_MOVED_TO);
00598 #endif
00599 
00600    /* First, run through the directory and clear existing entries */
00601    if (!(dir = opendir(qdir))) {
00602       ast_log(LOG_ERROR, "Unable to open directory %s: %s\n", qdir, strerror(errno));
00603       return NULL;
00604    }
00605 
00606 #ifndef HAVE_INOTIFY
00607    EV_SET(&kev, dirfd(dir), EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_WRITE, 0, NULL);
00608    if (kevent(inotify_fd, &kev, 1, NULL, 0, &nowait) < 0 && errno != 0) {
00609       ast_log(LOG_ERROR, "Unable to watch directory %s: %s\n", qdir, strerror(errno));
00610    }
00611 #endif
00612    now = time(NULL);
00613    while ((de = readdir(dir))) {
00614       queue_file(de->d_name, 0);
00615    }
00616 
00617 #ifdef HAVE_INOTIFY
00618    /* Directory needs to remain open for kqueue(2) */
00619    closedir(dir);
00620 #endif
00621 
00622    /* Wait for either a) next timestamp to occur, or b) a change to happen */
00623    for (;/* ever */;) {
00624       time_t next = AST_LIST_EMPTY(&dirlist) ? INT_MAX : AST_LIST_FIRST(&dirlist)->mtime;
00625 
00626       time(&now);
00627       if (next > now) {
00628 #ifdef HAVE_INOTIFY
00629          int stage = 0;
00630          /* Convert from seconds to milliseconds, unless there's nothing
00631           * in the queue already, in which case, we wait forever. */
00632          int waittime = next == INT_MAX ? -1 : (next - now) * 1000;
00633          /* When a file arrives, add it to the queue, in mtime order. */
00634          if ((res = poll(&pfd, 1, waittime)) > 0 && (stage = 1) &&
00635             (res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(*iev)) {
00636             ssize_t len = 0;
00637             /* File(s) added to directory, add them to my list */
00638             for (iev = (void *) buf; res >= sizeof(*iev); iev = (struct inotify_event *) (((char *) iev) + len)) {
00639                if (iev->mask & IN_CREATE) {
00640                   queue_file_create(iev->name);
00641                } else if (iev->mask & IN_CLOSE_WRITE) {
00642                   queue_file_write(iev->name);
00643                } else if (iev->mask & IN_MOVED_TO) {
00644                   queue_file(iev->name, 0);
00645                } else {
00646                   ast_log(LOG_ERROR, "Unexpected event %d for file '%s'\n", (int) iev->mask, iev->name);
00647                }
00648 
00649                len = sizeof(*iev) + iev->len;
00650                res -= len;
00651             }
00652          } else if (res < 0 && errno != EINTR && errno != EAGAIN) {
00653             ast_debug(1, "Got an error back from %s(2): %s\n", stage ? "read" : "poll", strerror(errno));
00654          }
00655 #else
00656          struct timespec ts2 = { next - now, 0 };
00657          if (kevent(inotify_fd, NULL, 0, &kev, 1, &ts2) <= 0) {
00658             /* Interrupt or timeout, restart calculations */
00659             continue;
00660          } else {
00661             /* Directory changed, rescan */
00662             rewinddir(dir);
00663             while ((de = readdir(dir))) {
00664                queue_file(de->d_name, 0);
00665             }
00666          }
00667 #endif
00668          time(&now);
00669       }
00670 
00671       /* Empty the list of all entries ready to be processed */
00672       AST_LIST_LOCK(&dirlist);
00673       while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) {
00674          cur = AST_LIST_REMOVE_HEAD(&dirlist, list);
00675          queue_file(cur->name, cur->mtime);
00676          ast_free(cur);
00677       }
00678       AST_LIST_UNLOCK(&dirlist);
00679    }
00680    return NULL;
00681 }

static int unload_module ( void   )  [static]

Definition at line 757 of file pbx_spool.c.

00758 {
00759    return -1;
00760 }


Variable Documentation

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. In order for your module to load, it must return this key via a function called \"key\". Any code which includes this paragraph must be licensed under the GNU General Public License version 2 or later (at your option). In addition to Digium's general reservations of rights, Digium expressly reserves the right to allow other parties to license this paragraph under different terms. Any use of Digium, Inc. trademarks or logos (including \"Asterisk\" or \"Digium\") without express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "8586c2a7d357cb591cc3a6607a8f62d1" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } [static]

Definition at line 781 of file pbx_spool.c.

struct ast_module_info* ast_module_info = &__mod_info [static]

Definition at line 781 of file pbx_spool.c.

char qdir[255] [static]

Definition at line 68 of file pbx_spool.c.

char qdonedir[255] [static]

Definition at line 69 of file pbx_spool.c.


Generated on Wed Apr 6 11:30:08 2011 for Asterisk - The Open Source Telephony Project by  doxygen 1.4.7