diff src/readpst.c @ 200:d360f96f71f6

start changes for parallel readpst on multi-processor machines
author Carl Byington <carl@five-ten-sg.com>
date Wed, 13 May 2009 20:06:53 -0700
parents e3a46f66332b
children 3850a3b11745
line wrap: on
line diff
--- a/src/readpst.c	Wed May 13 11:59:55 2009 -0700
+++ b/src/readpst.c	Wed May 13 20:06:53 2009 -0700
@@ -26,6 +26,8 @@
     int32_t type;
 };
 
+void      grim_reaper();
+pid_t     try_fork();
 void      process(pst_item *outeritem, pst_desc_tree *d_ptr);
 void      write_email_body(FILE *f, char *body);
 void      removeCR(char *c);
@@ -119,6 +121,77 @@
 pst_file pstfile;
 regex_t  meta_charset_pattern;
 
+int active_children;    // number of children of this process, cannot be larger than max_children
+int max_children;       // setup by main(), and at the start of new child process
+pid_t *child_processes; // setup by main(), and at the start of new child process
+
+#ifdef HAVE_SEMAPHORE_H
+sem_t global_children;
+#endif
+
+
+void grim_reaper(int waitall)
+{
+#ifdef HAVE_FORK
+#ifdef HAVE_SEMAPHORE_H
+    printf("grim reaper %s for pid %d (parent %d) with %d children\n", (waitall) ? "all" : "", getpid(), getppid(), active_children);
+    fflush(stdout);
+    int i,j;
+    for (i=0; i<active_children; i++) {
+        pid_t child = child_processes[i];
+        pid_t ch = waitpid(child, NULL, ((waitall) ? 0 : WNOHANG));
+        if (ch == child) {
+            // this has terminated, remove it from the list
+            for (j=i; j<active_children-1; j++) {
+                child_processes[j] = child_processes[j+1];
+            }
+            active_children--;
+            i--;
+        }
+    }
+    printf("grim reaper %s for pid %d with %d children\n", (waitall) ? "all" : "", getpid(), active_children);
+    fflush(stdout);
+#endif
+#endif
+}
+
+
+pid_t try_fork()
+{
+#ifdef HAVE_FORK
+#ifdef HAVE_SEMAPHORE_H
+    int available;
+    grim_reaper(0);
+    sem_getvalue(&global_children, &available);
+    if (available) {
+        sem_wait(&global_children);
+        pid_t child = fork();
+        if (child < 0) {
+            // fork failed, pretend it worked and we are the child
+            return 0;
+        }
+        else if (child == 0) {
+            pid_t me = getpid();
+            printf("forked child pid %d \n", me);
+            fflush(stdout);
+            // fork worked, and we are the child, reinitialize *our* list of children
+            active_children = 0;
+            memset(child_processes, 0, sizeof(pid_t) * max_children);
+        }
+        else {
+            // fork worked, and we are the parent, record this child that we need to wait for
+            child_processes[active_children++] = child;
+        }
+        return child;
+    }
+    else {
+        return 0;   // pretend to have forked and we are the child
+    }
+#endif
+#endif
+    return 0;
+}
+
 
 void process(pst_item *outeritem, pst_desc_tree *d_ptr)
 {
@@ -154,10 +227,29 @@
         if (item->folder && item->file_as.str) {
             DEBUG_MAIN(("Processing Folder \"%s\"\n", item->file_as.str));
             if (output_mode != OUTPUT_QUIET) printf("Processing Folder \"%s\"\n", item->file_as.str);
+            fflush(stdout);
             ff.item_count++;
             if (d_ptr->child && (deleted_mode == DMODE_INCLUDE || strcasecmp(item->file_as.str, "Deleted Items"))) {
                 //if this is a non-empty folder other than deleted items, we want to recurse into it
-                process(item, d_ptr->child);
+                pid_t parent = getpid();
+                pid_t child = try_fork();
+                if (child == 0) {
+                    // we are the child process, or the original parent if no children were available
+                    pid_t me = getpid();
+                    process(item, d_ptr->child);
+#ifdef HAVE_FORK
+#ifdef HAVE_SEMAPHORE_H
+                    if (me != parent) {
+                        // we really were a child, forked for the sole purpose of processing this folder
+                        // free my child count slot before really exiting, since
+                        // all I am doing here is waiting for my children to exit
+                        sem_post(&global_children);
+                        grim_reaper(1); // wait for all my child processes to exit
+                        exit(0);        // really exit
+                    }
+#endif
+#endif
+                }
             }
 
         } else if (item->contact && (item->type == PST_TYPE_CONTACT)) {
@@ -382,7 +474,16 @@
         DIE(("Top of folders record not found. Cannot continue\n"));
     }
 
+    max_children = (d_log) ? 0 : 10;
+    child_processes = (pid_t *)pst_malloc(sizeof(pid_t) * max_children);
+    active_children = 0;
+    memset(child_processes, 0, sizeof(pid_t) * max_children);
+#ifdef HAVE_SEMAPHORE_H
+    sem_init(&global_children, 1, max_children);
+#endif
     process(item, d_ptr->child);    // do the children of TOPF
+    grim_reaper(1); // wait for all child processes
+
     pst_freeItem(item);
     pst_close(&pstfile);
     DEBUG_RET();
@@ -1229,7 +1330,9 @@
 
     if (!has_date && item->email->sent_date) {
         char c_time[C_TIME_SIZE];
-        strftime(c_time, C_TIME_SIZE, "%a, %d %b %Y %H:%M:%S %z", gmtime(&em_time));
+        struct tm stm;
+        gmtime_r(&em_time, &stm);
+        strftime(c_time, C_TIME_SIZE, "%a, %d %b %Y %H:%M:%S %z", &stm);
         fprintf(f_output, "Date: %s\n", c_time);
     }