aboutsummaryrefslogtreecommitdiff
path: root/C/MtCoder.c
diff options
context:
space:
mode:
Diffstat (limited to 'C/MtCoder.c')
-rw-r--r--C/MtCoder.c104
1 files changed, 40 insertions, 64 deletions
diff --git a/C/MtCoder.c b/C/MtCoder.c
index 99dc909..6f58abb 100644
--- a/C/MtCoder.c
+++ b/C/MtCoder.c
@@ -1,28 +1,28 @@
1/* MtCoder.c -- Multi-thread Coder 1/* MtCoder.c -- Multi-thread Coder
22021-12-21 : Igor Pavlov : Public domain */ 22023-04-13 : Igor Pavlov : Public domain */
3 3
4#include "Precomp.h" 4#include "Precomp.h"
5 5
6#include "MtCoder.h" 6#include "MtCoder.h"
7 7
8#ifndef _7ZIP_ST 8#ifndef Z7_ST
9 9
10static SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize) 10static SRes MtProgressThunk_Progress(ICompressProgressPtr pp, UInt64 inSize, UInt64 outSize)
11{ 11{
12 CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt); 12 Z7_CONTAINER_FROM_VTBL_TO_DECL_VAR_pp_vt_p(CMtProgressThunk)
13 UInt64 inSize2 = 0; 13 UInt64 inSize2 = 0;
14 UInt64 outSize2 = 0; 14 UInt64 outSize2 = 0;
15 if (inSize != (UInt64)(Int64)-1) 15 if (inSize != (UInt64)(Int64)-1)
16 { 16 {
17 inSize2 = inSize - thunk->inSize; 17 inSize2 = inSize - p->inSize;
18 thunk->inSize = inSize; 18 p->inSize = inSize;
19 } 19 }
20 if (outSize != (UInt64)(Int64)-1) 20 if (outSize != (UInt64)(Int64)-1)
21 { 21 {
22 outSize2 = outSize - thunk->outSize; 22 outSize2 = outSize - p->outSize;
23 thunk->outSize = outSize; 23 p->outSize = outSize;
24 } 24 }
25 return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2); 25 return MtProgress_ProgressAdd(p->mtProgress, inSize2, outSize2);
26} 26}
27 27
28 28
@@ -36,20 +36,12 @@ void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
36#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; } 36#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
37 37
38 38
39static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
40{
41 if (Event_IsCreated(p))
42 return Event_Reset(p);
43 return AutoResetEvent_CreateNotSignaled(p);
44}
45
46
47static THREAD_FUNC_DECL ThreadFunc(void *pp); 39static THREAD_FUNC_DECL ThreadFunc(void *pp);
48 40
49 41
50static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t) 42static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
51{ 43{
52 WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent); 44 WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->startEvent);
53 if (wres == 0) 45 if (wres == 0)
54 { 46 {
55 t->stop = False; 47 t->stop = False;
@@ -84,24 +76,6 @@ static void MtCoderThread_Destruct(CMtCoderThread *t)
84 76
85 77
86 78
87static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
88{
89 size_t size = *processedSize;
90 *processedSize = 0;
91 while (size != 0)
92 {
93 size_t cur = size;
94 SRes res = ISeqInStream_Read(stream, data, &cur);
95 *processedSize += cur;
96 data += cur;
97 size -= cur;
98 RINOK(res);
99 if (cur == 0)
100 return SZ_OK;
101 }
102 return SZ_OK;
103}
104
105 79
106/* 80/*
107 ThreadFunc2() returns: 81 ThreadFunc2() returns:
@@ -152,7 +126,7 @@ static SRes ThreadFunc2(CMtCoderThread *t)
152 } 126 }
153 if (res == SZ_OK) 127 if (res == SZ_OK)
154 { 128 {
155 res = FullRead(mtc->inStream, t->inBuf, &size); 129 res = SeqInStream_ReadMax(mtc->inStream, t->inBuf, &size);
156 readProcessed = mtc->readProcessed + size; 130 readProcessed = mtc->readProcessed + size;
157 mtc->readProcessed = readProcessed; 131 mtc->readProcessed = readProcessed;
158 } 132 }
@@ -253,7 +227,7 @@ static SRes ThreadFunc2(CMtCoderThread *t)
253 block->finished = finished; 227 block->finished = finished;
254 } 228 }
255 229
256 #ifdef MTCODER__USE_WRITE_THREAD 230 #ifdef MTCODER_USE_WRITE_THREAD
257 RINOK_THREAD(Event_Set(&mtc->writeEvents[bi])) 231 RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
258 #else 232 #else
259 { 233 {
@@ -352,7 +326,7 @@ static THREAD_FUNC_DECL ThreadFunc(void *pp)
352 MtProgress_SetError(&mtc->mtProgress, res); 326 MtProgress_SetError(&mtc->mtProgress, res);
353 } 327 }
354 328
355 #ifndef MTCODER__USE_WRITE_THREAD 329 #ifndef MTCODER_USE_WRITE_THREAD
356 { 330 {
357 unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads); 331 unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
358 if (numFinished == mtc->numStartedThreads) 332 if (numFinished == mtc->numStartedThreads)
@@ -389,7 +363,7 @@ void MtCoder_Construct(CMtCoder *p)
389 Event_Construct(&p->readEvent); 363 Event_Construct(&p->readEvent);
390 Semaphore_Construct(&p->blocksSemaphore); 364 Semaphore_Construct(&p->blocksSemaphore);
391 365
392 for (i = 0; i < MTCODER__THREADS_MAX; i++) 366 for (i = 0; i < MTCODER_THREADS_MAX; i++)
393 { 367 {
394 CMtCoderThread *t = &p->threads[i]; 368 CMtCoderThread *t = &p->threads[i];
395 t->mtCoder = p; 369 t->mtCoder = p;
@@ -397,11 +371,11 @@ void MtCoder_Construct(CMtCoder *p)
397 t->inBuf = NULL; 371 t->inBuf = NULL;
398 t->stop = False; 372 t->stop = False;
399 Event_Construct(&t->startEvent); 373 Event_Construct(&t->startEvent);
400 Thread_Construct(&t->thread); 374 Thread_CONSTRUCT(&t->thread)
401 } 375 }
402 376
403 #ifdef MTCODER__USE_WRITE_THREAD 377 #ifdef MTCODER_USE_WRITE_THREAD
404 for (i = 0; i < MTCODER__BLOCKS_MAX; i++) 378 for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
405 Event_Construct(&p->writeEvents[i]); 379 Event_Construct(&p->writeEvents[i]);
406 #else 380 #else
407 Event_Construct(&p->finishedEvent); 381 Event_Construct(&p->finishedEvent);
@@ -424,14 +398,14 @@ static void MtCoder_Free(CMtCoder *p)
424 Event_Set(&p->readEvent); 398 Event_Set(&p->readEvent);
425 */ 399 */
426 400
427 for (i = 0; i < MTCODER__THREADS_MAX; i++) 401 for (i = 0; i < MTCODER_THREADS_MAX; i++)
428 MtCoderThread_Destruct(&p->threads[i]); 402 MtCoderThread_Destruct(&p->threads[i]);
429 403
430 Event_Close(&p->readEvent); 404 Event_Close(&p->readEvent);
431 Semaphore_Close(&p->blocksSemaphore); 405 Semaphore_Close(&p->blocksSemaphore);
432 406
433 #ifdef MTCODER__USE_WRITE_THREAD 407 #ifdef MTCODER_USE_WRITE_THREAD
434 for (i = 0; i < MTCODER__BLOCKS_MAX; i++) 408 for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
435 Event_Close(&p->writeEvents[i]); 409 Event_Close(&p->writeEvents[i]);
436 #else 410 #else
437 Event_Close(&p->finishedEvent); 411 Event_Close(&p->finishedEvent);
@@ -455,20 +429,20 @@ SRes MtCoder_Code(CMtCoder *p)
455 unsigned i; 429 unsigned i;
456 SRes res = SZ_OK; 430 SRes res = SZ_OK;
457 431
458 if (numThreads > MTCODER__THREADS_MAX) 432 if (numThreads > MTCODER_THREADS_MAX)
459 numThreads = MTCODER__THREADS_MAX; 433 numThreads = MTCODER_THREADS_MAX;
460 numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads); 434 numBlocksMax = MTCODER_GET_NUM_BLOCKS_FROM_THREADS(numThreads);
461 435
462 if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++; 436 if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
463 if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++; 437 if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
464 if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++; 438 if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
465 439
466 if (numBlocksMax > MTCODER__BLOCKS_MAX) 440 if (numBlocksMax > MTCODER_BLOCKS_MAX)
467 numBlocksMax = MTCODER__BLOCKS_MAX; 441 numBlocksMax = MTCODER_BLOCKS_MAX;
468 442
469 if (p->blockSize != p->allocatedBufsSize) 443 if (p->blockSize != p->allocatedBufsSize)
470 { 444 {
471 for (i = 0; i < MTCODER__THREADS_MAX; i++) 445 for (i = 0; i < MTCODER_THREADS_MAX; i++)
472 { 446 {
473 CMtCoderThread *t = &p->threads[i]; 447 CMtCoderThread *t = &p->threads[i];
474 if (t->inBuf) 448 if (t->inBuf)
@@ -484,23 +458,23 @@ SRes MtCoder_Code(CMtCoder *p)
484 458
485 MtProgress_Init(&p->mtProgress, p->progress); 459 MtProgress_Init(&p->mtProgress, p->progress);
486 460
487 #ifdef MTCODER__USE_WRITE_THREAD 461 #ifdef MTCODER_USE_WRITE_THREAD
488 for (i = 0; i < numBlocksMax; i++) 462 for (i = 0; i < numBlocksMax; i++)
489 { 463 {
490 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i])); 464 RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->writeEvents[i]))
491 } 465 }
492 #else 466 #else
493 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent)); 467 RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
494 #endif 468 #endif
495 469
496 { 470 {
497 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent)); 471 RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->readEvent))
498 RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, numBlocksMax, numBlocksMax)); 472 RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, numBlocksMax, numBlocksMax))
499 } 473 }
500 474
501 for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++) 475 for (i = 0; i < MTCODER_BLOCKS_MAX - 1; i++)
502 p->freeBlockList[i] = i + 1; 476 p->freeBlockList[i] = i + 1;
503 p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1; 477 p->freeBlockList[MTCODER_BLOCKS_MAX - 1] = (unsigned)(int)-1;
504 p->freeBlockHead = 0; 478 p->freeBlockHead = 0;
505 479
506 p->readProcessed = 0; 480 p->readProcessed = 0;
@@ -508,10 +482,10 @@ SRes MtCoder_Code(CMtCoder *p)
508 p->numBlocksMax = numBlocksMax; 482 p->numBlocksMax = numBlocksMax;
509 p->stopReading = False; 483 p->stopReading = False;
510 484
511 #ifndef MTCODER__USE_WRITE_THREAD 485 #ifndef MTCODER_USE_WRITE_THREAD
512 p->writeIndex = 0; 486 p->writeIndex = 0;
513 p->writeRes = SZ_OK; 487 p->writeRes = SZ_OK;
514 for (i = 0; i < MTCODER__BLOCKS_MAX; i++) 488 for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
515 p->ReadyBlocks[i] = False; 489 p->ReadyBlocks[i] = False;
516 p->numFinishedThreads = 0; 490 p->numFinishedThreads = 0;
517 #endif 491 #endif
@@ -522,12 +496,12 @@ SRes MtCoder_Code(CMtCoder *p)
522 // for (i = 0; i < numThreads; i++) 496 // for (i = 0; i < numThreads; i++)
523 { 497 {
524 CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++]; 498 CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
525 RINOK(MtCoderThread_CreateAndStart(nextThread)); 499 RINOK(MtCoderThread_CreateAndStart(nextThread))
526 } 500 }
527 501
528 RINOK_THREAD(Event_Set(&p->readEvent)) 502 RINOK_THREAD(Event_Set(&p->readEvent))
529 503
530 #ifdef MTCODER__USE_WRITE_THREAD 504 #ifdef MTCODER_USE_WRITE_THREAD
531 { 505 {
532 unsigned bi = 0; 506 unsigned bi = 0;
533 507
@@ -582,7 +556,7 @@ SRes MtCoder_Code(CMtCoder *p)
582 if (res == SZ_OK) 556 if (res == SZ_OK)
583 res = p->mtProgress.res; 557 res = p->mtProgress.res;
584 558
585 #ifndef MTCODER__USE_WRITE_THREAD 559 #ifndef MTCODER_USE_WRITE_THREAD
586 if (res == SZ_OK) 560 if (res == SZ_OK)
587 res = p->writeRes; 561 res = p->writeRes;
588 #endif 562 #endif
@@ -593,3 +567,5 @@ SRes MtCoder_Code(CMtCoder *p)
593} 567}
594 568
595#endif 569#endif
570
571#undef RINOK_THREAD