From 72d7b36e020fd3f11ec002c110e7340f667d6628 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Thu, 3 Jul 2025 18:11:13 +0200 Subject: Fix more issues related to suspended coroutines and join/indexing operations * indexing and joining a suspended lane closes the coroutine, causing the termination of the lane * properly close coroutine to-be-closed variables on interruption --- src/lane.cpp | 275 ++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 157 insertions(+), 118 deletions(-) (limited to 'src/lane.cpp') diff --git a/src/lane.cpp b/src/lane.cpp index 5ec6bcf..33ee8a0 100644 --- a/src/lane.cpp +++ b/src/lane.cpp @@ -130,93 +130,18 @@ static LUAG_FUNC(lane_join) } lua_settop(L_, 1); // L_: lane - // wait until suspended or done - bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; - - if (!_done) { - lua_pushnil(L_); // L_: lane nil - luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" + STACK_CHECK_START_REL(L_, 0); // L_: lane + if (!_lane->waitForJoin(L_, _until)) { + // in that case, should have pushed nil, "timeout" + STACK_CHECK(L_, 2); return 2; } + STACK_CHECK(L_, 0); // L_: lane + // Thread is Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. - // if lane is suspended, force a cancellation that will cause the yield loop to break, and the termination of the thread - if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { - auto const _cr{ _lane->cancel(CancelOp{ CancelRequest::Soft, LuaHookMask::None }, std::chrono::time_point::max(), WakeLane::Yes, 0) }; - if (_cr == CancelResult::Timeout) { - lua_pushnil(L_); // L_: lane nil - luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" - return 2; - } - if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { - raise_luaL_error(L_, "INTERNAL ERROR: Lane should not be suspended here"); - } - } - - STACK_CHECK_START_REL(L_, 0); // L_: lane - // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. - - int _ret{ 0 }; - int const _stored{ _lane->storeResults(L_) }; - STACK_GROW(L_, std::max(3, _stored + 1)); - switch (_lane->status.load(std::memory_order_acquire)) { - case Lane::Suspended: - raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE"); - break; - - case Lane::Done: // got regular return values - if (_stored > 0) { - lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} - for (int _i = 2; _i <= _stored; ++_i) { - lua_rawgeti(L_, 2, _i); // L_: lane {uv} results2...N - } - lua_rawgeti(L_, 2, 1); // L_: lane {uv} results2...N result1 - lua_replace(L_, 2); // L_: lane results - } - // we precede the lane body returned values with boolean true - lua_pushboolean(L_, 1); // L_: lane results true - lua_replace(L_, 1); // L_: true results - _ret = _stored + 1; - STACK_CHECK(L_, _stored); - break; - - case Lane::Error: - { - LUA_ASSERT(L_, _stored == 2 || _stored == 3); - lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} - lua_rawgeti(L_, 2, 2); // L_: lane {uv} - lua_rawgeti(L_, 2, 3); // L_: lane {uv} |nil - if (lua_isnil(L_, -1)) { - lua_replace(L_, 2); // L_: lane nil - } else { - lua_rawgeti(L_, 2, 1); // L_: lane {uv} nil - lua_replace(L_, 2); // L_: lane nil - } - _ret = lua_gettop(L_) - 1; // 2 or 3 - STACK_CHECK(L_, _ret); - } - break; - - case Lane::Cancelled: - { - LUA_ASSERT(L_, _stored == 2); - lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} - lua_rawgeti(L_, 2, 2); // L_: lane {uv} cancel_error - lua_rawgeti(L_, 2, 1); // L_: lane {uv} cancel_error nil - lua_replace(L_, -3); // L_: lane nil cancel_error - LUA_ASSERT(L_, lua_isnil(L_, -2) && kCancelError.equals(L_, kIdxTop)); - _ret = 2; - STACK_CHECK(L_, _ret); - } - break; - - default: - DEBUGSPEW_CODE(DebugSpew(nullptr) << "Unknown Lane status: " << static_cast(_lane->status.load(std::memory_order_relaxed)) << std::endl); - LUA_ASSERT(L_, false); - _ret = 0; - STACK_CHECK(L_, _ret); - } - LUA_ASSERT(L_, lua_gettop(L_) >= _ret); + std::ignore = _lane->storeResults(L_); + int const _ret{ _lane->pushStoredResults(L_) }; return _ret; } @@ -277,13 +202,17 @@ static int lane_index_number(lua_State* L_) int const _key{ static_cast(lua_tointeger(L_, 2)) }; lua_pop(L_, 1); // L_: lane - // wait until the lane finishes or is suspended + // wait until suspended or done + STACK_CHECK_START_REL(L_, 0); // L_: lane std::chrono::time_point _until{ std::chrono::time_point::max() }; - if (!_lane->waitForCompletion(_until, true)) { - raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); + if (!_lane->waitForJoin(L_, _until)) { + // in that case, should have pushed nil, "timeout" + STACK_CHECK(L_, 2); + return 2; } + STACK_CHECK(L_, 0); // L_: lane + // Thread is Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. - // make sure results are stored int const _stored{ _lane->storeResults(L_) }; if (_key > _stored) { // get nil if indexing beyond the actual returned value count @@ -741,7 +670,7 @@ static void lane_main(Lane* const lane_) lane_->nresults = lua_gettop(_L) - _errorHandlerCount; } else { // S and L are different: we run as a coroutine in Lua thread L created in state S - bool _again{ true }; + bool _shouldClose{ false }; do { // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values. // that's why we have lane_->nresults @@ -754,19 +683,25 @@ static void lane_main(Lane* const lane_) lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended lane_->doneCondVar.notify_one(); // wait until the user wants us to resume - // update waiting_on, so that the lane can be woken by cancellation requests here? + // update waiting_on, so that the lane can be woken by cancellation requests here lane_->waiting_on = &lane_->doneCondVar; - lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); + lane_->doneCondVar.wait(_guard, + [lane_,&_shouldClose]() + { + auto const _status{ lane_->status.load(std::memory_order_acquire) }; + // wait interrupted because of a cancellation or join request means we have to abort the resume loop + _shouldClose = (_status == Lane::Closing); + return _shouldClose || (_status == Lane::Resuming) || (lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None); + } + ); // here lane_->doneMutex is locked again lane_->waiting_on = nullptr; lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running } - // wait was interrupted because of a cancellation, finish the lane - _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); } else { - _again = false; + _shouldClose = true; } - } while (_again); + } while (!_shouldClose); if (_rc == LuaError::YIELD) { #if LUA_VERSION_NUM >= 504 lua_State* const _S{ lane_->S }; @@ -855,7 +790,7 @@ static void lane_main(Lane* const lane_) #if LUA_VERSION_NUM >= 504 // __close(lane_ud, ) -static LUAG_FUNC(lane___close) +static LUAG_FUNC(lane_close) { [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil // drop the error if any @@ -869,15 +804,6 @@ static LUAG_FUNC(lane___close) return lua_gettop(L_); } -// ################################################################################################# - -// close(lane_ud) -static LUAG_FUNC(lane_close) -{ - [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil - raise_luaL_error(L_, "not implemented"); // TODO! - return 0; -} #endif // LUA_VERSION_NUM >= 504 // ################################################################################################# @@ -1069,14 +995,11 @@ namespace { namespace local { static struct luaL_Reg const sLaneFunctions[] = { #if LUA_VERSION_NUM >= 504 - { "__close", LG_lane___close }, + { "__close", LG_lane_close }, #endif // LUA_VERSION_NUM >= 504 { "__gc", LG_lane_gc }, { "__index", LG_lane_index }, { "cancel", LG_lane_cancel }, -#if LUA_VERSION_NUM >= 504 - { "close", LG_lane_close }, -#endif // LUA_VERSION_NUM >= 504 { "get_threadname", LG_lane_get_threadname }, { "join", LG_lane_join }, { "resume", LG_lane_resume }, @@ -1170,6 +1093,82 @@ void Lane::pushIndexedResult(lua_State* const L_, int const key_) const // ################################################################################################# +[[nodiscard]] +int Lane::pushStoredResults(lua_State* const L_) const +{ + STACK_CHECK_START_ABS(L_, 1); // should only have the lane UD on the stack + static constexpr StackIndex kIdxSelf{ 1 }; + static constexpr UserValueIndex kUvResults{ 1 }; + LUA_ASSERT(L_, ToLane(L_, kIdxSelf) == this); // L_: lane + lua_getiuservalue(L_, kIdxSelf, kUvResults); // L_: lane {uv} + lua_rawgeti(L_, kIdxTop, 0); // L_: lane {uv} stored + int const _stored{ static_cast(lua_tointeger(L_, kIdxTop)) }; + lua_pop(L_, 1); // L_: lane {uv} + + int _ret{}; + STACK_GROW(L_, std::max(3, _stored + 1)); + switch (status.load(std::memory_order_acquire)) { + case Lane::Suspended: + raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE"); + break; + + case Lane::Done: // got regular return values + if (_stored > 0) { + for (int _i = 2; _i <= _stored; ++_i) { + lua_rawgeti(L_, 2, _i); // L_: lane {uv} results2...N + } + lua_rawgeti(L_, 2, 1); // L_: lane {uv} results2...N result1 + lua_replace(L_, 2); // L_: lane results + } else { + lua_pop(L_, 1); // L_: lane + } + // we precede the lane body returned values with boolean true + lua_pushboolean(L_, 1); // L_: lane results true + lua_replace(L_, 1); // L_: true results + _ret = _stored + 1; + STACK_CHECK(L_, _ret); + break; + + case Lane::Error: + { + LUA_ASSERT(L_, _stored == 2 || _stored == 3); // contains nil error [trace] + lua_rawgeti(L_, 2, 2); // L_: lane {uv} + lua_rawgeti(L_, 2, 3); // L_: lane {uv} |nil + if (lua_isnil(L_, -1)) { + lua_replace(L_, 2); // L_: lane nil + } else { + lua_rawgeti(L_, 2, 1); // L_: lane {uv} nil + lua_replace(L_, 2); // L_: lane nil + } + _ret = _stored; // 2 or 3 + STACK_CHECK(L_, _ret + 1); // stack still contains the lane UD below + } + break; + + case Lane::Cancelled: + { + LUA_ASSERT(L_, _stored == 2); + lua_rawgeti(L_, 2, 2); // L_: lane {uv} cancel_error + lua_rawgeti(L_, 2, 1); // L_: lane {uv} cancel_error nil + lua_replace(L_, -3); // L_: lane nil cancel_error + LUA_ASSERT(L_, lua_isnil(L_, -2) && kCancelError.equals(L_, kIdxTop)); + _ret = 2; + STACK_CHECK(L_, _ret + 1); // stack still contains the lane UD below + } + break; + + default: + DEBUGSPEW_CODE(DebugSpew(nullptr) << "Unknown Lane status: " << static_cast(_lane->status.load(std::memory_order_relaxed)) << std::endl); + LUA_ASSERT(L_, false); + _ret = 0; + STACK_CHECK(L_, _ret); + } + LUA_ASSERT(L_, lua_gettop(L_) >= _ret); + return _ret; +} + +// ################################################################################################# + [[nodiscard]] std::string_view Lane::pushErrorTraceLevel(lua_State* L_) const { @@ -1270,15 +1269,16 @@ int Lane::storeResults(lua_State* const L_) lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv} StackIndex const _tidx{ lua_gettop(L_) }; - int _stored{}; - if (nresults == 0) { + // if the results were already stored from a previous indexing, just say how many values we have in store + if (!L) { lua_rawgeti(L_, -1, 0); // L_: lane ... {uv} nresults - _stored = static_cast(lua_tointeger(L_, -1)); + auto const _stored{ static_cast(lua_tointeger(L_, -1)) }; lua_pop(L_, 2); STACK_CHECK(L_, 0); return _stored; } + int _stored{}; switch (status.load(std::memory_order_acquire)) { default: // this is an internal error, we probably never get here @@ -1358,12 +1358,13 @@ int Lane::storeResults(lua_State* const L_) //--- // str= thread_status( lane ) // -// "pending" -> | ("running" <-> "waiting") <-> "suspended" <-> "resuming" | -> "done"/"error"/"cancelled" +// "pending" -> | ("running" <-> "waiting") <-> "suspended" <-> "resuming/closing" | -> "done"/"error"/"cancelled" // "pending" not started yet // "running" started, doing its work.. // "suspended" returned from a lua_resume // "resuming" told by its parent state to resume +// "closing" not observable from the outside: happens only inside a join()/indexation call to unblock a suspended coroutine Lane so that it can join properly // "waiting" blocked in a send()/receive() // "done" finished, results are there // "error" finished at an error, error value is there @@ -1374,7 +1375,7 @@ std::string_view Lane::threadStatusString() const { static constexpr std::string_view kStrs[] = { "pending", - "running", "suspended", "resuming", + "running", "suspended", "resuming", "closing", "waiting", "done", "error", "cancelled" }; @@ -1382,12 +1383,13 @@ std::string_view Lane::threadStatusString() const static_assert(1 == static_cast>(Running)); static_assert(2 == static_cast>(Suspended)); static_assert(3 == static_cast>(Resuming)); - static_assert(4 == static_cast>(Waiting)); - static_assert(5 == static_cast>(Done)); - static_assert(6 == static_cast>(Error)); - static_assert(7 == static_cast>(Cancelled)); + static_assert(4 == static_cast>(Closing)); + static_assert(5 == static_cast>(Waiting)); + static_assert(6 == static_cast>(Done)); + static_assert(7 == static_cast>(Error)); + static_assert(8 == static_cast>(Cancelled)); auto const _status{ static_cast>(status.load(std::memory_order_acquire)) }; - if (_status < 0 || _status > 7) { // should never happen, but better safe than sorry + if (_status < 0 || _status > 8) { // should never happen, but better safe than sorry return ""; } return kStrs[_status]; @@ -1407,3 +1409,40 @@ bool Lane::waitForCompletion(std::chrono::time_point return _status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled || _status == suspended; }); } + +// ################################################################################################# + +[[nodiscard]] +bool Lane::waitForJoin(lua_State* const L_, std::chrono::time_point until_) +{ + // wait until suspended or done + { + bool const _done{ !thread.joinable() || waitForCompletion(until_, true) }; + + if (!_done) { + lua_pushnil(L_); // L_: lane nil + luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" + return false; + } + } + + // if lane is suspended, force the yield loop to break, and the termination of the thread + if (status.load(std::memory_order_acquire) == Lane::Suspended) { + LUA_ASSERT(L_, waiting_on == &doneCondVar); + status.store(Lane::Closing, std::memory_order_release); + doneCondVar.notify_all(); + // wait until done + { + bool const _done{ !thread.joinable() || waitForCompletion(until_, true) }; + + if (!_done) { + lua_pushnil(L_); // L_: lane nil + luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" + return false; + } + } + LUA_ASSERT(L_, status.load(std::memory_order_acquire) != Lane::Closing); + } + LUA_ASSERT(L_, status.load(std::memory_order_acquire) != Lane::Suspended); + return true; +} -- cgit v1.2.3-55-g6feb