changeset 200:d360f96f71f6

start changes for parallel readpst on multi-processor machines
author Carl Byington <carl@five-ten-sg.com>
date Wed, 13 May 2009 20:06:53 -0700
parents e3a46f66332b
children 3850a3b11745
files ChangeLog TODO configure.in regression/regression-tests.bash src/define.h src/readpst.c
diffstat 6 files changed, 134 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/ChangeLog	Wed May 13 11:59:55 2009 -0700
+++ b/ChangeLog	Wed May 13 20:06:53 2009 -0700
@@ -9,6 +9,8 @@
     * removed contact->access_method since we don't have a mapi element for it.
     * changed pst_attach_to_mem to return pst_binary structure.
     * decode more recurrence mapi elements.
+    * change interfaces to be thread safe.
+    * readpst changes for parallel operation on multi processor machines.
 
 LibPST 0.6.37 (2009-04-17)
 ===============================
--- a/TODO	Wed May 13 11:59:55 2009 -0700
+++ b/TODO	Wed May 13 20:06:53 2009 -0700
@@ -6,8 +6,4 @@
 
 At the next soname bump (to libpst.so.5) we should
     remove readpstlog, and produce ascii debug log files
-    try fork() with child limit to get parallel readpst
     move some of readpst into the shared library, in particular write_normal_email()
-    add helper function item_actual_type (mail, contact, etc)
-    change gmtime() and ctime() to _r versions for thread safe operation
-    <http://www.geocities.com/cainrandom/dev/MAPIRecurrence.html>
--- a/configure.in	Wed May 13 11:59:55 2009 -0700
+++ b/configure.in	Wed May 13 20:06:53 2009 -0700
@@ -133,18 +133,23 @@
     )
 AC_HEADER_DIRENT
 AC_HEADER_STDC
-AC_CHECK_HEADERS([ctype.h dirent.h errno.h fcntl.h inttypes.h limits.h regex.h signal.h stdarg.h stdint.h stdio.h stdlib.h string.h sys/param.h sys/stat.h sys/types.h time.h unistd.h wchar.h])
+AC_CHECK_HEADERS([ctype.h dirent.h errno.h fcntl.h inttypes.h limits.h regex.h semaphore.h signal.h stdarg.h stdint.h stdio.h stdlib.h string.h sys/param.h sys/stat.h sys/types.h time.h unistd.h wchar.h])
+AC_SEARCH_LIBS([sem_init],rt)
 
 
 # Checks for typedefs, structures, and compiler characteristics.
 AC_HEADER_STDBOOL
+AC_HEADER_SYS_WAIT
 AC_C_CONST
+AC_C_INLINE
 AC_TYPE_OFF_T
 AC_TYPE_SIZE_T
+AC_TYPE_PID_T
 AC_STRUCT_TM
 
 
 # Checks for library functions.
+AC_FUNC_FORK
 AC_FUNC_FSEEKO
 AC_FUNC_STAT
 AC_FUNC_LSTAT
--- a/regression/regression-tests.bash	Wed May 13 11:59:55 2009 -0700
+++ b/regression/regression-tests.bash	Wed May 13 20:06:53 2009 -0700
@@ -53,8 +53,9 @@
     mkdir output$n
     #    ../src/readpst -cv -o output$n $fn >$ba.err 2>&1
     #           readpst -cv -o output$n -d dumper $fn >$ba.err 2>&1
-    $val ../src/readpst -r -D -cv -o output$n -d dumper $fn >$ba.err 2>&1
-         ../src/readpstlog -f I dumper >$ba.log
+    $val ../src/readpst -r -D -cv -o output$n  $fn
+    #$val ../src/readpst -r -D -cv -o output$n -d dumper $fn >$ba.err 2>&1
+    #     ../src/readpstlog -f I dumper >$ba.log
 
     #../src/getidblock -d -p $fn 0 >$ba.fulldump
     #../src/readpstlog -f I getidblock.log >$ba.fulldump.log
@@ -117,7 +118,7 @@
     #dopst  14 joe.romanowski.pst
     #dopst  15 hourig1.pst
     ##dopst  16 hourig2.pst
-    ##dopst  17 hourig3.pst
+    dopst  17 hourig3.pst
     #dopst  18 test-mac.pst
     ##dopst  19 harris.pst
     #dopst  20 spam.pst
--- a/src/define.h	Wed May 13 11:59:55 2009 -0700
+++ b/src/define.h	Wed May 13 20:06:53 2009 -0700
@@ -136,10 +136,18 @@
     #include <sys/types.h>
 #endif
 
+#ifdef HAVE_SYS_WAIT_H
+    #include <sys/wait.h>
+#endif
+
 #ifdef HAVE_DIRENT_H
     #include <dirent.h>
 #endif
 
+#ifdef HAVE_SEMAPHORE_H
+    #include <semaphore.h>
+#endif
+
 
 void  pst_debug(const char *fmt, ...);
 void  pst_debug_hexdumper(FILE* out, char* buf, size_t size, int col, int delta);
@@ -158,14 +166,16 @@
 
 #define LOGSTOP() {MESSAGESTOP();DEBUGSTOP();}
 
-#define DIE(x) {\
- MESSAGEPRINT(x, 0);\
- printf x;\
- exit(EXIT_FAILURE);\
+#define DIE(x) {            \
+    MESSAGEPRINT(x, 0);     \
+    printf x;               \
+    fflush(stdout);         \
+    exit(EXIT_FAILURE);     \
 }
-#define WARN(x) {\
- MESSAGEPRINT(x, 0);\
- printf x;\
+#define WARN(x) {           \
+    MESSAGEPRINT(x, 0);     \
+    printf x;               \
+    fflush(stdout);         \
 }
 
 #ifdef DEBUGPRINT
--- a/src/readpst.c	Wed May 13 11:59:55 2009 -0700
+++ b/src/readpst.c	Wed May 13 20:06:53 2009 -0700
@@ -26,6 +26,8 @@
     int32_t type;
 };
 
+void      grim_reaper();
+pid_t     try_fork();
 void      process(pst_item *outeritem, pst_desc_tree *d_ptr);
 void      write_email_body(FILE *f, char *body);
 void      removeCR(char *c);
@@ -119,6 +121,77 @@
 pst_file pstfile;
 regex_t  meta_charset_pattern;
 
+int active_children;    // number of children of this process, cannot be larger than max_children
+int max_children;       // setup by main(), and at the start of new child process
+pid_t *child_processes; // setup by main(), and at the start of new child process
+
+#ifdef HAVE_SEMAPHORE_H
+sem_t global_children;
+#endif
+
+
+void grim_reaper(int waitall)
+{
+#ifdef HAVE_FORK
+#ifdef HAVE_SEMAPHORE_H
+    printf("grim reaper %s for pid %d (parent %d) with %d children\n", (waitall) ? "all" : "", getpid(), getppid(), active_children);
+    fflush(stdout);
+    int i,j;
+    for (i=0; i<active_children; i++) {
+        pid_t child = child_processes[i];
+        pid_t ch = waitpid(child, NULL, ((waitall) ? 0 : WNOHANG));
+        if (ch == child) {
+            // this has terminated, remove it from the list
+            for (j=i; j<active_children-1; j++) {
+                child_processes[j] = child_processes[j+1];
+            }
+            active_children--;
+            i--;
+        }
+    }
+    printf("grim reaper %s for pid %d with %d children\n", (waitall) ? "all" : "", getpid(), active_children);
+    fflush(stdout);
+#endif
+#endif
+}
+
+
+pid_t try_fork()
+{
+#ifdef HAVE_FORK
+#ifdef HAVE_SEMAPHORE_H
+    int available;
+    grim_reaper(0);
+    sem_getvalue(&global_children, &available);
+    if (available) {
+        sem_wait(&global_children);
+        pid_t child = fork();
+        if (child < 0) {
+            // fork failed, pretend it worked and we are the child
+            return 0;
+        }
+        else if (child == 0) {
+            pid_t me = getpid();
+            printf("forked child pid %d \n", me);
+            fflush(stdout);
+            // fork worked, and we are the child, reinitialize *our* list of children
+            active_children = 0;
+            memset(child_processes, 0, sizeof(pid_t) * max_children);
+        }
+        else {
+            // fork worked, and we are the parent, record this child that we need to wait for
+            child_processes[active_children++] = child;
+        }
+        return child;
+    }
+    else {
+        return 0;   // pretend to have forked and we are the child
+    }
+#endif
+#endif
+    return 0;
+}
+
 
 void process(pst_item *outeritem, pst_desc_tree *d_ptr)
 {
@@ -154,10 +227,29 @@
         if (item->folder && item->file_as.str) {
             DEBUG_MAIN(("Processing Folder \"%s\"\n", item->file_as.str));
             if (output_mode != OUTPUT_QUIET) printf("Processing Folder \"%s\"\n", item->file_as.str);
+            fflush(stdout);
             ff.item_count++;
             if (d_ptr->child && (deleted_mode == DMODE_INCLUDE || strcasecmp(item->file_as.str, "Deleted Items"))) {
                 //if this is a non-empty folder other than deleted items, we want to recurse into it
-                process(item, d_ptr->child);
+                pid_t parent = getpid();
+                pid_t child = try_fork();
+                if (child == 0) {
+                    // we are the child process, or the original parent if no children were available
+                    pid_t me = getpid();
+                    process(item, d_ptr->child);
+#ifdef HAVE_FORK
+#ifdef HAVE_SEMAPHORE_H
+                    if (me != parent) {
+                        // we really were a child, forked for the sole purpose of processing this folder
+                        // free my child count slot before really exiting, since
+                        // all I am doing here is waiting for my children to exit
+                        sem_post(&global_children);
+                        grim_reaper(1); // wait for all my child processes to exit
+                        exit(0);        // really exit
+                    }
+#endif
+#endif
+                }
             }
 
         } else if (item->contact && (item->type == PST_TYPE_CONTACT)) {
@@ -382,7 +474,16 @@
         DIE(("Top of folders record not found. Cannot continue\n"));
     }
 
+    max_children = (d_log) ? 0 : 10;
+    child_processes = (pid_t *)pst_malloc(sizeof(pid_t) * max_children);
+    active_children = 0;
+    memset(child_processes, 0, sizeof(pid_t) * max_children);
+#ifdef HAVE_SEMAPHORE_H
+    sem_init(&global_children, 1, max_children);
+#endif
     process(item, d_ptr->child);    // do the children of TOPF
+    grim_reaper(1); // wait for all child processes
+
     pst_freeItem(item);
     pst_close(&pstfile);
     DEBUG_RET();
@@ -1229,7 +1330,9 @@
 
     if (!has_date && item->email->sent_date) {
         char c_time[C_TIME_SIZE];
-        strftime(c_time, C_TIME_SIZE, "%a, %d %b %Y %H:%M:%S %z", gmtime(&em_time));
+        struct tm stm;
+        gmtime_r(&em_time, &stm);
+        strftime(c_time, C_TIME_SIZE, "%a, %d %b %Y %H:%M:%S %z", &stm);
         fprintf(f_output, "Date: %s\n", c_time);
     }