From d7d756e30680bcff036118b47ac45b740e020ea2 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Thu, 26 Jun 2025 09:18:54 +0200 Subject: Preparation for lane:close() and correct to-be-closed variables * lane:join() can no longer be used to read yielded values * same with lane indexing * stub for lane:close(), similar to coroutine.close() (not implemented yet) * preparing tests for to-be-closed variables in yielded coroutine lanes * yielded lanes unlock and terminate properly at Lanes shutdown --- src/lane.cpp | 79 +++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 62 insertions(+), 17 deletions(-) (limited to 'src/lane.cpp') diff --git a/src/lane.cpp b/src/lane.cpp index c6ea358..f8685e9 100644 --- a/src/lane.cpp +++ b/src/lane.cpp @@ -129,8 +129,13 @@ static LUAG_FUNC(lane_join) raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); } + // it is forbidden to join a suspended coroutine + if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { + raise_luaL_error(L_, "cannot join a suspended coroutine"); + } + lua_settop(L_, 1); // L_: lane - bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; + bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, false) }; if (!_done) { lua_pushnil(L_); // L_: lane nil @@ -145,7 +150,11 @@ static LUAG_FUNC(lane_join) int const _stored{ _lane->storeResults(L_) }; STACK_GROW(L_, std::max(3, _stored + 1)); switch (_lane->status.load(std::memory_order_acquire)) { - case Lane::Suspended: // got yielded values + case Lane::Suspended: + // TODO: maybe this can happen if we have a to-be-closed handle on a suspended lane? TO BE TESTED! + raise_luaL_error(L_, "INTERNAL ERROR: should not join a suspended coroutine"); + [[fallthrough]]; + case Lane::Done: // got regular return values if (_stored > 0) { lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} @@ -210,7 +219,7 @@ LUAG_FUNC(lane_resume) Lane* const _lane{ ToLane(L_, kIdxSelf) }; lua_State* const _L2{ _lane->L }; -// wait until the lane yields or returns + // wait until the lane yields or returns std::ignore = _lane->waitForCompletion(std::chrono::time_point::max(), true); if (_lane->status.load(std::memory_order_acquire) != Lane::Suspended) { @@ -259,11 +268,17 @@ static int lane_index_number(lua_State* L_) int const _key{ static_cast(lua_tointeger(L_, 2)) }; lua_pop(L_, 1); // L_: lane + // wait until the lane finishes or is suspended std::chrono::time_point _until{ std::chrono::time_point::max() }; if (!_lane->waitForCompletion(_until, true)) { raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); } + // it is forbidden to index a suspended coroutine. if you want to read yielded values, use lane:resume() + if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { + raise_luaL_error(L_, "cannot index a suspended coroutine"); + } + // make sure results are stored int const _stored{ _lane->storeResults(L_) }; if (_key > _stored) { @@ -485,6 +500,7 @@ static int PushStackTrace(lua_State* const L_, Lane::ErrorTraceLevel const error StackIndex const _top{ lua_gettop(L_) }; switch (rc_) { case LuaError::OK: // no error, body return values are on the stack + case LuaError::YIELD: break; case LuaError::ERRRUN: // cancellation or a runtime error @@ -707,7 +723,7 @@ static void lane_main(Lane* const lane_) LuaError _rc{ LuaError::ERRRUN }; if (lane_->status.load(std::memory_order_acquire) == Lane::Pending) { // nothing wrong happened during preparation, we can work // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler - int const _errorHandlerCount{ lane_->errorHandlerCount() }; + int const _errorHandlerCount{ lane_->errorHandlerCount() }; // no error handler for coroutines, ever. int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount }; { std::unique_lock _guard{ lane_->doneMutex }; @@ -720,27 +736,40 @@ static void lane_main(Lane* const lane_) lane_->nresults = lua_gettop(_L) - _errorHandlerCount; } else { // S and L are different: we run as a coroutine in Lua thread L created in state S + bool _again{ true }; do { // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values. // that's why we have lane_->nresults - _rc = luaG_resume(_L, nullptr, _nargs, &lane_->nresults); // L: eh? ... retvals|err... + _rc = luaG_resume(_L, nullptr, _nargs, &lane_->nresults); // L: ... retvals|err... if (_rc == LuaError::YIELD) { + // on the stack we find the values pushed by lane:resume() + _nargs = lua_gettop(_L); + if (std::unique_lock _guard{ lane_->doneMutex }; true) + { // change our status to suspended, and wait until someone wants us to resume - std::unique_lock _guard{ lane_->doneMutex }; lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended lane_->doneCondVar.notify_one(); // wait until the user wants us to resume - // TODO: do I update waiting_on or not, so that the lane can be woken by cancellation requests here? - // lane_->waiting_on = &lane_->doneCondVar; - lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming; }); + // update waiting_on, so that the lane can be woken by cancellation requests here? + lane_->waiting_on = &lane_->doneCondVar; + lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); // here lane_->doneMutex is locked again - // lane_->waiting_on = nullptr; + lane_->waiting_on = nullptr; lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running - // on the stack we find the values pushed by lane:resume() - _nargs = lua_gettop(_L); } - } while (_rc == LuaError::YIELD); - if (_rc != LuaError::OK) { // : err... + // wait was interrupted because of a cancellation, finish the lane + _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); + } else { + _again = false; + } + } while (_again); +#if LUA_VERSION_NUM >= 504 + if (_rc == LuaError::YIELD) { + // lane is cancelled before completion (for example at Lanes shutdown), close everything + _rc = static_cast(lua_closethread(_L, nullptr)); + } +#endif // LUA_VERSION_NUM + if (_rc != LuaError::OK) { // an error occurred // L: err... // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack // or false + the error message when running Lua 5.1 // since the rest of our code wants only the error message, let us keep only the latter. @@ -805,7 +834,9 @@ static void lane_main(Lane* const lane_) // ################################################################################################# #if LUA_VERSION_NUM >= 504 -static LUAG_FUNC(lane_close) + +// __close(lane_ud, ) +static LUAG_FUNC(lane___close) { [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil // drop the error if any @@ -818,6 +849,16 @@ static LUAG_FUNC(lane_close) lua_call(L_, 1, LUA_MULTRET); // L_: join() results return lua_gettop(L_); } + +// ################################################################################################# + +// close(lane_ud) +static LUAG_FUNC(lane_close) +{ + [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil + raise_luaL_error(L_, "not implemented"); // TODO! + return 0; +} #endif // LUA_VERSION_NUM >= 504 // ################################################################################################# @@ -966,7 +1007,8 @@ CancelResult Lane::internalCancel(CancelRequest const rq_, std::chrono::time_poi // lane_->thread.get_stop_source().request_stop(); } if (wakeLane_ == WakeLane::Yes) { // wake the thread so that execution returns from any pending linda operation if desired - if (status.load(std::memory_order_acquire) == Lane::Waiting) { // waiting_on is updated under control of status acquire/release semantics + auto const _status{ status.load(std::memory_order_acquire) }; + if (_status == Lane::Waiting || _status == Lane::Suspended) { // waiting_on is updated under control of status acquire/release semantics if (std::condition_variable* const _waiting_on{ waiting_on }) { _waiting_on->notify_all(); } @@ -1007,11 +1049,14 @@ namespace { namespace local { static struct luaL_Reg const sLaneFunctions[] = { #if LUA_VERSION_NUM >= 504 - { "__close", LG_lane_close }, + { "__close", LG_lane___close }, #endif // LUA_VERSION_NUM >= 504 { "__gc", LG_lane_gc }, { "__index", LG_lane_index }, { "cancel", LG_lane_cancel }, +#if LUA_VERSION_NUM >= 504 + { "close", LG_lane_close }, +#endif // LUA_VERSION_NUM >= 504 { "get_threadname", LG_lane_get_threadname }, { "join", LG_lane_join }, { "resume", LG_lane_resume }, -- cgit v1.2.3-55-g6feb