diff options
author | Benoit Germain <benoit.germain@ubisoft.com> | 2024-11-13 10:00:20 +0100 |
---|---|---|
committer | Benoit Germain <benoit.germain@ubisoft.com> | 2024-11-13 10:00:20 +0100 |
commit | 43915511f5e0c74a5aa6e0d02fe62505eb133191 (patch) | |
tree | b4e4e06a85e8d8868c0d551ebce74f9900606020 | |
parent | d9879337c9843d7bcc936a6fbf0755288ed70607 (diff) | |
download | lanes-43915511f5e0c74a5aa6e0d02fe62505eb133191.tar.gz lanes-43915511f5e0c74a5aa6e0d02fe62505eb133191.tar.bz2 lanes-43915511f5e0c74a5aa6e0d02fe62505eb133191.zip |
Cleaning up guano
Converted volatile Lane::status to std::atomic
-rw-r--r-- | src/lane.cpp | 46 | ||||
-rw-r--r-- | src/lane.hpp | 3 | ||||
-rw-r--r-- | src/lanes.cpp | 2 | ||||
-rw-r--r-- | src/linda.cpp | 12 |
4 files changed, 35 insertions, 28 deletions
diff --git a/src/lane.cpp b/src/lane.cpp index c9e334c..ac3fffa 100644 --- a/src/lane.cpp +++ b/src/lane.cpp | |||
@@ -143,7 +143,7 @@ static LUAG_FUNC(lane_join) | |||
143 | int _ret{ 0 }; | 143 | int _ret{ 0 }; |
144 | int const _stored{ _lane->storeResults(L_) }; | 144 | int const _stored{ _lane->storeResults(L_) }; |
145 | STACK_GROW(L_, std::max(3, _stored + 1)); | 145 | STACK_GROW(L_, std::max(3, _stored + 1)); |
146 | switch (_lane->status) { | 146 | switch (_lane->status.load(std::memory_order_acquire)) { |
147 | case Lane::Suspended: // got yielded values | 147 | case Lane::Suspended: // got yielded values |
148 | case Lane::Done: // got regular return values | 148 | case Lane::Done: // got regular return values |
149 | { | 149 | { |
@@ -207,12 +207,13 @@ LUAG_FUNC(lane_resume) | |||
207 | std::optional<Lane::Status> _hadToWait{}; // for debugging, if we ever raise the error just below | 207 | std::optional<Lane::Status> _hadToWait{}; // for debugging, if we ever raise the error just below |
208 | { | 208 | { |
209 | std::unique_lock _guard{ _lane->doneMutex }; | 209 | std::unique_lock _guard{ _lane->doneMutex }; |
210 | if (_lane->status == Lane::Pending || _lane->status == Lane::Running || _lane->status == Lane::Resuming) { | 210 | Lane::Status const _status{ _lane->status.load(std::memory_order_acquire) }; |
211 | _hadToWait = _lane->status; | 211 | if (_status == Lane::Pending || _status == Lane::Running || _status == Lane::Resuming) { |
212 | _lane->doneCondVar.wait(_guard, [_lane]() { return _lane->status == Lane::Suspended; }); | 212 | _hadToWait = _status; |
213 | _lane->doneCondVar.wait(_guard, [_lane]() { return _lane->status.load(std::memory_order_acquire) == Lane::Suspended; }); | ||
213 | } | 214 | } |
214 | } | 215 | } |
215 | if (_lane->status != Lane::Suspended) { | 216 | if (_lane->status.load(std::memory_order_acquire) != Lane::Suspended) { |
216 | if (_hadToWait) { | 217 | if (_hadToWait) { |
217 | raise_luaL_error(L_, "INTERNAL ERROR: Lane status is %s instead of 'suspended'", _lane->threadStatusString().data()); | 218 | raise_luaL_error(L_, "INTERNAL ERROR: Lane status is %s instead of 'suspended'", _lane->threadStatusString().data()); |
218 | } else { | 219 | } else { |
@@ -242,7 +243,7 @@ LUAG_FUNC(lane_resume) | |||
242 | STACK_CHECK(_L2, _nargs); // we should have removed everything from the lane's stack, and pushed our args | 243 | STACK_CHECK(_L2, _nargs); // we should have removed everything from the lane's stack, and pushed our args |
243 | STACK_CHECK(L_, 1 + _nargs + _nresults); // and the results of the coroutine are on top here | 244 | STACK_CHECK(L_, 1 + _nargs + _nresults); // and the results of the coroutine are on top here |
244 | std::unique_lock _guard{ _lane->doneMutex }; | 245 | std::unique_lock _guard{ _lane->doneMutex }; |
245 | _lane->status = Lane::Resuming; | 246 | _lane->status.store(Lane::Resuming, std::memory_order_release); |
246 | _lane->doneCondVar.notify_one(); | 247 | _lane->doneCondVar.notify_one(); |
247 | return _nresults; | 248 | return _nresults; |
248 | } | 249 | } |
@@ -704,13 +705,13 @@ static void lane_main(Lane* const lane_) | |||
704 | 705 | ||
705 | lua_State* const _L{ lane_->L }; | 706 | lua_State* const _L{ lane_->L }; |
706 | LuaError _rc{ LuaError::ERRRUN }; | 707 | LuaError _rc{ LuaError::ERRRUN }; |
707 | if (lane_->status == Lane::Pending) { // nothing wrong happened during preparation, we can work | 708 | if (lane_->status.load(std::memory_order_acquire) == Lane::Pending) { // nothing wrong happened during preparation, we can work |
708 | // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler | 709 | // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler |
709 | int const _errorHandlerCount{ lane_->errorHandlerCount() }; | 710 | int const _errorHandlerCount{ lane_->errorHandlerCount() }; |
710 | int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount }; | 711 | int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount }; |
711 | { | 712 | { |
712 | std::unique_lock _guard{ lane_->doneMutex }; | 713 | std::unique_lock _guard{ lane_->doneMutex }; |
713 | lane_->status = Lane::Running; // Pending -> Running | 714 | lane_->status.store(Lane::Running, std::memory_order_release); // Pending -> Running |
714 | } | 715 | } |
715 | 716 | ||
716 | PrepareLaneHelpers(lane_); | 717 | PrepareLaneHelpers(lane_); |
@@ -726,15 +727,15 @@ static void lane_main(Lane* const lane_) | |||
726 | if (_rc == LuaError::YIELD) { | 727 | if (_rc == LuaError::YIELD) { |
727 | // change our status to suspended, and wait until someone wants us to resume | 728 | // change our status to suspended, and wait until someone wants us to resume |
728 | std::unique_lock _guard{ lane_->doneMutex }; | 729 | std::unique_lock _guard{ lane_->doneMutex }; |
729 | lane_->status = Lane::Suspended; // Running -> Suspended | 730 | lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended |
730 | lane_->doneCondVar.notify_one(); | 731 | lane_->doneCondVar.notify_one(); |
731 | // wait until the user wants us to resume | 732 | // wait until the user wants us to resume |
732 | // TODO: do I update waiting_on or not, so that the lane can be woken by cancellation requests here? | 733 | // TODO: do I update waiting_on or not, so that the lane can be woken by cancellation requests here? |
733 | // lane_->waiting_on = &lane_->doneCondVar; | 734 | // lane_->waiting_on = &lane_->doneCondVar; |
734 | lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status == Lane::Resuming; }); | 735 | lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming; }); |
735 | // here lane_->doneMutex is locked again | 736 | // here lane_->doneMutex is locked again |
736 | // lane_->waiting_on = nullptr; | 737 | // lane_->waiting_on = nullptr; |
737 | lane_->status = Lane::Running; // Resuming -> Running | 738 | lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running |
738 | // on the stack we find the values pushed by lane:resume() | 739 | // on the stack we find the values pushed by lane:resume() |
739 | _nargs = lua_gettop(_L); | 740 | _nargs = lua_gettop(_L); |
740 | } | 741 | } |
@@ -789,7 +790,7 @@ static void lane_main(Lane* const lane_) | |||
789 | Lane::Status const _st{ (_rc == LuaError::OK) ? Lane::Done : kCancelError.equals(_L, StackIndex{ 1 }) ? Lane::Cancelled : Lane::Error }; | 790 | Lane::Status const _st{ (_rc == LuaError::OK) ? Lane::Done : kCancelError.equals(_L, StackIndex{ 1 }) ? Lane::Cancelled : Lane::Error }; |
790 | // 'doneMutex' protects the -> Done|Error|Cancelled state change, and the Running|Suspended|Resuming state change too | 791 | // 'doneMutex' protects the -> Done|Error|Cancelled state change, and the Running|Suspended|Resuming state change too |
791 | std::lock_guard _guard{ lane_->doneMutex }; | 792 | std::lock_guard _guard{ lane_->doneMutex }; |
792 | lane_->status = _st; | 793 | lane_->status.store(_st, std::memory_order_release); |
793 | lane_->doneCondVar.notify_one(); // wake up master (while 'lane_->doneMutex' is on) | 794 | lane_->doneCondVar.notify_one(); // wake up master (while 'lane_->doneMutex' is on) |
794 | } | 795 | } |
795 | 796 | ||
@@ -842,7 +843,7 @@ static LUAG_FUNC(lane_gc) | |||
842 | } | 843 | } |
843 | 844 | ||
844 | // We can read 'lane->status' without locks, but not wait for it | 845 | // We can read 'lane->status' without locks, but not wait for it |
845 | if (_lane->status < Lane::Done) { | 846 | if (_lane->status.load(std::memory_order_acquire) < Lane::Done) { |
846 | // still running: will have to be cleaned up later | 847 | // still running: will have to be cleaned up later |
847 | _lane->selfdestructAdd(); | 848 | _lane->selfdestructAdd(); |
848 | assert(_lane->selfdestruct_next); | 849 | assert(_lane->selfdestruct_next); |
@@ -918,7 +919,7 @@ CancelResult Lane::cancel(CancelOp const op_, std::chrono::time_point<std::chron | |||
918 | 919 | ||
919 | // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here | 920 | // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here |
920 | // We can read status without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) | 921 | // We can read status without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) |
921 | if (status >= Lane::Done) { | 922 | if (status.load(std::memory_order_acquire) >= Lane::Done) { |
922 | // say "ok" by default, including when lane is already done | 923 | // say "ok" by default, including when lane is already done |
923 | return CancelResult::Cancelled; | 924 | return CancelResult::Cancelled; |
924 | } | 925 | } |
@@ -944,7 +945,7 @@ CancelResult Lane::cancel(CancelOp const op_, std::chrono::time_point<std::chron | |||
944 | } | 945 | } |
945 | if (wakeLane_ == WakeLane::Yes) { // wake the thread so that execution returns from any pending linda operation if desired | 946 | if (wakeLane_ == WakeLane::Yes) { // wake the thread so that execution returns from any pending linda operation if desired |
946 | std::condition_variable* const _waiting_on{ waiting_on }; | 947 | std::condition_variable* const _waiting_on{ waiting_on }; |
947 | if (status == Lane::Waiting && _waiting_on != nullptr) { | 948 | if (status.load(std::memory_order_acquire) == Lane::Waiting && _waiting_on != nullptr) { |
948 | _waiting_on->notify_all(); | 949 | _waiting_on->notify_all(); |
949 | } | 950 | } |
950 | } | 951 | } |
@@ -1058,7 +1059,7 @@ void Lane::pushIndexedResult(lua_State* const L_, int const key_) const | |||
1058 | STACK_GROW(L_, 3); | 1059 | STACK_GROW(L_, 3); |
1059 | 1060 | ||
1060 | lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv} | 1061 | lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv} |
1061 | if (status != Lane::Error) { | 1062 | if (status.load(std::memory_order_acquire) != Lane::Error) { |
1062 | lua_rawgeti(L_, -1, key_); // L_: lane ... {uv} uv[i] | 1063 | lua_rawgeti(L_, -1, key_); // L_: lane ... {uv} uv[i] |
1063 | lua_remove(L_, -2); // L_: lane ... uv[i] | 1064 | lua_remove(L_, -2); // L_: lane ... uv[i] |
1064 | return; | 1065 | return; |
@@ -1194,7 +1195,7 @@ int Lane::storeResults(lua_State* const L_) | |||
1194 | return _stored; | 1195 | return _stored; |
1195 | } | 1196 | } |
1196 | 1197 | ||
1197 | switch (status) { | 1198 | switch (status.load(std::memory_order_acquire)) { |
1198 | default: | 1199 | default: |
1199 | // this is an internal error, we probably never get here | 1200 | // this is an internal error, we probably never get here |
1200 | lua_settop(L_, 0); // L_: | 1201 | lua_settop(L_, 0); // L_: |
@@ -1258,7 +1259,7 @@ int Lane::storeResults(lua_State* const L_) | |||
1258 | STACK_CHECK(L_, 0); | 1259 | STACK_CHECK(L_, 0); |
1259 | LUA_ASSERT(L_, lua_gettop(L) == 0 && nresults == 0); | 1260 | LUA_ASSERT(L_, lua_gettop(L) == 0 && nresults == 0); |
1260 | // if we are suspended, all we want to do is gather the current yielded values | 1261 | // if we are suspended, all we want to do is gather the current yielded values |
1261 | if (status != Lane::Suspended) { | 1262 | if (status.load(std::memory_order_acquire) != Lane::Suspended) { |
1262 | // debugName is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed | 1263 | // debugName is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed |
1263 | // so store it in the userdata uservalue at a key that can't possibly collide | 1264 | // so store it in the userdata uservalue at a key that can't possibly collide |
1264 | securizeDebugName(L_); | 1265 | securizeDebugName(L_); |
@@ -1300,7 +1301,7 @@ int Lane::storeResults(lua_State* const L_) | |||
1300 | static_assert(5 == static_cast<std::underlying_type_t<Lane::Status>>(Done)); | 1301 | static_assert(5 == static_cast<std::underlying_type_t<Lane::Status>>(Done)); |
1301 | static_assert(6 == static_cast<std::underlying_type_t<Lane::Status>>(Error)); | 1302 | static_assert(6 == static_cast<std::underlying_type_t<Lane::Status>>(Error)); |
1302 | static_assert(7 == static_cast<std::underlying_type_t<Lane::Status>>(Cancelled)); | 1303 | static_assert(7 == static_cast<std::underlying_type_t<Lane::Status>>(Cancelled)); |
1303 | auto const _status{ static_cast<std::underlying_type_t<Lane::Status>>(status) }; | 1304 | auto const _status{ static_cast<std::underlying_type_t<Lane::Status>>(status.load(std::memory_order_acquire)) }; |
1304 | if (_status < 0 || _status > 7) { // should never happen, but better safe than sorry | 1305 | if (_status < 0 || _status > 7) { // should never happen, but better safe than sorry |
1305 | return ""; | 1306 | return ""; |
1306 | } | 1307 | } |
@@ -1316,5 +1317,10 @@ bool Lane::waitForCompletion(std::chrono::time_point<std::chrono::steady_clock> | |||
1316 | // return doneCondVar.wait_until(lock, token, secs_, [this](){ return status >= Lane::Done; }); | 1317 | // return doneCondVar.wait_until(lock, token, secs_, [this](){ return status >= Lane::Done; }); |
1317 | 1318 | ||
1318 | // wait until the lane stops working with its state (either Suspended or Done+) | 1319 | // wait until the lane stops working with its state (either Suspended or Done+) |
1319 | return doneCondVar.wait_until(_guard, until_, [this]() { return status == Lane::Suspended || status >= Lane::Done; }); | 1320 | return doneCondVar.wait_until(_guard, until_, [this]() |
1321 | { | ||
1322 | auto const _status{ status.load(std::memory_order_acquire) }; | ||
1323 | return _status == Lane::Suspended || _status >= Lane::Done; | ||
1324 | } | ||
1325 | ); | ||
1320 | } | 1326 | } |
diff --git a/src/lane.hpp b/src/lane.hpp index 7821112..4fd0f6d 100644 --- a/src/lane.hpp +++ b/src/lane.hpp | |||
@@ -112,7 +112,8 @@ class Lane | |||
112 | // M: prepares the state, and reads results | 112 | // M: prepares the state, and reads results |
113 | // S: while S is running, M must keep out of modifying the state | 113 | // S: while S is running, M must keep out of modifying the state |
114 | 114 | ||
115 | Status volatile status{ Pending }; | 115 | std::atomic<Status> status{ Pending }; |
116 | static_assert(std::atomic<Status>::is_always_lock_free); | ||
116 | // | 117 | // |
117 | // M: sets to Pending (before launching) | 118 | // M: sets to Pending (before launching) |
118 | // S: updates -> Running/Waiting/Suspended -> Done/Error/Cancelled | 119 | // S: updates -> Running/Waiting/Suspended -> Done/Error/Cancelled |
diff --git a/src/lanes.cpp b/src/lanes.cpp index df57bb4..c65fc1c 100644 --- a/src/lanes.cpp +++ b/src/lanes.cpp | |||
@@ -312,7 +312,7 @@ LUAG_FUNC(lane_new) | |||
312 | { | 312 | { |
313 | std::lock_guard _guard{ lane->doneMutex }; | 313 | std::lock_guard _guard{ lane->doneMutex }; |
314 | // this will cause lane_main to skip actual running (because we are not Pending anymore) | 314 | // this will cause lane_main to skip actual running (because we are not Pending anymore) |
315 | lane->status = Lane::Running; | 315 | lane->status.store(Lane::Running, std::memory_order_release); |
316 | } | 316 | } |
317 | // unblock the thread so that it can terminate gracefully | 317 | // unblock the thread so that it can terminate gracefully |
318 | #ifndef __PROSPERO__ | 318 | #ifndef __PROSPERO__ |
diff --git a/src/linda.cpp b/src/linda.cpp index 9658e38..80f62d3 100644 --- a/src/linda.cpp +++ b/src/linda.cpp | |||
@@ -671,9 +671,9 @@ LUAG_FUNC(linda_receive) | |||
671 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | 671 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings |
672 | if (_lane != nullptr) { | 672 | if (_lane != nullptr) { |
673 | // change status of lane to "waiting" | 673 | // change status of lane to "waiting" |
674 | _prev_status = _lane->status; // Running, most likely | 674 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely |
675 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | 675 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case |
676 | _lane->status = Lane::Waiting; | 676 | _lane->status.store(Lane::Waiting, std::memory_order_release); |
677 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); | 677 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); |
678 | _lane->waiting_on = &_linda->writeHappened; | 678 | _lane->waiting_on = &_linda->writeHappened; |
679 | } | 679 | } |
@@ -684,7 +684,7 @@ LUAG_FUNC(linda_receive) | |||
684 | _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups | 684 | _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups |
685 | if (_lane != nullptr) { | 685 | if (_lane != nullptr) { |
686 | _lane->waiting_on = nullptr; | 686 | _lane->waiting_on = nullptr; |
687 | _lane->status = _prev_status; | 687 | _lane->status.store(_prev_status, std::memory_order_release); |
688 | } | 688 | } |
689 | } | 689 | } |
690 | } | 690 | } |
@@ -816,9 +816,9 @@ LUAG_FUNC(linda_send) | |||
816 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings | 816 | Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings |
817 | if (_lane != nullptr) { | 817 | if (_lane != nullptr) { |
818 | // change status of lane to "waiting" | 818 | // change status of lane to "waiting" |
819 | _prev_status = _lane->status; // Running, most likely | 819 | _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely |
820 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case | 820 | LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case |
821 | _lane->status = Lane::Waiting; | 821 | _lane->status.store(Lane::Waiting, std::memory_order_release); |
822 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); | 822 | LUA_ASSERT(L_, _lane->waiting_on == nullptr); |
823 | _lane->waiting_on = &_linda->readHappened; | 823 | _lane->waiting_on = &_linda->readHappened; |
824 | } | 824 | } |
@@ -829,7 +829,7 @@ LUAG_FUNC(linda_send) | |||
829 | _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups | 829 | _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups |
830 | if (_lane != nullptr) { | 830 | if (_lane != nullptr) { |
831 | _lane->waiting_on = nullptr; | 831 | _lane->waiting_on = nullptr; |
832 | _lane->status = _prev_status; | 832 | _lane->status.store(_prev_status, std::memory_order_release); |
833 | } | 833 | } |
834 | } | 834 | } |
835 | } | 835 | } |