From 43915511f5e0c74a5aa6e0d02fe62505eb133191 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Wed, 13 Nov 2024 10:00:20 +0100 Subject: Cleaning up guano Converted volatile Lane::status to std::atomic --- src/lane.cpp | 46 ++++++++++++++++++++++++++-------------------- src/lane.hpp | 3 ++- src/lanes.cpp | 2 +- src/linda.cpp | 12 ++++++------ 4 files changed, 35 insertions(+), 28 deletions(-) (limited to 'src') diff --git a/src/lane.cpp b/src/lane.cpp index c9e334c..ac3fffa 100644 --- a/src/lane.cpp +++ b/src/lane.cpp @@ -143,7 +143,7 @@ static LUAG_FUNC(lane_join) int _ret{ 0 }; int const _stored{ _lane->storeResults(L_) }; STACK_GROW(L_, std::max(3, _stored + 1)); - switch (_lane->status) { + switch (_lane->status.load(std::memory_order_acquire)) { case Lane::Suspended: // got yielded values case Lane::Done: // got regular return values { @@ -207,12 +207,13 @@ LUAG_FUNC(lane_resume) std::optional _hadToWait{}; // for debugging, if we ever raise the error just below { std::unique_lock _guard{ _lane->doneMutex }; - if (_lane->status == Lane::Pending || _lane->status == Lane::Running || _lane->status == Lane::Resuming) { - _hadToWait = _lane->status; - _lane->doneCondVar.wait(_guard, [_lane]() { return _lane->status == Lane::Suspended; }); + Lane::Status const _status{ _lane->status.load(std::memory_order_acquire) }; + if (_status == Lane::Pending || _status == Lane::Running || _status == Lane::Resuming) { + _hadToWait = _status; + _lane->doneCondVar.wait(_guard, [_lane]() { return _lane->status.load(std::memory_order_acquire) == Lane::Suspended; }); } } - if (_lane->status != Lane::Suspended) { + if (_lane->status.load(std::memory_order_acquire) != Lane::Suspended) { if (_hadToWait) { raise_luaL_error(L_, "INTERNAL ERROR: Lane status is %s instead of 'suspended'", _lane->threadStatusString().data()); } else { @@ -242,7 +243,7 @@ LUAG_FUNC(lane_resume) STACK_CHECK(_L2, _nargs); // we should have removed everything from the lane's stack, and pushed our args STACK_CHECK(L_, 1 + _nargs + _nresults); // and the results of the coroutine are on top here std::unique_lock _guard{ _lane->doneMutex }; - _lane->status = Lane::Resuming; + _lane->status.store(Lane::Resuming, std::memory_order_release); _lane->doneCondVar.notify_one(); return _nresults; } @@ -704,13 +705,13 @@ static void lane_main(Lane* const lane_) lua_State* const _L{ lane_->L }; LuaError _rc{ LuaError::ERRRUN }; - if (lane_->status == Lane::Pending) { // nothing wrong happened during preparation, we can work + if (lane_->status.load(std::memory_order_acquire) == Lane::Pending) { // nothing wrong happened during preparation, we can work // At this point, the lane function and arguments are on the stack, possibly preceded by the error handler int const _errorHandlerCount{ lane_->errorHandlerCount() }; int _nargs{ lua_gettop(_L) - 1 - _errorHandlerCount }; { std::unique_lock _guard{ lane_->doneMutex }; - lane_->status = Lane::Running; // Pending -> Running + lane_->status.store(Lane::Running, std::memory_order_release); // Pending -> Running } PrepareLaneHelpers(lane_); @@ -726,15 +727,15 @@ static void lane_main(Lane* const lane_) if (_rc == LuaError::YIELD) { // change our status to suspended, and wait until someone wants us to resume std::unique_lock _guard{ lane_->doneMutex }; - lane_->status = Lane::Suspended; // Running -> Suspended + lane_->status.store(Lane::Suspended, std::memory_order_release); // Running -> Suspended lane_->doneCondVar.notify_one(); // wait until the user wants us to resume // TODO: do I update waiting_on or not, so that the lane can be woken by cancellation requests here? // lane_->waiting_on = &lane_->doneCondVar; - lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status == Lane::Resuming; }); + lane_->doneCondVar.wait(_guard, [lane_]() { return lane_->status.load(std::memory_order_acquire) == Lane::Resuming; }); // here lane_->doneMutex is locked again // lane_->waiting_on = nullptr; - lane_->status = Lane::Running; // Resuming -> Running + lane_->status.store(Lane::Running, std::memory_order_release); // Resuming -> Running // on the stack we find the values pushed by lane:resume() _nargs = lua_gettop(_L); } @@ -789,7 +790,7 @@ static void lane_main(Lane* const lane_) Lane::Status const _st{ (_rc == LuaError::OK) ? Lane::Done : kCancelError.equals(_L, StackIndex{ 1 }) ? Lane::Cancelled : Lane::Error }; // 'doneMutex' protects the -> Done|Error|Cancelled state change, and the Running|Suspended|Resuming state change too std::lock_guard _guard{ lane_->doneMutex }; - lane_->status = _st; + lane_->status.store(_st, std::memory_order_release); lane_->doneCondVar.notify_one(); // wake up master (while 'lane_->doneMutex' is on) } @@ -842,7 +843,7 @@ static LUAG_FUNC(lane_gc) } // We can read 'lane->status' without locks, but not wait for it - if (_lane->status < Lane::Done) { + if (_lane->status.load(std::memory_order_acquire) < Lane::Done) { // still running: will have to be cleaned up later _lane->selfdestructAdd(); assert(_lane->selfdestruct_next); @@ -918,7 +919,7 @@ CancelResult Lane::cancel(CancelOp const op_, std::chrono::time_point= Lane::Done) { + if (status.load(std::memory_order_acquire) >= Lane::Done) { // say "ok" by default, including when lane is already done return CancelResult::Cancelled; } @@ -944,7 +945,7 @@ CancelResult Lane::cancel(CancelOp const op_, std::chrono::time_pointnotify_all(); } } @@ -1058,7 +1059,7 @@ void Lane::pushIndexedResult(lua_State* const L_, int const key_) const STACK_GROW(L_, 3); lua_getiuservalue(L_, kIdxSelf, UserValueIndex{ 1 }); // L_: lane ... {uv} - if (status != Lane::Error) { + if (status.load(std::memory_order_acquire) != Lane::Error) { lua_rawgeti(L_, -1, key_); // L_: lane ... {uv} uv[i] lua_remove(L_, -2); // L_: lane ... uv[i] return; @@ -1194,7 +1195,7 @@ int Lane::storeResults(lua_State* const L_) return _stored; } - switch (status) { + switch (status.load(std::memory_order_acquire)) { default: // this is an internal error, we probably never get here lua_settop(L_, 0); // L_: @@ -1258,7 +1259,7 @@ int Lane::storeResults(lua_State* const L_) STACK_CHECK(L_, 0); LUA_ASSERT(L_, lua_gettop(L) == 0 && nresults == 0); // if we are suspended, all we want to do is gather the current yielded values - if (status != Lane::Suspended) { + if (status.load(std::memory_order_acquire) != Lane::Suspended) { // debugName is a pointer to string possibly interned in the lane's state, that no longer exists when the state is closed // so store it in the userdata uservalue at a key that can't possibly collide securizeDebugName(L_); @@ -1300,7 +1301,7 @@ int Lane::storeResults(lua_State* const L_) static_assert(5 == static_cast>(Done)); static_assert(6 == static_cast>(Error)); static_assert(7 == static_cast>(Cancelled)); - auto const _status{ static_cast>(status) }; + auto const _status{ static_cast>(status.load(std::memory_order_acquire)) }; if (_status < 0 || _status > 7) { // should never happen, but better safe than sorry return ""; } @@ -1316,5 +1317,10 @@ bool Lane::waitForCompletion(std::chrono::time_point // return doneCondVar.wait_until(lock, token, secs_, [this](){ return status >= Lane::Done; }); // wait until the lane stops working with its state (either Suspended or Done+) - return doneCondVar.wait_until(_guard, until_, [this]() { return status == Lane::Suspended || status >= Lane::Done; }); + return doneCondVar.wait_until(_guard, until_, [this]() + { + auto const _status{ status.load(std::memory_order_acquire) }; + return _status == Lane::Suspended || _status >= Lane::Done; + } + ); } diff --git a/src/lane.hpp b/src/lane.hpp index 7821112..4fd0f6d 100644 --- a/src/lane.hpp +++ b/src/lane.hpp @@ -112,7 +112,8 @@ class Lane // M: prepares the state, and reads results // S: while S is running, M must keep out of modifying the state - Status volatile status{ Pending }; + std::atomic status{ Pending }; + static_assert(std::atomic::is_always_lock_free); // // M: sets to Pending (before launching) // S: updates -> Running/Waiting/Suspended -> Done/Error/Cancelled diff --git a/src/lanes.cpp b/src/lanes.cpp index df57bb4..c65fc1c 100644 --- a/src/lanes.cpp +++ b/src/lanes.cpp @@ -312,7 +312,7 @@ LUAG_FUNC(lane_new) { std::lock_guard _guard{ lane->doneMutex }; // this will cause lane_main to skip actual running (because we are not Pending anymore) - lane->status = Lane::Running; + lane->status.store(Lane::Running, std::memory_order_release); } // unblock the thread so that it can terminate gracefully #ifndef __PROSPERO__ diff --git a/src/linda.cpp b/src/linda.cpp index 9658e38..80f62d3 100644 --- a/src/linda.cpp +++ b/src/linda.cpp @@ -671,9 +671,9 @@ LUAG_FUNC(linda_receive) Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings if (_lane != nullptr) { // change status of lane to "waiting" - _prev_status = _lane->status; // Running, most likely + _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case - _lane->status = Lane::Waiting; + _lane->status.store(Lane::Waiting, std::memory_order_release); LUA_ASSERT(L_, _lane->waiting_on == nullptr); _lane->waiting_on = &_linda->writeHappened; } @@ -684,7 +684,7 @@ LUAG_FUNC(linda_receive) _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups if (_lane != nullptr) { _lane->waiting_on = nullptr; - _lane->status = _prev_status; + _lane->status.store(_prev_status, std::memory_order_release); } } } @@ -816,9 +816,9 @@ LUAG_FUNC(linda_send) Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings if (_lane != nullptr) { // change status of lane to "waiting" - _prev_status = _lane->status; // Running, most likely + _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case - _lane->status = Lane::Waiting; + _lane->status.store(Lane::Waiting, std::memory_order_release); LUA_ASSERT(L_, _lane->waiting_on == nullptr); _lane->waiting_on = &_linda->readHappened; } @@ -829,7 +829,7 @@ LUAG_FUNC(linda_send) _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups if (_lane != nullptr) { _lane->waiting_on = nullptr; - _lane->status = _prev_status; + _lane->status.store(_prev_status, std::memory_order_release); } } } -- cgit v1.2.3-55-g6feb