aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/index.html49
-rw-r--r--src/cancel.cpp178
-rw-r--r--src/cancel.h8
-rw-r--r--src/keeper.cpp19
-rw-r--r--src/keeper.h10
-rw-r--r--src/lanes.cpp337
-rw-r--r--src/lanes.lua451
-rw-r--r--src/lanes_private.h43
-rw-r--r--src/linda.cpp92
-rw-r--r--src/macros_and_utils.h5
-rw-r--r--src/threading.cpp525
-rw-r--r--src/threading.h58
-rw-r--r--tests/cancel.lua20
13 files changed, 588 insertions, 1207 deletions
diff --git a/docs/index.html b/docs/index.html
index ee5acfa..d24d3d7 100644
--- a/docs/index.html
+++ b/docs/index.html
@@ -425,7 +425,18 @@
425 number >= 0 425 number >= 0
426 </td> 426 </td>
427 <td> 427 <td>
428 Sets the duration in seconds Lanes will wait for graceful termination of running lanes at application shutdown. Irrelevant for builds using pthreads. Default is <tt>0.25</tt>. 428 Sets the duration in seconds Lanes will wait for graceful termination of running lanes at application shutdown. Default is <tt>0.25</tt>.
429 </td>
430 </tr>
431 <tr valign=top>
432 <td id="shutdown_mode">
433 <code>.shutdown_mode</code>
434 </td>
435 <td>
436 <tt>"hard"</tt>/<tt>"soft"</tt>/<tt>"call"</tt>/<tt>"ret"</tt>/<tt>"line"</tt>/<tt>"count"</tt>
437 </td>
438 <td>
439 Select the cancellation mode used at Lanes shutdown to request free running lane termination. See <a href="#cancelling">lane cancellation</a>. Default is <tt>"hard"</tt>.
429 </td> 440 </td>
430 </tr> 441 </tr>
431 </table> 442 </table>
@@ -875,16 +886,6 @@
875 received <a href="#cancelling">cancellation</a> and finished itself. 886 received <a href="#cancelling">cancellation</a> and finished itself.
876 </td> 887 </td>
877 </tr> 888 </tr>
878 <tr>
879 <td/>
880 <td>
881 <tt>"killed"</tt>
882 </td>
883 <td/>
884 <td>
885 was forcefully killed by <tt>lane_h:cancel()</tt>
886 </td>
887 </tr>
888 </table> 889 </table>
889</p> 890</p>
890 891
@@ -996,36 +997,33 @@
996<h2 id="cancelling">Cancelling</h2> 997<h2 id="cancelling">Cancelling</h2>
997 998
998<table border="1" bgcolor="#E0E0FF" cellpadding="10" style="width:50%"><tr><td><pre> 999<table border="1" bgcolor="#E0E0FF" cellpadding="10" style="width:50%"><tr><td><pre>
999 bool[,reason] = lane_h:cancel( "soft" [, timeout] [, wake_bool]) 1000 bool[,reason] = lane_h:cancel( "soft" [, timeout] [, wake_lane])
1000 bool[,reason] = lane_h:cancel( "hard" [, timeout] [, force [, forcekill_timeout]]) 1001 bool[,reason] = lane_h:cancel( "hard" [, timeout] [, wake_lane])
1001 bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, force [, forcekill_timeout]]) 1002 bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, wake_lane])
1002</pre></td></tr></table> 1003</pre></td></tr></table>
1003 1004
1004<p> 1005<p>
1005 <tt>cancel()</tt> sends a cancellation request to the lane.<br/> 1006 <tt>cancel()</tt> sends a cancellation request to the lane.<br/>
1006 First argument is a <tt>mode</tt> can be one of <tt>"hard"</tt>, <tt>"soft"</tt>, <tt>"count"</tt>, <tt>"line"</tt>, <tt>"call"</tt>, <tt>"ret"</tt>. 1007 First argument is a <tt>mode</tt> can be one of <tt>"hard"</tt>, <tt>"soft"</tt>, <tt>"call"</tt>, <tt>"ret"</tt>, <tt>"line"</tt>, <tt>"count"</tt>.
1007 If <tt>mode</tt> is not specified, it defaults to <tt>"hard"</tt>. 1008 If <tt>mode</tt> is not specified, it defaults to <tt>"hard"</tt>.
1009 If <tt>wake_lane</tt> is <tt>true</tt>, the lane is also signalled so that execution returns from any pending linda operation. Linda operations detecting the cancellation request return <tt>lanes.cancel_error</tt>.
1008</p> 1010</p>
1009<p> 1011<p>
1010 If <tt>mode</tt> is <tt>"soft"</tt>, cancellation will only cause <tt>cancel_test()</tt> to return <tt>true</tt>, so that the lane can cleanup manually.<br/> 1012 If <tt>mode</tt> is <tt>"soft"</tt>, cancellation will only cause <tt>cancel_test()</tt> to return <tt>true</tt>, so that the lane can cleanup manually.<br/>
1011 If <tt>wake_bool</tt> is <tt>true</tt>, the lane is also signalled so that execution returns from any pending linda operation. Linda operations detecting the cancellation request return <tt>lanes.cancel_error</tt>.
1012</p> 1013</p>
1013<p> 1014<p>
1014 If <tt>mode</tt> is <tt>"hard"</tt>, waits for the request to be processed, or a timeout to occur. Linda operations detecting the cancellation request will raise a special cancellation error (meaning they won't return in that case).<br/> 1015 If <tt>mode</tt> is <tt>"hard"</tt>, waits for the request to be processed, or a timeout to occur. Linda operations detecting the cancellation request will raise a special cancellation error (meaning they won't return in that case).<br/>
1015 <tt>timeout</tt> defaults to 0 if not specified. 1016 <tt>wake_lane</tt> defaults to <tt>true</tt>, and <tt>timeout</tt> defaults to 0 if not specified.
1016</p> 1017</p>
1017<p> 1018<p>
1018 Other values of <tt>mode</tt> will asynchronously install the corresponding hook, then behave as <tt>"hard"</tt>. 1019 Other values of <tt>mode</tt> will asynchronously install the corresponding hook, then behave as <tt>"hard"</tt>.
1019</p> 1020</p>
1020<p> 1021<p>
1021 If <tt>force_kill_bool</tt> is <tt>true</tt>, <tt>forcekill_timeout</tt> can be set to tell how long lanes will wait for the OS thread to terminate before raising an error. Windows threads always terminate immediately, but it might not always be the case with some pthread implementations.
1022</p>
1023<p>
1024 Returns <tt>true, lane_h.status</tt> if lane was already done (in <tt>"done"</tt>, <tt>"error"</tt> or <tt>"cancelled"</tt> status), or the cancellation was fruitful within <tt>timeout_secs</tt> timeout period.<br/> 1022 Returns <tt>true, lane_h.status</tt> if lane was already done (in <tt>"done"</tt>, <tt>"error"</tt> or <tt>"cancelled"</tt> status), or the cancellation was fruitful within <tt>timeout_secs</tt> timeout period.<br/>
1025 Returns <tt>false, "timeout"</tt> otherwise. 1023 Returns <tt>false, "timeout"</tt> otherwise.
1026</p> 1024</p>
1027<p> 1025<p>
1028 If the lane is still running after the timeout expired and <tt>force_kill</tt> is <tt>true</tt>, the OS thread running the lane is forcefully killed. This means no GC, probable OS resource leaks (thread stack, locks, DLL notifications), and should generally be the last resort. 1026 If the lane is still running after the timeout expired, there is a chance lanes will raise an error at shutdown when failing to terminate all free-running lanes within the specified timeout.
1029</p> 1027</p>
1030<p> 1028<p>
1031 Cancellation is tested <u>before</u> going to sleep in <tt>receive()</tt> or <tt>send()</tt> calls and after executing <tt>cancelstep</tt> Lua statements. A pending <tt>receive()</tt>or <tt>send()</tt> call is awakened. 1029 Cancellation is tested <u>before</u> going to sleep in <tt>receive()</tt> or <tt>send()</tt> calls and after executing <tt>cancelstep</tt> Lua statements. A pending <tt>receive()</tt>or <tt>send()</tt> call is awakened.
@@ -1396,6 +1394,14 @@ events to a common Linda, but... :).</font>
1396 Default duration is null, which should only cause a thread context switch. 1394 Default duration is null, which should only cause a thread context switch.
1397</p> 1395</p>
1398 1396
1397<table border="1" bgcolor="#E0E0FF" cellpadding="10" style="width:50%"><tr><td><pre>
1398 number = lanes.now_secs()
1399</pre></td></tr></table>
1400
1401<p>
1402 Returns the current value of the clock used by timers and lindas.
1403</p>
1404
1399<!-- locks +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ --> 1405<!-- locks +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -->
1400<hr/> 1406<hr/>
1401<h2 id="locks">Locks etc.</h2> 1407<h2 id="locks">Locks etc.</h2>
@@ -1797,3 +1803,4 @@ int luaD_new_clonable( lua_State* L)
1797 1803
1798</body> 1804</body>
1799</html> 1805</html>
1806</pre> \ No newline at end of file
diff --git a/src/cancel.cpp b/src/cancel.cpp
index 4667f07..6a94343 100644
--- a/src/cancel.cpp
+++ b/src/cancel.cpp
@@ -92,7 +92,7 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar)
92// ################################################################################################ 92// ################################################################################################
93 93
94//--- 94//---
95// = thread_cancel( lane_ud [,timeout_secs=0.0] [,force_kill_bool=false] ) 95// = thread_cancel( lane_ud [,timeout_secs=0.0] [,wake_lindas_bool=false] )
96// 96//
97// The originator thread asking us specifically to cancel the other thread. 97// The originator thread asking us specifically to cancel the other thread.
98// 98//
@@ -100,9 +100,8 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar)
100// 0.0: just signal it to cancel, no time waited 100// 0.0: just signal it to cancel, no time waited
101// >0: time to wait for the lane to detect cancellation 101// >0: time to wait for the lane to detect cancellation
102// 102//
103// 'force_kill': if true, and lane does not detect cancellation within timeout, 103// 'wake_lindas_bool': if true, signal any linda the thread is waiting on
104// it is forcefully killed. Using this with 0.0 timeout means just kill 104// instead of waiting for its timeout (if any)
105// (unless the lane is already finished).
106// 105//
107// Returns: true if the lane was already finished (DONE/ERROR_ST/CANCELLED) or if we 106// Returns: true if the lane was already finished (DONE/ERROR_ST/CANCELLED) or if we
108// managed to cancel it. 107// managed to cancel it.
@@ -111,76 +110,47 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar)
111 110
112// ################################################################################################ 111// ################################################################################################
113 112
114static CancelResult thread_cancel_soft(Lane* lane_, double secs_, bool wake_lindas_) 113static CancelResult thread_cancel_soft(Lane* lane_, lua_Duration duration_, bool wake_lane_)
115{ 114{
116 lane_->cancel_request = CancelRequest::Soft; // it's now signaled to stop 115 lane_->cancel_request = CancelRequest::Soft; // it's now signaled to stop
117 // negative timeout: we don't want to truly abort the lane, we just want it to react to cancel_test() on its own 116 // negative timeout: we don't want to truly abort the lane, we just want it to react to cancel_test() on its own
118 if (wake_lindas_) // wake the thread so that execution returns from any pending linda operation if desired 117 if (wake_lane_) // wake the thread so that execution returns from any pending linda operation if desired
119 { 118 {
120 SIGNAL_T* const waiting_on{ lane_->waiting_on }; 119 std::condition_variable* const waiting_on{ lane_->m_waiting_on };
121 if (lane_->status == WAITING && waiting_on != nullptr) 120 if (lane_->status == WAITING && waiting_on != nullptr)
122 { 121 {
123 SIGNAL_ALL( waiting_on); 122 waiting_on->notify_all();
124 } 123 }
125 } 124 }
126 125
127 return THREAD_WAIT(&lane_->thread, secs_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Cancelled : CancelResult::Timeout; 126 return lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout;
128} 127}
129 128
130// ################################################################################################ 129// ################################################################################################
131 130
132static CancelResult thread_cancel_hard(lua_State* L, Lane* lane_, double secs_, bool force_, double waitkill_timeout_) 131static CancelResult thread_cancel_hard(Lane* lane_, lua_Duration duration_, bool wake_lane_)
133{ 132{
134 lane_->cancel_request = CancelRequest::Hard; // it's now signaled to stop 133 lane_->cancel_request = CancelRequest::Hard; // it's now signaled to stop
134 //lane_->m_thread.get_stop_source().request_stop();
135 if (wake_lane_) // wake the thread so that execution returns from any pending linda operation if desired
135 { 136 {
136 SIGNAL_T* waiting_on = lane_->waiting_on; 137 std::condition_variable* waiting_on = lane_->m_waiting_on;
137 if (lane_->status == WAITING && waiting_on != nullptr) 138 if (lane_->status == WAITING && waiting_on != nullptr)
138 { 139 {
139 SIGNAL_ALL( waiting_on); 140 waiting_on->notify_all();
140 } 141 }
141 } 142 }
142 143
143 CancelResult result{ THREAD_WAIT(&lane_->thread, secs_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Cancelled : CancelResult::Timeout }; 144 CancelResult result{ lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout };
144
145 if ((result == CancelResult::Timeout) && force_)
146 {
147 // Killing is asynchronous; we _will_ wait for it to be done at
148 // GC, to make sure the data structure can be released (alternative
149 // would be use of "cancellation cleanup handlers" that at least
150 // PThread seems to have).
151 //
152 THREAD_KILL(&lane_->thread);
153#if THREADAPI == THREADAPI_PTHREAD
154 // pthread: make sure the thread is really stopped!
155 // note that this may block forever if the lane doesn't call a cancellation point and pthread doesn't honor PTHREAD_CANCEL_ASYNCHRONOUS
156 result = THREAD_WAIT(&lane_->thread, waitkill_timeout_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Killed : CancelResult::Timeout;
157 if (result == CancelResult::Timeout)
158 {
159 std::ignore = luaL_error( L, "force-killed lane failed to terminate within %f second%s", waitkill_timeout_, waitkill_timeout_ > 1 ? "s" : "");
160 }
161#else
162 (void) waitkill_timeout_; // unused
163 (void) L; // unused
164#endif // THREADAPI == THREADAPI_PTHREAD
165 lane_->mstatus = Lane::Killed; // mark 'gc' to wait for it
166 // note that lane_->status value must remain to whatever it was at the time of the kill
167 // because we need to know if we can lua_close() the Lua State or not.
168 result = CancelResult::Killed;
169 }
170 return result; 145 return result;
171} 146}
172 147
173// ################################################################################################ 148// ################################################################################################
174 149
175CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_, bool force_, double waitkill_timeout_) 150CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hook_count_, lua_Duration duration_, bool wake_lane_)
176{ 151{
177 // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here 152 // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here
178 // We can read 'lane_->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) 153 // We can read 'lane_->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN)
179 if (lane_->mstatus == Lane::Killed)
180 {
181 return CancelResult::Killed;
182 }
183
184 if (lane_->status >= DONE) 154 if (lane_->status >= DONE)
185 { 155 {
186 // say "ok" by default, including when lane is already done 156 // say "ok" by default, including when lane is already done
@@ -191,48 +161,57 @@ CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_
191 // let us hope we never land here with a pointer on a linda that has been destroyed... 161 // let us hope we never land here with a pointer on a linda that has been destroyed...
192 if (op_ == CancelOp::Soft) 162 if (op_ == CancelOp::Soft)
193 { 163 {
194 return thread_cancel_soft(lane_, secs_, force_); 164 return thread_cancel_soft(lane_, duration_, wake_lane_);
165 }
166 else if (static_cast<int>(op_) > static_cast<int>(CancelOp::Soft))
167 {
168 lua_sethook(lane_->L, cancel_hook, static_cast<int>(op_), hook_count_);
195 } 169 }
196 170
197 return thread_cancel_hard(L, lane_, secs_, force_, waitkill_timeout_); 171 return thread_cancel_hard(lane_, duration_, wake_lane_);
198} 172}
199 173
200// ################################################################################################ 174// ################################################################################################
201// ################################################################################################ 175// ################################################################################################
202 176
203// > 0: the mask 177CancelOp which_cancel_op(char const* op_string_)
204// = 0: soft 178{
205// < 0: hard 179 CancelOp op{ CancelOp::Invalid };
206static CancelOp which_op(lua_State* L, int idx_) 180 if (strcmp(op_string_, "hard") == 0)
181 {
182 op = CancelOp::Hard;
183 }
184 else if (strcmp(op_string_, "soft") == 0)
185 {
186 op = CancelOp::Soft;
187 }
188 else if (strcmp(op_string_, "call") == 0)
189 {
190 op = CancelOp::MaskCall;
191 }
192 else if (strcmp(op_string_, "ret") == 0)
193 {
194 op = CancelOp::MaskRet;
195 }
196 else if (strcmp(op_string_, "line") == 0)
197 {
198 op = CancelOp::MaskLine;
199 }
200 else if (strcmp(op_string_, "count") == 0)
201 {
202 op = CancelOp::MaskCount;
203 }
204 return op;
205}
206
207// ################################################################################################
208
209static CancelOp which_cancel_op(lua_State* L, int idx_)
207{ 210{
208 if (lua_type(L, idx_) == LUA_TSTRING) 211 if (lua_type(L, idx_) == LUA_TSTRING)
209 { 212 {
210 CancelOp op{ CancelOp::Invalid }; 213 char const* const str{ lua_tostring(L, idx_) };
211 char const* str = lua_tostring(L, idx_); 214 CancelOp op{ which_cancel_op(str) };
212 if (strcmp(str, "hard") == 0)
213 {
214 op = CancelOp::Hard;
215 }
216 else if (strcmp(str, "soft") == 0)
217 {
218 op = CancelOp::Soft;
219 }
220 else if (strcmp(str, "call") == 0)
221 {
222 op = CancelOp::MaskCall;
223 }
224 else if (strcmp(str, "ret") == 0)
225 {
226 op = CancelOp::MaskRet;
227 }
228 else if (strcmp(str, "line") == 0)
229 {
230 op = CancelOp::MaskLine;
231 }
232 else if (strcmp(str, "count") == 0)
233 {
234 op = CancelOp::MaskCount;
235 }
236 lua_remove(L, idx_); // argument is processed, remove it 215 lua_remove(L, idx_); // argument is processed, remove it
237 if (op == CancelOp::Invalid) 216 if (op == CancelOp::Invalid)
238 { 217 {
@@ -245,53 +224,60 @@ static CancelOp which_op(lua_State* L, int idx_)
245 224
246// ################################################################################################ 225// ################################################################################################
247 226
248// bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, force [, forcekill_timeout]]) 227// bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, wake_lindas])
249LUAG_FUNC(thread_cancel) 228LUAG_FUNC(thread_cancel)
250{ 229{
251 Lane* const lane{ lua_toLane(L, 1) }; 230 Lane* const lane{ lua_toLane(L, 1) };
252 CancelOp const op{ which_op(L, 2) }; // this removes the op string from the stack 231 CancelOp const op{ which_cancel_op(L, 2) }; // this removes the op string from the stack
253 232
233 int hook_count{ 0 };
254 if (static_cast<int>(op) > static_cast<int>(CancelOp::Soft)) // hook is requested 234 if (static_cast<int>(op) > static_cast<int>(CancelOp::Soft)) // hook is requested
255 { 235 {
256 int const hook_count{ static_cast<int>(lua_tointeger(L, 2)) }; 236 hook_count = static_cast<int>(luaL_checkinteger(L, 2));
257 lua_remove(L, 2); // argument is processed, remove it 237 lua_remove(L, 2); // argument is processed, remove it
258 if (hook_count < 1) 238 if (hook_count < 1)
259 { 239 {
260 return luaL_error(L, "hook count cannot be < 1"); 240 return luaL_error(L, "hook count cannot be < 1");
261 } 241 }
262 lua_sethook(lane->L, cancel_hook, static_cast<int>(op), hook_count);
263 } 242 }
264 243
265 double secs{ 0.0 }; 244 lua_Duration wait_timeout{ 0.0 };
266 if (lua_type(L, 2) == LUA_TNUMBER) 245 if (lua_type(L, 2) == LUA_TNUMBER)
267 { 246 {
268 secs = lua_tonumber(L, 2); 247 wait_timeout = lua_Duration{ lua_tonumber(L, 2) };
269 lua_remove(L, 2); // argument is processed, remove it 248 lua_remove(L, 2); // argument is processed, remove it
270 if (secs < 0.0) 249 if (wait_timeout.count() < 0.0)
271 { 250 {
272 return luaL_error(L, "cancel timeout cannot be < 0"); 251 return luaL_error(L, "cancel timeout cannot be < 0");
273 } 252 }
274 } 253 }
275 254 // we wake by default in "hard" mode (remember that hook is hard too), but this can be turned off if desired
276 bool const force{ lua_toboolean(L, 2) ? true : false }; // false if nothing there 255 bool wake_lane{ op != CancelOp::Soft };
277 double const forcekill_timeout{ luaL_optnumber(L, 3, 0.0) }; 256 if (lua_gettop(L) >= 2)
278 switch (thread_cancel(L, lane, op, secs, force, forcekill_timeout)) 257 {
258 if (!lua_isboolean(L, 2))
259 {
260 return luaL_error(L, "wake_lindas parameter is not a boolean");
261 }
262 wake_lane = lua_toboolean(L, 2);
263 lua_remove(L, 2); // argument is processed, remove it
264 }
265 switch (thread_cancel(lane, op, hook_count, wait_timeout, wake_lane))
279 { 266 {
267 default: // should never happen unless we added a case and forgot to handle it
268 ASSERT_L(false);
269 break;
270
280 case CancelResult::Timeout: 271 case CancelResult::Timeout:
281 lua_pushboolean(L, 0); 272 lua_pushboolean(L, 0);
282 lua_pushstring(L, "timeout"); 273 lua_pushstring(L, "timeout");
283 return 2; 274 break;
284 275
285 case CancelResult::Cancelled: 276 case CancelResult::Cancelled:
286 lua_pushboolean(L, 1); 277 lua_pushboolean(L, 1);
287 push_thread_status(L, lane); 278 push_thread_status(L, lane);
288 return 2; 279 break;
289
290 case CancelResult::Killed:
291 lua_pushboolean(L, 1);
292 push_thread_status(L, lane);
293 return 2;
294 } 280 }
295 // should never happen, only here to prevent the compiler from complaining of "not all control paths returning a value" 281 // should never happen, only here to prevent the compiler from complaining of "not all control paths returning a value"
296 return 0; 282 return 2;
297} 283}
diff --git a/src/cancel.h b/src/cancel.h
index 884e193..954b04e 100644
--- a/src/cancel.h
+++ b/src/cancel.h
@@ -13,6 +13,8 @@ extern "C" {
13#include "uniquekey.h" 13#include "uniquekey.h"
14#include "macros_and_utils.h" 14#include "macros_and_utils.h"
15 15
16#include <chrono>
17
16// ################################################################################################ 18// ################################################################################################
17 19
18class Lane; // forward 20class Lane; // forward
@@ -30,8 +32,7 @@ enum class CancelRequest
30enum class CancelResult 32enum class CancelResult
31{ 33{
32 Timeout, 34 Timeout,
33 Cancelled, 35 Cancelled
34 Killed
35}; 36};
36 37
37enum class CancelOp 38enum class CancelOp
@@ -48,7 +49,8 @@ enum class CancelOp
48// crc64/we of string "CANCEL_ERROR" generated at http://www.nitrxgen.net/hashgen/ 49// crc64/we of string "CANCEL_ERROR" generated at http://www.nitrxgen.net/hashgen/
49static constexpr UniqueKey CANCEL_ERROR{ 0xe97d41626cc97577ull }; // 'raise_cancel_error' sentinel 50static constexpr UniqueKey CANCEL_ERROR{ 0xe97d41626cc97577ull }; // 'raise_cancel_error' sentinel
50 51
51CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_, bool force_, double waitkill_timeout_); 52CancelOp which_cancel_op(char const* op_string_);
53CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hook_count_, lua_Duration secs_, bool wake_lindas_);
52 54
53[[noreturn]] static inline void raise_cancel_error(lua_State* L) 55[[noreturn]] static inline void raise_cancel_error(lua_State* L)
54{ 56{
diff --git a/src/keeper.cpp b/src/keeper.cpp
index 937d190..0aea18e 100644
--- a/src/keeper.cpp
+++ b/src/keeper.cpp
@@ -627,7 +627,7 @@ void close_keepers(Universe* U)
627 } 627 }
628 for (int i = 0; i < nbKeepers; ++i) 628 for (int i = 0; i < nbKeepers; ++i)
629 { 629 {
630 MUTEX_FREE(&U->keepers->keeper_array[i].keeper_cs); 630 U->keepers->keeper_array[i].~Keeper();
631 } 631 }
632 // free the keeper bookkeeping structure 632 // free the keeper bookkeeping structure
633 U->internal_allocator.free(U->keepers, sizeof(Keepers) + (nbKeepers - 1) * sizeof(Keeper)); 633 U->internal_allocator.free(U->keepers, sizeof(Keepers) + (nbKeepers - 1) * sizeof(Keeper));
@@ -673,9 +673,14 @@ void init_keepers(Universe* U, lua_State* L)
673 { 673 {
674 std::ignore = luaL_error(L, "init_keepers() failed while creating keeper array; out of memory"); 674 std::ignore = luaL_error(L, "init_keepers() failed while creating keeper array; out of memory");
675 } 675 }
676 memset(U->keepers, 0, bytes); 676 U->keepers->Keepers::Keepers();
677 U->keepers->gc_threshold = keepers_gc_threshold; 677 U->keepers->gc_threshold = keepers_gc_threshold;
678 U->keepers->nb_keepers = nb_keepers; 678 U->keepers->nb_keepers = nb_keepers;
679
680 for (int i = 0; i < nb_keepers; ++i)
681 {
682 U->keepers->keeper_array[i].Keeper::Keeper();
683 }
679 } 684 }
680 for (int i = 0; i < nb_keepers; ++i) // keepersUD 685 for (int i = 0; i < nb_keepers; ++i) // keepersUD
681 { 686 {
@@ -687,10 +692,6 @@ void init_keepers(Universe* U, lua_State* L)
687 } 692 }
688 693
689 U->keepers->keeper_array[i].L = K; 694 U->keepers->keeper_array[i].L = K;
690 // we can trigger a GC from inside keeper_call(), where a keeper is acquired
691 // from there, GC can collect a linda, which would acquire the keeper again, and deadlock the thread.
692 // therefore, we need a recursive mutex.
693 MUTEX_RECURSIVE_INIT(&U->keepers->keeper_array[i].keeper_cs);
694 695
695 if (U->keepers->gc_threshold >= 0) 696 if (U->keepers->gc_threshold >= 0)
696 { 697 {
@@ -772,8 +773,7 @@ Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_)
772 */ 773 */
773 unsigned int i = (unsigned int)((magic_ >> KEEPER_MAGIC_SHIFT) % nbKeepers); 774 unsigned int i = (unsigned int)((magic_ >> KEEPER_MAGIC_SHIFT) % nbKeepers);
774 Keeper* K = &keepers_->keeper_array[i]; 775 Keeper* K = &keepers_->keeper_array[i];
775 776 K->m_mutex.lock();
776 MUTEX_LOCK( &K->keeper_cs);
777 //++ K->count; 777 //++ K->count;
778 return K; 778 return K;
779 } 779 }
@@ -787,7 +787,7 @@ void keeper_release(Keeper* K)
787 //-- K->count; 787 //-- K->count;
788 if (K) 788 if (K)
789 { 789 {
790 MUTEX_UNLOCK(&K->keeper_cs); 790 K->m_mutex.unlock();
791 } 791 }
792} 792}
793 793
@@ -843,7 +843,6 @@ int keeper_call(Universe* U, lua_State* K, keeper_api_t func_, lua_State* L, voi
843 if ((args == 0) || luaG_inter_copy(U, L, K, args, LookupMode::ToKeeper) == 0) // L->K 843 if ((args == 0) || luaG_inter_copy(U, L, K, args, LookupMode::ToKeeper) == 0) // L->K
844 { 844 {
845 lua_call(K, 1 + args, LUA_MULTRET); 845 lua_call(K, 1 + args, LUA_MULTRET);
846
847 retvals = lua_gettop(K) - Ktos; 846 retvals = lua_gettop(K) - Ktos;
848 // note that this can raise a luaL_error while the keeper state (and its mutex) is acquired 847 // note that this can raise a luaL_error while the keeper state (and its mutex) is acquired
849 // this may interrupt a lane, causing the destruction of the underlying OS thread 848 // this may interrupt a lane, causing the destruction of the underlying OS thread
diff --git a/src/keeper.h b/src/keeper.h
index f7e3951..931c1d5 100644
--- a/src/keeper.h
+++ b/src/keeper.h
@@ -11,21 +11,23 @@ extern "C" {
11#include "threading.h" 11#include "threading.h"
12#include "uniquekey.h" 12#include "uniquekey.h"
13 13
14#include <mutex>
15
14// forwards 16// forwards
15enum class LookupMode; 17enum class LookupMode;
16struct Universe; 18struct Universe;
17 19
18struct Keeper 20struct Keeper
19{ 21{
20 MUTEX_T keeper_cs; 22 std::mutex m_mutex;
21 lua_State* L; 23 lua_State* L{ nullptr };
22 // int count; 24 // int count;
23}; 25};
24 26
25struct Keepers 27struct Keepers
26{ 28{
27 int gc_threshold{ 0 }; 29 int gc_threshold{ 0 };
28 int nb_keepers; 30 int nb_keepers{ 0 };
29 Keeper keeper_array[1]; 31 Keeper keeper_array[1];
30}; 32};
31 33
@@ -38,7 +40,7 @@ void close_keepers(Universe* U);
38 40
39Keeper* which_keeper(Keepers* keepers_, uintptr_t magic_); 41Keeper* which_keeper(Keepers* keepers_, uintptr_t magic_);
40Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_); 42Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_);
41void keeper_release(Keeper* K); 43void keeper_release(Keeper* K_);
42void keeper_toggle_nil_sentinels(lua_State* L, int val_i_, LookupMode const mode_); 44void keeper_toggle_nil_sentinels(lua_State* L, int val_i_, LookupMode const mode_);
43int keeper_push_linda_storage(Universe* U, lua_State* L, void* ptr_, uintptr_t magic_); 45int keeper_push_linda_storage(Universe* U, lua_State* L, void* ptr_, uintptr_t magic_);
44 46
diff --git a/src/lanes.cpp b/src/lanes.cpp
index 08584a2..4dd9b46 100644
--- a/src/lanes.cpp
+++ b/src/lanes.cpp
@@ -108,11 +108,6 @@ Lane::Lane(Universe* U_, lua_State* L_)
108: U{ U_ } 108: U{ U_ }
109, L{ L_ } 109, L{ L_ }
110{ 110{
111#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
112 MUTEX_INIT(&done_lock);
113 SIGNAL_INIT(&done_signal);
114#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
115
116#if HAVE_LANE_TRACKING() 111#if HAVE_LANE_TRACKING()
117 if (U->tracking_first) 112 if (U->tracking_first)
118 { 113 {
@@ -121,6 +116,29 @@ Lane::Lane(Universe* U_, lua_State* L_)
121#endif // HAVE_LANE_TRACKING() 116#endif // HAVE_LANE_TRACKING()
122} 117}
123 118
119bool Lane::waitForCompletion(lua_Duration duration_)
120{
121 std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
122 if (duration_.count() >= 0.0)
123 {
124 until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration_);
125 }
126
127 std::unique_lock lock{ m_done_mutex };
128 //std::stop_token token{ m_thread.get_stop_token() };
129 //return m_done_signal.wait_for(lock, token, secs_, [this](){ return status >= DONE; });
130 return m_done_signal.wait_until(lock, until, [this](){ return status >= DONE; });
131}
132
133static void lane_main(Lane* lane);
134void Lane::startThread(int priority_)
135{
136 m_thread = std::jthread([this]() { lane_main(this); });
137 if (priority_ != THREAD_PRIO_DEFAULT)
138 {
139 JTHREAD_SET_PRIORITY(m_thread, priority_);
140 }
141}
124 142
125/* Do you want full call stacks, or just the line where the error happened? 143/* Do you want full call stacks, or just the line where the error happened?
126* 144*
@@ -144,7 +162,7 @@ static void securize_debug_threadname(lua_State* L, Lane* lane_)
144} 162}
145 163
146#if ERROR_FULL_STACK 164#if ERROR_FULL_STACK
147static int lane_error( lua_State* L); 165static int lane_error(lua_State* L);
148// crc64/we of string "STACKTRACE_REGKEY" generated at http://www.nitrxgen.net/hashgen/ 166// crc64/we of string "STACKTRACE_REGKEY" generated at http://www.nitrxgen.net/hashgen/
149static constexpr UniqueKey STACKTRACE_REGKEY{ 0x534af7d3226a429full }; 167static constexpr UniqueKey STACKTRACE_REGKEY{ 0x534af7d3226a429full };
150#endif // ERROR_FULL_STACK 168#endif // ERROR_FULL_STACK
@@ -255,11 +273,6 @@ Lane::~Lane()
255{ 273{
256 // Clean up after a (finished) thread 274 // Clean up after a (finished) thread
257 // 275 //
258#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
259 SIGNAL_FREE(&done_signal);
260 MUTEX_FREE(&done_lock);
261#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
262
263#if HAVE_LANE_TRACKING() 276#if HAVE_LANE_TRACKING()
264 if (U->tracking_first != nullptr) 277 if (U->tracking_first != nullptr)
265 { 278 {
@@ -455,26 +468,27 @@ static bool selfdestruct_remove(Lane* lane_)
455static int universe_gc( lua_State* L) 468static int universe_gc( lua_State* L)
456{ 469{
457 Universe* const U{ lua_tofulluserdata<Universe>(L, 1) }; 470 Universe* const U{ lua_tofulluserdata<Universe>(L, 1) };
471 lua_Duration const shutdown_timeout{ lua_tonumber(L, lua_upvalueindex(1)) };
472 [[maybe_unused]] char const* const op_string{ lua_tostring(L, lua_upvalueindex(2)) };
473 CancelOp const op{ which_cancel_op(op_string) };
458 474
459 while (U->selfdestruct_first != SELFDESTRUCT_END) // true at most once! 475 if (U->selfdestruct_first != SELFDESTRUCT_END)
460 { 476 {
477
461 // Signal _all_ still running threads to exit (including the timer thread) 478 // Signal _all_ still running threads to exit (including the timer thread)
462 // 479 //
463 { 480 {
464 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs }; 481 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs };
465 Lane* lane{ U->selfdestruct_first }; 482 Lane* lane{ U->selfdestruct_first };
483 lua_Duration timeout{ 1us };
466 while (lane != SELFDESTRUCT_END) 484 while (lane != SELFDESTRUCT_END)
467 { 485 {
468 // attempt a regular unforced hard cancel with a small timeout 486 // attempt the requested cancel with a small timeout.
469 bool const cancelled{ THREAD_ISNULL(lane->thread) || thread_cancel(L, lane, CancelOp::Hard, 0.0001, false, 0.0) != CancelResult::Timeout }; 487 // if waiting on a linda, they will raise a cancel_error.
470 // if we failed, and we know the thread is waiting on a linda 488 // if a cancellation hook is desired, it will be installed to try to raise an error
471 if (cancelled == false && lane->status == WAITING && lane->waiting_on != nullptr) 489 if (lane->m_thread.joinable())
472 { 490 {
473 // signal the linda to wake up the thread so that it can react to the cancel query 491 std::ignore = thread_cancel(lane, op, 1, timeout, true);
474 // let us hope we never land here with a pointer on a linda that has been destroyed...
475 SIGNAL_T* const waiting_on{ lane->waiting_on };
476 // lane->waiting_on = nullptr; // useful, or not?
477 SIGNAL_ALL(waiting_on);
478 } 492 }
479 lane = lane->selfdestruct_next; 493 lane = lane->selfdestruct_next;
480 } 494 }
@@ -482,47 +496,32 @@ static int universe_gc( lua_State* L)
482 496
483 // When noticing their cancel, the lanes will remove themselves from 497 // When noticing their cancel, the lanes will remove themselves from
484 // the selfdestruct chain. 498 // the selfdestruct chain.
485
486 // TBD: Not sure if Windows (multi core) will require the timed approach,
487 // or single Yield. I don't have machine to test that (so leaving
488 // for timed approach). -- AKa 25-Oct-2008
489
490 // OS X 10.5 (Intel) needs more to avoid segfaults.
491 //
492 // "make test" is okay. 100's of "make require" are okay.
493 //
494 // Tested on MacBook Core Duo 2GHz and 10.5.5:
495 // -- AKa 25-Oct-2008
496 //
497 { 499 {
498 lua_Number const shutdown_timeout = lua_tonumber(L, lua_upvalueindex(1)); 500 std::chrono::time_point<std::chrono::steady_clock> t_until{ std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(shutdown_timeout) };
499 double const t_until = now_secs() + shutdown_timeout;
500 501
501 while (U->selfdestruct_first != SELFDESTRUCT_END) 502 while (U->selfdestruct_first != SELFDESTRUCT_END)
502 { 503 {
503 YIELD(); // give threads time to act on their cancel 504 // give threads time to act on their cancel
505 YIELD();
506 // count the number of cancelled thread that didn't have the time to act yet
507 int n{ 0 };
504 { 508 {
505 // count the number of cancelled thread that didn't have the time to act yet 509 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs };
506 int n = 0; 510 Lane* lane{ U->selfdestruct_first };
507 double t_now = 0.0; 511 while (lane != SELFDESTRUCT_END)
508 { 512 {
509 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs }; 513 if (lane->cancel_request != CancelRequest::None)
510 Lane* lane{ U->selfdestruct_first }; 514 ++n;
511 while (lane != SELFDESTRUCT_END) 515 lane = lane->selfdestruct_next;
512 {
513 if (lane->cancel_request == CancelRequest::Hard)
514 ++n;
515 lane = lane->selfdestruct_next;
516 }
517 }
518 // if timeout elapsed, or we know all threads have acted, stop waiting
519 t_now = now_secs();
520 if (n == 0 || (t_now >= t_until))
521 {
522 DEBUGSPEW_CODE(fprintf(stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout - (t_until - t_now)));
523 break;
524 } 516 }
525 } 517 }
518 // if timeout elapsed, or we know all threads have acted, stop waiting
519 std::chrono::time_point<std::chrono::steady_clock> t_now = std::chrono::steady_clock::now();
520 if (n == 0 || (t_now >= t_until))
521 {
522 DEBUGSPEW_CODE(fprintf(stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout.count()));
523 break;
524 }
526 } 525 }
527 } 526 }
528 527
@@ -532,48 +531,17 @@ static int universe_gc( lua_State* L)
532 { 531 {
533 YIELD(); 532 YIELD();
534 } 533 }
535
536 //---
537 // Kill the still free running threads
538 //
539 if (U->selfdestruct_first != SELFDESTRUCT_END)
540 {
541 unsigned int n = 0;
542 // first thing we did was to raise the linda signals the threads were waiting on (if any)
543 // therefore, any well-behaved thread should be in CANCELLED state
544 // these are not running, and the state can be closed
545 {
546 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs };
547 Lane* lane{ U->selfdestruct_first };
548 while (lane != SELFDESTRUCT_END)
549 {
550 Lane* const next_s{ lane->selfdestruct_next };
551 lane->selfdestruct_next = nullptr; // detach from selfdestruct chain
552 if (!THREAD_ISNULL(lane->thread)) // can be nullptr if previous 'soft' termination succeeded
553 {
554 THREAD_KILL(&lane->thread);
555#if THREADAPI == THREADAPI_PTHREAD
556 // pthread: make sure the thread is really stopped!
557 THREAD_WAIT(&lane->thread, -1, &lane->done_signal, &lane->done_lock, &lane->status);
558#endif // THREADAPI == THREADAPI_PTHREAD
559 }
560 // NO lua_close() in this case because we don't know where execution of the state was interrupted
561 delete lane;
562 lane = next_s;
563 ++n;
564 }
565 U->selfdestruct_first = SELFDESTRUCT_END;
566 }
567
568 DEBUGSPEW_CODE(fprintf(stderr, "Killed %d lane(s) at process end.\n", n));
569 }
570 } 534 }
571 535
572 // If some lanes are currently cleaning after themselves, wait until they are done. 536 // If after all this, we still have some free-running lanes, it's an external user error, they should have stopped appropriately
573 // They are no longer listed in the selfdestruct chain, but they still have to lua_close().
574 while (U->selfdestructing_count.load(std::memory_order_acquire) > 0)
575 { 537 {
576 YIELD(); 538 std::lock_guard<std::mutex> guard{ U->selfdestruct_cs };
539 Lane* lane{ U->selfdestruct_first };
540 if (lane != SELFDESTRUCT_END)
541 {
542 // this causes a leak because we don't call U's destructor (which could be bad if the still running lanes are accessing it)
543 std::ignore = luaL_error(L, "Zombie thread %s refuses to die!", lane->debug_name);
544 }
577 } 545 }
578 546
579 // necessary so that calling free_deep_prelude doesn't crash because linda_id expects a linda lightuserdata at absolute slot 1 547 // necessary so that calling free_deep_prelude doesn't crash because linda_id expects a linda lightuserdata at absolute slot 1
@@ -874,20 +842,8 @@ static char const* get_errcode_name( int _code)
874} 842}
875#endif // USE_DEBUG_SPEW() 843#endif // USE_DEBUG_SPEW()
876 844
877#if THREADWAIT_METHOD == THREADWAIT_CONDVAR // implies THREADAPI == THREADAPI_PTHREAD 845static void lane_main(Lane* lane)
878static void thread_cleanup_handler(void* opaque)
879{ 846{
880 Lane* lane{ (Lane*) opaque };
881 MUTEX_LOCK(&lane->done_lock);
882 lane->status = CANCELLED;
883 SIGNAL_ONE(&lane->done_signal); // wake up master (while 'lane->done_lock' is on)
884 MUTEX_UNLOCK(&lane->done_lock);
885}
886#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
887
888static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs)
889{
890 Lane* lane{ (Lane*) vs };
891 lua_State* const L{ lane->L }; 847 lua_State* const L{ lane->L };
892 // wait until the launching thread has finished preparing L 848 // wait until the launching thread has finished preparing L
893 lane->m_ready.wait(); 849 lane->m_ready.wait();
@@ -897,8 +853,6 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs)
897 // At this point, the lane function and arguments are on the stack 853 // At this point, the lane function and arguments are on the stack
898 int const nargs{ lua_gettop(L) - 1 }; 854 int const nargs{ lua_gettop(L) - 1 };
899 DEBUGSPEW_CODE(Universe* U = universe_get(L)); 855 DEBUGSPEW_CODE(Universe* U = universe_get(L));
900 THREAD_MAKE_ASYNCH_CANCELLABLE();
901 THREAD_CLEANUP_PUSH(thread_cleanup_handler, lane);
902 lane->status = RUNNING; // PENDING -> RUNNING 856 lane->status = RUNNING; // PENDING -> RUNNING
903 857
904 // Tie "set_finalizer()" to the state 858 // Tie "set_finalizer()" to the state
@@ -949,18 +903,19 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs)
949 // the finalizer generated an error, and left its own error message [and stack trace] on the stack 903 // the finalizer generated an error, and left its own error message [and stack trace] on the stack
950 rc = rc2; // we're overruling the earlier script error or normal return 904 rc = rc2; // we're overruling the earlier script error or normal return
951 } 905 }
952 lane->waiting_on = nullptr; // just in case 906 lane->m_waiting_on = nullptr; // just in case
953 if (selfdestruct_remove(lane)) // check and remove (under lock!) 907 if (selfdestruct_remove(lane)) // check and remove (under lock!)
954 { 908 {
955 // We're a free-running thread and no-one's there to clean us up. 909 // We're a free-running thread and no-one's there to clean us up.
956 //
957 lua_close(lane->L); 910 lua_close(lane->L);
958 911 lane->L = nullptr; // just in case
959 lane->U->selfdestruct_cs.lock(); 912 lane->U->selfdestruct_cs.lock();
960 // done with lua_close(), terminal shutdown sequence may proceed 913 // done with lua_close(), terminal shutdown sequence may proceed
961 lane->U->selfdestructing_count.fetch_sub(1, std::memory_order_release); 914 lane->U->selfdestructing_count.fetch_sub(1, std::memory_order_release);
962 lane->U->selfdestruct_cs.unlock(); 915 lane->U->selfdestruct_cs.unlock();
963 916
917 // we destroy our jthread member from inside the thread body, so we have to detach so that we don't try to join, as this doesn't seem a good idea
918 lane->m_thread.detach();
964 delete lane; 919 delete lane;
965 lane = nullptr; 920 lane = nullptr;
966 } 921 }
@@ -972,21 +927,14 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs)
972 enum e_status st = (rc == 0) ? DONE : CANCEL_ERROR.equals(L, 1) ? CANCELLED : ERROR_ST; 927 enum e_status st = (rc == 0) ? DONE : CANCEL_ERROR.equals(L, 1) ? CANCELLED : ERROR_ST;
973 928
974 // Posix no PTHREAD_TIMEDJOIN: 929 // Posix no PTHREAD_TIMEDJOIN:
975 // 'done_lock' protects the -> DONE|ERROR_ST|CANCELLED state change 930 // 'm_done_mutex' protects the -> DONE|ERROR_ST|CANCELLED state change
976 // 931 //
977#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
978 MUTEX_LOCK(&lane->done_lock);
979 { 932 {
980#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR 933 std::lock_guard lock{ lane->m_done_mutex };
981 lane->status = st; 934 lane->status = st;
982#if THREADWAIT_METHOD == THREADWAIT_CONDVAR 935 lane->m_done_signal.notify_one();// wake up master (while 'lane->m_done_mutex' is on)
983 SIGNAL_ONE(&lane->done_signal); // wake up master (while 'lane->done_lock' is on)
984 } 936 }
985 MUTEX_UNLOCK(&lane->done_lock);
986#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
987 } 937 }
988 THREAD_CLEANUP_POP(false);
989 return 0; // ignored
990} 938}
991 939
992// ################################################################################################# 940// #################################################################################################
@@ -1115,13 +1063,11 @@ LUAG_FUNC(lane_new)
1115 // leave a single cancel_error on the stack for the caller 1063 // leave a single cancel_error on the stack for the caller
1116 lua_settop(m_lane->L, 0); 1064 lua_settop(m_lane->L, 0);
1117 CANCEL_ERROR.pushKey(m_lane->L); 1065 CANCEL_ERROR.pushKey(m_lane->L);
1118#if THREADWAIT_METHOD == THREADWAIT_CONDVAR 1066 {
1119 MUTEX_LOCK(&m_lane->done_lock); 1067 std::lock_guard lock{ m_lane->m_done_mutex };
1120#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR 1068 m_lane->status = CANCELLED;
1121 m_lane->status = CANCELLED; 1069 m_lane->m_done_signal.notify_one(); // wake up master (while 'lane->m_done_mutex' is on)
1122#if THREADWAIT_METHOD == THREADWAIT_CONDVAR 1070 }
1123 MUTEX_UNLOCK(&m_lane->done_lock);
1124#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
1125 // unblock the thread so that it can terminate gracefully 1071 // unblock the thread so that it can terminate gracefully
1126 m_lane->m_ready.count_down(); 1072 m_lane->m_ready.count_down();
1127 } 1073 }
@@ -1170,7 +1116,7 @@ LUAG_FUNC(lane_new)
1170 } onExit{ L, lane, gc_cb_idx }; 1116 } onExit{ L, lane, gc_cb_idx };
1171 // launch the thread early, it will sync with a std::latch to parallelize OS thread warmup and L2 preparation 1117 // launch the thread early, it will sync with a std::latch to parallelize OS thread warmup and L2 preparation
1172 DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "lane_new: launching thread\n" INDENT_END)); 1118 DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "lane_new: launching thread\n" INDENT_END));
1173 THREAD_CREATE(&lane->thread, lane_main, lane, priority); 1119 lane->startThread(priority);
1174 1120
1175 STACK_GROW( L2, nargs + 3); // 1121 STACK_GROW( L2, nargs + 3); //
1176 STACK_CHECK_START_REL(L2, 0); 1122 STACK_CHECK_START_REL(L2, 0);
@@ -1347,7 +1293,7 @@ LUAG_FUNC(lane_new)
1347static int lane_gc(lua_State* L) 1293static int lane_gc(lua_State* L)
1348{ 1294{
1349 bool have_gc_cb{ false }; 1295 bool have_gc_cb{ false };
1350 Lane* lane{ lua_toLane(L, 1) }; // ud 1296 Lane* const lane{ lua_toLane(L, 1) }; // ud
1351 1297
1352 // if there a gc callback? 1298 // if there a gc callback?
1353 lua_getiuservalue(L, 1, 1); // ud uservalue 1299 lua_getiuservalue(L, 1, 1); // ud uservalue
@@ -1365,30 +1311,7 @@ static int lane_gc(lua_State* L)
1365 } 1311 }
1366 1312
1367 // We can read 'lane->status' without locks, but not wait for it 1313 // We can read 'lane->status' without locks, but not wait for it
1368 // test Killed state first, as it doesn't need to enter the selfdestruct chain 1314 if (lane->status < DONE)
1369 if (lane->mstatus == Lane::Killed)
1370 {
1371 // Make sure a kill has proceeded, before cleaning up the data structure.
1372 //
1373 // NO lua_close() in this case because we don't know where execution of the state was interrupted
1374 DEBUGSPEW_CODE(fprintf(stderr, "** Joining with a killed thread (needs testing) **"));
1375 // make sure the thread is no longer running, just like thread_join()
1376 if (!THREAD_ISNULL(lane->thread))
1377 {
1378 THREAD_WAIT(&lane->thread, -1, &lane->done_signal, &lane->done_lock, &lane->status);
1379 }
1380 if (lane->status >= DONE && lane->L)
1381 {
1382 // we know the thread was killed while the Lua VM was not doing anything: we should be able to close it without crashing
1383 // now, thread_cancel() will not forcefully kill a lane with lane->status >= DONE, so I am not sure it can ever happen
1384 lua_close(lane->L);
1385 lane->L = nullptr;
1386 // just in case, but s will be freed soon so...
1387 lane->debug_name = "<gc>";
1388 }
1389 DEBUGSPEW_CODE(fprintf(stderr, "** Joined ok **"));
1390 }
1391 else if (lane->status < DONE)
1392 { 1315 {
1393 // still running: will have to be cleaned up later 1316 // still running: will have to be cleaned up later
1394 selfdestruct_add(lane); 1317 selfdestruct_add(lane);
@@ -1437,7 +1360,6 @@ static char const * thread_status_string(Lane* lane_)
1437{ 1360{
1438 enum e_status const st{ lane_->status }; // read just once (volatile) 1361 enum e_status const st{ lane_->status }; // read just once (volatile)
1439 char const* str = 1362 char const* str =
1440 (lane_->mstatus == Lane::Killed) ? "killed" : // new to v3.3.0!
1441 (st == PENDING) ? "pending" : 1363 (st == PENDING) ? "pending" :
1442 (st == RUNNING) ? "running" : // like in 'co.status()' 1364 (st == RUNNING) ? "running" : // like in 'co.status()'
1443 (st == WAITING) ? "waiting" : 1365 (st == WAITING) ? "waiting" :
@@ -1471,9 +1393,10 @@ int push_thread_status(lua_State* L, Lane* lane_)
1471LUAG_FUNC(thread_join) 1393LUAG_FUNC(thread_join)
1472{ 1394{
1473 Lane* const lane{ lua_toLane(L, 1) }; 1395 Lane* const lane{ lua_toLane(L, 1) };
1474 lua_Number const wait_secs{ luaL_optnumber(L, 2, -1.0) }; 1396 lua_Duration const duration{ luaL_optnumber(L, 2, -1.0) };
1475 lua_State* const L2{ lane->L }; 1397 lua_State* const L2{ lane->L };
1476 bool const done{ THREAD_ISNULL(lane->thread) || THREAD_WAIT(&lane->thread, wait_secs, &lane->done_signal, &lane->done_lock, &lane->status) }; 1398
1399 bool const done{ !lane->m_thread.joinable() || lane->waitForCompletion(duration) };
1477 if (!done || !L2) 1400 if (!done || !L2)
1478 { 1401 {
1479 STACK_GROW(L, 2); 1402 STACK_GROW(L, 2);
@@ -1486,58 +1409,47 @@ LUAG_FUNC(thread_join)
1486 // Thread is DONE/ERROR_ST/CANCELLED; all ours now 1409 // Thread is DONE/ERROR_ST/CANCELLED; all ours now
1487 1410
1488 int ret{ 0 }; 1411 int ret{ 0 };
1489 if (lane->mstatus == Lane::Killed) // OS thread was killed if thread_cancel was forced 1412 Universe* const U{ lane->U };
1490 { 1413 // debug_name is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed
1491 // in that case, even if the thread was killed while DONE/ERROR_ST/CANCELLED, ignore regular return values 1414 // so store it in the userdata uservalue at a key that can't possibly collide
1492 STACK_GROW(L, 2); 1415 securize_debug_threadname(L, lane);
1493 lua_pushnil(L); 1416 switch (lane->status)
1494 lua_pushliteral(L, "killed");
1495 ret = 2;
1496 }
1497 else
1498 { 1417 {
1499 Universe* const U{ lane->U }; 1418 case DONE:
1500 // debug_name is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed
1501 // so store it in the userdata uservalue at a key that can't possibly collide
1502 securize_debug_threadname(L, lane);
1503 switch (lane->status)
1504 { 1419 {
1505 case DONE: 1420 int const n{ lua_gettop(L2) }; // whole L2 stack
1421 if ((n > 0) && (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0))
1506 { 1422 {
1507 int const n{ lua_gettop(L2) }; // whole L2 stack 1423 return luaL_error(L, "tried to copy unsupported types");
1508 if ((n > 0) && (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0))
1509 {
1510 return luaL_error(L, "tried to copy unsupported types");
1511 }
1512 ret = n;
1513 } 1424 }
1514 break; 1425 ret = n;
1426 }
1427 break;
1515 1428
1516 case ERROR_ST: 1429 case ERROR_ST:
1430 {
1431 int const n{ lua_gettop(L2) };
1432 STACK_GROW(L, 3);
1433 lua_pushnil(L);
1434 // even when ERROR_FULL_STACK, if the error is not LUA_ERRRUN, the handler wasn't called, and we only have 1 error message on the stack ...
1435 if (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0) // nil "err" [trace]
1517 { 1436 {
1518 int const n{ lua_gettop(L2) }; 1437 return luaL_error(L, "tried to copy unsupported types: %s", lua_tostring(L, -n));
1519 STACK_GROW(L, 3);
1520 lua_pushnil(L);
1521 // even when ERROR_FULL_STACK, if the error is not LUA_ERRRUN, the handler wasn't called, and we only have 1 error message on the stack ...
1522 if (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0) // nil "err" [trace]
1523 {
1524 return luaL_error(L, "tried to copy unsupported types: %s", lua_tostring(L, -n));
1525 }
1526 ret = 1 + n;
1527 } 1438 }
1528 break; 1439 ret = 1 + n;
1440 }
1441 break;
1529 1442
1530 case CANCELLED: 1443 case CANCELLED:
1531 ret = 0; 1444 ret = 0;
1532 break; 1445 break;
1533 1446
1534 default: 1447 default:
1535 DEBUGSPEW_CODE(fprintf(stderr, "Status: %d\n", lane->status)); 1448 DEBUGSPEW_CODE(fprintf(stderr, "Status: %d\n", lane->status));
1536 ASSERT_L(false); 1449 ASSERT_L(false);
1537 ret = 0; 1450 ret = 0;
1538 }
1539 lua_close(L2);
1540 } 1451 }
1452 lua_close(L2);
1541 lane->L = nullptr; 1453 lane->L = nullptr;
1542 STACK_CHECK(L, ret); 1454 STACK_CHECK(L, ret);
1543 return ret; 1455 return ret;
@@ -1596,15 +1508,12 @@ LUAG_FUNC(thread_index)
1596 switch (lane->status) 1508 switch (lane->status)
1597 { 1509 {
1598 default: 1510 default:
1599 if (lane->mstatus != Lane::Killed) 1511 // this is an internal error, we probably never get here
1600 { 1512 lua_settop(L, 0);
1601 // this is an internal error, we probably never get here 1513 lua_pushliteral(L, "Unexpected status: ");
1602 lua_settop(L, 0); 1514 lua_pushstring(L, thread_status_string(lane));
1603 lua_pushliteral(L, "Unexpected status: "); 1515 lua_concat(L, 2);
1604 lua_pushstring(L, thread_status_string(lane)); 1516 raise_lua_error(L);
1605 lua_concat(L, 2);
1606 raise_lua_error(L);
1607 }
1608 [[fallthrough]]; // fall through if we are killed, as we got nil, "killed" on the stack 1517 [[fallthrough]]; // fall through if we are killed, as we got nil, "killed" on the stack
1609 1518
1610 case DONE: // got regular return values 1519 case DONE: // got regular return values
@@ -1790,8 +1699,7 @@ LUAG_FUNC(wakeup_conv)
1790 lua_pop(L,1); 1699 lua_pop(L,1);
1791 STACK_CHECK(L, 0); 1700 STACK_CHECK(L, 0);
1792 1701
1793 struct tm t; 1702 std::tm t{};
1794 memset(&t, 0, sizeof(t));
1795 t.tm_year = year - 1900; 1703 t.tm_year = year - 1900;
1796 t.tm_mon= month-1; // 0..11 1704 t.tm_mon= month-1; // 0..11
1797 t.tm_mday= day; // 1..31 1705 t.tm_mday= day; // 1..31
@@ -1800,7 +1708,7 @@ LUAG_FUNC(wakeup_conv)
1800 t.tm_sec= sec; // 0..60 1708 t.tm_sec= sec; // 0..60
1801 t.tm_isdst= isdst; // 0/1/negative 1709 t.tm_isdst= isdst; // 0/1/negative
1802 1710
1803 lua_pushnumber(L, static_cast<lua_Number>(mktime(&t))); // ms=0 1711 lua_pushnumber(L, static_cast<lua_Number>(std::mktime(&t))); // resolution: 1 second
1804 return 1; 1712 return 1;
1805} 1713}
1806 1714
@@ -1909,13 +1817,14 @@ LUAG_FUNC(configure)
1909 DEBUGSPEW_CODE(fprintf( stderr, INDENT_BEGIN "%p: lanes.configure() BEGIN\n" INDENT_END, L)); 1817 DEBUGSPEW_CODE(fprintf( stderr, INDENT_BEGIN "%p: lanes.configure() BEGIN\n" INDENT_END, L));
1910 DEBUGSPEW_CODE(if (U) U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); 1818 DEBUGSPEW_CODE(if (U) U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed));
1911 1819
1912 if(U == nullptr) 1820 if (U == nullptr)
1913 { 1821 {
1914 U = universe_create( L); // settings universe 1822 U = universe_create(L); // settings universe
1915 DEBUGSPEW_CODE(U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); 1823 DEBUGSPEW_CODE(U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed));
1916 lua_newtable( L); // settings universe mt 1824 lua_newtable( L); // settings universe mt
1917 lua_getfield(L, 1, "shutdown_timeout"); // settings universe mt shutdown_timeout 1825 lua_getfield(L, 1, "shutdown_timeout"); // settings universe mt shutdown_timeout
1918 lua_pushcclosure(L, universe_gc, 1); // settings universe mt universe_gc 1826 lua_getfield(L, 1, "shutdown_mode"); // settings universe mt shutdown_timeout shutdown_mode
1827 lua_pushcclosure(L, universe_gc, 2); // settings universe mt universe_gc
1919 lua_setfield(L, -2, "__gc"); // settings universe mt 1828 lua_setfield(L, -2, "__gc"); // settings universe mt
1920 lua_setmetatable(L, -2); // settings universe 1829 lua_setmetatable(L, -2); // settings universe
1921 lua_pop(L, 1); // settings 1830 lua_pop(L, 1); // settings
diff --git a/src/lanes.lua b/src/lanes.lua
index 6af286a..fd3d22b 100644
--- a/src/lanes.lua
+++ b/src/lanes.lua
@@ -73,6 +73,7 @@ lanes.configure = function( settings_)
73 keepers_gc_threshold = -1, 73 keepers_gc_threshold = -1,
74 on_state_create = nil, 74 on_state_create = nil,
75 shutdown_timeout = 0.25, 75 shutdown_timeout = 0.25,
76 shutdown_mode = "hard",
76 with_timers = true, 77 with_timers = true,
77 track_lanes = false, 78 track_lanes = false,
78 demote_full_userdata = nil, 79 demote_full_userdata = nil,
@@ -113,6 +114,11 @@ lanes.configure = function( settings_)
113 -- shutdown_timeout should be a number >= 0 114 -- shutdown_timeout should be a number >= 0
114 return type( val_) == "number" and val_ >= 0 115 return type( val_) == "number" and val_ >= 0
115 end, 116 end,
117 shutdown_mode = function( val_)
118 local valid_hooks = { soft = true, hard = true, call = true, ret = true, line = true, count = true }
119 -- shutdown_mode should be a known hook mask
120 return valid_hooks[val_]
121 end,
116 track_lanes = boolean_param_checker, 122 track_lanes = boolean_param_checker,
117 demote_full_userdata = boolean_param_checker, 123 demote_full_userdata = boolean_param_checker,
118 verbose_errors = boolean_param_checker 124 verbose_errors = boolean_param_checker
@@ -367,262 +373,263 @@ lanes.configure = function( settings_)
367 373
368 374
369 if settings.with_timers ~= false then 375 if settings.with_timers ~= false then
376 --
377 -- On first 'require "lanes"', a timer lane is spawned that will maintain
378 -- timer tables and sleep in between the timer events. All interaction with
379 -- the timer lane happens via a 'timer_gateway' Linda, which is common to
380 -- all that 'require "lanes"'.
381 --
382 -- Linda protocol to timer lane:
383 --
384 -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs]
385 --
386 local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging
387 local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)"
388 local first_time_key= "first time"
370 389
371 -- 390 local first_time = timer_gateway:get( first_time_key) == nil
372 -- On first 'require "lanes"', a timer lane is spawned that will maintain 391 timer_gateway:set( first_time_key, true)
373 -- timer tables and sleep in between the timer events. All interaction with
374 -- the timer lane happens via a 'timer_gateway' Linda, which is common to
375 -- all that 'require "lanes"'.
376 --
377 -- Linda protocol to timer lane:
378 --
379 -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs]
380 --
381 local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging
382 local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)"
383 local first_time_key= "first time"
384
385 local first_time = timer_gateway:get( first_time_key) == nil
386 timer_gateway:set( first_time_key, true)
387
388 --
389 -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally
390 -- has 'table' always declared)
391 --
392 if first_time then
393 392
394 local now_secs = core.now_secs 393 local now_secs = core.now_secs
395 assert( type( now_secs) == "function") 394 local wakeup_conv = core.wakeup_conv
396 ----- 395
397 -- Snore loop (run as a lane on the background)
398 --
399 -- High priority, to get trustworthy timings.
400 -- 396 --
401 -- We let the timer lane be a "free running" thread; no handle to it 397 -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally
402 -- remains. 398 -- has 'table' always declared)
403 -- 399 --
404 local timer_body = function() 400 if first_time then
405 set_debug_threadname( "LanesTimer") 401
406 -- 402 assert( type( now_secs) == "function")
407 -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, 403 -----
408 -- [key]= { wakeup_secs [,period_secs] } [, ...] }, 404 -- Snore loop (run as a lane on the background)
409 -- }
410 --
411 -- Collection of all running timers, indexed with linda's & key.
412 -- 405 --
413 -- Note that we need to use the deep lightuserdata identifiers, instead 406 -- High priority, to get trustworthy timings.
414 -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple
415 -- entries for the same timer.
416 -- 407 --
417 -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but 408 -- We let the timer lane be a "free running" thread; no handle to it
418 -- also important to keep the Linda alive, even if all outside world threw 409 -- remains.
419 -- away pointers to it (which would ruin uniqueness of the deep pointer).
420 -- Now we're safe.
421 -- 410 --
422 local collection = {} 411 local timer_body = function()
423 local table_insert = assert( table.insert) 412 set_debug_threadname( "LanesTimer")
424 413 --
425 local get_timers = function() 414 -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h,
426 local r = {} 415 -- [key]= { wakeup_secs [,period_secs] } [, ...] },
427 for deep, t in pairs( collection) do 416 -- }
428 -- WR( tostring( deep)) 417 --
429 local l = t[deep] 418 -- Collection of all running timers, indexed with linda's & key.
430 for key, timer_data in pairs( t) do 419 --
431 if key ~= deep then 420 -- Note that we need to use the deep lightuserdata identifiers, instead
432 table_insert( r, {l, key, timer_data}) 421 -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple
422 -- entries for the same timer.
423 --
424 -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but
425 -- also important to keep the Linda alive, even if all outside world threw
426 -- away pointers to it (which would ruin uniqueness of the deep pointer).
427 -- Now we're safe.
428 --
429 local collection = {}
430 local table_insert = assert( table.insert)
431
432 local get_timers = function()
433 local r = {}
434 for deep, t in pairs( collection) do
435 -- WR( tostring( deep))
436 local l = t[deep]
437 for key, timer_data in pairs( t) do
438 if key ~= deep then
439 table_insert( r, {l, key, timer_data})
440 end
433 end 441 end
434 end 442 end
435 end 443 return r
436 return r 444 end -- get_timers()
437 end -- get_timers()
438
439 --
440 -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] )
441 --
442 local set_timer = function( linda, key, wakeup_at, period)
443 assert( wakeup_at == nil or wakeup_at > 0.0)
444 assert( period == nil or period > 0.0)
445 445
446 local linda_deep = linda:deep()
447 assert( linda_deep)
448
449 -- Find or make a lookup for this timer
450 -- 446 --
451 local t1 = collection[linda_deep] 447 -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] )
452 if not t1 then 448 --
453 t1 = { [linda_deep] = linda} -- proxy to use the Linda 449 local set_timer = function( linda, key, wakeup_at, period)
454 collection[linda_deep] = t1 450 assert( wakeup_at == nil or wakeup_at > 0.0)
455 end 451 assert( period == nil or period > 0.0)
456 452
457 if wakeup_at == nil then 453 local linda_deep = linda:deep()
458 -- Clear the timer 454 assert( linda_deep)
459 --
460 t1[key]= nil
461 455
462 -- Remove empty tables from collection; speeds timer checks and 456 -- Find or make a lookup for this timer
463 -- lets our 'safety reference' proxy be gc:ed as well.
464 -- 457 --
465 local empty = true 458 local t1 = collection[linda_deep]
466 for k, _ in pairs( t1) do 459 if not t1 then
467 if k ~= linda_deep then 460 t1 = { [linda_deep] = linda} -- proxy to use the Linda
468 empty = false 461 collection[linda_deep] = t1
469 break
470 end
471 end
472 if empty then
473 collection[linda_deep] = nil
474 end 462 end
475 463
476 -- Note: any unread timer value is left at 'linda[key]' intensionally; 464 if wakeup_at == nil then
477 -- clearing a timer just stops it. 465 -- Clear the timer
478 else 466 --
479 -- New timer or changing the timings 467 t1[key]= nil
480 --
481 local t2 = t1[key]
482 if not t2 then
483 t2= {}
484 t1[key]= t2
485 end
486 468
487 t2[1] = wakeup_at 469 -- Remove empty tables from collection; speeds timer checks and
488 t2[2] = period -- can be 'nil' 470 -- lets our 'safety reference' proxy be gc:ed as well.
489 end 471 --
490 end -- set_timer() 472 local empty = true
473 for k, _ in pairs( t1) do
474 if k ~= linda_deep then
475 empty = false
476 break
477 end
478 end
479 if empty then
480 collection[linda_deep] = nil
481 end
491 482
492 ----- 483 -- Note: any unread timer value is left at 'linda[key]' intensionally;
493 -- [next_wakeup_at]= check_timers() 484 -- clearing a timer just stops it.
494 -- Check timers, and wake up the ones expired (if any) 485 else
495 -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). 486 -- New timer or changing the timings
496 local check_timers = function()
497 local now = now_secs()
498 local next_wakeup
499
500 for linda_deep,t1 in pairs(collection) do
501 for key,t2 in pairs(t1) do
502 -- 487 --
503 if key==linda_deep then 488 local t2 = t1[key]
504 -- no 'continue' in Lua :/ 489 if not t2 then
505 else 490 t2= {}
506 -- 't2': { wakeup_at_secs [,period_secs] } 491 t1[key]= t2
492 end
493
494 t2[1] = wakeup_at
495 t2[2] = period -- can be 'nil'
496 end
497 end -- set_timer()
498
499 -----
500 -- [next_wakeup_at]= check_timers()
501 -- Check timers, and wake up the ones expired (if any)
502 -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none).
503 local check_timers = function()
504 local now = now_secs()
505 local next_wakeup
506
507 for linda_deep,t1 in pairs(collection) do
508 for key,t2 in pairs(t1) do
507 -- 509 --
508 local wakeup_at= t2[1] 510 if key==linda_deep then
509 local period= t2[2] -- may be 'nil' 511 -- no 'continue' in Lua :/
510 512 else
511 if wakeup_at <= now then 513 -- 't2': { wakeup_at_secs [,period_secs] }
512 local linda= t1[linda_deep] 514 --
513 assert(linda) 515 local wakeup_at= t2[1]
514 516 local period= t2[2] -- may be 'nil'
515 linda:set( key, now ) 517
516 518 if wakeup_at <= now then
517 -- 'pairs()' allows the values to be modified (and even 519 local linda= t1[linda_deep]
518 -- removed) as far as keys are not touched 520 assert(linda)
519 521
520 if not period then 522 linda:set( key, now )
521 -- one-time timer; gone 523
522 -- 524 -- 'pairs()' allows the values to be modified (and even
523 t1[key]= nil 525 -- removed) as far as keys are not touched
524 wakeup_at= nil -- no 'continue' in Lua :/ 526
525 else 527 if not period then
526 -- repeating timer; find next wakeup (may jump multiple repeats) 528 -- one-time timer; gone
527 -- 529 --
528 repeat 530 t1[key]= nil
529 wakeup_at= wakeup_at+period 531 wakeup_at= nil -- no 'continue' in Lua :/
530 until wakeup_at > now 532 else
531 533 -- repeating timer; find next wakeup (may jump multiple repeats)
532 t2[1]= wakeup_at 534 --
535 repeat
536 wakeup_at= wakeup_at+period
537 until wakeup_at > now
538
539 t2[1]= wakeup_at
540 end
533 end 541 end
534 end
535 542
536 if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then 543 if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then
537 next_wakeup= wakeup_at 544 next_wakeup= wakeup_at
545 end
538 end 546 end
547 end -- t2 loop
548 end -- t1 loop
549
550 return next_wakeup -- may be 'nil'
551 end -- check_timers()
552
553 local timer_gateway_batched = timer_gateway.batched
554 set_finalizer( function( err, stk)
555 if err and type( err) ~= "userdata" then
556 WR( "LanesTimer error: "..tostring(err))
557 --elseif type( err) == "userdata" then
558 -- WR( "LanesTimer after cancel" )
559 --else
560 -- WR("LanesTimer finalized")
561 end
562 end)
563 while true do
564 local next_wakeup = check_timers()
565
566 -- Sleep until next timer to wake up, or a set/clear command
567 --
568 local secs
569 if next_wakeup then
570 secs = next_wakeup - now_secs()
571 if secs < 0 then secs = 0 end
572 end
573 local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY)
574
575 if key == TGW_KEY then
576 assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer
577 local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3)
578 assert( key)
579 set_timer( what, key, wakeup_at, period and period > 0 and period or nil)
580 elseif key == TGW_QUERY then
581 if what == "get_timers" then
582 timer_gateway:send( TGW_REPLY, get_timers())
583 else
584 timer_gateway:send( TGW_REPLY, "unknown query " .. what)
539 end 585 end
540 end -- t2 loop 586 --elseif secs == nil then -- got no value while block-waiting?
541 end -- t1 loop 587 -- WR( "timer lane: no linda, aborted?")
542 588 end
543 return next_wakeup -- may be 'nil'
544 end -- check_timers()
545
546 local timer_gateway_batched = timer_gateway.batched
547 set_finalizer( function( err, stk)
548 if err and type( err) ~= "userdata" then
549 WR( "LanesTimer error: "..tostring(err))
550 --elseif type( err) == "userdata" then
551 -- WR( "LanesTimer after cancel" )
552 --else
553 -- WR("LanesTimer finalized")
554 end 589 end
555 end) 590 end -- timer_body()
556 while true do 591 timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility...
557 local next_wakeup = check_timers() 592 end -- first_time
558 593
559 -- Sleep until next timer to wake up, or a set/clear command 594 -----
595 -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] )
596 --
597 -- PUBLIC LANES API
598 timer = function( linda, key, a, period )
599 if getmetatable( linda) ~= "Linda" then
600 error "expecting a Linda"
601 end
602 if a == 0.0 then
603 -- Caller expects to get current time stamp in Linda, on return
604 -- (like the timer had expired instantly); it would be good to set this
605 -- as late as possible (to give most current time) but also we want it
606 -- to precede any possible timers that might start striking.
560 -- 607 --
561 local secs 608 linda:set( key, now_secs())
562 if next_wakeup then 609
563 secs = next_wakeup - now_secs() 610 if not period or period==0.0 then
564 if secs < 0 then secs = 0 end 611 timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer
565 end 612 return -- nothing more to do
566 local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY)
567
568 if key == TGW_KEY then
569 assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer
570 local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3)
571 assert( key)
572 set_timer( what, key, wakeup_at, period and period > 0 and period or nil)
573 elseif key == TGW_QUERY then
574 if what == "get_timers" then
575 timer_gateway:send( TGW_REPLY, get_timers())
576 else
577 timer_gateway:send( TGW_REPLY, "unknown query " .. what)
578 end
579 --elseif secs == nil then -- got no value while block-waiting?
580 -- WR( "timer lane: no linda, aborted?")
581 end 613 end
614 a= period
582 end 615 end
583 end -- timer_body()
584 timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility...
585 end -- first_time
586 616
587 ----- 617 local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time
588 -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) 618 or (a and now_secs()+a or nil)
589 -- 619 -- queue to timer
590 -- PUBLIC LANES API
591 timer = function( linda, key, a, period )
592 if getmetatable( linda) ~= "Linda" then
593 error "expecting a Linda"
594 end
595 if a == 0.0 then
596 -- Caller expects to get current time stamp in Linda, on return
597 -- (like the timer had expired instantly); it would be good to set this
598 -- as late as possible (to give most current time) but also we want it
599 -- to precede any possible timers that might start striking.
600 -- 620 --
601 linda:set( key, core.now_secs()) 621 timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period )
622 end -- timer()
602 623
603 if not period or period==0.0 then 624 -----
604 timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer 625 -- {[{linda, slot, when, period}[,...]]} = timers()
605 return -- nothing more to do
606 end
607 a= period
608 end
609
610 local wakeup_at= type(a)=="table" and core.wakeup_conv(a) -- given point of time
611 or (a and core.now_secs()+a or nil)
612 -- queue to timer
613 -- 626 --
614 timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) 627 -- PUBLIC LANES API
615 end 628 timers = function()
616 629 timer_gateway:send( TGW_QUERY, "get_timers")
617 ----- 630 local _, r = timer_gateway:receive( TGW_REPLY)
618 -- {[{linda, slot, when, period}[,...]]} = timers() 631 return r
619 -- 632 end -- timers()
620 -- PUBLIC LANES API
621 timers = function()
622 timer_gateway:send( TGW_QUERY, "get_timers")
623 local _, r = timer_gateway:receive( TGW_REPLY)
624 return r
625 end
626 633
627 end -- settings.with_timers 634 end -- settings.with_timers
628 635
diff --git a/src/lanes_private.h b/src/lanes_private.h
index bcc3014..01d43c0 100644
--- a/src/lanes_private.h
+++ b/src/lanes_private.h
@@ -4,27 +4,26 @@
4#include "uniquekey.h" 4#include "uniquekey.h"
5#include "universe.h" 5#include "universe.h"
6 6
7#include <chrono>
8#include <condition_variable>
7#include <latch> 9#include <latch>
10#include <stop_token>
11#include <thread>
8 12
9// NOTE: values to be changed by either thread, during execution, without 13// NOTE: values to be changed by either thread, during execution, without
10// locking, are marked "volatile" 14// locking, are marked "volatile"
11// 15//
12class Lane 16class Lane
13{ 17{
14 private:
15
16 enum class ThreadStatus
17 {
18 Normal, // normal master side state
19 Killed // issued an OS kill
20 };
21
22 public: 18 public:
23 19
24 using enum ThreadStatus; 20 // the thread
25 21 std::jthread m_thread;
26 THREAD_T thread; 22 // a latch to wait for the lua_State to be ready
27 std::latch m_ready{ 1 }; 23 std::latch m_ready{ 1 };
24 // to wait for stop requests through m_thread's stop_source
25 std::mutex m_done_mutex;
26 std::condition_variable m_done_signal; // use condition_variable_any if waiting for a stop_token
28 // 27 //
29 // M: sub-thread OS thread 28 // M: sub-thread OS thread
30 // S: not used 29 // S: not used
@@ -42,7 +41,7 @@ class Lane
42 // M: sets to PENDING (before launching) 41 // M: sets to PENDING (before launching)
43 // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED 42 // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED
44 43
45 SIGNAL_T* volatile waiting_on{ nullptr }; 44 std::condition_variable* volatile m_waiting_on{ nullptr };
46 // 45 //
47 // When status is WAITING, points on the linda's signal the thread waits on, else nullptr 46 // When status is WAITING, points on the linda's signal the thread waits on, else nullptr
48 47
@@ -51,23 +50,6 @@ class Lane
51 // M: sets to false, flags true for cancel request 50 // M: sets to false, flags true for cancel request
52 // S: reads to see if cancel is requested 51 // S: reads to see if cancel is requested
53 52
54#if THREADWAIT_METHOD == THREADWAIT_CONDVAR
55 SIGNAL_T done_signal;
56 //
57 // M: Waited upon at lane ending (if Posix with no PTHREAD_TIMEDJOIN)
58 // S: sets the signal once cancellation is noticed (avoids a kill)
59
60 MUTEX_T done_lock;
61 //
62 // Lock required by 'done_signal' condition variable, protecting
63 // lane status changes to DONE/ERROR_ST/CANCELLED.
64#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
65
66 volatile ThreadStatus mstatus{ Normal };
67 //
68 // M: sets to Normal, if issued a kill changes to Killed
69 // S: not used
70
71 Lane* volatile selfdestruct_next{ nullptr }; 53 Lane* volatile selfdestruct_next{ nullptr };
72 // 54 //
73 // M: sets to non-nullptr if facing lane handle '__gc' cycle but the lane 55 // M: sets to non-nullptr if facing lane handle '__gc' cycle but the lane
@@ -88,6 +70,9 @@ class Lane
88 70
89 Lane(Universe* U_, lua_State* L_); 71 Lane(Universe* U_, lua_State* L_);
90 ~Lane(); 72 ~Lane();
73
74 bool waitForCompletion(lua_Duration duration_);
75 void startThread(int priority_);
91}; 76};
92 77
93// xxh64 of string "LANE_POINTER_REGKEY" generated at https://www.pelock.com/products/hash-calculator 78// xxh64 of string "LANE_POINTER_REGKEY" generated at https://www.pelock.com/products/hash-calculator
diff --git a/src/linda.cpp b/src/linda.cpp
index 5ee4768..ea1410e 100644
--- a/src/linda.cpp
+++ b/src/linda.cpp
@@ -61,8 +61,8 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header
61 61
62 public: 62 public:
63 63
64 SIGNAL_T read_happened; 64 std::condition_variable m_read_happened;
65 SIGNAL_T write_happened; 65 std::condition_variable m_write_happened;
66 Universe* const U; // the universe this linda belongs to 66 Universe* const U; // the universe this linda belongs to
67 uintptr_t const group; // a group to control keeper allocation between lindas 67 uintptr_t const group; // a group to control keeper allocation between lindas
68 CancelRequest simulate_cancel{ CancelRequest::None }; 68 CancelRequest simulate_cancel{ CancelRequest::None };
@@ -81,17 +81,11 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header
81 : U{ U_ } 81 : U{ U_ }
82 , group{ group_ << KEEPER_MAGIC_SHIFT } 82 , group{ group_ << KEEPER_MAGIC_SHIFT }
83 { 83 {
84 SIGNAL_INIT(&read_happened);
85 SIGNAL_INIT(&write_happened);
86
87 setName(name_, len_); 84 setName(name_, len_);
88 } 85 }
89 86
90 ~Linda() 87 ~Linda()
91 { 88 {
92 // There aren't any lanes waiting on these lindas, since all proxies have been gc'ed. Right?
93 SIGNAL_FREE(&read_happened);
94 SIGNAL_FREE(&write_happened);
95 if (std::holds_alternative<AllocatedName>(m_name)) 89 if (std::holds_alternative<AllocatedName>(m_name))
96 { 90 {
97 AllocatedName& name = std::get<AllocatedName>(m_name); 91 AllocatedName& name = std::get<AllocatedName>(m_name);
@@ -216,15 +210,19 @@ LUAG_FUNC(linda_protected_call)
216LUAG_FUNC(linda_send) 210LUAG_FUNC(linda_send)
217{ 211{
218 Linda* const linda{ lua_toLinda<false>(L, 1) }; 212 Linda* const linda{ lua_toLinda<false>(L, 1) };
219 time_d timeout{ -1.0 }; 213 std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
220 int key_i{ 2 }; // index of first key, if timeout not there 214 int key_i{ 2 }; // index of first key, if timeout not there
221 215
222 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion 216 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion
223 { 217 {
224 timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); 218 lua_Duration const duration{ lua_tonumber(L, 2) };
219 if (duration.count() >= 0.0)
220 {
221 until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration);
222 }
225 ++key_i; 223 ++key_i;
226 } 224 }
227 else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key 225 else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key
228 { 226 {
229 ++key_i; 227 ++key_i;
230 } 228 }
@@ -266,6 +264,7 @@ LUAG_FUNC(linda_send)
266 lua_State* const KL{ K ? K->L : nullptr }; 264 lua_State* const KL{ K ? K->L : nullptr };
267 if (KL == nullptr) 265 if (KL == nullptr)
268 return 0; 266 return 0;
267
269 STACK_CHECK_START_REL(KL, 0); 268 STACK_CHECK_START_REL(KL, 0);
270 for (bool try_again{ true };;) 269 for (bool try_again{ true };;)
271 { 270 {
@@ -295,12 +294,12 @@ LUAG_FUNC(linda_send)
295 if (ret) 294 if (ret)
296 { 295 {
297 // Wake up ALL waiting threads 296 // Wake up ALL waiting threads
298 SIGNAL_ALL(&linda->write_happened); 297 linda->m_write_happened.notify_all();
299 break; 298 break;
300 } 299 }
301 300
302 // instant timout to bypass the wait syscall 301 // instant timout to bypass the wait syscall
303 if (timeout == 0.0) 302 if (std::chrono::steady_clock::now() >= until)
304 { 303 {
305 break; /* no wait; instant timeout */ 304 break; /* no wait; instant timeout */
306 } 305 }
@@ -314,14 +313,17 @@ LUAG_FUNC(linda_send)
314 prev_status = lane->status; // RUNNING, most likely 313 prev_status = lane->status; // RUNNING, most likely
315 ASSERT_L(prev_status == RUNNING); // but check, just in case 314 ASSERT_L(prev_status == RUNNING); // but check, just in case
316 lane->status = WAITING; 315 lane->status = WAITING;
317 ASSERT_L(lane->waiting_on == nullptr); 316 ASSERT_L(lane->m_waiting_on == nullptr);
318 lane->waiting_on = &linda->read_happened; 317 lane->m_waiting_on = &linda->m_read_happened;
319 } 318 }
320 // could not send because no room: wait until some data was read before trying again, or until timeout is reached 319 // could not send because no room: wait until some data was read before trying again, or until timeout is reached
321 try_again = SIGNAL_WAIT(&linda->read_happened, &K->keeper_cs, timeout); 320 std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock };
321 std::cv_status const status{ linda->m_read_happened.wait_until(keeper_lock, until) };
322 keeper_lock.release(); // we don't want to release the lock!
323 try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups
322 if (lane != nullptr) 324 if (lane != nullptr)
323 { 325 {
324 lane->waiting_on = nullptr; 326 lane->m_waiting_on = nullptr;
325 lane->status = prev_status; 327 lane->status = prev_status;
326 } 328 }
327 } 329 }
@@ -369,21 +371,24 @@ static constexpr UniqueKey BATCH_SENTINEL{ 0x2DDFEE0968C62AA7ull };
369LUAG_FUNC(linda_receive) 371LUAG_FUNC(linda_receive)
370{ 372{
371 Linda* const linda{ lua_toLinda<false>(L, 1) }; 373 Linda* const linda{ lua_toLinda<false>(L, 1) };
372 374 std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
373 time_d timeout{ -1.0 }; 375 int key_i{ 2 }; // index of first key, if timeout not there
374 int key_i{ 2 };
375 376
376 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion 377 if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion
377 { 378 {
378 timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); 379 lua_Duration const duration{ lua_tonumber(L, 2) };
380 if (duration.count() >= 0.0)
381 {
382 until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration);
383 }
379 ++key_i; 384 ++key_i;
380 } 385 }
381 else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key 386 else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key
382 { 387 {
383 ++key_i; 388 ++key_i;
384 } 389 }
385 390
386 keeper_api_t keeper_receive; 391 keeper_api_t selected_keeper_receive{ nullptr };
387 int expected_pushed_min{ 0 }, expected_pushed_max{ 0 }; 392 int expected_pushed_min{ 0 }, expected_pushed_max{ 0 };
388 // are we in batched mode? 393 // are we in batched mode?
389 BATCH_SENTINEL.pushKey(L); 394 BATCH_SENTINEL.pushKey(L);
@@ -396,7 +401,7 @@ LUAG_FUNC(linda_receive)
396 // make sure the keys are of a valid type 401 // make sure the keys are of a valid type
397 check_key_types(L, key_i, key_i); 402 check_key_types(L, key_i, key_i);
398 // receive multiple values from a single slot 403 // receive multiple values from a single slot
399 keeper_receive = KEEPER_API(receive_batched); 404 selected_keeper_receive = KEEPER_API(receive_batched);
400 // we expect a user-defined amount of return value 405 // we expect a user-defined amount of return value
401 expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1); 406 expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1);
402 expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min); 407 expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min);
@@ -413,17 +418,20 @@ LUAG_FUNC(linda_receive)
413 // make sure the keys are of a valid type 418 // make sure the keys are of a valid type
414 check_key_types(L, key_i, lua_gettop(L)); 419 check_key_types(L, key_i, lua_gettop(L));
415 // receive a single value, checking multiple slots 420 // receive a single value, checking multiple slots
416 keeper_receive = KEEPER_API(receive); 421 selected_keeper_receive = KEEPER_API(receive);
417 // we expect a single (value, key) pair of returned values 422 // we expect a single (value, key) pair of returned values
418 expected_pushed_min = expected_pushed_max = 2; 423 expected_pushed_min = expected_pushed_max = 2;
419 } 424 }
420 425
421 Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) }; 426 Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) };
422 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; 427 Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) };
423 if (K == nullptr) 428 lua_State* const KL{ K ? K->L : nullptr };
429 if (KL == nullptr)
424 return 0; 430 return 0;
431
425 CancelRequest cancel{ CancelRequest::None }; 432 CancelRequest cancel{ CancelRequest::None };
426 int pushed{ 0 }; 433 int pushed{ 0 };
434 STACK_CHECK_START_REL(KL, 0);
427 for (bool try_again{ true };;) 435 for (bool try_again{ true };;)
428 { 436 {
429 if (lane != nullptr) 437 if (lane != nullptr)
@@ -439,7 +447,7 @@ LUAG_FUNC(linda_receive)
439 } 447 }
440 448
441 // all arguments of receive() but the first are passed to the keeper's receive function 449 // all arguments of receive() but the first are passed to the keeper's receive function
442 pushed = keeper_call(linda->U, K->L, keeper_receive, L, linda, key_i); 450 pushed = keeper_call(linda->U, KL, selected_keeper_receive, L, linda, key_i);
443 if (pushed < 0) 451 if (pushed < 0)
444 { 452 {
445 break; 453 break;
@@ -451,11 +459,11 @@ LUAG_FUNC(linda_receive)
451 keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed, LookupMode::FromKeeper); 459 keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed, LookupMode::FromKeeper);
452 // To be done from within the 'K' locking area 460 // To be done from within the 'K' locking area
453 // 461 //
454 SIGNAL_ALL(&linda->read_happened); 462 linda->m_read_happened.notify_all();
455 break; 463 break;
456 } 464 }
457 465
458 if (timeout == 0.0) 466 if (std::chrono::steady_clock::now() >= until)
459 { 467 {
460 break; /* instant timeout */ 468 break; /* instant timeout */
461 } 469 }
@@ -469,18 +477,22 @@ LUAG_FUNC(linda_receive)
469 prev_status = lane->status; // RUNNING, most likely 477 prev_status = lane->status; // RUNNING, most likely
470 ASSERT_L(prev_status == RUNNING); // but check, just in case 478 ASSERT_L(prev_status == RUNNING); // but check, just in case
471 lane->status = WAITING; 479 lane->status = WAITING;
472 ASSERT_L(lane->waiting_on == nullptr); 480 ASSERT_L(lane->m_waiting_on == nullptr);
473 lane->waiting_on = &linda->write_happened; 481 lane->m_waiting_on = &linda->m_write_happened;
474 } 482 }
475 // not enough data to read: wakeup when data was sent, or when timeout is reached 483 // not enough data to read: wakeup when data was sent, or when timeout is reached
476 try_again = SIGNAL_WAIT(&linda->write_happened, &K->keeper_cs, timeout); 484 std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock };
485 std::cv_status const status{ linda->m_write_happened.wait_until(keeper_lock, until) };
486 keeper_lock.release(); // we don't want to release the lock!
487 try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups
477 if (lane != nullptr) 488 if (lane != nullptr)
478 { 489 {
479 lane->waiting_on = nullptr; 490 lane->m_waiting_on = nullptr;
480 lane->status = prev_status; 491 lane->status = prev_status;
481 } 492 }
482 } 493 }
483 } 494 }
495 STACK_CHECK(KL, 0);
484 496
485 if (pushed < 0) 497 if (pushed < 0)
486 { 498 {
@@ -537,13 +549,13 @@ LUAG_FUNC(linda_set)
537 if (has_value) 549 if (has_value)
538 { 550 {
539 // we put some data in the slot, tell readers that they should wake 551 // we put some data in the slot, tell readers that they should wake
540 SIGNAL_ALL(&linda->write_happened); // To be done from within the 'K' locking area 552 linda->m_write_happened.notify_all(); // To be done from within the 'K' locking area
541 } 553 }
542 if (pushed == 1) 554 if (pushed == 1)
543 { 555 {
544 // the key was full, but it is no longer the case, tell writers they should wake 556 // the key was full, but it is no longer the case, tell writers they should wake
545 ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1); 557 ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1);
546 SIGNAL_ALL(&linda->read_happened); // To be done from within the 'K' locking area 558 linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area
547 } 559 }
548 } 560 }
549 } 561 }
@@ -648,7 +660,7 @@ LUAG_FUNC( linda_limit)
648 if( pushed == 1) 660 if( pushed == 1)
649 { 661 {
650 ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); 662 ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1);
651 SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area 663 linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area
652 } 664 }
653 } 665 }
654 else // linda is cancelled 666 else // linda is cancelled
@@ -678,8 +690,8 @@ LUAG_FUNC(linda_cancel)
678 linda->simulate_cancel = CancelRequest::Soft; 690 linda->simulate_cancel = CancelRequest::Soft;
679 if (strcmp(who, "both") == 0) // tell everyone writers to wake up 691 if (strcmp(who, "both") == 0) // tell everyone writers to wake up
680 { 692 {
681 SIGNAL_ALL(&linda->write_happened); 693 linda->m_write_happened.notify_all();
682 SIGNAL_ALL(&linda->read_happened); 694 linda->m_read_happened.notify_all();
683 } 695 }
684 else if (strcmp(who, "none") == 0) // reset flag 696 else if (strcmp(who, "none") == 0) // reset flag
685 { 697 {
@@ -687,11 +699,11 @@ LUAG_FUNC(linda_cancel)
687 } 699 }
688 else if (strcmp(who, "read") == 0) // tell blocked readers to wake up 700 else if (strcmp(who, "read") == 0) // tell blocked readers to wake up
689 { 701 {
690 SIGNAL_ALL(&linda->write_happened); 702 linda->m_write_happened.notify_all();
691 } 703 }
692 else if (strcmp(who, "write") == 0) // tell blocked writers to wake up 704 else if (strcmp(who, "write") == 0) // tell blocked writers to wake up
693 { 705 {
694 SIGNAL_ALL(&linda->read_happened); 706 linda->m_read_happened.notify_all();
695 } 707 }
696 else 708 else
697 { 709 {
diff --git a/src/macros_and_utils.h b/src/macros_and_utils.h
index e29e7fb..997b452 100644
--- a/src/macros_and_utils.h
+++ b/src/macros_and_utils.h
@@ -11,9 +11,12 @@ extern "C" {
11#endif // __cplusplus 11#endif // __cplusplus
12 12
13#include <cassert> 13#include <cassert>
14#include <chrono>
14#include <tuple> 15#include <tuple>
15#include <type_traits> 16#include <type_traits>
16 17
18using namespace std::chrono_literals;
19
17#define USE_DEBUG_SPEW() 0 20#define USE_DEBUG_SPEW() 0
18#if USE_DEBUG_SPEW() 21#if USE_DEBUG_SPEW()
19extern char const* debugspew_indent; 22extern char const* debugspew_indent;
@@ -167,3 +170,5 @@ T* lua_newuserdatauv(lua_State* L, int nuvalue_)
167 std::ignore = lua_error(L); // doesn't return 170 std::ignore = lua_error(L); // doesn't return
168 assert(false); // we should never get here, but i'm paranoid 171 assert(false); // we should never get here, but i'm paranoid
169} 172}
173
174using lua_Duration = std::chrono::template duration<lua_Number>;
diff --git a/src/threading.cpp b/src/threading.cpp
index afeb184..fc20931 100644
--- a/src/threading.cpp
+++ b/src/threading.cpp
@@ -93,9 +93,6 @@ THE SOFTWARE.
93# pragma warning( disable : 4054 ) 93# pragma warning( disable : 4054 )
94#endif 94#endif
95 95
96//#define THREAD_CREATE_RETRIES_MAX 20
97 // loops (maybe retry forever?)
98
99/* 96/*
100* FAIL is for unexpected API return values - essentially programming 97* FAIL is for unexpected API return values - essentially programming
101* error in _this_ code. 98* error in _this_ code.
@@ -196,36 +193,6 @@ time_d now_secs(void) {
196} 193}
197 194
198 195
199/*
200*/
201time_d SIGNAL_TIMEOUT_PREPARE( double secs ) {
202 if (secs<=0.0) return secs;
203 else return now_secs() + secs;
204}
205
206
207#if THREADAPI == THREADAPI_PTHREAD
208/*
209* Prepare 'abs_secs' kind of timeout to 'timespec' format
210*/
211static void prepare_timeout( struct timespec *ts, time_d abs_secs ) {
212 assert(ts);
213 assert( abs_secs >= 0.0 );
214
215 if (abs_secs==0.0)
216 abs_secs= now_secs();
217
218 ts->tv_sec= (time_t) floor( abs_secs );
219 ts->tv_nsec= ((long)((abs_secs - ts->tv_sec) * 1000.0 +0.5)) * 1000000UL; // 1ms = 1000000ns
220 if (ts->tv_nsec == 1000000000UL)
221 {
222 ts->tv_nsec = 0;
223 ts->tv_sec = ts->tv_sec + 1;
224 }
225}
226#endif // THREADAPI == THREADAPI_PTHREAD
227
228
229/*---=== Threading ===---*/ 196/*---=== Threading ===---*/
230 197
231//--- 198//---
@@ -268,30 +235,6 @@ static void prepare_timeout( struct timespec *ts, time_d abs_secs ) {
268 235
269#if THREADAPI == THREADAPI_WINDOWS 236#if THREADAPI == THREADAPI_WINDOWS
270 237
271#if _WIN32_WINNT < 0x0600 // CONDITION_VARIABLE aren't available
272 //
273 void MUTEX_INIT( MUTEX_T *ref ) {
274 *ref= CreateMutex( nullptr /*security attr*/, false /*not locked*/, nullptr );
275 if (!ref) FAIL( "CreateMutex", GetLastError() );
276 }
277 void MUTEX_FREE( MUTEX_T *ref ) {
278 if (!CloseHandle(*ref)) FAIL( "CloseHandle (mutex)", GetLastError() );
279 *ref= nullptr;
280 }
281 void MUTEX_LOCK( MUTEX_T *ref )
282 {
283 DWORD rc = WaitForSingleObject( *ref, INFINITE);
284 // ERROR_WAIT_NO_CHILDREN means a thread was killed (lane terminated because of error raised during a linda transfer for example) while having grabbed this mutex
285 // this is not a big problem as we will grab it just the same, so ignore this particular error
286 if( rc != 0 && rc != ERROR_WAIT_NO_CHILDREN)
287 FAIL( "WaitForSingleObject", (rc == WAIT_FAILED) ? GetLastError() : rc);
288 }
289 void MUTEX_UNLOCK( MUTEX_T *ref ) {
290 if (!ReleaseMutex(*ref))
291 FAIL( "ReleaseMutex", GetLastError() );
292 }
293#endif // CONDITION_VARIABLE aren't available
294
295static int const gs_prio_remap[] = 238static int const gs_prio_remap[] =
296{ 239{
297 THREAD_PRIORITY_IDLE, 240 THREAD_PRIORITY_IDLE,
@@ -303,37 +246,7 @@ static int const gs_prio_remap[] =
303 THREAD_PRIORITY_TIME_CRITICAL 246 THREAD_PRIORITY_TIME_CRITICAL
304}; 247};
305 248
306/* MSDN: "If you would like to use the CRT in ThreadProc, use the 249// ###############################################################################################
307_beginthreadex function instead (of CreateThread)."
308MSDN: "you can create at most 2028 threads"
309*/
310// Note: Visual C++ requires '__stdcall' where it is
311void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (__stdcall *func)( void*), void* data, int prio /* -3..+3 */)
312{
313 HANDLE h = (HANDLE) _beginthreadex(nullptr, // security
314 _THREAD_STACK_SIZE,
315 func,
316 data,
317 0, // flags (0/CREATE_SUSPENDED)
318 nullptr // thread id (not used)
319 );
320
321 if (h == nullptr) // _beginthreadex returns 0L on failure instead of -1L (like _beginthread)
322 {
323 FAIL( "CreateThread", GetLastError());
324 }
325
326 if (prio != THREAD_PRIO_DEFAULT)
327 {
328 if (!SetThreadPriority( h, gs_prio_remap[prio + 3]))
329 {
330 FAIL( "SetThreadPriority", GetLastError());
331 }
332 }
333
334 *ref = h;
335}
336
337 250
338void THREAD_SET_PRIORITY( int prio) 251void THREAD_SET_PRIORITY( int prio)
339{ 252{
@@ -344,42 +257,26 @@ void THREAD_SET_PRIORITY( int prio)
344 } 257 }
345} 258}
346 259
347void THREAD_SET_AFFINITY( unsigned int aff) 260// ###############################################################################################
261
262void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_)
348{ 263{
349 if( !SetThreadAffinityMask( GetCurrentThread(), aff)) 264 // prio range [-3,+3] was checked by the caller
265 if (!SetThreadPriority(thread_.native_handle(), gs_prio_remap[prio_ + 3]))
350 { 266 {
351 FAIL( "THREAD_SET_AFFINITY", GetLastError()); 267 FAIL("JTHREAD_SET_PRIORITY", GetLastError());
352 } 268 }
353} 269}
354 270
355bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs) 271// ###############################################################################################
356{
357 DWORD ms = (secs<0.0) ? INFINITE : (DWORD)((secs*1000.0)+0.5);
358 272
359 DWORD rc= WaitForSingleObject( *ref, ms /*timeout*/ ); 273void THREAD_SET_AFFINITY(unsigned int aff)
360 // 274{
361 // (WAIT_ABANDONED) 275 if( !SetThreadAffinityMask( GetCurrentThread(), aff))
362 // WAIT_OBJECT_0 success (0)
363 // WAIT_TIMEOUT
364 // WAIT_FAILED more info via GetLastError()
365
366 if (rc == WAIT_TIMEOUT) return false;
367 if( rc !=0) FAIL( "WaitForSingleObject", rc==WAIT_FAILED ? GetLastError() : rc);
368 *ref = nullptr; // thread no longer usable
369 return true;
370 }
371 //
372 void THREAD_KILL( THREAD_T *ref )
373 { 276 {
374 // nonexistent on Xbox360, simply disable until a better solution is found 277 FAIL( "THREAD_SET_AFFINITY", GetLastError());
375 #if !defined( PLATFORM_XBOX)
376 // in theory no-one should call this as it is very dangerous (memory and mutex leaks, no notification of DLLs, etc.)
377 if (!TerminateThread( *ref, 0 )) FAIL("TerminateThread", GetLastError());
378 #endif // PLATFORM_XBOX
379 *ref = nullptr;
380 } 278 }
381 279}
382 void THREAD_MAKE_ASYNCH_CANCELLABLE() {} // nothing to do for windows threads, we can cancel them anytime we want
383 280
384#if !defined __GNUC__ 281#if !defined __GNUC__
385 //see http://msdn.microsoft.com/en-us/library/xcb2z8hs.aspx 282 //see http://msdn.microsoft.com/en-us/library/xcb2z8hs.aspx
@@ -414,158 +311,6 @@ bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs)
414#endif // !__GNUC__ 311#endif // !__GNUC__
415 } 312 }
416 313
417#if _WIN32_WINNT < 0x0600 // CONDITION_VARIABLE aren't available
418
419 void SIGNAL_INIT( SIGNAL_T* ref)
420 {
421 InitializeCriticalSection( &ref->signalCS);
422 InitializeCriticalSection( &ref->countCS);
423 if( 0 == (ref->waitEvent = CreateEvent( 0, true, false, 0))) // manual-reset
424 FAIL( "CreateEvent", GetLastError());
425 if( 0 == (ref->waitDoneEvent = CreateEvent( 0, false, false, 0))) // auto-reset
426 FAIL( "CreateEvent", GetLastError());
427 ref->waitersCount = 0;
428 }
429
430 void SIGNAL_FREE( SIGNAL_T* ref)
431 {
432 CloseHandle( ref->waitDoneEvent);
433 CloseHandle( ref->waitEvent);
434 DeleteCriticalSection( &ref->countCS);
435 DeleteCriticalSection( &ref->signalCS);
436 }
437
438 bool SIGNAL_WAIT( SIGNAL_T* ref, MUTEX_T* mu_ref, time_d abs_secs)
439 {
440 DWORD errc;
441 DWORD ms;
442
443 if( abs_secs < 0.0)
444 ms = INFINITE;
445 else if( abs_secs == 0.0)
446 ms = 0;
447 else
448 {
449 time_d msd = (abs_secs - now_secs()) * 1000.0 + 0.5;
450 // If the time already passed, still try once (ms==0). A short timeout
451 // may have turned negative or 0 because of the two time samples done.
452 ms = msd <= 0.0 ? 0 : (DWORD)msd;
453 }
454
455 EnterCriticalSection( &ref->signalCS);
456 EnterCriticalSection( &ref->countCS);
457 ++ ref->waitersCount;
458 LeaveCriticalSection( &ref->countCS);
459 LeaveCriticalSection( &ref->signalCS);
460
461 errc = SignalObjectAndWait( *mu_ref, ref->waitEvent, ms, false);
462
463 EnterCriticalSection( &ref->countCS);
464 if( 0 == -- ref->waitersCount)
465 {
466 // we're the last one leaving...
467 ResetEvent( ref->waitEvent);
468 SetEvent( ref->waitDoneEvent);
469 }
470 LeaveCriticalSection( &ref->countCS);
471 MUTEX_LOCK( mu_ref);
472
473 switch( errc)
474 {
475 case WAIT_TIMEOUT:
476 return false;
477 case WAIT_OBJECT_0:
478 return true;
479 }
480
481 FAIL( "SignalObjectAndWait", GetLastError());
482 return false;
483 }
484
485 void SIGNAL_ALL( SIGNAL_T* ref)
486 {
487 DWORD errc = WAIT_OBJECT_0;
488
489 EnterCriticalSection( &ref->signalCS);
490 EnterCriticalSection( &ref->countCS);
491
492 if( ref->waitersCount > 0)
493 {
494 ResetEvent( ref->waitDoneEvent);
495 SetEvent( ref->waitEvent);
496 LeaveCriticalSection( &ref->countCS);
497 errc = WaitForSingleObject( ref->waitDoneEvent, INFINITE);
498 }
499 else
500 {
501 LeaveCriticalSection( &ref->countCS);
502 }
503
504 LeaveCriticalSection( &ref->signalCS);
505
506 if( WAIT_OBJECT_0 != errc)
507 FAIL( "WaitForSingleObject", GetLastError());
508 }
509
510#else // CONDITION_VARIABLE are available, use them
511
512 //
513 void SIGNAL_INIT( SIGNAL_T *ref )
514 {
515 InitializeConditionVariable( ref);
516 }
517
518 void SIGNAL_FREE( SIGNAL_T *ref )
519 {
520 // nothing to do
521 (void)ref;
522 }
523
524 bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu_ref, time_d abs_secs)
525 {
526 long ms;
527
528 if( abs_secs < 0.0)
529 ms = INFINITE;
530 else if( abs_secs == 0.0)
531 ms = 0;
532 else
533 {
534 ms = (long) ((abs_secs - now_secs())*1000.0 + 0.5);
535
536 // If the time already passed, still try once (ms==0). A short timeout
537 // may have turned negative or 0 because of the two time samples done.
538 //
539 if( ms < 0)
540 ms = 0;
541 }
542
543 if( !SleepConditionVariableCS( ref, mu_ref, ms))
544 {
545 if( GetLastError() == ERROR_TIMEOUT)
546 {
547 return false;
548 }
549 else
550 {
551 FAIL( "SleepConditionVariableCS", GetLastError());
552 }
553 }
554 return true;
555 }
556
557 void SIGNAL_ONE( SIGNAL_T *ref )
558 {
559 WakeConditionVariable( ref);
560 }
561
562 void SIGNAL_ALL( SIGNAL_T *ref )
563 {
564 WakeAllConditionVariable( ref);
565 }
566
567#endif // CONDITION_VARIABLE are available
568
569#else // THREADAPI == THREADAPI_PTHREAD 314#else // THREADAPI == THREADAPI_PTHREAD
570 // PThread (Linux, OS X, ...) 315 // PThread (Linux, OS X, ...)
571 // 316 //
@@ -607,44 +352,6 @@ bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs)
607 abort(); 352 abort();
608 } 353 }
609 #define PT_CALL( call ) { int rc= call; if (rc!=0) _PT_FAIL( rc, #call, __FILE__, __LINE__ ); } 354 #define PT_CALL( call ) { int rc= call; if (rc!=0) _PT_FAIL( rc, #call, __FILE__, __LINE__ ); }
610 //
611 void SIGNAL_INIT( SIGNAL_T *ref ) {
612 PT_CALL(pthread_cond_init(ref, nullptr /*attr*/));
613 }
614 void SIGNAL_FREE( SIGNAL_T *ref ) {
615 PT_CALL( pthread_cond_destroy(ref) );
616 }
617 //
618 /*
619 * Timeout is given as absolute since we may have fake wakeups during
620 * a timed out sleep. A Linda with some other key read, or just because
621 * PThread cond vars can wake up unwantedly.
622 */
623 bool SIGNAL_WAIT( SIGNAL_T *ref, pthread_mutex_t *mu, time_d abs_secs ) {
624 if (abs_secs<0.0) {
625 PT_CALL( pthread_cond_wait( ref, mu ) ); // infinite
626 } else {
627 int rc;
628 struct timespec ts;
629
630 assert( abs_secs != 0.0 );
631 prepare_timeout( &ts, abs_secs );
632
633 rc= pthread_cond_timedwait( ref, mu, &ts );
634
635 if (rc==ETIMEDOUT) return false;
636 if (rc) { _PT_FAIL( rc, "pthread_cond_timedwait()", __FILE__, __LINE__ ); }
637 }
638 return true;
639 }
640 //
641 void SIGNAL_ONE( SIGNAL_T *ref ) {
642 PT_CALL( pthread_cond_signal(ref) ); // wake up ONE (or no) waiting thread
643 }
644 //
645 void SIGNAL_ALL( SIGNAL_T *ref ) {
646 PT_CALL( pthread_cond_broadcast(ref) ); // wake up ALL waiting threads
647 }
648 355
649// array of 7 thread priority values, hand-tuned by platform so that we offer a uniform [-3,+3] public priority range 356// array of 7 thread priority values, hand-tuned by platform so that we offer a uniform [-3,+3] public priority range
650static int const gs_prio_remap[] = 357static int const gs_prio_remap[] =
@@ -775,129 +482,36 @@ static int select_prio(int prio /* -3..+3 */)
775 return gs_prio_remap[prio + 3]; 482 return gs_prio_remap[prio + 3];
776} 483}
777 484
778void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (*func)( void*), void* data, int prio /* -3..+3 */) 485void THREAD_SET_PRIORITY( int prio)
779{ 486{
780 pthread_attr_t a;
781 bool const change_priority =
782#ifdef PLATFORM_LINUX 487#ifdef PLATFORM_LINUX
783 sudo && // only root-privileged process can change priorities 488 if( sudo) // only root-privileged process can change priorities
784#endif 489#endif // PLATFORM_LINUX
785 (prio != THREAD_PRIO_DEFAULT);
786
787 PT_CALL( pthread_attr_init( &a));
788
789#ifndef PTHREAD_TIMEDJOIN
790 // We create a NON-JOINABLE thread. This is mainly due to the lack of
791 // 'pthread_timedjoin()', but does offer other benefits (s.a. earlier
792 // freeing of the thread's resources).
793 //
794 PT_CALL( pthread_attr_setdetachstate( &a, PTHREAD_CREATE_DETACHED));
795#endif // PTHREAD_TIMEDJOIN
796
797 // Use this to find a system's default stack size (DEBUG)
798#if 0
799 {
800 size_t n;
801 pthread_attr_getstacksize( &a, &n);
802 fprintf( stderr, "Getstack: %u\n", (unsigned int)n);
803 }
804 // 524288 on OS X
805 // 2097152 on Linux x86 (Ubuntu 7.04)
806 // 1048576 on FreeBSD 6.2 SMP i386
807#endif // 0
808
809#if defined _THREAD_STACK_SIZE && _THREAD_STACK_SIZE > 0
810 PT_CALL( pthread_attr_setstacksize( &a, _THREAD_STACK_SIZE));
811#endif
812
813 if (change_priority)
814 { 490 {
815 struct sched_param sp; 491 struct sched_param sp;
816 // "The specified scheduling parameters are only used if the scheduling 492 // prio range [-3,+3] was checked by the caller
817 // parameter inheritance attribute is PTHREAD_EXPLICIT_SCHED." 493 sp.sched_priority = gs_prio_remap[ prio + 3];
818 // 494 PT_CALL( pthread_setschedparam( pthread_self(), _PRIO_MODE, &sp));
819#if !defined __ANDROID__ || ( defined __ANDROID__ && __ANDROID_API__ >= 28 )
820 PT_CALL( pthread_attr_setinheritsched( &a, PTHREAD_EXPLICIT_SCHED));
821#endif
822
823#ifdef _PRIO_SCOPE
824 PT_CALL( pthread_attr_setscope( &a, _PRIO_SCOPE));
825#endif // _PRIO_SCOPE
826
827 PT_CALL( pthread_attr_setschedpolicy( &a, _PRIO_MODE));
828
829 sp.sched_priority = select_prio(prio);
830 PT_CALL( pthread_attr_setschedparam( &a, &sp));
831 }
832
833 //---
834 // Seems on OS X, _POSIX_THREAD_THREADS_MAX is some kind of system
835 // thread limit (not userland thread). Actual limit for us is way higher.
836 // PTHREAD_THREADS_MAX is not defined (even though man page refers to it!)
837 //
838# ifndef THREAD_CREATE_RETRIES_MAX
839 // Don't bother with retries; a failure is a failure
840 //
841 {
842 int rc = pthread_create( ref, &a, func, data);
843 if( rc) _PT_FAIL( rc, "pthread_create()", __FILE__, __LINE__ - 1);
844 } 495 }
845# else
846# error "This code deprecated"
847 /*
848 // Wait slightly if thread creation has exchausted the system
849 //
850 { int retries;
851 for( retries=0; retries<THREAD_CREATE_RETRIES_MAX; retries++ ) {
852
853 int rc= pthread_create( ref, &a, func, data );
854 //
855 // OS X / Linux:
856 // EAGAIN: ".. lacked the necessary resources to create
857 // another thread, or the system-imposed limit on the
858 // total number of threads in a process
859 // [PTHREAD_THREADS_MAX] would be exceeded."
860 // EINVAL: attr is invalid
861 // Linux:
862 // EPERM: no rights for given parameters or scheduling (no sudo)
863 // ENOMEM: (known to fail with this code, too - not listed in man)
864
865 if (rc==0) break; // ok!
866
867 // In practise, exhaustion seems to be coming from memory, not a
868 // maximum number of threads. Keep tuning... ;)
869 //
870 if (rc==EAGAIN) {
871 //fprintf( stderr, "Looping (retries=%d) ", retries ); // DEBUG
872
873 // Try again, later.
874
875 Yield();
876 } else {
877 _PT_FAIL( rc, "pthread_create()", __FILE__, __LINE__ );
878 }
879 }
880 }
881 */
882# endif
883
884 PT_CALL( pthread_attr_destroy( &a));
885} 496}
886 497
498// #################################################################################################
887 499
888void THREAD_SET_PRIORITY( int prio) 500void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_)
889{ 501{
890#ifdef PLATFORM_LINUX 502#ifdef PLATFORM_LINUX
891 if( sudo) // only root-privileged process can change priorities 503 if (sudo) // only root-privileged process can change priorities
892#endif // PLATFORM_LINUX 504#endif // PLATFORM_LINUX
893 { 505 {
894 struct sched_param sp; 506 struct sched_param sp;
895 // prio range [-3,+3] was checked by the caller 507 // prio range [-3,+3] was checked by the caller
896 sp.sched_priority = gs_prio_remap[ prio + 3]; 508 sp.sched_priority = gs_prio_remap[prio_ + 3];
897 PT_CALL( pthread_setschedparam( pthread_self(), _PRIO_MODE, &sp)); 509 PT_CALL(pthread_setschedparam(static_cast<pthread_t>(thread_.native_handle()), _PRIO_MODE, &sp));
898 } 510 }
899} 511}
900 512
513// #################################################################################################
514
901void THREAD_SET_AFFINITY( unsigned int aff) 515void THREAD_SET_AFFINITY( unsigned int aff)
902{ 516{
903 int bit = 0; 517 int bit = 0;
@@ -929,93 +543,6 @@ void THREAD_SET_AFFINITY( unsigned int aff)
929#endif 543#endif
930} 544}
931 545
932 /*
933 * Wait for a thread to finish.
934 *
935 * 'mu_ref' is a lock we should use for the waiting; initially unlocked.
936 * Same lock as passed to THREAD_EXIT.
937 *
938 * Returns true for successful wait, false for timed out
939 */
940bool THREAD_WAIT( THREAD_T *ref, double secs , SIGNAL_T *signal_ref, MUTEX_T *mu_ref, volatile enum e_status *st_ref)
941{
942 struct timespec ts_store;
943 const struct timespec* timeout = nullptr;
944 bool done;
945
946 // Do timeout counting before the locks
947 //
948#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT
949 if (secs>=0.0)
950#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR
951 if (secs>0.0)
952#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
953 {
954 prepare_timeout( &ts_store, now_secs()+secs );
955 timeout= &ts_store;
956 }
957
958#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT
959 /* Thread is joinable
960 */
961 if (!timeout) {
962 PT_CALL(pthread_join(*ref, nullptr /*ignore exit value*/));
963 done = true;
964 } else {
965 int rc = PTHREAD_TIMEDJOIN(*ref, nullptr, timeout);
966 if ((rc!=0) && (rc!=ETIMEDOUT)) {
967 _PT_FAIL( rc, "PTHREAD_TIMEDJOIN", __FILE__, __LINE__-2 );
968 }
969 done= rc==0;
970 }
971#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR
972 /* Since we've set the thread up as PTHREAD_CREATE_DETACHED, we cannot
973 * join with it. Use the cond.var.
974 */
975 (void) ref; // unused
976 MUTEX_LOCK( mu_ref );
977
978 // 'secs'==0.0 does not need to wait, just take the current status
979 // within the 'mu_ref' locks
980 //
981 if (secs != 0.0) {
982 while( *st_ref < DONE ) {
983 if (!timeout) {
984 PT_CALL( pthread_cond_wait( signal_ref, mu_ref ));
985 } else {
986 int rc= pthread_cond_timedwait( signal_ref, mu_ref, timeout );
987 if (rc==ETIMEDOUT) break;
988 if (rc!=0) _PT_FAIL( rc, "pthread_cond_timedwait", __FILE__, __LINE__-2 );
989 }
990 }
991 }
992 done= *st_ref >= DONE; // DONE|ERROR_ST|CANCELLED
993
994 MUTEX_UNLOCK( mu_ref );
995#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
996 return done;
997 }
998 //
999 void THREAD_KILL( THREAD_T *ref ) {
1000#ifdef __ANDROID__
1001 __android_log_print(ANDROID_LOG_WARN, LOG_TAG, "Cannot kill thread!");
1002#else
1003 pthread_cancel( *ref );
1004#endif
1005 }
1006
1007 void THREAD_MAKE_ASYNCH_CANCELLABLE()
1008 {
1009#ifdef __ANDROID__
1010 __android_log_print(ANDROID_LOG_WARN, LOG_TAG, "Cannot make thread async cancellable!");
1011#else
1012 // that's the default, but just in case...
1013 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
1014 // we want cancellation to take effect immediately if possible, instead of waiting for a cancellation point (which is the default)
1015 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
1016#endif
1017 }
1018
1019 void THREAD_SETNAME( char const* _name) 546 void THREAD_SETNAME( char const* _name)
1020 { 547 {
1021 // exact API to set the thread name is platform-dependant 548 // exact API to set the thread name is platform-dependant
diff --git a/src/threading.h b/src/threading.h
index 38a021f..e9f302a 100644
--- a/src/threading.h
+++ b/src/threading.h
@@ -1,13 +1,9 @@
1#pragma once 1#pragma once
2 2
3/*
4 * win32-pthread:
5 * define HAVE_WIN32_PTHREAD and PTW32_INCLUDE_WINDOWS_H in your project configuration when building for win32-pthread.
6 * link against pthreadVC2.lib, and of course have pthreadVC2.dll somewhere in your path.
7 */
8#include "platform.h" 3#include "platform.h"
9 4
10#include <time.h> 5#include <time.h>
6#include <thread>
11 7
12/* Note: ERROR is a defined entity on Win32 8/* Note: ERROR is a defined entity on Win32
13 PENDING: The Lua VM hasn't done anything yet. 9 PENDING: The Lua VM hasn't done anything yet.
@@ -19,7 +15,7 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED };
19#define THREADAPI_WINDOWS 1 15#define THREADAPI_WINDOWS 1
20#define THREADAPI_PTHREAD 2 16#define THREADAPI_PTHREAD 2
21 17
22#if( defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC)) && !defined( HAVE_WIN32_PTHREAD) 18#if( defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC))
23//#pragma message ( "THREADAPI_WINDOWS" ) 19//#pragma message ( "THREADAPI_WINDOWS" )
24#define THREADAPI THREADAPI_WINDOWS 20#define THREADAPI THREADAPI_WINDOWS
25#else // (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) 21#else // (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC)
@@ -68,16 +64,9 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED };
68 }; 64 };
69 65
70 66
71 #define MUTEX_T HANDLE
72 void MUTEX_INIT( MUTEX_T* ref);
73 void MUTEX_FREE( MUTEX_T* ref);
74 void MUTEX_LOCK( MUTEX_T* ref);
75 void MUTEX_UNLOCK( MUTEX_T* ref);
76
77 #else // CONDITION_VARIABLE are available, use them 67 #else // CONDITION_VARIABLE are available, use them
78 68
79 #define SIGNAL_T CONDITION_VARIABLE 69 #define SIGNAL_T CONDITION_VARIABLE
80 #define MUTEX_T CRITICAL_SECTION
81 #define MUTEX_INIT( ref) InitializeCriticalSection( ref) 70 #define MUTEX_INIT( ref) InitializeCriticalSection( ref)
82 #define MUTEX_FREE( ref) DeleteCriticalSection( ref) 71 #define MUTEX_FREE( ref) DeleteCriticalSection( ref)
83 #define MUTEX_LOCK( ref) EnterCriticalSection( ref) 72 #define MUTEX_LOCK( ref) EnterCriticalSection( ref)
@@ -111,7 +100,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED };
111 # define _MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE 100 # define _MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE
112 #endif 101 #endif
113 102
114 #define MUTEX_T pthread_mutex_t
115 #define MUTEX_INIT(ref) pthread_mutex_init(ref, nullptr) 103 #define MUTEX_INIT(ref) pthread_mutex_init(ref, nullptr)
116 #define MUTEX_RECURSIVE_INIT(ref) \ 104 #define MUTEX_RECURSIVE_INIT(ref) \
117 { pthread_mutexattr_t a; pthread_mutexattr_init( &a ); \ 105 { pthread_mutexattr_t a; pthread_mutexattr_init( &a ); \
@@ -126,8 +114,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED };
126 114
127 using SIGNAL_T = pthread_cond_t; 115 using SIGNAL_T = pthread_cond_t;
128 116
129 void SIGNAL_ONE( SIGNAL_T *ref );
130
131 // Yield is non-portable: 117 // Yield is non-portable:
132 // 118 //
133 // OS X 10.4.8/9 has pthread_yield_np() 119 // OS X 10.4.8/9 has pthread_yield_np()
@@ -143,10 +129,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED };
143 #define THREAD_CALLCONV 129 #define THREAD_CALLCONV
144#endif //THREADAPI == THREADAPI_PTHREAD 130#endif //THREADAPI == THREADAPI_PTHREAD
145 131
146void SIGNAL_INIT( SIGNAL_T *ref );
147void SIGNAL_FREE( SIGNAL_T *ref );
148void SIGNAL_ALL( SIGNAL_T *ref );
149
150/* 132/*
151* 'time_d': <0.0 for no timeout 133* 'time_d': <0.0 for no timeout
152* 0.0 for instant check 134* 0.0 for instant check
@@ -155,11 +137,6 @@ void SIGNAL_ALL( SIGNAL_T *ref );
155using time_d = double; 137using time_d = double;
156time_d now_secs(void); 138time_d now_secs(void);
157 139
158time_d SIGNAL_TIMEOUT_PREPARE( double rel_secs );
159
160bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout );
161
162
163/*---=== Threading ===--- 140/*---=== Threading ===---
164*/ 141*/
165 142
@@ -167,16 +144,9 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout );
167 144
168#if THREADAPI == THREADAPI_WINDOWS 145#if THREADAPI == THREADAPI_WINDOWS
169 146
170 using THREAD_T = HANDLE;
171# define THREAD_ISNULL( _h) (_h == 0)
172 void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (__stdcall *func)( void*), void* data, int prio /* -3..+3 */);
173
174# define THREAD_PRIO_MIN (-3) 147# define THREAD_PRIO_MIN (-3)
175# define THREAD_PRIO_MAX (+3) 148# define THREAD_PRIO_MAX (+3)
176 149
177# define THREAD_CLEANUP_PUSH( cb_, val_)
178# define THREAD_CLEANUP_POP( execute_)
179
180#else // THREADAPI == THREADAPI_PTHREAD 150#else // THREADAPI == THREADAPI_PTHREAD
181 151
182 /* Platforms that have a timed 'pthread_join()' can get away with a simpler 152 /* Platforms that have a timed 'pthread_join()' can get away with a simpler
@@ -195,11 +165,6 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout );
195# endif 165# endif
196# endif 166# endif
197 167
198 using THREAD_T = pthread_t;
199# define THREAD_ISNULL( _h) 0 // pthread_t may be a structure: never 'null' by itself
200
201 void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (*func)( void*), void* data, int prio /* -3..+3 */);
202
203# if defined(PLATFORM_LINUX) 168# if defined(PLATFORM_LINUX)
204 extern volatile bool sudo; 169 extern volatile bool sudo;
205# ifdef LINUX_SCHED_RR 170# ifdef LINUX_SCHED_RR
@@ -213,13 +178,6 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout );
213# define THREAD_PRIO_MAX (+3) 178# define THREAD_PRIO_MAX (+3)
214# endif 179# endif
215 180
216# if THREADWAIT_METHOD == THREADWAIT_CONDVAR
217# define THREAD_CLEANUP_PUSH( cb_, val_) pthread_cleanup_push( cb_, val_)
218# define THREAD_CLEANUP_POP( execute_) pthread_cleanup_pop( execute_)
219# else
220# define THREAD_CLEANUP_PUSH( cb_, val_) {
221# define THREAD_CLEANUP_POP( execute_) }
222# endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR
223#endif // THREADAPI == THREADAPI_WINDOWS 181#endif // THREADAPI == THREADAPI_WINDOWS
224 182
225/* 183/*
@@ -236,16 +194,8 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout );
236#endif // THREADAPI == THREADAPI_WINDOWS || (defined PTHREAD_TIMEDJOIN) 194#endif // THREADAPI == THREADAPI_WINDOWS || (defined PTHREAD_TIMEDJOIN)
237 195
238 196
239#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT
240bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs);
241#define THREAD_WAIT( a, b, c, d, e) THREAD_WAIT_IMPL( a, b)
242#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR
243bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs, SIGNAL_T *signal_ref, MUTEX_T *mu_ref, volatile enum e_status *st_ref);
244#define THREAD_WAIT THREAD_WAIT_IMPL
245#endif // // THREADWAIT_METHOD == THREADWAIT_CONDVAR
246
247void THREAD_KILL( THREAD_T* ref);
248void THREAD_SETNAME( char const* _name); 197void THREAD_SETNAME( char const* _name);
249void THREAD_MAKE_ASYNCH_CANCELLABLE();
250void THREAD_SET_PRIORITY( int prio); 198void THREAD_SET_PRIORITY( int prio);
251void THREAD_SET_AFFINITY( unsigned int aff); 199void THREAD_SET_AFFINITY( unsigned int aff);
200
201void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_);
diff --git a/tests/cancel.lua b/tests/cancel.lua
index c5bb761..c22103f 100644
--- a/tests/cancel.lua
+++ b/tests/cancel.lua
@@ -139,6 +139,7 @@ if not next(which_tests) or which_tests.linda then
139 linda:receive( 1, "yeah") 139 linda:receive( 1, "yeah")
140 140
141 -- linda cancel: linda:receive() returns cancel_error immediately 141 -- linda cancel: linda:receive() returns cancel_error immediately
142 print "cancelling"
142 linda:cancel( "both") 143 linda:cancel( "both")
143 144
144 -- wait until cancellation is effective. 145 -- wait until cancellation is effective.
@@ -163,6 +164,7 @@ if not next(which_tests) or which_tests.soft then
163 waitCancellation( h, "waiting") 164 waitCancellation( h, "waiting")
164 165
165 -- soft cancel, this time awakens waiting linda operations, which returns cancel_error immediately, no timeout. 166 -- soft cancel, this time awakens waiting linda operations, which returns cancel_error immediately, no timeout.
167 print "cancelling"
166 h:cancel( "soft", true) 168 h:cancel( "soft", true)
167 169
168 -- wait until cancellation is effective. the lane will interrupt its loop and print the exit message 170 -- wait until cancellation is effective. the lane will interrupt its loop and print the exit message
@@ -177,6 +179,7 @@ if not next(which_tests) or which_tests.hook then
177 linda:receive( 2, "yeah") 179 linda:receive( 2, "yeah")
178 180
179 -- count hook cancel after some instruction instructions 181 -- count hook cancel after some instruction instructions
182 print "cancelling"
180 h:cancel( "line", 300, 5.0) 183 h:cancel( "line", 300, 5.0)
181 184
182 -- wait until cancellation is effective. the lane will interrupt its loop and print the exit message 185 -- wait until cancellation is effective. the lane will interrupt its loop and print the exit message
@@ -193,6 +196,7 @@ if not next(which_tests) or which_tests.hard then
193 linda:receive( 2, "yeah") 196 linda:receive( 2, "yeah")
194 197
195 -- hard cancel: the lane will be interrupted from inside its current linda:receive() and won't return from it 198 -- hard cancel: the lane will be interrupted from inside its current linda:receive() and won't return from it
199 print "cancelling"
196 h:cancel() 200 h:cancel()
197 201
198 -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error 202 -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error
@@ -209,27 +213,13 @@ if not next(which_tests) or which_tests.hard_unprotected then
209 linda:receive( 2, "yeah") 213 linda:receive( 2, "yeah")
210 214
211 -- hard cancel: the lane will be interrupted from inside its current linda:receive() and won't return from it 215 -- hard cancel: the lane will be interrupted from inside its current linda:receive() and won't return from it
216 print "cancelling"
212 h:cancel() 217 h:cancel()
213 218
214 -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error 219 -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error
215 waitCancellation( h, "cancelled") 220 waitCancellation( h, "cancelled")
216end 221end
217 222
218if not next(which_tests) or which_tests.kill then
219 remaining_tests.kill = nil
220 print "\n\n####################################################################\nbegin kill cancel test\n"
221 h = lanes.gen( "*", laneBody)( "busy", 50000000) -- start a pure Lua busy loop lane
222
223 -- wait 1/3s before cancelling the lane, before the busy loop can finish
224 print "wait 0.3s"
225 linda:receive( 0.3, "yeah")
226
227 -- hard cancel with kill: the lane thread will be forcefully terminated. kill timeout is pthread-specific
228 h:cancel( true, 1.0)
229
230 -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error
231 waitCancellation( h, "killed")
232end
233--#################################################################### 223--####################################################################
234 224
235local unknown_test, val = next(remaining_tests) 225local unknown_test, val = next(remaining_tests)