Mon Aug 31 12:30:41 2015

Asterisk developer's documentation


pbx_spool.c File Reference

Full-featured outgoing call spool support. More...

#include "asterisk.h"
#include <sys/stat.h>
#include <time.h>
#include <utime.h>
#include <dirent.h>
#include <sys/inotify.h>
#include "asterisk/paths.h"
#include "asterisk/lock.h"
#include "asterisk/file.h"
#include "asterisk/logger.h"
#include "asterisk/channel.h"
#include "asterisk/callerid.h"
#include "asterisk/pbx.h"
#include "asterisk/module.h"
#include "asterisk/utils.h"
#include "asterisk/options.h"

Go to the source code of this file.

Data Structures

struct  createlist
struct  direntry
struct  dirlist
struct  openlist
struct  outgoing

Enumerations

enum  { SPOOL_FLAG_ALWAYS_DELETE = (1 << 0), SPOOL_FLAG_ARCHIVE = (1 << 1) }

Functions

static void __reg_module (void)
static void __unreg_module (void)
static int apply_outgoing (struct outgoing *o, FILE *f)
static void * attempt_thread (void *data)
static void free_outgoing (struct outgoing *o)
static void launch_service (struct outgoing *o)
static int load_module (void)
static struct outgoing * new_outgoing (const char *fn)
static void queue_created_files (void)
static void queue_file (const char *filename, time_t when)
static void queue_file_create (const char *filename)
static void queue_file_open (const char *filename)
static void queue_file_write (const char *filename)
static int remove_from_queue (struct outgoing *o, const char *status)
 Remove a call file from the outgoing queue, optionally moving it to the archive dir.
static void safe_append (struct outgoing *o, time_t now, char *s)
static int scan_service (const char *fn, time_t now)
static void * scan_thread (void *unused)
static int unload_module (void)

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. In order for your module to load, it must return this key via a function called \"key\". Any code which includes this paragraph must be licensed under the GNU General Public License version 2 or later (at your option). In addition to Digium's general reservations of rights, Digium expressly reserves the right to allow other parties to license this paragraph under different terms. Any use of Digium, Inc. trademarks or logos (including \"Asterisk\" or \"Digium\") without express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "ac1f6a56484a8820659555499174e588" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, }
static struct ast_module_info * ast_module_info = &__mod_info
static char qdir [255]
static char qdonedir [255]

Detailed Description

Full-featured outgoing call spool support.

Definition in file pbx_spool.c.


Enumeration Type Documentation

anonymous enum
Enumerator:
SPOOL_FLAG_ALWAYS_DELETE 

Always delete the call file after a call succeeds or the maximum number of retries is exceeded, even if the modification time of the call file is in the future.

SPOOL_FLAG_ARCHIVE 

Do not unlink the call file after processing; move it to qdonedir instead.

Definition at line 62 of file pbx_spool.c.

/*! \brief Option bits kept in outgoing::options for a single call file. */
enum {
   /*! Always delete the call file after a call succeeds or the
    * maximum number of retries is exceeded, even if the
    * modification time of the call file is in the future.
    */
   SPOOL_FLAG_ALWAYS_DELETE = (1 << 0),
   /*! Don't unlink the call file after processing, move in qdonedir */
   SPOOL_FLAG_ARCHIVE = (1 << 1),
};


Function Documentation

static void __reg_module ( void   )  [static]

Definition at line 893 of file pbx_spool.c.

static void __unreg_module ( void   )  [static]

Definition at line 893 of file pbx_spool.c.

static int apply_outgoing ( struct outgoing *  o,
FILE *  f 
) [static]

Definition at line 147 of file pbx_spool.c.

References outgoing::app, app, ast_callerid_split(), ast_log(), ast_parse_allow_disallow(), ast_set2_flag, ast_skip_blanks(), ast_string_field_set, ast_strlen_zero(), ast_trim_blanks(), ast_true(), ast_variable_new(), outgoing::callingpid, cid_name, cid_num, context, outgoing::dest, outgoing::exten, exten, outgoing::fn, outgoing::format, last, LOG_NOTICE, LOG_WARNING, outgoing::maxretries, ast_variable::next, outgoing::options, outgoing::priority, outgoing::retries, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, SPOOL_FLAG_ARCHIVE, outgoing::tech, var, outgoing::vars, and outgoing::waittime.

Referenced by scan_service().

00148 {
00149    char buf[256];
00150    char *c, *c2;
00151    int lineno = 0;
00152    struct ast_variable *var, *last = o->vars;
00153 
00154    while (last && last->next) {
00155       last = last->next;
00156    }
00157 
00158    while(fgets(buf, sizeof(buf), f)) {
00159       lineno++;
00160       /* Trim comments */
00161       c = buf;
00162       while ((c = strchr(c, '#'))) {
00163          if ((c == buf) || (*(c-1) == ' ') || (*(c-1) == '\t'))
00164             *c = '\0';
00165          else
00166             c++;
00167       }
00168 
00169       c = buf;
00170       while ((c = strchr(c, ';'))) {
00171          if ((c > buf) && (c[-1] == '\\')) {
00172             memmove(c - 1, c, strlen(c) + 1);
00173             c++;
00174          } else {
00175             *c = '\0';
00176             break;
00177          }
00178       }
00179 
00180       /* Trim trailing white space */
00181       ast_trim_blanks(buf);
00182       if (ast_strlen_zero(buf)) {
00183          continue;
00184       }
00185       c = strchr(buf, ':');
00186       if (!c) {
00187          ast_log(LOG_NOTICE, "Syntax error at line %d of %s\n", lineno, o->fn);
00188          continue;
00189       }
00190       *c = '\0';
00191       c = ast_skip_blanks(c + 1);
00192 #if 0
00193       printf("'%s' is '%s' at line %d\n", buf, c, lineno);
00194 #endif
00195       if (!strcasecmp(buf, "channel")) {
00196          if ((c2 = strchr(c, '/'))) {
00197             *c2 = '\0';
00198             c2++;
00199             ast_string_field_set(o, tech, c);
00200             ast_string_field_set(o, dest, c2);
00201          } else {
00202             ast_log(LOG_NOTICE, "Channel should be in form Tech/Dest at line %d of %s\n", lineno, o->fn);
00203          }
00204       } else if (!strcasecmp(buf, "callerid")) {
00205          char cid_name[80] = {0}, cid_num[80] = {0};
00206          ast_callerid_split(c, cid_name, sizeof(cid_name), cid_num, sizeof(cid_num));
00207          ast_string_field_set(o, cid_num, cid_num);
00208          ast_string_field_set(o, cid_name, cid_name);
00209       } else if (!strcasecmp(buf, "application")) {
00210          ast_string_field_set(o, app, c);
00211       } else if (!strcasecmp(buf, "data")) {
00212          ast_string_field_set(o, data, c);
00213       } else if (!strcasecmp(buf, "maxretries")) {
00214          if (sscanf(c, "%30d", &o->maxretries) != 1) {
00215             ast_log(LOG_WARNING, "Invalid max retries at line %d of %s\n", lineno, o->fn);
00216             o->maxretries = 0;
00217          }
00218       } else if (!strcasecmp(buf, "codecs")) {
00219          ast_parse_allow_disallow(NULL, &o->format, c, 1);
00220       } else if (!strcasecmp(buf, "context")) {
00221          ast_string_field_set(o, context, c);
00222       } else if (!strcasecmp(buf, "extension")) {
00223          ast_string_field_set(o, exten, c);
00224       } else if (!strcasecmp(buf, "priority")) {
00225          if ((sscanf(c, "%30d", &o->priority) != 1) || (o->priority < 1)) {
00226             ast_log(LOG_WARNING, "Invalid priority at line %d of %s\n", lineno, o->fn);
00227             o->priority = 1;
00228          }
00229       } else if (!strcasecmp(buf, "retrytime")) {
00230          if ((sscanf(c, "%30d", &o->retrytime) != 1) || (o->retrytime < 1)) {
00231             ast_log(LOG_WARNING, "Invalid retrytime at line %d of %s\n", lineno, o->fn);
00232             o->retrytime = 300;
00233          }
00234       } else if (!strcasecmp(buf, "waittime")) {
00235          if ((sscanf(c, "%30d", &o->waittime) != 1) || (o->waittime < 1)) {
00236             ast_log(LOG_WARNING, "Invalid waittime at line %d of %s\n", lineno, o->fn);
00237             o->waittime = 45;
00238          }
00239       } else if (!strcasecmp(buf, "retry")) {
00240          o->retries++;
00241       } else if (!strcasecmp(buf, "startretry")) {
00242          if (sscanf(c, "%30ld", &o->callingpid) != 1) {
00243             ast_log(LOG_WARNING, "Unable to retrieve calling PID!\n");
00244             o->callingpid = 0;
00245          }
00246       } else if (!strcasecmp(buf, "endretry") || !strcasecmp(buf, "abortretry")) {
00247          o->callingpid = 0;
00248          o->retries++;
00249       } else if (!strcasecmp(buf, "delayedretry")) {
00250       } else if (!strcasecmp(buf, "setvar") || !strcasecmp(buf, "set")) {
00251          c2 = c;
00252          strsep(&c2, "=");
00253          if (c2) {
00254             var = ast_variable_new(c, c2, o->fn);
00255             if (var) {
00256                /* Always insert at the end, because some people want to treat the spool file as a script */
00257                if (last) {
00258                   last->next = var;
00259                } else {
00260                   o->vars = var;
00261                }
00262                last = var;
00263             }
00264          } else
00265             ast_log(LOG_WARNING, "Malformed \"%s\" argument.  Should be \"%s: variable=value\"\n", buf, buf);
00266       } else if (!strcasecmp(buf, "account")) {
00267          ast_string_field_set(o, account, c);
00268       } else if (!strcasecmp(buf, "alwaysdelete")) {
00269          ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ALWAYS_DELETE);
00270       } else if (!strcasecmp(buf, "archive")) {
00271          ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ARCHIVE);
00272       } else {
00273          ast_log(LOG_WARNING, "Unknown keyword '%s' at line %d of %s\n", buf, lineno, o->fn);
00274       }
00275    }
00276    if (ast_strlen_zero(o->tech) || ast_strlen_zero(o->dest) || (ast_strlen_zero(o->app) && ast_strlen_zero(o->exten))) {
00277       ast_log(LOG_WARNING, "At least one of app or extension must be specified, along with tech and dest in file %s\n", o->fn);
00278       return -1;
00279    }
00280    return 0;
00281 }

static void* attempt_thread ( void *  data  )  [static]

Definition at line 358 of file pbx_spool.c.

References outgoing::account, outgoing::app, ast_channel_reason2str(), ast_log(), ast_pbx_outgoing_app(), ast_pbx_outgoing_exten(), ast_strlen_zero(), ast_verb, outgoing::cid_name, outgoing::cid_num, outgoing::context, outgoing::data, outgoing::dest, outgoing::exten, outgoing::fn, outgoing::format, free_outgoing(), LOG_NOTICE, outgoing::maxretries, outgoing::priority, queue_file(), remove_from_queue(), outgoing::retries, outgoing::retrytime, safe_append(), outgoing::tech, outgoing::vars, and outgoing::waittime.

Referenced by launch_service().

00359 {
00360    struct outgoing *o = data;
00361    int res, reason;
00362    if (!ast_strlen_zero(o->app)) {
00363       ast_verb(3, "Attempting call on %s/%s for application %s(%s) (Retry %d)\n", o->tech, o->dest, o->app, o->data, o->retries);
00364       res = ast_pbx_outgoing_app(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->app, o->data, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL);
00365       o->vars = NULL;
00366    } else {
00367       ast_verb(3, "Attempting call on %s/%s for %s@%s:%d (Retry %d)\n", o->tech, o->dest, o->exten, o->context,o->priority, o->retries);
00368       res = ast_pbx_outgoing_exten(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->context, o->exten, o->priority, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL);
00369       o->vars = NULL;
00370    }
00371    if (res) {
00372       ast_log(LOG_NOTICE, "Call failed to go through, reason (%d) %s\n", reason, ast_channel_reason2str(reason));
00373       if (o->retries >= o->maxretries + 1) {
00374          /* Max retries exceeded */
00375          ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
00376          remove_from_queue(o, "Expired");
00377       } else {
00378          /* Notate that the call is still active */
00379          safe_append(o, time(NULL), "EndRetry");
00380 #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE)
00381          queue_file(o->fn, time(NULL) + o->retrytime);
00382 #endif
00383       }
00384    } else {
00385       ast_log(LOG_NOTICE, "Call completed to %s/%s\n", o->tech, o->dest);
00386       remove_from_queue(o, "Completed");
00387    }
00388    free_outgoing(o);
00389    return NULL;
00390 }

static void free_outgoing ( struct outgoing *  o  )  [static]

Definition at line 104 of file pbx_spool.c.

References ast_free, ast_string_field_free_memory, ast_variables_destroy(), and outgoing::vars.

Referenced by attempt_thread(), launch_service(), new_outgoing(), and scan_service().

00105 {
00106    if (o->vars) {
00107       ast_variables_destroy(o->vars);
00108    }
00109    ast_string_field_free_memory(o);
00110    ast_free(o);
00111 }

static void launch_service ( struct outgoing *  o  )  [static]

Definition at line 392 of file pbx_spool.c.

References ast_log(), ast_pthread_create_detached, attempt_thread(), free_outgoing(), and LOG_WARNING.

Referenced by scan_service().

00393 {
00394    pthread_t t;
00395    int ret;
00396 
00397    if ((ret = ast_pthread_create_detached(&t, NULL, attempt_thread, o))) {
00398       ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
00399       free_outgoing(o);
00400    }
00401 }

static int load_module ( void   )  [static]

Definition at line 874 of file pbx_spool.c.

References ast_config_AST_SPOOL_DIR, ast_log(), ast_mkdir(), AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_FAILURE, AST_MODULE_LOAD_SUCCESS, ast_pthread_create_detached_background, LOG_WARNING, scan_thread(), and thread.

00875 {
00876    pthread_t thread;
00877    int ret;
00878    snprintf(qdir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing");
00879    if (ast_mkdir(qdir, 0777)) {
00880       ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool disabled\n", qdir);
00881       return AST_MODULE_LOAD_DECLINE;
00882    }
00883    snprintf(qdonedir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing_done");
00884 
00885    if ((ret = ast_pthread_create_detached_background(&thread, NULL, scan_thread, NULL))) {
00886       ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
00887       return AST_MODULE_LOAD_FAILURE;
00888    }
00889 
00890    return AST_MODULE_LOAD_SUCCESS;
00891 }

static struct outgoing* new_outgoing ( const char *  fn  )  [static, read]

Definition at line 113 of file pbx_spool.c.

References ast_calloc, AST_FORMAT_SLINEAR, ast_free, ast_set_flag, ast_string_field_init, ast_string_field_set, ast_strlen_zero(), outgoing::fn, outgoing::format, free_outgoing(), outgoing::options, outgoing::priority, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, and outgoing::waittime.

Referenced by scan_service().

00114 {
00115    struct outgoing *o;
00116 
00117    o = ast_calloc(1, sizeof(*o));
00118    if (!o) {
00119       return NULL;
00120    }
00121 
00122    /* Initialize the new object. */
00123    o->priority = 1;
00124    o->retrytime = 300;
00125    o->waittime = 45;
00126    o->format = AST_FORMAT_SLINEAR;
00127    ast_set_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE);
00128    if (ast_string_field_init(o, 128)) {
00129       /*
00130        * No need to call free_outgoing here since the failure was to
00131        * allocate string fields and no variables have been allocated
00132        * yet.
00133        */
00134       ast_free(o);
00135       return NULL;
00136    }
00137    ast_string_field_set(o, fn, fn);
00138    if (ast_strlen_zero(o->fn)) {
00139       /* String field set failed.  Since this string is important we must fail. */
00140       free_outgoing(o);
00141       return NULL;
00142    }
00143 
00144    return o;
00145 }

static void queue_created_files ( void   )  [static]

Definition at line 585 of file pbx_spool.c.

References ast_free, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, direntry::mtime, direntry::name, and queue_file().

Referenced by scan_thread().

00586 {
00587    struct direntry *cur;
00588    time_t now = time(NULL);
00589 
00590    AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) {
00591       if (cur->mtime > now) {
00592          break;
00593       }
00594 
00595       AST_LIST_REMOVE_CURRENT(list);
00596       queue_file(cur->name, 0);
00597       ast_free(cur);
00598    }
00599    AST_LIST_TRAVERSE_SAFE_END
00600 }

static void queue_file ( const char *  filename,
time_t  when 
) [static]

Definition at line 488 of file pbx_spool.c.

References ast_alloca, ast_calloc, AST_LIST_EMPTY, AST_LIST_INSERT_BEFORE_CURRENT, AST_LIST_INSERT_HEAD, AST_LIST_INSERT_TAIL, AST_LIST_LOCK, AST_LIST_TRAVERSE, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, AST_LIST_UNLOCK, ast_log(), errno, LOG_WARNING, direntry::mtime, direntry::name, and scan_service().

Referenced by attempt_thread(), queue_created_files(), queue_file_write(), and scan_thread().

00489 {
00490    struct stat st;
00491    struct direntry *cur, *new;
00492    int res;
00493    time_t now = time(NULL);
00494 
00495    if (!strchr(filename, '/')) {
00496       char *fn = ast_alloca(strlen(qdir) + strlen(filename) + 2);
00497       sprintf(fn, "%s/%s", qdir, filename); /* SAFE */
00498       filename = fn;
00499    }
00500 
00501    if (when == 0) {
00502       if (stat(filename, &st)) {
00503          ast_log(LOG_WARNING, "Unable to stat %s: %s\n", filename, strerror(errno));
00504          return;
00505       }
00506 
00507       if (!S_ISREG(st.st_mode)) {
00508          return;
00509       }
00510 
00511       when = st.st_mtime;
00512    }
00513 
00514    /* Need to check the existing list in order to avoid duplicates. */
00515    AST_LIST_LOCK(&dirlist);
00516    AST_LIST_TRAVERSE(&dirlist, cur, list) {
00517       if (cur->mtime == when && !strcmp(filename, cur->name)) {
00518          AST_LIST_UNLOCK(&dirlist);
00519          return;
00520       }
00521    }
00522 
00523    if ((res = when) > now || (res = scan_service(filename, now)) > 0) {
00524       if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) {
00525          AST_LIST_UNLOCK(&dirlist);
00526          return;
00527       }
00528       new->mtime = res;
00529       strcpy(new->name, filename);
00530       /* List is ordered by mtime */
00531       if (AST_LIST_EMPTY(&dirlist)) {
00532          AST_LIST_INSERT_HEAD(&dirlist, new, list);
00533       } else {
00534          int found = 0;
00535          AST_LIST_TRAVERSE_SAFE_BEGIN(&dirlist, cur, list) {
00536             if (cur->mtime > new->mtime) {
00537                AST_LIST_INSERT_BEFORE_CURRENT(new, list);
00538                found = 1;
00539                break;
00540             }
00541          }
00542          AST_LIST_TRAVERSE_SAFE_END
00543          if (!found) {
00544             AST_LIST_INSERT_TAIL(&dirlist, new, list);
00545          }
00546       }
00547    }
00548    AST_LIST_UNLOCK(&dirlist);
00549 }

static void queue_file_create ( const char *  filename  )  [static]

Definition at line 552 of file pbx_spool.c.

References ast_calloc, AST_LIST_INSERT_TAIL, AST_LIST_TRAVERSE, direntry::mtime, and direntry::name.

Referenced by scan_thread().

00553 {
00554    struct direntry *cur;
00555 
00556    AST_LIST_TRAVERSE(&createlist, cur, list) {
00557       if (!strcmp(cur->name, filename)) {
00558          return;
00559       }
00560    }
00561 
00562    if (!(cur = ast_calloc(1, sizeof(*cur) + strlen(filename) + 1))) {
00563       return;
00564    }
00565    strcpy(cur->name, filename);
00566    /* We'll handle this file unless an IN_OPEN event occurs within 2 seconds */
00567    cur->mtime = time(NULL) + 2;
00568    AST_LIST_INSERT_TAIL(&createlist, cur, list);
00569 }

static void queue_file_open ( const char *  filename  )  [static]

Definition at line 571 of file pbx_spool.c.

References AST_LIST_INSERT_TAIL, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, and direntry::name.

Referenced by scan_thread().

00572 {
00573    struct direntry *cur;
00574 
00575    AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) {
00576       if (!strcmp(cur->name, filename)) {
00577          AST_LIST_REMOVE_CURRENT(list);
00578          AST_LIST_INSERT_TAIL(&openlist, cur, list);
00579          break;
00580       }
00581    }
00582    AST_LIST_TRAVERSE_SAFE_END
00583 }

static void queue_file_write ( const char *  filename  )  [static]

Definition at line 602 of file pbx_spool.c.

References ast_free, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, direntry::name, and queue_file().

Referenced by scan_thread().

00603 {
00604    struct direntry *cur;
00605    /* Only queue entries where an IN_CREATE preceded the IN_CLOSE_WRITE */
00606    AST_LIST_TRAVERSE_SAFE_BEGIN(&openlist, cur, list) {
00607       if (!strcmp(cur->name, filename)) {
00608          AST_LIST_REMOVE_CURRENT(list);
00609          ast_free(cur);
00610          queue_file(filename, 0);
00611          break;
00612       }
00613    }
00614    AST_LIST_TRAVERSE_SAFE_END
00615 }

static int remove_from_queue ( struct outgoing *  o,
const char *  status 
) [static]

Remove a call file from the outgoing queue, optionally moving it to the archive dir.

Parameters:
o the pointer to outgoing struct
status the exit status of the call. Can be "Completed", "Failed" or "Expired"

Definition at line 307 of file pbx_spool.c.

References ast_log(), ast_mkdir(), ast_test_flag, f, outgoing::fn, LOG_WARNING, outgoing::options, SPOOL_FLAG_ALWAYS_DELETE, and SPOOL_FLAG_ARCHIVE.

Referenced by attempt_thread(), and scan_service().

00308 {
00309    FILE *f;
00310    char newfn[256];
00311    const char *bname;
00312 
00313    if (!ast_test_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE)) {
00314       struct stat current_file_status;
00315 
00316       if (!stat(o->fn, &current_file_status)) {
00317          if (time(NULL) < current_file_status.st_mtime) {
00318             return 0;
00319          }
00320       }
00321    }
00322 
00323    if (!ast_test_flag(&o->options, SPOOL_FLAG_ARCHIVE)) {
00324       unlink(o->fn);
00325       return 0;
00326    }
00327 
00328    if (ast_mkdir(qdonedir, 0777)) {
00329       ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool archiving disabled\n", qdonedir);
00330       unlink(o->fn);
00331       return -1;
00332    }
00333 
00334    if (!(bname = strrchr(o->fn, '/'))) {
00335       bname = o->fn;
00336    } else {
00337       bname++;
00338    }
00339 
00340    snprintf(newfn, sizeof(newfn), "%s/%s", qdonedir, bname);
00341    /* If there is already a call file with the name in the archive dir, it will be overwritten. */
00342    unlink(newfn);
00343    if (rename(o->fn, newfn) != 0) {
00344       unlink(o->fn);
00345       return -1;
00346    }
00347 
00348    /* Only append to the file AFTER we move it out of the watched directory,
00349     * otherwise the fclose() causes another event for inotify(7) */
00350    if ((f = fopen(newfn, "a"))) {
00351       fprintf(f, "Status: %s\n", status);
00352       fclose(f);
00353    }
00354 
00355    return 0;
00356 }

static void safe_append ( struct outgoing *  o,
time_t  now,
char *  s 
) [static]

Definition at line 283 of file pbx_spool.c.

References ast_debug, ast_log(), ast_mainpid, outgoing::dest, errno, f, outgoing::fn, LOG_WARNING, outgoing::retries, outgoing::retrytime, and outgoing::tech.

Referenced by attempt_thread(), and scan_service().

00284 {
00285    FILE *f;
00286    struct utimbuf tbuf = { .actime = now, .modtime = now + o->retrytime };
00287 
00288    ast_debug(1, "Outgoing %s/%s: %s\n", o->tech, o->dest, s);
00289 
00290    if ((f = fopen(o->fn, "a"))) {
00291       fprintf(f, "\n%s: %ld %d (%ld)\n", s, (long)ast_mainpid, o->retries, (long) now);
00292       fclose(f);
00293    }
00294 
00295    /* Update the file time */
00296    if (utime(o->fn, &tbuf)) {
00297       ast_log(LOG_WARNING, "Unable to set utime on %s: %s\n", o->fn, strerror(errno));
00298    }
00299 }

static int scan_service ( const char *  fn,
time_t  now 
) [static]

Todo:
XXX There is some odd delayed duplicate servicing of call files going on. We need to suppress the error message if the file does not exist as a result.

Definition at line 404 of file pbx_spool.c.

References apply_outgoing(), ast_debug, ast_log(), ast_mainpid, outgoing::callingpid, outgoing::dest, errno, f, outgoing::fn, free_outgoing(), launch_service(), LOG_DEBUG, LOG_NOTICE, LOG_WARNING, outgoing::maxretries, new_outgoing(), remove_from_queue(), outgoing::retries, outgoing::retrytime, safe_append(), and outgoing::tech.

Referenced by queue_file().

00405 {
00406    struct outgoing *o;
00407    FILE *f;
00408    int res;
00409 
00410    o = new_outgoing(fn);
00411    if (!o) {
00412       return -1;
00413    }
00414 
00415    /* Attempt to open the file */
00416    f = fopen(o->fn, "r");
00417    if (!f) {
00418 #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE)
00419       /*!
00420        * \todo XXX There is some odd delayed duplicate servicing of
00421        * call files going on.  We need to suppress the error message
00422        * if the file does not exist as a result.
00423        */
00424       if (errno != ENOENT)
00425 #endif
00426       {
00427          ast_log(LOG_WARNING, "Unable to open %s: '%s'(%d), deleting\n",
00428             o->fn, strerror(errno), (int) errno);
00429       }
00430       remove_from_queue(o, "Failed");
00431       free_outgoing(o);
00432       return -1;
00433    }
00434 
00435    /* Read in and verify the contents */
00436    res = apply_outgoing(o, f);
00437    fclose(f);
00438    if (res) {
00439       ast_log(LOG_WARNING, "Invalid file contents in %s, deleting\n", o->fn);
00440       remove_from_queue(o, "Failed");
00441       free_outgoing(o);
00442       return -1;
00443    }
00444 
00445    ast_debug(1, "Filename: %s, Retries: %d, max: %d\n", o->fn, o->retries, o->maxretries);
00446    if (o->retries <= o->maxretries) {
00447       now += o->retrytime;
00448       if (o->callingpid && (o->callingpid == ast_mainpid)) {
00449          safe_append(o, time(NULL), "DelayedRetry");
00450          ast_log(LOG_DEBUG, "Delaying retry since we're currently running '%s'\n", o->fn);
00451          free_outgoing(o);
00452       } else {
00453          /* Increment retries */
00454          o->retries++;
00455          /* If someone else was calling, they're presumably gone now
00456             so abort their retry and continue as we were... */
00457          if (o->callingpid)
00458             safe_append(o, time(NULL), "AbortRetry");
00459 
00460          safe_append(o, now, "StartRetry");
00461          launch_service(o);
00462       }
00463       return now;
00464    }
00465 
00466    ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n",
00467       o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
00468    remove_from_queue(o, "Expired");
00469    free_outgoing(o);
00470    return 0;
00471 }

static void* scan_thread ( void *  unused  )  [static]

Definition at line 618 of file pbx_spool.c.

References ast_debug, ast_free, ast_fully_booted, AST_LIST_EMPTY, AST_LIST_FIRST, AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_log(), errno, HAVE_INOTIFY, inotify_fd, len(), LOG_ERROR, direntry::mtime, direntry::name, direntry::next, queue_created_files(), queue_file(), queue_file_create(), queue_file_open(), and queue_file_write().

Referenced by load_module().

00619 {
00620    DIR *dir;
00621    struct dirent *de;
00622    time_t now;
00623    struct timespec ts = { .tv_sec = 1 };
00624 #ifdef HAVE_INOTIFY
00625    ssize_t res;
00626    int inotify_fd = inotify_init();
00627    struct inotify_event *iev;
00628    char buf[8192] __attribute__((aligned (sizeof(int))));
00629    struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN };
00630 #else
00631    struct timespec nowait = { .tv_sec = 0, .tv_nsec = 1 };
00632    int inotify_fd = kqueue();
00633    struct kevent kev;
00634    struct kevent event;
00635 #endif
00636    struct direntry *cur;
00637 
00638    while (!ast_fully_booted) {
00639       nanosleep(&ts, NULL);
00640    }
00641 
00642    if (inotify_fd < 0) {
00643       ast_log(LOG_ERROR, "Unable to initialize "
00644 #ifdef HAVE_INOTIFY
00645          "inotify(7)"
00646 #else
00647          "kqueue(2)"
00648 #endif
00649          "\n");
00650       return NULL;
00651    }
00652 
00653 #ifdef HAVE_INOTIFY
00654    inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_OPEN | IN_CLOSE_WRITE | IN_MOVED_TO);
00655 #endif
00656 
00657    /* First, run through the directory and clear existing entries */
00658    if (!(dir = opendir(qdir))) {
00659       ast_log(LOG_ERROR, "Unable to open directory %s: %s\n", qdir, strerror(errno));
00660       return NULL;
00661    }
00662 
00663 #ifndef HAVE_INOTIFY
00664    EV_SET(&kev, dirfd(dir), EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_WRITE, 0, NULL);
00665    if (kevent(inotify_fd, &kev, 1, &event, 1, &nowait) < 0 && errno != 0) {
00666       ast_log(LOG_ERROR, "Unable to watch directory %s: %s\n", qdir, strerror(errno));
00667    }
00668 #endif
00669    now = time(NULL);
00670    while ((de = readdir(dir))) {
00671       queue_file(de->d_name, 0);
00672    }
00673 
00674 #ifdef HAVE_INOTIFY
00675    /* Directory needs to remain open for kqueue(2) */
00676    closedir(dir);
00677 #endif
00678 
00679    /* Wait for either a) next timestamp to occur, or b) a change to happen */
00680    for (;/* ever */;) {
00681       time_t next = AST_LIST_EMPTY(&dirlist) ? INT_MAX : AST_LIST_FIRST(&dirlist)->mtime;
00682 
00683       time(&now);
00684       if (next > now) {
00685 #ifdef HAVE_INOTIFY
00686          int stage = 0;
00687          /* Convert from seconds to milliseconds, unless there's nothing
00688           * in the queue already, in which case, we wait forever. */
00689          int waittime = next == INT_MAX ? -1 : (next - now) * 1000;
00690          if (!AST_LIST_EMPTY(&createlist)) {
00691             waittime = 1000;
00692          }
00693          /* When a file arrives, add it to the queue, in mtime order. */
00694          if ((res = poll(&pfd, 1, waittime)) > 0 && (stage = 1) &&
00695             (res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(*iev)) {
00696             ssize_t len = 0;
00697             /* File(s) added to directory, add them to my list */
00698             for (iev = (void *) buf; res >= sizeof(*iev); iev = (struct inotify_event *) (((char *) iev) + len)) {
00699                /* For an IN_MOVED_TO event, simply process the file. However, if
00700                 * we get an IN_CREATE event it *might* be an open(O_CREAT) or it
00701                 * might be a hardlink (like smsq does, since rename() might
00702                 * overwrite an existing file). So we have to see if we get a
00703                 * subsequent IN_OPEN event on the same file. If we do, keep it
00704                 * on the openlist and wait for the corresponding IN_CLOSE_WRITE.
00705                 * If we *don't* see an IN_OPEN event, then it was a hard link so
00706                 * it can be processed immediately.
00707                 *
00708                 * Unfortunately, although open(O_CREAT) is an atomic file system
00709                 * operation, the inotify subsystem doesn't give it to us in a
00710                 * single event with both IN_CREATE|IN_OPEN set. It's two separate
00711                 * events, and the kernel doesn't even give them to us at the same
00712                 * time. We can read() from inotify_fd after the IN_CREATE event,
00713                 * and get *nothing* from it. The IN_OPEN arrives only later! So
00714                 * we have a very short timeout of 2 seconds. */
00715                if (iev->mask & IN_CREATE) {
00716                   queue_file_create(iev->name);
00717                } else if (iev->mask & IN_OPEN) {
00718                   queue_file_open(iev->name);
00719                } else if (iev->mask & IN_CLOSE_WRITE) {
00720                   queue_file_write(iev->name);
00721                } else if (iev->mask & IN_MOVED_TO) {
00722                   queue_file(iev->name, 0);
00723                } else {
00724                   ast_log(LOG_ERROR, "Unexpected event %d for file '%s'\n", (int) iev->mask, iev->name);
00725                }
00726 
00727                len = sizeof(*iev) + iev->len;
00728                res -= len;
00729             }
00730          } else if (res < 0 && errno != EINTR && errno != EAGAIN) {
00731             ast_debug(1, "Got an error back from %s(2): %s\n", stage ? "read" : "poll", strerror(errno));
00732          }
00733          time(&now);
00734       }
00735       queue_created_files();
00736 #else
00737          int num_events;
00738          /* If queue empty then wait forever */
00739          if (next == INT_MAX) {
00740             num_events = kevent(inotify_fd, &kev, 1, &event, 1, NULL);
00741          } else {
00742             struct timespec ts2 = { .tv_sec = (unsigned long int)(next - now), .tv_nsec = 0 };
00743             num_events = kevent(inotify_fd, &kev, 1, &event, 1, &ts2);
00744          }
00745          if ((num_events < 0) || (event.flags == EV_ERROR)) {
00746             ast_debug(10, "KEvent error %s\n", strerror(errno));
00747             continue;
00748          } else if (num_events == 0) {
00749             /* Interrupt or timeout, restart calculations */
00750             continue;
00751          } else {
00752             /* Directory changed, rescan */
00753             rewinddir(dir);
00754             while ((de = readdir(dir))) {
00755                queue_file(de->d_name, 0);
00756             }
00757          }
00758          time(&now);
00759       }
00760 #endif
00761 
00762       /* Empty the list of all entries ready to be processed */
00763       AST_LIST_LOCK(&dirlist);
00764       while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) {
00765          cur = AST_LIST_REMOVE_HEAD(&dirlist, list);
00766          queue_file(cur->name, cur->mtime);
00767          ast_free(cur);
00768       }
00769       AST_LIST_UNLOCK(&dirlist);
00770    }
00771    return NULL;
00772 }

static int unload_module ( void   )  [static]

Definition at line 869 of file pbx_spool.c.

/*!
 * \brief Module unload hook.
 *
 * \return always -1; this module never reports a successful unload
 */
static int unload_module(void)
{
   const int refused = -1;

   return refused;
}


Variable Documentation

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. In order for your module to load, it must return this key via a function called \"key\". Any code which includes this paragraph must be licensed under the GNU General Public License version 2 or later (at your option). In addition to Digium's general reservations of rights, Digium expressly reserves the right to allow other parties to license this paragraph under different terms. Any use of Digium, Inc. trademarks or logos (including \"Asterisk\" or \"Digium\") without express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "ac1f6a56484a8820659555499174e588" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } [static]

Definition at line 893 of file pbx_spool.c.

struct ast_module_info* ast_module_info = &__mod_info [static]

Definition at line 893 of file pbx_spool.c.

char qdir[255] [static]

Definition at line 72 of file pbx_spool.c.

char qdonedir[255] [static]

Definition at line 73 of file pbx_spool.c.


Generated on 31 Aug 2015 for Asterisk - The Open Source Telephony Project by  doxygen 1.6.1