diff options
Diffstat (limited to 'examples/pigz.c')
-rw-r--r-- | examples/pigz.c | 452 |
1 files changed, 452 insertions, 0 deletions
diff --git a/examples/pigz.c b/examples/pigz.c new file mode 100644 index 0000000..42794d0 --- /dev/null +++ b/examples/pigz.c | |||
@@ -0,0 +1,452 @@ | |||
1 | /* pigz.c -- parallel implementation of gzip | ||
2 | * Copyright (C) 2007 Mark Adler | ||
3 | * Version 1.1 28 January 2007 Mark Adler | ||
4 | */ | ||
5 | |||
6 | /* Version history: | ||
7 | 1.0 17 Jan 2007 First version | ||
8 | 1.1 28 Jan 2007 Avoid void * arithmetic (some compilers don't get that) | ||
9 | Add note about requiring zlib 1.2.3 | ||
10 | Allow compression level 0 (no compression) | ||
11 | Completely rewrite parallelism -- add a write thread | ||
12 | Use deflateSetDictionary() to make use of history | ||
13 | Tune argument defaults to best performance on four cores | ||
14 | */ | ||
15 | |||
16 | /* | ||
17 | pigz compresses from stdin to stdout using threads to make use of multiple | ||
18 | processors and cores. The input is broken up into 128 KB chunks, and each | ||
19 | is compressed separately. The CRC for each chunk is also calculated | ||
20 | separately. The compressed chunks are written in order to the output, | ||
21 | and the overall CRC is calculated from the CRC's of the chunks. | ||
22 | |||
23 | The compressed data format generated is the gzip format using the deflate | ||
24 | compression method. First a gzip header is written, followed by raw deflate | ||
25 | partial streams. They are partial, in that they do not have a terminating | ||
26 | block. At the end, the deflate stream is terminated with a final empty | ||
27 | static block, and lastly a gzip trailer is written with the CRC and the | ||
28 | number of input bytes. | ||
29 | |||
30 | Each raw deflate partial stream is terminated by an empty stored block | ||
31 | (using the Z_SYNC_FLUSH option of zlib), in order to end that partial | ||
32 | bit stream at a byte boundary. That allows the partial streams to be | ||
33 | concatenated simply as sequences of bytes. This adds a very small four | |
34 | or five byte overhead to the output for each input chunk. | ||
35 | |||
36 | zlib's crc32_combine() routine allows the calculation of the CRC of the | |
37 | entire input using the independent CRC's of the chunks. pigz requires zlib | ||
38 | version 1.2.3 or later, since that is the first version that provides the | ||
39 | crc32_combine() function. | ||
40 | |||
41 | pigz uses the POSIX pthread library for thread control and communication. | ||
42 | */ | ||
43 | |||
44 | #include <stdio.h> | ||
45 | #include <stdlib.h> | ||
46 | #include <string.h> | ||
47 | #include <pthread.h> | ||
48 | #include <sys/types.h> | ||
49 | #include <sys/uio.h> | ||
50 | #include <unistd.h> | ||
51 | #include "zlib.h" | ||
52 | |||
53 | #define local static | ||
54 | |||
55 | /* exit with error */ | ||
56 | local void bail(char *msg) | ||
57 | { | ||
58 | fprintf(stderr, "pigz abort: %s\n", msg); | ||
59 | exit(1); | ||
60 | } | ||
61 | |||
62 | /* read up to len bytes into buf, repeating read() calls as needed */ | ||
63 | local size_t readn(int desc, unsigned char *buf, size_t len) | ||
64 | { | ||
65 | ssize_t ret; | ||
66 | size_t got; | ||
67 | |||
68 | got = 0; | ||
69 | while (len) { | ||
70 | ret = read(desc, buf, len); | ||
71 | if (ret < 0) | ||
72 | bail("read error"); | ||
73 | if (ret == 0) | ||
74 | break; | ||
75 | buf += ret; | ||
76 | len -= ret; | ||
77 | got += ret; | ||
78 | } | ||
79 | return got; | ||
80 | } | ||
81 | |||
82 | /* write len bytes, repeating write() calls as needed */ | ||
83 | local void writen(int desc, unsigned char *buf, size_t len) | ||
84 | { | ||
85 | ssize_t ret; | ||
86 | |||
87 | while (len) { | ||
88 | ret = write(desc, buf, len); | ||
89 | if (ret < 1) | ||
90 | bail("write error"); | ||
91 | buf += ret; | ||
92 | len -= ret; | ||
93 | } | ||
94 | } | ||
95 | |||
/* an integer state shared between threads, bundled with the mutex that
   guards it and the condition variable used to announce changes to it */
struct flag {
    int value;                  /* current state */
    pthread_mutex_t lock;       /* held while value is examined or changed */
    pthread_cond_t cond;        /* signaled whenever value is changed */
};
102 | |||
103 | /* initialize a flag for use, starting with value val */ | ||
104 | local void flag_init(struct flag *me, int val) | ||
105 | { | ||
106 | me->value = val; | ||
107 | pthread_mutex_init(&(me->lock), NULL); | ||
108 | pthread_cond_init(&(me->cond), NULL); | ||
109 | } | ||
110 | |||
111 | /* set the flag to val, signal another process that may be waiting for it */ | ||
112 | local void flag_set(struct flag *me, int val) | ||
113 | { | ||
114 | pthread_mutex_lock(&(me->lock)); | ||
115 | me->value = val; | ||
116 | pthread_cond_signal(&(me->cond)); | ||
117 | pthread_mutex_unlock(&(me->lock)); | ||
118 | } | ||
119 | |||
120 | /* if it isn't already, wait for some other thread to set the flag to val */ | ||
121 | local void flag_wait(struct flag *me, int val) | ||
122 | { | ||
123 | pthread_mutex_lock(&(me->lock)); | ||
124 | while (me->value != val) | ||
125 | pthread_cond_wait(&(me->cond), &(me->lock)); | ||
126 | pthread_mutex_unlock(&(me->lock)); | ||
127 | } | ||
128 | |||
129 | /* if flag is equal to val, wait for some other thread to change it */ | ||
130 | local void flag_wait_not(struct flag *me, int val) | ||
131 | { | ||
132 | pthread_mutex_lock(&(me->lock)); | ||
133 | while (me->value == val) | ||
134 | pthread_cond_wait(&(me->cond), &(me->lock)); | ||
135 | pthread_mutex_unlock(&(me->lock)); | ||
136 | } | ||
137 | |||
138 | /* clean up the flag when done with it */ | ||
139 | local void flag_done(struct flag *me) | ||
140 | { | ||
141 | pthread_cond_destroy(&(me->cond)); | ||
142 | pthread_mutex_destroy(&(me->lock)); | ||
143 | } | ||
144 | |||
145 | /* a unit of work to feed to compress_thread() -- it is assumed that the out | ||
146 | buffer is large enough to hold the maximum size len bytes could deflate to, | ||
147 | plus five bytes for the final sync marker */ | ||
148 | struct work { | ||
149 | size_t len; /* length of input */ | ||
150 | unsigned long crc; /* crc of input */ | ||
151 | unsigned char *buf; /* input */ | ||
152 | unsigned char *out; /* space for output (guaranteed big enough) */ | ||
153 | z_stream strm; /* pre-initialized z_stream */ | ||
154 | struct flag busy; /* busy flag indicating work unit in use */ | ||
155 | pthread_t comp; /* this compression thread */ | ||
156 | }; | ||
157 | |||
/* states held in the busy flag of a work unit */
enum {
    IDLE = 0,   /* compress and write both done -- a compress can be started */
    COMP = 1,   /* compressing -- input and output buffers are in use */
    WRITE = 2   /* compress done, output being written -- input can be read */
};
162 | |||
163 | /* read-only globals (set by main/read thread before others started) */ | ||
164 | local int ind; /* input file descriptor */ | ||
165 | local int outd; /* output file descriptor */ | ||
166 | local int level; /* compression level */ | ||
167 | local int procs; /* number of compression threads (>= 2) */ | ||
168 | local size_t size; /* uncompressed input size per thread (>= 32K) */ | ||
169 | local struct work *jobs; /* work units: jobs[0..procs-1] */ | ||
170 | |||
/* circular successor and predecessor of index n within jobs[0..procs-1] */
#define NEXT(n) ((n) != procs - 1 ? (n) + 1 : 0)
#define PREV(n) ((n) != 0 ? (n) - 1 : procs - 1)
174 | |||
/* number of bytes of preceding input handed to deflate as its dictionary */
#define DICT (1U << 15)

/* the largest power of two representable in an unsigned int (only the top
   bit set) -- input is fed to zlib in pieces no larger than this, since the
   zlib functions take unsigned int lengths */
#define MAX (~(~0U >> 1))
181 | |||
182 | /* compress thread: compress the input in the provided work unit and compute | ||
183 | its crc -- assume that the amount of space at job->out is guaranteed to be | ||
184 | enough for the compressed output, as determined by the maximum expansion | ||
185 | of deflate compression -- use the input in the previous work unit (if there | ||
186 | is one) to set the deflate dictionary for better compression */ | ||
187 | local void *compress_thread(void *arg) | ||
188 | { | ||
189 | size_t len; /* input length for this work unit */ | ||
190 | unsigned long crc; /* crc of input data */ | ||
191 | struct work *prev; /* previous work unit */ | ||
192 | struct work *job = arg; /* work unit for this thread */ | ||
193 | z_stream *strm = &(job->strm); /* zlib stream for this work unit */ | ||
194 | |||
195 | /* reset state for a new compressed stream */ | ||
196 | (void)deflateReset(strm); | ||
197 | |||
198 | /* initialize input, output, and crc */ | ||
199 | strm->next_in = job->buf; | ||
200 | strm->next_out = job->out; | ||
201 | len = job->len; | ||
202 | crc = crc32(0L, Z_NULL, 0); | ||
203 | |||
204 | /* set dictionary if this isn't the first work unit, and if we will be | ||
205 | compressing something (the read thread assures that the dictionary | ||
206 | data in the previous work unit is still there) */ | ||
207 | prev = jobs + PREV(job - jobs); | ||
208 | if (prev->buf != NULL && len != 0) | ||
209 | deflateSetDictionary(strm, prev->buf + (size - DICT), DICT); | ||
210 | |||
211 | /* run MAX-sized amounts of input through deflate and crc32 -- this loop | ||
212 | is needed for those cases where the integer type is smaller than the | ||
213 | size_t type, or when len is close to the limit of the size_t type */ | ||
214 | while (len > MAX) { | ||
215 | strm->avail_in = MAX; | ||
216 | strm->avail_out = (unsigned)-1; | ||
217 | crc = crc32(crc, strm->next_in, strm->avail_in); | ||
218 | (void)deflate(strm, Z_NO_FLUSH); | ||
219 | len -= MAX; | ||
220 | } | ||
221 | |||
222 | /* run last piece through deflate and crc32, follow with a sync marker */ | ||
223 | if (len) { | ||
224 | strm->avail_in = len; | ||
225 | strm->avail_out = (unsigned)-1; | ||
226 | crc = crc32(crc, strm->next_in, strm->avail_in); | ||
227 | (void)deflate(strm, Z_SYNC_FLUSH); | ||
228 | } | ||
229 | |||
230 | /* don't need to Z_FINISH, since we'd delete the last two bytes anyway */ | ||
231 | |||
232 | /* return result */ | ||
233 | job->crc = crc; | ||
234 | return NULL; | ||
235 | } | ||
236 | |||
/* store the low 32 bits of b in the four bytes starting at a, least
   significant byte first */
#define PUT4(a,b) ((a)[0] = (b) & 0xff, \
                   (a)[1] = ((b) >> 8) & 0xff, \
                   (a)[2] = ((b) >> 16) & 0xff, \
                   (a)[3] = ((b) >> 24) & 0xff)
239 | |||
240 | /* write thread: wait for compression threads to complete, write output in | ||
241 | order, also write gzip header and trailer around the compressed data */ | ||
242 | local void *write_thread(void *arg) | ||
243 | { | ||
244 | int n; /* compress thread index */ | ||
245 | size_t len; /* length of input processed */ | ||
246 | unsigned long tot; /* total uncompressed size (overflow ok) */ | ||
247 | unsigned long crc; /* CRC-32 of uncompressed data */ | ||
248 | unsigned char wrap[10]; /* gzip header or trailer */ | ||
249 | |||
250 | /* write simple gzip header */ | ||
251 | memcpy(wrap, "\037\213\10\0\0\0\0\0\0\3", 10); | ||
252 | wrap[8] = level == 9 ? 2 : (level == 1 ? 4 : 0); | ||
253 | writen(outd, wrap, 10); | ||
254 | |||
255 | /* process output of compress threads until end of input */ | ||
256 | tot = 0; | ||
257 | crc = crc32(0L, Z_NULL, 0); | ||
258 | n = 0; | ||
259 | do { | ||
260 | /* wait for compress thread to start, then wait to complete */ | ||
261 | flag_wait(&(jobs[n].busy), COMP); | ||
262 | pthread_join(jobs[n].comp, NULL); | ||
263 | |||
264 | /* now that compress is done, allow read thread to use input buffer */ | ||
265 | flag_set(&(jobs[n].busy), WRITE); | ||
266 | |||
267 | /* write compressed data and update length and crc */ | ||
268 | writen(outd, jobs[n].out, jobs[n].strm.next_out - jobs[n].out); | ||
269 | len = jobs[n].len; | ||
270 | tot += len; | ||
271 | crc = crc32_combine(crc, jobs[n].crc, len); | ||
272 | |||
273 | /* release this work unit and go to the next work unit */ | ||
274 | flag_set(&(jobs[n].busy), IDLE); | ||
275 | n = NEXT(n); | ||
276 | |||
277 | /* an input buffer less than size in length indicates end of input */ | ||
278 | } while (len == size); | ||
279 | |||
280 | /* write final static block and gzip trailer (crc and len mod 2^32) */ | ||
281 | wrap[0] = 3; wrap[1] = 0; | ||
282 | PUT4(wrap + 2, crc); | ||
283 | PUT4(wrap + 6, tot); | ||
284 | writen(outd, wrap, 10); | ||
285 | return NULL; | ||
286 | } | ||
287 | |||
288 | /* one-time initialization of a work unit -- this is where we set the deflate | ||
289 | compression level and request raw deflate, and also where we set the size | ||
290 | of the output buffer to guarantee enough space for a worst-case deflate | ||
291 | ending with a Z_SYNC_FLUSH */ | ||
292 | local void job_init(struct work *job) | ||
293 | { | ||
294 | int ret; /* deflateInit2() return value */ | ||
295 | |||
296 | job->buf = malloc(size); | ||
297 | job->out = malloc(size + (size >> 11) + 10); | ||
298 | job->strm.zfree = Z_NULL; | ||
299 | job->strm.zalloc = Z_NULL; | ||
300 | job->strm.opaque = Z_NULL; | ||
301 | ret = deflateInit2(&(job->strm), level, Z_DEFLATED, -15, 8, | ||
302 | Z_DEFAULT_STRATEGY); | ||
303 | if (job->buf == NULL || job->out == NULL || ret != Z_OK) | ||
304 | bail("not enough memory"); | ||
305 | } | ||
306 | |||
307 | /* compress ind to outd in the gzip format, using multiple threads for the | ||
308 | compression and crc calculation and another thread for writing the output -- | ||
309 | the read thread is the main thread */ | ||
310 | local void read_thread(void) | ||
311 | { | ||
312 | int n; /* general index */ | ||
313 | size_t got; /* amount read */ | ||
314 | pthread_attr_t attr; /* thread attributes (left at defaults) */ | ||
315 | pthread_t write; /* write thread */ | ||
316 | |||
317 | /* set defaults (not all pthread implementations default to joinable) */ | ||
318 | pthread_attr_init(&attr); | ||
319 | pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); | ||
320 | |||
321 | /* allocate and set up work list (individual work units will be initialized | ||
322 | as needed, in case the input is short), assure that allocation size | ||
323 | arithmetic does not overflow */ | ||
324 | if (size + (size >> 11) + 10 < (size >> 11) + 10 || | ||
325 | (ssize_t)(size + (size >> 11) + 10) < 0 || | ||
326 | ((size_t)0 - 1) / procs <= sizeof(struct work) || | ||
327 | (jobs = malloc(procs * sizeof(struct work))) == NULL) | ||
328 | bail("not enough memory"); | ||
329 | for (n = 0; n < procs; n++) { | ||
330 | jobs[n].buf = NULL; | ||
331 | flag_init(&(jobs[n].busy), IDLE); | ||
332 | } | ||
333 | |||
334 | /* start write thread */ | ||
335 | pthread_create(&write, &attr, write_thread, NULL); | ||
336 | |||
337 | /* read from input and start compress threads (write thread will pick up | ||
338 | the output of the compress threads) */ | ||
339 | n = 0; | ||
340 | do { | ||
341 | /* initialize this work unit if it's the first time it's used */ | ||
342 | if (jobs[n].buf == NULL) | ||
343 | job_init(jobs + n); | ||
344 | |||
345 | /* read input data, but wait for last compress on this work unit to be | ||
346 | done, and wait for the dictionary to be used by the last compress on | ||
347 | the next work unit */ | ||
348 | flag_wait_not(&(jobs[n].busy), COMP); | ||
349 | flag_wait_not(&(jobs[NEXT(n)].busy), COMP); | ||
350 | got = readn(ind, jobs[n].buf, size); | ||
351 | |||
352 | /* start compress thread, but wait for write to be done first */ | ||
353 | flag_wait(&(jobs[n].busy), IDLE); | ||
354 | jobs[n].len = got; | ||
355 | pthread_create(&(jobs[n].comp), &attr, compress_thread, jobs + n); | ||
356 | |||
357 | /* mark work unit so write thread knows compress was started */ | ||
358 | flag_set(&(jobs[n].busy), COMP); | ||
359 | |||
360 | /* go to the next work unit */ | ||
361 | n = NEXT(n); | ||
362 | |||
363 | /* do until end of input, indicated by a read less than size */ | ||
364 | } while (got == size); | ||
365 | |||
366 | /* wait for the write thread to complete -- the write thread will join with | ||
367 | all of the compress threads, so this waits for all of the threads to | ||
368 | complete */ | ||
369 | pthread_join(write, NULL); | ||
370 | |||
371 | /* free up all requested resources and return */ | ||
372 | for (n = procs - 1; n >= 0; n--) { | ||
373 | flag_done(&(jobs[n].busy)); | ||
374 | (void)deflateEnd(&(jobs[n].strm)); | ||
375 | free(jobs[n].out); | ||
376 | free(jobs[n].buf); | ||
377 | } | ||
378 | free(jobs); | ||
379 | pthread_attr_destroy(&attr); | ||
380 | } | ||
381 | |||
382 | /* Process arguments for level, size, and procs, compress from stdin to | ||
383 | stdout in the gzip format. Note that procs must be at least two in | ||
384 | order to provide a dictionary in one work unit for the other work | ||
385 | unit, and that size must be at least 32K to store a full dictionary. */ | ||
386 | int main(int argc, char **argv) | ||
387 | { | ||
388 | int n; /* general index */ | ||
389 | int get; /* command line parameters to get */ | ||
390 | char *arg; /* command line argument */ | ||
391 | |||
392 | /* set defaults -- 32 processes and 128K buffers was found to provide | ||
393 | good utilization of four cores (about 97%) and balanced the overall | ||
394 | execution time impact of more threads against more dictionary | ||
395 | processing for a fixed amount of memory -- the memory usage for these | ||
396 | settings and full use of all work units (at least 4 MB of input) is | ||
397 | 16.2 MB | ||
398 | */ | ||
399 | level = Z_DEFAULT_COMPRESSION; | ||
400 | procs = 32; | ||
401 | size = 131072UL; | ||
402 | |||
403 | /* process command-line arguments */ | ||
404 | get = 0; | ||
405 | for (n = 1; n < argc; n++) { | ||
406 | arg = argv[n]; | ||
407 | if (*arg == '-') { | ||
408 | while (*++arg) | ||
409 | if (*arg >= '0' && *arg <= '9') /* compression level */ | ||
410 | level = *arg - '0'; | ||
411 | else if (*arg == 'b') /* chunk size in K */ | ||
412 | get |= 1; | ||
413 | else if (*arg == 'p') /* number of processes */ | ||
414 | get |= 2; | ||
415 | else if (*arg == 'h') { /* help */ | ||
416 | fputs("usage: pigz [-0..9] [-b blocksizeinK]", stderr); | ||
417 | fputs(" [-p processes] < foo > foo.gz\n", stderr); | ||
418 | return 0; | ||
419 | } | ||
420 | else | ||
421 | bail("invalid option"); | ||
422 | } | ||
423 | else if (get & 1) { | ||
424 | if (get & 2) | ||
425 | bail("you need to separate the -b and -p options"); | ||
426 | size = (size_t)(atol(arg)) << 10; /* chunk size */ | ||
427 | if (size < DICT) | ||
428 | bail("invalid option"); | ||
429 | get = 0; | ||
430 | } | ||
431 | else if (get & 2) { | ||
432 | procs = atoi(arg); /* processes */ | ||
433 | if (procs < 2) | ||
434 | bail("invalid option"); | ||
435 | get = 0; | ||
436 | } | ||
437 | else | ||
438 | bail("invalid option (you need to pipe input and output)"); | ||
439 | } | ||
440 | if (get) | ||
441 | bail("missing option argument"); | ||
442 | |||
443 | /* do parallel compression from stdin to stdout (the read thread starts up | ||
444 | the write thread and the compression threads, and they all join before | ||
445 | the read thread returns) */ | ||
446 | ind = 0; | ||
447 | outd = 1; | ||
448 | read_thread(); | ||
449 | |||
450 | /* done */ | ||
451 | return 0; | ||
452 | } | ||