From 45774df1eeeaae0868420104a4cdad4691995dc9 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Tue, 1 Jul 2025 08:01:46 +0200 Subject: Clarified interactions between join() and coroutines --- src/lane.cpp | 76 +++++++++++++++++---------- unit_tests/scripts/coro/yielding_function.lua | 45 +++++++++------- 2 files changed, 75 insertions(+), 46 deletions(-) diff --git a/src/lane.cpp b/src/lane.cpp index f8685e9..5ec6bcf 100644 --- a/src/lane.cpp +++ b/src/lane.cpp @@ -128,14 +128,11 @@ static LUAG_FUNC(lane_join) } else if (!lua_isnoneornil(L_, 2)) { raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); } + lua_settop(L_, 1); // L_: lane - // it is forbidden to join a suspended coroutine - if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { - raise_luaL_error(L_, "cannot join a suspended coroutine"); - } - lua_settop(L_, 1); // L_: lane - bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, false) }; + // wait until suspended or done + bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; if (!_done) { lua_pushnil(L_); // L_: lane nil @@ -143,6 +140,19 @@ static LUAG_FUNC(lane_join) return 2; } + // if lane is suspended, force a cancellation that will cause the yield loop to break, and the termination of the thread + if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { + auto const _cr{ _lane->cancel(CancelOp{ CancelRequest::Soft, LuaHookMask::None }, std::chrono::time_point::max(), WakeLane::Yes, 0) }; + if (_cr == CancelResult::Timeout) { + lua_pushnil(L_); // L_: lane nil + luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" + return 2; + } + if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { + raise_luaL_error(L_, "INTERNAL ERROR: Lane should not be suspended here"); + } + } + STACK_CHECK_START_REL(L_, 0); // L_: lane // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. @@ -151,9 +161,8 @@ static LUAG_FUNC(lane_join) STACK_GROW(L_, std::max(3, _stored + 1)); switch (_lane->status.load(std::memory_order_acquire)) { case Lane::Suspended: - // TODO: maybe this can happen if we have a to-be-closed handle on a suspended lane? TO BE TESTED! - raise_luaL_error(L_, "INTERNAL ERROR: should not join a suspended coroutine"); - [[fallthrough]]; + raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE"); + break; case Lane::Done: // got regular return values if (_stored > 0) { @@ -274,11 +283,6 @@ static int lane_index_number(lua_State* L_) raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); } - // it is forbidden to index a suspended coroutine. if you want to read yielded values, use lane:resume() - if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { - raise_luaL_error(L_, "cannot index a suspended coroutine"); - } - // make sure results are stored int const _stored{ _lane->storeResults(L_) }; if (_key > _stored) { @@ -287,6 +291,7 @@ static int lane_index_number(lua_State* L_) } else { _lane->pushIndexedResult(L_, _key); // L_: lane result } + return 1; } @@ -744,31 +749,45 @@ static void lane_main(Lane* const lane_) if (_rc == LuaError::YIELD) { // on the stack we find the values pushed by lane:resume() _nargs = lua_gettop(_L); - if (std::unique_lock _guard{ lane_->doneMutex }; true) - { - // change our status to suspended, and wait until someone wants us to resume - lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended - lane_->doneCondVar.notify_one(); - // wait until the user wants us to resume + if (std::unique_lock _guard{ lane_->doneMutex }; true) { + // change our status to suspended, and wait until someone wants us to resume + lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended + lane_->doneCondVar.notify_one(); + // wait until the user wants us to resume // update waiting_on, so that the lane can be woken by cancellation requests here? lane_->waiting_on = &lane_->doneCondVar; lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); - // here lane_->doneMutex is locked again + // here lane_->doneMutex is locked again lane_->waiting_on = nullptr; - lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running - } + lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running + } // wait was interrupted because of a cancellation, finish the lane _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); } else { _again = false; } } while (_again); -#if LUA_VERSION_NUM >= 504 if (_rc == LuaError::YIELD) { +#if LUA_VERSION_NUM >= 504 + lua_State* const _S{ lane_->S }; + STACK_CHECK_START_REL(_S, 0); + // lua_closethread cleans the stack, meaning we lose the yielded values! -> store + lua_xmove(_L, _S, lane_->nresults); // lane is cancelled before completion (for example at Lanes shutdown), close everything - _rc = static_cast(lua_closethread(_L, nullptr)); - } + _rc = static_cast(lua_closethread(_L, nullptr)); // L: ... retvals|err + // then restore the yielded values + if (_rc == LuaError::OK) { + lua_xmove(_S, _L, lane_->nresults); + } else { + lua_pop(_S, lane_->nresults); + } + STACK_CHECK(_S, 0); + +#else // LUA_VERSION_NUM + // Lua prior to 5.4 do not have lua_closethread. + _rc = LuaError::OK; #endif // LUA_VERSION_NUM + } if (_rc != LuaError::OK) { // an error occurred // L: err... // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack // or false + the error message when running Lua 5.1 @@ -980,7 +999,8 @@ CancelResult Lane::cancel(CancelOp const op_, std::chrono::time_point= Lane::Done) { + auto const _status{ status.load(std::memory_order_acquire) }; + if (_status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled) { // say "ok" by default, including when lane is already done return CancelResult::Cancelled; } @@ -1015,7 +1035,7 @@ CancelResult Lane::internalCancel(CancelRequest const rq_, std::chrono::time_poi } } // wait until the lane stops working with its state (either Suspended or Done+) - CancelResult const result{ waitForCompletion(until_, true) ? CancelResult::Cancelled : CancelResult::Timeout }; + CancelResult const result{ waitForCompletion(until_, false) ? CancelResult::Cancelled : CancelResult::Timeout }; return result; } diff --git a/unit_tests/scripts/coro/yielding_function.lua b/unit_tests/scripts/coro/yielding_function.lua index e7367ea..636f094 100644 --- a/unit_tests/scripts/coro/yielding_function.lua +++ b/unit_tests/scripts/coro/yielding_function.lua @@ -17,13 +17,13 @@ local yielder = function(...) local _ack = coroutine.yield(_val) assert(_ack == _i) end - return "done!" + return "bye!" end -------------------------------------------------------------------------------------------------- -- TEST: if we start a non-coroutine lane with a yielding function, we should get an error, right? -------------------------------------------------------------------------------------------------- -if true then +if false then local fun_g = lanes.gen("*", { name = 'auto' }, yielder) local h = fun_g("hello", "world", "!") local err, status, stack = h:join() @@ -48,7 +48,7 @@ local coro_g = lanes.coro("*", {name = "auto"}, yielder) ------------------------------------------------------------------------------------------------- -- TEST: we can resume as many times as the lane yields, then read the returned value on indexing ------------------------------------------------------------------------------------------------- -if true then +if false then -- launch coroutine lane local h = coro_g("hello", "world", "!") -- read the yielded values, sending back the expected index @@ -57,13 +57,13 @@ if true then assert(h:resume(3) == "!") -- the lane return value is available as usual local r = h[1] - assert(r == "done!") + assert(r == "bye!") end --------------------------------------------------------------------------------------------- -- TEST: we can resume as many times as the lane yields, then read the returned value on join --------------------------------------------------------------------------------------------- -if true then +if false then -- launch coroutine lane local h = coro_g("hello", "world", "!") -- read the yielded values, sending back the expected index @@ -72,7 +72,7 @@ if true then assert(h:resume(3) == "!") -- the lane return value is available as usual local s, r = h:join() - assert(h.status == "done" and s == true and r == "done!") + assert(h.status == "done" and s == true and r == "bye!") end --------------------------------------------------------------------------------------------------- @@ -83,23 +83,32 @@ if true then local h = coro_g("hello", "world", "!") -- read the first yielded value, sending back the expected index assert(h:resume(1) == "hello") - -- join the lane. since it will reach a yield point, it remains suspended, and we should get a timeout + -- join the lane. since it will reach a yield point, it unblocks and ends. last yielded values are returned normally local b, r = h:join(0.5) local s = h.status - assert(s == "suspended" and b == nil and r == "timeout", "got " .. s .. " " .. tostring(b) .. " " .. r) - -- trying to resume again should proceed normally, since nothing changed - assert(h:resume(2) == "world") - assert(h:resume(3) == "!") - -- the lane return value is available as usual - local s, r = h:join() - assert(h.status == "done" and s == true and r == "done!") + assert(s == "done" and b == true and r == "world", "got " .. s .. " " .. tostring(b) .. " " .. tostring(r)) end ---------------------------------------------------------- --- TEST: if we index yielded lane, we should get an error ---------------------------------------------------------- --- TODO: implement this test! +----------------------------------------------------------------------- +-- TEST: if we index yielded lane, we should get the last yielded value +----------------------------------------------------------------------- +if false then + -- launch coroutine lane + local h = coro_g("hello", "world", "!") + -- read the first yielded value, sending back the expected index + assert(h:resume(1) == "hello") + -- indexing multiple times gives back the same us the same yielded value + local r1 = h[1] + local r2 = h[1] + local r3 = h[1] + assert(r1 == "world" and r2 == "world" and r3 == "world", "got " .. r1 .. " " .. r2 .. " " .. r3) + assert(h:resume(2) == "world") + -- THERE IS AN INCONSISTENCY: h:resume pulls the yielded values directly out of the lane's stack + -- but h[n] removes them and stores them in the internal values storage table + -- TODO: so we need to decide: should indexing a yielded lane work like resume()? + assert(h:resume(3) == "!") +end ------------------------------------------------------------------------------------- -- TEST: if we close yielded lane, we can join it and get the last yielded values out -- cgit v1.2.3-55-g6feb