From 96daea993eeea17f0c64325491943e48795ff751 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Mon, 8 Apr 2024 16:57:53 +0200 Subject: C++ migration: use std::jthread, std::condition_variable, std::chrono. win32 pthread support is gone new setting configure.shutdown_mode for cancellation of free-running threads at shutdown. no more hard thread termination! If a thread doesn't cooperate, an error is raised. lane.status "killed" is gone lane:cancel can't force-kill. --- src/cancel.cpp | 178 ++++++++--------- src/cancel.h | 8 +- src/keeper.cpp | 19 +- src/keeper.h | 10 +- src/lanes.cpp | 337 ++++++++++++------------------- src/lanes.lua | 451 +++++++++++++++++++++--------------------- src/lanes_private.h | 43 ++-- src/linda.cpp | 92 +++++---- src/macros_and_utils.h | 5 + src/threading.cpp | 525 +++---------------------------------------------- src/threading.h | 58 +----- 11 files changed, 555 insertions(+), 1171 deletions(-) (limited to 'src') diff --git a/src/cancel.cpp b/src/cancel.cpp index 4667f07..6a94343 100644 --- a/src/cancel.cpp +++ b/src/cancel.cpp @@ -92,7 +92,7 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar) // ################################################################################################ //--- -// = thread_cancel( lane_ud [,timeout_secs=0.0] [,force_kill_bool=false] ) +// = thread_cancel( lane_ud [,timeout_secs=0.0] [,wake_lindas_bool=false] ) // // The originator thread asking us specifically to cancel the other thread. // @@ -100,9 +100,8 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar) // 0.0: just signal it to cancel, no time waited // >0: time to wait for the lane to detect cancellation // -// 'force_kill': if true, and lane does not detect cancellation within timeout, -// it is forcefully killed. Using this with 0.0 timeout means just kill -// (unless the lane is already finished). +// 'wake_lindas_bool': if true, signal any linda the thread is waiting on +// instead of waiting for its timeout (if any) // // Returns: true if the lane was already finished (DONE/ERROR_ST/CANCELLED) or if we // managed to cancel it. @@ -111,76 +110,47 @@ static void cancel_hook(lua_State* L, [[maybe_unused]] lua_Debug* ar) // ################################################################################################ -static CancelResult thread_cancel_soft(Lane* lane_, double secs_, bool wake_lindas_) +static CancelResult thread_cancel_soft(Lane* lane_, lua_Duration duration_, bool wake_lane_) { lane_->cancel_request = CancelRequest::Soft; // it's now signaled to stop // negative timeout: we don't want to truly abort the lane, we just want it to react to cancel_test() on its own - if (wake_lindas_) // wake the thread so that execution returns from any pending linda operation if desired + if (wake_lane_) // wake the thread so that execution returns from any pending linda operation if desired { - SIGNAL_T* const waiting_on{ lane_->waiting_on }; + std::condition_variable* const waiting_on{ lane_->m_waiting_on }; if (lane_->status == WAITING && waiting_on != nullptr) { - SIGNAL_ALL( waiting_on); + waiting_on->notify_all(); } } - return THREAD_WAIT(&lane_->thread, secs_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Cancelled : CancelResult::Timeout; + return lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout; } // ################################################################################################ -static CancelResult thread_cancel_hard(lua_State* L, Lane* lane_, double secs_, bool force_, double waitkill_timeout_) +static CancelResult thread_cancel_hard(Lane* lane_, lua_Duration duration_, bool wake_lane_) { lane_->cancel_request = CancelRequest::Hard; // it's now signaled to stop + //lane_->m_thread.get_stop_source().request_stop(); + if (wake_lane_) // wake the thread so that execution returns from any pending linda operation if desired { - SIGNAL_T* waiting_on = lane_->waiting_on; + std::condition_variable* waiting_on = lane_->m_waiting_on; if (lane_->status == WAITING && waiting_on != nullptr) { - SIGNAL_ALL( waiting_on); + waiting_on->notify_all(); } } - CancelResult result{ THREAD_WAIT(&lane_->thread, secs_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Cancelled : CancelResult::Timeout }; - - if ((result == CancelResult::Timeout) && force_) - { - // Killing is asynchronous; we _will_ wait for it to be done at - // GC, to make sure the data structure can be released (alternative - // would be use of "cancellation cleanup handlers" that at least - // PThread seems to have). - // - THREAD_KILL(&lane_->thread); -#if THREADAPI == THREADAPI_PTHREAD - // pthread: make sure the thread is really stopped! - // note that this may block forever if the lane doesn't call a cancellation point and pthread doesn't honor PTHREAD_CANCEL_ASYNCHRONOUS - result = THREAD_WAIT(&lane_->thread, waitkill_timeout_, &lane_->done_signal, &lane_->done_lock, &lane_->status) ? CancelResult::Killed : CancelResult::Timeout; - if (result == CancelResult::Timeout) - { - std::ignore = luaL_error( L, "force-killed lane failed to terminate within %f second%s", waitkill_timeout_, waitkill_timeout_ > 1 ? "s" : ""); - } -#else - (void) waitkill_timeout_; // unused - (void) L; // unused -#endif // THREADAPI == THREADAPI_PTHREAD - lane_->mstatus = Lane::Killed; // mark 'gc' to wait for it - // note that lane_->status value must remain to whatever it was at the time of the kill - // because we need to know if we can lua_close() the Lua State or not. - result = CancelResult::Killed; - } + CancelResult result{ lane_->waitForCompletion(duration_) ? CancelResult::Cancelled : CancelResult::Timeout }; return result; } // ################################################################################################ -CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_, bool force_, double waitkill_timeout_) +CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hook_count_, lua_Duration duration_, bool wake_lane_) { // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here // We can read 'lane_->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) - if (lane_->mstatus == Lane::Killed) - { - return CancelResult::Killed; - } - if (lane_->status >= DONE) { // say "ok" by default, including when lane is already done @@ -191,48 +161,57 @@ CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_ // let us hope we never land here with a pointer on a linda that has been destroyed... if (op_ == CancelOp::Soft) { - return thread_cancel_soft(lane_, secs_, force_); + return thread_cancel_soft(lane_, duration_, wake_lane_); + } + else if (static_cast(op_) > static_cast(CancelOp::Soft)) + { + lua_sethook(lane_->L, cancel_hook, static_cast(op_), hook_count_); } - return thread_cancel_hard(L, lane_, secs_, force_, waitkill_timeout_); + return thread_cancel_hard(lane_, duration_, wake_lane_); } // ################################################################################################ // ################################################################################################ -// > 0: the mask -// = 0: soft -// < 0: hard -static CancelOp which_op(lua_State* L, int idx_) +CancelOp which_cancel_op(char const* op_string_) +{ + CancelOp op{ CancelOp::Invalid }; + if (strcmp(op_string_, "hard") == 0) + { + op = CancelOp::Hard; + } + else if (strcmp(op_string_, "soft") == 0) + { + op = CancelOp::Soft; + } + else if (strcmp(op_string_, "call") == 0) + { + op = CancelOp::MaskCall; + } + else if (strcmp(op_string_, "ret") == 0) + { + op = CancelOp::MaskRet; + } + else if (strcmp(op_string_, "line") == 0) + { + op = CancelOp::MaskLine; + } + else if (strcmp(op_string_, "count") == 0) + { + op = CancelOp::MaskCount; + } + return op; +} + +// ################################################################################################ + +static CancelOp which_cancel_op(lua_State* L, int idx_) { if (lua_type(L, idx_) == LUA_TSTRING) { - CancelOp op{ CancelOp::Invalid }; - char const* str = lua_tostring(L, idx_); - if (strcmp(str, "hard") == 0) - { - op = CancelOp::Hard; - } - else if (strcmp(str, "soft") == 0) - { - op = CancelOp::Soft; - } - else if (strcmp(str, "call") == 0) - { - op = CancelOp::MaskCall; - } - else if (strcmp(str, "ret") == 0) - { - op = CancelOp::MaskRet; - } - else if (strcmp(str, "line") == 0) - { - op = CancelOp::MaskLine; - } - else if (strcmp(str, "count") == 0) - { - op = CancelOp::MaskCount; - } + char const* const str{ lua_tostring(L, idx_) }; + CancelOp op{ which_cancel_op(str) }; lua_remove(L, idx_); // argument is processed, remove it if (op == CancelOp::Invalid) { @@ -245,53 +224,60 @@ static CancelOp which_op(lua_State* L, int idx_) // ################################################################################################ -// bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, force [, forcekill_timeout]]) +// bool[,reason] = lane_h:cancel( [mode, hookcount] [, timeout] [, wake_lindas]) LUAG_FUNC(thread_cancel) { Lane* const lane{ lua_toLane(L, 1) }; - CancelOp const op{ which_op(L, 2) }; // this removes the op string from the stack + CancelOp const op{ which_cancel_op(L, 2) }; // this removes the op string from the stack + int hook_count{ 0 }; if (static_cast(op) > static_cast(CancelOp::Soft)) // hook is requested { - int const hook_count{ static_cast(lua_tointeger(L, 2)) }; + hook_count = static_cast(luaL_checkinteger(L, 2)); lua_remove(L, 2); // argument is processed, remove it if (hook_count < 1) { return luaL_error(L, "hook count cannot be < 1"); } - lua_sethook(lane->L, cancel_hook, static_cast(op), hook_count); } - double secs{ 0.0 }; + lua_Duration wait_timeout{ 0.0 }; if (lua_type(L, 2) == LUA_TNUMBER) { - secs = lua_tonumber(L, 2); + wait_timeout = lua_Duration{ lua_tonumber(L, 2) }; lua_remove(L, 2); // argument is processed, remove it - if (secs < 0.0) + if (wait_timeout.count() < 0.0) { return luaL_error(L, "cancel timeout cannot be < 0"); } } - - bool const force{ lua_toboolean(L, 2) ? true : false }; // false if nothing there - double const forcekill_timeout{ luaL_optnumber(L, 3, 0.0) }; - switch (thread_cancel(L, lane, op, secs, force, forcekill_timeout)) + // we wake by default in "hard" mode (remember that hook is hard too), but this can be turned off if desired + bool wake_lane{ op != CancelOp::Soft }; + if (lua_gettop(L) >= 2) + { + if (!lua_isboolean(L, 2)) + { + return luaL_error(L, "wake_lindas parameter is not a boolean"); + } + wake_lane = lua_toboolean(L, 2); + lua_remove(L, 2); // argument is processed, remove it + } + switch (thread_cancel(lane, op, hook_count, wait_timeout, wake_lane)) { + default: // should never happen unless we added a case and forgot to handle it + ASSERT_L(false); + break; + case CancelResult::Timeout: lua_pushboolean(L, 0); lua_pushstring(L, "timeout"); - return 2; + break; case CancelResult::Cancelled: lua_pushboolean(L, 1); push_thread_status(L, lane); - return 2; - - case CancelResult::Killed: - lua_pushboolean(L, 1); - push_thread_status(L, lane); - return 2; + break; } // should never happen, only here to prevent the compiler from complaining of "not all control paths returning a value" - return 0; + return 2; } diff --git a/src/cancel.h b/src/cancel.h index 884e193..954b04e 100644 --- a/src/cancel.h +++ b/src/cancel.h @@ -13,6 +13,8 @@ extern "C" { #include "uniquekey.h" #include "macros_and_utils.h" +#include + // ################################################################################################ class Lane; // forward @@ -30,8 +32,7 @@ enum class CancelRequest enum class CancelResult { Timeout, - Cancelled, - Killed + Cancelled }; enum class CancelOp @@ -48,7 +49,8 @@ enum class CancelOp // crc64/we of string "CANCEL_ERROR" generated at http://www.nitrxgen.net/hashgen/ static constexpr UniqueKey CANCEL_ERROR{ 0xe97d41626cc97577ull }; // 'raise_cancel_error' sentinel -CancelResult thread_cancel(lua_State* L, Lane* lane_, CancelOp op_, double secs_, bool force_, double waitkill_timeout_); +CancelOp which_cancel_op(char const* op_string_); +CancelResult thread_cancel(Lane* lane_, CancelOp op_, int hook_count_, lua_Duration secs_, bool wake_lindas_); [[noreturn]] static inline void raise_cancel_error(lua_State* L) { diff --git a/src/keeper.cpp b/src/keeper.cpp index 937d190..0aea18e 100644 --- a/src/keeper.cpp +++ b/src/keeper.cpp @@ -627,7 +627,7 @@ void close_keepers(Universe* U) } for (int i = 0; i < nbKeepers; ++i) { - MUTEX_FREE(&U->keepers->keeper_array[i].keeper_cs); + U->keepers->keeper_array[i].~Keeper(); } // free the keeper bookkeeping structure U->internal_allocator.free(U->keepers, sizeof(Keepers) + (nbKeepers - 1) * sizeof(Keeper)); @@ -673,9 +673,14 @@ void init_keepers(Universe* U, lua_State* L) { std::ignore = luaL_error(L, "init_keepers() failed while creating keeper array; out of memory"); } - memset(U->keepers, 0, bytes); + U->keepers->Keepers::Keepers(); U->keepers->gc_threshold = keepers_gc_threshold; U->keepers->nb_keepers = nb_keepers; + + for (int i = 0; i < nb_keepers; ++i) + { + U->keepers->keeper_array[i].Keeper::Keeper(); + } } for (int i = 0; i < nb_keepers; ++i) // keepersUD { @@ -687,10 +692,6 @@ void init_keepers(Universe* U, lua_State* L) } U->keepers->keeper_array[i].L = K; - // we can trigger a GC from inside keeper_call(), where a keeper is acquired - // from there, GC can collect a linda, which would acquire the keeper again, and deadlock the thread. - // therefore, we need a recursive mutex. - MUTEX_RECURSIVE_INIT(&U->keepers->keeper_array[i].keeper_cs); if (U->keepers->gc_threshold >= 0) { @@ -772,8 +773,7 @@ Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_) */ unsigned int i = (unsigned int)((magic_ >> KEEPER_MAGIC_SHIFT) % nbKeepers); Keeper* K = &keepers_->keeper_array[i]; - - MUTEX_LOCK( &K->keeper_cs); + K->m_mutex.lock(); //++ K->count; return K; } @@ -787,7 +787,7 @@ void keeper_release(Keeper* K) //-- K->count; if (K) { - MUTEX_UNLOCK(&K->keeper_cs); + K->m_mutex.unlock(); } } @@ -843,7 +843,6 @@ int keeper_call(Universe* U, lua_State* K, keeper_api_t func_, lua_State* L, voi if ((args == 0) || luaG_inter_copy(U, L, K, args, LookupMode::ToKeeper) == 0) // L->K { lua_call(K, 1 + args, LUA_MULTRET); - retvals = lua_gettop(K) - Ktos; // note that this can raise a luaL_error while the keeper state (and its mutex) is acquired // this may interrupt a lane, causing the destruction of the underlying OS thread diff --git a/src/keeper.h b/src/keeper.h index f7e3951..931c1d5 100644 --- a/src/keeper.h +++ b/src/keeper.h @@ -11,21 +11,23 @@ extern "C" { #include "threading.h" #include "uniquekey.h" +#include + // forwards enum class LookupMode; struct Universe; struct Keeper { - MUTEX_T keeper_cs; - lua_State* L; + std::mutex m_mutex; + lua_State* L{ nullptr }; // int count; }; struct Keepers { int gc_threshold{ 0 }; - int nb_keepers; + int nb_keepers{ 0 }; Keeper keeper_array[1]; }; @@ -38,7 +40,7 @@ void close_keepers(Universe* U); Keeper* which_keeper(Keepers* keepers_, uintptr_t magic_); Keeper* keeper_acquire(Keepers* keepers_, uintptr_t magic_); -void keeper_release(Keeper* K); +void keeper_release(Keeper* K_); void keeper_toggle_nil_sentinels(lua_State* L, int val_i_, LookupMode const mode_); int keeper_push_linda_storage(Universe* U, lua_State* L, void* ptr_, uintptr_t magic_); diff --git a/src/lanes.cpp b/src/lanes.cpp index 08584a2..4dd9b46 100644 --- a/src/lanes.cpp +++ b/src/lanes.cpp @@ -108,11 +108,6 @@ Lane::Lane(Universe* U_, lua_State* L_) : U{ U_ } , L{ L_ } { -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - MUTEX_INIT(&done_lock); - SIGNAL_INIT(&done_signal); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - #if HAVE_LANE_TRACKING() if (U->tracking_first) { @@ -121,6 +116,29 @@ Lane::Lane(Universe* U_, lua_State* L_) #endif // HAVE_LANE_TRACKING() } +bool Lane::waitForCompletion(lua_Duration duration_) +{ + std::chrono::time_point until{ std::chrono::time_point::max() }; + if (duration_.count() >= 0.0) + { + until = std::chrono::steady_clock::now() + std::chrono::duration_cast(duration_); + } + + std::unique_lock lock{ m_done_mutex }; + //std::stop_token token{ m_thread.get_stop_token() }; + //return m_done_signal.wait_for(lock, token, secs_, [this](){ return status >= DONE; }); + return m_done_signal.wait_until(lock, until, [this](){ return status >= DONE; }); +} + +static void lane_main(Lane* lane); +void Lane::startThread(int priority_) +{ + m_thread = std::jthread([this]() { lane_main(this); }); + if (priority_ != THREAD_PRIO_DEFAULT) + { + JTHREAD_SET_PRIORITY(m_thread, priority_); + } +} /* Do you want full call stacks, or just the line where the error happened? * @@ -144,7 +162,7 @@ static void securize_debug_threadname(lua_State* L, Lane* lane_) } #if ERROR_FULL_STACK -static int lane_error( lua_State* L); +static int lane_error(lua_State* L); // crc64/we of string "STACKTRACE_REGKEY" generated at http://www.nitrxgen.net/hashgen/ static constexpr UniqueKey STACKTRACE_REGKEY{ 0x534af7d3226a429full }; #endif // ERROR_FULL_STACK @@ -255,11 +273,6 @@ Lane::~Lane() { // Clean up after a (finished) thread // -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - SIGNAL_FREE(&done_signal); - MUTEX_FREE(&done_lock); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - #if HAVE_LANE_TRACKING() if (U->tracking_first != nullptr) { @@ -455,26 +468,27 @@ static bool selfdestruct_remove(Lane* lane_) static int universe_gc( lua_State* L) { Universe* const U{ lua_tofulluserdata(L, 1) }; + lua_Duration const shutdown_timeout{ lua_tonumber(L, lua_upvalueindex(1)) }; + [[maybe_unused]] char const* const op_string{ lua_tostring(L, lua_upvalueindex(2)) }; + CancelOp const op{ which_cancel_op(op_string) }; - while (U->selfdestruct_first != SELFDESTRUCT_END) // true at most once! + if (U->selfdestruct_first != SELFDESTRUCT_END) { + // Signal _all_ still running threads to exit (including the timer thread) // { std::lock_guard guard{ U->selfdestruct_cs }; Lane* lane{ U->selfdestruct_first }; + lua_Duration timeout{ 1us }; while (lane != SELFDESTRUCT_END) { - // attempt a regular unforced hard cancel with a small timeout - bool const cancelled{ THREAD_ISNULL(lane->thread) || thread_cancel(L, lane, CancelOp::Hard, 0.0001, false, 0.0) != CancelResult::Timeout }; - // if we failed, and we know the thread is waiting on a linda - if (cancelled == false && lane->status == WAITING && lane->waiting_on != nullptr) + // attempt the requested cancel with a small timeout. + // if waiting on a linda, they will raise a cancel_error. + // if a cancellation hook is desired, it will be installed to try to raise an error + if (lane->m_thread.joinable()) { - // signal the linda to wake up the thread so that it can react to the cancel query - // let us hope we never land here with a pointer on a linda that has been destroyed... - SIGNAL_T* const waiting_on{ lane->waiting_on }; - // lane->waiting_on = nullptr; // useful, or not? - SIGNAL_ALL(waiting_on); + std::ignore = thread_cancel(lane, op, 1, timeout, true); } lane = lane->selfdestruct_next; } @@ -482,47 +496,32 @@ static int universe_gc( lua_State* L) // When noticing their cancel, the lanes will remove themselves from // the selfdestruct chain. - - // TBD: Not sure if Windows (multi core) will require the timed approach, - // or single Yield. I don't have machine to test that (so leaving - // for timed approach). -- AKa 25-Oct-2008 - - // OS X 10.5 (Intel) needs more to avoid segfaults. - // - // "make test" is okay. 100's of "make require" are okay. - // - // Tested on MacBook Core Duo 2GHz and 10.5.5: - // -- AKa 25-Oct-2008 - // { - lua_Number const shutdown_timeout = lua_tonumber(L, lua_upvalueindex(1)); - double const t_until = now_secs() + shutdown_timeout; + std::chrono::time_point t_until{ std::chrono::steady_clock::now() + std::chrono::duration_cast(shutdown_timeout) }; while (U->selfdestruct_first != SELFDESTRUCT_END) { - YIELD(); // give threads time to act on their cancel + // give threads time to act on their cancel + YIELD(); + // count the number of cancelled thread that didn't have the time to act yet + int n{ 0 }; { - // count the number of cancelled thread that didn't have the time to act yet - int n = 0; - double t_now = 0.0; + std::lock_guard guard{ U->selfdestruct_cs }; + Lane* lane{ U->selfdestruct_first }; + while (lane != SELFDESTRUCT_END) { - std::lock_guard guard{ U->selfdestruct_cs }; - Lane* lane{ U->selfdestruct_first }; - while (lane != SELFDESTRUCT_END) - { - if (lane->cancel_request == CancelRequest::Hard) - ++n; - lane = lane->selfdestruct_next; - } - } - // if timeout elapsed, or we know all threads have acted, stop waiting - t_now = now_secs(); - if (n == 0 || (t_now >= t_until)) - { - DEBUGSPEW_CODE(fprintf(stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout - (t_until - t_now))); - break; + if (lane->cancel_request != CancelRequest::None) + ++n; + lane = lane->selfdestruct_next; } } + // if timeout elapsed, or we know all threads have acted, stop waiting + std::chrono::time_point t_now = std::chrono::steady_clock::now(); + if (n == 0 || (t_now >= t_until)) + { + DEBUGSPEW_CODE(fprintf(stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout.count())); + break; + } } } @@ -532,48 +531,17 @@ static int universe_gc( lua_State* L) { YIELD(); } - - //--- - // Kill the still free running threads - // - if (U->selfdestruct_first != SELFDESTRUCT_END) - { - unsigned int n = 0; - // first thing we did was to raise the linda signals the threads were waiting on (if any) - // therefore, any well-behaved thread should be in CANCELLED state - // these are not running, and the state can be closed - { - std::lock_guard guard{ U->selfdestruct_cs }; - Lane* lane{ U->selfdestruct_first }; - while (lane != SELFDESTRUCT_END) - { - Lane* const next_s{ lane->selfdestruct_next }; - lane->selfdestruct_next = nullptr; // detach from selfdestruct chain - if (!THREAD_ISNULL(lane->thread)) // can be nullptr if previous 'soft' termination succeeded - { - THREAD_KILL(&lane->thread); -#if THREADAPI == THREADAPI_PTHREAD - // pthread: make sure the thread is really stopped! - THREAD_WAIT(&lane->thread, -1, &lane->done_signal, &lane->done_lock, &lane->status); -#endif // THREADAPI == THREADAPI_PTHREAD - } - // NO lua_close() in this case because we don't know where execution of the state was interrupted - delete lane; - lane = next_s; - ++n; - } - U->selfdestruct_first = SELFDESTRUCT_END; - } - - DEBUGSPEW_CODE(fprintf(stderr, "Killed %d lane(s) at process end.\n", n)); - } } - // If some lanes are currently cleaning after themselves, wait until they are done. - // They are no longer listed in the selfdestruct chain, but they still have to lua_close(). - while (U->selfdestructing_count.load(std::memory_order_acquire) > 0) + // If after all this, we still have some free-running lanes, it's an external user error, they should have stopped appropriately { - YIELD(); + std::lock_guard guard{ U->selfdestruct_cs }; + Lane* lane{ U->selfdestruct_first }; + if (lane != SELFDESTRUCT_END) + { + // this causes a leak because we don't call U's destructor (which could be bad if the still running lanes are accessing it) + std::ignore = luaL_error(L, "Zombie thread %s refuses to die!", lane->debug_name); + } } // necessary so that calling free_deep_prelude doesn't crash because linda_id expects a linda lightuserdata at absolute slot 1 @@ -874,20 +842,8 @@ static char const* get_errcode_name( int _code) } #endif // USE_DEBUG_SPEW() -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR // implies THREADAPI == THREADAPI_PTHREAD -static void thread_cleanup_handler(void* opaque) +static void lane_main(Lane* lane) { - Lane* lane{ (Lane*) opaque }; - MUTEX_LOCK(&lane->done_lock); - lane->status = CANCELLED; - SIGNAL_ONE(&lane->done_signal); // wake up master (while 'lane->done_lock' is on) - MUTEX_UNLOCK(&lane->done_lock); -} -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - -static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) -{ - Lane* lane{ (Lane*) vs }; lua_State* const L{ lane->L }; // wait until the launching thread has finished preparing L lane->m_ready.wait(); @@ -897,8 +853,6 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) // At this point, the lane function and arguments are on the stack int const nargs{ lua_gettop(L) - 1 }; DEBUGSPEW_CODE(Universe* U = universe_get(L)); - THREAD_MAKE_ASYNCH_CANCELLABLE(); - THREAD_CLEANUP_PUSH(thread_cleanup_handler, lane); lane->status = RUNNING; // PENDING -> RUNNING // Tie "set_finalizer()" to the state @@ -949,18 +903,19 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) // the finalizer generated an error, and left its own error message [and stack trace] on the stack rc = rc2; // we're overruling the earlier script error or normal return } - lane->waiting_on = nullptr; // just in case + lane->m_waiting_on = nullptr; // just in case if (selfdestruct_remove(lane)) // check and remove (under lock!) { // We're a free-running thread and no-one's there to clean us up. - // lua_close(lane->L); - + lane->L = nullptr; // just in case lane->U->selfdestruct_cs.lock(); // done with lua_close(), terminal shutdown sequence may proceed lane->U->selfdestructing_count.fetch_sub(1, std::memory_order_release); lane->U->selfdestruct_cs.unlock(); + // we destroy our jthread member from inside the thread body, so we have to detach so that we don't try to join, as this doesn't seem a good idea + lane->m_thread.detach(); delete lane; lane = nullptr; } @@ -972,21 +927,14 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main(void* vs) enum e_status st = (rc == 0) ? DONE : CANCEL_ERROR.equals(L, 1) ? CANCELLED : ERROR_ST; // Posix no PTHREAD_TIMEDJOIN: - // 'done_lock' protects the -> DONE|ERROR_ST|CANCELLED state change + // 'm_done_mutex' protects the -> DONE|ERROR_ST|CANCELLED state change // -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - MUTEX_LOCK(&lane->done_lock); { -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR + std::lock_guard lock{ lane->m_done_mutex }; lane->status = st; -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - SIGNAL_ONE(&lane->done_signal); // wake up master (while 'lane->done_lock' is on) + lane->m_done_signal.notify_one();// wake up master (while 'lane->m_done_mutex' is on) } - MUTEX_UNLOCK(&lane->done_lock); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR } - THREAD_CLEANUP_POP(false); - return 0; // ignored } // ################################################################################################# @@ -1115,13 +1063,11 @@ LUAG_FUNC(lane_new) // leave a single cancel_error on the stack for the caller lua_settop(m_lane->L, 0); CANCEL_ERROR.pushKey(m_lane->L); -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - MUTEX_LOCK(&m_lane->done_lock); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - m_lane->status = CANCELLED; -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - MUTEX_UNLOCK(&m_lane->done_lock); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR + { + std::lock_guard lock{ m_lane->m_done_mutex }; + m_lane->status = CANCELLED; + m_lane->m_done_signal.notify_one(); // wake up master (while 'lane->m_done_mutex' is on) + } // unblock the thread so that it can terminate gracefully m_lane->m_ready.count_down(); } @@ -1170,7 +1116,7 @@ LUAG_FUNC(lane_new) } onExit{ L, lane, gc_cb_idx }; // launch the thread early, it will sync with a std::latch to parallelize OS thread warmup and L2 preparation DEBUGSPEW_CODE(fprintf(stderr, INDENT_BEGIN "lane_new: launching thread\n" INDENT_END)); - THREAD_CREATE(&lane->thread, lane_main, lane, priority); + lane->startThread(priority); STACK_GROW( L2, nargs + 3); // STACK_CHECK_START_REL(L2, 0); @@ -1347,7 +1293,7 @@ LUAG_FUNC(lane_new) static int lane_gc(lua_State* L) { bool have_gc_cb{ false }; - Lane* lane{ lua_toLane(L, 1) }; // ud + Lane* const lane{ lua_toLane(L, 1) }; // ud // if there a gc callback? lua_getiuservalue(L, 1, 1); // ud uservalue @@ -1365,30 +1311,7 @@ static int lane_gc(lua_State* L) } // We can read 'lane->status' without locks, but not wait for it - // test Killed state first, as it doesn't need to enter the selfdestruct chain - if (lane->mstatus == Lane::Killed) - { - // Make sure a kill has proceeded, before cleaning up the data structure. - // - // NO lua_close() in this case because we don't know where execution of the state was interrupted - DEBUGSPEW_CODE(fprintf(stderr, "** Joining with a killed thread (needs testing) **")); - // make sure the thread is no longer running, just like thread_join() - if (!THREAD_ISNULL(lane->thread)) - { - THREAD_WAIT(&lane->thread, -1, &lane->done_signal, &lane->done_lock, &lane->status); - } - if (lane->status >= DONE && lane->L) - { - // we know the thread was killed while the Lua VM was not doing anything: we should be able to close it without crashing - // now, thread_cancel() will not forcefully kill a lane with lane->status >= DONE, so I am not sure it can ever happen - lua_close(lane->L); - lane->L = nullptr; - // just in case, but s will be freed soon so... - lane->debug_name = ""; - } - DEBUGSPEW_CODE(fprintf(stderr, "** Joined ok **")); - } - else if (lane->status < DONE) + if (lane->status < DONE) { // still running: will have to be cleaned up later selfdestruct_add(lane); @@ -1437,7 +1360,6 @@ static char const * thread_status_string(Lane* lane_) { enum e_status const st{ lane_->status }; // read just once (volatile) char const* str = - (lane_->mstatus == Lane::Killed) ? "killed" : // new to v3.3.0! (st == PENDING) ? "pending" : (st == RUNNING) ? "running" : // like in 'co.status()' (st == WAITING) ? "waiting" : @@ -1471,9 +1393,10 @@ int push_thread_status(lua_State* L, Lane* lane_) LUAG_FUNC(thread_join) { Lane* const lane{ lua_toLane(L, 1) }; - lua_Number const wait_secs{ luaL_optnumber(L, 2, -1.0) }; + lua_Duration const duration{ luaL_optnumber(L, 2, -1.0) }; lua_State* const L2{ lane->L }; - bool const done{ THREAD_ISNULL(lane->thread) || THREAD_WAIT(&lane->thread, wait_secs, &lane->done_signal, &lane->done_lock, &lane->status) }; + + bool const done{ !lane->m_thread.joinable() || lane->waitForCompletion(duration) }; if (!done || !L2) { STACK_GROW(L, 2); @@ -1486,58 +1409,47 @@ LUAG_FUNC(thread_join) // Thread is DONE/ERROR_ST/CANCELLED; all ours now int ret{ 0 }; - if (lane->mstatus == Lane::Killed) // OS thread was killed if thread_cancel was forced - { - // in that case, even if the thread was killed while DONE/ERROR_ST/CANCELLED, ignore regular return values - STACK_GROW(L, 2); - lua_pushnil(L); - lua_pushliteral(L, "killed"); - ret = 2; - } - else + Universe* const U{ lane->U }; + // debug_name is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed + // so store it in the userdata uservalue at a key that can't possibly collide + securize_debug_threadname(L, lane); + switch (lane->status) { - Universe* const U{ lane->U }; - // debug_name is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed - // so store it in the userdata uservalue at a key that can't possibly collide - securize_debug_threadname(L, lane); - switch (lane->status) + case DONE: { - case DONE: + int const n{ lua_gettop(L2) }; // whole L2 stack + if ((n > 0) && (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0)) { - int const n{ lua_gettop(L2) }; // whole L2 stack - if ((n > 0) && (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0)) - { - return luaL_error(L, "tried to copy unsupported types"); - } - ret = n; + return luaL_error(L, "tried to copy unsupported types"); } - break; + ret = n; + } + break; - case ERROR_ST: + case ERROR_ST: + { + int const n{ lua_gettop(L2) }; + STACK_GROW(L, 3); + lua_pushnil(L); + // even when ERROR_FULL_STACK, if the error is not LUA_ERRRUN, the handler wasn't called, and we only have 1 error message on the stack ... + if (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0) // nil "err" [trace] { - int const n{ lua_gettop(L2) }; - STACK_GROW(L, 3); - lua_pushnil(L); - // even when ERROR_FULL_STACK, if the error is not LUA_ERRRUN, the handler wasn't called, and we only have 1 error message on the stack ... - if (luaG_inter_move(U, L2, L, n, LookupMode::LaneBody) != 0) // nil "err" [trace] - { - return luaL_error(L, "tried to copy unsupported types: %s", lua_tostring(L, -n)); - } - ret = 1 + n; + return luaL_error(L, "tried to copy unsupported types: %s", lua_tostring(L, -n)); } - break; + ret = 1 + n; + } + break; - case CANCELLED: - ret = 0; - break; + case CANCELLED: + ret = 0; + break; - default: - DEBUGSPEW_CODE(fprintf(stderr, "Status: %d\n", lane->status)); - ASSERT_L(false); - ret = 0; - } - lua_close(L2); + default: + DEBUGSPEW_CODE(fprintf(stderr, "Status: %d\n", lane->status)); + ASSERT_L(false); + ret = 0; } + lua_close(L2); lane->L = nullptr; STACK_CHECK(L, ret); return ret; @@ -1596,15 +1508,12 @@ LUAG_FUNC(thread_index) switch (lane->status) { default: - if (lane->mstatus != Lane::Killed) - { - // this is an internal error, we probably never get here - lua_settop(L, 0); - lua_pushliteral(L, "Unexpected status: "); - lua_pushstring(L, thread_status_string(lane)); - lua_concat(L, 2); - raise_lua_error(L); - } + // this is an internal error, we probably never get here + lua_settop(L, 0); + lua_pushliteral(L, "Unexpected status: "); + lua_pushstring(L, thread_status_string(lane)); + lua_concat(L, 2); + raise_lua_error(L); [[fallthrough]]; // fall through if we are killed, as we got nil, "killed" on the stack case DONE: // got regular return values @@ -1790,8 +1699,7 @@ LUAG_FUNC(wakeup_conv) lua_pop(L,1); STACK_CHECK(L, 0); - struct tm t; - memset(&t, 0, sizeof(t)); + std::tm t{}; t.tm_year = year - 1900; t.tm_mon= month-1; // 0..11 t.tm_mday= day; // 1..31 @@ -1800,7 +1708,7 @@ LUAG_FUNC(wakeup_conv) t.tm_sec= sec; // 0..60 t.tm_isdst= isdst; // 0/1/negative - lua_pushnumber(L, static_cast(mktime(&t))); // ms=0 + lua_pushnumber(L, static_cast(std::mktime(&t))); // resolution: 1 second return 1; } @@ -1909,13 +1817,14 @@ LUAG_FUNC(configure) DEBUGSPEW_CODE(fprintf( stderr, INDENT_BEGIN "%p: lanes.configure() BEGIN\n" INDENT_END, L)); DEBUGSPEW_CODE(if (U) U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); - if(U == nullptr) + if (U == nullptr) { - U = universe_create( L); // settings universe + U = universe_create(L); // settings universe DEBUGSPEW_CODE(U->debugspew_indent_depth.fetch_add(1, std::memory_order_relaxed)); lua_newtable( L); // settings universe mt lua_getfield(L, 1, "shutdown_timeout"); // settings universe mt shutdown_timeout - lua_pushcclosure(L, universe_gc, 1); // settings universe mt universe_gc + lua_getfield(L, 1, "shutdown_mode"); // settings universe mt shutdown_timeout shutdown_mode + lua_pushcclosure(L, universe_gc, 2); // settings universe mt universe_gc lua_setfield(L, -2, "__gc"); // settings universe mt lua_setmetatable(L, -2); // settings universe lua_pop(L, 1); // settings diff --git a/src/lanes.lua b/src/lanes.lua index 6af286a..fd3d22b 100644 --- a/src/lanes.lua +++ b/src/lanes.lua @@ -73,6 +73,7 @@ lanes.configure = function( settings_) keepers_gc_threshold = -1, on_state_create = nil, shutdown_timeout = 0.25, + shutdown_mode = "hard", with_timers = true, track_lanes = false, demote_full_userdata = nil, @@ -113,6 +114,11 @@ lanes.configure = function( settings_) -- shutdown_timeout should be a number >= 0 return type( val_) == "number" and val_ >= 0 end, + shutdown_mode = function( val_) + local valid_hooks = { soft = true, hard = true, call = true, ret = true, line = true, count = true } + -- shutdown_mode should be a known hook mask + return valid_hooks[val_] + end, track_lanes = boolean_param_checker, demote_full_userdata = boolean_param_checker, verbose_errors = boolean_param_checker @@ -367,262 +373,263 @@ lanes.configure = function( settings_) if settings.with_timers ~= false then + -- + -- On first 'require "lanes"', a timer lane is spawned that will maintain + -- timer tables and sleep in between the timer events. All interaction with + -- the timer lane happens via a 'timer_gateway' Linda, which is common to + -- all that 'require "lanes"'. + -- + -- Linda protocol to timer lane: + -- + -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] + -- + local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging + local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" + local first_time_key= "first time" - -- - -- On first 'require "lanes"', a timer lane is spawned that will maintain - -- timer tables and sleep in between the timer events. All interaction with - -- the timer lane happens via a 'timer_gateway' Linda, which is common to - -- all that 'require "lanes"'. - -- - -- Linda protocol to timer lane: - -- - -- TGW_KEY: linda_h, key, [wakeup_at_secs], [repeat_secs] - -- - local TGW_KEY= "(timer control)" -- the key does not matter, a 'weird' key may help debugging - local TGW_QUERY, TGW_REPLY = "(timer query)", "(timer reply)" - local first_time_key= "first time" - - local first_time = timer_gateway:get( first_time_key) == nil - timer_gateway:set( first_time_key, true) - - -- - -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally - -- has 'table' always declared) - -- - if first_time then + local first_time = timer_gateway:get( first_time_key) == nil + timer_gateway:set( first_time_key, true) local now_secs = core.now_secs - assert( type( now_secs) == "function") - ----- - -- Snore loop (run as a lane on the background) - -- - -- High priority, to get trustworthy timings. + local wakeup_conv = core.wakeup_conv + -- - -- We let the timer lane be a "free running" thread; no handle to it - -- remains. + -- Timer lane; initialize only on the first 'require "lanes"' instance (which naturally + -- has 'table' always declared) -- - local timer_body = function() - set_debug_threadname( "LanesTimer") - -- - -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, - -- [key]= { wakeup_secs [,period_secs] } [, ...] }, - -- } - -- - -- Collection of all running timers, indexed with linda's & key. + if first_time then + + assert( type( now_secs) == "function") + ----- + -- Snore loop (run as a lane on the background) -- - -- Note that we need to use the deep lightuserdata identifiers, instead - -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple - -- entries for the same timer. + -- High priority, to get trustworthy timings. -- - -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but - -- also important to keep the Linda alive, even if all outside world threw - -- away pointers to it (which would ruin uniqueness of the deep pointer). - -- Now we're safe. + -- We let the timer lane be a "free running" thread; no handle to it + -- remains. -- - local collection = {} - local table_insert = assert( table.insert) - - local get_timers = function() - local r = {} - for deep, t in pairs( collection) do - -- WR( tostring( deep)) - local l = t[deep] - for key, timer_data in pairs( t) do - if key ~= deep then - table_insert( r, {l, key, timer_data}) + local timer_body = function() + set_debug_threadname( "LanesTimer") + -- + -- { [deep_linda_lightuserdata]= { [deep_linda_lightuserdata]=linda_h, + -- [key]= { wakeup_secs [,period_secs] } [, ...] }, + -- } + -- + -- Collection of all running timers, indexed with linda's & key. + -- + -- Note that we need to use the deep lightuserdata identifiers, instead + -- of 'linda_h' themselves as table indices. Otherwise, we'd get multiple + -- entries for the same timer. + -- + -- The 'hidden' reference to Linda proxy is used in 'check_timers()' but + -- also important to keep the Linda alive, even if all outside world threw + -- away pointers to it (which would ruin uniqueness of the deep pointer). + -- Now we're safe. + -- + local collection = {} + local table_insert = assert( table.insert) + + local get_timers = function() + local r = {} + for deep, t in pairs( collection) do + -- WR( tostring( deep)) + local l = t[deep] + for key, timer_data in pairs( t) do + if key ~= deep then + table_insert( r, {l, key, timer_data}) + end end end - end - return r - end -- get_timers() - - -- - -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) - -- - local set_timer = function( linda, key, wakeup_at, period) - assert( wakeup_at == nil or wakeup_at > 0.0) - assert( period == nil or period > 0.0) + return r + end -- get_timers() - local linda_deep = linda:deep() - assert( linda_deep) - - -- Find or make a lookup for this timer -- - local t1 = collection[linda_deep] - if not t1 then - t1 = { [linda_deep] = linda} -- proxy to use the Linda - collection[linda_deep] = t1 - end + -- set_timer( linda_h, key [,wakeup_at_secs [,period_secs]] ) + -- + local set_timer = function( linda, key, wakeup_at, period) + assert( wakeup_at == nil or wakeup_at > 0.0) + assert( period == nil or period > 0.0) - if wakeup_at == nil then - -- Clear the timer - -- - t1[key]= nil + local linda_deep = linda:deep() + assert( linda_deep) - -- Remove empty tables from collection; speeds timer checks and - -- lets our 'safety reference' proxy be gc:ed as well. + -- Find or make a lookup for this timer -- - local empty = true - for k, _ in pairs( t1) do - if k ~= linda_deep then - empty = false - break - end - end - if empty then - collection[linda_deep] = nil + local t1 = collection[linda_deep] + if not t1 then + t1 = { [linda_deep] = linda} -- proxy to use the Linda + collection[linda_deep] = t1 end - -- Note: any unread timer value is left at 'linda[key]' intensionally; - -- clearing a timer just stops it. - else - -- New timer or changing the timings - -- - local t2 = t1[key] - if not t2 then - t2= {} - t1[key]= t2 - end + if wakeup_at == nil then + -- Clear the timer + -- + t1[key]= nil - t2[1] = wakeup_at - t2[2] = period -- can be 'nil' - end - end -- set_timer() + -- Remove empty tables from collection; speeds timer checks and + -- lets our 'safety reference' proxy be gc:ed as well. + -- + local empty = true + for k, _ in pairs( t1) do + if k ~= linda_deep then + empty = false + break + end + end + if empty then + collection[linda_deep] = nil + end - ----- - -- [next_wakeup_at]= check_timers() - -- Check timers, and wake up the ones expired (if any) - -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). - local check_timers = function() - local now = now_secs() - local next_wakeup - - for linda_deep,t1 in pairs(collection) do - for key,t2 in pairs(t1) do + -- Note: any unread timer value is left at 'linda[key]' intensionally; + -- clearing a timer just stops it. + else + -- New timer or changing the timings -- - if key==linda_deep then - -- no 'continue' in Lua :/ - else - -- 't2': { wakeup_at_secs [,period_secs] } + local t2 = t1[key] + if not t2 then + t2= {} + t1[key]= t2 + end + + t2[1] = wakeup_at + t2[2] = period -- can be 'nil' + end + end -- set_timer() + + ----- + -- [next_wakeup_at]= check_timers() + -- Check timers, and wake up the ones expired (if any) + -- Returns the closest upcoming (remaining) wakeup time (or 'nil' if none). + local check_timers = function() + local now = now_secs() + local next_wakeup + + for linda_deep,t1 in pairs(collection) do + for key,t2 in pairs(t1) do -- - local wakeup_at= t2[1] - local period= t2[2] -- may be 'nil' - - if wakeup_at <= now then - local linda= t1[linda_deep] - assert(linda) - - linda:set( key, now ) - - -- 'pairs()' allows the values to be modified (and even - -- removed) as far as keys are not touched - - if not period then - -- one-time timer; gone - -- - t1[key]= nil - wakeup_at= nil -- no 'continue' in Lua :/ - else - -- repeating timer; find next wakeup (may jump multiple repeats) - -- - repeat - wakeup_at= wakeup_at+period - until wakeup_at > now - - t2[1]= wakeup_at + if key==linda_deep then + -- no 'continue' in Lua :/ + else + -- 't2': { wakeup_at_secs [,period_secs] } + -- + local wakeup_at= t2[1] + local period= t2[2] -- may be 'nil' + + if wakeup_at <= now then + local linda= t1[linda_deep] + assert(linda) + + linda:set( key, now ) + + -- 'pairs()' allows the values to be modified (and even + -- removed) as far as keys are not touched + + if not period then + -- one-time timer; gone + -- + t1[key]= nil + wakeup_at= nil -- no 'continue' in Lua :/ + else + -- repeating timer; find next wakeup (may jump multiple repeats) + -- + repeat + wakeup_at= wakeup_at+period + until wakeup_at > now + + t2[1]= wakeup_at + end end - end - if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then - next_wakeup= wakeup_at + if wakeup_at and ((not next_wakeup) or (wakeup_at < next_wakeup)) then + next_wakeup= wakeup_at + end end + end -- t2 loop + end -- t1 loop + + return next_wakeup -- may be 'nil' + end -- check_timers() + + local timer_gateway_batched = timer_gateway.batched + set_finalizer( function( err, stk) + if err and type( err) ~= "userdata" then + WR( "LanesTimer error: "..tostring(err)) + --elseif type( err) == "userdata" then + -- WR( "LanesTimer after cancel" ) + --else + -- WR("LanesTimer finalized") + end + end) + while true do + local next_wakeup = check_timers() + + -- Sleep until next timer to wake up, or a set/clear command + -- + local secs + if next_wakeup then + secs = next_wakeup - now_secs() + if secs < 0 then secs = 0 end + end + local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) + + if key == TGW_KEY then + assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer + local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) + assert( key) + set_timer( what, key, wakeup_at, period and period > 0 and period or nil) + elseif key == TGW_QUERY then + if what == "get_timers" then + timer_gateway:send( TGW_REPLY, get_timers()) + else + timer_gateway:send( TGW_REPLY, "unknown query " .. what) end - end -- t2 loop - end -- t1 loop - - return next_wakeup -- may be 'nil' - end -- check_timers() - - local timer_gateway_batched = timer_gateway.batched - set_finalizer( function( err, stk) - if err and type( err) ~= "userdata" then - WR( "LanesTimer error: "..tostring(err)) - --elseif type( err) == "userdata" then - -- WR( "LanesTimer after cancel" ) - --else - -- WR("LanesTimer finalized") + --elseif secs == nil then -- got no value while block-waiting? + -- WR( "timer lane: no linda, aborted?") + end end - end) - while true do - local next_wakeup = check_timers() + end -- timer_body() + timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... + end -- first_time - -- Sleep until next timer to wake up, or a set/clear command + ----- + -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) + -- + -- PUBLIC LANES API + timer = function( linda, key, a, period ) + if getmetatable( linda) ~= "Linda" then + error "expecting a Linda" + end + if a == 0.0 then + -- Caller expects to get current time stamp in Linda, on return + -- (like the timer had expired instantly); it would be good to set this + -- as late as possible (to give most current time) but also we want it + -- to precede any possible timers that might start striking. -- - local secs - if next_wakeup then - secs = next_wakeup - now_secs() - if secs < 0 then secs = 0 end - end - local key, what = timer_gateway:receive( secs, TGW_KEY, TGW_QUERY) - - if key == TGW_KEY then - assert( getmetatable( what) == "Linda") -- 'what' should be a linda on which the client sets a timer - local _, key, wakeup_at, period = timer_gateway:receive( 0, timer_gateway_batched, TGW_KEY, 3) - assert( key) - set_timer( what, key, wakeup_at, period and period > 0 and period or nil) - elseif key == TGW_QUERY then - if what == "get_timers" then - timer_gateway:send( TGW_REPLY, get_timers()) - else - timer_gateway:send( TGW_REPLY, "unknown query " .. what) - end - --elseif secs == nil then -- got no value while block-waiting? - -- WR( "timer lane: no linda, aborted?") + linda:set( key, now_secs()) + + if not period or period==0.0 then + timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer + return -- nothing more to do end + a= period end - end -- timer_body() - timer_lane = gen( "*", { package= {}, priority = max_prio}, timer_body)() -- "*" instead of "io,package" for LuaJIT compatibility... - end -- first_time - ----- - -- = timer( linda_h, key_val, date_tbl|first_secs [,period_secs] ) - -- - -- PUBLIC LANES API - timer = function( linda, key, a, period ) - if getmetatable( linda) ~= "Linda" then - error "expecting a Linda" - end - if a == 0.0 then - -- Caller expects to get current time stamp in Linda, on return - -- (like the timer had expired instantly); it would be good to set this - -- as late as possible (to give most current time) but also we want it - -- to precede any possible timers that might start striking. + local wakeup_at= type(a)=="table" and wakeup_conv(a) -- given point of time + or (a and now_secs()+a or nil) + -- queue to timer -- - linda:set( key, core.now_secs()) + timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) + end -- timer() - if not period or period==0.0 then - timer_gateway:send( TGW_KEY, linda, key, nil, nil ) -- clear the timer - return -- nothing more to do - end - a= period - end - - local wakeup_at= type(a)=="table" and core.wakeup_conv(a) -- given point of time - or (a and core.now_secs()+a or nil) - -- queue to timer + ----- + -- {[{linda, slot, when, period}[,...]]} = timers() -- - timer_gateway:send( TGW_KEY, linda, key, wakeup_at, period ) - end - - ----- - -- {[{linda, slot, when, period}[,...]]} = timers() - -- - -- PUBLIC LANES API - timers = function() - timer_gateway:send( TGW_QUERY, "get_timers") - local _, r = timer_gateway:receive( TGW_REPLY) - return r - end + -- PUBLIC LANES API + timers = function() + timer_gateway:send( TGW_QUERY, "get_timers") + local _, r = timer_gateway:receive( TGW_REPLY) + return r + end -- timers() end -- settings.with_timers diff --git a/src/lanes_private.h b/src/lanes_private.h index bcc3014..01d43c0 100644 --- a/src/lanes_private.h +++ b/src/lanes_private.h @@ -4,27 +4,26 @@ #include "uniquekey.h" #include "universe.h" +#include +#include #include +#include +#include // NOTE: values to be changed by either thread, during execution, without // locking, are marked "volatile" // class Lane { - private: - - enum class ThreadStatus - { - Normal, // normal master side state - Killed // issued an OS kill - }; - public: - using enum ThreadStatus; - - THREAD_T thread; + // the thread + std::jthread m_thread; + // a latch to wait for the lua_State to be ready std::latch m_ready{ 1 }; + // to wait for stop requests through m_thread's stop_source + std::mutex m_done_mutex; + std::condition_variable m_done_signal; // use condition_variable_any if waiting for a stop_token // // M: sub-thread OS thread // S: not used @@ -42,7 +41,7 @@ class Lane // M: sets to PENDING (before launching) // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED - SIGNAL_T* volatile waiting_on{ nullptr }; + std::condition_variable* volatile m_waiting_on{ nullptr }; // // When status is WAITING, points on the linda's signal the thread waits on, else nullptr @@ -51,23 +50,6 @@ class Lane // M: sets to false, flags true for cancel request // S: reads to see if cancel is requested -#if THREADWAIT_METHOD == THREADWAIT_CONDVAR - SIGNAL_T done_signal; - // - // M: Waited upon at lane ending (if Posix with no PTHREAD_TIMEDJOIN) - // S: sets the signal once cancellation is noticed (avoids a kill) - - MUTEX_T done_lock; - // - // Lock required by 'done_signal' condition variable, protecting - // lane status changes to DONE/ERROR_ST/CANCELLED. -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - - volatile ThreadStatus mstatus{ Normal }; - // - // M: sets to Normal, if issued a kill changes to Killed - // S: not used - Lane* volatile selfdestruct_next{ nullptr }; // // M: sets to non-nullptr if facing lane handle '__gc' cycle but the lane @@ -88,6 +70,9 @@ class Lane Lane(Universe* U_, lua_State* L_); ~Lane(); + + bool waitForCompletion(lua_Duration duration_); + void startThread(int priority_); }; // xxh64 of string "LANE_POINTER_REGKEY" generated at https://www.pelock.com/products/hash-calculator diff --git a/src/linda.cpp b/src/linda.cpp index 5ee4768..ea1410e 100644 --- a/src/linda.cpp +++ b/src/linda.cpp @@ -61,8 +61,8 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header public: - SIGNAL_T read_happened; - SIGNAL_T write_happened; + std::condition_variable m_read_happened; + std::condition_variable m_write_happened; Universe* const U; // the universe this linda belongs to uintptr_t const group; // a group to control keeper allocation between lindas CancelRequest simulate_cancel{ CancelRequest::None }; @@ -81,17 +81,11 @@ class Linda : public DeepPrelude // Deep userdata MUST start with this header : U{ U_ } , group{ group_ << KEEPER_MAGIC_SHIFT } { - SIGNAL_INIT(&read_happened); - SIGNAL_INIT(&write_happened); - setName(name_, len_); } ~Linda() { - // There aren't any lanes waiting on these lindas, since all proxies have been gc'ed. Right? - SIGNAL_FREE(&read_happened); - SIGNAL_FREE(&write_happened); if (std::holds_alternative(m_name)) { AllocatedName& name = std::get(m_name); @@ -216,15 +210,19 @@ LUAG_FUNC(linda_protected_call) LUAG_FUNC(linda_send) { Linda* const linda{ lua_toLinda(L, 1) }; - time_d timeout{ -1.0 }; + std::chrono::time_point until{ std::chrono::time_point::max() }; int key_i{ 2 }; // index of first key, if timeout not there if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion { - timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); + lua_Duration const duration{ lua_tonumber(L, 2) }; + if (duration.count() >= 0.0) + { + until = std::chrono::steady_clock::now() + std::chrono::duration_cast(duration); + } ++key_i; } - else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key + else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key { ++key_i; } @@ -266,6 +264,7 @@ LUAG_FUNC(linda_send) lua_State* const KL{ K ? K->L : nullptr }; if (KL == nullptr) return 0; + STACK_CHECK_START_REL(KL, 0); for (bool try_again{ true };;) { @@ -295,12 +294,12 @@ LUAG_FUNC(linda_send) if (ret) { // Wake up ALL waiting threads - SIGNAL_ALL(&linda->write_happened); + linda->m_write_happened.notify_all(); break; } // instant timout to bypass the wait syscall - if (timeout == 0.0) + if (std::chrono::steady_clock::now() >= until) { break; /* no wait; instant timeout */ } @@ -314,14 +313,17 @@ LUAG_FUNC(linda_send) prev_status = lane->status; // RUNNING, most likely ASSERT_L(prev_status == RUNNING); // but check, just in case lane->status = WAITING; - ASSERT_L(lane->waiting_on == nullptr); - lane->waiting_on = &linda->read_happened; + ASSERT_L(lane->m_waiting_on == nullptr); + lane->m_waiting_on = &linda->m_read_happened; } // could not send because no room: wait until some data was read before trying again, or until timeout is reached - try_again = SIGNAL_WAIT(&linda->read_happened, &K->keeper_cs, timeout); + std::unique_lock keeper_lock{ K->m_mutex, std::adopt_lock }; + std::cv_status const status{ linda->m_read_happened.wait_until(keeper_lock, until) }; + keeper_lock.release(); // we don't want to release the lock! + try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups if (lane != nullptr) { - lane->waiting_on = nullptr; + lane->m_waiting_on = nullptr; lane->status = prev_status; } } @@ -369,21 +371,24 @@ static constexpr UniqueKey BATCH_SENTINEL{ 0x2DDFEE0968C62AA7ull }; LUAG_FUNC(linda_receive) { Linda* const linda{ lua_toLinda(L, 1) }; - - time_d timeout{ -1.0 }; - int key_i{ 2 }; + std::chrono::time_point until{ std::chrono::time_point::max() }; + int key_i{ 2 }; // index of first key, if timeout not there if (lua_type(L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion { - timeout = SIGNAL_TIMEOUT_PREPARE(lua_tonumber(L, 2)); + lua_Duration const duration{ lua_tonumber(L, 2) }; + if (duration.count() >= 0.0) + { + until = std::chrono::steady_clock::now() + std::chrono::duration_cast(duration); + } ++key_i; } - else if (lua_isnil(L, 2)) // alternate explicit "no timeout" by passing nil before the key + else if (lua_isnil(L, 2)) // alternate explicit "infinite timeout" by passing nil before the key { ++key_i; } - keeper_api_t keeper_receive; + keeper_api_t selected_keeper_receive{ nullptr }; int expected_pushed_min{ 0 }, expected_pushed_max{ 0 }; // are we in batched mode? BATCH_SENTINEL.pushKey(L); @@ -396,7 +401,7 @@ LUAG_FUNC(linda_receive) // make sure the keys are of a valid type check_key_types(L, key_i, key_i); // receive multiple values from a single slot - keeper_receive = KEEPER_API(receive_batched); + selected_keeper_receive = KEEPER_API(receive_batched); // we expect a user-defined amount of return value expected_pushed_min = (int) luaL_checkinteger(L, key_i + 1); expected_pushed_max = (int) luaL_optinteger(L, key_i + 2, expected_pushed_min); @@ -413,17 +418,20 @@ LUAG_FUNC(linda_receive) // make sure the keys are of a valid type check_key_types(L, key_i, lua_gettop(L)); // receive a single value, checking multiple slots - keeper_receive = KEEPER_API(receive); + selected_keeper_receive = KEEPER_API(receive); // we expect a single (value, key) pair of returned values expected_pushed_min = expected_pushed_max = 2; } Lane* const lane{ LANE_POINTER_REGKEY.readLightUserDataValue(L) }; Keeper* const K{ which_keeper(linda->U->keepers, linda->hashSeed()) }; - if (K == nullptr) + lua_State* const KL{ K ? K->L : nullptr }; + if (KL == nullptr) return 0; + CancelRequest cancel{ CancelRequest::None }; int pushed{ 0 }; + STACK_CHECK_START_REL(KL, 0); for (bool try_again{ true };;) { if (lane != nullptr) @@ -439,7 +447,7 @@ LUAG_FUNC(linda_receive) } // all arguments of receive() but the first are passed to the keeper's receive function - pushed = keeper_call(linda->U, K->L, keeper_receive, L, linda, key_i); + pushed = keeper_call(linda->U, KL, selected_keeper_receive, L, linda, key_i); if (pushed < 0) { break; @@ -451,11 +459,11 @@ LUAG_FUNC(linda_receive) keeper_toggle_nil_sentinels(L, lua_gettop(L) - pushed, LookupMode::FromKeeper); // To be done from within the 'K' locking area // - SIGNAL_ALL(&linda->read_happened); + linda->m_read_happened.notify_all(); break; } - if (timeout == 0.0) + if (std::chrono::steady_clock::now() >= until) { break; /* instant timeout */ } @@ -469,18 +477,22 @@ LUAG_FUNC(linda_receive) prev_status = lane->status; // RUNNING, most likely ASSERT_L(prev_status == RUNNING); // but check, just in case lane->status = WAITING; - ASSERT_L(lane->waiting_on == nullptr); - lane->waiting_on = &linda->write_happened; + ASSERT_L(lane->m_waiting_on == nullptr); + lane->m_waiting_on = &linda->m_write_happened; } // not enough data to read: wakeup when data was sent, or when timeout is reached - try_again = SIGNAL_WAIT(&linda->write_happened, &K->keeper_cs, timeout); + std::unique_lock keeper_lock{ K->m_mutex, std::adopt_lock }; + std::cv_status const status{ linda->m_write_happened.wait_until(keeper_lock, until) }; + keeper_lock.release(); // we don't want to release the lock! + try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups if (lane != nullptr) { - lane->waiting_on = nullptr; + lane->m_waiting_on = nullptr; lane->status = prev_status; } } } + STACK_CHECK(KL, 0); if (pushed < 0) { @@ -537,13 +549,13 @@ LUAG_FUNC(linda_set) if (has_value) { // we put some data in the slot, tell readers that they should wake - SIGNAL_ALL(&linda->write_happened); // To be done from within the 'K' locking area + linda->m_write_happened.notify_all(); // To be done from within the 'K' locking area } if (pushed == 1) { // the key was full, but it is no longer the case, tell writers they should wake ASSERT_L(lua_type(L, -1) == LUA_TBOOLEAN && lua_toboolean(L, -1) == 1); - SIGNAL_ALL(&linda->read_happened); // To be done from within the 'K' locking area + linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area } } } @@ -648,7 +660,7 @@ LUAG_FUNC( linda_limit) if( pushed == 1) { ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); - SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area + linda->m_read_happened.notify_all(); // To be done from within the 'K' locking area } } else // linda is cancelled @@ -678,8 +690,8 @@ LUAG_FUNC(linda_cancel) linda->simulate_cancel = CancelRequest::Soft; if (strcmp(who, "both") == 0) // tell everyone writers to wake up { - SIGNAL_ALL(&linda->write_happened); - SIGNAL_ALL(&linda->read_happened); + linda->m_write_happened.notify_all(); + linda->m_read_happened.notify_all(); } else if (strcmp(who, "none") == 0) // reset flag { @@ -687,11 +699,11 @@ LUAG_FUNC(linda_cancel) } else if (strcmp(who, "read") == 0) // tell blocked readers to wake up { - SIGNAL_ALL(&linda->write_happened); + linda->m_write_happened.notify_all(); } else if (strcmp(who, "write") == 0) // tell blocked writers to wake up { - SIGNAL_ALL(&linda->read_happened); + linda->m_read_happened.notify_all(); } else { diff --git a/src/macros_and_utils.h b/src/macros_and_utils.h index e29e7fb..997b452 100644 --- a/src/macros_and_utils.h +++ b/src/macros_and_utils.h @@ -11,9 +11,12 @@ extern "C" { #endif // __cplusplus #include +#include #include #include +using namespace std::chrono_literals; + #define USE_DEBUG_SPEW() 0 #if USE_DEBUG_SPEW() extern char const* debugspew_indent; @@ -167,3 +170,5 @@ T* lua_newuserdatauv(lua_State* L, int nuvalue_) std::ignore = lua_error(L); // doesn't return assert(false); // we should never get here, but i'm paranoid } + +using lua_Duration = std::chrono::template duration; diff --git a/src/threading.cpp b/src/threading.cpp index afeb184..fc20931 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -93,9 +93,6 @@ THE SOFTWARE. # pragma warning( disable : 4054 ) #endif -//#define THREAD_CREATE_RETRIES_MAX 20 - // loops (maybe retry forever?) - /* * FAIL is for unexpected API return values - essentially programming * error in _this_ code. @@ -196,36 +193,6 @@ time_d now_secs(void) { } -/* -*/ -time_d SIGNAL_TIMEOUT_PREPARE( double secs ) { - if (secs<=0.0) return secs; - else return now_secs() + secs; -} - - -#if THREADAPI == THREADAPI_PTHREAD -/* -* Prepare 'abs_secs' kind of timeout to 'timespec' format -*/ -static void prepare_timeout( struct timespec *ts, time_d abs_secs ) { - assert(ts); - assert( abs_secs >= 0.0 ); - - if (abs_secs==0.0) - abs_secs= now_secs(); - - ts->tv_sec= (time_t) floor( abs_secs ); - ts->tv_nsec= ((long)((abs_secs - ts->tv_sec) * 1000.0 +0.5)) * 1000000UL; // 1ms = 1000000ns - if (ts->tv_nsec == 1000000000UL) - { - ts->tv_nsec = 0; - ts->tv_sec = ts->tv_sec + 1; - } -} -#endif // THREADAPI == THREADAPI_PTHREAD - - /*---=== Threading ===---*/ //--- @@ -268,30 +235,6 @@ static void prepare_timeout( struct timespec *ts, time_d abs_secs ) { #if THREADAPI == THREADAPI_WINDOWS -#if _WIN32_WINNT < 0x0600 // CONDITION_VARIABLE aren't available - // - void MUTEX_INIT( MUTEX_T *ref ) { - *ref= CreateMutex( nullptr /*security attr*/, false /*not locked*/, nullptr ); - if (!ref) FAIL( "CreateMutex", GetLastError() ); - } - void MUTEX_FREE( MUTEX_T *ref ) { - if (!CloseHandle(*ref)) FAIL( "CloseHandle (mutex)", GetLastError() ); - *ref= nullptr; - } - void MUTEX_LOCK( MUTEX_T *ref ) - { - DWORD rc = WaitForSingleObject( *ref, INFINITE); - // ERROR_WAIT_NO_CHILDREN means a thread was killed (lane terminated because of error raised during a linda transfer for example) while having grabbed this mutex - // this is not a big problem as we will grab it just the same, so ignore this particular error - if( rc != 0 && rc != ERROR_WAIT_NO_CHILDREN) - FAIL( "WaitForSingleObject", (rc == WAIT_FAILED) ? GetLastError() : rc); - } - void MUTEX_UNLOCK( MUTEX_T *ref ) { - if (!ReleaseMutex(*ref)) - FAIL( "ReleaseMutex", GetLastError() ); - } -#endif // CONDITION_VARIABLE aren't available - static int const gs_prio_remap[] = { THREAD_PRIORITY_IDLE, @@ -303,37 +246,7 @@ static int const gs_prio_remap[] = THREAD_PRIORITY_TIME_CRITICAL }; -/* MSDN: "If you would like to use the CRT in ThreadProc, use the -_beginthreadex function instead (of CreateThread)." -MSDN: "you can create at most 2028 threads" -*/ -// Note: Visual C++ requires '__stdcall' where it is -void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (__stdcall *func)( void*), void* data, int prio /* -3..+3 */) -{ - HANDLE h = (HANDLE) _beginthreadex(nullptr, // security - _THREAD_STACK_SIZE, - func, - data, - 0, // flags (0/CREATE_SUSPENDED) - nullptr // thread id (not used) - ); - - if (h == nullptr) // _beginthreadex returns 0L on failure instead of -1L (like _beginthread) - { - FAIL( "CreateThread", GetLastError()); - } - - if (prio != THREAD_PRIO_DEFAULT) - { - if (!SetThreadPriority( h, gs_prio_remap[prio + 3])) - { - FAIL( "SetThreadPriority", GetLastError()); - } - } - - *ref = h; -} - +// ############################################################################################### void THREAD_SET_PRIORITY( int prio) { @@ -344,42 +257,26 @@ void THREAD_SET_PRIORITY( int prio) } } -void THREAD_SET_AFFINITY( unsigned int aff) +// ############################################################################################### + +void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_) { - if( !SetThreadAffinityMask( GetCurrentThread(), aff)) + // prio range [-3,+3] was checked by the caller + if (!SetThreadPriority(thread_.native_handle(), gs_prio_remap[prio_ + 3])) { - FAIL( "THREAD_SET_AFFINITY", GetLastError()); + FAIL("JTHREAD_SET_PRIORITY", GetLastError()); } } -bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs) -{ - DWORD ms = (secs<0.0) ? INFINITE : (DWORD)((secs*1000.0)+0.5); +// ############################################################################################### - DWORD rc= WaitForSingleObject( *ref, ms /*timeout*/ ); - // - // (WAIT_ABANDONED) - // WAIT_OBJECT_0 success (0) - // WAIT_TIMEOUT - // WAIT_FAILED more info via GetLastError() - - if (rc == WAIT_TIMEOUT) return false; - if( rc !=0) FAIL( "WaitForSingleObject", rc==WAIT_FAILED ? GetLastError() : rc); - *ref = nullptr; // thread no longer usable - return true; - } - // - void THREAD_KILL( THREAD_T *ref ) +void THREAD_SET_AFFINITY(unsigned int aff) +{ + if( !SetThreadAffinityMask( GetCurrentThread(), aff)) { - // nonexistent on Xbox360, simply disable until a better solution is found - #if !defined( PLATFORM_XBOX) - // in theory no-one should call this as it is very dangerous (memory and mutex leaks, no notification of DLLs, etc.) - if (!TerminateThread( *ref, 0 )) FAIL("TerminateThread", GetLastError()); - #endif // PLATFORM_XBOX - *ref = nullptr; + FAIL( "THREAD_SET_AFFINITY", GetLastError()); } - - void THREAD_MAKE_ASYNCH_CANCELLABLE() {} // nothing to do for windows threads, we can cancel them anytime we want +} #if !defined __GNUC__ //see http://msdn.microsoft.com/en-us/library/xcb2z8hs.aspx @@ -414,158 +311,6 @@ bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs) #endif // !__GNUC__ } -#if _WIN32_WINNT < 0x0600 // CONDITION_VARIABLE aren't available - - void SIGNAL_INIT( SIGNAL_T* ref) - { - InitializeCriticalSection( &ref->signalCS); - InitializeCriticalSection( &ref->countCS); - if( 0 == (ref->waitEvent = CreateEvent( 0, true, false, 0))) // manual-reset - FAIL( "CreateEvent", GetLastError()); - if( 0 == (ref->waitDoneEvent = CreateEvent( 0, false, false, 0))) // auto-reset - FAIL( "CreateEvent", GetLastError()); - ref->waitersCount = 0; - } - - void SIGNAL_FREE( SIGNAL_T* ref) - { - CloseHandle( ref->waitDoneEvent); - CloseHandle( ref->waitEvent); - DeleteCriticalSection( &ref->countCS); - DeleteCriticalSection( &ref->signalCS); - } - - bool SIGNAL_WAIT( SIGNAL_T* ref, MUTEX_T* mu_ref, time_d abs_secs) - { - DWORD errc; - DWORD ms; - - if( abs_secs < 0.0) - ms = INFINITE; - else if( abs_secs == 0.0) - ms = 0; - else - { - time_d msd = (abs_secs - now_secs()) * 1000.0 + 0.5; - // If the time already passed, still try once (ms==0). A short timeout - // may have turned negative or 0 because of the two time samples done. - ms = msd <= 0.0 ? 0 : (DWORD)msd; - } - - EnterCriticalSection( &ref->signalCS); - EnterCriticalSection( &ref->countCS); - ++ ref->waitersCount; - LeaveCriticalSection( &ref->countCS); - LeaveCriticalSection( &ref->signalCS); - - errc = SignalObjectAndWait( *mu_ref, ref->waitEvent, ms, false); - - EnterCriticalSection( &ref->countCS); - if( 0 == -- ref->waitersCount) - { - // we're the last one leaving... - ResetEvent( ref->waitEvent); - SetEvent( ref->waitDoneEvent); - } - LeaveCriticalSection( &ref->countCS); - MUTEX_LOCK( mu_ref); - - switch( errc) - { - case WAIT_TIMEOUT: - return false; - case WAIT_OBJECT_0: - return true; - } - - FAIL( "SignalObjectAndWait", GetLastError()); - return false; - } - - void SIGNAL_ALL( SIGNAL_T* ref) - { - DWORD errc = WAIT_OBJECT_0; - - EnterCriticalSection( &ref->signalCS); - EnterCriticalSection( &ref->countCS); - - if( ref->waitersCount > 0) - { - ResetEvent( ref->waitDoneEvent); - SetEvent( ref->waitEvent); - LeaveCriticalSection( &ref->countCS); - errc = WaitForSingleObject( ref->waitDoneEvent, INFINITE); - } - else - { - LeaveCriticalSection( &ref->countCS); - } - - LeaveCriticalSection( &ref->signalCS); - - if( WAIT_OBJECT_0 != errc) - FAIL( "WaitForSingleObject", GetLastError()); - } - -#else // CONDITION_VARIABLE are available, use them - - // - void SIGNAL_INIT( SIGNAL_T *ref ) - { - InitializeConditionVariable( ref); - } - - void SIGNAL_FREE( SIGNAL_T *ref ) - { - // nothing to do - (void)ref; - } - - bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu_ref, time_d abs_secs) - { - long ms; - - if( abs_secs < 0.0) - ms = INFINITE; - else if( abs_secs == 0.0) - ms = 0; - else - { - ms = (long) ((abs_secs - now_secs())*1000.0 + 0.5); - - // If the time already passed, still try once (ms==0). A short timeout - // may have turned negative or 0 because of the two time samples done. - // - if( ms < 0) - ms = 0; - } - - if( !SleepConditionVariableCS( ref, mu_ref, ms)) - { - if( GetLastError() == ERROR_TIMEOUT) - { - return false; - } - else - { - FAIL( "SleepConditionVariableCS", GetLastError()); - } - } - return true; - } - - void SIGNAL_ONE( SIGNAL_T *ref ) - { - WakeConditionVariable( ref); - } - - void SIGNAL_ALL( SIGNAL_T *ref ) - { - WakeAllConditionVariable( ref); - } - -#endif // CONDITION_VARIABLE are available - #else // THREADAPI == THREADAPI_PTHREAD // PThread (Linux, OS X, ...) // @@ -607,44 +352,6 @@ bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs) abort(); } #define PT_CALL( call ) { int rc= call; if (rc!=0) _PT_FAIL( rc, #call, __FILE__, __LINE__ ); } - // - void SIGNAL_INIT( SIGNAL_T *ref ) { - PT_CALL(pthread_cond_init(ref, nullptr /*attr*/)); - } - void SIGNAL_FREE( SIGNAL_T *ref ) { - PT_CALL( pthread_cond_destroy(ref) ); - } - // - /* - * Timeout is given as absolute since we may have fake wakeups during - * a timed out sleep. A Linda with some other key read, or just because - * PThread cond vars can wake up unwantedly. - */ - bool SIGNAL_WAIT( SIGNAL_T *ref, pthread_mutex_t *mu, time_d abs_secs ) { - if (abs_secs<0.0) { - PT_CALL( pthread_cond_wait( ref, mu ) ); // infinite - } else { - int rc; - struct timespec ts; - - assert( abs_secs != 0.0 ); - prepare_timeout( &ts, abs_secs ); - - rc= pthread_cond_timedwait( ref, mu, &ts ); - - if (rc==ETIMEDOUT) return false; - if (rc) { _PT_FAIL( rc, "pthread_cond_timedwait()", __FILE__, __LINE__ ); } - } - return true; - } - // - void SIGNAL_ONE( SIGNAL_T *ref ) { - PT_CALL( pthread_cond_signal(ref) ); // wake up ONE (or no) waiting thread - } - // - void SIGNAL_ALL( SIGNAL_T *ref ) { - PT_CALL( pthread_cond_broadcast(ref) ); // wake up ALL waiting threads - } // array of 7 thread priority values, hand-tuned by platform so that we offer a uniform [-3,+3] public priority range static int const gs_prio_remap[] = @@ -775,129 +482,36 @@ static int select_prio(int prio /* -3..+3 */) return gs_prio_remap[prio + 3]; } -void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (*func)( void*), void* data, int prio /* -3..+3 */) +void THREAD_SET_PRIORITY( int prio) { - pthread_attr_t a; - bool const change_priority = #ifdef PLATFORM_LINUX - sudo && // only root-privileged process can change priorities -#endif - (prio != THREAD_PRIO_DEFAULT); - - PT_CALL( pthread_attr_init( &a)); - -#ifndef PTHREAD_TIMEDJOIN - // We create a NON-JOINABLE thread. This is mainly due to the lack of - // 'pthread_timedjoin()', but does offer other benefits (s.a. earlier - // freeing of the thread's resources). - // - PT_CALL( pthread_attr_setdetachstate( &a, PTHREAD_CREATE_DETACHED)); -#endif // PTHREAD_TIMEDJOIN - - // Use this to find a system's default stack size (DEBUG) -#if 0 - { - size_t n; - pthread_attr_getstacksize( &a, &n); - fprintf( stderr, "Getstack: %u\n", (unsigned int)n); - } - // 524288 on OS X - // 2097152 on Linux x86 (Ubuntu 7.04) - // 1048576 on FreeBSD 6.2 SMP i386 -#endif // 0 - -#if defined _THREAD_STACK_SIZE && _THREAD_STACK_SIZE > 0 - PT_CALL( pthread_attr_setstacksize( &a, _THREAD_STACK_SIZE)); -#endif - - if (change_priority) + if( sudo) // only root-privileged process can change priorities +#endif // PLATFORM_LINUX { struct sched_param sp; - // "The specified scheduling parameters are only used if the scheduling - // parameter inheritance attribute is PTHREAD_EXPLICIT_SCHED." - // -#if !defined __ANDROID__ || ( defined __ANDROID__ && __ANDROID_API__ >= 28 ) - PT_CALL( pthread_attr_setinheritsched( &a, PTHREAD_EXPLICIT_SCHED)); -#endif - -#ifdef _PRIO_SCOPE - PT_CALL( pthread_attr_setscope( &a, _PRIO_SCOPE)); -#endif // _PRIO_SCOPE - - PT_CALL( pthread_attr_setschedpolicy( &a, _PRIO_MODE)); - - sp.sched_priority = select_prio(prio); - PT_CALL( pthread_attr_setschedparam( &a, &sp)); - } - - //--- - // Seems on OS X, _POSIX_THREAD_THREADS_MAX is some kind of system - // thread limit (not userland thread). Actual limit for us is way higher. - // PTHREAD_THREADS_MAX is not defined (even though man page refers to it!) - // -# ifndef THREAD_CREATE_RETRIES_MAX - // Don't bother with retries; a failure is a failure - // - { - int rc = pthread_create( ref, &a, func, data); - if( rc) _PT_FAIL( rc, "pthread_create()", __FILE__, __LINE__ - 1); + // prio range [-3,+3] was checked by the caller + sp.sched_priority = gs_prio_remap[ prio + 3]; + PT_CALL( pthread_setschedparam( pthread_self(), _PRIO_MODE, &sp)); } -# else -# error "This code deprecated" - /* - // Wait slightly if thread creation has exchausted the system - // - { int retries; - for( retries=0; retries(thread_.native_handle()), _PRIO_MODE, &sp)); } } +// ################################################################################################# + void THREAD_SET_AFFINITY( unsigned int aff) { int bit = 0; @@ -929,93 +543,6 @@ void THREAD_SET_AFFINITY( unsigned int aff) #endif } - /* - * Wait for a thread to finish. - * - * 'mu_ref' is a lock we should use for the waiting; initially unlocked. - * Same lock as passed to THREAD_EXIT. - * - * Returns true for successful wait, false for timed out - */ -bool THREAD_WAIT( THREAD_T *ref, double secs , SIGNAL_T *signal_ref, MUTEX_T *mu_ref, volatile enum e_status *st_ref) -{ - struct timespec ts_store; - const struct timespec* timeout = nullptr; - bool done; - - // Do timeout counting before the locks - // -#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT - if (secs>=0.0) -#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR - if (secs>0.0) -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - { - prepare_timeout( &ts_store, now_secs()+secs ); - timeout= &ts_store; - } - -#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT - /* Thread is joinable - */ - if (!timeout) { - PT_CALL(pthread_join(*ref, nullptr /*ignore exit value*/)); - done = true; - } else { - int rc = PTHREAD_TIMEDJOIN(*ref, nullptr, timeout); - if ((rc!=0) && (rc!=ETIMEDOUT)) { - _PT_FAIL( rc, "PTHREAD_TIMEDJOIN", __FILE__, __LINE__-2 ); - } - done= rc==0; - } -#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR - /* Since we've set the thread up as PTHREAD_CREATE_DETACHED, we cannot - * join with it. Use the cond.var. - */ - (void) ref; // unused - MUTEX_LOCK( mu_ref ); - - // 'secs'==0.0 does not need to wait, just take the current status - // within the 'mu_ref' locks - // - if (secs != 0.0) { - while( *st_ref < DONE ) { - if (!timeout) { - PT_CALL( pthread_cond_wait( signal_ref, mu_ref )); - } else { - int rc= pthread_cond_timedwait( signal_ref, mu_ref, timeout ); - if (rc==ETIMEDOUT) break; - if (rc!=0) _PT_FAIL( rc, "pthread_cond_timedwait", __FILE__, __LINE__-2 ); - } - } - } - done= *st_ref >= DONE; // DONE|ERROR_ST|CANCELLED - - MUTEX_UNLOCK( mu_ref ); -#endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR - return done; - } - // - void THREAD_KILL( THREAD_T *ref ) { -#ifdef __ANDROID__ - __android_log_print(ANDROID_LOG_WARN, LOG_TAG, "Cannot kill thread!"); -#else - pthread_cancel( *ref ); -#endif - } - - void THREAD_MAKE_ASYNCH_CANCELLABLE() - { -#ifdef __ANDROID__ - __android_log_print(ANDROID_LOG_WARN, LOG_TAG, "Cannot make thread async cancellable!"); -#else - // that's the default, but just in case... - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr); - // we want cancellation to take effect immediately if possible, instead of waiting for a cancellation point (which is the default) - pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); -#endif - } - void THREAD_SETNAME( char const* _name) { // exact API to set the thread name is platform-dependant diff --git a/src/threading.h b/src/threading.h index 38a021f..e9f302a 100644 --- a/src/threading.h +++ b/src/threading.h @@ -1,13 +1,9 @@ #pragma once -/* - * win32-pthread: - * define HAVE_WIN32_PTHREAD and PTW32_INCLUDE_WINDOWS_H in your project configuration when building for win32-pthread. - * link against pthreadVC2.lib, and of course have pthreadVC2.dll somewhere in your path. - */ #include "platform.h" #include +#include /* Note: ERROR is a defined entity on Win32 PENDING: The Lua VM hasn't done anything yet. @@ -19,7 +15,7 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; #define THREADAPI_WINDOWS 1 #define THREADAPI_PTHREAD 2 -#if( defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC)) && !defined( HAVE_WIN32_PTHREAD) +#if( defined( PLATFORM_XBOX) || defined( PLATFORM_WIN32) || defined( PLATFORM_POCKETPC)) //#pragma message ( "THREADAPI_WINDOWS" ) #define THREADAPI THREADAPI_WINDOWS #else // (defined PLATFORM_WIN32) || (defined PLATFORM_POCKETPC) @@ -68,16 +64,9 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; }; - #define MUTEX_T HANDLE - void MUTEX_INIT( MUTEX_T* ref); - void MUTEX_FREE( MUTEX_T* ref); - void MUTEX_LOCK( MUTEX_T* ref); - void MUTEX_UNLOCK( MUTEX_T* ref); - #else // CONDITION_VARIABLE are available, use them #define SIGNAL_T CONDITION_VARIABLE - #define MUTEX_T CRITICAL_SECTION #define MUTEX_INIT( ref) InitializeCriticalSection( ref) #define MUTEX_FREE( ref) DeleteCriticalSection( ref) #define MUTEX_LOCK( ref) EnterCriticalSection( ref) @@ -111,7 +100,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; # define _MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE #endif - #define MUTEX_T pthread_mutex_t #define MUTEX_INIT(ref) pthread_mutex_init(ref, nullptr) #define MUTEX_RECURSIVE_INIT(ref) \ { pthread_mutexattr_t a; pthread_mutexattr_init( &a ); \ @@ -126,8 +114,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; using SIGNAL_T = pthread_cond_t; - void SIGNAL_ONE( SIGNAL_T *ref ); - // Yield is non-portable: // // OS X 10.4.8/9 has pthread_yield_np() @@ -143,10 +129,6 @@ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; #define THREAD_CALLCONV #endif //THREADAPI == THREADAPI_PTHREAD -void SIGNAL_INIT( SIGNAL_T *ref ); -void SIGNAL_FREE( SIGNAL_T *ref ); -void SIGNAL_ALL( SIGNAL_T *ref ); - /* * 'time_d': <0.0 for no timeout * 0.0 for instant check @@ -155,11 +137,6 @@ void SIGNAL_ALL( SIGNAL_T *ref ); using time_d = double; time_d now_secs(void); -time_d SIGNAL_TIMEOUT_PREPARE( double rel_secs ); - -bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); - - /*---=== Threading ===--- */ @@ -167,16 +144,9 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); #if THREADAPI == THREADAPI_WINDOWS - using THREAD_T = HANDLE; -# define THREAD_ISNULL( _h) (_h == 0) - void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (__stdcall *func)( void*), void* data, int prio /* -3..+3 */); - # define THREAD_PRIO_MIN (-3) # define THREAD_PRIO_MAX (+3) -# define THREAD_CLEANUP_PUSH( cb_, val_) -# define THREAD_CLEANUP_POP( execute_) - #else // THREADAPI == THREADAPI_PTHREAD /* Platforms that have a timed 'pthread_join()' can get away with a simpler @@ -195,11 +165,6 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); # endif # endif - using THREAD_T = pthread_t; -# define THREAD_ISNULL( _h) 0 // pthread_t may be a structure: never 'null' by itself - - void THREAD_CREATE( THREAD_T* ref, THREAD_RETURN_T (*func)( void*), void* data, int prio /* -3..+3 */); - # if defined(PLATFORM_LINUX) extern volatile bool sudo; # ifdef LINUX_SCHED_RR @@ -213,13 +178,6 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); # define THREAD_PRIO_MAX (+3) # endif -# if THREADWAIT_METHOD == THREADWAIT_CONDVAR -# define THREAD_CLEANUP_PUSH( cb_, val_) pthread_cleanup_push( cb_, val_) -# define THREAD_CLEANUP_POP( execute_) pthread_cleanup_pop( execute_) -# else -# define THREAD_CLEANUP_PUSH( cb_, val_) { -# define THREAD_CLEANUP_POP( execute_) } -# endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR #endif // THREADAPI == THREADAPI_WINDOWS /* @@ -236,16 +194,8 @@ bool SIGNAL_WAIT( SIGNAL_T *ref, MUTEX_T *mu, time_d timeout ); #endif // THREADAPI == THREADAPI_WINDOWS || (defined PTHREAD_TIMEDJOIN) -#if THREADWAIT_METHOD == THREADWAIT_TIMEOUT -bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs); -#define THREAD_WAIT( a, b, c, d, e) THREAD_WAIT_IMPL( a, b) -#else // THREADWAIT_METHOD == THREADWAIT_CONDVAR -bool THREAD_WAIT_IMPL( THREAD_T *ref, double secs, SIGNAL_T *signal_ref, MUTEX_T *mu_ref, volatile enum e_status *st_ref); -#define THREAD_WAIT THREAD_WAIT_IMPL -#endif // // THREADWAIT_METHOD == THREADWAIT_CONDVAR - -void THREAD_KILL( THREAD_T* ref); void THREAD_SETNAME( char const* _name); -void THREAD_MAKE_ASYNCH_CANCELLABLE(); void THREAD_SET_PRIORITY( int prio); void THREAD_SET_AFFINITY( unsigned int aff); + +void JTHREAD_SET_PRIORITY(std::jthread& thread_, int prio_); -- cgit v1.2.3-55-g6feb