| author    | Benoit Germain <benoit.germain@ubisoft.com>   | 2025-04-25 18:45:45 +0200 |
|-----------|-----------------------------------------------|---------------------------|
| committer | Benoit Germain <benoit.germain@ubisoft.com>   | 2025-04-25 18:45:45 +0200 |
| commit    | d93a433732c5ca72923100f5ab8139a8a5f072b6      |                           |
| tree      | cdbb37b2ec0f0baa239a0786404062e62f462502 /src |                           |
| parent    | fe51e51bb07b1cb662b0d4f972604f56e63f8432      |                           |
Code factorization in linda send/receive implementation
Diffstat (limited to 'src')
| -rw-r--r-- | src/linda.cpp | 104 |
1 file changed, 40 insertions, 64 deletions
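
The commit extracts the condition-variable wait that `ReceiveInternal` and `LUAG_FUNC(linda_send)` previously duplicated (two nearly identical ~30-line blocks, differing only in the condition variable waited on) into a single `WaitInternal` helper. The heart of that helper is a bounded wait: never sleep past min(deadline, now + wake period), so a cancel request is noticed even if the condvar was signalled before the wait actually began. A minimal standalone sketch of that pattern, with hypothetical names rather than the actual Lanes types:

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

using Clock = std::chrono::steady_clock;

// Wait on cv, whose mutex m is already locked by the caller, but never past
// min(deadline, now + wakePeriod). Returns true if the caller should re-check
// its condition (signalled, or woken early to poll for cancellation), false
// if the overall deadline expired. The mutex is still locked on return.
bool bounded_wait(std::mutex& m, std::condition_variable& cv, Clock::time_point deadline, Clock::duration wakePeriod)
{
    auto checkPoint{ Clock::time_point::max() };
    if (wakePeriod > Clock::duration::zero()) {
        checkPoint = Clock::now() + wakePeriod;
    }
    bool const forceTryAgain{ checkPoint < deadline };

    std::unique_lock<std::mutex> guard{ m, std::adopt_lock }; // wrap the held mutex, don't re-lock it
    auto const status{ cv.wait_until(guard, forceTryAgain ? checkPoint : deadline) };
    guard.release(); // the destructor must not unlock a mutex the caller still owns

    return forceTryAgain || status == std::cv_status::no_timeout;
}
```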
diff --git a/src/linda.cpp b/src/linda.cpp
index fa28385..5fb8279 100644
--- a/src/linda.cpp
+++ b/src/linda.cpp
@@ -147,6 +147,42 @@ namespace {
     }
 
     // #############################################################################################
+    static bool WaitInternal([[maybe_unused]] lua_State* const L_, Lane* const lane_, Linda* const linda_, Keeper* const keeper_, std::condition_variable& waitingOn_, std::chrono::time_point<std::chrono::steady_clock> until_)
+    {
+        Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
+        if (lane_ != nullptr) {
+            // change status of lane to "waiting"
+            _prev_status = lane_->status.load(std::memory_order_acquire); // Running, most likely
+            LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
+            LUA_ASSERT(L_, lane_->waiting_on == nullptr);
+            lane_->waiting_on = &waitingOn_;
+            lane_->status.store(Lane::Waiting, std::memory_order_release);
+        }
+
+        // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests,
+        // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it
+        auto const [_forceTryAgain, _until_check_cancel] = std::invoke([until_, wakePeriod = linda_->getWakePeriod()] {
+            auto _until_check_cancel{ std::chrono::time_point<std::chrono::steady_clock>::max() };
+            if (wakePeriod.count() > 0.0f) {
+                _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(wakePeriod);
+            }
+            bool const _forceTryAgain{ _until_check_cancel < until_ };
+            return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : until_);
+        });
+
+        // operation can't complete: wake when it is signalled to be possible, or when timeout is reached
+        std::unique_lock<std::mutex> _guard{ keeper_->mutex, std::adopt_lock };
+        std::cv_status const _status{ waitingOn_.wait_until(_guard, _until_check_cancel) };
+        _guard.release(); // we don't want to unlock the mutex on exit!
+        bool const _try_again{ _forceTryAgain || (_status == std::cv_status::no_timeout) }; // detect spurious wakeups
+        if (lane_ != nullptr) {
+            lane_->waiting_on = nullptr;
+            lane_->status.store(_prev_status, std::memory_order_release);
+        }
+        return _try_again;
+    }
+
+    // #############################################################################################
 
     // the implementation for linda:receive() and linda:receive_batched()
     static int ReceiveInternal(lua_State* const L_, bool const batched_)
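
Note the `std::adopt_lock` / `release()` pairing inside `WaitInternal`: the keeper mutex is already held when the helper runs, so the `std::unique_lock` merely wraps it (`wait_until` requires a `unique_lock`), and `release()` detaches it afterwards so the destructor does not unlock a mutex the caller still expects to own. The idiom in isolation, as a standalone sketch rather than Lanes code:

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

std::mutex m;
std::condition_variable cv;

void wait_without_giving_up_ownership()
{
    m.lock(); // the lock is held before and after the wait
    {
        std::unique_lock<std::mutex> guard{ m, std::adopt_lock }; // adopt the held mutex, don't lock again
        cv.wait_for(guard, std::chrono::milliseconds{ 10 });      // unlocks/relocks internally as usual
        guard.release(); // detach, so ~unique_lock() leaves m locked
    }
    // m is still locked here, exactly as on entry
    m.unlock();
}
```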
@@ -201,6 +237,7 @@ namespace {
             _cancel = (_cancel != CancelRequest::None)
                 ? _cancel
                 : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
+
             // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
             if (!_try_again || _cancel != CancelRequest::None) {
                 _pushed.emplace(0);
@@ -227,38 +264,7 @@ namespace {
             }
 
             // nothing received, wait until timeout or signalled that we should try again
-            {
-                Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
-                if (_lane != nullptr) {
-                    // change status of lane to "waiting"
-                    _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
-                    LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
-                    LUA_ASSERT(L_, _lane->waiting_on == nullptr);
-                    _lane->waiting_on = &_linda->writeHappened;
-                    _lane->status.store(Lane::Waiting, std::memory_order_release);
-                }
-
-                // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests,
-                // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it
-                auto const [_forceTryAgain, _until_check_cancel] = std::invoke([_until, wakePeriod = _linda->getWakePeriod()] {
-                    auto _until_check_cancel{ std::chrono::time_point<std::chrono::steady_clock>::max() };
-                    if (wakePeriod.count() > 0.0f) {
-                        _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(wakePeriod);
-                    }
-                    bool const _forceTryAgain{ _until_check_cancel < _until };
-                    return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : _until);
-                });
-
-                // not enough data to read: wakeup when data was sent, or when timeout is reached
-                std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
-                std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until_check_cancel) };
-                _guard.release(); // we don't want to unlock the mutex on exit!
-                _try_again = _forceTryAgain || (_status == std::cv_status::no_timeout); // detect spurious wakeups
-                if (_lane != nullptr) {
-                    _lane->waiting_on = nullptr;
-                    _lane->status.store(_prev_status, std::memory_order_release);
-                }
-            }
+            _try_again = WaitInternal(L_, _lane, _linda, _keeper, _linda->writeHappened, _until);
         }
         STACK_CHECK(_K, 0);
 
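
With the helper in place, the receive path's retry logic reduces to "try, then wait on `writeHappened`". Its overall shape, sketched with hypothetical standalone names (the real loop also tracks cancel requests and keeper state, and `bounded_wait`/`Clock` come from the first sketch above):

```cpp
// Hypothetical stand-ins: try_receive() plays the role of the keeper-level
// read attempt; keeperMutex/writeHappened mirror the Lanes members.
extern std::mutex keeperMutex;
extern std::condition_variable writeHappened;
bool try_receive();

bool receive_with_deadline(Clock::time_point deadline, Clock::duration wakePeriod)
{
    keeperMutex.lock(); // the keeper lock is held across attempts, as in linda
    bool received{ false };
    bool tryAgain{ true };
    while (!received && tryAgain) {
        received = try_receive(); // attempt the read under the keeper lock
        if (!received) {
            // nothing to read yet: block until data is written, a cancel-poll
            // point, or the deadline, whichever comes first
            tryAgain = bounded_wait(keeperMutex, writeHappened, deadline, wakePeriod);
        }
    }
    keeperMutex.unlock();
    return received; // false: timeout (the real code also reports cancellation)
}
```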
@@ -916,6 +922,7 @@ LUAG_FUNC(linda_send)
         _cancel = (_cancel != CancelRequest::None)
             ? _cancel
             : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None);
+
         // if user wants to cancel, or looped because of a timeout, the call returns without sending anything
         if (!_try_again || _cancel != CancelRequest::None) {
             _pushed.emplace(0);
@@ -948,38 +955,7 @@ LUAG_FUNC(linda_send)
         }
 
         // storage limit hit, wait until timeout or signalled that we should try again
-        {
-            Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings
-            if (_lane != nullptr) {
-                // change status of lane to "waiting"
-                _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely
-                LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case
-                LUA_ASSERT(L_, _lane->waiting_on == nullptr);
-                _lane->waiting_on = &_linda->readHappened;
-                _lane->status.store(Lane::Waiting, std::memory_order_release);
-            }
-
-            // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests,
-            // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it
-            auto const [_forceTryAgain, _until_check_cancel] = std::invoke([_until, wakePeriod = _linda->getWakePeriod()] {
-                auto _until_check_cancel{ std::chrono::time_point<std::chrono::steady_clock>::max() };
-                if (wakePeriod.count() > 0.0f) {
-                    _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(wakePeriod);
-                }
-                bool const _forceTryAgain{ _until_check_cancel < _until };
-                return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : _until);
-            });
-
-            // could not send because no room: wait until some data was read before trying again, or until timeout is reached
-            std::unique_lock<std::mutex> _guard{ _keeper->mutex, std::adopt_lock };
-            std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until_check_cancel) };
-            _guard.release(); // we don't want to unlock the mutex on exit!
-            _try_again = _forceTryAgain || (status == std::cv_status::no_timeout); // detect spurious wakeups
-            if (_lane != nullptr) {
-                _lane->waiting_on = nullptr;
-                _lane->status.store(_prev_status, std::memory_order_release);
-            }
-        }
+        _try_again = WaitInternal(L_, _lane, _linda, _keeper, _linda->readHappened, _until);
     }
     STACK_CHECK(_K, 0);
 
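
Both call sites now differ only in the condition variable they pass: receive blocks on `writeHappened` (data arrived), send blocks on `readHappened` (room freed), so each side wakes the other. A self-contained illustration of that two-condvar symmetry, as a generic bounded queue rather than the linda implementation:

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Generic bounded queue mirroring linda's writeHappened/readHappened pairing.
template <typename T>
class BoundedQueue
{
public:
    explicit BoundedQueue(std::size_t limit) : limit_{ limit } {}

    void send(T v)
    {
        std::unique_lock<std::mutex> guard{ mutex_ };
        readHappened_.wait(guard, [this] { return queue_.size() < limit_; }); // wait for room
        queue_.push_back(std::move(v));
        writeHappened_.notify_one(); // wake a receiver: data arrived
    }

    T receive()
    {
        std::unique_lock<std::mutex> guard{ mutex_ };
        writeHappened_.wait(guard, [this] { return !queue_.empty(); }); // wait for data
        T v{ std::move(queue_.front()) };
        queue_.pop_front();
        readHappened_.notify_one(); // wake a sender: room freed
        return v;
    }

private:
    std::mutex mutex_;
    std::condition_variable writeHappened_;
    std::condition_variable readHappened_;
    std::deque<T> queue_;
    std::size_t const limit_;
};
```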
