aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenoit Germain <bnt.germain@gmail.com>2012-06-12 07:47:33 +0200
committerBenoit Germain <bnt.germain@gmail.com>2012-06-12 07:47:33 +0200
commit2f1efc2d1f63adbd8099991ff9b0f0e35b52a26f (patch)
tree7e4df947ede97e509b33a68124c0319b304c8d1c /src
parentad54bd374ff86f2d523167fadfb07d36e223e966 (diff)
downloadlanes-2f1efc2d1f63adbd8099991ff9b0f0e35b52a26f.tar.gz
lanes-2f1efc2d1f63adbd8099991ff9b0f0e35b52a26f.tar.bz2
lanes-2f1efc2d1f63adbd8099991ff9b0f0e35b52a26f.zip
* linda:receive() batched mode now accepts a max_count optional argument
Diffstat (limited to 'src')
-rw-r--r--src/keeper.lua26
-rw-r--r--src/lanes.c55
2 files changed, 50 insertions, 31 deletions
diff --git a/src/keeper.lua b/src/keeper.lua
index 77bf880..1f17599 100644
--- a/src/keeper.lua
+++ b/src/keeper.lua
@@ -87,7 +87,6 @@ local fifo_peek = function( fifo, count)
87end 87end
88 88
89local fifo_pop = function( fifo, count) 89local fifo_pop = function( fifo, count)
90 if count > fifo.count then error("list is too short") end
91 local first = fifo.first 90 local first = fifo.first
92 local last = first + count - 1 91 local last = first + count - 1
93 local out = { unpack( fifo, first, last)} 92 local out = { unpack( fifo, first, last)}
@@ -178,7 +177,7 @@ end
178-- 177--
179function receive( ud, ...) 178function receive( ud, ...)
180 179
181 local data, _ = tables( ud) 180 local data = tables( ud)
182 181
183 for i = 1, select( '#', ...) do 182 for i = 1, select( '#', ...) do
184 local key = select( i, ...) 183 local key = select( i, ...)
@@ -186,7 +185,7 @@ function receive( ud, ...)
186 if fifo and fifo.count > 0 then 185 if fifo and fifo.count > 0 then
187 local val = fifo_pop( fifo, 1) 186 local val = fifo_pop( fifo, 1)
188 if val ~= nil then 187 if val ~= nil then
189 return val, key 188 return val, key
190 end 189 end
191 end 190 end
192 end 191 end
@@ -194,17 +193,20 @@ end
194 193
195 194
196----- 195-----
197-- [val1, ... valCOUNT]= receive_batched( linda_deep_ud, batch_sentinel, key , COUNT) 196-- [val1, ... valCOUNT]= receive_batched( linda_deep_ud, key , min_COUNT, max_COUNT)
198-- 197--
199-- Read any of the given keys, consuming the data found. Keys are read in 198-- Read a single key, consuming the data found.
200-- order.
201-- 199--
202receive_batched = function( ud, batch_sentinel, key, count) 200receive_batched = function( ud, key, min_count, max_count)
203 if count > 0 then 201 if min_count > 0 then
204 local data, _ = tables( ud) 202 local fifo = tables( ud)[key]
205 local fifo = data[key] 203 if fifo then
206 if fifo and fifo.count >= count then 204 local fifo_count = fifo.count
207 return fifo_pop( fifo, count) 205 if fifo_count >= min_count then
206 max_count = max_count or min_count
207 max_count = (max_count > fifo_count) and fifo_count or max_count
208 return fifo_pop( fifo, max_count)
209 end
208 end 210 end
209 end 211 end
210end 212end
diff --git a/src/lanes.c b/src/lanes.c
index d777be1..931af98 100644
--- a/src/lanes.c
+++ b/src/lanes.c
@@ -51,7 +51,7 @@
51 * ... 51 * ...
52 */ 52 */
53 53
54char const* VERSION = "3.1.2"; 54char const* VERSION = "3.1.3";
55 55
56/* 56/*
57=============================================================================== 57===============================================================================
@@ -398,16 +398,16 @@ LUAG_FUNC( linda_send)
398 * Consumes a single value from the Linda, in any key. 398 * Consumes a single value from the Linda, in any key.
399 * Returns: received value (which is consumed from the slot), and the key which had it 399 * Returns: received value (which is consumed from the slot), and the key which had it
400 400
401 * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, COUNT) 401 * [val1, ... valCOUNT]= linda_receive( linda_ud, [timeout_secs_num=-1], linda.batched, key_num|str|bool|lightuserdata, min_COUNT[, max_COUNT])
402 * Consumes COUNT values from the linda, from a single key. 402 * Consumes between min_COUNT and max_COUNT values from the linda, from a single key.
403 * returns the COUNT consumed values, or nil if there weren't enough values to consume 403 * returns the actual consumed values, or nil if there weren't enough values to consume
404 * 404 *
405 */ 405 */
406#define BATCH_SENTINEL "270e6c9d-280f-4983-8fee-a7ecdda01475" 406#define BATCH_SENTINEL "270e6c9d-280f-4983-8fee-a7ecdda01475"
407LUAG_FUNC( linda_receive) 407LUAG_FUNC( linda_receive)
408{ 408{
409 struct s_Linda *linda = lua_toLinda( L, 1); 409 struct s_Linda *linda = lua_toLinda( L, 1);
410 int pushed, expected_pushed; 410 int pushed, expected_pushed_min, expected_pushed_max;
411 bool_t cancel = FALSE; 411 bool_t cancel = FALSE;
412 char *keeper_receive; 412 char *keeper_receive;
413 413
@@ -426,27 +426,44 @@ LUAG_FUNC( linda_receive)
426 ++ key_i; 426 ++ key_i;
427 } 427 }
428 428
429 // make sure the keys are of a valid type
430 check_key_types( L, key_i, lua_gettop( L));
431
432 // are we in batched mode? 429 // are we in batched mode?
433 lua_pushliteral( L, BATCH_SENTINEL);
434 if( lua_equal( L, key_i, -1))
435 {
436 keeper_receive = "receive_batched";
437 expected_pushed = (int)luaL_checkinteger( L, key_i + 2);
438 }
439 else
440 { 430 {
441 keeper_receive = "receive"; 431 int is_batched;
442 expected_pushed = 2; 432 lua_pushliteral( L, BATCH_SENTINEL);
433 is_batched = lua_equal( L, key_i, -1);
434 lua_pop( L, 1);
435 if( is_batched)
436 {
437 // no need to pass linda.batched in the keeper state
438 ++ key_i;
439 // make sure the keys are of a valid type
440 check_key_types( L, key_i, key_i);
441 // receive multiple values from a single slot
442 keeper_receive = "receive_batched";
443 // we expect a user-defined amount of return value
444 expected_pushed_min = (int)luaL_checkinteger( L, key_i + 1);
445 expected_pushed_max = (int)luaL_optinteger( L, key_i + 2, expected_pushed_min);
446 if( expected_pushed_min > expected_pushed_max)
447 {
448 luaL_error( L, "batched min/max error");
449 }
450 }
451 else
452 {
453 // make sure the keys are of a valid type
454 check_key_types( L, key_i, lua_gettop( L));
455 // receive a single value, checking multiple slots
456 keeper_receive = "receive";
457 // we expect a single (value, key) pair of returned values
458 expected_pushed_min = expected_pushed_max = 2;
459 }
443 } 460 }
444 lua_pop( L, 1);
445 461
446 { 462 {
447 struct s_Keeper *K = keeper_acquire( linda); 463 struct s_Keeper *K = keeper_acquire( linda);
448 for( ;;) 464 for( ;;)
449 { 465 {
466 // all arguments of receive() but the first are passed to the keeper's receive function
450 pushed = keeper_call( K->L, keeper_receive, L, linda, key_i); 467 pushed = keeper_call( K->L, keeper_receive, L, linda, key_i);
451 if( pushed < 0) 468 if( pushed < 0)
452 { 469 {
@@ -454,7 +471,7 @@ LUAG_FUNC( linda_receive)
454 } 471 }
455 if( pushed > 0) 472 if( pushed > 0)
456 { 473 {
457 ASSERT_L( pushed == expected_pushed); 474 ASSERT_L( pushed >= expected_pushed_min && pushed <= expected_pushed_max);
458 // replace sentinels with real nils 475 // replace sentinels with real nils
459 keeper_toggle_nil_sentinels( L, lua_gettop( L) - pushed, 0); 476 keeper_toggle_nil_sentinels( L, lua_gettop( L) - pushed, 0);
460 // To be done from within the 'K' locking area 477 // To be done from within the 'K' locking area