aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cancel.cpp178
-rw-r--r--src/cancel.h8
-rw-r--r--src/keeper.cpp19
-rw-r--r--src/keeper.h10
-rw-r--r--src/lanes.cpp337
-rw-r--r--src/lanes.lua451
-rw-r--r--src/lanes_private.h43
-rw-r--r--src/linda.cpp92
-rw-r--r--src/macros_and_utils.h5
-rw-r--r--src/threading.cpp525
-rw-r--r--src/threading.h58
11 files changed, 555 insertions, 1171 deletions
diff --git a/src/cancel.cpp b/src/cancel.cpp
index 4667f07..6a94343 100644
--- a/src/cancel.cpp
+++ b/src/cancel.cpp
@@ -92,7 +92,7 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar)
92// ################################################################################################ 92// ################################################################################################
93 93
94//--- 94//---
95// = thread_cancel( lane_ud [,timeout_secs=0.0] [,force_kill_bool=false] ) 95// = thread_cancel( lane_ud [,timeout_secs=0.0] [,wake_lindas_bool=false] )
96// 96//
97// The originator thread asking us specifically to cancel the other thread. 97// The originator thread asking us specifically to cancel the other thread.
98// 98//
@@ -100,9 +100,8 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar)
100// 0.0: just signal it to cancel, no time waited 100// 0.0: just signal it to cancel, no time waited
101// >0: time to wait for the lane to detect cancellation 101// >0: time to wait for the lane to detect cancellation
102// 102//
103// 'force_kill': if true, and lane does not detect cancellation within timeout, 103// 'wake_lindas_bool': if true, signal any linda the thread is waiting on
104// it is forcefully killed. Using this with 0.0 timeout means just kill 104// instead of waiting for its timeout (if any)
105// (unless the lane is already finished).
106// 105//
107// Returns: true if the lane was already finished (DONE/ERROR_ST/CANCELLED) or if we 106// Returns: true if the lane was already finished (DONE/ERROR_ST/CANCELLED) or if we
108// managed to cancel it. 107// managed to cancel it.
@@ -111,76 +110,47 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar)
111 110
112// ################################################################################################ 111// ################################################################################################
113 112
114static CancelResult thread_cancel_soft(Lane* lane_, double secs_, bool wake_lindas_) 113static CancelResult thread_cancel_soft(Lane* lane_, lua_Duration duration_, bool wake_lane_)
115{ 114{
116 lane_->cancel_request = CancelRequest::Soft; // it's now signaled to stop 115 lane_->cancel_request = CancelRequest::Soft; // it's now signaled to stop
117 // negative timeout: we don't want to truly abort the lane, we just want it to react to cancel_test() on its own 116 // negative timeout: we don't want to truly abort the lane, we just want it to react to cancel_test() on its own
118 if (wake_lindas_) // wake the thread so that execution returns from any pending linda operation if desired 117 if (wake_lane_) // wake the thread so that execution returns from any pending linda operation if desired
119 { 118 {
120 SIGNAL_T* const waiting_on{ lane_->waiting_on }; 119 std::condition_variable* const waiting_on{ lane_->m_waiting_on };
121 if (lane_->status == WAITING && waiting_on != nullptr) 120 if (lane_->status == WAITING && waiting_on != nullptr)
122 { 121 {
123 SIGNAL_ALL( waiting_on); 122 waiting_on->notify_all();
124 } 123 }
125 } 124 }
126 125
127 return THREAD_WAIT(&lane_->thread, secs_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Cancelled : CancelResult::Timeout; 126 return lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout;
128} 127}
129 128
130// ################################################################################################ 129// ################################################################################################
131 130
132static CancelResult thread_cancel_hard(lua_State* L, Lane* lane_, double secs_, bool force_, double waitkill_timeout_) 131static CancelResult thread_cancel_hard(Lane* lane_, lua_Duration duration_, bool wake_lane_)
133{ 132{
134 lane_->cancel_request = CancelRequest::Hard; // it's now signaled to stop 133 lane_->cancel_request = CancelRequest::Hard; // it's now signaled to stop
134 //lane_->m_thread.get_stop_source().request_stop();
135 if (wake_lane_) // wake the thread so that execution returns from any pending linda operation if desired
135 { 136 {
136 SIGNAL_T* waiting_on = lane_->waiting_on; 137 std::condition_variable* waiting_on = lane_->m_waiting_on;
137 if (lane_->status == WAITING && waiting_on != nullptr) 138 if (lane_->status == WAITING && waiting_on != nullptr)
138 { 139 {
139 SIGNAL_ALL( waiting_on); 140 waiting_on->notify_all();
140 } 141 }
141 } 142 }
142 143
143 CancelResult result{ THREAD_WAIT(&lane_->thread, secs_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Cancelled : CancelResult::Timeout }; 144 CancelResult result{ lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout };
144
145 if ((result == CancelResult::Timeout) && force_)
146 {
147 // Killing is asynchronous; we _will_ wait for it to be done at
148 // GC, to make sure the data structure can be released (alternative
149 // would be use of "cancellation cleanup handlers" that at least
150 // PThread seems to have).
151 //
152 THREAD_KILL(&lane_->thread);
153#if THREADAPI == THREADAPI_PTHREAD
154 // pthread: make sure the thread is really stopped!
155 // note that this may block forever if the lane doesn't call a cancellation point and pthread doesn't honor PTHREAD_CANCEL_ASYNCHRONOUS
156 result = THREAD_WAIT(&lane_->thread, waitkill_timeout_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Killed : CancelResult::Timeout;
157 if (result == CancelResult::Timeout)
158 {
159 std::ignore = luaL_error( L, "force-killed lane failed to terminate within %f second%s", waitkill_timeout_, waitkill_timeout_ > 1 ? "s" : "");
160 }
161#else
162 (void) waitkill_timeout_; // unused
163 (void) L; // unused
164#endif // THREADAPI == THREADAPI_PTHREAD
165 lane_->mstatus = Lane::Killed; // mark 'gc' to wait for it
166 // note that lane_->status value must remain to whatever it was at the time of the kill
167 // because we need to know if we can lua_close() the Lua State or not.
168 result = CancelResult::Killed;
169 }
170 return result; 145 return result;
171} 146}
172 147
173// ################################################################################################ 148// ################################################################################################
174 149
175CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_, bool force_, double waitkill_timeout_) 150CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hook_count_, lua_Duration duration_, bool wake_lane_)
176{ 151{
177 // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here 152 // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here
178 // We can read 'lane_->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) 153 // We can read 'lane_->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN)
179 if (lane_->mstatus == Lane::Killed)
180 {
181 return CancelResult::Killed;
182 }
183
184 if (lane_->status >= DONE) 154 if (lane_->status >= DONE)
185 { 155 {
186 // say "ok" by default, including when lane is already done 156 // say "ok" by default, including when lane is already done
@@ -191,48 +161,57 @@ CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_
191 // let us hope we never land here with a pointer on a linda that has been destroyed... 161 // let us hope we never land here with a pointer on a linda that has been destroyed...
192 if (op_ == CancelOp::Soft) 162 if (op_ == CancelOp::Soft)
193 { 163 {
194 return thread_cancel_soft(lane_, secs_, force_); 164 return thread_cancel_soft(lane_, duration_, wake_lane_);
165 }
166 else if (static_cast<int>(op_) > static_cast<int>(CancelOp::Soft))
167 {
168 lua_sethook(lane_->L, cancel_hook, static_cast<int>(op_), hook_count_);
195 } 169 }
196 170
197 return thread_cancel_hard(L, lane_, secs_, force_, waitkill_timeout_); 171 return thread_cancel_hard(lane_, duration_, wake_lane_);
198} 172}
199 173
200// ################################################################################################ 174// ################################################################################################
201// ################################################################################################ 175// ################################################################################################
202 176
203// > 0: the mask 177CancelOp which_cancel_op(char const* op_string_)
204// = 0: soft 178{
205// < 0: hard 179 CancelOp op{ CancelOp::Invalid };
206static CancelOp which_op(lua_State* L, int idx_) 180 if (strcmp(op_string_, "hard") == 0)
181 {
182 op = CancelOp::Hard;
183 }
184 else if (strcmp(op_string_, "soft") == 0)
185 {
186 op = CancelOp::Soft;
187 }
188 else if (strcmp(op_string_, "call") == 0)
189 {
190 op = CancelOp::MaskCall;
191 }
192 else if (strcmp(op_string_, "ret") == 0)
193 {
194 op = CancelOp::MaskRet;
195 }
196 else if (strcmp(op_string_, "line") == 0)
197 {
198 op = CancelOp::MaskLine;
199 }
200 else if (strcmp(op_string_, "count") == 0)
201 {
202 op = CancelOp::MaskCount;
203 }
204 return op;
205}
206
207// ################################################################################################
208
209static CancelOp which_cancel_op(lua_State* L, int idx_)
207{ 210{
208 if (lua_type(L, idx_) == LUA_TSTRING) 211 if (lua_type(L, idx_) == LUA_TSTRING)
209 { 212 {
210 CancelOp op{ CancelOp::Invalid }; 213 char const* const str{ lua_tostring(L, idx_) };
211 char const* str = lua_tostring(L, idx_); 214 CancelOp op{ which_cancel_op(str) };
212 if (strcmp(str, "hard") == 0)
213 {
214 op = CancelOp::Hard;
215 }
216 else if (strcmp(str, "soft") == 0)
217 {
218 op = CancelOp::Soft;
219 }
220 else if (strcmp(str, "call") == 0)
221 {
222 op = CancelOp::MaskCall;
223 }
224 else if (strcmp(str, "ret") == 0)
225 {
226 op = CancelOp::MaskRet;
227 }
228 else if (strcmp(str, "line") == 0)
229 {
230 op = CancelOp::MaskLine;
231 }
232 else if (strcmp(str, "count") == 0)
233 {
234 op = CancelOp::MaskCount;
235 }
236 lua_remove(L, idx_); // argument is processed, remove it 215 lua_remove(L, idx_); // argument is processed, remove it
237 if (op == CancelOp::Invalid) 216 if (op == CancelOp::Invalid)
238 { 217 {
@@ -245,53 +224,60 @@ static CancelOp which_op(lua_State* L, int idx_)
245 224
246// ################################################################################################ 225// ################################################################################################
247 226
248// bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, force [, forcekill_timeout]]) 227// bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, wake_lindas])
249LUAG_FUNC(thread_cancel) 228LUAG_FUNC(thread_cancel)
250{ 229{
251 Lane* const lane{ lua_toLane(L, 1) }; 230 Lane* const lane{ lua_toLane(L, 1) };
252 CancelOp const op{ which_op(L, 2) }; // this removes the op string from the stack 231 CancelOp const op{ which_cancel_op(L, 2) }; // this removes the op string from the stack
253 232
233 int hook_count{ 0 };
254 if (static_cast<int>(op) > static_cast<int>(CancelOp::Soft)) // hook is requested 234 if (static_cast<int>(op) > static_cast<int>(CancelOp::Soft)) // hook is requested
255 { 235 {
256 int const hook_count{ static_cast<int>(lua_tointeger(L, 2)) }; 236 hook_count = static_cast<int>(luaL_checkinteger(L, 2));
257 lua_remove(L, 2); // argument is processed, remove it 237 lua_remove(L, 2); // argument is processed, remove it
258 if (hook_count < 1) 238 if (hook_count < 1)
259 { 239 {
260 return luaL_error(L, "hook count cannot be < 1"); 240 return luaL_error(L, "hook count cannot be < 1");
261 } 241 }
262 lua_sethook(lane->L, cancel_hook, static_cast<int>(op), hook_count);
263 } 242 }
264 243
265 double secs{ 0.0 }; 244 lua_Duration wait_timeout{ 0.0 };
266 if (lua_type(L, 2) == LUA_TNUMBER) 245 if (lua_type(L, 2) == LUA_TNUMBER)
267 { 246 {
268 secs = lua_tonumber(L, 2); 247 wait_timeout = lua_Duration{ lua_tonumber(L, 2) };
269 lua_remove(L, 2); // argument is processed, remove it 248 lua_remove(L, 2); // argument is processed, remove it
270 if (secs < 0.0) 249 if (wait_timeout.count() < 0.0)
271 { 250 {
272 return luaL_error(L, "cancel timeout cannot be < 0"); 251 return luaL_error(L, "cancel timeout cannot be < 0");
273 } 252 }
274 } 253 }
275 254 // we wake by default in "hard" mode (remember that hook is hard too), but this can be turned off if desired
276 bool const force{ lua_toboolean(L, 2) ? true : false }; // false if nothing there 255 bool wake_lane{ op != CancelOp::Soft };
277 double const forcekill_timeout{ luaL_optnumber(L, 3, 0.0) }; 256 if (lua_gettop(L) >= 2)
278 switch (thread_cancel(L, lane, op, secs, force, forcekill_timeout)) 257 {
258 if (!lua_isboolean(L, 2))
259 {
260 return luaL_error(L, "wake_lindas parameter is not a boolean");
261 }
262 wake_lane = lua_toboolean(L, 2);
263 lua_remove(L, 2); // argument is processed, remove it
264 }
265 switch (thread_cancel(lane, op, hook_count, wait_timeout, wake_lane))
279 { 266 {
267 default: // should never happen unless we added a case and forgot to handle it
268 ASSERT_L(false);
269 break;
270
280 case CancelResult::Timeout: 271 case CancelResult::Timeout:
281 lua_pushboolean(L, 0); 272 lua_pushboolean(L, 0);
282 lua_pushstring(L, "timeout"); 273 lua_pushstring(L, "timeout");
283 return 2; 274 break;
284 275
285 case CancelResult::Cancelled: 276 case CancelResult::Cancelled:
286 lua_pushboolean(L, 1); 277 lua_pushboolean(L, 1);
287 push_thread_status(L, lane); 278 push_thread_status(L, lane);
288 return 2; 279 break;
289
290 case CancelResult::Killed:
291 lua_pushboolean(L, 1);
292 push_thread_status(L, lane);
293 return 2;
294 } 280 }
295 // should never happen, only here to prevent the compiler from complaining of "not all control paths returning a value" 281 // should never happen, only here to prevent the compiler from complaining of "not all control paths returning a value"
296 return 0; 282 return 2;
297} 283}
diff --git a/src/cancel.h b/src/cancel.h
index 884e193..954b04e 100644
--- a/src/cancel.h
+++ b/src/cancel.h
@@ -13,6 +13,8 @@ extern "C" {
13#include "uniquekey.h" 13#include "uniquekey.h"
14#include "macros_and_utils.h" 14#include "macros_and_utils.h"
15 15
16#include <chrono>
17
16// ################################################################################################ 18// ################################################################################################
17 19
18class Lane; // forward 20class Lane; // forward
@@ -30,8 +32,7 @@ enum class CancelRequest
30enum class CancelResult 32enum class CancelResult
31{ 33{
32 Timeout, 34 Timeout,
33 Cancelled, 35 Cancelled
34 Killed
35}; 36};
36 37
37enum class CancelOp 38enum class CancelOp
@@ -48,7 +49,8 @@ enum class CancelOp
48// crc64/we of string "CANCEL_ERROR" generated at http://www.nitrxgen.net/hashgen/ 49// crc64/we of string "CANCEL_ERROR" generated at http://www.nitrxgen.net/hashgen/
49static constexpr UniqueKey CANCEL_ERROR{ 0xe97d41626cc97577ull }; // 'raise_cancel_error' sentinel 50static constexpr UniqueKey CANCEL_ERROR{ 0xe97d41626cc97577ull }; // 'raise_cancel_error' sentinel
50 51
51CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_, bool force_, double waitkill_timeout_); 52CancelOp which_cancel_op(char const* op_string_);
53CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hook_count_, lua_Duration secs_, bool wake_lindas_);
52 54
53[[noreturn]] static inline void raise_cancel_error(lua_State* L) 55[[noreturn]] static inline void raise_cancel_error(lua_State* L)
54{ 56{
diff --git a/src/keeper.cpp b/src/keeper.cpp
index 937d190..0aea18e 100644
--- a/src/keeper.cpp
+++ b/src/keeper.cpp
@@ -627,7 +627,7 @@ void close_keepers(Universe* U)
627 } 627 }
628 for (int i = 0; i < nbKeepers; ++i) 628 for (int i = 0; i < nbKeepers; ++i)
629 { 629 {
630 MUTEX_FREE(&U->keepers->keeper_array[i].keeper_cs); 630 U->keepers->keeper_array[i].~Keeper();
631 } 631 }
632 // free the keeper bookkeeping structure 632 // free the keeper bookkeeping structure
633 U->internal_allocator.free(U->keepers, sizeof(Keepers) + (nbKeepers - 1) * sizeof(Keeper)); 633 U->internal_allocator.free(U->keepers, sizeof(Keepers) + (nbKeepers - 1) * sizeof(Keeper));
@@ -673,9 +673,14 @@ void init_keepers(Universe* U, lua_State* L)
673 { 673 {
674 std::ignore = luaL_error(L, "init_keepers() failed while creating keeper array; out of memory"); 674 std::ignore = luaL_error(L, "init_keepers() failed while creating keeper array; out of memory");
675 } 675 }
676 memset(U->keepers, 0, bytes); 676 U->keepers->Keepers::Keepers();
677 U->keepers->gc_threshold = keepers_gc_threshold; 677 U->keepers->gc_threshold = keepers_gc_threshold;
678 U->keepers->nb_keepers = nb_keepers; 678 U->keepers->nb_keepers = nb_keepers;
679
680 for (int i = 0; i < nb_keepers; ++i)
681 {
682 U->keepers->keeper_array[i].Keeper::Keeper();
683 }
679 } 684 }
680 for (int i = 0; i < nb_keepers; ++i) // keepersUD 685 for (int i = 0; i < nb_keepers; ++i) // keepersUD
681 { 686 {
@@ -687,10 +692,6 @@ void init_keepers(Universe* U, lua_State* L)
687 } 692 }
688 693
689 U->keepers->keeper_array[i].L = K; 694 U->keepers->keeper_array[i].L = K;
690 // we can trigger a GC from inside keeper_call(), where a keeper is acquired
691 // from there, GC can collect a linda, which would acquire the keeper again, and deadlock the thread.
692 // therefore, we need a recursive mutex.
693 MUTEX_RECURSIVE_INIT(&U->keepers->keeper_array[i].keeper_cs);
694 695
695 if (U->keepers->gc_threshold >= 0) 696 if (U->keepers->gc_threshold >= 0)
696 { 697 {
@@ -772,8 +773,7 @@ Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_)
772 */ 773 */
773 unsigned int i = (unsigned int)((magic_ >> KEEPER_MAGIC_SHIFT) % nbKeepers); 774 unsigned int i = (unsigned int)((magic_ >> KEEPER_MAGIC_SHIFT) % nbKeepers);
774 Keeper* K = &keepers_->keeper_array[i]; 775 Keeper* K = &keepers_->keeper_array[i];
775 776 K->m_mutex.lock();
776 MUTEX_LOCK( &K->keeper_cs);
777 //++ K->count; 777 //++ K->count;
778 return K; 778 return K;
779 } 779 }
@@ -787,7 +787,7 @@ void keeper_release(Keeper* K)
787 //-- K->count; 787 //-- K->count;
788 if (K) 788 if (K)
789 { 789 {
790 MUTEX_UNLOCK(&K->keeper_cs); 790 K->m_mutex.unlock();
791 } 791 }
792} 792}
793 793
@@ -843,7 +843,6 @@ int keeper_call(Universe* U, lua_State* K, keeper_api_t func_, lua_State* L, voi
843 if ((args == 0) || luaG_inter_copy(U, L, K, args, LookupMode::ToKeeper) == 0) // L->K 843 if ((args == 0) || luaG_inter_copy(U, L, K, args, LookupMode::ToKeeper) == 0) // L->K
844 { 844 {
845 lua_call(K, 1 + args, LUA_MULTRET); 845 lua_call(K, 1 + args, LUA_MULTRET);
846
847 retvals = lua_gettop(K) - Ktos; 846 retvals = lua_gettop(K) - Ktos;
848 // note that this can raise a luaL_error while the keeper state (and its mutex) is acquired 847 // note that this can raise a luaL_error while the keeper state (and its mutex) is acquired
849 // this may interrupt a lane, causing the destruction of the underlying OS thread 848 // this may interrupt a lane, causing the destruction of the underlying OS thread
diff --git a/src/keeper.h b/src/keeper.h
index f7e3951..931c1d5 100644
--- a/src/keeper.h
+++ b/src/keeper.h
@@ -11,21 +11,23 @@ extern "C" {
11#include "threading.h" 11#include "threading.h"
12#include "uniquekey.h" 12#include "uniquekey.h"
13 13
14#include <mutex>
15
14// forwards 16// forwards
15enum class LookupMode; 17enum class LookupMode;
16struct Universe; 18struct Universe;
17 19
18struct Keeper 20struct Keeper
19{ 21{
20 MUTEX_T keeper_cs; 22 std::mutex m_mutex;
21 lua_State* L; 23 lua_State* L{ nullptr };
22 // int count; 24 // int count;
23}; 25};
24 26
25struct Keepers 27struct Keepers
26{ 28{
27 int gc_threshold{ 0 }; 29 int gc_threshold{ 0 };
28 int nb_keepers; 30 int nb_keepers{ 0 };
29 Keeper keeper_array[1]; 31 Keeper keeper_array[1];
30}; 32};
31 33
@@ -38,7 +40,7 @@ void close_keepers(Universe* U);
38 40
39Keeper* which_keeper(Keepers* keepers_, uintptr_t magic_); 41Keeper* which_keeper(Keepers* keepers_, uintptr_t magic_);
40Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_); 42Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_);
41void keeper_release(Keeper* K); 43void keeper_release(Keeper* K_);
42void keeper_toggle_nil_sentinels(lua_State* L, int val_i_, LookupMode const mode_); 44void keeper_toggle_nil_sentinels(lua_State* L, int val_i_, LookupMode const mode_);
43int keeper_push_linda_storage(Universe* U, lua_State* L, void* ptr_, uintptr_t magic_); 45int keeper_push_linda_storage(Universe* U, lua_State* L, void* ptr_, uintptr_t magic_);
44 46
diff --git a/src/lanes.cpp b/src/lanes.cpp
index 08584a2..4dd9b46 100644
--- a/src/lanes.cpp
+++ b/src/lanes.cpp
@@ -108,11 +108,6 @@ Lane::Lane(Universe* U_, lua_State* L_)
108: U{ U_ } 108: U{ U_ }
109, L{ L_ } 109, L{ L_ }
110{ 110{
111#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
112 MUTEX_INIT(&done_lock);
113 SIGNAL_INIT(&done_signal);
114#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
115
116#if HAVE_LANE_TRACKING() 111#if HAVE_LANE_TRACKING()
117 if (U->tracking_first) 112 if (U->tracking_first)
118 { 113 {
@@ -121,6 +116,29 @@ Lane::Lane(Universe* U_, lua_State* L_)
121#endif // HAVE_LANE_TRACKING() 116#endif // HAVE_LANE_TRACKING()
122} 117}
123 118
119bool Lane::waitForCompletion(lua_Duration duration_)
120{
121 std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
122 if (duration_.count() >= 0.0)
123 {
124 until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration_);
125 }
126
127 std::unique_lock lock{ m_done_mutex };
128 //std::stop_token token{ m_thread.get_stop_token() };
129 //return m_done_signal.wait_for(lock, token, secs_, [this](){ return status >= DONE; });
130 return m_done_signal.wait_until(lock, until, [this](){ return status >= DONE; });
131}
132
133static void lane_main(Lane* lane);
134void Lane::startThread(int priority_)
135{
136 m_thread = std::jthread([this]() { lane_main(this); });
137 if (priority_ != THREAD_PRIO_DEFAULT)
138 {
139 JTHREAD_SET_PRIORITY(m_thread, priority_);
140 }
141}
124 142
125/* Do you want full call stacks, or just the line where the error happened? 143/* Do you want full call stacks, or just the line where the error happened?
126* 144*
@@ -144,7 +162,7 @@ static void securize_debug_threadname(lua_State* L, Lane* lane_)
144} 162}
145 163
146#if ERROR_FULL_STACK 164#if ERROR_FULL_STACK
147static int lane_error( lua_State* L); 165static int lane_error(lua_State* L);
148// crc64/we of string "STACKTRACE_REGKEY" generated at http://www.nitrxgen.net/hashgen/ 166// crc64/we of string "STACKTRACE_REGKEY" generated at http://www.nitrxgen.net/hashgen/
149static constexpr UniqueKey STACKTRACE_REGKEY{ 0x534af7d3226a429full }; 167static constexpr UniqueKey STACKTRACE_REGKEY{ 0x534af7d3226a429full };
150#endif // ERROR_FULL_STACK 168#endif // ERROR_FULL_STACK
@@ -255,11 +273,6 @@ Lane::~Lane()
255{ 273{
256 // Clean up after a (finished) thread 274 // Clean up after a (finished) thread
257 // 275 //
258#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
259 SIGNAL_FREE(&done_signal);
260 MUTEX_FREE(&done_lock);
261#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
262
263#if HAVE_LANE_TRACKING() 276#if HAVE_LANE_TRACKING()
264 if (U->tracking_first != nullptr) 277 if (U->tracking_first != nullptr)
265 { 278 {
@@ -455,26 +468,27 @@ static bool selfdestruct_remove(Lane* lane_)
455static int universe_gc( lua_State* L) 468static int universe_gc( lua_State* L)
456{ 469{
457 Universe* const U{ lua_tofulluserdata<Universe>(L, 1) }; 470 Universe* const U{ lua_tofulluserdata<Universe>(L, 1) };
471 lua_Duration const shutdown_timeout{ lua_tonumber(L, lua_upvalueindex(1)) };
472 [[maybe_unused]] char const* const op_string{ lua_tostring(L, lua_upvalueindex(2)) };
473 CancelOp const op{ which_cancel_op(op_string) };
458 474
459 while (U->selfdestruct_first != SELFDESTRUCT_END) // true at most once! 475 if (U->selfdestruct_first != SELFDESTRUCT_END)
460 { 476 {
477
461 // Signal _all_ still running threads to exit (including the timer thread) 478 // Signal _all_ still running threads to exit (including the timer thread)
462 // 479 //
463 { 480 {
464 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs }; 481 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs };
465 Lane* lane{ U->selfdestruct_first }; 482 Lane* lane{ U->selfdestruct_first };
483 lua_Duration timeout{ 1us };
466 while (lane != SELFDESTRUCT_END) 484 while (lane != SELFDESTRUCT_END)
467 { 485 {
468 // attempt a regular unforced hard cancel with a small timeout 486 // attempt the requested cancel with a small timeout.
469 bool const cancelled{ THREAD_ISNULL(lane->thread) || thread_cancel(L, lane, CancelOp::Hard, 0.0001, false, 0.0) != CancelResult::Timeout }; 487 // if waiting on a linda, they will raise a cancel_error.
470 // if we failed, and we know the thread is waiting on a linda 488 // if a cancellation hook is desired, it will be installed to try to raise an error
471 if (cancelled == false && lane->status == WAITING && lane->waiting_on != nullptr) 489 if (lane->m_thread.joinable())
472 { 490 {
473 // signal the linda to wake up the thread so that it can react to the cancel query 491 std::ignore = thread_cancel(lane, op, 1, timeout, true);
474 // let us hope we never land here with a pointer on a linda that has been destroyed...
475 SIGNAL_T* const waiting_on{ lane->waiting_on };
476 // lane->waiting_on = nullptr; // useful, or not?
477 SIGNAL_ALL(waiting_on);
478 } 492 }
479 lane = lane->selfdestruct_next; 493 lane = lane->selfdestruct_next;
480 } 494 }
@@ -482,47 +496,32 @@ static int universe_gc( lua_State* L)
482 496
483 // When noticing their cancel, the lanes will remove themselves from 497 // When noticing their cancel, the lanes will remove themselves from
484 // the selfdestruct chain. 498 // the selfdestruct chain.
485
486 // TBD: Not sure if Windows (multi core) will require the timed approach,
487 // or single Yield. I don't have machine to test that (so leaving
488 // for timed approach). -- AKa 25-Oct-2008
489
490 // OS X 10.5 (Intel) needs more to avoid segfaults.
491 //
492 // "make test" is okay. 100's of "make require" are okay.
493 //
494 // Tested on MacBook Core Duo 2GHz and 10.5.5:
495 // -- AKa 25-Oct-2008
496 //
497 { 499 {
498 lua_Number const shutdown_timeout = lua_tonumber(L, lua_upvalueindex(1)); 500 std::chrono::time_point<std::chrono::steady_clock> t_until{ std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(shutdown_timeout) };
499 double const t_until = now_secs() + shutdown_timeout;
500 501
501 while (U->selfdestruct_first != SELFDESTRUCT_END) 502 while (U->selfdestruct_first != SELFDESTRUCT_END)
502 { 503 {
503 YIELD(); // give threads time to act on their cancel 504 // give threads time to act on their cancel
505 YIELD();
506 // count the number of cancelled thread that didn't have the time to act yet
507 int n{ 0 };
504 { 508 {
505 // count the number of cancelled thread that didn't have the time to act yet 509 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs };
506 int n = 0; 510 Lane* lane{ U->selfdestruct_first };
507 double t_now = 0.0; 511 while (lane != SELFDESTRUCT_END)
508 { 512 {
509 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs }; 513 if (lane->cancel_request != CancelRequest::None)
510 Lane* lane{ U->selfdestruct_first }; 514 ++n;
511 while (lane != SELFDESTRUCT_END) 515 lane = lane->selfdestruct_next;
512 {
513 if (lane->cancel_request == CancelRequest::Hard)
514 ++n;
515 lane = lane->selfdestruct_next;
516 }
517 }
518 // if timeout elapsed, or we know all threads have acted, stop waiting
519 t_now = now_secs();
520 if (n == 0 || (t_now >= t_until))
521 {
522 DEBUGSPEW_CODE(fprintf(stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout - (t_until - t_now)));
523 break;
524 } 516 }
525 } 517 }
518 // if timeout elapsed, or we know all threads have acted, stop waiting
519 std::chrono::time_point<std::chrono::steady_clock> t_now = std::chrono::steady_clock::now();
520 if (n == 0 || (t_now >= t_until))
521 {
522 DEBUGSPEW_CODE(fprintf(stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout.count()));
523 break;
524 }
526 } 525 }
527 } 526 }
528 527
@@ -532,48 +531,17 @@ static int universe_gc( lua_State* L)
532 { 531 {
533 YIELD(); 532 YIELD();
534 } 533 }
535
536 //---
537 // Kill the still free running threads
538 //
539 if (U->selfdestruct_first != SELFDESTRUCT_END)
540 {
541 unsigned int n = 0;
542 // first thing we did was to raise the linda signals the threads were waiting on (if any)
543 // therefore, any well-behaved thread should be in CANCELLED state
544 // these are not running, and the state can be closed
545 {
546 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs };
547 Lane* lane{ U->selfdestruct_first };
548 while (lane != SELFDESTRUCT_END)
549 {
550 Lane* const next_s{ lane->selfdestruct_next };
551 lane->selfdestruct_next = nullptr; // detach from selfdestruct chain
552 if (!THREAD_ISNULL(lane->thread)) // can be nullptr if previous 'soft' termination succeeded
553 {
554 THREAD_KILL(&lane->thread);
555#if THREADAPI == THREADAPI_PTHREAD
556 // pthread: make sure the thread is really stopped!
557 THREAD_WAIT(&lane->thread, -1, &lane->done_signal, &lane->done_lock, &lane->status);
558#endif // THREADAPI == THREADAPI_PTHREAD
559 }
560 // NO lua_close() in this case because we don't know where execution of the state was interrupted
561 delete lane;
562 lane = next_s;
563 ++n;
564 }
565 U->selfdestruct_first = SELFDESTRUCT_END;
566 }
567
568 DEBUGSPEW_CODE(fprintf(stderr, "Killed %d lane(s) at process end.\n", n));
569 }
570 } 534 }
571 535
572 // If some lanes are currently cleaning after themselves, wait until they are done. 536 // If after all this, we still have some free-running lanes, it's an external user error, they should have stopped appropriately
573 // They are no longer listed in the selfdestruct chain, but they still have to lua_close().
574 while (U->selfdestructing_count.load(std::memory_order_acquire) > 0)
575 { 537 {
576 YIELD(); 538 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs };
539 Lane* lane{ U->selfdestruct_first };
540 if (lane != SELFDESTRUCT_END)
541 {
542 // this causes a leak because we don't call U's destructor (which could be bad if the still running lanes are accessing it)
543 std::ignore = luaL_error(L, "Zombie thread %s refuses to die!", lane->debug_name);
544 }
577 } 545 }
578 546
579 // necessary so that calling free_deep_prelude doesn't crash because linda_id expects a linda lightuserdata at absolute slot 1 547 // necessary so that calling free_deep_prelude doesn't crash because linda_id expects a linda lightuserdata at absolute slot 1
@@ -874,20 +842,8 @@ static char const* get_errcode_name( int _code)
874} 842}
875#endif // USE_DEBUG_SPEW() 843#endif // USE_DEBUG_SPEW()
876 844
877#if THREADWAIT_METHOD == THREADWAIT_CONDVAR // implies THREADAPI == THREADAPI_PTHREAD 845static void lane_main(Lane* lane)
878static void thread_cleanup_handler(void* opaque)
879{ 846{
880 Lane* lane{ (Lane*) opaque };
881 MUTEX_LOCK(&lane->done_lock);
882 lane->status = CANCELLED;
883 SIGNAL_ONE(&lane->done_signal); // wake up master (while 'lane->done_lock' is on)
884 MUTEX_UNLOCK(&lane->done_lock);
885}
886#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
887
888static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs)
889{
890 Lane* lane{ (Lane*) vs };
891 lua_State* const L{ lane->L }; 847 lua_State* const L{ lane->L };
892 // wait until the launching thread has finished preparing L 848 // wait until the launching thread has finished preparing L
893 lane->m_ready.wait(); 849 lane->m_ready.wait();
@@ -897,8 +853,6 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs)
897 // At this point, the lane function and arguments are on the stack 853 // At this point, the lane function and arguments are on the stack
898 int const nargs{ lua_gettop(L) - 1 }; 854 int const nargs{ lua_gettop(L) - 1 };
899 DEBUGSPEW_CODE(Universe* U = universe_get(L)); 855 DEBUGSPEW_CODE(Universe* U = universe_get(L));
900 THREAD_MAKE_ASYNCH_CANCELLABLE();
901 THREAD_CLEANUP_PUSH(thread_cleanup_handler, lane);
902 lane->status = RUNNING; // PENDING -> RUNNING 856 lane->status = RUNNING; // PENDING -> RUNNING
903 857
904 // Tie "set_finalizer()" to the state 858 // Tie "set_finalizer()" to the state
@@ -949,18 +903,19 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs)
949 // the finalizer generated an error, and left its own error message [and stack trace] on the stack 903 // the finalizer generated an error, and left its own error message [and stack trace] on the stack
950 rc = rc2; // we're overruling the earlier script error or normal return 904 rc = rc2; // we're overruling the earlier script error or normal return
951 } 905 }
952 lane->waiting_on = nullptr; // just in case 906 lane->m_waiting_on = nullptr; // just in case
953 if (selfdestruct_remove(lane)) // check and remove (under lock!) 907 if (selfdestruct_remove(lane)) // check and remove (under lock!)
954 { 908 {
955 // We're a free-running thread and no-one's there to clean us up. 909 // We're a free-running thread and no-one's there to clean us up.
956 //
957 lua_close(lane->L); 910 lua_close(lane->L);
958 911 lane->L = nullptr; // just in case
959 lane->U->selfdestruct_cs.lock(); 912 lane->U->selfdestruct_cs.lock();
960 // done with lua_close(), terminal shutdown sequence may proceed 913 // done with lua_close(), terminal shutdown sequence may proceed
961 lane->U->selfdestructing_count.fetch_sub(1, std::memory_order_release); 914 lane->U->selfdestructing_count.fetch_sub(1, std::memory_order_release);
962 lane->U->selfdestruct_cs.unlock(); 915 lane->U->selfdestruct_cs.unlock();
963 916
917 // we destroy our jthread member from inside the thread body, so we have to detach so that we don't try to join, as this doesn't seem a good idea
918 lane->m_thread.detach();
964 delete lane; 919 delete lane;
965 lane = nullptr; 920 lane = nullptr;
966 } 921 }
@@ -972,21 +927,14 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs)
972 enum e_status st = (rc == 0) ? DONE : CANCEL_ERROR.equals(L, 1) ? CANCELLED : ERROR_ST; 927 enum e_status st = (rc == 0) ? DONE : CANCEL_ERROR.equals(L, 1) ? CANCELLED : ERROR_ST;
973 928
974 // Posix no PTHREAD_TIMEDJOIN: 929 // Posix no PTHREAD_TIMEDJOIN:
975 // 'done_lock' protects the -> DONE|ERROR_ST|CANCELLED state change 930 // 'm_done_mutex' protects the -> DONE|ERROR_ST|CANCELLED state change
976 // 931 //
977#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
978 MUTEX_LOCK(&lane->done_lock);
979 { 932 {
980#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR 933 std::lock_guard lock{ lane->m_done_mutex };
981 lane->status = st; 934 lane->status = st;
982#if THREADWAIT_METHOD == THREADWAIT_CONDVAR 935 lane->m_done_signal.notify_one();// wake up master (while 'lane->m_done_mutex' is on)
983 SIGNAL_ONE(&lane->done_signal); // wake up master (while 'lane->done_lock' is on)
984 } 936 }
985 MUTEX_UNLOCK(&lane->done_lock);
986#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
987 } 937 }
988 THREAD_CLEANUP_POP(false);
989 return 0; // ignored
990} 938}
991 939
992// ################################################################################################# 940// #################################################################################################
@@ -1115,13 +1063,11 @@ LUAG_FUNC(lane_new)
1115 // leave a single cancel_error on the stack for the caller 1063 // leave a single cancel_error on the stack for the caller
1116 lua_settop(m_lane->L, 0); 1064 lua_settop(m_lane->L, 0);
1117 CANCEL_ERROR.pushKey(m_lane->L); 1065 CANCEL_ERROR.pushKey(m_lane->L);
1118#if THREADWAIT_METHOD == THREADWAIT_CONDVAR 1066 {
1119 MUTEX_LOCK(&m_lane->done_lock); 1067 std::lock_guard lock{ m_lane->m_done_mutex };
1120#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR 1068 m_lane->status = CANCELLED;
1121 m_lane->status = CANCELLED; 1069 m_lane->m_done_signal.notify_one(); // wake up master (while 'lane->m_done_mutex' is on)
1122#if THREADWAIT_METHOD == THREADWAIT_CONDVAR 1070 }
1123 MUTEX_UNLOCK(&m_lane->done_lock);
1124#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
1125 // unblock the thread so that it can terminate gracefully 1071 // unblock the thread so that it can terminate gracefully
1126 m_lane->m_ready.count_down(); 1072 m_lane->m_ready.count_down();
1127 } 1073 }
@@ -1170,7 +1116,7 @@ LUAG_FUNC(lane_new)
1170 } onExit{ L, lane, gc_cb_idx }; 1116 } onExit{ L, lane, gc_cb_idx };
1171 // launch the thread early, it will sync with a std::latch to parallelize OS thread warmup and L2 preparation 1117 // launch the thread early, it will sync with a std::latch to parallelize OS thread warmup and L2 preparation
1172 DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "lane_new: launching thread\n" INDENT_END)); 1118 DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "lane_new: launching thread\n" INDENT_END));
1173 THREAD_CREATE(&lane->thread, lane_main, lane, priority); 1119 lane->startThread(priority);
1174 1120
1175 STACK_GROW( L2, nargs + 3); // 1121 STACK_GROW( L2, nargs + 3); //
1176 STACK_CHECK_START_REL(L2, 0); 1122 STACK_CHECK_START_REL(L2, 0);
@@ -1347,7 +1293,7 @@ LUAG_FUNC(lane_new)
1347static int lane_gc(lua_State* L) 1293static int lane_gc(lua_State* L)
1348{ 1294{
1349 bool have_gc_cb{ false }; 1295 bool have_gc_cb{ false };
1350 Lane* lane{ lua_toLane(L, 1) }; // ud 1296 Lane* const lane{ lua_toLane(L, 1) }; // ud
1351 1297
1352 // if there a gc callback? 1298 // if there a gc callback?
1353 lua_getiuservalue(L, 1, 1); // ud uservalue 1299 lua_getiuservalue(L, 1, 1); // ud uservalue
@@ -1365,30 +1311,7 @@ static int lane_gc(lua_State* L)
1365 } 1311 }
1366 1312
1367 // We can read 'lane->status' without locks, but not wait for it 1313 // We can read 'lane->status' without locks, but not wait for it
1368 // test Killed state first, as it doesn't need to enter the selfdestruct chain 1314 if (lane->status < DONE)
1369 if (lane->mstatus == Lane::Killed)
1370 {
1371 // Make sure a kill has proceeded, before cleaning up the data structure.
1372 //
1373 // NO lua_close() in this case because we don't know where execution of the state was interrupted
1374 DEBUGSPEW_CODE(fprintf(stderr, "** Joining with a killed thread (needs testing) **"));
1375 // make sure the thread is no longer running, just like thread_join()
1376 if (!THREAD_ISNULL(lane->thread))
1377 {
1378 THREAD_WAIT(&lane->thread, -1, &lane->done_signal, &lane->done_lock, &lane->status);
1379 }
1380 if (lane->status >= DONE && lane->L)
1381 {
1382 // we know the thread was killed while the Lua VM was not doing anything: we should be able to close it without crashing
1383 // now, thread_cancel() will not forcefully kill a lane with lane->status >= DONE, so I am not sure it can ever happen
1384 lua_close(lane->L);
1385 lane->L = nullptr;
1386 // just in case, but s will be freed soon so...
1387 lane->debug_name = "<gc>";
1388 }
1389 DEBUGSPEW_CODE(fprintf(stderr, "** Joined ok **"));
1390 }
1391 else if (lane->status < DONE)
1392 { 1315 {
1393 // still running: will have to be cleaned up later 1316 // still running: will have to be cleaned up later
1394 selfdestruct_add(lane); 1317 selfdestruct_add(lane);
@@ -1437,7 +1360,6 @@ static char const * thread_status_string(Lane* lane_)
1437{ 1360{
1438 enum e_status const st{ lane_->status }; // read just once (volatile) 1361 enum e_status const st{ lane_->status }; // read just once (volatile)
1439 char const* str = 1362 char const* str =
1440 (lane_->mstatus == Lane::Killed) ? "killed" : // new to v3.3.0!
1441 (st == PENDING) ? "pending" : 1363 (st == PENDING) ? "pending" :
1442 (st == RUNNING) ? "running" : // like in 'co.status()' 1364 (st == RUNNING) ? "running" : // like in 'co.status()'
1443 (st == WAITING) ? "waiting" : 1365 (st == WAITING) ? "waiting" :
@@ -1471,9 +1393,10 @@ int push_thread_status(lua_State* L, Lane* lane_)
1471LUAG_FUNC(thread_join) 1393LUAG_FUNC(thread_join)
1472{ 1394{
1473 Lane* const lane{ lua_toLane(L, 1) }; 1395 Lane* const lane{ lua_toLane(L, 1) };
1474 lua_Number const wait_secs{ luaL_optnumber(L, 2, -1.0) }; 1396 lua_Duration const duration{ luaL_optnumber(L, 2, -1.0) };
1475 lua_State* const L2{ lane->L }; 1397 lua_State* const L2{ lane->L };
1476 bool const done{ THREAD_ISNULL(lane->thread) || THREAD_WAIT(&lane->thread, wait_secs, &lane->done_signal, &lane->done_lock, &lane->status) }; 1398
1399 bool const done{ !lane->m_thread.joinable() || lane->waitForCompletion(duration) };
1477 if (!done || !L2) 1400 if (!done || !L2)
1478 { 1401 {
1479 STACK_GROW(L, 2); 1402 STACK_GROW(L, 2);
@@ -1486,58 +1409,47 @@ LUAG_FUNC(thread_join)
1486 // Thread is DONE/ERROR_ST/CANCELLED; all ours now 1409 // Thread is DONE/ERROR_ST/CANCELLED; all ours now
1487 1410
1488 int ret{ 0 }; 1411 int ret{ 0 };
1489 if (lane->mstatus == Lane::Killed) // OS thread was killed if thread_cancel was forced 1412 Universe* const U{ lane->U };
1490 { 1413 // debug_name is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed
1491 // in that case, even if the thread was killed while DONE/ERROR_ST/CANCELLED, ignore regular return values 1414 // so store it in the userdata uservalue at a key that can't possibly collide
1492 STACK_GROW(L, 2); 1415 securize_debug_threadname(L, lane);
1493 lua_pushnil(L); 1416 switch (lane->status)
1494 lua_pushliteral(L, "killed");
1495 ret = 2;
1496 }
1497 else
1498 { 1417 {
1499 Universe* const U{ lane->U }; 1418 case DONE:
1500 // debug_name is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed
1501 // so store it in the userdata uservalue at a key that can't possibly collide
1502 securize_debug_threadname(L, lane);
1503 switch (lane->status)
1504 { 1419 {
1505 case DONE: 1420 int const n{ lua_gettop(L2) }; // whole L2 stack
1421 if ((n > 0) && (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0))
1506 { 1422 {
1507 int const n{ lua_gettop(L2) }; // whole L2 stack 1423 return luaL_error(L, "tried to copy unsupported types");
1508 if ((n > 0) && (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0))
1509 {
1510 return luaL_error(L, "tried to copy unsupported types");
1511 }
1512 ret = n;
1513 } 1424 }
1514 break; 1425 ret = n;
1426 }
1427 break;
1515 1428
1516 case ERROR_ST: 1429 case ERROR_ST:
1430 {
1431 int const n{ lua_gettop(L2) };
1432 STACK_GROW(L, 3);
1433 lua_pushnil(L);
1434 // even when ERROR_FULL_STACK, if the error is not LUA_ERRRUN, the handler wasn't called, and we only have 1 error message on the stack ...
1435 if (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0) // nil "err" [trace]
1517 { 1436 {
1518 int const n{ lua_gettop(L2) }; 1437 return luaL_error(L, "tried to copy unsupported types: %s", lua_tostring(L, -n));
1519 STACK_GROW(L, 3);
1520 lua_pushnil(L);
1521 // even when ERROR_FULL_STACK, if the error is not LUA_ERRRUN, the handler wasn't called, and we only have 1 error message on the stack ...
1522 if (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0) // nil "err" [trace]
1523 {
1524 return luaL_error(L, "tried to copy unsupported types: %s", lua_tostring(L, -n));
1525 }
1526 ret = 1 + n;
1527 } 1438 }
1528 break; 1439 ret = 1 + n;
1440 }
1441 break;
1529 1442
1530 case CANCELLED: 1443 case CANCELLED:
1531 ret = 0; 1444 ret = 0;
1532 break; 1445 break;
1533 1446
1534 default: 1447 default:
1535 DEBUGSPEW_CODE(fprintf(stderr, "Status: %d\n", lane->status)); 1448 DEBUGSPEW_CODE(fprintf(stderr, "Status: %d\n", lane->status));
1536 ASSERT_L(false); 1449 ASSERT_L(false);
1537 ret = 0; 1450 ret = 0;
1538 }
1539 lua_close(L2);
1540 } 1451 }
1452 lua_close(L2);
1541 lane->L = nullptr; 1453 lane->L = nullptr;
1542 STACK_CHECK(L, ret); 1454 STACK_CHECK(L, ret);
1543 return ret; 1455 return ret;
@@ -1596,15 +1508,12 @@ LUAG_FUNC(thread_index)
1596 switch (lane->status) 1508 switch (lane->status)
1597 { 1509 {
1598 default: 1510 default:
1599 if (lane->mstatus != Lane::Killed) 1511 // this is an internal error, we probably never get here
1600 { 1512 lua_settop(L, 0);
1601 // this is an internal error, we probably never get here 1513 lua_pushliteral(L, "Unexpected status: ");
1602 lua_settop(L, 0); 1514 lua_pushstring(L, thread_status_string(lane));
1603 lua_pushliteral(L, "Unexpected status: "); 1515 lua_concat(L, 2);
1604 lua_pushstring(L, thread_status_string(lane)); 1516 raise_lua_error(L);
1605 lua_concat(L, 2);
1606 raise_lua_error(L);
1607 }
1608 [[fallthrough]]; // fall through if we are killed, as we got nil, "killed" on the stack 1517 [[fallthrough]]; // fall through if we are killed, as we got nil, "killed" on the stack
1609 1518
1610 case DONE: // got regular return values 1519 case DONE: // got regular return values
@@ -1790,8 +1699,7 @@ LUAG_FUNC(wakeup_conv)
1790 lua_pop(L,1); 1699 lua_pop(L,1);
1791 STACK_CHECK(L, 0); 1700 STACK_CHECK(L, 0);
1792 1701
1793 struct tm t; 1702 std::tm t{};
1794 memset(&t, 0, sizeof(t));
1795 t.tm_year = year - 1900; 1703 t.tm_year = year - 1900;
1796 t.tm_mon= month-1; // 0..11 1704 t.tm_mon= month-1; // 0..11
1797 t.tm_mday= day; // 1..31 1705 t.tm_mday= day; // 1..31
@@ -1800,7 +1708,7 @@ LUAG_FUNC(wakeup_conv)
1800 t.tm_sec= sec; // 0..60 1708 t.tm_sec= sec; // 0..60
1801 t.tm_isdst= isdst; // 0/1/negative 1709 t.tm_isdst= isdst; // 0/1/negative
1802 1710
1803 lua_pushnumber(L, static_cast<lua_Number>(mktime(&t))); // ms=0 1711 lua_pushnumber(L, static_cast<lua_Number>(std::mktime(&t))); // resolution: 1 second
1804 return 1; 1712 return 1;
1805} 1713}
1806 1714
@@ -1909,13 +1817,14 @@ LUAG_FUNC(configure)
1909 DEBUGSPEW_CODE(fprintf( stderr, INDENT_BEGIN "%p: lanes.configure() BEGIN\n" INDENT_END, L)); 1817 DEBUGSPEW_CODE(fprintf( stderr, INDENT_BEGIN "%p: lanes.configure() BEGIN\n" INDENT_END, L));
1910 DEBUGSPEW_CODE(if (U) U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); 1818 DEBUGSPEW_CODE(if (U) U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed));
1911 1819
1912 if(U == nullptr) 1820 if (U == nullptr)
1913 { 1821 {
1914 U = universe_create( L); // settings universe 1822 U = universe_create(L); // settings universe
1915 DEBUGSPEW_CODE(U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); 1823 DEBUGSPEW_CODE(U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed));
1916 lua_newtable( L); // settings universe mt 1824 lua_newtable( L); // settings universe mt
1917 lua_getfield(L, 1, "shutdown_timeout"); // settings universe mt shutdown_timeout 1825 lua_getfield(L, 1, "shutdown_timeout"); // settings universe mt shutdown_timeout
1918 lua_pushcclosure(L, universe_gc, 1); // settings universe mt universe_gc 1826 lua_getfield(L, 1, "shutdown_mode"); // settings universe mt shutdown_timeout shutdown_mode
1827 lua_pushcclosure(L, universe_gc, 2); // settings universe mt universe_gc
1919 lua_setfield(L, -2, "__gc"); // settings universe mt 1828 lua_setfield(L, -2, "__gc"); // settings universe mt
1920 lua_setmetatable(L, -2); // settings universe 1829 lua_setmetatable(L, -2); // settings universe
1921 lua_pop(L, 1); // settings 1830 lua_pop(L, 1); // settings
diff --git a/src/lanes.lua b/src/lanes.lua
index 6af286a..fd3d22b 100644
--- a/src/lanes.lua
+++ b/src/lanes.lua
@@ -73,6 +73,7 @@ lanes.configure = function( settings_)
73 keepers_gc_threshold = -1, 73 keepers_gc_threshold = -1,
74 on_state_create = nil, 74 on_state_create = nil,
75 shutdown_timeout = 0.25, 75 shutdown_timeout = 0.25,
76 shutdown_mode = "hard",
76 with_timers = true, 77 with_timers = true,
77 track_lanes = false, 78 track_lanes = false,
78 demote_full_userdata = nil, 79 demote_full_userdata = nil,
@@ -113,6 +114,11 @@ lanes.configure = function( settings_)
113 -- shutdown_timeout should be a number >= 0 114 -- shutdown_timeout should be a number >= 0
114 return type( val_) == "number" and val_ >= 0 115 return type( val_) == "number" and val_ >= 0
115 end, 116 end,
117 shutdown_mode = function( val_)
118 local valid_hooks = { soft = true, hard = true, call = true, ret = true, line = true, count = true }
119 -- shutdown_mode should be a known hook mask
120 return valid_hooks[val_]
121 end,
116 track_lanes = boolean_param_checker, 122 track_lanes = boolean_param_checker,
117 demote_full_userdata = boolean_param_checker, 123 demote_full_userdata = boolean_param_checker,
118 verbose_errors = boolean_param_checker 124 verbose_errors = boolean_param_checker
@@ -367,262 +373,263 @@ lanes.configure = function( settings_)
367 373
368 374
369 if settings.with_timers ~= false then 375 if settings.with_timers ~= false then
376 --
377 -- On first 'require "lanes"', a timer lane is spawned that will maintain
378 -- timer tables and sleep in between the timer events. All interaction with
379 -- the timer lane happens via a 'timer_gateway' Linda, which is common to
380 -- all that 'require "lanes"'.
381 --
382 -- Linda protocol to timer lane:
383 --
384 -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs]
385 --
386 local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging
387 local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)"
388 local first_time_key= "first time"
370 389
371 -- 390 local first_time = timer_gateway:get( first_time_key) == nil
372 -- On first 'require "lanes"', a timer lane is spawned that will maintain 391 timer_gateway:set( first_time_key, true)
373 -- timer tables and sleep in between the timer events. All interaction with
374 -- the timer lane happens via a 'timer_gateway' Linda, which is common to
375 -- all that 'require "lanes"'.
376 --
377 -- Linda protocol to timer lane:
378 --
379 -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs]
380 --
381 local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging
382 local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)"
383 local first_time_key= "first time"
384
385 local first_time = timer_gateway:get( first_time_key) == nil
386 timer_gateway:set( first_time_key, true)
387
388 --
389 -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally
390 -- has 'table' always declared)
391 --
392 if first_time then
393 392
394 local now_secs = core.now_secs 393 local now_secs = core.now_secs
395 assert( type( now_secs) == "function") 394 local wakeup_conv = core.wakeup_conv
396 ----- 395
397 -- Snore loop (run as a lane on the background)
398 --
399 -- High priority, to get trustworthy timings.
400 -- 396 --
401 -- We let the timer lane be a "free running" thread; no handle to it 397 -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally
402 -- remains. 398 -- has 'table' always declared)
403 -- 399 --
404 local timer_body = function() 400 if first_time then
405 set_debug_threadname( "LanesTimer") 401
406 -- 402 assert( type( now_secs) == "function")
407 -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, 403 -----
408 -- [key]= { wakeup_secs [,period_secs] } [, ...] }, 404 -- Snore loop (run as a lane on the background)
409 -- }
410 --
411 -- Collection of all running timers, indexed with linda's & key.
412 -- 405 --
413 -- Note that we need to use the deep lightuserdata identifiers, instead 406 -- High priority, to get trustworthy timings.
414 -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple
415 -- entries for the same timer.
416 -- 407 --
417 -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but 408 -- We let the timer lane be a "free running" thread; no handle to it
418 -- also important to keep the Linda alive, even if all outside world threw 409 -- remains.
419 -- away pointers to it (which would ruin uniqueness of the deep pointer).
420 -- Now we're safe.
421 -- 410 --
422 local collection = {} 411 local timer_body = function()
423 local table_insert = assert( table.insert) 412 set_debug_threadname( "LanesTimer")
424 413 --
425 local get_timers = function() 414 -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h,
426 local r = {} 415 -- [key]= { wakeup_secs [,period_secs] } [, ...] },
427 for deep, t in pairs( collection) do 416 -- }
428 -- WR( tostring( deep)) 417 --
429 local l = t[deep] 418 -- Collection of all running timers, indexed with linda's & key.
430 for key, timer_data in pairs( t) do 419 --
431 if key ~= deep then 420 -- Note that we need to use the deep lightuserdata identifiers, instead
432 table_insert( r, {l, key, timer_data}) 421 -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple
422 -- entries for the same timer.
423 --
424 -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but
425 -- also important to keep the Linda alive, even if all outside world threw
426 -- away pointers to it (which would ruin uniqueness of the deep pointer).
427 -- Now we're safe.
428 --
429 local collection = {}
430 local table_insert = assert( table.insert)
431
432 local get_timers = function()
433 local r = {}
434 for deep, t in pairs( collection) do
435 -- WR( tostring( deep))
436 local l = t[deep]
437 for key, timer_data in pairs( t) do
438 if key ~= deep then
439 table_insert( r, {l, key, timer_data})
440 end
433 end 441 end
434 end 442 end
435 end 443 return r
436 return r 444 end -- get_timers()
437 end -- get_timers()
438
439 --
440 -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] )
441 --
442 local set_timer = function( linda, key, wakeup_at, period)
443 assert( wakeup_at == nil or wakeup_at > 0.0)
444 assert( period == nil or period > 0.0)
445 445
446 local linda_deep = linda:deep()
447 assert( linda_deep)
448
449 -- Find or make a lookup for this timer
450 -- 446 --
451 local t1 = collection[linda_deep] 447 -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] )
452 if not t1 then 448 --
453 t1 = { [linda_deep] = linda} -- proxy to use the Linda 449 local set_timer = function( linda, key, wakeup_at, period)
454 collection[linda_deep] = t1 450 assert( wakeup_at == nil or wakeup_at > 0.0)
455 end 451 assert( period == nil or period > 0.0)
456 452
457 if wakeup_at == nil then 453 local linda_deep = linda:deep()
458 -- Clear the timer 454 assert( linda_deep)
459 --
460 t1[key]= nil
461 455
462 -- Remove empty tables from collection; speeds timer checks and 456 -- Find or make a lookup for this timer
463 -- lets our 'safety reference' proxy be gc:ed as well.
464 -- 457 --
465 local empty = true 458 local t1 = collection[linda_deep]
466 for k, _ in pairs( t1) do 459 if not t1 then
467 if k ~= linda_deep then 460 t1 = { [linda_deep] = linda} -- proxy to use the Linda
468 empty = false 461 collection[linda_deep] = t1
469 break
470 end
471 end
472 if empty then
473 collection[linda_deep] = nil
474 end 462 end
475 463
476 -- Note: any unread timer value is left at 'linda[key]' intensionally; 464 if wakeup_at == nil then
477 -- clearing a timer just stops it. 465 -- Clear the timer
478 else 466 --
479 -- New timer or changing the timings 467 t1[key]= nil
480 --
481 local t2 = t1[key]
482 if not t2 then
483 t2= {}
484 t1[key]= t2
485 end
486 468
487 t2[1] = wakeup_at 469 -- Remove empty tables from collection; speeds timer checks and
488 t2[2] = period -- can be 'nil' 470 -- lets our 'safety reference' proxy be gc:ed as well.
489 end 471 --
490 end -- set_timer() 472 local empty = true
473 for k, _ in pairs( t1) do
474 if k ~= linda_deep then
475 empty = false
476 break
477 end
478 end
479 if empty then
480 collection[linda_deep] = nil
481 end
491 482
492 ----- 483 -- Note: any unread timer value is left at 'linda[key]' intensionally;
493 -- [next_wakeup_at]= check_timers() 484 -- clearing a timer just stops it.
494 -- Check timers, and wake up the ones expired (if any) 485 else
495 -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). 486 -- New timer or changing the timings
496 local check_timers = function()
497 local now = now_secs()
498 local next_wakeup
499
500 for linda_deep,t1 in pairs(collection) do
501 for key,t2 in pairs(t1) do
502 -- 487 --
503 if key==linda_deep then 488 local t2 = t1[key]
504 -- no 'continue' in Lua :/ 489 if not t2 then
505 else 490 t2= {}
506 -- 't2': { wakeup_at_secs [,period_secs] } 491 t1[key]= t2
492 end
493
494 t2[1] = wakeup_at
495 t2[2] = period -- can be 'nil'
496 end
497 end -- set_timer()
498
499 -----
500 -- [next_wakeup_at]= check_timers()
501 -- Check timers, and wake up the ones expired (if any)
502 -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none).
503 local check_timers = function()
504 local now = now_secs()
505 local next_wakeup
506
507 for linda_deep,t1 in pairs(collection) do
508 for key,t2 in pairs(t1) do
507 -- 509 --
508 local wakeup_at= t2[1] 510 if key==linda_deep then
509 local period= t2[2] -- may be 'nil' 511 -- no 'continue' in Lua :/
510 512 else
511 if wakeup_at <= now then 513 -- 't2': { wakeup_at_secs [,period_secs] }
512 local linda= t1[linda_deep] 514 --
513 assert(linda) 515 local wakeup_at= t2[1]
514 516 local period= t2[2] -- may be 'nil'
515 linda:set( key, now ) 517
516 518 if wakeup_at <= now then
517 -- 'pairs()' allows the values to be modified (and even 519 local linda= t1[linda_deep]
518 -- removed) as far as keys are not touched 520 assert(linda)
519 521
520 if not period then 522 linda:set( key, now )
521 -- one-time timer; gone 523
522 -- 524 -- 'pairs()' allows the values to be modified (and even
523 t1[key]= nil 525 -- removed) as far as keys are not touched
524 wakeup_at= nil -- no 'continue' in Lua :/ 526
525 else 527 if not period then
526 -- repeating timer; find next wakeup (may jump multiple repeats) 528 -- one-time timer; gone
527 -- 529 --
528 repeat 530 t1[key]= nil
529 wakeup_at= wakeup_at+period 531 wakeup_at= nil -- no 'continue' in Lua :/
530 until wakeup_at > now 532 else
531 533 -- repeating timer; find next wakeup (may jump multiple repeats)
532 t2[1]= wakeup_at 534 --
535 repeat
536 wakeup_at= wakeup_at+period
537 until wakeup_at > now
538
539 t2[1]= wakeup_at
540 end
533 end 541 end
534 end
535 542
536 if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then 543 if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then
537 next_wakeup= wakeup_at 544 next_wakeup= wakeup_at
545 end
538 end 546 end
547 end -- t2 loop
548 end -- t1 loop
549
550 return next_wakeup -- may be 'nil'
551 end -- check_timers()
552
553 local timer_gateway_batched = timer_gateway.batched
554 set_finalizer( function( err, stk)
555 if err and type( err) ~= "userdata" then
556 WR( "LanesTimer error: "..tostring(err))
557 --elseif type( err) == "userdata" then
558 -- WR( "LanesTimer after cancel" )
559 --else
560 -- WR("LanesTimer finalized")
561 end
562 end)
563 while true do
564 local next_wakeup = check_timers()
565
566 -- Sleep until next timer to wake up, or a set/clear command
567 --
568 local secs
569 if next_wakeup then
570 secs = next_wakeup - now_secs()
571 if secs < 0 then secs = 0 end
572 end
573 local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY)
574
575 if key == TGW_KEY then
576 assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer
577 local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3)
578 assert( key)
579 set_timer( what, key, wakeup_at, period and period > 0 and period or nil)
580 elseif key == TGW_QUERY then
581 if what == "get_timers" then
582 timer_gateway:send( TGW_REPLY, get_timers())
583 else
584 timer_gateway:send( TGW_REPLY, "unknown query " .. what)
539 end 585 end
540 end -- t2 loop 586 --elseif secs == nil then -- got no value while block-waiting?
541 end -- t1 loop 587 -- WR( "timer lane: no linda, aborted?")
542 588 end
543 return next_wakeup -- may be 'nil'
544 end -- check_timers()
545
546 local timer_gateway_batched = timer_gateway.batched
547 set_finalizer( function( err, stk)
548 if err and type( err) ~= "userdata" then
549 WR( "LanesTimer error: "..tostring(err))
550 --elseif type( err) == "userdata" then
551 -- WR( "LanesTimer after cancel" )
552 --else
553 -- WR("LanesTimer finalized")
554 end 589 end
555 end) 590 end -- timer_body()
556 while true do 591 timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility...
557 local next_wakeup = check_timers() 592 end -- first_time
558 593
559 -- Sleep until next timer to wake up, or a set/clear command 594 -----
595 -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] )
596 --
597 -- PUBLIC LANES API
598 timer = function( linda, key, a, period )
599 if getmetatable( linda) ~= "Linda" then
600 error "expecting a Linda"
601 end
602 if a == 0.0 then
603 -- Caller expects to get current time stamp in Linda, on return
604 -- (like the timer had expired instantly); it would be good to set this
605 -- as late as possible (to give most current time) but also we want it
606 -- to precede any possible timers that might start striking.
560 -- 607 --
561 local secs 608 linda:set( key, now_secs())
562 if next_wakeup then 609
563 secs = next_wakeup - now_secs() 610 if not period or period==0.0 then
564 if secs < 0 then secs = 0 end 611 timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer
565 end 612 return -- nothing more to do
566 local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY)
567
568 if key == TGW_KEY then
569 assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer
570 local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3)
571 assert( key)
572 set_timer( what, key, wakeup_at, period and period > 0 and period or nil)
573 elseif key == TGW_QUERY then
574 if what == "get_timers" then
575 timer_gateway:send( TGW_REPLY, get_timers())
576 else
577 timer_gateway:send( TGW_REPLY, "unknown query " .. what)
578 end
579 --elseif secs == nil then -- got no value while block-waiting?
580 -- WR( "timer lane: no linda, aborted?")
581 end 613 end
614 a= period
582 end 615 end
583 end -- timer_body()
584 timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility...
585 end -- first_time
586 616
587 ----- 617 local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time
588 -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) 618 or (a and now_secs()+a or nil)
589 -- 619 -- queue to timer
590 -- PUBLIC LANES API
591 timer = function( linda, key, a, period )
592 if getmetatable( linda) ~= "Linda" then
593 error "expecting a Linda"
594 end
595 if a == 0.0 then
596 -- Caller expects to get current time stamp in Linda, on return
597 -- (like the timer had expired instantly); it would be good to set this
598 -- as late as possible (to give most current time) but also we want it
599 -- to precede any possible timers that might start striking.
600 -- 620 --
601 linda:set( key, core.now_secs()) 621 timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period )
622 end -- timer()
602 623
603 if not period or period==0.0 then 624 -----
604 timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer 625 -- {[{linda, slot, when, period}[,...]]} = timers()
605 return -- nothing more to do
606 end
607 a= period
608 end
609
610 local wakeup_at= type(a)=="table" and core.wakeup_conv(a) -- given point of time
611 or (a and core.now_secs()+a or nil)
612 -- queue to timer
613 -- 626 --
614 timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) 627 -- PUBLIC LANES API
615 end 628 timers = function()
616 629 timer_gateway:send( TGW_QUERY, "get_timers")
617 ----- 630 local _, r = timer_gateway:receive( TGW_REPLY)
618 -- {[{linda, slot, when, period}[,...]]} = timers() 631 return r
619 -- 632 end -- timers()
620 -- PUBLIC LANES API
621 timers = function()
622 timer_gateway:send( TGW_QUERY, "get_timers")
623 local _, r = timer_gateway:receive( TGW_REPLY)
624 return r
625 end
626 633
627 end -- settings.with_timers 634 end -- settings.with_timers
628 635
diff --git a/src/lanes_private.h b/src/lanes_private.h
index bcc3014..01d43c0 100644
--- a/src/lanes_private.h
+++ b/src/lanes_private.h
@@ -4,27 +4,26 @@
4#include "uniquekey.h" 4#include "uniquekey.h"
5#include "universe.h" 5#include "universe.h"
6 6
7#include <chrono>
8#include <condition_variable>
7#include <latch> 9#include <latch>
10#include <stop_token>
11#include <thread>
8 12
9// NOTE: values to be changed by either thread, during execution, without 13// NOTE: values to be changed by either thread, during execution, without
10// locking, are marked "volatile" 14// locking, are marked "volatile"
11// 15//
12class Lane 16class Lane
13{ 17{
14 private:
15
16 enum class ThreadStatus
17 {
18 Normal, // normal master side state
19 Killed // issued an OS kill
20 };
21
22 public: 18 public:
23 19
24 using enum ThreadStatus; 20 // the thread
25 21 std::jthread m_thread;
26 THREAD_T thread; 22 // a latch to wait for the lua_State to be ready
27 std::latch m_ready{ 1 }; 23 std::latch m_ready{ 1 };
24 // to wait for stop requests through m_thread's stop_source
25 std::mutex m_done_mutex;
26 std::condition_variable m_done_signal; // use condition_variable_any if waiting for a stop_token
28 // 27 //
29 // M: sub-thread OS thread 28 // M: sub-thread OS thread
30 // S: not used 29 // S: not used
@@ -42,7 +41,7 @@ class Lane
42 // M: sets to PENDING (before launching) 41 // M: sets to PENDING (before launching)
43 // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED 42 // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED
44 43
45 SIGNAL_T* volatile waiting_on{ nullptr }; 44 std::condition_variable* volatile m_waiting_on{ nullptr };
46 // 45 //
47 // When status is WAITING, points on the linda's signal the thread waits on, else nullptr 46 // When status is WAITING, points on the linda's signal the thread waits on, else nullptr
48 47
@@ -51,23 +50,6 @@ class Lane
51 // M: sets to false, flags true for cancel request 50 // M: sets to false, flags true for cancel request
52 // S: reads to see if cancel is requested 51 // S: reads to see if cancel is requested
53 52
54#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
55 SIGNAL_T done_signal;
56 //
57 // M: Waited upon at lane ending (if Posix with no PTHREAD_TIMEDJOIN)
58 // S: sets the signal once cancellation is noticed (avoids a kill)
59
60 MUTEX_T done_lock;
61 //
62 // Lock required by 'done_signal' condition variable, protecting
63 // lane status changes to DONE/ERROR_ST/CANCELLED.
64#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
65
66 volatile ThreadStatus mstatus{ Normal };
67 //
68 // M: sets to Normal, if issued a kill changes to Killed
69 // S: not used
70
71 Lane* volatile selfdestruct_next{ nullptr }; 53 Lane* volatile selfdestruct_next{ nullptr };
72 // 54 //
73 // M: sets to non-nullptr if facing lane handle '__gc' cycle but the lane 55 // M: sets to non-nullptr if facing lane handle '__gc' cycle but the lane
@@ -88,6 +70,9 @@ class Lane
88 70
89 Lane(Universe* U_, lua_State* L_); 71 Lane(Universe* U_, lua_State* L_);
90 ~Lane(); 72 ~Lane();
73
74 bool waitForCompletion(lua_Duration duration_);
75 void startThread(int priority_);
91}; 76};
92 77
93// xxh64 of string "LANE_POINTER_REGKEY" generated at https://www.pelock.com/products/hash-calculator 78// xxh64 of string "LANE_POINTER_REGKEY" generated at https://www.pelock.com/products/hash-calculator
diff --git a/src/linda.cpp b/src/linda.cpp
index 5ee4768..ea1410e 100644
--- a/src/linda.cpp
+++ b/src/linda.cpp
@@ -61,8 +61,8 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header
61 61
62 public: 62 public:
63 63
64 SIGNAL_T read_happened; 64 std::condition_variable m_read_happened;
65 SIGNAL_T write_happened; 65 std::condition_variable m_write_happened;
66 Universe* const U; // the universe this linda belongs to 66 Universe* const U; // the universe this linda belongs to
67 uintptr_t const group; // a group to control keeper allocation between lindas 67 uintptr_t const group; // a group to control keeper allocation between lindas
68 CancelRequest simulate_cancel{ CancelRequest::None }; 68 CancelRequest simulate_cancel{ CancelRequest::None };
@@ -81,17 +81,11 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header
81 : U{ U_ } 81 : U{ U_ }
82 , group{ group_ << KEEPER_MAGIC_SHIFT } 82 , group{ group_ << KEEPER_MAGIC_SHIFT }
83 { 83 {
84 SIGNAL_INIT(&read_happened);
85 SIGNAL_INIT(&write_happened);
86
87 setName(name_, len_); 84 setName(name_, len_);
88 } 85 }
89 86
90 ~Linda() 87 ~Linda()
91 { 88 {
92 // There aren't any lanes waiting on these lindas, since all proxies have been gc'ed. Right?
93 SIGNAL_FREE(&read_happened);
94 SIGNAL_FREE(&write_happened);
95 if (std::holds_alternative<AllocatedName>(m_name)) 89 if (std::holds_alternative<AllocatedName>(m_name))
96 { 90 {
97 AllocatedName& name = std::get<AllocatedName>(m_name); 91 AllocatedName& name = std::get<AllocatedName>(m_name);
@@ -216,15 +210,19 @@ LUAG_FUNC(linda_protected_call)
216LUAG_FUNC(linda_send) 210LUAG_FUNC(linda_send)
217{ 211{
218 Linda* const linda{ lua_toLinda<false>(L, 1) }; 212 Linda* const linda{ lua_toLinda<false>(L, 1) };
219 time_d timeout{ -1.0 }; 213 std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
220 int key_i{ 2 }; // index of first key, if timeout not there 214 int key_i{ 2 }; // index of first key, if timeout not there
221 215
222 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion 216 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion
223 { 217 {
224 timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); 218 lua_Duration const duration{ lua_tonumber(L, 2) };
219 if (duration.count() >= 0.0)
220 {
221 until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration);
222 }
225 ++key_i; 223 ++key_i;
226 } 224 }
227 else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key 225 else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key
228 { 226 {
229 ++key_i; 227 ++key_i;
230 } 228 }
@@ -266,6 +264,7 @@ LUAG_FUNC(linda_send)
266 lua_State* const KL{ K ? K->L : nullptr }; 264 lua_State* const KL{ K ? K->L : nullptr };
267 if (KL == nullptr) 265 if (KL == nullptr)
268 return 0; 266 return 0;
267
269 STACK_CHECK_START_REL(KL, 0); 268 STACK_CHECK_START_REL(KL, 0);
270 for (bool try_again{ true };;) 269 for (bool try_again{ true };;)
271 { 270 {
@@ -295,12 +294,12 @@ LUAG_FUNC(linda_send)
295 if (ret) 294 if (ret)
296 { 295 {
297 // Wake up ALL waiting threads 296 // Wake up ALL waiting threads
298 SIGNAL_ALL(&linda->write_happened); 297 linda->m_write_happened.notify_all();
299 break; 298 break;
300 } 299 }
301 300
302 // instant timout to bypass the wait syscall 301 // instant timout to bypass the wait syscall
303 if (timeout == 0.0) 302 if (std::chrono::steady_clock::now() >= until)
304 { 303 {
305 break; /* no wait; instant timeout */ 304 break; /* no wait; instant timeout */
306 } 305 }
@@ -314,14 +313,17 @@ LUAG_FUNC(linda_send)
314 prev_status = lane->status; // RUNNING, most likely 313 prev_status = lane->status; // RUNNING, most likely
315 ASSERT_L(prev_status == RUNNING); // but check, just in case 314 ASSERT_L(prev_status == RUNNING); // but check, just in case
316 lane->status = WAITING; 315 lane->status = WAITING;
317 ASSERT_L(lane->waiting_on == nullptr); 316 ASSERT_L(lane->m_waiting_on == nullptr);
318 lane->waiting_on = &linda->read_happened; 317 lane->m_waiting_on = &linda->m_read_happened;
319 } 318 }
320 // could not send because no room: wait until some data was read before trying again, or until timeout is reached 319 // could not send because no room: wait until some data was read before trying again, or until timeout is reached
321 try_again = SIGNAL_WAIT(&linda->read_happened, &K->keeper_cs, timeout); 320 std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock };
321 std::cv_status const status{ linda->m_read_happened.wait_until(keeper_lock, until) };
322 keeper_lock.release(); // we don't want to release the lock!
323 try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups
322 if (lane != nullptr) 324 if (lane != nullptr)
323 { 325 {
324 lane->waiting_on = nullptr; 326 lane->m_waiting_on = nullptr;
325 lane->status = prev_status; 327 lane->status = prev_status;
326 } 328 }
327 } 329 }
@@ -369,21 +371,24 @@ static constexpr UniqueKey BATCH_SENTINEL{ 0x2DDFEE0968C62AA7ull };
369LUAG_FUNC(linda_receive) 371LUAG_FUNC(linda_receive)
370{ 372{
371 Linda* const linda{ lua_toLinda<false>(L, 1) }; 373 Linda* const linda{ lua_toLinda<false>(L, 1) };
372 374 std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
373 time_d timeout{ -1.0 }; 375 int key_i{ 2 }; // index of first key, if timeout not there
374 int key_i{ 2 };
375 376
376 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion 377 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion
377 { 378 {
378 timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); 379 lua_Duration const duration{ lua_tonumber(L, 2) };
380 if (duration.count() >= 0.0)
381 {
382 until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration);
383 }
379 ++key_i; 384 ++key_i;
380 } 385 }
381 else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key 386 else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key
382 { 387 {
383 ++key_i; 388 ++key_i;
384 } 389 }
385 390
386 keeper_api_t keeper_receive; 391 keeper_api_t selected_keeper_receive{ nullptr };
387 int expected_pushed_min{ 0 }, expected_pushed_max{ 0 }; 392 int expected_pushed_min{ 0 }, expected_pushed_max{ 0 };
388 // are we in batched mode? 393 // are we in batched mode?
389 BATCH_SENTINEL.pushKey(L); 394 BATCH_SENTINEL.pushKey(L);
@@ -396,7 +401,7 @@ LUAG_FUNC(linda_receive)
396 // make sure the keys are of a valid type 401 // make sure the keys are of a valid type
397 check_key_types(L, key_i, key_i); 402 check_key_types(L, key_i, key_i);
398 // receive multiple values from a single slot 403 // receive multiple values from a single slot
399 keeper_receive = KEEPER_API(receive_batched); 404 selected_keeper_receive = KEEPER_API(receive_batched);
400 // we expect a user-defined amount of return value 405 // we expect a user-defined amount of return value
401 expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1); 406 expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1);
402 expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min); 407 expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min);
@@ -413,17 +418,20 @@ LUAG_FUNC(linda_receive)
413 // make sure the keys are of a valid type 418 // make sure the keys are of a valid type
414 check_key_types(L, key_i, lua_gettop(L)); 419 check_key_types(L, key_i, lua_gettop(L));
415 // receive a single value, checking multiple slots 420 // receive a single value, checking multiple slots
416 keeper_receive = KEEPER_API(receive); 421 selected_keeper_receive = KEEPER_API(receive);
417 // we expect a single (value, key) pair of returned values 422 // we expect a single (value, key) pair of returned values
418 expected_pushed_min = expected_pushed_max = 2; 423 expected_pushed_min = expected_pushed_max = 2;
419 } 424 }
420 425
421 Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) }; 426 Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) };
422 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; 427 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) };
423 if (K == nullptr) 428 lua_State* const KL{ K ? K->L : nullptr };
429 if (KL == nullptr)
424 return 0; 430 return 0;
431
425 CancelRequest cancel{ CancelRequest::None }; 432 CancelRequest cancel{ CancelRequest::None };
426 int pushed{ 0 }; 433 int pushed{ 0 };
434 STACK_CHECK_START_REL(KL, 0);
427 for (bool try_again{ true };;) 435 for (bool try_again{ true };;)
428 { 436 {
429 if (lane != nullptr) 437 if (lane != nullptr)
@@ -439,7 +447,7 @@ LUAG_FUNC(linda_receive)
439 } 447 }
440 448
441 // all arguments of receive() but the first are passed to the keeper's receive function 449 // all arguments of receive() but the first are passed to the keeper's receive function
442 pushed = keeper_call(linda->U, K->L, keeper_receive, L, linda, key_i); 450 pushed = keeper_call(linda->U, KL, selected_keeper_receive, L, linda, key_i);
443 if (pushed < 0) 451 if (pushed < 0)
444 { 452 {
445 break; 453 break;
@@ -451,11 +459,11 @@ LUAG_FUNC(linda_receive)
451 keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed, LookupMode::FromKeeper); 459 keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed, LookupMode::FromKeeper);
452 // To be done from within the 'K' locking area 460 // To be done from within the 'K' locking area
453 // 461 //
454 SIGNAL_ALL(&linda->read_happened); 462 linda->m_read_happened.notify_all();
455 break; 463 break;
456 } 464 }
457 465
458 if (timeout == 0.0) 466 if (std::chrono::steady_clock::now() >= until)
459 { 467 {
460 break; /* instant timeout */ 468 break; /* instant timeout */
461 } 469 }
@@ -469,18 +477,22 @@ LUAG_FUNC(linda_receive)
469 prev_status = lane->status; // RUNNING, most likely 477 prev_status = lane->status; // RUNNING, most likely
470 ASSERT_L(prev_status == RUNNING); // but check, just in case 478 ASSERT_L(prev_status == RUNNING); // but check, just in case
471 lane->status = WAITING; 479 lane->status = WAITING;
472 ASSERT_L(lane->waiting_on == nullptr); 480 ASSERT_L(lane->m_waiting_on == nullptr);
473 lane->waiting_on = &linda->write_happened; 481 lane->m_waiting_on = &linda->m_write_happened;
474 } 482 }
475 // not enough data to read: wakeup when data was sent, or when timeout is reached 483 // not enough data to read: wakeup when data was sent, or when timeout is reached
476 try_again = SIGNAL_WAIT(&linda->write_happened, &K->keeper_cs, timeout); 484 std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock };
485 std::cv_status const status{ linda->m_write_happened.wait_until(keeper_lock, until) };
486 keeper_lock.release(); // we don't want to release the lock!
487 try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups
477 if (lane != nullptr) 488 if (lane != nullptr)
478 { 489 {
479 lane->waiting_on = nullptr; 490 lane->m_waiting_on = nullptr;
480 lane->status = prev_status; 491 lane->status = prev_status;
481 } 492 }
482 } 493 }
483 } 494 }
495 STACK_CHECK(KL, 0);
484 496
485 if (pushed < 0) 497 if (pushed < 0)
486 { 498 {
@@ -537,13 +549,13 @@ LUAG_FUNC(linda_set)
537 if (has_value) 549 if (has_value)
538 { 550 {
539 // we put some data in the slot, tell readers that they should wake 551 // we put some data in the slot, tell readers that they should wake
540 SIGNAL_ALL(&linda->write_happened); // To be done from within the 'K' locking area 552 linda->m_write_happened.notify_all(); // To be done from within the 'K' locking area
541 } 553 }
542 if (pushed == 1) 554 if (pushed == 1)
543 { 555 {
544 // the key was full, but it is no longer the case, tell writers they should wake 556 // the key was full, but it is no longer the case, tell writers they should wake
545 ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1); 557 ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1);
546 SIGNAL_ALL(&linda->read_happened); // To be done from within the 'K' locking area 558 linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area
547 } 559 }
548 } 560 }
549 } 561 }
@@ -648,7 +660,7 @@ LUAG_FUNC( linda_limit)
648 if( pushed == 1) 660 if( pushed == 1)
649 { 661 {
650 ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); 662 ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1);
651 SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area 663 linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area
652 } 664 }
653 } 665 }
654 else // linda is cancelled 666 else // linda is cancelled
@@ -678,8 +690,8 @@ LUAG_FUNC(linda_cancel)
678 linda->simulate_cancel = CancelRequest::Soft; 690 linda->simulate_cancel = CancelRequest::Soft;
679 if (strcmp(who, "both") == 0) // tell everyone writers to wake up 691 if (strcmp(who, "both") == 0) // tell everyone writers to wake up
680 { 692 {
681 SIGNAL_ALL(&linda->write_happened); 693 linda->m_write_happened.notify_all();
682 SIGNAL_ALL(&linda->read_happened); 694 linda->m_read_happened.notify_all();
683 } 695 }
684 else if (strcmp(who, "none") == 0) // reset flag 696 else if (strcmp(who, "none") == 0) // reset flag
685 { 697 {
@@ -687,11 +699,11 @@ LUAG_FUNC(linda_cancel)
687 } 699 }
688 else if (strcmp(who, "read") == 0) // tell blocked readers to wake up 700 else if (strcmp(who, "read") == 0) // tell blocked readers to wake up
689 { 701 {
690 SIGNAL_ALL(&linda->write_happened); 702 linda->m_write_happened.notify_all();
691 } 703 }
692 else if (strcmp(who, "write") == 0) // tell blocked writers to wake up 704 else if (strcmp(who, "write") == 0) // tell blocked writers to wake up
693 { 705 {
694 SIGNAL_ALL(&linda->read_happened); 706 linda->m_read_happened.notify_all();
695 } 707 }
696 else 708 else
697 { 709 {
diff --git a/src/macros_and_utils.h b/src/macros_and_utils.h
index e29e7fb..997b452 100644
--- a/src/macros_and_utils.h
+++ b/src/macros_and_utils.h
@@ -11,9 +11,12 @@ extern "C" {
11#endif // __cplusplus 11#endif // __cplusplus
12 12
13#include <cassert> 13#include <cassert>
14#include <chrono>
14#include <tuple> 15#include <tuple>
15#include <type_traits> 16#include <type_traits>
16 17
18using namespace std::chrono_literals;
19
17#define USE_DEBUG_SPEW() 0 20#define USE_DEBUG_SPEW() 0
18#if USE_DEBUG_SPEW() 21#if USE_DEBUG_SPEW()
19extern char const* debugspew_indent; 22extern char const* debugspew_indent;
@@ -167,3 +170,5 @@ T* lua_newuserdatauv(lua_State* L, int nuvalue_)
167 std::ignore = lua_error(L); // doesn't return 170 std::ignore = lua_error(L); // doesn't return
168 assert(false); // we should never get here, but i'm paranoid 171 assert(false); // we should never get here, but i'm paranoid
169} 172}
173
174using lua_Duration = std::chrono::template duration<lua_Number>;
diff --git a/src/threading.cpp b/src/threading.cpp
index afeb184..fc20931 100644
--- a/src/threading.cpp
+++ b/src/threading.cpp
@@ -93,9 +93,6 @@ THE SOFTWARE.
93# pragma warning( disable : 4054 ) 93# pragma warning( disable : 4054 )
94#endif 94#endif
95 95
96//#define THREAD_CREATE_RETRIES_MAX 20
97 // loops (maybe retry forever?)
98
99/* 96/*
100* FAIL is for unexpected API return values - essentially programming 97* FAIL is for unexpected API return values - essentially programming
101* error in _this_ code. 98* error in _this_ code.
@@ -196,36 +193,6 @@ time_d now_secs(void) {
196} 193}
197 194
198 195
199/*
200*/
201time_d SIGNAL_TIMEOUT_PREPARE( double secs ) {
202 if (secs<=0.0) return secs;
203 else return now_secs() + secs;
204}
205
206
207#if THREADAPI == THREADAPI_PTHREAD
208/*
209* Prepare 'abs_secs' kind of timeout to 'timespec' format
210*/
211static void prepare_timeout( struct timespec *ts, time_d abs_secs ) {
212 assert(ts);
213 assert( abs_secs >= 0.0 );
214
215 if (abs_secs==0.0)
216 abs_secs= now_secs();
217
218 ts->tv_sec= (time_t) floor( abs_secs );
219 ts->tv_nsec= ((long)((abs_secs - ts->tv_sec) * 1000.0 +0.5)) * 1000000UL; // 1ms = 1000000ns
220 if (ts->tv_nsec == 1000000000UL)
221 {
222 ts->tv_nsec = 0;
223 ts->tv_sec = ts->tv_sec + 1;
224 }
225}
226#endif // THREADAPI == THREADAPI_PTHREAD
227
228
229/*---=== Threading ===---*/ 196/*---=== Threading ===---*/
230 197
231//--- 198//---
@@ -268,30 +235,6 @@ static void prepare_timeout( struct timespec *ts, time_d abs_secs ) {
268 235
269#if THREADAPI == THREADAPI_WINDOWS 236#if THREADAPI == THREADAPI_WINDOWS
270 237
271#if _WIN32_WINNT < 0x0600 // CONDITION_VARIABLE aren't available
272 //
273 void MUTEX_INIT( MUTEX_T *ref ) {
274 *ref= CreateMutex( nullptr /*security attr*/, false /*not locked*/, nullptr );
275 if (!ref) FAIL( "CreateMutex", GetLastError() );
276 }
277 void MUTEX_FREE( MUTEX_T *ref ) {
278 if (!CloseHandle(*ref)) FAIL( "CloseHandle (mutex)", GetLastError() );
279 *ref= nullptr;
280 }
281 void MUTEX_LOCK( MUTEX_T *ref )
282 {
283 DWORD rc = WaitForSingleObject( *ref, INFINITE);
284 // ERROR_WAIT_NO_CHILDREN means a thread was killed (lane terminated because of error raised during a linda transfer for example) while having grabbed this mutex
285 // this is not a big problem as we will grab it just the same, so ignore this particular error
286 if( rc != 0 && rc != ERROR_WAIT_NO_CHILDREN)
287 FAIL( "WaitForSingleObject", (rc == WAIT_FAILED) ? GetLastError() : rc);
288 }
289 void MUTEX_UNLOCK( MUTEX_T *ref ) {
290 if (!ReleaseMutex(*ref))
291 FAIL( "ReleaseMutex", GetLastError() );
292 }
293#endif // CONDITION_VARIABLE aren't available
294
295static int const gs_prio_remap[] = 238static int const gs_prio_remap[] =
296{ 239{
297 THREAD_PRIORITY_IDLE, 240 THREAD_PRIORITY_IDLE,
@@ -303,37 +246,7 @@ static int const gs_prio_remap[] =
303 THREAD_PRIORITY_TIME_CRITICAL 246 THREAD_PRIORITY_TIME_CRITICAL
304}; 247};
305 248
306/* MSDN: "If you would like to use the CRT in ThreadProc, use the 249// ###############################################################################################
307_beginthreadex function instead (of CreateThread)."
308MSDN: "you can create at most 2028 threads"
309*/
310// Note: Visual C++ requires '__stdcall' where it is
311void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (__stdcall *func)( void*), void* data, int prio /* -3..+3 */)
312{
313 HANDLE h = (HANDLE) _beginthreadex(nullptr, // security
314 _THREAD_STACK_SIZE,
315 func,
316 data,
317 0, // flags (0/CREATE_SUSPENDED)
318 nullptr // thread id (not used)
319 );
320
321 if (h == nullptr) // _beginthreadex returns 0L on failure instead of -1L (like _beginthread)
322 {
323 FAIL( "CreateThread", GetLastError());
324 }
325
326 if (prio != THREAD_PRIO_DEFAULT)
327 {
328 if (!SetThreadPriority( h, gs_prio_remap[prio + 3]))
329 {
330 FAIL( "SetThreadPriority", GetLastError());
331 }
332 }
333
334 *ref = h;
335}
336
337 250
338void THREAD_SET_PRIORITY( int prio) 251void THREAD_SET_PRIORITY( int prio)
339{ 252{
@@ -344,42 +257,26 @@ void THREAD_SET_PRIORITY( int prio)
344 } 257 }
345} 258}
346 259
347void THREAD_SET_AFFINITY( unsigned int aff) 260// ###############################################################################################
261
262void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_)
348{ 263{
349 if( !SetThreadAffinityMask( GetCurrentThread(), aff)) 264 // prio range [-3,+3] was checked by the caller
265 if (!SetThreadPriority(thread_.native_handle(), gs_prio_remap[prio_ + 3]))
350 { 266 {
351 FAIL( "THREAD_SET_AFFINITY", GetLastError()); 267 FAIL("JTHREAD_SET_PRIORITY", GetLastError());
352 } 268 }
353} 269}
354 270
355bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs) 271// ###############################################################################################
356{
357 DWORD ms = (secs<0.0) ? INFINITE : (DWORD)((secs*1000.0)+0.5);
358 272
359 DWORD rc= WaitForSingleObject( *ref, ms /*timeout*/ ); 273void THREAD_SET_AFFINITY(unsigned int aff)
360 // 274{
361 // (WAIT_ABANDONED) 275 if( !SetThreadAffinityMask( GetCurrentThread(), aff))
362 // WAIT_OBJECT_0 success (0)
363 // WAIT_TIMEOUT
364 // WAIT_FAILED more info via GetLastError()
365
366 if (rc == WAIT_TIMEOUT) return false;
367 if( rc !=0) FAIL( "WaitForSingleObject", rc==WAIT_FAILED ? GetLastError() : rc);
368 *ref = nullptr; // thread no longer usable
369 return true;
370 }
371 //
372 void THREAD_KILL( THREAD_T *ref )
373 { 276 {
374 // nonexistent on Xbox360, simply disable until a better solution is found 277 FAIL( "THREAD_SET_AFFINITY", GetLastError());
375 #if !defined( PLATFORM_XBOX)
376 // in theory no-one should call this as it is very dangerous (memory and mutex leaks, no notification of DLLs, etc.)
377 if (!TerminateThread( *ref, 0 )) FAIL("TerminateThread", GetLastError());
378 #endif // PLATFORM_XBOX
379 *ref = nullptr;
380 } 278 }
381 279}
382 void THREAD_MAKE_ASYNCH_CANCELLABLE() {} // nothing to do for windows threads, we can cancel them anytime we want
383 280
384#if !defined __GNUC__ 281#if !defined __GNUC__
385 //see http://msdn.microsoft.com/en-us/library/xcb2z8hs.aspx 282 //see http://msdn.microsoft.com/en-us/library/xcb2z8hs.aspx
@@ -414,158 +311,6 @@ bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs)
414#endif // !__GNUC__ 311#endif // !__GNUC__
415 } 312 }
416 313
417#if _WIN32_WINNT < 0x0600 // CONDITION_VARIABLE aren't available
418
419 void SIGNAL_INIT( SIGNAL_T* ref)
420 {
421 InitializeCriticalSection( &ref->signalCS);
422 InitializeCriticalSection( &ref->countCS);
423 if( 0 == (ref->waitEvent = CreateEvent( 0, true, false, 0))) // manual-reset
424 FAIL( "CreateEvent", GetLastError());
425 if( 0 == (ref->waitDoneEvent = CreateEvent( 0, false, false, 0))) // auto-reset
426 FAIL( "CreateEvent", GetLastError());
427 ref->waitersCount = 0;
428 }
429
430 void SIGNAL_FREE( SIGNAL_T* ref)
431 {
432 CloseHandle( ref->waitDoneEvent);
433 CloseHandle( ref->waitEvent);
434 DeleteCriticalSection( &ref->countCS);
435 DeleteCriticalSection( &ref->signalCS);
436 }
437
438 bool SIGNAL_WAIT( SIGNAL_T* ref, MUTEX_T* mu_ref, time_d abs_secs)
439 {
440 DWORD errc;
441 DWORD ms;
442
443 if( abs_secs < 0.0)
444 ms = INFINITE;
445 else if( abs_secs == 0.0)
446 ms = 0;
447 else
448 {
449 time_d msd = (abs_secs - now_secs()) * 1000.0 + 0.5;
450 // If the time already passed, still try once (ms==0). A short timeout
451 // may have turned negative or 0 because of the two time samples done.
452 ms = msd <= 0.0 ? 0 : (DWORD)msd;
453 }
454
455 EnterCriticalSection( &ref->signalCS);
456 EnterCriticalSection( &ref->countCS);
457 ++ ref->waitersCount;
458 LeaveCriticalSection( &ref->countCS);
459 LeaveCriticalSection( &ref->signalCS);
460
461 errc = SignalObjectAndWait( *mu_ref, ref->waitEvent, ms, false);
462
463 EnterCriticalSection( &ref->countCS);
464 if( 0 == -- ref->waitersCount)
465 {
466 // we're the last one leaving...
467 ResetEvent( ref->waitEvent);
468 SetEvent( ref->waitDoneEvent);
469 }
470 LeaveCriticalSection( &ref->countCS);
471 MUTEX_LOCK( mu_ref);
472
473 switch( errc)
474 {
475 case WAIT_TIMEOUT:
476 return false;
477 case WAIT_OBJECT_0:
478 return true;
479 }
480
481 FAIL( "SignalObjectAndWait", GetLastError());
482 return false;
483 }
484
485 void SIGNAL_ALL( SIGNAL_T* ref)
486 {
487 DWORD errc = WAIT_OBJECT_0;
488
489 EnterCriticalSection( &ref->signalCS);
490 EnterCriticalSection( &ref->countCS);
491
492 if( ref->waitersCount > 0)
493 {
494 ResetEvent( ref->waitDoneEvent);
495 SetEvent( ref->waitEvent);
496 LeaveCriticalSection( &ref->countCS);
497 errc = WaitForSingleObject( ref->waitDoneEvent, INFINITE);
498 }
499 else
500 {
501 LeaveCriticalSection( &ref->countCS);
502 }
503
504 LeaveCriticalSection( &ref->signalCS);
505
506 if( WAIT_OBJECT_0 != errc)
507 FAIL( "WaitForSingleObject", GetLastError());
508 }
509
510#else // CONDITION_VARIABLE are available, use them
511
512 //
513 void SIGNAL_INIT( SIGNAL_T *ref )
514 {
515 InitializeConditionVariable( ref);
516 }
517
518 void SIGNAL_FREE( SIGNAL_T *ref )
519 {
520 // nothing to do
521 (void)ref;
522 }
523
524 bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu_ref, time_d abs_secs)
525 {
526 long ms;
527
528 if( abs_secs < 0.0)
529 ms = INFINITE;
530 else if( abs_secs == 0.0)
531 ms = 0;
532 else
533 {
534 ms = (long) ((abs_secs - now_secs())*1000.0 + 0.5);
535
536 // If the time already passed, still try once (ms==0). A short timeout
537 // may have turned negative or 0 because of the two time samples done.
538 //
539 if( ms < 0)
540 ms = 0;
541 }
542
543 if( !SleepConditionVariableCS( ref, mu_ref, ms))
544 {
545 if( GetLastError() == ERROR_TIMEOUT)
546 {
547 return false;
548 }
549 else
550 {
551 FAIL( "SleepConditionVariableCS", GetLastError());
552 }
553 }
554 return true;
555 }
556
557 void SIGNAL_ONE( SIGNAL_T *ref )
558 {
559 WakeConditionVariable( ref);
560 }
561
562 void SIGNAL_ALL( SIGNAL_T *ref )
563 {
564 WakeAllConditionVariable( ref);
565 }
566
567#endif // CONDITION_VARIABLE are available
568
569#else // THREADAPI == THREADAPI_PTHREAD 314#else // THREADAPI == THREADAPI_PTHREAD
570 // PThread (Linux, OS X, ...) 315 // PThread (Linux, OS X, ...)
571 // 316 //
@@ -607,44 +352,6 @@ bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs)
607 abort(); 352 abort();
608 } 353 }
609 #define PT_CALL( call ) { int rc= call; if (rc!=0) _PT_FAIL( rc, #call, __FILE__, __LINE__ ); } 354 #define PT_CALL( call ) { int rc= call; if (rc!=0) _PT_FAIL( rc, #call, __FILE__, __LINE__ ); }
610 //
611 void SIGNAL_INIT( SIGNAL_T *ref ) {
612 PT_CALL(pthread_cond_init(ref, nullptr /*attr*/));
613 }
614 void SIGNAL_FREE( SIGNAL_T *ref ) {
615 PT_CALL( pthread_cond_destroy(ref) );
616 }
617 //
618 /*
619 * Timeout is given as absolute since we may have fake wakeups during
620 * a timed out sleep. A Linda with some other key read, or just because
621 * PThread cond vars can wake up unwantedly.
622 */
623 bool SIGNAL_WAIT( SIGNAL_T *ref, pthread_mutex_t *mu, time_d abs_secs ) {
624 if (abs_secs<0.0) {
625 PT_CALL( pthread_cond_wait( ref, mu ) ); // infinite
626 } else {
627 int rc;
628 struct timespec ts;
629
630 assert( abs_secs != 0.0 );
631 prepare_timeout( &ts, abs_secs );
632
633 rc= pthread_cond_timedwait( ref, mu, &ts );
634
635 if (rc==ETIMEDOUT) return false;
636 if (rc) { _PT_FAIL( rc, "pthread_cond_timedwait()", __FILE__, __LINE__ ); }
637 }
638 return true;
639 }
640 //
641 void SIGNAL_ONE( SIGNAL_T *ref ) {
642 PT_CALL( pthread_cond_signal(ref) ); // wake up ONE (or no) waiting thread
643 }
644 //
645 void SIGNAL_ALL( SIGNAL_T *ref ) {
646 PT_CALL( pthread_cond_broadcast(ref) ); // wake up ALL waiting threads
647 }
648 355
649// array of 7 thread priority values, hand-tuned by platform so that we offer a uniform [-3,+3] public priority range 356// array of 7 thread priority values, hand-tuned by platform so that we offer a uniform [-3,+3] public priority range
650static int const gs_prio_remap[] = 357static int const gs_prio_remap[] =
@@ -775,129 +482,36 @@ static int select_prio(int prio /* -3..+3 */)
775 return gs_prio_remap[prio + 3]; 482 return gs_prio_remap[prio + 3];
776} 483}
777 484
778void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (*func)( void*), void* data, int prio /* -3..+3 */) 485void THREAD_SET_PRIORITY( int prio)
779{ 486{
780 pthread_attr_t a;
781 bool const change_priority =
782#ifdef PLATFORM_LINUX 487#ifdef PLATFORM_LINUX
783 sudo && // only root-privileged process can change priorities 488 if( sudo) // only root-privileged process can change priorities
784#endif 489#endif // PLATFORM_LINUX
785 (prio != THREAD_PRIO_DEFAULT);
786
787 PT_CALL( pthread_attr_init( &a));
788
789#ifndef PTHREAD_TIMEDJOIN
790 // We create a NON-JOINABLE thread. This is mainly due to the lack of
791 // 'pthread_timedjoin()', but does offer other benefits (s.a. earlier
792 // freeing of the thread's resources).
793 //
794 PT_CALL( pthread_attr_setdetachstate( &a, PTHREAD_CREATE_DETACHED));
795#endif // PTHREAD_TIMEDJOIN
796
797 // Use this to find a system's default stack size (DEBUG)
798#if 0
799 {
800 size_t n;
801 pthread_attr_getstacksize( &a, &n);
802 fprintf( stderr, "Getstack: %u\n", (unsigned int)n);
803 }
804 // 524288 on OS X
805 // 2097152 on Linux x86 (Ubuntu 7.04)
806 // 1048576 on FreeBSD 6.2 SMP i386
807#endif // 0
808
809#if defined _THREAD_STACK_SIZE && _THREAD_STACK_SIZE > 0
810 PT_CALL( pthread_attr_setstacksize( &a, _THREAD_STACK_SIZE));
811#endif
812
813 if (change_priority)
814 { 490 {
815 struct sched_param sp; 491 struct sched_param sp;
816 // "The specified scheduling parameters are only used if the scheduling 492 // prio range [-3,+3] was checked by the caller
817 // parameter inheritance attribute is PTHREAD_EXPLICIT_SCHED." 493 sp.sched_priority = gs_prio_remap[ prio + 3];
818 // 494 PT_CALL( pthread_setschedparam( pthread_self(), _PRIO_MODE, &sp));
819#if !defined __ANDROID__ || ( defined __ANDROID__ && __ANDROID_API__ >= 28 )
820 PT_CALL( pthread_attr_setinheritsched( &a, PTHREAD_EXPLICIT_SCHED));
821#endif
822
823#ifdef _PRIO_SCOPE
824 PT_CALL( pthread_attr_setscope( &a, _PRIO_SCOPE));
825#endif // _PRIO_SCOPE
826
827 PT_CALL( pthread_attr_setschedpolicy( &a, _PRIO_MODE));
828
829 sp.sched_priority = select_prio(prio);
830 PT_CALL( pthread_attr_setschedparam( &a, &sp));
831 }
832
833 //---
834 // Seems on OS X, _POSIX_THREAD_THREADS_MAX is some kind of system
835 // thread limit (not userland thread). Actual limit for us is way higher.
836 // PTHREAD_THREADS_MAX is not defined (even though man page refers to it!)
837 //
838# ifndef THREAD_CREATE_RETRIES_MAX
839 // Don't bother with retries; a failure is a failure
840 //
841 {
842 int rc = pthread_create( ref, &a, func, data);
843 if( rc) _PT_FAIL( rc, "pthread_create()", __FILE__, __LINE__ - 1);
844 } 495 }
845# else
846# error "This code deprecated"
847 /*
848 // Wait slightly if thread creation has exchausted the system
849 //
850 { int retries;
851 for( retries=0; retries<THREAD_CREATE_RETRIES_MAX; retries++ ) {
852
853 int rc= pthread_create( ref, &a, func, data );
854 //
855 // OS X / Linux:
856 // EAGAIN: ".. lacked the necessary resources to create
857 // another thread, or the system-imposed limit on the
858 // total number of threads in a process
859 // [PTHREAD_THREADS_MAX] would be exceeded."
860 // EINVAL: attr is invalid
861 // Linux:
862 // EPERM: no rights for given parameters or scheduling (no sudo)
863 // ENOMEM: (known to fail with this code, too - not listed in man)
864
865 if (rc==0) break; // ok!
866
867 // In practise, exhaustion seems to be coming from memory, not a
868 // maximum number of threads. Keep tuning... ;)
869 //
870 if (rc==EAGAIN) {
871 //fprintf( stderr, "Looping (retries=%d) ", retries ); // DEBUG
872
873 // Try again, later.
874
875 Yield();
876 } else {
877 _PT_FAIL( rc, "pthread_create()", __FILE__, __LINE__ );
878 }
879 }
880 }
881 */
882# endif
883
884 PT_CALL( pthread_attr_destroy( &a));
885} 496}
886 497
498// #################################################################################################
887 499
888void THREAD_SET_PRIORITY( int prio) 500void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_)
889{ 501{
890#ifdef PLATFORM_LINUX 502#ifdef PLATFORM_LINUX
891 if( sudo) // only root-privileged process can change priorities 503 if (sudo) // only root-privileged process can change priorities
892#endif // PLATFORM_LINUX 504#endif // PLATFORM_LINUX
893 { 505 {
894 struct sched_param sp; 506 struct sched_param sp;
895 // prio range [-3,+3] was checked by the caller 507 // prio range [-3,+3] was checked by the caller
896 sp.sched_priority = gs_prio_remap[ prio + 3]; 508 sp.sched_priority = gs_prio_remap[prio_ + 3];
897 PT_CALL( pthread_setschedparam( pthread_self(), _PRIO_MODE, &sp)); 509 PT_CALL(pthread_setschedparam(static_cast<pthread_t>(thread_.native_handle()), _PRIO_MODE, &sp));
898 } 510 }
899} 511}
900 512
513// #################################################################################################
514
901void THREAD_SET_AFFINITY( unsigned int aff) 515void THREAD_SET_AFFINITY( unsigned int aff)
902{ 516{
903 int bit = 0; 517 int bit = 0;
@@ -929,93 +543,6 @@ void THREAD_SET_AFFINITY( unsigned int aff)
929#endif 543#endif
930} 544}
931 545
932 /*
933 * Wait for a thread to finish.
934 *
935 * 'mu_ref' is a lock we should use for the waiting; initially unlocked.
936 * Same lock as passed to THREAD_EXIT.
937 *
938 * Returns true for successful wait, false for timed out
939 */
940bool THREAD_WAIT( THREAD_T *ref, double secs , SIGNAL_T *signal_ref, MUTEX_T *mu_ref, volatile enum e_status *st_ref)
941{
942 struct timespec ts_store;
943 const struct timespec* timeout = nullptr;
944 bool done;
945
946 // Do timeout counting before the locks
947 //
948#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT
949 if (secs>=0.0)
950#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR
951 if (secs>0.0)
952#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
953 {
954 prepare_timeout( &ts_store, now_secs()+secs );
955 timeout= &ts_store;
956 }
957
958#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT
959 /* Thread is joinable
960 */
961 if (!timeout) {
962 PT_CALL(pthread_join(*ref, nullptr /*ignore exit value*/));
963 done = true;
964 } else {
965 int rc = PTHREAD_TIMEDJOIN(*ref, nullptr, timeout);
966 if ((rc!=0) && (rc!=ETIMEDOUT)) {
967 _PT_FAIL( rc, "PTHREAD_TIMEDJOIN", __FILE__, __LINE__-2 );
968 }
969 done= rc==0;
970 }
971#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR
972 /* Since we've set the thread up as PTHREAD_CREATE_DETACHED, we cannot
973 * join with it. Use the cond.var.
974 */
975 (void) ref; // unused
976 MUTEX_LOCK( mu_ref );
977
978 // 'secs'==0.0 does not need to wait, just take the current status
979 // within the 'mu_ref' locks
980 //
981 if (secs != 0.0) {
982 while( *st_ref < DONE ) {
983 if (!timeout) {
984 PT_CALL( pthread_cond_wait( signal_ref, mu_ref ));
985 } else {
986 int rc= pthread_cond_timedwait( signal_ref, mu_ref, timeout );
987 if (rc==ETIMEDOUT) break;
988 if (rc!=0) _PT_FAIL( rc, "pthread_cond_timedwait", __FILE__, __LINE__-2 );
989 }
990 }
991 }
992 done= *st_ref >= DONE; // DONE|ERROR_ST|CANCELLED
993
994 MUTEX_UNLOCK( mu_ref );
995#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
996 return done;
997 }
998 //
999 void THREAD_KILL( THREAD_T *ref ) {
1000#ifdef __ANDROID__
1001 __android_log_print(ANDROID_LOG_WARN, LOG_TAG, "Cannot kill thread!");
1002#else
1003 pthread_cancel( *ref );
1004#endif
1005 }
1006
1007 void THREAD_MAKE_ASYNCH_CANCELLABLE()
1008 {
1009#ifdef __ANDROID__
1010 __android_log_print(ANDROID_LOG_WARN, LOG_TAG, "Cannot make thread async cancellable!");
1011#else
1012 // that's the default, but just in case...
1013 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
1014 // we want cancellation to take effect immediately if possible, instead of waiting for a cancellation point (which is the default)
1015 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
1016#endif
1017 }
1018
1019 void THREAD_SETNAME( char const* _name) 546 void THREAD_SETNAME( char const* _name)
1020 { 547 {
1021 // exact API to set the thread name is platform-dependant 548 // exact API to set the thread name is platform-dependant
diff --git a/src/threading.h b/src/threading.h
index 38a021f..e9f302a 100644
--- a/src/threading.h
+++ b/src/threading.h
@@ -1,13 +1,9 @@
1#pragma once 1#pragma once
2 2
3/*
4 * win32-pthread:
5 * define HAVE_WIN32_PTHREAD and PTW32_INCLUDE_WINDOWS_H in your project configuration when building for win32-pthread.
6 * link against pthreadVC2.lib, and of course have pthreadVC2.dll somewhere in your path.
7 */
8#include "platform.h" 3#include "platform.h"
9 4
10#include <time.h> 5#include <time.h>
6#include <thread>
11 7
12/* Note: ERROR is a defined entity on Win32 8/* Note: ERROR is a defined entity on Win32
13 PENDING: The Lua VM hasn't done anything yet. 9 PENDING: The Lua VM hasn't done anything yet.
@@ -19,7 +15,7 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED };
19#define THREADAPI_WINDOWS 1 15#define THREADAPI_WINDOWS 1
20#define THREADAPI_PTHREAD 2 16#define THREADAPI_PTHREAD 2
21 17
22#if( defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC)) && !defined( HAVE_WIN32_PTHREAD) 18#if( defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC))
23//#pragma message ( "THREADAPI_WINDOWS" ) 19//#pragma message ( "THREADAPI_WINDOWS" )
24#define THREADAPI THREADAPI_WINDOWS 20#define THREADAPI THREADAPI_WINDOWS
25#else // (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) 21#else // (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC)
@@ -68,16 +64,9 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED };
68 }; 64 };
69 65
70 66
71 #define MUTEX_T HANDLE
72 void MUTEX_INIT( MUTEX_T* ref);
73 void MUTEX_FREE( MUTEX_T* ref);
74 void MUTEX_LOCK( MUTEX_T* ref);
75 void MUTEX_UNLOCK( MUTEX_T* ref);
76
77 #else // CONDITION_VARIABLE are available, use them 67 #else // CONDITION_VARIABLE are available, use them
78 68
79 #define SIGNAL_T CONDITION_VARIABLE 69 #define SIGNAL_T CONDITION_VARIABLE
80 #define MUTEX_T CRITICAL_SECTION
81 #define MUTEX_INIT( ref) InitializeCriticalSection( ref) 70 #define MUTEX_INIT( ref) InitializeCriticalSection( ref)
82 #define MUTEX_FREE( ref) DeleteCriticalSection( ref) 71 #define MUTEX_FREE( ref) DeleteCriticalSection( ref)
83 #define MUTEX_LOCK( ref) EnterCriticalSection( ref) 72 #define MUTEX_LOCK( ref) EnterCriticalSection( ref)
@@ -111,7 +100,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED };
111 # define _MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE 100 # define _MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE
112 #endif 101 #endif
113 102
114 #define MUTEX_T pthread_mutex_t
115 #define MUTEX_INIT(ref) pthread_mutex_init(ref, nullptr) 103 #define MUTEX_INIT(ref) pthread_mutex_init(ref, nullptr)
116 #define MUTEX_RECURSIVE_INIT(ref) \ 104 #define MUTEX_RECURSIVE_INIT(ref) \
117 { pthread_mutexattr_t a; pthread_mutexattr_init( &a ); \ 105 { pthread_mutexattr_t a; pthread_mutexattr_init( &a ); \
@@ -126,8 +114,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED };
126 114
127 using SIGNAL_T = pthread_cond_t; 115 using SIGNAL_T = pthread_cond_t;
128 116
129 void SIGNAL_ONE( SIGNAL_T *ref );
130
131 // Yield is non-portable: 117 // Yield is non-portable:
132 // 118 //
133 // OS X 10.4.8/9 has pthread_yield_np() 119 // OS X 10.4.8/9 has pthread_yield_np()
@@ -143,10 +129,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED };
143 #define THREAD_CALLCONV 129 #define THREAD_CALLCONV
144#endif //THREADAPI == THREADAPI_PTHREAD 130#endif //THREADAPI == THREADAPI_PTHREAD
145 131
146void SIGNAL_INIT( SIGNAL_T *ref );
147void SIGNAL_FREE( SIGNAL_T *ref );
148void SIGNAL_ALL( SIGNAL_T *ref );
149
150/* 132/*
151* 'time_d': <0.0 for no timeout 133* 'time_d': <0.0 for no timeout
152* 0.0 for instant check 134* 0.0 for instant check
@@ -155,11 +137,6 @@ void SIGNAL_ALL( SIGNAL_T *ref );
155using time_d = double; 137using time_d = double;
156time_d now_secs(void); 138time_d now_secs(void);
157 139
158time_d SIGNAL_TIMEOUT_PREPARE( double rel_secs );
159
160bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout );
161
162
163/*---=== Threading ===--- 140/*---=== Threading ===---
164*/ 141*/
165 142
@@ -167,16 +144,9 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout );
167 144
168#if THREADAPI == THREADAPI_WINDOWS 145#if THREADAPI == THREADAPI_WINDOWS
169 146
170 using THREAD_T = HANDLE;
171# define THREAD_ISNULL( _h) (_h == 0)
172 void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (__stdcall *func)( void*), void* data, int prio /* -3..+3 */);
173
174# define THREAD_PRIO_MIN (-3) 147# define THREAD_PRIO_MIN (-3)
175# define THREAD_PRIO_MAX (+3) 148# define THREAD_PRIO_MAX (+3)
176 149
177# define THREAD_CLEANUP_PUSH( cb_, val_)
178# define THREAD_CLEANUP_POP( execute_)
179
180#else // THREADAPI == THREADAPI_PTHREAD 150#else // THREADAPI == THREADAPI_PTHREAD
181 151
182 /* Platforms that have a timed 'pthread_join()' can get away with a simpler 152 /* Platforms that have a timed 'pthread_join()' can get away with a simpler
@@ -195,11 +165,6 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout );
195# endif 165# endif
196# endif 166# endif
197 167
198 using THREAD_T = pthread_t;
199# define THREAD_ISNULL( _h) 0 // pthread_t may be a structure: never 'null' by itself
200
201 void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (*func)( void*), void* data, int prio /* -3..+3 */);
202
203# if defined(PLATFORM_LINUX) 168# if defined(PLATFORM_LINUX)
204 extern volatile bool sudo; 169 extern volatile bool sudo;
205# ifdef LINUX_SCHED_RR 170# ifdef LINUX_SCHED_RR
@@ -213,13 +178,6 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout );
213# define THREAD_PRIO_MAX (+3) 178# define THREAD_PRIO_MAX (+3)
214# endif 179# endif
215 180
216# if THREADWAIT_METHOD == THREADWAIT_CONDVAR
217# define THREAD_CLEANUP_PUSH( cb_, val_) pthread_cleanup_push( cb_, val_)
218# define THREAD_CLEANUP_POP( execute_) pthread_cleanup_pop( execute_)
219# else
220# define THREAD_CLEANUP_PUSH( cb_, val_) {
221# define THREAD_CLEANUP_POP( execute_) }
222# endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
223#endif // THREADAPI == THREADAPI_WINDOWS 181#endif // THREADAPI == THREADAPI_WINDOWS
224 182
225/* 183/*
@@ -236,16 +194,8 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout );
236#endif // THREADAPI == THREADAPI_WINDOWS || (defined PTHREAD_TIMEDJOIN) 194#endif // THREADAPI == THREADAPI_WINDOWS || (defined PTHREAD_TIMEDJOIN)
237 195
238 196
239#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT
240bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs);
241#define THREAD_WAIT( a, b, c, d, e) THREAD_WAIT_IMPL( a, b)
242#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR
243bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs, SIGNAL_T *signal_ref, MUTEX_T *mu_ref, volatile enum e_status *st_ref);
244#define THREAD_WAIT THREAD_WAIT_IMPL
245#endif // // THREADWAIT_METHOD == THREADWAIT_CONDVAR
246
247void THREAD_KILL( THREAD_T* ref);
248void THREAD_SETNAME( char const* _name); 197void THREAD_SETNAME( char const* _name);
249void THREAD_MAKE_ASYNCH_CANCELLABLE();
250void THREAD_SET_PRIORITY( int prio); 198void THREAD_SET_PRIORITY( int prio);
251void THREAD_SET_AFFINITY( unsigned int aff); 199void THREAD_SET_AFFINITY( unsigned int aff);
200
201void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_);