From 32ad991eb8c590472607d61e9a831d2ca9db05c5 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Thu, 13 Feb 2014 16:09:15 +0100 Subject: more fixes/tweaks about cancelled lindas * bumped version to 3.8.5 * linda:limit() returns lanes.cancel_error on a limited linda * lanes.genlock() and lanes.genatomic() support cancelled lindas by returning lanes.cancel_error whenever appropriate * fixed a possible Lua stack overflow when calling linda:dump() * fixed cases where linda:send() and linda:receive() would not return lanes.cancel_error when they should --- src/keeper.c | 2 + src/lanes.c | 160 ++++++++++++++++++++++++++++------------------------------ src/lanes.lua | 88 +++++++++++++++++++------------- 3 files changed, 132 insertions(+), 118 deletions(-) (limited to 'src') diff --git a/src/keeper.c b/src/keeper.c index 1a696aa..c22bfed 100644 --- a/src/keeper.c +++ b/src/keeper.c @@ -190,6 +190,7 @@ int keeper_push_linda_storage( lua_State* L, void* ptr) struct s_Keeper* K = keeper_acquire( ptr); lua_State* KL = K ? K->L : NULL; if( KL == NULL) return 0; + STACK_GROW( KL, 4); STACK_CHECK( KL); lua_pushlightuserdata( KL, fifos_key); // fifos_key lua_rawget( KL, LUA_REGISTRYINDEX); // fifos @@ -204,6 +205,7 @@ int keeper_push_linda_storage( lua_State* L, void* ptr) } // move data from keeper to destination state KEEPER MAIN lua_pushnil( KL); // storage nil + STACK_GROW( L, 5); STACK_CHECK( L); lua_newtable( L); // out while( lua_next( KL, -2)) // storage key fifo diff --git a/src/lanes.c b/src/lanes.c index cf88171..dbb0a82 100644 --- a/src/lanes.c +++ b/src/lanes.c @@ -52,7 +52,7 @@ * ... */ -char const* VERSION = "3.8.4"; +char const* VERSION = "3.8.5"; /* =============================================================================== @@ -424,13 +424,17 @@ struct s_Linda static void linda_id( lua_State*, char const * const which); -#define lua_toLinda(L,n) ((struct s_Linda *)luaG_todeep( L, linda_id, n )) - +static inline struct s_Linda* lua_toLinda( lua_State* L, int idx_) +{ + struct s_Linda* linda = luaG_todeep( L, linda_id, idx_); + luaL_argcheck( L, linda != NULL, idx_, "expecting a linda object"); + return linda; +} -static void check_key_types( lua_State*L, int _start, int _end) +static void check_key_types( lua_State* L, int start_, int end_) { int i; - for( i = _start; i <= _end; ++ i) + for( i = start_; i <= end_; ++ i) { int t = lua_type( L, i); if( t == LUA_TBOOLEAN || t == LUA_TNUMBER || t == LUA_TSTRING || t == LUA_TLIGHTUSERDATA) @@ -459,8 +463,6 @@ LUAG_FUNC( linda_send) time_d timeout = -1.0; uint_t key_i = 2; // index of first key, if timeout not there - luaL_argcheck( L, linda, 1, "expected a linda object!"); - if( lua_type( L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion { timeout = SIGNAL_TIMEOUT_PREPARE( lua_tonumber( L,2)); @@ -485,12 +487,26 @@ LUAG_FUNC( linda_send) STACK_GROW( L, 1); { + bool_t try_again = TRUE; + struct s_lane* const s = get_lane_from_registry( L); struct s_Keeper* K = keeper_acquire( linda); lua_State* KL = K ? K->L : NULL; // need to do this for 'STACK_CHECK' if( KL == NULL) return 0; STACK_CHECK( KL); for( ;;) { + if( s != NULL) + { + cancel = s->cancel_request; + } + cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel; + // if user wants to cancel, or looped because of a timeout, the call returns without sending anything + if( !try_again || cancel != CANCEL_NONE) + { + pushed = 0; + break; + } + STACK_MID( KL, 0); pushed = keeper_call( KL, KEEPER_API( send), L, linda, key_i); if( pushed < 0) @@ -506,28 +522,19 @@ LUAG_FUNC( linda_send) if( ret) { // Wake up ALL waiting threads - // SIGNAL_ALL( &linda->write_happened); break; } + + // instant timout to bypass the if( timeout == 0.0) { break; /* no wait; instant timeout */ } - /* limit faced; push until timeout */ + // storage limit hit, wait until timeout or signalled that we should try again { enum e_status prev_status = ERROR_ST; // prevent 'might be used uninitialized' warnings - struct s_lane* const s = get_lane_from_registry( L); - if( s != NULL) - { - cancel = s->cancel_request; - } - cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel; - if( cancel != CANCEL_NONE) // if user wants to cancel, the call returns without sending anything - { - break; - } if( s != NULL) { // change status of lane to "waiting" @@ -537,22 +544,12 @@ LUAG_FUNC( linda_send) ASSERT_L( s->waiting_on == NULL); s->waiting_on = &linda->read_happened; } + // could not send because no room: wait until some data was read before trying again, or until timeout is reached + try_again = SIGNAL_WAIT( &linda->read_happened, &K->lock_, timeout); + if( s != NULL) { - // could not send because no room: wait until some data was read before trying again, or until timeout is reached - bool_t const signalled = SIGNAL_WAIT( &linda->read_happened, &K->lock_, timeout); - if( s != NULL) - { - s->waiting_on = NULL; - s->status = prev_status; - // if a cancel request is pending, be sure to handle it as soon as possible - cancel = s->cancel_request; - } - cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel; - if( !signalled || cancel != CANCEL_NONE) - { - // waiting returned after a timeout, or pending cancel: we are done - break; - } + s->waiting_on = NULL; + s->status = prev_status; } } } @@ -569,7 +566,7 @@ LUAG_FUNC( linda_send) switch( cancel) { case CANCEL_SOFT: - // if user wants to soft-cancel, the call returns CANCEL_ERROR + // if user wants to soft-cancel, the call returns lanes.cancel_error lua_pushlightuserdata( L, CANCEL_ERROR); return 1; @@ -606,8 +603,6 @@ LUAG_FUNC( linda_receive) time_d timeout = -1.0; uint_t key_i = 2; - luaL_argcheck( L, linda, 1, "expected a linda object!"); - if( lua_type( L, 2) == LUA_TNUMBER) // we don't want to use lua_isnumber() because of autocoercion { timeout = SIGNAL_TIMEOUT_PREPARE( lua_tonumber( L, 2)); @@ -655,10 +650,24 @@ LUAG_FUNC( linda_receive) } { + bool_t try_again = TRUE; + struct s_lane* const s = get_lane_from_registry( L); struct s_Keeper* K = keeper_acquire( linda); if( K == NULL) return 0; for( ;;) { + if( s != NULL) + { + cancel = s->cancel_request; + } + cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel; + // if user wants to cancel, or looped because of a timeout, the call returns without sending anything + if( !try_again || cancel != CANCEL_NONE) + { + pushed = 0; + break; + } + // all arguments of receive() but the first are passed to the keeper's receive function pushed = keeper_call( K->L, keeper_receive, L, linda, key_i); if( pushed < 0) @@ -674,26 +683,16 @@ LUAG_FUNC( linda_receive) // SIGNAL_ALL( &linda->read_happened); break; - } + if( timeout == 0.0) { break; /* instant timeout */ } - /* nothing received; wait until timeout */ + // nothing received, wait until timeout or signalled that we should try again { enum e_status prev_status = ERROR_ST; // prevent 'might be used uninitialized' warnings - struct s_lane* const s = get_lane_from_registry( L); - if( s != NULL) - { - cancel = s->cancel_request; - } - cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel; - if( cancel != CANCEL_NONE) // if user wants to cancel, the call returns without providing anything - { - break; - } if( s != NULL) { // change status of lane to "waiting" @@ -703,22 +702,12 @@ LUAG_FUNC( linda_receive) ASSERT_L( s->waiting_on == NULL); s->waiting_on = &linda->write_happened; } + // not enough data to read: wakeup when data was sent, or when timeout is reached + try_again = SIGNAL_WAIT( &linda->write_happened, &K->lock_, timeout); + if( s != NULL) { - // not enough data to read: wakeup when data was sent, or when timeout is reached - bool_t const signalled = SIGNAL_WAIT( &linda->write_happened, &K->lock_, timeout); - if( s != NULL) - { - s->waiting_on = NULL; - s->status = prev_status; - // if a cancel request is pending, be sure to handle it as soon as possible - cancel = s->cancel_request; - } - cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel; - if( !signalled || cancel != CANCEL_NONE) - { - // waiting returned after a timeout, or pending cancel: we are done - break; - } + s->waiting_on = NULL; + s->status = prev_status; } } } @@ -761,7 +750,6 @@ LUAG_FUNC( linda_set) struct s_Linda* const linda = lua_toLinda( L, 1); int pushed; bool_t has_value = lua_gettop( L) > 2; - luaL_argcheck( L, linda, 1, "expected a linda object!"); // make sure the key is of a valid type (throws an error if not the case) check_key_types( L, 2, 2); @@ -819,7 +807,6 @@ LUAG_FUNC( linda_count) struct s_Linda* linda = lua_toLinda( L, 1); int pushed; - luaL_argcheck( L, linda, 1, "expected a linda object!"); // make sure the keys are of a valid type check_key_types( L, 2, lua_gettop( L)); @@ -848,7 +835,6 @@ LUAG_FUNC( linda_get) int pushed; int count = luaL_optint( L, 3, 1); luaL_argcheck( L, count >= 1, 3, "count should be >= 1"); - luaL_argcheck( L, linda, 1, "expected a linda object"); luaL_argcheck( L, lua_gettop( L) <= 3, 4, "too many arguments"); // make sure the key is of a valid type (throws an error if not the case) @@ -897,7 +883,6 @@ LUAG_FUNC( linda_limit) bool_t wake_writers = FALSE; // make sure we got 3 arguments: the linda, a key and a limit - luaL_argcheck( L, linda, 1, "expected a linda object!"); luaL_argcheck( L, lua_gettop( L) == 3, 2, "wrong number of arguments"); // make sure we got a numeric limit luaL_checknumber( L, 3); @@ -907,12 +892,22 @@ LUAG_FUNC( linda_limit) { struct s_Keeper* K = keeper_acquire( linda); if( K == NULL) return 0; - pushed = keeper_call( K->L, KEEPER_API( limit), L, linda, 2); - ASSERT_L( pushed == 0 || pushed == 1); // no error, optional boolean value saying if we should wake blocked writer threads - if( pushed == 1) + + if( linda->simulate_cancel == CANCEL_NONE) + { + pushed = keeper_call( K->L, KEEPER_API( limit), L, linda, 2); + ASSERT_L( pushed == 0 || pushed == 1); // no error, optional boolean value saying if we should wake blocked writer threads + if( pushed == 1) + { + ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); + SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area + } + } + else // linda is cancelled { - ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); - SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area + // do nothing and return lanes.cancel_error + lua_pushlightuserdata( L, CANCEL_ERROR); + pushed = 1; } keeper_release( K); } @@ -929,12 +924,11 @@ LUAG_FUNC( linda_limit) LUAG_FUNC( linda_cancel) { struct s_Linda* linda = lua_toLinda( L, 1); - char const* who = luaL_checkstring( L, 2); + char const* who = luaL_optstring( L, 2, "both"); struct s_Keeper* K; // make sure we got 3 arguments: the linda, a key and a limit - luaL_argcheck( L, linda, 1, "expected a linda object!"); - luaL_argcheck( L, lua_gettop( L) == 2, 2, "wrong number of arguments"); + luaL_argcheck( L, lua_gettop( L) <= 2, 2, "wrong number of arguments"); // signalling must be done from inside the K locking area K = keeper_acquire( linda); @@ -984,11 +978,11 @@ LUAG_FUNC( linda_cancel) * different userdata and won't be known to be essentially the same deep one * without this. */ -LUAG_FUNC( linda_deep ) { - struct s_Linda *linda= lua_toLinda( L, 1 ); - luaL_argcheck( L, linda, 1, "expected a linda object!"); - lua_pushlightuserdata( L, linda ); // just the address - return 1; +LUAG_FUNC( linda_deep) +{ + struct s_Linda* linda= lua_toLinda( L, 1); + lua_pushlightuserdata( L, linda); // just the address + return 1; } @@ -1002,10 +996,10 @@ LUAG_FUNC( linda_deep ) { static int linda_tostring( lua_State* L, int idx_, bool_t opt_) { - struct s_Linda* linda = lua_toLinda( L, idx_); + struct s_Linda* linda = luaG_todeep( L, linda_id, idx_); if( !opt_) { - luaL_argcheck( L, linda, idx_, "expected a linda object!"); + luaL_argcheck( L, linda, idx_, "expecting a linda object"); } if( linda != NULL) { diff --git a/src/lanes.lua b/src/lanes.lua index 1286099..86dbe47 100644 --- a/src/lanes.lua +++ b/src/lanes.lua @@ -585,13 +585,25 @@ end end -- settings.with_timers +-- avoid pulling the whole core module as upvalue when cancel_error is enough +local cancel_error = assert( core.cancel_error) + ---=== Lock & atomic generators ===--- -- These functions are just surface sugar, but make solutions easier to read. -- Not many applications should even need explicit locks or atomic counters. -- --- lock_f= lanes.genlock( linda_h, key [,N_uint=1] ) +-- [true [, ...]= trues(uint) +-- +local function trues( n) + if n > 0 then + return true, trues( n - 1) + end +end + +-- +-- lock_f = lanes.genlock( linda_h, key [,N_uint=1] ) -- -- = lock_f( +M ) -- acquire M -- ...locked... @@ -602,16 +614,10 @@ end -- settings.with_timers -- -- PUBLIC LANES API local genlock = function( linda, key, N) - linda:set( key) -- clears existing data - linda:limit( key, N) - - -- - -- [true [, ...]= trues(uint) - -- - local function trues( n) - if n > 0 then - return true, trues( n - 1) - end + -- clear existing data and set the limit + N = N or 1 + if linda:set( key) == cancel_error or linda:limit( key, N) == cancel_error then + return cancel_error end -- use an optimized version for case N == 1 @@ -623,7 +629,8 @@ local genlock = function( linda, key, N) return linda:send( timeout, key, true) -- suspends until been able to push them else local k = linda:receive( nil, key) - return k and true or false + -- propagate cancel_error if we got it, else return true or false + return k and ((k ~= cancel_error) and true or k) or false end end or @@ -634,34 +641,45 @@ local genlock = function( linda, key, N) return linda:send( timeout, key, trues(M)) -- suspends until been able to push them else local k = linda:receive( nil, linda.batched, key, -M) - return k and true or false + -- propagate cancel_error if we got it, else return true or false + return k and ((k ~= cancel_error) and true or k) or false end end end --- --- atomic_f= lanes.genatomic( linda_h, key [,initial_num=0.0] ) --- --- int= atomic_f( [diff_num=1.0] ) --- --- Returns an access function that allows atomic increment/decrement of the --- number in 'key'. --- --- PUBLIC LANES API -local function genatomic( linda, key, initial_val ) - linda:limit(key,2) -- value [,true] - linda:set(key,initial_val or 0.0) -- clears existing data (also queue) - - return - function(diff) - -- 'nil' allows 'key' to be numeric - linda:send( nil, key, true ) -- suspends until our 'true' is in - local val= linda:get(key) + (diff or 1.0) - linda:set( key, val ) -- releases the lock, by emptying queue - return val - end -end + -- + -- atomic_f = lanes.genatomic( linda_h, key [,initial_num=0.0]) + -- + -- int|cancel_error = atomic_f( [diff_num = 1.0]) + -- + -- Returns an access function that allows atomic increment/decrement of the + -- number in 'key'. + -- + -- PUBLIC LANES API + local genatomic = function( linda, key, initial_val) + -- clears existing data (also queue). the slot may contain the stored value, and an additional boolean value + if linda:limit( key, 2) == cancel_error or linda:set( key, initial_val or 0.0) == cancel_error then + return cancel_error + end + + return function( diff) + -- 'nil' allows 'key' to be numeric + -- suspends until our 'true' is in + if linda:send( nil, key, true) == cancel_error then + return cancel_error + end + local val = linda:get( key) + if val ~= cancel_error then + val = val + (diff or 1.0) + -- set() releases the lock by emptying queue + if linda:set( key, val) == cancel_error then + val = cancel_error + end + end + return val + end + end -- activate full interface lanes.require = core.require -- cgit v1.2.3-55-g6feb