From 242feeb342f68999b02c2b8dc4614abefdab8431 Mon Sep 17 00:00:00 2001 From: Benoit Germain Date: Mon, 10 Sep 2012 20:41:03 +0200 Subject: version 3.3.0 * lane.status can return "killed" if lane was forcefully killed with lanes:cancel() * lane:join(): return nil, "killed" if called on a killed lane. * lane[]: produces [1] = nil, [2] = "killed" if the lane was killed * lane:join(): fixed an assertion in debug builds when joining a lane forcefully cancelled with lane:cancel( , true). * indexing a lane with a string other than "join", "cancel" or "status" raises an error. * fixed configure() to correctly apply defaults when they are missing from the provided settings * added a shutdown_timeout to control the duration Lanes will wait for graceful termination of running lanes at application shutdown. Default is 0.25. Among other things, fixes issue #31. --- CHANGES | 10 ++ docs/index.html | 33 +++-- src/lanes.c | 417 ++++++++++++++++++++++++++++++---------------------- src/lanes.lua | 142 +++++++++++------- src/threading.h | 3 + tests/fibonacci.lua | 6 +- tests/finalizer.lua | 2 + 7 files changed, 369 insertions(+), 244 deletions(-) diff --git a/CHANGES b/CHANGES index cc7b4a2..36f06ab 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,15 @@ CHANGES: +CHAGE 46: BGe 10-Sep-2012 + * version 3.3.0 + * lane.status can return "killed" if lane was forcefully killed with lanes:cancel() + * lane:join(): return nil, "killed" if called on a killed lane. + * lane[]: produces [1] = nil, [2] = "killed" if the lane was killed + * lane:join(): fixed an assertion in debug builds when joining a lane forcefully cancelled with lane:cancel( , true). + * indexing a lane with a string other than "join", "cancel" or "status" raises an error. + * fixed configure() to correctly apply defaults when they are missing from the provided settings + * added a shutdown_timeout to control the duration Lanes will wait for graceful termination of running lanes at application shutdown. Default is 0.25. + CHANGE 45: BGe 21-Aug-2012 * keeper internals implemented in C instead of Lua for better performances * fixed arguments checks in linda:limit() and linda:set() diff --git a/docs/index.html b/docs/index.html index 84ae5c0..932a712 100644 --- a/docs/index.html +++ b/docs/index.html @@ -56,7 +56,7 @@


Copyright © 2007-12 Asko Kauppi, Benoit Germain. All rights reserved.
Lua Lanes is published under the same MIT license as Lua 5.1 and 5.2. -

This document was revised on 21-Aug-12, and applies to version 3.2.0 +

This document was revised on 10-Sep-12, and applies to version 3.3.0

@@ -205,7 +205,7 @@ Or use Lua Rocks package m .nb_keepers
N - Controls the number of keeper states used internally by lindas to transfer data between lanes. (see below). Default is 1. + Controls the number of keeper states used internally by lindas to transfer data between lanes. (see below). Default is 1. @@ -213,7 +213,7 @@ Or use Lua Rocks package m If equal to false, Lanes doesn't start the timer service, and the associated API will be absent from the interface (see below). - Any other value (including nil), starts the timer service. + Any other value (including nil), starts the timer service. Default is true. @@ -223,9 +223,15 @@ Or use Lua Rocks package m If provided, will be called in every created Lua state (keepers and lanes) right after it is created, and *before* any library is loaded. - That way, all C functions it loads in the state can be added to the function lookup database. + That way, all C functions it loads in the state can be added to the function lookup database. Default is nil. + + + .shutdown_timeout
N + + (Since v3.3.0) Sets the duration in seconds Lanes will wait for graceful termination of running lanes at application shutdown. Irrelevant for builds using pthreads. Default is 0.25. +

Creation

@@ -292,7 +298,7 @@ also in the new lanes. "string"string.* namespace "table"table.* namespace
- "*"all standard libraries + "*"all standard libraries (including those specific to LuaJIT and not listed above)

@@ -318,7 +324,7 @@ also in the new lanes. .globals
globals_tbl Sets the globals table for the launched threads. This can be used for giving - them constants. + them constants. The key/value pairs of globals_tbl are transfered in the lane globals after the libraries have been loaded and the modules required.
The global values of different lanes are in no manner connected; modifying one will only affect the particular lane. @@ -354,7 +360,7 @@ also in the new lanes. package contents overrides, if needed. Specifying it when libs_str doesn't cause the package library to be loaded will generate an error. - If not specified, the created lane will receive the current values of package. Only path, cpath, preload and loaders are transfered. + If not specified, the created lane will receive the current values of package. Only path, cpath, preload and loaders are transfered.
@@ -406,9 +412,10 @@ member, providing one of these values: (2 "pending"not started, yet "running"running "waiting"waiting at a Linda :receive() or :send() - "done"finished executing (results are ready) + "done"finished executing (results are ready) "error"met an error (reading results will propagate it) "cancelled"received cancellation and finished itself + "killed"was forcefully killed by lane_h:cancel() (since v3.3.0)

This is similar to coroutine.status, which has: "running" / @@ -441,8 +448,9 @@ If the lane ended in an error, it is propagated to master state at this place.

Waits until the lane finishes, or timeout seconds have passed. Returns nil on timeout, nil,err,stack_tbl if the lane hit an error, -or the return values of the lane. Unlike in reading the results in table -fashion, errors are not propagated. +nil, "killed" if forcefully killed (starting with v3.3.0), or the return values of the lane. +Unlike in reading the results in table fashion, errors are not propagated. +

@@ -490,7 +498,7 @@ that id over a Linda once that thread is done (as the last thing you do).

Cancelling

- bool= lane_h:cancel( [timeout_secs=0.0,] [force_kill_bool=false] ) + bool[,reason]= lane_h:cancel( [timeout_secs=0.0,] [force_kill_bool=false] )

@@ -507,6 +515,9 @@ If the lane is still running and force_kill is true, the OS thread running the lane is forcefully killed. This means no GC, and should generally be the last resort.

+

+Starting with v3.3.0, if cancel() returns false, it also returns either "timeout" or "killed" as second return value. +

Cancellation is tested before going to sleep in receive() or send() calls and after executing cancelstep Lua statements. Starting with version 3.0-beta, a pending receive() or send() call is awakened. This means the execution of the lane will resume although the operation has diff --git a/src/lanes.c b/src/lanes.c index 9f455b2..462999f 100644 --- a/src/lanes.c +++ b/src/lanes.c @@ -47,16 +47,17 @@ * * To-do: * - * Make waiting threads cancelable. + * Make waiting threads cancellable. * ... */ -char const* VERSION = "3.2.0"; +char const* VERSION = "3.3.0"; /* =============================================================================== Copyright (C) 2007-10 Asko Kauppi + 2011-12 Benoit Germain Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -136,12 +137,12 @@ struct s_lane { // S: reads to see if cancel is requested #if THREADWAIT_METHOD == THREADWAIT_CONDVAR - SIGNAL_T done_signal_; + SIGNAL_T done_signal; // // M: Waited upon at lane ending (if Posix with no PTHREAD_TIMEDJOIN) // S: sets the signal once cancellation is noticed (avoids a kill) - MUTEX_T done_lock_; + MUTEX_T done_lock; // // Lock required by 'done_signal' condition variable, protecting // lane status changes to DONE/ERROR_ST/CANCELLED. @@ -256,7 +257,7 @@ static void check_key_types( lua_State *L, int _start, int _end) { continue; } - luaL_error( L, "argument #%d: invalid key type (not a boolean, string, number or light userdata)", i); + (void) luaL_error( L, "argument #%d: invalid key type (not a boolean, string, number or light userdata)", i); } } @@ -295,7 +296,7 @@ LUAG_FUNC( linda_send) // make sure there is something to send if( (uint_t)lua_gettop( L) == key_i) { - luaL_error( L, "no data to send"); + return luaL_error( L, "no data to send"); } // convert nils to some special non-nil sentinel in sent values @@ -353,7 +354,8 @@ LUAG_FUNC( linda_send) STACK_END(L,0) if( s) { - prev_status = s->status; + prev_status = s->status; // RUNNING, most likely + ASSERT_L( prev_status == RUNNING); // but check, just in case s->status = WAITING; ASSERT_L( s->waiting_on == NULL); s->waiting_on = &linda->read_happened; @@ -382,7 +384,7 @@ LUAG_FUNC( linda_send) // must trigger error after keeper state has been released if( pushed < 0) { - luaL_error( L, "tried to copy unsupported types"); + return luaL_error( L, "tried to copy unsupported types"); } if( cancel) @@ -446,7 +448,7 @@ LUAG_FUNC( linda_receive) expected_pushed_max = (int)luaL_optinteger( L, key_i + 2, expected_pushed_min); if( expected_pushed_min > expected_pushed_max) { - luaL_error( L, "batched min/max error"); + return luaL_error( L, "batched min/max error"); } } else @@ -507,7 +509,8 @@ LUAG_FUNC( linda_receive) STACK_END(L, 0) if( s) { - prev_status = s->status; + prev_status = s->status; // RUNNING, most likely + ASSERT_L( prev_status == RUNNING); // but check, just in case s->status = WAITING; ASSERT_L( s->waiting_on == NULL); s->waiting_on = &linda->write_happened; @@ -535,7 +538,7 @@ LUAG_FUNC( linda_receive) // must trigger error after keeper state has been released if( pushed < 0) { - luaL_error( L, "tried to copy unsupported types"); + return luaL_error( L, "tried to copy unsupported types"); } if( cancel) @@ -583,7 +586,7 @@ LUAG_FUNC( linda_set) // must trigger error after keeper state has been released if( pushed < 0) { - luaL_error( L, "tried to copy unsupported types"); + return luaL_error( L, "tried to copy unsupported types"); } } @@ -611,7 +614,7 @@ LUAG_FUNC( linda_count) keeper_release( K); if( pushed < 0) { - luaL_error( L, "tried to count an invalid key"); + return luaL_error( L, "tried to count an invalid key"); } } return pushed; @@ -645,7 +648,7 @@ LUAG_FUNC( linda_get) // must trigger error after keeper state has been released if( pushed < 0) { - luaL_error( L, "tried to copy unsupported types"); + return luaL_error( L, "tried to copy unsupported types"); } } @@ -678,7 +681,7 @@ LUAG_FUNC( linda_limit) // must trigger error after keeper state has been released if( pushed < 0) { - luaL_error( L, "tried to copy unsupported types"); + return luaL_error( L, "tried to copy unsupported types"); } } @@ -1125,11 +1128,9 @@ static int selfdestruct_gc( lua_State *L) // Tested on MacBook Core Duo 2GHz and 10.5.5: // -- AKa 25-Oct-2008 // - #ifndef ATEXIT_WAIT_SECS - # define ATEXIT_WAIT_SECS (0.25) - #endif { - double t_until= now_secs() + ATEXIT_WAIT_SECS; + lua_Number const shutdown_timeout = lua_tonumber( L, lua_upvalueindex( 1)); + double const t_until = now_secs() + shutdown_timeout; while( selfdestruct_first != SELFDESTRUCT_END ) { @@ -1153,7 +1154,7 @@ static int selfdestruct_gc( lua_State *L) t_now = now_secs(); if( n == 0 || ( t_now >= t_until)) { - DEBUGEXEC(fprintf( stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, ATEXIT_WAIT_SECS - (t_until - t_now))); + DEBUGEXEC(fprintf( stderr, "%d uncancelled lane(s) remain after waiting %fs at process end.\n", n, shutdown_timeout - (t_until - t_now))); break; } } @@ -1201,13 +1202,13 @@ static int selfdestruct_gc( lua_State *L) THREAD_KILL( &s->thread); #if THREADAPI == THREADAPI_PTHREAD // pthread: make sure the thread is really stopped! - THREAD_WAIT( &s->thread, -1, &s->done_signal_, &s->done_lock_, &s->status); + THREAD_WAIT( &s->thread, -1, &s->done_signal, &s->done_lock, &s->status); #endif // THREADAPI == THREADAPI_PTHREAD } // NO lua_close() in this case because we don't know where execution of the state was interrupted #if THREADWAIT_METHOD == THREADWAIT_CONDVAR - SIGNAL_FREE( &s->done_signal_); - MUTEX_FREE( &s->done_lock_); + SIGNAL_FREE( &s->done_signal); + MUTEX_FREE( &s->done_lock); #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR free( s); s = next_s; @@ -1288,22 +1289,26 @@ LUAG_FUNC( cancel_test) // Limits the process to use only 'cores' CPU cores. To be used for performance // testing on multicore devices. DEBUGGING ONLY! // -LUAG_FUNC( _single ) { - uint_t cores= luaG_optunsigned(L,1,1); +LUAG_FUNC( set_singlethreaded) +{ + uint_t cores = luaG_optunsigned( L, 1, 1); + (void) cores; // prevent "unused" warning #ifdef PLATFORM_OSX - #ifdef _UTILBINDTHREADTOCPU - if (cores > 1) luaL_error( L, "Limiting to N>1 cores not possible." ); - // requires 'chudInitialize()' - utilBindThreadToCPU(0); // # of CPU to run on (we cannot limit to 2..N CPUs?) - #else - luaL_error( L, "Not available: compile with _UTILBINDTHREADTOCPU" ); - #endif +#ifdef _UTILBINDTHREADTOCPU + if( cores > 1) + { + return luaL_error( L, "Limiting to N>1 cores not possible"); + } + // requires 'chudInitialize()' + utilBindThreadToCPU(0); // # of CPU to run on (we cannot limit to 2..N CPUs?) #else - luaL_error( L, "not implemented!" ); + return luaL_error( L, "Not available: compile with _UTILBINDTHREADTOCPU"); #endif - (void)cores; - +#else + return luaL_error( L, "not implemented"); +#endif + return 0; } @@ -1592,8 +1597,8 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main( void *vs) s->L = L = 0; #if THREADWAIT_METHOD == THREADWAIT_CONDVAR - SIGNAL_FREE( &s->done_signal_); - MUTEX_FREE( &s->done_lock_); + SIGNAL_FREE( &s->done_signal); + MUTEX_FREE( &s->done_lock); #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR free(s); @@ -1611,14 +1616,14 @@ static THREAD_RETURN_T THREAD_CALLCONV lane_main( void *vs) // 'done_lock' protects the -> DONE|ERROR_ST|CANCELLED state change // #if THREADWAIT_METHOD == THREADWAIT_CONDVAR - MUTEX_LOCK( &s->done_lock_); + MUTEX_LOCK( &s->done_lock); { #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR s->status = st; #if THREADWAIT_METHOD == THREADWAIT_CONDVAR - SIGNAL_ONE( &s->done_signal_); // wake up master (while 's->done_lock' is on) + SIGNAL_ONE( &s->done_signal); // wake up master (while 's->done_lock' is on) } - MUTEX_UNLOCK( &s->done_lock_); + MUTEX_UNLOCK( &s->done_lock); #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR } return 0; // ignored @@ -1680,8 +1685,7 @@ LUAG_FUNC( thread_new ) if (prio < THREAD_PRIO_MIN || prio > THREAD_PRIO_MAX) { - luaL_error( L, "Priority out of range: %d..+%d (%d)", - THREAD_PRIO_MIN, THREAD_PRIO_MAX, prio ); + return luaL_error( L, "Priority out of range: %d..+%d (%d)", THREAD_PRIO_MIN, THREAD_PRIO_MAX, prio); } /* --- Create and prepare the sub state --- */ @@ -1689,7 +1693,10 @@ LUAG_FUNC( thread_new ) // populate with selected libraries at the same time // L2 = luaG_newstate( libs, on_state_create); - if (!L2) luaL_error( L, "'luaL_newstate()' failed; out of memory" ); + if (!L2) + { + return luaL_error( L, "'luaL_newstate()' failed; out of memory"); + } STACK_GROW( L, 2); STACK_GROW( L2, 3); @@ -1701,8 +1708,10 @@ LUAG_FUNC( thread_new ) STACK_CHECK(L2) if( package) { - if (lua_type(L,package) != LUA_TTABLE) - luaL_error( L, "expected package as table, got %s", luaL_typename(L,package)); + if( lua_type( L, package) != LUA_TTABLE) + { + return luaL_error( L, "expected package as table, got %s", luaL_typename( L, package)); + } lua_getglobal( L2, "package"); if( !lua_isnil( L2, -1)) // package library not loaded: do nothing { @@ -1746,13 +1755,15 @@ LUAG_FUNC( thread_new ) int nbRequired = 1; // should not happen, was checked in lanes.lua before calling thread_new() if (lua_type(L, required) != LUA_TTABLE) - luaL_error( L, "expected required module list as a table, got %s", luaL_typename( L, required)); + { + return luaL_error( L, "expected required module list as a table, got %s", luaL_typename( L, required)); + } lua_pushnil( L); while( lua_next( L, required) != 0) { if (lua_type(L,-1) != LUA_TSTRING || lua_type(L,-2) != LUA_TNUMBER || lua_tonumber( L, -2) != nbRequired) { - luaL_error( L, "required module list should be a list of strings."); + return luaL_error( L, "required module list should be a list of strings"); } else { @@ -1772,8 +1783,10 @@ LUAG_FUNC( thread_new ) { STACK_CHECK(L) STACK_CHECK(L2) - if (!lua_istable(L,glob)) - luaL_error( L, "Expected table, got %s", luaL_typename(L,glob)); + if( !lua_istable( L, glob)) + { + return luaL_error( L, "Expected table, got %s", luaL_typename(L,glob)); + } lua_pushnil( L); lua_pushglobaltable( L2); // Lua 5.2 wants us to push the globals table on the stack @@ -1799,7 +1812,9 @@ LUAG_FUNC( thread_new ) { lua_pushvalue( L, 1); if( luaG_inter_move( L, L2, 1) != 0) // L->L2 - luaL_error( L, "tried to copy unsupported types"); + { + return luaL_error( L, "tried to copy unsupported types"); + } STACK_MID(L,0) } else if( lua_type(L, 1) == LUA_TSTRING) @@ -1807,7 +1822,7 @@ LUAG_FUNC( thread_new ) // compile the string if( luaL_loadstring( L2, lua_tostring( L, 1)) != 0) { - luaL_error( L, "error when parsing lane function code"); + return luaL_error( L, "error when parsing lane function code"); } } @@ -1817,7 +1832,9 @@ LUAG_FUNC( thread_new ) // revive arguments // if( (args > 0) && (luaG_inter_copy( L, L2, args) != 0)) // L->L2 - luaL_error( L, "tried to copy unsupported types"); + { + return luaL_error( L, "tried to copy unsupported types"); + } STACK_MID(L,0) ASSERT_L( (uint_t)lua_gettop(L2) == 1+args ); @@ -1839,8 +1856,8 @@ LUAG_FUNC( thread_new ) s->cancel_request= FALSE; #if THREADWAIT_METHOD == THREADWAIT_CONDVAR - MUTEX_INIT( &s->done_lock_); - SIGNAL_INIT( &s->done_signal_); + MUTEX_INIT( &s->done_lock); + SIGNAL_INIT( &s->done_signal); #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR s->mstatus= NORMAL; s->selfdestruct_next= NULL; @@ -1884,50 +1901,44 @@ LUAG_FUNC( thread_new ) // * Why NOT cancel/kill a loose thread: // // At least timer system uses a free-running thread, they should be handy -// and the issue of cancelling/killing threads at gc is not very nice, either +// and the issue of canceling/killing threads at gc is not very nice, either // (would easily cause waits at gc cycle, which we don't want). // -// * Why YES kill a loose thread: -// -// Current way causes segfaults at program exit, if free-running threads are -// in certain stages. Details are not clear, but this is the core reason. -// If gc would kill threads then at process exit only one thread would remain. -// -// Todo: Maybe we should have a clear #define for selecting either behaviour. -// -LUAG_FUNC( thread_gc ) +LUAG_FUNC( thread_gc) { - struct s_lane *s= lua_toLane(L,1); + struct s_lane* s = lua_toLane( L, 1); // We can read 's->status' without locks, but not wait for it - // - if (s->status < DONE) - { - // - selfdestruct_add(s); - assert( s->selfdestruct_next ); - return 0; - - } - else if (s->mstatus==KILLED) + // test KILLED state first, as it doesn't need to enter the selfdestruct chain + if( s->mstatus == KILLED) { // Make sure a kill has proceeded, before cleaning up the data structure. // // NO lua_close() in this case because we don't know where execution of the state was interrupted - // If not doing 'THREAD_WAIT()' we should close the Lua state here - // (can it be out of order, since we killed the lane abruptly?) - // -#if 0 - lua_close( s->L ); - s->L = 0; -#else // 0 DEBUGEXEC(fprintf( stderr, "** Joining with a killed thread (needs testing) **" )); - THREAD_WAIT( &s->thread, -1, &s->done_signal_, &s->done_lock_, &s->status); + // make sure the thread is no longer running, just like thread_join() + if(! THREAD_ISNULL( s->thread)) + THREAD_WAIT( &s->thread, -1, &s->done_signal, &s->done_lock, &s->status); + // we know the thread was killed while the Lua VM was not doing anything: we should be able to close it without crashing + // now, thread_cancel() will not forcefully kill a lane with s->status >= DONE, so I am not sure it can ever happen + if( s->status >= DONE && s->L) + { + lua_close( s->L); + s->L = 0; + } DEBUGEXEC(fprintf( stderr, "** Joined ok **" )); -#endif // 0 + } + else if( s->status < DONE) + { + // still running: will have to be cleaned up later + selfdestruct_add( s); + assert( s->selfdestruct_next); + return 0; + } else if( s->L) { + // no longer accessing the Lua VM: we can close right now lua_close( s->L); s->L = 0; } @@ -1935,12 +1946,11 @@ LUAG_FUNC( thread_gc ) // Clean up after a (finished) thread // #if THREADWAIT_METHOD == THREADWAIT_CONDVAR - SIGNAL_FREE( &s->done_signal_); - MUTEX_FREE( &s->done_lock_); + SIGNAL_FREE( &s->done_signal); + MUTEX_FREE( &s->done_lock); #endif // THREADWAIT_METHOD == THREADWAIT_CONDVAR free( s); - return 0; } @@ -1961,17 +1971,29 @@ LUAG_FUNC( thread_gc ) // managed to cancel it. // false if the cancellation timed out, or a kill was needed. // -static bool_t thread_cancel( struct s_lane *s, double secs, bool_t force) + +typedef enum +{ + CR_Timeout, + CR_Cancelled, + CR_Killed +} cancel_result; + +static cancel_result thread_cancel( struct s_lane *s, double secs, bool_t force) { - bool_t done= TRUE; + cancel_result result; + + // remember that lanes are not transferable: only one thread can cancel a lane, so no multithreading issue here // We can read 's->status' without locks, but not wait for it (if Posix no PTHREAD_TIMEDJOIN) - // - if( s->status < DONE) + if( s->mstatus == KILLED) + { + result = CR_Killed; + } + else if( s->status < DONE) { s->cancel_request = TRUE; // it's now signaled to stop // signal the linda the wake up the thread so that it can react to the cancel query // let us hope we never land here with a pointer on a linda that has been destroyed... - //MUTEX_LOCK( &selfdestruct_cs ); { SIGNAL_T *waiting_on = s->waiting_on; if( s->status == WAITING && waiting_on != NULL) @@ -1979,10 +2001,10 @@ static bool_t thread_cancel( struct s_lane *s, double secs, bool_t force) SIGNAL_ALL( waiting_on); } } - //MUTEX_UNLOCK( &selfdestruct_cs ); - done = THREAD_WAIT( &s->thread, secs, &s->done_signal_, &s->done_lock_, &s->status); - if ((!done) && force) + result = THREAD_WAIT( &s->thread, secs, &s->done_signal, &s->done_lock, &s->status) ? CR_Cancelled : CR_Timeout; + + if( (result == CR_Timeout) && force) { // Killing is asynchronous; we _will_ wait for it to be done at // GC, to make sure the data structure can be released (alternative @@ -1990,10 +2012,18 @@ static bool_t thread_cancel( struct s_lane *s, double secs, bool_t force) // PThread seems to have). // THREAD_KILL( &s->thread); - s->mstatus= KILLED; // mark 'gc' to wait for it + s->mstatus = KILLED; // mark 'gc' to wait for it + // note that s->status value must remain to whatever it was at the time of the kill + // because we need to know if we can lua_close() the Lua State or not. + result = CR_Killed; } } - return done; + else + { + // say "ok" by default, including when lane is already done + result = CR_Cancelled; + } + return result; } LUAG_FUNC( thread_cancel) @@ -2004,10 +2034,11 @@ LUAG_FUNC( thread_cancel) } else { - struct s_lane *s = lua_toLane( L, 1); + struct s_lane* s = lua_toLane( L, 1); double secs = 0.0; uint_t force_i = 2; - bool_t force, done= TRUE; + cancel_result result; + bool_t force; if( lua_isnumber( L, 2)) { @@ -2015,15 +2046,32 @@ LUAG_FUNC( thread_cancel) ++ force_i; } else if( lua_isnil( L, 2)) + { ++ force_i; + } force = lua_toboolean( L, force_i); // FALSE if nothing there - done = thread_cancel( s, secs, force); - - lua_pushboolean( L, done); - return 1; + result = thread_cancel( s, secs, force); + switch( result) + { + case CR_Timeout: + lua_pushboolean( L, 0); + lua_pushstring( L, "timeout"); + return 2; + + case CR_Cancelled: + lua_pushboolean( L, 1); + return 1; + + case CR_Killed: + lua_pushboolean( L, 0); + lua_pushstring( L, "killed"); + return 2; + } } + // should never happen, only here to prevent the compiler from complaining of "not all control paths returning a value" + return 0; } //--- @@ -2039,12 +2087,9 @@ LUAG_FUNC( thread_cancel) static char const * thread_status_string( struct s_lane *s) { enum e_status st = s->status; // read just once (volatile) - char const * str; - - if (s->mstatus == KILLED) - st= CANCELLED; - - str= (st==PENDING) ? "pending" : + char const * str = + (s->mstatus == KILLED) ? "killed" : // new to v3.3.0! + (st==PENDING) ? "pending" : (st==RUNNING) ? "running" : // like in 'co.status()' (st==WAITING) ? "waiting" : (st==DONE) ? "done" : @@ -2053,12 +2098,13 @@ static char const * thread_status_string( struct s_lane *s) return str; } -static void push_thread_status( lua_State *L, struct s_lane *s) +static int push_thread_status( lua_State *L, struct s_lane *s) { char const * const str = thread_status_string( s); ASSERT_L( str); lua_pushstring( L, str ); + return 1; } @@ -2070,15 +2116,15 @@ static void push_thread_status( lua_State *L, struct s_lane *s) // error: returns nil + error value + stack table // cancelled: returns nil // -LUAG_FUNC( thread_join ) +LUAG_FUNC( thread_join) { - struct s_lane *s= lua_toLane(L,1); + struct s_lane* const s = lua_toLane( L, 1); double wait_secs= luaL_optnumber(L,2,-1.0); lua_State *L2= s->L; int ret; bool_t done; - done = THREAD_ISNULL( s->thread) || THREAD_WAIT( &s->thread, wait_secs, &s->done_signal_, &s->done_lock_, &s->status); + done = THREAD_ISNULL( s->thread) || THREAD_WAIT( &s->thread, wait_secs, &s->done_signal, &s->done_lock, &s->status); if (!done || !L2) return 0; // timeout: pushes none, leaves 'L2' alive @@ -2086,34 +2132,49 @@ LUAG_FUNC( thread_join ) STACK_GROW( L, 1); - switch( s->status) + if( s->mstatus == KILLED) // OS thread was killed if thread_cancel was forced { - case DONE: + // in that case, even if the thread was killed while DONE/ERROR_ST/CANCELLED, ignore regular return values + + lua_pushnil( L); + lua_pushliteral( L, "killed"); + ret = 2; + } + else + { + switch( s->status) { - uint_t n = lua_gettop( L2); // whole L2 stack - if( (n > 0) && (luaG_inter_move( L2, L, n) != 0)) - luaL_error( L, "tried to copy unsupported types"); - ret = n; - } - break; + case DONE: + { + uint_t n = lua_gettop( L2); // whole L2 stack + if( (n > 0) && (luaG_inter_move( L2, L, n) != 0)) + { + return luaL_error( L, "tried to copy unsupported types"); + } + ret = n; + } + break; - case ERROR_ST: - lua_pushnil( L); - if( luaG_inter_move( L2, L, 2) != 0) // error message at [-2], stack trace at [-1] - luaL_error( L, "tried to copy unsupported types"); - ret= 3; - break; - - case CANCELLED: - ret= 0; - break; - - default: - DEBUGEXEC(fprintf( stderr, "Status: %d\n", s->status)); - ASSERT_L( FALSE ); ret= 0; + case ERROR_ST: + lua_pushnil( L); + if( luaG_inter_move( L2, L, 2) != 0) // error message at [-2], stack trace at [-1] + { + return luaL_error( L, "tried to copy unsupported types"); + } + ret= 3; + break; + + case CANCELLED: + ret= 0; + break; + + default: + DEBUGEXEC(fprintf( stderr, "Status: %d\n", s->status)); + ASSERT_L( FALSE ); ret= 0; + } + lua_close( L2); } - lua_close( L2); - s->L = L2 = 0; + s->L = 0; return ret; } @@ -2131,7 +2192,7 @@ LUAG_FUNC( thread_index) { int const UD = 1; int const KEY = 2; - int const ENV = 3; + int const USR = 3; struct s_lane *s = lua_toLane( L, UD); ASSERT_L( lua_gettop( L) == 2); @@ -2142,10 +2203,10 @@ LUAG_FUNC( thread_index) { // first, check that we don't already have an environment that holds the requested value { - // If key is found in the environment, return it + // If key is found in the uservalue, return it lua_getuservalue( L, UD); lua_pushvalue( L, KEY); - lua_rawget( L, ENV); + lua_rawget( L, USR); if( !lua_isnil( L, -1)) { return 1; @@ -2157,27 +2218,40 @@ LUAG_FUNC( thread_index) bool_t fetched; lua_Integer key = lua_tointeger( L, KEY); lua_pushinteger( L, 0); - lua_rawget( L, ENV); + lua_rawget( L, USR); fetched = !lua_isnil( L, -1); - lua_pop( L, 1); // back to our 2 args + env on the stack + lua_pop( L, 1); // back to our 2 args + uservalue on the stack if( !fetched) { lua_pushinteger( L, 0); lua_pushboolean( L, 1); - lua_rawset( L, ENV); + lua_rawset( L, USR); // wait until thread has completed lua_pushcfunction( L, LG_thread_join); lua_pushvalue( L, UD); lua_call( L, 1, LUA_MULTRET); // all return values are on the stack, at slots 4+ switch( s->status) { + default: + if( s->mstatus != KILLED) + { + // this is an internal error, we probably never get here + lua_settop( L, 0); + lua_pushliteral( L, "Unexpected status: "); + lua_pushstring( L, thread_status_string( s)); + lua_concat( L, 2); + lua_error( L); + break; + } + // fall through if we are killed, as we got nil, "killed" on the stack + case DONE: // got regular return values { int i, nvalues = lua_gettop( L) - 3; for( i = nvalues; i > 0; -- i) { - // pop the last element of the stack, to store it in the environment at its proper index - lua_rawseti( L, ENV, i); + // pop the last element of the stack, to store it in the uservalue at its proper index + lua_rawseti( L, USR, i); } } break; @@ -2190,28 +2264,19 @@ LUAG_FUNC( thread_index) // store errstring at key -1 lua_pushnumber( L, -1); lua_pushvalue( L, 5); - lua_rawset( L, ENV); + lua_rawset( L, USR); break; case CANCELLED: // do nothing break; - - default: - // this is an internal error, we probably never get here - lua_settop( L, 0); - lua_pushliteral( L, "Unexpected status: "); - lua_pushstring( L, thread_status_string( s)); - lua_concat( L, 2); - lua_error( L); - break; } } lua_settop( L, 3); // UD KEY ENV if( key != -1) { lua_pushnumber( L, -1); // UD KEY ENV -1 - lua_rawget( L, ENV); // UD KEY ENV "error" + lua_rawget( L, USR); // UD KEY ENV "error" if( !lua_isnil( L, -1)) // an error was stored { // Note: Lua 5.1 interpreter is not prepared to show @@ -2238,7 +2303,7 @@ LUAG_FUNC( thread_index) lua_pop( L, 1); // back to our 3 arguments on the stack } } - lua_rawgeti( L, ENV, (int)key); + lua_rawgeti( L, USR, (int)key); } return 1; } @@ -2248,17 +2313,18 @@ LUAG_FUNC( thread_index) lua_settop( L, 2); // keep only our original arguments on the stack if( strcmp( keystr, "status") == 0) { - push_thread_status( L, s); // push the string representing the status + return push_thread_status( L, s); // push the string representing the status } - else if( strcmp( keystr, "cancel") == 0 || strcmp( keystr, "join") == 0) + // return UD.metatable[key] + lua_getmetatable( L, UD); // UD KEY mt + lua_replace( L, -3); // mt KEY + lua_rawget( L, -2); // mt value + // only "cancel" and "join" are registered as functions, any other string will raise an error + if( lua_iscfunction( L, -1)) { - // return UD.metatable[key] (should be a function in both cases) - lua_getmetatable( L, UD); // UD KEY mt - lua_replace( L, -3); // mt KEY - lua_rawget( L, -2); // mt value - ASSERT_L( lua_iscfunction( L, -1)); + return 1; } - return 1; + return luaL_error( L, "can't index a lane with '%s'", keystr); } // unknown key lua_getmetatable( L, UD); @@ -2338,14 +2404,14 @@ static const struct luaL_Reg lanes_functions [] = { {"now_secs", LG_now_secs}, {"wakeup_conv", LG_wakeup_conv}, {"nameof", luaG_nameof}, - {"_single", LG__single}, + {"set_singlethreaded", LG_set_singlethreaded}, {NULL, NULL} }; /* * One-time initializations */ -static void init_once_LOCKED( lua_State* L, volatile DEEP_PRELUDE** timer_deep_ref, int const nbKeepers, lua_CFunction _on_state_create) +static void init_once_LOCKED( lua_State* L, volatile DEEP_PRELUDE** timer_deep_ref, int const nbKeepers, lua_CFunction _on_state_create, lua_Number _shutdown_timeout) { const char *err; @@ -2398,7 +2464,7 @@ static void init_once_LOCKED( lua_State* L, volatile DEEP_PRELUDE** timer_deep_r err = init_keepers( nbKeepers, _on_state_create); if (err) { - luaL_error( L, "Unable to initialize: %s", err ); + (void) luaL_error( L, "Unable to initialize: %s", err ); } // Initialize 'timer_deep'; a common Linda object shared by all states @@ -2428,7 +2494,8 @@ static void init_once_LOCKED( lua_State* L, volatile DEEP_PRELUDE** timer_deep_r { lua_newuserdata( L, 1); lua_newtable( L); - lua_pushcfunction( L, selfdestruct_gc); + lua_pushnumber( L, _shutdown_timeout); + lua_pushcclosure( L, selfdestruct_gc, 1); lua_setfield( L, -2, "__gc"); lua_pushliteral( L, "AtExit"); lua_setfield( L, -2, "__metatable"); @@ -2446,10 +2513,10 @@ static volatile long s_initCount = 0; LUAG_FUNC( configure ) { char const* name = luaL_checkstring( L, lua_upvalueindex( 1)); - int const nbKeepers = luaL_optint( L, 1, 1); + // all parameter checks are done lua-side + int const nbKeepers = (int)lua_tointeger( L, 1); lua_CFunction on_state_create = lua_iscfunction( L, 2) ? lua_tocfunction( L, 2) : NULL; - luaL_argcheck( L, nbKeepers > 0, 1, "Number of keeper states must be > 0"); - luaL_argcheck( L, lua_iscfunction( L, 2) || lua_isnil( L, 2), 2, "on_state_create should be a C function"); + lua_Number shutdown_timeout = lua_tonumber( L, 3); /* * Making one-time initializations. * @@ -2462,7 +2529,7 @@ LUAG_FUNC( configure ) static volatile int /*bool*/ go_ahead; // = 0 if( InterlockedCompareExchange( &s_initCount, 1, 0) == 0) { - init_once_LOCKED( L, &timer_deep, nbKeepers, on_state_create); + init_once_LOCKED( L, &timer_deep, nbKeepers, on_state_create, shutdown_timeout); go_ahead= 1; // let others pass } else @@ -2480,7 +2547,7 @@ LUAG_FUNC( configure ) // if( s_initCount == 0) { - init_once_LOCKED( L, &timer_deep, nbKeepers, on_state_create); + init_once_LOCKED( L, &timer_deep, nbKeepers, on_state_create, shutdown_timeout); s_initCount = 1; } } @@ -2525,7 +2592,7 @@ LUAG_FUNC( configure ) lua_setfield(L, -2, "timer_gateway"); lua_pushstring(L, VERSION); - lua_setfield(L, -2, "_version"); + lua_setfield(L, -2, "version"); lua_pushinteger(L, THREAD_PRIO_MAX); lua_setfield(L, -2, "max_prio"); diff --git a/src/lanes.lua b/src/lanes.lua index e6400df..c9ab07d 100644 --- a/src/lanes.lua +++ b/src/lanes.lua @@ -40,55 +40,93 @@ THE SOFTWARE. ]]-- -- Lua 5.1: module() creates a global variable --- Lua 5.2: module() might go away --- almost everything module() does is done by require() +-- Lua 5.2: module() is gone +-- almost everything module() does is done by require() anyway -- -> simply create a table, populate it, return it, and be done local lanes = {} lanes.configure = function( _params) -_params = _params or { nb_keepers = 1, with_timers = true, on_state_create = nil} -if type( _params) ~= "table" then - error( "Bad parameter #1 to lanes.configure(), should be a table") -end --- on_state_create may be nil or a function -if _params.on_state_create and (type( _params.on_state_create) ~= "function") then - error( "Bad on_state_create: " .. tostring( _params.on_state_create), 2) -end -local mm = require "lanes.core" -assert( type(mm)=="table" ) + -- This check is for sublanes requiring Lanes + -- + -- TBD: We could also have the C level expose 'string.gmatch' for us. But this is simpler. + -- + if not string then + error( "To use 'lanes', you will also need to have 'string' available.", 2) + end --- configure() is available only the first time lanes.core is required process-wide, and we *must* call it to have the other functions in the interface -if mm.configure then mm.configure( _params.nb_keepers, _params.on_state_create) end + -- + -- Cache globals for code that might run under sandboxing + -- + local assert = assert + local string_gmatch = assert( string.gmatch) + local select = assert( select) + local type = assert( type) + local pairs = assert( pairs) + local tostring = assert( tostring) + local error = assert( error) + + local default_params = { nb_keepers = 1, on_state_create = nil, shutdown_timeout = 0.25, with_timers = true} + local param_checkers = + { + nb_keepers = function( _val) + -- nb_keepers should be a number > 0 + return type( _val) == "number" and _val > 0 + end, + with_timers = function( _val) + -- with_timers may be nil or boolean + return _val and type( _val) == "boolean" or true + end, + on_state_create = function( _val) + -- on_state_create may be nil or a function + return _val and type( _val) == "function" or true + end, + shutdown_timeout = function( _val) + -- nb_keepers should be a number >= 0 + return type( _val) == "number" and _val >= 0 + end + } -local thread_new = assert(mm.thread_new) + local params_checker = function( _params) + if not _params then + return default_params + end + if type( _params) ~= "table" then + error( "Bad parameter #1 to lanes.configure(), should be a table") + end + -- any setting not present in the provided parameters takes the default value + for key, value in pairs( default_params) do + local my_param = _params[key] + local param + if my_param ~= nil then + param = my_param + else + param = default_params[key] + end + if not param_checkers[key]( param) then + error( "Bad " .. key .. ": " .. tostring( param), 2) + end + _params[key] = param + end + return _params + end -local _single= assert(mm._single) -local _version= assert(mm._version) + _params = params_checker( _params) -local now_secs= assert( mm.now_secs ) -local wakeup_conv= assert( mm.wakeup_conv ) + local core = require "lanes.core" + assert( type( core)=="table") -local max_prio= assert( mm.max_prio ) + -- configure() is available only the first time lanes.core is required process-wide, and we *must* call it to have the other functions in the interface + if core.configure then core.configure( _params.nb_keepers, _params.on_state_create, _params.shutdown_timeout) end --- This check is for sublanes requiring Lanes --- --- TBD: We could also have the C level expose 'string.gmatch' for us. But this is simpler. --- -if not string then - error( "To use 'lanes', you will also need to have 'string' available.", 2 ) -end + local thread_new = assert( core.thread_new) --- --- Cache globals for code that might run under sandboxing --- -local assert= assert -local string_gmatch= assert( string.gmatch ) -local select= assert( select ) -local type= assert( type ) -local pairs= assert( pairs ) -local tostring= assert( tostring ) -local error= assert( error ) + local set_singlethreaded = assert( core.set_singlethreaded) + + local now_secs = assert( core.now_secs) + local wakeup_conv = assert( core.wakeup_conv) + + local max_prio = assert( core.max_prio) lanes.ABOUT= { @@ -96,7 +134,7 @@ lanes.ABOUT= description= "Running multiple Lua states in parallel", license= "MIT/X11", copyright= "Copyright (c) 2007-10, Asko Kauppi; (c) 2011-12, Benoit Germain", - version= _version, + version = assert( core.version) } @@ -258,7 +296,7 @@ end -- lanes.linda(["name"]) -> linda_ud -- -- PUBLIC LANES API -local linda = mm.linda +local linda = core.linda ---=== Timers ===--- @@ -268,7 +306,7 @@ local timer = function() error "timers are not active" end if _params.with_timers ~= false then -local timer_gateway= assert( mm.timer_gateway ) +local timer_gateway = assert( core.timer_gateway) -- -- On first 'require "lanes"', a timer lane is spawned that will maintain -- timer tables and sleep in between the timer events. All interaction with @@ -558,28 +596,23 @@ local function genatomic( linda, key, initial_val ) end end --- newuserdata = mm.newuserdata - -- activate full interface lanes.gen = gen - lanes.linda = mm.linda - lanes.cancel_error = mm.cancel_error - lanes.nameof = mm.nameof + lanes.linda = core.linda + lanes.cancel_error = core.cancel_error + lanes.nameof = core.nameof lanes.timer = timer lanes.genlock = genlock lanes.now_secs = now_secs lanes.genatomic = genatomic -- from now on, calling configure does nothing but checking that we don't call it with parameters that changed compared to the first invocation lanes.configure = function( _params2) - _params2 = _params2 or _params - if _params2.nb_keepers ~= _params.nb_keepers then - error( "mismatched configuration: " .. tostring( _params2.nb_keepers) .. " keepers instead of " .. tostring( _params.nb_keepers)) - end - if _params2.with_timers ~= _params.with_timers then - error( "mismatched configuration: " .. tostring( _params2.with_timers) .. " timer activity instead of " .. tostring( _params.with_timers)) - end - if _params2.on_create_state and _params2.on_create_state ~= _params.on_create_state then - error( "mismatched configuration: " .. tostring( _params2.on_create_state) .. " timer activity instead of " .. tostring( _params.on_create_state)) + _params2 = params_checker( _params2 or _params) + for key, value2 in pairs( _params2) do + local value = _params[key] + if value2 ~= value then + error( "mismatched configuration: " .. key .. " is " .. tostring( value2) .. " instead of " .. tostring( value)) + end end return lanes end @@ -588,4 +621,3 @@ end -- lanes.configure --the end return lanes - diff --git a/src/threading.h b/src/threading.h index b0a3db0..f4f1ada 100644 --- a/src/threading.h +++ b/src/threading.h @@ -35,6 +35,9 @@ typedef unsigned int uint_t; #include /* Note: ERROR is a defined entity on Win32 + PENDING: The Lua VM hasn't done anything yet. + RUNNING, WAITING: Thread is inside the Lua VM. If the thread is forcefully stopped, we can't lua_close() the Lua State. + DONE, ERROR_ST, CANCELLED: Thread execution is outside the Lua VM. It can be lua_close()d. */ enum e_status { PENDING, RUNNING, WAITING, DONE, ERROR_ST, CANCELLED }; diff --git a/tests/fibonacci.lua b/tests/fibonacci.lua index 452a770..48cd8d7 100644 --- a/tests/fibonacci.lua +++ b/tests/fibonacci.lua @@ -4,7 +4,7 @@ -- Parallel calculation of Fibonacci numbers -- -- A sample of task splitting like Intel TBB library does. --- +-- -- References: -- Intel Threading Building Blocks, 'test all' -- @@ -41,7 +41,7 @@ local function fib( n ) -- -- note that lanes is pulled in as upvalue, so we need package library to require internals properly -- (because lua51-lanes is always required internally if possible, which is necessary in that case) - local gen_f= lanes.gen( "package,string,io,math,debug", fib ) + local gen_f= lanes.gen( "*", fib ) local n1=floor(n/2) +1 local n2=floor(n/2) -1 + n%2 @@ -60,7 +60,7 @@ local function fib( n ) end io.stderr:write( "fib("..n..") = "..sum.."\n" ) - + return sum end diff --git a/tests/finalizer.lua b/tests/finalizer.lua index 6f186ab..dc9ed34 100644 --- a/tests/finalizer.lua +++ b/tests/finalizer.lua @@ -31,6 +31,7 @@ local function lane() io.stderr:write( "File "..FN.." created\n" ) if which==0 then + print "you loose" error("aa") -- exception here; the value needs NOT be a string end @@ -55,6 +56,7 @@ cleanup= function(err) end local _,err2= os.remove(FN) + print( "file removal result: ", tostring( err2)) assert(not err2) -- if this fails, it will be shown in the calling script -- as an error from the lane itself -- cgit v1.2.3-55-g6feb