From 2f1efc2d1f63adbd8099991ff9b0f0e35b52a26f Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Tue, 12 Jun 2012 07:47:33 +0200 Subject: * linda:receive() batched mode now accepts a max_count optional argument --- src/keeper.lua | 26 ++++++++++++++------------ src/lanes.c | 55 ++++++++++++++++++++++++++++++++++++------------------- 2 files changed, 50 insertions(+), 31 deletions(-) (limited to 'src') diff --git a/src/keeper.lua b/src/keeper.lua index 77bf880..1f17599 100644 --- a/src/keeper.lua +++ b/src/keeper.lua @@ -87,7 +87,6 @@ local fifo_peek = function( fifo, count) end local fifo_pop = function( fifo, count) - if count > fifo.count then error("list is too short") end local first = fifo.first local last = first + count - 1 local out = { unpack( fifo, first, last)} @@ -178,7 +177,7 @@ end -- function receive( ud, ...) - local data, _ = tables( ud) + local data = tables( ud) for i = 1, select( '#', ...) do local key = select( i, ...) @@ -186,7 +185,7 @@ function receive( ud, ...) if fifo and fifo.count > 0 then local val = fifo_pop( fifo, 1) if val ~= nil then - return val, key + return val, key end end end @@ -194,17 +193,20 @@ end ----- --- [val1, ... valCOUNT]= receive_batched( linda_deep_ud, batch_sentinel, key , COUNT) +-- [val1, ... valCOUNT]= receive_batched( linda_deep_ud, key , min_COUNT, max_COUNT) -- --- Read any of the given keys, consuming the data found. Keys are read in --- order. +-- Read a single key, consuming the data found. -- -receive_batched = function( ud, batch_sentinel, key, count) - if count > 0 then - local data, _ = tables( ud) - local fifo = data[key] - if fifo and fifo.count >= count then - return fifo_pop( fifo, count) +receive_batched = function( ud, key, min_count, max_count) + if min_count > 0 then + local fifo = tables( ud)[key] + if fifo then + local fifo_count = fifo.count + if fifo_count >= min_count then + max_count = max_count or min_count + max_count = (max_count > fifo_count) and fifo_count or max_count + return fifo_pop( fifo, max_count) + end end end end diff --git a/src/lanes.c b/src/lanes.c index d777be1..931af98 100644 --- a/src/lanes.c +++ b/src/lanes.c @@ -51,7 +51,7 @@ * ... */ -char const* VERSION = "3.1.2"; +char const* VERSION = "3.1.3"; /* =============================================================================== @@ -398,16 +398,16 @@ LUAG_FUNC( linda_send) * Consumes a single value from the Linda, in any key. * Returns: received value (which is consumed from the slot), and the key which had it - * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, COUNT) - * Consumes COUNT values from the linda, from a single key. - * returns the COUNT consumed values, or nil if there weren't enough values to consume + * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT]) + * Consumes between min_COUNT and max_COUNT values from the linda, from a single key. + * returns the actual consumed values, or nil if there weren't enough values to consume * */ #define BATCH_SENTINEL "270e6c9d-280f-4983-8fee-a7ecdda01475" LUAG_FUNC( linda_receive) { struct s_Linda *linda = lua_toLinda( L, 1); - int pushed, expected_pushed; + int pushed, expected_pushed_min, expected_pushed_max; bool_t cancel = FALSE; char *keeper_receive; @@ -426,27 +426,44 @@ LUAG_FUNC( linda_receive) ++ key_i; } - // make sure the keys are of a valid type - check_key_types( L, key_i, lua_gettop( L)); - // are we in batched mode? - lua_pushliteral( L, BATCH_SENTINEL); - if( lua_equal( L, key_i, -1)) - { - keeper_receive = "receive_batched"; - expected_pushed = (int)luaL_checkinteger( L, key_i + 2); - } - else { - keeper_receive = "receive"; - expected_pushed = 2; + int is_batched; + lua_pushliteral( L, BATCH_SENTINEL); + is_batched = lua_equal( L, key_i, -1); + lua_pop( L, 1); + if( is_batched) + { + // no need to pass linda.batched in the keeper state + ++ key_i; + // make sure the keys are of a valid type + check_key_types( L, key_i, key_i); + // receive multiple values from a single slot + keeper_receive = "receive_batched"; + // we expect a user-defined amount of return value + expected_pushed_min = (int)luaL_checkinteger( L, key_i + 1); + expected_pushed_max = (int)luaL_optinteger( L, key_i + 2, expected_pushed_min); + if( expected_pushed_min > expected_pushed_max) + { + luaL_error( L, "batched min/max error"); + } + } + else + { + // make sure the keys are of a valid type + check_key_types( L, key_i, lua_gettop( L)); + // receive a single value, checking multiple slots + keeper_receive = "receive"; + // we expect a single (value, key) pair of returned values + expected_pushed_min = expected_pushed_max = 2; + } } - lua_pop( L, 1); { struct s_Keeper *K = keeper_acquire( linda); for( ;;) { + // all arguments of receive() but the first are passed to the keeper's receive function pushed = keeper_call( K->L, keeper_receive, L, linda, key_i); if( pushed < 0) { @@ -454,7 +471,7 @@ LUAG_FUNC( linda_receive) } if( pushed > 0) { - ASSERT_L( pushed == expected_pushed); + ASSERT_L( pushed >= expected_pushed_min && pushed <= expected_pushed_max); // replace sentinels with real nils keeper_toggle_nil_sentinels( L, lua_gettop( L) - pushed, 0); // To be done from within the 'K' locking area -- cgit v1.2.3-55-g6feb