Mercurial > libpst
changeset 200:d360f96f71f6
start changes for parallel readpst on multi-processor machines
author | Carl Byington <carl@five-ten-sg.com> |
---|---|
date | Wed, 13 May 2009 20:06:53 -0700 |
parents | e3a46f66332b |
children | 3850a3b11745 |
files | ChangeLog TODO configure.in regression/regression-tests.bash src/define.h src/readpst.c |
diffstat | 6 files changed, 134 insertions(+), 17 deletions(-) [+] |
line wrap: on
line diff
--- a/ChangeLog Wed May 13 11:59:55 2009 -0700 +++ b/ChangeLog Wed May 13 20:06:53 2009 -0700 @@ -9,6 +9,8 @@ * removed contact->access_method since we don't have a mapi element for it. * changed pst_attach_to_mem to return pst_binary structure. * decode more recurrence mapi elements. + * change interfaces to be thread safe. + * readpst changes for parallel operation on multi processor machines. LibPST 0.6.37 (2009-04-17) ===============================
--- a/TODO Wed May 13 11:59:55 2009 -0700 +++ b/TODO Wed May 13 20:06:53 2009 -0700 @@ -6,8 +6,4 @@ At the next soname bump (to libpst.so.5) we should remove readpstlog, and produce ascii debug log files - try fork() with child limit to get parallel readpst move some of readpst into the shared library, in particular write_normal_email() - add helper function item_actual_type (mail, contact, etc) - change gmtime() and ctime() to _r versions for thread safe operation - <http://www.geocities.com/cainrandom/dev/MAPIRecurrence.html>
--- a/configure.in Wed May 13 11:59:55 2009 -0700 +++ b/configure.in Wed May 13 20:06:53 2009 -0700 @@ -133,18 +133,23 @@ ) AC_HEADER_DIRENT AC_HEADER_STDC -AC_CHECK_HEADERS([ctype.h dirent.h errno.h fcntl.h inttypes.h limits.h regex.h signal.h stdarg.h stdint.h stdio.h stdlib.h string.h sys/param.h sys/stat.h sys/types.h time.h unistd.h wchar.h]) +AC_CHECK_HEADERS([ctype.h dirent.h errno.h fcntl.h inttypes.h limits.h regex.h semaphore.h signal.h stdarg.h stdint.h stdio.h stdlib.h string.h sys/param.h sys/stat.h sys/types.h time.h unistd.h wchar.h]) +AC_SEARCH_LIBS([sem_init],rt) # Checks for typedefs, structures, and compiler characteristics. AC_HEADER_STDBOOL +AC_HEADER_SYS_WAIT AC_C_CONST +AC_C_INLINE AC_TYPE_OFF_T AC_TYPE_SIZE_T +AC_TYPE_PID_T AC_STRUCT_TM # Checks for library functions. +AC_FUNC_FORK AC_FUNC_FSEEKO AC_FUNC_STAT AC_FUNC_LSTAT
--- a/regression/regression-tests.bash Wed May 13 11:59:55 2009 -0700 +++ b/regression/regression-tests.bash Wed May 13 20:06:53 2009 -0700 @@ -53,8 +53,9 @@ mkdir output$n # ../src/readpst -cv -o output$n $fn >$ba.err 2>&1 # readpst -cv -o output$n -d dumper $fn >$ba.err 2>&1 - $val ../src/readpst -r -D -cv -o output$n -d dumper $fn >$ba.err 2>&1 - ../src/readpstlog -f I dumper >$ba.log + $val ../src/readpst -r -D -cv -o output$n $fn + #$val ../src/readpst -r -D -cv -o output$n -d dumper $fn >$ba.err 2>&1 + # ../src/readpstlog -f I dumper >$ba.log #../src/getidblock -d -p $fn 0 >$ba.fulldump #../src/readpstlog -f I getidblock.log >$ba.fulldump.log @@ -117,7 +118,7 @@ #dopst 14 joe.romanowski.pst #dopst 15 hourig1.pst ##dopst 16 hourig2.pst - ##dopst 17 hourig3.pst + dopst 17 hourig3.pst #dopst 18 test-mac.pst ##dopst 19 harris.pst #dopst 20 spam.pst
--- a/src/define.h Wed May 13 11:59:55 2009 -0700 +++ b/src/define.h Wed May 13 20:06:53 2009 -0700 @@ -136,10 +136,18 @@ #include <sys/types.h> #endif +#ifdef HAVE_SYS_WAIT_H + #include <sys/wait.h> +#endif + #ifdef HAVE_DIRENT_H #include <dirent.h> #endif +#ifdef HAVE_SEMAPHORE_H + #include <semaphore.h> +#endif + void pst_debug(const char *fmt, ...); void pst_debug_hexdumper(FILE* out, char* buf, size_t size, int col, int delta); @@ -158,14 +166,16 @@ #define LOGSTOP() {MESSAGESTOP();DEBUGSTOP();} -#define DIE(x) {\ - MESSAGEPRINT(x, 0);\ - printf x;\ - exit(EXIT_FAILURE);\ +#define DIE(x) { \ + MESSAGEPRINT(x, 0); \ + printf x; \ + fflush(stdout); \ + exit(EXIT_FAILURE); \ } -#define WARN(x) {\ - MESSAGEPRINT(x, 0);\ - printf x;\ +#define WARN(x) { \ + MESSAGEPRINT(x, 0); \ + printf x; \ + fflush(stdout); \ } #ifdef DEBUGPRINT
--- a/src/readpst.c Wed May 13 11:59:55 2009 -0700 +++ b/src/readpst.c Wed May 13 20:06:53 2009 -0700 @@ -26,6 +26,8 @@ int32_t type; }; +void grim_reaper(); +pid_t try_fork(); void process(pst_item *outeritem, pst_desc_tree *d_ptr); void write_email_body(FILE *f, char *body); void removeCR(char *c); @@ -119,6 +121,77 @@ pst_file pstfile; regex_t meta_charset_pattern; +int active_children; // number of children of this process, cannot be larger than max_children +int max_children; // setup by main(), and at the start of new child process +pid_t *child_processes; // setup by main(), and at the start of new child process + +#ifdef HAVE_SEMAPHORE_H +sem_t global_children; +#endif + + +void grim_reaper(int waitall) +{ +#ifdef HAVE_FORK +#ifdef HAVE_SEMAPHORE_H + printf("grim reaper %s for pid %d (parent %d) with %d children\n", (waitall) ? "all" : "", getpid(), getppid(), active_children); + fflush(stdout); + int i,j; + for (i=0; i<active_children; i++) { + pid_t child = child_processes[i]; + pid_t ch = waitpid(child, NULL, ((waitall) ? 0 : WNOHANG)); + if (ch == child) { + // this has terminated, remove it from the list + for (j=i; j<active_children-1; j++) { + child_processes[j] = child_processes[j+1]; + } + active_children--; + i--; + } + } + printf("grim reaper %s for pid %d with %d children\n", (waitall) ? "all" : "", getpid(), active_children); + fflush(stdout); +#endif +#endif +} + + +pid_t try_fork() +{ +#ifdef HAVE_FORK +#ifdef HAVE_SEMAPHORE_H + int available; + grim_reaper(0); + sem_getvalue(&global_children, &available); + if (available) { + sem_wait(&global_children); + pid_t child = fork(); + if (child < 0) { + // fork failed, pretend it worked and we are the child + return 0; + } + else if (child == 0) { + pid_t me = getpid(); + printf("forked child pid %d \n", me); + fflush(stdout); + // fork worked, and we are the child, reinitialize *our* list of children + active_children = 0; + memset(child_processes, 0, sizeof(pid_t) * max_children); + } + else { + // fork worked, and we are the parent, record this child that we need to wait for + child_processes[active_children++] = child; + } + return child; + } + else { + return 0; // pretend to have forked and we are the child + } +#endif +#endif + return 0; +} + void process(pst_item *outeritem, pst_desc_tree *d_ptr) { @@ -154,10 +227,29 @@ if (item->folder && item->file_as.str) { DEBUG_MAIN(("Processing Folder \"%s\"\n", item->file_as.str)); if (output_mode != OUTPUT_QUIET) printf("Processing Folder \"%s\"\n", item->file_as.str); + fflush(stdout); ff.item_count++; if (d_ptr->child && (deleted_mode == DMODE_INCLUDE || strcasecmp(item->file_as.str, "Deleted Items"))) { //if this is a non-empty folder other than deleted items, we want to recurse into it - process(item, d_ptr->child); + pid_t parent = getpid(); + pid_t child = try_fork(); + if (child == 0) { + // we are the child process, or the original parent if no children were available + pid_t me = getpid(); + process(item, d_ptr->child); +#ifdef HAVE_FORK +#ifdef HAVE_SEMAPHORE_H + if (me != parent) { + // we really were a child, forked for the sole purpose of processing this folder + // free my child count slot before really exiting, since + // all I am doing here is waiting for my children to exit + sem_post(&global_children); + grim_reaper(1); // wait for all my child processes to exit + exit(0); // really exit + } +#endif +#endif + } } } else if (item->contact && (item->type == PST_TYPE_CONTACT)) { @@ -382,7 +474,16 @@ DIE(("Top of folders record not found. Cannot continue\n")); } + max_children = (d_log) ? 0 : 10; + child_processes = (pid_t *)pst_malloc(sizeof(pid_t) * max_children); + active_children = 0; + memset(child_processes, 0, sizeof(pid_t) * max_children); +#ifdef HAVE_SEMAPHORE_H + sem_init(&global_children, 1, max_children); +#endif process(item, d_ptr->child); // do the children of TOPF + grim_reaper(1); // wait for all child processes + pst_freeItem(item); pst_close(&pstfile); DEBUG_RET(); @@ -1229,7 +1330,9 @@ if (!has_date && item->email->sent_date) { char c_time[C_TIME_SIZE]; - strftime(c_time, C_TIME_SIZE, "%a, %d %b %Y %H:%M:%S %z", gmtime(&em_time)); + struct tm stm; + gmtime_r(&em_time, &stm); + strftime(c_time, C_TIME_SIZE, "%a, %d %b %Y %H:%M:%S %z", &stm); fprintf(f_output, "Date: %s\n", c_time); }