diff options
| author | Benoit Germain <benoit.germain@ubisoft.com> | 2025-07-03 18:11:13 +0200 |
|---|---|---|
| committer | Benoit Germain <benoit.germain@ubisoft.com> | 2025-07-03 18:11:13 +0200 |
| commit | 72d7b36e020fd3f11ec002c110e7340f667d6628 (patch) | |
| tree | 00b42d89192b1bfa88245224f827544a7c5dbc50 /src | |
| parent | 45774df1eeeaae0868420104a4cdad4691995dc9 (diff) | |
| download | lanes-72d7b36e020fd3f11ec002c110e7340f667d6628.tar.gz lanes-72d7b36e020fd3f11ec002c110e7340f667d6628.tar.bz2 lanes-72d7b36e020fd3f11ec002c110e7340f667d6628.zip | |
Fix more issues related to suspended coroutines and join/indexing operations
* indexing and joining a suspended lane closes the coroutine, causing the termination of the lane
* properly close coroutine to-be-closed variables on interruption
Diffstat (limited to 'src')
| -rw-r--r-- | src/lane.cpp | 275 | ||||
| -rw-r--r-- | src/lane.hpp | 6 |
2 files changed, 163 insertions, 118 deletions
diff --git a/src/lane.cpp b/src/lane.cpp index 5ec6bcf..33ee8a0 100644 --- a/src/lane.cpp +++ b/src/lane.cpp | |||
| @@ -130,93 +130,18 @@ static LUAG_FUNC(lane_join) | |||
| 130 | } | 130 | } |
| 131 | lua_settop(L_, 1); // L_: lane | 131 | lua_settop(L_, 1); // L_: lane |
| 132 | 132 | ||
| 133 | |||
| 134 | // wait until suspended or done | 133 | // wait until suspended or done |
| 135 | bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; | 134 | STACK_CHECK_START_REL(L_, 0); // L_: lane |
| 136 | 135 | if (!_lane->waitForJoin(L_, _until)) { | |
| 137 | if (!_done) { | 136 | // in that case, should have pushed nil, "timeout" |
| 138 | lua_pushnil(L_); // L_: lane nil | 137 | STACK_CHECK(L_, 2); |
| 139 | luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" | ||
| 140 | return 2; | 138 | return 2; |
| 141 | } | 139 | } |
| 140 | STACK_CHECK(L_, 0); // L_: lane | ||
| 141 | // Thread is Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. | ||
| 142 | 142 | ||
| 143 | // if lane is suspended, force a cancellation that will cause the yield loop to break, and the termination of the thread | 143 | std::ignore = _lane->storeResults(L_); |
| 144 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | 144 | int const _ret{ _lane->pushStoredResults(L_) }; |
| 145 | auto const _cr{ _lane->cancel(CancelOp{ CancelRequest::Soft, LuaHookMask::None }, std::chrono::time_point<std::chrono::steady_clock>::max(), WakeLane::Yes, 0) }; | ||
| 146 | if (_cr == CancelResult::Timeout) { | ||
| 147 | lua_pushnil(L_); // L_: lane nil | ||
| 148 | luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" | ||
| 149 | return 2; | ||
| 150 | } | ||
| 151 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 152 | raise_luaL_error(L_, "INTERNAL ERROR: Lane should not be suspended here"); | ||
| 153 | } | ||
| 154 | } | ||
| 155 | |||
| 156 | STACK_CHECK_START_REL(L_, 0); // L_: lane | ||
| 157 | // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. | ||
| 158 | |||
| 159 | int _ret{ 0 }; | ||
| 160 | int const _stored{ _lane->storeResults(L_) }; | ||
| 161 | STACK_GROW(L_, std::max(3, _stored + 1)); | ||
| 162 | switch (_lane->status.load(std::memory_order_acquire)) { | ||
| 163 | case Lane::Suspended: | ||
| 164 | raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE"); | ||
| 165 | break; | ||
| 166 | |||
| 167 | case Lane::Done: // got regular return values | ||
| 168 | if (_stored > 0) { | ||
| 169 | lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} | ||
| 170 | for (int _i = 2; _i <= _stored; ++_i) { | ||
| 171 | lua_rawgeti(L_, 2, _i); // L_: lane {uv} results2...N | ||
| 172 | } | ||
| 173 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} results2...N result1 | ||
| 174 | lua_replace(L_, 2); // L_: lane results | ||
| 175 | } | ||
| 176 | // we precede the lane body returned values with boolean true | ||
| 177 | lua_pushboolean(L_, 1); // L_: lane results true | ||
| 178 | lua_replace(L_, 1); // L_: true results | ||
| 179 | _ret = _stored + 1; | ||
| 180 | STACK_CHECK(L_, _stored); | ||
| 181 | break; | ||
| 182 | |||
| 183 | case Lane::Error: | ||
| 184 | { | ||
| 185 | LUA_ASSERT(L_, _stored == 2 || _stored == 3); | ||
| 186 | lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} | ||
| 187 | lua_rawgeti(L_, 2, 2); // L_: lane {uv} <error> | ||
| 188 | lua_rawgeti(L_, 2, 3); // L_: lane {uv} <error> <trace>|nil | ||
| 189 | if (lua_isnil(L_, -1)) { | ||
| 190 | lua_replace(L_, 2); // L_: lane nil <error> | ||
| 191 | } else { | ||
| 192 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} <error> <trace> nil | ||
| 193 | lua_replace(L_, 2); // L_: lane nil <error> <trace> | ||
| 194 | } | ||
| 195 | _ret = lua_gettop(L_) - 1; // 2 or 3 | ||
| 196 | STACK_CHECK(L_, _ret); | ||
| 197 | } | ||
| 198 | break; | ||
| 199 | |||
| 200 | case Lane::Cancelled: | ||
| 201 | { | ||
| 202 | LUA_ASSERT(L_, _stored == 2); | ||
| 203 | lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} | ||
| 204 | lua_rawgeti(L_, 2, 2); // L_: lane {uv} cancel_error | ||
| 205 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} cancel_error nil | ||
| 206 | lua_replace(L_, -3); // L_: lane nil cancel_error | ||
| 207 | LUA_ASSERT(L_, lua_isnil(L_, -2) && kCancelError.equals(L_, kIdxTop)); | ||
| 208 | _ret = 2; | ||
| 209 | STACK_CHECK(L_, _ret); | ||
| 210 | } | ||
| 211 | break; | ||
| 212 | |||
| 213 | default: | ||
| 214 | DEBUGSPEW_CODE(DebugSpew(nullptr) << "Unknown Lane status: " << static_cast<int>(_lane->status.load(std::memory_order_relaxed)) << std::endl); | ||
| 215 | LUA_ASSERT(L_, false); | ||
| 216 | _ret = 0; | ||
| 217 | STACK_CHECK(L_, _ret); | ||
| 218 | } | ||
| 219 | LUA_ASSERT(L_, lua_gettop(L_) >= _ret); | ||
| 220 | return _ret; | 145 | return _ret; |
| 221 | } | 146 | } |
| 222 | 147 | ||
| @@ -277,13 +202,17 @@ static int lane_index_number(lua_State* L_) | |||
| 277 | int const _key{ static_cast<int>(lua_tointeger(L_, 2)) }; | 202 | int const _key{ static_cast<int>(lua_tointeger(L_, 2)) }; |
| 278 | lua_pop(L_, 1); // L_: lane | 203 | lua_pop(L_, 1); // L_: lane |
| 279 | 204 | ||
| 280 | // wait until the lane finishes or is suspended | 205 | // wait until suspended or done |
| 206 | STACK_CHECK_START_REL(L_, 0); // L_: lane | ||
| 281 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | 207 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; |
| 282 | if (!_lane->waitForCompletion(_until, true)) { | 208 | if (!_lane->waitForJoin(L_, _until)) { |
| 283 | raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); | 209 | // in that case, should have pushed nil, "timeout" |
| 210 | STACK_CHECK(L_, 2); | ||
| 211 | return 2; | ||
| 284 | } | 212 | } |
| 213 | STACK_CHECK(L_, 0); // L_: lane | ||
| 214 | // Thread is Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. | ||
| 285 | 215 | ||
| 286 | // make sure results are stored | ||
| 287 | int const _stored{ _lane->storeResults(L_) }; | 216 | int const _stored{ _lane->storeResults(L_) }; |
| 288 | if (_key > _stored) { | 217 | if (_key > _stored) { |
| 289 | // get nil if indexing beyond the actual returned value count | 218 | // get nil if indexing beyond the actual returned value count |
| @@ -741,7 +670,7 @@ static void lane_main(Lane* const lane_) | |||
| 741 | lane_->nresults = lua_gettop(_L) - _errorHandlerCount; | 670 | lane_->nresults = lua_gettop(_L) - _errorHandlerCount; |
| 742 | } else { | 671 | } else { |
| 743 | // S and L are different: we run as a coroutine in Lua thread L created in state S | 672 | // S and L are different: we run as a coroutine in Lua thread L created in state S |
| 744 | bool _again{ true }; | 673 | bool _shouldClose{ false }; |
| 745 | do { | 674 | do { |
| 746 | // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values. | 675 | // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values. |
| 747 | // that's why we have lane_->nresults | 676 | // that's why we have lane_->nresults |
| @@ -754,19 +683,25 @@ static void lane_main(Lane* const lane_) | |||
| 754 | lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended | 683 | lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended |
| 755 | lane_->doneCondVar.notify_one(); | 684 | lane_->doneCondVar.notify_one(); |
| 756 | // wait until the user wants us to resume | 685 | // wait until the user wants us to resume |
| 757 | // update waiting_on, so that the lane can be woken by cancellation requests here? | 686 | // update waiting_on, so that the lane can be woken by cancellation requests here |
| 758 | lane_->waiting_on = &lane_->doneCondVar; | 687 | lane_->waiting_on = &lane_->doneCondVar; |
| 759 | lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); | 688 | lane_->doneCondVar.wait(_guard, |
| 689 | [lane_,&_shouldClose]() | ||
| 690 | { | ||
| 691 | auto const _status{ lane_->status.load(std::memory_order_acquire) }; | ||
| 692 | // wait interrupted because of a cancellation or join request means we have to abort the resume loop | ||
| 693 | _shouldClose = (_status == Lane::Closing); | ||
| 694 | return _shouldClose || (_status == Lane::Resuming) || (lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None); | ||
| 695 | } | ||
| 696 | ); | ||
| 760 | // here lane_->doneMutex is locked again | 697 | // here lane_->doneMutex is locked again |
| 761 | lane_->waiting_on = nullptr; | 698 | lane_->waiting_on = nullptr; |
| 762 | lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running | 699 | lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running |
| 763 | } | 700 | } |
| 764 | // wait was interrupted because of a cancellation, finish the lane | ||
| 765 | _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); | ||
| 766 | } else { | 701 | } else { |
| 767 | _again = false; | 702 | _shouldClose = true; |
| 768 | } | 703 | } |
| 769 | } while (_again); | 704 | } while (!_shouldClose); |
| 770 | if (_rc == LuaError::YIELD) { | 705 | if (_rc == LuaError::YIELD) { |
| 771 | #if LUA_VERSION_NUM >= 504 | 706 | #if LUA_VERSION_NUM >= 504 |
| 772 | lua_State* const _S{ lane_->S }; | 707 | lua_State* const _S{ lane_->S }; |
| @@ -855,7 +790,7 @@ static void lane_main(Lane* const lane_) | |||
| 855 | #if LUA_VERSION_NUM >= 504 | 790 | #if LUA_VERSION_NUM >= 504 |
| 856 | 791 | ||
| 857 | // __close(lane_ud, <err>) | 792 | // __close(lane_ud, <err>) |
| 858 | static LUAG_FUNC(lane___close) | 793 | static LUAG_FUNC(lane_close) |
| 859 | { | 794 | { |
| 860 | [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil | 795 | [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil |
| 861 | // drop the error if any | 796 | // drop the error if any |
| @@ -869,15 +804,6 @@ static LUAG_FUNC(lane___close) | |||
| 869 | return lua_gettop(L_); | 804 | return lua_gettop(L_); |
| 870 | } | 805 | } |
| 871 | 806 | ||
| 872 | // ################################################################################################# | ||
| 873 | |||
| 874 | // close(lane_ud) | ||
| 875 | static LUAG_FUNC(lane_close) | ||
| 876 | { | ||
| 877 | [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil | ||
| 878 | raise_luaL_error(L_, "not implemented"); // TODO! | ||
| 879 | return 0; | ||
| 880 | } | ||
| 881 | #endif // LUA_VERSION_NUM >= 504 | 807 | #endif // LUA_VERSION_NUM >= 504 |
| 882 | 808 | ||
| 883 | // ################################################################################################# | 809 | // ################################################################################################# |
| @@ -1069,14 +995,11 @@ namespace { | |||
| 1069 | namespace local { | 995 | namespace local { |
| 1070 | static struct luaL_Reg const sLaneFunctions[] = { | 996 | static struct luaL_Reg const sLaneFunctions[] = { |
| 1071 | #if LUA_VERSION_NUM >= 504 | 997 | #if LUA_VERSION_NUM >= 504 |
| 1072 | { "__close", LG_lane___close }, | 998 | { "__close", LG_lane_close }, |
| 1073 | #endif // LUA_VERSION_NUM >= 504 | 999 | #endif // LUA_VERSION_NUM >= 504 |
| 1074 | { "__gc", LG_lane_gc }, | 1000 | { "__gc", LG_lane_gc }, |
| 1075 | { "__index", LG_lane_index }, | 1001 | { "__index", LG_lane_index }, |
| 1076 | { "cancel", LG_lane_cancel }, | 1002 | { "cancel", LG_lane_cancel }, |
| 1077 | #if LUA_VERSION_NUM >= 504 | ||
| 1078 | { "close", LG_lane_close }, | ||
| 1079 | #endif // LUA_VERSION_NUM >= 504 | ||
| 1080 | { "get_threadname", LG_lane_get_threadname }, | 1003 | { "get_threadname", LG_lane_get_threadname }, |
| 1081 | { "join", LG_lane_join }, | 1004 | { "join", LG_lane_join }, |
| 1082 | { "resume", LG_lane_resume }, | 1005 | { "resume", LG_lane_resume }, |
| @@ -1171,6 +1094,82 @@ void Lane::pushIndexedResult(lua_State* const L_, int const key_) const | |||
| 1171 | // ################################################################################################# | 1094 | // ################################################################################################# |
| 1172 | 1095 | ||
| 1173 | [[nodiscard]] | 1096 | [[nodiscard]] |
| 1097 | int Lane::pushStoredResults(lua_State* const L_) const | ||
| 1098 | { | ||
| 1099 | STACK_CHECK_START_ABS(L_, 1); // should only have the lane UD on the stack | ||
| 1100 | static constexpr StackIndex kIdxSelf{ 1 }; | ||
| 1101 | static constexpr UserValueIndex kUvResults{ 1 }; | ||
| 1102 | LUA_ASSERT(L_, ToLane(L_, kIdxSelf) == this); // L_: lane | ||
| 1103 | lua_getiuservalue(L_, kIdxSelf, kUvResults); // L_: lane {uv} | ||
| 1104 | lua_rawgeti(L_, kIdxTop, 0); // L_: lane {uv} stored | ||
| 1105 | int const _stored{ static_cast<int>(lua_tointeger(L_, kIdxTop)) }; | ||
| 1106 | lua_pop(L_, 1); // L_: lane {uv} | ||
| 1107 | |||
| 1108 | int _ret{}; | ||
| 1109 | STACK_GROW(L_, std::max(3, _stored + 1)); | ||
| 1110 | switch (status.load(std::memory_order_acquire)) { | ||
| 1111 | case Lane::Suspended: | ||
| 1112 | raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE"); | ||
| 1113 | break; | ||
| 1114 | |||
| 1115 | case Lane::Done: // got regular return values | ||
| 1116 | if (_stored > 0) { | ||
| 1117 | for (int _i = 2; _i <= _stored; ++_i) { | ||
| 1118 | lua_rawgeti(L_, 2, _i); // L_: lane {uv} results2...N | ||
| 1119 | } | ||
| 1120 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} results2...N result1 | ||
| 1121 | lua_replace(L_, 2); // L_: lane results | ||
| 1122 | } else { | ||
| 1123 | lua_pop(L_, 1); // L_: lane | ||
| 1124 | } | ||
| 1125 | // we precede the lane body returned values with boolean true | ||
| 1126 | lua_pushboolean(L_, 1); // L_: lane results true | ||
| 1127 | lua_replace(L_, 1); // L_: true results | ||
| 1128 | _ret = _stored + 1; | ||
| 1129 | STACK_CHECK(L_, _ret); | ||
| 1130 | break; | ||
| 1131 | |||
| 1132 | case Lane::Error: | ||
| 1133 | { | ||
| 1134 | LUA_ASSERT(L_, _stored == 2 || _stored == 3); // contains nil error [trace] | ||
| 1135 | lua_rawgeti(L_, 2, 2); // L_: lane {uv} <error> | ||
| 1136 | lua_rawgeti(L_, 2, 3); // L_: lane {uv} <error> <trace>|nil | ||
| 1137 | if (lua_isnil(L_, -1)) { | ||
| 1138 | lua_replace(L_, 2); // L_: lane nil <error> | ||
| 1139 | } else { | ||
| 1140 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} <error> <trace> nil | ||
| 1141 | lua_replace(L_, 2); // L_: lane nil <error> <trace> | ||
| 1142 | } | ||
| 1143 | _ret = _stored; // 2 or 3 | ||
| 1144 | STACK_CHECK(L_, _ret + 1); // stack still contains the lane UD below | ||
| 1145 | } | ||
| 1146 | break; | ||
| 1147 | |||
| 1148 | case Lane::Cancelled: | ||
| 1149 | { | ||
| 1150 | LUA_ASSERT(L_, _stored == 2); | ||
| 1151 | lua_rawgeti(L_, 2, 2); // L_: lane {uv} cancel_error | ||
| 1152 | lua_rawgeti(L_, 2, 1); // L_: lane {uv} cancel_error nil | ||
| 1153 | lua_replace(L_, -3); // L_: lane nil cancel_error | ||
| 1154 | LUA_ASSERT(L_, lua_isnil(L_, -2) && kCancelError.equals(L_, kIdxTop)); | ||
| 1155 | _ret = 2; | ||
| 1156 | STACK_CHECK(L_, _ret + 1); // stack still contains the lane UD below | ||
| 1157 | } | ||
| 1158 | break; | ||
| 1159 | |||
| 1160 | default: | ||
| 1161 | DEBUGSPEW_CODE(DebugSpew(nullptr) << "Unknown Lane status: " << static_cast<int>(_lane->status.load(std::memory_order_relaxed)) << std::endl); | ||
| 1162 | LUA_ASSERT(L_, false); | ||
| 1163 | _ret = 0; | ||
| 1164 | STACK_CHECK(L_, _ret); | ||
| 1165 | } | ||
| 1166 | LUA_ASSERT(L_, lua_gettop(L_) >= _ret); | ||
| 1167 | return _ret; | ||
| 1168 | } | ||
| 1169 | |||
| 1170 | // ################################################################################################# | ||
| 1171 | |||
| 1172 | [[nodiscard]] | ||
| 1174 | std::string_view Lane::pushErrorTraceLevel(lua_State* L_) const | 1173 | std::string_view Lane::pushErrorTraceLevel(lua_State* L_) const |
| 1175 | { | 1174 | { |
| 1176 | std::string_view const _str{ errorTraceLevelString() }; | 1175 | std::string_view const _str{ errorTraceLevelString() }; |
| @@ -1270,15 +1269,16 @@ int Lane::storeResults(lua_State* const L_) | |||
| 1270 | lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv} | 1269 | lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv} |
| 1271 | StackIndex const _tidx{ lua_gettop(L_) }; | 1270 | StackIndex const _tidx{ lua_gettop(L_) }; |
| 1272 | 1271 | ||
| 1273 | int _stored{}; | 1272 | // if the results were already stored from a previous indexing, just say how many values we have in store |
| 1274 | if (nresults == 0) { | 1273 | if (!L) { |
| 1275 | lua_rawgeti(L_, -1, 0); // L_: lane ... {uv} nresults | 1274 | lua_rawgeti(L_, -1, 0); // L_: lane ... {uv} nresults |
| 1276 | _stored = static_cast<int>(lua_tointeger(L_, -1)); | 1275 | auto const _stored{ static_cast<int>(lua_tointeger(L_, -1)) }; |
| 1277 | lua_pop(L_, 2); | 1276 | lua_pop(L_, 2); |
| 1278 | STACK_CHECK(L_, 0); | 1277 | STACK_CHECK(L_, 0); |
| 1279 | return _stored; | 1278 | return _stored; |
| 1280 | } | 1279 | } |
| 1281 | 1280 | ||
| 1281 | int _stored{}; | ||
| 1282 | switch (status.load(std::memory_order_acquire)) { | 1282 | switch (status.load(std::memory_order_acquire)) { |
| 1283 | default: | 1283 | default: |
| 1284 | // this is an internal error, we probably never get here | 1284 | // this is an internal error, we probably never get here |
| @@ -1358,12 +1358,13 @@ int Lane::storeResults(lua_State* const L_) | |||
| 1358 | //--- | 1358 | //--- |
| 1359 | // str= thread_status( lane ) | 1359 | // str= thread_status( lane ) |
| 1360 | // | 1360 | // |
| 1361 | // "pending" -> | ("running" <-> "waiting") <-> "suspended" <-> "resuming" | -> "done"/"error"/"cancelled" | 1361 | // "pending" -> | ("running" <-> "waiting") <-> "suspended" <-> "resuming/closing" | -> "done"/"error"/"cancelled" |
| 1362 | 1362 | ||
| 1363 | // "pending" not started yet | 1363 | // "pending" not started yet |
| 1364 | // "running" started, doing its work.. | 1364 | // "running" started, doing its work.. |
| 1365 | // "suspended" returned from a lua_resume | 1365 | // "suspended" returned from a lua_resume |
| 1366 | // "resuming" told by its parent state to resume | 1366 | // "resuming" told by its parent state to resume |
| 1367 | // "closing" not observable from the outside: happens only inside a join()/indexation call to unblock a suspended coroutine Lane so that it can join properly | ||
| 1367 | // "waiting" blocked in a send()/receive() | 1368 | // "waiting" blocked in a send()/receive() |
| 1368 | // "done" finished, results are there | 1369 | // "done" finished, results are there |
| 1369 | // "error" finished at an error, error value is there | 1370 | // "error" finished at an error, error value is there |
| @@ -1374,7 +1375,7 @@ std::string_view Lane::threadStatusString() const | |||
| 1374 | { | 1375 | { |
| 1375 | static constexpr std::string_view kStrs[] = { | 1376 | static constexpr std::string_view kStrs[] = { |
| 1376 | "pending", | 1377 | "pending", |
| 1377 | "running", "suspended", "resuming", | 1378 | "running", "suspended", "resuming", "closing", |
| 1378 | "waiting", | 1379 | "waiting", |
| 1379 | "done", "error", "cancelled" | 1380 | "done", "error", "cancelled" |
| 1380 | }; | 1381 | }; |
| @@ -1382,12 +1383,13 @@ std::string_view Lane::threadStatusString() const | |||
| 1382 | static_assert(1 == static_cast<std::underlying_type_t<Lane::Status>>(Running)); | 1383 | static_assert(1 == static_cast<std::underlying_type_t<Lane::Status>>(Running)); |
| 1383 | static_assert(2 == static_cast<std::underlying_type_t<Lane::Status>>(Suspended)); | 1384 | static_assert(2 == static_cast<std::underlying_type_t<Lane::Status>>(Suspended)); |
| 1384 | static_assert(3 == static_cast<std::underlying_type_t<Lane::Status>>(Resuming)); | 1385 | static_assert(3 == static_cast<std::underlying_type_t<Lane::Status>>(Resuming)); |
| 1385 | static_assert(4 == static_cast<std::underlying_type_t<Lane::Status>>(Waiting)); | 1386 | static_assert(4 == static_cast<std::underlying_type_t<Lane::Status>>(Closing)); |
| 1386 | static_assert(5 == static_cast<std::underlying_type_t<Lane::Status>>(Done)); | 1387 | static_assert(5 == static_cast<std::underlying_type_t<Lane::Status>>(Waiting)); |
| 1387 | static_assert(6 == static_cast<std::underlying_type_t<Lane::Status>>(Error)); | 1388 | static_assert(6 == static_cast<std::underlying_type_t<Lane::Status>>(Done)); |
| 1388 | static_assert(7 == static_cast<std::underlying_type_t<Lane::Status>>(Cancelled)); | 1389 | static_assert(7 == static_cast<std::underlying_type_t<Lane::Status>>(Error)); |
| 1390 | static_assert(8 == static_cast<std::underlying_type_t<Lane::Status>>(Cancelled)); | ||
| 1389 | auto const _status{ static_cast<std::underlying_type_t<Lane::Status>>(status.load(std::memory_order_acquire)) }; | 1391 | auto const _status{ static_cast<std::underlying_type_t<Lane::Status>>(status.load(std::memory_order_acquire)) }; |
| 1390 | if (_status < 0 || _status > 7) { // should never happen, but better safe than sorry | 1392 | if (_status < 0 || _status > 8) { // should never happen, but better safe than sorry |
| 1391 | return ""; | 1393 | return ""; |
| 1392 | } | 1394 | } |
| 1393 | return kStrs[_status]; | 1395 | return kStrs[_status]; |
| @@ -1407,3 +1409,40 @@ bool Lane::waitForCompletion(std::chrono::time_point<std::chrono::steady_clock> | |||
| 1407 | return _status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled || _status == suspended; | 1409 | return _status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled || _status == suspended; |
| 1408 | }); | 1410 | }); |
| 1409 | } | 1411 | } |
| 1412 | |||
| 1413 | // ################################################################################################# | ||
| 1414 | |||
| 1415 | [[nodiscard]] | ||
| 1416 | bool Lane::waitForJoin(lua_State* const L_, std::chrono::time_point<std::chrono::steady_clock> until_) | ||
| 1417 | { | ||
| 1418 | // wait until suspended or done | ||
| 1419 | { | ||
| 1420 | bool const _done{ !thread.joinable() || waitForCompletion(until_, true) }; | ||
| 1421 | |||
| 1422 | if (!_done) { | ||
| 1423 | lua_pushnil(L_); // L_: lane nil | ||
| 1424 | luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" | ||
| 1425 | return false; | ||
| 1426 | } | ||
| 1427 | } | ||
| 1428 | |||
| 1429 | // if lane is suspended, force the yield loop to break, and the termination of the thread | ||
| 1430 | if (status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 1431 | LUA_ASSERT(L_, waiting_on == &doneCondVar); | ||
| 1432 | status.store(Lane::Closing, std::memory_order_release); | ||
| 1433 | doneCondVar.notify_all(); | ||
| 1434 | // wait until done | ||
| 1435 | { | ||
| 1436 | bool const _done{ !thread.joinable() || waitForCompletion(until_, true) }; | ||
| 1437 | |||
| 1438 | if (!_done) { | ||
| 1439 | lua_pushnil(L_); // L_: lane nil | ||
| 1440 | luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" | ||
| 1441 | return false; | ||
| 1442 | } | ||
| 1443 | } | ||
| 1444 | LUA_ASSERT(L_, status.load(std::memory_order_acquire) != Lane::Closing); | ||
| 1445 | } | ||
| 1446 | LUA_ASSERT(L_, status.load(std::memory_order_acquire) != Lane::Suspended); | ||
| 1447 | return true; | ||
| 1448 | } | ||
diff --git a/src/lane.hpp b/src/lane.hpp index fd46d68..bd328b1 100644 --- a/src/lane.hpp +++ b/src/lane.hpp | |||
| @@ -56,6 +56,7 @@ class Lane final | |||
| 56 | /* | 56 | /* |
| 57 | Pending: The Lua VM hasn't done anything yet. | 57 | Pending: The Lua VM hasn't done anything yet. |
| 58 | Resuming: The user requested the lane to resume execution from Suspended state. | 58 | Resuming: The user requested the lane to resume execution from Suspended state. |
| 59 | Closing: The user is joining the lane, specifically interrupting a suspended Lane. | ||
| 59 | Suspended: returned from lua_resume, waiting for the client to request a lua_resume. | 60 | Suspended: returned from lua_resume, waiting for the client to request a lua_resume. |
| 60 | Running, Suspended, Waiting: Thread is inside the Lua VM. | 61 | Running, Suspended, Waiting: Thread is inside the Lua VM. |
| 61 | Done, Error, Cancelled: Thread execution is outside the Lua VM. It can be lua_close()d. | 62 | Done, Error, Cancelled: Thread execution is outside the Lua VM. It can be lua_close()d. |
| @@ -66,6 +67,7 @@ class Lane final | |||
| 66 | Running, | 67 | Running, |
| 67 | Suspended, | 68 | Suspended, |
| 68 | Resuming, | 69 | Resuming, |
| 70 | Closing, | ||
| 69 | Waiting, | 71 | Waiting, |
| 70 | Done, | 72 | Done, |
| 71 | Error, | 73 | Error, |
| @@ -199,6 +201,8 @@ class Lane final | |||
| 199 | static void PushMetatable(lua_State* L_); | 201 | static void PushMetatable(lua_State* L_); |
| 200 | void pushStatusString(lua_State* L_) const; | 202 | void pushStatusString(lua_State* L_) const; |
| 201 | void pushIndexedResult(lua_State* L_, int key_) const; | 203 | void pushIndexedResult(lua_State* L_, int key_) const; |
| 204 | [[nodiscard]] | ||
| 205 | int pushStoredResults(lua_State* L_) const; | ||
| 202 | void resetResultsStorage(lua_State* L_, StackIndex self_idx_); | 206 | void resetResultsStorage(lua_State* L_, StackIndex self_idx_); |
| 203 | void selfdestructAdd(); | 207 | void selfdestructAdd(); |
| 204 | [[nodiscard]] | 208 | [[nodiscard]] |
| @@ -213,6 +217,8 @@ class Lane final | |||
| 213 | // wait until the lane stops working with its state (either Suspended or Done+) | 217 | // wait until the lane stops working with its state (either Suspended or Done+) |
| 214 | [[nodiscard]] | 218 | [[nodiscard]] |
| 215 | bool waitForCompletion(std::chrono::time_point<std::chrono::steady_clock> until_, bool const _acceptSuspended); | 219 | bool waitForCompletion(std::chrono::time_point<std::chrono::steady_clock> until_, bool const _acceptSuspended); |
| 220 | [[nodiscard]] | ||
| 221 | bool waitForJoin(lua_State* _L, std::chrono::time_point<std::chrono::steady_clock> until_); | ||
| 216 | }; | 222 | }; |
| 217 | 223 | ||
| 218 | // ################################################################################################# | 224 | // ################################################################################################# |
