diff options
| author | Benoit Germain <benoit.germain@ubisoft.com> | 2025-06-26 09:18:54 +0200 |
|---|---|---|
| committer | Benoit Germain <benoit.germain@ubisoft.com> | 2025-06-26 09:18:54 +0200 |
| commit | d7d756e30680bcff036118b47ac45b740e020ea2 (patch) | |
| tree | 3e2a26409154760d66092e6e04a9fcb4ad4ed02a /src | |
| parent | 4f5fa626c0279f5aefac477a29702c43a6754c5a (diff) | |
| download | lanes-d7d756e30680bcff036118b47ac45b740e020ea2.tar.gz lanes-d7d756e30680bcff036118b47ac45b740e020ea2.tar.bz2 lanes-d7d756e30680bcff036118b47ac45b740e020ea2.zip | |
Preparation for lane:close() and correct to-be-closed variables
* lane:join() can no longer be used to read yielded values
* same with lane indexing
* stub for lane:close(), similar to coroutine.close() (not implemented yet)
* preparing tests for to-be-closed variables in yielded coroutine lanes
* yielded lanes unlock and terminate properly at Lanes shutdown
Diffstat (limited to 'src')
| -rw-r--r-- | src/lane.cpp | 79 |
1 files changed, 62 insertions, 17 deletions
diff --git a/src/lane.cpp b/src/lane.cpp index c6ea358..f8685e9 100644 --- a/src/lane.cpp +++ b/src/lane.cpp | |||
| @@ -129,8 +129,13 @@ static LUAG_FUNC(lane_join) | |||
| 129 | raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); | 129 | raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); |
| 130 | } | 130 | } |
| 131 | 131 | ||
| 132 | // it is forbidden to join a suspended coroutine | ||
| 133 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 134 | raise_luaL_error(L_, "cannot join a suspended coroutine"); | ||
| 135 | } | ||
| 136 | |||
| 132 | lua_settop(L_, 1); // L_: lane | 137 | lua_settop(L_, 1); // L_: lane |
| 133 | bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; | 138 | bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, false) }; |
| 134 | 139 | ||
| 135 | if (!_done) { | 140 | if (!_done) { |
| 136 | lua_pushnil(L_); // L_: lane nil | 141 | lua_pushnil(L_); // L_: lane nil |
| @@ -145,7 +150,11 @@ static LUAG_FUNC(lane_join) | |||
| 145 | int const _stored{ _lane->storeResults(L_) }; | 150 | int const _stored{ _lane->storeResults(L_) }; |
| 146 | STACK_GROW(L_, std::max(3, _stored + 1)); | 151 | STACK_GROW(L_, std::max(3, _stored + 1)); |
| 147 | switch (_lane->status.load(std::memory_order_acquire)) { | 152 | switch (_lane->status.load(std::memory_order_acquire)) { |
| 148 | case Lane::Suspended: // got yielded values | 153 | case Lane::Suspended: |
| 154 | // TODO: maybe this can happen if we have a to-be-closed handle on a suspended lane? TO BE TESTED! | ||
| 155 | raise_luaL_error(L_, "INTERNAL ERROR: should not join a suspended coroutine"); | ||
| 156 | [[fallthrough]]; | ||
| 157 | |||
| 149 | case Lane::Done: // got regular return values | 158 | case Lane::Done: // got regular return values |
| 150 | if (_stored > 0) { | 159 | if (_stored > 0) { |
| 151 | lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} | 160 | lua_getiuservalue(L_, StackIndex{ 1 }, UserValueIndex{ 1 }); // L_: lane {uv} |
| @@ -210,7 +219,7 @@ LUAG_FUNC(lane_resume) | |||
| 210 | Lane* const _lane{ ToLane(L_, kIdxSelf) }; | 219 | Lane* const _lane{ ToLane(L_, kIdxSelf) }; |
| 211 | lua_State* const _L2{ _lane->L }; | 220 | lua_State* const _L2{ _lane->L }; |
| 212 | 221 | ||
| 213 | // wait until the lane yields or returns | 222 | // wait until the lane yields or returns |
| 214 | std::ignore = _lane->waitForCompletion(std::chrono::time_point<std::chrono::steady_clock>::max(), true); | 223 | std::ignore = _lane->waitForCompletion(std::chrono::time_point<std::chrono::steady_clock>::max(), true); |
| 215 | 224 | ||
| 216 | if (_lane->status.load(std::memory_order_acquire) != Lane::Suspended) { | 225 | if (_lane->status.load(std::memory_order_acquire) != Lane::Suspended) { |
| @@ -259,11 +268,17 @@ static int lane_index_number(lua_State* L_) | |||
| 259 | int const _key{ static_cast<int>(lua_tointeger(L_, 2)) }; | 268 | int const _key{ static_cast<int>(lua_tointeger(L_, 2)) }; |
| 260 | lua_pop(L_, 1); // L_: lane | 269 | lua_pop(L_, 1); // L_: lane |
| 261 | 270 | ||
| 271 | // wait until the lane finishes or is suspended | ||
| 262 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; | 272 | std::chrono::time_point<std::chrono::steady_clock> _until{ std::chrono::time_point<std::chrono::steady_clock>::max() }; |
| 263 | if (!_lane->waitForCompletion(_until, true)) { | 273 | if (!_lane->waitForCompletion(_until, true)) { |
| 264 | raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); | 274 | raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); |
| 265 | } | 275 | } |
| 266 | 276 | ||
| 277 | // it is forbidden to index a suspended coroutine. if you want to read yielded values, use lane:resume() | ||
| 278 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 279 | raise_luaL_error(L_, "cannot index a suspended coroutine"); | ||
| 280 | } | ||
| 281 | |||
| 267 | // make sure results are stored | 282 | // make sure results are stored |
| 268 | int const _stored{ _lane->storeResults(L_) }; | 283 | int const _stored{ _lane->storeResults(L_) }; |
| 269 | if (_key > _stored) { | 284 | if (_key > _stored) { |
| @@ -485,6 +500,7 @@ static int PushStackTrace(lua_State* const L_, Lane::ErrorTraceLevel const error | |||
| 485 | StackIndex const _top{ lua_gettop(L_) }; | 500 | StackIndex const _top{ lua_gettop(L_) }; |
| 486 | switch (rc_) { | 501 | switch (rc_) { |
| 487 | case LuaError::OK: // no error, body return values are on the stack | 502 | case LuaError::OK: // no error, body return values are on the stack |
| 503 | case LuaError::YIELD: | ||
| 488 | break; | 504 | break; |
| 489 | 505 | ||
| 490 | case LuaError::ERRRUN: // cancellation or a runtime error | 506 | case LuaError::ERRRUN: // cancellation or a runtime error |
| @@ -707,7 +723,7 @@ static void lane_main(Lane* const lane_) | |||
| 707 | LuaError _rc{ LuaError::ERRRUN }; | 723 | LuaError _rc{ LuaError::ERRRUN }; |
| 708 | if (lane_->status.load(std::memory_order_acquire) == Lane::Pending) { // nothing wrong happened during preparation, we can work | 724 | if (lane_->status.load(std::memory_order_acquire) == Lane::Pending) { // nothing wrong happened during preparation, we can work |
| 709 | // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler | 725 | // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler |
| 710 | int const _errorHandlerCount{ lane_->errorHandlerCount() }; | 726 | int const _errorHandlerCount{ lane_->errorHandlerCount() }; // no error handler for coroutines, ever. |
| 711 | int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount }; | 727 | int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount }; |
| 712 | { | 728 | { |
| 713 | std::unique_lock _guard{ lane_->doneMutex }; | 729 | std::unique_lock _guard{ lane_->doneMutex }; |
| @@ -720,27 +736,40 @@ static void lane_main(Lane* const lane_) | |||
| 720 | lane_->nresults = lua_gettop(_L) - _errorHandlerCount; | 736 | lane_->nresults = lua_gettop(_L) - _errorHandlerCount; |
| 721 | } else { | 737 | } else { |
| 722 | // S and L are different: we run as a coroutine in Lua thread L created in state S | 738 | // S and L are different: we run as a coroutine in Lua thread L created in state S |
| 739 | bool _again{ true }; | ||
| 723 | do { | 740 | do { |
| 724 | // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values. | 741 | // starting with Lua 5.4, lua_resume can leave more stuff on the stack below the actual yielded values. |
| 725 | // that's why we have lane_->nresults | 742 | // that's why we have lane_->nresults |
| 726 | _rc = luaG_resume(_L, nullptr, _nargs, &lane_->nresults); // L: eh? ... retvals|err... | 743 | _rc = luaG_resume(_L, nullptr, _nargs, &lane_->nresults); // L: ... retvals|err... |
| 727 | if (_rc == LuaError::YIELD) { | 744 | if (_rc == LuaError::YIELD) { |
| 745 | // on the stack we find the values pushed by lane:resume() | ||
| 746 | _nargs = lua_gettop(_L); | ||
| 747 | if (std::unique_lock _guard{ lane_->doneMutex }; true) | ||
| 748 | { | ||
| 728 | // change our status to suspended, and wait until someone wants us to resume | 749 | // change our status to suspended, and wait until someone wants us to resume |
| 729 | std::unique_lock _guard{ lane_->doneMutex }; | ||
| 730 | lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended | 750 | lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended |
| 731 | lane_->doneCondVar.notify_one(); | 751 | lane_->doneCondVar.notify_one(); |
| 732 | // wait until the user wants us to resume | 752 | // wait until the user wants us to resume |
| 733 | // TODO: do I update waiting_on or not, so that the lane can be woken by cancellation requests here? | 753 | // update waiting_on, so that the lane can be woken by cancellation requests here? |
| 734 | // lane_->waiting_on = &lane_->doneCondVar; | 754 | lane_->waiting_on = &lane_->doneCondVar; |
| 735 | lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming; }); | 755 | lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); |
| 736 | // here lane_->doneMutex is locked again | 756 | // here lane_->doneMutex is locked again |
| 737 | // lane_->waiting_on = nullptr; | 757 | lane_->waiting_on = nullptr; |
| 738 | lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running | 758 | lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running |
| 739 | // on the stack we find the values pushed by lane:resume() | ||
| 740 | _nargs = lua_gettop(_L); | ||
| 741 | } | 759 | } |
| 742 | } while (_rc == LuaError::YIELD); | 760 | // wait was interrupted because of a cancellation, finish the lane |
| 743 | if (_rc != LuaError::OK) { // : err... | 761 | _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); |
| 762 | } else { | ||
| 763 | _again = false; | ||
| 764 | } | ||
| 765 | } while (_again); | ||
| 766 | #if LUA_VERSION_NUM >= 504 | ||
| 767 | if (_rc == LuaError::YIELD) { | ||
| 768 | // lane is cancelled before completion (for example at Lanes shutdown), close everything | ||
| 769 | _rc = static_cast<LuaError>(lua_closethread(_L, nullptr)); | ||
| 770 | } | ||
| 771 | #endif // LUA_VERSION_NUM | ||
| 772 | if (_rc != LuaError::OK) { // an error occurred // L: err... | ||
| 744 | // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack | 773 | // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack |
| 745 | // or false + the error message when running Lua 5.1 | 774 | // or false + the error message when running Lua 5.1 |
| 746 | // since the rest of our code wants only the error message, let us keep only the latter. | 775 | // since the rest of our code wants only the error message, let us keep only the latter. |
| @@ -805,7 +834,9 @@ static void lane_main(Lane* const lane_) | |||
| 805 | // ################################################################################################# | 834 | // ################################################################################################# |
| 806 | 835 | ||
| 807 | #if LUA_VERSION_NUM >= 504 | 836 | #if LUA_VERSION_NUM >= 504 |
| 808 | static LUAG_FUNC(lane_close) | 837 | |
| 838 | // __close(lane_ud, <err>) | ||
| 839 | static LUAG_FUNC(lane___close) | ||
| 809 | { | 840 | { |
| 810 | [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil | 841 | [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil |
| 811 | // drop the error if any | 842 | // drop the error if any |
| @@ -818,6 +849,16 @@ static LUAG_FUNC(lane_close) | |||
| 818 | lua_call(L_, 1, LUA_MULTRET); // L_: join() results | 849 | lua_call(L_, 1, LUA_MULTRET); // L_: join() results |
| 819 | return lua_gettop(L_); | 850 | return lua_gettop(L_); |
| 820 | } | 851 | } |
| 852 | |||
| 853 | // ################################################################################################# | ||
| 854 | |||
| 855 | // close(lane_ud) | ||
| 856 | static LUAG_FUNC(lane_close) | ||
| 857 | { | ||
| 858 | [[maybe_unused]] Lane* const _lane{ ToLane(L_, StackIndex{ 1 }) }; // L_: lane err|nil | ||
| 859 | raise_luaL_error(L_, "not implemented"); // TODO! | ||
| 860 | return 0; | ||
| 861 | } | ||
| 821 | #endif // LUA_VERSION_NUM >= 504 | 862 | #endif // LUA_VERSION_NUM >= 504 |
| 822 | 863 | ||
| 823 | // ################################################################################################# | 864 | // ################################################################################################# |
| @@ -966,7 +1007,8 @@ CancelResult Lane::internalCancel(CancelRequest const rq_, std::chrono::time_poi | |||
| 966 | // lane_->thread.get_stop_source().request_stop(); | 1007 | // lane_->thread.get_stop_source().request_stop(); |
| 967 | } | 1008 | } |
| 968 | if (wakeLane_ == WakeLane::Yes) { // wake the thread so that execution returns from any pending linda operation if desired | 1009 | if (wakeLane_ == WakeLane::Yes) { // wake the thread so that execution returns from any pending linda operation if desired |
| 969 | if (status.load(std::memory_order_acquire) == Lane::Waiting) { // waiting_on is updated under control of status acquire/release semantics | 1010 | auto const _status{ status.load(std::memory_order_acquire) }; |
| 1011 | if (_status == Lane::Waiting || _status == Lane::Suspended) { // waiting_on is updated under control of status acquire/release semantics | ||
| 970 | if (std::condition_variable* const _waiting_on{ waiting_on }) { | 1012 | if (std::condition_variable* const _waiting_on{ waiting_on }) { |
| 971 | _waiting_on->notify_all(); | 1013 | _waiting_on->notify_all(); |
| 972 | } | 1014 | } |
| @@ -1007,11 +1049,14 @@ namespace { | |||
| 1007 | namespace local { | 1049 | namespace local { |
| 1008 | static struct luaL_Reg const sLaneFunctions[] = { | 1050 | static struct luaL_Reg const sLaneFunctions[] = { |
| 1009 | #if LUA_VERSION_NUM >= 504 | 1051 | #if LUA_VERSION_NUM >= 504 |
| 1010 | { "__close", LG_lane_close }, | 1052 | { "__close", LG_lane___close }, |
| 1011 | #endif // LUA_VERSION_NUM >= 504 | 1053 | #endif // LUA_VERSION_NUM >= 504 |
| 1012 | { "__gc", LG_lane_gc }, | 1054 | { "__gc", LG_lane_gc }, |
| 1013 | { "__index", LG_lane_index }, | 1055 | { "__index", LG_lane_index }, |
| 1014 | { "cancel", LG_lane_cancel }, | 1056 | { "cancel", LG_lane_cancel }, |
| 1057 | #if LUA_VERSION_NUM >= 504 | ||
| 1058 | { "close", LG_lane_close }, | ||
| 1059 | #endif // LUA_VERSION_NUM >= 504 | ||
| 1015 | { "get_threadname", LG_lane_get_threadname }, | 1060 | { "get_threadname", LG_lane_get_threadname }, |
| 1016 | { "join", LG_lane_join }, | 1061 | { "join", LG_lane_join }, |
| 1017 | { "resume", LG_lane_resume }, | 1062 | { "resume", LG_lane_resume }, |
