comparison src/readpst.c @ 200:d360f96f71f6

start changes for parallel readpst on multi-processor machines
author Carl Byington <carl@five-ten-sg.com>
date Wed, 13 May 2009 20:06:53 -0700
parents e3a46f66332b
children 3850a3b11745
comparison
equal deleted inserted replaced
199:e3a46f66332b 200:d360f96f71f6
24 int32_t item_count; 24 int32_t item_count;
25 int32_t skip_count; 25 int32_t skip_count;
26 int32_t type; 26 int32_t type;
27 }; 27 };
28 28
void grim_reaper(int waitall);  // reap finished children; waitall != 0 blocks until all have exited
pid_t try_fork(void);           // fork if a child slot is free; returns child pid in parent, 0 otherwise
29 void process(pst_item *outeritem, pst_desc_tree *d_ptr); 31 void process(pst_item *outeritem, pst_desc_tree *d_ptr);
30 void write_email_body(FILE *f, char *body); 32 void write_email_body(FILE *f, char *body);
31 void removeCR(char *c); 33 void removeCR(char *c);
32 void usage(); 34 void usage();
33 void version(); 35 void version();
117 int overwrite = 0; 119 int overwrite = 0;
118 int save_rtf_body = 1; 120 int save_rtf_body = 1;
119 pst_file pstfile; 121 pst_file pstfile;
120 regex_t meta_charset_pattern; 122 regex_t meta_charset_pattern;
121 123
int active_children;        // number of children recorded in child_processes[]; never larger than max_children;
                            // reset to 0 by try_fork() in each newly forked child
int max_children;           // capacity of child_processes[]; set once by main() and inherited unchanged by forked children
pid_t *child_processes;     // pids of this process's children; allocated by main(), cleared by try_fork() in each new child

#ifdef HAVE_SEMAPHORE_H
// Count of free child slots, initialized to max_children by main().
// NOTE(review): main() passes pshared=1 to sem_init(), but this is an ordinary
// global, so after fork() every process operates on its own private copy rather
// than one shared counter. POSIX requires a process-shared semaphore to live in
// memory shared between the processes (e.g. an anonymous MAP_SHARED mapping);
// confirm and move it there.
sem_t global_children;
#endif
131
132
/*
 * Collect child processes of this process that have terminated.
 *
 * Every pid recorded in child_processes[] is passed to waitpid(). When
 * waitall is nonzero each call blocks until that child exits; otherwise
 * WNOHANG is used and only already-finished children are collected.
 * Collected children are removed from child_processes[] and active_children
 * is decremented. Progress lines are printed to stdout before and after.
 * The function does nothing unless both HAVE_FORK and HAVE_SEMAPHORE_H are
 * defined.
 */
void grim_reaper(int waitall)
{
#if defined(HAVE_FORK) && defined(HAVE_SEMAPHORE_H)
    const char *mode = waitall ? "all" : "";
    int options = waitall ? 0 : WNOHANG;
    int slot = 0;

    printf("grim reaper %s for pid %d (parent %d) with %d children\n", mode, getpid(), getppid(), active_children);
    fflush(stdout);

    while (slot < active_children) {
        pid_t victim = child_processes[slot];
        if (waitpid(victim, NULL, options) != victim) {
            slot++;             // still running (or not collectable): keep it, move on
            continue;
        }
        // Collected: slide the rest of the table down one place. The same slot
        // index now holds the next pid, so it is examined on the next pass.
        int k;
        for (k = slot + 1; k < active_children; k++) {
            child_processes[k - 1] = child_processes[k];
        }
        active_children--;
    }

    printf("grim reaper %s for pid %d with %d children\n", mode, getpid(), active_children);
    fflush(stdout);
#endif
}
157
158
/*
 * Fork a helper process to take over a piece of work, if a child slot is free.
 *
 * Returns the new child's pid in the parent and 0 in the child. Also returns 0,
 * so the caller simply does the work itself, in each of these cases:
 *  - forking support is not compiled in (HAVE_FORK / HAVE_SEMAPHORE_H);
 *  - this process's child table is full;
 *  - no slot can be claimed from global_children;
 *  - fork() fails.
 *
 * A successful fork consumes one count of global_children. In the visible
 * caller, process(), the forked child sem_post()s it back once its folder
 * is done.
 */
pid_t try_fork(void)
{
#ifdef HAVE_FORK
#ifdef HAVE_SEMAPHORE_H
    grim_reaper(0);     // first collect any children that have already finished

    // Never record more children than child_processes[] can hold.
    if (active_children >= max_children) {
        return 0;       // pretend to have forked and we are the child
    }

    // Claim a slot atomically. Reading the count with sem_getvalue() and then
    // calling sem_wait() leaves a window in which another process can take the
    // last slot. sem_wait() would then block instead of falling back to
    // doing the work in this process.
    if (sem_trywait(&global_children) != 0) {
        return 0;       // pretend to have forked and we are the child
    }

    // Flush every stdio output stream first. Otherwise the child inherits the
    // parent's unwritten buffers and writes them a second time when it exit()s.
    fflush(NULL);

    pid_t child = fork();
    if (child < 0) {
        // fork failed: give back the slot claimed above, then pretend it
        // worked and we are the child
        sem_post(&global_children);
        return 0;
    }
    else if (child == 0) {
        pid_t me = getpid();
        printf("forked child pid %d \n", me);
        fflush(stdout);
        // fork worked, and we are the child, reinitialize *our* list of children
        active_children = 0;
        memset(child_processes, 0, sizeof(pid_t) * max_children);
    }
    else {
        // fork worked, and we are the parent, record this child that we need to wait for
        child_processes[active_children++] = child;
    }
    return child;
#endif
#endif
    return 0;
}
194
122 195
123 void process(pst_item *outeritem, pst_desc_tree *d_ptr) 196 void process(pst_item *outeritem, pst_desc_tree *d_ptr)
124 { 197 {
125 struct file_ll ff; 198 struct file_ll ff;
126 pst_item *item = NULL; 199 pst_item *item = NULL;
152 } 225 }
153 226
154 if (item->folder && item->file_as.str) { 227 if (item->folder && item->file_as.str) {
155 DEBUG_MAIN(("Processing Folder \"%s\"\n", item->file_as.str)); 228 DEBUG_MAIN(("Processing Folder \"%s\"\n", item->file_as.str));
156 if (output_mode != OUTPUT_QUIET) printf("Processing Folder \"%s\"\n", item->file_as.str); 229 if (output_mode != OUTPUT_QUIET) printf("Processing Folder \"%s\"\n", item->file_as.str);
230 fflush(stdout);
157 ff.item_count++; 231 ff.item_count++;
158 if (d_ptr->child && (deleted_mode == DMODE_INCLUDE || strcasecmp(item->file_as.str, "Deleted Items"))) { 232 if (d_ptr->child && (deleted_mode == DMODE_INCLUDE || strcasecmp(item->file_as.str, "Deleted Items"))) {
159 //if this is a non-empty folder other than deleted items, we want to recurse into it 233 //if this is a non-empty folder other than deleted items, we want to recurse into it
160 process(item, d_ptr->child); 234 pid_t parent = getpid();
235 pid_t child = try_fork();
236 if (child == 0) {
237 // we are the child process, or the original parent if no children were available
238 pid_t me = getpid();
239 process(item, d_ptr->child);
240 #ifdef HAVE_FORK
241 #ifdef HAVE_SEMAPHORE_H
242 if (me != parent) {
243 // we really were a child, forked for the sole purpose of processing this folder
244 // free my child count slot before really exiting, since
245 // all I am doing here is waiting for my children to exit
246 sem_post(&global_children);
247 grim_reaper(1); // wait for all my child processes to exit
248 exit(0); // really exit
249 }
250 #endif
251 #endif
252 }
161 } 253 }
162 254
163 } else if (item->contact && (item->type == PST_TYPE_CONTACT)) { 255 } else if (item->contact && (item->type == PST_TYPE_CONTACT)) {
164 if (!ff.type) ff.type = item->type; 256 if (!ff.type) ff.type = item->type;
165 DEBUG_MAIN(("main: Processing Contact\n")); 257 DEBUG_MAIN(("main: Processing Contact\n"));
380 if (!d_ptr) { 472 if (!d_ptr) {
381 DEBUG_RET(); 473 DEBUG_RET();
382 DIE(("Top of folders record not found. Cannot continue\n")); 474 DIE(("Top of folders record not found. Cannot continue\n"));
383 } 475 }
384 476
477 max_children = (d_log) ? 0 : 10;
478 child_processes = (pid_t *)pst_malloc(sizeof(pid_t) * max_children);
479 active_children = 0;
480 memset(child_processes, 0, sizeof(pid_t) * max_children);
481 #ifdef HAVE_SEMAPHORE_H
482 sem_init(&global_children, 1, max_children);
483 #endif
385 process(item, d_ptr->child); // do the children of TOPF 484 process(item, d_ptr->child); // do the children of TOPF
485 grim_reaper(1); // wait for all child processes
486
386 pst_freeItem(item); 487 pst_freeItem(item);
387 pst_close(&pstfile); 488 pst_close(&pstfile);
388 DEBUG_RET(); 489 DEBUG_RET();
389 regfree(&meta_charset_pattern); 490 regfree(&meta_charset_pattern);
390 return 0; 491 return 0;
1227 fprintf(f_output, "Cc: %s\n", item->email->cc_address.str); 1328 fprintf(f_output, "Cc: %s\n", item->email->cc_address.str);
1228 } 1329 }
1229 1330
1230 if (!has_date && item->email->sent_date) { 1331 if (!has_date && item->email->sent_date) {
1231 char c_time[C_TIME_SIZE]; 1332 char c_time[C_TIME_SIZE];
1232 strftime(c_time, C_TIME_SIZE, "%a, %d %b %Y %H:%M:%S %z", gmtime(&em_time)); 1333 struct tm stm;
1334 gmtime_r(&em_time, &stm);
1335 strftime(c_time, C_TIME_SIZE, "%a, %d %b %Y %H:%M:%S %z", &stm);
1233 fprintf(f_output, "Date: %s\n", c_time); 1336 fprintf(f_output, "Date: %s\n", c_time);
1234 } 1337 }
1235 1338
1236 if (!has_msgid && item->email->messageid.str) { 1339 if (!has_msgid && item->email->messageid.str) {
1237 pst_convert_utf8(item, &item->email->messageid); 1340 pst_convert_utf8(item, &item->email->messageid);