diff options
author | Benoit Germain <benoit.germain@ubisoft.com> | 2025-07-03 18:11:13 +0200 |
---|---|---|
committer | Benoit Germain <benoit.germain@ubisoft.com> | 2025-07-03 18:11:13 +0200 |
commit | 72d7b36e020fd3f11ec002c110e7340f667d6628 (patch) | |
tree | 00b42d89192b1bfa88245224f827544a7c5dbc50 /src | |
parent | 45774df1eeeaae0868420104a4cdad4691995dc9 (diff) | |
download | lanes-72d7b36e020fd3f11ec002c110e7340f667d6628.tar.gz lanes-72d7b36e020fd3f11ec002c110e7340f667d6628.tar.bz2 lanes-72d7b36e020fd3f11ec002c110e7340f667d6628.zip |
Fix more issues related to suspended coroutines and join/indexing operations
* indexing and joining a suspended lane closes the coroutine, causing the termination of the lane
* properly close coroutine to-be-closed variables on interruption
Diffstat (limited to 'src')
-rw-r--r-- | src/lane.cpp | 275 | ||||
-rw-r--r-- | src/lane.hpp | 6 |
2 files changed, 163 insertions, 118 deletions
diff --git a/src/lane.cpp b/src/lane.cpp index 5ec6bcf..33ee8a0 100644 --- a/src/lane.cpp +++ b/src/lane.cpp | |||
@@ -130,93 +130,18 @@ static LUAG_FUNC(lane_join) | |||
130 | } | 130 | } |
131 | lua_settop(L_, 1); // L_: lane | 131 | lua_settop(L_, 1); // L_: lane |
132 | 132 | ||
133 | |||
134 | // wait until suspended or done | 133 | // wait until suspended or done |
135 | bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; | 134 | STACK_CHECK_START_REL(L_, 0); // L_: lane |
136 | 135 | if (!_lane->waitForJoin(L_, _until)) { | |
137 | if (!_done) { | 136 | // in that case, should have pushed nil, "timeout" |
138 | lua_pushnil(L_); // L_: lane nil | 137 | STACK_CHECK(L_, 2); |
139 | luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" | ||
140 | return 2; | 138 | return 2; |
141 | } | 139 | } |
140 | STACK_CHECK(L_, 0); // L_: lane | ||
141 | // Thread is Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. | ||
142 | 142 | ||
143 | // if lane is suspended, force a cancellation that will cause the yield loop to break, and the termination of the thread | 143 | std::ignore = _lane->storeResults(L_); |
144 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | 144 | int const _ret{ _lane->pushStoredResults(L_) }; |
145 | auto const _cr{ _lane->cancel(CancelOp{ CancelRequest::Soft, LuaHookMask::None }, std::chrono::time_point<std::chrono::steady_clock>::max(), WakeLane::Yes, 0) }; | ||
146 | if (_cr == CancelResult::Timeout) { | ||
147 | lua_pushnil(L_); // L_: lane nil | ||
148 | luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" | ||
149 | return 2; | ||
150 | } | ||
151 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
152 | raise_luaL_error(L_, "INTERNAL ERROR: Lane should not be suspended here"); | ||
153 | } | ||
154 | } | ||
155 | |||
156 | STACK_CHECK_START_REL(L_, 0); // L_: lane | ||
157 | // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. | ||
158 | |||
159 | int _ret{ 0 }; | ||
160 | int const _stored{ _lane->storeResults(L_) }; | ||
161 | STACK_GROW(L_, std::max(3, _stored + 1)); | ||
162 | switch (_lane->status.load(std::memory_order_acquire)) { | ||
163 | case Lane::Suspended: | ||
164 | raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE"); | ||
165 | break; | ||
166 | |||
167 | case Lane::Done: // got regular return values | ||
168 | if (_stored > 0) { | ||
169 | lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} | ||
170 | for (int _i = 2; _i <= _stored; ++_i) { | ||
171 | lua_rawgeti(L_, 2, _i); // L_: lane {uv} results2...N | ||
172 | } | ||
173 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} results2...N result1 | ||
174 | lua_replace(L_, 2); // L_: lane results | ||
175 | } | ||
176 | // we precede the lane body returned values with boolean true | ||
177 | lua_pushboolean(L_, 1); // L_: lane results true | ||
178 | lua_replace(L_, 1); // L_: true results | ||
179 | _ret = _stored + 1; | ||
180 | STACK_CHECK(L_, _stored); | ||
181 | break; | ||
182 | |||
183 | case Lane::Error: | ||
184 | { | ||
185 | LUA_ASSERT(L_, _stored == 2 || _stored == 3); | ||
186 | lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} | ||
187 | lua_rawgeti(L_, 2, 2); // L_: lane {uv} <error> | ||
188 | lua_rawgeti(L_, 2, 3); // L_: lane {uv} <error> <trace>|nil | ||
189 | if (lua_isnil(L_, -1)) { | ||
190 | lua_replace(L_, 2); // L_: lane nil <error> | ||
191 | } else { | ||
192 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} <error> <trace> nil | ||
193 | lua_replace(L_, 2); // L_: lane nil <error> <trace> | ||
194 | } | ||
195 | _ret = lua_gettop(L_) - 1; // 2 or 3 | ||
196 | STACK_CHECK(L_, _ret); | ||
197 | } | ||
198 | break; | ||
199 | |||
200 | case Lane::Cancelled: | ||
201 | { | ||
202 | LUA_ASSERT(L_, _stored == 2); | ||
203 | lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} | ||
204 | lua_rawgeti(L_, 2, 2); // L_: lane {uv} cancel_error | ||
205 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} cancel_error nil | ||
206 | lua_replace(L_, -3); // L_: lane nil cancel_error | ||
207 | LUA_ASSERT(L_, lua_isnil(L_, -2) && kCancelError.equals(L_, kIdxTop)); | ||
208 | _ret = 2; | ||
209 | STACK_CHECK(L_, _ret); | ||
210 | } | ||
211 | break; | ||
212 | |||
213 | default: | ||
214 | DEBUGSPEW_CODE(DebugSpew(nullptr) << "Unknown Lane status: " << static_cast<int>(_lane->status.load(std::memory_order_relaxed)) << std::endl); | ||
215 | LUA_ASSERT(L_, false); | ||
216 | _ret = 0; | ||
217 | STACK_CHECK(L_, _ret); | ||
218 | } | ||
219 | LUA_ASSERT(L_, lua_gettop(L_) >= _ret); | ||
220 | return _ret; | 145 | return _ret; |
221 | } | 146 | } |
222 | 147 | ||
@@ -277,13 +202,17 @@ static int lane_index_number(lua_State* L_) | |||
277 | int const _key{ static_cast<int>(lua_tointeger(L_, 2)) }; | 202 | int const _key{ static_cast<int>(lua_tointeger(L_, 2)) }; |
278 | lua_pop(L_, 1); // L_: lane | 203 | lua_pop(L_, 1); // L_: lane |
279 | 204 | ||
280 | // wait until the lane finishes or is suspended | 205 | // wait until suspended or done |
206 | STACK_CHECK_START_REL(L_, 0); // L_: lane | ||
281 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | 207 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; |
282 | if (!_lane->waitForCompletion(_until, true)) { | 208 | if (!_lane->waitForJoin(L_, _until)) { |
283 | raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); | 209 | // in that case, should have pushed nil, "timeout" |
210 | STACK_CHECK(L_, 2); | ||
211 | return 2; | ||
284 | } | 212 | } |
213 | STACK_CHECK(L_, 0); // L_: lane | ||
214 | // Thread is Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. | ||
285 | 215 | ||
286 | // make sure results are stored | ||
287 | int const _stored{ _lane->storeResults(L_) }; | 216 | int const _stored{ _lane->storeResults(L_) }; |
288 | if (_key > _stored) { | 217 | if (_key > _stored) { |
289 | // get nil if indexing beyond the actual returned value count | 218 | // get nil if indexing beyond the actual returned value count |
@@ -741,7 +670,7 @@ static void lane_main(Lane* const lane_) | |||
741 | lane_->nresults = lua_gettop(_L) - _errorHandlerCount; | 670 | lane_->nresults = lua_gettop(_L) - _errorHandlerCount; |
742 | } else { | 671 | } else { |
743 | // S and L are different: we run as a coroutine in Lua thread L created in state S | 672 | // S and L are different: we run as a coroutine in Lua thread L created in state S |
744 | bool _again{ true }; | 673 | bool _shouldClose{ false }; |
745 | do { | 674 | do { |
746 | // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values. | 675 | // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values. |
747 | // that's why we have lane_->nresults | 676 | // that's why we have lane_->nresults |
@@ -754,19 +683,25 @@ static void lane_main(Lane* const lane_) | |||
754 | lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended | 683 | lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended |
755 | lane_->doneCondVar.notify_one(); | 684 | lane_->doneCondVar.notify_one(); |
756 | // wait until the user wants us to resume | 685 | // wait until the user wants us to resume |
757 | // update waiting_on, so that the lane can be woken by cancellation requests here? | 686 | // update waiting_on, so that the lane can be woken by cancellation requests here |
758 | lane_->waiting_on = &lane_->doneCondVar; | 687 | lane_->waiting_on = &lane_->doneCondVar; |
759 | lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); | 688 | lane_->doneCondVar.wait(_guard, |
689 | [lane_,&_shouldClose]() | ||
690 | { | ||
691 | auto const _status{ lane_->status.load(std::memory_order_acquire) }; | ||
692 | // wait interrupted because of a cancellation or join request means we have to abort the resume loop | ||
693 | _shouldClose = (_status == Lane::Closing); | ||
694 | return _shouldClose || (_status == Lane::Resuming) || (lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None); | ||
695 | } | ||
696 | ); | ||
760 | // here lane_->doneMutex is locked again | 697 | // here lane_->doneMutex is locked again |
761 | lane_->waiting_on = nullptr; | 698 | lane_->waiting_on = nullptr; |
762 | lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running | 699 | lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running |
763 | } | 700 | } |
764 | // wait was interrupted because of a cancellation, finish the lane | ||
765 | _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); | ||
766 | } else { | 701 | } else { |
767 | _again = false; | 702 | _shouldClose = true; |
768 | } | 703 | } |
769 | } while (_again); | 704 | } while (!_shouldClose); |
770 | if (_rc == LuaError::YIELD) { | 705 | if (_rc == LuaError::YIELD) { |
771 | #if LUA_VERSION_NUM >= 504 | 706 | #if LUA_VERSION_NUM >= 504 |
772 | lua_State* const _S{ lane_->S }; | 707 | lua_State* const _S{ lane_->S }; |
@@ -855,7 +790,7 @@ static void lane_main(Lane* const lane_) | |||
855 | #if LUA_VERSION_NUM >= 504 | 790 | #if LUA_VERSION_NUM >= 504 |
856 | 791 | ||
857 | // __close(lane_ud, <err>) | 792 | // __close(lane_ud, <err>) |
858 | static LUAG_FUNC(lane___close) | 793 | static LUAG_FUNC(lane_close) |
859 | { | 794 | { |
860 | [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil | 795 | [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil |
861 | // drop the error if any | 796 | // drop the error if any |
@@ -869,15 +804,6 @@ static LUAG_FUNC(lane___close) | |||
869 | return lua_gettop(L_); | 804 | return lua_gettop(L_); |
870 | } | 805 | } |
871 | 806 | ||
872 | // ################################################################################################# | ||
873 | |||
874 | // close(lane_ud) | ||
875 | static LUAG_FUNC(lane_close) | ||
876 | { | ||
877 | [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil | ||
878 | raise_luaL_error(L_, "not implemented"); // TODO! | ||
879 | return 0; | ||
880 | } | ||
881 | #endif // LUA_VERSION_NUM >= 504 | 807 | #endif // LUA_VERSION_NUM >= 504 |
882 | 808 | ||
883 | // ################################################################################################# | 809 | // ################################################################################################# |
@@ -1069,14 +995,11 @@ namespace { | |||
1069 | namespace local { | 995 | namespace local { |
1070 | static struct luaL_Reg const sLaneFunctions[] = { | 996 | static struct luaL_Reg const sLaneFunctions[] = { |
1071 | #if LUA_VERSION_NUM >= 504 | 997 | #if LUA_VERSION_NUM >= 504 |
1072 | { "__close", LG_lane___close }, | 998 | { "__close", LG_lane_close }, |
1073 | #endif // LUA_VERSION_NUM >= 504 | 999 | #endif // LUA_VERSION_NUM >= 504 |
1074 | { "__gc", LG_lane_gc }, | 1000 | { "__gc", LG_lane_gc }, |
1075 | { "__index", LG_lane_index }, | 1001 | { "__index", LG_lane_index }, |
1076 | { "cancel", LG_lane_cancel }, | 1002 | { "cancel", LG_lane_cancel }, |
1077 | #if LUA_VERSION_NUM >= 504 | ||
1078 | { "close", LG_lane_close }, | ||
1079 | #endif // LUA_VERSION_NUM >= 504 | ||
1080 | { "get_threadname", LG_lane_get_threadname }, | 1003 | { "get_threadname", LG_lane_get_threadname }, |
1081 | { "join", LG_lane_join }, | 1004 | { "join", LG_lane_join }, |
1082 | { "resume", LG_lane_resume }, | 1005 | { "resume", LG_lane_resume }, |
@@ -1171,6 +1094,82 @@ void Lane::pushIndexedResult(lua_State* const L_, int const key_) const | |||
1171 | // ################################################################################################# | 1094 | // ################################################################################################# |
1172 | 1095 | ||
1173 | [[nodiscard]] | 1096 | [[nodiscard]] |
1097 | int Lane::pushStoredResults(lua_State* const L_) const | ||
1098 | { | ||
1099 | STACK_CHECK_START_ABS(L_, 1); // should only have the lane UD on the stack | ||
1100 | static constexpr StackIndex kIdxSelf{ 1 }; | ||
1101 | static constexpr UserValueIndex kUvResults{ 1 }; | ||
1102 | LUA_ASSERT(L_, ToLane(L_, kIdxSelf) == this); // L_: lane | ||
1103 | lua_getiuservalue(L_, kIdxSelf, kUvResults); // L_: lane {uv} | ||
1104 | lua_rawgeti(L_, kIdxTop, 0); // L_: lane {uv} stored | ||
1105 | int const _stored{ static_cast<int>(lua_tointeger(L_, kIdxTop)) }; | ||
1106 | lua_pop(L_, 1); // L_: lane {uv} | ||
1107 | |||
1108 | int _ret{}; | ||
1109 | STACK_GROW(L_, std::max(3, _stored + 1)); | ||
1110 | switch (status.load(std::memory_order_acquire)) { | ||
1111 | case Lane::Suspended: | ||
1112 | raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE"); | ||
1113 | break; | ||
1114 | |||
1115 | case Lane::Done: // got regular return values | ||
1116 | if (_stored > 0) { | ||
1117 | for (int _i = 2; _i <= _stored; ++_i) { | ||
1118 | lua_rawgeti(L_, 2, _i); // L_: lane {uv} results2...N | ||
1119 | } | ||
1120 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} results2...N result1 | ||
1121 | lua_replace(L_, 2); // L_: lane results | ||
1122 | } else { | ||
1123 | lua_pop(L_, 1); // L_: lane | ||
1124 | } | ||
1125 | // we precede the lane body returned values with boolean true | ||
1126 | lua_pushboolean(L_, 1); // L_: lane results true | ||
1127 | lua_replace(L_, 1); // L_: true results | ||
1128 | _ret = _stored + 1; | ||
1129 | STACK_CHECK(L_, _ret); | ||
1130 | break; | ||
1131 | |||
1132 | case Lane::Error: | ||
1133 | { | ||
1134 | LUA_ASSERT(L_, _stored == 2 || _stored == 3); // contains nil error [trace] | ||
1135 | lua_rawgeti(L_, 2, 2); // L_: lane {uv} <error> | ||
1136 | lua_rawgeti(L_, 2, 3); // L_: lane {uv} <error> <trace>|nil | ||
1137 | if (lua_isnil(L_, -1)) { | ||
1138 | lua_replace(L_, 2); // L_: lane nil <error> | ||
1139 | } else { | ||
1140 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} <error> <trace> nil | ||
1141 | lua_replace(L_, 2); // L_: lane nil <error> <trace> | ||
1142 | } | ||
1143 | _ret = _stored; // 2 or 3 | ||
1144 | STACK_CHECK(L_, _ret + 1); // stack still contains the lane UD below | ||
1145 | } | ||
1146 | break; | ||
1147 | |||
1148 | case Lane::Cancelled: | ||
1149 | { | ||
1150 | LUA_ASSERT(L_, _stored == 2); | ||
1151 | lua_rawgeti(L_, 2, 2); // L_: lane {uv} cancel_error | ||
1152 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} cancel_error nil | ||
1153 | lua_replace(L_, -3); // L_: lane nil cancel_error | ||
1154 | LUA_ASSERT(L_, lua_isnil(L_, -2) && kCancelError.equals(L_, kIdxTop)); | ||
1155 | _ret = 2; | ||
1156 | STACK_CHECK(L_, _ret + 1); // stack still contains the lane UD below | ||
1157 | } | ||
1158 | break; | ||
1159 | |||
1160 | default: | ||
1161 | DEBUGSPEW_CODE(DebugSpew(nullptr) << "Unknown Lane status: " << static_cast<int>(_lane->status.load(std::memory_order_relaxed)) << std::endl); | ||
1162 | LUA_ASSERT(L_, false); | ||
1163 | _ret = 0; | ||
1164 | STACK_CHECK(L_, _ret); | ||
1165 | } | ||
1166 | LUA_ASSERT(L_, lua_gettop(L_) >= _ret); | ||
1167 | return _ret; | ||
1168 | } | ||
1169 | |||
1170 | // ################################################################################################# | ||
1171 | |||
1172 | [[nodiscard]] | ||
1174 | std::string_view Lane::pushErrorTraceLevel(lua_State* L_) const | 1173 | std::string_view Lane::pushErrorTraceLevel(lua_State* L_) const |
1175 | { | 1174 | { |
1176 | std::string_view const _str{ errorTraceLevelString() }; | 1175 | std::string_view const _str{ errorTraceLevelString() }; |
@@ -1270,15 +1269,16 @@ int Lane::storeResults(lua_State* const L_) | |||
1270 | lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv} | 1269 | lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv} |
1271 | StackIndex const _tidx{ lua_gettop(L_) }; | 1270 | StackIndex const _tidx{ lua_gettop(L_) }; |
1272 | 1271 | ||
1273 | int _stored{}; | 1272 | // if the results were already stored from a previous indexing, just say how many values we have in store |
1274 | if (nresults == 0) { | 1273 | if (!L) { |
1275 | lua_rawgeti(L_, -1, 0); // L_: lane ... {uv} nresults | 1274 | lua_rawgeti(L_, -1, 0); // L_: lane ... {uv} nresults |
1276 | _stored = static_cast<int>(lua_tointeger(L_, -1)); | 1275 | auto const _stored{ static_cast<int>(lua_tointeger(L_, -1)) }; |
1277 | lua_pop(L_, 2); | 1276 | lua_pop(L_, 2); |
1278 | STACK_CHECK(L_, 0); | 1277 | STACK_CHECK(L_, 0); |
1279 | return _stored; | 1278 | return _stored; |
1280 | } | 1279 | } |
1281 | 1280 | ||
1281 | int _stored{}; | ||
1282 | switch (status.load(std::memory_order_acquire)) { | 1282 | switch (status.load(std::memory_order_acquire)) { |
1283 | default: | 1283 | default: |
1284 | // this is an internal error, we probably never get here | 1284 | // this is an internal error, we probably never get here |
@@ -1358,12 +1358,13 @@ int Lane::storeResults(lua_State* const L_) | |||
1358 | //--- | 1358 | //--- |
1359 | // str= thread_status( lane ) | 1359 | // str= thread_status( lane ) |
1360 | // | 1360 | // |
1361 | // "pending" -> | ("running" <-> "waiting") <-> "suspended" <-> "resuming" | -> "done"/"error"/"cancelled" | 1361 | // "pending" -> | ("running" <-> "waiting") <-> "suspended" <-> "resuming/closing" | -> "done"/"error"/"cancelled" |
1362 | 1362 | ||
1363 | // "pending" not started yet | 1363 | // "pending" not started yet |
1364 | // "running" started, doing its work.. | 1364 | // "running" started, doing its work.. |
1365 | // "suspended" returned from a lua_resume | 1365 | // "suspended" returned from a lua_resume |
1366 | // "resuming" told by its parent state to resume | 1366 | // "resuming" told by its parent state to resume |
1367 | // "closing" not observable from the outside: happens only inside a join()/indexation call to unblock a suspended coroutine Lane so that it can join properly | ||
1367 | // "waiting" blocked in a send()/receive() | 1368 | // "waiting" blocked in a send()/receive() |
1368 | // "done" finished, results are there | 1369 | // "done" finished, results are there |
1369 | // "error" finished at an error, error value is there | 1370 | // "error" finished at an error, error value is there |
@@ -1374,7 +1375,7 @@ std::string_view Lane::threadStatusString() const | |||
1374 | { | 1375 | { |
1375 | static constexpr std::string_view kStrs[] = { | 1376 | static constexpr std::string_view kStrs[] = { |
1376 | "pending", | 1377 | "pending", |
1377 | "running", "suspended", "resuming", | 1378 | "running", "suspended", "resuming", "closing", |
1378 | "waiting", | 1379 | "waiting", |
1379 | "done", "error", "cancelled" | 1380 | "done", "error", "cancelled" |
1380 | }; | 1381 | }; |
@@ -1382,12 +1383,13 @@ std::string_view Lane::threadStatusString() const | |||
1382 | static_assert(1 == static_cast<std::underlying_type_t<Lane::Status>>(Running)); | 1383 | static_assert(1 == static_cast<std::underlying_type_t<Lane::Status>>(Running)); |
1383 | static_assert(2 == static_cast<std::underlying_type_t<Lane::Status>>(Suspended)); | 1384 | static_assert(2 == static_cast<std::underlying_type_t<Lane::Status>>(Suspended)); |
1384 | static_assert(3 == static_cast<std::underlying_type_t<Lane::Status>>(Resuming)); | 1385 | static_assert(3 == static_cast<std::underlying_type_t<Lane::Status>>(Resuming)); |
1385 | static_assert(4 == static_cast<std::underlying_type_t<Lane::Status>>(Waiting)); | 1386 | static_assert(4 == static_cast<std::underlying_type_t<Lane::Status>>(Closing)); |
1386 | static_assert(5 == static_cast<std::underlying_type_t<Lane::Status>>(Done)); | 1387 | static_assert(5 == static_cast<std::underlying_type_t<Lane::Status>>(Waiting)); |
1387 | static_assert(6 == static_cast<std::underlying_type_t<Lane::Status>>(Error)); | 1388 | static_assert(6 == static_cast<std::underlying_type_t<Lane::Status>>(Done)); |
1388 | static_assert(7 == static_cast<std::underlying_type_t<Lane::Status>>(Cancelled)); | 1389 | static_assert(7 == static_cast<std::underlying_type_t<Lane::Status>>(Error)); |
1390 | static_assert(8 == static_cast<std::underlying_type_t<Lane::Status>>(Cancelled)); | ||
1389 | auto const _status{ static_cast<std::underlying_type_t<Lane::Status>>(status.load(std::memory_order_acquire)) }; | 1391 | auto const _status{ static_cast<std::underlying_type_t<Lane::Status>>(status.load(std::memory_order_acquire)) }; |
1390 | if (_status < 0 || _status > 7) { // should never happen, but better safe than sorry | 1392 | if (_status < 0 || _status > 8) { // should never happen, but better safe than sorry |
1391 | return ""; | 1393 | return ""; |
1392 | } | 1394 | } |
1393 | return kStrs[_status]; | 1395 | return kStrs[_status]; |
@@ -1407,3 +1409,40 @@ bool Lane::waitForCompletion(std::chrono::time_point<std::chrono::steady_clock> | |||
1407 | return _status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled || _status == suspended; | 1409 | return _status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled || _status == suspended; |
1408 | }); | 1410 | }); |
1409 | } | 1411 | } |
1412 | |||
1413 | // ################################################################################################# | ||
1414 | |||
1415 | [[nodiscard]] | ||
1416 | bool Lane::waitForJoin(lua_State* const L_, std::chrono::time_point<std::chrono::steady_clock> until_) | ||
1417 | { | ||
1418 | // wait until suspended or done | ||
1419 | { | ||
1420 | bool const _done{ !thread.joinable() || waitForCompletion(until_, true) }; | ||
1421 | |||
1422 | if (!_done) { | ||
1423 | lua_pushnil(L_); // L_: lane nil | ||
1424 | luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" | ||
1425 | return false; | ||
1426 | } | ||
1427 | } | ||
1428 | |||
1429 | // if lane is suspended, force the yield loop to break, and the termination of the thread | ||
1430 | if (status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
1431 | LUA_ASSERT(L_, waiting_on == &doneCondVar); | ||
1432 | status.store(Lane::Closing, std::memory_order_release); | ||
1433 | doneCondVar.notify_all(); | ||
1434 | // wait until done | ||
1435 | { | ||
1436 | bool const _done{ !thread.joinable() || waitForCompletion(until_, true) }; | ||
1437 | |||
1438 | if (!_done) { | ||
1439 | lua_pushnil(L_); // L_: lane nil | ||
1440 | luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" | ||
1441 | return false; | ||
1442 | } | ||
1443 | } | ||
1444 | LUA_ASSERT(L_, status.load(std::memory_order_acquire) != Lane::Closing); | ||
1445 | } | ||
1446 | LUA_ASSERT(L_, status.load(std::memory_order_acquire) != Lane::Suspended); | ||
1447 | return true; | ||
1448 | } | ||
diff --git a/src/lane.hpp b/src/lane.hpp index fd46d68..bd328b1 100644 --- a/src/lane.hpp +++ b/src/lane.hpp | |||
@@ -56,6 +56,7 @@ class Lane final | |||
56 | /* | 56 | /* |
57 | Pending: The Lua VM hasn't done anything yet. | 57 | Pending: The Lua VM hasn't done anything yet. |
58 | Resuming: The user requested the lane to resume execution from Suspended state. | 58 | Resuming: The user requested the lane to resume execution from Suspended state. |
59 | Closing: The user is joining the lane, specifically interrupting a suspended Lane. | ||
59 | Suspended: returned from lua_resume, waiting for the client to request a lua_resume. | 60 | Suspended: returned from lua_resume, waiting for the client to request a lua_resume. |
60 | Running, Suspended, Waiting: Thread is inside the Lua VM. | 61 | Running, Suspended, Waiting: Thread is inside the Lua VM. |
61 | Done, Error, Cancelled: Thread execution is outside the Lua VM. It can be lua_close()d. | 62 | Done, Error, Cancelled: Thread execution is outside the Lua VM. It can be lua_close()d. |
@@ -66,6 +67,7 @@ class Lane final | |||
66 | Running, | 67 | Running, |
67 | Suspended, | 68 | Suspended, |
68 | Resuming, | 69 | Resuming, |
70 | Closing, | ||
69 | Waiting, | 71 | Waiting, |
70 | Done, | 72 | Done, |
71 | Error, | 73 | Error, |
@@ -199,6 +201,8 @@ class Lane final | |||
199 | static void PushMetatable(lua_State* L_); | 201 | static void PushMetatable(lua_State* L_); |
200 | void pushStatusString(lua_State* L_) const; | 202 | void pushStatusString(lua_State* L_) const; |
201 | void pushIndexedResult(lua_State* L_, int key_) const; | 203 | void pushIndexedResult(lua_State* L_, int key_) const; |
204 | [[nodiscard]] | ||
205 | int pushStoredResults(lua_State* L_) const; | ||
202 | void resetResultsStorage(lua_State* L_, StackIndex self_idx_); | 206 | void resetResultsStorage(lua_State* L_, StackIndex self_idx_); |
203 | void selfdestructAdd(); | 207 | void selfdestructAdd(); |
204 | [[nodiscard]] | 208 | [[nodiscard]] |
@@ -213,6 +217,8 @@ class Lane final | |||
213 | // wait until the lane stops working with its state (either Suspended or Done+) | 217 | // wait until the lane stops working with its state (either Suspended or Done+) |
214 | [[nodiscard]] | 218 | [[nodiscard]] |
215 | bool waitForCompletion(std::chrono::time_point<std::chrono::steady_clock> until_, bool const _acceptSuspended); | 219 | bool waitForCompletion(std::chrono::time_point<std::chrono::steady_clock> until_, bool const _acceptSuspended); |
220 | [[nodiscard]] | ||
221 | bool waitForJoin(lua_State* _L, std::chrono::time_point<std::chrono::steady_clock> until_); | ||
216 | }; | 222 | }; |
217 | 223 | ||
218 | // ################################################################################################# | 224 | // ################################################################################################# |