comparison src/readpst.c @ 201:3850a3b11745

fixes for parallel readpst
author Carl Byington <carl@five-ten-sg.com>
date Sat, 16 May 2009 10:32:26 -0700
parents d360f96f71f6
children 2f38c4ce606f
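comparison (side by side: each row below shows the old line number and text, then the new line number and text)
legend: equal | deleted | inserted | replaced
left column: 200:d360f96f71f6 (parent) | right column: 201:3850a3b11745 (this changeset)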
24 int32_t item_count; 24 int32_t item_count;
25 int32_t skip_count; 25 int32_t skip_count;
26 int32_t type; 26 int32_t type;
27 }; 27 };
28 28
29 void grim_reaper(); 29 int grim_reaper();
30 pid_t try_fork(); 30 pid_t try_fork(char* folder);
31 void process(pst_item *outeritem, pst_desc_tree *d_ptr); 31 void process(pst_item *outeritem, pst_desc_tree *d_ptr);
32 void write_email_body(FILE *f, char *body); 32 void write_email_body(FILE *f, char *body);
33 void removeCR(char *c); 33 void removeCR(char *c);
34 void usage(); 34 void usage();
35 void version(); 35 void version();
108 #define RTF_ATTACH_NAME "rtf-body.rtf" 108 #define RTF_ATTACH_NAME "rtf-body.rtf"
109 // mime type for the attachment 109 // mime type for the attachment
110 #define RTF_ATTACH_TYPE "application/rtf" 110 #define RTF_ATTACH_TYPE "application/rtf"
111 111
112 // global settings 112 // global settings
113 int mode = MODE_NORMAL; 113 int mode = MODE_NORMAL;
114 int mode_MH = 0; // a submode of MODE_SEPARATE 114 int mode_MH = 0; // a submode of MODE_SEPARATE
115 int output_mode = OUTPUT_NORMAL; 115 int output_mode = OUTPUT_NORMAL;
116 int contact_mode = CMODE_VCARD; 116 int contact_mode = CMODE_VCARD;
117 int deleted_mode = DMODE_EXCLUDE; 117 int deleted_mode = DMODE_EXCLUDE;
118 int contact_mode_specified = 0; 118 int contact_mode_specified = 0;
119 int overwrite = 0; 119 int overwrite = 0;
120 int save_rtf_body = 1; 120 int save_rtf_body = 1;
121 pst_file pstfile; 121 pst_file pstfile;
122 regex_t meta_charset_pattern; 122 regex_t meta_charset_pattern;
123 123
124 int active_children; // number of children of this process, cannot be larger than max_children 124 int number_processors = 1; // number of cpus we have
125 int max_children; // setup by main(), and at the start of new child process 125 int max_children = 0; // based on number of cpus and command line args
126 pid_t *child_processes; // setup by main(), and at the start of new child process 126 int max_child_specified = 0;// have command line arg -j
127 int active_children; // number of children of this process, cannot be larger than max_children
128 pid_t* child_processes; // setup by main(), and at the start of new child process
127 129
128 #ifdef HAVE_SEMAPHORE_H 130 #ifdef HAVE_SEMAPHORE_H
129 sem_t global_children; 131 int shared_memory_id;
132 sem_t* global_children = NULL;
130 #endif 133 #endif
131 134
132 135
133 void grim_reaper(int waitall) 136 int grim_reaper(int waitall)
134 { 137 {
138 int available = 0;
135 #ifdef HAVE_FORK 139 #ifdef HAVE_FORK
136 #ifdef HAVE_SEMAPHORE_H 140 #ifdef HAVE_SEMAPHORE_H
137 printf("grim reaper %s for pid %d (parent %d) with %d children\n", (waitall) ? "all" : "", getpid(), getppid(), active_children); 141 if (global_children) {
138 fflush(stdout); 142 sem_getvalue(global_children, &available);
139 int i,j; 143 //printf("grim reaper %s for pid %d (parent %d) with %d children, %d available\n", (waitall) ? "all" : "", getpid(), getppid(), active_children, available);
140 for (i=0; i<active_children; i++) { 144 fflush(stdout);
141 pid_t child = child_processes[i]; 145 int i,j;
142 pid_t ch = waitpid(child, NULL, ((waitall) ? 0 : WNOHANG)); 146 for (i=0; i<active_children; i++) {
143 if (ch == child) { 147 pid_t child = child_processes[i];
144 // this has terminated, remove it from the list 148 pid_t ch = waitpid(child, NULL, ((waitall) ? 0 : WNOHANG));
145 for (j=i; j<active_children-1; j++) { 149 if (ch == child) {
146 child_processes[j] = child_processes[j+1]; 150 // this has terminated, remove it from the list
147 } 151 for (j=i; j<active_children-1; j++) {
148 active_children--; 152 child_processes[j] = child_processes[j+1];
149 i--; 153 }
150 } 154 active_children--;
151 } 155 i--;
152 printf("grim reaper %s for pid %d with %d children\n", (waitall) ? "all" : "", getpid(), active_children); 156 }
153 fflush(stdout); 157 }
158 sem_getvalue(global_children, &available);
159 //printf("grim reaper %s for pid %d with %d children, %d available\n", (waitall) ? "all" : "", getpid(), active_children, available);
160 fflush(stdout);
161 }
154 #endif 162 #endif
155 #endif 163 #endif
156 } 164 return available;
157 165 }
158 166
159 pid_t try_fork() 167
168 pid_t try_fork(char *folder)
160 { 169 {
161 #ifdef HAVE_FORK 170 #ifdef HAVE_FORK
162 #ifdef HAVE_SEMAPHORE_H 171 #ifdef HAVE_SEMAPHORE_H
163 int available; 172 int available = grim_reaper(0);
164 grim_reaper(0);
165 sem_getvalue(&global_children, &available);
166 if (available) { 173 if (available) {
167 sem_wait(&global_children); 174 sem_wait(global_children);
168 pid_t child = fork(); 175 pid_t child = fork();
169 if (child < 0) { 176 if (child < 0) {
170 // fork failed, pretend it worked and we are the child 177 // fork failed, pretend it worked and we are the child
171 return 0; 178 return 0;
172 } 179 }
173 else if (child == 0) { 180 else if (child == 0) {
174 pid_t me = getpid();
175 printf("forked child pid %d \n", me);
176 fflush(stdout);
177 // fork worked, and we are the child, reinitialize *our* list of children 181 // fork worked, and we are the child, reinitialize *our* list of children
178 active_children = 0; 182 active_children = 0;
179 memset(child_processes, 0, sizeof(pid_t) * max_children); 183 memset(child_processes, 0, sizeof(pid_t) * max_children);
184 pst_reopen(&pstfile); // close and reopen the pst file to get an independent file position pointer
180 } 185 }
181 else { 186 else {
182 // fork worked, and we are the parent, record this child that we need to wait for 187 // fork worked, and we are the parent, record this child that we need to wait for
188 pid_t me = getpid();
189 //printf("parent %d forked child pid %d to process folder %s\n", me, child, folder);
190 fflush(stdout);
183 child_processes[active_children++] = child; 191 child_processes[active_children++] = child;
184 } 192 }
185 return child; 193 return child;
186 } 194 }
187 else { 195 else {
230 fflush(stdout); 238 fflush(stdout);
231 ff.item_count++; 239 ff.item_count++;
232 if (d_ptr->child && (deleted_mode == DMODE_INCLUDE || strcasecmp(item->file_as.str, "Deleted Items"))) { 240 if (d_ptr->child && (deleted_mode == DMODE_INCLUDE || strcasecmp(item->file_as.str, "Deleted Items"))) {
233 //if this is a non-empty folder other than deleted items, we want to recurse into it 241 //if this is a non-empty folder other than deleted items, we want to recurse into it
234 pid_t parent = getpid(); 242 pid_t parent = getpid();
235 pid_t child = try_fork(); 243 pid_t child = try_fork(item->file_as.str);
236 if (child == 0) { 244 if (child == 0) {
237 // we are the child process, or the original parent if no children were available 245 // we are the child process, or the original parent if no children were available
238 pid_t me = getpid(); 246 pid_t me = getpid();
239 process(item, d_ptr->child); 247 process(item, d_ptr->child);
240 #ifdef HAVE_FORK 248 #ifdef HAVE_FORK
241 #ifdef HAVE_SEMAPHORE_H 249 #ifdef HAVE_SEMAPHORE_H
242 if (me != parent) { 250 if (me != parent) {
243 // we really were a child, forked for the sole purpose of processing this folder 251 // we really were a child, forked for the sole purpose of processing this folder
244 // free my child count slot before really exiting, since 252 // free my child count slot before really exiting, since
245 // all I am doing here is waiting for my children to exit 253 // all I am doing here is waiting for my children to exit
246 sem_post(&global_children); 254 sem_post(global_children);
247 grim_reaper(1); // wait for all my child processes to exit 255 grim_reaper(1); // wait for all my child processes to exit
248 exit(0); // really exit 256 exit(0); // really exit
249 } 257 }
250 #endif 258 #endif
251 #endif 259 #endif
349 printf("cannot compile regex pattern to find content charset in html bodies\n"); 357 printf("cannot compile regex pattern to find content charset in html bodies\n");
350 exit(3); 358 exit(3);
351 } 359 }
352 360
353 // command-line option handling 361 // command-line option handling
354 while ((c = getopt(argc, argv, "bc:Dd:hko:qrSMVw"))!= -1) { 362 while ((c = getopt(argc, argv, "bc:Dd:hj:kMo:qrSVw"))!= -1) {
355 switch (c) { 363 switch (c) {
356 case 'b': 364 case 'b':
357 save_rtf_body = 0; 365 save_rtf_body = 0;
358 break; 366 break;
359 case 'c': 367 case 'c':
378 break; 386 break;
379 case 'h': 387 case 'h':
380 usage(); 388 usage();
381 exit(0); 389 exit(0);
382 break; 390 break;
391 case 'j':
392 max_children = atoi(optarg);
393 max_child_specified = 1;
394 break;
395 case 'k':
396 mode = MODE_KMAIL;
397 break;
398 case 'M':
399 mode = MODE_SEPARATE;
400 mode_MH = 1;
401 break;
402 case 'o':
403 output_dir = optarg;
404 break;
405 case 'q':
406 output_mode = OUTPUT_QUIET;
407 break;
408 case 'r':
409 mode = MODE_RECURSE;
410 break;
411 case 'S':
412 mode = MODE_SEPARATE;
413 mode_MH = 0;
414 break;
383 case 'V': 415 case 'V':
384 version(); 416 version();
385 exit(0); 417 exit(0);
386 break;
387 case 'k':
388 mode = MODE_KMAIL;
389 break;
390 case 'M':
391 mode = MODE_SEPARATE;
392 mode_MH = 1;
393 break;
394 case 'o':
395 output_dir = optarg;
396 break;
397 case 'q':
398 output_mode = OUTPUT_QUIET;
399 break;
400 case 'r':
401 mode = MODE_RECURSE;
402 break;
403 case 'S':
404 mode = MODE_SEPARATE;
405 mode_MH = 0;
406 break; 418 break;
407 case 'w': 419 case 'w':
408 overwrite = 1; 420 overwrite = 1;
409 break; 421 break;
410 default: 422 default:
440 x = errno; 452 x = errno;
441 pst_close(&pstfile); 453 pst_close(&pstfile);
442 DEBUG_RET(); 454 DEBUG_RET();
443 DIE(("main: Cannot change to output dir %s: %s\n", output_dir, strerror(x))); 455 DIE(("main: Cannot change to output dir %s: %s\n", output_dir, strerror(x)));
444 } 456 }
445
446 if (output_mode != OUTPUT_QUIET) printf("About to start processing first record...\n");
447 457
448 d_ptr = pstfile.d_head; // first record is main record 458 d_ptr = pstfile.d_head; // first record is main record
449 item = pst_parse_item(&pstfile, d_ptr, NULL); 459 item = pst_parse_item(&pstfile, d_ptr, NULL);
450 if (!item || !item->message_store) { 460 if (!item || !item->message_store) {
451 DEBUG_RET(); 461 DEBUG_RET();
472 if (!d_ptr) { 482 if (!d_ptr) {
473 DEBUG_RET(); 483 DEBUG_RET();
474 DIE(("Top of folders record not found. Cannot continue\n")); 484 DIE(("Top of folders record not found. Cannot continue\n"));
475 } 485 }
476 486
477 max_children = (d_log) ? 0 : 10; 487 #ifdef _SC_NPROCESSORS_ONLN
488 number_processors = sysconf(_SC_NPROCESSORS_ONLN);
489 #endif
490 max_children = (d_log) ? 0 : (!max_child_specified) ? number_processors * 4 : max_children;
491 active_children = 0;
478 child_processes = (pid_t *)pst_malloc(sizeof(pid_t) * max_children); 492 child_processes = (pid_t *)pst_malloc(sizeof(pid_t) * max_children);
479 active_children = 0;
480 memset(child_processes, 0, sizeof(pid_t) * max_children); 493 memset(child_processes, 0, sizeof(pid_t) * max_children);
494
481 #ifdef HAVE_SEMAPHORE_H 495 #ifdef HAVE_SEMAPHORE_H
482 sem_init(&global_children, 1, max_children); 496 if (max_children) {
497 shared_memory_id = shmget(IPC_PRIVATE, sizeof(sem_t), 0777);
498 //printf("shared memory id %d\n", shared_memory_id);
499 if (shared_memory_id >= 0) {
500 global_children = (sem_t *)shmat(shared_memory_id, NULL, 0);
501 //printf("shared memory pointer %p\n", (void*)global_children);
502 if (global_children == (sem_t *)-1) global_children = NULL;
503 if (global_children) sem_init(global_children, 1, max_children);
504 shmctl(shared_memory_id, IPC_RMID, NULL);
505 }
506 }
483 #endif 507 #endif
508
484 process(item, d_ptr->child); // do the children of TOPF 509 process(item, d_ptr->child); // do the children of TOPF
485 grim_reaper(1); // wait for all child processes 510 grim_reaper(1); // wait for all child processes
511
512 #ifdef HAVE_SEMAPHORE_H
513 if (global_children) {
514 sem_destroy(global_children);
515 shmdt(global_children);
516 }
517 #endif
486 518
487 pst_freeItem(item); 519 pst_freeItem(item);
488 pst_close(&pstfile); 520 pst_close(&pstfile);
489 DEBUG_RET(); 521 DEBUG_RET();
490 regfree(&meta_charset_pattern); 522 regfree(&meta_charset_pattern);
529 DEBUG_ENT("usage"); 561 DEBUG_ENT("usage");
530 version(); 562 version();
531 printf("Usage: %s [OPTIONS] {PST FILENAME}\n", prog_name); 563 printf("Usage: %s [OPTIONS] {PST FILENAME}\n", prog_name);
532 printf("OPTIONS:\n"); 564 printf("OPTIONS:\n");
533 printf("\t-V\t- Version. Display program version\n"); 565 printf("\t-V\t- Version. Display program version\n");
534 printf("\t-C\t- Decrypt (compressible encryption) the entire file and output on stdout (not typically useful)\n");
535 printf("\t-D\t- Include deleted items in output\n"); 566 printf("\t-D\t- Include deleted items in output\n");
536 printf("\t-M\t- MH. Write emails in the MH format\n"); 567 printf("\t-M\t- MH. Write emails in the MH format\n");
537 printf("\t-S\t- Separate. Write emails in the separate format\n"); 568 printf("\t-S\t- Separate. Write emails in the separate format\n");
538 printf("\t-b\t- Don't save RTF-Body attachments\n"); 569 printf("\t-b\t- Don't save RTF-Body attachments\n");
539 printf("\t-c[v|l]\t- Set the Contact output mode. -cv = VCard, -cl = EMail list\n"); 570 printf("\t-c[v|l]\t- Set the Contact output mode. -cv = VCard, -cl = EMail list\n");
540 printf("\t-d <filename> \t- Debug to file. This is a binary log. Use readpstlog to print it\n"); 571 printf("\t-d <filename> \t- Debug to file. This is a binary log. Use readpstlog to print it\n");
541 printf("\t-h\t- Help. This screen\n"); 572 printf("\t-h\t- Help. This screen\n");
573 printf("\t-j <integer>\t- Number of parallel jobs to run\n");
542 printf("\t-k\t- KMail. Output in kmail format\n"); 574 printf("\t-k\t- KMail. Output in kmail format\n");
543 printf("\t-o <dirname>\t- Output directory to write files to. CWD is changed *after* opening pst file\n"); 575 printf("\t-o <dirname>\t- Output directory to write files to. CWD is changed *after* opening pst file\n");
544 printf("\t-q\t- Quiet. Only print error messages\n"); 576 printf("\t-q\t- Quiet. Only print error messages\n");
545 printf("\t-r\t- Recursive. Output in a recursive format\n"); 577 printf("\t-r\t- Recursive. Output in a recursive format\n");
546 printf("\t-w\t- Overwrite any output mbox files\n"); 578 printf("\t-w\t- Overwrite any output mbox files\n");
579 printf("\n");
580 printf("Only one of -k -M -r -S should be specified\n");
547 DEBUG_RET(); 581 DEBUG_RET();
548 } 582 }
549 583
550 584
551 void version() { 585 void version() {
711 #if !defined(WIN32) && !defined(__CYGWIN__) 745 #if !defined(WIN32) && !defined(__CYGWIN__)
712 DIR * sdir = NULL; 746 DIR * sdir = NULL;
713 struct dirent *dirent = NULL; 747 struct dirent *dirent = NULL;
714 struct stat filestat; 748 struct stat filestat;
715 if (!(sdir = opendir("./"))) { 749 if (!(sdir = opendir("./"))) {
716 WARN(("mk_separate_dir: Cannot open dir \"%s\" for deletion of old contents\n", "./")); 750 DEBUG_WARN(("mk_separate_dir: Cannot open dir \"%s\" for deletion of old contents\n", "./"));
717 } else { 751 } else {
718 while ((dirent = readdir(sdir))) { 752 while ((dirent = readdir(sdir))) {
719 if (lstat(dirent->d_name, &filestat) != -1) 753 if (lstat(dirent->d_name, &filestat) != -1)
720 if (S_ISREG(filestat.st_mode)) { 754 if (S_ISREG(filestat.st_mode)) {
721 if (unlink(dirent->d_name)) { 755 if (unlink(dirent->d_name)) {
749 int mk_separate_file(struct file_ll *f) { 783 int mk_separate_file(struct file_ll *f) {
750 const int name_offset = 1; 784 const int name_offset = 1;
751 DEBUG_ENT("mk_separate_file"); 785 DEBUG_ENT("mk_separate_file");
752 DEBUG_MAIN(("opening next file to save email\n")); 786 DEBUG_MAIN(("opening next file to save email\n"));
753 if (f->item_count > 999999999) { // bigger than nine 9's 787 if (f->item_count > 999999999) { // bigger than nine 9's
754 DIE(("mk_separate_file: The number of emails in this folder has become too high to handle")); 788 DIE(("mk_separate_file: The number of emails in this folder has become too high to handle\n"));
755 } 789 }
756 sprintf(f->name, SEP_MAIL_FILE_TEMPLATE, f->item_count + name_offset); 790 sprintf(f->name, SEP_MAIL_FILE_TEMPLATE, f->item_count + name_offset);
757 if (f->output) fclose(f->output); 791 if (f->output) fclose(f->output);
758 f->output = NULL; 792 f->output = NULL;
759 check_filename(f->name); 793 check_filename(f->name);
846 DIE(("error finding attachment name. exhausted possibilities to %s\n", temp)); 880 DIE(("error finding attachment name. exhausted possibilities to %s\n", temp));
847 } 881 }
848 } 882 }
849 DEBUG_EMAIL(("Saving attachment to %s\n", temp)); 883 DEBUG_EMAIL(("Saving attachment to %s\n", temp));
850 if (!(fp = fopen(temp, "w"))) { 884 if (!(fp = fopen(temp, "w"))) {
851 WARN(("write_separate_attachment: Cannot open attachment save file \"%s\"\n", temp)); 885 DEBUG_WARN(("write_separate_attachment: Cannot open attachment save file \"%s\"\n", temp));
852 } else { 886 } else {
853 (void)pst_attach_to_file(pst, attach, fp); 887 (void)pst_attach_to_file(pst, attach, fp);
854 fclose(fp); 888 fclose(fp);
855 } 889 }
856 if (temp) free(temp); 890 if (temp) free(temp);
1679 const char* rules[] = {"DAILY", "WEEKLY", "MONTHLY", "YEARLY"}; 1713 const char* rules[] = {"DAILY", "WEEKLY", "MONTHLY", "YEARLY"};
1680 const char* days[] = {"SU", "MO", "TU", "WE", "TH", "FR", "SA"}; 1714 const char* days[] = {"SU", "MO", "TU", "WE", "TH", "FR", "SA"};
1681 pst_recurrence *rdata = pst_convert_recurrence(appointment); 1715 pst_recurrence *rdata = pst_convert_recurrence(appointment);
1682 fprintf(f_output, "RRULE:FREQ=%s", rules[rdata->type]); 1716 fprintf(f_output, "RRULE:FREQ=%s", rules[rdata->type]);
1683 if (rdata->count) fprintf(f_output, ";COUNT=%u", rdata->count); 1717 if (rdata->count) fprintf(f_output, ";COUNT=%u", rdata->count);
1684 if (rdata->interval) fprintf(f_output, ";INTERVAL=%u", rdata->interval); 1718 if ((rdata->interval != 1) &&
1719 (rdata->interval)) fprintf(f_output, ";INTERVAL=%u", rdata->interval);
1685 if (rdata->dayofmonth) fprintf(f_output, ";BYMONTHDAY=%d", rdata->dayofmonth); 1720 if (rdata->dayofmonth) fprintf(f_output, ";BYMONTHDAY=%d", rdata->dayofmonth);
1686 if (rdata->monthofyear) fprintf(f_output, ";BYMONTH=%d", rdata->monthofyear); 1721 if (rdata->monthofyear) fprintf(f_output, ";BYMONTH=%d", rdata->monthofyear);
1687 if (rdata->position) fprintf(f_output, ";BYSETPOS=%d", rdata->position); 1722 if (rdata->position) fprintf(f_output, ";BYSETPOS=%d", rdata->position);
1688 if (rdata->bydaymask) { 1723 if (rdata->bydaymask) {
1689 char byday[40]; 1724 char byday[40];
1692 memset(byday, 0, sizeof(byday)); 1727 memset(byday, 0, sizeof(byday));
1693 for (i=0; i<6; i++) { 1728 for (i=0; i<6; i++) {
1694 int bit = 1 << i; 1729 int bit = 1 << i;
1695 if (bit & rdata->bydaymask) { 1730 if (bit & rdata->bydaymask) {
1696 char temp[40]; 1731 char temp[40];
1697 snprintf(temp, sizeof(temp), "%s%s%s", byday, (empty) ? "BYDAY=" : ";", days[i]); 1732 snprintf(temp, sizeof(temp), "%s%s%s", byday, (empty) ? ";BYDAY=" : ";", days[i]);
1698 strcpy(byday, temp); 1733 strcpy(byday, temp);
1699 empty = 0; 1734 empty = 0;
1700 } 1735 }
1701 } 1736 }
1737 fprintf(f_output, "%s", byday);
1702 } 1738 }
1703 fprintf(f_output, "\n"); 1739 fprintf(f_output, "\n");
1704 pst_free_recurrence(rdata); 1740 pst_free_recurrence(rdata);
1705 } 1741 }
1706 switch (appointment->label) { 1742 switch (appointment->label) {
1814 if (f->output) { 1850 if (f->output) {
1815 struct stat st; 1851 struct stat st;
1816 fclose(f->output); 1852 fclose(f->output);
1817 stat(f->name, &st); 1853 stat(f->name, &st);
1818 if (!st.st_size) { 1854 if (!st.st_size) {
1819 WARN(("removing empty output file %s ", f->name)); 1855 DEBUG_WARN(("removing empty output file %s\n", f->name));
1820 remove(f->name); 1856 remove(f->name);
1821 } 1857 }
1822 } 1858 }
1823 free(f->name); 1859 free(f->name);
1824 free(f->dname); 1860 free(f->dname);