diff options
| author | Benoit Germain <benoit.germain@ubisoft.com> | 2025-07-01 08:01:46 +0200 |
|---|---|---|
| committer | Benoit Germain <benoit.germain@ubisoft.com> | 2025-07-01 08:01:46 +0200 |
| commit | 45774df1eeeaae0868420104a4cdad4691995dc9 (patch) | |
| tree | 9e7347ce1b8cff7cca2c5a733c33431a07636023 /src/lane.cpp | |
| parent | d7d756e30680bcff036118b47ac45b740e020ea2 (diff) | |
| download | lanes-45774df1eeeaae0868420104a4cdad4691995dc9.tar.gz lanes-45774df1eeeaae0868420104a4cdad4691995dc9.tar.bz2 lanes-45774df1eeeaae0868420104a4cdad4691995dc9.zip | |
Clarified interactions between join() and coroutines
Diffstat (limited to 'src/lane.cpp')
| -rw-r--r-- | src/lane.cpp | 76 |
1 files changed, 48 insertions, 28 deletions
diff --git a/src/lane.cpp b/src/lane.cpp index f8685e9..5ec6bcf 100644 --- a/src/lane.cpp +++ b/src/lane.cpp | |||
| @@ -128,14 +128,11 @@ static LUAG_FUNC(lane_join) | |||
| 128 | } else if (!lua_isnoneornil(L_, 2)) { | 128 | } else if (!lua_isnoneornil(L_, 2)) { |
| 129 | raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); | 129 | raise_luaL_argerror(L_, StackIndex{ 2 }, "incorrect duration type"); |
| 130 | } | 130 | } |
| 131 | lua_settop(L_, 1); // L_: lane | ||
| 131 | 132 | ||
| 132 | // it is forbidden to join a suspended coroutine | ||
| 133 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 134 | raise_luaL_error(L_, "cannot join a suspended coroutine"); | ||
| 135 | } | ||
| 136 | 133 | ||
| 137 | lua_settop(L_, 1); // L_: lane | 134 | // wait until suspended or done |
| 138 | bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, false) }; | 135 | bool const _done{ !_lane->thread.joinable() || _lane->waitForCompletion(_until, true) }; |
| 139 | 136 | ||
| 140 | if (!_done) { | 137 | if (!_done) { |
| 141 | lua_pushnil(L_); // L_: lane nil | 138 | lua_pushnil(L_); // L_: lane nil |
| @@ -143,6 +140,19 @@ static LUAG_FUNC(lane_join) | |||
| 143 | return 2; | 140 | return 2; |
| 144 | } | 141 | } |
| 145 | 142 | ||
| 143 | // if lane is suspended, force a cancellation that will cause the yield loop to break, and the termination of the thread | ||
| 144 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 145 | auto const _cr{ _lane->cancel(CancelOp{ CancelRequest::Soft, LuaHookMask::None }, std::chrono::time_point<std::chrono::steady_clock>::max(), WakeLane::Yes, 0) }; | ||
| 146 | if (_cr == CancelResult::Timeout) { | ||
| 147 | lua_pushnil(L_); // L_: lane nil | ||
| 148 | luaG_pushstring(L_, "timeout"); // L_: lane nil "timeout" | ||
| 149 | return 2; | ||
| 150 | } | ||
| 151 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 152 | raise_luaL_error(L_, "INTERNAL ERROR: Lane should not be suspended here"); | ||
| 153 | } | ||
| 154 | } | ||
| 155 | |||
| 146 | STACK_CHECK_START_REL(L_, 0); // L_: lane | 156 | STACK_CHECK_START_REL(L_, 0); // L_: lane |
| 147 | // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. | 157 | // Thread is Suspended or Done/Error/Cancelled; the Lane thread isn't working with it, therefore we can. |
| 148 | 158 | ||
| @@ -151,9 +161,8 @@ static LUAG_FUNC(lane_join) | |||
| 151 | STACK_GROW(L_, std::max(3, _stored + 1)); | 161 | STACK_GROW(L_, std::max(3, _stored + 1)); |
| 152 | switch (_lane->status.load(std::memory_order_acquire)) { | 162 | switch (_lane->status.load(std::memory_order_acquire)) { |
| 153 | case Lane::Suspended: | 163 | case Lane::Suspended: |
| 154 | // TODO: maybe this can happen if we have a to-be-closed handle on a suspended lane? TO BE TESTED! | 164 | raise_luaL_error(L_, "INTERNAL ERROR: SHOULD NEVER BE SUSPENDED HERE"); |
| 155 | raise_luaL_error(L_, "INTERNAL ERROR: should not join a suspended coroutine"); | 165 | break; |
| 156 | [[fallthrough]]; | ||
| 157 | 166 | ||
| 158 | case Lane::Done: // got regular return values | 167 | case Lane::Done: // got regular return values |
| 159 | if (_stored > 0) { | 168 | if (_stored > 0) { |
| @@ -274,11 +283,6 @@ static int lane_index_number(lua_State* L_) | |||
| 274 | raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); | 283 | raise_luaL_error(L_, "INTERNAL ERROR: Failed to join"); |
| 275 | } | 284 | } |
| 276 | 285 | ||
| 277 | // it is forbidden to index a suspended coroutine. if you want to read yielded values, use lane:resume() | ||
| 278 | if (_lane->status.load(std::memory_order_acquire) == Lane::Suspended) { | ||
| 279 | raise_luaL_error(L_, "cannot index a suspended coroutine"); | ||
| 280 | } | ||
| 281 | |||
| 282 | // make sure results are stored | 286 | // make sure results are stored |
| 283 | int const _stored{ _lane->storeResults(L_) }; | 287 | int const _stored{ _lane->storeResults(L_) }; |
| 284 | if (_key > _stored) { | 288 | if (_key > _stored) { |
| @@ -287,6 +291,7 @@ static int lane_index_number(lua_State* L_) | |||
| 287 | } else { | 291 | } else { |
| 288 | _lane->pushIndexedResult(L_, _key); // L_: lane result | 292 | _lane->pushIndexedResult(L_, _key); // L_: lane result |
| 289 | } | 293 | } |
| 294 | |||
| 290 | return 1; | 295 | return 1; |
| 291 | } | 296 | } |
| 292 | 297 | ||
| @@ -744,31 +749,45 @@ static void lane_main(Lane* const lane_) | |||
| 744 | if (_rc == LuaError::YIELD) { | 749 | if (_rc == LuaError::YIELD) { |
| 745 | // on the stack we find the values pushed by lane:resume() | 750 | // on the stack we find the values pushed by lane:resume() |
| 746 | _nargs = lua_gettop(_L); | 751 | _nargs = lua_gettop(_L); |
| 747 | if (std::unique_lock _guard{ lane_->doneMutex }; true) | 752 | if (std::unique_lock _guard{ lane_->doneMutex }; true) { |
| 748 | { | 753 | // change our status to suspended, and wait until someone wants us to resume |
| 749 | // change our status to suspended, and wait until someone wants us to resume | 754 | lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended |
| 750 | lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended | 755 | lane_->doneCondVar.notify_one(); |
| 751 | lane_->doneCondVar.notify_one(); | 756 | // wait until the user wants us to resume |
| 752 | // wait until the user wants us to resume | ||
| 753 | // update waiting_on, so that the lane can be woken by cancellation requests here? | 757 | // update waiting_on, so that the lane can be woken by cancellation requests here? |
| 754 | lane_->waiting_on = &lane_->doneCondVar; | 758 | lane_->waiting_on = &lane_->doneCondVar; |
| 755 | lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); | 759 | lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming || lane_->cancelRequest.load(std::memory_order_relaxed) != CancelRequest::None; }); |
| 756 | // here lane_->doneMutex is locked again | 760 | // here lane_->doneMutex is locked again |
| 757 | lane_->waiting_on = nullptr; | 761 | lane_->waiting_on = nullptr; |
| 758 | lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running | 762 | lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running |
| 759 | } | 763 | } |
| 760 | // wait was interrupted because of a cancellation, finish the lane | 764 | // wait was interrupted because of a cancellation, finish the lane |
| 761 | _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); | 765 | _again = (lane_->cancelRequest.load(std::memory_order_relaxed) == CancelRequest::None); |
| 762 | } else { | 766 | } else { |
| 763 | _again = false; | 767 | _again = false; |
| 764 | } | 768 | } |
| 765 | } while (_again); | 769 | } while (_again); |
| 766 | #if LUA_VERSION_NUM >= 504 | ||
| 767 | if (_rc == LuaError::YIELD) { | 770 | if (_rc == LuaError::YIELD) { |
| 771 | #if LUA_VERSION_NUM >= 504 | ||
| 772 | lua_State* const _S{ lane_->S }; | ||
| 773 | STACK_CHECK_START_REL(_S, 0); | ||
| 774 | // lua_closethread cleans the stack, meaning we lose the yielded values! -> store | ||
| 775 | lua_xmove(_L, _S, lane_->nresults); | ||
| 768 | // lane is cancelled before completion (for example at Lanes shutdown), close everything | 776 | // lane is cancelled before completion (for example at Lanes shutdown), close everything |
| 769 | _rc = static_cast<LuaError>(lua_closethread(_L, nullptr)); | 777 | _rc = static_cast<LuaError>(lua_closethread(_L, nullptr)); // L: ... retvals|err <close_err> |
| 770 | } | 778 | // then restore the yielded values |
| 779 | if (_rc == LuaError::OK) { | ||
| 780 | lua_xmove(_S, _L, lane_->nresults); | ||
| 781 | } else { | ||
| 782 | lua_pop(_S, lane_->nresults); | ||
| 783 | } | ||
| 784 | STACK_CHECK(_S, 0); | ||
| 785 | |||
| 786 | #else // LUA_VERSION_NUM | ||
| 787 | // Lua prior to 5.4 do not have lua_closethread. | ||
| 788 | _rc = LuaError::OK; | ||
| 771 | #endif // LUA_VERSION_NUM | 789 | #endif // LUA_VERSION_NUM |
| 790 | } | ||
| 772 | if (_rc != LuaError::OK) { // an error occurred // L: err... | 791 | if (_rc != LuaError::OK) { // an error occurred // L: err... |
| 773 | // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack | 792 | // for some reason, in my tests with Lua 5.4, when the coroutine raises an error, I have 3 copies of it on the stack |
| 774 | // or false + the error message when running Lua 5.1 | 793 | // or false + the error message when running Lua 5.1 |
| @@ -980,7 +999,8 @@ CancelResult Lane::cancel(CancelOp const op_, std::chrono::time_point<std::chron | |||
| 980 | 999 | ||
| 981 | // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here | 1000 | // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here |
| 982 | // We can read status without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) | 1001 | // We can read status without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) |
| 983 | if (status.load(std::memory_order_acquire) >= Lane::Done) { | 1002 | auto const _status{ status.load(std::memory_order_acquire) }; |
| 1003 | if (_status == Lane::Done || _status == Lane::Error || _status == Lane::Cancelled) { | ||
| 984 | // say "ok" by default, including when lane is already done | 1004 | // say "ok" by default, including when lane is already done |
| 985 | return CancelResult::Cancelled; | 1005 | return CancelResult::Cancelled; |
| 986 | } | 1006 | } |
| @@ -1015,7 +1035,7 @@ CancelResult Lane::internalCancel(CancelRequest const rq_, std::chrono::time_poi | |||
| 1015 | } | 1035 | } |
| 1016 | } | 1036 | } |
| 1017 | // wait until the lane stops working with its state (either Suspended or Done+) | 1037 | // wait until the lane stops working with its state (either Suspended or Done+) |
| 1018 | CancelResult const result{ waitForCompletion(until_, true) ? CancelResult::Cancelled : CancelResult::Timeout }; | 1038 | CancelResult const result{ waitForCompletion(until_, false) ? CancelResult::Cancelled : CancelResult::Timeout }; |
| 1019 | return result; | 1039 | return result; |
| 1020 | } | 1040 | } |
| 1021 | 1041 | ||
