From 706d064e17d19c888c6d1e4d404ad34ac808589c Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Tue, 15 Apr 2025 16:18:41 +0200 Subject: A bit of code factorization --- src/linda.cpp | 174 +++++++++++++++++++++++++++++----------------------------- 1 file changed, 86 insertions(+), 88 deletions(-) (limited to 'src') diff --git a/src/linda.cpp b/src/linda.cpp index a094a8f..1119d71 100644 --- a/src/linda.cpp +++ b/src/linda.cpp @@ -43,7 +43,6 @@ namespace { // ############################################################################################# // ############################################################################################# - static void CheckKeyTypes(lua_State* const L_, StackIndex const start_, StackIndex const end_) { STACK_CHECK_START_REL(L_, 0); @@ -124,6 +123,27 @@ namespace { return 0; } + // a helper to process the timeout argument of linda:send() and linda:receive() + [[nodiscard]] + static auto ProcessTimeoutArg(lua_State* const L_) + { + StackIndex _key_i{ 2 }; // index of first slot, if timeout not there + + std::chrono::time_point _until{ std::chrono::time_point::max() }; + if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion + lua_Duration const _duration{ lua_tonumber(L_, 2) }; + if (_duration.count() >= 0.0) { + _until = std::chrono::steady_clock::now() + std::chrono::duration_cast(_duration); + } else { + raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0"); + } + ++_key_i; + } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot + ++_key_i; + } + return std::make_pair(_key_i, _until); + } + // ############################################################################################# // ############################################################################################# } // namespace @@ -644,20 +664,8 @@ LUAG_FUNC(linda_receive) static constexpr lua_CFunction _receive{ +[](lua_State* const L_) { Linda* const _linda{ ToLinda(L_, StackIndex{ 1 }) }; - StackIndex _key_i{ 2 }; // index of first slot, if timeout not there - std::chrono::time_point _until{ std::chrono::time_point::max() }; - if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion - lua_Duration const _duration{ lua_tonumber(L_, 2) }; - if (_duration.count() >= 0.0) { - _until = std::chrono::steady_clock::now() + std::chrono::duration_cast(_duration); - } else { - raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0"); - } - ++_key_i; - } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot - ++_key_i; - } + auto [_key_i, _until] = ProcessTimeoutArg(L_); keeper_api_t _selected_keeper_receive{ nullptr }; int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; @@ -698,6 +706,7 @@ LUAG_FUNC(linda_receive) CancelRequest _cancel{ CancelRequest::None }; KeeperCallResult _pushed{}; + STACK_CHECK_START_REL(_K, 0); for (bool _try_again{ true };;) { if (_lane != nullptr) { @@ -713,6 +722,7 @@ LUAG_FUNC(linda_receive) } // all arguments of receive() but the first are passed to the keeper's receive function + STACK_CHECK(_K, 0); _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); if (!_pushed.has_value()) { break; @@ -848,20 +858,8 @@ LUAG_FUNC(linda_send) static constexpr lua_CFunction _send{ +[](lua_State* const L_) { Linda* const _linda{ ToLinda(L_, StackIndex{ 1 }) }; - StackIndex _key_i{ 2 }; // index of first slot, if timeout not there - std::chrono::time_point _until{ std::chrono::time_point::max() }; - if (luaG_type(L_, StackIndex{ 2 }) == LuaType::NUMBER) { // we don't want to use lua_isnumber() because of autocoercion - lua_Duration const _duration{ lua_tonumber(L_, 2) }; - if (_duration.count() >= 0.0) { - _until = std::chrono::steady_clock::now() + std::chrono::duration_cast(_duration); - } else { - raise_luaL_argerror(L_, StackIndex{ 2 }, "duration cannot be < 0"); - } - ++_key_i; - } else if (lua_isnil(L_, 2)) { // alternate explicit "infinite timeout" by passing nil before the slot - ++_key_i; - } + auto const [_key_i, _until] = ProcessTimeoutArg(L_); // make sure the slot is of a valid type CheckKeyTypes(L_, _key_i, _key_i); @@ -873,78 +871,78 @@ LUAG_FUNC(linda_send) raise_luaL_error(L_, "no data to send"); } + Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue(L_) }; + Keeper* const _keeper{ _linda->whichKeeper() }; + KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast(nullptr) } }; + if (_K == nullptr) + return 0; + bool _ret{ false }; CancelRequest _cancel{ CancelRequest::None }; - KeeperCallResult _pushed; - { - Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue(L_) }; - Keeper* const _keeper{ _linda->whichKeeper() }; - KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast(nullptr) } }; - if (_K == nullptr) - return 0; + KeeperCallResult _pushed{}; - STACK_CHECK_START_REL(_K, 0); - for (bool _try_again{ true };;) { - if (_lane != nullptr) { - _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); - } - _cancel = (_cancel != CancelRequest::None) - ? _cancel - : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); - // if user wants to cancel, or looped because of a timeout, the call returns without sending anything - if (!_try_again || _cancel != CancelRequest::None) { - _pushed.emplace(0); - break; - } + STACK_CHECK_START_REL(_K, 0); + for (bool _try_again{ true };;) { + if (_lane != nullptr) { + _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); + } + _cancel = (_cancel != CancelRequest::None) + ? _cancel + : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); + // if user wants to cancel, or looped because of a timeout, the call returns without sending anything + if (!_try_again || _cancel != CancelRequest::None) { + _pushed.emplace(0); + break; + } - STACK_CHECK(_K, 0); - _pushed = keeper_call(_K, KEEPER_API(send), L_, _linda, _key_i); - if (!_pushed.has_value()) { - break; - } - LUA_ASSERT(L_, _pushed.value() == 1); + // all arguments of send() but the first are passed to the keeper's send function + STACK_CHECK(_K, 0); + _pushed = keeper_call(_K, KEEPER_API(send), L_, _linda, _key_i); + if (!_pushed.has_value()) { + break; + } + LUA_ASSERT(L_, _pushed.value() == 1); - if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { - raise_luaL_error(L_, "Key is restricted"); - } - _ret = lua_toboolean(L_, -1) ? true : false; - lua_pop(L_, 1); + if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { + raise_luaL_error(L_, "Key is restricted"); + } + _ret = lua_toboolean(L_, -1) ? true : false; + lua_pop(L_, 1); - if (_ret) { - // Wake up ALL waiting threads - _linda->writeHappened.notify_all(); - break; - } + if (_ret) { + // Wake up ALL waiting threads + _linda->writeHappened.notify_all(); + break; + } - // instant timout to bypass the wait syscall - if (std::chrono::steady_clock::now() >= _until) { - break; /* no wait; instant timeout */ - } + // instant timout to bypass the wait syscall + if (std::chrono::steady_clock::now() >= _until) { + break; /* no wait; instant timeout */ + } - // storage limit hit, wait until timeout or signalled that we should try again - { - Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings - if (_lane != nullptr) { - // change status of lane to "waiting" - _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely - LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case - LUA_ASSERT(L_, _lane->waiting_on == nullptr); - _lane->waiting_on = &_linda->readHappened; - _lane->status.store(Lane::Waiting, std::memory_order_release); - } - // could not send because no room: wait until some data was read before trying again, or until timeout is reached - std::unique_lock _guard{ _keeper->mutex, std::adopt_lock }; - std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until) }; - _guard.release(); // we don't want to unlock the mutex on exit! - _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups - if (_lane != nullptr) { - _lane->waiting_on = nullptr; - _lane->status.store(_prev_status, std::memory_order_release); - } + // storage limit hit, wait until timeout or signalled that we should try again + { + Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings + if (_lane != nullptr) { + // change status of lane to "waiting" + _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely + LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case + LUA_ASSERT(L_, _lane->waiting_on == nullptr); + _lane->waiting_on = &_linda->readHappened; + _lane->status.store(Lane::Waiting, std::memory_order_release); + } + // could not send because no room: wait until some data was read before trying again, or until timeout is reached + std::unique_lock _guard{ _keeper->mutex, std::adopt_lock }; + std::cv_status const status{ _linda->readHappened.wait_until(_guard, _until) }; + _guard.release(); // we don't want to unlock the mutex on exit! + _try_again = (status == std::cv_status::no_timeout); // detect spurious wakeups + if (_lane != nullptr) { + _lane->waiting_on = nullptr; + _lane->status.store(_prev_status, std::memory_order_release); } } - STACK_CHECK(_K, 0); } + STACK_CHECK(_K, 0); if (!_pushed.has_value()) { raise_luaL_error(L_, "tried to copy unsupported types"); -- cgit v1.2.3-55-g6feb