diff options
Diffstat (limited to 'C/MtCoder.c')
-rw-r--r-- | C/MtCoder.c | 595 |
1 files changed, 595 insertions, 0 deletions
diff --git a/C/MtCoder.c b/C/MtCoder.c new file mode 100644 index 0000000..99dc909 --- /dev/null +++ b/C/MtCoder.c | |||
@@ -0,0 +1,595 @@ | |||
1 | /* MtCoder.c -- Multi-thread Coder | ||
2 | 2021-12-21 : Igor Pavlov : Public domain */ | ||
3 | |||
4 | #include "Precomp.h" | ||
5 | |||
6 | #include "MtCoder.h" | ||
7 | |||
8 | #ifndef _7ZIP_ST | ||
9 | |||
10 | static SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize) | ||
11 | { | ||
12 | CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt); | ||
13 | UInt64 inSize2 = 0; | ||
14 | UInt64 outSize2 = 0; | ||
15 | if (inSize != (UInt64)(Int64)-1) | ||
16 | { | ||
17 | inSize2 = inSize - thunk->inSize; | ||
18 | thunk->inSize = inSize; | ||
19 | } | ||
20 | if (outSize != (UInt64)(Int64)-1) | ||
21 | { | ||
22 | outSize2 = outSize - thunk->outSize; | ||
23 | thunk->outSize = outSize; | ||
24 | } | ||
25 | return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2); | ||
26 | } | ||
27 | |||
28 | |||
29 | void MtProgressThunk_CreateVTable(CMtProgressThunk *p) | ||
30 | { | ||
31 | p->vt.Progress = MtProgressThunk_Progress; | ||
32 | } | ||
33 | |||
34 | |||
35 | |||
36 | #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; } | ||
37 | |||
38 | |||
39 | static WRes ArEvent_OptCreate_And_Reset(CEvent *p) | ||
40 | { | ||
41 | if (Event_IsCreated(p)) | ||
42 | return Event_Reset(p); | ||
43 | return AutoResetEvent_CreateNotSignaled(p); | ||
44 | } | ||
45 | |||
46 | |||
47 | static THREAD_FUNC_DECL ThreadFunc(void *pp); | ||
48 | |||
49 | |||
50 | static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t) | ||
51 | { | ||
52 | WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent); | ||
53 | if (wres == 0) | ||
54 | { | ||
55 | t->stop = False; | ||
56 | if (!Thread_WasCreated(&t->thread)) | ||
57 | wres = Thread_Create(&t->thread, ThreadFunc, t); | ||
58 | if (wres == 0) | ||
59 | wres = Event_Set(&t->startEvent); | ||
60 | } | ||
61 | if (wres == 0) | ||
62 | return SZ_OK; | ||
63 | return MY_SRes_HRESULT_FROM_WRes(wres); | ||
64 | } | ||
65 | |||
66 | |||
67 | static void MtCoderThread_Destruct(CMtCoderThread *t) | ||
68 | { | ||
69 | if (Thread_WasCreated(&t->thread)) | ||
70 | { | ||
71 | t->stop = 1; | ||
72 | Event_Set(&t->startEvent); | ||
73 | Thread_Wait_Close(&t->thread); | ||
74 | } | ||
75 | |||
76 | Event_Close(&t->startEvent); | ||
77 | |||
78 | if (t->inBuf) | ||
79 | { | ||
80 | ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf); | ||
81 | t->inBuf = NULL; | ||
82 | } | ||
83 | } | ||
84 | |||
85 | |||
86 | |||
87 | static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize) | ||
88 | { | ||
89 | size_t size = *processedSize; | ||
90 | *processedSize = 0; | ||
91 | while (size != 0) | ||
92 | { | ||
93 | size_t cur = size; | ||
94 | SRes res = ISeqInStream_Read(stream, data, &cur); | ||
95 | *processedSize += cur; | ||
96 | data += cur; | ||
97 | size -= cur; | ||
98 | RINOK(res); | ||
99 | if (cur == 0) | ||
100 | return SZ_OK; | ||
101 | } | ||
102 | return SZ_OK; | ||
103 | } | ||
104 | |||
105 | |||
106 | /* | ||
107 | ThreadFunc2() returns: | ||
108 | SZ_OK - in all normal cases (even for stream error or memory allocation error) | ||
109 | SZ_ERROR_THREAD - in case of failure in system synch function | ||
110 | */ | ||
111 | |||
112 | static SRes ThreadFunc2(CMtCoderThread *t) | ||
113 | { | ||
114 | CMtCoder *mtc = t->mtCoder; | ||
115 | |||
116 | for (;;) | ||
117 | { | ||
118 | unsigned bi; | ||
119 | SRes res; | ||
120 | SRes res2; | ||
121 | BoolInt finished; | ||
122 | unsigned bufIndex; | ||
123 | size_t size; | ||
124 | const Byte *inData; | ||
125 | UInt64 readProcessed = 0; | ||
126 | |||
127 | RINOK_THREAD(Event_Wait(&mtc->readEvent)) | ||
128 | |||
129 | /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */ | ||
130 | |||
131 | if (mtc->stopReading) | ||
132 | { | ||
133 | return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD; | ||
134 | } | ||
135 | |||
136 | res = MtProgress_GetError(&mtc->mtProgress); | ||
137 | |||
138 | size = 0; | ||
139 | inData = NULL; | ||
140 | finished = True; | ||
141 | |||
142 | if (res == SZ_OK) | ||
143 | { | ||
144 | size = mtc->blockSize; | ||
145 | if (mtc->inStream) | ||
146 | { | ||
147 | if (!t->inBuf) | ||
148 | { | ||
149 | t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize); | ||
150 | if (!t->inBuf) | ||
151 | res = SZ_ERROR_MEM; | ||
152 | } | ||
153 | if (res == SZ_OK) | ||
154 | { | ||
155 | res = FullRead(mtc->inStream, t->inBuf, &size); | ||
156 | readProcessed = mtc->readProcessed + size; | ||
157 | mtc->readProcessed = readProcessed; | ||
158 | } | ||
159 | if (res != SZ_OK) | ||
160 | { | ||
161 | mtc->readRes = res; | ||
162 | /* after reading error - we can stop encoding of previous blocks */ | ||
163 | MtProgress_SetError(&mtc->mtProgress, res); | ||
164 | } | ||
165 | else | ||
166 | finished = (size != mtc->blockSize); | ||
167 | } | ||
168 | else | ||
169 | { | ||
170 | size_t rem; | ||
171 | readProcessed = mtc->readProcessed; | ||
172 | rem = mtc->inDataSize - (size_t)readProcessed; | ||
173 | if (size > rem) | ||
174 | size = rem; | ||
175 | inData = mtc->inData + (size_t)readProcessed; | ||
176 | readProcessed += size; | ||
177 | mtc->readProcessed = readProcessed; | ||
178 | finished = (mtc->inDataSize == (size_t)readProcessed); | ||
179 | } | ||
180 | } | ||
181 | |||
182 | /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */ | ||
183 | |||
184 | res2 = SZ_OK; | ||
185 | |||
186 | if (Semaphore_Wait(&mtc->blocksSemaphore) != 0) | ||
187 | { | ||
188 | res2 = SZ_ERROR_THREAD; | ||
189 | if (res == SZ_OK) | ||
190 | { | ||
191 | res = res2; | ||
192 | // MtProgress_SetError(&mtc->mtProgress, res); | ||
193 | } | ||
194 | } | ||
195 | |||
196 | bi = mtc->blockIndex; | ||
197 | |||
198 | if (++mtc->blockIndex >= mtc->numBlocksMax) | ||
199 | mtc->blockIndex = 0; | ||
200 | |||
201 | bufIndex = (unsigned)(int)-1; | ||
202 | |||
203 | if (res == SZ_OK) | ||
204 | res = MtProgress_GetError(&mtc->mtProgress); | ||
205 | |||
206 | if (res != SZ_OK) | ||
207 | finished = True; | ||
208 | |||
209 | if (!finished) | ||
210 | { | ||
211 | if (mtc->numStartedThreads < mtc->numStartedThreadsLimit | ||
212 | && mtc->expectedDataSize != readProcessed) | ||
213 | { | ||
214 | res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]); | ||
215 | if (res == SZ_OK) | ||
216 | mtc->numStartedThreads++; | ||
217 | else | ||
218 | { | ||
219 | MtProgress_SetError(&mtc->mtProgress, res); | ||
220 | finished = True; | ||
221 | } | ||
222 | } | ||
223 | } | ||
224 | |||
225 | if (finished) | ||
226 | mtc->stopReading = True; | ||
227 | |||
228 | RINOK_THREAD(Event_Set(&mtc->readEvent)) | ||
229 | |||
230 | if (res2 != SZ_OK) | ||
231 | return res2; | ||
232 | |||
233 | if (res == SZ_OK) | ||
234 | { | ||
235 | CriticalSection_Enter(&mtc->cs); | ||
236 | bufIndex = mtc->freeBlockHead; | ||
237 | mtc->freeBlockHead = mtc->freeBlockList[bufIndex]; | ||
238 | CriticalSection_Leave(&mtc->cs); | ||
239 | |||
240 | res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex, | ||
241 | mtc->inStream ? t->inBuf : inData, size, finished); | ||
242 | |||
243 | // MtProgress_Reinit(&mtc->mtProgress, t->index); | ||
244 | |||
245 | if (res != SZ_OK) | ||
246 | MtProgress_SetError(&mtc->mtProgress, res); | ||
247 | } | ||
248 | |||
249 | { | ||
250 | CMtCoderBlock *block = &mtc->blocks[bi]; | ||
251 | block->res = res; | ||
252 | block->bufIndex = bufIndex; | ||
253 | block->finished = finished; | ||
254 | } | ||
255 | |||
256 | #ifdef MTCODER__USE_WRITE_THREAD | ||
257 | RINOK_THREAD(Event_Set(&mtc->writeEvents[bi])) | ||
258 | #else | ||
259 | { | ||
260 | unsigned wi; | ||
261 | { | ||
262 | CriticalSection_Enter(&mtc->cs); | ||
263 | wi = mtc->writeIndex; | ||
264 | if (wi == bi) | ||
265 | mtc->writeIndex = (unsigned)(int)-1; | ||
266 | else | ||
267 | mtc->ReadyBlocks[bi] = True; | ||
268 | CriticalSection_Leave(&mtc->cs); | ||
269 | } | ||
270 | |||
271 | if (wi != bi) | ||
272 | { | ||
273 | if (res != SZ_OK || finished) | ||
274 | return 0; | ||
275 | continue; | ||
276 | } | ||
277 | |||
278 | if (mtc->writeRes != SZ_OK) | ||
279 | res = mtc->writeRes; | ||
280 | |||
281 | for (;;) | ||
282 | { | ||
283 | if (res == SZ_OK && bufIndex != (unsigned)(int)-1) | ||
284 | { | ||
285 | res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex); | ||
286 | if (res != SZ_OK) | ||
287 | { | ||
288 | mtc->writeRes = res; | ||
289 | MtProgress_SetError(&mtc->mtProgress, res); | ||
290 | } | ||
291 | } | ||
292 | |||
293 | if (++wi >= mtc->numBlocksMax) | ||
294 | wi = 0; | ||
295 | { | ||
296 | BoolInt isReady; | ||
297 | |||
298 | CriticalSection_Enter(&mtc->cs); | ||
299 | |||
300 | if (bufIndex != (unsigned)(int)-1) | ||
301 | { | ||
302 | mtc->freeBlockList[bufIndex] = mtc->freeBlockHead; | ||
303 | mtc->freeBlockHead = bufIndex; | ||
304 | } | ||
305 | |||
306 | isReady = mtc->ReadyBlocks[wi]; | ||
307 | |||
308 | if (isReady) | ||
309 | mtc->ReadyBlocks[wi] = False; | ||
310 | else | ||
311 | mtc->writeIndex = wi; | ||
312 | |||
313 | CriticalSection_Leave(&mtc->cs); | ||
314 | |||
315 | RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore)) | ||
316 | |||
317 | if (!isReady) | ||
318 | break; | ||
319 | } | ||
320 | |||
321 | { | ||
322 | CMtCoderBlock *block = &mtc->blocks[wi]; | ||
323 | if (res == SZ_OK && block->res != SZ_OK) | ||
324 | res = block->res; | ||
325 | bufIndex = block->bufIndex; | ||
326 | finished = block->finished; | ||
327 | } | ||
328 | } | ||
329 | } | ||
330 | #endif | ||
331 | |||
332 | if (finished || res != SZ_OK) | ||
333 | return 0; | ||
334 | } | ||
335 | } | ||
336 | |||
337 | |||
338 | static THREAD_FUNC_DECL ThreadFunc(void *pp) | ||
339 | { | ||
340 | CMtCoderThread *t = (CMtCoderThread *)pp; | ||
341 | for (;;) | ||
342 | { | ||
343 | if (Event_Wait(&t->startEvent) != 0) | ||
344 | return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD; | ||
345 | if (t->stop) | ||
346 | return 0; | ||
347 | { | ||
348 | SRes res = ThreadFunc2(t); | ||
349 | CMtCoder *mtc = t->mtCoder; | ||
350 | if (res != SZ_OK) | ||
351 | { | ||
352 | MtProgress_SetError(&mtc->mtProgress, res); | ||
353 | } | ||
354 | |||
355 | #ifndef MTCODER__USE_WRITE_THREAD | ||
356 | { | ||
357 | unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads); | ||
358 | if (numFinished == mtc->numStartedThreads) | ||
359 | if (Event_Set(&mtc->finishedEvent) != 0) | ||
360 | return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD; | ||
361 | } | ||
362 | #endif | ||
363 | } | ||
364 | } | ||
365 | } | ||
366 | |||
367 | |||
368 | |||
369 | void MtCoder_Construct(CMtCoder *p) | ||
370 | { | ||
371 | unsigned i; | ||
372 | |||
373 | p->blockSize = 0; | ||
374 | p->numThreadsMax = 0; | ||
375 | p->expectedDataSize = (UInt64)(Int64)-1; | ||
376 | |||
377 | p->inStream = NULL; | ||
378 | p->inData = NULL; | ||
379 | p->inDataSize = 0; | ||
380 | |||
381 | p->progress = NULL; | ||
382 | p->allocBig = NULL; | ||
383 | |||
384 | p->mtCallback = NULL; | ||
385 | p->mtCallbackObject = NULL; | ||
386 | |||
387 | p->allocatedBufsSize = 0; | ||
388 | |||
389 | Event_Construct(&p->readEvent); | ||
390 | Semaphore_Construct(&p->blocksSemaphore); | ||
391 | |||
392 | for (i = 0; i < MTCODER__THREADS_MAX; i++) | ||
393 | { | ||
394 | CMtCoderThread *t = &p->threads[i]; | ||
395 | t->mtCoder = p; | ||
396 | t->index = i; | ||
397 | t->inBuf = NULL; | ||
398 | t->stop = False; | ||
399 | Event_Construct(&t->startEvent); | ||
400 | Thread_Construct(&t->thread); | ||
401 | } | ||
402 | |||
403 | #ifdef MTCODER__USE_WRITE_THREAD | ||
404 | for (i = 0; i < MTCODER__BLOCKS_MAX; i++) | ||
405 | Event_Construct(&p->writeEvents[i]); | ||
406 | #else | ||
407 | Event_Construct(&p->finishedEvent); | ||
408 | #endif | ||
409 | |||
410 | CriticalSection_Init(&p->cs); | ||
411 | CriticalSection_Init(&p->mtProgress.cs); | ||
412 | } | ||
413 | |||
414 | |||
415 | |||
416 | |||
417 | static void MtCoder_Free(CMtCoder *p) | ||
418 | { | ||
419 | unsigned i; | ||
420 | |||
421 | /* | ||
422 | p->stopReading = True; | ||
423 | if (Event_IsCreated(&p->readEvent)) | ||
424 | Event_Set(&p->readEvent); | ||
425 | */ | ||
426 | |||
427 | for (i = 0; i < MTCODER__THREADS_MAX; i++) | ||
428 | MtCoderThread_Destruct(&p->threads[i]); | ||
429 | |||
430 | Event_Close(&p->readEvent); | ||
431 | Semaphore_Close(&p->blocksSemaphore); | ||
432 | |||
433 | #ifdef MTCODER__USE_WRITE_THREAD | ||
434 | for (i = 0; i < MTCODER__BLOCKS_MAX; i++) | ||
435 | Event_Close(&p->writeEvents[i]); | ||
436 | #else | ||
437 | Event_Close(&p->finishedEvent); | ||
438 | #endif | ||
439 | } | ||
440 | |||
441 | |||
442 | void MtCoder_Destruct(CMtCoder *p) | ||
443 | { | ||
444 | MtCoder_Free(p); | ||
445 | |||
446 | CriticalSection_Delete(&p->cs); | ||
447 | CriticalSection_Delete(&p->mtProgress.cs); | ||
448 | } | ||
449 | |||
450 | |||
451 | SRes MtCoder_Code(CMtCoder *p) | ||
452 | { | ||
453 | unsigned numThreads = p->numThreadsMax; | ||
454 | unsigned numBlocksMax; | ||
455 | unsigned i; | ||
456 | SRes res = SZ_OK; | ||
457 | |||
458 | if (numThreads > MTCODER__THREADS_MAX) | ||
459 | numThreads = MTCODER__THREADS_MAX; | ||
460 | numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads); | ||
461 | |||
462 | if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++; | ||
463 | if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++; | ||
464 | if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++; | ||
465 | |||
466 | if (numBlocksMax > MTCODER__BLOCKS_MAX) | ||
467 | numBlocksMax = MTCODER__BLOCKS_MAX; | ||
468 | |||
469 | if (p->blockSize != p->allocatedBufsSize) | ||
470 | { | ||
471 | for (i = 0; i < MTCODER__THREADS_MAX; i++) | ||
472 | { | ||
473 | CMtCoderThread *t = &p->threads[i]; | ||
474 | if (t->inBuf) | ||
475 | { | ||
476 | ISzAlloc_Free(p->allocBig, t->inBuf); | ||
477 | t->inBuf = NULL; | ||
478 | } | ||
479 | } | ||
480 | p->allocatedBufsSize = p->blockSize; | ||
481 | } | ||
482 | |||
483 | p->readRes = SZ_OK; | ||
484 | |||
485 | MtProgress_Init(&p->mtProgress, p->progress); | ||
486 | |||
487 | #ifdef MTCODER__USE_WRITE_THREAD | ||
488 | for (i = 0; i < numBlocksMax; i++) | ||
489 | { | ||
490 | RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i])); | ||
491 | } | ||
492 | #else | ||
493 | RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent)); | ||
494 | #endif | ||
495 | |||
496 | { | ||
497 | RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent)); | ||
498 | RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, numBlocksMax, numBlocksMax)); | ||
499 | } | ||
500 | |||
501 | for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++) | ||
502 | p->freeBlockList[i] = i + 1; | ||
503 | p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1; | ||
504 | p->freeBlockHead = 0; | ||
505 | |||
506 | p->readProcessed = 0; | ||
507 | p->blockIndex = 0; | ||
508 | p->numBlocksMax = numBlocksMax; | ||
509 | p->stopReading = False; | ||
510 | |||
511 | #ifndef MTCODER__USE_WRITE_THREAD | ||
512 | p->writeIndex = 0; | ||
513 | p->writeRes = SZ_OK; | ||
514 | for (i = 0; i < MTCODER__BLOCKS_MAX; i++) | ||
515 | p->ReadyBlocks[i] = False; | ||
516 | p->numFinishedThreads = 0; | ||
517 | #endif | ||
518 | |||
519 | p->numStartedThreadsLimit = numThreads; | ||
520 | p->numStartedThreads = 0; | ||
521 | |||
522 | // for (i = 0; i < numThreads; i++) | ||
523 | { | ||
524 | CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++]; | ||
525 | RINOK(MtCoderThread_CreateAndStart(nextThread)); | ||
526 | } | ||
527 | |||
528 | RINOK_THREAD(Event_Set(&p->readEvent)) | ||
529 | |||
530 | #ifdef MTCODER__USE_WRITE_THREAD | ||
531 | { | ||
532 | unsigned bi = 0; | ||
533 | |||
534 | for (;; bi++) | ||
535 | { | ||
536 | if (bi >= numBlocksMax) | ||
537 | bi = 0; | ||
538 | |||
539 | RINOK_THREAD(Event_Wait(&p->writeEvents[bi])) | ||
540 | |||
541 | { | ||
542 | const CMtCoderBlock *block = &p->blocks[bi]; | ||
543 | unsigned bufIndex = block->bufIndex; | ||
544 | BoolInt finished = block->finished; | ||
545 | if (res == SZ_OK && block->res != SZ_OK) | ||
546 | res = block->res; | ||
547 | |||
548 | if (bufIndex != (unsigned)(int)-1) | ||
549 | { | ||
550 | if (res == SZ_OK) | ||
551 | { | ||
552 | res = p->mtCallback->Write(p->mtCallbackObject, bufIndex); | ||
553 | if (res != SZ_OK) | ||
554 | MtProgress_SetError(&p->mtProgress, res); | ||
555 | } | ||
556 | |||
557 | CriticalSection_Enter(&p->cs); | ||
558 | { | ||
559 | p->freeBlockList[bufIndex] = p->freeBlockHead; | ||
560 | p->freeBlockHead = bufIndex; | ||
561 | } | ||
562 | CriticalSection_Leave(&p->cs); | ||
563 | } | ||
564 | |||
565 | RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore)) | ||
566 | |||
567 | if (finished) | ||
568 | break; | ||
569 | } | ||
570 | } | ||
571 | } | ||
572 | #else | ||
573 | { | ||
574 | WRes wres = Event_Wait(&p->finishedEvent); | ||
575 | res = MY_SRes_HRESULT_FROM_WRes(wres); | ||
576 | } | ||
577 | #endif | ||
578 | |||
579 | if (res == SZ_OK) | ||
580 | res = p->readRes; | ||
581 | |||
582 | if (res == SZ_OK) | ||
583 | res = p->mtProgress.res; | ||
584 | |||
585 | #ifndef MTCODER__USE_WRITE_THREAD | ||
586 | if (res == SZ_OK) | ||
587 | res = p->writeRes; | ||
588 | #endif | ||
589 | |||
590 | if (res != SZ_OK) | ||
591 | MtCoder_Free(p); | ||
592 | return res; | ||
593 | } | ||
594 | |||
595 | #endif | ||