From 726aee3fbb909946e69866cc6c4497c5ec365fe8 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Thu, 27 Jun 2024 12:40:36 +0200 Subject: linda:limit() and linda:set() return a second value, a string representing the fill status --- docs/index.html | 37 ++++++++++++++-------------- src/keeper.cpp | 75 +++++++++++++++++++++++++++++++++++++++++++------------- src/lane.cpp | 2 +- src/lanes.lua | 3 ++- src/linda.cpp | 20 ++++++++------- tests/atomic.lua | 1 + 6 files changed, 92 insertions(+), 46 deletions(-) diff --git a/docs/index.html b/docs/index.html index 8e84fcb..148b5ab 100644 --- a/docs/index.html +++ b/docs/index.html @@ -1222,15 +1222,16 @@

-	bool|(nil,[lanes.cancel_error|"timeout"]) = h:limit(key, <limit>)
-	>limit< = h:limit(key)
+	bool,string|(nil,[lanes.cancel_error|"timeout"]) = h:limit(key, <limit>)
+	(number|string),string = h:limit(key)
 

By default, queue sizes are unlimited but limits can be enforced using the limit() method. This can be useful to balance execution speeds in a producer/consumer scenario.
A limit of 0 is allowed to block everything. "unlimited" removes the limit.
- If the key was full but the limit change added some room, limit() returns true and the Linda is signalled so that send()-blocked threads are awakened, else the return value is false. - If no limit is provided, limit() returns a single value, the current limit for the specified key.
+ If the key was full but the limit change added some room, limit() first return value is true and the Linda is signalled so that send()-blocked threads are awakened, else the return value is false. + If no limit is provided, limit() first return value is the current limit for the specified key.
+ The second returned value is a string representing the fill status relatively to the key's current limit (one of "over", "under", "exact"). Whether reading or writing, if the Linda is cancelled, limit() returns nil, lanes.cancel_error.

@@ -1292,30 +1293,30 @@

-	true|nil,lanes.cancel_error = linda_h:set(key [, val [, ...]])
+	(bool,string)|(nil,lanes.cancel_error) = linda_h:set(key [, val [, ...]])
 
-	[number,[val [, ...]]|nil,lanes.cancel_error] = linda_h:get(key [, count = 1])
+	(number,[val [, ...]])|(nil,lanes.cancel_error) = linda_h:get(key [, count = 1])
 

- The table access methods are for accessing a slot without queuing or consuming. They can be used for making shared tables of storage among the lanes.
- Writing to a slot never blocks because it ignores the limit. It overwrites existing value and clears any possible queued entries.
- Reading doesn't block either because get() returns: + get()/set() and send()/receive() can be used together; reading a slot essentially peeks the next outcoming value of a queue.
+ get()/set() are for accessing a key without queuing or consuming. They can be used for making shared tables of storage among the lanes.
+ Writing to a key never blocks because it ignores the limit. It overwrites existing values and clears any possible queued entries.
+

+

+ get() can read several values at once, and does not block. Return values ares:

- Table access and send()/receive() can be used together; reading a slot essentially peeks the next outcoming value of a queue. -

- -

- set() signals the Linda for write if a value is stored. If nothing special happens, set() returns nothing.
- If the key was full but the new data count of the key after set() is below its limit, set() returns true and the Linda is also signaled for read so that send()-blocked threads are awakened.

-

- set() can write several values at the specified key, writing nil values is now possible, and clearing the contents at the specified key is done by not providing any value.
- Also, get() can read several values at once. If the key contains no data, get() returns no value. This can be used to separate the case when reading stored nil values. + set() can write several values at the specified key. Writing nil values is possible, and clearing the contents at the specified key is done by not providing any value.
+ If set() actually stores data, the Linda is signalled for write, so that receive()-blocked Lanes are awakened.
+ Clearing the contents of a non-existent key does not create it!
+ If the key was full but the new data count of the key after set() is below its limit, set() first return value is true and the Linda is also signaled for read, so that send()-blocked Lanes are awakened.
+ If the key was not already full, nothing additional happens, and set() first return value is false.
+ The second return value is a string representing the fill status relatively to the key's current limit (one of "over", "under", "exact").

diff --git a/src/keeper.cpp b/src/keeper.cpp index ae09b37..cfeebbb 100644 --- a/src/keeper.cpp +++ b/src/keeper.cpp @@ -66,6 +66,10 @@ class KeyUD static constexpr int kContentsTableIndex{ 1 }; public: + static constexpr std::string_view kUnder{ "under" }; + static constexpr std::string_view kExact{ "exact" }; + static constexpr std::string_view kOver{ "over" }; + int first{ 1 }; int count{ 0 }; LindaLimit limit{ -1 }; @@ -83,6 +87,8 @@ class KeyUD [[nodiscard]] int pop(KeeperState K_, int minCount_, int maxCount_); // keepercall_receive[_batched] void prepareAccess(KeeperState K_, int idx_) const; [[nodiscard]] bool push(KeeperState K_, int count_, bool enforceLimit_); // keepercall_send and keepercall_set + void pushFillStatus(KeeperState K_) const; + static void PushFillStatus(KeeperState K_, KeyUD const* key_); [[nodiscard]] bool reset(KeeperState K_); }; @@ -232,6 +238,35 @@ bool KeyUD::push(KeeperState const K_, int const count_, bool const enforceLimit // ################################################################################################# +void KeyUD::pushFillStatus(KeeperState const K_) const +{ + if (limit < 0) { + luaG_pushstring(K_, kUnder); + return; + } + int const _delta{limit - count}; + if (_delta < 0) { + luaG_pushstring(K_, kOver); + } else if (_delta > 0) { + luaG_pushstring(K_, kUnder); + } else { + luaG_pushstring(K_, kExact); + } +} + +// ################################################################################################# + +void KeyUD::PushFillStatus(KeeperState const K_, KeyUD const* const key_) +{ + if (key_) { + key_->pushFillStatus(K_); // _K: ... + } else { + luaG_pushstring(K_, KeyUD::kUnder); // _K: ... "under" + } +} + +// ################################################################################################# + // expects 'this' on top of the stack bool KeyUD::reset(KeeperState const K_) { @@ -394,10 +429,11 @@ int keepercall_get(lua_State* const L_) // ################################################################################################# // in: linda key [n|nil] -// out: true or nil +// out: boolean, int keepercall_limit(lua_State* const L_) { KeeperState const _K{ L_ }; + STACK_CHECK_START_ABS(_K, lua_gettop(_K)); // no limit to set, means we read and return the current limit instead bool const _reading{ lua_gettop(_K) == 2 }; LindaLimit const _limit{ static_cast(luaL_optinteger(_K, 3, -1)) }; // -1 if we read nil because the argument is absent @@ -408,10 +444,12 @@ int keepercall_limit(lua_State* const L_) lua_rawget(_K, -3); // _K: KeysDB key KeyUD|nil KeyUD* _key{ KeyUD::GetPtr(_K, -1) }; if (_reading) { + // remove any clutter on the stack + lua_settop(_K, 0); // _K: if (_key && _key->limit >= 0) { - lua_pushinteger(_K, _key->limit); // _K: KeysDB key KeyUD limit + lua_pushinteger(_K, _key->limit); // _K: limit } else { // if the key doesn't exist, it is unlimited by default - luaG_pushstring(_K, "unlimited"); // _K: KeysDB key KeyUD "unlimited" + luaG_pushstring(_K, "unlimited"); // _K: "unlimited" } // return a single value: the limit of the key } else { @@ -426,7 +464,9 @@ int keepercall_limit(lua_State* const L_) // this is the case if we detect the key was full but it is no longer the case lua_pushboolean(_K, _key->changeLimit(_limit) ? 1 : 0); // _K: bool } - return 1; + KeyUD::PushFillStatus(_K, _key); // _K: limit|bool + STACK_CHECK(_K, 2); + return 2; } // ################################################################################################# @@ -538,12 +578,12 @@ int keepercall_set(lua_State* const L_) // retrieve KeysDB associated with the linda PushKeysDB(_K, 1); // _K: linda key val... KeysDB lua_replace(_K, 1); // _K: KeysDB key val... + lua_pushvalue(_K, 2); // _K: KeysDB key val... key + lua_rawget(_K, 1); // _K: KeysDB key val KeyUD|nil + KeyUD* _key{ KeyUD::GetPtr(_K, -1) }; - if (lua_gettop(_K) == 2) { // no value to set // _K: KeysDB key - lua_pushvalue(_K, -1); // _K: KeysDB key key - lua_rawget(_K, 1); // _K: KeysDB key KeyUD|nil + if (lua_gettop(_K) == 3) { // no value to set // _K: KeysDB key KeyUD|nil // empty the KeyUD for the specified key: replace uservalue with a virgin table, reset counters, but leave limit unchanged! - KeyUD* const _key{ KeyUD::GetPtr(_K, -1) }; if (_key != nullptr) { // might be nullptr if we set a nonexistent key to nil // _K: KeysDB key KeyUD if (_key->limit < 0) { // KeyUD limit value is the default (unlimited): we can totally remove it lua_pop(_K, 1); // _K: KeysDB key @@ -555,18 +595,17 @@ int keepercall_set(lua_State* const L_) _should_wake_writers = _key->reset(_K); } } + lua_settop(_K, 0); // we are done, remove everything // _K: } else { // set/replace contents stored at the specified key? - int const _count{ lua_gettop(_K) - 2 }; // number of items we want to store - lua_pushvalue(_K, 2); // _K: KeysDB key val... key - lua_rawget(_K, 1); // _K: KeysDB key val... KeyUD|nil - KeyUD* _key{ KeyUD::GetPtr(_K, -1) }; - if (_key == nullptr) { // can be nullptr if we store a value at a new key // KeysDB key val... nil - // no need to wake writers in that case, because a writer can't wait on an inexistent key + int const _count{ lua_gettop(_K) - 3 }; // number of items we want to store // _K: KeysDB key val... KeyUD|nil + if (_key == nullptr) { // can be nullptr if we store a value at a new key // _K: KeysDB key val... nil + assert(lua_isnil(_K, -1)); lua_pop(_K, 1); // _K: KeysDB key val... _key = KeyUD::Create(KeeperState{ _K }); // _K: KeysDB key val... KeyUD lua_pushvalue(_K, 2); // _K: KeysDB key val... KeyUD key lua_pushvalue(_K, -2); // _K: KeysDB key val... KeyUD key KeyUD lua_rawset(_K, 1); // _K: KeysDB key val... KeyUD + // no need to wake writers, because a writer can't wait on an inexistent key } else { // _K: KeysDB key val... KeyUD // the KeyUD exists, we just want to update its contents // we create room if the KeyUD was full but we didn't refill it to the brim with new data @@ -575,10 +614,12 @@ int keepercall_set(lua_State* const L_) // replace the key with the KeyUD in the stack lua_replace(_K, -2 - _count); // _K: KeysDB KeyUD val... [[maybe_unused]] bool const _pushed{ _key->push(_K, _count, false) }; // _K: KeysDB + lua_pop(_K, 1); // _K: } - // stack isn't the same here depending on what we did before, but that's not a problem - lua_pushboolean(_K, _should_wake_writers ? 1 : 0); // _K: ... bool - return 1; + assert(lua_gettop(_K) == 0); + lua_pushboolean(_K, _should_wake_writers ? 1 : 0); // _K: bool + KeyUD::PushFillStatus(_K, _key); // _K: bool + return 2; } // ################################################################################################# diff --git a/src/lane.cpp b/src/lane.cpp index d4175c6..ba24af3 100644 --- a/src/lane.cpp +++ b/src/lane.cpp @@ -435,7 +435,7 @@ static constexpr RegistryUniqueKey kStackTraceRegKey{ 0x3F327747CACAA904ull }; [[nodiscard]] static int lane_error(lua_State* L_) { // error message (any type) - STACK_CHECK_START_ABS(L_, 1); // L_: some_error + STACK_CHECK_START_ABS(L_, 1); // L_: some_error // Don't do stack survey for cancelled lanes. // diff --git a/src/lanes.lua b/src/lanes.lua index 6a4f149..48ebeb6 100644 --- a/src/lanes.lua +++ b/src/lanes.lua @@ -792,7 +792,8 @@ local genatomic = function(linda_, key_, initial_val_) if val ~= cancel_error then val = val + (diff_ or 1.0) -- set() releases the lock by emptying queue - if linda_:set(key_, val) == cancel_error then + local _res, _err = linda_:set(key_, val) + if _err == cancel_error then val = cancel_error end end diff --git a/src/linda.cpp b/src/linda.cpp index f4dd7e7..13627aa 100644 --- a/src/linda.cpp +++ b/src/linda.cpp @@ -477,22 +477,24 @@ LUAG_FUNC(linda_limit) KeeperCallResult _pushed; if (_linda->cancelRequest == CancelRequest::None) { - Keeper* const _keeper{ _linda->whichKeeper() }; if (_unlimited) { LUA_ASSERT(L_, lua_gettop(L_) == 3 && luaG_tostring(L_, 3) == "unlimited"); // inside the Keeper, unlimited is signified with a -1 limit (can't use nil because of nil kNilSentinel conversions!) lua_pop(L_, 1); // L_: linda key lua_pushinteger(L_, -1); // L_: linda key nil } + Keeper* const _keeper{ _linda->whichKeeper() }; _pushed = keeper_call(_keeper->K, KEEPER_API(limit), L_, _linda, 2); - LUA_ASSERT(L_, _pushed.has_value() && (_pushed.value() == 1)); + LUA_ASSERT(L_, _pushed.has_value() && (_pushed.value() == 2) && luaG_type(L_, -1) == LuaType::STRING); if (_nargs == 3) { // 3 args: setting the limit - LUA_ASSERT(L_, luaG_type(L_, -1) == LuaType::BOOLEAN); // changing the limit: no error, boolean value saying if we should wake blocked writer threads - if (lua_toboolean(L_, -1)) { + // changing the limit: no error, boolean value saying if we should wake blocked writer threads + LUA_ASSERT(L_, luaG_type(L_, -2) == LuaType::BOOLEAN); // L_: bool string + if (lua_toboolean(L_, -2)) { _linda->readHappened.notify_all(); // To be done from within the 'K' locking area } } else { // 2 args: reading the limit - LUA_ASSERT(L_, luaG_type(L_, -1) == LuaType::NUMBER || luaG_tostring(L_, -1) == "unlimited"); // reading the limit: a number >=0 or "unlimited" + // reading the limit: a number >=0 or "unlimited" + LUA_ASSERT(L_, luaG_type(L_, -2) == LuaType::NUMBER || luaG_tostring(L_, -2) == "unlimited"); } } else { // linda is cancelled // do nothing and return nil,lanes.cancel_error @@ -500,7 +502,7 @@ LUAG_FUNC(linda_limit) kCancelError.pushKey(L_); _pushed.emplace(2); } - // propagate pushed boolean if any + // propagate returned values return _pushed.value(); } }; @@ -807,7 +809,7 @@ LUAG_FUNC(linda_send) // ################################################################################################# /* - * [true|nil,lanes.cancel_error] = linda:set(key_num|str|bool|lightuserdata [, value [, ...]]) + * (boolean,string)|(nil,lanes.cancel_error) = linda:set(key_num|str|bool|lightuserdata [, value [, ...]]) * * Set one or more value to Linda. Ignores limits. * @@ -827,13 +829,13 @@ LUAG_FUNC(linda_set) Keeper* const _keeper{ _linda->whichKeeper() }; _pushed = keeper_call(_keeper->K, KEEPER_API(set), L_, _linda, 2); if (_pushed.has_value()) { // no error? - LUA_ASSERT(L_, _pushed.value() == 1 && luaG_type(L_, -1) == LuaType::BOOLEAN); + LUA_ASSERT(L_, _pushed.value() == 2 && luaG_type(L_, -1) == LuaType::STRING && luaG_type(L_, -2) == LuaType::BOOLEAN); if (_has_data) { // we put some data in the slot, tell readers that they should wake _linda->writeHappened.notify_all(); // To be done from within the 'K' locking area } - if (lua_toboolean(L_, -1)) { + if (lua_toboolean(L_, -2)) { // the key was full, but it is no longer the case, tell writers they should wake _linda->readHappened.notify_all(); // To be done from within the 'K' locking area } diff --git a/tests/atomic.lua b/tests/atomic.lua index 2de8f52..511a19e 100644 --- a/tests/atomic.lua +++ b/tests/atomic.lua @@ -9,6 +9,7 @@ local lanes = require "lanes" local linda= lanes.linda() local key= "$" +-- TODO: test what happens when we cancel the linda local f= lanes.genatomic( linda, key, 5 ) local v -- cgit v1.2.3-55-g6feb