From 45774df1eeeaae0868420104a4cdad4691995dc9 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Tue, 1 Jul 2025 08:01:46 +0200 Subject: Clarified interactions between join() and coroutines --- src/lane.cpp | 76 ++++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 28 deletions(-) (limited to 'src') diff --git a/src/lane.cpp b/src/lane.cpp index f8685e9..5ec6bcf 100644 --- a/src/lane.cpp +++ b/src/lane.cpp @@ -128,14 +128,11 @@ static LUAG_FUNC(lane_join) } else if (!lua_isnoneornil(L_, 2)) { raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); } + lua_settop(L_, 1); // L_: lane - // it is forbidden to join a suspended coroutine - if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { - raise_luaL_error(L_, "cannot join a suspended coroutine"); - } - lua_settop(L_, 1); // L_: lane - bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, false) }; + // wait until suspended or done + bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; if (!_done) { lua_pushnil(L_); // L_: lane nil @@ -143,6 +140,19 @@ static LUAG_FUNC(lane_join) return 2; } + // if lane is suspended, force a cancellation that will cause the yield loop to break, and the termination of the thread + if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { + auto const _cr{ _lane->cancel(CancelOp{ CancelRequest::Soft, LuaHookMask::None }, std::chrono::time_point::max(), WakeLane::Yes, 0) }; + if (_cr == CancelResult::Timeout) { + lua_pushnil(L_); // L_: lane nil + luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" + return 2; + } + if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { + raise_luaL_error(L_, "INTERNAL ERROR: Lane should not be suspended here"); + } + } + STACK_CHECK_START_REL(L_, 0); // L_: lane // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. @@ -151,9 +161,8 @@ static LUAG_FUNC(lane_join) STACK_GROW(L_, std::max(3, _stored + 1)); switch (_lane->status.load(std::memory_order_acquire)) { case Lane::Suspended: - // TODO: maybe this can happen if we have a to-be-closed handle on a suspended lane? TO BE TESTED! - raise_luaL_error(L_, "INTERNAL ERROR: should not join a suspended coroutine"); - [[fallthrough]]; + raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE"); + break; case Lane::Done: // got regular return values if (_stored > 0) { @@ -274,11 +283,6 @@ static int lane_index_number(lua_State* L_) raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); } - // it is forbidden to index a suspended coroutine. if you want to read yielded values, use lane:resume() - if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { - raise_luaL_error(L_, "cannot index a suspended coroutine"); - } - // make sure results are stored int const _stored{ _lane->storeResults(L_) }; if (_key > _stored) { @@ -287,6 +291,7 @@ static int lane_index_number(lua_State* L_) } else { _lane->pushIndexedResult(L_, _key); // L_: lane result } + return 1; } @@ -744,31 +749,45 @@ static void lane_main(Lane* const lane_) if (_rc == LuaError::YIELD) { // on the stack we find the values pushed by lane:resume() _nargs = lua_gettop(_L); - if (std::unique_lock _guard{ lane_->doneMutex }; true) - { - // change our status to suspended, and wait until someone wants us to resume - lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended - lane_->doneCondVar.notify_one(); - // wait until the user wants us to resume + if (std::unique_lock _guard{ lane_->doneMutex }; true) { + // change our status to suspended, and wait until someone wants us to resume + lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended + lane_->doneCondVar.notify_one(); + // wait until the user wants us to resume // update waiting_on, so that the lane can be woken by cancellation requests here? lane_->waiting_on = &lane_->doneCondVar; lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); - // here lane_->doneMutex is locked again + // here lane_->doneMutex is locked again lane_->waiting_on = nullptr; - lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running - } + lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running + } // wait was interrupted because of a cancellation, finish the lane _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); } else { _again = false; } } while (_again); -#if LUA_VERSION_NUM >= 504 if (_rc == LuaError::YIELD) { +#if LUA_VERSION_NUM >= 504 + lua_State* const _S{ lane_->S }; + STACK_CHECK_START_REL(_S, 0); + // lua_closethread cleans the stack, meaning we lose the yielded values! -> store + lua_xmove(_L, _S, lane_->nresults); // lane is cancelled before completion (for example at Lanes shutdown), close everything - _rc = static_cast(lua_closethread(_L, nullptr)); - } + _rc = static_cast(lua_closethread(_L, nullptr)); // L: ... retvals|err + // then restore the yielded values + if (_rc == LuaError::OK) { + lua_xmove(_S, _L, lane_->nresults); + } else { + lua_pop(_S, lane_->nresults); + } + STACK_CHECK(_S, 0); + +#else // LUA_VERSION_NUM + // Lua prior to 5.4 do not have lua_closethread. + _rc = LuaError::OK; #endif // LUA_VERSION_NUM + } if (_rc != LuaError::OK) { // an error occurred // L: err... // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack // or false + the error message when running Lua 5.1 @@ -980,7 +999,8 @@ CancelResult Lane::cancel(CancelOp const op_, std::chrono::time_point= Lane::Done) { + auto const _status{ status.load(std::memory_order_acquire) }; + if (_status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled) { // say "ok" by default, including when lane is already done return CancelResult::Cancelled; } @@ -1015,7 +1035,7 @@ CancelResult Lane::internalCancel(CancelRequest const rq_, std::chrono::time_poi } } // wait until the lane stops working with its state (either Suspended or Done+) - CancelResult const result{ waitForCompletion(until_, true) ? CancelResult::Cancelled : CancelResult::Timeout }; + CancelResult const result{ waitForCompletion(until_, false) ? CancelResult::Cancelled : CancelResult::Timeout }; return result; } -- cgit v1.2.3-55-g6feb