From d7d756e30680bcff036118b47ac45b740e020ea2 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Thu, 26 Jun 2025 09:18:54 +0200 Subject: Preparation for lane:close() and correct to-be-closed variables * lane:join() can no longer be used to read yielded values * same with lane indexing * stub for lane:close(), similar to coroutine.close() (not implemented yet) * preparing tests for to-be-closed variables in yielded coroutine lanes * yielded lanes unlock and terminate properly at Lanes shutdown --- src/lane.cpp | 79 +++++++++++++---- unit_tests/UnitTests.vcxproj | 4 +- unit_tests/UnitTests.vcxproj.filters | 8 +- unit_tests/lane_tests.cpp | 9 +- unit_tests/scripts/coro/basics.lua | 99 --------------------- unit_tests/scripts/coro/collect_yielded_lane.lua | 72 +++++++++++++++ unit_tests/scripts/coro/error_handling.lua | 6 +- unit_tests/scripts/coro/regular_function.lua | 38 ++++++++ unit_tests/scripts/coro/yielding_function.lua | 107 +++++++++++++++++++++++ 9 files changed, 299 insertions(+), 123 deletions(-) delete mode 100644 unit_tests/scripts/coro/basics.lua create mode 100644 unit_tests/scripts/coro/collect_yielded_lane.lua create mode 100644 unit_tests/scripts/coro/regular_function.lua create mode 100644 unit_tests/scripts/coro/yielding_function.lua diff --git a/src/lane.cpp b/src/lane.cpp index c6ea358..f8685e9 100644 --- a/src/lane.cpp +++ b/src/lane.cpp @@ -129,8 +129,13 @@ static LUAG_FUNC(lane_join) raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); } + // it is forbidden to join a suspended coroutine + if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { + raise_luaL_error(L_, "cannot join a suspended coroutine"); + } + lua_settop(L_, 1); // L_: lane - bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; + bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, false) }; if (!_done) { lua_pushnil(L_); // L_: lane nil @@ -145,7 +150,11 @@ static LUAG_FUNC(lane_join) int const _stored{ _lane->storeResults(L_) }; STACK_GROW(L_, std::max(3, _stored + 1)); switch (_lane->status.load(std::memory_order_acquire)) { - case Lane::Suspended: // got yielded values + case Lane::Suspended: + // TODO: maybe this can happen if we have a to-be-closed handle on a suspended lane? TO BE TESTED! + raise_luaL_error(L_, "INTERNAL ERROR: should not join a suspended coroutine"); + [[fallthrough]]; + case Lane::Done: // got regular return values if (_stored > 0) { lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} @@ -210,7 +219,7 @@ LUAG_FUNC(lane_resume) Lane* const _lane{ ToLane(L_, kIdxSelf) }; lua_State* const _L2{ _lane->L }; -// wait until the lane yields or returns + // wait until the lane yields or returns std::ignore = _lane->waitForCompletion(std::chrono::time_point::max(), true); if (_lane->status.load(std::memory_order_acquire) != Lane::Suspended) { @@ -259,11 +268,17 @@ static int lane_index_number(lua_State* L_) int const _key{ static_cast(lua_tointeger(L_, 2)) }; lua_pop(L_, 1); // L_: lane + // wait until the lane finishes or is suspended std::chrono::time_point _until{ std::chrono::time_point::max() }; if (!_lane->waitForCompletion(_until, true)) { raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); } + // it is forbidden to index a suspended coroutine. if you want to read yielded values, use lane:resume() + if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { + raise_luaL_error(L_, "cannot index a suspended coroutine"); + } + // make sure results are stored int const _stored{ _lane->storeResults(L_) }; if (_key > _stored) { @@ -485,6 +500,7 @@ static int PushStackTrace(lua_State* const L_, Lane::ErrorTraceLevel const error StackIndex const _top{ lua_gettop(L_) }; switch (rc_) { case LuaError::OK: // no error, body return values are on the stack + case LuaError::YIELD: break; case LuaError::ERRRUN: // cancellation or a runtime error @@ -707,7 +723,7 @@ static void lane_main(Lane* const lane_) LuaError _rc{ LuaError::ERRRUN }; if (lane_->status.load(std::memory_order_acquire) == Lane::Pending) { // nothing wrong happened during preparation, we can work // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler - int const _errorHandlerCount{ lane_->errorHandlerCount() }; + int const _errorHandlerCount{ lane_->errorHandlerCount() }; // no error handler for coroutines, ever. int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount }; { std::unique_lock _guard{ lane_->doneMutex }; @@ -720,27 +736,40 @@ static void lane_main(Lane* const lane_) lane_->nresults = lua_gettop(_L) - _errorHandlerCount; } else { // S and L are different: we run as a coroutine in Lua thread L created in state S + bool _again{ true }; do { // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values. // that's why we have lane_->nresults - _rc = luaG_resume(_L, nullptr, _nargs, &lane_->nresults); // L: eh? ... retvals|err... + _rc = luaG_resume(_L, nullptr, _nargs, &lane_->nresults); // L: ... retvals|err... if (_rc == LuaError::YIELD) { + // on the stack we find the values pushed by lane:resume() + _nargs = lua_gettop(_L); + if (std::unique_lock _guard{ lane_->doneMutex }; true) + { // change our status to suspended, and wait until someone wants us to resume - std::unique_lock _guard{ lane_->doneMutex }; lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended lane_->doneCondVar.notify_one(); // wait until the user wants us to resume - // TODO: do I update waiting_on or not, so that the lane can be woken by cancellation requests here? - // lane_->waiting_on = &lane_->doneCondVar; - lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming; }); + // update waiting_on, so that the lane can be woken by cancellation requests here? + lane_->waiting_on = &lane_->doneCondVar; + lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); // here lane_->doneMutex is locked again - // lane_->waiting_on = nullptr; + lane_->waiting_on = nullptr; lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running - // on the stack we find the values pushed by lane:resume() - _nargs = lua_gettop(_L); } - } while (_rc == LuaError::YIELD); - if (_rc != LuaError::OK) { // : err... + // wait was interrupted because of a cancellation, finish the lane + _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); + } else { + _again = false; + } + } while (_again); +#if LUA_VERSION_NUM >= 504 + if (_rc == LuaError::YIELD) { + // lane is cancelled before completion (for example at Lanes shutdown), close everything + _rc = static_cast(lua_closethread(_L, nullptr)); + } +#endif // LUA_VERSION_NUM + if (_rc != LuaError::OK) { // an error occurred // L: err... // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack // or false + the error message when running Lua 5.1 // since the rest of our code wants only the error message, let us keep only the latter. @@ -805,7 +834,9 @@ static void lane_main(Lane* const lane_) // ################################################################################################# #if LUA_VERSION_NUM >= 504 -static LUAG_FUNC(lane_close) + +// __close(lane_ud, ) +static LUAG_FUNC(lane___close) { [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil // drop the error if any @@ -818,6 +849,16 @@ static LUAG_FUNC(lane_close) lua_call(L_, 1, LUA_MULTRET); // L_: join() results return lua_gettop(L_); } + +// ################################################################################################# + +// close(lane_ud) +static LUAG_FUNC(lane_close) +{ + [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil + raise_luaL_error(L_, "not implemented"); // TODO! + return 0; +} #endif // LUA_VERSION_NUM >= 504 // ################################################################################################# @@ -966,7 +1007,8 @@ CancelResult Lane::internalCancel(CancelRequest const rq_, std::chrono::time_poi // lane_->thread.get_stop_source().request_stop(); } if (wakeLane_ == WakeLane::Yes) { // wake the thread so that execution returns from any pending linda operation if desired - if (status.load(std::memory_order_acquire) == Lane::Waiting) { // waiting_on is updated under control of status acquire/release semantics + auto const _status{ status.load(std::memory_order_acquire) }; + if (_status == Lane::Waiting || _status == Lane::Suspended) { // waiting_on is updated under control of status acquire/release semantics if (std::condition_variable* const _waiting_on{ waiting_on }) { _waiting_on->notify_all(); } @@ -1007,11 +1049,14 @@ namespace { namespace local { static struct luaL_Reg const sLaneFunctions[] = { #if LUA_VERSION_NUM >= 504 - { "__close", LG_lane_close }, + { "__close", LG_lane___close }, #endif // LUA_VERSION_NUM >= 504 { "__gc", LG_lane_gc }, { "__index", LG_lane_index }, { "cancel", LG_lane_cancel }, +#if LUA_VERSION_NUM >= 504 + { "close", LG_lane_close }, +#endif // LUA_VERSION_NUM >= 504 { "get_threadname", LG_lane_get_threadname }, { "join", LG_lane_join }, { "resume", LG_lane_resume }, diff --git a/unit_tests/UnitTests.vcxproj b/unit_tests/UnitTests.vcxproj index 79361c5..d6bfd5a 100644 --- a/unit_tests/UnitTests.vcxproj +++ b/unit_tests/UnitTests.vcxproj @@ -967,11 +967,13 @@ + + - + diff --git a/unit_tests/UnitTests.vcxproj.filters b/unit_tests/UnitTests.vcxproj.filters index dfa642b..c5a2761 100644 --- a/unit_tests/UnitTests.vcxproj.filters +++ b/unit_tests/UnitTests.vcxproj.filters @@ -95,7 +95,7 @@ Scripts\lane - + Scripts\coro @@ -119,8 +119,14 @@ Scripts\linda + + Scripts\coro + Scripts\lane + + Scripts\coro + \ No newline at end of file diff --git a/unit_tests/lane_tests.cpp b/unit_tests/lane_tests.cpp index 82ca1ad..c4d3c95 100644 --- a/unit_tests/lane_tests.cpp +++ b/unit_tests/lane_tests.cpp @@ -421,7 +421,7 @@ TEST_CASE("scripted_tests." #DIR "." #FILE) \ MAKE_TEST_CASE(lane, body_is_a_c_function, AssertNoLuaError) MAKE_TEST_CASE(lane, cooperative_shutdown, AssertNoLuaError) -#if LUA_VERSION_NUM >= 504 // // warnings are a Lua 5.4 feature +#if LUA_VERSION_NUM >= 504 // warnings are a Lua 5.4 feature // NOTE: when this test ends, there are resource leaks and a dangling thread MAKE_TEST_CASE(lane, uncooperative_shutdown, AssertWarns) #endif // LUA_VERSION_NUM @@ -434,11 +434,16 @@ MAKE_TEST_CASE(lane, tasking_error, AssertNoLuaError) MAKE_TEST_CASE(lane, tasking_join_test, AssertNoLuaError) MAKE_TEST_CASE(lane, tasking_send_receive_code, AssertNoLuaError) MAKE_TEST_CASE(lane, stdlib_naming, AssertNoLuaError) -MAKE_TEST_CASE(coro, basics, AssertNoLuaError) + +#if LUA_VERSION_NUM >= 504 // this makes use of to-be-closed variables, a Lua 5.4 feature +MAKE_TEST_CASE(coro, collect_yielded_lane, AssertNoLuaError) +#endif // LUA_VERSION_NUM #if LUAJIT_FLAVOR() == 0 // TODO: for some reason, the test fails with LuaJIT. To be investigated MAKE_TEST_CASE(coro, error_handling, AssertNoLuaError) #endif // LUAJIT_FLAVOR() +MAKE_TEST_CASE(coro, regular_function, AssertNoLuaError) +MAKE_TEST_CASE(coro, yielding_function, AssertNoLuaError) /* TEST_CASE("lanes.scripted_tests") diff --git a/unit_tests/scripts/coro/basics.lua b/unit_tests/scripts/coro/basics.lua deleted file mode 100644 index c0b7a36..0000000 --- a/unit_tests/scripts/coro/basics.lua +++ /dev/null @@ -1,99 +0,0 @@ -local lanes = require "lanes" - -local fixture = require "fixture" -lanes.finally(fixture.throwing_finalizer) - -local utils = lanes.require "_utils" -local PRINT = utils.MAKE_PRINT() - -if true then - -- a lane body that just returns some value - local lane = function(msg_) - local utils = lanes.require "_utils" - local PRINT = utils.MAKE_PRINT() - PRINT "In lane" - assert(msg_ == "hi") - return "bye" - end - - -- the generator - local g1 = lanes.coro("*", {name = "auto"}, lane) - - -- launch lane - local h1 = g1("hi") - - local r = h1[1] - assert(r == "bye") -end - --- a lane coroutine that yields back what got in, one element at a time -local yielder = function(...) - local utils = lanes.require "_utils" - local PRINT = utils.MAKE_PRINT() - PRINT "In lane" - for _i = 1, select('#', ...) do - local _val = select(_i, ...) - PRINT("yielding #", _i, _val) - local _ack = coroutine.yield(_val) - assert(_ack == _i) - end - return "done!" -end - -if true then - -- if we start a non-coroutine lane with a yielding function, we should get an error, right? - local fun_g = lanes.gen("*", { name = 'auto' }, yielder) - local h = fun_g("hello", "world", "!") - local err, status, stack = h:join() - PRINT(err, status, stack) - -- the actual error message is not the same for Lua 5.1 - -- of course, it also has to be different for LuaJIT as well - -- also, LuaJIT prepends a file:line to the actual error message, which Lua5.1 does not. - local msgs = { - ["Lua 5.1"] = jit and "attempt to yield across C-call boundary" or "attempt to yield across metamethod/C-call boundary", - ["Lua 5.2"] = "attempt to yield from outside a coroutine", - ["Lua 5.3"] = "attempt to yield from outside a coroutine", - ["Lua 5.4"] = "attempt to yield from outside a coroutine" - } - local expected_msg = msgs[_VERSION] - PRINT("expected_msg = " .. expected_msg) - assert(err == nil and string.find(status, expected_msg, 1, true) and stack == nil, "status = " .. status) -end - --- the generator -local coro_g = lanes.coro("*", {name = "auto"}, yielder) - -if true then - -- launch coroutine lane - local h2 = coro_g("hello", "world", "!") - -- read the yielded values, sending back the expected index - assert(h2:resume(1) == "hello") - assert(h2:resume(2) == "world") - assert(h2:resume(3) == "!") - -- the lane return value is available as usual - local r = h2[1] - assert(r == "done!") -end - -if true then - -- another coroutine lane - local h3 = coro_g("hello", "world", "!") - - -- yielded values are available as regular return values - assert(h3[1] == "hello" and h3.status == "suspended") - -- since we consumed the returned values, they should not be here when we resume - assert(h3:resume(1) == nil) - - -- similarly, we can get them with join() - local r3, ret3 = h3:join() - assert(r3 == true and ret3 == "world" and h3.status == "suspended") - -- since we consumed the returned values, they should not be here when we resume - assert(h3:resume(2) == nil) - - -- the rest should work as usual - assert(h3:resume(3) == "!") - - -- the final return value of the lane body remains to be read - local r3, ret3 = h3:join() - assert(r3 == true and ret3 == "done!" and h3.status == "done") -end diff --git a/unit_tests/scripts/coro/collect_yielded_lane.lua b/unit_tests/scripts/coro/collect_yielded_lane.lua new file mode 100644 index 0000000..0459698 --- /dev/null +++ b/unit_tests/scripts/coro/collect_yielded_lane.lua @@ -0,0 +1,72 @@ +local lanes = require "lanes" + +local fixture = require "fixture" +lanes.finally(fixture.throwing_finalizer) + +local utils = lanes.require "_utils" +local PRINT = utils.MAKE_PRINT() + +-- a lane body that yields stuff +local yielder = function(out_linda_) + -- here is a to-be-closed variable that, when closed, sends "Closed!" in the "out" slot of the provided linda + local t = setmetatable( + { text = "Closed!" }, { + __close = function(self, err) + out_linda_:send("out", self.text) + end + } + ) + -- yield forever + while true do + coroutine.yield("I yield!") + end +end + +local out_linda = lanes.linda() + +local test_close = function(what_, f_) + local c = coroutine.create(f_) + for i = 1, 10 do + local t, r = coroutine.resume(c, out_linda) -- returns true + + assert(t == true and r == "I yield!", "got " .. tostring(t) .. " " .. tostring(r)) + local s = coroutine.status(c) + assert(s == "suspended") + end + local r, s = coroutine.close(c) + assert(r == true and s == nil) + -- the local variable inside the yielder body should be closed + local s, r = out_linda:receive(0, "out") + assert(s == "out" and r == "Closed!", what_ .. " got " .. tostring(s) .. " " .. tostring(r)) +end + +-- first, try the close mechanism outside of a lane +test_close("base", yielder) + +-- try again with a function obtained through dump/undump +-- note this means our yielder implementation can't have upvalues, as they are lost in the process +test_close("dumped", load(string.dump(yielder))) + +------------------------------------------------------------------------------ +-- TEST: to-be-closed variables are properly closed when the lane is collected +------------------------------------------------------------------------------ +if false then -- NOT IMPLEMENTED YET! + + -- the generator + local coro_g = lanes.coro("*", yielder) + + -- start the lane + local h = coro_g(out_linda) + + -- join it so that it reaches suspended state + local r, v = h:join(0.5) + assert(r == nil and v == "timeout", "got " .. tostring(r) .. " " .. tostring(v)) + assert(h.status == "suspended") + + -- force collection of the lane + h = nil + collectgarbage() + + -- I want the to-be-closed variable of the coroutine linda to be properly closed + local s, r = out_linda:receive(0, "out") + assert(s == "out" and r == "Closed!", "coro got " .. tostring(s) .. " " .. tostring(r)) +end diff --git a/unit_tests/scripts/coro/error_handling.lua b/unit_tests/scripts/coro/error_handling.lua index ba6cff6..1cfb8c8 100644 --- a/unit_tests/scripts/coro/error_handling.lua +++ b/unit_tests/scripts/coro/error_handling.lua @@ -38,15 +38,15 @@ local force_error_test = function(error_trace_level_) utils.dump_error_stack(error_trace_level_, c) end -if false then +if true then force_error_test("minimal") end -if false then +if true then force_error_test("basic") end -if false then +if true then force_error_test("extended") end diff --git a/unit_tests/scripts/coro/regular_function.lua b/unit_tests/scripts/coro/regular_function.lua new file mode 100644 index 0000000..09aa3b7 --- /dev/null +++ b/unit_tests/scripts/coro/regular_function.lua @@ -0,0 +1,38 @@ +local lanes = require "lanes".configure() + +local utils = lanes.require "_utils" +local PRINT = utils.MAKE_PRINT() + +-- a lane body that just returns some value +local returner = function(msg_) + local utils = lanes.require "_utils" + local PRINT = utils.MAKE_PRINT() + PRINT "In lane" + assert(msg_ == "hi") + return "bye" +end + +-- a function that returns some value can run in a coroutine +if true then + -- the generator + local g = lanes.coro("*", {name = "auto"}, returner) + + -- launch lane + local h = g("hi") + + local r = h[1] + assert(r == "bye") +end + +-- can't resume a coro after the lane body has returned +if true then + -- the generator + local g = lanes.coro("*", {name = "auto"}, returner) + + -- launch lane + local h = g("hi") + + -- resuming a lane that terminated execution should raise an error + local b, e = pcall(h.resume, h) + assert(b == false and type(e) == "string") +end diff --git a/unit_tests/scripts/coro/yielding_function.lua b/unit_tests/scripts/coro/yielding_function.lua new file mode 100644 index 0000000..e7367ea --- /dev/null +++ b/unit_tests/scripts/coro/yielding_function.lua @@ -0,0 +1,107 @@ +local lanes = require "lanes" + +local fixture = require "fixture" +lanes.finally(fixture.throwing_finalizer) + +local utils = lanes.require "_utils" +local PRINT = utils.MAKE_PRINT() + +-- a lane coroutine that yields back what got in, one element at a time +local yielder = function(...) + local utils = lanes.require "_utils" + local PRINT = utils.MAKE_PRINT() + PRINT "In lane" + for _i = 1, select('#', ...) do + local _val = select(_i, ...) + PRINT("yielding #", _i, _val) + local _ack = coroutine.yield(_val) + assert(_ack == _i) + end + return "done!" +end + +-------------------------------------------------------------------------------------------------- +-- TEST: if we start a non-coroutine lane with a yielding function, we should get an error, right? +-------------------------------------------------------------------------------------------------- +if true then + local fun_g = lanes.gen("*", { name = 'auto' }, yielder) + local h = fun_g("hello", "world", "!") + local err, status, stack = h:join() + PRINT(err, status, stack) + -- the actual error message is not the same for Lua 5.1 + -- of course, it also has to be different for LuaJIT as well + -- also, LuaJIT prepends a file:line to the actual error message, which Lua5.1 does not. + local msgs = { + ["Lua 5.1"] = jit and "attempt to yield across C-call boundary" or "attempt to yield across metamethod/C-call boundary", + ["Lua 5.2"] = "attempt to yield from outside a coroutine", + ["Lua 5.3"] = "attempt to yield from outside a coroutine", + ["Lua 5.4"] = "attempt to yield from outside a coroutine" + } + local expected_msg = msgs[_VERSION] + PRINT("expected_msg = " .. expected_msg) + assert(err == nil and string.find(status, expected_msg, 1, true) and stack == nil, "status = " .. status) +end + +-- the coroutine generator +local coro_g = lanes.coro("*", {name = "auto"}, yielder) + +------------------------------------------------------------------------------------------------- +-- TEST: we can resume as many times as the lane yields, then read the returned value on indexing +------------------------------------------------------------------------------------------------- +if true then + -- launch coroutine lane + local h = coro_g("hello", "world", "!") + -- read the yielded values, sending back the expected index + assert(h:resume(1) == "hello") + assert(h:resume(2) == "world") + assert(h:resume(3) == "!") + -- the lane return value is available as usual + local r = h[1] + assert(r == "done!") +end + +--------------------------------------------------------------------------------------------- +-- TEST: we can resume as many times as the lane yields, then read the returned value on join +--------------------------------------------------------------------------------------------- +if true then + -- launch coroutine lane + local h = coro_g("hello", "world", "!") + -- read the yielded values, sending back the expected index + assert(h:resume(1) == "hello") + assert(h:resume(2) == "world") + assert(h:resume(3) == "!") + -- the lane return value is available as usual + local s, r = h:join() + assert(h.status == "done" and s == true and r == "done!") +end + +--------------------------------------------------------------------------------------------------- +-- TEST: if we join a yielded lane, we get a timeout, and we can resume as if we didn't try to join +--------------------------------------------------------------------------------------------------- +if true then + -- launch coroutine lane + local h = coro_g("hello", "world", "!") + -- read the first yielded value, sending back the expected index + assert(h:resume(1) == "hello") + -- join the lane. since it will reach a yield point, it remains suspended, and we should get a timeout + local b, r = h:join(0.5) + local s = h.status + assert(s == "suspended" and b == nil and r == "timeout", "got " .. s .. " " .. tostring(b) .. " " .. r) + -- trying to resume again should proceed normally, since nothing changed + assert(h:resume(2) == "world") + assert(h:resume(3) == "!") + -- the lane return value is available as usual + local s, r = h:join() + assert(h.status == "done" and s == true and r == "done!") +end + +--------------------------------------------------------- +-- TEST: if we index yielded lane, we should get an error +--------------------------------------------------------- +-- TODO: implement this test! + + +------------------------------------------------------------------------------------- +-- TEST: if we close yielded lane, we can join it and get the last yielded values out +------------------------------------------------------------------------------------- +-- TODO: we need to implement lane:close() for that! -- cgit v1.2.3-55-g6feb