From cc03c619dafe39cab231045f3c8592398e4b6944 Mon Sep 17 00:00:00 2001
From: Benoit Germain
- This document was revised on 06-Jan-14, and applies to version 3.7.6. + This document was revised on 09-Jan-14, and applies to version 3.7.7.
@@ -1010,13 +1010,13 @@h = lanes.linda( [opt_name]) - bool = h:send( [timeout_secs,] key, ... ) + bool = h:send( [timeout_secs,] key, ...) [key, val] = h:receive( [timeout_secs,] key [, ...]) [key, val [, ...]] = h:receive( timeout, h.batched, key, n_uint_min[, n_uint_max]) - void = h:limit( key, n_uint) + [true] = h:limit( key, n_uint) |
@@ -1024,7 +1024,11 @@
- By default, stack sizes are unlimited but limits can be enforced using the limit method. This can be useful to balance execution speeds in a producer/consumer scenario.
+ By default, stack sizes are unlimited but limits can be enforced using the limit method. This can be useful to balance execution speeds in a producer/consumer scenario. Any negative value removes the limit.
+
+ A limit of 0 is allowed to block everything.
+
+ (Since version 3.7.7) if the key was full but the limit change added some room, limit() returns true and the linda is signalled so that send()-blocked threads are awakened.
@@ -1036,13 +1040,13 @@
- send returns true if the sending succeeded, and false if the queue limit was met, and the queue did not empty enough during the given timeout. + send() returns true if the sending succeeded, and false if the queue limit was met, and the queue did not empty enough during the given timeout.
- Equally, receive returns a key and the value extracted from it, or nothing for timeout. Note that nils can be sent and received; the key value will tell it apart from a timeout.
+ Equally, receive() returns a key and the value extracted from it, or nothing for timeout. Note that nils can be sent and received; the key value will tell it apart from a timeout.
- Version 3.4.0 introduces an API change in the returned values: receive returns the key followed by the value(s), in that order, and not the other way around.
+ Version 3.4.0 introduces an API change in the returned values: receive() returns the key followed by the value(s), in that order, and not the other way around.
@@ -1055,9 +1059,9 @@
- linda_h:set( key, [val]) + [true] = linda_h:set( key, [val]) - [val] = linda_h:get( key ) + [val] = linda_h:get( key) |
@@ -1065,7 +1069,13 @@
- Writing to a slot overwrites existing value, and clears any possible queued entries. Table access and send/receive can be used together; reading a slot essentially peeks the next outcoming value of a queue.
+ Writing to a slot never blocks because it ignores the limit. It overwrites existing value and clears any possible queued entries.
+
+ set() signals the linda from write if a value is stored.
+
+ (Since version 3.7.7) if the key was full but the new data count of the key after set() is below its limit, set() returns true and the linda is also signaled for read so that send()-blocked threads are awakened.
+
+ Table access and send()/receive() can be used together; reading a slot essentially peeks the next outcoming value of a queue.
diff --git a/src/keeper.c b/src/keeper.c index 4c90069..6d5ecbf 100644 --- a/src/keeper.c +++ b/src/keeper.c @@ -95,7 +95,7 @@ static void fifo_new( lua_State* L) } // in: expect fifo ... on top of the stack -// out: nothing, removes all pushed values on the stack +// out: nothing, removes all pushed values from the stack static void fifo_push( lua_State* L, keeper_fifo* fifo, int _count) { int idx = lua_gettop( L) - _count; @@ -349,7 +349,7 @@ int keepercall_receive_batched( lua_State* L) } // in: linda_ud key n -// out: nothing +// out: true or nil int keepercall_limit( lua_State* L) { keeper_fifo* fifo; @@ -358,22 +358,37 @@ int keepercall_limit( lua_State* L) lua_replace( L, 1); // fifos key n lua_pop( L, 1); // fifos key lua_pushvalue( L, -1); // fifos key key - lua_rawget( L, -3); // fifos key fifo + lua_rawget( L, -3); // fifos key fifo|nil fifo = (keeper_fifo*) lua_touserdata( L, -1); if( fifo == NULL) - { + { // fifos key nil lua_pop( L, 1); // fifos key fifo_new( L); // fifos key fifo fifo = (keeper_fifo*) lua_touserdata( L, -1); lua_rawset( L, -3); // fifos } + // remove any clutter on the stack + lua_settop( L, 0); + // return true if we decide that blocked threads waiting to write on that key should be awakened + // this is the case if we detect the key was full but it is no longer the case + if( + ((fifo->limit >= 0) && (fifo->count >= fifo->limit)) // the key was full if limited and count exceeded the previous limit + && ((limit < 0) || (fifo->count < limit)) // the key is not full if unlimited or count is lower than the new limit + ) + { + lua_pushboolean( L, 1); + } + // set the new limit fifo->limit = limit; - return 0; + // return 0 or 1 value + return lua_gettop( L); } //in: linda_ud key [val] +//out: true or nil int keepercall_set( lua_State* L) { + bool_t should_wake_writers = FALSE; STACK_GROW( L, 6); // make sure we have a value on the stack if( lua_gettop( L) == 2) // ud key val? @@ -391,16 +406,18 @@ int keepercall_set( lua_State* L) lua_pushvalue( L, -2); // fifos key val key lua_rawget( L, 1); // fifos key val fifo|nil fifo = (keeper_fifo*) lua_touserdata( L, -1); - if( fifo == NULL) // might be NULL if we set a nonexistent key to nil - { + if( fifo == NULL) // can be NULL if we store a value at a new key + { // fifos key val nil lua_pop( L, 1); // fifos key val fifo_new( L); // fifos key val fifo lua_pushvalue( L, 2); // fifos key val fifo key lua_pushvalue( L, -2); // fifos key val fifo key fifo lua_rawset( L, 1); // fifos key val fifo } - else // the fifo exists, we just want to clear its contents - { + else // the fifo exists, we just want to update its contents + { // fifos key val fifo + // we create room if the fifo was full but it is no longer the case + should_wake_writers = (fifo->limit > 0) && (fifo->count >= fifo->limit); // empty the fifo for the specified key: replace uservalue with a virgin table, reset counters, but leave limit unchanged! lua_newtable( L); // fifos key val fifo {} lua_setuservalue( L, -2); // fifos key val fifo @@ -411,22 +428,35 @@ int keepercall_set( lua_State* L) lua_insert( L, -2); // fifos key fifo val fifo_push( L, fifo, 1); // fifos key fifo } - else // val == nil // fifos key nil - { + else // val == nil: we clear the key contents + { // fifos key nil keeper_fifo* fifo; lua_pop( L, 1); // fifos key - lua_rawget( L, 1); // fifos fifo|nil + lua_pushvalue( L, -1); // fifos key key + lua_rawget( L, 1); // fifos key fifo|nil // empty the fifo for the specified key: replace uservalue with a virgin table, reset counters, but leave limit unchanged! fifo = (keeper_fifo*) lua_touserdata( L, -1); if( fifo != NULL) // might be NULL if we set a nonexistent key to nil - { - lua_newtable( L); // fifos fifo {} - lua_setuservalue( L, -2); // fifos fifo - fifo->first = 1; - fifo->count = 0; + { // fifos key fifo + if( fifo->limit < 0) // fifo limit value is the default (unlimited): we can totally remove it + { + lua_pop( L, 1); // fifos key + lua_pushnil( L); // fifos key nil + lua_rawset( L, -3); // fifos + } + else + { + // we create room if the fifo was full but it is no longer the case + should_wake_writers = (fifo->limit > 0) && (fifo->count >= fifo->limit); + lua_remove( L, -2); // fifos fifo + lua_newtable( L); // fifos fifo {} + lua_setuservalue( L, -2); // fifos fifo + fifo->first = 1; + fifo->count = 0; + } } } - return 0; + return should_wake_writers ? (lua_pushboolean( L, 1), 1) : 0; } // in: linda_ud key @@ -476,11 +506,18 @@ int keepercall_count( lua_State* L) { keeper_fifo* fifo; lua_replace( L, 1); // fifos key - lua_rawget( L, -2); // fifos fifo - fifo = prepare_fifo_access( L, -1); // fifos fifo - lua_pushinteger( L, fifo->count); // fifos fifo count - lua_replace( L, -3); // count fifo - lua_pop( L, 1); // count + lua_rawget( L, -2); // fifos fifo|nil + if( lua_isnil( L, -1)) // the key is unknown + { // fifos nil + lua_remove( L, -2); // nil + } + else // the key is known + { // fifos fifo + fifo = prepare_fifo_access( L, -1); // fifos fifo + lua_pushinteger( L, fifo->count); // fifos fifo count + lua_replace( L, -3); // count fifo + lua_pop( L, 1); // count + } } break; @@ -494,21 +531,22 @@ int keepercall_count( lua_State* L) { keeper_fifo* fifo; lua_pushvalue( L, -1); // out fifos keys key - lua_rawget( L, 2); // out fifos keys fifo - fifo = prepare_fifo_access( L, -1); // out fifos keys fifo + lua_rawget( L, 2); // out fifos keys fifo|nil + fifo = prepare_fifo_access( L, -1); // out fifos keys fifo|nil lua_pop( L, 1); // out fifos keys - if( fifo != NULL) + if( fifo != NULL) // the key is known { lua_pushinteger( L, fifo->count); // out fifos keys count lua_rawset( L, 1); // out fifos keys } - else + else // the key is unknown { lua_pop( L, 1); // out fifos keys } } lua_pop( L, 1); // out } + ASSERT_L( lua_gettop( L) == 1); return 1; } @@ -681,7 +719,7 @@ void keeper_toggle_nil_sentinels( lua_State* L, int _val_i, int _nil_to_sentinel * * Returns: number of return values (pushed to 'L') or -1 in case of error */ -int keeper_call( lua_State *K, keeper_api_t _func, lua_State *L, void *linda, uint_t starting_index) +int keeper_call( lua_State* K, keeper_api_t func_, lua_State* L, void* linda, uint_t starting_index) { int const args = starting_index ? (lua_gettop( L) - starting_index + 1) : 0; int const Ktos = lua_gettop( K); @@ -689,7 +727,7 @@ int keeper_call( lua_State *K, keeper_api_t _func, lua_State *L, void *linda, ui STACK_GROW( K, 2); - PUSH_KEEPER_FUNC( K, _func); + PUSH_KEEPER_FUNC( K, func_); lua_pushlightuserdata( K, linda); diff --git a/src/lanes.c b/src/lanes.c index 3ccf915..d852a20 100644 --- a/src/lanes.c +++ b/src/lanes.c @@ -52,7 +52,7 @@ * ... */ -char const* VERSION = "3.7.6"; +char const* VERSION = "3.7.7"; /* =============================================================================== @@ -672,7 +672,7 @@ LUAG_FUNC( linda_receive) /* -* = linda_set( linda_ud, key_num|str|bool|lightuserdata [,value] ) +* [true] = linda_set( linda_ud, key_num|str|bool|lightuserdata [,value] ) * * Set a value to Linda. * TODO: what do we do if we set to non-nil and limit is 0? @@ -682,6 +682,7 @@ LUAG_FUNC( linda_receive) LUAG_FUNC( linda_set) { struct s_Linda *linda = lua_toLinda( L, 1); + int pushed; bool_t has_value = !lua_isnil( L, 3); luaL_argcheck( L, linda, 1, "expected a linda object!"); luaL_argcheck( L, lua_gettop( L) <= 3, 4, "too many arguments"); @@ -690,31 +691,31 @@ LUAG_FUNC( linda_set) check_key_types( L, 2, 2); { - int pushed; - struct s_Keeper *K = keeper_acquire( linda); + struct s_Keeper* K = keeper_acquire( linda); if( K == NULL) return 0; // no nil->sentinel toggling, we really clear the linda contents for the given key with a set() pushed = keeper_call( K->L, KEEPER_API( set), L, linda, 2); if( pushed >= 0) // no error? { - ASSERT_L( pushed == 0); + ASSERT_L( pushed == 0 || pushed == 1); - /* Set the signal from within 'K' locking. - */ if( has_value) { - SIGNAL_ALL( &linda->write_happened); + // we put some data in the slot, tell readers that they should wake + SIGNAL_ALL( &linda->write_happened); // To be done from within the 'K' locking area + } + if( pushed == 1) + { + // the key was full, but it is no longer the case, tell writers they should wake + ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); + SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area } } keeper_release( K); - // must trigger error after keeper state has been released - if( pushed < 0) - { - return luaL_error( L, "tried to copy unsupported types"); - } } - return 0; + // must trigger any error after keeper state has been released + return (pushed < 0) ? luaL_error( L, "tried to copy unsupported types") : pushed; } @@ -783,18 +784,20 @@ LUAG_FUNC( linda_get) /* -* = linda_limit( linda_ud, key_num|str|bool|lightuserdata, uint [, ...] ) +* [true] = linda_limit( linda_ud, key_num|str|bool|lightuserdata, int [, bool] ) * -* Set limits to 1 or more Linda keys. +* Set limit to 1 Linda keys. +* Optionally wake threads waiting to write on the linda, in case the limit enables them to do so */ LUAG_FUNC( linda_limit) { - struct s_Linda* linda= lua_toLinda( L, 1); + struct s_Linda* linda = lua_toLinda( L, 1); int pushed; + bool_t wake_writers = FALSE; + // make sure we got at most 4 arguments: the linda, a key, a limit, and an optional wake-up flag. luaL_argcheck( L, linda, 1, "expected a linda object!"); - // make sure we got a key and a limit - luaL_argcheck( L, lua_gettop( L) == 3, 2, "wrong number of arguments"); + luaL_argcheck( L, lua_gettop( L) <= 4, 2, "wrong number of arguments"); // make sure we got a numeric limit luaL_checknumber( L, 3); // make sure the key is of a valid type @@ -804,16 +807,16 @@ LUAG_FUNC( linda_limit) struct s_Keeper* K = keeper_acquire( linda); if( K == NULL) return 0; pushed = keeper_call( K->L, KEEPER_API( limit), L, linda, 2); - ASSERT_L( pushed <= 0); // either error or no return values - keeper_release( K); - // must trigger error after keeper state has been released - if( pushed < 0) + ASSERT_L( pushed == 0 || pushed == 1); // no error, optional boolean value saying if we should wake blocked writer threads + if( pushed == 1) { - return luaL_error( L, "tried to copy unsupported types"); + ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); + SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area } + keeper_release( K); } - - return 0; + // propagate pushed boolean if any + return pushed; } -- cgit v1.2.3-55-g6feb |