aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenoit Germain <benoit.germain@ubisoft.com>2024-11-13 10:00:20 +0100
committerBenoit Germain <benoit.germain@ubisoft.com>2024-11-13 10:00:20 +0100
commit43915511f5e0c74a5aa6e0d02fe62505eb133191 (patch)
treeb4e4e06a85e8d8868c0d551ebce74f9900606020
parentd9879337c9843d7bcc936a6fbf0755288ed70607 (diff)
downloadlanes-43915511f5e0c74a5aa6e0d02fe62505eb133191.tar.gz
lanes-43915511f5e0c74a5aa6e0d02fe62505eb133191.tar.bz2
lanes-43915511f5e0c74a5aa6e0d02fe62505eb133191.zip
Cleaning up guano
Converted volatile Lane::status to std::atomic
-rw-r--r--src/lane.cpp46
-rw-r--r--src/lane.hpp3
-rw-r--r--src/lanes.cpp2
-rw-r--r--src/linda.cpp12
4 files changed, 35 insertions, 28 deletions
diff --git a/src/lane.cpp b/src/lane.cpp
index c9e334c..ac3fffa 100644
--- a/src/lane.cpp
+++ b/src/lane.cpp
@@ -143,7 +143,7 @@ static LUAG_FUNC(lane_join)
143 int _ret{ 0 }; 143 int _ret{ 0 };
144 int const _stored{ _lane->storeResults(L_) }; 144 int const _stored{ _lane->storeResults(L_) };
145 STACK_GROW(L_, std::max(3, _stored + 1)); 145 STACK_GROW(L_, std::max(3, _stored + 1));
146 switch (_lane->status) { 146 switch (_lane->status.load(std::memory_order_acquire)) {
147 case Lane::Suspended: // got yielded values 147 case Lane::Suspended: // got yielded values
148 case Lane::Done: // got regular return values 148 case Lane::Done: // got regular return values
149 { 149 {
@@ -207,12 +207,13 @@ LUAG_FUNC(lane_resume)
207 std::optional<Lane::Status> _hadToWait{}; // for debugging, if we ever raise the error just below 207 std::optional<Lane::Status> _hadToWait{}; // for debugging, if we ever raise the error just below
208 { 208 {
209 std::unique_lock _guard{ _lane->doneMutex }; 209 std::unique_lock _guard{ _lane->doneMutex };
210 if (_lane->status == Lane::Pending || _lane->status == Lane::Running || _lane->status == Lane::Resuming) { 210 Lane::Status const _status{ _lane->status.load(std::memory_order_acquire) };
211 _hadToWait = _lane->status; 211 if (_status == Lane::Pending || _status == Lane::Running || _status == Lane::Resuming) {
212 _lane->doneCondVar.wait(_guard, [_lane]() { return _lane->status == Lane::Suspended; }); 212 _hadToWait = _status;
213 _lane->doneCondVar.wait(_guard, [_lane]() { return _lane->status.load(std::memory_order_acquire) == Lane::Suspended; });
213 } 214 }
214 } 215 }
215 if (_lane->status != Lane::Suspended) { 216 if (_lane->status.load(std::memory_order_acquire) != Lane::Suspended) {
216 if (_hadToWait) { 217 if (_hadToWait) {
217 raise_luaL_error(L_, "INTERNAL ERROR: Lane status is %s instead of 'suspended'", _lane->threadStatusString().data()); 218 raise_luaL_error(L_, "INTERNAL ERROR: Lane status is %s instead of 'suspended'", _lane->threadStatusString().data());
218 } else { 219 } else {
@@ -242,7 +243,7 @@ LUAG_FUNC(lane_resume)
242 STACK_CHECK(_L2, _nargs); // we should have removed everything from the lane's stack, and pushed our args 243 STACK_CHECK(_L2, _nargs); // we should have removed everything from the lane's stack, and pushed our args
243 STACK_CHECK(L_, 1 + _nargs + _nresults); // and the results of the coroutine are on top here 244 STACK_CHECK(L_, 1 + _nargs + _nresults); // and the results of the coroutine are on top here
244 std::unique_lock _guard{ _lane->doneMutex }; 245 std::unique_lock _guard{ _lane->doneMutex };
245 _lane->status = Lane::Resuming; 246 _lane->status.store(Lane::Resuming, std::memory_order_release);
246 _lane->doneCondVar.notify_one(); 247 _lane->doneCondVar.notify_one();
247 return _nresults; 248 return _nresults;
248} 249}
@@ -704,13 +705,13 @@ static void lane_main(Lane* const lane_)
704 705
705 lua_State* const _L{ lane_->L }; 706 lua_State* const _L{ lane_->L };
706 LuaError _rc{ LuaError::ERRRUN }; 707 LuaError _rc{ LuaError::ERRRUN };
707 if (lane_->status == Lane::Pending) { // nothing wrong happened during preparation, we can work 708 if (lane_->status.load(std::memory_order_acquire) == Lane::Pending) { // nothing wrong happened during preparation, we can work
708 // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler 709 // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler
709 int const _errorHandlerCount{ lane_->errorHandlerCount() }; 710 int const _errorHandlerCount{ lane_->errorHandlerCount() };
710 int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount }; 711 int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount };
711 { 712 {
712 std::unique_lock _guard{ lane_->doneMutex }; 713 std::unique_lock _guard{ lane_->doneMutex };
713 lane_->status = Lane::Running; // Pending -> Running 714 lane_->status.store(Lane::Running, std::memory_order_release); // Pending -> Running
714 } 715 }
715 716
716 PrepareLaneHelpers(lane_); 717 PrepareLaneHelpers(lane_);
@@ -726,15 +727,15 @@ static void lane_main(Lane* const lane_)
726 if (_rc == LuaError::YIELD) { 727 if (_rc == LuaError::YIELD) {
727 // change our status to suspended, and wait until someone wants us to resume 728 // change our status to suspended, and wait until someone wants us to resume
728 std::unique_lock _guard{ lane_->doneMutex }; 729 std::unique_lock _guard{ lane_->doneMutex };
729 lane_->status = Lane::Suspended; // Running -> Suspended 730 lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended
730 lane_->doneCondVar.notify_one(); 731 lane_->doneCondVar.notify_one();
731 // wait until the user wants us to resume 732 // wait until the user wants us to resume
732 // TODO: do I update waiting_on or not, so that the lane can be woken by cancellation requests here? 733 // TODO: do I update waiting_on or not, so that the lane can be woken by cancellation requests here?
733 // lane_->waiting_on = &lane_->doneCondVar; 734 // lane_->waiting_on = &lane_->doneCondVar;
734 lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status == Lane::Resuming; }); 735 lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming; });
735 // here lane_->doneMutex is locked again 736 // here lane_->doneMutex is locked again
736 // lane_->waiting_on = nullptr; 737 // lane_->waiting_on = nullptr;
737 lane_->status = Lane::Running; // Resuming -> Running 738 lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running
738 // on the stack we find the values pushed by lane:resume() 739 // on the stack we find the values pushed by lane:resume()
739 _nargs = lua_gettop(_L); 740 _nargs = lua_gettop(_L);
740 } 741 }
@@ -789,7 +790,7 @@ static void lane_main(Lane* const lane_)
789 Lane::Status const _st{ (_rc == LuaError::OK) ? Lane::Done : kCancelError.equals(_L, StackIndex{ 1 }) ? Lane::Cancelled : Lane::Error }; 790 Lane::Status const _st{ (_rc == LuaError::OK) ? Lane::Done : kCancelError.equals(_L, StackIndex{ 1 }) ? Lane::Cancelled : Lane::Error };
790 // 'doneMutex' protects the -> Done|Error|Cancelled state change, and the Running|Suspended|Resuming state change too 791 // 'doneMutex' protects the -> Done|Error|Cancelled state change, and the Running|Suspended|Resuming state change too
791 std::lock_guard _guard{ lane_->doneMutex }; 792 std::lock_guard _guard{ lane_->doneMutex };
792 lane_->status = _st; 793 lane_->status.store(_st, std::memory_order_release);
793 lane_->doneCondVar.notify_one(); // wake up master (while 'lane_->doneMutex' is on) 794 lane_->doneCondVar.notify_one(); // wake up master (while 'lane_->doneMutex' is on)
794} 795}
795 796
@@ -842,7 +843,7 @@ static LUAG_FUNC(lane_gc)
842 } 843 }
843 844
844 // We can read 'lane->status' without locks, but not wait for it 845 // We can read 'lane->status' without locks, but not wait for it
845 if (_lane->status < Lane::Done) { 846 if (_lane->status.load(std::memory_order_acquire) < Lane::Done) {
846 // still running: will have to be cleaned up later 847 // still running: will have to be cleaned up later
847 _lane->selfdestructAdd(); 848 _lane->selfdestructAdd();
848 assert(_lane->selfdestruct_next); 849 assert(_lane->selfdestruct_next);
@@ -918,7 +919,7 @@ CancelResult Lane::cancel(CancelOp const op_, std::chrono::time_point<std::chron
918 919
919 // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here 920 // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here
920 // We can read status without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) 921 // We can read status without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN)
921 if (status >= Lane::Done) { 922 if (status.load(std::memory_order_acquire) >= Lane::Done) {
922 // say "ok" by default, including when lane is already done 923 // say "ok" by default, including when lane is already done
923 return CancelResult::Cancelled; 924 return CancelResult::Cancelled;
924 } 925 }
@@ -944,7 +945,7 @@ CancelResult Lane::cancel(CancelOp const op_, std::chrono::time_point<std::chron
944 } 945 }
945 if (wakeLane_ == WakeLane::Yes) { // wake the thread so that execution returns from any pending linda operation if desired 946 if (wakeLane_ == WakeLane::Yes) { // wake the thread so that execution returns from any pending linda operation if desired
946 std::condition_variable* const _waiting_on{ waiting_on }; 947 std::condition_variable* const _waiting_on{ waiting_on };
947 if (status == Lane::Waiting && _waiting_on != nullptr) { 948 if (status.load(std::memory_order_acquire) == Lane::Waiting && _waiting_on != nullptr) {
948 _waiting_on->notify_all(); 949 _waiting_on->notify_all();
949 } 950 }
950 } 951 }
@@ -1058,7 +1059,7 @@ void Lane::pushIndexedResult(lua_State* const L_, int const key_) const
1058 STACK_GROW(L_, 3); 1059 STACK_GROW(L_, 3);
1059 1060
1060 lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv} 1061 lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv}
1061 if (status != Lane::Error) { 1062 if (status.load(std::memory_order_acquire) != Lane::Error) {
1062 lua_rawgeti(L_, -1, key_); // L_: lane ... {uv} uv[i] 1063 lua_rawgeti(L_, -1, key_); // L_: lane ... {uv} uv[i]
1063 lua_remove(L_, -2); // L_: lane ... uv[i] 1064 lua_remove(L_, -2); // L_: lane ... uv[i]
1064 return; 1065 return;
@@ -1194,7 +1195,7 @@ int Lane::storeResults(lua_State* const L_)
1194 return _stored; 1195 return _stored;
1195 } 1196 }
1196 1197
1197 switch (status) { 1198 switch (status.load(std::memory_order_acquire)) {
1198 default: 1199 default:
1199 // this is an internal error, we probably never get here 1200 // this is an internal error, we probably never get here
1200 lua_settop(L_, 0); // L_: 1201 lua_settop(L_, 0); // L_:
@@ -1258,7 +1259,7 @@ int Lane::storeResults(lua_State* const L_)
1258 STACK_CHECK(L_, 0); 1259 STACK_CHECK(L_, 0);
1259 LUA_ASSERT(L_, lua_gettop(L) == 0 && nresults == 0); 1260 LUA_ASSERT(L_, lua_gettop(L) == 0 && nresults == 0);
1260 // if we are suspended, all we want to do is gather the current yielded values 1261 // if we are suspended, all we want to do is gather the current yielded values
1261 if (status != Lane::Suspended) { 1262 if (status.load(std::memory_order_acquire) != Lane::Suspended) {
1262 // debugName is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed 1263 // debugName is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed
1263 // so store it in the userdata uservalue at a key that can't possibly collide 1264 // so store it in the userdata uservalue at a key that can't possibly collide
1264 securizeDebugName(L_); 1265 securizeDebugName(L_);
@@ -1300,7 +1301,7 @@ int Lane::storeResults(lua_State* const L_)
1300 static_assert(5 == static_cast<std::underlying_type_t<Lane::Status>>(Done)); 1301 static_assert(5 == static_cast<std::underlying_type_t<Lane::Status>>(Done));
1301 static_assert(6 == static_cast<std::underlying_type_t<Lane::Status>>(Error)); 1302 static_assert(6 == static_cast<std::underlying_type_t<Lane::Status>>(Error));
1302 static_assert(7 == static_cast<std::underlying_type_t<Lane::Status>>(Cancelled)); 1303 static_assert(7 == static_cast<std::underlying_type_t<Lane::Status>>(Cancelled));
1303 auto const _status{ static_cast<std::underlying_type_t<Lane::Status>>(status) }; 1304 auto const _status{ static_cast<std::underlying_type_t<Lane::Status>>(status.load(std::memory_order_acquire)) };
1304 if (_status < 0 || _status > 7) { // should never happen, but better safe than sorry 1305 if (_status < 0 || _status > 7) { // should never happen, but better safe than sorry
1305 return ""; 1306 return "";
1306 } 1307 }
@@ -1316,5 +1317,10 @@ bool Lane::waitForCompletion(std::chrono::time_point<std::chrono::steady_clock>
1316 // return doneCondVar.wait_until(lock, token, secs_, [this](){ return status >= Lane::Done; }); 1317 // return doneCondVar.wait_until(lock, token, secs_, [this](){ return status >= Lane::Done; });
1317 1318
1318 // wait until the lane stops working with its state (either Suspended or Done+) 1319 // wait until the lane stops working with its state (either Suspended or Done+)
1319 return doneCondVar.wait_until(_guard, until_, [this]() { return status == Lane::Suspended || status >= Lane::Done; }); 1320 return doneCondVar.wait_until(_guard, until_, [this]()
1321 {
1322 auto const _status{ status.load(std::memory_order_acquire) };
1323 return _status == Lane::Suspended || _status >= Lane::Done;
1324 }
1325 );
1320} 1326}
diff --git a/src/lane.hpp b/src/lane.hpp
index 7821112..4fd0f6d 100644
--- a/src/lane.hpp
+++ b/src/lane.hpp
@@ -112,7 +112,8 @@ class Lane
112 // M: prepares the state, and reads results 112 // M: prepares the state, and reads results
113 // S: while S is running, M must keep out of modifying the state 113 // S: while S is running, M must keep out of modifying the state
114 114
115 Status volatile status{ Pending }; 115 std::atomic<Status> status{ Pending };
116 static_assert(std::atomic<Status>::is_always_lock_free);
116 // 117 //
117 // M: sets to Pending (before launching) 118 // M: sets to Pending (before launching)
118 // S: updates -> Running/Waiting/Suspended -> Done/Error/Cancelled 119 // S: updates -> Running/Waiting/Suspended -> Done/Error/Cancelled
diff --git a/src/lanes.cpp b/src/lanes.cpp
index df57bb4..c65fc1c 100644
--- a/src/lanes.cpp
+++ b/src/lanes.cpp
@@ -312,7 +312,7 @@ LUAG_FUNC(lane_new)
312 { 312 {
313 std::lock_guard _guard{ lane->doneMutex }; 313 std::lock_guard _guard{ lane->doneMutex };
314 // this will cause lane_main to skip actual running (because we are not Pending anymore) 314 // this will cause lane_main to skip actual running (because we are not Pending anymore)
315 lane->status = Lane::Running; 315 lane->status.store(Lane::Running, std::memory_order_release);
316 } 316 }
317 // unblock the thread so that it can terminate gracefully 317 // unblock the thread so that it can terminate gracefully
318#ifndef __PROSPERO__ 318#ifndef __PROSPERO__
diff --git a/src/linda.cpp b/src/linda.cpp
index 9658e38..80f62d3 100644
--- a/src/linda.cpp
+++ b/src/linda.cpp
@@ -671,9 +671,9 @@ LUAG_FUNC(linda_receive)
671 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings 671 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
672 if (_lane != nullptr) { 672 if (_lane != nullptr) {
673 // change status of lane to "waiting" 673 // change status of lane to "waiting"
674 _prev_status = _lane->status; // Running, most likely 674 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
675 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case 675 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
676 _lane->status = Lane::Waiting; 676 _lane->status.store(Lane::Waiting, std::memory_order_release);
677 LUA_ASSERT(L_, _lane->waiting_on == nullptr); 677 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
678 _lane->waiting_on = &_linda->writeHappened; 678 _lane->waiting_on = &_linda->writeHappened;
679 } 679 }
@@ -684,7 +684,7 @@ LUAG_FUNC(linda_receive)
684 _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups 684 _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups
685 if (_lane != nullptr) { 685 if (_lane != nullptr) {
686 _lane->waiting_on = nullptr; 686 _lane->waiting_on = nullptr;
687 _lane->status = _prev_status; 687 _lane->status.store(_prev_status, std::memory_order_release);
688 } 688 }
689 } 689 }
690 } 690 }
@@ -816,9 +816,9 @@ LUAG_FUNC(linda_send)
816 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings 816 Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
817 if (_lane != nullptr) { 817 if (_lane != nullptr) {
818 // change status of lane to "waiting" 818 // change status of lane to "waiting"
819 _prev_status = _lane->status; // Running, most likely 819 _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
820 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case 820 LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
821 _lane->status = Lane::Waiting; 821 _lane->status.store(Lane::Waiting, std::memory_order_release);
822 LUA_ASSERT(L_, _lane->waiting_on == nullptr); 822 LUA_ASSERT(L_, _lane->waiting_on == nullptr);
823 _lane->waiting_on = &_linda->readHappened; 823 _lane->waiting_on = &_linda->readHappened;
824 } 824 }
@@ -829,7 +829,7 @@ LUAG_FUNC(linda_send)
829 _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups 829 _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups
830 if (_lane != nullptr) { 830 if (_lane != nullptr) {
831 _lane->waiting_on = nullptr; 831 _lane->waiting_on = nullptr;
832 _lane->status = _prev_status; 832 _lane->status.store(_prev_status, std::memory_order_release);
833 } 833 }
834 } 834 }
835 } 835 }