aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenoit Germain <benoit.germain@ubisoft.com>2025-07-03 18:11:13 +0200
committerBenoit Germain <benoit.germain@ubisoft.com>2025-07-03 18:11:13 +0200
commit72d7b36e020fd3f11ec002c110e7340f667d6628 (patch)
tree00b42d89192b1bfa88245224f827544a7c5dbc50
parent45774df1eeeaae0868420104a4cdad4691995dc9 (diff)
downloadlanes-72d7b36e020fd3f11ec002c110e7340f667d6628.tar.gz
lanes-72d7b36e020fd3f11ec002c110e7340f667d6628.tar.bz2
lanes-72d7b36e020fd3f11ec002c110e7340f667d6628.zip
Fix more issues related to suspended coroutines and join/indexing operations
* indexing and joining a suspended lane closes the coroutine, causing the termination of the lane * properly close coroutine to-be-closed variables on interruption
-rw-r--r--docs/index.html11
-rw-r--r--src/lane.cpp275
-rw-r--r--src/lane.hpp6
-rw-r--r--unit_tests/scripts/coro/collect_yielded_lane.lua105
-rw-r--r--unit_tests/scripts/coro/yielding_function.lua34
5 files changed, 269 insertions, 162 deletions
diff --git a/docs/index.html b/docs/index.html
index ebb93d3..f6201cc 100644
--- a/docs/index.html
+++ b/docs/index.html
@@ -71,7 +71,7 @@
71 </p> 71 </p>
72 72
73 <p> 73 <p>
74 This document was revised on 26-Jun-25, and applies to version <tt>4.0.0</tt>. 74 This document was revised on 03-Jul-25, and applies to version <tt>4.0.0</tt>.
75 </p> 75 </p>
76 </font> 76 </font>
77 </center> 77 </center>
@@ -922,8 +922,9 @@
922 </tr> 922 </tr>
923 </table> 923 </table>
924 924
925 Coroutine lanes function mostly like regular coroutines. They can use <tt>coroutine.yield()</tt> normally, in which case the yielded values can be obtained with regular lane indexing (see <a href="#results">Results and errors</a>).<br /> 925 Coroutine lanes function mostly like regular coroutines. They can use <tt>coroutine.yield()</tt> normally.<br />
926 A yielded coroutine lane has a <tt>"suspended"</tt> status. It can be resumed with <tt>lane_h:resume(values...)</tt>. 926 A yielded coroutine lane has a <tt>"suspended"</tt> status. It can be resumed with <tt>lane_h:resume(values...), which returns the yielded values</tt>.
927 The latter can also be the returned values of <tt>lane_h:join()</tt> or accessed by regular lane indexing (see <a href="#results">Results and errors</a>).<br />
927 <table border="1" bgcolor="#FFFFE0" cellpadding="10" style="width:50%"> 928 <table border="1" bgcolor="#FFFFE0" cellpadding="10" style="width:50%">
928 <tr> 929 <tr>
929 <td> 930 <td>
@@ -932,8 +933,8 @@
932 </tr> 933 </tr>
933 </table> 934 </table>
934 935
935 The reply values are returned to the lane body at the <tt>coroutine.yield()</tt> point.<br /> 936 Just like regulare coroutines, the reply values passed to <tt>h:resume()</tt> are returned to the lane body at the <tt>coroutine.yield()</tt> point.<br />
936 If the yielded values were previously obtained by lane indexing, <tt>resume()</tt> returns <tt>nil</tt>. 937 If a coroutine lane is suspended when it is joined either by indexing or <tt>lane_h:join()</tt>, active to-be-closed variables are closed at that point, and the Lane can no longer be resumed.
937</p> 938</p>
938<h3>Free running lanes</h3> 939<h3>Free running lanes</h3>
939 940
diff --git a/src/lane.cpp b/src/lane.cpp
index 5ec6bcf..33ee8a0 100644
--- a/src/lane.cpp
+++ b/src/lane.cpp
@@ -130,93 +130,18 @@ static LUAG_FUNC(lane_join)
130 } 130 }
131 lua_settop(L_, 1); // L_: lane 131 lua_settop(L_, 1); // L_: lane
132 132
133
134 // wait until suspended or done 133 // wait until suspended or done
135 bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; 134 STACK_CHECK_START_REL(L_, 0); // L_: lane
136 135 if (!_lane->waitForJoin(L_, _until)) {
137 if (!_done) { 136 // in that case, should have pushed nil, "timeout"
138 lua_pushnil(L_); // L_: lane nil 137 STACK_CHECK(L_, 2);
139 luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout"
140 return 2; 138 return 2;
141 } 139 }
140 STACK_CHECK(L_, 0); // L_: lane
141 // Thread is Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can.
142 142
143 // if lane is suspended, force a cancellation that will cause the yield loop to break, and the termination of the thread 143 std::ignore = _lane->storeResults(L_);
144 if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { 144 int const _ret{ _lane->pushStoredResults(L_) };
145 auto const _cr{ _lane->cancel(CancelOp{ CancelRequest::Soft, LuaHookMask::None }, std::chrono::time_point<std::chrono::steady_clock>::max(), WakeLane::Yes, 0) };
146 if (_cr == CancelResult::Timeout) {
147 lua_pushnil(L_); // L_: lane nil
148 luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout"
149 return 2;
150 }
151 if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) {
152 raise_luaL_error(L_, "INTERNAL ERROR: Lane should not be suspended here");
153 }
154 }
155
156 STACK_CHECK_START_REL(L_, 0); // L_: lane
157 // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can.
158
159 int _ret{ 0 };
160 int const _stored{ _lane->storeResults(L_) };
161 STACK_GROW(L_, std::max(3, _stored + 1));
162 switch (_lane->status.load(std::memory_order_acquire)) {
163 case Lane::Suspended:
164 raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE");
165 break;
166
167 case Lane::Done: // got regular return values
168 if (_stored > 0) {
169 lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv}
170 for (int _i = 2; _i <= _stored; ++_i) {
171 lua_rawgeti(L_, 2, _i); // L_: lane {uv} results2...N
172 }
173 lua_rawgeti(L_, 2, 1); // L_: lane {uv} results2...N result1
174 lua_replace(L_, 2); // L_: lane results
175 }
176 // we precede the lane body returned values with boolean true
177 lua_pushboolean(L_, 1); // L_: lane results true
178 lua_replace(L_, 1); // L_: true results
179 _ret = _stored + 1;
180 STACK_CHECK(L_, _stored);
181 break;
182
183 case Lane::Error:
184 {
185 LUA_ASSERT(L_, _stored == 2 || _stored == 3);
186 lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv}
187 lua_rawgeti(L_, 2, 2); // L_: lane {uv} <error>
188 lua_rawgeti(L_, 2, 3); // L_: lane {uv} <error> <trace>|nil
189 if (lua_isnil(L_, -1)) {
190 lua_replace(L_, 2); // L_: lane nil <error>
191 } else {
192 lua_rawgeti(L_, 2, 1); // L_: lane {uv} <error> <trace> nil
193 lua_replace(L_, 2); // L_: lane nil <error> <trace>
194 }
195 _ret = lua_gettop(L_) - 1; // 2 or 3
196 STACK_CHECK(L_, _ret);
197 }
198 break;
199
200 case Lane::Cancelled:
201 {
202 LUA_ASSERT(L_, _stored == 2);
203 lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv}
204 lua_rawgeti(L_, 2, 2); // L_: lane {uv} cancel_error
205 lua_rawgeti(L_, 2, 1); // L_: lane {uv} cancel_error nil
206 lua_replace(L_, -3); // L_: lane nil cancel_error
207 LUA_ASSERT(L_, lua_isnil(L_, -2) && kCancelError.equals(L_, kIdxTop));
208 _ret = 2;
209 STACK_CHECK(L_, _ret);
210 }
211 break;
212
213 default:
214 DEBUGSPEW_CODE(DebugSpew(nullptr) << "Unknown Lane status: " << static_cast<int>(_lane->status.load(std::memory_order_relaxed)) << std::endl);
215 LUA_ASSERT(L_, false);
216 _ret = 0;
217 STACK_CHECK(L_, _ret);
218 }
219 LUA_ASSERT(L_, lua_gettop(L_) >= _ret);
220 return _ret; 145 return _ret;
221} 146}
222 147
@@ -277,13 +202,17 @@ static int lane_index_number(lua_State* L_)
277 int const _key{ static_cast<int>(lua_tointeger(L_, 2)) }; 202 int const _key{ static_cast<int>(lua_tointeger(L_, 2)) };
278 lua_pop(L_, 1); // L_: lane 203 lua_pop(L_, 1); // L_: lane
279 204
280 // wait until the lane finishes or is suspended 205 // wait until suspended or done
206 STACK_CHECK_START_REL(L_, 0); // L_: lane
281 std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; 207 std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() };
282 if (!_lane->waitForCompletion(_until, true)) { 208 if (!_lane->waitForJoin(L_, _until)) {
283 raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); 209 // in that case, should have pushed nil, "timeout"
210 STACK_CHECK(L_, 2);
211 return 2;
284 } 212 }
213 STACK_CHECK(L_, 0); // L_: lane
214 // Thread is Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can.
285 215
286 // make sure results are stored
287 int const _stored{ _lane->storeResults(L_) }; 216 int const _stored{ _lane->storeResults(L_) };
288 if (_key > _stored) { 217 if (_key > _stored) {
289 // get nil if indexing beyond the actual returned value count 218 // get nil if indexing beyond the actual returned value count
@@ -741,7 +670,7 @@ static void lane_main(Lane* const lane_)
741 lane_->nresults = lua_gettop(_L) - _errorHandlerCount; 670 lane_->nresults = lua_gettop(_L) - _errorHandlerCount;
742 } else { 671 } else {
743 // S and L are different: we run as a coroutine in Lua thread L created in state S 672 // S and L are different: we run as a coroutine in Lua thread L created in state S
744 bool _again{ true }; 673 bool _shouldClose{ false };
745 do { 674 do {
746 // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values. 675 // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values.
747 // that's why we have lane_->nresults 676 // that's why we have lane_->nresults
@@ -754,19 +683,25 @@ static void lane_main(Lane* const lane_)
754 lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended 683 lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended
755 lane_->doneCondVar.notify_one(); 684 lane_->doneCondVar.notify_one();
756 // wait until the user wants us to resume 685 // wait until the user wants us to resume
757 // update waiting_on, so that the lane can be woken by cancellation requests here? 686 // update waiting_on, so that the lane can be woken by cancellation requests here
758 lane_->waiting_on = &lane_->doneCondVar; 687 lane_->waiting_on = &lane_->doneCondVar;
759 lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); 688 lane_->doneCondVar.wait(_guard,
689 [lane_,&_shouldClose]()
690 {
691 auto const _status{ lane_->status.load(std::memory_order_acquire) };
692 // wait interrupted because of a cancellation or join request means we have to abort the resume loop
693 _shouldClose = (_status == Lane::Closing);
694 return _shouldClose || (_status == Lane::Resuming) || (lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None);
695 }
696 );
760 // here lane_->doneMutex is locked again 697 // here lane_->doneMutex is locked again
761 lane_->waiting_on = nullptr; 698 lane_->waiting_on = nullptr;
762 lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running 699 lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running
763 } 700 }
764 // wait was interrupted because of a cancellation, finish the lane
765 _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None);
766 } else { 701 } else {
767 _again = false; 702 _shouldClose = true;
768 } 703 }
769 } while (_again); 704 } while (!_shouldClose);
770 if (_rc == LuaError::YIELD) { 705 if (_rc == LuaError::YIELD) {
771#if LUA_VERSION_NUM >= 504 706#if LUA_VERSION_NUM >= 504
772 lua_State* const _S{ lane_->S }; 707 lua_State* const _S{ lane_->S };
@@ -855,7 +790,7 @@ static void lane_main(Lane* const lane_)
855#if LUA_VERSION_NUM >= 504 790#if LUA_VERSION_NUM >= 504
856 791
857// __close(lane_ud, <err>) 792// __close(lane_ud, <err>)
858static LUAG_FUNC(lane___close) 793static LUAG_FUNC(lane_close)
859{ 794{
860 [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil 795 [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil
861 // drop the error if any 796 // drop the error if any
@@ -869,15 +804,6 @@ static LUAG_FUNC(lane___close)
869 return lua_gettop(L_); 804 return lua_gettop(L_);
870} 805}
871 806
872// #################################################################################################
873
874// close(lane_ud)
875static LUAG_FUNC(lane_close)
876{
877 [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil
878 raise_luaL_error(L_, "not implemented"); // TODO!
879 return 0;
880}
881#endif // LUA_VERSION_NUM >= 504 807#endif // LUA_VERSION_NUM >= 504
882 808
883// ################################################################################################# 809// #################################################################################################
@@ -1069,14 +995,11 @@ namespace {
1069 namespace local { 995 namespace local {
1070 static struct luaL_Reg const sLaneFunctions[] = { 996 static struct luaL_Reg const sLaneFunctions[] = {
1071#if LUA_VERSION_NUM >= 504 997#if LUA_VERSION_NUM >= 504
1072 { "__close", LG_lane___close }, 998 { "__close", LG_lane_close },
1073#endif // LUA_VERSION_NUM >= 504 999#endif // LUA_VERSION_NUM >= 504
1074 { "__gc", LG_lane_gc }, 1000 { "__gc", LG_lane_gc },
1075 { "__index", LG_lane_index }, 1001 { "__index", LG_lane_index },
1076 { "cancel", LG_lane_cancel }, 1002 { "cancel", LG_lane_cancel },
1077#if LUA_VERSION_NUM >= 504
1078 { "close", LG_lane_close },
1079#endif // LUA_VERSION_NUM >= 504
1080 { "get_threadname", LG_lane_get_threadname }, 1003 { "get_threadname", LG_lane_get_threadname },
1081 { "join", LG_lane_join }, 1004 { "join", LG_lane_join },
1082 { "resume", LG_lane_resume }, 1005 { "resume", LG_lane_resume },
@@ -1171,6 +1094,82 @@ void Lane::pushIndexedResult(lua_State* const L_, int const key_) const
1171// ################################################################################################# 1094// #################################################################################################
1172 1095
1173[[nodiscard]] 1096[[nodiscard]]
1097int Lane::pushStoredResults(lua_State* const L_) const
1098{
1099 STACK_CHECK_START_ABS(L_, 1); // should only have the lane UD on the stack
1100 static constexpr StackIndex kIdxSelf{ 1 };
1101 static constexpr UserValueIndex kUvResults{ 1 };
1102 LUA_ASSERT(L_, ToLane(L_, kIdxSelf) == this); // L_: lane
1103 lua_getiuservalue(L_, kIdxSelf, kUvResults); // L_: lane {uv}
1104 lua_rawgeti(L_, kIdxTop, 0); // L_: lane {uv} stored
1105 int const _stored{ static_cast<int>(lua_tointeger(L_, kIdxTop)) };
1106 lua_pop(L_, 1); // L_: lane {uv}
1107
1108 int _ret{};
1109 STACK_GROW(L_, std::max(3, _stored + 1));
1110 switch (status.load(std::memory_order_acquire)) {
1111 case Lane::Suspended:
1112 raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE");
1113 break;
1114
1115 case Lane::Done: // got regular return values
1116 if (_stored > 0) {
1117 for (int _i = 2; _i <= _stored; ++_i) {
1118 lua_rawgeti(L_, 2, _i); // L_: lane {uv} results2...N
1119 }
1120 lua_rawgeti(L_, 2, 1); // L_: lane {uv} results2...N result1
1121 lua_replace(L_, 2); // L_: lane results
1122 } else {
1123 lua_pop(L_, 1); // L_: lane
1124 }
1125 // we precede the lane body returned values with boolean true
1126 lua_pushboolean(L_, 1); // L_: lane results true
1127 lua_replace(L_, 1); // L_: true results
1128 _ret = _stored + 1;
1129 STACK_CHECK(L_, _ret);
1130 break;
1131
1132 case Lane::Error:
1133 {
1134 LUA_ASSERT(L_, _stored == 2 || _stored == 3); // contains nil error [trace]
1135 lua_rawgeti(L_, 2, 2); // L_: lane {uv} <error>
1136 lua_rawgeti(L_, 2, 3); // L_: lane {uv} <error> <trace>|nil
1137 if (lua_isnil(L_, -1)) {
1138 lua_replace(L_, 2); // L_: lane nil <error>
1139 } else {
1140 lua_rawgeti(L_, 2, 1); // L_: lane {uv} <error> <trace> nil
1141 lua_replace(L_, 2); // L_: lane nil <error> <trace>
1142 }
1143 _ret = _stored; // 2 or 3
1144 STACK_CHECK(L_, _ret + 1); // stack still contains the lane UD below
1145 }
1146 break;
1147
1148 case Lane::Cancelled:
1149 {
1150 LUA_ASSERT(L_, _stored == 2);
1151 lua_rawgeti(L_, 2, 2); // L_: lane {uv} cancel_error
1152 lua_rawgeti(L_, 2, 1); // L_: lane {uv} cancel_error nil
1153 lua_replace(L_, -3); // L_: lane nil cancel_error
1154 LUA_ASSERT(L_, lua_isnil(L_, -2) && kCancelError.equals(L_, kIdxTop));
1155 _ret = 2;
1156 STACK_CHECK(L_, _ret + 1); // stack still contains the lane UD below
1157 }
1158 break;
1159
1160 default:
1161 DEBUGSPEW_CODE(DebugSpew(nullptr) << "Unknown Lane status: " << static_cast<int>(_lane->status.load(std::memory_order_relaxed)) << std::endl);
1162 LUA_ASSERT(L_, false);
1163 _ret = 0;
1164 STACK_CHECK(L_, _ret);
1165 }
1166 LUA_ASSERT(L_, lua_gettop(L_) >= _ret);
1167 return _ret;
1168}
1169
1170// #################################################################################################
1171
1172[[nodiscard]]
1174std::string_view Lane::pushErrorTraceLevel(lua_State* L_) const 1173std::string_view Lane::pushErrorTraceLevel(lua_State* L_) const
1175{ 1174{
1176 std::string_view const _str{ errorTraceLevelString() }; 1175 std::string_view const _str{ errorTraceLevelString() };
@@ -1270,15 +1269,16 @@ int Lane::storeResults(lua_State* const L_)
1270 lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv} 1269 lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv}
1271 StackIndex const _tidx{ lua_gettop(L_) }; 1270 StackIndex const _tidx{ lua_gettop(L_) };
1272 1271
1273 int _stored{}; 1272 // if the results were already stored from a previous indexing, just say how many values we have in store
1274 if (nresults == 0) { 1273 if (!L) {
1275 lua_rawgeti(L_, -1, 0); // L_: lane ... {uv} nresults 1274 lua_rawgeti(L_, -1, 0); // L_: lane ... {uv} nresults
1276 _stored = static_cast<int>(lua_tointeger(L_, -1)); 1275 auto const _stored{ static_cast<int>(lua_tointeger(L_, -1)) };
1277 lua_pop(L_, 2); 1276 lua_pop(L_, 2);
1278 STACK_CHECK(L_, 0); 1277 STACK_CHECK(L_, 0);
1279 return _stored; 1278 return _stored;
1280 } 1279 }
1281 1280
1281 int _stored{};
1282 switch (status.load(std::memory_order_acquire)) { 1282 switch (status.load(std::memory_order_acquire)) {
1283 default: 1283 default:
1284 // this is an internal error, we probably never get here 1284 // this is an internal error, we probably never get here
@@ -1358,12 +1358,13 @@ int Lane::storeResults(lua_State* const L_)
1358//--- 1358//---
1359// str= thread_status( lane ) 1359// str= thread_status( lane )
1360// 1360//
1361// "pending" -> | ("running" <-> "waiting") <-> "suspended" <-> "resuming" | -> "done"/"error"/"cancelled" 1361// "pending" -> | ("running" <-> "waiting") <-> "suspended" <-> "resuming/closing" | -> "done"/"error"/"cancelled"
1362 1362
1363// "pending" not started yet 1363// "pending" not started yet
1364// "running" started, doing its work.. 1364// "running" started, doing its work..
1365// "suspended" returned from a lua_resume 1365// "suspended" returned from a lua_resume
1366// "resuming" told by its parent state to resume 1366// "resuming" told by its parent state to resume
1367// "closing" not observable from the outside: happens only inside a join()/indexation call to unblock a suspended coroutine Lane so that it can join properly
1367// "waiting" blocked in a send()/receive() 1368// "waiting" blocked in a send()/receive()
1368// "done" finished, results are there 1369// "done" finished, results are there
1369// "error" finished at an error, error value is there 1370// "error" finished at an error, error value is there
@@ -1374,7 +1375,7 @@ std::string_view Lane::threadStatusString() const
1374{ 1375{
1375 static constexpr std::string_view kStrs[] = { 1376 static constexpr std::string_view kStrs[] = {
1376 "pending", 1377 "pending",
1377 "running", "suspended", "resuming", 1378 "running", "suspended", "resuming", "closing",
1378 "waiting", 1379 "waiting",
1379 "done", "error", "cancelled" 1380 "done", "error", "cancelled"
1380 }; 1381 };
@@ -1382,12 +1383,13 @@ std::string_view Lane::threadStatusString() const
1382 static_assert(1 == static_cast<std::underlying_type_t<Lane::Status>>(Running)); 1383 static_assert(1 == static_cast<std::underlying_type_t<Lane::Status>>(Running));
1383 static_assert(2 == static_cast<std::underlying_type_t<Lane::Status>>(Suspended)); 1384 static_assert(2 == static_cast<std::underlying_type_t<Lane::Status>>(Suspended));
1384 static_assert(3 == static_cast<std::underlying_type_t<Lane::Status>>(Resuming)); 1385 static_assert(3 == static_cast<std::underlying_type_t<Lane::Status>>(Resuming));
1385 static_assert(4 == static_cast<std::underlying_type_t<Lane::Status>>(Waiting)); 1386 static_assert(4 == static_cast<std::underlying_type_t<Lane::Status>>(Closing));
1386 static_assert(5 == static_cast<std::underlying_type_t<Lane::Status>>(Done)); 1387 static_assert(5 == static_cast<std::underlying_type_t<Lane::Status>>(Waiting));
1387 static_assert(6 == static_cast<std::underlying_type_t<Lane::Status>>(Error)); 1388 static_assert(6 == static_cast<std::underlying_type_t<Lane::Status>>(Done));
1388 static_assert(7 == static_cast<std::underlying_type_t<Lane::Status>>(Cancelled)); 1389 static_assert(7 == static_cast<std::underlying_type_t<Lane::Status>>(Error));
1390 static_assert(8 == static_cast<std::underlying_type_t<Lane::Status>>(Cancelled));
1389 auto const _status{ static_cast<std::underlying_type_t<Lane::Status>>(status.load(std::memory_order_acquire)) }; 1391 auto const _status{ static_cast<std::underlying_type_t<Lane::Status>>(status.load(std::memory_order_acquire)) };
1390 if (_status < 0 || _status > 7) { // should never happen, but better safe than sorry 1392 if (_status < 0 || _status > 8) { // should never happen, but better safe than sorry
1391 return ""; 1393 return "";
1392 } 1394 }
1393 return kStrs[_status]; 1395 return kStrs[_status];
@@ -1407,3 +1409,40 @@ bool Lane::waitForCompletion(std::chrono::time_point<std::chrono::steady_clock>
1407 return _status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled || _status == suspended; 1409 return _status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled || _status == suspended;
1408 }); 1410 });
1409} 1411}
1412
1413// #################################################################################################
1414
1415[[nodiscard]]
1416bool Lane::waitForJoin(lua_State* const L_, std::chrono::time_point<std::chrono::steady_clock> until_)
1417{
1418 // wait until suspended or done
1419 {
1420 bool const _done{ !thread.joinable() || waitForCompletion(until_, true) };
1421
1422 if (!_done) {
1423 lua_pushnil(L_); // L_: lane nil
1424 luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout"
1425 return false;
1426 }
1427 }
1428
1429 // if lane is suspended, force the yield loop to break, and the termination of the thread
1430 if (status.load(std::memory_order_acquire) == Lane::Suspended) {
1431 LUA_ASSERT(L_, waiting_on == &doneCondVar);
1432 status.store(Lane::Closing, std::memory_order_release);
1433 doneCondVar.notify_all();
1434 // wait until done
1435 {
1436 bool const _done{ !thread.joinable() || waitForCompletion(until_, true) };
1437
1438 if (!_done) {
1439 lua_pushnil(L_); // L_: lane nil
1440 luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout"
1441 return false;
1442 }
1443 }
1444 LUA_ASSERT(L_, status.load(std::memory_order_acquire) != Lane::Closing);
1445 }
1446 LUA_ASSERT(L_, status.load(std::memory_order_acquire) != Lane::Suspended);
1447 return true;
1448}
diff --git a/src/lane.hpp b/src/lane.hpp
index fd46d68..bd328b1 100644
--- a/src/lane.hpp
+++ b/src/lane.hpp
@@ -56,6 +56,7 @@ class Lane final
56 /* 56 /*
57 Pending: The Lua VM hasn't done anything yet. 57 Pending: The Lua VM hasn't done anything yet.
58 Resuming: The user requested the lane to resume execution from Suspended state. 58 Resuming: The user requested the lane to resume execution from Suspended state.
59 Closing: The user is joining the lane, specifically interrupting a suspended Lane.
59 Suspended: returned from lua_resume, waiting for the client to request a lua_resume. 60 Suspended: returned from lua_resume, waiting for the client to request a lua_resume.
60 Running, Suspended, Waiting: Thread is inside the Lua VM. 61 Running, Suspended, Waiting: Thread is inside the Lua VM.
61 Done, Error, Cancelled: Thread execution is outside the Lua VM. It can be lua_close()d. 62 Done, Error, Cancelled: Thread execution is outside the Lua VM. It can be lua_close()d.
@@ -66,6 +67,7 @@ class Lane final
66 Running, 67 Running,
67 Suspended, 68 Suspended,
68 Resuming, 69 Resuming,
70 Closing,
69 Waiting, 71 Waiting,
70 Done, 72 Done,
71 Error, 73 Error,
@@ -199,6 +201,8 @@ class Lane final
199 static void PushMetatable(lua_State* L_); 201 static void PushMetatable(lua_State* L_);
200 void pushStatusString(lua_State* L_) const; 202 void pushStatusString(lua_State* L_) const;
201 void pushIndexedResult(lua_State* L_, int key_) const; 203 void pushIndexedResult(lua_State* L_, int key_) const;
204 [[nodiscard]]
205 int pushStoredResults(lua_State* L_) const;
202 void resetResultsStorage(lua_State* L_, StackIndex self_idx_); 206 void resetResultsStorage(lua_State* L_, StackIndex self_idx_);
203 void selfdestructAdd(); 207 void selfdestructAdd();
204 [[nodiscard]] 208 [[nodiscard]]
@@ -213,6 +217,8 @@ class Lane final
213 // wait until the lane stops working with its state (either Suspended or Done+) 217 // wait until the lane stops working with its state (either Suspended or Done+)
214 [[nodiscard]] 218 [[nodiscard]]
215 bool waitForCompletion(std::chrono::time_point<std::chrono::steady_clock> until_, bool const _acceptSuspended); 219 bool waitForCompletion(std::chrono::time_point<std::chrono::steady_clock> until_, bool const _acceptSuspended);
220 [[nodiscard]]
221 bool waitForJoin(lua_State* _L, std::chrono::time_point<std::chrono::steady_clock> until_);
216}; 222};
217 223
218// ################################################################################################# 224// #################################################################################################
diff --git a/unit_tests/scripts/coro/collect_yielded_lane.lua b/unit_tests/scripts/coro/collect_yielded_lane.lua
index 0459698..2bc4ae8 100644
--- a/unit_tests/scripts/coro/collect_yielded_lane.lua
+++ b/unit_tests/scripts/coro/collect_yielded_lane.lua
@@ -1,4 +1,5 @@
1local lanes = require "lanes" 1local fixture = require "fixture"
2local lanes = require "lanes".configure{on_state_create = fixture.on_state_create}
2 3
3local fixture = require "fixture" 4local fixture = require "fixture"
4lanes.finally(fixture.throwing_finalizer) 5lanes.finally(fixture.throwing_finalizer)
@@ -7,18 +8,27 @@ local utils = lanes.require "_utils"
7local PRINT = utils.MAKE_PRINT() 8local PRINT = utils.MAKE_PRINT()
8 9
9-- a lane body that yields stuff 10-- a lane body that yields stuff
10local yielder = function(out_linda_) 11local yielder = function(out_linda_, wait_)
12 local fixture = assert(require "fixture")
11 -- here is a to-be-closed variable that, when closed, sends "Closed!" in the "out" slot of the provided linda 13 -- here is a to-be-closed variable that, when closed, sends "Closed!" in the "out" slot of the provided linda
12 local t <close> = setmetatable( 14 local t <close> = setmetatable(
13 { text = "Closed!" }, { 15 { text = "Closed!" }, {
14 __close = function(self, err) 16 __close = function(self, err)
17 if wait_ then
18 fixture.block_for(wait_)
19 end
15 out_linda_:send("out", self.text) 20 out_linda_:send("out", self.text)
16 end 21 end
17 } 22 }
18 ) 23 )
19 -- yield forever 24 -- yield forever, but be cancel-friendly
25 local n = 1
20 while true do 26 while true do
21 coroutine.yield("I yield!") 27 coroutine.yield("I yield!", n)
28 if cancel_test and cancel_test() then -- cancel_test does not exist when run immediately (not in a Lane)
29 return "I am cancelled"
30 end
31 n = n + 1
22 end 32 end
23end 33end
24 34
@@ -27,8 +37,8 @@ local out_linda = lanes.linda()
27local test_close = function(what_, f_) 37local test_close = function(what_, f_)
28 local c = coroutine.create(f_) 38 local c = coroutine.create(f_)
29 for i = 1, 10 do 39 for i = 1, 10 do
30 local t, r = coroutine.resume(c, out_linda) -- returns true + <yielded values> 40 local t, r1, r2 = coroutine.resume(c, out_linda) -- returns true + <yielded values>
31 assert(t == true and r == "I yield!", "got " .. tostring(t) .. " " .. tostring(r)) 41 assert(t == true and r1 == "I yield!" and r2 == i, "got " .. tostring(t) .. " " .. tostring(r1) .. " " .. tostring(r2))
32 local s = coroutine.status(c) 42 local s = coroutine.status(c)
33 assert(s == "suspended") 43 assert(s == "suspended")
34 end 44 end
@@ -39,28 +49,64 @@ local test_close = function(what_, f_)
39 assert(s == "out" and r == "Closed!", what_ .. " got " .. tostring(s) .. " " .. tostring(r)) 49 assert(s == "out" and r == "Closed!", what_ .. " got " .. tostring(s) .. " " .. tostring(r))
40end 50end
41 51
42-- first, try the close mechanism outside of a lane 52---------------------------------------------------------
43test_close("base", yielder) 53-- TEST: first, try the close mechanism outside of a lane
54---------------------------------------------------------
55if true then
56 test_close("base", yielder)
57end
44 58
45-- try again with a function obtained through dump/undump 59---------------------------------------------------------------
46-- note this means our yielder implementation can't have upvalues, as they are lost in the process 60-- TEST: try again with a function obtained through dump/undump
47test_close("dumped", load(string.dump(yielder))) 61---------------------------------------------------------------
62if true then
63 -- note this means our yielder implementation can't have upvalues, as they are lost in the process
64 test_close("dumped", load(string.dump(yielder)))
65end
48 66
49------------------------------------------------------------------------------ 67------------------------------------------------------------------------------
50-- TEST: to-be-closed variables are properly closed when the lane is collected 68-- TEST: to-be-closed variables are properly closed whzen the lane is collected
51------------------------------------------------------------------------------ 69------------------------------------------------------------------------------
52if false then -- NOT IMPLEMENTED YET! 70if true then
53
54 -- the generator 71 -- the generator
55 local coro_g = lanes.coro("*", yielder) 72 local coro_g = lanes.coro("*", yielder)
56 73
57 -- start the lane 74 -- start the lane
58 local h = coro_g(out_linda) 75 local h = coro_g(out_linda)
59 76
60 -- join it so that it reaches suspended state 77 -- join the lane. it should be done and give back the values resulting of the first yield point
61 local r, v = h:join(0.5) 78 local r, v1, v2 = h:join()
79 assert(r == true and v1 == "I yield!" and v2 == 1, "got " .. tostring(r) .. " " .. tostring(v1) .. " " .. tostring(v2))
80 assert(h.status == "done", "got " .. h.status)
81
82 -- force collection of the lane
83 h = nil
84 collectgarbage()
85
86 -- I want the to-be-closed variable of the coroutine linda to be properly closed
87 local s, r = out_linda:receive(0, "out")
88 assert(s == "out" and r == "Closed!", "coro got " .. tostring(s) .. " " .. tostring(r)) -- THIS TEST FAILS
89end
90
91---------------------------------------------------------------------------------------------------
92-- TEST: if a to-be-closed handler takes longer than the join timeout, everything works as expected
93---------------------------------------------------------------------------------------------------
94if true then
95 -- the generator
96 local coro_g = lanes.coro("*", yielder)
97
98 -- start the lane. The to-be-closed handler will sleep for 1 second
99 local h = coro_g(out_linda, 1)
100
101 -- first join attempt should timeout
102 local r, v = h:join(0.6)
62 assert(r == nil and v == "timeout", "got " .. tostring(r) .. " " .. tostring(v)) 103 assert(r == nil and v == "timeout", "got " .. tostring(r) .. " " .. tostring(v))
63 assert(h.status == "suspended") 104 assert(h.status == "running", "got " .. h.status)
105
106 -- join the lane again. it should be done and give back the values resulting of the first yield point
107 local r, v1, v2 = h:join(0.6)
108 assert(r == true and v1 == "I yield!" and v2 == 1, "got " .. tostring(r) .. " " .. tostring(v1) .. " " .. tostring(v2))
109 assert(h.status == "done", "got " .. h.status)
64 110
65 -- force collection of the lane 111 -- force collection of the lane
66 h = nil 112 h = nil
@@ -68,5 +114,28 @@ if false then -- NOT IMPLEMENTED YET!
68 114
69 -- I want the to-be-closed variable of the coroutine linda to be properly closed 115 -- I want the to-be-closed variable of the coroutine linda to be properly closed
70 local s, r = out_linda:receive(0, "out") 116 local s, r = out_linda:receive(0, "out")
71 assert(s == "out" and r == "Closed!", "coro got " .. tostring(s) .. " " .. tostring(r)) 117 assert(s == "out" and r == "Closed!", "coro got " .. tostring(s) .. " " .. tostring(r)) -- THIS TEST FAILS
118end
119
120--------------------------------------------------
121-- TEST: cancelling a suspended Lane should end it
122--------------------------------------------------
123if true then
124 -- the generator
125 local coro_g = lanes.coro("*", yielder)
126
127 -- start the lane
128 local h = coro_g(out_linda)
129 repeat until h.status == "suspended"
130
131 -- first cancellation attempt: don't wake the lane
132 local b, r = h:cancel("soft", 0.5)
133 -- the lane is still blocked in its suspended state
134 assert(b == false and r == "timeout" and h.status == "suspended", "got " .. tostring(b) .. " " .. tostring(r) .. " " .. h.status)
135
136 -- cancel the Lane again, this time waking it. it will resume, and yielder()'s will break out of its infinite loop
137 h:cancel("soft", nil, true)
138
139 -- lane should be done, because it returned cooperatively when detecting a soft cancel
140 assert(h.status == "done", "got " .. h.status)
72end 141end
diff --git a/unit_tests/scripts/coro/yielding_function.lua b/unit_tests/scripts/coro/yielding_function.lua
index 636f094..6518d1f 100644
--- a/unit_tests/scripts/coro/yielding_function.lua
+++ b/unit_tests/scripts/coro/yielding_function.lua
@@ -23,7 +23,7 @@ end
23-------------------------------------------------------------------------------------------------- 23--------------------------------------------------------------------------------------------------
24-- TEST: if we start a non-coroutine lane with a yielding function, we should get an error, right? 24-- TEST: if we start a non-coroutine lane with a yielding function, we should get an error, right?
25-------------------------------------------------------------------------------------------------- 25--------------------------------------------------------------------------------------------------
26if false then 26if true then
27 local fun_g = lanes.gen("*", { name = 'auto' }, yielder) 27 local fun_g = lanes.gen("*", { name = 'auto' }, yielder)
28 local h = fun_g("hello", "world", "!") 28 local h = fun_g("hello", "world", "!")
29 local err, status, stack = h:join() 29 local err, status, stack = h:join()
@@ -48,7 +48,7 @@ local coro_g = lanes.coro("*", {name = "auto"}, yielder)
48------------------------------------------------------------------------------------------------- 48-------------------------------------------------------------------------------------------------
49-- TEST: we can resume as many times as the lane yields, then read the returned value on indexing 49-- TEST: we can resume as many times as the lane yields, then read the returned value on indexing
50------------------------------------------------------------------------------------------------- 50-------------------------------------------------------------------------------------------------
51if false then 51if true then
52 -- launch coroutine lane 52 -- launch coroutine lane
53 local h = coro_g("hello", "world", "!") 53 local h = coro_g("hello", "world", "!")
54 -- read the yielded values, sending back the expected index 54 -- read the yielded values, sending back the expected index
@@ -63,7 +63,7 @@ end
63--------------------------------------------------------------------------------------------- 63---------------------------------------------------------------------------------------------
64-- TEST: we can resume as many times as the lane yields, then read the returned value on join 64-- TEST: we can resume as many times as the lane yields, then read the returned value on join
65--------------------------------------------------------------------------------------------- 65---------------------------------------------------------------------------------------------
66if false then 66if true then
67 -- launch coroutine lane 67 -- launch coroutine lane
68 local h = coro_g("hello", "world", "!") 68 local h = coro_g("hello", "world", "!")
69 -- read the yielded values, sending back the expected index 69 -- read the yielded values, sending back the expected index
@@ -75,9 +75,9 @@ if false then
75 assert(h.status == "done" and s == true and r == "bye!") 75 assert(h.status == "done" and s == true and r == "bye!")
76end 76end
77 77
78--------------------------------------------------------------------------------------------------- 78---------------------------------------------------
79-- TEST: if we join a yielded lane, we get a timeout, and we can resume as if we didn't try to join 79-- TEST: if we join a yielded lane, the lane aborts
80--------------------------------------------------------------------------------------------------- 80---------------------------------------------------
81if true then 81if true then
82 -- launch coroutine lane 82 -- launch coroutine lane
83 local h = coro_g("hello", "world", "!") 83 local h = coro_g("hello", "world", "!")
@@ -89,10 +89,10 @@ if true then
89 assert(s == "done" and b == true and r == "world", "got " .. s .. " " .. tostring(b) .. " " .. tostring(r)) 89 assert(s == "done" and b == true and r == "world", "got " .. s .. " " .. tostring(b) .. " " .. tostring(r))
90end 90end
91 91
92----------------------------------------------------------------------- 92-------------------------------------------------------------------------
93-- TEST: if we index yielded lane, we should get the last yielded value 93-- TEST: if we index a yielded lane, we should get the last yielded value
94----------------------------------------------------------------------- 94-------------------------------------------------------------------------
95if false then 95if true then
96 -- launch coroutine lane 96 -- launch coroutine lane
97 local h = coro_g("hello", "world", "!") 97 local h = coro_g("hello", "world", "!")
98 -- read the first yielded value, sending back the expected index 98 -- read the first yielded value, sending back the expected index
@@ -102,15 +102,7 @@ if false then
102 local r2 = h[1] 102 local r2 = h[1]
103 local r3 = h[1] 103 local r3 = h[1]
104 assert(r1 == "world" and r2 == "world" and r3 == "world", "got " .. r1 .. " " .. r2 .. " " .. r3) 104 assert(r1 == "world" and r2 == "world" and r3 == "world", "got " .. r1 .. " " .. r2 .. " " .. r3)
105 assert(h:resume(2) == "world") 105 -- once the lane was indexed, it is no longer resumable (just like after join)
106 106 local b, e = pcall(h.resume, h, 2)
107 -- THERE IS AN INCONSISTENCY: h:resume pulls the yielded values directly out of the lane's stack 107 assert(b == false and e == "cannot resume non-suspended coroutine Lane")
108 -- but h[n] removes them and stores them in the internal values storage table
109 -- TODO: so we need to decide: should indexing a yielded lane work like resume()?
110 assert(h:resume(3) == "!")
111end 108end
112
113-------------------------------------------------------------------------------------
114-- TEST: if we close yielded lane, we can join it and get the last yielded values out
115-------------------------------------------------------------------------------------
116-- TODO: we need to implement lane:close() for that!