From c38429579de57d338e0a1c14a7ed7b4aa90e059a Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Mon, 10 Feb 2014 17:18:14 +0100 Subject: new API linda:cancel("read"|"write"|"both"|"none") * bumped version to 3.8.4 * all linda operations return lanes.cancel_error on a cancelled linda * raised an internal string length so that longer linda names are fully output before truncation applies when doing tostring( linda) --- src/lanes.c | 405 ++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 245 insertions(+), 160 deletions(-) (limited to 'src') diff --git a/src/lanes.c b/src/lanes.c index 64c144c..cf88171 100644 --- a/src/lanes.c +++ b/src/lanes.c @@ -52,7 +52,7 @@ * ... */ -char const* VERSION = "3.8.3"; +char const* VERSION = "3.8.4"; /* =============================================================================== @@ -147,7 +147,7 @@ struct s_lane // M: sets to PENDING (before launching) // S: updates -> RUNNING/WAITING -> DONE/ERROR_ST/CANCELLED - SIGNAL_T * volatile waiting_on; + SIGNAL_T* volatile waiting_on; // // When status is WAITING, points on the linda's signal the thread waits on, else NULL @@ -414,10 +414,12 @@ static void lane_cleanup( struct s_lane* s) * Actual data is kept within a keeper state, which is hashed by the 's_Linda' * pointer (which is same to all userdatas pointing to it). */ -struct s_Linda { - SIGNAL_T read_happened; - SIGNAL_T write_happened; - char name[1]; +struct s_Linda +{ + SIGNAL_T read_happened; + SIGNAL_T write_happened; + enum e_cancel_request simulate_cancel; + char name[1]; }; static void linda_id( lua_State*, char const * const which); @@ -450,11 +452,11 @@ static void check_key_types( lua_State*L, int _start, int _end) */ LUAG_FUNC( linda_send) { - struct s_Linda *linda = lua_toLinda( L, 1); + struct s_Linda* linda = lua_toLinda( L, 1); bool_t ret; enum e_cancel_request cancel = CANCEL_NONE; int pushed; - time_d timeout= -1.0; + time_d timeout = -1.0; uint_t key_i = 2; // index of first key, if timeout not there luaL_argcheck( L, linda, 1, "expected a linda object!"); @@ -507,7 +509,6 @@ LUAG_FUNC( linda_send) // SIGNAL_ALL( &linda->write_happened); break; - } if( timeout == 0.0) { @@ -520,11 +521,15 @@ LUAG_FUNC( linda_send) struct s_lane* const s = get_lane_from_registry( L); if( s != NULL) { - cancel = s->cancel_request; // testing here causes no delays - if( cancel != CANCEL_NONE) // if user wants to cancel, the call returns without sending anything - { - break; - } + cancel = s->cancel_request; + } + cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel; + if( cancel != CANCEL_NONE) // if user wants to cancel, the call returns without sending anything + { + break; + } + if( s != NULL) + { // change status of lane to "waiting" prev_status = s->status; // RUNNING, most likely ASSERT_L( prev_status == RUNNING); // but check, just in case @@ -532,22 +537,22 @@ LUAG_FUNC( linda_send) ASSERT_L( s->waiting_on == NULL); s->waiting_on = &linda->read_happened; } - // could not send because no room: wait until some data was read before trying again, or until timeout is reached - if( !SIGNAL_WAIT( &linda->read_happened, &K->lock_, timeout)) { + // could not send because no room: wait until some data was read before trying again, or until timeout is reached + bool_t const signalled = SIGNAL_WAIT( &linda->read_happened, &K->lock_, timeout); if( s != NULL) { s->waiting_on = NULL; s->status = prev_status; - // if woken by a cancel request, be sure to handle it properly + // if a cancel request is pending, be sure to handle it as soon as possible cancel = s->cancel_request; } - break; - } - if( s != NULL) - { - s->waiting_on = NULL; - s->status = prev_status; + cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel; + if( !signalled || cancel != CANCEL_NONE) + { + // waiting returned after a timeout, or pending cancel: we are done + break; + } } } } @@ -593,7 +598,7 @@ LUAG_FUNC( linda_send) #define BATCH_SENTINEL "270e6c9d-280f-4983-8fee-a7ecdda01475" LUAG_FUNC( linda_receive) { - struct s_Linda *linda = lua_toLinda( L, 1); + struct s_Linda* linda = lua_toLinda( L, 1); int pushed, expected_pushed_min, expected_pushed_max; enum e_cancel_request cancel = CANCEL_NONE; keeper_api_t keeper_receive; @@ -650,7 +655,7 @@ LUAG_FUNC( linda_receive) } { - struct s_Keeper *K = keeper_acquire( linda); + struct s_Keeper* K = keeper_acquire( linda); if( K == NULL) return 0; for( ;;) { @@ -682,11 +687,15 @@ LUAG_FUNC( linda_receive) struct s_lane* const s = get_lane_from_registry( L); if( s != NULL) { - cancel = s->cancel_request; // testing here causes no delays - if( cancel != CANCEL_NONE) // if user wants to cancel, the call returns without providing anything - { - break; - } + cancel = s->cancel_request; + } + cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel; + if( cancel != CANCEL_NONE) // if user wants to cancel, the call returns without providing anything + { + break; + } + if( s != NULL) + { // change status of lane to "waiting" prev_status = s->status; // RUNNING, most likely ASSERT_L( prev_status == RUNNING); // but check, just in case @@ -694,22 +703,22 @@ LUAG_FUNC( linda_receive) ASSERT_L( s->waiting_on == NULL); s->waiting_on = &linda->write_happened; } - // not enough data to read: wakeup when data was sent, or when timeout is reached - if( !SIGNAL_WAIT( &linda->write_happened, &K->lock_, timeout)) { + // not enough data to read: wakeup when data was sent, or when timeout is reached + bool_t const signalled = SIGNAL_WAIT( &linda->write_happened, &K->lock_, timeout); if( s != NULL) { s->waiting_on = NULL; s->status = prev_status; - // if woken by a cancel request, be sure to handle it properly + // if a cancel request is pending, be sure to handle it as soon as possible cancel = s->cancel_request; } - break; - } - if( s != NULL) - { - s->waiting_on = NULL; - s->status = prev_status; + cancel = (cancel != CANCEL_NONE) ? cancel : linda->simulate_cancel; + if( !signalled || cancel != CANCEL_NONE) + { + // waiting returned after a timeout, or pending cancel: we are done + break; + } } } } @@ -740,7 +749,7 @@ LUAG_FUNC( linda_receive) /* -* [true] = linda_set( linda_ud, key_num|str|bool|lightuserdata [, value [, ...]]) +* [true|lanes.cancel_error] = linda_set( linda_ud, key_num|str|bool|lightuserdata [, value [, ...]]) * * Set one or more value to Linda. * TODO: what do we do if we set to non-nil and limit is 0? @@ -760,28 +769,38 @@ LUAG_FUNC( linda_set) { struct s_Keeper* K = keeper_acquire( linda); if( K == NULL) return 0; - if( has_value) - { - // convert nils to some special non-nil sentinel in sent values - keeper_toggle_nil_sentinels( L, 3, eLM_ToKeeper); - } - pushed = keeper_call( K->L, KEEPER_API( set), L, linda, 2); - if( pushed >= 0) // no error? - { - ASSERT_L( pushed == 0 || pushed == 1); + if( linda->simulate_cancel == CANCEL_NONE) + { if( has_value) { - // we put some data in the slot, tell readers that they should wake - SIGNAL_ALL( &linda->write_happened); // To be done from within the 'K' locking area + // convert nils to some special non-nil sentinel in sent values + keeper_toggle_nil_sentinels( L, 3, eLM_ToKeeper); } - if( pushed == 1) + pushed = keeper_call( K->L, KEEPER_API( set), L, linda, 2); + if( pushed >= 0) // no error? { - // the key was full, but it is no longer the case, tell writers they should wake - ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); - SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area + ASSERT_L( pushed == 0 || pushed == 1); + + if( has_value) + { + // we put some data in the slot, tell readers that they should wake + SIGNAL_ALL( &linda->write_happened); // To be done from within the 'K' locking area + } + if( pushed == 1) + { + // the key was full, but it is no longer the case, tell writers they should wake + ASSERT_L( lua_type( L, -1) == LUA_TBOOLEAN && lua_toboolean( L, -1) == 1); + SIGNAL_ALL( &linda->read_happened); // To be done from within the 'K' locking area + } } } + else // linda is cancelled + { + // do nothing and return lanes.cancel_error + lua_pushlightuserdata( L, CANCEL_ERROR); + pushed = 1; + } keeper_release( K); } @@ -837,10 +856,20 @@ LUAG_FUNC( linda_get) { struct s_Keeper* K = keeper_acquire( linda); if( K == NULL) return 0; - pushed = keeper_call( K->L, KEEPER_API( get), L, linda, 2); - if( pushed > 0) + + if( linda->simulate_cancel == CANCEL_NONE) + { + pushed = keeper_call( K->L, KEEPER_API( get), L, linda, 2); + if( pushed > 0) + { + keeper_toggle_nil_sentinels( L, lua_gettop( L) - pushed, eLM_FromKeeper); + } + } + else // linda is cancelled { - keeper_toggle_nil_sentinels( L, lua_gettop( L) - pushed, eLM_FromKeeper); + // do nothing and return lanes.cancel_error + lua_pushlightuserdata( L, CANCEL_ERROR); + pushed = 1; } keeper_release( K); // must trigger error after keeper state has been released @@ -892,6 +921,59 @@ LUAG_FUNC( linda_limit) } +/* +* (void) = linda_cancel( linda_ud, "read"|"write"|"both"|"none") +* +* Signal linda so that waiting threads wake up as if their own lane was cancelled +*/ +LUAG_FUNC( linda_cancel) +{ + struct s_Linda* linda = lua_toLinda( L, 1); + char const* who = luaL_checkstring( L, 2); + struct s_Keeper* K; + + // make sure we got 3 arguments: the linda, a key and a limit + luaL_argcheck( L, linda, 1, "expected a linda object!"); + luaL_argcheck( L, lua_gettop( L) == 2, 2, "wrong number of arguments"); + + // signalling must be done from inside the K locking area + K = keeper_acquire( linda); + if( K == NULL) return 0; + + linda->simulate_cancel = CANCEL_SOFT; + if( strcmp( who, "both") == 0) // tell everyone writers to wake up + { + SIGNAL_ALL( &linda->write_happened); + SIGNAL_ALL( &linda->read_happened); + } + else if( strcmp( who, "none") == 0) // reset flag + { + linda->simulate_cancel = CANCEL_NONE; + } + else if( strcmp( who, "read") == 0) // tell blocked readers to wake up + { + SIGNAL_ALL( &linda->write_happened); + } + else if( strcmp( who, "write") == 0) // tell blocked writers to wake up + { + SIGNAL_ALL( &linda->read_happened); + } + else + { + // error ... + linda = NULL; + } + keeper_release( K); + + // ... but we must raise it outside the lock + if( !linda) + { + return luaL_error( L, "unknown wake hint '%s'", who); + } + return 0; +} + + /* * lightuserdata= linda_deep( linda_ud ) * @@ -918,16 +1000,16 @@ LUAG_FUNC( linda_deep ) { * Useful for concatenation or debugging purposes */ -static int linda_tostring( lua_State* L, int _idx, bool_t _opt) +static int linda_tostring( lua_State* L, int idx_, bool_t opt_) { - struct s_Linda* linda = lua_toLinda( L, _idx); - if( !_opt) + struct s_Linda* linda = lua_toLinda( L, idx_); + if( !opt_) { - luaL_argcheck( L, linda, _idx, "expected a linda object!"); + luaL_argcheck( L, linda, idx_, "expected a linda object!"); } if( linda != NULL) { - char text[32]; + char text[128]; int len; if( linda->name[0]) len = sprintf( text, "Linda: %.*s", (int)sizeof(text) - 8, linda->name); @@ -1008,120 +1090,123 @@ LUAG_FUNC( linda_dump) * For any other strings, the ID function must not react at all. This allows * future extensions of the system. */ -static void linda_id( lua_State*L, char const * const which) +static void linda_id( lua_State* L, char const* const which) { - if (strcmp( which, "new" )==0) - { - struct s_Linda *s; - size_t name_len = 0; - char const* linda_name = NULL; - int const top = lua_gettop( L); + if( strcmp( which, "new" ) == 0) + { + struct s_Linda* s; + size_t name_len = 0; + char const* linda_name = NULL; + int const top = lua_gettop( L); - if( top > 0 && lua_type( L, top) == LUA_TSTRING) - { - linda_name = lua_tostring( L, top); - name_len = strlen( linda_name); - } + if( top > 0 && lua_type( L, top) == LUA_TSTRING) + { + linda_name = lua_tostring( L, top); + name_len = strlen( linda_name); + } - /* The deep data is allocated separately of Lua stack; we might no - * longer be around when last reference to it is being released. - * One can use any memory allocation scheme. - */ - s= (struct s_Linda *) malloc( sizeof(struct s_Linda) + name_len); // terminating 0 is already included - ASSERT_L(s); + /* The deep data is allocated separately of Lua stack; we might no + * longer be around when last reference to it is being released. + * One can use any memory allocation scheme. + */ + s = (struct s_Linda*) malloc( sizeof(struct s_Linda) + name_len); // terminating 0 is already included + ASSERT_L( s); - SIGNAL_INIT( &s->read_happened ); - SIGNAL_INIT( &s->write_happened ); - s->name[0] = 0; - memcpy( s->name, linda_name, name_len ? name_len + 1 : 0); + SIGNAL_INIT( &s->read_happened); + SIGNAL_INIT( &s->write_happened); + s->simulate_cancel = CANCEL_NONE; + s->name[0] = 0; + memcpy( s->name, linda_name, name_len ? name_len + 1 : 0); - lua_pushlightuserdata( L, s ); - } - else if( strcmp( which, "delete" ) == 0) - { - struct s_Keeper* K; - struct s_Linda* l= lua_touserdata( L, 1); - ASSERT_L( l); - - /* Clean associated structures in the keeper state. - */ - K = keeper_acquire( l); - if( K && K->L) // can be NULL if this happens during main state shutdown (lanes is GC'ed -> no keepers -> no need to cleanup) - { - keeper_call( K->L, KEEPER_API( clear), L, l, 0); - } - keeper_release( K); - - /* There aren't any lanes waiting on these lindas, since all proxies - * have been gc'ed. Right? - */ - SIGNAL_FREE( &l->read_happened); - SIGNAL_FREE( &l->write_happened); - free( l); - } - else if (strcmp( which, "metatable" )==0) - { + lua_pushlightuserdata( L, s); + } + else if( strcmp( which, "delete") == 0) + { + struct s_Keeper* K; + struct s_Linda* l = lua_touserdata( L, 1); + ASSERT_L( l); - STACK_CHECK( L); - lua_newtable(L); - // metatable is its own index - lua_pushvalue( L, -1); - lua_setfield( L, -2, "__index"); + /* Clean associated structures in the keeper state. + */ + K = keeper_acquire( l); + if( K && K->L) // can be NULL if this happens during main state shutdown (lanes is GC'ed -> no keepers -> no need to cleanup) + { + keeper_call( K->L, KEEPER_API( clear), L, l, 0); + } + keeper_release( K); - // protect metatable from external access - lua_pushliteral( L, "Linda"); - lua_setfield( L, -2, "__metatable"); + /* There aren't any lanes waiting on these lindas, since all proxies + * have been gc'ed. Right? + */ + SIGNAL_FREE( &l->read_happened); + SIGNAL_FREE( &l->write_happened); + free( l); + } + else if( strcmp( which, "metatable" ) == 0) + { - lua_pushcfunction( L, LG_linda_tostring); - lua_setfield( L, -2, "__tostring"); + STACK_CHECK( L); + lua_newtable( L); + // metatable is its own index + lua_pushvalue( L, -1); + lua_setfield( L, -2, "__index"); - // Decoda __towatch support - lua_pushcfunction( L, LG_linda_dump); - lua_setfield( L, -2, "__towatch"); + // protect metatable from external access + lua_pushliteral( L, "Linda"); + lua_setfield( L, -2, "__metatable"); - lua_pushcfunction( L, LG_linda_concat); - lua_setfield( L, -2, "__concat"); + lua_pushcfunction( L, LG_linda_tostring); + lua_setfield( L, -2, "__tostring"); - // - // [-1]: linda metatable - lua_pushcfunction( L, LG_linda_send ); - lua_setfield( L, -2, "send" ); + // Decoda __towatch support + lua_pushcfunction( L, LG_linda_dump); + lua_setfield( L, -2, "__towatch"); - lua_pushcfunction( L, LG_linda_receive ); - lua_setfield( L, -2, "receive" ); + lua_pushcfunction( L, LG_linda_concat); + lua_setfield( L, -2, "__concat"); - lua_pushcfunction( L, LG_linda_limit ); - lua_setfield( L, -2, "limit" ); + // [-1]: linda metatable + lua_pushcfunction( L, LG_linda_send); + lua_setfield( L, -2, "send"); - lua_pushcfunction( L, LG_linda_set ); - lua_setfield( L, -2, "set" ); - - lua_pushcfunction( L, LG_linda_count ); - lua_setfield( L, -2, "count" ); - - lua_pushcfunction( L, LG_linda_get ); - lua_setfield( L, -2, "get" ); + lua_pushcfunction( L, LG_linda_receive); + lua_setfield( L, -2, "receive"); - lua_pushcfunction( L, LG_linda_deep ); - lua_setfield( L, -2, "deep" ); + lua_pushcfunction( L, LG_linda_limit); + lua_setfield( L, -2, "limit"); - lua_pushcfunction( L, LG_linda_dump); - lua_setfield( L, -2, "dump" ); - - lua_pushliteral( L, BATCH_SENTINEL); - lua_setfield(L, -2, "batched"); + lua_pushcfunction( L, LG_linda_set); + lua_setfield( L, -2, "set"); - STACK_END( L, 1); - } - else if( strcmp( which, "module") == 0) - { - // linda is a special case because we know lanes must be loaded from the main lua state - // to be able to ever get here, so we know it will remain loaded as long a the main state is around - // in other words, forever. - lua_pushnil( L); - // other idfuncs must push a string naming the module they come from - //lua_pushliteral( L, "lanes.core"); - } + lua_pushcfunction( L, LG_linda_count); + lua_setfield( L, -2, "count"); + + lua_pushcfunction( L, LG_linda_get); + lua_setfield( L, -2, "get"); + + lua_pushcfunction( L, LG_linda_cancel); + lua_setfield( L, -2, "cancel"); + + lua_pushcfunction( L, LG_linda_deep); + lua_setfield( L, -2, "deep"); + + lua_pushcfunction( L, LG_linda_dump); + lua_setfield( L, -2, "dump"); + + lua_pushliteral( L, BATCH_SENTINEL); + lua_setfield(L, -2, "batched"); + + STACK_END( L, 1); + } + else if( strcmp( which, "module") == 0) + { + // linda is a special case because we know lanes must be loaded from the main lua state + // to be able to ever get here, so we know it will remain loaded as long a the main state is around + // in other words, forever. + lua_pushnil( L); + // other idfuncs must push a string naming the module they come from + //lua_pushliteral( L, "lanes.core"); + } } /* -- cgit v1.2.3-55-g6feb