aboutsummaryrefslogtreecommitdiff
path: root/C/MtCoder.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--C/MtCoder.c61
1 files changed, 47 insertions, 14 deletions
diff --git a/C/MtCoder.c b/C/MtCoder.c
index 03959b6..923b19a 100644
--- a/C/MtCoder.c
+++ b/C/MtCoder.c
@@ -1,5 +1,5 @@
1/* MtCoder.c -- Multi-thread Coder 1/* MtCoder.c -- Multi-thread Coder
22023-09-07 : Igor Pavlov : Public domain */ 2: Igor Pavlov : Public domain */
3 3
4#include "Precomp.h" 4#include "Precomp.h"
5 5
@@ -39,14 +39,28 @@ void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
39static THREAD_FUNC_DECL ThreadFunc(void *pp); 39static THREAD_FUNC_DECL ThreadFunc(void *pp);
40 40
41 41
42static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t) 42static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t
43#ifdef _WIN32
44 , CMtCoder * const mtc
45#endif
46 )
43{ 47{
44 WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->startEvent); 48 WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->startEvent);
49 // printf("\n====== MtCoderThread_CreateAndStart : \n");
45 if (wres == 0) 50 if (wres == 0)
46 { 51 {
47 t->stop = False; 52 t->stop = False;
48 if (!Thread_WasCreated(&t->thread)) 53 if (!Thread_WasCreated(&t->thread))
49 wres = Thread_Create(&t->thread, ThreadFunc, t); 54 {
55#ifdef _WIN32
56 if (mtc->numThreadGroups)
57 wres = Thread_Create_With_Group(&t->thread, ThreadFunc, t,
58 ThreadNextGroup_GetNext(&mtc->nextGroup), // group
59 0); // affinityMask
60 else
61#endif
62 wres = Thread_Create(&t->thread, ThreadFunc, t);
63 }
50 if (wres == 0) 64 if (wres == 0)
51 wres = Event_Set(&t->startEvent); 65 wres = Event_Set(&t->startEvent);
52 } 66 }
@@ -56,6 +70,7 @@ static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
56} 70}
57 71
58 72
73Z7_FORCE_INLINE
59static void MtCoderThread_Destruct(CMtCoderThread *t) 74static void MtCoderThread_Destruct(CMtCoderThread *t)
60{ 75{
61 if (Thread_WasCreated(&t->thread)) 76 if (Thread_WasCreated(&t->thread))
@@ -85,7 +100,7 @@ static void MtCoderThread_Destruct(CMtCoderThread *t)
85 100
86static SRes ThreadFunc2(CMtCoderThread *t) 101static SRes ThreadFunc2(CMtCoderThread *t)
87{ 102{
88 CMtCoder *mtc = t->mtCoder; 103 CMtCoder * const mtc = t->mtCoder;
89 104
90 for (;;) 105 for (;;)
91 { 106 {
@@ -185,7 +200,11 @@ static SRes ThreadFunc2(CMtCoderThread *t)
185 if (mtc->numStartedThreads < mtc->numStartedThreadsLimit 200 if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
186 && mtc->expectedDataSize != readProcessed) 201 && mtc->expectedDataSize != readProcessed)
187 { 202 {
188 res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]); 203 res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]
204#ifdef _WIN32
205 , mtc
206#endif
207 );
189 if (res == SZ_OK) 208 if (res == SZ_OK)
190 mtc->numStartedThreads++; 209 mtc->numStartedThreads++;
191 else 210 else
@@ -221,7 +240,7 @@ static SRes ThreadFunc2(CMtCoderThread *t)
221 } 240 }
222 241
223 { 242 {
224 CMtCoderBlock *block = &mtc->blocks[bi]; 243 CMtCoderBlock * const block = &mtc->blocks[bi];
225 block->res = res; 244 block->res = res;
226 block->bufIndex = bufIndex; 245 block->bufIndex = bufIndex;
227 block->finished = finished; 246 block->finished = finished;
@@ -311,7 +330,7 @@ static SRes ThreadFunc2(CMtCoderThread *t)
311 330
312static THREAD_FUNC_DECL ThreadFunc(void *pp) 331static THREAD_FUNC_DECL ThreadFunc(void *pp)
313{ 332{
314 CMtCoderThread *t = (CMtCoderThread *)pp; 333 CMtCoderThread * const t = (CMtCoderThread *)pp;
315 for (;;) 334 for (;;)
316 { 335 {
317 if (Event_Wait(&t->startEvent) != 0) 336 if (Event_Wait(&t->startEvent) != 0)
@@ -319,7 +338,7 @@ static THREAD_FUNC_DECL ThreadFunc(void *pp)
319 if (t->stop) 338 if (t->stop)
320 return 0; 339 return 0;
321 { 340 {
322 SRes res = ThreadFunc2(t); 341 const SRes res = ThreadFunc2(t);
323 CMtCoder *mtc = t->mtCoder; 342 CMtCoder *mtc = t->mtCoder;
324 if (res != SZ_OK) 343 if (res != SZ_OK)
325 { 344 {
@@ -328,7 +347,7 @@ static THREAD_FUNC_DECL ThreadFunc(void *pp)
328 347
329 #ifndef MTCODER_USE_WRITE_THREAD 348 #ifndef MTCODER_USE_WRITE_THREAD
330 { 349 {
331 unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads); 350 const unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
332 if (numFinished == mtc->numStartedThreads) 351 if (numFinished == mtc->numStartedThreads)
333 if (Event_Set(&mtc->finishedEvent) != 0) 352 if (Event_Set(&mtc->finishedEvent) != 0)
334 return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD; 353 return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
@@ -346,6 +365,7 @@ void MtCoder_Construct(CMtCoder *p)
346 365
347 p->blockSize = 0; 366 p->blockSize = 0;
348 p->numThreadsMax = 0; 367 p->numThreadsMax = 0;
368 p->numThreadGroups = 0;
349 p->expectedDataSize = (UInt64)(Int64)-1; 369 p->expectedDataSize = (UInt64)(Int64)-1;
350 370
351 p->inStream = NULL; 371 p->inStream = NULL;
@@ -429,6 +449,8 @@ SRes MtCoder_Code(CMtCoder *p)
429 unsigned i; 449 unsigned i;
430 SRes res = SZ_OK; 450 SRes res = SZ_OK;
431 451
452 // printf("\n====== MtCoder_Code : \n");
453
432 if (numThreads > MTCODER_THREADS_MAX) 454 if (numThreads > MTCODER_THREADS_MAX)
433 numThreads = MTCODER_THREADS_MAX; 455 numThreads = MTCODER_THREADS_MAX;
434 numBlocksMax = MTCODER_GET_NUM_BLOCKS_FROM_THREADS(numThreads); 456 numBlocksMax = MTCODER_GET_NUM_BLOCKS_FROM_THREADS(numThreads);
@@ -492,11 +514,22 @@ SRes MtCoder_Code(CMtCoder *p)
492 514
493 p->numStartedThreadsLimit = numThreads; 515 p->numStartedThreadsLimit = numThreads;
494 p->numStartedThreads = 0; 516 p->numStartedThreads = 0;
517 ThreadNextGroup_Init(&p->nextGroup, p->numThreadGroups, 0); // startGroup
495 518
496 // for (i = 0; i < numThreads; i++) 519 // for (i = 0; i < numThreads; i++)
497 { 520 {
521 // here we create new thread for first block.
522 // And each new thread will create another new thread after block reading
523 // until numStartedThreadsLimit is reached.
498 CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++]; 524 CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
499 RINOK(MtCoderThread_CreateAndStart(nextThread)) 525 {
526 const SRes res2 = MtCoderThread_CreateAndStart(nextThread
527#ifdef _WIN32
528 , p
529#endif
530 );
531 RINOK(res2)
532 }
500 } 533 }
501 534
502 RINOK_THREAD(Event_Set(&p->readEvent)) 535 RINOK_THREAD(Event_Set(&p->readEvent))
@@ -513,9 +546,9 @@ SRes MtCoder_Code(CMtCoder *p)
513 RINOK_THREAD(Event_Wait(&p->writeEvents[bi])) 546 RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
514 547
515 { 548 {
516 const CMtCoderBlock *block = &p->blocks[bi]; 549 const CMtCoderBlock * const block = &p->blocks[bi];
517 unsigned bufIndex = block->bufIndex; 550 const unsigned bufIndex = block->bufIndex;
518 BoolInt finished = block->finished; 551 const BoolInt finished = block->finished;
519 if (res == SZ_OK && block->res != SZ_OK) 552 if (res == SZ_OK && block->res != SZ_OK)
520 res = block->res; 553 res = block->res;
521 554
@@ -545,7 +578,7 @@ SRes MtCoder_Code(CMtCoder *p)
545 } 578 }
546 #else 579 #else
547 { 580 {
548 WRes wres = Event_Wait(&p->finishedEvent); 581 const WRes wres = Event_Wait(&p->finishedEvent);
549 res = MY_SRes_HRESULT_FROM_WRes(wres); 582 res = MY_SRes_HRESULT_FROM_WRes(wres);
550 } 583 }
551 #endif 584 #endif