From 96daea993eeea17f0c64325491943e48795ff751 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Mon, 8 Apr 2024 16:57:53 +0200 Subject: C++ migration: use std::jthread, std::condition_variable, std::chrono. win32 pthread support is gone new setting configure.shutdown_mode for cancellation of free-running threads at shutdown. no more hard thread termination! If a thread doesn't cooperate, an error is raised. lane.status "killed" is gone lane:cancel can't force-kill. --- docs/index.html | 49 +++-- src/cancel.cpp | 178 ++++++++--------- src/cancel.h | 8 +- src/keeper.cpp | 19 +- src/keeper.h | 10 +- src/lanes.cpp | 337 ++++++++++++------------------- src/lanes.lua | 451 +++++++++++++++++++++--------------------- src/lanes_private.h | 43 ++-- src/linda.cpp | 92 +++++---- src/macros_and_utils.h | 5 + src/threading.cpp | 525 +++---------------------------------------------- src/threading.h | 58 +----- tests/cancel.lua | 20 +- 13 files changed, 588 insertions(+), 1207 deletions(-) diff --git a/docs/index.html b/docs/index.html index ee5acfa..d24d3d7 100644 --- a/docs/index.html +++ b/docs/index.html @@ -425,7 +425,18 @@ number >= 0 - Sets the duration in seconds Lanes will wait for graceful termination of running lanes at application shutdown. Irrelevant for builds using pthreads. Default is 0.25. + Sets the duration in seconds Lanes will wait for graceful termination of running lanes at application shutdown. Default is 0.25. + + + + + .shutdown_mode + + + "hard"/"soft"/"call"/"ret"/"line"/"count" + + + Select the cancellation mode used at Lanes shutdown to request free running lane termination. See lane cancellation. Default is "hard". @@ -875,16 +886,6 @@ received cancellation and finished itself. - - - - "killed" - - - - was forcefully killed by lane_h:cancel() - -

@@ -996,36 +997,33 @@

Cancelling

-	bool[,reason] = lane_h:cancel( "soft" [, timeout] [, wake_bool])
-	bool[,reason] = lane_h:cancel( "hard" [, timeout] [, force [, forcekill_timeout]])
-	bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, force [, forcekill_timeout]])
+	bool[,reason] = lane_h:cancel( "soft" [, timeout] [, wake_lane])
+	bool[,reason] = lane_h:cancel( "hard" [, timeout] [, wake_lane])
+	bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, wake_lane])
 

cancel() sends a cancellation request to the lane.
- First argument is a mode can be one of "hard", "soft", "count", "line", "call", "ret". + First argument is a mode can be one of "hard", "soft", "call", "ret", "line", "count". If mode is not specified, it defaults to "hard". + If wake_lane is true, the lane is also signalled so that execution returns from any pending linda operation. Linda operations detecting the cancellation request return lanes.cancel_error.

If mode is "soft", cancellation will only cause cancel_test() to return true, so that the lane can cleanup manually.
- If wake_bool is true, the lane is also signalled so that execution returns from any pending linda operation. Linda operations detecting the cancellation request return lanes.cancel_error.

If mode is "hard", waits for the request to be processed, or a timeout to occur. Linda operations detecting the cancellation request will raise a special cancellation error (meaning they won't return in that case).
- timeout defaults to 0 if not specified. + wake_lane defaults to true, and timeout defaults to 0 if not specified.

Other values of mode will asynchronously install the corresponding hook, then behave as "hard".

-

- If force_kill_bool is true, forcekill_timeout can be set to tell how long lanes will wait for the OS thread to terminate before raising an error. Windows threads always terminate immediately, but it might not always be the case with some pthread implementations. -

Returns true, lane_h.status if lane was already done (in "done", "error" or "cancelled" status), or the cancellation was fruitful within timeout_secs timeout period.
Returns false, "timeout" otherwise.

- If the lane is still running after the timeout expired and force_kill is true, the OS thread running the lane is forcefully killed. This means no GC, probable OS resource leaks (thread stack, locks, DLL notifications), and should generally be the last resort. + If the lane is still running after the timeout expired, there is a chance lanes will raise an error at shutdown when failing to terminate all free-running lanes within the specified timeout.

Cancellation is tested before going to sleep in receive() or send() calls and after executing cancelstep Lua statements. A pending receive()or send() call is awakened. @@ -1396,6 +1394,14 @@ events to a common Linda, but... :). Default duration is null, which should only cause a thread context switch.

+
+	number = lanes.now_secs()
+
+ +

+ Returns the current value of the clock used by timers and lindas. +

+

Locks etc.

@@ -1797,3 +1803,4 @@ int luaD_new_clonable( lua_State* L) + \ No newline at end of file diff --git a/src/cancel.cpp b/src/cancel.cpp index 4667f07..6a94343 100644 --- a/src/cancel.cpp +++ b/src/cancel.cpp @@ -92,7 +92,7 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar) // ################################################################################################ //--- -// = thread_cancel( lane_ud [,timeout_secs=0.0] [,force_kill_bool=false] ) +// = thread_cancel( lane_ud [,timeout_secs=0.0] [,wake_lindas_bool=false] ) // // The originator thread asking us specifically to cancel the other thread. // @@ -100,9 +100,8 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar) // 0.0: just signal it to cancel, no time waited // >0: time to wait for the lane to detect cancellation // -// 'force_kill': if true, and lane does not detect cancellation within timeout, -// it is forcefully killed. Using this with 0.0 timeout means just kill -// (unless the lane is already finished). +// 'wake_lindas_bool': if true, signal any linda the thread is waiting on +// instead of waiting for its timeout (if any) // // Returns: true if the lane was already finished (DONE/ERROR_ST/CANCELLED) or if we // managed to cancel it. @@ -111,76 +110,47 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar) // ################################################################################################ -static CancelResult thread_cancel_soft(Lane* lane_, double secs_, bool wake_lindas_) +static CancelResult thread_cancel_soft(Lane* lane_, lua_Duration duration_, bool wake_lane_) { lane_->cancel_request = CancelRequest::Soft; // it's now signaled to stop // negative timeout: we don't want to truly abort the lane, we just want it to react to cancel_test() on its own - if (wake_lindas_) // wake the thread so that execution returns from any pending linda operation if desired + if (wake_lane_) // wake the thread so that execution returns from any pending linda operation if desired { - SIGNAL_T* const waiting_on{ lane_->waiting_on }; + std::condition_variable* const waiting_on{ lane_->m_waiting_on }; if (lane_->status == WAITING && waiting_on != nullptr) { - SIGNAL_ALL( waiting_on); + waiting_on->notify_all(); } } - return THREAD_WAIT(&lane_->thread, secs_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Cancelled : CancelResult::Timeout; + return lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout; } // ################################################################################################ -static CancelResult thread_cancel_hard(lua_State* L, Lane* lane_, double secs_, bool force_, double waitkill_timeout_) +static CancelResult thread_cancel_hard(Lane* lane_, lua_Duration duration_, bool wake_lane_) { lane_->cancel_request = CancelRequest::Hard; // it's now signaled to stop + //lane_->m_thread.get_stop_source().request_stop(); + if (wake_lane_) // wake the thread so that execution returns from any pending linda operation if desired { - SIGNAL_T* waiting_on = lane_->waiting_on; + std::condition_variable* waiting_on = lane_->m_waiting_on; if (lane_->status == WAITING && waiting_on != nullptr) { - SIGNAL_ALL( waiting_on); + waiting_on->notify_all(); } } - CancelResult result{ THREAD_WAIT(&lane_->thread, secs_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Cancelled : CancelResult::Timeout }; - - if ((result == CancelResult::Timeout) && force_) - { - // Killing is asynchronous; we _will_ wait for it to be done at - // GC, to make sure the data structure can be released (alternative - // would be use of "cancellation cleanup handlers" that at least - // PThread seems to have). - // - THREAD_KILL(&lane_->thread); -#if THREADAPI == THREADAPI_PTHREAD - // pthread: make sure the thread is really stopped! - // note that this may block forever if the lane doesn't call a cancellation point and pthread doesn't honor PTHREAD_CANCEL_ASYNCHRONOUS - result = THREAD_WAIT(&lane_->thread, waitkill_timeout_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Killed : CancelResult::Timeout; - if (result == CancelResult::Timeout) - { - std::ignore = luaL_error( L, "force-killed lane failed to terminate within %f second%s", waitkill_timeout_, waitkill_timeout_ > 1 ? "s" : ""); - } -#else - (void) waitkill_timeout_; // unused - (void) L; // unused -#endif // THREADAPI == THREADAPI_PTHREAD - lane_->mstatus = Lane::Killed; // mark 'gc' to wait for it - // note that lane_->status value must remain to whatever it was at the time of the kill - // because we need to know if we can lua_close() the Lua State or not. - result = CancelResult::Killed; - } + CancelResult result{ lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout }; return result; } // ################################################################################################ -CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_, bool force_, double waitkill_timeout_) +CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hook_count_, lua_Duration duration_, bool wake_lane_) { // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here // We can read 'lane_->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) - if (lane_->mstatus == Lane::Killed) - { - return CancelResult::Killed; - } - if (lane_->status >= DONE) { // say "ok" by default, including when lane is already done @@ -191,48 +161,57 @@ CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_ // let us hope we never land here with a pointer on a linda that has been destroyed... if (op_ == CancelOp::Soft) { - return thread_cancel_soft(lane_, secs_, force_); + return thread_cancel_soft(lane_, duration_, wake_lane_); + } + else if (static_cast(op_) > static_cast(CancelOp::Soft)) + { + lua_sethook(lane_->L, cancel_hook, static_cast(op_), hook_count_); } - return thread_cancel_hard(L, lane_, secs_, force_, waitkill_timeout_); + return thread_cancel_hard(lane_, duration_, wake_lane_); } // ################################################################################################ // ################################################################################################ -// > 0: the mask -// = 0: soft -// < 0: hard -static CancelOp which_op(lua_State* L, int idx_) +CancelOp which_cancel_op(char const* op_string_) +{ + CancelOp op{ CancelOp::Invalid }; + if (strcmp(op_string_, "hard") == 0) + { + op = CancelOp::Hard; + } + else if (strcmp(op_string_, "soft") == 0) + { + op = CancelOp::Soft; + } + else if (strcmp(op_string_, "call") == 0) + { + op = CancelOp::MaskCall; + } + else if (strcmp(op_string_, "ret") == 0) + { + op = CancelOp::MaskRet; + } + else if (strcmp(op_string_, "line") == 0) + { + op = CancelOp::MaskLine; + } + else if (strcmp(op_string_, "count") == 0) + { + op = CancelOp::MaskCount; + } + return op; +} + +// ################################################################################################ + +static CancelOp which_cancel_op(lua_State* L, int idx_) { if (lua_type(L, idx_) == LUA_TSTRING) { - CancelOp op{ CancelOp::Invalid }; - char const* str = lua_tostring(L, idx_); - if (strcmp(str, "hard") == 0) - { - op = CancelOp::Hard; - } - else if (strcmp(str, "soft") == 0) - { - op = CancelOp::Soft; - } - else if (strcmp(str, "call") == 0) - { - op = CancelOp::MaskCall; - } - else if (strcmp(str, "ret") == 0) - { - op = CancelOp::MaskRet; - } - else if (strcmp(str, "line") == 0) - { - op = CancelOp::MaskLine; - } - else if (strcmp(str, "count") == 0) - { - op = CancelOp::MaskCount; - } + char const* const str{ lua_tostring(L, idx_) }; + CancelOp op{ which_cancel_op(str) }; lua_remove(L, idx_); // argument is processed, remove it if (op == CancelOp::Invalid) { @@ -245,53 +224,60 @@ static CancelOp which_op(lua_State* L, int idx_) // ################################################################################################ -// bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, force [, forcekill_timeout]]) +// bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, wake_lindas]) LUAG_FUNC(thread_cancel) { Lane* const lane{ lua_toLane(L, 1) }; - CancelOp const op{ which_op(L, 2) }; // this removes the op string from the stack + CancelOp const op{ which_cancel_op(L, 2) }; // this removes the op string from the stack + int hook_count{ 0 }; if (static_cast(op) > static_cast(CancelOp::Soft)) // hook is requested { - int const hook_count{ static_cast(lua_tointeger(L, 2)) }; + hook_count = static_cast(luaL_checkinteger(L, 2)); lua_remove(L, 2); // argument is processed, remove it if (hook_count < 1) { return luaL_error(L, "hook count cannot be < 1"); } - lua_sethook(lane->L, cancel_hook, static_cast(op), hook_count); } - double secs{ 0.0 }; + lua_Duration wait_timeout{ 0.0 }; if (lua_type(L, 2) == LUA_TNUMBER) { - secs = lua_tonumber(L, 2); + wait_timeout = lua_Duration{ lua_tonumber(L, 2) }; lua_remove(L, 2); // argument is processed, remove it - if (secs < 0.0) + if (wait_timeout.count() < 0.0) { return luaL_error(L, "cancel timeout cannot be < 0"); } } - - bool const force{ lua_toboolean(L, 2) ? true : false }; // false if nothing there - double const forcekill_timeout{ luaL_optnumber(L, 3, 0.0) }; - switch (thread_cancel(L, lane, op, secs, force, forcekill_timeout)) + // we wake by default in "hard" mode (remember that hook is hard too), but this can be turned off if desired + bool wake_lane{ op != CancelOp::Soft }; + if (lua_gettop(L) >= 2) + { + if (!lua_isboolean(L, 2)) + { + return luaL_error(L, "wake_lindas parameter is not a boolean"); + } + wake_lane = lua_toboolean(L, 2); + lua_remove(L, 2); // argument is processed, remove it + } + switch (thread_cancel(lane, op, hook_count, wait_timeout, wake_lane)) { + default: // should never happen unless we added a case and forgot to handle it + ASSERT_L(false); + break; + case CancelResult::Timeout: lua_pushboolean(L, 0); lua_pushstring(L, "timeout"); - return 2; + break; case CancelResult::Cancelled: lua_pushboolean(L, 1); push_thread_status(L, lane); - return 2; - - case CancelResult::Killed: - lua_pushboolean(L, 1); - push_thread_status(L, lane); - return 2; + break; } // should never happen, only here to prevent the compiler from complaining of "not all control paths returning a value" - return 0; + return 2; } diff --git a/src/cancel.h b/src/cancel.h index 884e193..954b04e 100644 --- a/src/cancel.h +++ b/src/cancel.h @@ -13,6 +13,8 @@ extern "C" { #include "uniquekey.h" #include "macros_and_utils.h" +#include + // ################################################################################################ class Lane; // forward @@ -30,8 +32,7 @@ enum class CancelRequest enum class CancelResult { Timeout, - Cancelled, - Killed + Cancelled }; enum class CancelOp @@ -48,7 +49,8 @@ enum class CancelOp // crc64/we of string "CANCEL_ERROR" generated at http://www.nitrxgen.net/hashgen/ static constexpr UniqueKey CANCEL_ERROR{ 0xe97d41626cc97577ull }; // 'raise_cancel_error' sentinel -CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_, bool force_, double waitkill_timeout_); +CancelOp which_cancel_op(char const* op_string_); +CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hook_count_, lua_Duration secs_, bool wake_lindas_); [[noreturn]] static inline void raise_cancel_error(lua_State* L) { diff --git a/src/keeper.cpp b/src/keeper.cpp index 937d190..0aea18e 100644 --- a/src/keeper.cpp +++ b/src/keeper.cpp @@ -627,7 +627,7 @@ void close_keepers(Universe* U) } for (int i = 0; i < nbKeepers; ++i) { - MUTEX_FREE(&U->keepers->keeper_array[i].keeper_cs); + U->keepers->keeper_array[i].~Keeper(); } // free the keeper bookkeeping structure U->internal_allocator.free(U->keepers, sizeof(Keepers) + (nbKeepers - 1) * sizeof(Keeper)); @@ -673,9 +673,14 @@ void init_keepers(Universe* U, lua_State* L) { std::ignore = luaL_error(L, "init_keepers() failed while creating keeper array; out of memory"); } - memset(U->keepers, 0, bytes); + U->keepers->Keepers::Keepers(); U->keepers->gc_threshold = keepers_gc_threshold; U->keepers->nb_keepers = nb_keepers; + + for (int i = 0; i < nb_keepers; ++i) + { + U->keepers->keeper_array[i].Keeper::Keeper(); + } } for (int i = 0; i < nb_keepers; ++i) // keepersUD { @@ -687,10 +692,6 @@ void init_keepers(Universe* U, lua_State* L) } U->keepers->keeper_array[i].L = K; - // we can trigger a GC from inside keeper_call(), where a keeper is acquired - // from there, GC can collect a linda, which would acquire the keeper again, and deadlock the thread. - // therefore, we need a recursive mutex. - MUTEX_RECURSIVE_INIT(&U->keepers->keeper_array[i].keeper_cs); if (U->keepers->gc_threshold >= 0) { @@ -772,8 +773,7 @@ Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_) */ unsigned int i = (unsigned int)((magic_ >> KEEPER_MAGIC_SHIFT) % nbKeepers); Keeper* K = &keepers_->keeper_array[i]; - - MUTEX_LOCK( &K->keeper_cs); + K->m_mutex.lock(); //++ K->count; return K; } @@ -787,7 +787,7 @@ void keeper_release(Keeper* K) //-- K->count; if (K) { - MUTEX_UNLOCK(&K->keeper_cs); + K->m_mutex.unlock(); } } @@ -843,7 +843,6 @@ int keeper_call(Universe* U, lua_State* K, keeper_api_t func_, lua_State* L, voi if ((args == 0) || luaG_inter_copy(U, L, K, args, LookupMode::ToKeeper) == 0) // L->K { lua_call(K, 1 + args, LUA_MULTRET); - retvals = lua_gettop(K) - Ktos; // note that this can raise a luaL_error while the keeper state (and its mutex) is acquired // this may interrupt a lane, causing the destruction of the underlying OS thread diff --git a/src/keeper.h b/src/keeper.h index f7e3951..931c1d5 100644 --- a/src/keeper.h +++ b/src/keeper.h @@ -11,21 +11,23 @@ extern "C" { #include "threading.h" #include "uniquekey.h" +#include + // forwards enum class LookupMode; struct Universe; struct Keeper { - MUTEX_T keeper_cs; - lua_State* L; + std::mutex m_mutex; + lua_State* L{ nullptr }; // int count; }; struct Keepers { int gc_threshold{ 0 }; - int nb_keepers; + int nb_keepers{ 0 }; Keeper keeper_array[1]; }; @@ -38,7 +40,7 @@ void close_keepers(Universe* U); Keeper* which_keeper(Keepers* keepers_, uintptr_t magic_); Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_); -void keeper_release(Keeper* K); +void keeper_release(Keeper* K_); void keeper_toggle_nil_sentinels(lua_State* L, int val_i_, LookupMode const mode_); int keeper_push_linda_storage(Universe* U, lua_State* L, void* ptr_, uintptr_t magic_); diff --git a/src/lanes.cpp b/src/lanes.cpp index 08584a2..4dd9b46 100644 --- a/src/lanes.cpp +++ b/src/lanes.cpp @@ -108,11 +108,6 @@ Lane::Lane(Universe* U_, lua_State* L_) : U{ U_ } , L{ L_ } { -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - MUTEX_INIT(&done_lock); - SIGNAL_INIT(&done_signal); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - #if HAVE_LANE_TRACKING() if (U->tracking_first) { @@ -121,6 +116,29 @@ Lane::Lane(Universe* U_, lua_State* L_) #endif // HAVE_LANE_TRACKING() } +bool Lane::waitForCompletion(lua_Duration duration_) +{ + std::chrono::time_point until{ std::chrono::time_point::max() }; + if (duration_.count() >= 0.0) + { + until = std::chrono::steady_clock::now() + std::chrono::duration_cast(duration_); + } + + std::unique_lock lock{ m_done_mutex }; + //std::stop_token token{ m_thread.get_stop_token() }; + //return m_done_signal.wait_for(lock, token, secs_, [this](){ return status >= DONE; }); + return m_done_signal.wait_until(lock, until, [this](){ return status >= DONE; }); +} + +static void lane_main(Lane* lane); +void Lane::startThread(int priority_) +{ + m_thread = std::jthread([this]() { lane_main(this); }); + if (priority_ != THREAD_PRIO_DEFAULT) + { + JTHREAD_SET_PRIORITY(m_thread, priority_); + } +} /* Do you want full call stacks, or just the line where the error happened? * @@ -144,7 +162,7 @@ static void securize_debug_threadname(lua_State* L, Lane* lane_) } #if ERROR_FULL_STACK -static int lane_error( lua_State* L); +static int lane_error(lua_State* L); // crc64/we of string "STACKTRACE_REGKEY" generated at http://www.nitrxgen.net/hashgen/ static constexpr UniqueKey STACKTRACE_REGKEY{ 0x534af7d3226a429full }; #endif // ERROR_FULL_STACK @@ -255,11 +273,6 @@ Lane::~Lane() { // Clean up after a (finished) thread // -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - SIGNAL_FREE(&done_signal); - MUTEX_FREE(&done_lock); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - #if HAVE_LANE_TRACKING() if (U->tracking_first != nullptr) { @@ -455,26 +468,27 @@ static bool selfdestruct_remove(Lane* lane_) static int universe_gc( lua_State* L) { Universe* const U{ lua_tofulluserdata(L, 1) }; + lua_Duration const shutdown_timeout{ lua_tonumber(L, lua_upvalueindex(1)) }; + [[maybe_unused]] char const* const op_string{ lua_tostring(L, lua_upvalueindex(2)) }; + CancelOp const op{ which_cancel_op(op_string) }; - while (U->selfdestruct_first != SELFDESTRUCT_END) // true at most once! + if (U->selfdestruct_first != SELFDESTRUCT_END) { + // Signal _all_ still running threads to exit (including the timer thread) // { std::lock_guard guard{ U->selfdestruct_cs }; Lane* lane{ U->selfdestruct_first }; + lua_Duration timeout{ 1us }; while (lane != SELFDESTRUCT_END) { - // attempt a regular unforced hard cancel with a small timeout - bool const cancelled{ THREAD_ISNULL(lane->thread) || thread_cancel(L, lane, CancelOp::Hard, 0.0001, false, 0.0) != CancelResult::Timeout }; - // if we failed, and we know the thread is waiting on a linda - if (cancelled == false && lane->status == WAITING && lane->waiting_on != nullptr) + // attempt the requested cancel with a small timeout. + // if waiting on a linda, they will raise a cancel_error. + // if a cancellation hook is desired, it will be installed to try to raise an error + if (lane->m_thread.joinable()) { - // signal the linda to wake up the thread so that it can react to the cancel query - // let us hope we never land here with a pointer on a linda that has been destroyed... - SIGNAL_T* const waiting_on{ lane->waiting_on }; - // lane->waiting_on = nullptr; // useful, or not? - SIGNAL_ALL(waiting_on); + std::ignore = thread_cancel(lane, op, 1, timeout, true); } lane = lane->selfdestruct_next; } @@ -482,47 +496,32 @@ static int universe_gc( lua_State* L) // When noticing their cancel, the lanes will remove themselves from // the selfdestruct chain. - - // TBD: Not sure if Windows (multi core) will require the timed approach, - // or single Yield. I don't have machine to test that (so leaving - // for timed approach). -- AKa 25-Oct-2008 - - // OS X 10.5 (Intel) needs more to avoid segfaults. - // - // "make test" is okay. 100's of "make require" are okay. - // - // Tested on MacBook Core Duo 2GHz and 10.5.5: - // -- AKa 25-Oct-2008 - // { - lua_Number const shutdown_timeout = lua_tonumber(L, lua_upvalueindex(1)); - double const t_until = now_secs() + shutdown_timeout; + std::chrono::time_point t_until{ std::chrono::steady_clock::now() + std::chrono::duration_cast(shutdown_timeout) }; while (U->selfdestruct_first != SELFDESTRUCT_END) { - YIELD(); // give threads time to act on their cancel + // give threads time to act on their cancel + YIELD(); + // count the number of cancelled thread that didn't have the time to act yet + int n{ 0 }; { - // count the number of cancelled thread that didn't have the time to act yet - int n = 0; - double t_now = 0.0; + std::lock_guard guard{ U->selfdestruct_cs }; + Lane* lane{ U->selfdestruct_first }; + while (lane != SELFDESTRUCT_END) { - std::lock_guard guard{ U->selfdestruct_cs }; - Lane* lane{ U->selfdestruct_first }; - while (lane != SELFDESTRUCT_END) - { - if (lane->cancel_request == CancelRequest::Hard) - ++n; - lane = lane->selfdestruct_next; - } - } - // if timeout elapsed, or we know all threads have acted, stop waiting - t_now = now_secs(); - if (n == 0 || (t_now >= t_until)) - { - DEBUGSPEW_CODE(fprintf(stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout - (t_until - t_now))); - break; + if (lane->cancel_request != CancelRequest::None) + ++n; + lane = lane->selfdestruct_next; } } + // if timeout elapsed, or we know all threads have acted, stop waiting + std::chrono::time_point t_now = std::chrono::steady_clock::now(); + if (n == 0 || (t_now >= t_until)) + { + DEBUGSPEW_CODE(fprintf(stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout.count())); + break; + } } } @@ -532,48 +531,17 @@ static int universe_gc( lua_State* L) { YIELD(); } - - //--- - // Kill the still free running threads - // - if (U->selfdestruct_first != SELFDESTRUCT_END) - { - unsigned int n = 0; - // first thing we did was to raise the linda signals the threads were waiting on (if any) - // therefore, any well-behaved thread should be in CANCELLED state - // these are not running, and the state can be closed - { - std::lock_guard guard{ U->selfdestruct_cs }; - Lane* lane{ U->selfdestruct_first }; - while (lane != SELFDESTRUCT_END) - { - Lane* const next_s{ lane->selfdestruct_next }; - lane->selfdestruct_next = nullptr; // detach from selfdestruct chain - if (!THREAD_ISNULL(lane->thread)) // can be nullptr if previous 'soft' termination succeeded - { - THREAD_KILL(&lane->thread); -#if THREADAPI == THREADAPI_PTHREAD - // pthread: make sure the thread is really stopped! - THREAD_WAIT(&lane->thread, -1, &lane->done_signal, &lane->done_lock, &lane->status); -#endif // THREADAPI == THREADAPI_PTHREAD - } - // NO lua_close() in this case because we don't know where execution of the state was interrupted - delete lane; - lane = next_s; - ++n; - } - U->selfdestruct_first = SELFDESTRUCT_END; - } - - DEBUGSPEW_CODE(fprintf(stderr, "Killed %d lane(s) at process end.\n", n)); - } } - // If some lanes are currently cleaning after themselves, wait until they are done. - // They are no longer listed in the selfdestruct chain, but they still have to lua_close(). - while (U->selfdestructing_count.load(std::memory_order_acquire) > 0) + // If after all this, we still have some free-running lanes, it's an external user error, they should have stopped appropriately { - YIELD(); + std::lock_guard guard{ U->selfdestruct_cs }; + Lane* lane{ U->selfdestruct_first }; + if (lane != SELFDESTRUCT_END) + { + // this causes a leak because we don't call U's destructor (which could be bad if the still running lanes are accessing it) + std::ignore = luaL_error(L, "Zombie thread %s refuses to die!", lane->debug_name); + } } // necessary so that calling free_deep_prelude doesn't crash because linda_id expects a linda lightuserdata at absolute slot 1 @@ -874,20 +842,8 @@ static char const* get_errcode_name( int _code) } #endif // USE_DEBUG_SPEW() -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR // implies THREADAPI == THREADAPI_PTHREAD -static void thread_cleanup_handler(void* opaque) +static void lane_main(Lane* lane) { - Lane* lane{ (Lane*) opaque }; - MUTEX_LOCK(&lane->done_lock); - lane->status = CANCELLED; - SIGNAL_ONE(&lane->done_signal); // wake up master (while 'lane->done_lock' is on) - MUTEX_UNLOCK(&lane->done_lock); -} -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - -static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) -{ - Lane* lane{ (Lane*) vs }; lua_State* const L{ lane->L }; // wait until the launching thread has finished preparing L lane->m_ready.wait(); @@ -897,8 +853,6 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) // At this point, the lane function and arguments are on the stack int const nargs{ lua_gettop(L) - 1 }; DEBUGSPEW_CODE(Universe* U = universe_get(L)); - THREAD_MAKE_ASYNCH_CANCELLABLE(); - THREAD_CLEANUP_PUSH(thread_cleanup_handler, lane); lane->status = RUNNING; // PENDING -> RUNNING // Tie "set_finalizer()" to the state @@ -949,18 +903,19 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) // the finalizer generated an error, and left its own error message [and stack trace] on the stack rc = rc2; // we're overruling the earlier script error or normal return } - lane->waiting_on = nullptr; // just in case + lane->m_waiting_on = nullptr; // just in case if (selfdestruct_remove(lane)) // check and remove (under lock!) { // We're a free-running thread and no-one's there to clean us up. - // lua_close(lane->L); - + lane->L = nullptr; // just in case lane->U->selfdestruct_cs.lock(); // done with lua_close(), terminal shutdown sequence may proceed lane->U->selfdestructing_count.fetch_sub(1, std::memory_order_release); lane->U->selfdestruct_cs.unlock(); + // we destroy our jthread member from inside the thread body, so we have to detach so that we don't try to join, as this doesn't seem a good idea + lane->m_thread.detach(); delete lane; lane = nullptr; } @@ -972,21 +927,14 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) enum e_status st = (rc == 0) ? DONE : CANCEL_ERROR.equals(L, 1) ? CANCELLED : ERROR_ST; // Posix no PTHREAD_TIMEDJOIN: - // 'done_lock' protects the -> DONE|ERROR_ST|CANCELLED state change + // 'm_done_mutex' protects the -> DONE|ERROR_ST|CANCELLED state change // -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - MUTEX_LOCK(&lane->done_lock); { -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR + std::lock_guard lock{ lane->m_done_mutex }; lane->status = st; -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - SIGNAL_ONE(&lane->done_signal); // wake up master (while 'lane->done_lock' is on) + lane->m_done_signal.notify_one();// wake up master (while 'lane->m_done_mutex' is on) } - MUTEX_UNLOCK(&lane->done_lock); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR } - THREAD_CLEANUP_POP(false); - return 0; // ignored } // ################################################################################################# @@ -1115,13 +1063,11 @@ LUAG_FUNC(lane_new) // leave a single cancel_error on the stack for the caller lua_settop(m_lane->L, 0); CANCEL_ERROR.pushKey(m_lane->L); -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - MUTEX_LOCK(&m_lane->done_lock); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - m_lane->status = CANCELLED; -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - MUTEX_UNLOCK(&m_lane->done_lock); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR + { + std::lock_guard lock{ m_lane->m_done_mutex }; + m_lane->status = CANCELLED; + m_lane->m_done_signal.notify_one(); // wake up master (while 'lane->m_done_mutex' is on) + } // unblock the thread so that it can terminate gracefully m_lane->m_ready.count_down(); } @@ -1170,7 +1116,7 @@ LUAG_FUNC(lane_new) } onExit{ L, lane, gc_cb_idx }; // launch the thread early, it will sync with a std::latch to parallelize OS thread warmup and L2 preparation DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "lane_new: launching thread\n" INDENT_END)); - THREAD_CREATE(&lane->thread, lane_main, lane, priority); + lane->startThread(priority); STACK_GROW( L2, nargs + 3); // STACK_CHECK_START_REL(L2, 0); @@ -1347,7 +1293,7 @@ LUAG_FUNC(lane_new) static int lane_gc(lua_State* L) { bool have_gc_cb{ false }; - Lane* lane{ lua_toLane(L, 1) }; // ud + Lane* const lane{ lua_toLane(L, 1) }; // ud // if there a gc callback? lua_getiuservalue(L, 1, 1); // ud uservalue @@ -1365,30 +1311,7 @@ static int lane_gc(lua_State* L) } // We can read 'lane->status' without locks, but not wait for it - // test Killed state first, as it doesn't need to enter the selfdestruct chain - if (lane->mstatus == Lane::Killed) - { - // Make sure a kill has proceeded, before cleaning up the data structure. - // - // NO lua_close() in this case because we don't know where execution of the state was interrupted - DEBUGSPEW_CODE(fprintf(stderr, "** Joining with a killed thread (needs testing) **")); - // make sure the thread is no longer running, just like thread_join() - if (!THREAD_ISNULL(lane->thread)) - { - THREAD_WAIT(&lane->thread, -1, &lane->done_signal, &lane->done_lock, &lane->status); - } - if (lane->status >= DONE && lane->L) - { - // we know the thread was killed while the Lua VM was not doing anything: we should be able to close it without crashing - // now, thread_cancel() will not forcefully kill a lane with lane->status >= DONE, so I am not sure it can ever happen - lua_close(lane->L); - lane->L = nullptr; - // just in case, but s will be freed soon so... - lane->debug_name = ""; - } - DEBUGSPEW_CODE(fprintf(stderr, "** Joined ok **")); - } - else if (lane->status < DONE) + if (lane->status < DONE) { // still running: will have to be cleaned up later selfdestruct_add(lane); @@ -1437,7 +1360,6 @@ static char const * thread_status_string(Lane* lane_) { enum e_status const st{ lane_->status }; // read just once (volatile) char const* str = - (lane_->mstatus == Lane::Killed) ? "killed" : // new to v3.3.0! (st == PENDING) ? "pending" : (st == RUNNING) ? "running" : // like in 'co.status()' (st == WAITING) ? "waiting" : @@ -1471,9 +1393,10 @@ int push_thread_status(lua_State* L, Lane* lane_) LUAG_FUNC(thread_join) { Lane* const lane{ lua_toLane(L, 1) }; - lua_Number const wait_secs{ luaL_optnumber(L, 2, -1.0) }; + lua_Duration const duration{ luaL_optnumber(L, 2, -1.0) }; lua_State* const L2{ lane->L }; - bool const done{ THREAD_ISNULL(lane->thread) || THREAD_WAIT(&lane->thread, wait_secs, &lane->done_signal, &lane->done_lock, &lane->status) }; + + bool const done{ !lane->m_thread.joinable() || lane->waitForCompletion(duration) }; if (!done || !L2) { STACK_GROW(L, 2); @@ -1486,58 +1409,47 @@ LUAG_FUNC(thread_join) // Thread is DONE/ERROR_ST/CANCELLED; all ours now int ret{ 0 }; - if (lane->mstatus == Lane::Killed) // OS thread was killed if thread_cancel was forced - { - // in that case, even if the thread was killed while DONE/ERROR_ST/CANCELLED, ignore regular return values - STACK_GROW(L, 2); - lua_pushnil(L); - lua_pushliteral(L, "killed"); - ret = 2; - } - else + Universe* const U{ lane->U }; + // debug_name is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed + // so store it in the userdata uservalue at a key that can't possibly collide + securize_debug_threadname(L, lane); + switch (lane->status) { - Universe* const U{ lane->U }; - // debug_name is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed - // so store it in the userdata uservalue at a key that can't possibly collide - securize_debug_threadname(L, lane); - switch (lane->status) + case DONE: { - case DONE: + int const n{ lua_gettop(L2) }; // whole L2 stack + if ((n > 0) && (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0)) { - int const n{ lua_gettop(L2) }; // whole L2 stack - if ((n > 0) && (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0)) - { - return luaL_error(L, "tried to copy unsupported types"); - } - ret = n; + return luaL_error(L, "tried to copy unsupported types"); } - break; + ret = n; + } + break; - case ERROR_ST: + case ERROR_ST: + { + int const n{ lua_gettop(L2) }; + STACK_GROW(L, 3); + lua_pushnil(L); + // even when ERROR_FULL_STACK, if the error is not LUA_ERRRUN, the handler wasn't called, and we only have 1 error message on the stack ... + if (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0) // nil "err" [trace] { - int const n{ lua_gettop(L2) }; - STACK_GROW(L, 3); - lua_pushnil(L); - // even when ERROR_FULL_STACK, if the error is not LUA_ERRRUN, the handler wasn't called, and we only have 1 error message on the stack ... - if (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0) // nil "err" [trace] - { - return luaL_error(L, "tried to copy unsupported types: %s", lua_tostring(L, -n)); - } - ret = 1 + n; + return luaL_error(L, "tried to copy unsupported types: %s", lua_tostring(L, -n)); } - break; + ret = 1 + n; + } + break; - case CANCELLED: - ret = 0; - break; + case CANCELLED: + ret = 0; + break; - default: - DEBUGSPEW_CODE(fprintf(stderr, "Status: %d\n", lane->status)); - ASSERT_L(false); - ret = 0; - } - lua_close(L2); + default: + DEBUGSPEW_CODE(fprintf(stderr, "Status: %d\n", lane->status)); + ASSERT_L(false); + ret = 0; } + lua_close(L2); lane->L = nullptr; STACK_CHECK(L, ret); return ret; @@ -1596,15 +1508,12 @@ LUAG_FUNC(thread_index) switch (lane->status) { default: - if (lane->mstatus != Lane::Killed) - { - // this is an internal error, we probably never get here - lua_settop(L, 0); - lua_pushliteral(L, "Unexpected status: "); - lua_pushstring(L, thread_status_string(lane)); - lua_concat(L, 2); - raise_lua_error(L); - } + // this is an internal error, we probably never get here + lua_settop(L, 0); + lua_pushliteral(L, "Unexpected status: "); + lua_pushstring(L, thread_status_string(lane)); + lua_concat(L, 2); + raise_lua_error(L); [[fallthrough]]; // fall through if we are killed, as we got nil, "killed" on the stack case DONE: // got regular return values @@ -1790,8 +1699,7 @@ LUAG_FUNC(wakeup_conv) lua_pop(L,1); STACK_CHECK(L, 0); - struct tm t; - memset(&t, 0, sizeof(t)); + std::tm t{}; t.tm_year = year - 1900; t.tm_mon= month-1; // 0..11 t.tm_mday= day; // 1..31 @@ -1800,7 +1708,7 @@ LUAG_FUNC(wakeup_conv) t.tm_sec= sec; // 0..60 t.tm_isdst= isdst; // 0/1/negative - lua_pushnumber(L, static_cast(mktime(&t))); // ms=0 + lua_pushnumber(L, static_cast(std::mktime(&t))); // resolution: 1 second return 1; } @@ -1909,13 +1817,14 @@ LUAG_FUNC(configure) DEBUGSPEW_CODE(fprintf( stderr, INDENT_BEGIN "%p: lanes.configure() BEGIN\n" INDENT_END, L)); DEBUGSPEW_CODE(if (U) U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); - if(U == nullptr) + if (U == nullptr) { - U = universe_create( L); // settings universe + U = universe_create(L); // settings universe DEBUGSPEW_CODE(U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); lua_newtable( L); // settings universe mt lua_getfield(L, 1, "shutdown_timeout"); // settings universe mt shutdown_timeout - lua_pushcclosure(L, universe_gc, 1); // settings universe mt universe_gc + lua_getfield(L, 1, "shutdown_mode"); // settings universe mt shutdown_timeout shutdown_mode + lua_pushcclosure(L, universe_gc, 2); // settings universe mt universe_gc lua_setfield(L, -2, "__gc"); // settings universe mt lua_setmetatable(L, -2); // settings universe lua_pop(L, 1); // settings diff --git a/src/lanes.lua b/src/lanes.lua index 6af286a..fd3d22b 100644 --- a/src/lanes.lua +++ b/src/lanes.lua @@ -73,6 +73,7 @@ lanes.configure = function( settings_) keepers_gc_threshold = -1, on_state_create = nil, shutdown_timeout = 0.25, + shutdown_mode = "hard", with_timers = true, track_lanes = false, demote_full_userdata = nil, @@ -113,6 +114,11 @@ lanes.configure = function( settings_) -- shutdown_timeout should be a number >= 0 return type( val_) == "number" and val_ >= 0 end, + shutdown_mode = function( val_) + local valid_hooks = { soft = true, hard = true, call = true, ret = true, line = true, count = true } + -- shutdown_mode should be a known hook mask + return valid_hooks[val_] + end, track_lanes = boolean_param_checker, demote_full_userdata = boolean_param_checker, verbose_errors = boolean_param_checker @@ -367,262 +373,263 @@ lanes.configure = function( settings_) if settings.with_timers ~= false then + -- + -- On first 'require "lanes"', a timer lane is spawned that will maintain + -- timer tables and sleep in between the timer events. All interaction with + -- the timer lane happens via a 'timer_gateway' Linda, which is common to + -- all that 'require "lanes"'. + -- + -- Linda protocol to timer lane: + -- + -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] + -- + local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging + local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" + local first_time_key= "first time" - -- - -- On first 'require "lanes"', a timer lane is spawned that will maintain - -- timer tables and sleep in between the timer events. All interaction with - -- the timer lane happens via a 'timer_gateway' Linda, which is common to - -- all that 'require "lanes"'. - -- - -- Linda protocol to timer lane: - -- - -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] - -- - local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging - local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" - local first_time_key= "first time" - - local first_time = timer_gateway:get( first_time_key) == nil - timer_gateway:set( first_time_key, true) - - -- - -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally - -- has 'table' always declared) - -- - if first_time then + local first_time = timer_gateway:get( first_time_key) == nil + timer_gateway:set( first_time_key, true) local now_secs = core.now_secs - assert( type( now_secs) == "function") - ----- - -- Snore loop (run as a lane on the background) - -- - -- High priority, to get trustworthy timings. + local wakeup_conv = core.wakeup_conv + -- - -- We let the timer lane be a "free running" thread; no handle to it - -- remains. + -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally + -- has 'table' always declared) -- - local timer_body = function() - set_debug_threadname( "LanesTimer") - -- - -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, - -- [key]= { wakeup_secs [,period_secs] } [, ...] }, - -- } - -- - -- Collection of all running timers, indexed with linda's & key. + if first_time then + + assert( type( now_secs) == "function") + ----- + -- Snore loop (run as a lane on the background) -- - -- Note that we need to use the deep lightuserdata identifiers, instead - -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple - -- entries for the same timer. + -- High priority, to get trustworthy timings. -- - -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but - -- also important to keep the Linda alive, even if all outside world threw - -- away pointers to it (which would ruin uniqueness of the deep pointer). - -- Now we're safe. + -- We let the timer lane be a "free running" thread; no handle to it + -- remains. -- - local collection = {} - local table_insert = assert( table.insert) - - local get_timers = function() - local r = {} - for deep, t in pairs( collection) do - -- WR( tostring( deep)) - local l = t[deep] - for key, timer_data in pairs( t) do - if key ~= deep then - table_insert( r, {l, key, timer_data}) + local timer_body = function() + set_debug_threadname( "LanesTimer") + -- + -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, + -- [key]= { wakeup_secs [,period_secs] } [, ...] }, + -- } + -- + -- Collection of all running timers, indexed with linda's & key. + -- + -- Note that we need to use the deep lightuserdata identifiers, instead + -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple + -- entries for the same timer. + -- + -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but + -- also important to keep the Linda alive, even if all outside world threw + -- away pointers to it (which would ruin uniqueness of the deep pointer). + -- Now we're safe. + -- + local collection = {} + local table_insert = assert( table.insert) + + local get_timers = function() + local r = {} + for deep, t in pairs( collection) do + -- WR( tostring( deep)) + local l = t[deep] + for key, timer_data in pairs( t) do + if key ~= deep then + table_insert( r, {l, key, timer_data}) + end end end - end - return r - end -- get_timers() - - -- - -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) - -- - local set_timer = function( linda, key, wakeup_at, period) - assert( wakeup_at == nil or wakeup_at > 0.0) - assert( period == nil or period > 0.0) + return r + end -- get_timers() - local linda_deep = linda:deep() - assert( linda_deep) - - -- Find or make a lookup for this timer -- - local t1 = collection[linda_deep] - if not t1 then - t1 = { [linda_deep] = linda} -- proxy to use the Linda - collection[linda_deep] = t1 - end + -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) + -- + local set_timer = function( linda, key, wakeup_at, period) + assert( wakeup_at == nil or wakeup_at > 0.0) + assert( period == nil or period > 0.0) - if wakeup_at == nil then - -- Clear the timer - -- - t1[key]= nil + local linda_deep = linda:deep() + assert( linda_deep) - -- Remove empty tables from collection; speeds timer checks and - -- lets our 'safety reference' proxy be gc:ed as well. + -- Find or make a lookup for this timer -- - local empty = true - for k, _ in pairs( t1) do - if k ~= linda_deep then - empty = false - break - end - end - if empty then - collection[linda_deep] = nil + local t1 = collection[linda_deep] + if not t1 then + t1 = { [linda_deep] = linda} -- proxy to use the Linda + collection[linda_deep] = t1 end - -- Note: any unread timer value is left at 'linda[key]' intensionally; - -- clearing a timer just stops it. - else - -- New timer or changing the timings - -- - local t2 = t1[key] - if not t2 then - t2= {} - t1[key]= t2 - end + if wakeup_at == nil then + -- Clear the timer + -- + t1[key]= nil - t2[1] = wakeup_at - t2[2] = period -- can be 'nil' - end - end -- set_timer() + -- Remove empty tables from collection; speeds timer checks and + -- lets our 'safety reference' proxy be gc:ed as well. + -- + local empty = true + for k, _ in pairs( t1) do + if k ~= linda_deep then + empty = false + break + end + end + if empty then + collection[linda_deep] = nil + end - ----- - -- [next_wakeup_at]= check_timers() - -- Check timers, and wake up the ones expired (if any) - -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). - local check_timers = function() - local now = now_secs() - local next_wakeup - - for linda_deep,t1 in pairs(collection) do - for key,t2 in pairs(t1) do + -- Note: any unread timer value is left at 'linda[key]' intensionally; + -- clearing a timer just stops it. + else + -- New timer or changing the timings -- - if key==linda_deep then - -- no 'continue' in Lua :/ - else - -- 't2': { wakeup_at_secs [,period_secs] } + local t2 = t1[key] + if not t2 then + t2= {} + t1[key]= t2 + end + + t2[1] = wakeup_at + t2[2] = period -- can be 'nil' + end + end -- set_timer() + + ----- + -- [next_wakeup_at]= check_timers() + -- Check timers, and wake up the ones expired (if any) + -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). + local check_timers = function() + local now = now_secs() + local next_wakeup + + for linda_deep,t1 in pairs(collection) do + for key,t2 in pairs(t1) do -- - local wakeup_at= t2[1] - local period= t2[2] -- may be 'nil' - - if wakeup_at <= now then - local linda= t1[linda_deep] - assert(linda) - - linda:set( key, now ) - - -- 'pairs()' allows the values to be modified (and even - -- removed) as far as keys are not touched - - if not period then - -- one-time timer; gone - -- - t1[key]= nil - wakeup_at= nil -- no 'continue' in Lua :/ - else - -- repeating timer; find next wakeup (may jump multiple repeats) - -- - repeat - wakeup_at= wakeup_at+period - until wakeup_at > now - - t2[1]= wakeup_at + if key==linda_deep then + -- no 'continue' in Lua :/ + else + -- 't2': { wakeup_at_secs [,period_secs] } + -- + local wakeup_at= t2[1] + local period= t2[2] -- may be 'nil' + + if wakeup_at <= now then + local linda= t1[linda_deep] + assert(linda) + + linda:set( key, now ) + + -- 'pairs()' allows the values to be modified (and even + -- removed) as far as keys are not touched + + if not period then + -- one-time timer; gone + -- + t1[key]= nil + wakeup_at= nil -- no 'continue' in Lua :/ + else + -- repeating timer; find next wakeup (may jump multiple repeats) + -- + repeat + wakeup_at= wakeup_at+period + until wakeup_at > now + + t2[1]= wakeup_at + end end - end - if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then - next_wakeup= wakeup_at + if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then + next_wakeup= wakeup_at + end end + end -- t2 loop + end -- t1 loop + + return next_wakeup -- may be 'nil' + end -- check_timers() + + local timer_gateway_batched = timer_gateway.batched + set_finalizer( function( err, stk) + if err and type( err) ~= "userdata" then + WR( "LanesTimer error: "..tostring(err)) + --elseif type( err) == "userdata" then + -- WR( "LanesTimer after cancel" ) + --else + -- WR("LanesTimer finalized") + end + end) + while true do + local next_wakeup = check_timers() + + -- Sleep until next timer to wake up, or a set/clear command + -- + local secs + if next_wakeup then + secs = next_wakeup - now_secs() + if secs < 0 then secs = 0 end + end + local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) + + if key == TGW_KEY then + assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer + local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) + assert( key) + set_timer( what, key, wakeup_at, period and period > 0 and period or nil) + elseif key == TGW_QUERY then + if what == "get_timers" then + timer_gateway:send( TGW_REPLY, get_timers()) + else + timer_gateway:send( TGW_REPLY, "unknown query " .. what) end - end -- t2 loop - end -- t1 loop - - return next_wakeup -- may be 'nil' - end -- check_timers() - - local timer_gateway_batched = timer_gateway.batched - set_finalizer( function( err, stk) - if err and type( err) ~= "userdata" then - WR( "LanesTimer error: "..tostring(err)) - --elseif type( err) == "userdata" then - -- WR( "LanesTimer after cancel" ) - --else - -- WR("LanesTimer finalized") + --elseif secs == nil then -- got no value while block-waiting? + -- WR( "timer lane: no linda, aborted?") + end end - end) - while true do - local next_wakeup = check_timers() + end -- timer_body() + timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... + end -- first_time - -- Sleep until next timer to wake up, or a set/clear command + ----- + -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) + -- + -- PUBLIC LANES API + timer = function( linda, key, a, period ) + if getmetatable( linda) ~= "Linda" then + error "expecting a Linda" + end + if a == 0.0 then + -- Caller expects to get current time stamp in Linda, on return + -- (like the timer had expired instantly); it would be good to set this + -- as late as possible (to give most current time) but also we want it + -- to precede any possible timers that might start striking. -- - local secs - if next_wakeup then - secs = next_wakeup - now_secs() - if secs < 0 then secs = 0 end - end - local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) - - if key == TGW_KEY then - assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer - local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) - assert( key) - set_timer( what, key, wakeup_at, period and period > 0 and period or nil) - elseif key == TGW_QUERY then - if what == "get_timers" then - timer_gateway:send( TGW_REPLY, get_timers()) - else - timer_gateway:send( TGW_REPLY, "unknown query " .. what) - end - --elseif secs == nil then -- got no value while block-waiting? - -- WR( "timer lane: no linda, aborted?") + linda:set( key, now_secs()) + + if not period or period==0.0 then + timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer + return -- nothing more to do end + a= period end - end -- timer_body() - timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... - end -- first_time - ----- - -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) - -- - -- PUBLIC LANES API - timer = function( linda, key, a, period ) - if getmetatable( linda) ~= "Linda" then - error "expecting a Linda" - end - if a == 0.0 then - -- Caller expects to get current time stamp in Linda, on return - -- (like the timer had expired instantly); it would be good to set this - -- as late as possible (to give most current time) but also we want it - -- to precede any possible timers that might start striking. + local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time + or (a and now_secs()+a or nil) + -- queue to timer -- - linda:set( key, core.now_secs()) + timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) + end -- timer() - if not period or period==0.0 then - timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer - return -- nothing more to do - end - a= period - end - - local wakeup_at= type(a)=="table" and core.wakeup_conv(a) -- given point of time - or (a and core.now_secs()+a or nil) - -- queue to timer + ----- + -- {[{linda, slot, when, period}[,...]]} = timers() -- - timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) - end - - ----- - -- {[{linda, slot, when, period}[,...]]} = timers() - -- - -- PUBLIC LANES API - timers = function() - timer_gateway:send( TGW_QUERY, "get_timers") - local _, r = timer_gateway:receive( TGW_REPLY) - return r - end + -- PUBLIC LANES API + timers = function() + timer_gateway:send( TGW_QUERY, "get_timers") + local _, r = timer_gateway:receive( TGW_REPLY) + return r + end -- timers() end -- settings.with_timers diff --git a/src/lanes_private.h b/src/lanes_private.h index bcc3014..01d43c0 100644 --- a/src/lanes_private.h +++ b/src/lanes_private.h @@ -4,27 +4,26 @@ #include "uniquekey.h" #include "universe.h" +#include +#include #include +#include +#include // NOTE: values to be changed by either thread, during execution, without // locking, are marked "volatile" // class Lane { - private: - - enum class ThreadStatus - { - Normal, // normal master side state - Killed // issued an OS kill - }; - public: - using enum ThreadStatus; - - THREAD_T thread; + // the thread + std::jthread m_thread; + // a latch to wait for the lua_State to be ready std::latch m_ready{ 1 }; + // to wait for stop requests through m_thread's stop_source + std::mutex m_done_mutex; + std::condition_variable m_done_signal; // use condition_variable_any if waiting for a stop_token // // M: sub-thread OS thread // S: not used @@ -42,7 +41,7 @@ class Lane // M: sets to PENDING (before launching) // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED - SIGNAL_T* volatile waiting_on{ nullptr }; + std::condition_variable* volatile m_waiting_on{ nullptr }; // // When status is WAITING, points on the linda's signal the thread waits on, else nullptr @@ -51,23 +50,6 @@ class Lane // M: sets to false, flags true for cancel request // S: reads to see if cancel is requested -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - SIGNAL_T done_signal; - // - // M: Waited upon at lane ending (if Posix with no PTHREAD_TIMEDJOIN) - // S: sets the signal once cancellation is noticed (avoids a kill) - - MUTEX_T done_lock; - // - // Lock required by 'done_signal' condition variable, protecting - // lane status changes to DONE/ERROR_ST/CANCELLED. -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - - volatile ThreadStatus mstatus{ Normal }; - // - // M: sets to Normal, if issued a kill changes to Killed - // S: not used - Lane* volatile selfdestruct_next{ nullptr }; // // M: sets to non-nullptr if facing lane handle '__gc' cycle but the lane @@ -88,6 +70,9 @@ class Lane Lane(Universe* U_, lua_State* L_); ~Lane(); + + bool waitForCompletion(lua_Duration duration_); + void startThread(int priority_); }; // xxh64 of string "LANE_POINTER_REGKEY" generated at https://www.pelock.com/products/hash-calculator diff --git a/src/linda.cpp b/src/linda.cpp index 5ee4768..ea1410e 100644 --- a/src/linda.cpp +++ b/src/linda.cpp @@ -61,8 +61,8 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header public: - SIGNAL_T read_happened; - SIGNAL_T write_happened; + std::condition_variable m_read_happened; + std::condition_variable m_write_happened; Universe* const U; // the universe this linda belongs to uintptr_t const group; // a group to control keeper allocation between lindas CancelRequest simulate_cancel{ CancelRequest::None }; @@ -81,17 +81,11 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header : U{ U_ } , group{ group_ << KEEPER_MAGIC_SHIFT } { - SIGNAL_INIT(&read_happened); - SIGNAL_INIT(&write_happened); - setName(name_, len_); } ~Linda() { - // There aren't any lanes waiting on these lindas, since all proxies have been gc'ed. Right? - SIGNAL_FREE(&read_happened); - SIGNAL_FREE(&write_happened); if (std::holds_alternative(m_name)) { AllocatedName& name = std::get(m_name); @@ -216,15 +210,19 @@ LUAG_FUNC(linda_protected_call) LUAG_FUNC(linda_send) { Linda* const linda{ lua_toLinda(L, 1) }; - time_d timeout{ -1.0 }; + std::chrono::time_point until{ std::chrono::time_point::max() }; int key_i{ 2 }; // index of first key, if timeout not there if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion { - timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); + lua_Duration const duration{ lua_tonumber(L, 2) }; + if (duration.count() >= 0.0) + { + until = std::chrono::steady_clock::now() + std::chrono::duration_cast(duration); + } ++key_i; } - else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key + else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key { ++key_i; } @@ -266,6 +264,7 @@ LUAG_FUNC(linda_send) lua_State* const KL{ K ? K->L : nullptr }; if (KL == nullptr) return 0; + STACK_CHECK_START_REL(KL, 0); for (bool try_again{ true };;) { @@ -295,12 +294,12 @@ LUAG_FUNC(linda_send) if (ret) { // Wake up ALL waiting threads - SIGNAL_ALL(&linda->write_happened); + linda->m_write_happened.notify_all(); break; } // instant timout to bypass the wait syscall - if (timeout == 0.0) + if (std::chrono::steady_clock::now() >= until) { break; /* no wait; instant timeout */ } @@ -314,14 +313,17 @@ LUAG_FUNC(linda_send) prev_status = lane->status; // RUNNING, most likely ASSERT_L(prev_status == RUNNING); // but check, just in case lane->status = WAITING; - ASSERT_L(lane->waiting_on == nullptr); - lane->waiting_on = &linda->read_happened; + ASSERT_L(lane->m_waiting_on == nullptr); + lane->m_waiting_on = &linda->m_read_happened; } // could not send because no room: wait until some data was read before trying again, or until timeout is reached - try_again = SIGNAL_WAIT(&linda->read_happened, &K->keeper_cs, timeout); + std::unique_lock keeper_lock{ K->m_mutex, std::adopt_lock }; + std::cv_status const status{ linda->m_read_happened.wait_until(keeper_lock, until) }; + keeper_lock.release(); // we don't want to release the lock! + try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups if (lane != nullptr) { - lane->waiting_on = nullptr; + lane->m_waiting_on = nullptr; lane->status = prev_status; } } @@ -369,21 +371,24 @@ static constexpr UniqueKey BATCH_SENTINEL{ 0x2DDFEE0968C62AA7ull }; LUAG_FUNC(linda_receive) { Linda* const linda{ lua_toLinda(L, 1) }; - - time_d timeout{ -1.0 }; - int key_i{ 2 }; + std::chrono::time_point until{ std::chrono::time_point::max() }; + int key_i{ 2 }; // index of first key, if timeout not there if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion { - timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); + lua_Duration const duration{ lua_tonumber(L, 2) }; + if (duration.count() >= 0.0) + { + until = std::chrono::steady_clock::now() + std::chrono::duration_cast(duration); + } ++key_i; } - else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key + else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key { ++key_i; } - keeper_api_t keeper_receive; + keeper_api_t selected_keeper_receive{ nullptr }; int expected_pushed_min{ 0 }, expected_pushed_max{ 0 }; // are we in batched mode? BATCH_SENTINEL.pushKey(L); @@ -396,7 +401,7 @@ LUAG_FUNC(linda_receive) // make sure the keys are of a valid type check_key_types(L, key_i, key_i); // receive multiple values from a single slot - keeper_receive = KEEPER_API(receive_batched); + selected_keeper_receive = KEEPER_API(receive_batched); // we expect a user-defined amount of return value expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1); expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min); @@ -413,17 +418,20 @@ LUAG_FUNC(linda_receive) // make sure the keys are of a valid type check_key_types(L, key_i, lua_gettop(L)); // receive a single value, checking multiple slots - keeper_receive = KEEPER_API(receive); + selected_keeper_receive = KEEPER_API(receive); // we expect a single (value, key) pair of returned values expected_pushed_min = expected_pushed_max = 2; } Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue(L) }; Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; - if (K == nullptr) + lua_State* const KL{ K ? K->L : nullptr }; + if (KL == nullptr) return 0; + CancelRequest cancel{ CancelRequest::None }; int pushed{ 0 }; + STACK_CHECK_START_REL(KL, 0); for (bool try_again{ true };;) { if (lane != nullptr) @@ -439,7 +447,7 @@ LUAG_FUNC(linda_receive) } // all arguments of receive() but the first are passed to the keeper's receive function - pushed = keeper_call(linda->U, K->L, keeper_receive, L, linda, key_i); + pushed = keeper_call(linda->U, KL, selected_keeper_receive, L, linda, key_i); if (pushed < 0) { break; @@ -451,11 +459,11 @@ LUAG_FUNC(linda_receive) keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed, LookupMode::FromKeeper); // To be done from within the 'K' locking area // - SIGNAL_ALL(&linda->read_happened); + linda->m_read_happened.notify_all(); break; } - if (timeout == 0.0) + if (std::chrono::steady_clock::now() >= until) { break; /* instant timeout */ } @@ -469,18 +477,22 @@ LUAG_FUNC(linda_receive) prev_status = lane->status; // RUNNING, most likely ASSERT_L(prev_status == RUNNING); // but check, just in case lane->status = WAITING; - ASSERT_L(lane->waiting_on == nullptr); - lane->waiting_on = &linda->write_happened; + ASSERT_L(lane->m_waiting_on == nullptr); + lane->m_waiting_on = &linda->m_write_happened; } // not enough data to read: wakeup when data was sent, or when timeout is reached - try_again = SIGNAL_WAIT(&linda->write_happened, &K->keeper_cs, timeout); + std::unique_lock keeper_lock{ K->m_mutex, std::adopt_lock }; + std::cv_status const status{ linda->m_write_happened.wait_until(keeper_lock, until) }; + keeper_lock.release(); // we don't want to release the lock! + try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups if (lane != nullptr) { - lane->waiting_on = nullptr; + lane->m_waiting_on = nullptr; lane->status = prev_status; } } } + STACK_CHECK(KL, 0); if (pushed < 0) { @@ -537,13 +549,13 @@ LUAG_FUNC(linda_set) if (has_value) { // we put some data in the slot, tell readers that they should wake - SIGNAL_ALL(&linda->write_happened); // To be done from within the 'K' locking area + linda->m_write_happened.notify_all(); // To be done from within the 'K' locking area } if (pushed == 1) { // the key was full, but it is no longer the case, tell writers they should wake ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1); - SIGNAL_ALL(&linda->read_happened); // To be done from within the 'K' locking area + linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area } } } @@ -648,7 +660,7 @@ LUAG_FUNC( linda_limit) if( pushed == 1) { ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); - SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area + linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area } } else // linda is cancelled @@ -678,8 +690,8 @@ LUAG_FUNC(linda_cancel) linda->simulate_cancel = CancelRequest::Soft; if (strcmp(who, "both") == 0) // tell everyone writers to wake up { - SIGNAL_ALL(&linda->write_happened); - SIGNAL_ALL(&linda->read_happened); + linda->m_write_happened.notify_all(); + linda->m_read_happened.notify_all(); } else if (strcmp(who, "none") == 0) // reset flag { @@ -687,11 +699,11 @@ LUAG_FUNC(linda_cancel) } else if (strcmp(who, "read") == 0) // tell blocked readers to wake up { - SIGNAL_ALL(&linda->write_happened); + linda->m_write_happened.notify_all(); } else if (strcmp(who, "write") == 0) // tell blocked writers to wake up { - SIGNAL_ALL(&linda->read_happened); + linda->m_read_happened.notify_all(); } else { diff --git a/src/macros_and_utils.h b/src/macros_and_utils.h index e29e7fb..997b452 100644 --- a/src/macros_and_utils.h +++ b/src/macros_and_utils.h @@ -11,9 +11,12 @@ extern "C" { #endif // __cplusplus #include +#include #include #include +using namespace std::chrono_literals; + #define USE_DEBUG_SPEW() 0 #if USE_DEBUG_SPEW() extern char const* debugspew_indent; @@ -167,3 +170,5 @@ T* lua_newuserdatauv(lua_State* L, int nuvalue_) std::ignore = lua_error(L); // doesn't return assert(false); // we should never get here, but i'm paranoid } + +using lua_Duration = std::chrono::template duration; diff --git a/src/threading.cpp b/src/threading.cpp index afeb184..fc20931 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -93,9 +93,6 @@ THE SOFTWARE. # pragma warning( disable : 4054 ) #endif -//#define THREAD_CREATE_RETRIES_MAX 20 - // loops (maybe retry forever?) - /* * FAIL is for unexpected API return values - essentially programming * error in _this_ code. @@ -196,36 +193,6 @@ time_d now_secs(void) { } -/* -*/ -time_d SIGNAL_TIMEOUT_PREPARE( double secs ) { - if (secs<=0.0) return secs; - else return now_secs() + secs; -} - - -#if THREADAPI == THREADAPI_PTHREAD -/* -* Prepare 'abs_secs' kind of timeout to 'timespec' format -*/ -static void prepare_timeout( struct timespec *ts, time_d abs_secs ) { - assert(ts); - assert( abs_secs >= 0.0 ); - - if (abs_secs==0.0) - abs_secs= now_secs(); - - ts->tv_sec= (time_t) floor( abs_secs ); - ts->tv_nsec= ((long)((abs_secs - ts->tv_sec) * 1000.0 +0.5)) * 1000000UL; // 1ms = 1000000ns - if (ts->tv_nsec == 1000000000UL) - { - ts->tv_nsec = 0; - ts->tv_sec = ts->tv_sec + 1; - } -} -#endif // THREADAPI == THREADAPI_PTHREAD - - /*---=== Threading ===---*/ //--- @@ -268,30 +235,6 @@ static void prepare_timeout( struct timespec *ts, time_d abs_secs ) { #if THREADAPI == THREADAPI_WINDOWS -#if _WIN32_WINNT < 0x0600 // CONDITION_VARIABLE aren't available - // - void MUTEX_INIT( MUTEX_T *ref ) { - *ref= CreateMutex( nullptr /*security attr*/, false /*not locked*/, nullptr ); - if (!ref) FAIL( "CreateMutex", GetLastError() ); - } - void MUTEX_FREE( MUTEX_T *ref ) { - if (!CloseHandle(*ref)) FAIL( "CloseHandle (mutex)", GetLastError() ); - *ref= nullptr; - } - void MUTEX_LOCK( MUTEX_T *ref ) - { - DWORD rc = WaitForSingleObject( *ref, INFINITE); - // ERROR_WAIT_NO_CHILDREN means a thread was killed (lane terminated because of error raised during a linda transfer for example) while having grabbed this mutex - // this is not a big problem as we will grab it just the same, so ignore this particular error - if( rc != 0 && rc != ERROR_WAIT_NO_CHILDREN) - FAIL( "WaitForSingleObject", (rc == WAIT_FAILED) ? GetLastError() : rc); - } - void MUTEX_UNLOCK( MUTEX_T *ref ) { - if (!ReleaseMutex(*ref)) - FAIL( "ReleaseMutex", GetLastError() ); - } -#endif // CONDITION_VARIABLE aren't available - static int const gs_prio_remap[] = { THREAD_PRIORITY_IDLE, @@ -303,37 +246,7 @@ static int const gs_prio_remap[] = THREAD_PRIORITY_TIME_CRITICAL }; -/* MSDN: "If you would like to use the CRT in ThreadProc, use the -_beginthreadex function instead (of CreateThread)." -MSDN: "you can create at most 2028 threads" -*/ -// Note: Visual C++ requires '__stdcall' where it is -void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (__stdcall *func)( void*), void* data, int prio /* -3..+3 */) -{ - HANDLE h = (HANDLE) _beginthreadex(nullptr, // security - _THREAD_STACK_SIZE, - func, - data, - 0, // flags (0/CREATE_SUSPENDED) - nullptr // thread id (not used) - ); - - if (h == nullptr) // _beginthreadex returns 0L on failure instead of -1L (like _beginthread) - { - FAIL( "CreateThread", GetLastError()); - } - - if (prio != THREAD_PRIO_DEFAULT) - { - if (!SetThreadPriority( h, gs_prio_remap[prio + 3])) - { - FAIL( "SetThreadPriority", GetLastError()); - } - } - - *ref = h; -} - +// ############################################################################################### void THREAD_SET_PRIORITY( int prio) { @@ -344,42 +257,26 @@ void THREAD_SET_PRIORITY( int prio) } } -void THREAD_SET_AFFINITY( unsigned int aff) +// ############################################################################################### + +void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_) { - if( !SetThreadAffinityMask( GetCurrentThread(), aff)) + // prio range [-3,+3] was checked by the caller + if (!SetThreadPriority(thread_.native_handle(), gs_prio_remap[prio_ + 3])) { - FAIL( "THREAD_SET_AFFINITY", GetLastError()); + FAIL("JTHREAD_SET_PRIORITY", GetLastError()); } } -bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs) -{ - DWORD ms = (secs<0.0) ? INFINITE : (DWORD)((secs*1000.0)+0.5); +// ############################################################################################### - DWORD rc= WaitForSingleObject( *ref, ms /*timeout*/ ); - // - // (WAIT_ABANDONED) - // WAIT_OBJECT_0 success (0) - // WAIT_TIMEOUT - // WAIT_FAILED more info via GetLastError() - - if (rc == WAIT_TIMEOUT) return false; - if( rc !=0) FAIL( "WaitForSingleObject", rc==WAIT_FAILED ? GetLastError() : rc); - *ref = nullptr; // thread no longer usable - return true; - } - // - void THREAD_KILL( THREAD_T *ref ) +void THREAD_SET_AFFINITY(unsigned int aff) +{ + if( !SetThreadAffinityMask( GetCurrentThread(), aff)) { - // nonexistent on Xbox360, simply disable until a better solution is found - #if !defined( PLATFORM_XBOX) - // in theory no-one should call this as it is very dangerous (memory and mutex leaks, no notification of DLLs, etc.) - if (!TerminateThread( *ref, 0 )) FAIL("TerminateThread", GetLastError()); - #endif // PLATFORM_XBOX - *ref = nullptr; + FAIL( "THREAD_SET_AFFINITY", GetLastError()); } - - void THREAD_MAKE_ASYNCH_CANCELLABLE() {} // nothing to do for windows threads, we can cancel them anytime we want +} #if !defined __GNUC__ //see http://msdn.microsoft.com/en-us/library/xcb2z8hs.aspx @@ -414,158 +311,6 @@ bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs) #endif // !__GNUC__ } -#if _WIN32_WINNT < 0x0600 // CONDITION_VARIABLE aren't available - - void SIGNAL_INIT( SIGNAL_T* ref) - { - InitializeCriticalSection( &ref->signalCS); - InitializeCriticalSection( &ref->countCS); - if( 0 == (ref->waitEvent = CreateEvent( 0, true, false, 0))) // manual-reset - FAIL( "CreateEvent", GetLastError()); - if( 0 == (ref->waitDoneEvent = CreateEvent( 0, false, false, 0))) // auto-reset - FAIL( "CreateEvent", GetLastError()); - ref->waitersCount = 0; - } - - void SIGNAL_FREE( SIGNAL_T* ref) - { - CloseHandle( ref->waitDoneEvent); - CloseHandle( ref->waitEvent); - DeleteCriticalSection( &ref->countCS); - DeleteCriticalSection( &ref->signalCS); - } - - bool SIGNAL_WAIT( SIGNAL_T* ref, MUTEX_T* mu_ref, time_d abs_secs) - { - DWORD errc; - DWORD ms; - - if( abs_secs < 0.0) - ms = INFINITE; - else if( abs_secs == 0.0) - ms = 0; - else - { - time_d msd = (abs_secs - now_secs()) * 1000.0 + 0.5; - // If the time already passed, still try once (ms==0). A short timeout - // may have turned negative or 0 because of the two time samples done. - ms = msd <= 0.0 ? 0 : (DWORD)msd; - } - - EnterCriticalSection( &ref->signalCS); - EnterCriticalSection( &ref->countCS); - ++ ref->waitersCount; - LeaveCriticalSection( &ref->countCS); - LeaveCriticalSection( &ref->signalCS); - - errc = SignalObjectAndWait( *mu_ref, ref->waitEvent, ms, false); - - EnterCriticalSection( &ref->countCS); - if( 0 == -- ref->waitersCount) - { - // we're the last one leaving... - ResetEvent( ref->waitEvent); - SetEvent( ref->waitDoneEvent); - } - LeaveCriticalSection( &ref->countCS); - MUTEX_LOCK( mu_ref); - - switch( errc) - { - case WAIT_TIMEOUT: - return false; - case WAIT_OBJECT_0: - return true; - } - - FAIL( "SignalObjectAndWait", GetLastError()); - return false; - } - - void SIGNAL_ALL( SIGNAL_T* ref) - { - DWORD errc = WAIT_OBJECT_0; - - EnterCriticalSection( &ref->signalCS); - EnterCriticalSection( &ref->countCS); - - if( ref->waitersCount > 0) - { - ResetEvent( ref->waitDoneEvent); - SetEvent( ref->waitEvent); - LeaveCriticalSection( &ref->countCS); - errc = WaitForSingleObject( ref->waitDoneEvent, INFINITE); - } - else - { - LeaveCriticalSection( &ref->countCS); - } - - LeaveCriticalSection( &ref->signalCS); - - if( WAIT_OBJECT_0 != errc) - FAIL( "WaitForSingleObject", GetLastError()); - } - -#else // CONDITION_VARIABLE are available, use them - - // - void SIGNAL_INIT( SIGNAL_T *ref ) - { - InitializeConditionVariable( ref); - } - - void SIGNAL_FREE( SIGNAL_T *ref ) - { - // nothing to do - (void)ref; - } - - bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu_ref, time_d abs_secs) - { - long ms; - - if( abs_secs < 0.0) - ms = INFINITE; - else if( abs_secs == 0.0) - ms = 0; - else - { - ms = (long) ((abs_secs - now_secs())*1000.0 + 0.5); - - // If the time already passed, still try once (ms==0). A short timeout - // may have turned negative or 0 because of the two time samples done. - // - if( ms < 0) - ms = 0; - } - - if( !SleepConditionVariableCS( ref, mu_ref, ms)) - { - if( GetLastError() == ERROR_TIMEOUT) - { - return false; - } - else - { - FAIL( "SleepConditionVariableCS", GetLastError()); - } - } - return true; - } - - void SIGNAL_ONE( SIGNAL_T *ref ) - { - WakeConditionVariable( ref); - } - - void SIGNAL_ALL( SIGNAL_T *ref ) - { - WakeAllConditionVariable( ref); - } - -#endif // CONDITION_VARIABLE are available - #else // THREADAPI == THREADAPI_PTHREAD // PThread (Linux, OS X, ...) // @@ -607,44 +352,6 @@ bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs) abort(); } #define PT_CALL( call ) { int rc= call; if (rc!=0) _PT_FAIL( rc, #call, __FILE__, __LINE__ ); } - // - void SIGNAL_INIT( SIGNAL_T *ref ) { - PT_CALL(pthread_cond_init(ref, nullptr /*attr*/)); - } - void SIGNAL_FREE( SIGNAL_T *ref ) { - PT_CALL( pthread_cond_destroy(ref) ); - } - // - /* - * Timeout is given as absolute since we may have fake wakeups during - * a timed out sleep. A Linda with some other key read, or just because - * PThread cond vars can wake up unwantedly. - */ - bool SIGNAL_WAIT( SIGNAL_T *ref, pthread_mutex_t *mu, time_d abs_secs ) { - if (abs_secs<0.0) { - PT_CALL( pthread_cond_wait( ref, mu ) ); // infinite - } else { - int rc; - struct timespec ts; - - assert( abs_secs != 0.0 ); - prepare_timeout( &ts, abs_secs ); - - rc= pthread_cond_timedwait( ref, mu, &ts ); - - if (rc==ETIMEDOUT) return false; - if (rc) { _PT_FAIL( rc, "pthread_cond_timedwait()", __FILE__, __LINE__ ); } - } - return true; - } - // - void SIGNAL_ONE( SIGNAL_T *ref ) { - PT_CALL( pthread_cond_signal(ref) ); // wake up ONE (or no) waiting thread - } - // - void SIGNAL_ALL( SIGNAL_T *ref ) { - PT_CALL( pthread_cond_broadcast(ref) ); // wake up ALL waiting threads - } // array of 7 thread priority values, hand-tuned by platform so that we offer a uniform [-3,+3] public priority range static int const gs_prio_remap[] = @@ -775,129 +482,36 @@ static int select_prio(int prio /* -3..+3 */) return gs_prio_remap[prio + 3]; } -void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (*func)( void*), void* data, int prio /* -3..+3 */) +void THREAD_SET_PRIORITY( int prio) { - pthread_attr_t a; - bool const change_priority = #ifdef PLATFORM_LINUX - sudo && // only root-privileged process can change priorities -#endif - (prio != THREAD_PRIO_DEFAULT); - - PT_CALL( pthread_attr_init( &a)); - -#ifndef PTHREAD_TIMEDJOIN - // We create a NON-JOINABLE thread. This is mainly due to the lack of - // 'pthread_timedjoin()', but does offer other benefits (s.a. earlier - // freeing of the thread's resources). - // - PT_CALL( pthread_attr_setdetachstate( &a, PTHREAD_CREATE_DETACHED)); -#endif // PTHREAD_TIMEDJOIN - - // Use this to find a system's default stack size (DEBUG) -#if 0 - { - size_t n; - pthread_attr_getstacksize( &a, &n); - fprintf( stderr, "Getstack: %u\n", (unsigned int)n); - } - // 524288 on OS X - // 2097152 on Linux x86 (Ubuntu 7.04) - // 1048576 on FreeBSD 6.2 SMP i386 -#endif // 0 - -#if defined _THREAD_STACK_SIZE && _THREAD_STACK_SIZE > 0 - PT_CALL( pthread_attr_setstacksize( &a, _THREAD_STACK_SIZE)); -#endif - - if (change_priority) + if( sudo) // only root-privileged process can change priorities +#endif // PLATFORM_LINUX { struct sched_param sp; - // "The specified scheduling parameters are only used if the scheduling - // parameter inheritance attribute is PTHREAD_EXPLICIT_SCHED." - // -#if !defined __ANDROID__ || ( defined __ANDROID__ && __ANDROID_API__ >= 28 ) - PT_CALL( pthread_attr_setinheritsched( &a, PTHREAD_EXPLICIT_SCHED)); -#endif - -#ifdef _PRIO_SCOPE - PT_CALL( pthread_attr_setscope( &a, _PRIO_SCOPE)); -#endif // _PRIO_SCOPE - - PT_CALL( pthread_attr_setschedpolicy( &a, _PRIO_MODE)); - - sp.sched_priority = select_prio(prio); - PT_CALL( pthread_attr_setschedparam( &a, &sp)); - } - - //--- - // Seems on OS X, _POSIX_THREAD_THREADS_MAX is some kind of system - // thread limit (not userland thread). Actual limit for us is way higher. - // PTHREAD_THREADS_MAX is not defined (even though man page refers to it!) - // -# ifndef THREAD_CREATE_RETRIES_MAX - // Don't bother with retries; a failure is a failure - // - { - int rc = pthread_create( ref, &a, func, data); - if( rc) _PT_FAIL( rc, "pthread_create()", __FILE__, __LINE__ - 1); + // prio range [-3,+3] was checked by the caller + sp.sched_priority = gs_prio_remap[ prio + 3]; + PT_CALL( pthread_setschedparam( pthread_self(), _PRIO_MODE, &sp)); } -# else -# error "This code deprecated" - /* - // Wait slightly if thread creation has exchausted the system - // - { int retries; - for( retries=0; retries(thread_.native_handle()), _PRIO_MODE, &sp)); } } +// ################################################################################################# + void THREAD_SET_AFFINITY( unsigned int aff) { int bit = 0; @@ -929,93 +543,6 @@ void THREAD_SET_AFFINITY( unsigned int aff) #endif } - /* - * Wait for a thread to finish. - * - * 'mu_ref' is a lock we should use for the waiting; initially unlocked. - * Same lock as passed to THREAD_EXIT. - * - * Returns true for successful wait, false for timed out - */ -bool THREAD_WAIT( THREAD_T *ref, double secs , SIGNAL_T *signal_ref, MUTEX_T *mu_ref, volatile enum e_status *st_ref) -{ - struct timespec ts_store; - const struct timespec* timeout = nullptr; - bool done; - - // Do timeout counting before the locks - // -#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT - if (secs>=0.0) -#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR - if (secs>0.0) -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - { - prepare_timeout( &ts_store, now_secs()+secs ); - timeout= &ts_store; - } - -#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT - /* Thread is joinable - */ - if (!timeout) { - PT_CALL(pthread_join(*ref, nullptr /*ignore exit value*/)); - done = true; - } else { - int rc = PTHREAD_TIMEDJOIN(*ref, nullptr, timeout); - if ((rc!=0) && (rc!=ETIMEDOUT)) { - _PT_FAIL( rc, "PTHREAD_TIMEDJOIN", __FILE__, __LINE__-2 ); - } - done= rc==0; - } -#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR - /* Since we've set the thread up as PTHREAD_CREATE_DETACHED, we cannot - * join with it. Use the cond.var. - */ - (void) ref; // unused - MUTEX_LOCK( mu_ref ); - - // 'secs'==0.0 does not need to wait, just take the current status - // within the 'mu_ref' locks - // - if (secs != 0.0) { - while( *st_ref < DONE ) { - if (!timeout) { - PT_CALL( pthread_cond_wait( signal_ref, mu_ref )); - } else { - int rc= pthread_cond_timedwait( signal_ref, mu_ref, timeout ); - if (rc==ETIMEDOUT) break; - if (rc!=0) _PT_FAIL( rc, "pthread_cond_timedwait", __FILE__, __LINE__-2 ); - } - } - } - done= *st_ref >= DONE; // DONE|ERROR_ST|CANCELLED - - MUTEX_UNLOCK( mu_ref ); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - return done; - } - // - void THREAD_KILL( THREAD_T *ref ) { -#ifdef __ANDROID__ - __android_log_print(ANDROID_LOG_WARN, LOG_TAG, "Cannot kill thread!"); -#else - pthread_cancel( *ref ); -#endif - } - - void THREAD_MAKE_ASYNCH_CANCELLABLE() - { -#ifdef __ANDROID__ - __android_log_print(ANDROID_LOG_WARN, LOG_TAG, "Cannot make thread async cancellable!"); -#else - // that's the default, but just in case... - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr); - // we want cancellation to take effect immediately if possible, instead of waiting for a cancellation point (which is the default) - pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); -#endif - } - void THREAD_SETNAME( char const* _name) { // exact API to set the thread name is platform-dependant diff --git a/src/threading.h b/src/threading.h index 38a021f..e9f302a 100644 --- a/src/threading.h +++ b/src/threading.h @@ -1,13 +1,9 @@ #pragma once -/* - * win32-pthread: - * define HAVE_WIN32_PTHREAD and PTW32_INCLUDE_WINDOWS_H in your project configuration when building for win32-pthread. - * link against pthreadVC2.lib, and of course have pthreadVC2.dll somewhere in your path. - */ #include "platform.h" #include +#include /* Note: ERROR is a defined entity on Win32 PENDING: The Lua VM hasn't done anything yet. @@ -19,7 +15,7 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; #define THREADAPI_WINDOWS 1 #define THREADAPI_PTHREAD 2 -#if( defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC)) && !defined( HAVE_WIN32_PTHREAD) +#if( defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC)) //#pragma message ( "THREADAPI_WINDOWS" ) #define THREADAPI THREADAPI_WINDOWS #else // (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) @@ -68,16 +64,9 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; }; - #define MUTEX_T HANDLE - void MUTEX_INIT( MUTEX_T* ref); - void MUTEX_FREE( MUTEX_T* ref); - void MUTEX_LOCK( MUTEX_T* ref); - void MUTEX_UNLOCK( MUTEX_T* ref); - #else // CONDITION_VARIABLE are available, use them #define SIGNAL_T CONDITION_VARIABLE - #define MUTEX_T CRITICAL_SECTION #define MUTEX_INIT( ref) InitializeCriticalSection( ref) #define MUTEX_FREE( ref) DeleteCriticalSection( ref) #define MUTEX_LOCK( ref) EnterCriticalSection( ref) @@ -111,7 +100,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; # define _MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE #endif - #define MUTEX_T pthread_mutex_t #define MUTEX_INIT(ref) pthread_mutex_init(ref, nullptr) #define MUTEX_RECURSIVE_INIT(ref) \ { pthread_mutexattr_t a; pthread_mutexattr_init( &a ); \ @@ -126,8 +114,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; using SIGNAL_T = pthread_cond_t; - void SIGNAL_ONE( SIGNAL_T *ref ); - // Yield is non-portable: // // OS X 10.4.8/9 has pthread_yield_np() @@ -143,10 +129,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; #define THREAD_CALLCONV #endif //THREADAPI == THREADAPI_PTHREAD -void SIGNAL_INIT( SIGNAL_T *ref ); -void SIGNAL_FREE( SIGNAL_T *ref ); -void SIGNAL_ALL( SIGNAL_T *ref ); - /* * 'time_d': <0.0 for no timeout * 0.0 for instant check @@ -155,11 +137,6 @@ void SIGNAL_ALL( SIGNAL_T *ref ); using time_d = double; time_d now_secs(void); -time_d SIGNAL_TIMEOUT_PREPARE( double rel_secs ); - -bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); - - /*---=== Threading ===--- */ @@ -167,16 +144,9 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); #if THREADAPI == THREADAPI_WINDOWS - using THREAD_T = HANDLE; -# define THREAD_ISNULL( _h) (_h == 0) - void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (__stdcall *func)( void*), void* data, int prio /* -3..+3 */); - # define THREAD_PRIO_MIN (-3) # define THREAD_PRIO_MAX (+3) -# define THREAD_CLEANUP_PUSH( cb_, val_) -# define THREAD_CLEANUP_POP( execute_) - #else // THREADAPI == THREADAPI_PTHREAD /* Platforms that have a timed 'pthread_join()' can get away with a simpler @@ -195,11 +165,6 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); # endif # endif - using THREAD_T = pthread_t; -# define THREAD_ISNULL( _h) 0 // pthread_t may be a structure: never 'null' by itself - - void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (*func)( void*), void* data, int prio /* -3..+3 */); - # if defined(PLATFORM_LINUX) extern volatile bool sudo; # ifdef LINUX_SCHED_RR @@ -213,13 +178,6 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); # define THREAD_PRIO_MAX (+3) # endif -# if THREADWAIT_METHOD == THREADWAIT_CONDVAR -# define THREAD_CLEANUP_PUSH( cb_, val_) pthread_cleanup_push( cb_, val_) -# define THREAD_CLEANUP_POP( execute_) pthread_cleanup_pop( execute_) -# else -# define THREAD_CLEANUP_PUSH( cb_, val_) { -# define THREAD_CLEANUP_POP( execute_) } -# endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR #endif // THREADAPI == THREADAPI_WINDOWS /* @@ -236,16 +194,8 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); #endif // THREADAPI == THREADAPI_WINDOWS || (defined PTHREAD_TIMEDJOIN) -#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT -bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs); -#define THREAD_WAIT( a, b, c, d, e) THREAD_WAIT_IMPL( a, b) -#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR -bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs, SIGNAL_T *signal_ref, MUTEX_T *mu_ref, volatile enum e_status *st_ref); -#define THREAD_WAIT THREAD_WAIT_IMPL -#endif // // THREADWAIT_METHOD == THREADWAIT_CONDVAR - -void THREAD_KILL( THREAD_T* ref); void THREAD_SETNAME( char const* _name); -void THREAD_MAKE_ASYNCH_CANCELLABLE(); void THREAD_SET_PRIORITY( int prio); void THREAD_SET_AFFINITY( unsigned int aff); + +void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_); diff --git a/tests/cancel.lua b/tests/cancel.lua index c5bb761..c22103f 100644 --- a/tests/cancel.lua +++ b/tests/cancel.lua @@ -139,6 +139,7 @@ if not next(which_tests) or which_tests.linda then linda:receive( 1, "yeah") -- linda cancel: linda:receive() returns cancel_error immediately + print "cancelling" linda:cancel( "both") -- wait until cancellation is effective. @@ -163,6 +164,7 @@ if not next(which_tests) or which_tests.soft then waitCancellation( h, "waiting") -- soft cancel, this time awakens waiting linda operations, which returns cancel_error immediately, no timeout. + print "cancelling" h:cancel( "soft", true) -- wait until cancellation is effective. the lane will interrupt its loop and print the exit message @@ -177,6 +179,7 @@ if not next(which_tests) or which_tests.hook then linda:receive( 2, "yeah") -- count hook cancel after some instruction instructions + print "cancelling" h:cancel( "line", 300, 5.0) -- wait until cancellation is effective. the lane will interrupt its loop and print the exit message @@ -193,6 +196,7 @@ if not next(which_tests) or which_tests.hard then linda:receive( 2, "yeah") -- hard cancel: the lane will be interrupted from inside its current linda:receive() and won't return from it + print "cancelling" h:cancel() -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error @@ -209,27 +213,13 @@ if not next(which_tests) or which_tests.hard_unprotected then linda:receive( 2, "yeah") -- hard cancel: the lane will be interrupted from inside its current linda:receive() and won't return from it + print "cancelling" h:cancel() -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error waitCancellation( h, "cancelled") end -if not next(which_tests) or which_tests.kill then - remaining_tests.kill = nil - print "\n\n####################################################################\nbegin kill cancel test\n" - h = lanes.gen( "*", laneBody)( "busy", 50000000) -- start a pure Lua busy loop lane - - -- wait 1/3s before cancelling the lane, before the busy loop can finish - print "wait 0.3s" - linda:receive( 0.3, "yeah") - - -- hard cancel with kill: the lane thread will be forcefully terminated. kill timeout is pthread-specific - h:cancel( true, 1.0) - - -- wait until cancellation is effective. the lane will be stopped by the linda operation throwing an error - waitCancellation( h, "killed") -end --#################################################################### local unknown_test, val = next(remaining_tests) -- cgit v1.2.3-55-g6feb