diff options
-rw-r--r-- | docs/index.html | 49 | ||||
-rw-r--r-- | src/cancel.cpp | 178 | ||||
-rw-r--r-- | src/cancel.h | 8 | ||||
-rw-r--r-- | src/keeper.cpp | 19 | ||||
-rw-r--r-- | src/keeper.h | 10 | ||||
-rw-r--r-- | src/lanes.cpp | 337 | ||||
-rw-r--r-- | src/lanes.lua | 451 | ||||
-rw-r--r-- | src/lanes_private.h | 43 | ||||
-rw-r--r-- | src/linda.cpp | 92 | ||||
-rw-r--r-- | src/macros_and_utils.h | 5 | ||||
-rw-r--r-- | src/threading.cpp | 525 | ||||
-rw-r--r-- | src/threading.h | 58 | ||||
-rw-r--r-- | tests/cancel.lua | 20 |
13 files changed, 588 insertions, 1207 deletions
diff --git a/docs/index.html b/docs/index.html index ee5acfa..d24d3d7 100644 --- a/docs/index.html +++ b/docs/index.html | |||
@@ -425,7 +425,18 @@ | |||
425 | number >= 0 | 425 | number >= 0 |
426 | </td> | 426 | </td> |
427 | <td> | 427 | <td> |
428 | Sets the duration in seconds Lanes will wait for graceful termination of running lanes at application shutdown. Irrelevant for builds using pthreads. Default is <tt>0.25</tt>. | 428 | Sets the duration in seconds Lanes will wait for graceful termination of running lanes at application shutdown. Default is <tt>0.25</tt>. |
429 | </td> | ||
430 | </tr> | ||
431 | <tr valign=top> | ||
432 | <td id="shutdown_mode"> | ||
433 | <code>.shutdown_mode</code> | ||
434 | </td> | ||
435 | <td> | ||
436 | <tt>"hard"</tt>/<tt>"soft"</tt>/<tt>"call"</tt>/<tt>"ret"</tt>/<tt>"line"</tt>/<tt>"count"</tt> | ||
437 | </td> | ||
438 | <td> | ||
439 | Select the cancellation mode used at Lanes shutdown to request free running lane termination. See <a href="#cancelling">lane cancellation</a>. Default is <tt>"hard"</tt>. | ||
429 | </td> | 440 | </td> |
430 | </tr> | 441 | </tr> |
431 | </table> | 442 | </table> |
@@ -875,16 +886,6 @@ | |||
875 | received <a href="#cancelling">cancellation</a> and finished itself. | 886 | received <a href="#cancelling">cancellation</a> and finished itself. |
876 | </td> | 887 | </td> |
877 | </tr> | 888 | </tr> |
878 | <tr> | ||
879 | <td/> | ||
880 | <td> | ||
881 | <tt>"killed"</tt> | ||
882 | </td> | ||
883 | <td/> | ||
884 | <td> | ||
885 | was forcefully killed by <tt>lane_h:cancel()</tt> | ||
886 | </td> | ||
887 | </tr> | ||
888 | </table> | 889 | </table> |
889 | </p> | 890 | </p> |
890 | 891 | ||
@@ -996,36 +997,33 @@ | |||
996 | <h2 id="cancelling">Cancelling</h2> | 997 | <h2 id="cancelling">Cancelling</h2> |
997 | 998 | ||
998 | <table border="1" bgcolor="#E0E0FF" cellpadding="10" style="width:50%"><tr><td><pre> | 999 | <table border="1" bgcolor="#E0E0FF" cellpadding="10" style="width:50%"><tr><td><pre> |
999 | bool[,reason] = lane_h:cancel( "soft" [, timeout] [, wake_bool]) | 1000 | bool[,reason] = lane_h:cancel( "soft" [, timeout] [, wake_lane]) |
1000 | bool[,reason] = lane_h:cancel( "hard" [, timeout] [, force [, forcekill_timeout]]) | 1001 | bool[,reason] = lane_h:cancel( "hard" [, timeout] [, wake_lane]) |
1001 | bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, force [, forcekill_timeout]]) | 1002 | bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, wake_lane]) |
1002 | </pre></td></tr></table> | 1003 | </pre></td></tr></table> |
1003 | 1004 | ||
1004 | <p> | 1005 | <p> |
1005 | <tt>cancel()</tt> sends a cancellation request to the lane.<br/> | 1006 | <tt>cancel()</tt> sends a cancellation request to the lane.<br/> |
1006 | First argument is a <tt>mode</tt> can be one of <tt>"hard"</tt>, <tt>"soft"</tt>, <tt>"count"</tt>, <tt>"line"</tt>, <tt>"call"</tt>, <tt>"ret"</tt>. | 1007 | First argument is a <tt>mode</tt> can be one of <tt>"hard"</tt>, <tt>"soft"</tt>, <tt>"call"</tt>, <tt>"ret"</tt>, <tt>"line"</tt>, <tt>"count"</tt>. |
1007 | If <tt>mode</tt> is not specified, it defaults to <tt>"hard"</tt>. | 1008 | If <tt>mode</tt> is not specified, it defaults to <tt>"hard"</tt>. |
1009 | If <tt>wake_lane</tt> is <tt>true</tt>, the lane is also signalled so that execution returns from any pending linda operation. Linda operations detecting the cancellation request return <tt>lanes.cancel_error</tt>. | ||
1008 | </p> | 1010 | </p> |
1009 | <p> | 1011 | <p> |
1010 | If <tt>mode</tt> is <tt>"soft"</tt>, cancellation will only cause <tt>cancel_test()</tt> to return <tt>true</tt>, so that the lane can cleanup manually.<br/> | 1012 | If <tt>mode</tt> is <tt>"soft"</tt>, cancellation will only cause <tt>cancel_test()</tt> to return <tt>true</tt>, so that the lane can cleanup manually.<br/> |
1011 | If <tt>wake_bool</tt> is <tt>true</tt>, the lane is also signalled so that execution returns from any pending linda operation. Linda operations detecting the cancellation request return <tt>lanes.cancel_error</tt>. | ||
1012 | </p> | 1013 | </p> |
1013 | <p> | 1014 | <p> |
1014 | If <tt>mode</tt> is <tt>"hard"</tt>, waits for the request to be processed, or a timeout to occur. Linda operations detecting the cancellation request will raise a special cancellation error (meaning they won't return in that case).<br/> | 1015 | If <tt>mode</tt> is <tt>"hard"</tt>, waits for the request to be processed, or a timeout to occur. Linda operations detecting the cancellation request will raise a special cancellation error (meaning they won't return in that case).<br/> |
1015 | <tt>timeout</tt> defaults to 0 if not specified. | 1016 | <tt>wake_lane</tt> defaults to <tt>true</tt>, and <tt>timeout</tt> defaults to 0 if not specified. |
1016 | </p> | 1017 | </p> |
1017 | <p> | 1018 | <p> |
1018 | Other values of <tt>mode</tt> will asynchronously install the corresponding hook, then behave as <tt>"hard"</tt>. | 1019 | Other values of <tt>mode</tt> will asynchronously install the corresponding hook, then behave as <tt>"hard"</tt>. |
1019 | </p> | 1020 | </p> |
1020 | <p> | 1021 | <p> |
1021 | If <tt>force_kill_bool</tt> is <tt>true</tt>, <tt>forcekill_timeout</tt> can be set to tell how long lanes will wait for the OS thread to terminate before raising an error. Windows threads always terminate immediately, but it might not always be the case with some pthread implementations. | ||
1022 | </p> | ||
1023 | <p> | ||
1024 | Returns <tt>true, lane_h.status</tt> if lane was already done (in <tt>"done"</tt>, <tt>"error"</tt> or <tt>"cancelled"</tt> status), or the cancellation was fruitful within <tt>timeout_secs</tt> timeout period.<br/> | 1022 | Returns <tt>true, lane_h.status</tt> if lane was already done (in <tt>"done"</tt>, <tt>"error"</tt> or <tt>"cancelled"</tt> status), or the cancellation was fruitful within <tt>timeout_secs</tt> timeout period.<br/> |
1025 | Returns <tt>false, "timeout"</tt> otherwise. | 1023 | Returns <tt>false, "timeout"</tt> otherwise. |
1026 | </p> | 1024 | </p> |
1027 | <p> | 1025 | <p> |
1028 | If the lane is still running after the timeout expired and <tt>force_kill</tt> is <tt>true</tt>, the OS thread running the lane is forcefully killed. This means no GC, probable OS resource leaks (thread stack, locks, DLL notifications), and should generally be the last resort. | 1026 | If the lane is still running after the timeout expired, there is a chance lanes will raise an error at shutdown when failing to terminate all free-running lanes within the specified timeout. |
1029 | </p> | 1027 | </p> |
1030 | <p> | 1028 | <p> |
1031 | Cancellation is tested <u>before</u> going to sleep in <tt>receive()</tt> or <tt>send()</tt> calls and after executing <tt>cancelstep</tt> Lua statements. A pending <tt>receive()</tt>or <tt>send()</tt> call is awakened. | 1029 | Cancellation is tested <u>before</u> going to sleep in <tt>receive()</tt> or <tt>send()</tt> calls and after executing <tt>cancelstep</tt> Lua statements. A pending <tt>receive()</tt>or <tt>send()</tt> call is awakened. |
@@ -1396,6 +1394,14 @@ events to a common Linda, but... :).</font> | |||
1396 | Default duration is null, which should only cause a thread context switch. | 1394 | Default duration is null, which should only cause a thread context switch. |
1397 | </p> | 1395 | </p> |
1398 | 1396 | ||
1397 | <table border="1" bgcolor="#E0E0FF" cellpadding="10" style="width:50%"><tr><td><pre> | ||
1398 | number = lanes.now_secs() | ||
1399 | </pre></td></tr></table> | ||
1400 | |||
1401 | <p> | ||
1402 | Returns the current value of the clock used by timers and lindas. | ||
1403 | </p> | ||
1404 | |||
1399 | <!-- locks +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ --> | 1405 | <!-- locks +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ --> |
1400 | <hr/> | 1406 | <hr/> |
1401 | <h2 id="locks">Locks etc.</h2> | 1407 | <h2 id="locks">Locks etc.</h2> |
@@ -1797,3 +1803,4 @@ int luaD_new_clonable( lua_State* L) | |||
1797 | 1803 | ||
1798 | </body> | 1804 | </body> |
1799 | </html> | 1805 | </html> |
1806 | </pre> \ No newline at end of file | ||
diff --git a/src/cancel.cpp b/src/cancel.cpp index 4667f07..6a94343 100644 --- a/src/cancel.cpp +++ b/src/cancel.cpp | |||
@@ -92,7 +92,7 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar) | |||
92 | // ################################################################################################ | 92 | // ################################################################################################ |
93 | 93 | ||
94 | //--- | 94 | //--- |
95 | // = thread_cancel( lane_ud [,timeout_secs=0.0] [,force_kill_bool=false] ) | 95 | // = thread_cancel( lane_ud [,timeout_secs=0.0] [,wake_lindas_bool=false] ) |
96 | // | 96 | // |
97 | // The originator thread asking us specifically to cancel the other thread. | 97 | // The originator thread asking us specifically to cancel the other thread. |
98 | // | 98 | // |
@@ -100,9 +100,8 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar) | |||
100 | // 0.0: just signal it to cancel, no time waited | 100 | // 0.0: just signal it to cancel, no time waited |
101 | // >0: time to wait for the lane to detect cancellation | 101 | // >0: time to wait for the lane to detect cancellation |
102 | // | 102 | // |
103 | // 'force_kill': if true, and lane does not detect cancellation within timeout, | 103 | // 'wake_lindas_bool': if true, signal any linda the thread is waiting on |
104 | // it is forcefully killed. Using this with 0.0 timeout means just kill | 104 | // instead of waiting for its timeout (if any) |
105 | // (unless the lane is already finished). | ||
106 | // | 105 | // |
107 | // Returns: true if the lane was already finished (DONE/ERROR_ST/CANCELLED) or if we | 106 | // Returns: true if the lane was already finished (DONE/ERROR_ST/CANCELLED) or if we |
108 | // managed to cancel it. | 107 | // managed to cancel it. |
@@ -111,76 +110,47 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar) | |||
111 | 110 | ||
112 | // ################################################################################################ | 111 | // ################################################################################################ |
113 | 112 | ||
114 | static CancelResult thread_cancel_soft(Lane* lane_, double secs_, bool wake_lindas_) | 113 | static CancelResult thread_cancel_soft(Lane* lane_, lua_Duration duration_, bool wake_lane_) |
115 | { | 114 | { |
116 | lane_->cancel_request = CancelRequest::Soft; // it's now signaled to stop | 115 | lane_->cancel_request = CancelRequest::Soft; // it's now signaled to stop |
117 | // negative timeout: we don't want to truly abort the lane, we just want it to react to cancel_test() on its own | 116 | // negative timeout: we don't want to truly abort the lane, we just want it to react to cancel_test() on its own |
118 | if (wake_lindas_) // wake the thread so that execution returns from any pending linda operation if desired | 117 | if (wake_lane_) // wake the thread so that execution returns from any pending linda operation if desired |
119 | { | 118 | { |
120 | SIGNAL_T* const waiting_on{ lane_->waiting_on }; | 119 | std::condition_variable* const waiting_on{ lane_->m_waiting_on }; |
121 | if (lane_->status == WAITING && waiting_on != nullptr) | 120 | if (lane_->status == WAITING && waiting_on != nullptr) |
122 | { | 121 | { |
123 | SIGNAL_ALL( waiting_on); | 122 | waiting_on->notify_all(); |
124 | } | 123 | } |
125 | } | 124 | } |
126 | 125 | ||
127 | return THREAD_WAIT(&lane_->thread, secs_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Cancelled : CancelResult::Timeout; | 126 | return lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout; |
128 | } | 127 | } |
129 | 128 | ||
130 | // ################################################################################################ | 129 | // ################################################################################################ |
131 | 130 | ||
132 | static CancelResult thread_cancel_hard(lua_State* L, Lane* lane_, double secs_, bool force_, double waitkill_timeout_) | 131 | static CancelResult thread_cancel_hard(Lane* lane_, lua_Duration duration_, bool wake_lane_) |
133 | { | 132 | { |
134 | lane_->cancel_request = CancelRequest::Hard; // it's now signaled to stop | 133 | lane_->cancel_request = CancelRequest::Hard; // it's now signaled to stop |
134 | //lane_->m_thread.get_stop_source().request_stop(); | ||
135 | if (wake_lane_) // wake the thread so that execution returns from any pending linda operation if desired | ||
135 | { | 136 | { |
136 | SIGNAL_T* waiting_on = lane_->waiting_on; | 137 | std::condition_variable* waiting_on = lane_->m_waiting_on; |
137 | if (lane_->status == WAITING && waiting_on != nullptr) | 138 | if (lane_->status == WAITING && waiting_on != nullptr) |
138 | { | 139 | { |
139 | SIGNAL_ALL( waiting_on); | 140 | waiting_on->notify_all(); |
140 | } | 141 | } |
141 | } | 142 | } |
142 | 143 | ||
143 | CancelResult result{ THREAD_WAIT(&lane_->thread, secs_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Cancelled : CancelResult::Timeout }; | 144 | CancelResult result{ lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout }; |
144 | |||
145 | if ((result == CancelResult::Timeout) && force_) | ||
146 | { | ||
147 | // Killing is asynchronous; we _will_ wait for it to be done at | ||
148 | // GC, to make sure the data structure can be released (alternative | ||
149 | // would be use of "cancellation cleanup handlers" that at least | ||
150 | // PThread seems to have). | ||
151 | // | ||
152 | THREAD_KILL(&lane_->thread); | ||
153 | #if THREADAPI == THREADAPI_PTHREAD | ||
154 | // pthread: make sure the thread is really stopped! | ||
155 | // note that this may block forever if the lane doesn't call a cancellation point and pthread doesn't honor PTHREAD_CANCEL_ASYNCHRONOUS | ||
156 | result = THREAD_WAIT(&lane_->thread, waitkill_timeout_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Killed : CancelResult::Timeout; | ||
157 | if (result == CancelResult::Timeout) | ||
158 | { | ||
159 | std::ignore = luaL_error( L, "force-killed lane failed to terminate within %f second%s", waitkill_timeout_, waitkill_timeout_ > 1 ? "s" : ""); | ||
160 | } | ||
161 | #else | ||
162 | (void) waitkill_timeout_; // unused | ||
163 | (void) L; // unused | ||
164 | #endif // THREADAPI == THREADAPI_PTHREAD | ||
165 | lane_->mstatus = Lane::Killed; // mark 'gc' to wait for it | ||
166 | // note that lane_->status value must remain to whatever it was at the time of the kill | ||
167 | // because we need to know if we can lua_close() the Lua State or not. | ||
168 | result = CancelResult::Killed; | ||
169 | } | ||
170 | return result; | 145 | return result; |
171 | } | 146 | } |
172 | 147 | ||
173 | // ################################################################################################ | 148 | // ################################################################################################ |
174 | 149 | ||
175 | CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_, bool force_, double waitkill_timeout_) | 150 | CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hook_count_, lua_Duration duration_, bool wake_lane_) |
176 | { | 151 | { |
177 | // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here | 152 | // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here |
178 | // We can read 'lane_->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) | 153 | // We can read 'lane_->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) |
179 | if (lane_->mstatus == Lane::Killed) | ||
180 | { | ||
181 | return CancelResult::Killed; | ||
182 | } | ||
183 | |||
184 | if (lane_->status >= DONE) | 154 | if (lane_->status >= DONE) |
185 | { | 155 | { |
186 | // say "ok" by default, including when lane is already done | 156 | // say "ok" by default, including when lane is already done |
@@ -191,48 +161,57 @@ CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_ | |||
191 | // let us hope we never land here with a pointer on a linda that has been destroyed... | 161 | // let us hope we never land here with a pointer on a linda that has been destroyed... |
192 | if (op_ == CancelOp::Soft) | 162 | if (op_ == CancelOp::Soft) |
193 | { | 163 | { |
194 | return thread_cancel_soft(lane_, secs_, force_); | 164 | return thread_cancel_soft(lane_, duration_, wake_lane_); |
165 | } | ||
166 | else if (static_cast<int>(op_) > static_cast<int>(CancelOp::Soft)) | ||
167 | { | ||
168 | lua_sethook(lane_->L, cancel_hook, static_cast<int>(op_), hook_count_); | ||
195 | } | 169 | } |
196 | 170 | ||
197 | return thread_cancel_hard(L, lane_, secs_, force_, waitkill_timeout_); | 171 | return thread_cancel_hard(lane_, duration_, wake_lane_); |
198 | } | 172 | } |
199 | 173 | ||
200 | // ################################################################################################ | 174 | // ################################################################################################ |
201 | // ################################################################################################ | 175 | // ################################################################################################ |
202 | 176 | ||
203 | // > 0: the mask | 177 | CancelOp which_cancel_op(char const* op_string_) |
204 | // = 0: soft | 178 | { |
205 | // < 0: hard | 179 | CancelOp op{ CancelOp::Invalid }; |
206 | static CancelOp which_op(lua_State* L, int idx_) | 180 | if (strcmp(op_string_, "hard") == 0) |
181 | { | ||
182 | op = CancelOp::Hard; | ||
183 | } | ||
184 | else if (strcmp(op_string_, "soft") == 0) | ||
185 | { | ||
186 | op = CancelOp::Soft; | ||
187 | } | ||
188 | else if (strcmp(op_string_, "call") == 0) | ||
189 | { | ||
190 | op = CancelOp::MaskCall; | ||
191 | } | ||
192 | else if (strcmp(op_string_, "ret") == 0) | ||
193 | { | ||
194 | op = CancelOp::MaskRet; | ||
195 | } | ||
196 | else if (strcmp(op_string_, "line") == 0) | ||
197 | { | ||
198 | op = CancelOp::MaskLine; | ||
199 | } | ||
200 | else if (strcmp(op_string_, "count") == 0) | ||
201 | { | ||
202 | op = CancelOp::MaskCount; | ||
203 | } | ||
204 | return op; | ||
205 | } | ||
206 | |||
207 | // ################################################################################################ | ||
208 | |||
209 | static CancelOp which_cancel_op(lua_State* L, int idx_) | ||
207 | { | 210 | { |
208 | if (lua_type(L, idx_) == LUA_TSTRING) | 211 | if (lua_type(L, idx_) == LUA_TSTRING) |
209 | { | 212 | { |
210 | CancelOp op{ CancelOp::Invalid }; | 213 | char const* const str{ lua_tostring(L, idx_) }; |
211 | char const* str = lua_tostring(L, idx_); | 214 | CancelOp op{ which_cancel_op(str) }; |
212 | if (strcmp(str, "hard") == 0) | ||
213 | { | ||
214 | op = CancelOp::Hard; | ||
215 | } | ||
216 | else if (strcmp(str, "soft") == 0) | ||
217 | { | ||
218 | op = CancelOp::Soft; | ||
219 | } | ||
220 | else if (strcmp(str, "call") == 0) | ||
221 | { | ||
222 | op = CancelOp::MaskCall; | ||
223 | } | ||
224 | else if (strcmp(str, "ret") == 0) | ||
225 | { | ||
226 | op = CancelOp::MaskRet; | ||
227 | } | ||
228 | else if (strcmp(str, "line") == 0) | ||
229 | { | ||
230 | op = CancelOp::MaskLine; | ||
231 | } | ||
232 | else if (strcmp(str, "count") == 0) | ||
233 | { | ||
234 | op = CancelOp::MaskCount; | ||
235 | } | ||
236 | lua_remove(L, idx_); // argument is processed, remove it | 215 | lua_remove(L, idx_); // argument is processed, remove it |
237 | if (op == CancelOp::Invalid) | 216 | if (op == CancelOp::Invalid) |
238 | { | 217 | { |
@@ -245,53 +224,60 @@ static CancelOp which_op(lua_State* L, int idx_) | |||
245 | 224 | ||
246 | // ################################################################################################ | 225 | // ################################################################################################ |
247 | 226 | ||
248 | // bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, force [, forcekill_timeout]]) | 227 | // bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, wake_lindas]) |
249 | LUAG_FUNC(thread_cancel) | 228 | LUAG_FUNC(thread_cancel) |
250 | { | 229 | { |
251 | Lane* const lane{ lua_toLane(L, 1) }; | 230 | Lane* const lane{ lua_toLane(L, 1) }; |
252 | CancelOp const op{ which_op(L, 2) }; // this removes the op string from the stack | 231 | CancelOp const op{ which_cancel_op(L, 2) }; // this removes the op string from the stack |
253 | 232 | ||
233 | int hook_count{ 0 }; | ||
254 | if (static_cast<int>(op) > static_cast<int>(CancelOp::Soft)) // hook is requested | 234 | if (static_cast<int>(op) > static_cast<int>(CancelOp::Soft)) // hook is requested |
255 | { | 235 | { |
256 | int const hook_count{ static_cast<int>(lua_tointeger(L, 2)) }; | 236 | hook_count = static_cast<int>(luaL_checkinteger(L, 2)); |
257 | lua_remove(L, 2); // argument is processed, remove it | 237 | lua_remove(L, 2); // argument is processed, remove it |
258 | if (hook_count < 1) | 238 | if (hook_count < 1) |
259 | { | 239 | { |
260 | return luaL_error(L, "hook count cannot be < 1"); | 240 | return luaL_error(L, "hook count cannot be < 1"); |
261 | } | 241 | } |
262 | lua_sethook(lane->L, cancel_hook, static_cast<int>(op), hook_count); | ||
263 | } | 242 | } |
264 | 243 | ||
265 | double secs{ 0.0 }; | 244 | lua_Duration wait_timeout{ 0.0 }; |
266 | if (lua_type(L, 2) == LUA_TNUMBER) | 245 | if (lua_type(L, 2) == LUA_TNUMBER) |
267 | { | 246 | { |
268 | secs = lua_tonumber(L, 2); | 247 | wait_timeout = lua_Duration{ lua_tonumber(L, 2) }; |
269 | lua_remove(L, 2); // argument is processed, remove it | 248 | lua_remove(L, 2); // argument is processed, remove it |
270 | if (secs < 0.0) | 249 | if (wait_timeout.count() < 0.0) |
271 | { | 250 | { |
272 | return luaL_error(L, "cancel timeout cannot be < 0"); | 251 | return luaL_error(L, "cancel timeout cannot be < 0"); |
273 | } | 252 | } |
274 | } | 253 | } |
275 | 254 | // we wake by default in "hard" mode (remember that hook is hard too), but this can be turned off if desired | |
276 | bool const force{ lua_toboolean(L, 2) ? true : false }; // false if nothing there | 255 | bool wake_lane{ op != CancelOp::Soft }; |
277 | double const forcekill_timeout{ luaL_optnumber(L, 3, 0.0) }; | 256 | if (lua_gettop(L) >= 2) |
278 | switch (thread_cancel(L, lane, op, secs, force, forcekill_timeout)) | 257 | { |
258 | if (!lua_isboolean(L, 2)) | ||
259 | { | ||
260 | return luaL_error(L, "wake_lindas parameter is not a boolean"); | ||
261 | } | ||
262 | wake_lane = lua_toboolean(L, 2); | ||
263 | lua_remove(L, 2); // argument is processed, remove it | ||
264 | } | ||
265 | switch (thread_cancel(lane, op, hook_count, wait_timeout, wake_lane)) | ||
279 | { | 266 | { |
267 | default: // should never happen unless we added a case and forgot to handle it | ||
268 | ASSERT_L(false); | ||
269 | break; | ||
270 | |||
280 | case CancelResult::Timeout: | 271 | case CancelResult::Timeout: |
281 | lua_pushboolean(L, 0); | 272 | lua_pushboolean(L, 0); |
282 | lua_pushstring(L, "timeout"); | 273 | lua_pushstring(L, "timeout"); |
283 | return 2; | 274 | break; |
284 | 275 | ||
285 | case CancelResult::Cancelled: | 276 | case CancelResult::Cancelled: |
286 | lua_pushboolean(L, 1); | 277 | lua_pushboolean(L, 1); |
287 | push_thread_status(L, lane); | 278 | push_thread_status(L, lane); |
288 | return 2; | 279 | break; |
289 | |||
290 | case CancelResult::Killed: | ||
291 | lua_pushboolean(L, 1); | ||
292 | push_thread_status(L, lane); | ||
293 | return 2; | ||
294 | } | 280 | } |
295 | // should never happen, only here to prevent the compiler from complaining of "not all control paths returning a value" | 281 | // should never happen, only here to prevent the compiler from complaining of "not all control paths returning a value" |
296 | return 0; | 282 | return 2; |
297 | } | 283 | } |
diff --git a/src/cancel.h b/src/cancel.h index 884e193..954b04e 100644 --- a/src/cancel.h +++ b/src/cancel.h | |||
@@ -13,6 +13,8 @@ extern "C" { | |||
13 | #include "uniquekey.h" | 13 | #include "uniquekey.h" |
14 | #include "macros_and_utils.h" | 14 | #include "macros_and_utils.h" |
15 | 15 | ||
16 | #include <chrono> | ||
17 | |||
16 | // ################################################################################################ | 18 | // ################################################################################################ |
17 | 19 | ||
18 | class Lane; // forward | 20 | class Lane; // forward |
@@ -30,8 +32,7 @@ enum class CancelRequest | |||
30 | enum class CancelResult | 32 | enum class CancelResult |
31 | { | 33 | { |
32 | Timeout, | 34 | Timeout, |
33 | Cancelled, | 35 | Cancelled |
34 | Killed | ||
35 | }; | 36 | }; |
36 | 37 | ||
37 | enum class CancelOp | 38 | enum class CancelOp |
@@ -48,7 +49,8 @@ enum class CancelOp | |||
48 | // crc64/we of string "CANCEL_ERROR" generated at http://www.nitrxgen.net/hashgen/ | 49 | // crc64/we of string "CANCEL_ERROR" generated at http://www.nitrxgen.net/hashgen/ |
49 | static constexpr UniqueKey CANCEL_ERROR{ 0xe97d41626cc97577ull }; // 'raise_cancel_error' sentinel | 50 | static constexpr UniqueKey CANCEL_ERROR{ 0xe97d41626cc97577ull }; // 'raise_cancel_error' sentinel |
50 | 51 | ||
51 | CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_, bool force_, double waitkill_timeout_); | 52 | CancelOp which_cancel_op(char const* op_string_); |
53 | CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hook_count_, lua_Duration secs_, bool wake_lindas_); | ||
52 | 54 | ||
53 | [[noreturn]] static inline void raise_cancel_error(lua_State* L) | 55 | [[noreturn]] static inline void raise_cancel_error(lua_State* L) |
54 | { | 56 | { |
diff --git a/src/keeper.cpp b/src/keeper.cpp index 937d190..0aea18e 100644 --- a/src/keeper.cpp +++ b/src/keeper.cpp | |||
@@ -627,7 +627,7 @@ void close_keepers(Universe* U) | |||
627 | } | 627 | } |
628 | for (int i = 0; i < nbKeepers; ++i) | 628 | for (int i = 0; i < nbKeepers; ++i) |
629 | { | 629 | { |
630 | MUTEX_FREE(&U->keepers->keeper_array[i].keeper_cs); | 630 | U->keepers->keeper_array[i].~Keeper(); |
631 | } | 631 | } |
632 | // free the keeper bookkeeping structure | 632 | // free the keeper bookkeeping structure |
633 | U->internal_allocator.free(U->keepers, sizeof(Keepers) + (nbKeepers - 1) * sizeof(Keeper)); | 633 | U->internal_allocator.free(U->keepers, sizeof(Keepers) + (nbKeepers - 1) * sizeof(Keeper)); |
@@ -673,9 +673,14 @@ void init_keepers(Universe* U, lua_State* L) | |||
673 | { | 673 | { |
674 | std::ignore = luaL_error(L, "init_keepers() failed while creating keeper array; out of memory"); | 674 | std::ignore = luaL_error(L, "init_keepers() failed while creating keeper array; out of memory"); |
675 | } | 675 | } |
676 | memset(U->keepers, 0, bytes); | 676 | U->keepers->Keepers::Keepers(); |
677 | U->keepers->gc_threshold = keepers_gc_threshold; | 677 | U->keepers->gc_threshold = keepers_gc_threshold; |
678 | U->keepers->nb_keepers = nb_keepers; | 678 | U->keepers->nb_keepers = nb_keepers; |
679 | |||
680 | for (int i = 0; i < nb_keepers; ++i) | ||
681 | { | ||
682 | U->keepers->keeper_array[i].Keeper::Keeper(); | ||
683 | } | ||
679 | } | 684 | } |
680 | for (int i = 0; i < nb_keepers; ++i) // keepersUD | 685 | for (int i = 0; i < nb_keepers; ++i) // keepersUD |
681 | { | 686 | { |
@@ -687,10 +692,6 @@ void init_keepers(Universe* U, lua_State* L) | |||
687 | } | 692 | } |
688 | 693 | ||
689 | U->keepers->keeper_array[i].L = K; | 694 | U->keepers->keeper_array[i].L = K; |
690 | // we can trigger a GC from inside keeper_call(), where a keeper is acquired | ||
691 | // from there, GC can collect a linda, which would acquire the keeper again, and deadlock the thread. | ||
692 | // therefore, we need a recursive mutex. | ||
693 | MUTEX_RECURSIVE_INIT(&U->keepers->keeper_array[i].keeper_cs); | ||
694 | 695 | ||
695 | if (U->keepers->gc_threshold >= 0) | 696 | if (U->keepers->gc_threshold >= 0) |
696 | { | 697 | { |
@@ -772,8 +773,7 @@ Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_) | |||
772 | */ | 773 | */ |
773 | unsigned int i = (unsigned int)((magic_ >> KEEPER_MAGIC_SHIFT) % nbKeepers); | 774 | unsigned int i = (unsigned int)((magic_ >> KEEPER_MAGIC_SHIFT) % nbKeepers); |
774 | Keeper* K = &keepers_->keeper_array[i]; | 775 | Keeper* K = &keepers_->keeper_array[i]; |
775 | 776 | K->m_mutex.lock(); | |
776 | MUTEX_LOCK( &K->keeper_cs); | ||
777 | //++ K->count; | 777 | //++ K->count; |
778 | return K; | 778 | return K; |
779 | } | 779 | } |
@@ -787,7 +787,7 @@ void keeper_release(Keeper* K) | |||
787 | //-- K->count; | 787 | //-- K->count; |
788 | if (K) | 788 | if (K) |
789 | { | 789 | { |
790 | MUTEX_UNLOCK(&K->keeper_cs); | 790 | K->m_mutex.unlock(); |
791 | } | 791 | } |
792 | } | 792 | } |
793 | 793 | ||
@@ -843,7 +843,6 @@ int keeper_call(Universe* U, lua_State* K, keeper_api_t func_, lua_State* L, voi | |||
843 | if ((args == 0) || luaG_inter_copy(U, L, K, args, LookupMode::ToKeeper) == 0) // L->K | 843 | if ((args == 0) || luaG_inter_copy(U, L, K, args, LookupMode::ToKeeper) == 0) // L->K |
844 | { | 844 | { |
845 | lua_call(K, 1 + args, LUA_MULTRET); | 845 | lua_call(K, 1 + args, LUA_MULTRET); |
846 | |||
847 | retvals = lua_gettop(K) - Ktos; | 846 | retvals = lua_gettop(K) - Ktos; |
848 | // note that this can raise a luaL_error while the keeper state (and its mutex) is acquired | 847 | // note that this can raise a luaL_error while the keeper state (and its mutex) is acquired |
849 | // this may interrupt a lane, causing the destruction of the underlying OS thread | 848 | // this may interrupt a lane, causing the destruction of the underlying OS thread |
diff --git a/src/keeper.h b/src/keeper.h index f7e3951..931c1d5 100644 --- a/src/keeper.h +++ b/src/keeper.h | |||
@@ -11,21 +11,23 @@ extern "C" { | |||
11 | #include "threading.h" | 11 | #include "threading.h" |
12 | #include "uniquekey.h" | 12 | #include "uniquekey.h" |
13 | 13 | ||
14 | #include <mutex> | ||
15 | |||
14 | // forwards | 16 | // forwards |
15 | enum class LookupMode; | 17 | enum class LookupMode; |
16 | struct Universe; | 18 | struct Universe; |
17 | 19 | ||
18 | struct Keeper | 20 | struct Keeper |
19 | { | 21 | { |
20 | MUTEX_T keeper_cs; | 22 | std::mutex m_mutex; |
21 | lua_State* L; | 23 | lua_State* L{ nullptr }; |
22 | // int count; | 24 | // int count; |
23 | }; | 25 | }; |
24 | 26 | ||
25 | struct Keepers | 27 | struct Keepers |
26 | { | 28 | { |
27 | int gc_threshold{ 0 }; | 29 | int gc_threshold{ 0 }; |
28 | int nb_keepers; | 30 | int nb_keepers{ 0 }; |
29 | Keeper keeper_array[1]; | 31 | Keeper keeper_array[1]; |
30 | }; | 32 | }; |
31 | 33 | ||
@@ -38,7 +40,7 @@ void close_keepers(Universe* U); | |||
38 | 40 | ||
39 | Keeper* which_keeper(Keepers* keepers_, uintptr_t magic_); | 41 | Keeper* which_keeper(Keepers* keepers_, uintptr_t magic_); |
40 | Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_); | 42 | Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_); |
41 | void keeper_release(Keeper* K); | 43 | void keeper_release(Keeper* K_); |
42 | void keeper_toggle_nil_sentinels(lua_State* L, int val_i_, LookupMode const mode_); | 44 | void keeper_toggle_nil_sentinels(lua_State* L, int val_i_, LookupMode const mode_); |
43 | int keeper_push_linda_storage(Universe* U, lua_State* L, void* ptr_, uintptr_t magic_); | 45 | int keeper_push_linda_storage(Universe* U, lua_State* L, void* ptr_, uintptr_t magic_); |
44 | 46 | ||
diff --git a/src/lanes.cpp b/src/lanes.cpp index 08584a2..4dd9b46 100644 --- a/src/lanes.cpp +++ b/src/lanes.cpp | |||
@@ -108,11 +108,6 @@ Lane::Lane(Universe* U_, lua_State* L_) | |||
108 | : U{ U_ } | 108 | : U{ U_ } |
109 | , L{ L_ } | 109 | , L{ L_ } |
110 | { | 110 | { |
111 | #if THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
112 | MUTEX_INIT(&done_lock); | ||
113 | SIGNAL_INIT(&done_signal); | ||
114 | #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
115 | |||
116 | #if HAVE_LANE_TRACKING() | 111 | #if HAVE_LANE_TRACKING() |
117 | if (U->tracking_first) | 112 | if (U->tracking_first) |
118 | { | 113 | { |
@@ -121,6 +116,29 @@ Lane::Lane(Universe* U_, lua_State* L_) | |||
121 | #endif // HAVE_LANE_TRACKING() | 116 | #endif // HAVE_LANE_TRACKING() |
122 | } | 117 | } |
123 | 118 | ||
119 | bool Lane::waitForCompletion(lua_Duration duration_) | ||
120 | { | ||
121 | std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | ||
122 | if (duration_.count() >= 0.0) | ||
123 | { | ||
124 | until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration_); | ||
125 | } | ||
126 | |||
127 | std::unique_lock lock{ m_done_mutex }; | ||
128 | //std::stop_token token{ m_thread.get_stop_token() }; | ||
129 | //return m_done_signal.wait_for(lock, token, secs_, [this](){ return status >= DONE; }); | ||
130 | return m_done_signal.wait_until(lock, until, [this](){ return status >= DONE; }); | ||
131 | } | ||
132 | |||
133 | static void lane_main(Lane* lane); | ||
134 | void Lane::startThread(int priority_) | ||
135 | { | ||
136 | m_thread = std::jthread([this]() { lane_main(this); }); | ||
137 | if (priority_ != THREAD_PRIO_DEFAULT) | ||
138 | { | ||
139 | JTHREAD_SET_PRIORITY(m_thread, priority_); | ||
140 | } | ||
141 | } | ||
124 | 142 | ||
125 | /* Do you want full call stacks, or just the line where the error happened? | 143 | /* Do you want full call stacks, or just the line where the error happened? |
126 | * | 144 | * |
@@ -144,7 +162,7 @@ static void securize_debug_threadname(lua_State* L, Lane* lane_) | |||
144 | } | 162 | } |
145 | 163 | ||
146 | #if ERROR_FULL_STACK | 164 | #if ERROR_FULL_STACK |
147 | static int lane_error( lua_State* L); | 165 | static int lane_error(lua_State* L); |
148 | // crc64/we of string "STACKTRACE_REGKEY" generated at http://www.nitrxgen.net/hashgen/ | 166 | // crc64/we of string "STACKTRACE_REGKEY" generated at http://www.nitrxgen.net/hashgen/ |
149 | static constexpr UniqueKey STACKTRACE_REGKEY{ 0x534af7d3226a429full }; | 167 | static constexpr UniqueKey STACKTRACE_REGKEY{ 0x534af7d3226a429full }; |
150 | #endif // ERROR_FULL_STACK | 168 | #endif // ERROR_FULL_STACK |
@@ -255,11 +273,6 @@ Lane::~Lane() | |||
255 | { | 273 | { |
256 | // Clean up after a (finished) thread | 274 | // Clean up after a (finished) thread |
257 | // | 275 | // |
258 | #if THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
259 | SIGNAL_FREE(&done_signal); | ||
260 | MUTEX_FREE(&done_lock); | ||
261 | #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
262 | |||
263 | #if HAVE_LANE_TRACKING() | 276 | #if HAVE_LANE_TRACKING() |
264 | if (U->tracking_first != nullptr) | 277 | if (U->tracking_first != nullptr) |
265 | { | 278 | { |
@@ -455,26 +468,27 @@ static bool selfdestruct_remove(Lane* lane_) | |||
455 | static int universe_gc( lua_State* L) | 468 | static int universe_gc( lua_State* L) |
456 | { | 469 | { |
457 | Universe* const U{ lua_tofulluserdata<Universe>(L, 1) }; | 470 | Universe* const U{ lua_tofulluserdata<Universe>(L, 1) }; |
471 | lua_Duration const shutdown_timeout{ lua_tonumber(L, lua_upvalueindex(1)) }; | ||
472 | [[maybe_unused]] char const* const op_string{ lua_tostring(L, lua_upvalueindex(2)) }; | ||
473 | CancelOp const op{ which_cancel_op(op_string) }; | ||
458 | 474 | ||
459 | while (U->selfdestruct_first != SELFDESTRUCT_END) // true at most once! | 475 | if (U->selfdestruct_first != SELFDESTRUCT_END) |
460 | { | 476 | { |
477 | |||
461 | // Signal _all_ still running threads to exit (including the timer thread) | 478 | // Signal _all_ still running threads to exit (including the timer thread) |
462 | // | 479 | // |
463 | { | 480 | { |
464 | std::lock_guard<std::mutex> guard{ U->selfdestruct_cs }; | 481 | std::lock_guard<std::mutex> guard{ U->selfdestruct_cs }; |
465 | Lane* lane{ U->selfdestruct_first }; | 482 | Lane* lane{ U->selfdestruct_first }; |
483 | lua_Duration timeout{ 1us }; | ||
466 | while (lane != SELFDESTRUCT_END) | 484 | while (lane != SELFDESTRUCT_END) |
467 | { | 485 | { |
468 | // attempt a regular unforced hard cancel with a small timeout | 486 | // attempt the requested cancel with a small timeout. |
469 | bool const cancelled{ THREAD_ISNULL(lane->thread) || thread_cancel(L, lane, CancelOp::Hard, 0.0001, false, 0.0) != CancelResult::Timeout }; | 487 | // if waiting on a linda, they will raise a cancel_error. |
470 | // if we failed, and we know the thread is waiting on a linda | 488 | // if a cancellation hook is desired, it will be installed to try to raise an error |
471 | if (cancelled == false && lane->status == WAITING && lane->waiting_on != nullptr) | 489 | if (lane->m_thread.joinable()) |
472 | { | 490 | { |
473 | // signal the linda to wake up the thread so that it can react to the cancel query | 491 | std::ignore = thread_cancel(lane, op, 1, timeout, true); |
474 | // let us hope we never land here with a pointer on a linda that has been destroyed... | ||
475 | SIGNAL_T* const waiting_on{ lane->waiting_on }; | ||
476 | // lane->waiting_on = nullptr; // useful, or not? | ||
477 | SIGNAL_ALL(waiting_on); | ||
478 | } | 492 | } |
479 | lane = lane->selfdestruct_next; | 493 | lane = lane->selfdestruct_next; |
480 | } | 494 | } |
@@ -482,47 +496,32 @@ static int universe_gc( lua_State* L) | |||
482 | 496 | ||
483 | // When noticing their cancel, the lanes will remove themselves from | 497 | // When noticing their cancel, the lanes will remove themselves from |
484 | // the selfdestruct chain. | 498 | // the selfdestruct chain. |
485 | |||
486 | // TBD: Not sure if Windows (multi core) will require the timed approach, | ||
487 | // or single Yield. I don't have machine to test that (so leaving | ||
488 | // for timed approach). -- AKa 25-Oct-2008 | ||
489 | |||
490 | // OS X 10.5 (Intel) needs more to avoid segfaults. | ||
491 | // | ||
492 | // "make test" is okay. 100's of "make require" are okay. | ||
493 | // | ||
494 | // Tested on MacBook Core Duo 2GHz and 10.5.5: | ||
495 | // -- AKa 25-Oct-2008 | ||
496 | // | ||
497 | { | 499 | { |
498 | lua_Number const shutdown_timeout = lua_tonumber(L, lua_upvalueindex(1)); | 500 | std::chrono::time_point<std::chrono::steady_clock> t_until{ std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(shutdown_timeout) }; |
499 | double const t_until = now_secs() + shutdown_timeout; | ||
500 | 501 | ||
501 | while (U->selfdestruct_first != SELFDESTRUCT_END) | 502 | while (U->selfdestruct_first != SELFDESTRUCT_END) |
502 | { | 503 | { |
503 | YIELD(); // give threads time to act on their cancel | 504 | // give threads time to act on their cancel |
505 | YIELD(); | ||
506 | // count the number of cancelled thread that didn't have the time to act yet | ||
507 | int n{ 0 }; | ||
504 | { | 508 | { |
505 | // count the number of cancelled thread that didn't have the time to act yet | 509 | std::lock_guard<std::mutex> guard{ U->selfdestruct_cs }; |
506 | int n = 0; | 510 | Lane* lane{ U->selfdestruct_first }; |
507 | double t_now = 0.0; | 511 | while (lane != SELFDESTRUCT_END) |
508 | { | 512 | { |
509 | std::lock_guard<std::mutex> guard{ U->selfdestruct_cs }; | 513 | if (lane->cancel_request != CancelRequest::None) |
510 | Lane* lane{ U->selfdestruct_first }; | 514 | ++n; |
511 | while (lane != SELFDESTRUCT_END) | 515 | lane = lane->selfdestruct_next; |
512 | { | ||
513 | if (lane->cancel_request == CancelRequest::Hard) | ||
514 | ++n; | ||
515 | lane = lane->selfdestruct_next; | ||
516 | } | ||
517 | } | ||
518 | // if timeout elapsed, or we know all threads have acted, stop waiting | ||
519 | t_now = now_secs(); | ||
520 | if (n == 0 || (t_now >= t_until)) | ||
521 | { | ||
522 | DEBUGSPEW_CODE(fprintf(stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout - (t_until - t_now))); | ||
523 | break; | ||
524 | } | 516 | } |
525 | } | 517 | } |
518 | // if timeout elapsed, or we know all threads have acted, stop waiting | ||
519 | std::chrono::time_point<std::chrono::steady_clock> t_now = std::chrono::steady_clock::now(); | ||
520 | if (n == 0 || (t_now >= t_until)) | ||
521 | { | ||
522 | DEBUGSPEW_CODE(fprintf(stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout.count())); | ||
523 | break; | ||
524 | } | ||
526 | } | 525 | } |
527 | } | 526 | } |
528 | 527 | ||
@@ -532,48 +531,17 @@ static int universe_gc( lua_State* L) | |||
532 | { | 531 | { |
533 | YIELD(); | 532 | YIELD(); |
534 | } | 533 | } |
535 | |||
536 | //--- | ||
537 | // Kill the still free running threads | ||
538 | // | ||
539 | if (U->selfdestruct_first != SELFDESTRUCT_END) | ||
540 | { | ||
541 | unsigned int n = 0; | ||
542 | // first thing we did was to raise the linda signals the threads were waiting on (if any) | ||
543 | // therefore, any well-behaved thread should be in CANCELLED state | ||
544 | // these are not running, and the state can be closed | ||
545 | { | ||
546 | std::lock_guard<std::mutex> guard{ U->selfdestruct_cs }; | ||
547 | Lane* lane{ U->selfdestruct_first }; | ||
548 | while (lane != SELFDESTRUCT_END) | ||
549 | { | ||
550 | Lane* const next_s{ lane->selfdestruct_next }; | ||
551 | lane->selfdestruct_next = nullptr; // detach from selfdestruct chain | ||
552 | if (!THREAD_ISNULL(lane->thread)) // can be nullptr if previous 'soft' termination succeeded | ||
553 | { | ||
554 | THREAD_KILL(&lane->thread); | ||
555 | #if THREADAPI == THREADAPI_PTHREAD | ||
556 | // pthread: make sure the thread is really stopped! | ||
557 | THREAD_WAIT(&lane->thread, -1, &lane->done_signal, &lane->done_lock, &lane->status); | ||
558 | #endif // THREADAPI == THREADAPI_PTHREAD | ||
559 | } | ||
560 | // NO lua_close() in this case because we don't know where execution of the state was interrupted | ||
561 | delete lane; | ||
562 | lane = next_s; | ||
563 | ++n; | ||
564 | } | ||
565 | U->selfdestruct_first = SELFDESTRUCT_END; | ||
566 | } | ||
567 | |||
568 | DEBUGSPEW_CODE(fprintf(stderr, "Killed %d lane(s) at process end.\n", n)); | ||
569 | } | ||
570 | } | 534 | } |
571 | 535 | ||
572 | // If some lanes are currently cleaning after themselves, wait until they are done. | 536 | // If after all this, we still have some free-running lanes, it's an external user error, they should have stopped appropriately |
573 | // They are no longer listed in the selfdestruct chain, but they still have to lua_close(). | ||
574 | while (U->selfdestructing_count.load(std::memory_order_acquire) > 0) | ||
575 | { | 537 | { |
576 | YIELD(); | 538 | std::lock_guard<std::mutex> guard{ U->selfdestruct_cs }; |
539 | Lane* lane{ U->selfdestruct_first }; | ||
540 | if (lane != SELFDESTRUCT_END) | ||
541 | { | ||
542 | // this causes a leak because we don't call U's destructor (which could be bad if the still running lanes are accessing it) | ||
543 | std::ignore = luaL_error(L, "Zombie thread %s refuses to die!", lane->debug_name); | ||
544 | } | ||
577 | } | 545 | } |
578 | 546 | ||
579 | // necessary so that calling free_deep_prelude doesn't crash because linda_id expects a linda lightuserdata at absolute slot 1 | 547 | // necessary so that calling free_deep_prelude doesn't crash because linda_id expects a linda lightuserdata at absolute slot 1 |
@@ -874,20 +842,8 @@ static char const* get_errcode_name( int _code) | |||
874 | } | 842 | } |
875 | #endif // USE_DEBUG_SPEW() | 843 | #endif // USE_DEBUG_SPEW() |
876 | 844 | ||
877 | #if THREADWAIT_METHOD == THREADWAIT_CONDVAR // implies THREADAPI == THREADAPI_PTHREAD | 845 | static void lane_main(Lane* lane) |
878 | static void thread_cleanup_handler(void* opaque) | ||
879 | { | 846 | { |
880 | Lane* lane{ (Lane*) opaque }; | ||
881 | MUTEX_LOCK(&lane->done_lock); | ||
882 | lane->status = CANCELLED; | ||
883 | SIGNAL_ONE(&lane->done_signal); // wake up master (while 'lane->done_lock' is on) | ||
884 | MUTEX_UNLOCK(&lane->done_lock); | ||
885 | } | ||
886 | #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
887 | |||
888 | static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) | ||
889 | { | ||
890 | Lane* lane{ (Lane*) vs }; | ||
891 | lua_State* const L{ lane->L }; | 847 | lua_State* const L{ lane->L }; |
892 | // wait until the launching thread has finished preparing L | 848 | // wait until the launching thread has finished preparing L |
893 | lane->m_ready.wait(); | 849 | lane->m_ready.wait(); |
@@ -897,8 +853,6 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) | |||
897 | // At this point, the lane function and arguments are on the stack | 853 | // At this point, the lane function and arguments are on the stack |
898 | int const nargs{ lua_gettop(L) - 1 }; | 854 | int const nargs{ lua_gettop(L) - 1 }; |
899 | DEBUGSPEW_CODE(Universe* U = universe_get(L)); | 855 | DEBUGSPEW_CODE(Universe* U = universe_get(L)); |
900 | THREAD_MAKE_ASYNCH_CANCELLABLE(); | ||
901 | THREAD_CLEANUP_PUSH(thread_cleanup_handler, lane); | ||
902 | lane->status = RUNNING; // PENDING -> RUNNING | 856 | lane->status = RUNNING; // PENDING -> RUNNING |
903 | 857 | ||
904 | // Tie "set_finalizer()" to the state | 858 | // Tie "set_finalizer()" to the state |
@@ -949,18 +903,19 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) | |||
949 | // the finalizer generated an error, and left its own error message [and stack trace] on the stack | 903 | // the finalizer generated an error, and left its own error message [and stack trace] on the stack |
950 | rc = rc2; // we're overruling the earlier script error or normal return | 904 | rc = rc2; // we're overruling the earlier script error or normal return |
951 | } | 905 | } |
952 | lane->waiting_on = nullptr; // just in case | 906 | lane->m_waiting_on = nullptr; // just in case |
953 | if (selfdestruct_remove(lane)) // check and remove (under lock!) | 907 | if (selfdestruct_remove(lane)) // check and remove (under lock!) |
954 | { | 908 | { |
955 | // We're a free-running thread and no-one's there to clean us up. | 909 | // We're a free-running thread and no-one's there to clean us up. |
956 | // | ||
957 | lua_close(lane->L); | 910 | lua_close(lane->L); |
958 | 911 | lane->L = nullptr; // just in case | |
959 | lane->U->selfdestruct_cs.lock(); | 912 | lane->U->selfdestruct_cs.lock(); |
960 | // done with lua_close(), terminal shutdown sequence may proceed | 913 | // done with lua_close(), terminal shutdown sequence may proceed |
961 | lane->U->selfdestructing_count.fetch_sub(1, std::memory_order_release); | 914 | lane->U->selfdestructing_count.fetch_sub(1, std::memory_order_release); |
962 | lane->U->selfdestruct_cs.unlock(); | 915 | lane->U->selfdestruct_cs.unlock(); |
963 | 916 | ||
917 | // we destroy our jthread member from inside the thread body, so we have to detach so that we don't try to join, as this doesn't seem a good idea | ||
918 | lane->m_thread.detach(); | ||
964 | delete lane; | 919 | delete lane; |
965 | lane = nullptr; | 920 | lane = nullptr; |
966 | } | 921 | } |
@@ -972,21 +927,14 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) | |||
972 | enum e_status st = (rc == 0) ? DONE : CANCEL_ERROR.equals(L, 1) ? CANCELLED : ERROR_ST; | 927 | enum e_status st = (rc == 0) ? DONE : CANCEL_ERROR.equals(L, 1) ? CANCELLED : ERROR_ST; |
973 | 928 | ||
974 | // Posix no PTHREAD_TIMEDJOIN: | 929 | // Posix no PTHREAD_TIMEDJOIN: |
975 | // 'done_lock' protects the -> DONE|ERROR_ST|CANCELLED state change | 930 | // 'm_done_mutex' protects the -> DONE|ERROR_ST|CANCELLED state change |
976 | // | 931 | // |
977 | #if THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
978 | MUTEX_LOCK(&lane->done_lock); | ||
979 | { | 932 | { |
980 | #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR | 933 | std::lock_guard lock{ lane->m_done_mutex }; |
981 | lane->status = st; | 934 | lane->status = st; |
982 | #if THREADWAIT_METHOD == THREADWAIT_CONDVAR | 935 | lane->m_done_signal.notify_one();// wake up master (while 'lane->m_done_mutex' is on) |
983 | SIGNAL_ONE(&lane->done_signal); // wake up master (while 'lane->done_lock' is on) | ||
984 | } | 936 | } |
985 | MUTEX_UNLOCK(&lane->done_lock); | ||
986 | #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
987 | } | 937 | } |
988 | THREAD_CLEANUP_POP(false); | ||
989 | return 0; // ignored | ||
990 | } | 938 | } |
991 | 939 | ||
992 | // ################################################################################################# | 940 | // ################################################################################################# |
@@ -1115,13 +1063,11 @@ LUAG_FUNC(lane_new) | |||
1115 | // leave a single cancel_error on the stack for the caller | 1063 | // leave a single cancel_error on the stack for the caller |
1116 | lua_settop(m_lane->L, 0); | 1064 | lua_settop(m_lane->L, 0); |
1117 | CANCEL_ERROR.pushKey(m_lane->L); | 1065 | CANCEL_ERROR.pushKey(m_lane->L); |
1118 | #if THREADWAIT_METHOD == THREADWAIT_CONDVAR | 1066 | { |
1119 | MUTEX_LOCK(&m_lane->done_lock); | 1067 | std::lock_guard lock{ m_lane->m_done_mutex }; |
1120 | #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR | 1068 | m_lane->status = CANCELLED; |
1121 | m_lane->status = CANCELLED; | 1069 | m_lane->m_done_signal.notify_one(); // wake up master (while 'lane->m_done_mutex' is on) |
1122 | #if THREADWAIT_METHOD == THREADWAIT_CONDVAR | 1070 | } |
1123 | MUTEX_UNLOCK(&m_lane->done_lock); | ||
1124 | #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
1125 | // unblock the thread so that it can terminate gracefully | 1071 | // unblock the thread so that it can terminate gracefully |
1126 | m_lane->m_ready.count_down(); | 1072 | m_lane->m_ready.count_down(); |
1127 | } | 1073 | } |
@@ -1170,7 +1116,7 @@ LUAG_FUNC(lane_new) | |||
1170 | } onExit{ L, lane, gc_cb_idx }; | 1116 | } onExit{ L, lane, gc_cb_idx }; |
1171 | // launch the thread early, it will sync with a std::latch to parallelize OS thread warmup and L2 preparation | 1117 | // launch the thread early, it will sync with a std::latch to parallelize OS thread warmup and L2 preparation |
1172 | DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "lane_new: launching thread\n" INDENT_END)); | 1118 | DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "lane_new: launching thread\n" INDENT_END)); |
1173 | THREAD_CREATE(&lane->thread, lane_main, lane, priority); | 1119 | lane->startThread(priority); |
1174 | 1120 | ||
1175 | STACK_GROW( L2, nargs + 3); // | 1121 | STACK_GROW( L2, nargs + 3); // |
1176 | STACK_CHECK_START_REL(L2, 0); | 1122 | STACK_CHECK_START_REL(L2, 0); |
@@ -1347,7 +1293,7 @@ LUAG_FUNC(lane_new) | |||
1347 | static int lane_gc(lua_State* L) | 1293 | static int lane_gc(lua_State* L) |
1348 | { | 1294 | { |
1349 | bool have_gc_cb{ false }; | 1295 | bool have_gc_cb{ false }; |
1350 | Lane* lane{ lua_toLane(L, 1) }; // ud | 1296 | Lane* const lane{ lua_toLane(L, 1) }; // ud |
1351 | 1297 | ||
1352 | // if there a gc callback? | 1298 | // if there a gc callback? |
1353 | lua_getiuservalue(L, 1, 1); // ud uservalue | 1299 | lua_getiuservalue(L, 1, 1); // ud uservalue |
@@ -1365,30 +1311,7 @@ static int lane_gc(lua_State* L) | |||
1365 | } | 1311 | } |
1366 | 1312 | ||
1367 | // We can read 'lane->status' without locks, but not wait for it | 1313 | // We can read 'lane->status' without locks, but not wait for it |
1368 | // test Killed state first, as it doesn't need to enter the selfdestruct chain | 1314 | if (lane->status < DONE) |
1369 | if (lane->mstatus == Lane::Killed) | ||
1370 | { | ||
1371 | // Make sure a kill has proceeded, before cleaning up the data structure. | ||
1372 | // | ||
1373 | // NO lua_close() in this case because we don't know where execution of the state was interrupted | ||
1374 | DEBUGSPEW_CODE(fprintf(stderr, "** Joining with a killed thread (needs testing) **")); | ||
1375 | // make sure the thread is no longer running, just like thread_join() | ||
1376 | if (!THREAD_ISNULL(lane->thread)) | ||
1377 | { | ||
1378 | THREAD_WAIT(&lane->thread, -1, &lane->done_signal, &lane->done_lock, &lane->status); | ||
1379 | } | ||
1380 | if (lane->status >= DONE && lane->L) | ||
1381 | { | ||
1382 | // we know the thread was killed while the Lua VM was not doing anything: we should be able to close it without crashing | ||
1383 | // now, thread_cancel() will not forcefully kill a lane with lane->status >= DONE, so I am not sure it can ever happen | ||
1384 | lua_close(lane->L); | ||
1385 | lane->L = nullptr; | ||
1386 | // just in case, but s will be freed soon so... | ||
1387 | lane->debug_name = "<gc>"; | ||
1388 | } | ||
1389 | DEBUGSPEW_CODE(fprintf(stderr, "** Joined ok **")); | ||
1390 | } | ||
1391 | else if (lane->status < DONE) | ||
1392 | { | 1315 | { |
1393 | // still running: will have to be cleaned up later | 1316 | // still running: will have to be cleaned up later |
1394 | selfdestruct_add(lane); | 1317 | selfdestruct_add(lane); |
@@ -1437,7 +1360,6 @@ static char const * thread_status_string(Lane* lane_) | |||
1437 | { | 1360 | { |
1438 | enum e_status const st{ lane_->status }; // read just once (volatile) | 1361 | enum e_status const st{ lane_->status }; // read just once (volatile) |
1439 | char const* str = | 1362 | char const* str = |
1440 | (lane_->mstatus == Lane::Killed) ? "killed" : // new to v3.3.0! | ||
1441 | (st == PENDING) ? "pending" : | 1363 | (st == PENDING) ? "pending" : |
1442 | (st == RUNNING) ? "running" : // like in 'co.status()' | 1364 | (st == RUNNING) ? "running" : // like in 'co.status()' |
1443 | (st == WAITING) ? "waiting" : | 1365 | (st == WAITING) ? "waiting" : |
@@ -1471,9 +1393,10 @@ int push_thread_status(lua_State* L, Lane* lane_) | |||
1471 | LUAG_FUNC(thread_join) | 1393 | LUAG_FUNC(thread_join) |
1472 | { | 1394 | { |
1473 | Lane* const lane{ lua_toLane(L, 1) }; | 1395 | Lane* const lane{ lua_toLane(L, 1) }; |
1474 | lua_Number const wait_secs{ luaL_optnumber(L, 2, -1.0) }; | 1396 | lua_Duration const duration{ luaL_optnumber(L, 2, -1.0) }; |
1475 | lua_State* const L2{ lane->L }; | 1397 | lua_State* const L2{ lane->L }; |
1476 | bool const done{ THREAD_ISNULL(lane->thread) || THREAD_WAIT(&lane->thread, wait_secs, &lane->done_signal, &lane->done_lock, &lane->status) }; | 1398 | |
1399 | bool const done{ !lane->m_thread.joinable() || lane->waitForCompletion(duration) }; | ||
1477 | if (!done || !L2) | 1400 | if (!done || !L2) |
1478 | { | 1401 | { |
1479 | STACK_GROW(L, 2); | 1402 | STACK_GROW(L, 2); |
@@ -1486,58 +1409,47 @@ LUAG_FUNC(thread_join) | |||
1486 | // Thread is DONE/ERROR_ST/CANCELLED; all ours now | 1409 | // Thread is DONE/ERROR_ST/CANCELLED; all ours now |
1487 | 1410 | ||
1488 | int ret{ 0 }; | 1411 | int ret{ 0 }; |
1489 | if (lane->mstatus == Lane::Killed) // OS thread was killed if thread_cancel was forced | 1412 | Universe* const U{ lane->U }; |
1490 | { | 1413 | // debug_name is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed |
1491 | // in that case, even if the thread was killed while DONE/ERROR_ST/CANCELLED, ignore regular return values | 1414 | // so store it in the userdata uservalue at a key that can't possibly collide |
1492 | STACK_GROW(L, 2); | 1415 | securize_debug_threadname(L, lane); |
1493 | lua_pushnil(L); | 1416 | switch (lane->status) |
1494 | lua_pushliteral(L, "killed"); | ||
1495 | ret = 2; | ||
1496 | } | ||
1497 | else | ||
1498 | { | 1417 | { |
1499 | Universe* const U{ lane->U }; | 1418 | case DONE: |
1500 | // debug_name is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed | ||
1501 | // so store it in the userdata uservalue at a key that can't possibly collide | ||
1502 | securize_debug_threadname(L, lane); | ||
1503 | switch (lane->status) | ||
1504 | { | 1419 | { |
1505 | case DONE: | 1420 | int const n{ lua_gettop(L2) }; // whole L2 stack |
1421 | if ((n > 0) && (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0)) | ||
1506 | { | 1422 | { |
1507 | int const n{ lua_gettop(L2) }; // whole L2 stack | 1423 | return luaL_error(L, "tried to copy unsupported types"); |
1508 | if ((n > 0) && (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0)) | ||
1509 | { | ||
1510 | return luaL_error(L, "tried to copy unsupported types"); | ||
1511 | } | ||
1512 | ret = n; | ||
1513 | } | 1424 | } |
1514 | break; | 1425 | ret = n; |
1426 | } | ||
1427 | break; | ||
1515 | 1428 | ||
1516 | case ERROR_ST: | 1429 | case ERROR_ST: |
1430 | { | ||
1431 | int const n{ lua_gettop(L2) }; | ||
1432 | STACK_GROW(L, 3); | ||
1433 | lua_pushnil(L); | ||
1434 | // even when ERROR_FULL_STACK, if the error is not LUA_ERRRUN, the handler wasn't called, and we only have 1 error message on the stack ... | ||
1435 | if (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0) // nil "err" [trace] | ||
1517 | { | 1436 | { |
1518 | int const n{ lua_gettop(L2) }; | 1437 | return luaL_error(L, "tried to copy unsupported types: %s", lua_tostring(L, -n)); |
1519 | STACK_GROW(L, 3); | ||
1520 | lua_pushnil(L); | ||
1521 | // even when ERROR_FULL_STACK, if the error is not LUA_ERRRUN, the handler wasn't called, and we only have 1 error message on the stack ... | ||
1522 | if (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0) // nil "err" [trace] | ||
1523 | { | ||
1524 | return luaL_error(L, "tried to copy unsupported types: %s", lua_tostring(L, -n)); | ||
1525 | } | ||
1526 | ret = 1 + n; | ||
1527 | } | 1438 | } |
1528 | break; | 1439 | ret = 1 + n; |
1440 | } | ||
1441 | break; | ||
1529 | 1442 | ||
1530 | case CANCELLED: | 1443 | case CANCELLED: |
1531 | ret = 0; | 1444 | ret = 0; |
1532 | break; | 1445 | break; |
1533 | 1446 | ||
1534 | default: | 1447 | default: |
1535 | DEBUGSPEW_CODE(fprintf(stderr, "Status: %d\n", lane->status)); | 1448 | DEBUGSPEW_CODE(fprintf(stderr, "Status: %d\n", lane->status)); |
1536 | ASSERT_L(false); | 1449 | ASSERT_L(false); |
1537 | ret = 0; | 1450 | ret = 0; |
1538 | } | ||
1539 | lua_close(L2); | ||
1540 | } | 1451 | } |
1452 | lua_close(L2); | ||
1541 | lane->L = nullptr; | 1453 | lane->L = nullptr; |
1542 | STACK_CHECK(L, ret); | 1454 | STACK_CHECK(L, ret); |
1543 | return ret; | 1455 | return ret; |
@@ -1596,15 +1508,12 @@ LUAG_FUNC(thread_index) | |||
1596 | switch (lane->status) | 1508 | switch (lane->status) |
1597 | { | 1509 | { |
1598 | default: | 1510 | default: |
1599 | if (lane->mstatus != Lane::Killed) | 1511 | // this is an internal error, we probably never get here |
1600 | { | 1512 | lua_settop(L, 0); |
1601 | // this is an internal error, we probably never get here | 1513 | lua_pushliteral(L, "Unexpected status: "); |
1602 | lua_settop(L, 0); | 1514 | lua_pushstring(L, thread_status_string(lane)); |
1603 | lua_pushliteral(L, "Unexpected status: "); | 1515 | lua_concat(L, 2); |
1604 | lua_pushstring(L, thread_status_string(lane)); | 1516 | raise_lua_error(L); |
1605 | lua_concat(L, 2); | ||
1606 | raise_lua_error(L); | ||
1607 | } | ||
1608 | [[fallthrough]]; // fall through if we are killed, as we got nil, "killed" on the stack | 1517 | [[fallthrough]]; // fall through if we are killed, as we got nil, "killed" on the stack |
1609 | 1518 | ||
1610 | case DONE: // got regular return values | 1519 | case DONE: // got regular return values |
@@ -1790,8 +1699,7 @@ LUAG_FUNC(wakeup_conv) | |||
1790 | lua_pop(L,1); | 1699 | lua_pop(L,1); |
1791 | STACK_CHECK(L, 0); | 1700 | STACK_CHECK(L, 0); |
1792 | 1701 | ||
1793 | struct tm t; | 1702 | std::tm t{}; |
1794 | memset(&t, 0, sizeof(t)); | ||
1795 | t.tm_year = year - 1900; | 1703 | t.tm_year = year - 1900; |
1796 | t.tm_mon= month-1; // 0..11 | 1704 | t.tm_mon= month-1; // 0..11 |
1797 | t.tm_mday= day; // 1..31 | 1705 | t.tm_mday= day; // 1..31 |
@@ -1800,7 +1708,7 @@ LUAG_FUNC(wakeup_conv) | |||
1800 | t.tm_sec= sec; // 0..60 | 1708 | t.tm_sec= sec; // 0..60 |
1801 | t.tm_isdst= isdst; // 0/1/negative | 1709 | t.tm_isdst= isdst; // 0/1/negative |
1802 | 1710 | ||
1803 | lua_pushnumber(L, static_cast<lua_Number>(mktime(&t))); // ms=0 | 1711 | lua_pushnumber(L, static_cast<lua_Number>(std::mktime(&t))); // resolution: 1 second |
1804 | return 1; | 1712 | return 1; |
1805 | } | 1713 | } |
1806 | 1714 | ||
@@ -1909,13 +1817,14 @@ LUAG_FUNC(configure) | |||
1909 | DEBUGSPEW_CODE(fprintf( stderr, INDENT_BEGIN "%p: lanes.configure() BEGIN\n" INDENT_END, L)); | 1817 | DEBUGSPEW_CODE(fprintf( stderr, INDENT_BEGIN "%p: lanes.configure() BEGIN\n" INDENT_END, L)); |
1910 | DEBUGSPEW_CODE(if (U) U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); | 1818 | DEBUGSPEW_CODE(if (U) U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); |
1911 | 1819 | ||
1912 | if(U == nullptr) | 1820 | if (U == nullptr) |
1913 | { | 1821 | { |
1914 | U = universe_create( L); // settings universe | 1822 | U = universe_create(L); // settings universe |
1915 | DEBUGSPEW_CODE(U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); | 1823 | DEBUGSPEW_CODE(U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); |
1916 | lua_newtable( L); // settings universe mt | 1824 | lua_newtable( L); // settings universe mt |
1917 | lua_getfield(L, 1, "shutdown_timeout"); // settings universe mt shutdown_timeout | 1825 | lua_getfield(L, 1, "shutdown_timeout"); // settings universe mt shutdown_timeout |
1918 | lua_pushcclosure(L, universe_gc, 1); // settings universe mt universe_gc | 1826 | lua_getfield(L, 1, "shutdown_mode"); // settings universe mt shutdown_timeout shutdown_mode |
1827 | lua_pushcclosure(L, universe_gc, 2); // settings universe mt universe_gc | ||
1919 | lua_setfield(L, -2, "__gc"); // settings universe mt | 1828 | lua_setfield(L, -2, "__gc"); // settings universe mt |
1920 | lua_setmetatable(L, -2); // settings universe | 1829 | lua_setmetatable(L, -2); // settings universe |
1921 | lua_pop(L, 1); // settings | 1830 | lua_pop(L, 1); // settings |
diff --git a/src/lanes.lua b/src/lanes.lua index 6af286a..fd3d22b 100644 --- a/src/lanes.lua +++ b/src/lanes.lua | |||
@@ -73,6 +73,7 @@ lanes.configure = function( settings_) | |||
73 | keepers_gc_threshold = -1, | 73 | keepers_gc_threshold = -1, |
74 | on_state_create = nil, | 74 | on_state_create = nil, |
75 | shutdown_timeout = 0.25, | 75 | shutdown_timeout = 0.25, |
76 | shutdown_mode = "hard", | ||
76 | with_timers = true, | 77 | with_timers = true, |
77 | track_lanes = false, | 78 | track_lanes = false, |
78 | demote_full_userdata = nil, | 79 | demote_full_userdata = nil, |
@@ -113,6 +114,11 @@ lanes.configure = function( settings_) | |||
113 | -- shutdown_timeout should be a number >= 0 | 114 | -- shutdown_timeout should be a number >= 0 |
114 | return type( val_) == "number" and val_ >= 0 | 115 | return type( val_) == "number" and val_ >= 0 |
115 | end, | 116 | end, |
117 | shutdown_mode = function( val_) | ||
118 | local valid_hooks = { soft = true, hard = true, call = true, ret = true, line = true, count = true } | ||
119 | -- shutdown_mode should be a known hook mask | ||
120 | return valid_hooks[val_] | ||
121 | end, | ||
116 | track_lanes = boolean_param_checker, | 122 | track_lanes = boolean_param_checker, |
117 | demote_full_userdata = boolean_param_checker, | 123 | demote_full_userdata = boolean_param_checker, |
118 | verbose_errors = boolean_param_checker | 124 | verbose_errors = boolean_param_checker |
@@ -367,262 +373,263 @@ lanes.configure = function( settings_) | |||
367 | 373 | ||
368 | 374 | ||
369 | if settings.with_timers ~= false then | 375 | if settings.with_timers ~= false then |
376 | -- | ||
377 | -- On first 'require "lanes"', a timer lane is spawned that will maintain | ||
378 | -- timer tables and sleep in between the timer events. All interaction with | ||
379 | -- the timer lane happens via a 'timer_gateway' Linda, which is common to | ||
380 | -- all that 'require "lanes"'. | ||
381 | -- | ||
382 | -- Linda protocol to timer lane: | ||
383 | -- | ||
384 | -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] | ||
385 | -- | ||
386 | local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging | ||
387 | local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" | ||
388 | local first_time_key= "first time" | ||
370 | 389 | ||
371 | -- | 390 | local first_time = timer_gateway:get( first_time_key) == nil |
372 | -- On first 'require "lanes"', a timer lane is spawned that will maintain | 391 | timer_gateway:set( first_time_key, true) |
373 | -- timer tables and sleep in between the timer events. All interaction with | ||
374 | -- the timer lane happens via a 'timer_gateway' Linda, which is common to | ||
375 | -- all that 'require "lanes"'. | ||
376 | -- | ||
377 | -- Linda protocol to timer lane: | ||
378 | -- | ||
379 | -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] | ||
380 | -- | ||
381 | local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging | ||
382 | local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" | ||
383 | local first_time_key= "first time" | ||
384 | |||
385 | local first_time = timer_gateway:get( first_time_key) == nil | ||
386 | timer_gateway:set( first_time_key, true) | ||
387 | |||
388 | -- | ||
389 | -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally | ||
390 | -- has 'table' always declared) | ||
391 | -- | ||
392 | if first_time then | ||
393 | 392 | ||
394 | local now_secs = core.now_secs | 393 | local now_secs = core.now_secs |
395 | assert( type( now_secs) == "function") | 394 | local wakeup_conv = core.wakeup_conv |
396 | ----- | 395 | |
397 | -- Snore loop (run as a lane on the background) | ||
398 | -- | ||
399 | -- High priority, to get trustworthy timings. | ||
400 | -- | 396 | -- |
401 | -- We let the timer lane be a "free running" thread; no handle to it | 397 | -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally |
402 | -- remains. | 398 | -- has 'table' always declared) |
403 | -- | 399 | -- |
404 | local timer_body = function() | 400 | if first_time then |
405 | set_debug_threadname( "LanesTimer") | 401 | |
406 | -- | 402 | assert( type( now_secs) == "function") |
407 | -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, | 403 | ----- |
408 | -- [key]= { wakeup_secs [,period_secs] } [, ...] }, | 404 | -- Snore loop (run as a lane on the background) |
409 | -- } | ||
410 | -- | ||
411 | -- Collection of all running timers, indexed with linda's & key. | ||
412 | -- | 405 | -- |
413 | -- Note that we need to use the deep lightuserdata identifiers, instead | 406 | -- High priority, to get trustworthy timings. |
414 | -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple | ||
415 | -- entries for the same timer. | ||
416 | -- | 407 | -- |
417 | -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but | 408 | -- We let the timer lane be a "free running" thread; no handle to it |
418 | -- also important to keep the Linda alive, even if all outside world threw | 409 | -- remains. |
419 | -- away pointers to it (which would ruin uniqueness of the deep pointer). | ||
420 | -- Now we're safe. | ||
421 | -- | 410 | -- |
422 | local collection = {} | 411 | local timer_body = function() |
423 | local table_insert = assert( table.insert) | 412 | set_debug_threadname( "LanesTimer") |
424 | 413 | -- | |
425 | local get_timers = function() | 414 | -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, |
426 | local r = {} | 415 | -- [key]= { wakeup_secs [,period_secs] } [, ...] }, |
427 | for deep, t in pairs( collection) do | 416 | -- } |
428 | -- WR( tostring( deep)) | 417 | -- |
429 | local l = t[deep] | 418 | -- Collection of all running timers, indexed with linda's & key. |
430 | for key, timer_data in pairs( t) do | 419 | -- |
431 | if key ~= deep then | 420 | -- Note that we need to use the deep lightuserdata identifiers, instead |
432 | table_insert( r, {l, key, timer_data}) | 421 | -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple |
422 | -- entries for the same timer. | ||
423 | -- | ||
424 | -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but | ||
425 | -- also important to keep the Linda alive, even if all outside world threw | ||
426 | -- away pointers to it (which would ruin uniqueness of the deep pointer). | ||
427 | -- Now we're safe. | ||
428 | -- | ||
429 | local collection = {} | ||
430 | local table_insert = assert( table.insert) | ||
431 | |||
432 | local get_timers = function() | ||
433 | local r = {} | ||
434 | for deep, t in pairs( collection) do | ||
435 | -- WR( tostring( deep)) | ||
436 | local l = t[deep] | ||
437 | for key, timer_data in pairs( t) do | ||
438 | if key ~= deep then | ||
439 | table_insert( r, {l, key, timer_data}) | ||
440 | end | ||
433 | end | 441 | end |
434 | end | 442 | end |
435 | end | 443 | return r |
436 | return r | 444 | end -- get_timers() |
437 | end -- get_timers() | ||
438 | |||
439 | -- | ||
440 | -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) | ||
441 | -- | ||
442 | local set_timer = function( linda, key, wakeup_at, period) | ||
443 | assert( wakeup_at == nil or wakeup_at > 0.0) | ||
444 | assert( period == nil or period > 0.0) | ||
445 | 445 | ||
446 | local linda_deep = linda:deep() | ||
447 | assert( linda_deep) | ||
448 | |||
449 | -- Find or make a lookup for this timer | ||
450 | -- | 446 | -- |
451 | local t1 = collection[linda_deep] | 447 | -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) |
452 | if not t1 then | 448 | -- |
453 | t1 = { [linda_deep] = linda} -- proxy to use the Linda | 449 | local set_timer = function( linda, key, wakeup_at, period) |
454 | collection[linda_deep] = t1 | 450 | assert( wakeup_at == nil or wakeup_at > 0.0) |
455 | end | 451 | assert( period == nil or period > 0.0) |
456 | 452 | ||
457 | if wakeup_at == nil then | 453 | local linda_deep = linda:deep() |
458 | -- Clear the timer | 454 | assert( linda_deep) |
459 | -- | ||
460 | t1[key]= nil | ||
461 | 455 | ||
462 | -- Remove empty tables from collection; speeds timer checks and | 456 | -- Find or make a lookup for this timer |
463 | -- lets our 'safety reference' proxy be gc:ed as well. | ||
464 | -- | 457 | -- |
465 | local empty = true | 458 | local t1 = collection[linda_deep] |
466 | for k, _ in pairs( t1) do | 459 | if not t1 then |
467 | if k ~= linda_deep then | 460 | t1 = { [linda_deep] = linda} -- proxy to use the Linda |
468 | empty = false | 461 | collection[linda_deep] = t1 |
469 | break | ||
470 | end | ||
471 | end | ||
472 | if empty then | ||
473 | collection[linda_deep] = nil | ||
474 | end | 462 | end |
475 | 463 | ||
476 | -- Note: any unread timer value is left at 'linda[key]' intensionally; | 464 | if wakeup_at == nil then |
477 | -- clearing a timer just stops it. | 465 | -- Clear the timer |
478 | else | 466 | -- |
479 | -- New timer or changing the timings | 467 | t1[key]= nil |
480 | -- | ||
481 | local t2 = t1[key] | ||
482 | if not t2 then | ||
483 | t2= {} | ||
484 | t1[key]= t2 | ||
485 | end | ||
486 | 468 | ||
487 | t2[1] = wakeup_at | 469 | -- Remove empty tables from collection; speeds timer checks and |
488 | t2[2] = period -- can be 'nil' | 470 | -- lets our 'safety reference' proxy be gc:ed as well. |
489 | end | 471 | -- |
490 | end -- set_timer() | 472 | local empty = true |
473 | for k, _ in pairs( t1) do | ||
474 | if k ~= linda_deep then | ||
475 | empty = false | ||
476 | break | ||
477 | end | ||
478 | end | ||
479 | if empty then | ||
480 | collection[linda_deep] = nil | ||
481 | end | ||
491 | 482 | ||
492 | ----- | 483 | -- Note: any unread timer value is left at 'linda[key]' intensionally; |
493 | -- [next_wakeup_at]= check_timers() | 484 | -- clearing a timer just stops it. |
494 | -- Check timers, and wake up the ones expired (if any) | 485 | else |
495 | -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). | 486 | -- New timer or changing the timings |
496 | local check_timers = function() | ||
497 | local now = now_secs() | ||
498 | local next_wakeup | ||
499 | |||
500 | for linda_deep,t1 in pairs(collection) do | ||
501 | for key,t2 in pairs(t1) do | ||
502 | -- | 487 | -- |
503 | if key==linda_deep then | 488 | local t2 = t1[key] |
504 | -- no 'continue' in Lua :/ | 489 | if not t2 then |
505 | else | 490 | t2= {} |
506 | -- 't2': { wakeup_at_secs [,period_secs] } | 491 | t1[key]= t2 |
492 | end | ||
493 | |||
494 | t2[1] = wakeup_at | ||
495 | t2[2] = period -- can be 'nil' | ||
496 | end | ||
497 | end -- set_timer() | ||
498 | |||
499 | ----- | ||
500 | -- [next_wakeup_at]= check_timers() | ||
501 | -- Check timers, and wake up the ones expired (if any) | ||
502 | -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). | ||
503 | local check_timers = function() | ||
504 | local now = now_secs() | ||
505 | local next_wakeup | ||
506 | |||
507 | for linda_deep,t1 in pairs(collection) do | ||
508 | for key,t2 in pairs(t1) do | ||
507 | -- | 509 | -- |
508 | local wakeup_at= t2[1] | 510 | if key==linda_deep then |
509 | local period= t2[2] -- may be 'nil' | 511 | -- no 'continue' in Lua :/ |
510 | 512 | else | |
511 | if wakeup_at <= now then | 513 | -- 't2': { wakeup_at_secs [,period_secs] } |
512 | local linda= t1[linda_deep] | 514 | -- |
513 | assert(linda) | 515 | local wakeup_at= t2[1] |
514 | 516 | local period= t2[2] -- may be 'nil' | |
515 | linda:set( key, now ) | 517 | |
516 | 518 | if wakeup_at <= now then | |
517 | -- 'pairs()' allows the values to be modified (and even | 519 | local linda= t1[linda_deep] |
518 | -- removed) as far as keys are not touched | 520 | assert(linda) |
519 | 521 | ||
520 | if not period then | 522 | linda:set( key, now ) |
521 | -- one-time timer; gone | 523 | |
522 | -- | 524 | -- 'pairs()' allows the values to be modified (and even |
523 | t1[key]= nil | 525 | -- removed) as far as keys are not touched |
524 | wakeup_at= nil -- no 'continue' in Lua :/ | 526 | |
525 | else | 527 | if not period then |
526 | -- repeating timer; find next wakeup (may jump multiple repeats) | 528 | -- one-time timer; gone |
527 | -- | 529 | -- |
528 | repeat | 530 | t1[key]= nil |
529 | wakeup_at= wakeup_at+period | 531 | wakeup_at= nil -- no 'continue' in Lua :/ |
530 | until wakeup_at > now | 532 | else |
531 | 533 | -- repeating timer; find next wakeup (may jump multiple repeats) | |
532 | t2[1]= wakeup_at | 534 | -- |
535 | repeat | ||
536 | wakeup_at= wakeup_at+period | ||
537 | until wakeup_at > now | ||
538 | |||
539 | t2[1]= wakeup_at | ||
540 | end | ||
533 | end | 541 | end |
534 | end | ||
535 | 542 | ||
536 | if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then | 543 | if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then |
537 | next_wakeup= wakeup_at | 544 | next_wakeup= wakeup_at |
545 | end | ||
538 | end | 546 | end |
547 | end -- t2 loop | ||
548 | end -- t1 loop | ||
549 | |||
550 | return next_wakeup -- may be 'nil' | ||
551 | end -- check_timers() | ||
552 | |||
553 | local timer_gateway_batched = timer_gateway.batched | ||
554 | set_finalizer( function( err, stk) | ||
555 | if err and type( err) ~= "userdata" then | ||
556 | WR( "LanesTimer error: "..tostring(err)) | ||
557 | --elseif type( err) == "userdata" then | ||
558 | -- WR( "LanesTimer after cancel" ) | ||
559 | --else | ||
560 | -- WR("LanesTimer finalized") | ||
561 | end | ||
562 | end) | ||
563 | while true do | ||
564 | local next_wakeup = check_timers() | ||
565 | |||
566 | -- Sleep until next timer to wake up, or a set/clear command | ||
567 | -- | ||
568 | local secs | ||
569 | if next_wakeup then | ||
570 | secs = next_wakeup - now_secs() | ||
571 | if secs < 0 then secs = 0 end | ||
572 | end | ||
573 | local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) | ||
574 | |||
575 | if key == TGW_KEY then | ||
576 | assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer | ||
577 | local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) | ||
578 | assert( key) | ||
579 | set_timer( what, key, wakeup_at, period and period > 0 and period or nil) | ||
580 | elseif key == TGW_QUERY then | ||
581 | if what == "get_timers" then | ||
582 | timer_gateway:send( TGW_REPLY, get_timers()) | ||
583 | else | ||
584 | timer_gateway:send( TGW_REPLY, "unknown query " .. what) | ||
539 | end | 585 | end |
540 | end -- t2 loop | 586 | --elseif secs == nil then -- got no value while block-waiting? |
541 | end -- t1 loop | 587 | -- WR( "timer lane: no linda, aborted?") |
542 | 588 | end | |
543 | return next_wakeup -- may be 'nil' | ||
544 | end -- check_timers() | ||
545 | |||
546 | local timer_gateway_batched = timer_gateway.batched | ||
547 | set_finalizer( function( err, stk) | ||
548 | if err and type( err) ~= "userdata" then | ||
549 | WR( "LanesTimer error: "..tostring(err)) | ||
550 | --elseif type( err) == "userdata" then | ||
551 | -- WR( "LanesTimer after cancel" ) | ||
552 | --else | ||
553 | -- WR("LanesTimer finalized") | ||
554 | end | 589 | end |
555 | end) | 590 | end -- timer_body() |
556 | while true do | 591 | timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... |
557 | local next_wakeup = check_timers() | 592 | end -- first_time |
558 | 593 | ||
559 | -- Sleep until next timer to wake up, or a set/clear command | 594 | ----- |
595 | -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) | ||
596 | -- | ||
597 | -- PUBLIC LANES API | ||
598 | timer = function( linda, key, a, period ) | ||
599 | if getmetatable( linda) ~= "Linda" then | ||
600 | error "expecting a Linda" | ||
601 | end | ||
602 | if a == 0.0 then | ||
603 | -- Caller expects to get current time stamp in Linda, on return | ||
604 | -- (like the timer had expired instantly); it would be good to set this | ||
605 | -- as late as possible (to give most current time) but also we want it | ||
606 | -- to precede any possible timers that might start striking. | ||
560 | -- | 607 | -- |
561 | local secs | 608 | linda:set( key, now_secs()) |
562 | if next_wakeup then | 609 | |
563 | secs = next_wakeup - now_secs() | 610 | if not period or period==0.0 then |
564 | if secs < 0 then secs = 0 end | 611 | timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer |
565 | end | 612 | return -- nothing more to do |
566 | local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) | ||
567 | |||
568 | if key == TGW_KEY then | ||
569 | assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer | ||
570 | local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) | ||
571 | assert( key) | ||
572 | set_timer( what, key, wakeup_at, period and period > 0 and period or nil) | ||
573 | elseif key == TGW_QUERY then | ||
574 | if what == "get_timers" then | ||
575 | timer_gateway:send( TGW_REPLY, get_timers()) | ||
576 | else | ||
577 | timer_gateway:send( TGW_REPLY, "unknown query " .. what) | ||
578 | end | ||
579 | --elseif secs == nil then -- got no value while block-waiting? | ||
580 | -- WR( "timer lane: no linda, aborted?") | ||
581 | end | 613 | end |
614 | a= period | ||
582 | end | 615 | end |
583 | end -- timer_body() | ||
584 | timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... | ||
585 | end -- first_time | ||
586 | 616 | ||
587 | ----- | 617 | local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time |
588 | -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) | 618 | or (a and now_secs()+a or nil) |
589 | -- | 619 | -- queue to timer |
590 | -- PUBLIC LANES API | ||
591 | timer = function( linda, key, a, period ) | ||
592 | if getmetatable( linda) ~= "Linda" then | ||
593 | error "expecting a Linda" | ||
594 | end | ||
595 | if a == 0.0 then | ||
596 | -- Caller expects to get current time stamp in Linda, on return | ||
597 | -- (like the timer had expired instantly); it would be good to set this | ||
598 | -- as late as possible (to give most current time) but also we want it | ||
599 | -- to precede any possible timers that might start striking. | ||
600 | -- | 620 | -- |
601 | linda:set( key, core.now_secs()) | 621 | timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) |
622 | end -- timer() | ||
602 | 623 | ||
603 | if not period or period==0.0 then | 624 | ----- |
604 | timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer | 625 | -- {[{linda, slot, when, period}[,...]]} = timers() |
605 | return -- nothing more to do | ||
606 | end | ||
607 | a= period | ||
608 | end | ||
609 | |||
610 | local wakeup_at= type(a)=="table" and core.wakeup_conv(a) -- given point of time | ||
611 | or (a and core.now_secs()+a or nil) | ||
612 | -- queue to timer | ||
613 | -- | 626 | -- |
614 | timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) | 627 | -- PUBLIC LANES API |
615 | end | 628 | timers = function() |
616 | 629 | timer_gateway:send( TGW_QUERY, "get_timers") | |
617 | ----- | 630 | local _, r = timer_gateway:receive( TGW_REPLY) |
618 | -- {[{linda, slot, when, period}[,...]]} = timers() | 631 | return r |
619 | -- | 632 | end -- timers() |
620 | -- PUBLIC LANES API | ||
621 | timers = function() | ||
622 | timer_gateway:send( TGW_QUERY, "get_timers") | ||
623 | local _, r = timer_gateway:receive( TGW_REPLY) | ||
624 | return r | ||
625 | end | ||
626 | 633 | ||
627 | end -- settings.with_timers | 634 | end -- settings.with_timers |
628 | 635 | ||
diff --git a/src/lanes_private.h b/src/lanes_private.h index bcc3014..01d43c0 100644 --- a/src/lanes_private.h +++ b/src/lanes_private.h | |||
@@ -4,27 +4,26 @@ | |||
4 | #include "uniquekey.h" | 4 | #include "uniquekey.h" |
5 | #include "universe.h" | 5 | #include "universe.h" |
6 | 6 | ||
7 | #include <chrono> | ||
8 | #include <condition_variable> | ||
7 | #include <latch> | 9 | #include <latch> |
10 | #include <stop_token> | ||
11 | #include <thread> | ||
8 | 12 | ||
9 | // NOTE: values to be changed by either thread, during execution, without | 13 | // NOTE: values to be changed by either thread, during execution, without |
10 | // locking, are marked "volatile" | 14 | // locking, are marked "volatile" |
11 | // | 15 | // |
12 | class Lane | 16 | class Lane |
13 | { | 17 | { |
14 | private: | ||
15 | |||
16 | enum class ThreadStatus | ||
17 | { | ||
18 | Normal, // normal master side state | ||
19 | Killed // issued an OS kill | ||
20 | }; | ||
21 | |||
22 | public: | 18 | public: |
23 | 19 | ||
24 | using enum ThreadStatus; | 20 | // the thread |
25 | 21 | std::jthread m_thread; | |
26 | THREAD_T thread; | 22 | // a latch to wait for the lua_State to be ready |
27 | std::latch m_ready{ 1 }; | 23 | std::latch m_ready{ 1 }; |
24 | // to wait for stop requests through m_thread's stop_source | ||
25 | std::mutex m_done_mutex; | ||
26 | std::condition_variable m_done_signal; // use condition_variable_any if waiting for a stop_token | ||
28 | // | 27 | // |
29 | // M: sub-thread OS thread | 28 | // M: sub-thread OS thread |
30 | // S: not used | 29 | // S: not used |
@@ -42,7 +41,7 @@ class Lane | |||
42 | // M: sets to PENDING (before launching) | 41 | // M: sets to PENDING (before launching) |
43 | // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED | 42 | // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED |
44 | 43 | ||
45 | SIGNAL_T* volatile waiting_on{ nullptr }; | 44 | std::condition_variable* volatile m_waiting_on{ nullptr }; |
46 | // | 45 | // |
47 | // When status is WAITING, points on the linda's signal the thread waits on, else nullptr | 46 | // When status is WAITING, points on the linda's signal the thread waits on, else nullptr |
48 | 47 | ||
@@ -51,23 +50,6 @@ class Lane | |||
51 | // M: sets to false, flags true for cancel request | 50 | // M: sets to false, flags true for cancel request |
52 | // S: reads to see if cancel is requested | 51 | // S: reads to see if cancel is requested |
53 | 52 | ||
54 | #if THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
55 | SIGNAL_T done_signal; | ||
56 | // | ||
57 | // M: Waited upon at lane ending (if Posix with no PTHREAD_TIMEDJOIN) | ||
58 | // S: sets the signal once cancellation is noticed (avoids a kill) | ||
59 | |||
60 | MUTEX_T done_lock; | ||
61 | // | ||
62 | // Lock required by 'done_signal' condition variable, protecting | ||
63 | // lane status changes to DONE/ERROR_ST/CANCELLED. | ||
64 | #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
65 | |||
66 | volatile ThreadStatus mstatus{ Normal }; | ||
67 | // | ||
68 | // M: sets to Normal, if issued a kill changes to Killed | ||
69 | // S: not used | ||
70 | |||
71 | Lane* volatile selfdestruct_next{ nullptr }; | 53 | Lane* volatile selfdestruct_next{ nullptr }; |
72 | // | 54 | // |
73 | // M: sets to non-nullptr if facing lane handle '__gc' cycle but the lane | 55 | // M: sets to non-nullptr if facing lane handle '__gc' cycle but the lane |
@@ -88,6 +70,9 @@ class Lane | |||
88 | 70 | ||
89 | Lane(Universe* U_, lua_State* L_); | 71 | Lane(Universe* U_, lua_State* L_); |
90 | ~Lane(); | 72 | ~Lane(); |
73 | |||
74 | bool waitForCompletion(lua_Duration duration_); | ||
75 | void startThread(int priority_); | ||
91 | }; | 76 | }; |
92 | 77 | ||
93 | // xxh64 of string "LANE_POINTER_REGKEY" generated at https://www.pelock.com/products/hash-calculator | 78 | // xxh64 of string "LANE_POINTER_REGKEY" generated at https://www.pelock.com/products/hash-calculator |
diff --git a/src/linda.cpp b/src/linda.cpp index 5ee4768..ea1410e 100644 --- a/src/linda.cpp +++ b/src/linda.cpp | |||
@@ -61,8 +61,8 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header | |||
61 | 61 | ||
62 | public: | 62 | public: |
63 | 63 | ||
64 | SIGNAL_T read_happened; | 64 | std::condition_variable m_read_happened; |
65 | SIGNAL_T write_happened; | 65 | std::condition_variable m_write_happened; |
66 | Universe* const U; // the universe this linda belongs to | 66 | Universe* const U; // the universe this linda belongs to |
67 | uintptr_t const group; // a group to control keeper allocation between lindas | 67 | uintptr_t const group; // a group to control keeper allocation between lindas |
68 | CancelRequest simulate_cancel{ CancelRequest::None }; | 68 | CancelRequest simulate_cancel{ CancelRequest::None }; |
@@ -81,17 +81,11 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header | |||
81 | : U{ U_ } | 81 | : U{ U_ } |
82 | , group{ group_ << KEEPER_MAGIC_SHIFT } | 82 | , group{ group_ << KEEPER_MAGIC_SHIFT } |
83 | { | 83 | { |
84 | SIGNAL_INIT(&read_happened); | ||
85 | SIGNAL_INIT(&write_happened); | ||
86 | |||
87 | setName(name_, len_); | 84 | setName(name_, len_); |
88 | } | 85 | } |
89 | 86 | ||
90 | ~Linda() | 87 | ~Linda() |
91 | { | 88 | { |
92 | // There aren't any lanes waiting on these lindas, since all proxies have been gc'ed. Right? | ||
93 | SIGNAL_FREE(&read_happened); | ||
94 | SIGNAL_FREE(&write_happened); | ||
95 | if (std::holds_alternative<AllocatedName>(m_name)) | 89 | if (std::holds_alternative<AllocatedName>(m_name)) |
96 | { | 90 | { |
97 | AllocatedName& name = std::get<AllocatedName>(m_name); | 91 | AllocatedName& name = std::get<AllocatedName>(m_name); |
@@ -216,15 +210,19 @@ LUAG_FUNC(linda_protected_call) | |||
216 | LUAG_FUNC(linda_send) | 210 | LUAG_FUNC(linda_send) |
217 | { | 211 | { |
218 | Linda* const linda{ lua_toLinda<false>(L, 1) }; | 212 | Linda* const linda{ lua_toLinda<false>(L, 1) }; |
219 | time_d timeout{ -1.0 }; | 213 | std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; |
220 | int key_i{ 2 }; // index of first key, if timeout not there | 214 | int key_i{ 2 }; // index of first key, if timeout not there |
221 | 215 | ||
222 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion | 216 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion |
223 | { | 217 | { |
224 | timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); | 218 | lua_Duration const duration{ lua_tonumber(L, 2) }; |
219 | if (duration.count() >= 0.0) | ||
220 | { | ||
221 | until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration); | ||
222 | } | ||
225 | ++key_i; | 223 | ++key_i; |
226 | } | 224 | } |
227 | else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key | 225 | else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key |
228 | { | 226 | { |
229 | ++key_i; | 227 | ++key_i; |
230 | } | 228 | } |
@@ -266,6 +264,7 @@ LUAG_FUNC(linda_send) | |||
266 | lua_State* const KL{ K ? K->L : nullptr }; | 264 | lua_State* const KL{ K ? K->L : nullptr }; |
267 | if (KL == nullptr) | 265 | if (KL == nullptr) |
268 | return 0; | 266 | return 0; |
267 | |||
269 | STACK_CHECK_START_REL(KL, 0); | 268 | STACK_CHECK_START_REL(KL, 0); |
270 | for (bool try_again{ true };;) | 269 | for (bool try_again{ true };;) |
271 | { | 270 | { |
@@ -295,12 +294,12 @@ LUAG_FUNC(linda_send) | |||
295 | if (ret) | 294 | if (ret) |
296 | { | 295 | { |
297 | // Wake up ALL waiting threads | 296 | // Wake up ALL waiting threads |
298 | SIGNAL_ALL(&linda->write_happened); | 297 | linda->m_write_happened.notify_all(); |
299 | break; | 298 | break; |
300 | } | 299 | } |
301 | 300 | ||
302 | // instant timout to bypass the wait syscall | 301 | // instant timout to bypass the wait syscall |
303 | if (timeout == 0.0) | 302 | if (std::chrono::steady_clock::now() >= until) |
304 | { | 303 | { |
305 | break; /* no wait; instant timeout */ | 304 | break; /* no wait; instant timeout */ |
306 | } | 305 | } |
@@ -314,14 +313,17 @@ LUAG_FUNC(linda_send) | |||
314 | prev_status = lane->status; // RUNNING, most likely | 313 | prev_status = lane->status; // RUNNING, most likely |
315 | ASSERT_L(prev_status == RUNNING); // but check, just in case | 314 | ASSERT_L(prev_status == RUNNING); // but check, just in case |
316 | lane->status = WAITING; | 315 | lane->status = WAITING; |
317 | ASSERT_L(lane->waiting_on == nullptr); | 316 | ASSERT_L(lane->m_waiting_on == nullptr); |
318 | lane->waiting_on = &linda->read_happened; | 317 | lane->m_waiting_on = &linda->m_read_happened; |
319 | } | 318 | } |
320 | // could not send because no room: wait until some data was read before trying again, or until timeout is reached | 319 | // could not send because no room: wait until some data was read before trying again, or until timeout is reached |
321 | try_again = SIGNAL_WAIT(&linda->read_happened, &K->keeper_cs, timeout); | 320 | std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock }; |
321 | std::cv_status const status{ linda->m_read_happened.wait_until(keeper_lock, until) }; | ||
322 | keeper_lock.release(); // we don't want to release the lock! | ||
323 | try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups | ||
322 | if (lane != nullptr) | 324 | if (lane != nullptr) |
323 | { | 325 | { |
324 | lane->waiting_on = nullptr; | 326 | lane->m_waiting_on = nullptr; |
325 | lane->status = prev_status; | 327 | lane->status = prev_status; |
326 | } | 328 | } |
327 | } | 329 | } |
@@ -369,21 +371,24 @@ static constexpr UniqueKey BATCH_SENTINEL{ 0x2DDFEE0968C62AA7ull }; | |||
369 | LUAG_FUNC(linda_receive) | 371 | LUAG_FUNC(linda_receive) |
370 | { | 372 | { |
371 | Linda* const linda{ lua_toLinda<false>(L, 1) }; | 373 | Linda* const linda{ lua_toLinda<false>(L, 1) }; |
372 | 374 | std::chrono::time_point<std::chrono::steady_clock> until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | |
373 | time_d timeout{ -1.0 }; | 375 | int key_i{ 2 }; // index of first key, if timeout not there |
374 | int key_i{ 2 }; | ||
375 | 376 | ||
376 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion | 377 | if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion |
377 | { | 378 | { |
378 | timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); | 379 | lua_Duration const duration{ lua_tonumber(L, 2) }; |
380 | if (duration.count() >= 0.0) | ||
381 | { | ||
382 | until = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration); | ||
383 | } | ||
379 | ++key_i; | 384 | ++key_i; |
380 | } | 385 | } |
381 | else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key | 386 | else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key |
382 | { | 387 | { |
383 | ++key_i; | 388 | ++key_i; |
384 | } | 389 | } |
385 | 390 | ||
386 | keeper_api_t keeper_receive; | 391 | keeper_api_t selected_keeper_receive{ nullptr }; |
387 | int expected_pushed_min{ 0 }, expected_pushed_max{ 0 }; | 392 | int expected_pushed_min{ 0 }, expected_pushed_max{ 0 }; |
388 | // are we in batched mode? | 393 | // are we in batched mode? |
389 | BATCH_SENTINEL.pushKey(L); | 394 | BATCH_SENTINEL.pushKey(L); |
@@ -396,7 +401,7 @@ LUAG_FUNC(linda_receive) | |||
396 | // make sure the keys are of a valid type | 401 | // make sure the keys are of a valid type |
397 | check_key_types(L, key_i, key_i); | 402 | check_key_types(L, key_i, key_i); |
398 | // receive multiple values from a single slot | 403 | // receive multiple values from a single slot |
399 | keeper_receive = KEEPER_API(receive_batched); | 404 | selected_keeper_receive = KEEPER_API(receive_batched); |
400 | // we expect a user-defined amount of return value | 405 | // we expect a user-defined amount of return value |
401 | expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1); | 406 | expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1); |
402 | expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min); | 407 | expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min); |
@@ -413,17 +418,20 @@ LUAG_FUNC(linda_receive) | |||
413 | // make sure the keys are of a valid type | 418 | // make sure the keys are of a valid type |
414 | check_key_types(L, key_i, lua_gettop(L)); | 419 | check_key_types(L, key_i, lua_gettop(L)); |
415 | // receive a single value, checking multiple slots | 420 | // receive a single value, checking multiple slots |
416 | keeper_receive = KEEPER_API(receive); | 421 | selected_keeper_receive = KEEPER_API(receive); |
417 | // we expect a single (value, key) pair of returned values | 422 | // we expect a single (value, key) pair of returned values |
418 | expected_pushed_min = expected_pushed_max = 2; | 423 | expected_pushed_min = expected_pushed_max = 2; |
419 | } | 424 | } |
420 | 425 | ||
421 | Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) }; | 426 | Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue<Lane>(L) }; |
422 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; | 427 | Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; |
423 | if (K == nullptr) | 428 | lua_State* const KL{ K ? K->L : nullptr }; |
429 | if (KL == nullptr) | ||
424 | return 0; | 430 | return 0; |
431 | |||
425 | CancelRequest cancel{ CancelRequest::None }; | 432 | CancelRequest cancel{ CancelRequest::None }; |
426 | int pushed{ 0 }; | 433 | int pushed{ 0 }; |
434 | STACK_CHECK_START_REL(KL, 0); | ||
427 | for (bool try_again{ true };;) | 435 | for (bool try_again{ true };;) |
428 | { | 436 | { |
429 | if (lane != nullptr) | 437 | if (lane != nullptr) |
@@ -439,7 +447,7 @@ LUAG_FUNC(linda_receive) | |||
439 | } | 447 | } |
440 | 448 | ||
441 | // all arguments of receive() but the first are passed to the keeper's receive function | 449 | // all arguments of receive() but the first are passed to the keeper's receive function |
442 | pushed = keeper_call(linda->U, K->L, keeper_receive, L, linda, key_i); | 450 | pushed = keeper_call(linda->U, KL, selected_keeper_receive, L, linda, key_i); |
443 | if (pushed < 0) | 451 | if (pushed < 0) |
444 | { | 452 | { |
445 | break; | 453 | break; |
@@ -451,11 +459,11 @@ LUAG_FUNC(linda_receive) | |||
451 | keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed, LookupMode::FromKeeper); | 459 | keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed, LookupMode::FromKeeper); |
452 | // To be done from within the 'K' locking area | 460 | // To be done from within the 'K' locking area |
453 | // | 461 | // |
454 | SIGNAL_ALL(&linda->read_happened); | 462 | linda->m_read_happened.notify_all(); |
455 | break; | 463 | break; |
456 | } | 464 | } |
457 | 465 | ||
458 | if (timeout == 0.0) | 466 | if (std::chrono::steady_clock::now() >= until) |
459 | { | 467 | { |
460 | break; /* instant timeout */ | 468 | break; /* instant timeout */ |
461 | } | 469 | } |
@@ -469,18 +477,22 @@ LUAG_FUNC(linda_receive) | |||
469 | prev_status = lane->status; // RUNNING, most likely | 477 | prev_status = lane->status; // RUNNING, most likely |
470 | ASSERT_L(prev_status == RUNNING); // but check, just in case | 478 | ASSERT_L(prev_status == RUNNING); // but check, just in case |
471 | lane->status = WAITING; | 479 | lane->status = WAITING; |
472 | ASSERT_L(lane->waiting_on == nullptr); | 480 | ASSERT_L(lane->m_waiting_on == nullptr); |
473 | lane->waiting_on = &linda->write_happened; | 481 | lane->m_waiting_on = &linda->m_write_happened; |
474 | } | 482 | } |
475 | // not enough data to read: wakeup when data was sent, or when timeout is reached | 483 | // not enough data to read: wakeup when data was sent, or when timeout is reached |
476 | try_again = SIGNAL_WAIT(&linda->write_happened, &K->keeper_cs, timeout); | 484 | std::unique_lock<std::mutex> keeper_lock{ K->m_mutex, std::adopt_lock }; |
485 | std::cv_status const status{ linda->m_write_happened.wait_until(keeper_lock, until) }; | ||
486 | keeper_lock.release(); // we don't want to release the lock! | ||
487 | try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups | ||
477 | if (lane != nullptr) | 488 | if (lane != nullptr) |
478 | { | 489 | { |
479 | lane->waiting_on = nullptr; | 490 | lane->m_waiting_on = nullptr; |
480 | lane->status = prev_status; | 491 | lane->status = prev_status; |
481 | } | 492 | } |
482 | } | 493 | } |
483 | } | 494 | } |
495 | STACK_CHECK(KL, 0); | ||
484 | 496 | ||
485 | if (pushed < 0) | 497 | if (pushed < 0) |
486 | { | 498 | { |
@@ -537,13 +549,13 @@ LUAG_FUNC(linda_set) | |||
537 | if (has_value) | 549 | if (has_value) |
538 | { | 550 | { |
539 | // we put some data in the slot, tell readers that they should wake | 551 | // we put some data in the slot, tell readers that they should wake |
540 | SIGNAL_ALL(&linda->write_happened); // To be done from within the 'K' locking area | 552 | linda->m_write_happened.notify_all(); // To be done from within the 'K' locking area |
541 | } | 553 | } |
542 | if (pushed == 1) | 554 | if (pushed == 1) |
543 | { | 555 | { |
544 | // the key was full, but it is no longer the case, tell writers they should wake | 556 | // the key was full, but it is no longer the case, tell writers they should wake |
545 | ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1); | 557 | ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1); |
546 | SIGNAL_ALL(&linda->read_happened); // To be done from within the 'K' locking area | 558 | linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area |
547 | } | 559 | } |
548 | } | 560 | } |
549 | } | 561 | } |
@@ -648,7 +660,7 @@ LUAG_FUNC( linda_limit) | |||
648 | if( pushed == 1) | 660 | if( pushed == 1) |
649 | { | 661 | { |
650 | ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); | 662 | ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); |
651 | SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area | 663 | linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area |
652 | } | 664 | } |
653 | } | 665 | } |
654 | else // linda is cancelled | 666 | else // linda is cancelled |
@@ -678,8 +690,8 @@ LUAG_FUNC(linda_cancel) | |||
678 | linda->simulate_cancel = CancelRequest::Soft; | 690 | linda->simulate_cancel = CancelRequest::Soft; |
679 | if (strcmp(who, "both") == 0) // tell everyone writers to wake up | 691 | if (strcmp(who, "both") == 0) // tell everyone writers to wake up |
680 | { | 692 | { |
681 | SIGNAL_ALL(&linda->write_happened); | 693 | linda->m_write_happened.notify_all(); |
682 | SIGNAL_ALL(&linda->read_happened); | 694 | linda->m_read_happened.notify_all(); |
683 | } | 695 | } |
684 | else if (strcmp(who, "none") == 0) // reset flag | 696 | else if (strcmp(who, "none") == 0) // reset flag |
685 | { | 697 | { |
@@ -687,11 +699,11 @@ LUAG_FUNC(linda_cancel) | |||
687 | } | 699 | } |
688 | else if (strcmp(who, "read") == 0) // tell blocked readers to wake up | 700 | else if (strcmp(who, "read") == 0) // tell blocked readers to wake up |
689 | { | 701 | { |
690 | SIGNAL_ALL(&linda->write_happened); | 702 | linda->m_write_happened.notify_all(); |
691 | } | 703 | } |
692 | else if (strcmp(who, "write") == 0) // tell blocked writers to wake up | 704 | else if (strcmp(who, "write") == 0) // tell blocked writers to wake up |
693 | { | 705 | { |
694 | SIGNAL_ALL(&linda->read_happened); | 706 | linda->m_read_happened.notify_all(); |
695 | } | 707 | } |
696 | else | 708 | else |
697 | { | 709 | { |
diff --git a/src/macros_and_utils.h b/src/macros_and_utils.h index e29e7fb..997b452 100644 --- a/src/macros_and_utils.h +++ b/src/macros_and_utils.h | |||
@@ -11,9 +11,12 @@ extern "C" { | |||
11 | #endif // __cplusplus | 11 | #endif // __cplusplus |
12 | 12 | ||
13 | #include <cassert> | 13 | #include <cassert> |
14 | #include <chrono> | ||
14 | #include <tuple> | 15 | #include <tuple> |
15 | #include <type_traits> | 16 | #include <type_traits> |
16 | 17 | ||
18 | using namespace std::chrono_literals; | ||
19 | |||
17 | #define USE_DEBUG_SPEW() 0 | 20 | #define USE_DEBUG_SPEW() 0 |
18 | #if USE_DEBUG_SPEW() | 21 | #if USE_DEBUG_SPEW() |
19 | extern char const* debugspew_indent; | 22 | extern char const* debugspew_indent; |
@@ -167,3 +170,5 @@ T* lua_newuserdatauv(lua_State* L, int nuvalue_) | |||
167 | std::ignore = lua_error(L); // doesn't return | 170 | std::ignore = lua_error(L); // doesn't return |
168 | assert(false); // we should never get here, but i'm paranoid | 171 | assert(false); // we should never get here, but i'm paranoid |
169 | } | 172 | } |
173 | |||
174 | using lua_Duration = std::chrono::template duration<lua_Number>; | ||
diff --git a/src/threading.cpp b/src/threading.cpp index afeb184..fc20931 100644 --- a/src/threading.cpp +++ b/src/threading.cpp | |||
@@ -93,9 +93,6 @@ THE SOFTWARE. | |||
93 | # pragma warning( disable : 4054 ) | 93 | # pragma warning( disable : 4054 ) |
94 | #endif | 94 | #endif |
95 | 95 | ||
96 | //#define THREAD_CREATE_RETRIES_MAX 20 | ||
97 | // loops (maybe retry forever?) | ||
98 | |||
99 | /* | 96 | /* |
100 | * FAIL is for unexpected API return values - essentially programming | 97 | * FAIL is for unexpected API return values - essentially programming |
101 | * error in _this_ code. | 98 | * error in _this_ code. |
@@ -196,36 +193,6 @@ time_d now_secs(void) { | |||
196 | } | 193 | } |
197 | 194 | ||
198 | 195 | ||
199 | /* | ||
200 | */ | ||
201 | time_d SIGNAL_TIMEOUT_PREPARE( double secs ) { | ||
202 | if (secs<=0.0) return secs; | ||
203 | else return now_secs() + secs; | ||
204 | } | ||
205 | |||
206 | |||
207 | #if THREADAPI == THREADAPI_PTHREAD | ||
208 | /* | ||
209 | * Prepare 'abs_secs' kind of timeout to 'timespec' format | ||
210 | */ | ||
211 | static void prepare_timeout( struct timespec *ts, time_d abs_secs ) { | ||
212 | assert(ts); | ||
213 | assert( abs_secs >= 0.0 ); | ||
214 | |||
215 | if (abs_secs==0.0) | ||
216 | abs_secs= now_secs(); | ||
217 | |||
218 | ts->tv_sec= (time_t) floor( abs_secs ); | ||
219 | ts->tv_nsec= ((long)((abs_secs - ts->tv_sec) * 1000.0 +0.5)) * 1000000UL; // 1ms = 1000000ns | ||
220 | if (ts->tv_nsec == 1000000000UL) | ||
221 | { | ||
222 | ts->tv_nsec = 0; | ||
223 | ts->tv_sec = ts->tv_sec + 1; | ||
224 | } | ||
225 | } | ||
226 | #endif // THREADAPI == THREADAPI_PTHREAD | ||
227 | |||
228 | |||
229 | /*---=== Threading ===---*/ | 196 | /*---=== Threading ===---*/ |
230 | 197 | ||
231 | //--- | 198 | //--- |
@@ -268,30 +235,6 @@ static void prepare_timeout( struct timespec *ts, time_d abs_secs ) { | |||
268 | 235 | ||
269 | #if THREADAPI == THREADAPI_WINDOWS | 236 | #if THREADAPI == THREADAPI_WINDOWS |
270 | 237 | ||
271 | #if _WIN32_WINNT < 0x0600 // CONDITION_VARIABLE aren't available | ||
272 | // | ||
273 | void MUTEX_INIT( MUTEX_T *ref ) { | ||
274 | *ref= CreateMutex( nullptr /*security attr*/, false /*not locked*/, nullptr ); | ||
275 | if (!ref) FAIL( "CreateMutex", GetLastError() ); | ||
276 | } | ||
277 | void MUTEX_FREE( MUTEX_T *ref ) { | ||
278 | if (!CloseHandle(*ref)) FAIL( "CloseHandle (mutex)", GetLastError() ); | ||
279 | *ref= nullptr; | ||
280 | } | ||
281 | void MUTEX_LOCK( MUTEX_T *ref ) | ||
282 | { | ||
283 | DWORD rc = WaitForSingleObject( *ref, INFINITE); | ||
284 | // ERROR_WAIT_NO_CHILDREN means a thread was killed (lane terminated because of error raised during a linda transfer for example) while having grabbed this mutex | ||
285 | // this is not a big problem as we will grab it just the same, so ignore this particular error | ||
286 | if( rc != 0 && rc != ERROR_WAIT_NO_CHILDREN) | ||
287 | FAIL( "WaitForSingleObject", (rc == WAIT_FAILED) ? GetLastError() : rc); | ||
288 | } | ||
289 | void MUTEX_UNLOCK( MUTEX_T *ref ) { | ||
290 | if (!ReleaseMutex(*ref)) | ||
291 | FAIL( "ReleaseMutex", GetLastError() ); | ||
292 | } | ||
293 | #endif // CONDITION_VARIABLE aren't available | ||
294 | |||
295 | static int const gs_prio_remap[] = | 238 | static int const gs_prio_remap[] = |
296 | { | 239 | { |
297 | THREAD_PRIORITY_IDLE, | 240 | THREAD_PRIORITY_IDLE, |
@@ -303,37 +246,7 @@ static int const gs_prio_remap[] = | |||
303 | THREAD_PRIORITY_TIME_CRITICAL | 246 | THREAD_PRIORITY_TIME_CRITICAL |
304 | }; | 247 | }; |
305 | 248 | ||
306 | /* MSDN: "If you would like to use the CRT in ThreadProc, use the | 249 | // ############################################################################################### |
307 | _beginthreadex function instead (of CreateThread)." | ||
308 | MSDN: "you can create at most 2028 threads" | ||
309 | */ | ||
310 | // Note: Visual C++ requires '__stdcall' where it is | ||
311 | void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (__stdcall *func)( void*), void* data, int prio /* -3..+3 */) | ||
312 | { | ||
313 | HANDLE h = (HANDLE) _beginthreadex(nullptr, // security | ||
314 | _THREAD_STACK_SIZE, | ||
315 | func, | ||
316 | data, | ||
317 | 0, // flags (0/CREATE_SUSPENDED) | ||
318 | nullptr // thread id (not used) | ||
319 | ); | ||
320 | |||
321 | if (h == nullptr) // _beginthreadex returns 0L on failure instead of -1L (like _beginthread) | ||
322 | { | ||
323 | FAIL( "CreateThread", GetLastError()); | ||
324 | } | ||
325 | |||
326 | if (prio != THREAD_PRIO_DEFAULT) | ||
327 | { | ||
328 | if (!SetThreadPriority( h, gs_prio_remap[prio + 3])) | ||
329 | { | ||
330 | FAIL( "SetThreadPriority", GetLastError()); | ||
331 | } | ||
332 | } | ||
333 | |||
334 | *ref = h; | ||
335 | } | ||
336 | |||
337 | 250 | ||
338 | void THREAD_SET_PRIORITY( int prio) | 251 | void THREAD_SET_PRIORITY( int prio) |
339 | { | 252 | { |
@@ -344,42 +257,26 @@ void THREAD_SET_PRIORITY( int prio) | |||
344 | } | 257 | } |
345 | } | 258 | } |
346 | 259 | ||
347 | void THREAD_SET_AFFINITY( unsigned int aff) | 260 | // ############################################################################################### |
261 | |||
262 | void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_) | ||
348 | { | 263 | { |
349 | if( !SetThreadAffinityMask( GetCurrentThread(), aff)) | 264 | // prio range [-3,+3] was checked by the caller |
265 | if (!SetThreadPriority(thread_.native_handle(), gs_prio_remap[prio_ + 3])) | ||
350 | { | 266 | { |
351 | FAIL( "THREAD_SET_AFFINITY", GetLastError()); | 267 | FAIL("JTHREAD_SET_PRIORITY", GetLastError()); |
352 | } | 268 | } |
353 | } | 269 | } |
354 | 270 | ||
355 | bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs) | 271 | // ############################################################################################### |
356 | { | ||
357 | DWORD ms = (secs<0.0) ? INFINITE : (DWORD)((secs*1000.0)+0.5); | ||
358 | 272 | ||
359 | DWORD rc= WaitForSingleObject( *ref, ms /*timeout*/ ); | 273 | void THREAD_SET_AFFINITY(unsigned int aff) |
360 | // | 274 | { |
361 | // (WAIT_ABANDONED) | 275 | if( !SetThreadAffinityMask( GetCurrentThread(), aff)) |
362 | // WAIT_OBJECT_0 success (0) | ||
363 | // WAIT_TIMEOUT | ||
364 | // WAIT_FAILED more info via GetLastError() | ||
365 | |||
366 | if (rc == WAIT_TIMEOUT) return false; | ||
367 | if( rc !=0) FAIL( "WaitForSingleObject", rc==WAIT_FAILED ? GetLastError() : rc); | ||
368 | *ref = nullptr; // thread no longer usable | ||
369 | return true; | ||
370 | } | ||
371 | // | ||
372 | void THREAD_KILL( THREAD_T *ref ) | ||
373 | { | 276 | { |
374 | // nonexistent on Xbox360, simply disable until a better solution is found | 277 | FAIL( "THREAD_SET_AFFINITY", GetLastError()); |
375 | #if !defined( PLATFORM_XBOX) | ||
376 | // in theory no-one should call this as it is very dangerous (memory and mutex leaks, no notification of DLLs, etc.) | ||
377 | if (!TerminateThread( *ref, 0 )) FAIL("TerminateThread", GetLastError()); | ||
378 | #endif // PLATFORM_XBOX | ||
379 | *ref = nullptr; | ||
380 | } | 278 | } |
381 | 279 | } | |
382 | void THREAD_MAKE_ASYNCH_CANCELLABLE() {} // nothing to do for windows threads, we can cancel them anytime we want | ||
383 | 280 | ||
384 | #if !defined __GNUC__ | 281 | #if !defined __GNUC__ |
385 | //see http://msdn.microsoft.com/en-us/library/xcb2z8hs.aspx | 282 | //see http://msdn.microsoft.com/en-us/library/xcb2z8hs.aspx |
@@ -414,158 +311,6 @@ bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs) | |||
414 | #endif // !__GNUC__ | 311 | #endif // !__GNUC__ |
415 | } | 312 | } |
416 | 313 | ||
417 | #if _WIN32_WINNT < 0x0600 // CONDITION_VARIABLE aren't available | ||
418 | |||
419 | void SIGNAL_INIT( SIGNAL_T* ref) | ||
420 | { | ||
421 | InitializeCriticalSection( &ref->signalCS); | ||
422 | InitializeCriticalSection( &ref->countCS); | ||
423 | if( 0 == (ref->waitEvent = CreateEvent( 0, true, false, 0))) // manual-reset | ||
424 | FAIL( "CreateEvent", GetLastError()); | ||
425 | if( 0 == (ref->waitDoneEvent = CreateEvent( 0, false, false, 0))) // auto-reset | ||
426 | FAIL( "CreateEvent", GetLastError()); | ||
427 | ref->waitersCount = 0; | ||
428 | } | ||
429 | |||
430 | void SIGNAL_FREE( SIGNAL_T* ref) | ||
431 | { | ||
432 | CloseHandle( ref->waitDoneEvent); | ||
433 | CloseHandle( ref->waitEvent); | ||
434 | DeleteCriticalSection( &ref->countCS); | ||
435 | DeleteCriticalSection( &ref->signalCS); | ||
436 | } | ||
437 | |||
438 | bool SIGNAL_WAIT( SIGNAL_T* ref, MUTEX_T* mu_ref, time_d abs_secs) | ||
439 | { | ||
440 | DWORD errc; | ||
441 | DWORD ms; | ||
442 | |||
443 | if( abs_secs < 0.0) | ||
444 | ms = INFINITE; | ||
445 | else if( abs_secs == 0.0) | ||
446 | ms = 0; | ||
447 | else | ||
448 | { | ||
449 | time_d msd = (abs_secs - now_secs()) * 1000.0 + 0.5; | ||
450 | // If the time already passed, still try once (ms==0). A short timeout | ||
451 | // may have turned negative or 0 because of the two time samples done. | ||
452 | ms = msd <= 0.0 ? 0 : (DWORD)msd; | ||
453 | } | ||
454 | |||
455 | EnterCriticalSection( &ref->signalCS); | ||
456 | EnterCriticalSection( &ref->countCS); | ||
457 | ++ ref->waitersCount; | ||
458 | LeaveCriticalSection( &ref->countCS); | ||
459 | LeaveCriticalSection( &ref->signalCS); | ||
460 | |||
461 | errc = SignalObjectAndWait( *mu_ref, ref->waitEvent, ms, false); | ||
462 | |||
463 | EnterCriticalSection( &ref->countCS); | ||
464 | if( 0 == -- ref->waitersCount) | ||
465 | { | ||
466 | // we're the last one leaving... | ||
467 | ResetEvent( ref->waitEvent); | ||
468 | SetEvent( ref->waitDoneEvent); | ||
469 | } | ||
470 | LeaveCriticalSection( &ref->countCS); | ||
471 | MUTEX_LOCK( mu_ref); | ||
472 | |||
473 | switch( errc) | ||
474 | { | ||
475 | case WAIT_TIMEOUT: | ||
476 | return false; | ||
477 | case WAIT_OBJECT_0: | ||
478 | return true; | ||
479 | } | ||
480 | |||
481 | FAIL( "SignalObjectAndWait", GetLastError()); | ||
482 | return false; | ||
483 | } | ||
484 | |||
485 | void SIGNAL_ALL( SIGNAL_T* ref) | ||
486 | { | ||
487 | DWORD errc = WAIT_OBJECT_0; | ||
488 | |||
489 | EnterCriticalSection( &ref->signalCS); | ||
490 | EnterCriticalSection( &ref->countCS); | ||
491 | |||
492 | if( ref->waitersCount > 0) | ||
493 | { | ||
494 | ResetEvent( ref->waitDoneEvent); | ||
495 | SetEvent( ref->waitEvent); | ||
496 | LeaveCriticalSection( &ref->countCS); | ||
497 | errc = WaitForSingleObject( ref->waitDoneEvent, INFINITE); | ||
498 | } | ||
499 | else | ||
500 | { | ||
501 | LeaveCriticalSection( &ref->countCS); | ||
502 | } | ||
503 | |||
504 | LeaveCriticalSection( &ref->signalCS); | ||
505 | |||
506 | if( WAIT_OBJECT_0 != errc) | ||
507 | FAIL( "WaitForSingleObject", GetLastError()); | ||
508 | } | ||
509 | |||
510 | #else // CONDITION_VARIABLE are available, use them | ||
511 | |||
512 | // | ||
513 | void SIGNAL_INIT( SIGNAL_T *ref ) | ||
514 | { | ||
515 | InitializeConditionVariable( ref); | ||
516 | } | ||
517 | |||
518 | void SIGNAL_FREE( SIGNAL_T *ref ) | ||
519 | { | ||
520 | // nothing to do | ||
521 | (void)ref; | ||
522 | } | ||
523 | |||
524 | bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu_ref, time_d abs_secs) | ||
525 | { | ||
526 | long ms; | ||
527 | |||
528 | if( abs_secs < 0.0) | ||
529 | ms = INFINITE; | ||
530 | else if( abs_secs == 0.0) | ||
531 | ms = 0; | ||
532 | else | ||
533 | { | ||
534 | ms = (long) ((abs_secs - now_secs())*1000.0 + 0.5); | ||
535 | |||
536 | // If the time already passed, still try once (ms==0). A short timeout | ||
537 | // may have turned negative or 0 because of the two time samples done. | ||
538 | // | ||
539 | if( ms < 0) | ||
540 | ms = 0; | ||
541 | } | ||
542 | |||
543 | if( !SleepConditionVariableCS( ref, mu_ref, ms)) | ||
544 | { | ||
545 | if( GetLastError() == ERROR_TIMEOUT) | ||
546 | { | ||
547 | return false; | ||
548 | } | ||
549 | else | ||
550 | { | ||
551 | FAIL( "SleepConditionVariableCS", GetLastError()); | ||
552 | } | ||
553 | } | ||
554 | return true; | ||
555 | } | ||
556 | |||
557 | void SIGNAL_ONE( SIGNAL_T *ref ) | ||
558 | { | ||
559 | WakeConditionVariable( ref); | ||
560 | } | ||
561 | |||
562 | void SIGNAL_ALL( SIGNAL_T *ref ) | ||
563 | { | ||
564 | WakeAllConditionVariable( ref); | ||
565 | } | ||
566 | |||
567 | #endif // CONDITION_VARIABLE are available | ||
568 | |||
569 | #else // THREADAPI == THREADAPI_PTHREAD | 314 | #else // THREADAPI == THREADAPI_PTHREAD |
570 | // PThread (Linux, OS X, ...) | 315 | // PThread (Linux, OS X, ...) |
571 | // | 316 | // |
@@ -607,44 +352,6 @@ bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs) | |||
607 | abort(); | 352 | abort(); |
608 | } | 353 | } |
609 | #define PT_CALL( call ) { int rc= call; if (rc!=0) _PT_FAIL( rc, #call, __FILE__, __LINE__ ); } | 354 | #define PT_CALL( call ) { int rc= call; if (rc!=0) _PT_FAIL( rc, #call, __FILE__, __LINE__ ); } |
610 | // | ||
611 | void SIGNAL_INIT( SIGNAL_T *ref ) { | ||
612 | PT_CALL(pthread_cond_init(ref, nullptr /*attr*/)); | ||
613 | } | ||
614 | void SIGNAL_FREE( SIGNAL_T *ref ) { | ||
615 | PT_CALL( pthread_cond_destroy(ref) ); | ||
616 | } | ||
617 | // | ||
618 | /* | ||
619 | * Timeout is given as absolute since we may have fake wakeups during | ||
620 | * a timed out sleep. A Linda with some other key read, or just because | ||
621 | * PThread cond vars can wake up unwantedly. | ||
622 | */ | ||
623 | bool SIGNAL_WAIT( SIGNAL_T *ref, pthread_mutex_t *mu, time_d abs_secs ) { | ||
624 | if (abs_secs<0.0) { | ||
625 | PT_CALL( pthread_cond_wait( ref, mu ) ); // infinite | ||
626 | } else { | ||
627 | int rc; | ||
628 | struct timespec ts; | ||
629 | |||
630 | assert( abs_secs != 0.0 ); | ||
631 | prepare_timeout( &ts, abs_secs ); | ||
632 | |||
633 | rc= pthread_cond_timedwait( ref, mu, &ts ); | ||
634 | |||
635 | if (rc==ETIMEDOUT) return false; | ||
636 | if (rc) { _PT_FAIL( rc, "pthread_cond_timedwait()", __FILE__, __LINE__ ); } | ||
637 | } | ||
638 | return true; | ||
639 | } | ||
640 | // | ||
641 | void SIGNAL_ONE( SIGNAL_T *ref ) { | ||
642 | PT_CALL( pthread_cond_signal(ref) ); // wake up ONE (or no) waiting thread | ||
643 | } | ||
644 | // | ||
645 | void SIGNAL_ALL( SIGNAL_T *ref ) { | ||
646 | PT_CALL( pthread_cond_broadcast(ref) ); // wake up ALL waiting threads | ||
647 | } | ||
648 | 355 | ||
649 | // array of 7 thread priority values, hand-tuned by platform so that we offer a uniform [-3,+3] public priority range | 356 | // array of 7 thread priority values, hand-tuned by platform so that we offer a uniform [-3,+3] public priority range |
650 | static int const gs_prio_remap[] = | 357 | static int const gs_prio_remap[] = |
@@ -775,129 +482,36 @@ static int select_prio(int prio /* -3..+3 */) | |||
775 | return gs_prio_remap[prio + 3]; | 482 | return gs_prio_remap[prio + 3]; |
776 | } | 483 | } |
777 | 484 | ||
778 | void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (*func)( void*), void* data, int prio /* -3..+3 */) | 485 | void THREAD_SET_PRIORITY( int prio) |
779 | { | 486 | { |
780 | pthread_attr_t a; | ||
781 | bool const change_priority = | ||
782 | #ifdef PLATFORM_LINUX | 487 | #ifdef PLATFORM_LINUX |
783 | sudo && // only root-privileged process can change priorities | 488 | if( sudo) // only root-privileged process can change priorities |
784 | #endif | 489 | #endif // PLATFORM_LINUX |
785 | (prio != THREAD_PRIO_DEFAULT); | ||
786 | |||
787 | PT_CALL( pthread_attr_init( &a)); | ||
788 | |||
789 | #ifndef PTHREAD_TIMEDJOIN | ||
790 | // We create a NON-JOINABLE thread. This is mainly due to the lack of | ||
791 | // 'pthread_timedjoin()', but does offer other benefits (s.a. earlier | ||
792 | // freeing of the thread's resources). | ||
793 | // | ||
794 | PT_CALL( pthread_attr_setdetachstate( &a, PTHREAD_CREATE_DETACHED)); | ||
795 | #endif // PTHREAD_TIMEDJOIN | ||
796 | |||
797 | // Use this to find a system's default stack size (DEBUG) | ||
798 | #if 0 | ||
799 | { | ||
800 | size_t n; | ||
801 | pthread_attr_getstacksize( &a, &n); | ||
802 | fprintf( stderr, "Getstack: %u\n", (unsigned int)n); | ||
803 | } | ||
804 | // 524288 on OS X | ||
805 | // 2097152 on Linux x86 (Ubuntu 7.04) | ||
806 | // 1048576 on FreeBSD 6.2 SMP i386 | ||
807 | #endif // 0 | ||
808 | |||
809 | #if defined _THREAD_STACK_SIZE && _THREAD_STACK_SIZE > 0 | ||
810 | PT_CALL( pthread_attr_setstacksize( &a, _THREAD_STACK_SIZE)); | ||
811 | #endif | ||
812 | |||
813 | if (change_priority) | ||
814 | { | 490 | { |
815 | struct sched_param sp; | 491 | struct sched_param sp; |
816 | // "The specified scheduling parameters are only used if the scheduling | 492 | // prio range [-3,+3] was checked by the caller |
817 | // parameter inheritance attribute is PTHREAD_EXPLICIT_SCHED." | 493 | sp.sched_priority = gs_prio_remap[ prio + 3]; |
818 | // | 494 | PT_CALL( pthread_setschedparam( pthread_self(), _PRIO_MODE, &sp)); |
819 | #if !defined __ANDROID__ || ( defined __ANDROID__ && __ANDROID_API__ >= 28 ) | ||
820 | PT_CALL( pthread_attr_setinheritsched( &a, PTHREAD_EXPLICIT_SCHED)); | ||
821 | #endif | ||
822 | |||
823 | #ifdef _PRIO_SCOPE | ||
824 | PT_CALL( pthread_attr_setscope( &a, _PRIO_SCOPE)); | ||
825 | #endif // _PRIO_SCOPE | ||
826 | |||
827 | PT_CALL( pthread_attr_setschedpolicy( &a, _PRIO_MODE)); | ||
828 | |||
829 | sp.sched_priority = select_prio(prio); | ||
830 | PT_CALL( pthread_attr_setschedparam( &a, &sp)); | ||
831 | } | ||
832 | |||
833 | //--- | ||
834 | // Seems on OS X, _POSIX_THREAD_THREADS_MAX is some kind of system | ||
835 | // thread limit (not userland thread). Actual limit for us is way higher. | ||
836 | // PTHREAD_THREADS_MAX is not defined (even though man page refers to it!) | ||
837 | // | ||
838 | # ifndef THREAD_CREATE_RETRIES_MAX | ||
839 | // Don't bother with retries; a failure is a failure | ||
840 | // | ||
841 | { | ||
842 | int rc = pthread_create( ref, &a, func, data); | ||
843 | if( rc) _PT_FAIL( rc, "pthread_create()", __FILE__, __LINE__ - 1); | ||
844 | } | 495 | } |
845 | # else | ||
846 | # error "This code deprecated" | ||
847 | /* | ||
848 | // Wait slightly if thread creation has exchausted the system | ||
849 | // | ||
850 | { int retries; | ||
851 | for( retries=0; retries<THREAD_CREATE_RETRIES_MAX; retries++ ) { | ||
852 | |||
853 | int rc= pthread_create( ref, &a, func, data ); | ||
854 | // | ||
855 | // OS X / Linux: | ||
856 | // EAGAIN: ".. lacked the necessary resources to create | ||
857 | // another thread, or the system-imposed limit on the | ||
858 | // total number of threads in a process | ||
859 | // [PTHREAD_THREADS_MAX] would be exceeded." | ||
860 | // EINVAL: attr is invalid | ||
861 | // Linux: | ||
862 | // EPERM: no rights for given parameters or scheduling (no sudo) | ||
863 | // ENOMEM: (known to fail with this code, too - not listed in man) | ||
864 | |||
865 | if (rc==0) break; // ok! | ||
866 | |||
867 | // In practise, exhaustion seems to be coming from memory, not a | ||
868 | // maximum number of threads. Keep tuning... ;) | ||
869 | // | ||
870 | if (rc==EAGAIN) { | ||
871 | //fprintf( stderr, "Looping (retries=%d) ", retries ); // DEBUG | ||
872 | |||
873 | // Try again, later. | ||
874 | |||
875 | Yield(); | ||
876 | } else { | ||
877 | _PT_FAIL( rc, "pthread_create()", __FILE__, __LINE__ ); | ||
878 | } | ||
879 | } | ||
880 | } | ||
881 | */ | ||
882 | # endif | ||
883 | |||
884 | PT_CALL( pthread_attr_destroy( &a)); | ||
885 | } | 496 | } |
886 | 497 | ||
498 | // ################################################################################################# | ||
887 | 499 | ||
888 | void THREAD_SET_PRIORITY( int prio) | 500 | void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_) |
889 | { | 501 | { |
890 | #ifdef PLATFORM_LINUX | 502 | #ifdef PLATFORM_LINUX |
891 | if( sudo) // only root-privileged process can change priorities | 503 | if (sudo) // only root-privileged process can change priorities |
892 | #endif // PLATFORM_LINUX | 504 | #endif // PLATFORM_LINUX |
893 | { | 505 | { |
894 | struct sched_param sp; | 506 | struct sched_param sp; |
895 | // prio range [-3,+3] was checked by the caller | 507 | // prio range [-3,+3] was checked by the caller |
896 | sp.sched_priority = gs_prio_remap[ prio + 3]; | 508 | sp.sched_priority = gs_prio_remap[prio_ + 3]; |
897 | PT_CALL( pthread_setschedparam( pthread_self(), _PRIO_MODE, &sp)); | 509 | PT_CALL(pthread_setschedparam(static_cast<pthread_t>(thread_.native_handle()), _PRIO_MODE, &sp)); |
898 | } | 510 | } |
899 | } | 511 | } |
900 | 512 | ||
513 | // ################################################################################################# | ||
514 | |||
901 | void THREAD_SET_AFFINITY( unsigned int aff) | 515 | void THREAD_SET_AFFINITY( unsigned int aff) |
902 | { | 516 | { |
903 | int bit = 0; | 517 | int bit = 0; |
@@ -929,93 +543,6 @@ void THREAD_SET_AFFINITY( unsigned int aff) | |||
929 | #endif | 543 | #endif |
930 | } | 544 | } |
931 | 545 | ||
932 | /* | ||
933 | * Wait for a thread to finish. | ||
934 | * | ||
935 | * 'mu_ref' is a lock we should use for the waiting; initially unlocked. | ||
936 | * Same lock as passed to THREAD_EXIT. | ||
937 | * | ||
938 | * Returns true for successful wait, false for timed out | ||
939 | */ | ||
940 | bool THREAD_WAIT( THREAD_T *ref, double secs , SIGNAL_T *signal_ref, MUTEX_T *mu_ref, volatile enum e_status *st_ref) | ||
941 | { | ||
942 | struct timespec ts_store; | ||
943 | const struct timespec* timeout = nullptr; | ||
944 | bool done; | ||
945 | |||
946 | // Do timeout counting before the locks | ||
947 | // | ||
948 | #if THREADWAIT_METHOD == THREADWAIT_TIMEOUT | ||
949 | if (secs>=0.0) | ||
950 | #else // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
951 | if (secs>0.0) | ||
952 | #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
953 | { | ||
954 | prepare_timeout( &ts_store, now_secs()+secs ); | ||
955 | timeout= &ts_store; | ||
956 | } | ||
957 | |||
958 | #if THREADWAIT_METHOD == THREADWAIT_TIMEOUT | ||
959 | /* Thread is joinable | ||
960 | */ | ||
961 | if (!timeout) { | ||
962 | PT_CALL(pthread_join(*ref, nullptr /*ignore exit value*/)); | ||
963 | done = true; | ||
964 | } else { | ||
965 | int rc = PTHREAD_TIMEDJOIN(*ref, nullptr, timeout); | ||
966 | if ((rc!=0) && (rc!=ETIMEDOUT)) { | ||
967 | _PT_FAIL( rc, "PTHREAD_TIMEDJOIN", __FILE__, __LINE__-2 ); | ||
968 | } | ||
969 | done= rc==0; | ||
970 | } | ||
971 | #else // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
972 | /* Since we've set the thread up as PTHREAD_CREATE_DETACHED, we cannot | ||
973 | * join with it. Use the cond.var. | ||
974 | */ | ||
975 | (void) ref; // unused | ||
976 | MUTEX_LOCK( mu_ref ); | ||
977 | |||
978 | // 'secs'==0.0 does not need to wait, just take the current status | ||
979 | // within the 'mu_ref' locks | ||
980 | // | ||
981 | if (secs != 0.0) { | ||
982 | while( *st_ref < DONE ) { | ||
983 | if (!timeout) { | ||
984 | PT_CALL( pthread_cond_wait( signal_ref, mu_ref )); | ||
985 | } else { | ||
986 | int rc= pthread_cond_timedwait( signal_ref, mu_ref, timeout ); | ||
987 | if (rc==ETIMEDOUT) break; | ||
988 | if (rc!=0) _PT_FAIL( rc, "pthread_cond_timedwait", __FILE__, __LINE__-2 ); | ||
989 | } | ||
990 | } | ||
991 | } | ||
992 | done= *st_ref >= DONE; // DONE|ERROR_ST|CANCELLED | ||
993 | |||
994 | MUTEX_UNLOCK( mu_ref ); | ||
995 | #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
996 | return done; | ||
997 | } | ||
998 | // | ||
999 | void THREAD_KILL( THREAD_T *ref ) { | ||
1000 | #ifdef __ANDROID__ | ||
1001 | __android_log_print(ANDROID_LOG_WARN, LOG_TAG, "Cannot kill thread!"); | ||
1002 | #else | ||
1003 | pthread_cancel( *ref ); | ||
1004 | #endif | ||
1005 | } | ||
1006 | |||
1007 | void THREAD_MAKE_ASYNCH_CANCELLABLE() | ||
1008 | { | ||
1009 | #ifdef __ANDROID__ | ||
1010 | __android_log_print(ANDROID_LOG_WARN, LOG_TAG, "Cannot make thread async cancellable!"); | ||
1011 | #else | ||
1012 | // that's the default, but just in case... | ||
1013 | pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr); | ||
1014 | // we want cancellation to take effect immediately if possible, instead of waiting for a cancellation point (which is the default) | ||
1015 | pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); | ||
1016 | #endif | ||
1017 | } | ||
1018 | |||
1019 | void THREAD_SETNAME( char const* _name) | 546 | void THREAD_SETNAME( char const* _name) |
1020 | { | 547 | { |
1021 | // exact API to set the thread name is platform-dependant | 548 | // exact API to set the thread name is platform-dependant |
diff --git a/src/threading.h b/src/threading.h index 38a021f..e9f302a 100644 --- a/src/threading.h +++ b/src/threading.h | |||
@@ -1,13 +1,9 @@ | |||
1 | #pragma once | 1 | #pragma once |
2 | 2 | ||
3 | /* | ||
4 | * win32-pthread: | ||
5 | * define HAVE_WIN32_PTHREAD and PTW32_INCLUDE_WINDOWS_H in your project configuration when building for win32-pthread. | ||
6 | * link against pthreadVC2.lib, and of course have pthreadVC2.dll somewhere in your path. | ||
7 | */ | ||
8 | #include "platform.h" | 3 | #include "platform.h" |
9 | 4 | ||
10 | #include <time.h> | 5 | #include <time.h> |
6 | #include <thread> | ||
11 | 7 | ||
12 | /* Note: ERROR is a defined entity on Win32 | 8 | /* Note: ERROR is a defined entity on Win32 |
13 | PENDING: The Lua VM hasn't done anything yet. | 9 | PENDING: The Lua VM hasn't done anything yet. |
@@ -19,7 +15,7 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; | |||
19 | #define THREADAPI_WINDOWS 1 | 15 | #define THREADAPI_WINDOWS 1 |
20 | #define THREADAPI_PTHREAD 2 | 16 | #define THREADAPI_PTHREAD 2 |
21 | 17 | ||
22 | #if( defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC)) && !defined( HAVE_WIN32_PTHREAD) | 18 | #if( defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC)) |
23 | //#pragma message ( "THREADAPI_WINDOWS" ) | 19 | //#pragma message ( "THREADAPI_WINDOWS" ) |
24 | #define THREADAPI THREADAPI_WINDOWS | 20 | #define THREADAPI THREADAPI_WINDOWS |
25 | #else // (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) | 21 | #else // (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) |
@@ -68,16 +64,9 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; | |||
68 | }; | 64 | }; |
69 | 65 | ||
70 | 66 | ||
71 | #define MUTEX_T HANDLE | ||
72 | void MUTEX_INIT( MUTEX_T* ref); | ||
73 | void MUTEX_FREE( MUTEX_T* ref); | ||
74 | void MUTEX_LOCK( MUTEX_T* ref); | ||
75 | void MUTEX_UNLOCK( MUTEX_T* ref); | ||
76 | |||
77 | #else // CONDITION_VARIABLE are available, use them | 67 | #else // CONDITION_VARIABLE are available, use them |
78 | 68 | ||
79 | #define SIGNAL_T CONDITION_VARIABLE | 69 | #define SIGNAL_T CONDITION_VARIABLE |
80 | #define MUTEX_T CRITICAL_SECTION | ||
81 | #define MUTEX_INIT( ref) InitializeCriticalSection( ref) | 70 | #define MUTEX_INIT( ref) InitializeCriticalSection( ref) |
82 | #define MUTEX_FREE( ref) DeleteCriticalSection( ref) | 71 | #define MUTEX_FREE( ref) DeleteCriticalSection( ref) |
83 | #define MUTEX_LOCK( ref) EnterCriticalSection( ref) | 72 | #define MUTEX_LOCK( ref) EnterCriticalSection( ref) |
@@ -111,7 +100,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; | |||
111 | # define _MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE | 100 | # define _MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE |
112 | #endif | 101 | #endif |
113 | 102 | ||
114 | #define MUTEX_T pthread_mutex_t | ||
115 | #define MUTEX_INIT(ref) pthread_mutex_init(ref, nullptr) | 103 | #define MUTEX_INIT(ref) pthread_mutex_init(ref, nullptr) |
116 | #define MUTEX_RECURSIVE_INIT(ref) \ | 104 | #define MUTEX_RECURSIVE_INIT(ref) \ |
117 | { pthread_mutexattr_t a; pthread_mutexattr_init( &a ); \ | 105 | { pthread_mutexattr_t a; pthread_mutexattr_init( &a ); \ |
@@ -126,8 +114,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; | |||
126 | 114 | ||
127 | using SIGNAL_T = pthread_cond_t; | 115 | using SIGNAL_T = pthread_cond_t; |
128 | 116 | ||
129 | void SIGNAL_ONE( SIGNAL_T *ref ); | ||
130 | |||
131 | // Yield is non-portable: | 117 | // Yield is non-portable: |
132 | // | 118 | // |
133 | // OS X 10.4.8/9 has pthread_yield_np() | 119 | // OS X 10.4.8/9 has pthread_yield_np() |
@@ -143,10 +129,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; | |||
143 | #define THREAD_CALLCONV | 129 | #define THREAD_CALLCONV |
144 | #endif //THREADAPI == THREADAPI_PTHREAD | 130 | #endif //THREADAPI == THREADAPI_PTHREAD |
145 | 131 | ||
146 | void SIGNAL_INIT( SIGNAL_T *ref ); | ||
147 | void SIGNAL_FREE( SIGNAL_T *ref ); | ||
148 | void SIGNAL_ALL( SIGNAL_T *ref ); | ||
149 | |||
150 | /* | 132 | /* |
151 | * 'time_d': <0.0 for no timeout | 133 | * 'time_d': <0.0 for no timeout |
152 | * 0.0 for instant check | 134 | * 0.0 for instant check |
@@ -155,11 +137,6 @@ void SIGNAL_ALL( SIGNAL_T *ref ); | |||
155 | using time_d = double; | 137 | using time_d = double; |
156 | time_d now_secs(void); | 138 | time_d now_secs(void); |
157 | 139 | ||
158 | time_d SIGNAL_TIMEOUT_PREPARE( double rel_secs ); | ||
159 | |||
160 | bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); | ||
161 | |||
162 | |||
163 | /*---=== Threading ===--- | 140 | /*---=== Threading ===--- |
164 | */ | 141 | */ |
165 | 142 | ||
@@ -167,16 +144,9 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); | |||
167 | 144 | ||
168 | #if THREADAPI == THREADAPI_WINDOWS | 145 | #if THREADAPI == THREADAPI_WINDOWS |
169 | 146 | ||
170 | using THREAD_T = HANDLE; | ||
171 | # define THREAD_ISNULL( _h) (_h == 0) | ||
172 | void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (__stdcall *func)( void*), void* data, int prio /* -3..+3 */); | ||
173 | |||
174 | # define THREAD_PRIO_MIN (-3) | 147 | # define THREAD_PRIO_MIN (-3) |
175 | # define THREAD_PRIO_MAX (+3) | 148 | # define THREAD_PRIO_MAX (+3) |
176 | 149 | ||
177 | # define THREAD_CLEANUP_PUSH( cb_, val_) | ||
178 | # define THREAD_CLEANUP_POP( execute_) | ||
179 | |||
180 | #else // THREADAPI == THREADAPI_PTHREAD | 150 | #else // THREADAPI == THREADAPI_PTHREAD |
181 | 151 | ||
182 | /* Platforms that have a timed 'pthread_join()' can get away with a simpler | 152 | /* Platforms that have a timed 'pthread_join()' can get away with a simpler |
@@ -195,11 +165,6 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); | |||
195 | # endif | 165 | # endif |
196 | # endif | 166 | # endif |
197 | 167 | ||
198 | using THREAD_T = pthread_t; | ||
199 | # define THREAD_ISNULL( _h) 0 // pthread_t may be a structure: never 'null' by itself | ||
200 | |||
201 | void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (*func)( void*), void* data, int prio /* -3..+3 */); | ||
202 | |||
203 | # if defined(PLATFORM_LINUX) | 168 | # if defined(PLATFORM_LINUX) |
204 | extern volatile bool sudo; | 169 | extern volatile bool sudo; |
205 | # ifdef LINUX_SCHED_RR | 170 | # ifdef LINUX_SCHED_RR |
@@ -213,13 +178,6 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); | |||
213 | # define THREAD_PRIO_MAX (+3) | 178 | # define THREAD_PRIO_MAX (+3) |
214 | # endif | 179 | # endif |
215 | 180 | ||
216 | # if THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
217 | # define THREAD_CLEANUP_PUSH( cb_, val_) pthread_cleanup_push( cb_, val_) | ||
218 | # define THREAD_CLEANUP_POP( execute_) pthread_cleanup_pop( execute_) | ||
219 | # else | ||
220 | # define THREAD_CLEANUP_PUSH( cb_, val_) { | ||
221 | # define THREAD_CLEANUP_POP( execute_) } | ||
222 | # endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
223 | #endif // THREADAPI == THREADAPI_WINDOWS | 181 | #endif // THREADAPI == THREADAPI_WINDOWS |
224 | 182 | ||
225 | /* | 183 | /* |
@@ -236,16 +194,8 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); | |||
236 | #endif // THREADAPI == THREADAPI_WINDOWS || (defined PTHREAD_TIMEDJOIN) | 194 | #endif // THREADAPI == THREADAPI_WINDOWS || (defined PTHREAD_TIMEDJOIN) |
237 | 195 | ||
238 | 196 | ||
239 | #if THREADWAIT_METHOD == THREADWAIT_TIMEOUT | ||
240 | bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs); | ||
241 | #define THREAD_WAIT( a, b, c, d, e) THREAD_WAIT_IMPL( a, b) | ||
242 | #else // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
243 | bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs, SIGNAL_T *signal_ref, MUTEX_T *mu_ref, volatile enum e_status *st_ref); | ||
244 | #define THREAD_WAIT THREAD_WAIT_IMPL | ||
245 | #endif // // THREADWAIT_METHOD == THREADWAIT_CONDVAR | ||
246 | |||
247 | void THREAD_KILL( THREAD_T* ref); | ||
248 | void THREAD_SETNAME( char const* _name); | 197 | void THREAD_SETNAME( char const* _name); |
249 | void THREAD_MAKE_ASYNCH_CANCELLABLE(); | ||
250 | void THREAD_SET_PRIORITY( int prio); | 198 | void THREAD_SET_PRIORITY( int prio); |
251 | void THREAD_SET_AFFINITY( unsigned int aff); | 199 | void THREAD_SET_AFFINITY( unsigned int aff); |
200 | |||
201 | void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_); | ||
diff --git a/tests/cancel.lua b/tests/cancel.lua index c5bb761..c22103f 100644 --- a/tests/cancel.lua +++ b/tests/cancel.lua | |||
@@ -139,6 +139,7 @@ if not next(which_tests) or which_tests.linda then | |||
139 | linda:receive( 1, "yeah") | 139 | linda:receive( 1, "yeah") |
140 | 140 | ||
141 | -- linda cancel: linda:receive() returns cancel_error immediately | 141 | -- linda cancel: linda:receive() returns cancel_error immediately |
142 | print "cancelling" | ||
142 | linda:cancel( "both") | 143 | linda:cancel( "both") |
143 | 144 | ||
144 | -- wait until cancellation is effective. | 145 | -- wait until cancellation is effective. |
@@ -163,6 +164,7 @@ if not next(which_tests) or which_tests.soft then | |||
163 | waitCancellation( h, "waiting") | 164 | waitCancellation( h, "waiting") |
164 | 165 | ||
165 | -- soft cancel, this time awakens waiting linda operations, which returns cancel_error immediately, no timeout. | 166 | -- soft cancel, this time awakens waiting linda operations, which returns cancel_error immediately, no timeout. |
167 | print "cancelling" | ||
166 | h:cancel( "soft", true) | 168 | h:cancel( "soft", true) |
167 | 169 | ||
168 | -- wait until cancellation is effective. the lane will interrupt its loop and print the exit message | 170 | -- wait until cancellation is effective. the lane will interrupt its loop and print the exit message |
@@ -177,6 +179,7 @@ if not next(which_tests) or which_tests.hook then | |||
177 | linda:receive( 2, "yeah") | 179 | linda:receive( 2, "yeah") |
178 | 180 | ||
179 | -- count hook cancel after some instruction instructions | 181 | -- count hook cancel after some instruction instructions |
182 | print "cancelling" | ||
180 | h:cancel( "line", 300, 5.0) | 183 | h:cancel( "line", 300, 5.0) |
181 | 184 | ||
182 | -- wait until cancellation is effective. the lane will interrupt its loop and print the exit message | 185 | -- wait until cancellation is effective. the lane will interrupt its loop and print the exit message |
@@ -193,6 +196,7 @@ if not next(which_tests) or which_tests.hard then | |||
193 | linda:receive( 2, "yeah") | 196 | linda:receive( 2, "yeah") |
194 | 197 | ||
195 | -- hard cancel: the lane will be interrupted from inside its current linda:receive() and won't return from it | 198 | -- hard cancel: the lane will be interrupted from inside its current linda:receive() and won't return from it |
199 | print "cancelling" | ||
196 | h:cancel() | 200 | h:cancel() |
197 | 201 | ||
198 | -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error | 202 | -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error |
@@ -209,27 +213,13 @@ if not next(which_tests) or which_tests.hard_unprotected then | |||
209 | linda:receive( 2, "yeah") | 213 | linda:receive( 2, "yeah") |
210 | 214 | ||
211 | -- hard cancel: the lane will be interrupted from inside its current linda:receive() and won't return from it | 215 | -- hard cancel: the lane will be interrupted from inside its current linda:receive() and won't return from it |
216 | print "cancelling" | ||
212 | h:cancel() | 217 | h:cancel() |
213 | 218 | ||
214 | -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error | 219 | -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error |
215 | waitCancellation( h, "cancelled") | 220 | waitCancellation( h, "cancelled") |
216 | end | 221 | end |
217 | 222 | ||
218 | if not next(which_tests) or which_tests.kill then | ||
219 | remaining_tests.kill = nil | ||
220 | print "\n\n####################################################################\nbegin kill cancel test\n" | ||
221 | h = lanes.gen( "*", laneBody)( "busy", 50000000) -- start a pure Lua busy loop lane | ||
222 | |||
223 | -- wait 1/3s before cancelling the lane, before the busy loop can finish | ||
224 | print "wait 0.3s" | ||
225 | linda:receive( 0.3, "yeah") | ||
226 | |||
227 | -- hard cancel with kill: the lane thread will be forcefully terminated. kill timeout is pthread-specific | ||
228 | h:cancel( true, 1.0) | ||
229 | |||
230 | -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error | ||
231 | waitCancellation( h, "killed") | ||
232 | end | ||
233 | --#################################################################### | 223 | --#################################################################### |
234 | 224 | ||
235 | local unknown_test, val = next(remaining_tests) | 225 | local unknown_test, val = next(remaining_tests) |