Full-featured outgoing call spool support. More...
#include "asterisk.h"
#include <sys/stat.h>
#include <time.h>
#include <utime.h>
#include <dirent.h>
#include <sys/inotify.h>
#include "asterisk/paths.h"
#include "asterisk/lock.h"
#include "asterisk/file.h"
#include "asterisk/logger.h"
#include "asterisk/channel.h"
#include "asterisk/callerid.h"
#include "asterisk/pbx.h"
#include "asterisk/module.h"
#include "asterisk/utils.h"
#include "asterisk/options.h"
Go to the source code of this file.
Data Structures | |
struct | createlist |
struct | direntry |
struct | dirlist |
struct | openlist |
struct | outgoing |
Enumerations | |
enum | { SPOOL_FLAG_ALWAYS_DELETE = (1 << 0), SPOOL_FLAG_ARCHIVE = (1 << 1) } |
Functions | |
static void | __reg_module (void) |
static void | __unreg_module (void) |
static int | apply_outgoing (struct outgoing *o, FILE *f) |
static void * | attempt_thread (void *data) |
static void | free_outgoing (struct outgoing *o) |
static void | launch_service (struct outgoing *o) |
static int | load_module (void) |
static struct outgoing * | new_outgoing (const char *fn) |
static void | queue_created_files (void) |
static void | queue_file (const char *filename, time_t when) |
static void | queue_file_create (const char *filename) |
static void | queue_file_open (const char *filename) |
static void | queue_file_write (const char *filename) |
static int | remove_from_queue (struct outgoing *o, const char *status) |
Remove a call file from the outgoing queue optionally moving it in the archive dir. | |
static void | safe_append (struct outgoing *o, time_t now, char *s) |
static int | scan_service (const char *fn, time_t now) |
static void * | scan_thread (void *unused) |
static int | unload_module (void) |
Variables | |
static struct ast_module_info | __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "ac1f6a56484a8820659555499174e588" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } |
static struct ast_module_info * | ast_module_info = &__mod_info |
static char | qdir [255] |
static char | qdonedir [255] |
Full-featured outgoing call spool support.
Definition in file pbx_spool.c.
anonymous enum |
SPOOL_FLAG_ALWAYS_DELETE |
Always delete the call file after a call succeeds or the maximum number of retries is exceeded, even if the modification time of the call file is in the future. |
SPOOL_FLAG_ARCHIVE |
Definition at line 62 of file pbx_spool.c.
00062 { 00063 /*! Always delete the call file after a call succeeds or the 00064 * maximum number of retries is exceeded, even if the 00065 * modification time of the call file is in the future. 00066 */ 00067 SPOOL_FLAG_ALWAYS_DELETE = (1 << 0), 00068 /* Don't unlink the call file after processing, move in qdonedir */ 00069 SPOOL_FLAG_ARCHIVE = (1 << 1), 00070 };
static void __reg_module | ( | void | ) | [static] |
Definition at line 893 of file pbx_spool.c.
static void __unreg_module | ( | void | ) | [static] |
Definition at line 893 of file pbx_spool.c.
static int apply_outgoing | ( | struct outgoing * | o, | |
FILE * | f | |||
) | [static] |
Definition at line 147 of file pbx_spool.c.
References outgoing::app, app, ast_callerid_split(), ast_log(), ast_parse_allow_disallow(), ast_set2_flag, ast_skip_blanks(), ast_string_field_set, ast_strlen_zero(), ast_trim_blanks(), ast_true(), ast_variable_new(), outgoing::callingpid, cid_name, cid_num, context, outgoing::dest, outgoing::exten, exten, outgoing::fn, outgoing::format, last, LOG_NOTICE, LOG_WARNING, outgoing::maxretries, ast_variable::next, outgoing::options, outgoing::priority, outgoing::retries, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, SPOOL_FLAG_ARCHIVE, outgoing::tech, var, outgoing::vars, and outgoing::waittime.
Referenced by scan_service().
00148 { 00149 char buf[256]; 00150 char *c, *c2; 00151 int lineno = 0; 00152 struct ast_variable *var, *last = o->vars; 00153 00154 while (last && last->next) { 00155 last = last->next; 00156 } 00157 00158 while(fgets(buf, sizeof(buf), f)) { 00159 lineno++; 00160 /* Trim comments */ 00161 c = buf; 00162 while ((c = strchr(c, '#'))) { 00163 if ((c == buf) || (*(c-1) == ' ') || (*(c-1) == '\t')) 00164 *c = '\0'; 00165 else 00166 c++; 00167 } 00168 00169 c = buf; 00170 while ((c = strchr(c, ';'))) { 00171 if ((c > buf) && (c[-1] == '\\')) { 00172 memmove(c - 1, c, strlen(c) + 1); 00173 c++; 00174 } else { 00175 *c = '\0'; 00176 break; 00177 } 00178 } 00179 00180 /* Trim trailing white space */ 00181 ast_trim_blanks(buf); 00182 if (ast_strlen_zero(buf)) { 00183 continue; 00184 } 00185 c = strchr(buf, ':'); 00186 if (!c) { 00187 ast_log(LOG_NOTICE, "Syntax error at line %d of %s\n", lineno, o->fn); 00188 continue; 00189 } 00190 *c = '\0'; 00191 c = ast_skip_blanks(c + 1); 00192 #if 0 00193 printf("'%s' is '%s' at line %d\n", buf, c, lineno); 00194 #endif 00195 if (!strcasecmp(buf, "channel")) { 00196 if ((c2 = strchr(c, '/'))) { 00197 *c2 = '\0'; 00198 c2++; 00199 ast_string_field_set(o, tech, c); 00200 ast_string_field_set(o, dest, c2); 00201 } else { 00202 ast_log(LOG_NOTICE, "Channel should be in form Tech/Dest at line %d of %s\n", lineno, o->fn); 00203 } 00204 } else if (!strcasecmp(buf, "callerid")) { 00205 char cid_name[80] = {0}, cid_num[80] = {0}; 00206 ast_callerid_split(c, cid_name, sizeof(cid_name), cid_num, sizeof(cid_num)); 00207 ast_string_field_set(o, cid_num, cid_num); 00208 ast_string_field_set(o, cid_name, cid_name); 00209 } else if (!strcasecmp(buf, "application")) { 00210 ast_string_field_set(o, app, c); 00211 } else if (!strcasecmp(buf, "data")) { 00212 ast_string_field_set(o, data, c); 00213 } else if (!strcasecmp(buf, "maxretries")) { 00214 if (sscanf(c, "%30d", &o->maxretries) != 1) { 00215 ast_log(LOG_WARNING, "Invalid max retries at line %d of %s\n", lineno, o->fn); 00216 o->maxretries = 0; 00217 } 00218 } else if (!strcasecmp(buf, "codecs")) { 00219 ast_parse_allow_disallow(NULL, &o->format, c, 1); 00220 } else if (!strcasecmp(buf, "context")) { 00221 ast_string_field_set(o, context, c); 00222 } else if (!strcasecmp(buf, "extension")) { 00223 ast_string_field_set(o, exten, c); 00224 } else if (!strcasecmp(buf, "priority")) { 00225 if ((sscanf(c, "%30d", &o->priority) != 1) || (o->priority < 1)) { 00226 ast_log(LOG_WARNING, "Invalid priority at line %d of %s\n", lineno, o->fn); 00227 o->priority = 1; 00228 } 00229 } else if (!strcasecmp(buf, "retrytime")) { 00230 if ((sscanf(c, "%30d", &o->retrytime) != 1) || (o->retrytime < 1)) { 00231 ast_log(LOG_WARNING, "Invalid retrytime at line %d of %s\n", lineno, o->fn); 00232 o->retrytime = 300; 00233 } 00234 } else if (!strcasecmp(buf, "waittime")) { 00235 if ((sscanf(c, "%30d", &o->waittime) != 1) || (o->waittime < 1)) { 00236 ast_log(LOG_WARNING, "Invalid waittime at line %d of %s\n", lineno, o->fn); 00237 o->waittime = 45; 00238 } 00239 } else if (!strcasecmp(buf, "retry")) { 00240 o->retries++; 00241 } else if (!strcasecmp(buf, "startretry")) { 00242 if (sscanf(c, "%30ld", &o->callingpid) != 1) { 00243 ast_log(LOG_WARNING, "Unable to retrieve calling PID!\n"); 00244 o->callingpid = 0; 00245 } 00246 } else if (!strcasecmp(buf, "endretry") || !strcasecmp(buf, "abortretry")) { 00247 o->callingpid = 0; 00248 o->retries++; 00249 } else if (!strcasecmp(buf, "delayedretry")) { 00250 } else if (!strcasecmp(buf, "setvar") || !strcasecmp(buf, "set")) { 00251 c2 = c; 00252 strsep(&c2, "="); 00253 if (c2) { 00254 var = ast_variable_new(c, c2, o->fn); 00255 if (var) { 00256 /* Always insert at the end, because some people want to treat the spool file as a script */ 00257 if (last) { 00258 last->next = var; 00259 } else { 00260 o->vars = var; 00261 } 00262 last = var; 00263 } 00264 } else 00265 ast_log(LOG_WARNING, "Malformed \"%s\" argument. Should be \"%s: variable=value\"\n", buf, buf); 00266 } else if (!strcasecmp(buf, "account")) { 00267 ast_string_field_set(o, account, c); 00268 } else if (!strcasecmp(buf, "alwaysdelete")) { 00269 ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ALWAYS_DELETE); 00270 } else if (!strcasecmp(buf, "archive")) { 00271 ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ARCHIVE); 00272 } else { 00273 ast_log(LOG_WARNING, "Unknown keyword '%s' at line %d of %s\n", buf, lineno, o->fn); 00274 } 00275 } 00276 if (ast_strlen_zero(o->tech) || ast_strlen_zero(o->dest) || (ast_strlen_zero(o->app) && ast_strlen_zero(o->exten))) { 00277 ast_log(LOG_WARNING, "At least one of app or extension must be specified, along with tech and dest in file %s\n", o->fn); 00278 return -1; 00279 } 00280 return 0; 00281 }
static void* attempt_thread | ( | void * | data | ) | [static] |
Definition at line 358 of file pbx_spool.c.
References outgoing::account, outgoing::app, ast_channel_reason2str(), ast_log(), ast_pbx_outgoing_app(), ast_pbx_outgoing_exten(), ast_strlen_zero(), ast_verb, outgoing::cid_name, outgoing::cid_num, outgoing::context, outgoing::data, outgoing::dest, outgoing::exten, outgoing::fn, outgoing::format, free_outgoing(), LOG_NOTICE, outgoing::maxretries, outgoing::priority, queue_file(), remove_from_queue(), outgoing::retries, outgoing::retrytime, safe_append(), outgoing::tech, outgoing::vars, and outgoing::waittime.
Referenced by launch_service().
00359 { 00360 struct outgoing *o = data; 00361 int res, reason; 00362 if (!ast_strlen_zero(o->app)) { 00363 ast_verb(3, "Attempting call on %s/%s for application %s(%s) (Retry %d)\n", o->tech, o->dest, o->app, o->data, o->retries); 00364 res = ast_pbx_outgoing_app(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->app, o->data, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL); 00365 o->vars = NULL; 00366 } else { 00367 ast_verb(3, "Attempting call on %s/%s for %s@%s:%d (Retry %d)\n", o->tech, o->dest, o->exten, o->context,o->priority, o->retries); 00368 res = ast_pbx_outgoing_exten(o->tech, o->format, (void *) o->dest, o->waittime * 1000, o->context, o->exten, o->priority, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL); 00369 o->vars = NULL; 00370 } 00371 if (res) { 00372 ast_log(LOG_NOTICE, "Call failed to go through, reason (%d) %s\n", reason, ast_channel_reason2str(reason)); 00373 if (o->retries >= o->maxretries + 1) { 00374 /* Max retries exceeded */ 00375 ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : ""); 00376 remove_from_queue(o, "Expired"); 00377 } else { 00378 /* Notate that the call is still active */ 00379 safe_append(o, time(NULL), "EndRetry"); 00380 #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE) 00381 queue_file(o->fn, time(NULL) + o->retrytime); 00382 #endif 00383 } 00384 } else { 00385 ast_log(LOG_NOTICE, "Call completed to %s/%s\n", o->tech, o->dest); 00386 remove_from_queue(o, "Completed"); 00387 } 00388 free_outgoing(o); 00389 return NULL; 00390 }
static void free_outgoing | ( | struct outgoing * | o | ) | [static] |
Definition at line 104 of file pbx_spool.c.
References ast_free, ast_string_field_free_memory, ast_variables_destroy(), and outgoing::vars.
Referenced by attempt_thread(), launch_service(), new_outgoing(), and scan_service().
00105 { 00106 if (o->vars) { 00107 ast_variables_destroy(o->vars); 00108 } 00109 ast_string_field_free_memory(o); 00110 ast_free(o); 00111 }
static void launch_service | ( | struct outgoing * | o | ) | [static] |
Definition at line 392 of file pbx_spool.c.
References ast_log(), ast_pthread_create_detached, attempt_thread(), free_outgoing(), and LOG_WARNING.
Referenced by scan_service().
00393 { 00394 pthread_t t; 00395 int ret; 00396 00397 if ((ret = ast_pthread_create_detached(&t, NULL, attempt_thread, o))) { 00398 ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret); 00399 free_outgoing(o); 00400 } 00401 }
static int load_module | ( | void | ) | [static] |
Definition at line 874 of file pbx_spool.c.
References ast_config_AST_SPOOL_DIR, ast_log(), ast_mkdir(), AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_FAILURE, AST_MODULE_LOAD_SUCCESS, ast_pthread_create_detached_background, LOG_WARNING, scan_thread(), and thread.
00875 { 00876 pthread_t thread; 00877 int ret; 00878 snprintf(qdir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing"); 00879 if (ast_mkdir(qdir, 0777)) { 00880 ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool disabled\n", qdir); 00881 return AST_MODULE_LOAD_DECLINE; 00882 } 00883 snprintf(qdonedir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing_done"); 00884 00885 if ((ret = ast_pthread_create_detached_background(&thread, NULL, scan_thread, NULL))) { 00886 ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret); 00887 return AST_MODULE_LOAD_FAILURE; 00888 } 00889 00890 return AST_MODULE_LOAD_SUCCESS; 00891 }
static struct outgoing* new_outgoing | ( | const char * | fn | ) | [static, read] |
Definition at line 113 of file pbx_spool.c.
References ast_calloc, AST_FORMAT_SLINEAR, ast_free, ast_set_flag, ast_string_field_init, ast_string_field_set, ast_strlen_zero(), outgoing::fn, outgoing::format, free_outgoing(), outgoing::options, outgoing::priority, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, and outgoing::waittime.
Referenced by scan_service().
00114 { 00115 struct outgoing *o; 00116 00117 o = ast_calloc(1, sizeof(*o)); 00118 if (!o) { 00119 return NULL; 00120 } 00121 00122 /* Initialize the new object. */ 00123 o->priority = 1; 00124 o->retrytime = 300; 00125 o->waittime = 45; 00126 o->format = AST_FORMAT_SLINEAR; 00127 ast_set_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE); 00128 if (ast_string_field_init(o, 128)) { 00129 /* 00130 * No need to call free_outgoing here since the failure was to 00131 * allocate string fields and no variables have been allocated 00132 * yet. 00133 */ 00134 ast_free(o); 00135 return NULL; 00136 } 00137 ast_string_field_set(o, fn, fn); 00138 if (ast_strlen_zero(o->fn)) { 00139 /* String field set failed. Since this string is important we must fail. */ 00140 free_outgoing(o); 00141 return NULL; 00142 } 00143 00144 return o; 00145 }
static void queue_created_files | ( | void | ) | [static] |
Definition at line 585 of file pbx_spool.c.
References ast_free, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, direntry::mtime, direntry::name, and queue_file().
Referenced by scan_thread().
00586 { 00587 struct direntry *cur; 00588 time_t now = time(NULL); 00589 00590 AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) { 00591 if (cur->mtime > now) { 00592 break; 00593 } 00594 00595 AST_LIST_REMOVE_CURRENT(list); 00596 queue_file(cur->name, 0); 00597 ast_free(cur); 00598 } 00599 AST_LIST_TRAVERSE_SAFE_END 00600 }
static void queue_file | ( | const char * | filename, | |
time_t | when | |||
) | [static] |
Definition at line 488 of file pbx_spool.c.
References ast_alloca, ast_calloc, AST_LIST_EMPTY, AST_LIST_INSERT_BEFORE_CURRENT, AST_LIST_INSERT_HEAD, AST_LIST_INSERT_TAIL, AST_LIST_LOCK, AST_LIST_TRAVERSE, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, AST_LIST_UNLOCK, ast_log(), errno, LOG_WARNING, direntry::mtime, direntry::name, and scan_service().
Referenced by attempt_thread(), queue_created_files(), queue_file_write(), and scan_thread().
00489 { 00490 struct stat st; 00491 struct direntry *cur, *new; 00492 int res; 00493 time_t now = time(NULL); 00494 00495 if (!strchr(filename, '/')) { 00496 char *fn = ast_alloca(strlen(qdir) + strlen(filename) + 2); 00497 sprintf(fn, "%s/%s", qdir, filename); /* SAFE */ 00498 filename = fn; 00499 } 00500 00501 if (when == 0) { 00502 if (stat(filename, &st)) { 00503 ast_log(LOG_WARNING, "Unable to stat %s: %s\n", filename, strerror(errno)); 00504 return; 00505 } 00506 00507 if (!S_ISREG(st.st_mode)) { 00508 return; 00509 } 00510 00511 when = st.st_mtime; 00512 } 00513 00514 /* Need to check the existing list in order to avoid duplicates. */ 00515 AST_LIST_LOCK(&dirlist); 00516 AST_LIST_TRAVERSE(&dirlist, cur, list) { 00517 if (cur->mtime == when && !strcmp(filename, cur->name)) { 00518 AST_LIST_UNLOCK(&dirlist); 00519 return; 00520 } 00521 } 00522 00523 if ((res = when) > now || (res = scan_service(filename, now)) > 0) { 00524 if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) { 00525 AST_LIST_UNLOCK(&dirlist); 00526 return; 00527 } 00528 new->mtime = res; 00529 strcpy(new->name, filename); 00530 /* List is ordered by mtime */ 00531 if (AST_LIST_EMPTY(&dirlist)) { 00532 AST_LIST_INSERT_HEAD(&dirlist, new, list); 00533 } else { 00534 int found = 0; 00535 AST_LIST_TRAVERSE_SAFE_BEGIN(&dirlist, cur, list) { 00536 if (cur->mtime > new->mtime) { 00537 AST_LIST_INSERT_BEFORE_CURRENT(new, list); 00538 found = 1; 00539 break; 00540 } 00541 } 00542 AST_LIST_TRAVERSE_SAFE_END 00543 if (!found) { 00544 AST_LIST_INSERT_TAIL(&dirlist, new, list); 00545 } 00546 } 00547 } 00548 AST_LIST_UNLOCK(&dirlist); 00549 }
static void queue_file_create | ( | const char * | filename | ) | [static] |
Definition at line 552 of file pbx_spool.c.
References ast_calloc, AST_LIST_INSERT_TAIL, AST_LIST_TRAVERSE, direntry::mtime, and direntry::name.
Referenced by scan_thread().
00553 { 00554 struct direntry *cur; 00555 00556 AST_LIST_TRAVERSE(&createlist, cur, list) { 00557 if (!strcmp(cur->name, filename)) { 00558 return; 00559 } 00560 } 00561 00562 if (!(cur = ast_calloc(1, sizeof(*cur) + strlen(filename) + 1))) { 00563 return; 00564 } 00565 strcpy(cur->name, filename); 00566 /* We'll handle this file unless an IN_OPEN event occurs within 2 seconds */ 00567 cur->mtime = time(NULL) + 2; 00568 AST_LIST_INSERT_TAIL(&createlist, cur, list); 00569 }
static void queue_file_open | ( | const char * | filename | ) | [static] |
Definition at line 571 of file pbx_spool.c.
References AST_LIST_INSERT_TAIL, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, and direntry::name.
Referenced by scan_thread().
00572 { 00573 struct direntry *cur; 00574 00575 AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) { 00576 if (!strcmp(cur->name, filename)) { 00577 AST_LIST_REMOVE_CURRENT(list); 00578 AST_LIST_INSERT_TAIL(&openlist, cur, list); 00579 break; 00580 } 00581 } 00582 AST_LIST_TRAVERSE_SAFE_END 00583 }
static void queue_file_write | ( | const char * | filename | ) | [static] |
Definition at line 602 of file pbx_spool.c.
References ast_free, AST_LIST_REMOVE_CURRENT, AST_LIST_TRAVERSE_SAFE_BEGIN, AST_LIST_TRAVERSE_SAFE_END, direntry::name, and queue_file().
Referenced by scan_thread().
00603 { 00604 struct direntry *cur; 00605 /* Only queue entries where an IN_CREATE preceded the IN_CLOSE_WRITE */ 00606 AST_LIST_TRAVERSE_SAFE_BEGIN(&openlist, cur, list) { 00607 if (!strcmp(cur->name, filename)) { 00608 AST_LIST_REMOVE_CURRENT(list); 00609 ast_free(cur); 00610 queue_file(filename, 0); 00611 break; 00612 } 00613 } 00614 AST_LIST_TRAVERSE_SAFE_END 00615 }
static int remove_from_queue | ( | struct outgoing * | o, | |
const char * | status | |||
) | [static] |
Remove a call file from the outgoing queue optionally moving it in the archive dir.
o | the pointer to outgoing struct | |
status | the exit status of the call. Can be "Completed", "Failed" or "Expired" |
Definition at line 307 of file pbx_spool.c.
References ast_log(), ast_mkdir(), ast_test_flag, f, outgoing::fn, LOG_WARNING, outgoing::options, SPOOL_FLAG_ALWAYS_DELETE, and SPOOL_FLAG_ARCHIVE.
Referenced by attempt_thread(), and scan_service().
00308 { 00309 FILE *f; 00310 char newfn[256]; 00311 const char *bname; 00312 00313 if (!ast_test_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE)) { 00314 struct stat current_file_status; 00315 00316 if (!stat(o->fn, ¤t_file_status)) { 00317 if (time(NULL) < current_file_status.st_mtime) { 00318 return 0; 00319 } 00320 } 00321 } 00322 00323 if (!ast_test_flag(&o->options, SPOOL_FLAG_ARCHIVE)) { 00324 unlink(o->fn); 00325 return 0; 00326 } 00327 00328 if (ast_mkdir(qdonedir, 0777)) { 00329 ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool archiving disabled\n", qdonedir); 00330 unlink(o->fn); 00331 return -1; 00332 } 00333 00334 if (!(bname = strrchr(o->fn, '/'))) { 00335 bname = o->fn; 00336 } else { 00337 bname++; 00338 } 00339 00340 snprintf(newfn, sizeof(newfn), "%s/%s", qdonedir, bname); 00341 /* If there is already a call file with the name in the archive dir, it will be overwritten. */ 00342 unlink(newfn); 00343 if (rename(o->fn, newfn) != 0) { 00344 unlink(o->fn); 00345 return -1; 00346 } 00347 00348 /* Only append to the file AFTER we move it out of the watched directory, 00349 * otherwise the fclose() causes another event for inotify(7) */ 00350 if ((f = fopen(newfn, "a"))) { 00351 fprintf(f, "Status: %s\n", status); 00352 fclose(f); 00353 } 00354 00355 return 0; 00356 }
static void safe_append | ( | struct outgoing * | o, | |
time_t | now, | |||
char * | s | |||
) | [static] |
Definition at line 283 of file pbx_spool.c.
References ast_debug, ast_log(), ast_mainpid, outgoing::dest, errno, f, outgoing::fn, LOG_WARNING, outgoing::retries, outgoing::retrytime, and outgoing::tech.
Referenced by attempt_thread(), and scan_service().
00284 { 00285 FILE *f; 00286 struct utimbuf tbuf = { .actime = now, .modtime = now + o->retrytime }; 00287 00288 ast_debug(1, "Outgoing %s/%s: %s\n", o->tech, o->dest, s); 00289 00290 if ((f = fopen(o->fn, "a"))) { 00291 fprintf(f, "\n%s: %ld %d (%ld)\n", s, (long)ast_mainpid, o->retries, (long) now); 00292 fclose(f); 00293 } 00294 00295 /* Update the file time */ 00296 if (utime(o->fn, &tbuf)) { 00297 ast_log(LOG_WARNING, "Unable to set utime on %s: %s\n", o->fn, strerror(errno)); 00298 } 00299 }
static int scan_service | ( | const char * | fn, | |
time_t | now | |||
) | [static] |
Definition at line 404 of file pbx_spool.c.
References apply_outgoing(), ast_debug, ast_log(), ast_mainpid, outgoing::callingpid, outgoing::dest, errno, f, outgoing::fn, free_outgoing(), launch_service(), LOG_DEBUG, LOG_NOTICE, LOG_WARNING, outgoing::maxretries, new_outgoing(), remove_from_queue(), outgoing::retries, outgoing::retrytime, safe_append(), and outgoing::tech.
Referenced by queue_file().
00405 { 00406 struct outgoing *o; 00407 FILE *f; 00408 int res; 00409 00410 o = new_outgoing(fn); 00411 if (!o) { 00412 return -1; 00413 } 00414 00415 /* Attempt to open the file */ 00416 f = fopen(o->fn, "r"); 00417 if (!f) { 00418 #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE) 00419 /*! 00420 * \todo XXX There is some odd delayed duplicate servicing of 00421 * call files going on. We need to suppress the error message 00422 * if the file does not exist as a result. 00423 */ 00424 if (errno != ENOENT) 00425 #endif 00426 { 00427 ast_log(LOG_WARNING, "Unable to open %s: '%s'(%d), deleting\n", 00428 o->fn, strerror(errno), (int) errno); 00429 } 00430 remove_from_queue(o, "Failed"); 00431 free_outgoing(o); 00432 return -1; 00433 } 00434 00435 /* Read in and verify the contents */ 00436 res = apply_outgoing(o, f); 00437 fclose(f); 00438 if (res) { 00439 ast_log(LOG_WARNING, "Invalid file contents in %s, deleting\n", o->fn); 00440 remove_from_queue(o, "Failed"); 00441 free_outgoing(o); 00442 return -1; 00443 } 00444 00445 ast_debug(1, "Filename: %s, Retries: %d, max: %d\n", o->fn, o->retries, o->maxretries); 00446 if (o->retries <= o->maxretries) { 00447 now += o->retrytime; 00448 if (o->callingpid && (o->callingpid == ast_mainpid)) { 00449 safe_append(o, time(NULL), "DelayedRetry"); 00450 ast_log(LOG_DEBUG, "Delaying retry since we're currently running '%s'\n", o->fn); 00451 free_outgoing(o); 00452 } else { 00453 /* Increment retries */ 00454 o->retries++; 00455 /* If someone else was calling, they're presumably gone now 00456 so abort their retry and continue as we were... */ 00457 if (o->callingpid) 00458 safe_append(o, time(NULL), "AbortRetry"); 00459 00460 safe_append(o, now, "StartRetry"); 00461 launch_service(o); 00462 } 00463 return now; 00464 } 00465 00466 ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", 00467 o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : ""); 00468 remove_from_queue(o, "Expired"); 00469 free_outgoing(o); 00470 return 0; 00471 }
static void* scan_thread | ( | void * | unused | ) | [static] |
Definition at line 618 of file pbx_spool.c.
References ast_debug, ast_free, ast_fully_booted, AST_LIST_EMPTY, AST_LIST_FIRST, AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_log(), errno, HAVE_INOTIFY, inotify_fd, len(), LOG_ERROR, direntry::mtime, direntry::name, direntry::next, queue_created_files(), queue_file(), queue_file_create(), queue_file_open(), and queue_file_write().
Referenced by load_module().
00619 { 00620 DIR *dir; 00621 struct dirent *de; 00622 time_t now; 00623 struct timespec ts = { .tv_sec = 1 }; 00624 #ifdef HAVE_INOTIFY 00625 ssize_t res; 00626 int inotify_fd = inotify_init(); 00627 struct inotify_event *iev; 00628 char buf[8192] __attribute__((aligned (sizeof(int)))); 00629 struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN }; 00630 #else 00631 struct timespec nowait = { .tv_sec = 0, .tv_nsec = 1 }; 00632 int inotify_fd = kqueue(); 00633 struct kevent kev; 00634 struct kevent event; 00635 #endif 00636 struct direntry *cur; 00637 00638 while (!ast_fully_booted) { 00639 nanosleep(&ts, NULL); 00640 } 00641 00642 if (inotify_fd < 0) { 00643 ast_log(LOG_ERROR, "Unable to initialize " 00644 #ifdef HAVE_INOTIFY 00645 "inotify(7)" 00646 #else 00647 "kqueue(2)" 00648 #endif 00649 "\n"); 00650 return NULL; 00651 } 00652 00653 #ifdef HAVE_INOTIFY 00654 inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_OPEN | IN_CLOSE_WRITE | IN_MOVED_TO); 00655 #endif 00656 00657 /* First, run through the directory and clear existing entries */ 00658 if (!(dir = opendir(qdir))) { 00659 ast_log(LOG_ERROR, "Unable to open directory %s: %s\n", qdir, strerror(errno)); 00660 return NULL; 00661 } 00662 00663 #ifndef HAVE_INOTIFY 00664 EV_SET(&kev, dirfd(dir), EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_WRITE, 0, NULL); 00665 if (kevent(inotify_fd, &kev, 1, &event, 1, &nowait) < 0 && errno != 0) { 00666 ast_log(LOG_ERROR, "Unable to watch directory %s: %s\n", qdir, strerror(errno)); 00667 } 00668 #endif 00669 now = time(NULL); 00670 while ((de = readdir(dir))) { 00671 queue_file(de->d_name, 0); 00672 } 00673 00674 #ifdef HAVE_INOTIFY 00675 /* Directory needs to remain open for kqueue(2) */ 00676 closedir(dir); 00677 #endif 00678 00679 /* Wait for either a) next timestamp to occur, or b) a change to happen */ 00680 for (;/* ever */;) { 00681 time_t next = AST_LIST_EMPTY(&dirlist) ? INT_MAX : AST_LIST_FIRST(&dirlist)->mtime; 00682 00683 time(&now); 00684 if (next > now) { 00685 #ifdef HAVE_INOTIFY 00686 int stage = 0; 00687 /* Convert from seconds to milliseconds, unless there's nothing 00688 * in the queue already, in which case, we wait forever. */ 00689 int waittime = next == INT_MAX ? -1 : (next - now) * 1000; 00690 if (!AST_LIST_EMPTY(&createlist)) { 00691 waittime = 1000; 00692 } 00693 /* When a file arrives, add it to the queue, in mtime order. */ 00694 if ((res = poll(&pfd, 1, waittime)) > 0 && (stage = 1) && 00695 (res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(*iev)) { 00696 ssize_t len = 0; 00697 /* File(s) added to directory, add them to my list */ 00698 for (iev = (void *) buf; res >= sizeof(*iev); iev = (struct inotify_event *) (((char *) iev) + len)) { 00699 /* For an IN_MOVED_TO event, simply process the file. However, if 00700 * we get an IN_CREATE event it *might* be an open(O_CREAT) or it 00701 * might be a hardlink (like smsq does, since rename() might 00702 * overwrite an existing file). So we have to see if we get a 00703 * subsequent IN_OPEN event on the same file. If we do, keep it 00704 * on the openlist and wait for the corresponding IN_CLOSE_WRITE. 00705 * If we *don't* see an IN_OPEN event, then it was a hard link so 00706 * it can be processed immediately. 00707 * 00708 * Unfortunately, although open(O_CREAT) is an atomic file system 00709 * operation, the inotify subsystem doesn't give it to us in a 00710 * single event with both IN_CREATE|IN_OPEN set. It's two separate 00711 * events, and the kernel doesn't even give them to us at the same 00712 * time. We can read() from inotify_fd after the IN_CREATE event, 00713 * and get *nothing* from it. The IN_OPEN arrives only later! So 00714 * we have a very short timeout of 2 seconds. */ 00715 if (iev->mask & IN_CREATE) { 00716 queue_file_create(iev->name); 00717 } else if (iev->mask & IN_OPEN) { 00718 queue_file_open(iev->name); 00719 } else if (iev->mask & IN_CLOSE_WRITE) { 00720 queue_file_write(iev->name); 00721 } else if (iev->mask & IN_MOVED_TO) { 00722 queue_file(iev->name, 0); 00723 } else { 00724 ast_log(LOG_ERROR, "Unexpected event %d for file '%s'\n", (int) iev->mask, iev->name); 00725 } 00726 00727 len = sizeof(*iev) + iev->len; 00728 res -= len; 00729 } 00730 } else if (res < 0 && errno != EINTR && errno != EAGAIN) { 00731 ast_debug(1, "Got an error back from %s(2): %s\n", stage ? "read" : "poll", strerror(errno)); 00732 } 00733 time(&now); 00734 } 00735 queue_created_files(); 00736 #else 00737 int num_events; 00738 /* If queue empty then wait forever */ 00739 if (next == INT_MAX) { 00740 num_events = kevent(inotify_fd, &kev, 1, &event, 1, NULL); 00741 } else { 00742 struct timespec ts2 = { .tv_sec = (unsigned long int)(next - now), .tv_nsec = 0 }; 00743 num_events = kevent(inotify_fd, &kev, 1, &event, 1, &ts2); 00744 } 00745 if ((num_events < 0) || (event.flags == EV_ERROR)) { 00746 ast_debug(10, "KEvent error %s\n", strerror(errno)); 00747 continue; 00748 } else if (num_events == 0) { 00749 /* Interrupt or timeout, restart calculations */ 00750 continue; 00751 } else { 00752 /* Directory changed, rescan */ 00753 rewinddir(dir); 00754 while ((de = readdir(dir))) { 00755 queue_file(de->d_name, 0); 00756 } 00757 } 00758 time(&now); 00759 } 00760 #endif 00761 00762 /* Empty the list of all entries ready to be processed */ 00763 AST_LIST_LOCK(&dirlist); 00764 while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) { 00765 cur = AST_LIST_REMOVE_HEAD(&dirlist, list); 00766 queue_file(cur->name, cur->mtime); 00767 ast_free(cur); 00768 } 00769 AST_LIST_UNLOCK(&dirlist); 00770 } 00771 return NULL; 00772 }
static int unload_module | ( | void | ) | [static] |
Definition at line 869 of file pbx_spool.c.
struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Outgoing Spool Support" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = "ac1f6a56484a8820659555499174e588" , .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, } [static] |
Definition at line 893 of file pbx_spool.c.
struct ast_module_info* ast_module_info = &__mod_info [static] |
Definition at line 893 of file pbx_spool.c.
char qdir[255] [static] |
Definition at line 72 of file pbx_spool.c.
char qdonedir[255] [static] |
Definition at line 73 of file pbx_spool.c.