#include "asterisk.h"
#include <sys/stat.h>
#include <time.h>
#include <utime.h>
#include <dirent.h>
#include <sys/inotify.h>
#include "asterisk/paths.h"
#include "asterisk/lock.h"
#include "asterisk/file.h"
#include "asterisk/logger.h"
#include "asterisk/channel.h"
#include "asterisk/callerid.h"
#include "asterisk/pbx.h"
#include "asterisk/module.h"
#include "asterisk/utils.h"
#include "asterisk/options.h"
Go to the source code of this file.
Data Structures | |
struct | createlist |
struct | direntry |
struct | dirlist |
struct | openlist |
struct | outgoing |
Enumerations | |
enum | { SPOOL_FLAG_ALWAYS_DELETE = (1 << 0), SPOOL_FLAG_ARCHIVE = (1 << 1) } |
Functions | |
static void | __reg_module (void) |
static void | __unreg_module (void) |
static int | apply_outgoing (struct outgoing *o, const char *fn, FILE *f) |
static void * | attempt_thread (void *data) |
static void | free_outgoing (struct outgoing *o) |
static int | init_outgoing (struct outgoing *o) |
static void | launch_service (struct outgoing *o) |
static int | load_module (void) |
static void | queue_created_files (void) |
static void | queue_file (const char *filename, time_t when) |
static void | queue_file_create (const char *filename) |
static void | queue_file_open (const char *filename) |
static void | queue_file_write (const char *filename) |
static int | remove_from_queue (struct outgoing *o, const char *status) |
Remove a call file from the outgoing queue, optionally moving it into the archive directory. | |
static void | safe_append (struct outgoing *o, time_t now, char *s) |
static int | scan_service (const char *fn, time_t now) |
static void * | scan_thread (void *unused) |
static int | unload_module (void) |
Variables | |
static struct ast_module_info | __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "ac1f6a56484a8820659555499174e588" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } |
static struct ast_module_info * | ast_module_info = &__mod_info |
static char | qdir [255] |
static char | qdonedir [255] |
Definition in file pbx_spool.c.
anonymous enum |
Definition at line 62 of file pbx_spool.c.
00062 { /* Option bits stored in outgoing::options. */ 00063 /*! Always delete the call file after a call succeeds or the 00064 * maximum number of retries is exceeded, even if the 00065 * modification time of the call file is in the future. 00066 */ 00067 SPOOL_FLAG_ALWAYS_DELETE = (1 << 0), 00068 /*! Archive instead of deleting: after processing, the call file is moved into qdonedir rather than unlinked. */ 00069 SPOOL_FLAG_ARCHIVE = (1 << 1), 00070 };
static void __reg_module | ( | void | ) | [static] |
Definition at line 864 of file pbx_spool.c.
static void __unreg_module | ( | void | ) | [static] |
Definition at line 864 of file pbx_spool.c.
static int apply_outgoing | ( | struct outgoing * | o, | |
const char * | fn, | |||
FILE * | f | |||
) | [static] |
Definition at line 126 of file pbx_spool.c.
References outgoing::app, app, ast_callerid_split(), ast_log(), ast_parse_allow_disallow(), ast_set2_flag, ast_string_field_set, ast_strlen_zero(), ast_true(), ast_variable_new(), outgoing::callingpid, cid_name, cid_num, context, outgoing::dest, outgoing::exten, exten, outgoing::format, last, LOG_NOTICE, LOG_WARNING, outgoing::maxretries, sla_ringing_trunk::next, outgoing::options, outgoing::priority, outgoing::retries, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, SPOOL_FLAG_ARCHIVE, strsep(), outgoing::tech, var, outgoing::vars, and outgoing::waittime.
Referenced by scan_service().
00127 { 00128 char buf[256]; 00129 char *c, *c2; 00130 int lineno = 0; 00131 struct ast_variable *var, *last = o->vars; 00132 00133 while (last && last->next) { 00134 last = last->next; 00135 } 00136 00137 while(fgets(buf, sizeof(buf), f)) { 00138 lineno++; 00139 /* Trim comments */ 00140 c = buf; 00141 while ((c = strchr(c, '#'))) { 00142 if ((c == buf) || (*(c-1) == ' ') || (*(c-1) == '\t')) 00143 *c = '\0'; 00144 else 00145 c++; 00146 } 00147 00148 c = buf; 00149 while ((c = strchr(c, ';'))) { 00150 if ((c > buf) && (c[-1] == '\\')) { 00151 memmove(c - 1, c, strlen(c) + 1); 00152 c++; 00153 } else { 00154 *c = '\0'; 00155 break; 00156 } 00157 } 00158 00159 /* Trim trailing white space */ 00160 while(!ast_strlen_zero(buf) && buf[strlen(buf) - 1] < 33) 00161 buf[strlen(buf) - 1] = '\0'; 00162 if (!ast_strlen_zero(buf)) { 00163 c = strchr(buf, ':'); 00164 if (c) { 00165 *c = '\0'; 00166 c++; 00167 while ((*c) && (*c < 33)) 00168 c++; 00169 #if 0 00170 printf("'%s' is '%s' at line %d\n", buf, c, lineno); 00171 #endif 00172 if (!strcasecmp(buf, "channel")) { 00173 if ((c2 = strchr(c, '/'))) { 00174 *c2 = '\0'; 00175 c2++; 00176 ast_string_field_set(o, tech, c); 00177 ast_string_field_set(o, dest, c2); 00178 } else { 00179 ast_log(LOG_NOTICE, "Channel should be in form Tech/Dest at line %d of %s\n", lineno, fn); 00180 } 00181 } else if (!strcasecmp(buf, "callerid")) { 00182 char cid_name[80] = {0}, cid_num[80] = {0}; 00183 ast_callerid_split(c, cid_name, sizeof(cid_name), cid_num, sizeof(cid_num)); 00184 ast_string_field_set(o, cid_num, cid_num); 00185 ast_string_field_set(o, cid_name, cid_name); 00186 } else if (!strcasecmp(buf, "application")) { 00187 ast_string_field_set(o, app, c); 00188 } else if (!strcasecmp(buf, "data")) { 00189 ast_string_field_set(o, data, c); 00190 } else if (!strcasecmp(buf, "maxretries")) { 00191 if (sscanf(c, "%30d", &o->maxretries) != 1) { 00192 ast_log(LOG_WARNING, "Invalid max retries at line %d of %s\n", lineno, fn); 00193 
o->maxretries = 0; 00194 } 00195 } else if (!strcasecmp(buf, "codecs")) { 00196 ast_parse_allow_disallow(NULL, &o->format, c, 1); 00197 } else if (!strcasecmp(buf, "context")) { 00198 ast_string_field_set(o, context, c); 00199 } else if (!strcasecmp(buf, "extension")) { 00200 ast_string_field_set(o, exten, c); 00201 } else if (!strcasecmp(buf, "priority")) { 00202 if ((sscanf(c, "%30d", &o->priority) != 1) || (o->priority < 1)) { 00203 ast_log(LOG_WARNING, "Invalid priority at line %d of %s\n", lineno, fn); 00204 o->priority = 1; 00205 } 00206 } else if (!strcasecmp(buf, "retrytime")) { 00207 if ((sscanf(c, "%30d", &o->retrytime) != 1) || (o->retrytime < 1)) { 00208 ast_log(LOG_WARNING, "Invalid retrytime at line %d of %s\n", lineno, fn); 00209 o->retrytime = 300; 00210 } 00211 } else if (!strcasecmp(buf, "waittime")) { 00212 if ((sscanf(c, "%30d", &o->waittime) != 1) || (o->waittime < 1)) { 00213 ast_log(LOG_WARNING, "Invalid waittime at line %d of %s\n", lineno, fn); 00214 o->waittime = 45; 00215 } 00216 } else if (!strcasecmp(buf, "retry")) { 00217 o->retries++; 00218 } else if (!strcasecmp(buf, "startretry")) { 00219 if (sscanf(c, "%30ld", &o->callingpid) != 1) { 00220 ast_log(LOG_WARNING, "Unable to retrieve calling PID!\n"); 00221 o->callingpid = 0; 00222 } 00223 } else if (!strcasecmp(buf, "endretry") || !strcasecmp(buf, "abortretry")) { 00224 o->callingpid = 0; 00225 o->retries++; 00226 } else if (!strcasecmp(buf, "delayedretry")) { 00227 } else if (!strcasecmp(buf, "setvar") || !strcasecmp(buf, "set")) { 00228 c2 = c; 00229 strsep(&c2, "="); 00230 if (c2) { 00231 var = ast_variable_new(c, c2, fn); 00232 if (var) { 00233 /* Always insert at the end, because some people want to treat the spool file as a script */ 00234 if (last) { 00235 last->next = var; 00236 } else { 00237 o->vars = var; 00238 } 00239 last = var; 00240 } 00241 } else 00242 ast_log(LOG_WARNING, "Malformed \"%s\" argument. 
Should be \"%s: variable=value\"\n", buf, buf); 00243 } else if (!strcasecmp(buf, "account")) { 00244 ast_string_field_set(o, account, c); 00245 } else if (!strcasecmp(buf, "alwaysdelete")) { 00246 ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ALWAYS_DELETE); 00247 } else if (!strcasecmp(buf, "archive")) { 00248 ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ARCHIVE); 00249 } else { 00250 ast_log(LOG_WARNING, "Unknown keyword '%s' at line %d of %s\n", buf, lineno, fn); 00251 } 00252 } else 00253 ast_log(LOG_NOTICE, "Syntax error at line %d of %s\n", lineno, fn); 00254 } 00255 } 00256 ast_string_field_set(o, fn, fn); 00257 if (ast_strlen_zero(o->tech) || ast_strlen_zero(o->dest) || (ast_strlen_zero(o->app) && ast_strlen_zero(o->exten))) { 00258 ast_log(LOG_WARNING, "At least one of app or extension must be specified, along with tech and dest in file %s\n", fn); 00259 return -1; 00260 } 00261 return 0; 00262 }
static void* attempt_thread | ( | void * | data | ) | [static] |
Definition at line 339 of file pbx_spool.c.
References outgoing::account, outgoing::app, ast_channel_reason2str(), ast_log(), ast_pbx_outgoing_app(), ast_pbx_outgoing_exten(), ast_strlen_zero(), ast_verb, outgoing::cid_name, outgoing::cid_num, outgoing::context, outgoing::data, outgoing::dest, outgoing::exten, outgoing::fn, outgoing::format, free_outgoing(), LOG_NOTICE, outgoing::maxretries, outgoing::priority, queue_file(), remove_from_queue(), outgoing::retries, outgoing::retrytime, safe_append(), outgoing::tech, outgoing::vars, and outgoing::waittime.
Referenced by launch_service().
00340 { 00341 struct outgoing *o = data; 00342 int res, reason; 00343 if (!ast_strlen_zero(o->app)) { 00344 ast_verb(3, "Attempting call on %s/%s for application %s(%s) (Retry %d)\n", o->tech, o->dest, o->app, o->data, o->retries); 00345 res = ast_pbx_outgoing_app(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->app, o->data, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL); 00346 o->vars = NULL; 00347 } else { 00348 ast_verb(3, "Attempting call on %s/%s for %s@%s:%d (Retry %d)\n", o->tech, o->dest, o->exten, o->context,o->priority, o->retries); 00349 res = ast_pbx_outgoing_exten(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->context, o->exten, o->priority, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL); 00350 o->vars = NULL; 00351 } 00352 if (res) { 00353 ast_log(LOG_NOTICE, "Call failed to go through, reason (%d) %s\n", reason, ast_channel_reason2str(reason)); 00354 if (o->retries >= o->maxretries + 1) { 00355 /* Max retries exceeded */ 00356 ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : ""); 00357 remove_from_queue(o, "Expired"); 00358 } else { 00359 /* Notate that the call is still active */ 00360 safe_append(o, time(NULL), "EndRetry"); 00361 #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE) 00362 queue_file(o->fn, time(NULL) + o->retrytime); 00363 #endif 00364 } 00365 } else { 00366 ast_log(LOG_NOTICE, "Call completed to %s/%s\n", o->tech, o->dest); 00367 remove_from_queue(o, "Completed"); 00368 } 00369 free_outgoing(o); 00370 return NULL; 00371 }
static void free_outgoing | ( | struct outgoing * | o | ) | [static] |
Definition at line 117 of file pbx_spool.c.
References ast_free, ast_string_field_free_memory, ast_variables_destroy(), and outgoing::vars.
Referenced by attempt_thread(), launch_service(), and scan_service().
00118 { 00119 if (o->vars) { 00120 ast_variables_destroy(o->vars); 00121 } 00122 ast_string_field_free_memory(o); 00123 ast_free(o); 00124 }
static int init_outgoing | ( | struct outgoing * | o | ) | [static] |
Definition at line 104 of file pbx_spool.c.
References AST_FORMAT_SLINEAR, ast_set_flag, ast_string_field_init, outgoing::format, outgoing::options, outgoing::priority, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, and outgoing::waittime.
Referenced by scan_service().
00105 { 00106 o->priority = 1; 00107 o->retrytime = 300; 00108 o->waittime = 45; 00109 o->format = AST_FORMAT_SLINEAR; 00110 ast_set_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE); 00111 if (ast_string_field_init(o, 128)) { 00112 return -1; 00113 } 00114 return 0; 00115 }
static void launch_service | ( | struct outgoing * | o | ) | [static] |
Definition at line 373 of file pbx_spool.c.
References ast_log(), ast_pthread_create_detached, attempt_thread(), free_outgoing(), and LOG_WARNING.
Referenced by scan_service().
00374 { 00375 pthread_t t; 00376 int ret; 00377 00378 if ((ret = ast_pthread_create_detached(&t, NULL, attempt_thread, o))) { 00379 ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret); 00380 free_outgoing(o); 00381 } 00382 }
static int load_module | ( | void | ) | [static] |
Definition at line 845 of file pbx_spool.c.
References ast_config_AST_SPOOL_DIR, ast_log(), ast_mkdir(), AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_FAILURE, AST_MODULE_LOAD_SUCCESS, ast_pthread_create_detached_background, LOG_WARNING, scan_thread(), and thread.
00846 { 00847 pthread_t thread; 00848 int ret; 00849 snprintf(qdir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing"); 00850 if (ast_mkdir(qdir, 0777)) { 00851 ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool disabled\n", qdir); 00852 return AST_MODULE_LOAD_DECLINE; 00853 } 00854 snprintf(qdonedir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing_done"); 00855 00856 if ((ret = ast_pthread_create_detached_background(&thread, NULL, scan_thread, NULL))) { 00857 ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret); 00858 return AST_MODULE_LOAD_FAILURE; 00859 } 00860 00861 return AST_MODULE_LOAD_SUCCESS; 00862 }
static void queue_created_files | ( | void | ) | [static] |
Definition at line 567 of file pbx_spool.c.
References ast_free, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, direntry::list, direntry::mtime, direntry::name, and queue_file().
Referenced by scan_thread().
00568 { 00569 struct direntry *cur; 00570 time_t now = time(NULL); 00571 00572 AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) { 00573 if (cur->mtime > now) { 00574 break; 00575 } 00576 00577 AST_LIST_REMOVE_CURRENT(list); 00578 queue_file(cur->name, 0); 00579 ast_free(cur); 00580 } 00581 AST_LIST_TRAVERSE_SAFE_END 00582 }
static void queue_file | ( | const char * | filename, | |
time_t | when | |||
) | [static] |
Definition at line 470 of file pbx_spool.c.
References ast_calloc, AST_LIST_EMPTY, AST_LIST_INSERT_BEFORE_CURRENT, AST_LIST_INSERT_HEAD, AST_LIST_INSERT_TAIL, AST_LIST_LOCK, AST_LIST_TRAVERSE, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, AST_LIST_UNLOCK, ast_log(), errno, direntry::list, LOG_WARNING, direntry::mtime, direntry::name, and scan_service().
Referenced by attempt_thread(), queue_created_files(), queue_file_write(), and scan_thread().
00471 { 00472 struct stat st; 00473 struct direntry *cur, *new; 00474 int res; 00475 time_t now = time(NULL); 00476 00477 if (filename[0] != '/') { 00478 char *fn = alloca(strlen(qdir) + strlen(filename) + 2); 00479 sprintf(fn, "%s/%s", qdir, filename); /* SAFE */ 00480 filename = fn; 00481 } 00482 00483 if (when == 0) { 00484 if (stat(filename, &st)) { 00485 ast_log(LOG_WARNING, "Unable to stat %s: %s\n", filename, strerror(errno)); 00486 return; 00487 } 00488 00489 if (!S_ISREG(st.st_mode)) { 00490 return; 00491 } 00492 00493 when = st.st_mtime; 00494 } 00495 00496 /* Need to check the existing list in order to avoid duplicates. */ 00497 AST_LIST_LOCK(&dirlist); 00498 AST_LIST_TRAVERSE(&dirlist, cur, list) { 00499 if (cur->mtime == when && !strcmp(filename, cur->name)) { 00500 AST_LIST_UNLOCK(&dirlist); 00501 return; 00502 } 00503 } 00504 00505 if ((res = when) > now || (res = scan_service(filename, now)) > 0) { 00506 if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) { 00507 AST_LIST_UNLOCK(&dirlist); 00508 return; 00509 } 00510 new->mtime = res; 00511 strcpy(new->name, filename); 00512 /* List is ordered by mtime */ 00513 if (AST_LIST_EMPTY(&dirlist)) { 00514 AST_LIST_INSERT_HEAD(&dirlist, new, list); 00515 } else { 00516 int found = 0; 00517 AST_LIST_TRAVERSE_SAFE_BEGIN(&dirlist, cur, list) { 00518 if (cur->mtime > new->mtime) { 00519 AST_LIST_INSERT_BEFORE_CURRENT(new, list); 00520 found = 1; 00521 break; 00522 } 00523 } 00524 AST_LIST_TRAVERSE_SAFE_END 00525 if (!found) { 00526 AST_LIST_INSERT_TAIL(&dirlist, new, list); 00527 } 00528 } 00529 } 00530 AST_LIST_UNLOCK(&dirlist); 00531 }
static void queue_file_create | ( | const char * | filename | ) | [static] |
Definition at line 534 of file pbx_spool.c.
References ast_calloc, AST_LIST_INSERT_TAIL, AST_LIST_TRAVERSE, direntry::list, and direntry::name.
Referenced by scan_thread().
00535 { 00536 struct direntry *cur; 00537 00538 AST_LIST_TRAVERSE(&createlist, cur, list) { 00539 if (!strcmp(cur->name, filename)) { 00540 return; 00541 } 00542 } 00543 00544 if (!(cur = ast_calloc(1, sizeof(*cur) + strlen(filename) + 1))) { 00545 return; 00546 } 00547 strcpy(cur->name, filename); 00548 /* We'll handle this file unless an IN_OPEN event occurs within 2 seconds */ 00549 cur->mtime = time(NULL) + 2; 00550 AST_LIST_INSERT_TAIL(&createlist, cur, list); 00551 }
static void queue_file_open | ( | const char * | filename | ) | [static] |
Definition at line 553 of file pbx_spool.c.
References AST_LIST_INSERT_TAIL, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, direntry::list, and direntry::name.
Referenced by scan_thread().
00554 { 00555 struct direntry *cur; 00556 00557 AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) { 00558 if (!strcmp(cur->name, filename)) { 00559 AST_LIST_REMOVE_CURRENT(list); 00560 AST_LIST_INSERT_TAIL(&openlist, cur, list); 00561 break; 00562 } 00563 } 00564 AST_LIST_TRAVERSE_SAFE_END 00565 }
static void queue_file_write | ( | const char * | filename | ) | [static] |
Definition at line 584 of file pbx_spool.c.
References ast_free, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, direntry::list, direntry::name, and queue_file().
Referenced by scan_thread().
00585 { 00586 struct direntry *cur; 00587 /* Only queue entries where an IN_CREATE preceded the IN_CLOSE_WRITE */ 00588 AST_LIST_TRAVERSE_SAFE_BEGIN(&openlist, cur, list) { 00589 if (!strcmp(cur->name, filename)) { 00590 AST_LIST_REMOVE_CURRENT(list); 00591 ast_free(cur); 00592 queue_file(filename, 0); 00593 break; 00594 } 00595 } 00596 AST_LIST_TRAVERSE_SAFE_END 00597 }
static int remove_from_queue | ( | struct outgoing * | o, | |
const char * | status | |||
) | [static] |
Remove a call file from the outgoing queue, optionally moving it into the archive directory.
o | the pointer to outgoing struct | |
status | the exit status of the call. Can be "Completed", "Failed" or "Expired" |
Definition at line 288 of file pbx_spool.c.
References ast_log(), ast_mkdir(), ast_test_flag, f, outgoing::fn, LOG_WARNING, outgoing::options, SPOOL_FLAG_ALWAYS_DELETE, and SPOOL_FLAG_ARCHIVE.
00289 { 00290 FILE *f; 00291 char newfn[256]; 00292 const char *bname; 00293 00294 if (!ast_test_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE)) { 00295 struct stat current_file_status; 00296 00297 if (!stat(o->fn, ¤t_file_status)) { 00298 if (time(NULL) < current_file_status.st_mtime) { 00299 return 0; 00300 } 00301 } 00302 } 00303 00304 if (!ast_test_flag(&o->options, SPOOL_FLAG_ARCHIVE)) { 00305 unlink(o->fn); 00306 return 0; 00307 } 00308 00309 if (ast_mkdir(qdonedir, 0777)) { 00310 ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool archiving disabled\n", qdonedir); 00311 unlink(o->fn); 00312 return -1; 00313 } 00314 00315 if (!(bname = strrchr(o->fn, '/'))) { 00316 bname = o->fn; 00317 } else { 00318 bname++; 00319 } 00320 00321 snprintf(newfn, sizeof(newfn), "%s/%s", qdonedir, bname); 00322 /* a existing call file the archive dir is overwritten */ 00323 unlink(newfn); 00324 if (rename(o->fn, newfn) != 0) { 00325 unlink(o->fn); 00326 return -1; 00327 } 00328 00329 /* Only append to the file AFTER we move it out of the watched directory, 00330 * otherwise the fclose() causes another event for inotify(7) */ 00331 if ((f = fopen(newfn, "a"))) { 00332 fprintf(f, "Status: %s\n", status); 00333 fclose(f); 00334 } 00335 00336 return 0; 00337 }
static void safe_append | ( | struct outgoing * | o, | |
time_t | now, | |||
char * | s | |||
) | [static] |
Definition at line 264 of file pbx_spool.c.
References ast_debug, ast_log(), ast_mainpid, outgoing::dest, errno, f, outgoing::fn, LOG_WARNING, outgoing::retries, outgoing::retrytime, and outgoing::tech.
Referenced by attempt_thread(), and scan_service().
00265 { 00266 FILE *f; 00267 struct utimbuf tbuf = { .actime = now, .modtime = now + o->retrytime }; 00268 00269 ast_debug(1, "Outgoing %s/%s: %s\n", o->tech, o->dest, s); 00270 00271 if ((f = fopen(o->fn, "a"))) { 00272 fprintf(f, "\n%s: %ld %d (%ld)\n", s, (long)ast_mainpid, o->retries, (long) now); 00273 fclose(f); 00274 } 00275 00276 /* Update the file time */ 00277 if (utime(o->fn, &tbuf)) { 00278 ast_log(LOG_WARNING, "Unable to set utime on %s: %s\n", o->fn, strerror(errno)); 00279 } 00280 }
static int scan_service | ( | const char * | fn, | |
time_t | now | |||
) | [static] |
Definition at line 385 of file pbx_spool.c.
References apply_outgoing(), ast_calloc, ast_free, ast_log(), ast_mainpid, errno, f, free_outgoing(), init_outgoing(), launch_service(), LOG_DEBUG, LOG_NOTICE, LOG_WARNING, remove_from_queue(), and safe_append().
Referenced by queue_file().
00386 { 00387 struct outgoing *o = NULL; 00388 FILE *f; 00389 int res = 0; 00390 00391 if (!(o = ast_calloc(1, sizeof(*o)))) { 00392 ast_log(LOG_WARNING, "Out of memory ;(\n"); 00393 return -1; 00394 } 00395 00396 if (init_outgoing(o)) { 00397 /* No need to call free_outgoing here since we know the failure 00398 * was to allocate string fields and no variables have been allocated 00399 * yet. 00400 */ 00401 ast_free(o); 00402 return -1; 00403 } 00404 00405 /* Attempt to open the file */ 00406 if (!(f = fopen(fn, "r"))) { 00407 remove_from_queue(o, "Failed"); 00408 free_outgoing(o); 00409 #if !defined(HAVE_INOTIFY) && !defined(HAVE_KQUEUE) 00410 ast_log(LOG_WARNING, "Unable to open %s: %s, deleting\n", fn, strerror(errno)); 00411 #endif 00412 return -1; 00413 } 00414 00415 /* Read in and verify the contents */ 00416 if (apply_outgoing(o, fn, f)) { 00417 remove_from_queue(o, "Failed"); 00418 free_outgoing(o); 00419 ast_log(LOG_WARNING, "Invalid file contents in %s, deleting\n", fn); 00420 fclose(f); 00421 return -1; 00422 } 00423 00424 #if 0 00425 printf("Filename: %s, Retries: %d, max: %d\n", fn, o->retries, o->maxretries); 00426 #endif 00427 fclose(f); 00428 if (o->retries <= o->maxretries) { 00429 now += o->retrytime; 00430 if (o->callingpid && (o->callingpid == ast_mainpid)) { 00431 safe_append(o, time(NULL), "DelayedRetry"); 00432 ast_log(LOG_DEBUG, "Delaying retry since we're currently running '%s'\n", o->fn); 00433 free_outgoing(o); 00434 } else { 00435 /* Increment retries */ 00436 o->retries++; 00437 /* If someone else was calling, they're presumably gone now 00438 so abort their retry and continue as we were... */ 00439 if (o->callingpid) 00440 safe_append(o, time(NULL), "AbortRetry"); 00441 00442 safe_append(o, now, "StartRetry"); 00443 launch_service(o); 00444 } 00445 res = now; 00446 } else { 00447 ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? 
"s" : ""); 00448 remove_from_queue(o, "Expired"); 00449 free_outgoing(o); 00450 } 00451 00452 return res; 00453 }
static void* scan_thread | ( | void * | unused | ) | [static] |
Definition at line 600 of file pbx_spool.c.
References ast_debug, ast_free, ast_fully_booted, AST_LIST_EMPTY, AST_LIST_FIRST, AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_log(), errno, HAVE_INOTIFY, inotify_fd, len(), direntry::list, LOG_ERROR, direntry::mtime, direntry::name, direntry::next, queue_created_files(), queue_file(), queue_file_create(), queue_file_open(), and queue_file_write().
Referenced by load_module().
00601 { 00602 DIR *dir; 00603 struct dirent *de; 00604 time_t now; 00605 struct timespec ts = { .tv_sec = 1 }; 00606 #ifdef HAVE_INOTIFY 00607 ssize_t res; 00608 int inotify_fd = inotify_init(); 00609 struct inotify_event *iev; 00610 char buf[8192] __attribute__((aligned (sizeof(int)))); 00611 struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN }; 00612 #else 00613 struct timespec nowait = { 0, 1 }; 00614 int inotify_fd = kqueue(); 00615 struct kevent kev; 00616 #endif 00617 struct direntry *cur; 00618 00619 while (!ast_fully_booted) { 00620 nanosleep(&ts, NULL); 00621 } 00622 00623 if (inotify_fd < 0) { 00624 ast_log(LOG_ERROR, "Unable to initialize " 00625 #ifdef HAVE_INOTIFY 00626 "inotify(7)" 00627 #else 00628 "kqueue(2)" 00629 #endif 00630 "\n"); 00631 return NULL; 00632 } 00633 00634 #ifdef HAVE_INOTIFY 00635 inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_OPEN | IN_CLOSE_WRITE | IN_MOVED_TO); 00636 #endif 00637 00638 /* First, run through the directory and clear existing entries */ 00639 if (!(dir = opendir(qdir))) { 00640 ast_log(LOG_ERROR, "Unable to open directory %s: %s\n", qdir, strerror(errno)); 00641 return NULL; 00642 } 00643 00644 #ifndef HAVE_INOTIFY 00645 EV_SET(&kev, dirfd(dir), EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_WRITE, 0, NULL); 00646 if (kevent(inotify_fd, &kev, 1, NULL, 0, &nowait) < 0 && errno != 0) { 00647 ast_log(LOG_ERROR, "Unable to watch directory %s: %s\n", qdir, strerror(errno)); 00648 } 00649 #endif 00650 now = time(NULL); 00651 while ((de = readdir(dir))) { 00652 queue_file(de->d_name, 0); 00653 } 00654 00655 #ifdef HAVE_INOTIFY 00656 /* Directory needs to remain open for kqueue(2) */ 00657 closedir(dir); 00658 #endif 00659 00660 /* Wait for either a) next timestamp to occur, or b) a change to happen */ 00661 for (;/* ever */;) { 00662 time_t next = AST_LIST_EMPTY(&dirlist) ? 
INT_MAX : AST_LIST_FIRST(&dirlist)->mtime; 00663 00664 time(&now); 00665 if (next > now) { 00666 #ifdef HAVE_INOTIFY 00667 int stage = 0; 00668 /* Convert from seconds to milliseconds, unless there's nothing 00669 * in the queue already, in which case, we wait forever. */ 00670 int waittime = next == INT_MAX ? -1 : (next - now) * 1000; 00671 if (!AST_LIST_EMPTY(&createlist)) { 00672 waittime = 1000; 00673 } 00674 /* When a file arrives, add it to the queue, in mtime order. */ 00675 if ((res = poll(&pfd, 1, waittime)) > 0 && (stage = 1) && 00676 (res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(*iev)) { 00677 ssize_t len = 0; 00678 /* File(s) added to directory, add them to my list */ 00679 for (iev = (void *) buf; res >= sizeof(*iev); iev = (struct inotify_event *) (((char *) iev) + len)) { 00680 /* For an IN_MOVED_TO event, simply process the file. However, if 00681 * we get an IN_CREATE event it *might* be an open(O_CREAT) or it 00682 * might be a hardlink (like smsq does, since rename() might 00683 * overwrite an existing file). So we have to see if we get a 00684 * subsequent IN_OPEN event on the same file. If we do, keep it 00685 * on the openlist and wait for the corresponding IN_CLOSE_WRITE. 00686 * If we *don't* see an IN_OPEN event, then it was a hard link so 00687 * it can be processed immediately. 00688 * 00689 * Unfortunately, although open(O_CREAT) is an atomic file system 00690 * operation, the inotify subsystem doesn't give it to us in a 00691 * single event with both IN_CREATE|IN_OPEN set. It's two separate 00692 * events, and the kernel doesn't even give them to us at the same 00693 * time. We can read() from inotify_fd after the IN_CREATE event, 00694 * and get *nothing* from it. The IN_OPEN arrives only later! So 00695 * we have a very short timeout of 2 seconds. 
*/ 00696 if (iev->mask & IN_CREATE) { 00697 queue_file_create(iev->name); 00698 } else if (iev->mask & IN_OPEN) { 00699 queue_file_open(iev->name); 00700 } else if (iev->mask & IN_CLOSE_WRITE) { 00701 queue_file_write(iev->name); 00702 } else if (iev->mask & IN_MOVED_TO) { 00703 queue_file(iev->name, 0); 00704 } else { 00705 ast_log(LOG_ERROR, "Unexpected event %d for file '%s'\n", (int) iev->mask, iev->name); 00706 } 00707 00708 len = sizeof(*iev) + iev->len; 00709 res -= len; 00710 } 00711 } else if (res < 0 && errno != EINTR && errno != EAGAIN) { 00712 ast_debug(1, "Got an error back from %s(2): %s\n", stage ? "read" : "poll", strerror(errno)); 00713 } 00714 time(&now); 00715 } 00716 queue_created_files(); 00717 #else 00718 struct timespec ts2 = { next - now, 0 }; 00719 if (kevent(inotify_fd, NULL, 0, &kev, 1, &ts2) <= 0) { 00720 /* Interrupt or timeout, restart calculations */ 00721 continue; 00722 } else { 00723 /* Directory changed, rescan */ 00724 rewinddir(dir); 00725 while ((de = readdir(dir))) { 00726 queue_file(de->d_name, 0); 00727 } 00728 } 00729 time(&now); 00730 } 00731 #endif 00732 00733 /* Empty the list of all entries ready to be processed */ 00734 AST_LIST_LOCK(&dirlist); 00735 while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) { 00736 cur = AST_LIST_REMOVE_HEAD(&dirlist, list); 00737 queue_file(cur->name, cur->mtime); 00738 ast_free(cur); 00739 } 00740 AST_LIST_UNLOCK(&dirlist); 00741 } 00742 return NULL; 00743 }
static int unload_module | ( | void | ) | [static] |
struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "ac1f6a56484a8820659555499174e588" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } [static] |
Definition at line 864 of file pbx_spool.c.
struct ast_module_info* ast_module_info = &__mod_info [static] |
Definition at line 864 of file pbx_spool.c.
char qdir[255] [static] |
Definition at line 72 of file pbx_spool.c.
char qdonedir[255] [static] |
Definition at line 73 of file pbx_spool.c.