aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenoit Germain <benoit.germain@ubisoft.com>2025-07-01 08:01:46 +0200
committerBenoit Germain <benoit.germain@ubisoft.com>2025-07-01 08:01:46 +0200
commit45774df1eeeaae0868420104a4cdad4691995dc9 (patch)
tree9e7347ce1b8cff7cca2c5a733c33431a07636023
parentd7d756e30680bcff036118b47ac45b740e020ea2 (diff)
downloadlanes-45774df1eeeaae0868420104a4cdad4691995dc9.tar.gz
lanes-45774df1eeeaae0868420104a4cdad4691995dc9.tar.bz2
lanes-45774df1eeeaae0868420104a4cdad4691995dc9.zip
Clarified interactions between join() and coroutines
-rw-r--r--src/lane.cpp76
-rw-r--r--unit_tests/scripts/coro/yielding_function.lua45
2 files changed, 75 insertions, 46 deletions
diff --git a/src/lane.cpp b/src/lane.cpp
index f8685e9..5ec6bcf 100644
--- a/src/lane.cpp
+++ b/src/lane.cpp
@@ -128,14 +128,11 @@ static LUAG_FUNC(lane_join)
128 } else if (!lua_isnoneornil(L_, 2)) { 128 } else if (!lua_isnoneornil(L_, 2)) {
129 raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); 129 raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type");
130 } 130 }
131 lua_settop(L_, 1); // L_: lane
131 132
132 // it is forbidden to join a suspended coroutine
133 if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) {
134 raise_luaL_error(L_, "cannot join a suspended coroutine");
135 }
136 133
137 lua_settop(L_, 1); // L_: lane 134 // wait until suspended or done
138 bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, false) }; 135 bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) };
139 136
140 if (!_done) { 137 if (!_done) {
141 lua_pushnil(L_); // L_: lane nil 138 lua_pushnil(L_); // L_: lane nil
@@ -143,6 +140,19 @@ static LUAG_FUNC(lane_join)
143 return 2; 140 return 2;
144 } 141 }
145 142
143 // if lane is suspended, force a cancellation that will cause the yield loop to break, and the termination of the thread
144 if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) {
145 auto const _cr{ _lane->cancel(CancelOp{ CancelRequest::Soft, LuaHookMask::None }, std::chrono::time_point<std::chrono::steady_clock>::max(), WakeLane::Yes, 0) };
146 if (_cr == CancelResult::Timeout) {
147 lua_pushnil(L_); // L_: lane nil
148 luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout"
149 return 2;
150 }
151 if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) {
152 raise_luaL_error(L_, "INTERNAL ERROR: Lane should not be suspended here");
153 }
154 }
155
146 STACK_CHECK_START_REL(L_, 0); // L_: lane 156 STACK_CHECK_START_REL(L_, 0); // L_: lane
147 // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. 157 // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can.
148 158
@@ -151,9 +161,8 @@ static LUAG_FUNC(lane_join)
151 STACK_GROW(L_, std::max(3, _stored + 1)); 161 STACK_GROW(L_, std::max(3, _stored + 1));
152 switch (_lane->status.load(std::memory_order_acquire)) { 162 switch (_lane->status.load(std::memory_order_acquire)) {
153 case Lane::Suspended: 163 case Lane::Suspended:
154 // TODO: maybe this can happen if we have a to-be-closed handle on a suspended lane? TO BE TESTED! 164 raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE");
155 raise_luaL_error(L_, "INTERNAL ERROR: should not join a suspended coroutine"); 165 break;
156 [[fallthrough]];
157 166
158 case Lane::Done: // got regular return values 167 case Lane::Done: // got regular return values
159 if (_stored > 0) { 168 if (_stored > 0) {
@@ -274,11 +283,6 @@ static int lane_index_number(lua_State* L_)
274 raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); 283 raise_luaL_error(L_, "INTERNAL ERROR: Failed to join");
275 } 284 }
276 285
277 // it is forbidden to index a suspended coroutine. if you want to read yielded values, use lane:resume()
278 if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) {
279 raise_luaL_error(L_, "cannot index a suspended coroutine");
280 }
281
282 // make sure results are stored 286 // make sure results are stored
283 int const _stored{ _lane->storeResults(L_) }; 287 int const _stored{ _lane->storeResults(L_) };
284 if (_key > _stored) { 288 if (_key > _stored) {
@@ -287,6 +291,7 @@ static int lane_index_number(lua_State* L_)
287 } else { 291 } else {
288 _lane->pushIndexedResult(L_, _key); // L_: lane result 292 _lane->pushIndexedResult(L_, _key); // L_: lane result
289 } 293 }
294
290 return 1; 295 return 1;
291} 296}
292 297
@@ -744,31 +749,45 @@ static void lane_main(Lane* const lane_)
744 if (_rc == LuaError::YIELD) { 749 if (_rc == LuaError::YIELD) {
745 // on the stack we find the values pushed by lane:resume() 750 // on the stack we find the values pushed by lane:resume()
746 _nargs = lua_gettop(_L); 751 _nargs = lua_gettop(_L);
747 if (std::unique_lock _guard{ lane_->doneMutex }; true) 752 if (std::unique_lock _guard{ lane_->doneMutex }; true) {
748 { 753 // change our status to suspended, and wait until someone wants us to resume
749 // change our status to suspended, and wait until someone wants us to resume 754 lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended
750 lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended 755 lane_->doneCondVar.notify_one();
751 lane_->doneCondVar.notify_one(); 756 // wait until the user wants us to resume
752 // wait until the user wants us to resume
753 // update waiting_on, so that the lane can be woken by cancellation requests here? 757 // update waiting_on, so that the lane can be woken by cancellation requests here?
754 lane_->waiting_on = &lane_->doneCondVar; 758 lane_->waiting_on = &lane_->doneCondVar;
755 lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); 759 lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; });
756 // here lane_->doneMutex is locked again 760 // here lane_->doneMutex is locked again
757 lane_->waiting_on = nullptr; 761 lane_->waiting_on = nullptr;
758 lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running 762 lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running
759 } 763 }
760 // wait was interrupted because of a cancellation, finish the lane 764 // wait was interrupted because of a cancellation, finish the lane
761 _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); 765 _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None);
762 } else { 766 } else {
763 _again = false; 767 _again = false;
764 } 768 }
765 } while (_again); 769 } while (_again);
766#if LUA_VERSION_NUM >= 504
767 if (_rc == LuaError::YIELD) { 770 if (_rc == LuaError::YIELD) {
771#if LUA_VERSION_NUM >= 504
772 lua_State* const _S{ lane_->S };
773 STACK_CHECK_START_REL(_S, 0);
774 // lua_closethread cleans the stack, meaning we lose the yielded values! -> store
775 lua_xmove(_L, _S, lane_->nresults);
768 // lane is cancelled before completion (for example at Lanes shutdown), close everything 776 // lane is cancelled before completion (for example at Lanes shutdown), close everything
769 _rc = static_cast<LuaError>(lua_closethread(_L, nullptr)); 777 _rc = static_cast<LuaError>(lua_closethread(_L, nullptr)); // L: ... retvals|err <close_err>
770 } 778 // then restore the yielded values
779 if (_rc == LuaError::OK) {
780 lua_xmove(_S, _L, lane_->nresults);
781 } else {
782 lua_pop(_S, lane_->nresults);
783 }
784 STACK_CHECK(_S, 0);
785
786#else // LUA_VERSION_NUM
787 // Lua prior to 5.4 do not have lua_closethread.
788 _rc = LuaError::OK;
771#endif // LUA_VERSION_NUM 789#endif // LUA_VERSION_NUM
790 }
772 if (_rc != LuaError::OK) { // an error occurred // L: err... 791 if (_rc != LuaError::OK) { // an error occurred // L: err...
773 // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack 792 // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack
774 // or false + the error message when running Lua 5.1 793 // or false + the error message when running Lua 5.1
@@ -980,7 +999,8 @@ CancelResult Lane::cancel(CancelOp const op_, std::chrono::time_point<std::chron
980 999
981 // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here 1000 // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here
982 // We can read status without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) 1001 // We can read status without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN)
983 if (status.load(std::memory_order_acquire) >= Lane::Done) { 1002 auto const _status{ status.load(std::memory_order_acquire) };
1003 if (_status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled) {
984 // say "ok" by default, including when lane is already done 1004 // say "ok" by default, including when lane is already done
985 return CancelResult::Cancelled; 1005 return CancelResult::Cancelled;
986 } 1006 }
@@ -1015,7 +1035,7 @@ CancelResult Lane::internalCancel(CancelRequest const rq_, std::chrono::time_poi
1015 } 1035 }
1016 } 1036 }
1017 // wait until the lane stops working with its state (either Suspended or Done+) 1037 // wait until the lane stops working with its state (either Suspended or Done+)
1018 CancelResult const result{ waitForCompletion(until_, true) ? CancelResult::Cancelled : CancelResult::Timeout }; 1038 CancelResult const result{ waitForCompletion(until_, false) ? CancelResult::Cancelled : CancelResult::Timeout };
1019 return result; 1039 return result;
1020} 1040}
1021 1041
diff --git a/unit_tests/scripts/coro/yielding_function.lua b/unit_tests/scripts/coro/yielding_function.lua
index e7367ea..636f094 100644
--- a/unit_tests/scripts/coro/yielding_function.lua
+++ b/unit_tests/scripts/coro/yielding_function.lua
@@ -17,13 +17,13 @@ local yielder = function(...)
17 local _ack = coroutine.yield(_val) 17 local _ack = coroutine.yield(_val)
18 assert(_ack == _i) 18 assert(_ack == _i)
19 end 19 end
20 return "done!" 20 return "bye!"
21end 21end
22 22
23-------------------------------------------------------------------------------------------------- 23--------------------------------------------------------------------------------------------------
24-- TEST: if we start a non-coroutine lane with a yielding function, we should get an error, right? 24-- TEST: if we start a non-coroutine lane with a yielding function, we should get an error, right?
25-------------------------------------------------------------------------------------------------- 25--------------------------------------------------------------------------------------------------
26if true then 26if false then
27 local fun_g = lanes.gen("*", { name = 'auto' }, yielder) 27 local fun_g = lanes.gen("*", { name = 'auto' }, yielder)
28 local h = fun_g("hello", "world", "!") 28 local h = fun_g("hello", "world", "!")
29 local err, status, stack = h:join() 29 local err, status, stack = h:join()
@@ -48,7 +48,7 @@ local coro_g = lanes.coro("*", {name = "auto"}, yielder)
48------------------------------------------------------------------------------------------------- 48-------------------------------------------------------------------------------------------------
49-- TEST: we can resume as many times as the lane yields, then read the returned value on indexing 49-- TEST: we can resume as many times as the lane yields, then read the returned value on indexing
50------------------------------------------------------------------------------------------------- 50-------------------------------------------------------------------------------------------------
51if true then 51if false then
52 -- launch coroutine lane 52 -- launch coroutine lane
53 local h = coro_g("hello", "world", "!") 53 local h = coro_g("hello", "world", "!")
54 -- read the yielded values, sending back the expected index 54 -- read the yielded values, sending back the expected index
@@ -57,13 +57,13 @@ if true then
57 assert(h:resume(3) == "!") 57 assert(h:resume(3) == "!")
58 -- the lane return value is available as usual 58 -- the lane return value is available as usual
59 local r = h[1] 59 local r = h[1]
60 assert(r == "done!") 60 assert(r == "bye!")
61end 61end
62 62
63--------------------------------------------------------------------------------------------- 63---------------------------------------------------------------------------------------------
64-- TEST: we can resume as many times as the lane yields, then read the returned value on join 64-- TEST: we can resume as many times as the lane yields, then read the returned value on join
65--------------------------------------------------------------------------------------------- 65---------------------------------------------------------------------------------------------
66if true then 66if false then
67 -- launch coroutine lane 67 -- launch coroutine lane
68 local h = coro_g("hello", "world", "!") 68 local h = coro_g("hello", "world", "!")
69 -- read the yielded values, sending back the expected index 69 -- read the yielded values, sending back the expected index
@@ -72,7 +72,7 @@ if true then
72 assert(h:resume(3) == "!") 72 assert(h:resume(3) == "!")
73 -- the lane return value is available as usual 73 -- the lane return value is available as usual
74 local s, r = h:join() 74 local s, r = h:join()
75 assert(h.status == "done" and s == true and r == "done!") 75 assert(h.status == "done" and s == true and r == "bye!")
76end 76end
77 77
78--------------------------------------------------------------------------------------------------- 78---------------------------------------------------------------------------------------------------
@@ -83,23 +83,32 @@ if true then
83 local h = coro_g("hello", "world", "!") 83 local h = coro_g("hello", "world", "!")
84 -- read the first yielded value, sending back the expected index 84 -- read the first yielded value, sending back the expected index
85 assert(h:resume(1) == "hello") 85 assert(h:resume(1) == "hello")
86 -- join the lane. since it will reach a yield point, it remains suspended, and we should get a timeout 86 -- join the lane. since it will reach a yield point, it unblocks and ends. last yielded values are returned normally
87 local b, r = h:join(0.5) 87 local b, r = h:join(0.5)
88 local s = h.status 88 local s = h.status
89 assert(s == "suspended" and b == nil and r == "timeout", "got " .. s .. " " .. tostring(b) .. " " .. r) 89 assert(s == "done" and b == true and r == "world", "got " .. s .. " " .. tostring(b) .. " " .. tostring(r))
90 -- trying to resume again should proceed normally, since nothing changed
91 assert(h:resume(2) == "world")
92 assert(h:resume(3) == "!")
93 -- the lane return value is available as usual
94 local s, r = h:join()
95 assert(h.status == "done" and s == true and r == "done!")
96end 90end
97 91
98--------------------------------------------------------- 92-----------------------------------------------------------------------
99-- TEST: if we index yielded lane, we should get an error 93-- TEST: if we index yielded lane, we should get the last yielded value
100--------------------------------------------------------- 94-----------------------------------------------------------------------
101-- TODO: implement this test! 95if false then
96 -- launch coroutine lane
97 local h = coro_g("hello", "world", "!")
98 -- read the first yielded value, sending back the expected index
99 assert(h:resume(1) == "hello")
100 -- indexing multiple times gives back the same us the same yielded value
101 local r1 = h[1]
102 local r2 = h[1]
103 local r3 = h[1]
104 assert(r1 == "world" and r2 == "world" and r3 == "world", "got " .. r1 .. " " .. r2 .. " " .. r3)
105 assert(h:resume(2) == "world")
102 106
107 -- THERE IS AN INCONSISTENCY: h:resume pulls the yielded values directly out of the lane's stack
108 -- but h[n] removes them and stores them in the internal values storage table
109 -- TODO: so we need to decide: should indexing a yielded lane work like resume()?
110 assert(h:resume(3) == "!")
111end
103 112
104------------------------------------------------------------------------------------- 113-------------------------------------------------------------------------------------
105-- TEST: if we close yielded lane, we can join it and get the last yielded values out 114-- TEST: if we close yielded lane, we can join it and get the last yielded values out