Mercurial > libpst
diff src/readpst.c @ 201:3850a3b11745
fixes for parallel readpst
author | Carl Byington <carl@five-ten-sg.com> |
---|---|
date | Sat, 16 May 2009 10:32:26 -0700 |
parents | d360f96f71f6 |
children | 2f38c4ce606f |
line wrap: on
line diff
--- a/src/readpst.c Wed May 13 20:06:53 2009 -0700 +++ b/src/readpst.c Sat May 16 10:32:26 2009 -0700 @@ -26,8 +26,8 @@ int32_t type; }; -void grim_reaper(); -pid_t try_fork(); +int grim_reaper(); +pid_t try_fork(char* folder); void process(pst_item *outeritem, pst_desc_tree *d_ptr); void write_email_body(FILE *f, char *body); void removeCR(char *c); @@ -110,76 +110,84 @@ #define RTF_ATTACH_TYPE "application/rtf" // global settings -int mode = MODE_NORMAL; -int mode_MH = 0; // a submode of MODE_SEPARATE -int output_mode = OUTPUT_NORMAL; -int contact_mode = CMODE_VCARD; -int deleted_mode = DMODE_EXCLUDE; -int contact_mode_specified = 0; -int overwrite = 0; -int save_rtf_body = 1; -pst_file pstfile; -regex_t meta_charset_pattern; +int mode = MODE_NORMAL; +int mode_MH = 0; // a submode of MODE_SEPARATE +int output_mode = OUTPUT_NORMAL; +int contact_mode = CMODE_VCARD; +int deleted_mode = DMODE_EXCLUDE; +int contact_mode_specified = 0; +int overwrite = 0; +int save_rtf_body = 1; +pst_file pstfile; +regex_t meta_charset_pattern; -int active_children; // number of children of this process, cannot be larger than max_children -int max_children; // setup by main(), and at the start of new child process -pid_t *child_processes; // setup by main(), and at the start of new child process +int number_processors = 1; // number of cpus we have +int max_children = 0; // based on number of cpus and command line args +int max_child_specified = 0;// have command line arg -j +int active_children; // number of children of this process, cannot be larger than max_children +pid_t* child_processes; // setup by main(), and at the start of new child process #ifdef HAVE_SEMAPHORE_H -sem_t global_children; +int shared_memory_id; +sem_t* global_children = NULL; #endif -void grim_reaper(int waitall) +int grim_reaper(int waitall) +{ + int available = 0; +#ifdef HAVE_FORK +#ifdef HAVE_SEMAPHORE_H + if (global_children) { + sem_getvalue(global_children, &available); + //printf("grim reaper %s for pid %d (parent %d) with %d children, %d available\n", (waitall) ? "all" : "", getpid(), getppid(), active_children, available); + fflush(stdout); + int i,j; + for (i=0; i<active_children; i++) { + pid_t child = child_processes[i]; + pid_t ch = waitpid(child, NULL, ((waitall) ? 0 : WNOHANG)); + if (ch == child) { + // this has terminated, remove it from the list + for (j=i; j<active_children-1; j++) { + child_processes[j] = child_processes[j+1]; + } + active_children--; + i--; + } + } + sem_getvalue(global_children, &available); + //printf("grim reaper %s for pid %d with %d children, %d available\n", (waitall) ? "all" : "", getpid(), active_children, available); + fflush(stdout); + } +#endif +#endif + return available; +} + + +pid_t try_fork(char *folder) { #ifdef HAVE_FORK #ifdef HAVE_SEMAPHORE_H - printf("grim reaper %s for pid %d (parent %d) with %d children\n", (waitall) ? "all" : "", getpid(), getppid(), active_children); - fflush(stdout); - int i,j; - for (i=0; i<active_children; i++) { - pid_t child = child_processes[i]; - pid_t ch = waitpid(child, NULL, ((waitall) ? 0 : WNOHANG)); - if (ch == child) { - // this has terminated, remove it from the list - for (j=i; j<active_children-1; j++) { - child_processes[j] = child_processes[j+1]; - } - active_children--; - i--; - } - } - printf("grim reaper %s for pid %d with %d children\n", (waitall) ? "all" : "", getpid(), active_children); - fflush(stdout); -#endif -#endif -} - - -pid_t try_fork() -{ -#ifdef HAVE_FORK -#ifdef HAVE_SEMAPHORE_H - int available; - grim_reaper(0); - sem_getvalue(&global_children, &available); + int available = grim_reaper(0); if (available) { - sem_wait(&global_children); + sem_wait(global_children); pid_t child = fork(); if (child < 0) { // fork failed, pretend it worked and we are the child return 0; } else if (child == 0) { - pid_t me = getpid(); - printf("forked child pid %d \n", me); - fflush(stdout); // fork worked, and we are the child, reinitialize *our* list of children active_children = 0; memset(child_processes, 0, sizeof(pid_t) * max_children); + pst_reopen(&pstfile); // close and reopen the pst file to get an independent file position pointer } else { // fork worked, and we are the parent, record this child that we need to wait for + pid_t me = getpid(); + //printf("parent %d forked child pid %d to process folder %s\n", me, child, folder); + fflush(stdout); child_processes[active_children++] = child; } return child; @@ -232,7 +240,7 @@ if (d_ptr->child && (deleted_mode == DMODE_INCLUDE || strcasecmp(item->file_as.str, "Deleted Items"))) { //if this is a non-empty folder other than deleted items, we want to recurse into it pid_t parent = getpid(); - pid_t child = try_fork(); + pid_t child = try_fork(item->file_as.str); if (child == 0) { // we are the child process, or the original parent if no children were available pid_t me = getpid(); @@ -243,7 +251,7 @@ // we really were a child, forked for the sole purpose of processing this folder // free my child count slot before really exiting, since // all I am doing here is waiting for my children to exit - sem_post(&global_children); + sem_post(global_children); grim_reaper(1); // wait for all my child processes to exit exit(0); // really exit } @@ -351,7 +359,7 @@ } // command-line option handling - while ((c = getopt(argc, argv, "bc:Dd:hko:qrSMVw"))!= -1) { + while ((c = getopt(argc, argv, "bc:Dd:hj:kMo:qrSVw"))!= -1) { switch (c) { case 'b': save_rtf_body = 0; @@ -380,9 +388,9 @@ usage(); exit(0); break; - case 'V': - version(); - exit(0); + case 'j': + max_children = atoi(optarg); + max_child_specified = 1; break; case 'k': mode = MODE_KMAIL; @@ -404,6 +412,10 @@ mode = MODE_SEPARATE; mode_MH = 0; break; + case 'V': + version(); + exit(0); + break; case 'w': overwrite = 1; break; @@ -443,8 +455,6 @@ DIE(("main: Cannot change to output dir %s: %s\n", output_dir, strerror(x))); } - if (output_mode != OUTPUT_QUIET) printf("About to start processing first record...\n"); - d_ptr = pstfile.d_head; // first record is main record item = pst_parse_item(&pstfile, d_ptr, NULL); if (!item || !item->message_store) { @@ -474,16 +484,38 @@ DIE(("Top of folders record not found. Cannot continue\n")); } - max_children = (d_log) ? 0 : 10; +#ifdef _SC_NPROCESSORS_ONLN + number_processors = sysconf(_SC_NPROCESSORS_ONLN); +#endif + max_children = (d_log) ? 0 : (!max_child_specified) ? number_processors * 4 : max_children; + active_children = 0; child_processes = (pid_t *)pst_malloc(sizeof(pid_t) * max_children); - active_children = 0; memset(child_processes, 0, sizeof(pid_t) * max_children); + #ifdef HAVE_SEMAPHORE_H - sem_init(&global_children, 1, max_children); + if (max_children) { + shared_memory_id = shmget(IPC_PRIVATE, sizeof(sem_t), 0777); + //printf("shared memory id %d\n", shared_memory_id); + if (shared_memory_id >= 0) { + global_children = (sem_t *)shmat(shared_memory_id, NULL, 0); + //printf("shared memory pointer %p\n", (void*)global_children); + if (global_children == (sem_t *)-1) global_children = NULL; + if (global_children) sem_init(global_children, 1, max_children); + shmctl(shared_memory_id, IPC_RMID, NULL); + } + } #endif + process(item, d_ptr->child); // do the children of TOPF grim_reaper(1); // wait for all child processes +#ifdef HAVE_SEMAPHORE_H + if (global_children) { + sem_destroy(global_children); + shmdt(global_children); + } +#endif + pst_freeItem(item); pst_close(&pstfile); DEBUG_RET(); @@ -531,7 +563,6 @@ printf("Usage: %s [OPTIONS] {PST FILENAME}\n", prog_name); printf("OPTIONS:\n"); printf("\t-V\t- Version. Display program version\n"); - printf("\t-C\t- Decrypt (compressible encryption) the entire file and output on stdout (not typically useful)\n"); printf("\t-D\t- Include deleted items in output\n"); printf("\t-M\t- MH. Write emails in the MH format\n"); printf("\t-S\t- Separate. Write emails in the separate format\n"); @@ -539,11 +570,14 @@ printf("\t-c[v|l]\t- Set the Contact output mode. -cv = VCard, -cl = EMail list\n"); printf("\t-d <filename> \t- Debug to file. This is a binary log. Use readpstlog to print it\n"); printf("\t-h\t- Help. This screen\n"); + printf("\t-j <integer>\t- Number of parallel jobs to run\n"); printf("\t-k\t- KMail. Output in kmail format\n"); printf("\t-o <dirname>\t- Output directory to write files to. CWD is changed *after* opening pst file\n"); printf("\t-q\t- Quiet. Only print error messages\n"); printf("\t-r\t- Recursive. Output in a recursive format\n"); printf("\t-w\t- Overwrite any output mbox files\n"); + printf("\n"); + printf("Only one of -k -M -r -S should be specified\n"); DEBUG_RET(); } @@ -713,7 +747,7 @@ struct dirent *dirent = NULL; struct stat filestat; if (!(sdir = opendir("./"))) { - WARN(("mk_separate_dir: Cannot open dir \"%s\" for deletion of old contents\n", "./")); + DEBUG_WARN(("mk_separate_dir: Cannot open dir \"%s\" for deletion of old contents\n", "./")); } else { while ((dirent = readdir(sdir))) { if (lstat(dirent->d_name, &filestat) != -1) @@ -751,7 +785,7 @@ DEBUG_ENT("mk_separate_file"); DEBUG_MAIN(("opening next file to save email\n")); if (f->item_count > 999999999) { // bigger than nine 9's - DIE(("mk_separate_file: The number of emails in this folder has become too high to handle")); + DIE(("mk_separate_file: The number of emails in this folder has become too high to handle\n")); } sprintf(f->name, SEP_MAIL_FILE_TEMPLATE, f->item_count + name_offset); if (f->output) fclose(f->output); @@ -848,7 +882,7 @@ } DEBUG_EMAIL(("Saving attachment to %s\n", temp)); if (!(fp = fopen(temp, "w"))) { - WARN(("write_separate_attachment: Cannot open attachment save file \"%s\"\n", temp)); + DEBUG_WARN(("write_separate_attachment: Cannot open attachment save file \"%s\"\n", temp)); } else { (void)pst_attach_to_file(pst, attach, fp); fclose(fp); @@ -1681,7 +1715,8 @@ pst_recurrence *rdata = pst_convert_recurrence(appointment); fprintf(f_output, "RRULE:FREQ=%s", rules[rdata->type]); if (rdata->count) fprintf(f_output, ";COUNT=%u", rdata->count); - if (rdata->interval) fprintf(f_output, ";INTERVAL=%u", rdata->interval); + if ((rdata->interval != 1) && + (rdata->interval)) fprintf(f_output, ";INTERVAL=%u", rdata->interval); if (rdata->dayofmonth) fprintf(f_output, ";BYMONTHDAY=%d", rdata->dayofmonth); if (rdata->monthofyear) fprintf(f_output, ";BYMONTH=%d", rdata->monthofyear); if (rdata->position) fprintf(f_output, ";BYSETPOS=%d", rdata->position); @@ -1694,11 +1729,12 @@ int bit = 1 << i; if (bit & rdata->bydaymask) { char temp[40]; - snprintf(temp, sizeof(temp), "%s%s%s", byday, (empty) ? "BYDAY=" : ";", days[i]); + snprintf(temp, sizeof(temp), "%s%s%s", byday, (empty) ? ";BYDAY=" : ";", days[i]); strcpy(byday, temp); empty = 0; } } + fprintf(f_output, "%s", byday); } fprintf(f_output, "\n"); pst_free_recurrence(rdata); @@ -1816,7 +1852,7 @@ fclose(f->output); stat(f->name, &st); if (!st.st_size) { - WARN(("removing empty output file %s ", f->name)); + DEBUG_WARN(("removing empty output file %s\n", f->name)); remove(f->name); } }