From d93a433732c5ca72923100f5ab8139a8a5f072b6 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Fri, 25 Apr 2025 18:45:45 +0200 Subject: Code factorization in linda send/receive implementation --- src/linda.cpp | 104 ++++++++++++++++++++++------------------------------------ 1 file changed, 40 insertions(+), 64 deletions(-) (limited to 'src/linda.cpp') diff --git a/src/linda.cpp b/src/linda.cpp index fa28385..5fb8279 100644 --- a/src/linda.cpp +++ b/src/linda.cpp @@ -146,6 +146,42 @@ namespace { return std::make_pair(_key_i, _until); } + // ############################################################################################# + static bool WaitInternal([[maybe_unused]] lua_State* const L_, Lane* const lane_, Linda* const linda_, Keeper* const keeper_, std::condition_variable& waitingOn_, std::chrono::time_point until_) + { + Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings + if (lane_ != nullptr) { + // change status of lane to "waiting" + _prev_status = lane_->status.load(std::memory_order_acquire); // Running, most likely + LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case + LUA_ASSERT(L_, lane_->waiting_on == nullptr); + lane_->waiting_on = &waitingOn_; + lane_->status.store(Lane::Waiting, std::memory_order_release); + } + + // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests, + // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it + auto const [_forceTryAgain, _until_check_cancel] = std::invoke([until_, wakePeriod = linda_->getWakePeriod()] { + auto _until_check_cancel{ std::chrono::time_point::max() }; + if (wakePeriod.count() > 0.0f) { + _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast(wakePeriod); + } + bool const _forceTryAgain{ _until_check_cancel < until_ }; + return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : until_); + }); + + // operation can't complete: wake when it is signalled to be possible, or when timeout is reached + std::unique_lock _guard{ keeper_->mutex, std::adopt_lock }; + std::cv_status const _status{ waitingOn_.wait_until(_guard, _until_check_cancel) }; + _guard.release(); // we don't want to unlock the mutex on exit! + bool const _try_again{ _forceTryAgain || (_status == std::cv_status::no_timeout) }; // detect spurious wakeups + if (lane_ != nullptr) { + lane_->waiting_on = nullptr; + lane_->status.store(_prev_status, std::memory_order_release); + } + return _try_again; + } + // ############################################################################################# // the implementation for linda:receive() and linda:receive_batched() @@ -201,6 +237,7 @@ namespace { _cancel = (_cancel != CancelRequest::None) ? _cancel : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); + // if user wants to cancel, or looped because of a timeout, the call returns without sending anything if (!_try_again || _cancel != CancelRequest::None) { _pushed.emplace(0); @@ -227,38 +264,7 @@ namespace { } // nothing received, wait until timeout or signalled that we should try again - { - Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings - if (_lane != nullptr) { - // change status of lane to "waiting" - _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely - LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case - LUA_ASSERT(L_, _lane->waiting_on == nullptr); - _lane->waiting_on = &_linda->writeHappened; - _lane->status.store(Lane::Waiting, std::memory_order_release); - } - - // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests, - // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it - auto const [_forceTryAgain, _until_check_cancel] = std::invoke([_until, wakePeriod = _linda->getWakePeriod()] { - auto _until_check_cancel{ std::chrono::time_point::max() }; - if (wakePeriod.count() > 0.0f) { - _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast(wakePeriod); - } - bool const _forceTryAgain{ _until_check_cancel < _until }; - return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : _until); - }); - - // not enough data to read: wakeup when data was sent, or when timeout is reached - std::unique_lock _guard{ _keeper->mutex, std::adopt_lock }; - std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until_check_cancel) }; - _guard.release(); // we don't want to unlock the mutex on exit! - _try_again = _forceTryAgain || (_status == std::cv_status::no_timeout); // detect spurious wakeups - if (_lane != nullptr) { - _lane->waiting_on = nullptr; - _lane->status.store(_prev_status, std::memory_order_release); - } - } + _try_again = WaitInternal(L_, _lane, _linda, _keeper, _linda->writeHappened, _until); } STACK_CHECK(_K, 0); @@ -916,6 +922,7 @@ LUAG_FUNC(linda_send) _cancel = (_cancel != CancelRequest::None) ? _cancel : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); + // if user wants to cancel, or looped because of a timeout, the call returns without sending anything if (!_try_again || _cancel != CancelRequest::None) { _pushed.emplace(0); @@ -948,38 +955,7 @@ LUAG_FUNC(linda_send) } // storage limit hit, wait until timeout or signalled that we should try again - { - Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings - if (_lane != nullptr) { - // change status of lane to "waiting" - _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely - LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case - LUA_ASSERT(L_, _lane->waiting_on == nullptr); - _lane->waiting_on = &_linda->readHappened; - _lane->status.store(Lane::Waiting, std::memory_order_release); - } - - // wait until the final target date by small increments, interrupting regularly so that we can check for cancel requests, - // in case some timing issue caused a cancel request to be issued, and the condvar signalled, before we actually wait for it - auto const [_forceTryAgain, _until_check_cancel] = std::invoke([_until, wakePeriod = _linda->getWakePeriod()] { - auto _until_check_cancel{ std::chrono::time_point::max() }; - if (wakePeriod.count() > 0.0f) { - _until_check_cancel = std::chrono::steady_clock::now() + std::chrono::duration_cast(wakePeriod); - } - bool const _forceTryAgain{ _until_check_cancel < _until }; - return std::make_tuple(_forceTryAgain, _forceTryAgain ? _until_check_cancel : _until); - }); - - // could not send because no room: wait until some data was read before trying again, or until timeout is reached - std::unique_lock _guard{ _keeper->mutex, std::adopt_lock }; - std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until_check_cancel) }; - _guard.release(); // we don't want to unlock the mutex on exit! - _try_again = _forceTryAgain || (status == std::cv_status::no_timeout); // detect spurious wakeups - if (_lane != nullptr) { - _lane->waiting_on = nullptr; - _lane->status.store(_prev_status, std::memory_order_release); - } - } + _try_again = WaitInternal(L_, _lane, _linda, _keeper, _linda->readHappened, _until); } STACK_CHECK(_K, 0); -- cgit v1.2.3-55-g6feb