diff src/readpst.c @ 201:3850a3b11745

fixes for parallel readpst
author Carl Byington <carl@five-ten-sg.com>
date Sat, 16 May 2009 10:32:26 -0700
parents d360f96f71f6
children 2f38c4ce606f
line wrap: on
line diff
--- a/src/readpst.c	Wed May 13 20:06:53 2009 -0700
+++ b/src/readpst.c	Sat May 16 10:32:26 2009 -0700
@@ -26,8 +26,8 @@
     int32_t type;
 };
 
-void      grim_reaper();
-pid_t     try_fork();
+int       grim_reaper();
+pid_t     try_fork(char* folder);
 void      process(pst_item *outeritem, pst_desc_tree *d_ptr);
 void      write_email_body(FILE *f, char *body);
 void      removeCR(char *c);
@@ -110,76 +110,84 @@
 #define RTF_ATTACH_TYPE "application/rtf"
 
 // global settings
-int mode         = MODE_NORMAL;
-int mode_MH      = 0;   // a submode of MODE_SEPARATE
-int output_mode  = OUTPUT_NORMAL;
-int contact_mode = CMODE_VCARD;
-int deleted_mode = DMODE_EXCLUDE;
-int contact_mode_specified = 0;
-int overwrite = 0;
-int save_rtf_body = 1;
-pst_file pstfile;
-regex_t  meta_charset_pattern;
+int         mode         = MODE_NORMAL;
+int         mode_MH      = 0;   // a submode of MODE_SEPARATE
+int         output_mode  = OUTPUT_NORMAL;
+int         contact_mode = CMODE_VCARD;
+int         deleted_mode = DMODE_EXCLUDE;
+int         contact_mode_specified = 0;
+int         overwrite = 0;
+int         save_rtf_body = 1;
+pst_file    pstfile;
+regex_t     meta_charset_pattern;
 
-int active_children;    // number of children of this process, cannot be larger than max_children
-int max_children;       // setup by main(), and at the start of new child process
-pid_t *child_processes; // setup by main(), and at the start of new child process
+int         number_processors = 1;  // number of cpus; main() overwrites this from sysconf(_SC_NPROCESSORS_ONLN) when that is defined
+int         max_children  = 0;      // set in main(): 0 when d_log is set, else the -j value if given, else number_processors * 4
+int         max_child_specified = 0;// set to 1 when the -j command line option was given
+int         active_children;        // number of children of this process, cannot be larger than max_children
+pid_t*      child_processes;        // pids of our own children; allocated by main(), zeroed again at the start of each forked child
 
 #ifdef HAVE_SEMAPHORE_H
-sem_t global_children;
+int         shared_memory_id;       // id returned by shmget() in main() for the segment that holds the semaphore
+sem_t*      global_children = NULL; // semaphore in that segment, initialized to max_children; stays NULL if it was not set up
 #endif
 
 
-void grim_reaper(int waitall)
+int grim_reaper(int waitall)   // reap exited children of this process; returns the current value of the global_children semaphore
+{
+    int available = 0;   // stays 0 when fork/semaphore support is compiled out or global_children is NULL
+#ifdef HAVE_FORK
+#ifdef HAVE_SEMAPHORE_H
+    if (global_children) {
+        sem_getvalue(global_children, &available);
+        //printf("grim reaper %s for pid %d (parent %d) with %d children, %d available\n", (waitall) ? "all" : "", getpid(), getppid(), active_children, available);
+        fflush(stdout);
+        int i,j;
+        for (i=0; i<active_children; i++) {
+            pid_t child = child_processes[i];
+            pid_t ch = waitpid(child, NULL, ((waitall) ? 0 : WNOHANG));   // waitall: block until this child exits; otherwise only poll
+            if (ch == child) {
+                // this has terminated, remove it from the list
+                for (j=i; j<active_children-1; j++) {
+                    child_processes[j] = child_processes[j+1];
+                }
+                active_children--;
+                i--;   // slot i now holds the next child, so examine the same index again
+            }
+        }
+        sem_getvalue(global_children, &available);   // re-read after reaping; this is the value returned to the caller
+        //printf("grim reaper %s for pid %d with %d children, %d available\n", (waitall) ? "all" : "", getpid(), active_children, available);
+        fflush(stdout);
+    }
+#endif
+#endif
+    return available;
+}
+
+
+pid_t try_fork(char *folder)
 {
 #ifdef HAVE_FORK
 #ifdef HAVE_SEMAPHORE_H
-    printf("grim reaper %s for pid %d (parent %d) with %d children\n", (waitall) ? "all" : "", getpid(), getppid(), active_children);
-    fflush(stdout);
-    int i,j;
-    for (i=0; i<active_children; i++) {
-        pid_t child = child_processes[i];
-        pid_t ch = waitpid(child, NULL, ((waitall) ? 0 : WNOHANG));
-        if (ch == child) {
-            // this has terminated, remove it from the list
-            for (j=i; j<active_children-1; j++) {
-                child_processes[j] = child_processes[j+1];
-            }
-            active_children--;
-            i--;
-        }
-    }
-    printf("grim reaper %s for pid %d with %d children\n", (waitall) ? "all" : "", getpid(), active_children);
-    fflush(stdout);
-#endif
-#endif
-}
-
-
-pid_t try_fork()
-{
-#ifdef HAVE_FORK
-#ifdef HAVE_SEMAPHORE_H
-    int available;
-    grim_reaper(0);
-    sem_getvalue(&global_children, &available);
+    int available = grim_reaper(0);
     if (available) {
-        sem_wait(&global_children);
+        sem_wait(global_children);
         pid_t child = fork();
         if (child < 0) {
             // fork failed, pretend it worked and we are the child
             return 0;
         }
         else if (child == 0) {
-            pid_t me = getpid();
-            printf("forked child pid %d \n", me);
-            fflush(stdout);
             // fork worked, and we are the child, reinitialize *our* list of children
             active_children = 0;
             memset(child_processes, 0, sizeof(pid_t) * max_children);
+            pst_reopen(&pstfile);   // close and reopen the pst file to get an independent file position pointer
         }
         else {
             // fork worked, and we are the parent, record this child that we need to wait for
+            pid_t me = getpid();
+            //printf("parent %d forked child pid %d to process folder %s\n", me, child, folder);
+            fflush(stdout);
             child_processes[active_children++] = child;
         }
         return child;
@@ -232,7 +240,7 @@
             if (d_ptr->child && (deleted_mode == DMODE_INCLUDE || strcasecmp(item->file_as.str, "Deleted Items"))) {
                 //if this is a non-empty folder other than deleted items, we want to recurse into it
                 pid_t parent = getpid();
-                pid_t child = try_fork();
+                pid_t child = try_fork(item->file_as.str);
                 if (child == 0) {
                     // we are the child process, or the original parent if no children were available
                     pid_t me = getpid();
@@ -243,7 +251,7 @@
                         // we really were a child, forked for the sole purpose of processing this folder
                         // free my child count slot before really exiting, since
                         // all I am doing here is waiting for my children to exit
-                        sem_post(&global_children);
+                        sem_post(global_children);
                         grim_reaper(1); // wait for all my child processes to exit
                         exit(0);        // really exit
                     }
@@ -351,7 +359,7 @@
     }
 
     // command-line option handling
-    while ((c = getopt(argc, argv, "bc:Dd:hko:qrSMVw"))!= -1) {
+    while ((c = getopt(argc, argv, "bc:Dd:hj:kMo:qrSVw"))!= -1) {
         switch (c) {
         case 'b':
             save_rtf_body = 0;
@@ -380,9 +388,9 @@
             usage();
             exit(0);
             break;
-        case 'V':
-            version();
-            exit(0);
+        case 'j':
+            max_children = atoi(optarg);
+            max_child_specified = 1;
             break;
         case 'k':
             mode = MODE_KMAIL;
@@ -404,6 +412,10 @@
             mode = MODE_SEPARATE;
             mode_MH = 0;
             break;
+        case 'V':
+            version();
+            exit(0);
+            break;
         case 'w':
             overwrite = 1;
             break;
@@ -443,8 +455,6 @@
         DIE(("main: Cannot change to output dir %s: %s\n", output_dir, strerror(x)));
     }
 
-    if (output_mode != OUTPUT_QUIET) printf("About to start processing first record...\n");
-
     d_ptr = pstfile.d_head; // first record is main record
     item  = pst_parse_item(&pstfile, d_ptr, NULL);
     if (!item || !item->message_store) {
@@ -474,16 +484,38 @@
         DIE(("Top of folders record not found. Cannot continue\n"));
     }
 
-    max_children = (d_log) ? 0 : 10;
+#ifdef _SC_NPROCESSORS_ONLN
+    number_processors =  sysconf(_SC_NPROCESSORS_ONLN);
+#endif
+    max_children    = (d_log) ? 0 : (!max_child_specified) ? number_processors * 4 : max_children;
+    active_children = 0;
     child_processes = (pid_t *)pst_malloc(sizeof(pid_t) * max_children);
-    active_children = 0;
     memset(child_processes, 0, sizeof(pid_t) * max_children);
+
 #ifdef HAVE_SEMAPHORE_H
-    sem_init(&global_children, 1, max_children);
+    if (max_children) {
+        shared_memory_id = shmget(IPC_PRIVATE, sizeof(sem_t), 0777);
+        //printf("shared memory id %d\n", shared_memory_id);
+        if (shared_memory_id >= 0) {
+            global_children = (sem_t *)shmat(shared_memory_id, NULL, 0);
+            //printf("shared memory pointer %p\n", (void*)global_children);
+            if (global_children == (sem_t *)-1) global_children = NULL;
+            if (global_children) sem_init(global_children, 1, max_children);
+            shmctl(shared_memory_id, IPC_RMID, NULL);
+        }
+    }
 #endif
+
     process(item, d_ptr->child);    // do the children of TOPF
     grim_reaper(1); // wait for all child processes
 
+#ifdef HAVE_SEMAPHORE_H
+    if (global_children) {
+        sem_destroy(global_children);
+        shmdt(global_children);
+    }
+#endif
+
     pst_freeItem(item);
     pst_close(&pstfile);
     DEBUG_RET();
@@ -531,7 +563,6 @@
     printf("Usage: %s [OPTIONS] {PST FILENAME}\n", prog_name);
     printf("OPTIONS:\n");
     printf("\t-V\t- Version. Display program version\n");
-    printf("\t-C\t- Decrypt (compressible encryption) the entire file and output on stdout (not typically useful)\n");
     printf("\t-D\t- Include deleted items in output\n");
     printf("\t-M\t- MH. Write emails in the MH format\n");
     printf("\t-S\t- Separate. Write emails in the separate format\n");
@@ -539,11 +570,14 @@
     printf("\t-c[v|l]\t- Set the Contact output mode. -cv = VCard, -cl = EMail list\n");
     printf("\t-d <filename> \t- Debug to file. This is a binary log. Use readpstlog to print it\n");
     printf("\t-h\t- Help. This screen\n");
+    printf("\t-j <integer>\t- Number of parallel jobs to run\n");
     printf("\t-k\t- KMail. Output in kmail format\n");
     printf("\t-o <dirname>\t- Output directory to write files to. CWD is changed *after* opening pst file\n");
     printf("\t-q\t- Quiet. Only print error messages\n");
     printf("\t-r\t- Recursive. Output in a recursive format\n");
     printf("\t-w\t- Overwrite any output mbox files\n");
+    printf("\n");
+    printf("Only one of -k -M -r -S should be specified\n");
     DEBUG_RET();
 }
 
@@ -713,7 +747,7 @@
         struct dirent *dirent = NULL;
         struct stat filestat;
         if (!(sdir = opendir("./"))) {
-            WARN(("mk_separate_dir: Cannot open dir \"%s\" for deletion of old contents\n", "./"));
+            DEBUG_WARN(("mk_separate_dir: Cannot open dir \"%s\" for deletion of old contents\n", "./"));
         } else {
             while ((dirent = readdir(sdir))) {
                 if (lstat(dirent->d_name, &filestat) != -1)
@@ -751,7 +785,7 @@
     DEBUG_ENT("mk_separate_file");
     DEBUG_MAIN(("opening next file to save email\n"));
     if (f->item_count > 999999999) { // bigger than nine 9's
-        DIE(("mk_separate_file: The number of emails in this folder has become too high to handle"));
+        DIE(("mk_separate_file: The number of emails in this folder has become too high to handle\n"));
     }
     sprintf(f->name, SEP_MAIL_FILE_TEMPLATE, f->item_count + name_offset);
     if (f->output) fclose(f->output);
@@ -848,7 +882,7 @@
     }
     DEBUG_EMAIL(("Saving attachment to %s\n", temp));
     if (!(fp = fopen(temp, "w"))) {
-        WARN(("write_separate_attachment: Cannot open attachment save file \"%s\"\n", temp));
+        DEBUG_WARN(("write_separate_attachment: Cannot open attachment save file \"%s\"\n", temp));
     } else {
         (void)pst_attach_to_file(pst, attach, fp);
         fclose(fp);
@@ -1681,7 +1715,8 @@
             pst_recurrence *rdata = pst_convert_recurrence(appointment);
             fprintf(f_output, "RRULE:FREQ=%s", rules[rdata->type]);
             if (rdata->count)       fprintf(f_output, ";COUNT=%u",      rdata->count);
-            if (rdata->interval)    fprintf(f_output, ";INTERVAL=%u",   rdata->interval);
+            if ((rdata->interval != 1) &&
+                (rdata->interval))  fprintf(f_output, ";INTERVAL=%u",   rdata->interval);
             if (rdata->dayofmonth)  fprintf(f_output, ";BYMONTHDAY=%d", rdata->dayofmonth);
             if (rdata->monthofyear) fprintf(f_output, ";BYMONTH=%d",    rdata->monthofyear);
             if (rdata->position)    fprintf(f_output, ";BYSETPOS=%d",   rdata->position);
@@ -1694,11 +1729,12 @@
                     int bit = 1 << i;
                     if (bit & rdata->bydaymask) {
                         char temp[40];
-                        snprintf(temp, sizeof(temp), "%s%s%s", byday, (empty) ? "BYDAY=" : ";", days[i]);
+                        snprintf(temp, sizeof(temp), "%s%s%s", byday, (empty) ? ";BYDAY=" : ";", days[i]);
                         strcpy(byday, temp);
                         empty = 0;
                     }
                 }
+                fprintf(f_output, "%s", byday);
             }
             fprintf(f_output, "\n");
             pst_free_recurrence(rdata);
@@ -1816,7 +1852,7 @@
         fclose(f->output);
         stat(f->name, &st);
         if (!st.st_size) {
-            WARN(("removing empty output file %s ", f->name));
+            DEBUG_WARN(("removing empty output file %s\n", f->name));
             remove(f->name);
         }
     }