Mon Oct 8 12:39:26 2012

Asterisk developer's documentation


pbx_spool.c File Reference

Full-featured outgoing call spool support. More...

#include "asterisk.h"
#include <sys/stat.h>
#include <time.h>
#include <utime.h>
#include <dirent.h>
#include <sys/inotify.h>
#include "asterisk/paths.h"
#include "asterisk/lock.h"
#include "asterisk/file.h"
#include "asterisk/logger.h"
#include "asterisk/channel.h"
#include "asterisk/callerid.h"
#include "asterisk/pbx.h"
#include "asterisk/module.h"
#include "asterisk/utils.h"
#include "asterisk/options.h"

Go to the source code of this file.

Data Structures

struct  createlist
struct  direntry
struct  dirlist
struct  openlist
struct  outgoing

Enumerations

enum  { SPOOL_FLAG_ALWAYS_DELETE = (1 << 0), SPOOL_FLAG_ARCHIVE = (1 << 1) }

Functions

static void __reg_module (void)
static void __unreg_module (void)
static int apply_outgoing (struct outgoing *o, const char *fn, FILE *f)
static void * attempt_thread (void *data)
static void free_outgoing (struct outgoing *o)
static int init_outgoing (struct outgoing *o)
static void launch_service (struct outgoing *o)
static int load_module (void)
static void queue_created_files (void)
static void queue_file (const char *filename, time_t when)
static void queue_file_create (const char *filename)
static void queue_file_open (const char *filename)
static void queue_file_write (const char *filename)
static int remove_from_queue (struct outgoing *o, const char *status)
 Remove a call file from the outgoing queue, optionally moving it to the archive dir.
static void safe_append (struct outgoing *o, time_t now, char *s)
static int scan_service (const char *fn, time_t now)
static void * scan_thread (void *unused)
static int unload_module (void)

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "ac1f6a56484a8820659555499174e588" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, }
static struct ast_module_info * ast_module_info = &__mod_info
static char qdir [255]
static char qdonedir [255]


Detailed Description

Full-featured outgoing call spool support.

Definition in file pbx_spool.c.


Enumeration Type Documentation

anonymous enum

Enumerator:
SPOOL_FLAG_ALWAYS_DELETE  Always delete the call file after a call succeeds or the maximum number of retries is exceeded, even if the modification time of the call file is in the future.
SPOOL_FLAG_ARCHIVE  Don't unlink the call file after processing; move it into qdonedir instead.

Definition at line 62 of file pbx_spool.c.

/*! \brief Option bits stored in outgoing::options for a spooled call file. */
enum {
   /*! Always delete the call file after a call succeeds or the
    * maximum number of retries is exceeded, even if the
    * modification time of the call file is in the future.
    */
   SPOOL_FLAG_ALWAYS_DELETE = (1 << 0),
   /*! Don't unlink the call file after processing; move it into qdonedir instead. */
   SPOOL_FLAG_ARCHIVE = (1 << 1),
};


Function Documentation

static void __reg_module ( void   )  [static]

Definition at line 864 of file pbx_spool.c.

static void __unreg_module ( void   )  [static]

Definition at line 864 of file pbx_spool.c.

static int apply_outgoing ( struct outgoing *  o,
const char *  fn,
FILE *  f 
) [static]

Definition at line 126 of file pbx_spool.c.

References outgoing::app, app, ast_callerid_split(), ast_log(), ast_parse_allow_disallow(), ast_set2_flag, ast_string_field_set, ast_strlen_zero(), ast_true(), ast_variable_new(), outgoing::callingpid, cid_name, cid_num, context, outgoing::dest, outgoing::exten, exten, outgoing::format, last, LOG_NOTICE, LOG_WARNING, outgoing::maxretries, sla_ringing_trunk::next, outgoing::options, outgoing::priority, outgoing::retries, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, SPOOL_FLAG_ARCHIVE, strsep(), outgoing::tech, var, outgoing::vars, and outgoing::waittime.

Referenced by scan_service().

00127 {
00128    char buf[256];
00129    char *c, *c2;
00130    int lineno = 0;
00131    struct ast_variable *var, *last = o->vars;
00132 
00133    while (last && last->next) {
00134       last = last->next;
00135    }
00136 
00137    while(fgets(buf, sizeof(buf), f)) {
00138       lineno++;
00139       /* Trim comments */
00140       c = buf;
00141       while ((c = strchr(c, '#'))) {
00142          if ((c == buf) || (*(c-1) == ' ') || (*(c-1) == '\t'))
00143             *c = '\0';
00144          else
00145             c++;
00146       }
00147 
00148       c = buf;
00149       while ((c = strchr(c, ';'))) {
00150          if ((c > buf) && (c[-1] == '\\')) {
00151             memmove(c - 1, c, strlen(c) + 1);
00152             c++;
00153          } else {
00154             *c = '\0';
00155             break;
00156          }
00157       }
00158 
00159       /* Trim trailing white space */
00160       while(!ast_strlen_zero(buf) && buf[strlen(buf) - 1] < 33)
00161          buf[strlen(buf) - 1] = '\0';
00162       if (!ast_strlen_zero(buf)) {
00163          c = strchr(buf, ':');
00164          if (c) {
00165             *c = '\0';
00166             c++;
00167             while ((*c) && (*c < 33))
00168                c++;
00169 #if 0
00170             printf("'%s' is '%s' at line %d\n", buf, c, lineno);
00171 #endif
00172             if (!strcasecmp(buf, "channel")) {
00173                if ((c2 = strchr(c, '/'))) {
00174                   *c2 = '\0';
00175                   c2++;
00176                   ast_string_field_set(o, tech, c);
00177                   ast_string_field_set(o, dest, c2);
00178                } else {
00179                   ast_log(LOG_NOTICE, "Channel should be in form Tech/Dest at line %d of %s\n", lineno, fn);
00180                }
00181             } else if (!strcasecmp(buf, "callerid")) {
00182                char cid_name[80] = {0}, cid_num[80] = {0};
00183                ast_callerid_split(c, cid_name, sizeof(cid_name), cid_num, sizeof(cid_num));
00184                ast_string_field_set(o, cid_num, cid_num);
00185                ast_string_field_set(o, cid_name, cid_name);
00186             } else if (!strcasecmp(buf, "application")) {
00187                ast_string_field_set(o, app, c);
00188             } else if (!strcasecmp(buf, "data")) {
00189                ast_string_field_set(o, data, c);
00190             } else if (!strcasecmp(buf, "maxretries")) {
00191                if (sscanf(c, "%30d", &o->maxretries) != 1) {
00192                   ast_log(LOG_WARNING, "Invalid max retries at line %d of %s\n", lineno, fn);
00193                   o->maxretries = 0;
00194                }
00195             } else if (!strcasecmp(buf, "codecs")) {
00196                ast_parse_allow_disallow(NULL, &o->format, c, 1);
00197             } else if (!strcasecmp(buf, "context")) {
00198                ast_string_field_set(o, context, c);
00199             } else if (!strcasecmp(buf, "extension")) {
00200                ast_string_field_set(o, exten, c);
00201             } else if (!strcasecmp(buf, "priority")) {
00202                if ((sscanf(c, "%30d", &o->priority) != 1) || (o->priority < 1)) {
00203                   ast_log(LOG_WARNING, "Invalid priority at line %d of %s\n", lineno, fn);
00204                   o->priority = 1;
00205                }
00206             } else if (!strcasecmp(buf, "retrytime")) {
00207                if ((sscanf(c, "%30d", &o->retrytime) != 1) || (o->retrytime < 1)) {
00208                   ast_log(LOG_WARNING, "Invalid retrytime at line %d of %s\n", lineno, fn);
00209                   o->retrytime = 300;
00210                }
00211             } else if (!strcasecmp(buf, "waittime")) {
00212                if ((sscanf(c, "%30d", &o->waittime) != 1) || (o->waittime < 1)) {
00213                   ast_log(LOG_WARNING, "Invalid waittime at line %d of %s\n", lineno, fn);
00214                   o->waittime = 45;
00215                }
00216             } else if (!strcasecmp(buf, "retry")) {
00217                o->retries++;
00218             } else if (!strcasecmp(buf, "startretry")) {
00219                if (sscanf(c, "%30ld", &o->callingpid) != 1) {
00220                   ast_log(LOG_WARNING, "Unable to retrieve calling PID!\n");
00221                   o->callingpid = 0;
00222                }
00223             } else if (!strcasecmp(buf, "endretry") || !strcasecmp(buf, "abortretry")) {
00224                o->callingpid = 0;
00225                o->retries++;
00226             } else if (!strcasecmp(buf, "delayedretry")) {
00227             } else if (!strcasecmp(buf, "setvar") || !strcasecmp(buf, "set")) {
00228                c2 = c;
00229                strsep(&c2, "=");
00230                if (c2) {
00231                   var = ast_variable_new(c, c2, fn);
00232                   if (var) {
00233                      /* Always insert at the end, because some people want to treat the spool file as a script */
00234                      if (last) {
00235                         last->next = var;
00236                      } else {
00237                         o->vars = var;
00238                      }
00239                      last = var;
00240                   }
00241                } else
00242                   ast_log(LOG_WARNING, "Malformed \"%s\" argument.  Should be \"%s: variable=value\"\n", buf, buf);
00243             } else if (!strcasecmp(buf, "account")) {
00244                ast_string_field_set(o, account, c);
00245             } else if (!strcasecmp(buf, "alwaysdelete")) {
00246                ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ALWAYS_DELETE);
00247             } else if (!strcasecmp(buf, "archive")) {
00248                ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ARCHIVE);
00249             } else {
00250                ast_log(LOG_WARNING, "Unknown keyword '%s' at line %d of %s\n", buf, lineno, fn);
00251             }
00252          } else
00253             ast_log(LOG_NOTICE, "Syntax error at line %d of %s\n", lineno, fn);
00254       }
00255    }
00256    ast_string_field_set(o, fn, fn);
00257    if (ast_strlen_zero(o->tech) || ast_strlen_zero(o->dest) || (ast_strlen_zero(o->app) && ast_strlen_zero(o->exten))) {
00258       ast_log(LOG_WARNING, "At least one of app or extension must be specified, along with tech and dest in file %s\n", fn);
00259       return -1;
00260    }
00261    return 0;
00262 }

static void* attempt_thread ( void *  data  )  [static]

Definition at line 339 of file pbx_spool.c.

References outgoing::account, outgoing::app, ast_channel_reason2str(), ast_log(), ast_pbx_outgoing_app(), ast_pbx_outgoing_exten(), ast_strlen_zero(), ast_verb, outgoing::cid_name, outgoing::cid_num, outgoing::context, outgoing::data, outgoing::dest, outgoing::exten, outgoing::fn, outgoing::format, free_outgoing(), LOG_NOTICE, outgoing::maxretries, outgoing::priority, queue_file(), remove_from_queue(), outgoing::retries, outgoing::retrytime, safe_append(), outgoing::tech, outgoing::vars, and outgoing::waittime.

Referenced by launch_service().

00340 {
00341    struct outgoing *o = data;
00342    int res, reason;
00343    if (!ast_strlen_zero(o->app)) {
00344       ast_verb(3, "Attempting call on %s/%s for application %s(%s) (Retry %d)\n", o->tech, o->dest, o->app, o->data, o->retries);
00345       res = ast_pbx_outgoing_app(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->app, o->data, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL);
00346       o->vars = NULL;
00347    } else {
00348       ast_verb(3, "Attempting call on %s/%s for %s@%s:%d (Retry %d)\n", o->tech, o->dest, o->exten, o->context,o->priority, o->retries);
00349       res = ast_pbx_outgoing_exten(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->context, o->exten, o->priority, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL);
00350       o->vars = NULL;
00351    }
00352    if (res) {
00353       ast_log(LOG_NOTICE, "Call failed to go through, reason (%d) %s\n", reason, ast_channel_reason2str(reason));
00354       if (o->retries >= o->maxretries + 1) {
00355          /* Max retries exceeded */
00356          ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
00357          remove_from_queue(o, "Expired");
00358       } else {
00359          /* Notate that the call is still active */
00360          safe_append(o, time(NULL), "EndRetry");
00361 #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE)
00362          queue_file(o->fn, time(NULL) + o->retrytime);
00363 #endif
00364       }
00365    } else {
00366       ast_log(LOG_NOTICE, "Call completed to %s/%s\n", o->tech, o->dest);
00367       remove_from_queue(o, "Completed");
00368    }
00369    free_outgoing(o);
00370    return NULL;
00371 }

static void free_outgoing ( struct outgoing *  o  )  [static]

Definition at line 117 of file pbx_spool.c.

References ast_free, ast_string_field_free_memory, ast_variables_destroy(), and outgoing::vars.

Referenced by attempt_thread(), launch_service(), and scan_service().

00118 {
00119    if (o->vars) {
00120       ast_variables_destroy(o->vars);
00121    }
00122    ast_string_field_free_memory(o);
00123    ast_free(o);
00124 }

static int init_outgoing ( struct outgoing *  o  )  [static]

Definition at line 104 of file pbx_spool.c.

References AST_FORMAT_SLINEAR, ast_set_flag, ast_string_field_init, outgoing::format, outgoing::options, outgoing::priority, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, and outgoing::waittime.

Referenced by scan_service().

00105 {
00106    o->priority = 1;
00107    o->retrytime = 300;
00108    o->waittime = 45;
00109    o->format = AST_FORMAT_SLINEAR;
00110    ast_set_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE);
00111    if (ast_string_field_init(o, 128)) {
00112       return -1;
00113    }
00114    return 0;
00115 }

static void launch_service ( struct outgoing *  o  )  [static]

Definition at line 373 of file pbx_spool.c.

References ast_log(), ast_pthread_create_detached, attempt_thread(), free_outgoing(), and LOG_WARNING.

Referenced by scan_service().

00374 {
00375    pthread_t t;
00376    int ret;
00377 
00378    if ((ret = ast_pthread_create_detached(&t, NULL, attempt_thread, o))) {
00379       ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
00380       free_outgoing(o);
00381    }
00382 }

static int load_module ( void   )  [static]

Definition at line 845 of file pbx_spool.c.

References ast_config_AST_SPOOL_DIR, ast_log(), ast_mkdir(), AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_FAILURE, AST_MODULE_LOAD_SUCCESS, ast_pthread_create_detached_background, LOG_WARNING, scan_thread(), and thread.

00846 {
00847    pthread_t thread;
00848    int ret;
00849    snprintf(qdir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing");
00850    if (ast_mkdir(qdir, 0777)) {
00851       ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool disabled\n", qdir);
00852       return AST_MODULE_LOAD_DECLINE;
00853    }
00854    snprintf(qdonedir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing_done");
00855 
00856    if ((ret = ast_pthread_create_detached_background(&thread, NULL, scan_thread, NULL))) {
00857       ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
00858       return AST_MODULE_LOAD_FAILURE;
00859    }
00860 
00861    return AST_MODULE_LOAD_SUCCESS;
00862 }

static void queue_created_files ( void   )  [static]

Definition at line 567 of file pbx_spool.c.

References ast_free, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, direntry::list, direntry::mtime, direntry::name, and queue_file().

Referenced by scan_thread().

00568 {
00569    struct direntry *cur;
00570    time_t now = time(NULL);
00571 
00572    AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) {
00573       if (cur->mtime > now) {
00574          break;
00575       }
00576 
00577       AST_LIST_REMOVE_CURRENT(list);
00578       queue_file(cur->name, 0);
00579       ast_free(cur);
00580    }
00581    AST_LIST_TRAVERSE_SAFE_END
00582 }

static void queue_file ( const char *  filename,
time_t  when 
) [static]

Definition at line 470 of file pbx_spool.c.

References ast_calloc, AST_LIST_EMPTY, AST_LIST_INSERT_BEFORE_CURRENT, AST_LIST_INSERT_HEAD, AST_LIST_INSERT_TAIL, AST_LIST_LOCK, AST_LIST_TRAVERSE, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, AST_LIST_UNLOCK, ast_log(), errno, direntry::list, LOG_WARNING, direntry::mtime, direntry::name, and scan_service().

Referenced by attempt_thread(), queue_created_files(), queue_file_write(), and scan_thread().

00471 {
00472    struct stat st;
00473    struct direntry *cur, *new;
00474    int res;
00475    time_t now = time(NULL);
00476 
00477    if (filename[0] != '/') {
00478       char *fn = alloca(strlen(qdir) + strlen(filename) + 2);
00479       sprintf(fn, "%s/%s", qdir, filename); /* SAFE */
00480       filename = fn;
00481    }
00482 
00483    if (when == 0) {
00484       if (stat(filename, &st)) {
00485          ast_log(LOG_WARNING, "Unable to stat %s: %s\n", filename, strerror(errno));
00486          return;
00487       }
00488 
00489       if (!S_ISREG(st.st_mode)) {
00490          return;
00491       }
00492 
00493       when = st.st_mtime;
00494    }
00495 
00496    /* Need to check the existing list in order to avoid duplicates. */
00497    AST_LIST_LOCK(&dirlist);
00498    AST_LIST_TRAVERSE(&dirlist, cur, list) {
00499       if (cur->mtime == when && !strcmp(filename, cur->name)) {
00500          AST_LIST_UNLOCK(&dirlist);
00501          return;
00502       }
00503    }
00504 
00505    if ((res = when) > now || (res = scan_service(filename, now)) > 0) {
00506       if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) {
00507          AST_LIST_UNLOCK(&dirlist);
00508          return;
00509       }
00510       new->mtime = res;
00511       strcpy(new->name, filename);
00512       /* List is ordered by mtime */
00513       if (AST_LIST_EMPTY(&dirlist)) {
00514          AST_LIST_INSERT_HEAD(&dirlist, new, list);
00515       } else {
00516          int found = 0;
00517          AST_LIST_TRAVERSE_SAFE_BEGIN(&dirlist, cur, list) {
00518             if (cur->mtime > new->mtime) {
00519                AST_LIST_INSERT_BEFORE_CURRENT(new, list);
00520                found = 1;
00521                break;
00522             }
00523          }
00524          AST_LIST_TRAVERSE_SAFE_END
00525          if (!found) {
00526             AST_LIST_INSERT_TAIL(&dirlist, new, list);
00527          }
00528       }
00529    }
00530    AST_LIST_UNLOCK(&dirlist);
00531 }

static void queue_file_create ( const char *  filename  )  [static]

Definition at line 534 of file pbx_spool.c.

References ast_calloc, AST_LIST_INSERT_TAIL, AST_LIST_TRAVERSE, direntry::list, and direntry::name.

Referenced by scan_thread().

00535 {
00536    struct direntry *cur;
00537 
00538    AST_LIST_TRAVERSE(&createlist, cur, list) {
00539       if (!strcmp(cur->name, filename)) {
00540          return;
00541       }
00542    }
00543 
00544    if (!(cur = ast_calloc(1, sizeof(*cur) + strlen(filename) + 1))) {
00545       return;
00546    }
00547    strcpy(cur->name, filename);
00548    /* We'll handle this file unless an IN_OPEN event occurs within 2 seconds */
00549    cur->mtime = time(NULL) + 2;
00550    AST_LIST_INSERT_TAIL(&createlist, cur, list);
00551 }

static void queue_file_open ( const char *  filename  )  [static]

Definition at line 553 of file pbx_spool.c.

References AST_LIST_INSERT_TAIL, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, direntry::list, and direntry::name.

Referenced by scan_thread().

00554 {
00555    struct direntry *cur;
00556 
00557    AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) {
00558       if (!strcmp(cur->name, filename)) {
00559          AST_LIST_REMOVE_CURRENT(list);
00560          AST_LIST_INSERT_TAIL(&openlist, cur, list);
00561          break;
00562       }
00563    }
00564    AST_LIST_TRAVERSE_SAFE_END
00565 }

static void queue_file_write ( const char *  filename  )  [static]

Definition at line 584 of file pbx_spool.c.

References ast_free, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, direntry::list, direntry::name, and queue_file().

Referenced by scan_thread().

00585 {
00586    struct direntry *cur;
00587    /* Only queue entries where an IN_CREATE preceded the IN_CLOSE_WRITE */
00588    AST_LIST_TRAVERSE_SAFE_BEGIN(&openlist, cur, list) {
00589       if (!strcmp(cur->name, filename)) {
00590          AST_LIST_REMOVE_CURRENT(list);
00591          ast_free(cur);
00592          queue_file(filename, 0);
00593          break;
00594       }
00595    }
00596    AST_LIST_TRAVERSE_SAFE_END
00597 }

static int remove_from_queue ( struct outgoing *  o,
const char *  status 
) [static]

Remove a call file from the outgoing queue, optionally moving it to the archive dir.

Parameters:
o the pointer to outgoing struct
status the exit status of the call. Can be "Completed", "Failed" or "Expired"

Definition at line 288 of file pbx_spool.c.

References ast_log(), ast_mkdir(), ast_test_flag, f, outgoing::fn, LOG_WARNING, outgoing::options, SPOOL_FLAG_ALWAYS_DELETE, and SPOOL_FLAG_ARCHIVE.

00289 {
00290    FILE *f;
00291    char newfn[256];
00292    const char *bname;
00293 
00294    if (!ast_test_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE)) {
00295       struct stat current_file_status;
00296 
00297       if (!stat(o->fn, &current_file_status)) {
00298          if (time(NULL) < current_file_status.st_mtime) {
00299             return 0;
00300          }
00301       }
00302    }
00303 
00304    if (!ast_test_flag(&o->options, SPOOL_FLAG_ARCHIVE)) {
00305       unlink(o->fn);
00306       return 0;
00307    }
00308 
00309    if (ast_mkdir(qdonedir, 0777)) {
00310       ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool archiving disabled\n", qdonedir);
00311       unlink(o->fn);
00312       return -1;
00313    }
00314 
00315    if (!(bname = strrchr(o->fn, '/'))) {
00316       bname = o->fn;
00317    } else {
00318       bname++;
00319    }
00320 
00321    snprintf(newfn, sizeof(newfn), "%s/%s", qdonedir, bname);
00322    /* a existing call file the archive dir is overwritten */
00323    unlink(newfn);
00324    if (rename(o->fn, newfn) != 0) {
00325       unlink(o->fn);
00326       return -1;
00327    }
00328 
00329    /* Only append to the file AFTER we move it out of the watched directory,
00330     * otherwise the fclose() causes another event for inotify(7) */
00331    if ((f = fopen(newfn, "a"))) {
00332       fprintf(f, "Status: %s\n", status);
00333       fclose(f);
00334    }
00335 
00336    return 0;
00337 }

static void safe_append ( struct outgoing *  o,
time_t  now,
char *  s 
) [static]

Definition at line 264 of file pbx_spool.c.

References ast_debug, ast_log(), ast_mainpid, outgoing::dest, errno, f, outgoing::fn, LOG_WARNING, outgoing::retries, outgoing::retrytime, and outgoing::tech.

Referenced by attempt_thread(), and scan_service().

00265 {
00266    FILE *f;
00267    struct utimbuf tbuf = { .actime = now, .modtime = now + o->retrytime };
00268 
00269    ast_debug(1, "Outgoing %s/%s: %s\n", o->tech, o->dest, s);
00270 
00271    if ((f = fopen(o->fn, "a"))) {
00272       fprintf(f, "\n%s: %ld %d (%ld)\n", s, (long)ast_mainpid, o->retries, (long) now);
00273       fclose(f);
00274    }
00275 
00276    /* Update the file time */
00277    if (utime(o->fn, &tbuf)) {
00278       ast_log(LOG_WARNING, "Unable to set utime on %s: %s\n", o->fn, strerror(errno));
00279    }
00280 }

static int scan_service ( const char *  fn,
time_t  now 
) [static]

Definition at line 385 of file pbx_spool.c.

References apply_outgoing(), ast_calloc, ast_free, ast_log(), ast_mainpid, errno, f, free_outgoing(), init_outgoing(), launch_service(), LOG_DEBUG, LOG_NOTICE, LOG_WARNING, remove_from_queue(), and safe_append().

Referenced by queue_file().

00386 {
00387    struct outgoing *o = NULL;
00388    FILE *f;
00389    int res = 0;
00390 
00391    if (!(o = ast_calloc(1, sizeof(*o)))) {
00392       ast_log(LOG_WARNING, "Out of memory ;(\n");
00393       return -1;
00394    }
00395 
00396    if (init_outgoing(o)) {
00397       /* No need to call free_outgoing here since we know the failure
00398        * was to allocate string fields and no variables have been allocated
00399        * yet.
00400        */
00401       ast_free(o);
00402       return -1;
00403    }
00404 
00405    /* Attempt to open the file */
00406    if (!(f = fopen(fn, "r"))) {
00407       remove_from_queue(o, "Failed");
00408       free_outgoing(o);
00409 #if !defined(HAVE_INOTIFY) && !defined(HAVE_KQUEUE)
00410       ast_log(LOG_WARNING, "Unable to open %s: %s, deleting\n", fn, strerror(errno));
00411 #endif
00412       return -1;
00413    }
00414 
00415    /* Read in and verify the contents */
00416    if (apply_outgoing(o, fn, f)) {
00417       remove_from_queue(o, "Failed");
00418       free_outgoing(o);
00419       ast_log(LOG_WARNING, "Invalid file contents in %s, deleting\n", fn);
00420       fclose(f);
00421       return -1;
00422    }
00423 
00424 #if 0
00425    printf("Filename: %s, Retries: %d, max: %d\n", fn, o->retries, o->maxretries);
00426 #endif
00427    fclose(f);
00428    if (o->retries <= o->maxretries) {
00429       now += o->retrytime;
00430       if (o->callingpid && (o->callingpid == ast_mainpid)) {
00431          safe_append(o, time(NULL), "DelayedRetry");
00432          ast_log(LOG_DEBUG, "Delaying retry since we're currently running '%s'\n", o->fn);
00433          free_outgoing(o);
00434       } else {
00435          /* Increment retries */
00436          o->retries++;
00437          /* If someone else was calling, they're presumably gone now
00438             so abort their retry and continue as we were... */
00439          if (o->callingpid)
00440             safe_append(o, time(NULL), "AbortRetry");
00441 
00442          safe_append(o, now, "StartRetry");
00443          launch_service(o);
00444       }
00445       res = now;
00446    } else {
00447       ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
00448       remove_from_queue(o, "Expired");
00449       free_outgoing(o);
00450    }
00451 
00452    return res;
00453 }

static void* scan_thread ( void *  unused  )  [static]

Definition at line 600 of file pbx_spool.c.

References ast_debug, ast_free, ast_fully_booted, AST_LIST_EMPTY, AST_LIST_FIRST, AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_log(), errno, HAVE_INOTIFY, inotify_fd, len(), direntry::list, LOG_ERROR, direntry::mtime, direntry::name, direntry::next, queue_created_files(), queue_file(), queue_file_create(), queue_file_open(), and queue_file_write().

Referenced by load_module().

00601 {
00602    DIR *dir;
00603    struct dirent *de;
00604    time_t now;
00605    struct timespec ts = { .tv_sec = 1 };
00606 #ifdef HAVE_INOTIFY
00607    ssize_t res;
00608    int inotify_fd = inotify_init();
00609    struct inotify_event *iev;
00610    char buf[8192] __attribute__((aligned (sizeof(int))));
00611    struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN };
00612 #else
00613    struct timespec nowait = { 0, 1 };
00614    int inotify_fd = kqueue();
00615    struct kevent kev;
00616 #endif
00617    struct direntry *cur;
00618 
00619    while (!ast_fully_booted) {
00620       nanosleep(&ts, NULL);
00621    }
00622 
00623    if (inotify_fd < 0) {
00624       ast_log(LOG_ERROR, "Unable to initialize "
00625 #ifdef HAVE_INOTIFY
00626          "inotify(7)"
00627 #else
00628          "kqueue(2)"
00629 #endif
00630          "\n");
00631       return NULL;
00632    }
00633 
00634 #ifdef HAVE_INOTIFY
00635    inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_OPEN | IN_CLOSE_WRITE | IN_MOVED_TO);
00636 #endif
00637 
00638    /* First, run through the directory and clear existing entries */
00639    if (!(dir = opendir(qdir))) {
00640       ast_log(LOG_ERROR, "Unable to open directory %s: %s\n", qdir, strerror(errno));
00641       return NULL;
00642    }
00643 
00644 #ifndef HAVE_INOTIFY
00645    EV_SET(&kev, dirfd(dir), EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_WRITE, 0, NULL);
00646    if (kevent(inotify_fd, &kev, 1, NULL, 0, &nowait) < 0 && errno != 0) {
00647       ast_log(LOG_ERROR, "Unable to watch directory %s: %s\n", qdir, strerror(errno));
00648    }
00649 #endif
00650    now = time(NULL);
00651    while ((de = readdir(dir))) {
00652       queue_file(de->d_name, 0);
00653    }
00654 
00655 #ifdef HAVE_INOTIFY
00656    /* Directory needs to remain open for kqueue(2) */
00657    closedir(dir);
00658 #endif
00659 
00660    /* Wait for either a) next timestamp to occur, or b) a change to happen */
00661    for (;/* ever */;) {
00662       time_t next = AST_LIST_EMPTY(&dirlist) ? INT_MAX : AST_LIST_FIRST(&dirlist)->mtime;
00663 
00664       time(&now);
00665       if (next > now) {
00666 #ifdef HAVE_INOTIFY
00667          int stage = 0;
00668          /* Convert from seconds to milliseconds, unless there's nothing
00669           * in the queue already, in which case, we wait forever. */
00670          int waittime = next == INT_MAX ? -1 : (next - now) * 1000;
00671          if (!AST_LIST_EMPTY(&createlist)) {
00672             waittime = 1000;
00673          }
00674          /* When a file arrives, add it to the queue, in mtime order. */
00675          if ((res = poll(&pfd, 1, waittime)) > 0 && (stage = 1) &&
00676             (res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(*iev)) {
00677             ssize_t len = 0;
00678             /* File(s) added to directory, add them to my list */
00679             for (iev = (void *) buf; res >= sizeof(*iev); iev = (struct inotify_event *) (((char *) iev) + len)) {
00680                /* For an IN_MOVED_TO event, simply process the file. However, if
00681                 * we get an IN_CREATE event it *might* be an open(O_CREAT) or it
00682                 * might be a hardlink (like smsq does, since rename() might
00683                 * overwrite an existing file). So we have to see if we get a
00684                 * subsequent IN_OPEN event on the same file. If we do, keep it
00685                 * on the openlist and wait for the corresponding IN_CLOSE_WRITE.
00686                 * If we *don't* see an IN_OPEN event, then it was a hard link so
00687                 * it can be processed immediately.
00688                 *
00689                 * Unfortunately, although open(O_CREAT) is an atomic file system
00690                 * operation, the inotify subsystem doesn't give it to us in a
00691                 * single event with both IN_CREATE|IN_OPEN set. It's two separate
00692                 * events, and the kernel doesn't even give them to us at the same
00693                 * time. We can read() from inotify_fd after the IN_CREATE event,
00694                 * and get *nothing* from it. The IN_OPEN arrives only later! So
00695                 * we have a very short timeout of 2 seconds. */
00696                if (iev->mask & IN_CREATE) {
00697                   queue_file_create(iev->name);
00698                } else if (iev->mask & IN_OPEN) {
00699                   queue_file_open(iev->name);
00700                } else if (iev->mask & IN_CLOSE_WRITE) {
00701                   queue_file_write(iev->name);
00702                } else if (iev->mask & IN_MOVED_TO) {
00703                   queue_file(iev->name, 0);
00704                } else {
00705                   ast_log(LOG_ERROR, "Unexpected event %d for file '%s'\n", (int) iev->mask, iev->name);
00706                }
00707 
00708                len = sizeof(*iev) + iev->len;
00709                res -= len;
00710             }
00711          } else if (res < 0 && errno != EINTR && errno != EAGAIN) {
00712             ast_debug(1, "Got an error back from %s(2): %s\n", stage ? "read" : "poll", strerror(errno));
00713          }
00714          time(&now);
00715       }
00716       queue_created_files();
00717 #else
00718          struct timespec ts2 = { next - now, 0 };
00719          if (kevent(inotify_fd, NULL, 0, &kev, 1, &ts2) <= 0) {
00720             /* Interrupt or timeout, restart calculations */
00721             continue;
00722          } else {
00723             /* Directory changed, rescan */
00724             rewinddir(dir);
00725             while ((de = readdir(dir))) {
00726                queue_file(de->d_name, 0);
00727             }
00728          }
00729          time(&now);
00730       }
00731 #endif
00732 
00733       /* Empty the list of all entries ready to be processed */
00734       AST_LIST_LOCK(&dirlist);
00735       while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) {
00736          cur = AST_LIST_REMOVE_HEAD(&dirlist, list);
00737          queue_file(cur->name, cur->mtime);
00738          ast_free(cur);
00739       }
00740       AST_LIST_UNLOCK(&dirlist);
00741    }
00742    return NULL;
00743 }

static int unload_module ( void   )  [static]

Definition at line 840 of file pbx_spool.c.

/*!
 * \brief Module unload hook.
 *
 * \return always -1; no unloading work is performed here
 */
static int unload_module(void)
{
   const int status = -1;

   return status;
}


Variable Documentation

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "ac1f6a56484a8820659555499174e588" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } [static]

Definition at line 864 of file pbx_spool.c.

struct ast_module_info* ast_module_info = &__mod_info [static]

Definition at line 864 of file pbx_spool.c.

char qdir[255] [static]

Definition at line 72 of file pbx_spool.c.

char qdonedir[255] [static]

Definition at line 73 of file pbx_spool.c.


Generated on Mon Oct 8 12:39:26 2012 for Asterisk - The Open Source Telephony Project by  doxygen 1.4.7