aboutsummaryrefslogtreecommitdiff
path: root/C/Threads.c
diff options
context:
space:
mode:
authorIgor Pavlov <87184205+ip7z@users.noreply.github.com>2021-12-27 00:00:00 +0000
committerIgor Pavlov <87184205+ip7z@users.noreply.github.com>2022-03-18 15:35:13 +0500
commitf19f813537c7aea1c20749c914e756b54a9c3cf5 (patch)
tree816ba62ca7c0fa19f2eb46d9e9d6f7dd7c3a744d /C/Threads.c
parent98e06a519b63b81986abe76d28887f6984a7732b (diff)
download7zip-21.07.tar.gz
7zip-21.07.tar.bz2
7zip-21.07.zip
'21.07'21.07
Diffstat (limited to 'C/Threads.c')
-rw-r--r--C/Threads.c540
1 files changed, 540 insertions, 0 deletions
diff --git a/C/Threads.c b/C/Threads.c
new file mode 100644
index 0000000..58eb90f
--- /dev/null
+++ b/C/Threads.c
@@ -0,0 +1,540 @@
1/* Threads.c -- multithreading library
22021-12-21 : Igor Pavlov : Public domain */
3
4#include "Precomp.h"
5
6#ifdef _WIN32
7
8#ifndef USE_THREADS_CreateThread
9#include <process.h>
10#endif
11
12#include "Threads.h"
13
14static WRes GetError()
15{
16 DWORD res = GetLastError();
17 return res ? (WRes)res : 1;
18}
19
20static WRes HandleToWRes(HANDLE h) { return (h != NULL) ? 0 : GetError(); }
21static WRes BOOLToWRes(BOOL v) { return v ? 0 : GetError(); }
22
23WRes HandlePtr_Close(HANDLE *p)
24{
25 if (*p != NULL)
26 {
27 if (!CloseHandle(*p))
28 return GetError();
29 *p = NULL;
30 }
31 return 0;
32}
33
34WRes Handle_WaitObject(HANDLE h)
35{
36 DWORD dw = WaitForSingleObject(h, INFINITE);
37 /*
38 (dw) result:
39 WAIT_OBJECT_0 // 0
40 WAIT_ABANDONED // 0x00000080 : is not compatible with Win32 Error space
41 WAIT_TIMEOUT // 0x00000102 : is compatible with Win32 Error space
42 WAIT_FAILED // 0xFFFFFFFF
43 */
44 if (dw == WAIT_FAILED)
45 {
46 dw = GetLastError();
47 if (dw == 0)
48 return WAIT_FAILED;
49 }
50 return (WRes)dw;
51}
52
53#define Thread_Wait(p) Handle_WaitObject(*(p))
54
55WRes Thread_Wait_Close(CThread *p)
56{
57 WRes res = Thread_Wait(p);
58 WRes res2 = Thread_Close(p);
59 return (res != 0 ? res : res2);
60}
61
62WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
63{
64 /* Windows Me/98/95: threadId parameter may not be NULL in _beginthreadex/CreateThread functions */
65
66 #ifdef USE_THREADS_CreateThread
67
68 DWORD threadId;
69 *p = CreateThread(NULL, 0, func, param, 0, &threadId);
70
71 #else
72
73 unsigned threadId;
74 *p = (HANDLE)(_beginthreadex(NULL, 0, func, param, 0, &threadId));
75
76 #endif
77
78 /* maybe we must use errno here, but probably GetLastError() is also OK. */
79 return HandleToWRes(*p);
80}
81
82
83WRes Thread_Create_With_Affinity(CThread *p, THREAD_FUNC_TYPE func, LPVOID param, CAffinityMask affinity)
84{
85 #ifdef USE_THREADS_CreateThread
86
87 UNUSED_VAR(affinity)
88 return Thread_Create(p, func, param);
89
90 #else
91
92 /* Windows Me/98/95: threadId parameter may not be NULL in _beginthreadex/CreateThread functions */
93 HANDLE h;
94 WRes wres;
95 unsigned threadId;
96 h = (HANDLE)(_beginthreadex(NULL, 0, func, param, CREATE_SUSPENDED, &threadId));
97 *p = h;
98 wres = HandleToWRes(h);
99 if (h)
100 {
101 {
102 // DWORD_PTR prevMask =
103 SetThreadAffinityMask(h, (DWORD_PTR)affinity);
104 /*
105 if (prevMask == 0)
106 {
107 // affinity change is non-critical error, so we can ignore it
108 // wres = GetError();
109 }
110 */
111 }
112 {
113 DWORD prevSuspendCount = ResumeThread(h);
114 /* ResumeThread() returns:
115 0 : was_not_suspended
116 1 : was_resumed
117 -1 : error
118 */
119 if (prevSuspendCount == (DWORD)-1)
120 wres = GetError();
121 }
122 }
123
124 /* maybe we must use errno here, but probably GetLastError() is also OK. */
125 return wres;
126
127 #endif
128}
129
130
131static WRes Event_Create(CEvent *p, BOOL manualReset, int signaled)
132{
133 *p = CreateEvent(NULL, manualReset, (signaled ? TRUE : FALSE), NULL);
134 return HandleToWRes(*p);
135}
136
137WRes Event_Set(CEvent *p) { return BOOLToWRes(SetEvent(*p)); }
138WRes Event_Reset(CEvent *p) { return BOOLToWRes(ResetEvent(*p)); }
139
140WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled) { return Event_Create(p, TRUE, signaled); }
141WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled) { return Event_Create(p, FALSE, signaled); }
142WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p) { return ManualResetEvent_Create(p, 0); }
143WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p) { return AutoResetEvent_Create(p, 0); }
144
145
146WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
147{
148 // negative ((LONG)maxCount) is not supported in WIN32::CreateSemaphore()
149 *p = CreateSemaphore(NULL, (LONG)initCount, (LONG)maxCount, NULL);
150 return HandleToWRes(*p);
151}
152
153WRes Semaphore_OptCreateInit(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
154{
155 // if (Semaphore_IsCreated(p))
156 {
157 WRes wres = Semaphore_Close(p);
158 if (wres != 0)
159 return wres;
160 }
161 return Semaphore_Create(p, initCount, maxCount);
162}
163
164static WRes Semaphore_Release(CSemaphore *p, LONG releaseCount, LONG *previousCount)
165 { return BOOLToWRes(ReleaseSemaphore(*p, releaseCount, previousCount)); }
166WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num)
167 { return Semaphore_Release(p, (LONG)num, NULL); }
168WRes Semaphore_Release1(CSemaphore *p) { return Semaphore_ReleaseN(p, 1); }
169
170WRes CriticalSection_Init(CCriticalSection *p)
171{
172 /* InitializeCriticalSection() can raise exception:
173 Windows XP, 2003 : can raise a STATUS_NO_MEMORY exception
174 Windows Vista+ : no exceptions */
175 #ifdef _MSC_VER
176 __try
177 #endif
178 {
179 InitializeCriticalSection(p);
180 /* InitializeCriticalSectionAndSpinCount(p, 0); */
181 }
182 #ifdef _MSC_VER
183 __except (EXCEPTION_EXECUTE_HANDLER) { return ERROR_NOT_ENOUGH_MEMORY; }
184 #endif
185 return 0;
186}
187
188
189
190
191#else // _WIN32
192
193// ---------- POSIX ----------
194
195#ifndef __APPLE__
196#ifndef _7ZIP_AFFINITY_DISABLE
197// _GNU_SOURCE can be required for pthread_setaffinity_np() / CPU_ZERO / CPU_SET
198#define _GNU_SOURCE
199#endif
200#endif
201
202#include "Threads.h"
203
204#include <errno.h>
205#include <stdlib.h>
206#include <string.h>
207#ifdef _7ZIP_AFFINITY_SUPPORTED
208// #include <sched.h>
209#endif
210
211
212// #include <stdio.h>
213// #define PRF(p) p
214#define PRF(p)
215
216#define Print(s) PRF(printf("\n%s\n", s))
217
218// #include <stdio.h>
219
220WRes Thread_Create_With_CpuSet(CThread *p, THREAD_FUNC_TYPE func, LPVOID param, const CCpuSet *cpuSet)
221{
222 // new thread in Posix probably inherits affinity from parrent thread
223 Print("Thread_Create_With_CpuSet");
224
225 pthread_attr_t attr;
226 int ret;
227 // int ret2;
228
229 p->_created = 0;
230
231 RINOK(pthread_attr_init(&attr));
232
233 ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
234
235 if (!ret)
236 {
237 if (cpuSet)
238 {
239 #ifdef _7ZIP_AFFINITY_SUPPORTED
240
241 /*
242 printf("\n affinity :");
243 unsigned i;
244 for (i = 0; i < sizeof(*cpuSet) && i < 8; i++)
245 {
246 Byte b = *((const Byte *)cpuSet + i);
247 char temp[32];
248 #define GET_HEX_CHAR(t) ((char)(((t < 10) ? ('0' + t) : ('A' + (t - 10)))))
249 temp[0] = GET_HEX_CHAR((b & 0xF));
250 temp[1] = GET_HEX_CHAR((b >> 4));
251 // temp[0] = GET_HEX_CHAR((b >> 4)); // big-endian
252 // temp[1] = GET_HEX_CHAR((b & 0xF)); // big-endian
253 temp[2] = 0;
254 printf("%s", temp);
255 }
256 printf("\n");
257 */
258
259 // ret2 =
260 pthread_attr_setaffinity_np(&attr, sizeof(*cpuSet), cpuSet);
261 // if (ret2) ret = ret2;
262 #endif
263 }
264
265 ret = pthread_create(&p->_tid, &attr, func, param);
266
267 if (!ret)
268 {
269 p->_created = 1;
270 /*
271 if (cpuSet)
272 {
273 // ret2 =
274 pthread_setaffinity_np(p->_tid, sizeof(*cpuSet), cpuSet);
275 // if (ret2) ret = ret2;
276 }
277 */
278 }
279 }
280 // ret2 =
281 pthread_attr_destroy(&attr);
282 // if (ret2 != 0) ret = ret2;
283 return ret;
284}
285
286
287WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
288{
289 return Thread_Create_With_CpuSet(p, func, param, NULL);
290}
291
292
293WRes Thread_Create_With_Affinity(CThread *p, THREAD_FUNC_TYPE func, LPVOID param, CAffinityMask affinity)
294{
295 Print("Thread_Create_WithAffinity");
296 CCpuSet cs;
297 unsigned i;
298 CpuSet_Zero(&cs);
299 for (i = 0; i < sizeof(affinity) * 8; i++)
300 {
301 if (affinity == 0)
302 break;
303 if (affinity & 1)
304 {
305 CpuSet_Set(&cs, i);
306 }
307 affinity >>= 1;
308 }
309 return Thread_Create_With_CpuSet(p, func, param, &cs);
310}
311
312
313WRes Thread_Close(CThread *p)
314{
315 // Print("Thread_Close");
316 int ret;
317 if (!p->_created)
318 return 0;
319
320 ret = pthread_detach(p->_tid);
321 p->_tid = 0;
322 p->_created = 0;
323 return ret;
324}
325
326
327WRes Thread_Wait_Close(CThread *p)
328{
329 // Print("Thread_Wait_Close");
330 void *thread_return;
331 int ret;
332 if (!p->_created)
333 return EINVAL;
334
335 ret = pthread_join(p->_tid, &thread_return);
336 // probably we can't use that (_tid) after pthread_join(), so we close thread here
337 p->_created = 0;
338 p->_tid = 0;
339 return ret;
340}
341
342
343
344static WRes Event_Create(CEvent *p, int manualReset, int signaled)
345{
346 RINOK(pthread_mutex_init(&p->_mutex, NULL));
347 RINOK(pthread_cond_init(&p->_cond, NULL));
348 p->_manual_reset = manualReset;
349 p->_state = (signaled ? True : False);
350 p->_created = 1;
351 return 0;
352}
353
354WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled)
355 { return Event_Create(p, True, signaled); }
356WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p)
357 { return ManualResetEvent_Create(p, 0); }
358WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled)
359 { return Event_Create(p, False, signaled); }
360WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p)
361 { return AutoResetEvent_Create(p, 0); }
362
363
364WRes Event_Set(CEvent *p)
365{
366 RINOK(pthread_mutex_lock(&p->_mutex));
367 p->_state = True;
368 int res1 = pthread_cond_broadcast(&p->_cond);
369 int res2 = pthread_mutex_unlock(&p->_mutex);
370 return (res2 ? res2 : res1);
371}
372
373WRes Event_Reset(CEvent *p)
374{
375 RINOK(pthread_mutex_lock(&p->_mutex));
376 p->_state = False;
377 return pthread_mutex_unlock(&p->_mutex);
378}
379
380WRes Event_Wait(CEvent *p)
381{
382 RINOK(pthread_mutex_lock(&p->_mutex));
383 while (p->_state == False)
384 {
385 // ETIMEDOUT
386 // ret =
387 pthread_cond_wait(&p->_cond, &p->_mutex);
388 // if (ret != 0) break;
389 }
390 if (p->_manual_reset == False)
391 {
392 p->_state = False;
393 }
394 return pthread_mutex_unlock(&p->_mutex);
395}
396
397WRes Event_Close(CEvent *p)
398{
399 if (!p->_created)
400 return 0;
401 p->_created = 0;
402 {
403 int res1 = pthread_mutex_destroy(&p->_mutex);
404 int res2 = pthread_cond_destroy(&p->_cond);
405 return (res1 ? res1 : res2);
406 }
407}
408
409
410WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
411{
412 if (initCount > maxCount || maxCount < 1)
413 return EINVAL;
414 RINOK(pthread_mutex_init(&p->_mutex, NULL));
415 RINOK(pthread_cond_init(&p->_cond, NULL));
416 p->_count = initCount;
417 p->_maxCount = maxCount;
418 p->_created = 1;
419 return 0;
420}
421
422
423WRes Semaphore_OptCreateInit(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
424{
425 if (Semaphore_IsCreated(p))
426 {
427 /*
428 WRes wres = Semaphore_Close(p);
429 if (wres != 0)
430 return wres;
431 */
432 if (initCount > maxCount || maxCount < 1)
433 return EINVAL;
434 // return EINVAL; // for debug
435 p->_count = initCount;
436 p->_maxCount = maxCount;
437 return 0;
438 }
439 return Semaphore_Create(p, initCount, maxCount);
440}
441
442
443WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 releaseCount)
444{
445 UInt32 newCount;
446 int ret;
447
448 if (releaseCount < 1)
449 return EINVAL;
450
451 RINOK(pthread_mutex_lock(&p->_mutex));
452
453 newCount = p->_count + releaseCount;
454 if (newCount > p->_maxCount)
455 ret = ERROR_TOO_MANY_POSTS; // EINVAL;
456 else
457 {
458 p->_count = newCount;
459 ret = pthread_cond_broadcast(&p->_cond);
460 }
461 RINOK(pthread_mutex_unlock(&p->_mutex));
462 return ret;
463}
464
465WRes Semaphore_Wait(CSemaphore *p)
466{
467 RINOK(pthread_mutex_lock(&p->_mutex));
468 while (p->_count < 1)
469 {
470 pthread_cond_wait(&p->_cond, &p->_mutex);
471 }
472 p->_count--;
473 return pthread_mutex_unlock(&p->_mutex);
474}
475
476WRes Semaphore_Close(CSemaphore *p)
477{
478 if (!p->_created)
479 return 0;
480 p->_created = 0;
481 {
482 int res1 = pthread_mutex_destroy(&p->_mutex);
483 int res2 = pthread_cond_destroy(&p->_cond);
484 return (res1 ? res1 : res2);
485 }
486}
487
488
489
490WRes CriticalSection_Init(CCriticalSection *p)
491{
492 // Print("CriticalSection_Init");
493 if (!p)
494 return EINTR;
495 return pthread_mutex_init(&p->_mutex, NULL);
496}
497
498void CriticalSection_Enter(CCriticalSection *p)
499{
500 // Print("CriticalSection_Enter");
501 if (p)
502 {
503 // int ret =
504 pthread_mutex_lock(&p->_mutex);
505 }
506}
507
508void CriticalSection_Leave(CCriticalSection *p)
509{
510 // Print("CriticalSection_Leave");
511 if (p)
512 {
513 // int ret =
514 pthread_mutex_unlock(&p->_mutex);
515 }
516}
517
518void CriticalSection_Delete(CCriticalSection *p)
519{
520 // Print("CriticalSection_Delete");
521 if (p)
522 {
523 // int ret =
524 pthread_mutex_destroy(&p->_mutex);
525 }
526}
527
528LONG InterlockedIncrement(LONG volatile *addend)
529{
530 // Print("InterlockedIncrement");
531 #ifdef USE_HACK_UNSAFE_ATOMIC
532 LONG val = *addend + 1;
533 *addend = val;
534 return val;
535 #else
536 return __sync_add_and_fetch(addend, 1);
537 #endif
538}
539
540#endif // _WIN32