From 1bff784b474261212a996ac9fc59389d53a69590 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Tue, 15 Apr 2025 18:21:41 +0200 Subject: Linda batched mode rework * linda.batched special value is removed * new function linda:receive_batched --- src/intercopycontext.cpp | 2 +- src/lanes.lua | 5 +- src/linda.cpp | 293 ++++++++++++++++++++++++----------------------- src/linda.hpp | 5 - src/lindafactory.cpp | 4 - 5 files changed, 155 insertions(+), 154 deletions(-) (limited to 'src') diff --git a/src/intercopycontext.cpp b/src/intercopycontext.cpp index 93a8160..a93615b 100644 --- a/src/intercopycontext.cpp +++ b/src/intercopycontext.cpp @@ -994,7 +994,7 @@ bool InterCopyContext::interCopyLightuserdata() const // recognize and print known UniqueKey names here if constexpr (USE_DEBUG_SPEW()) { bool _found{ false }; - static constexpr std::array, 3> kKeysToCheck{ kLindaBatched, kCancelError, kNilSentinel }; + static constexpr std::array, 2> kKeysToCheck{ kCancelError, kNilSentinel }; for (UniqueKey const& _key : kKeysToCheck) { if (_key.equals(L1, L1_i)) { DEBUGSPEW_CODE(DebugSpew(nullptr) << _key.debugName); diff --git a/src/lanes.lua b/src/lanes.lua index 98f8c20..bd94a14 100644 --- a/src/lanes.lua +++ b/src/lanes.lua @@ -603,7 +603,6 @@ local configure_timers = function() return next_wakeup -- may be 'nil' end -- check_timers() - local timer_gateway_batched = timerLinda.batched set_finalizer(function(err, stk) if err and type(err) ~= "userdata" then error("LanesTimer error: "..tostring(err)) @@ -628,7 +627,7 @@ local configure_timers = function() if _timerKey == TGW_KEY then assert(getmetatable(_what) == "Linda") -- '_what' should be a linda on which the client sets a timer - local _, key, wakeup_at, period = timerLinda:receive(0, timer_gateway_batched, TGW_KEY, 3) + local _, key, wakeup_at, period = timerLinda:receive_batched(0, TGW_KEY, 3) assert(key) set_timer(_what, key, wakeup_at, period and period > 0 and period or nil) elseif _timerKey == TGW_QUERY then @@ -758,7 +757,7 @@ local genlock = function(linda_, key_, N) -- 'nil' timeout allows 'key_' to be numeric return linda_:send(timeout, key_, trues(M_)) -- suspends until been able to push them else - local _k, _v = linda_:receive(nil, linda_.batched, key_, -M_) + local _k, _v = linda_:receive_batched(nil, key_, -M_) -- propagate cancel_error if we got it, else return true or false return (_v == cancel_error and _v) or (_k and true or false) end diff --git a/src/linda.cpp b/src/linda.cpp index 1119d71..0cdacfa 100644 --- a/src/linda.cpp +++ b/src/linda.cpp @@ -62,7 +62,7 @@ namespace { case LuaType::LIGHTUSERDATA: { - static constexpr std::array, 3> kKeysToCheck{ kLindaBatched, kCancelError, kNilSentinel }; + static constexpr std::array, 2> kKeysToCheck{ kCancelError, kNilSentinel }; for (UniqueKey const& _key : kKeysToCheck) { if (_key.equals(L_, _i)) { raise_luaL_error(L_, "argument #%d: can't use %s as a slot", _i, _key.debugName.data()); @@ -123,6 +123,8 @@ namespace { return 0; } + // ############################################################################################# + // a helper to process the timeout argument of linda:send() and linda:receive() [[nodiscard]] static auto ProcessTimeoutArg(lua_State* const L_) @@ -144,6 +146,142 @@ namespace { return std::make_pair(_key_i, _until); } + // ############################################################################################# + + // the implementation for linda:receive() and linda:receive_batched() + static int ReceiveInternal(lua_State* const L_, bool const batched_) + { + Linda* const _linda{ ToLinda(L_, StackIndex{ 1 }) }; + + auto const [_key_i, _until] = ProcessTimeoutArg(L_); + + keeper_api_t _selected_keeper_receive{ nullptr }; + int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; + // are we in batched mode? + if (batched_) { + // make sure the keys are of a valid type + CheckKeyTypes(L_, _key_i, _key_i); + // receive multiple values from a single slot + _selected_keeper_receive = KEEPER_API(receive_batched); + // we expect a user-defined amount of return value + _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1); + if (_expected_pushed_min < 1) { + raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count"); + } + _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min); + // don't forget to count the slot in addition to the values + ++_expected_pushed_min; + ++_expected_pushed_max; + if (_expected_pushed_min > _expected_pushed_max) { + raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error"); + } + } else { + // make sure the keys are of a valid type + CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) }); + // receive a single value, checking multiple slots + _selected_keeper_receive = KEEPER_API(receive); + // we expect a single (value, slot) pair of returned values + _expected_pushed_min = _expected_pushed_max = 2; + } + + Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue(L_) }; + Keeper* const _keeper{ _linda->whichKeeper() }; + KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast(nullptr) } }; + if (_K == nullptr) + return 0; + + CancelRequest _cancel{ CancelRequest::None }; + KeeperCallResult _pushed{}; + + STACK_CHECK_START_REL(_K, 0); + for (bool _try_again{ true };;) { + if (_lane != nullptr) { + _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); + } + _cancel = (_cancel != CancelRequest::None) + ? _cancel + : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); + // if user wants to cancel, or looped because of a timeout, the call returns without sending anything + if (!_try_again || _cancel != CancelRequest::None) { + _pushed.emplace(0); + break; + } + + // all arguments of receive() but the first are passed to the keeper's receive function + STACK_CHECK(_K, 0); + _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); + if (!_pushed.has_value()) { + break; + } + if (_pushed.value() > 0) { + LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max); + if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { + raise_luaL_error(L_, "Key is restricted"); + } + _linda->readHappened.notify_all(); + break; + } + + if (std::chrono::steady_clock::now() >= _until) { + break; /* instant timeout */ + } + + // nothing received, wait until timeout or signalled that we should try again + { + Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings + if (_lane != nullptr) { + // change status of lane to "waiting" + _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely + LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case + LUA_ASSERT(L_, _lane->waiting_on == nullptr); + _lane->waiting_on = &_linda->writeHappened; + _lane->status.store(Lane::Waiting, std::memory_order_release); + } + // not enough data to read: wakeup when data was sent, or when timeout is reached + std::unique_lock _guard{ _keeper->mutex, std::adopt_lock }; + std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) }; + _guard.release(); // we don't want to unlock the mutex on exit! + _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups + if (_lane != nullptr) { + _lane->waiting_on = nullptr; + _lane->status.store(_prev_status, std::memory_order_release); + } + } + } + STACK_CHECK(_K, 0); + + if (!_pushed.has_value()) { + raise_luaL_error(L_, "tried to copy unsupported types"); + } + + switch (_cancel) { + case CancelRequest::None: + { + int const _nbPushed{ _pushed.value() }; + if (_nbPushed == 0) { + // not enough data in the linda slot to fulfill the request, return nil, "timeout" + lua_pushnil(L_); + luaG_pushstring(L_, "timeout"); + return 2; + } + return _nbPushed; + } + + case CancelRequest::Soft: + // if user wants to soft-cancel, the call returns nil, kCancelError + lua_pushnil(L_); + kCancelError.pushKey(L_); + return 2; + + case CancelRequest::Hard: + // raise an error interrupting execution only in case of hard cancel + raise_cancel_error(L_); // raises an error and doesn't return + + default: + raise_luaL_error(L_, "internal error: unknown cancel request"); + } + } + // ############################################################################################# // ############################################################################################# } // namespace @@ -650,153 +788,25 @@ LUAG_FUNC(linda_limit) // ################################################################################################# /* - * 2 modes of operation - * [val, slot]= linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] ) + * [val, slot] = linda:receive([timeout_secs_num=nil], key_num|str|bool|lightuserdata [, ...] ) * Consumes a single value from the Linda, in any slot. * Returns: received value (which is consumed from the slot), and the slot which had it - - * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT]) - * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot. - * returns the actual consumed values, or nil if there weren't enough values to consume */ LUAG_FUNC(linda_receive) { - static constexpr lua_CFunction _receive{ - +[](lua_State* const L_) { - Linda* const _linda{ ToLinda(L_, StackIndex{ 1 }) }; - - auto [_key_i, _until] = ProcessTimeoutArg(L_); - - keeper_api_t _selected_keeper_receive{ nullptr }; - int _expected_pushed_min{ 0 }, _expected_pushed_max{ 0 }; - // are we in batched mode? - if (kLindaBatched.equals(L_, _key_i)) { - // no need to pass linda.batched in the keeper state - ++_key_i; - // make sure the keys are of a valid type - CheckKeyTypes(L_, _key_i, _key_i); - // receive multiple values from a single slot - _selected_keeper_receive = KEEPER_API(receive_batched); - // we expect a user-defined amount of return value - _expected_pushed_min = (int) luaL_checkinteger(L_, _key_i + 1); - if (_expected_pushed_min < 1) { - raise_luaL_argerror(L_, StackIndex{ _key_i + 1 }, "bad min count"); - } - _expected_pushed_max = (int) luaL_optinteger(L_, _key_i + 2, _expected_pushed_min); - // don't forget to count the slot in addition to the values - ++_expected_pushed_min; - ++_expected_pushed_max; - if (_expected_pushed_min > _expected_pushed_max) { - raise_luaL_argerror(L_, StackIndex{ _key_i + 2 }, "batched min/max error"); - } - } else { - // make sure the keys are of a valid type - CheckKeyTypes(L_, _key_i, StackIndex{ lua_gettop(L_) }); - // receive a single value, checking multiple slots - _selected_keeper_receive = KEEPER_API(receive); - // we expect a single (value, slot) pair of returned values - _expected_pushed_min = _expected_pushed_max = 2; - } - - Lane* const _lane{ kLanePointerRegKey.readLightUserDataValue(L_) }; - Keeper* const _keeper{ _linda->whichKeeper() }; - KeeperState const _K{ _keeper ? _keeper->K : KeeperState{ static_cast(nullptr) } }; - if (_K == nullptr) - return 0; - - CancelRequest _cancel{ CancelRequest::None }; - KeeperCallResult _pushed{}; - - STACK_CHECK_START_REL(_K, 0); - for (bool _try_again{ true };;) { - if (_lane != nullptr) { - _cancel = _lane->cancelRequest.load(std::memory_order_relaxed); - } - _cancel = (_cancel != CancelRequest::None) - ? _cancel - : ((_linda->cancelStatus == Linda::Cancelled) ? CancelRequest::Soft : CancelRequest::None); - // if user wants to cancel, or looped because of a timeout, the call returns without sending anything - if (!_try_again || _cancel != CancelRequest::None) { - _pushed.emplace(0); - break; - } - - // all arguments of receive() but the first are passed to the keeper's receive function - STACK_CHECK(_K, 0); - _pushed = keeper_call(_K, _selected_keeper_receive, L_, _linda, _key_i); - if (!_pushed.has_value()) { - break; - } - if (_pushed.value() > 0) { - LUA_ASSERT(L_, _pushed.value() >= _expected_pushed_min && _pushed.value() <= _expected_pushed_max); - if (kRestrictedChannel.equals(L_, StackIndex{ kIdxTop })) { - raise_luaL_error(L_, "Key is restricted"); - } - _linda->readHappened.notify_all(); - break; - } - - if (std::chrono::steady_clock::now() >= _until) { - break; /* instant timeout */ - } - - // nothing received, wait until timeout or signalled that we should try again - { - Lane::Status _prev_status{ Lane::Error }; // prevent 'might be used uninitialized' warnings - if (_lane != nullptr) { - // change status of lane to "waiting" - _prev_status = _lane->status.load(std::memory_order_acquire); // Running, most likely - LUA_ASSERT(L_, _prev_status == Lane::Running); // but check, just in case - LUA_ASSERT(L_, _lane->waiting_on == nullptr); - _lane->waiting_on = &_linda->writeHappened; - _lane->status.store(Lane::Waiting, std::memory_order_release); - } - // not enough data to read: wakeup when data was sent, or when timeout is reached - std::unique_lock _guard{ _keeper->mutex, std::adopt_lock }; - std::cv_status const _status{ _linda->writeHappened.wait_until(_guard, _until) }; - _guard.release(); // we don't want to unlock the mutex on exit! - _try_again = (_status == std::cv_status::no_timeout); // detect spurious wakeups - if (_lane != nullptr) { - _lane->waiting_on = nullptr; - _lane->status.store(_prev_status, std::memory_order_release); - } - } - } - STACK_CHECK(_K, 0); - - if (!_pushed.has_value()) { - raise_luaL_error(L_, "tried to copy unsupported types"); - } - - switch (_cancel) { - case CancelRequest::None: - { - int const _nbPushed{ _pushed.value() }; - if (_nbPushed == 0) { - // not enough data in the linda slot to fulfill the request, return nil, "timeout" - lua_pushnil(L_); - luaG_pushstring(L_, "timeout"); - return 2; - } - return _nbPushed; - } - - case CancelRequest::Soft: - // if user wants to soft-cancel, the call returns nil, kCancelError - lua_pushnil(L_); - kCancelError.pushKey(L_); - return 2; + return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, false); }); +} - case CancelRequest::Hard: - // raise an error interrupting execution only in case of hard cancel - raise_cancel_error(L_); // raises an error and doesn't return +// ################################################################################################# - default: - raise_luaL_error(L_, "internal error: unknown cancel request"); - } - } - }; - return Linda::ProtectedCall(L_, _receive); +/* + * [val1, ... valCOUNT] = linda_receive_batched( linda_ud, [timeout_secs_num=-1], key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT]) + * Consumes between min_COUNT and max_COUNT values from the linda, from a single slot. + * returns the actual consumed values, or nil if there weren't enough values to consume + */ +LUAG_FUNC(linda_receive_batched) +{ + return Linda::ProtectedCall(L_, [](lua_State* const L_) { return ReceiveInternal(L_, true); }); } // ################################################################################################# @@ -1102,6 +1112,7 @@ namespace { { "get", LG_linda_get }, { "limit", LG_linda_limit }, { "receive", LG_linda_receive }, + { "receive_batched", LG_linda_receive_batched }, { "restrict", LG_linda_restrict }, { "send", LG_linda_send }, { "set", LG_linda_set }, diff --git a/src/linda.hpp b/src/linda.hpp index 01ca7e1..2d5c9dc 100644 --- a/src/linda.hpp +++ b/src/linda.hpp @@ -8,11 +8,6 @@ struct Keeper; // ################################################################################################# -// xxh64 of string "kLindaBatched" generated at https://www.pelock.com/products/hash-calculator -static constexpr UniqueKey kLindaBatched{ 0xB8234DF772646567ull, "linda.batched" }; - -// ################################################################################################# - DECLARE_UNIQUE_TYPE(LindaGroup, int); class Linda final diff --git a/src/lindafactory.cpp b/src/lindafactory.cpp index 6e3f759..42d0984 100644 --- a/src/lindafactory.cpp +++ b/src/lindafactory.cpp @@ -53,10 +53,6 @@ void LindaFactory::createMetatable(lua_State* L_) const // the linda functions luaG_registerlibfuncs(L_, mLindaMT); - // some constants - kLindaBatched.pushKey(L_); // L_: mt kLindaBatched - lua_setfield(L_, -2, "batched"); // L_: mt - kNilSentinel.pushKey(L_); // L_: mt kNilSentinel lua_setfield(L_, -2, "null"); // L_: mt -- cgit v1.2.3-55-g6feb