Mercurial > libpst
comparison src/readpst.c @ 200:d360f96f71f6
start changes for parallel readpst on multi-processor machines
author | Carl Byington <carl@five-ten-sg.com> |
---|---|
date | Wed, 13 May 2009 20:06:53 -0700 |
parents | e3a46f66332b |
children | 3850a3b11745 |
comparison legend: equal, deleted, inserted, replaced
199:e3a46f66332b | 200:d360f96f71f6 |
---|---|
24 int32_t item_count; | 24 int32_t item_count; |
25 int32_t skip_count; | 25 int32_t skip_count; |
26 int32_t type; | 26 int32_t type; |
27 }; | 27 }; |
28 | 28 |
29 void grim_reaper(); | |
30 pid_t try_fork(); | |
29 void process(pst_item *outeritem, pst_desc_tree *d_ptr); | 31 void process(pst_item *outeritem, pst_desc_tree *d_ptr); |
30 void write_email_body(FILE *f, char *body); | 32 void write_email_body(FILE *f, char *body); |
31 void removeCR(char *c); | 33 void removeCR(char *c); |
32 void usage(); | 34 void usage(); |
33 void version(); | 35 void version(); |
117 int overwrite = 0; | 119 int overwrite = 0; |
118 int save_rtf_body = 1; | 120 int save_rtf_body = 1; |
119 pst_file pstfile; | 121 pst_file pstfile; |
120 regex_t meta_charset_pattern; | 122 regex_t meta_charset_pattern; |
121 | 123 |
124 int active_children; // number of children of this process, cannot be larger than max_children | |
125 int max_children; // setup by main(), and at the start of new child process | |
126 pid_t *child_processes; // setup by main(), and at the start of new child process | |
127 | |
128 #ifdef HAVE_SEMAPHORE_H | |
129 sem_t global_children; | |
130 #endif | |
131 | |
132 | |
133 void grim_reaper(int waitall) | |
134 { | |
135 #ifdef HAVE_FORK | |
136 #ifdef HAVE_SEMAPHORE_H | |
137 printf("grim reaper %s for pid %d (parent %d) with %d children\n", (waitall) ? "all" : "", getpid(), getppid(), active_children); | |
138 fflush(stdout); | |
139 int i,j; | |
140 for (i=0; i<active_children; i++) { | |
141 pid_t child = child_processes[i]; | |
142 pid_t ch = waitpid(child, NULL, ((waitall) ? 0 : WNOHANG)); | |
143 if (ch == child) { | |
144 // this has terminated, remove it from the list | |
145 for (j=i; j<active_children-1; j++) { | |
146 child_processes[j] = child_processes[j+1]; | |
147 } | |
148 active_children--; | |
149 i--; | |
150 } | |
151 } | |
152 printf("grim reaper %s for pid %d with %d children\n", (waitall) ? "all" : "", getpid(), active_children); | |
153 fflush(stdout); | |
154 #endif | |
155 #endif | |
156 } | |
157 | |
158 | |
159 pid_t try_fork() | |
160 { | |
161 #ifdef HAVE_FORK | |
162 #ifdef HAVE_SEMAPHORE_H | |
163 int available; | |
164 grim_reaper(0); | |
165 sem_getvalue(&global_children, &available); | |
166 if (available) { | |
167 sem_wait(&global_children); | |
168 pid_t child = fork(); | |
169 if (child < 0) { | |
170 // fork failed, pretend it worked and we are the child | |
171 return 0; | |
172 } | |
173 else if (child == 0) { | |
174 pid_t me = getpid(); | |
175 printf("forked child pid %d \n", me); | |
176 fflush(stdout); | |
177 // fork worked, and we are the child, reinitialize *our* list of children | |
178 active_children = 0; | |
179 memset(child_processes, 0, sizeof(pid_t) * max_children); | |
180 } | |
181 else { | |
182 // fork worked, and we are the parent, record this child that we need to wait for | |
183 child_processes[active_children++] = child; | |
184 } | |
185 return child; | |
186 } | |
187 else { | |
188 return 0; // pretend to have forked and we are the child | |
189 } | |
190 #endif | |
191 #endif | |
192 return 0; | |
193 } | |
194 | |
122 | 195 |
123 void process(pst_item *outeritem, pst_desc_tree *d_ptr) | 196 void process(pst_item *outeritem, pst_desc_tree *d_ptr) |
124 { | 197 { |
125 struct file_ll ff; | 198 struct file_ll ff; |
126 pst_item *item = NULL; | 199 pst_item *item = NULL; |
152 } | 225 } |
153 | 226 |
154 if (item->folder && item->file_as.str) { | 227 if (item->folder && item->file_as.str) { |
155 DEBUG_MAIN(("Processing Folder \"%s\"\n", item->file_as.str)); | 228 DEBUG_MAIN(("Processing Folder \"%s\"\n", item->file_as.str)); |
156 if (output_mode != OUTPUT_QUIET) printf("Processing Folder \"%s\"\n", item->file_as.str); | 229 if (output_mode != OUTPUT_QUIET) printf("Processing Folder \"%s\"\n", item->file_as.str); |
230 fflush(stdout); | |
157 ff.item_count++; | 231 ff.item_count++; |
158 if (d_ptr->child && (deleted_mode == DMODE_INCLUDE || strcasecmp(item->file_as.str, "Deleted Items"))) { | 232 if (d_ptr->child && (deleted_mode == DMODE_INCLUDE || strcasecmp(item->file_as.str, "Deleted Items"))) { |
159 //if this is a non-empty folder other than deleted items, we want to recurse into it | 233 //if this is a non-empty folder other than deleted items, we want to recurse into it |
160 process(item, d_ptr->child); | 234 pid_t parent = getpid(); |
235 pid_t child = try_fork(); | |
236 if (child == 0) { | |
237 // we are the child process, or the original parent if no children were available | |
238 pid_t me = getpid(); | |
239 process(item, d_ptr->child); | |
240 #ifdef HAVE_FORK | |
241 #ifdef HAVE_SEMAPHORE_H | |
242 if (me != parent) { | |
243 // we really were a child, forked for the sole purpose of processing this folder | |
244 // free my child count slot before really exiting, since | |
245 // all I am doing here is waiting for my children to exit | |
246 sem_post(&global_children); | |
247 grim_reaper(1); // wait for all my child processes to exit | |
248 exit(0); // really exit | |
249 } | |
250 #endif | |
251 #endif | |
252 } | |
161 } | 253 } |
162 | 254 |
163 } else if (item->contact && (item->type == PST_TYPE_CONTACT)) { | 255 } else if (item->contact && (item->type == PST_TYPE_CONTACT)) { |
164 if (!ff.type) ff.type = item->type; | 256 if (!ff.type) ff.type = item->type; |
165 DEBUG_MAIN(("main: Processing Contact\n")); | 257 DEBUG_MAIN(("main: Processing Contact\n")); |
380 if (!d_ptr) { | 472 if (!d_ptr) { |
381 DEBUG_RET(); | 473 DEBUG_RET(); |
382 DIE(("Top of folders record not found. Cannot continue\n")); | 474 DIE(("Top of folders record not found. Cannot continue\n")); |
383 } | 475 } |
384 | 476 |
477 max_children = (d_log) ? 0 : 10; | |
478 child_processes = (pid_t *)pst_malloc(sizeof(pid_t) * max_children); | |
479 active_children = 0; | |
480 memset(child_processes, 0, sizeof(pid_t) * max_children); | |
481 #ifdef HAVE_SEMAPHORE_H | |
482 sem_init(&global_children, 1, max_children); | |
483 #endif | |
385 process(item, d_ptr->child); // do the children of TOPF | 484 process(item, d_ptr->child); // do the children of TOPF |
485 grim_reaper(1); // wait for all child processes | |
486 | |
386 pst_freeItem(item); | 487 pst_freeItem(item); |
387 pst_close(&pstfile); | 488 pst_close(&pstfile); |
388 DEBUG_RET(); | 489 DEBUG_RET(); |
389 regfree(&meta_charset_pattern); | 490 regfree(&meta_charset_pattern); |
390 return 0; | 491 return 0; |
1227 fprintf(f_output, "Cc: %s\n", item->email->cc_address.str); | 1328 fprintf(f_output, "Cc: %s\n", item->email->cc_address.str); |
1228 } | 1329 } |
1229 | 1330 |
1230 if (!has_date && item->email->sent_date) { | 1331 if (!has_date && item->email->sent_date) { |
1231 char c_time[C_TIME_SIZE]; | 1332 char c_time[C_TIME_SIZE]; |
1232 strftime(c_time, C_TIME_SIZE, "%a, %d %b %Y %H:%M:%S %z", gmtime(&em_time)); | 1333 struct tm stm; |
1334 gmtime_r(&em_time, &stm); | |
1335 strftime(c_time, C_TIME_SIZE, "%a, %d %b %Y %H:%M:%S %z", &stm); | |
1233 fprintf(f_output, "Date: %s\n", c_time); | 1336 fprintf(f_output, "Date: %s\n", c_time); |
1234 } | 1337 } |
1235 | 1338 |
1236 if (!has_msgid && item->email->messageid.str) { | 1339 if (!has_msgid && item->email->messageid.str) { |
1237 pst_convert_utf8(item, &item->email->messageid); | 1340 pst_convert_utf8(item, &item->email->messageid); |